Consistency in namespace-closing comments
[folly.git] / folly / experimental / FutureDAG.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <folly/futures/Future.h>
19 #include <folly/futures/SharedPromise.h>
20
21 namespace folly {
22
23 class FutureDAG : public std::enable_shared_from_this<FutureDAG> {
24  public:
25   static std::shared_ptr<FutureDAG> create() {
26     return std::shared_ptr<FutureDAG>(new FutureDAG());
27   }
28
29   typedef size_t Handle;
30   typedef std::function<Future<Unit>()> FutureFunc;
31
32   Handle add(FutureFunc func, Executor* executor = nullptr) {
33     nodes.emplace_back(std::move(func), executor);
34     return nodes.size() - 1;
35   }
36
37   void remove(Handle a) {
38     if (a >= nodes.size()) {
39       return;
40     }
41
42     if (nodes[a].hasDependents) {
43       for (auto& node : nodes) {
44         auto& deps = node.dependencies;
45         deps.erase(
46             std::remove(std::begin(deps), std::end(deps), a), std::end(deps));
47         for (Handle& handle : deps) {
48           if (handle > a) {
49             handle--;
50           }
51         }
52       }
53     }
54
55     nodes.erase(nodes.begin() + a);
56   }
57
58   void reset() {
59     // Delete all but source node, and reset dependency properties
60     Handle source_node;
61     std::unordered_set<Handle> memo;
62     for (auto& node : nodes) {
63       for (Handle handle : node.dependencies) {
64         memo.insert(handle);
65       }
66     }
67     for (Handle handle = 0; handle < nodes.size(); handle++) {
68       if (memo.find(handle) == memo.end()) {
69         source_node = handle;
70       }
71     }
72
73     nodes.erase(nodes.begin(), nodes.begin() + source_node);
74     nodes.erase(nodes.begin() + 1, nodes.end());
75     nodes[0].hasDependents = false;
76     nodes[0].dependencies.clear();
77   }
78
79   void dependency(Handle a, Handle b) {
80     nodes[b].dependencies.push_back(a);
81     nodes[a].hasDependents = true;
82   }
83
84   void clean_state(Handle source, Handle sink) {
85     for (auto handle : nodes[sink].dependencies) {
86       nodes[handle].hasDependents = false;
87     }
88     nodes[0].hasDependents = false;
89     remove(source);
90     remove(sink);
91   }
92
93   Future<Unit> go() {
94     if (hasCycle()) {
95       return makeFuture<Unit>(std::runtime_error("Cycle in FutureDAG graph"));
96     }
97     std::vector<Handle> rootNodes;
98     std::vector<Handle> leafNodes;
99     for (Handle handle = 0; handle < nodes.size(); handle++) {
100       if (nodes[handle].dependencies.empty()) {
101         rootNodes.push_back(handle);
102       }
103       if (!nodes[handle].hasDependents) {
104         leafNodes.push_back(handle);
105       }
106     }
107
108     auto sinkHandle = add([] { return Future<Unit>(); });
109     for (auto handle : leafNodes) {
110       dependency(handle, sinkHandle);
111     }
112
113     auto sourceHandle = add(nullptr);
114     for (auto handle : rootNodes) {
115       dependency(sourceHandle, handle);
116     }
117
118     for (Handle handle = 0; handle < nodes.size() - 1; handle++) {
119       std::vector<Future<Unit>> dependencies;
120       for (auto depHandle : nodes[handle].dependencies) {
121         dependencies.push_back(nodes[depHandle].promise.getFuture());
122       }
123
124       collect(dependencies)
125           .via(nodes[handle].executor)
126           .then([this, handle] {
127             nodes[handle].func().then([this, handle](Try<Unit>&& t) {
128               nodes[handle].promise.setTry(std::move(t));
129             });
130           })
131           .onError([this, handle](exception_wrapper ew) {
132             nodes[handle].promise.setException(std::move(ew));
133           });
134     }
135
136     nodes[sourceHandle].promise.setValue();
137     auto that = shared_from_this();
138     return nodes[sinkHandle].promise.getFuture().ensure([that] {}).then(
139         [this, sourceHandle, sinkHandle]() {
140           clean_state(sourceHandle, sinkHandle);
141         });
142   }
143
144  private:
145   FutureDAG() = default;
146
147   bool hasCycle() {
148     // Perform a modified topological sort to detect cycles
149     std::vector<std::vector<Handle>> dependencies;
150     for (auto& node : nodes) {
151       dependencies.push_back(node.dependencies);
152     }
153
154     std::vector<size_t> dependents(nodes.size());
155     for (auto& dependencyEdges : dependencies) {
156       for (auto handle : dependencyEdges) {
157         dependents[handle]++;
158       }
159     }
160
161     std::vector<Handle> handles;
162     for (Handle handle = 0; handle < nodes.size(); handle++) {
163       if (!nodes[handle].hasDependents) {
164         handles.push_back(handle);
165       }
166     }
167
168     while (!handles.empty()) {
169       auto handle = handles.back();
170       handles.pop_back();
171       while (!dependencies[handle].empty()) {
172         auto dependency = dependencies[handle].back();
173         dependencies[handle].pop_back();
174         if (--dependents[dependency] == 0) {
175           handles.push_back(dependency);
176         }
177       }
178     }
179
180     for (auto& dependencyEdges : dependencies) {
181       if (!dependencyEdges.empty()) {
182         return true;
183       }
184     }
185
186     return false;
187   }
188
189   struct Node {
190     Node(FutureFunc&& funcArg, Executor* executorArg)
191         : func(std::move(funcArg)), executor(executorArg) {}
192
193     FutureFunc func{nullptr};
194     Executor* executor{nullptr};
195     SharedPromise<Unit> promise;
196     std::vector<Handle> dependencies;
197     bool hasDependents{false};
198     bool visited{false};
199   };
200
201   std::vector<Node> nodes;
202 };
203
204 // Polymorphic functor implementation
205 template <typename T>
206 class FutureDAGFunctor {
207  public:
208   std::shared_ptr<FutureDAG> dag = FutureDAG::create();
209   T state;
210   std::vector<T> dep_states;
211   T result() {
212     return state;
213   }
214   // execReset() runs DAG & clears all nodes except for source
215   void execReset() {
216     this->dag->go().get();
217     this->dag->reset();
218   }
219   void exec() {
220     this->dag->go().get();
221   }
222   virtual void operator()(){}
223   explicit FutureDAGFunctor(T init_val) : state(init_val) {}
224   FutureDAGFunctor() : state() {}
225   virtual ~FutureDAGFunctor(){}
226 };
227
228 } // namespace folly