2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
18 #include <folly/futures/Future.h>
19 #include <folly/futures/SharedPromise.h>
// FutureDAG: a graph of future-returning tasks. Nodes are stored in a vector
// and referred to by index (Handle); edges are recorded with dependency().
// The class derives from std::enable_shared_from_this because the run routine
// below calls shared_from_this() to keep the graph alive until the final
// future has completed, so instances are expected to be owned by a
// std::shared_ptr (see create()).
23 class FutureDAG : public std::enable_shared_from_this<FutureDAG> {
// Factory: builds a new, empty FutureDAG and hands it back inside a
// std::shared_ptr. The object is allocated with a direct `new` expression
// rather than std::make_shared.
// NOTE(review): presumably this is because the defaulted constructor declared
// further down is not public - confirm against the access specifiers.
25 static std::shared_ptr<FutureDAG> create() {
26 return std::shared_ptr<FutureDAG>(new FutureDAG());
// Handle identifies a node: it is used as an index into the `nodes` vector
// (add() returns nodes.size() - 1 for the node it just appended).
29 typedef size_t Handle;
// FutureFunc is the work attached to a node: a callable taking no arguments
// and returning a Future<Unit>.
30 typedef std::function<Future<Unit>()> FutureFunc;
// Appends a new node to the graph and returns its handle.
//   func      - callable stored in the node (moved into it).
//   executor  - executor pointer stored alongside the callable; defaults to
//               nullptr.
//   returns   - index of the new node, i.e. nodes.size() - 1 after the append.
// Because handles are plain indices, they are only stable while no node with
// a smaller index is erased.
32 Handle add(FutureFunc func, Executor* executor = nullptr) {
33 nodes.emplace_back(std::move(func), executor);
34 return nodes.size() - 1;
// Removes node `a` from the graph by erasing it from `nodes`.
// An out-of-range handle (a >= nodes.size()) is tested first; the statement
// guarded by that test is not shown here - presumably an early return.
37 void remove(Handle a) {
38 if (a >= nodes.size()) {
// Only when `a` is flagged as having dependents are the other nodes'
// dependency lists visited.
42 if (nodes[a].hasDependents) {
43 for (auto& node : nodes) {
44 auto& deps = node.dependencies;
// std::remove shifts every entry equal to `a` to the tail of `deps`; its
// result and std::end(deps) are passed to a call whose opening is not shown
// here - presumably deps.erase(...), completing the erase-remove idiom.
46 std::remove(std::begin(deps), std::end(deps), a), std::end(deps));
// The surviving handles are visited by reference so they can be modified in
// place; the loop body is not shown - presumably handles greater than `a` are
// decremented to match the index shift caused by the erase below.
// NOTE(review): this adjustment appears to live inside the hasDependents
// branch. If so, removing a node without dependents leaves handles greater
// than `a` unadjusted although nodes.erase() still shifts those nodes down -
// confirm whether that is intended.
47 for (Handle& handle : deps) {
// Erasing shifts every node after `a` down by one index.
55 nodes.erase(nodes.begin() + a);
59 // Delete all but source node, and reset dependency properties
// NOTE(review): the function header is not shown with these lines; the name
// is assumed from the comment above.
// Step 1: walk every dependency handle of every node while holding a set
// `memo`. The statement inside the inner loop is not shown - presumably
// memo.insert(handle), so memo ends up holding every node that is depended on.
61 std::unordered_set<Handle> memo;
62 for (auto& node : nodes) {
63 for (Handle handle : node.dependencies) {
// Step 2: scan all handles for one that is absent from memo. The guarded
// statement is not shown - presumably it records that handle as
// `source_node`, which is used below.
67 for (Handle handle = 0; handle < nodes.size(); handle++) {
68 if (memo.find(handle) == memo.end()) {
// Step 3: drop everything before source_node, then everything after the
// first remaining element, leaving exactly one node at index 0.
73 nodes.erase(nodes.begin(), nodes.begin() + source_node);
74 nodes.erase(nodes.begin() + 1, nodes.end());
// Step 4: the surviving node starts over with no dependents and no
// dependencies.
75 nodes[0].hasDependents = false;
76 nodes[0].dependencies.clear();
// Records the edge "b depends on a": handle `a` is appended to node b's
// dependency list and node `a` is flagged as having dependents.
// Neither handle is range-checked; both are used directly as indices into
// `nodes`. Duplicate edges are not filtered (push_back is unconditional).
79 void dependency(Handle a, Handle b) {
80 nodes[b].dependencies.push_back(a);
81 nodes[a].hasDependents = true;
// Post-run cleanup, invoked from the continuation attached to the sink
// node's future in the run routine.
// Clears hasDependents on every node listed as a dependency of `sink`, then
// on node 0.
// `source` is not referenced in the lines shown here.
// NOTE(review): further statements may follow that are not visible (for
// example removal of the temporary source/sink nodes) - confirm.
84 void clean_state(Handle source, Handle sink) {
85 for (auto handle : nodes[sink].dependencies) {
86 nodes[handle].hasDependents = false;
88 nodes[0].hasDependents = false;
// Run routine (invoked elsewhere in this file as dag->go()); the function
// header and the condition guarding the first return are not shown here.
// Failure path: hands back an already-failed future carrying a
// std::runtime_error - presumably taken when the cycle check reports a cycle.
95 return makeFuture<Unit>(std::runtime_error("Cycle in FutureDAG graph"));
// Classify the user-added nodes: roots have no dependencies, leaves have no
// dependents. A node can be both.
97 std::vector<Handle> rootNodes;
98 std::vector<Handle> leafNodes;
99 for (Handle handle = 0; handle < nodes.size(); handle++) {
100 if (nodes[handle].dependencies.empty()) {
101 rootNodes.push_back(handle);
103 if (!nodes[handle].hasDependents) {
104 leafNodes.push_back(handle);
// Append a sink node whose callable returns Future<Unit>() and make it depend
// on every leaf, so its promise is fulfilled only after all leaves are done.
108 auto sinkHandle = add([] { return Future<Unit>(); });
109 for (auto handle : leafNodes) {
110 dependency(handle, sinkHandle);
// Append a source node with a null callable and make every root depend on it.
// Its callable is never invoked: the source is the last node added and the
// loop below stops one short of the end of `nodes`.
113 auto sourceHandle = add(nullptr);
114 for (auto handle : rootNodes) {
115 dependency(sourceHandle, handle);
// Wire up every node except the last one (the source).
118 for (Handle handle = 0; handle < nodes.size() - 1; handle++) {
// One future per dependency, obtained from that dependency's shared promise.
119 std::vector<Future<Unit>> dependencies;
120 for (auto depHandle : nodes[handle].dependencies) {
121 dependencies.push_back(nodes[depHandle].promise.getFuture());
// Once all dependency futures are collected, continue on the node's executor,
// invoke the node's callable and forward its result (value or exception, as a
// Try) into the node's own promise.
// These lambdas capture `this` and the handle by value; they index into
// `nodes` when they run, so `nodes` must not be modified while the graph is
// executing.
124 collect(dependencies)
125 .via(nodes[handle].executor)
126 .then([this, handle] {
127 nodes[handle].func().then([this, handle](Try<Unit>&& t) {
128 nodes[handle].promise.setTry(std::move(t));
// Error handler: stores the exception in this node's promise so that nodes
// depending on it observe the failure.
131 .onError([this, handle](exception_wrapper ew) {
132 nodes[handle].promise.setException(std::move(ew));
// Kick-off: fulfilling the source promise releases the root nodes.
136 nodes[sourceHandle].promise.setValue();
// `that` is a shared_ptr to this graph, captured by the (otherwise empty)
// ensure() callback so the graph stays alive until the sink future completes.
137 auto that = shared_from_this();
// The returned future is the sink's future followed by a continuation that
// calls clean_state() with the temporary source and sink handles.
138 return nodes[sinkHandle].promise.getFuture().ensure([that] {}).then(
139 [this, sourceHandle, sinkHandle]() {
140 clean_state(sourceHandle, sinkHandle);
// Defaulted constructor: produces an empty graph. create() is the factory
// that invokes it and wraps the result in a std::shared_ptr.
145 FutureDAG() = default;
// NOTE(review): the function header and the return statements are not shown
// with these lines; the name hasCycle is assumed from the comment below.
148 // Perform a modified topological sort to detect cycles
// Work on a private copy of every node's dependency list so the real graph
// is left untouched.
149 std::vector<std::vector<Handle>> dependencies;
150 for (auto& node : nodes) {
151 dependencies.push_back(node.dependencies);
// dependents[h] = number of edges that name h as a dependency, i.e. how many
// times h appears across all dependency lists.
154 std::vector<size_t> dependents(nodes.size());
155 for (auto& dependencyEdges : dependencies) {
156 for (auto handle : dependencyEdges) {
157 dependents[handle]++;
// Seed the work list with nodes nothing depends on.
161 std::vector<Handle> handles;
162 for (Handle handle = 0; handle < nodes.size(); handle++) {
163 if (!nodes[handle].hasDependents) {
164 handles.push_back(handle);
// Process the work list from the back. The statement removing the taken
// handle from the list is not shown here - presumably handles.pop_back().
168 while (!handles.empty()) {
169 auto handle = handles.back();
// Consume each outgoing edge of the current node; a dependency whose counter
// drops to zero has had all of its dependents processed and is queued.
171 while (!dependencies[handle].empty()) {
172 auto dependency = dependencies[handle].back();
173 dependencies[handle].pop_back();
174 if (--dependents[dependency] == 0) {
175 handles.push_back(dependency);
// Any edge left unconsumed belongs to a node that was never reached, which
// is what happens when the node sits on a cycle. The statements executed
// here are not shown - presumably `return true` inside the test and
// `return false` after the loop.
180 for (auto& dependencyEdges : dependencies) {
181 if (!dependencyEdges.empty()) {
// Node: per-vertex record of the graph (the enclosing struct header is not
// shown with these lines).
// The constructor takes ownership of the callable by move and stores the
// executor pointer as given; the remaining members keep their in-class
// defaults.
190 Node(FutureFunc&& funcArg, Executor* executorArg)
191 : func(std::move(funcArg)), executor(executorArg) {}
// Work to run for this node; null for the temporary source node.
193 FutureFunc func{nullptr};
// Executor the node's continuation is scheduled on via .via(); may be null.
194 Executor* executor{nullptr};
// Completed with the node's result or exception; dependents obtain futures
// from it with getFuture().
195 SharedPromise<Unit> promise;
// Handles of the nodes that must complete before this one runs.
196 std::vector<Handle> dependencies;
// True once some other node lists this node as a dependency.
197 bool hasDependents{false};
// Storage for all nodes of the graph. A Handle is an index into this vector,
// so erasing an element renumbers every node that follows it.
201 std::vector<Node> nodes;
204 // Polymorphic functor implementation
// FutureDAGFunctor<T>: a base class with a virtual call operator and a
// virtual destructor that owns a FutureDAG and carries a value of type T.
// NOTE(review): several members referenced below (e.g. `state`) and the
// headers of the two run helpers are not shown with these lines.
205 template <typename T>
206 class FutureDAGFunctor {
// Each functor owns its own graph, created through the shared_ptr factory.
208 std::shared_ptr<FutureDAG> dag = FutureDAG::create();
// Values of type T; presumably the states of the functors this one depends
// on - confirm against the code that fills it.
210 std::vector<T> dep_states;
214 // execReset() runs DAG & clears all nodes except for source
// Both run helpers start the graph with go() and block the calling thread
// until the returned future completes (.get()). The enclosing function
// headers and any following statements are not shown here.
216 this->dag->go().get();
220 this->dag->go().get();
// Default call operator does nothing; derived classes override it.
222 virtual void operator()(){}
// Initialises `state` from the given value; explicit to prevent implicit
// conversion from T.
223 explicit FutureDAGFunctor(T init_val) : state(init_val) {}
// Value-initialises `state`.
224 FutureDAGFunctor() : state() {}
// Virtual so that derived functors can be destroyed through a base pointer.
225 virtual ~FutureDAGFunctor(){}