Folly::FutureDAG <-> Gossit
authorShayan Mohanty <shayanjm@fb.com>
Sat, 9 Jul 2016 00:12:44 +0000 (17:12 -0700)
committerFacebook Github Bot 1 <facebook-github-bot-1-bot@fb.com>
Sat, 9 Jul 2016 00:24:10 +0000 (17:24 -0700)
Summary:
Implements remove(), state_clean(), and reset() functions in order to allow for static FutureDAGS that can be modified in place and executed multiple times.

remove() removes the given handle from the nodes vector and cleans up all dependencies associated with it. Because of the way Handles are implemented, all Handles greater than the one removed are decremented (and therefore must be accounted for in the client-code). Current best-practice would be to remove nodes by most-recently added.

state_clean() removes the sink/source nodes added by go().

reset() removes all nodes but the top-level source node and resets dependency properties.

Reviewed By: tjkswaine

Differential Revision: D3486947

fbshipit-source-id: c8b9db6a139ee5b36aae6e9366c9b338cc49ede1

folly/experimental/FutureDAG.h
folly/experimental/test/FutureDAGTest.cpp

index ba5b17756dde1e04dea478d554a36abc7d4afcbb..a825e617aa10f444b7a2c00a7728c79b997bbf75 100644 (file)
@@ -34,11 +34,58 @@ class FutureDAG : public std::enable_shared_from_this<FutureDAG> {
     return nodes.size() - 1;
   }
 
+  void remove(Handle a) {
+    if (nodes.size() > a && nodes[a].hasDependents) {
+      for (auto& node : nodes) {
+        auto& deps = node.dependencies;
+        deps.erase(
+            std::remove(std::begin(deps), std::end(deps), a), std::end(deps));
+        for (Handle& handle : deps) {
+          if (handle > a) {
+            handle--;
+          }
+        }
+      }
+    }
+    nodes.erase(nodes.begin() + a);
+  }
+
+  void reset() {
+    // Delete all but source node, and reset dependency properties
+    Handle source_node;
+    std::unordered_set<Handle> memo;
+    for (auto& node : nodes) {
+      for (Handle handle : node.dependencies) {
+        memo.insert(handle);
+      }
+    }
+    for (Handle handle = 0; handle < nodes.size(); handle++) {
+      if (memo.find(handle) == memo.end()) {
+        source_node = handle;
+      }
+    }
+
+    // Faster to just create a new vector with the element in it?
+    nodes.erase(nodes.begin(), nodes.begin() + source_node);
+    nodes.erase(nodes.begin() + 1, nodes.end());
+    nodes[0].hasDependents = false;
+    nodes[0].dependencies.clear();
+  }
+
   void dependency(Handle a, Handle b) {
     nodes[b].dependencies.push_back(a);
     nodes[a].hasDependents = true;
   }
 
