From f51b046693ba4e381b19e1f1f4c1e0b8efdbb287 Mon Sep 17 00:00:00 2001 From: James Sedgwick Date: Mon, 18 May 2015 08:42:50 -0700 Subject: [PATCH] RFC: FutureDAG Summary: See task. Set up a DAG of Future-returning tasks (optionally with executors) and eventually kick them off. One big question is ownership. Currently the user would be responsible for ensuring that the FutureDAG outlives its own completion. This requirement could go away with shared_from_this magic maybe Test Plan: unit. I didn't bother to test via() functionality because it's too much work for now - the functionality is trivial. Same for "true-async" dags... Reviewed By: hans@fb.com Subscribers: folly-diffs@, jsedgwick, yfeldblum, chalfant FB internal diff: D2073481 Signature: t1:2073481:1431961131:82a8898502d5308f6ab3cc8cc5b84b016d3998fe --- folly/Makefile.am | 1 + folly/experimental/FutureDAG.h | 151 +++++++++++++++ folly/experimental/test/FutureDAGTest.cpp | 214 ++++++++++++++++++++++ 3 files changed, 366 insertions(+) create mode 100644 folly/experimental/FutureDAG.h create mode 100644 folly/experimental/test/FutureDAGTest.cpp diff --git a/folly/Makefile.am b/folly/Makefile.am index 92d56b73..3d656b78 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -101,6 +101,7 @@ nobase_follyinclude_HEADERS = \ experimental/fibers/WhenN.h \ experimental/fibers/WhenN-inl.h \ experimental/FunctionScheduler.h \ + experimental/FutureDAG.h \ experimental/io/FsUtil.h \ experimental/JSONSchema.h \ experimental/Select64.h \ diff --git a/folly/experimental/FutureDAG.h b/folly/experimental/FutureDAG.h new file mode 100644 index 00000000..ebf20e84 --- /dev/null +++ b/folly/experimental/FutureDAG.h @@ -0,0 +1,151 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +namespace folly { + +class FutureDAG : public std::enable_shared_from_this { + public: + static std::shared_ptr create() { + return std::shared_ptr(new FutureDAG()); + } + + typedef size_t Handle; + typedef std::function()> FutureFunc; + + Handle add(FutureFunc func, Executor* executor = nullptr) { + nodes.emplace_back(std::move(func), executor); + return nodes.size() - 1; + } + + void dependency(Handle a, Handle b) { + nodes[b].dependencies.push_back(a); + nodes[a].hasDependents = true; + } + + Future go() { + if (hasCycle()) { + return makeFuture(std::runtime_error("Cycle in FutureDAG graph")); + } + std::vector rootNodes; + std::vector leafNodes; + for (Handle handle = 0; handle < nodes.size(); handle++) { + if (nodes[handle].dependencies.empty()) { + rootNodes.push_back(handle); + } + if (!nodes[handle].hasDependents) { + leafNodes.push_back(handle); + } + } + + auto sinkHandle = add([] { return Future(); }); + for (auto handle : leafNodes) { + dependency(handle, sinkHandle); + } + + auto sourceHandle = add(nullptr); + for (auto handle : rootNodes) { + dependency(sourceHandle, handle); + } + + for (Handle handle = 0; handle < nodes.size() - 1; handle++) { + std::vector> dependencies; + for (auto depHandle : nodes[handle].dependencies) { + dependencies.push_back(nodes[depHandle].promise.getFuture()); + } + + collect(dependencies) + .via(nodes[handle].executor) + .then([this, handle] { + nodes[handle].func() + .then([this, handle] (Try&& t) { + nodes[handle].promise.setTry(std::move(t)); + }); + }) + .onError([this, handle] (exception_wrapper ew) { + nodes[handle].promise.setException(std::move(ew)); + }); + } + + nodes[sourceHandle].promise.setValue(); + auto that = shared_from_this(); + return nodes[sinkHandle].promise.getFuture().ensure([that]{}); + } + + private: + FutureDAG() = default; + + bool hasCycle() { + // Perform a modified topological sort to detect cycles + std::vector> dependencies; + for (auto& node : nodes) { + dependencies.push_back(node.dependencies); + } + + std::vector dependents(nodes.size()); + for (auto& dependencyEdges : dependencies) { + for (auto handle : dependencyEdges) { + dependents[handle]++; + } + } + + std::vector handles; + for (Handle handle = 0; handle < nodes.size(); handle++) { + if (!nodes[handle].hasDependents) { + handles.push_back(handle); + } + } + + while (!handles.empty()) { + auto handle = handles.back(); + handles.pop_back(); + while (!dependencies[handle].empty()) { + auto dependency = dependencies[handle].back(); + dependencies[handle].pop_back(); + if (--dependents[dependency] == 0) { + handles.push_back(dependency); + } + } + } + + for (auto& dependencyEdges : dependencies) { + if (!dependencyEdges.empty()) { + return true; + } + } + + return false; + } + + struct Node { + Node(FutureFunc&& funcArg, Executor* executorArg) : + func(std::move(funcArg)), executor(executorArg) {} + + FutureFunc func{nullptr}; + Executor* executor{nullptr}; + SharedPromise promise; + std::vector dependencies; + bool hasDependents{false}; + bool visited{false}; + }; + + std::vector nodes; +}; + +} // folly diff --git a/folly/experimental/test/FutureDAGTest.cpp b/folly/experimental/test/FutureDAGTest.cpp new file mode 100644 index 00000000..95dad8ba --- /dev/null +++ b/folly/experimental/test/FutureDAGTest.cpp @@ -0,0 +1,214 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include + +using namespace folly; + +struct FutureDAGTest : public testing::Test { + typedef FutureDAG::Handle Handle; + + Handle add() { + auto node = folly::make_unique(this); + auto handle = node->handle; + nodes.emplace(handle, std::move(node)); + return handle; + } + + void dependency(Handle a, Handle b) { + nodes.at(b)->dependencies.push_back(a); + dag->dependency(a, b); + } + + void checkOrder() { + EXPECT_EQ(nodes.size(), order.size()); + for (auto& kv : nodes) { + auto handle = kv.first; + auto& node = kv.second; + auto it = order.begin(); + while (*it != handle) { + it++; + } + for (auto dep : node->dependencies) { + EXPECT_TRUE(std::find(it, order.end(), dep) == order.end()); + } + } + } + + struct TestNode { + explicit TestNode(FutureDAGTest* test) { + func = [this, test] { + test->order.push_back(handle); + return Future(); + }; + handle = test->dag->add(func); + } + + FutureDAG::FutureFunc func; + Handle handle; + std::vector dependencies; + }; + + std::shared_ptr dag = FutureDAG::create(); + std::map> nodes; + std::vector order; +}; + + +TEST_F(FutureDAGTest, SingleNode) { + add(); + ASSERT_NO_THROW(dag->go().get()); + checkOrder(); +} + +TEST_F(FutureDAGTest, FanOut) { + auto h1 = add(); + auto h2 = add(); + auto h3 = add(); + dependency(h1, h2); + dependency(h1, h3); + ASSERT_NO_THROW(dag->go().get()); + checkOrder(); +} + +TEST_F(FutureDAGTest, FanIn) { + auto h1 = add(); + auto h2 = add(); + auto h3 = add(); + dependency(h1, h3); + dependency(h2, h3); + ASSERT_NO_THROW(dag->go().get()); + checkOrder(); +} + +TEST_F(FutureDAGTest, FanOutFanIn) { + auto h1 = add(); + auto h2 = add(); + auto h3 = add(); + auto h4 = add(); + dependency(h1, h3); + dependency(h1, h2); + dependency(h2, h4); + dependency(h3, h4); + ASSERT_NO_THROW(dag->go().get()); + checkOrder(); +} + +TEST_F(FutureDAGTest, Complex) { + auto A = add(); + auto B = add(); + auto C = add(); + auto D = add(); + auto E = add(); + auto F = add(); + auto G = add(); + auto H = add(); + auto I = add(); + auto J = add(); + auto K = add(); + auto L = add(); + auto M = add(); + auto N = add(); + + dependency(A, B); + dependency(A, C); + dependency(A, D); + dependency(A, J); + dependency(C, H); + dependency(D, E); + dependency(E, F); + dependency(E, G); + dependency(F, H); + dependency(G, H); + dependency(H, I); + dependency(J, K); + dependency(K, L); + dependency(K, M); + dependency(L, N); + dependency(I, N); + + ASSERT_NO_THROW(dag->go().get()); + checkOrder(); +} + +FutureDAG::FutureFunc makeFutureFunc = []{ + return makeFuture(); +}; + +FutureDAG::FutureFunc throwFunc = []{ + return makeFuture(std::runtime_error("oops")); +}; + +TEST_F(FutureDAGTest, ThrowBegin) { + auto h1 = dag->add(throwFunc); + auto h2 = dag->add(makeFutureFunc); + dag->dependency(h1, h2); + EXPECT_THROW(dag->go().get(), std::runtime_error); +} + +TEST_F(FutureDAGTest, ThrowEnd) { + auto h1 = dag->add(makeFutureFunc); + auto h2 = dag->add(throwFunc); + dag->dependency(h1, h2); + EXPECT_THROW(dag->go().get(), std::runtime_error); +} + +TEST_F(FutureDAGTest, Cycle1) { + auto h1 = add(); + dependency(h1, h1); + EXPECT_THROW(dag->go().get(), std::runtime_error); +} + +TEST_F(FutureDAGTest, Cycle2) { + auto h1 = add(); + auto h2 = add(); + dependency(h1, h2); + dependency(h2, h1); + EXPECT_THROW(dag->go().get(), std::runtime_error); +} + +TEST_F(FutureDAGTest, Cycle3) { + auto h1 = add(); + auto h2 = add(); + auto h3 = add(); + dependency(h1, h2); + dependency(h2, h3); + dependency(h3, h1); + EXPECT_THROW(dag->go().get(), std::runtime_error); +} + +TEST_F(FutureDAGTest, DestroyBeforeComplete) { + auto barrier = std::make_shared(2); + Future f; + { + auto dag = FutureDAG::create(); + auto h1 = dag->add([barrier] { + auto p = std::make_shared>(); + std::thread t([p, barrier]{ + barrier->wait(); + p->setValue(); + }); + t.detach(); + return p->getFuture(); + }); + auto h2 = dag->add(makeFutureFunc); + dag->dependency(h1, h2); + f = dag->go(); + } + barrier->wait(); + ASSERT_NO_THROW(f.get()); +} -- 2.34.1