2 * Copyright 2014 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
22 #include <folly/Baton.h>
23 #include <folly/futures/detail/Core.h>
24 #include <folly/futures/Timekeeper.h>
31 Timekeeper* getTimekeeperSingleton();
// Move constructor. Starts from a null core_ and then delegates to the move
// assignment operator, so the transfer logic lives in a single place.
35 Future<T>::Future(Future<T>&& other) noexcept : core_(nullptr) {
36 *this = std::move(other);
// Move assignment. Exchanges core_ with other.core_, so `other` ends up
// holding whatever core this Future held before the call.
40 Future<T>& Future<T>::operator=(Future<T>&& other) {
41 std::swap(core_, other.core_);
46 Future<T>::~Future() {
// Notifies core_ (via detachFuture()) that this Future is letting go of it.
// NOTE(review): any guard on core_ being non-null is not visible in this
// excerpt -- confirm before calling on a moved-from Future.
51 void Future<T>::detach() {
53 core_->detachFuture();
59 void Future<T>::throwIfInvalid() const {
// Installs func as the callback stored on core_.
// NOTE(review): func is declared F&&; if F is a deduced template parameter
// this is a forwarding reference, and std::move would also move from an
// lvalue argument -- confirm all callers pass rvalues.
66 void Future<T>::setCallback_(F&& func) {
68 core_->setCallback(std::move(func));
// thenImplementation overload selected (through enable_if on
// R::ReturnsFuture) when the continuation does NOT itself return a Future.
// It creates a Promise<B>, grabs that promise's Future up front, and moves
// both the promise and the continuation into a callback that runs on the
// completed Try<T>. isTry selects whether the continuation is handed the
// Try<T> itself or its unwrapped value (see t.get<isTry, Args>() below).
71 // Variant: returns a value
72 // e.g. f.then([](Try<T>&& t){ return t.value(); });
74 template <typename F, typename R, bool isTry, typename... Args>
75 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
76 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
// The continuation may take no argument or exactly one (Try<T> or T).
77 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
// B is the value type of the Future produced by this then() call.
78 typedef typename R::ReturnsFuture::Inner B;
82 // wrap these so we can move them into the lambda
83 folly::MoveWrapper<Promise<B>> p;
84 folly::MoveWrapper<F> funcm(std::forward<F>(func));
86 // grab the Future now before we lose our handle on the Promise
87 auto f = p->getFuture();
89 /* This is a bit tricky.
91 We can't just close over *this in case this Future gets moved. So we
92 make a new dummy Future. We could figure out something more
93 sophisticated that avoids making a new Future object when it can, as an
94 optimization. But this is correct.
96 core_ can't be moved, it is explicitly disallowed (as is copying). But
97 if there's ever a reason to allow it, this is one place that makes that
98 assumption and would need to be fixed. We use a standard shared pointer
99 for core_ (by copying it in), which means in essence obj holds a shared
100 pointer to itself. But this shouldn't leak because Promise will not
101 outlive the continuation, because Promise will setException() with a
102 broken Promise if it is destructed before completed. We could use a
103 weak pointer but it would have to be converted to a shared pointer when
104 func is executed (because the Future returned by func may possibly
105 persist beyond the callback, if it gets moved), and so it is an
106 optimization to just make it shared from the get-go.
108 We have to move in the Promise and func using the MoveWrapper
109 hack. (func could be copied but it's a big drag on perf).
111 Two subtle but important points about this design. detail::Core has no
112 back pointers to Future or Promise, so if Future or Promise get moved
113 (and they will be moved in performant code) we don't have to do
114 anything fancy. And because we store the continuation in the
115 detail::Core, not in the Future, we can execute the continuation even
116 after the Future has gone out of scope. This is an intentional design
117 decision. It is likely we will want to be able to cancel a continuation
118 in some circumstances, but I think it should be explicit not implicit
119 in the destruction of the Future used to create it.
// Callback body. When the continuation wants a plain value (!isTry) and the
// upstream result holds an exception, that exception is forwarded to the new
// promise; the visible call to *funcm passes the Try or its value.
// NOTE(review): the enclosing setCallback_ call and the branch structure
// around these lines are not visible in this excerpt.
122 [p, funcm](Try<T>&& t) mutable {
123 if (!isTry && t.hasException()) {
124 p->setException(std::move(t.exception()));
127 return (*funcm)(t.template get<isTry, Args>()...);
// thenImplementation overload selected (through enable_if on
// R::ReturnsFuture) when the continuation itself returns a Future. The
// Future returned by the continuation (f2) is chained onto the promise
// created here, so the outer Future completes with f2's Try<B>.
135 // Variant: returns a Future
136 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
138 template <typename F, typename R, bool isTry, typename... Args>
139 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
140 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
// The continuation may take no argument or exactly one (Try<T> or T).
141 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
// B is the value type carried by the Future the continuation returns.
142 typedef typename R::ReturnsFuture::Inner B;
146 // wrap these so we can move them into the lambda
147 folly::MoveWrapper<Promise<B>> p;
148 folly::MoveWrapper<F> funcm(std::forward<F>(func));
150 // grab the Future now before we lose our handle on the Promise
151 auto f = p->getFuture();
// Callback body: forward an upstream exception when the continuation wants a
// plain value; the visible call to *funcm produces f2, whose result is then
// piped into p.
154 [p, funcm](Try<T>&& t) mutable {
155 if (!isTry && t.hasException()) {
156 p->setException(std::move(t.exception()));
159 auto f2 = (*funcm)(t.template get<isTry, Args>()...);
160 // that didn't throw, now we can steal p
161 f2.setCallback_([p](Try<B>&& b) mutable {
162 p->fulfilTry(std::move(b));
// An exception thrown while obtaining or wiring f2 is captured on p.
// NOTE(review): `e` is unused in the visible handler line, while onError
// passes it as a second argument to exception_wrapper -- confirm whether the
// two sites are meant to agree.
164 } catch (const std::exception& e) {
165 p->setException(exception_wrapper(std::current_exception()));
// then() overload taking an object pointer plus a pointer-to-member function.
// The pair is wrapped in a lambda that calls (instance->*func) with either
// the Try<T> or its value, and that lambda is passed to the callable-based
// then(). `instance` is captured as a raw pointer, so *instance has to stay
// alive until the continuation has run.
173 template <typename T>
174 template <typename Caller, typename R, typename... Args>
175 Future<typename isFuture<R>::Inner>
176 Future<T>::then(Caller *instance, R(Caller::*func)(Args...)) {
// FirstArg: the member function's first parameter type with cv-qualifiers
// and references removed; isTry<FirstArg> below decides how the Try<T> is
// unpacked for the call.
177 typedef typename std::remove_cv<
178 typename std::remove_reference<
179 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
180 return then([instance, func](Try<T>&& t){
181 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
// Argument-less then(): chains a continuation that ignores the Try<T> it is
// given, yielding a Future<void>.
186 Future<void> Future<T>::then() {
187 return then([] (Try<T>&& t) {});
// Overload selected when the handler does not return a Future. Exn is the
// handler's first argument type; the static_assert requires the handler's
// raw return type to be exactly T.
// NOTE(review): the template header, the Promise declaration, and the body
// of the withException lambda are not visible in this excerpt.
190 // onError where the callback returns T
193 typename std::enable_if<
194 !detail::Extract<F>::ReturnsFuture::value,
196 Future<T>::onError(F&& func) {
197 typedef typename detail::Extract<F>::FirstArg Exn;
199 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
200 "Return type of onError callback must be T or Future<T>");
// Take the Future before the promise is moved into the wrapper.
203 auto f = p.getFuture();
204 auto pm = folly::makeMoveWrapper(std::move(p));
// NOTE(review): func is F&&; std::move also moves from an lvalue argument
// if F is deduced -- confirm that is intended.
205 auto funcm = folly::makeMoveWrapper(std::move(func));
206 setCallback_([pm, funcm](Try<T>&& t) mutable {
// withException<Exn> is negated here; presumably it reports whether the Try
// held an Exn and the handler lambda ran -- verify against Try.
207 if (!t.template withException<Exn>([&] (Exn& e) {
// Pass the original Try through unchanged.
212 pm->fulfilTry(std::move(t));
// Overload selected when the handler returns a Future. The static_assert
// requires that return type to be exactly Future<T>; Exn is the handler's
// first argument type.
// NOTE(review): the template header, the Promise declaration, and the
// try/else lines around the handler call are not visible in this excerpt.
219 // onError where the callback returns Future<T>
222 typename std::enable_if<
223 detail::Extract<F>::ReturnsFuture::value,
225 Future<T>::onError(F&& func) {
227 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
228 "Return type of onError callback must be T or Future<T>");
229 typedef typename detail::Extract<F>::FirstArg Exn;
// Take the Future before the promise is moved into the wrapper.
232 auto f = p.getFuture();
233 auto pm = folly::makeMoveWrapper(std::move(p));
234 auto funcm = folly::makeMoveWrapper(std::move(func));
235 setCallback_([pm, funcm](Try<T>&& t) mutable {
236 if (!t.template withException<Exn>([&] (Exn& e) {
// Run the handler and pipe the Future it returns (f2) into the promise.
238 auto f2 = (*funcm)(e);
239 f2.setCallback_([pm](Try<T>&& t2) mutable {
240 pm->fulfilTry(std::move(t2));
// Failures raised by the handler itself are captured on the promise.
242 } catch (const std::exception& e2) {
243 pm->setException(exception_wrapper(std::current_exception(), e2));
245 pm->setException(exception_wrapper(std::current_exception()));
// Pass the original Try through unchanged.
248 pm->fulfilTry(std::move(t));
// Returns an lvalue reference to the value held by the core's Try, obtained
// through Try::value().
// NOTE(review): validity/readiness checks, if any, are not visible here.
256 typename std::add_lvalue_reference<T>::type Future<T>::value() {
259 return core_->getTry().value();
// Const overload of value(): returns a const lvalue reference to the value
// held by the core's Try.
263 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
266 return core_->getTry().value();
// Returns a reference to the Try<T> stored on core_.
270 Try<T>& Future<T>::getTry() {
273 return core_->getTry();
// Rvalue-qualified via(): sets `executor` on this Future's core and returns
// this same Future by move, so no new Promise/Future pair is created.
277 template <typename Executor>
278 inline Future<T> Future<T>::via(Executor* executor) && {
282 core_->setExecutor(executor);
284 return std::move(*this);
// Lvalue-qualified via(): creates a fresh Promise/Future pair, chains a
// continuation on this Future that forwards its Try<T> into the new promise,
// and returns the new Future after routing it through the rvalue via().
288 template <typename Executor>
289 inline Future<T> Future<T>::via(Executor* executor) & {
// MoveWrapper lets the promise be moved into the lambda capture.
292 MoveWrapper<Promise<T>> p;
293 auto f = p->getFuture();
294 then([p](Try<T>&& t) mutable { p->fulfilTry(std::move(t)); });
295 return std::move(f).via(executor);
// Reports readiness by forwarding to core_->ready().
299 bool Future<T>::isReady() const {
301 return core_->ready();
// Hands `exception` to core_->raise(). The wrapper is taken by value and
// moved on, so the caller's copy is not touched.
305 void Future<T>::raise(exception_wrapper exception) {
306 core_->raise(std::move(exception));
// Builds an already-completed Future holding `t`. The value type is the
// decayed T; t is perfectly forwarded into the promise.
312 Future<typename std::decay<T>::type> makeFuture(T&& t) {
313 Promise<typename std::decay<T>::type> p;
314 p.setValue(std::forward<T>(t));
315 return p.getFuture();
// Void overload of makeFuture(): returns the Future of a local promise `p`.
// NOTE(review): the declaration and completion of `p` are not visible in
// this excerpt.
318 inline // for multiple translation units
319 Future<void> makeFuture() {
322 return p.getFuture();
328 typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
329 -> Future<decltype(func())> {
330 Promise<decltype(func())> p;
335 return p.getFuture();
// Const-lvalue overload: forwards `copy` as an rvalue to another
// makeFutureTry overload.
// NOTE(review): `copy` is declared on a line not visible here; presumably it
// is a local copy of func -- confirm.
339 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
341 return makeFutureTry(std::move(copy));
345 Future<T> makeFuture(std::exception_ptr const& e) {
348 return p.getFuture();
// Builds a Future<T> from an exception_wrapper: the wrapper is moved into
// the local promise `p` via setException(), and p's Future is returned.
// NOTE(review): the declaration of `p` is not visible in this excerpt.
352 Future<T> makeFuture(exception_wrapper ew) {
354 p.setException(std::move(ew));
355 return p.getFuture();
// Builds a Future from a concrete exception object. The enable_if restricts
// this overload to E derived from std::exception; e is wrapped with
// make_exception_wrapper<E> before being set on the promise.
// NOTE(review): the rest of the return type and the declaration of `p` are
// not visible in this excerpt.
358 template <class T, class E>
359 typename std::enable_if<std::is_base_of<std::exception, E>::value,
361 makeFuture(E const& e) {
363 p.setException(make_exception_wrapper<E>(e));
364 return p.getFuture();
// Builds a Future<T> from an existing Try<T>, moving the Try (value or
// exception) into the promise via fulfilTry().
368 Future<T> makeFuture(Try<T>&& t) {
369 Promise<typename std::decay<T>::type> p;
370 p.fulfilTry(std::move(t));
371 return p.getFuture();
// Try<void> overload: if the Try holds an exception, returns a Future<void>
// built from that exception.
// NOTE(review): the non-exception path is not visible in this excerpt.
375 inline Future<void> makeFuture(Try<void>&& t) {
376 if (t.hasException()) {
377 return makeFuture<void>(std::move(t.exception()));
// Free-function via(): creates a Future<void> with makeFuture() and routes
// it through `executor`, giving a starting point for a chain on that
// executor.
384 template <typename Executor>
385 Future<void> via(Executor* executor) {
386 return makeFuture().via(executor);
// Variadic aggregation over a pack of futures; the result type comes from
// detail::VariadicContext over the futures' value types.
// NOTE(review): the function name/parameter line is not visible here; the
// helper name suggests this is the variadic whenAll -- confirm.
391 template <typename... Fs>
392 typename detail::VariadicContext<
393 typename std::decay<Fs>::type::value_type...>::type
// The context is heap-allocated with a raw `new`.
// NOTE(review): ownership/cleanup is not visible in this excerpt; presumably
// the helper or the context itself releases it -- verify.
397 new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
398 ctx->total = sizeof...(fs);
// Take the result Future before handing the context to the helper.
399 auto f_saved = ctx->p.getFuture();
// std::forward with the decayed type always yields an rvalue, so every
// future in `fs` is passed on as an rvalue.
400 detail::whenAllVariadicHelper(ctx,
401 std::forward<typename std::decay<Fs>::type>(fs)...);
// Iterator-range whenAll: produces a Future of a vector holding one Try<T>
// per input future, where T is the value type of the futures in the range.
// Results are stored at the index of their input future.
// NOTE(review): several lines (the empty-range condition, how `f` is
// obtained from the iterator, and cleanup of `ctx`) are not visible here.
407 template <class InputIterator>
410 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
411 whenAll(InputIterator first, InputIterator last)
414 typename std::iterator_traits<InputIterator>::value_type::value_type T;
// Completed Future with an empty result vector (guarding condition is on a
// line not shown).
417 return makeFuture(std::vector<Try<T>>());
419 size_t n = std::distance(first, last);
// Raw `new`: ownership/cleanup of ctx is not visible in this excerpt.
421 auto ctx = new detail::WhenAllContext<T>();
// Pre-size the results so each callback can write its own slot by index.
423 ctx->results.resize(n);
425 auto f_saved = ctx->p.getFuture();
427 for (size_t i = 0; first != last; ++first, ++i) {
430 f.setCallback_([ctx, i, n](Try<T>&& t) {
431 ctx->results[i] = std::move(t);
// The callback that brings the count to n publishes the full result vector.
432 if (++ctx->count == n) {
433 ctx->p.setValue(std::move(ctx->results));
// Iterator-range whenAny: the first callback to flip ctx->done completes the
// result promise with a pair of (index of that input future, its Try<T>).
// Later callbacks see done == true and do not touch the promise.
// NOTE(review): how `f` is obtained and how `ctx` is released are on lines
// not visible in this excerpt.
442 template <class InputIterator>
447 std::iterator_traits<InputIterator>::value_type::value_type> > >
448 whenAny(InputIterator first, InputIterator last) {
450 typename std::iterator_traits<InputIterator>::value_type::value_type T;
// Raw `new`; the context is constructed with the number of input futures.
452 auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
453 auto f_saved = ctx->p.getFuture();
455 for (size_t i = 0; first != last; first++, i++) {
457 f.setCallback_([i, ctx](Try<T>&& t) {
// exchange(true) returns the previous value, so only one callback gets false.
458 if (!ctx->done.exchange(true)) {
459 ctx->p.setValue(std::make_pair(i, std::move(t)));
// whenN: produces a Future of a vector of (input index, Try<T>) pairs. Each
// input future gets a continuation that increments a shared completion
// counter and appends its (index, Try<T>) pair to the result vector.
// NOTE(review): the context type, the loop header, the binding of `v`, and
// the conditions around the counter `c` are on lines not visible here.
468 template <class InputIterator>
469 Future<std::vector<std::pair<size_t, Try<typename
470 std::iterator_traits<InputIterator>::value_type::value_type>>>>
471 whenN(InputIterator first, InputIterator last, size_t n) {
473 std::iterator_traits<InputIterator>::value_type::value_type T;
474 typedef std::vector<std::pair<size_t, Try<T>>> V;
// Shared context: kept alive by the shared_ptr copies captured below.
481 auto ctx = std::make_shared<ctx_t>();
484 // for each completed Future, increase count and add to vector, until we
485 // have n completed futures at which point we fulfil our Promise with the
490 it->then([ctx, n, i](Try<T>&& t) {
492 auto c = ++ctx->completed;
494 assert(ctx->v.size() < n);
495 v.push_back(std::make_pair(i, std::move(t)));
497 ctx->p.fulfilTry(Try<V>(std::move(v)));
// Error path; presumably taken when the range cannot supply n completions --
// the guarding condition is not visible.
507 ctx->p.setException(std::runtime_error("Not enough futures"));
510 return ctx->p.getFuture();
// Helper for the blocking getters: chains a continuation on *f that captures
// a stack-allocated Baton by reference.
// NOTE(review): the early-return for a ready future and the baton post/wait
// calls are not visible in this excerpt; because the lambda captures by
// reference, this frame presumably blocks until the continuation has run.
515 void getWaitHelper(Future<T>* f) {
516 // If we already have a value do the cheap thing
521 folly::Baton<> baton;
522 f->then([&](Try<T> const&) {
// Races *f against a timer of length `dur`. Both continuations share an
// atomic<bool> token; whichever calls exchange(true) first and sees false
// completes the promise `p`, and the other one does nothing. The timer side
// sets TimedOut (or an error caught in that path), the future side forwards
// the Try<T> of *f.
// NOTE(review): the declaration of `p` and the baton post/wait calls are not
// visible here; both lambdas capture locals by reference, so this frame
// presumably blocks until one of them has run -- confirm.
529 Future<T> getWaitTimeoutHelper(Future<T>* f, Duration dur) {
530 // TODO make and use variadic whenAny #5877971
532 auto token = std::make_shared<std::atomic<bool>>();
533 folly::Baton<> baton;
// Timer side of the race.
535 folly::detail::getTimekeeperSingleton()->after(dur)
536 .then([&,token](Try<void> const& t) {
537 if (token->exchange(true) == false) {
540 p.setException(TimedOut());
541 } catch (std::exception const& e) {
542 p.setException(exception_wrapper(std::current_exception(), e));
544 p.setException(exception_wrapper(std::current_exception()));
// Future side of the race.
550 f->then([&, token](Try<T>&& t) {
551 if (token->exchange(true) == false) {
552 p.fulfilTry(std::move(t));
558 return p.getFuture();
566 // Big assumption here: the then() call above, since it doesn't move out
567 // the value, leaves us with a value to return here. This would be a big
568 // no-no in user code, but I'm invoking internal developer privilege. This
569 // is slightly more efficient (save a move()) especially if there's an
570 // exception (save a throw).
571 return std::move(value());
575 inline void Future<void>::get() {
// get() with a timeout: delegates to getWaitTimeoutHelper and moves the
// value out of the Future it returns.
581 T Future<T>::get(Duration dur) {
582 return std::move(getWaitTimeoutHelper(this, dur).value());
// Future<void> specialization of get(Duration): there is no value to return,
// so value() is called on the helper's result and its return is discarded.
586 inline void Future<void>::get(Duration dur) {
587 getWaitTimeoutHelper(this, dur).value();
// getVia(): returns this Future's value by move.
// NOTE(review): the lines that use executor `e` before the value is read are
// not visible in this excerpt.
591 T Future<T>::getVia(DrivableExecutor* e) {
595 return std::move(value());
599 inline void Future<void>::getVia(DrivableExecutor* e) {
// Convenience overload: forwards to the three-argument within() with a
// default-constructed TimedOut as the timeout exception.
607 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
608 return within(dur, TimedOut(), tk);
// within(dur, e, tk): returns a Future that completes with either this
// Future's Try<T> or the exception `e`, whichever side wins the race on the
// shared atomic token. The state (exception, promise, token) lives in a
// shared_ptr<Context> captured by both continuations.
// NOTE(review): the struct header, the condition under which the singleton
// timekeeper is used, and the call that schedules the timer are on lines not
// visible in this excerpt.
613 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
616 Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
619 std::atomic<bool> token;
621 auto ctx = std::make_shared<Context>(std::move(e));
// Use the process-wide timekeeper (presumably when tk is null).
624 tk = folly::detail::getTimekeeperSingleton();
// Timer side of the race.
628 .then([ctx](Try<void> const& t) {
629 if (ctx->token.exchange(true) == false) {
632 ctx->promise.setException(std::move(ctx->exception));
633 } catch (std::exception const& e2) {
634 ctx->promise.setException(
635 exception_wrapper(std::current_exception(), e2));
637 ctx->promise.setException(
638 exception_wrapper(std::current_exception()));
// Future side of the race.
643 this->then([ctx](Try<T>&& t) {
644 if (ctx->token.exchange(true) == false) {
645 ctx->promise.fulfilTry(std::move(t));
649 return ctx->promise.getFuture();
// delayed(): combines this Future with futures::sleep(dur, tk) through
// whenAll, then re-emits this Future's Try<T> (element 0 of the tuple) as
// the result; the sleep's Try<void> is ignored.
653 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
654 return whenAll(*this, futures::sleep(dur, tk))
655 .then([](std::tuple<Try<T>, Try<void>> tup) {
656 Try<T>& t = std::get<0>(tup);
657 return makeFuture<T>(std::move(t));
// wait(): chains a continuation that re-wraps the Try<T> into a new Future
// (`done`), then spins with yield() until `done` reports ready.
// NOTE(review): the baton declaration/post/wait and the return statement are
// on lines not visible in this excerpt.
662 Future<T> Future<T>::wait() {
664 auto done = then([&](Try<T> t) {
666 return makeFuture(std::move(t));
669 while (!done.isReady()) {
670 // There's a race here between the return here and the actual finishing of
671 // the future. f is completed, but the setup may not have finished on done
672 // after the baton has posted.
673 std::this_thread::yield();
// wait(dur): like wait(), but bounded by a timeout. The Baton is held in a
// shared_ptr and captured by value, so the continuation keeps it alive even
// if it runs after this call has returned.
// NOTE(review): the baton post inside the continuation and the return
// statement are on lines not visible in this excerpt.
679 Future<T> Future<T>::wait(Duration dur) {
680 auto baton = std::make_shared<Baton<>>();
681 auto done = then([baton](Try<T> t) {
683 return makeFuture(std::move(t));
685 // Let's preserve the invariant that if we did not timeout (timed_wait returns
686 // true), then the returned Future is complete when it is returned to the
687 // caller. We need to wait out the race for that Future to complete.
// The deadline is an absolute time point: now + dur on system_clock.
688 if (baton->timed_wait(std::chrono::system_clock::now() + dur)) {
689 while (!done.isReady()) {
690 std::this_thread::yield();
697 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
705 Future<T> Future<T>::waitVia(DrivableExecutor* e) && {
709 return std::move(*this);
714 // I haven't included a Future<T&> specialization because I don't foresee us
715 // using it, however it is not difficult to add when needed. Refer to
716 // Future<void> for guidance. std::future and boost::future code would also be