Fix ASAN failure in FutureDAG test
[folly.git] / folly / experimental / FutureDAG.h
index c9ca715d5d05391ad17b130bd2c7a0281e2d9337..ee3dcffbf649f6f74d6953f50f9492640ca39a19 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2015 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -34,11 +34,62 @@ class FutureDAG : public std::enable_shared_from_this<FutureDAG> {
     return nodes.size() - 1;
   }
 
+  void remove(Handle a) {
+    if (a >= nodes.size()) {
+      return;
+    }
+
+    if (nodes[a].hasDependents) {
+      for (auto& node : nodes) {
+        auto& deps = node.dependencies;
+        deps.erase(
+            std::remove(std::begin(deps), std::end(deps), a), std::end(deps));
+        for (Handle& handle : deps) {
+          if (handle > a) {
+            handle--;
+          }
+        }
+      }
+    }
+
+    nodes.erase(nodes.begin() + a);
+  }
+
+  void reset() {
+    // Delete all but source node, and reset dependency properties
+    Handle source_node;
+    std::unordered_set<Handle> memo;
+    for (auto& node : nodes) {
+      for (Handle handle : node.dependencies) {
+        memo.insert(handle);
+      }
+    }
+    for (Handle handle = 0; handle < nodes.size(); handle++) {
+      if (memo.find(handle) == memo.end()) {
+        source_node = handle;
+      }
+    }
+
+    nodes.erase(nodes.begin(), nodes.begin() + source_node);
+    nodes.erase(nodes.begin() + 1, nodes.end());
+    nodes[0].hasDependents = false;
+    nodes[0].dependencies.clear();
+  }
+
   void dependency(Handle a, Handle b) {
     nodes[b].dependencies.push_back(a);
     nodes[a].hasDependents = true;
   }
 
+  void clean_state(Handle source, Handle sink) {
+    for (auto handle : nodes[sink].dependencies) {
+      nodes[handle].hasDependents = false;
+    }
+    nodes[0].hasDependents = false;
+    remove(source);
+    remove(sink);
+  }
+
   Future<Unit> go() {
     if (hasCycle()) {
       return makeFuture<Unit>(std::runtime_error("Cycle in FutureDAG graph"));
@@ -71,21 +122,22 @@ class FutureDAG : public std::enable_shared_from_this<FutureDAG> {
       }
 
       collect(dependencies)
-        .via(nodes[handle].executor)
-        .then([this, handle] {
-          nodes[handle].func()
-            .then([this, handle] (Try<Unit>&& t) {
+          .via(nodes[handle].executor)
+          .then([this, handle] {
+            nodes[handle].func().then([this, handle](Try<Unit>&& t) {
               nodes[handle].promise.setTry(std::move(t));
             });
-        })
-        .onError([this, handle] (exception_wrapper ew) {
-          nodes[handle].promise.setException(std::move(ew));
-        });
+          })
+          .onError([this, handle](exception_wrapper ew) {
+            nodes[handle].promise.setException(std::move(ew));
+          });
     }
 
     nodes[sourceHandle].promise.setValue();
-    auto that = shared_from_this();
-    return nodes[sinkHandle].promise.getFuture().ensure([that]{});
+    return nodes[sinkHandle].promise.getFuture().then(
+        [that = shared_from_this(), sourceHandle, sinkHandle]() {
+          that->clean_state(sourceHandle, sinkHandle);
+        });
   }
 
  private:
@@ -134,8 +186,8 @@ class FutureDAG : public std::enable_shared_from_this<FutureDAG> {
   }
 
   struct Node {
-    Node(FutureFunc&& funcArg, Executor* executorArg) :
-      func(std::move(funcArg)), executor(executorArg) {}
+    Node(FutureFunc&& funcArg, Executor* executorArg)
+        : func(std::move(funcArg)), executor(executorArg) {}
 
     FutureFunc func{nullptr};
     Executor* executor{nullptr};
@@ -148,4 +200,28 @@ class FutureDAG : public std::enable_shared_from_this<FutureDAG> {
   std::vector<Node> nodes;
 };
 
-} // folly
+// Polymorphic functor implementation
+template <typename T>
+class FutureDAGFunctor {
+ public:
+  std::shared_ptr<FutureDAG> dag = FutureDAG::create();
+  T state;
+  std::vector<T> dep_states;
+  T result() {
+    return state;
+  }
+  // execReset() runs DAG & clears all nodes except for source
+  void execReset() {
+    this->dag->go().get();
+    this->dag->reset();
+  }
+  void exec() {
+    this->dag->go().get();
+  }
+  virtual void operator()(){}
+  explicit FutureDAGFunctor(T init_val) : state(init_val) {}
+  FutureDAGFunctor() : state() {}
+  virtual ~FutureDAGFunctor(){}
+};
+
+} // namespace folly