Folly::FutureDAG <-> Gossit
[folly.git] / folly / experimental / FutureDAG.h
index ba5b17756dde1e04dea478d554a36abc7d4afcbb..a825e617aa10f444b7a2c00a7728c79b997bbf75 100644 (file)
@@ -34,11 +34,58 @@ class FutureDAG : public std::enable_shared_from_this<FutureDAG> {
     return nodes.size() - 1;
   }
 
+  void remove(Handle a) {
+    if (nodes.size() > a && nodes[a].hasDependents) {
+      for (auto& node : nodes) {
+        auto& deps = node.dependencies;
+        deps.erase(
+            std::remove(std::begin(deps), std::end(deps), a), std::end(deps));
+        for (Handle& handle : deps) {
+          if (handle > a) {
+            handle--;
+          }
+        }
+      }
+    }
+    nodes.erase(nodes.begin() + a);
+  }
+
+  void reset() {
+    // Delete all but source node, and reset dependency properties
+    Handle source_node;
+    std::unordered_set<Handle> memo;
+    for (auto& node : nodes) {
+      for (Handle handle : node.dependencies) {
+        memo.insert(handle);
+      }
+    }
+    for (Handle handle = 0; handle < nodes.size(); handle++) {
+      if (memo.find(handle) == memo.end()) {
+        source_node = handle;
+      }
+    }
+
+    // Faster to just create a new vector with the element in it?
+    nodes.erase(nodes.begin(), nodes.begin() + source_node);
+    nodes.erase(nodes.begin() + 1, nodes.end());
+    nodes[0].hasDependents = false;
+    nodes[0].dependencies.clear();
+  }
+
   void dependency(Handle a, Handle b) {
     nodes[b].dependencies.push_back(a);
     nodes[a].hasDependents = true;
   }
 
+  void clean_state(Handle source, Handle sink) {
+    for (auto handle : nodes[sink].dependencies) {
+      nodes[handle].hasDependents = false;
+    }
+    nodes[0].hasDependents = false;
+    remove(source);
+    remove(sink);
+  }
+
   Future<Unit> go() {
     if (hasCycle()) {
       return makeFuture<Unit>(std::runtime_error("Cycle in FutureDAG graph"));
@@ -71,21 +118,23 @@ class FutureDAG : public std::enable_shared_from_this<FutureDAG> {
       }
 
       collect(dependencies)
-        .via(nodes[handle].executor)
-        .then([this, handle] {
-          nodes[handle].func()
-            .then([this, handle] (Try<Unit>&& t) {
+          .via(nodes[handle].executor)
+          .then([this, handle] {
+            nodes[handle].func().then([this, handle](Try<Unit>&& t) {
               nodes[handle].promise.setTry(std::move(t));
             });
-        })
-        .onError([this, handle] (exception_wrapper ew) {
-          nodes[handle].promise.setException(std::move(ew));
-        });
+          })
+          .onError([this, handle](exception_wrapper ew) {
+            nodes[handle].promise.setException(std::move(ew));
+          });
     }
 
     nodes[sourceHandle].promise.setValue();
     auto that = shared_from_this();
-    return nodes[sinkHandle].promise.getFuture().ensure([that]{});
+    return nodes[sinkHandle].promise.getFuture().ensure([that] {}).then(
+        [this, sourceHandle, sinkHandle]() {
+          clean_state(sourceHandle, sinkHandle);
+        });
   }
 
  private:
@@ -134,8 +183,8 @@ class FutureDAG : public std::enable_shared_from_this<FutureDAG> {
   }
 
   struct Node {
-    Node(FutureFunc&& funcArg, Executor* executorArg) :
-      func(std::move(funcArg)), executor(executorArg) {}
+    Node(FutureFunc&& funcArg, Executor* executorArg)
+        : func(std::move(funcArg)), executor(executorArg) {}
 
     FutureFunc func{nullptr};
     Executor* executor{nullptr};