X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2Fexperimental%2FFutureDAG.h;h=bf14f33e9cb4770af69bf85adb8f2ef9816e992f;hb=79e5fd2fa293cf03269d4b7bba9ed7a31dda1cd8;hp=ebf20e84d304145b9418ba23eb6c1e8cda490c4e;hpb=f51b046693ba4e381b19e1f1f4c1e0b8efdbb287;p=folly.git diff --git a/folly/experimental/FutureDAG.h b/folly/experimental/FutureDAG.h index ebf20e84..bf14f33e 100644 --- a/folly/experimental/FutureDAG.h +++ b/folly/experimental/FutureDAG.h @@ -1,5 +1,5 @@ /* - * Copyright 2015 Facebook, Inc. + * Copyright 2017 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,21 +27,72 @@ class FutureDAG : public std::enable_shared_from_this { } typedef size_t Handle; - typedef std::function()> FutureFunc; + typedef std::function()> FutureFunc; Handle add(FutureFunc func, Executor* executor = nullptr) { nodes.emplace_back(std::move(func), executor); return nodes.size() - 1; } + void remove(Handle a) { + if (a >= nodes.size()) { + return; + } + + if (nodes[a].hasDependents) { + for (auto& node : nodes) { + auto& deps = node.dependencies; + deps.erase( + std::remove(std::begin(deps), std::end(deps), a), std::end(deps)); + for (Handle& handle : deps) { + if (handle > a) { + handle--; + } + } + } + } + + nodes.erase(nodes.begin() + a); + } + + void reset() { + // Delete all but source node, and reset dependency properties + Handle source_node; + std::unordered_set memo; + for (auto& node : nodes) { + for (Handle handle : node.dependencies) { + memo.insert(handle); + } + } + for (Handle handle = 0; handle < nodes.size(); handle++) { + if (memo.find(handle) == memo.end()) { + source_node = handle; + } + } + + nodes.erase(nodes.begin(), nodes.begin() + source_node); + nodes.erase(nodes.begin() + 1, nodes.end()); + nodes[0].hasDependents = false; + nodes[0].dependencies.clear(); + } + void dependency(Handle a, Handle b) { nodes[b].dependencies.push_back(a); nodes[a].hasDependents = true; } - Future go() { + void clean_state(Handle source, Handle sink) { + for (auto handle : nodes[sink].dependencies) { + nodes[handle].hasDependents = false; + } + nodes[0].hasDependents = false; + remove(source); + remove(sink); + } + + Future go() { if (hasCycle()) { - return makeFuture(std::runtime_error("Cycle in FutureDAG graph")); + return makeFuture(std::runtime_error("Cycle in FutureDAG graph")); } std::vector rootNodes; std::vector leafNodes; @@ -54,7 +105,7 @@ class FutureDAG : public std::enable_shared_from_this { } } - auto sinkHandle = add([] { return Future(); }); + auto sinkHandle = add([] { return Future(); }); for (auto handle : leafNodes) { dependency(handle, sinkHandle); } @@ -65,27 +116,29 @@ class FutureDAG : public std::enable_shared_from_this { } for (Handle handle = 0; handle < nodes.size() - 1; handle++) { - std::vector> dependencies; + std::vector> dependencies; for (auto depHandle : nodes[handle].dependencies) { dependencies.push_back(nodes[depHandle].promise.getFuture()); } collect(dependencies) - .via(nodes[handle].executor) - .then([this, handle] { - nodes[handle].func() - .then([this, handle] (Try&& t) { + .via(nodes[handle].executor) + .then([this, handle] { + nodes[handle].func().then([this, handle](Try&& t) { nodes[handle].promise.setTry(std::move(t)); }); - }) - .onError([this, handle] (exception_wrapper ew) { - nodes[handle].promise.setException(std::move(ew)); - }); + }) + .onError([this, handle](exception_wrapper ew) { + nodes[handle].promise.setException(std::move(ew)); + }); } nodes[sourceHandle].promise.setValue(); auto that = shared_from_this(); - return nodes[sinkHandle].promise.getFuture().ensure([that]{}); + return nodes[sinkHandle].promise.getFuture().ensure([that] {}).then( + [this, sourceHandle, sinkHandle]() { + clean_state(sourceHandle, sinkHandle); + }); } private: @@ -134,12 +187,12 @@ class FutureDAG : public std::enable_shared_from_this { } struct Node { - Node(FutureFunc&& funcArg, Executor* executorArg) : - func(std::move(funcArg)), executor(executorArg) {} + Node(FutureFunc&& funcArg, Executor* executorArg) + : func(std::move(funcArg)), executor(executorArg) {} FutureFunc func{nullptr}; Executor* executor{nullptr}; - SharedPromise promise; + SharedPromise promise; std::vector dependencies; bool hasDependents{false}; bool visited{false}; @@ -148,4 +201,28 @@ class FutureDAG : public std::enable_shared_from_this { std::vector nodes; }; +// Polymorphic functor implementation +template +class FutureDAGFunctor { + public: + std::shared_ptr dag = FutureDAG::create(); + T state; + std::vector dep_states; + T result() { + return state; + } + // execReset() runs DAG & clears all nodes except for source + void execReset() { + this->dag->go().get(); + this->dag->reset(); + } + void exec() { + this->dag->go().get(); + } + virtual void operator()(){} + explicit FutureDAGFunctor(T init_val) : state(init_val) {} + FutureDAGFunctor() : state() {} + virtual ~FutureDAGFunctor(){} +}; + } // folly