2 * Copyright 2014 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
22 #include <folly/wangle/detail/State.h>
23 #include <folly/Baton.h>
25 namespace folly { namespace wangle {
29 static const bool value = false;
33 struct isFuture<Future<T> > {
34 static const bool value = true;
38 Future<T>::Future(Future<T>&& other) noexcept : state_(nullptr) {
39 *this = std::move(other);
43 Future<T>& Future<T>::operator=(Future<T>&& other) {
44 std::swap(state_, other.state_);
49 Future<T>::~Future() {
54 void Future<T>::detach() {
56 state_->detachFuture();
62 void Future<T>::throwIfInvalid() const {
69 void Future<T>::setCallback_(F&& func) {
71 state_->setCallback(std::move(func));
76 typename std::enable_if<
77 !isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
78 Future<typename std::result_of<F(Try<T>&&)>::type> >::type
79 Future<T>::then(F&& func) {
80 typedef typename std::result_of<F(Try<T>&&)>::type B;
84 // wrap these so we can move them into the lambda
85 folly::MoveWrapper<Promise<B>> p;
86 folly::MoveWrapper<F> funcm(std::forward<F>(func));
88 // grab the Future now before we lose our handle on the Promise
89 auto f = p->getFuture();
91 /* This is a bit tricky.
93 We can't just close over *this in case this Future gets moved. So we
94 make a new dummy Future. We could figure out something more
95 sophisticated that avoids making a new Future object when it can, as an
96 optimization. But this is correct.
98 state_ can't be moved, it is explicitly disallowed (as is copying). But
99 if there's ever a reason to allow it, this is one place that makes that
100 assumption and would need to be fixed. We use a standard shared pointer
101 for state_ (by copying it in), which means in essence obj holds a shared
102 pointer to itself. But this shouldn't leak because Promise will not
103 outlive the continuation, because Promise will setException() with a
104 broken Promise if it is destructed before completed. We could use a
105 weak pointer but it would have to be converted to a shared pointer when
106 func is executed (because the Future returned by func may possibly
107 persist beyond the callback, if it gets moved), and so it is an
108 optimization to just make it shared from the get-go.
110 We have to move in the Promise and func using the MoveWrapper
111 hack. (func could be copied but it's a big drag on perf).
113 Two subtle but important points about this design. detail::State has no
114 back pointers to Future or Promise, so if Future or Promise get moved
115 (and they will be moved in performant code) we don't have to do
116 anything fancy. And because we store the continuation in the
117 detail::State, not in the Future, we can execute the continuation even
118 after the Future has gone out of scope. This is an intentional design
119 decision. It is likely we will want to be able to cancel a continuation
120 in some circumstances, but I think it should be explicit not implicit
121 in the destruction of the Future used to create it.
124 [p, funcm](Try<T>&& t) mutable {
126 return (*funcm)(std::move(t));
135 typename std::enable_if<
136 isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
137 Future<typename std::result_of<F(Try<T>&&)>::type::value_type> >::type
138 Future<T>::then(F&& func) {
139 typedef typename std::result_of<F(Try<T>&&)>::type::value_type B;
143 // wrap these so we can move them into the lambda
144 folly::MoveWrapper<Promise<B>> p;
145 folly::MoveWrapper<F> funcm(std::forward<F>(func));
147 // grab the Future now before we lose our handle on the Promise
148 auto f = p->getFuture();
151 [p, funcm](Try<T>&& t) mutable {
153 auto f2 = (*funcm)(std::move(t));
154 // that didn't throw, now we can steal p
155 f2.setCallback_([p](Try<B>&& b) mutable {
156 p->fulfilTry(std::move(b));
159 p->setException(std::current_exception());
167 Future<void> Future<T>::then() {
168 return then([] (Try<T>&& t) {});
172 typename std::add_lvalue_reference<T>::type Future<T>::value() {
175 return state_->value();
179 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
182 return state_->value();
186 Try<T>& Future<T>::getTry() {
189 return state_->getTry();
193 template <typename Executor>
194 inline Future<T> Future<T>::via(Executor* executor) {
196 MoveWrapper<Promise<T>> promise;
198 auto f = promise->getFuture();
199 // We are obligated to return a cold future.
201 // But we also need to make this one cold for via to at least work some of
202 // the time. (see below)
205 then([=](Try<T>&& t) mutable {
206 MoveWrapper<Try<T>> tw(std::move(t));
207 // There is a race here.
208 // When the promise is fulfilled, and the future is still inactive, when
209 // the future is activated (e.g. by destruction) the callback will happen
210 // in that context, not in the intended context (which has already left
213 // Currently, this will work fine because all the temporaries are
214 // destructed in an order that is compatible with this implementation:
216 // makeFuture().via(x).then(a).then(b);
218 // However, this will not work reliably:
220 // auto f2 = makeFuture().via(x);
221 // f2.then(a).then(b);
223 // Because the first temporary is destructed on the first line, and the
224 // executor is fed. But by the time f2 is destructed, the executor
225 // may have already fulfilled the promise on the other thread.
227 // TODO(#4920689) fix it
228 executor->add([=]() mutable { promise->fulfilTry(std::move(*tw)); });
235 bool Future<T>::isReady() const {
237 return state_->ready();
243 Future<typename std::decay<T>::type> makeFuture(T&& t) {
244 Promise<typename std::decay<T>::type> p;
245 auto f = p.getFuture();
246 p.setValue(std::forward<T>(t));
250 inline // for multiple translation units
251 Future<void> makeFuture() {
253 auto f = p.getFuture();
261 typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
262 -> Future<decltype(func())> {
263 Promise<decltype(func())> p;
264 auto f = p.getFuture();
273 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
275 return makeFutureTry(std::move(copy));
279 Future<T> makeFuture(std::exception_ptr const& e) {
281 auto f = p.getFuture();
286 template <class T, class E>
287 typename std::enable_if<std::is_base_of<std::exception, E>::value,
289 makeFuture(E const& e) {
291 auto f = p.getFuture();
292 p.fulfil([&]() -> T { throw e; });
297 Future<T> makeFuture(Try<T>&& t) {
299 return makeFuture<T>(std::move(t.value()));
301 return makeFuture<T>(std::current_exception());
306 inline Future<void> makeFuture(Try<void>&& t) {
311 return makeFuture<void>(std::current_exception());
317 template <typename... Fs>
318 typename detail::VariadicContext<
319 typename std::decay<Fs>::type::value_type...>::type
323 new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
324 ctx->total = sizeof...(fs);
325 auto f_saved = ctx->p.getFuture();
326 detail::whenAllVariadicHelper(ctx,
327 std::forward<typename std::decay<Fs>::type>(fs)...);
328 return std::move(f_saved);
333 template <class InputIterator>
336 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
337 whenAll(InputIterator first, InputIterator last)
340 typename std::iterator_traits<InputIterator>::value_type::value_type T;
342 auto n = std::distance(first, last);
344 return makeFuture(std::vector<Try<T>>());
347 auto ctx = new detail::WhenAllContext<T>();
350 ctx->results.resize(ctx->total);
352 auto f_saved = ctx->p.getFuture();
354 for (size_t i = 0; first != last; ++first, ++i) {
356 f.setCallback_([ctx, i](Try<T>&& t) {
357 ctx->results[i] = std::move(t);
358 if (++ctx->count == ctx->total) {
359 ctx->p.setValue(std::move(ctx->results));
365 return std::move(f_saved);
368 template <class InputIterator>
373 std::iterator_traits<InputIterator>::value_type::value_type> > >
374 whenAny(InputIterator first, InputIterator last) {
376 typename std::iterator_traits<InputIterator>::value_type::value_type T;
378 auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
379 auto f_saved = ctx->p.getFuture();
381 for (size_t i = 0; first != last; first++, i++) {
383 f.setCallback_([i, ctx](Try<T>&& t) {
384 if (!ctx->done.exchange(true)) {
385 ctx->p.setValue(std::make_pair(i, std::move(t)));
391 return std::move(f_saved);
394 template <class InputIterator>
395 Future<std::vector<std::pair<size_t, Try<typename
396 std::iterator_traits<InputIterator>::value_type::value_type>>>>
397 whenN(InputIterator first, InputIterator last, size_t n) {
399 std::iterator_traits<InputIterator>::value_type::value_type T;
400 typedef std::vector<std::pair<size_t, Try<T>>> V;
407 auto ctx = std::make_shared<ctx_t>();
410 // for each completed Future, increase count and add to vector, until we
411 // have n completed futures at which point we fulfil our Promise with the
416 it->then([ctx, n, i](Try<T>&& t) {
418 auto c = ++ctx->completed;
420 assert(ctx->v.size() < n);
421 v.push_back(std::make_pair(i, std::move(t)));
423 ctx->p.fulfilTry(Try<V>(std::move(v)));
433 ctx->p.setException(std::runtime_error("Not enough futures"));
436 return ctx->p.getFuture();
439 template <typename T>
441 waitWithSemaphore(Future<T>&& f) {
443 auto done = f.then([&](Try<T> &&t) {
445 return std::move(t.value());
448 while (!done.isReady()) {
449 // There's a race here between the return here and the actual finishing of
450 // the future. f is completed, but the setup may not have finished on done
451 // after the baton has posted.
452 std::this_thread::yield();
458 inline Future<void> waitWithSemaphore<void>(Future<void>&& f) {
460 auto done = f.then([&](Try<void> &&t) {
465 while (!done.isReady()) {
466 // There's a race here between the return here and the actual finishing of
467 // the future. f is completed, but the setup may not have finished on done
468 // after the baton has posted.
469 std::this_thread::yield();
474 template <typename T, class Duration>
476 waitWithSemaphore(Future<T>&& f, Duration timeout) {
477 auto baton = std::make_shared<Baton<>>();
478 auto done = f.then([baton](Try<T> &&t) {
480 return std::move(t.value());
482 baton->timed_wait(std::chrono::system_clock::now() + timeout);
486 template <class Duration>
488 waitWithSemaphore(Future<void>&& f, Duration timeout) {
489 auto baton = std::make_shared<Baton<>>();
490 auto done = f.then([baton](Try<void> &&t) {
494 baton->timed_wait(std::chrono::system_clock::now() + timeout);
500 // I haven't included a Future<T&> specialization because I don't forsee us
501 // using it, however it is not difficult to add when needed. Refer to
502 // Future<void> for guidance. std::future and boost::future code would also be