RFC: FutureDAG
authorJames Sedgwick <jsedgwick@fb.com>
Mon, 18 May 2015 15:42:50 +0000 (08:42 -0700)
committerViswanath Sivakumar <viswanath@fb.com>
Wed, 20 May 2015 17:57:11 +0000 (10:57 -0700)
Summary:
See task. Set up a DAG of Future-returning tasks (optionally with executors) and eventually kick them off.
One big question is ownership. Currently the user would be responsible for ensuring that the FutureDAG outlives its own completion. This requirement could go away with shared_from_this magic maybe

Test Plan: unit. I didn't bother to test via() functionality because it's too much work for now - the functionality is trivial. Same for "true-async" dags...

Reviewed By: hans@fb.com

Subscribers: folly-diffs@, jsedgwick, yfeldblum, chalfant

FB internal diff: D2073481

Signature: t1:2073481:1431961131:82a8898502d5308f6ab3cc8cc5b84b016d3998fe

folly/Makefile.am
folly/experimental/FutureDAG.h [new file with mode: 0644]
folly/experimental/test/FutureDAGTest.cpp [new file with mode: 0644]

index 92d56b7399f6105a6e995490e9c999e79f0e4272..3d656b786e394d6e8841aceb4eb2bd2e87bfe555 100644 (file)
@@ -101,6 +101,7 @@ nobase_follyinclude_HEADERS = \
        experimental/fibers/WhenN.h \
        experimental/fibers/WhenN-inl.h \
        experimental/FunctionScheduler.h \
+       experimental/FutureDAG.h \
        experimental/io/FsUtil.h \
        experimental/JSONSchema.h \
        experimental/Select64.h \
