2 * Copyright 2014 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
22 #include <folly/Baton.h>
23 #include <folly/futures/detail/Core.h>
24 #include <folly/futures/Timekeeper.h>
31 Timekeeper* getTimekeeperSingleton();
35 Future<T>::Future(Future<T>&& other) noexcept : core_(nullptr) {
36 *this = std::move(other);
40 Future<T>& Future<T>::operator=(Future<T>&& other) {
41 std::swap(core_, other.core_);
48 const typename std::enable_if<!std::is_void<F>::value, F>::type& val)
52 *this = p.getFuture();
58 typename std::enable_if<!std::is_void<F>::value, F>::type&& val)
61 p.setValue(std::forward<F>(val));
62 *this = p.getFuture();
67 typename std::enable_if<std::is_void<F>::value, int>::type>
68 Future<void>::Future() : core_(nullptr) {
71 *this = p.getFuture();
76 Future<T>::~Future() {
81 void Future<T>::detach() {
83 core_->detachFuture();
89 void Future<T>::throwIfInvalid() const {
96 void Future<T>::setCallback_(F&& func) {
98 core_->setCallback(std::move(func));
101 // Variant: returns a value
102 // e.g. f.then([](Try<T>&& t){ return t.value(); });
104 template <typename F, typename R, bool isTry, typename... Args>
105 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
106 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
107 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
108 typedef typename R::ReturnsFuture::Inner B;
112 // wrap these so we can move them into the lambda
113 folly::MoveWrapper<Promise<B>> p;
114 folly::MoveWrapper<F> funcm(std::forward<F>(func));
116 // grab the Future now before we lose our handle on the Promise
117 auto f = p->getFuture();
119 /* This is a bit tricky.
121 We can't just close over *this in case this Future gets moved. So we
122 make a new dummy Future. We could figure out something more
123 sophisticated that avoids making a new Future object when it can, as an
124 optimization. But this is correct.
126 core_ can't be moved, it is explicitly disallowed (as is copying). But
127 if there's ever a reason to allow it, this is one place that makes that
128 assumption and would need to be fixed. We use a standard shared pointer
129 for core_ (by copying it in), which means in essence obj holds a shared
130 pointer to itself. But this shouldn't leak because Promise will not
131 outlive the continuation, because Promise will setException() with a
132 broken Promise if it is destructed before completed. We could use a
133 weak pointer but it would have to be converted to a shared pointer when
134 func is executed (because the Future returned by func may possibly
135 persist beyond the callback, if it gets moved), and so it is an
136 optimization to just make it shared from the get-go.
138 We have to move in the Promise and func using the MoveWrapper
139 hack. (func could be copied but it's a big drag on perf).
141 Two subtle but important points about this design. detail::Core has no
142 back pointers to Future or Promise, so if Future or Promise get moved
143 (and they will be moved in performant code) we don't have to do
144 anything fancy. And because we store the continuation in the
145 detail::Core, not in the Future, we can execute the continuation even
146 after the Future has gone out of scope. This is an intentional design
147 decision. It is likely we will want to be able to cancel a continuation
148 in some circumstances, but I think it should be explicit not implicit
149 in the destruction of the Future used to create it.
152 [p, funcm](Try<T>&& t) mutable {
153 if (!isTry && t.hasException()) {
154 p->setException(std::move(t.exception()));
157 return (*funcm)(t.template get<isTry, Args>()...);
165 // Variant: returns a Future
166 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
168 template <typename F, typename R, bool isTry, typename... Args>
169 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
170 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
171 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
172 typedef typename R::ReturnsFuture::Inner B;
176 // wrap these so we can move them into the lambda
177 folly::MoveWrapper<Promise<B>> p;
178 folly::MoveWrapper<F> funcm(std::forward<F>(func));
180 // grab the Future now before we lose our handle on the Promise
181 auto f = p->getFuture();
184 [p, funcm](Try<T>&& t) mutable {
185 if (!isTry && t.hasException()) {
186 p->setException(std::move(t.exception()));
189 auto f2 = (*funcm)(t.template get<isTry, Args>()...);
190 // that didn't throw, now we can steal p
191 f2.setCallback_([p](Try<B>&& b) mutable {
192 p->fulfilTry(std::move(b));
194 } catch (const std::exception& e) {
195 p->setException(exception_wrapper(std::current_exception()));
203 template <typename T>
204 template <typename Caller, typename R, typename... Args>
205 Future<typename isFuture<R>::Inner>
206 Future<T>::then(Caller *instance, R(Caller::*func)(Args...)) {
207 typedef typename std::remove_cv<
208 typename std::remove_reference<
209 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
210 return then([instance, func](Try<T>&& t){
211 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
216 Future<void> Future<T>::then() {
217 return then([] (Try<T>&& t) {});
220 // onError where the callback returns T
223 typename std::enable_if<
224 !detail::Extract<F>::ReturnsFuture::value,
226 Future<T>::onError(F&& func) {
227 typedef typename detail::Extract<F>::FirstArg Exn;
229 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
230 "Return type of onError callback must be T or Future<T>");
233 auto f = p.getFuture();
234 auto pm = folly::makeMoveWrapper(std::move(p));
235 auto funcm = folly::makeMoveWrapper(std::move(func));
236 setCallback_([pm, funcm](Try<T>&& t) mutable {
237 if (!t.template withException<Exn>([&] (Exn& e) {
242 pm->fulfilTry(std::move(t));
249 // onError where the callback returns Future<T>
252 typename std::enable_if<
253 detail::Extract<F>::ReturnsFuture::value,
255 Future<T>::onError(F&& func) {
257 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
258 "Return type of onError callback must be T or Future<T>");
259 typedef typename detail::Extract<F>::FirstArg Exn;
262 auto f = p.getFuture();
263 auto pm = folly::makeMoveWrapper(std::move(p));
264 auto funcm = folly::makeMoveWrapper(std::move(func));
265 setCallback_([pm, funcm](Try<T>&& t) mutable {
266 if (!t.template withException<Exn>([&] (Exn& e) {
268 auto f2 = (*funcm)(e);
269 f2.setCallback_([pm](Try<T>&& t2) mutable {
270 pm->fulfilTry(std::move(t2));
272 } catch (const std::exception& e2) {
273 pm->setException(exception_wrapper(std::current_exception(), e2));
275 pm->setException(exception_wrapper(std::current_exception()));
278 pm->fulfilTry(std::move(t));
287 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
288 auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
289 return within(dur, tk)
290 .onError([funcw](TimedOut const&) { return (*funcw)(); });
294 typename std::add_lvalue_reference<T>::type Future<T>::value() {
297 return core_->getTry().value();
301 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
304 return core_->getTry().value();
308 Try<T>& Future<T>::getTry() {
311 return core_->getTry();
315 template <typename Executor>
316 inline Future<T> Future<T>::via(Executor* executor) && {
320 core_->setExecutor(executor);
322 return std::move(*this);
326 template <typename Executor>
327 inline Future<T> Future<T>::via(Executor* executor) & {
330 MoveWrapper<Promise<T>> p;
331 auto f = p->getFuture();
332 then([p](Try<T>&& t) mutable { p->fulfilTry(std::move(t)); });
333 return std::move(f).via(executor);
337 bool Future<T>::isReady() const {
339 return core_->ready();
343 void Future<T>::raise(exception_wrapper exception) {
344 core_->raise(std::move(exception));
350 Future<typename std::decay<T>::type> makeFuture(T&& t) {
351 Promise<typename std::decay<T>::type> p;
352 p.setValue(std::forward<T>(t));
353 return p.getFuture();
356 inline // for multiple translation units
357 Future<void> makeFuture() {
360 return p.getFuture();
366 typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
367 -> Future<decltype(func())> {
368 Promise<decltype(func())> p;
373 return p.getFuture();
377 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
379 return makeFutureTry(std::move(copy));
383 Future<T> makeFuture(std::exception_ptr const& e) {
386 return p.getFuture();
390 Future<T> makeFuture(exception_wrapper ew) {
392 p.setException(std::move(ew));
393 return p.getFuture();
396 template <class T, class E>
397 typename std::enable_if<std::is_base_of<std::exception, E>::value,
399 makeFuture(E const& e) {
401 p.setException(make_exception_wrapper<E>(e));
402 return p.getFuture();
406 Future<T> makeFuture(Try<T>&& t) {
407 Promise<typename std::decay<T>::type> p;
408 p.fulfilTry(std::move(t));
409 return p.getFuture();
413 inline Future<void> makeFuture(Try<void>&& t) {
414 if (t.hasException()) {
415 return makeFuture<void>(std::move(t.exception()));
422 template <typename Executor>
423 Future<void> via(Executor* executor) {
424 return makeFuture().via(executor);
429 template <typename... Fs>
430 typename detail::VariadicContext<
431 typename std::decay<Fs>::type::value_type...>::type
435 new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
436 ctx->total = sizeof...(fs);
437 auto f_saved = ctx->p.getFuture();
438 detail::whenAllVariadicHelper(ctx,
439 std::forward<typename std::decay<Fs>::type>(fs)...);
445 template <class InputIterator>
448 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
449 whenAll(InputIterator first, InputIterator last)
452 typename std::iterator_traits<InputIterator>::value_type::value_type T;
455 return makeFuture(std::vector<Try<T>>());
457 size_t n = std::distance(first, last);
459 auto ctx = new detail::WhenAllContext<T>();
461 ctx->results.resize(n);
463 auto f_saved = ctx->p.getFuture();
465 for (size_t i = 0; first != last; ++first, ++i) {
468 f.setCallback_([ctx, i, n](Try<T>&& t) {
469 ctx->results[i] = std::move(t);
470 if (++ctx->count == n) {
471 ctx->p.setValue(std::move(ctx->results));
480 template <class InputIterator>
485 std::iterator_traits<InputIterator>::value_type::value_type> > >
486 whenAny(InputIterator first, InputIterator last) {
488 typename std::iterator_traits<InputIterator>::value_type::value_type T;
490 auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
491 auto f_saved = ctx->p.getFuture();
493 for (size_t i = 0; first != last; first++, i++) {
495 f.setCallback_([i, ctx](Try<T>&& t) {
496 if (!ctx->done.exchange(true)) {
497 ctx->p.setValue(std::make_pair(i, std::move(t)));
506 template <class InputIterator>
507 Future<std::vector<std::pair<size_t, Try<typename
508 std::iterator_traits<InputIterator>::value_type::value_type>>>>
509 whenN(InputIterator first, InputIterator last, size_t n) {
511 std::iterator_traits<InputIterator>::value_type::value_type T;
512 typedef std::vector<std::pair<size_t, Try<T>>> V;
519 auto ctx = std::make_shared<ctx_t>();
522 // for each completed Future, increase count and add to vector, until we
523 // have n completed futures at which point we fulfil our Promise with the
528 it->then([ctx, n, i](Try<T>&& t) {
530 auto c = ++ctx->completed;
532 assert(ctx->v.size() < n);
533 v.push_back(std::make_pair(i, std::move(t)));
535 ctx->p.fulfilTry(Try<V>(std::move(v)));
545 ctx->p.setException(std::runtime_error("Not enough futures"));
548 return ctx->p.getFuture();
553 void getWaitHelper(Future<T>* f) {
554 // If we already have a value do the cheap thing
559 folly::Baton<> baton;
560 f->then([&](Try<T> const&) {
567 Future<T> getWaitTimeoutHelper(Future<T>* f, Duration dur) {
568 // TODO make and use variadic whenAny #5877971
570 auto token = std::make_shared<std::atomic<bool>>();
571 folly::Baton<> baton;
573 folly::detail::getTimekeeperSingleton()->after(dur)
574 .then([&,token](Try<void> const& t) {
575 if (token->exchange(true) == false) {
576 if (t.hasException()) {
577 p.setException(std::move(t.exception()));
579 p.setException(TimedOut());
585 f->then([&, token](Try<T>&& t) {
586 if (token->exchange(true) == false) {
587 p.fulfilTry(std::move(t));
593 return p.getFuture();
601 // Big assumption here: the then() call above, since it doesn't move out
602 // the value, leaves us with a value to return here. This would be a big
603 // no-no in user code, but I'm invoking internal developer privilege. This
604 // is slightly more efficient (save a move()) especially if there's an
605 // exception (save a throw).
606 return std::move(value());
610 inline void Future<void>::get() {
616 T Future<T>::get(Duration dur) {
617 return std::move(getWaitTimeoutHelper(this, dur).value());
621 inline void Future<void>::get(Duration dur) {
622 getWaitTimeoutHelper(this, dur).value();
626 T Future<T>::getVia(DrivableExecutor* e) {
630 return std::move(value());
634 inline void Future<void>::getVia(DrivableExecutor* e) {
642 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
643 return within(dur, TimedOut(), tk);
648 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
651 Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
654 std::atomic<bool> token;
656 auto ctx = std::make_shared<Context>(std::move(e));
659 tk = folly::detail::getTimekeeperSingleton();
663 .then([ctx](Try<void> const& t) {
664 if (ctx->token.exchange(true) == false) {
665 if (t.hasException()) {
666 ctx->promise.setException(std::move(t.exception()));
668 ctx->promise.setException(std::move(ctx->exception));
673 this->then([ctx](Try<T>&& t) {
674 if (ctx->token.exchange(true) == false) {
675 ctx->promise.fulfilTry(std::move(t));
679 return ctx->promise.getFuture();
683 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
684 return whenAll(*this, futures::sleep(dur, tk))
685 .then([](std::tuple<Try<T>, Try<void>> tup) {
686 Try<T>& t = std::get<0>(tup);
687 return makeFuture<T>(std::move(t));
694 void waitImpl(Future<T>& f) {
696 f = f.then([&](Try<T> t) {
698 return makeFuture(std::move(t));
701 // There's a race here between the return here and the actual finishing of
702 // the future. f is completed, but the setup may not have finished on done
703 // after the baton has posted.
704 while (!f.isReady()) {
705 std::this_thread::yield();
710 void waitImpl(Future<T>& f, Duration dur) {
711 auto baton = std::make_shared<Baton<>>();
712 f = f.then([baton](Try<T> t) {
714 return makeFuture(std::move(t));
716 // Let's preserve the invariant that if we did not timeout (timed_wait returns
717 // true), then the returned Future is complete when it is returned to the
718 // caller. We need to wait out the race for that Future to complete.
719 if (baton->timed_wait(std::chrono::system_clock::now() + dur)) {
720 while (!f.isReady()) {
721 std::this_thread::yield();
727 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
728 while (!f.isReady()) {
736 Future<T>& Future<T>::wait() & {
737 detail::waitImpl(*this);
742 Future<T>&& Future<T>::wait() && {
743 detail::waitImpl(*this);
744 return std::move(*this);
748 Future<T>& Future<T>::wait(Duration dur) & {
749 detail::waitImpl(*this, dur);
754 Future<T>&& Future<T>::wait(Duration dur) && {
755 detail::waitImpl(*this, dur);
756 return std::move(*this);
760 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
761 detail::waitViaImpl(*this, e);
766 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
767 detail::waitViaImpl(*this, e);
768 return std::move(*this);
773 // I haven't included a Future<T&> specialization because I don't forsee us
774 // using it, however it is not difficult to add when needed. Refer to
775 // Future<void> for guidance. std::future and boost::future code would also be