Folly::FutureDAG <-> Gossit
[folly.git] / folly / experimental / FutureDAG.h
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <folly/futures/Future.h>
19 #include <folly/futures/SharedPromise.h>
20
21 namespace folly {
22
23 class FutureDAG : public std::enable_shared_from_this<FutureDAG> {
24  public:
25   static std::shared_ptr<FutureDAG> create() {
26     return std::shared_ptr<FutureDAG>(new FutureDAG());
27   }
28
29   typedef size_t Handle;
30   typedef std::function<Future<Unit>()> FutureFunc;
31
32   Handle add(FutureFunc func, Executor* executor = nullptr) {
33     nodes.emplace_back(std::move(func), executor);
34     return nodes.size() - 1;
35   }
36
37   void remove(Handle a) {
38     if (nodes.size() > a && nodes[a].hasDependents) {
39       for (auto& node : nodes) {
40         auto& deps = node.dependencies;
41         deps.erase(
42             std::remove(std::begin(deps), std::end(deps), a), std::end(deps));
43         for (Handle& handle : deps) {
44           if (handle > a) {
45             handle--;
46           }
47         }
48       }
49     }
50     nodes.erase(nodes.begin() + a);
51   }
52
53   void reset() {
54     // Delete all but source node, and reset dependency properties
55     Handle source_node;
56     std::unordered_set<Handle> memo;
57     for (auto& node : nodes) {
58       for (Handle handle : node.dependencies) {
59         memo.insert(handle);
60       }
61     }
62     for (Handle handle = 0; handle < nodes.size(); handle++) {
63       if (memo.find(handle) == memo.end()) {
64         source_node = handle;
65       }
66     }
67
68     // Faster to just create a new vector with the element in it?
69     nodes.erase(nodes.begin(), nodes.begin() + source_node);
70     nodes.erase(nodes.begin() + 1, nodes.end());
71     nodes[0].hasDependents = false;
72     nodes[0].dependencies.clear();
73   }
74
75   void dependency(Handle a, Handle b) {
76     nodes[b].dependencies.push_back(a);
77     nodes[a].hasDependents = true;
78   }
79
80   void clean_state(Handle source, Handle sink) {
81     for (auto handle : nodes[sink].dependencies) {
82       nodes[handle].hasDependents = false;
83     }
84     nodes[0].hasDependents = false;
85     remove(source);
86     remove(sink);
87   }
88
89   Future<Unit> go() {
90     if (hasCycle()) {
91       return makeFuture<Unit>(std::runtime_error("Cycle in FutureDAG graph"));
92     }
93     std::vector<Handle> rootNodes;
94     std::vector<Handle> leafNodes;
95     for (Handle handle = 0; handle < nodes.size(); handle++) {
96       if (nodes[handle].dependencies.empty()) {
97         rootNodes.push_back(handle);
98       }
99       if (!nodes[handle].hasDependents) {
100         leafNodes.push_back(handle);
101       }
102     }
103
104     auto sinkHandle = add([] { return Future<Unit>(); });
105     for (auto handle : leafNodes) {
106       dependency(handle, sinkHandle);
107     }
108
109     auto sourceHandle = add(nullptr);
110     for (auto handle : rootNodes) {
111       dependency(sourceHandle, handle);
112     }
113
114     for (Handle handle = 0; handle < nodes.size() - 1; handle++) {
115       std::vector<Future<Unit>> dependencies;
116       for (auto depHandle : nodes[handle].dependencies) {
117         dependencies.push_back(nodes[depHandle].promise.getFuture());
118       }
119
120       collect(dependencies)
121           .via(nodes[handle].executor)
122           .then([this, handle] {
123             nodes[handle].func().then([this, handle](Try<Unit>&& t) {
124               nodes[handle].promise.setTry(std::move(t));
125             });
126           })
127           .onError([this, handle](exception_wrapper ew) {
128             nodes[handle].promise.setException(std::move(ew));
129           });
130     }
131
132     nodes[sourceHandle].promise.setValue();
133     auto that = shared_from_this();
134     return nodes[sinkHandle].promise.getFuture().ensure([that] {}).then(
135         [this, sourceHandle, sinkHandle]() {
136           clean_state(sourceHandle, sinkHandle);
137         });
138   }
139
140  private:
141   FutureDAG() = default;
142
143   bool hasCycle() {
144     // Perform a modified topological sort to detect cycles
145     std::vector<std::vector<Handle>> dependencies;
146     for (auto& node : nodes) {
147       dependencies.push_back(node.dependencies);
148     }
149
150     std::vector<size_t> dependents(nodes.size());
151     for (auto& dependencyEdges : dependencies) {
152       for (auto handle : dependencyEdges) {
153         dependents[handle]++;
154       }
155     }
156
157     std::vector<Handle> handles;
158     for (Handle handle = 0; handle < nodes.size(); handle++) {
159       if (!nodes[handle].hasDependents) {
160         handles.push_back(handle);
161       }
162     }
163
164     while (!handles.empty()) {
165       auto handle = handles.back();
166       handles.pop_back();
167       while (!dependencies[handle].empty()) {
168         auto dependency = dependencies[handle].back();
169         dependencies[handle].pop_back();
170         if (--dependents[dependency] == 0) {
171           handles.push_back(dependency);
172         }
173       }
174     }
175
176     for (auto& dependencyEdges : dependencies) {
177       if (!dependencyEdges.empty()) {
178         return true;
179       }
180     }
181
182     return false;
183   }
184
185   struct Node {
186     Node(FutureFunc&& funcArg, Executor* executorArg)
187         : func(std::move(funcArg)), executor(executorArg) {}
188
189     FutureFunc func{nullptr};
190     Executor* executor{nullptr};
191     SharedPromise<Unit> promise;
192     std::vector<Handle> dependencies;
193     bool hasDependents{false};
194     bool visited{false};
195   };
196
197   std::vector<Node> nodes;
198 };
199
200 } // folly