Future<Unit> wangle fixup
[folly.git] / folly / experimental / FutureDAG.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <folly/futures/Future.h>
19 #include <folly/futures/SharedPromise.h>
20
21 namespace folly {
22
23 class FutureDAG : public std::enable_shared_from_this<FutureDAG> {
24  public:
25   static std::shared_ptr<FutureDAG> create() {
26     return std::shared_ptr<FutureDAG>(new FutureDAG());
27   }
28
29   typedef size_t Handle;
30   typedef std::function<Future<Unit>()> FutureFunc;
31
32   Handle add(FutureFunc func, Executor* executor = nullptr) {
33     nodes.emplace_back(std::move(func), executor);
34     return nodes.size() - 1;
35   }
36
37   void dependency(Handle a, Handle b) {
38     nodes[b].dependencies.push_back(a);
39     nodes[a].hasDependents = true;
40   }
41
42   Future<Unit> go() {
43     if (hasCycle()) {
44       return makeFuture<Unit>(std::runtime_error("Cycle in FutureDAG graph"));
45     }
46     std::vector<Handle> rootNodes;
47     std::vector<Handle> leafNodes;
48     for (Handle handle = 0; handle < nodes.size(); handle++) {
49       if (nodes[handle].dependencies.empty()) {
50         rootNodes.push_back(handle);
51       }
52       if (!nodes[handle].hasDependents) {
53         leafNodes.push_back(handle);
54       }
55     }
56
57     auto sinkHandle = add([] { return Future<Unit>(); });
58     for (auto handle : leafNodes) {
59       dependency(handle, sinkHandle);
60     }
61
62     auto sourceHandle = add(nullptr);
63     for (auto handle : rootNodes) {
64       dependency(sourceHandle, handle);
65     }
66
67     for (Handle handle = 0; handle < nodes.size() - 1; handle++) {
68       std::vector<Future<Unit>> dependencies;
69       for (auto depHandle : nodes[handle].dependencies) {
70         dependencies.push_back(nodes[depHandle].promise.getFuture());
71       }
72
73       collect(dependencies)
74         .via(nodes[handle].executor)
75         .then([this, handle] {
76           nodes[handle].func()
77             .then([this, handle] (Try<Unit>&& t) {
78               nodes[handle].promise.setTry(std::move(t));
79             });
80         })
81         .onError([this, handle] (exception_wrapper ew) {
82           nodes[handle].promise.setException(std::move(ew));
83         });
84     }
85
86     nodes[sourceHandle].promise.setValue();
87     auto that = shared_from_this();
88     return nodes[sinkHandle].promise.getFuture().ensure([that]{});
89   }
90
91  private:
92   FutureDAG() = default;
93
94   bool hasCycle() {
95     // Perform a modified topological sort to detect cycles
96     std::vector<std::vector<Handle>> dependencies;
97     for (auto& node : nodes) {
98       dependencies.push_back(node.dependencies);
99     }
100
101     std::vector<size_t> dependents(nodes.size());
102     for (auto& dependencyEdges : dependencies) {
103       for (auto handle : dependencyEdges) {
104         dependents[handle]++;
105       }
106     }
107
108     std::vector<Handle> handles;
109     for (Handle handle = 0; handle < nodes.size(); handle++) {
110       if (!nodes[handle].hasDependents) {
111         handles.push_back(handle);
112       }
113     }
114
115     while (!handles.empty()) {
116       auto handle = handles.back();
117       handles.pop_back();
118       while (!dependencies[handle].empty()) {
119         auto dependency = dependencies[handle].back();
120         dependencies[handle].pop_back();
121         if (--dependents[dependency] == 0) {
122           handles.push_back(dependency);
123         }
124       }
125     }
126
127     for (auto& dependencyEdges : dependencies) {
128       if (!dependencyEdges.empty()) {
129         return true;
130       }
131     }
132
133     return false;
134   }
135
136   struct Node {
137     Node(FutureFunc&& funcArg, Executor* executorArg) :
138       func(std::move(funcArg)), executor(executorArg) {}
139
140     FutureFunc func{nullptr};
141     Executor* executor{nullptr};
142     SharedPromise<Unit> promise;
143     std::vector<Handle> dependencies;
144     bool hasDependents{false};
145     bool visited{false};
146   };
147
148   std::vector<Node> nodes;
149 };
150
151 } // folly