From 2f9865aa90e91cb20a3ebf7545bd7036e2cd8102 Mon Sep 17 00:00:00 2001 From: Shayan Mohanty Date: Fri, 8 Jul 2016 17:12:44 -0700 Subject: [PATCH] Folly::FutureDAG <-> Gossit Summary: Implements remove(), state_clean(), and reset() functions in order to allow for static FutureDAGS that can be modified in place and executed multiple times. remove() removes the given handle from the nodes vector and cleans up all dependencies associated with it. Because of the way Handles are implemented, all Handles greater than the one removed are decremented (and therefore must be accounted for in the client-code). Current best-practice would be to remove nodes by most-recently added. state_clean() removes the sink/source nodes added by go(). reset() removes all nodes but the top-level source node and resets dependency properties. Reviewed By: tjkswaine Differential Revision: D3486947 fbshipit-source-id: c8b9db6a139ee5b36aae6e9366c9b338cc49ede1 --- folly/experimental/FutureDAG.h | 71 +++++++++++++++++---- folly/experimental/test/FutureDAGTest.cpp | 76 ++++++++++++++++++++--- 2 files changed, 129 insertions(+), 18 deletions(-) diff --git a/folly/experimental/FutureDAG.h b/folly/experimental/FutureDAG.h index ba5b1775..a825e617 100644 --- a/folly/experimental/FutureDAG.h +++ b/folly/experimental/FutureDAG.h @@ -34,11 +34,58 @@ class FutureDAG : public std::enable_shared_from_this { return nodes.size() - 1; } + void remove(Handle a) { + if (nodes.size() > a && nodes[a].hasDependents) { + for (auto& node : nodes) { + auto& deps = node.dependencies; + deps.erase( + std::remove(std::begin(deps), std::end(deps), a), std::end(deps)); + for (Handle& handle : deps) { + if (handle > a) { + handle--; + } + } + } + } + nodes.erase(nodes.begin() + a); + } + + void reset() { + // Delete all but source node, and reset dependency properties + Handle source_node; + std::unordered_set memo; + for (auto& node : nodes) { + for (Handle handle : node.dependencies) { + memo.insert(handle); + } + } + for (Handle handle = 0; handle < nodes.size(); handle++) { + if (memo.find(handle) == memo.end()) { + source_node = handle; + } + } + + // Faster to just create a new vector with the element in it? + nodes.erase(nodes.begin(), nodes.begin() + source_node); + nodes.erase(nodes.begin() + 1, nodes.end()); + nodes[0].hasDependents = false; + nodes[0].dependencies.clear(); + } + void dependency(Handle a, Handle b) { nodes[b].dependencies.push_back(a); nodes[a].hasDependents = true; } + void clean_state(Handle source, Handle sink) { + for (auto handle : nodes[sink].dependencies) { + nodes[handle].hasDependents = false; + } + nodes[0].hasDependents = false; + remove(source); + remove(sink); + } + Future go() { if (hasCycle()) { return makeFuture(std::runtime_error("Cycle in FutureDAG graph")); @@ -71,21 +118,23 @@ class FutureDAG : public std::enable_shared_from_this { } collect(dependencies) - .via(nodes[handle].executor) - .then([this, handle] { - nodes[handle].func() - .then([this, handle] (Try&& t) { + .via(nodes[handle].executor) + .then([this, handle] { + nodes[handle].func().then([this, handle](Try&& t) { nodes[handle].promise.setTry(std::move(t)); }); - }) - .onError([this, handle] (exception_wrapper ew) { - nodes[handle].promise.setException(std::move(ew)); - }); + }) + .onError([this, handle](exception_wrapper ew) { + nodes[handle].promise.setException(std::move(ew)); + }); } nodes[sourceHandle].promise.setValue(); auto that = shared_from_this(); - return nodes[sinkHandle].promise.getFuture().ensure([that]{}); + return nodes[sinkHandle].promise.getFuture().ensure([that] {}).then( + [this, sourceHandle, sinkHandle]() { + clean_state(sourceHandle, sinkHandle); + }); } private: @@ -134,8 +183,8 @@ class FutureDAG : public std::enable_shared_from_this { } struct Node { - Node(FutureFunc&& funcArg, Executor* executorArg) : - func(std::move(funcArg)), executor(executorArg) {} + Node(FutureFunc&& funcArg, Executor* executorArg) + : func(std::move(funcArg)), executor(executorArg) {} FutureFunc func{nullptr}; Executor* executor{nullptr}; diff --git a/folly/experimental/test/FutureDAGTest.cpp b/folly/experimental/test/FutureDAGTest.cpp index 434a77f1..fa577bf1 100644 --- a/folly/experimental/test/FutureDAGTest.cpp +++ b/folly/experimental/test/FutureDAGTest.cpp @@ -13,9 +13,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include #include #include -#include using namespace folly; @@ -29,6 +29,39 @@ struct FutureDAGTest : public testing::Test { return handle; } + void reset() { + Handle source_node; + std::unordered_set memo; + for (auto& node : nodes) { + for (Handle handle : node.second->dependencies) { + memo.insert(handle); + } + } + for (auto& node : nodes) { + if (memo.find(node.first) == memo.end()) { + source_node = node.first; + } + } + for (auto it = nodes.cbegin(); it != nodes.cend();) { + if (it->first != source_node) { + it = nodes.erase(it); + } else { + ++it; + } + } + dag->reset(); + } + + void remove(Handle a) { + for (auto itr = nodes.begin(); itr != nodes.end(); itr++) { + auto& deps = itr->second->dependencies; + if (std::find(deps.begin(), deps.end(), a) != deps.end()) { + deps.erase(deps.begin() + a); + } + } + nodes.erase(a); + dag->remove(a); + } void dependency(Handle a, Handle b) { nodes.at(b)->dependencies.push_back(a); dag->dependency(a, b); @@ -68,13 +101,44 @@ struct FutureDAGTest : public testing::Test { std::vector order; }; - TEST_F(FutureDAGTest, SingleNode) { add(); ASSERT_NO_THROW(dag->go().get()); checkOrder(); } +TEST_F(FutureDAGTest, RemoveSingleNode) { + auto h1 = add(); + auto h2 = add(); + remove(h2); + ASSERT_NO_THROW(dag->go().get()); + checkOrder(); +} + +TEST_F(FutureDAGTest, RemoveNodeComplex) { + auto h1 = add(); + auto h2 = add(); + auto h3 = add(); + dependency(h1, h3); + dependency(h2, h1); + remove(h1); + remove(h2); + ASSERT_NO_THROW(dag->go().get()); + checkOrder(); +} + +TEST_F(FutureDAGTest, ResetDAG) { + auto h1 = add(); + auto h2 = add(); + auto h3 = add(); + dependency(h3, h1); + dependency(h2, h3); + + reset(); + ASSERT_NO_THROW(dag->go().get()); + checkOrder(); +} + TEST_F(FutureDAGTest, FanOut) { auto h1 = add(); auto h2 = add(); @@ -145,11 +209,9 @@ TEST_F(FutureDAGTest, Complex) { checkOrder(); } -FutureDAG::FutureFunc makeFutureFunc = []{ - return makeFuture(); -}; +FutureDAG::FutureFunc makeFutureFunc = [] { return makeFuture(); }; -FutureDAG::FutureFunc throwFunc = []{ +FutureDAG::FutureFunc throwFunc = [] { return makeFuture(std::runtime_error("oops")); }; @@ -198,7 +260,7 @@ TEST_F(FutureDAGTest, DestroyBeforeComplete) { auto dag = FutureDAG::create(); auto h1 = dag->add([barrier] { auto p = std::make_shared>(); - std::thread t([p, barrier]{ + std::thread t([p, barrier] { barrier->wait(); p->setValue(); }); -- 2.34.1