diff --git a/folly/experimental/FutureDAG.h b/folly/experimental/FutureDAG.h
new file mode 100644 (file)
index 0000000..ebf20e8
--- /dev/null
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/futures/Future.h>
+#include <folly/futures/SharedPromise.h>
+
+namespace folly {
+
+class FutureDAG : public std::enable_shared_from_this<FutureDAG> {
+ public:
+  static std::shared_ptr<FutureDAG> create() {
+    return std::shared_ptr<FutureDAG>(new FutureDAG());
+  }
+
+  typedef size_t Handle;
+  typedef std::function<Future<void>()> FutureFunc;
+
+  Handle add(FutureFunc func, Executor* executor = nullptr) {
+    nodes.emplace_back(std::move(func), executor);
+    return nodes.size() - 1;
+  }
+
+  void dependency(Handle a, Handle b) {
+    nodes[b].dependencies.push_back(a);
+    nodes[a].hasDependents = true;
+  }
+
+  Future<void> go() {
+    if (hasCycle()) {
+      return makeFuture<void>(std::runtime_error("Cycle in FutureDAG graph"));
+    }
+    std::vector<Handle> rootNodes;
+    std::vector<Handle> leafNodes;
+    for (Handle handle = 0; handle < nodes.size(); handle++) {
+      if (nodes[handle].dependencies.empty()) {
+        rootNodes.push_back(handle);
+      }
+      if (!nodes[handle].hasDependents) {
+        leafNodes.push_back(handle);
+      }
+    }
+
+    auto sinkHandle = add([] { return Future<void>(); });
+    for (auto handle : leafNodes) {
+      dependency(handle, sinkHandle);
+    }
+
+    auto sourceHandle = add(nullptr);
+    for (auto handle : rootNodes) {
+      dependency(sourceHandle, handle);
+    }
+
+    for (Handle handle = 0; handle < nodes.size() - 1; handle++) {
+      std::vector<Future<void>> dependencies;
+      for (auto depHandle : nodes[handle].dependencies) {
+        dependencies.push_back(nodes[depHandle].promise.getFuture());
+      }
+
+      collect(dependencies)
+        .via(nodes[handle].executor)
+        .then([this, handle] {
+          nodes[handle].func()
+            .then([this, handle] (Try<void>&& t) {
+              nodes[handle].promise.setTry(std::move(t));
+            });
+        })
+        .onError([this, handle] (exception_wrapper ew) {
+          nodes[handle].promise.setException(std::move(ew));
+        });
+    }
+
+    nodes[sourceHandle].promise.setValue();
+    auto that = shared_from_this();
+    return nodes[sinkHandle].promise.getFuture().ensure([that]{});
+  }
+
+ private:
+  FutureDAG() = default;
+
+  bool hasCycle() {
+    // Perform a modified topological sort to detect cycles
+    std::vector<std::vector<Handle>> dependencies;
+    for (auto& node : nodes) {
+      dependencies.push_back(node.dependencies);
+    }
+
+    std::vector<size_t> dependents(nodes.size());
+    for (auto& dependencyEdges : dependencies) {
+      for (auto handle : dependencyEdges) {
+        dependents[handle]++;
+      }
+    }
+
+    std::vector<Handle> handles;
+    for (Handle handle = 0; handle < nodes.size(); handle++) {
+      if (!nodes[handle].hasDependents) {
+        handles.push_back(handle);
+      }
+    }
+
+    while (!handles.empty()) {
+      auto handle = handles.back();
+      handles.pop_back();
+      while (!dependencies[handle].empty()) {
+        auto dependency = dependencies[handle].back();
+        dependencies[handle].pop_back();
+        if (--dependents[dependency] == 0) {
+          handles.push_back(dependency);
+        }
+      }
+    }
+
+    for (auto& dependencyEdges : dependencies) {
+      if (!dependencyEdges.empty()) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  struct Node {
+    Node(FutureFunc&& funcArg, Executor* executorArg) :
+      func(std::move(funcArg)), executor(executorArg) {}
+
+    FutureFunc func{nullptr};
+    Executor* executor{nullptr};
+    SharedPromise<void> promise;
+    std::vector<Handle> dependencies;
+    bool hasDependents{false};
+    bool visited{false};
+  };
+
+  std::vector<Node> nodes;
+};
+
+} // folly
diff --git a/folly/experimental/test/FutureDAGTest.cpp b/folly/experimental/test/FutureDAGTest.cpp
new file mode 100644 (file)
index 0000000..95dad8b
--- /dev/null
@@ -0,0 +1,214 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/experimental/FutureDAG.h>
+#include <gtest/gtest.h>
+#include <boost/thread/barrier.hpp>
+
+using namespace folly;
+
+struct FutureDAGTest : public testing::Test {
+  typedef FutureDAG::Handle Handle;
+
+  Handle add() {
+    auto node = folly::make_unique<TestNode>(this);
+    auto handle = node->handle;
+    nodes.emplace(handle, std::move(node));
+    return handle;
+  }
+
+  void dependency(Handle a, Handle b) {
+    nodes.at(b)->dependencies.push_back(a);
+    dag->dependency(a, b);
+  }
+
+  void checkOrder() {
+    EXPECT_EQ(nodes.size(), order.size());
+    for (auto& kv : nodes) {
+      auto handle = kv.first;
+      auto& node = kv.second;
+      auto it = order.begin();
+      while (*it != handle) {
+        it++;
+      }
+      for (auto dep : node->dependencies) {
+        EXPECT_TRUE(std::find(it, order.end(), dep) == order.end());
+      }
+    }
+  }
+
+  struct TestNode {
+    explicit TestNode(FutureDAGTest* test) {
+      func = [this, test] {
+        test->order.push_back(handle);
+        return Future<void>();
+      };
+      handle = test->dag->add(func);
+    }
+
+    FutureDAG::FutureFunc func;
+    Handle handle;
+    std::vector<Handle> dependencies;
+  };
+
+  std::shared_ptr<FutureDAG> dag = FutureDAG::create();
+  std::map<Handle, std::unique_ptr<TestNode>> nodes;
+  std::vector<Handle> order;
+};
+
+
+TEST_F(FutureDAGTest, SingleNode) {
+  add();
+  ASSERT_NO_THROW(dag->go().get());
+  checkOrder();
+}
+
+TEST_F(FutureDAGTest, FanOut) {
+  auto h1 = add();
+  auto h2 = add();
+  auto h3 = add();
+  dependency(h1, h2);
+  dependency(h1, h3);
+  ASSERT_NO_THROW(dag->go().get());
+  checkOrder();
+}
+
+TEST_F(FutureDAGTest, FanIn) {
+  auto h1 = add();
+  auto h2 = add();
+  auto h3 = add();
+  dependency(h1, h3);
+  dependency(h2, h3);
+  ASSERT_NO_THROW(dag->go().get());
+  checkOrder();
+}
+
+TEST_F(FutureDAGTest, FanOutFanIn) {
+  auto h1 = add();
+  auto h2 = add();
+  auto h3 = add();
+  auto h4 = add();
+  dependency(h1, h3);
+  dependency(h1, h2);
+  dependency(h2, h4);
+  dependency(h3, h4);
+  ASSERT_NO_THROW(dag->go().get());
+  checkOrder();
+}
+
+TEST_F(FutureDAGTest, Complex) {
+  auto A = add();
+  auto B = add();
+  auto C = add();
+  auto D = add();
+  auto E = add();
+  auto F = add();
+  auto G = add();
+  auto H = add();
+  auto I = add();
+  auto J = add();
+  auto K = add();
+  auto L = add();
+  auto M = add();
+  auto N = add();
+
+  dependency(A, B);
+  dependency(A, C);
+  dependency(A, D);
+  dependency(A, J);
+  dependency(C, H);
+  dependency(D, E);
+  dependency(E, F);
+  dependency(E, G);
+  dependency(F, H);
+  dependency(G, H);
+  dependency(H, I);
+  dependency(J, K);
+  dependency(K, L);
+  dependency(K, M);
+  dependency(L, N);
+  dependency(I, N);
+
+  ASSERT_NO_THROW(dag->go().get());
+  checkOrder();
+}
+
+FutureDAG::FutureFunc makeFutureFunc = []{
+  return makeFuture();
+};
+
+FutureDAG::FutureFunc throwFunc = []{
+  return makeFuture<void>(std::runtime_error("oops"));
+};
+
+TEST_F(FutureDAGTest, ThrowBegin) {
+  auto h1 = dag->add(throwFunc);
+  auto h2 = dag->add(makeFutureFunc);
+  dag->dependency(h1, h2);
+  EXPECT_THROW(dag->go().get(), std::runtime_error);
+}
+
+TEST_F(FutureDAGTest, ThrowEnd) {
+  auto h1 = dag->add(makeFutureFunc);
+  auto h2 = dag->add(throwFunc);
+  dag->dependency(h1, h2);
+  EXPECT_THROW(dag->go().get(), std::runtime_error);
+}
+
+TEST_F(FutureDAGTest, Cycle1) {
+  auto h1 = add();
+  dependency(h1, h1);
+  EXPECT_THROW(dag->go().get(), std::runtime_error);
+}
+
+TEST_F(FutureDAGTest, Cycle2) {
+  auto h1 = add();
+  auto h2 = add();
+  dependency(h1, h2);
+  dependency(h2, h1);
+  EXPECT_THROW(dag->go().get(), std::runtime_error);
+}
+
+TEST_F(FutureDAGTest, Cycle3) {
+  auto h1 = add();
+  auto h2 = add();
+  auto h3 = add();
+  dependency(h1, h2);
+  dependency(h2, h3);
+  dependency(h3, h1);
+  EXPECT_THROW(dag->go().get(), std::runtime_error);
+}
+
+TEST_F(FutureDAGTest, DestroyBeforeComplete) {
+  auto barrier = std::make_shared<boost::barrier>(2);
+  Future<void> f;
+  {
+    auto dag = FutureDAG::create();
+    auto h1 = dag->add([barrier] {
+      auto p = std::make_shared<Promise<void>>();
+      std::thread t([p, barrier]{
+        barrier->wait();
+        p->setValue();
+      });
+      t.detach();
+      return p->getFuture();
+    });
+    auto h2 = dag->add(makeFutureFunc);
+    dag->dependency(h1, h2);
+    f = dag->go();
+  }
+  barrier->wait();
+  ASSERT_NO_THROW(f.get());
+}