/*
- * Copyright 2015 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
}
typedef size_t Handle;
- typedef std::function<Future<void>()> FutureFunc;
+ typedef std::function<Future<Unit>()> FutureFunc;
Handle add(FutureFunc func, Executor* executor = nullptr) {
nodes.emplace_back(std::move(func), executor);
return nodes.size() - 1;
}
+ void remove(Handle a) {
+ if (a >= nodes.size()) {
+ return;
+ }
+
+ if (nodes[a].hasDependents) {
+ for (auto& node : nodes) {
+ auto& deps = node.dependencies;
+ deps.erase(
+ std::remove(std::begin(deps), std::end(deps), a), std::end(deps));
+ for (Handle& handle : deps) {
+ if (handle > a) {
+ handle--;
+ }
+ }
+ }
+ }
+
+ nodes.erase(nodes.begin() + a);
+ }
+
+ void reset() {
+ // Delete all but source node, and reset dependency properties
+ Handle source_node;
+ std::unordered_set<Handle> memo;
+ for (auto& node : nodes) {
+ for (Handle handle : node.dependencies) {
+ memo.insert(handle);
+ }
+ }
+ for (Handle handle = 0; handle < nodes.size(); handle++) {
+ if (memo.find(handle) == memo.end()) {
+ source_node = handle;
+ }
+ }
+
+ nodes.erase(nodes.begin(), nodes.begin() + source_node);
+ nodes.erase(nodes.begin() + 1, nodes.end());
+ nodes[0].hasDependents = false;
+ nodes[0].dependencies.clear();
+ }
+
void dependency(Handle a, Handle b) {
nodes[b].dependencies.push_back(a);
nodes[a].hasDependents = true;
}
- Future<void> go() {
+ void clean_state(Handle source, Handle sink) {
+ for (auto handle : nodes[sink].dependencies) {
+ nodes[handle].hasDependents = false;
+ }
+ nodes[0].hasDependents = false;
+ remove(source);
+ remove(sink);
+ }
+
+ Future<Unit> go() {
if (hasCycle()) {
- return makeFuture<void>(std::runtime_error("Cycle in FutureDAG graph"));
+ return makeFuture<Unit>(std::runtime_error("Cycle in FutureDAG graph"));
}
std::vector<Handle> rootNodes;
std::vector<Handle> leafNodes;
}
}
- auto sinkHandle = add([] { return Future<void>(); });
+ auto sinkHandle = add([] { return Future<Unit>(); });
for (auto handle : leafNodes) {
dependency(handle, sinkHandle);
}
}
for (Handle handle = 0; handle < nodes.size() - 1; handle++) {
- std::vector<Future<void>> dependencies;
+ std::vector<Future<Unit>> dependencies;
for (auto depHandle : nodes[handle].dependencies) {
dependencies.push_back(nodes[depHandle].promise.getFuture());
}
collect(dependencies)
- .via(nodes[handle].executor)
- .then([this, handle] {
- nodes[handle].func()
- .then([this, handle] (Try<void>&& t) {
+ .via(nodes[handle].executor)
+ .then([this, handle] {
+ nodes[handle].func().then([this, handle](Try<Unit>&& t) {
nodes[handle].promise.setTry(std::move(t));
});
- })
- .onError([this, handle] (exception_wrapper ew) {
- nodes[handle].promise.setException(std::move(ew));
- });
+ })
+ .onError([this, handle](exception_wrapper ew) {
+ nodes[handle].promise.setException(std::move(ew));
+ });
}
nodes[sourceHandle].promise.setValue();
auto that = shared_from_this();
- return nodes[sinkHandle].promise.getFuture().ensure([that]{});
+ return nodes[sinkHandle].promise.getFuture().ensure([that] {}).then(
+ [this, sourceHandle, sinkHandle]() {
+ clean_state(sourceHandle, sinkHandle);
+ });
}
private:
}
struct Node {
- Node(FutureFunc&& funcArg, Executor* executorArg) :
- func(std::move(funcArg)), executor(executorArg) {}
+ Node(FutureFunc&& funcArg, Executor* executorArg)
+ : func(std::move(funcArg)), executor(executorArg) {}
FutureFunc func{nullptr};
Executor* executor{nullptr};
- SharedPromise<void> promise;
+ SharedPromise<Unit> promise;
std::vector<Handle> dependencies;
bool hasDependents{false};
bool visited{false};
std::vector<Node> nodes;
};
+// Polymorphic functor implementation
+template <typename T>
+class FutureDAGFunctor {
+ public:
+ std::shared_ptr<FutureDAG> dag = FutureDAG::create();
+ T state;
+ std::vector<T> dep_states;
+ T result() {
+ return state;
+ }
+ // execReset() runs DAG & clears all nodes except for source
+ void execReset() {
+ this->dag->go().get();
+ this->dag->reset();
+ }
+ void exec() {
+ this->dag->go().get();
+ }
+ virtual void operator()(){}
+ explicit FutureDAGFunctor(T init_val) : state(init_val) {}
+ FutureDAGFunctor() : state() {}
+ virtual ~FutureDAGFunctor(){}
+};
+
} // folly