+  void clean_state(Handle source, Handle sink) {
+    for (auto handle : nodes[sink].dependencies) {
+      nodes[handle].hasDependents = false;
+    }
+    nodes[0].hasDependents = false;
+    remove(source);
+    remove(sink);
+  }
+
   Future<Unit> go() {
     if (hasCycle()) {
       return makeFuture<Unit>(std::runtime_error("Cycle in FutureDAG graph"));
@@ -71,21 +118,23 @@ class FutureDAG : public std::enable_shared_from_this<FutureDAG> {
       }
 
       collect(dependencies)
-        .via(nodes[handle].executor)
-        .then([this, handle] {
-          nodes[handle].func()
-            .then([this, handle] (Try<Unit>&& t) {
+          .via(nodes[handle].executor)
+          .then([this, handle] {
+            nodes[handle].func().then([this, handle](Try<Unit>&& t) {
               nodes[handle].promise.setTry(std::move(t));
             });
-        })
-        .onError([this, handle] (exception_wrapper ew) {
-          nodes[handle].promise.setException(std::move(ew));
-        });
+          })
+          .onError([this, handle](exception_wrapper ew) {
+            nodes[handle].promise.setException(std::move(ew));
+          });
     }
 
     nodes[sourceHandle].promise.setValue();
     auto that = shared_from_this();
-    return nodes[sinkHandle].promise.getFuture().ensure([that]{});
+    return nodes[sinkHandle].promise.getFuture().ensure([that] {}).then(
+        [this, sourceHandle, sinkHandle]() {
+          clean_state(sourceHandle, sinkHandle);
+        });
   }
 
  private:
@@ -134,8 +183,8 @@ class FutureDAG : public std::enable_shared_from_this<FutureDAG> {
   }
 
   struct Node {
-    Node(FutureFunc&& funcArg, Executor* executorArg) :
-      func(std::move(funcArg)), executor(executorArg) {}
+    Node(FutureFunc&& funcArg, Executor* executorArg)
+        : func(std::move(funcArg)), executor(executorArg) {}
 
     FutureFunc func{nullptr};
     Executor* executor{nullptr};
index 434a77f1d327171f4c5742e040a2371ebc9127bd..fa577bf1bb7061132414d8496a55687ea5dc609a 100644 (file)
@@ -13,9 +13,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include <boost/thread/barrier.hpp>
 #include <folly/experimental/FutureDAG.h>
 #include <gtest/gtest.h>
-#include <boost/thread/barrier.hpp>
 
 using namespace folly;
 
@@ -29,6 +29,39 @@ struct FutureDAGTest : public testing::Test {
     return handle;
   }
 
+  void reset() {
+    Handle source_node;
+    std::unordered_set<Handle> memo;
+    for (auto& node : nodes) {
+      for (Handle handle : node.second->dependencies) {
+        memo.insert(handle);
+      }
+    }
+    for (auto& node : nodes) {
+      if (memo.find(node.first) == memo.end()) {
+        source_node = node.first;
+      }
+    }
+    for (auto it = nodes.cbegin(); it != nodes.cend();) {
+      if (it->first != source_node) {
+        it = nodes.erase(it);
+      } else {
+        ++it;
+      }
+    }
+    dag->reset();
+  }
+
+  void remove(Handle a) {
+    for (auto itr = nodes.begin(); itr != nodes.end(); itr++) {
+      auto& deps = itr->second->dependencies;
+      if (std::find(deps.begin(), deps.end(), a) != deps.end()) {
+        deps.erase(deps.begin() + a);
+      }
+    }
+    nodes.erase(a);
+    dag->remove(a);
+  }
   void dependency(Handle a, Handle b) {
     nodes.at(b)->dependencies.push_back(a);
     dag->dependency(a, b);
@@ -68,13 +101,44 @@ struct FutureDAGTest : public testing::Test {
   std::vector<Handle> order;
 };
 
-
 TEST_F(FutureDAGTest, SingleNode) {
   add();
   ASSERT_NO_THROW(dag->go().get());
   checkOrder();
 }
 
+TEST_F(FutureDAGTest, RemoveSingleNode) {
+  auto h1 = add();
+  auto h2 = add();
+  remove(h2);
+  ASSERT_NO_THROW(dag->go().get());
+  checkOrder();
+}
+
+TEST_F(FutureDAGTest, RemoveNodeComplex) {
+  auto h1 = add();
+  auto h2 = add();
+  auto h3 = add();
+  dependency(h1, h3);
+  dependency(h2, h1);
+  remove(h1);
+  remove(h2);
+  ASSERT_NO_THROW(dag->go().get());
+  checkOrder();
+}
+
+TEST_F(FutureDAGTest, ResetDAG) {
+  auto h1 = add();
+  auto h2 = add();
+  auto h3 = add();
+  dependency(h3, h1);
+  dependency(h2, h3);
+
+  reset();
+  ASSERT_NO_THROW(dag->go().get());
+  checkOrder();
+}
+
 TEST_F(FutureDAGTest, FanOut) {
   auto h1 = add();
   auto h2 = add();
@@ -145,11 +209,9 @@ TEST_F(FutureDAGTest, Complex) {
   checkOrder();
 }
 
-FutureDAG::FutureFunc makeFutureFunc = []{
-  return makeFuture();
-};
+FutureDAG::FutureFunc makeFutureFunc = [] { return makeFuture(); };
 
-FutureDAG::FutureFunc throwFunc = []{
+FutureDAG::FutureFunc throwFunc = [] {
   return makeFuture<Unit>(std::runtime_error("oops"));
 };
 
@@ -198,7 +260,7 @@ TEST_F(FutureDAGTest, DestroyBeforeComplete) {
     auto dag = FutureDAG::create();
     auto h1 = dag->add([barrier] {
       auto p = std::make_shared<Promise<Unit>>();
-      std::thread t([p, barrier]{
+      std::thread t([p, barrier] {
         barrier->wait();
         p->setValue();
       });