2 * Copyright 2014 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
22 #include <folly/Baton.h>
23 #include <folly/futures/detail/Core.h>
24 #include <folly/futures/Timekeeper.h>
31 Timekeeper* getTimekeeperSingleton();
35 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
36 other.core_ = nullptr;
40 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
42 other.core_ = nullptr;
49 const typename std::enable_if<!std::is_void<F>::value, F>::type& val)
53 *this = p.getFuture();
59 typename std::enable_if<!std::is_void<F>::value, F>::type&& val)
62 p.setValue(std::forward<F>(val));
63 *this = p.getFuture();
68 typename std::enable_if<std::is_void<F>::value, int>::type>
69 Future<void>::Future() : core_(nullptr) {
72 *this = p.getFuture();
77 Future<T>::~Future() {
82 void Future<T>::detach() {
84 core_->detachFuture();
90 void Future<T>::throwIfInvalid() const {
97 void Future<T>::setCallback_(F&& func) {
99 core_->setCallback(std::move(func));
102 // Variant: returns a value
103 // e.g. f.then([](Try<T>&& t){ return t.value(); });
105 template <typename F, typename R, bool isTry, typename... Args>
106 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
107 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
108 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
109 typedef typename R::ReturnsFuture::Inner B;
113 // wrap these so we can move them into the lambda
114 folly::MoveWrapper<Promise<B>> p;
115 folly::MoveWrapper<F> funcm(std::forward<F>(func));
117 // grab the Future now before we lose our handle on the Promise
118 auto f = p->getFuture();
120 /* This is a bit tricky.
122 We can't just close over *this in case this Future gets moved. So we
123 make a new dummy Future. We could figure out something more
124 sophisticated that avoids making a new Future object when it can, as an
125 optimization. But this is correct.
127 core_ can't be moved, it is explicitly disallowed (as is copying). But
128 if there's ever a reason to allow it, this is one place that makes that
129 assumption and would need to be fixed. We use a standard shared pointer
130 for core_ (by copying it in), which means in essence obj holds a shared
131 pointer to itself. But this shouldn't leak because Promise will not
132 outlive the continuation, because Promise will setException() with a
133 broken Promise if it is destructed before completed. We could use a
134 weak pointer but it would have to be converted to a shared pointer when
135 func is executed (because the Future returned by func may possibly
136 persist beyond the callback, if it gets moved), and so it is an
137 optimization to just make it shared from the get-go.
139 We have to move in the Promise and func using the MoveWrapper
140 hack. (func could be copied but it's a big drag on perf).
142 Two subtle but important points about this design. detail::Core has no
143 back pointers to Future or Promise, so if Future or Promise get moved
144 (and they will be moved in performant code) we don't have to do
145 anything fancy. And because we store the continuation in the
146 detail::Core, not in the Future, we can execute the continuation even
147 after the Future has gone out of scope. This is an intentional design
148 decision. It is likely we will want to be able to cancel a continuation
149 in some circumstances, but I think it should be explicit not implicit
150 in the destruction of the Future used to create it.
153 [p, funcm](Try<T>&& t) mutable {
154 if (!isTry && t.hasException()) {
155 p->setException(std::move(t.exception()));
158 return (*funcm)(t.template get<isTry, Args>()...);
166 // Variant: returns a Future
167 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
169 template <typename F, typename R, bool isTry, typename... Args>
170 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
171 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
172 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
173 typedef typename R::ReturnsFuture::Inner B;
177 // wrap these so we can move them into the lambda
178 folly::MoveWrapper<Promise<B>> p;
179 folly::MoveWrapper<F> funcm(std::forward<F>(func));
181 // grab the Future now before we lose our handle on the Promise
182 auto f = p->getFuture();
185 [p, funcm](Try<T>&& t) mutable {
186 if (!isTry && t.hasException()) {
187 p->setException(std::move(t.exception()));
190 auto f2 = (*funcm)(t.template get<isTry, Args>()...);
191 // that didn't throw, now we can steal p
192 f2.setCallback_([p](Try<B>&& b) mutable {
193 p->fulfilTry(std::move(b));
195 } catch (const std::exception& e) {
196 p->setException(exception_wrapper(std::current_exception(), e));
198 p->setException(exception_wrapper(std::current_exception()));
206 template <typename T>
207 template <typename R, typename Caller, typename... Args>
208 Future<typename isFuture<R>::Inner>
209 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
210 typedef typename std::remove_cv<
211 typename std::remove_reference<
212 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
213 return then([instance, func](Try<T>&& t){
214 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
219 Future<void> Future<T>::then() {
220 return then([] (Try<T>&& t) {});
223 // onError where the callback returns T
226 typename std::enable_if<
227 !detail::Extract<F>::ReturnsFuture::value,
229 Future<T>::onError(F&& func) {
230 typedef typename detail::Extract<F>::FirstArg Exn;
232 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
233 "Return type of onError callback must be T or Future<T>");
236 auto f = p.getFuture();
237 auto pm = folly::makeMoveWrapper(std::move(p));
238 auto funcm = folly::makeMoveWrapper(std::move(func));
239 setCallback_([pm, funcm](Try<T>&& t) mutable {
240 if (!t.template withException<Exn>([&] (Exn& e) {
245 pm->fulfilTry(std::move(t));
252 // onError where the callback returns Future<T>
255 typename std::enable_if<
256 detail::Extract<F>::ReturnsFuture::value,
258 Future<T>::onError(F&& func) {
260 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
261 "Return type of onError callback must be T or Future<T>");
262 typedef typename detail::Extract<F>::FirstArg Exn;
265 auto f = p.getFuture();
266 auto pm = folly::makeMoveWrapper(std::move(p));
267 auto funcm = folly::makeMoveWrapper(std::move(func));
268 setCallback_([pm, funcm](Try<T>&& t) mutable {
269 if (!t.template withException<Exn>([&] (Exn& e) {
271 auto f2 = (*funcm)(e);
272 f2.setCallback_([pm](Try<T>&& t2) mutable {
273 pm->fulfilTry(std::move(t2));
275 } catch (const std::exception& e2) {
276 pm->setException(exception_wrapper(std::current_exception(), e2));
278 pm->setException(exception_wrapper(std::current_exception()));
281 pm->fulfilTry(std::move(t));
290 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
291 auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
292 return within(dur, tk)
293 .onError([funcw](TimedOut const&) { return (*funcw)(); });
297 typename std::add_lvalue_reference<T>::type Future<T>::value() {
300 return core_->getTry().value();
304 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
307 return core_->getTry().value();
311 Try<T>& Future<T>::getTry() {
314 return core_->getTry();
318 template <typename Executor>
319 inline Future<T> Future<T>::via(Executor* executor) && {
323 core_->setExecutor(executor);
325 return std::move(*this);
329 template <typename Executor>
330 inline Future<T> Future<T>::via(Executor* executor) & {
333 MoveWrapper<Promise<T>> p;
334 auto f = p->getFuture();
335 then([p](Try<T>&& t) mutable { p->fulfilTry(std::move(t)); });
336 return std::move(f).via(executor);
340 bool Future<T>::isReady() const {
342 return core_->ready();
346 void Future<T>::raise(exception_wrapper exception) {
347 core_->raise(std::move(exception));
353 Future<typename std::decay<T>::type> makeFuture(T&& t) {
354 Promise<typename std::decay<T>::type> p;
355 p.setValue(std::forward<T>(t));
356 return p.getFuture();
359 inline // for multiple translation units
360 Future<void> makeFuture() {
363 return p.getFuture();
369 typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
370 -> Future<decltype(func())> {
371 Promise<decltype(func())> p;
376 return p.getFuture();
380 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
382 return makeFutureTry(std::move(copy));
386 Future<T> makeFuture(std::exception_ptr const& e) {
389 return p.getFuture();
393 Future<T> makeFuture(exception_wrapper ew) {
395 p.setException(std::move(ew));
396 return p.getFuture();
399 template <class T, class E>
400 typename std::enable_if<std::is_base_of<std::exception, E>::value,
402 makeFuture(E const& e) {
404 p.setException(make_exception_wrapper<E>(e));
405 return p.getFuture();
409 Future<T> makeFuture(Try<T>&& t) {
410 Promise<typename std::decay<T>::type> p;
411 p.fulfilTry(std::move(t));
412 return p.getFuture();
416 inline Future<void> makeFuture(Try<void>&& t) {
417 if (t.hasException()) {
418 return makeFuture<void>(std::move(t.exception()));
425 template <typename Executor>
426 Future<void> via(Executor* executor) {
427 return makeFuture().via(executor);
432 template <typename... Fs>
433 typename detail::VariadicContext<
434 typename std::decay<Fs>::type::value_type...>::type
438 new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
439 ctx->total = sizeof...(fs);
440 auto f_saved = ctx->p.getFuture();
441 detail::whenAllVariadicHelper(ctx,
442 std::forward<typename std::decay<Fs>::type>(fs)...);
448 template <class InputIterator>
451 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
452 whenAll(InputIterator first, InputIterator last)
455 typename std::iterator_traits<InputIterator>::value_type::value_type T;
458 return makeFuture(std::vector<Try<T>>());
460 size_t n = std::distance(first, last);
462 auto ctx = new detail::WhenAllContext<T>();
464 ctx->results.resize(n);
466 auto f_saved = ctx->p.getFuture();
468 for (size_t i = 0; first != last; ++first, ++i) {
471 f.setCallback_([ctx, i, n](Try<T>&& t) {
472 ctx->results[i] = std::move(t);
473 if (++ctx->count == n) {
474 ctx->p.setValue(std::move(ctx->results));
483 template <class InputIterator>
488 std::iterator_traits<InputIterator>::value_type::value_type> > >
489 whenAny(InputIterator first, InputIterator last) {
491 typename std::iterator_traits<InputIterator>::value_type::value_type T;
493 auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
494 auto f_saved = ctx->p.getFuture();
496 for (size_t i = 0; first != last; first++, i++) {
498 f.setCallback_([i, ctx](Try<T>&& t) {
499 if (!ctx->done.exchange(true)) {
500 ctx->p.setValue(std::make_pair(i, std::move(t)));
509 template <class InputIterator>
510 Future<std::vector<std::pair<size_t, Try<typename
511 std::iterator_traits<InputIterator>::value_type::value_type>>>>
512 whenN(InputIterator first, InputIterator last, size_t n) {
514 std::iterator_traits<InputIterator>::value_type::value_type T;
515 typedef std::vector<std::pair<size_t, Try<T>>> V;
522 auto ctx = std::make_shared<ctx_t>();
525 // for each completed Future, increase count and add to vector, until we
526 // have n completed futures at which point we fulfil our Promise with the
531 it->then([ctx, n, i](Try<T>&& t) {
533 auto c = ++ctx->completed;
535 assert(ctx->v.size() < n);
536 v.push_back(std::make_pair(i, std::move(t)));
538 ctx->p.fulfilTry(Try<V>(std::move(v)));
548 ctx->p.setException(std::runtime_error("Not enough futures"));
551 return ctx->p.getFuture();
556 void getWaitHelper(Future<T>* f) {
557 // If we already have a value do the cheap thing
562 folly::Baton<> baton;
563 f->then([&](Try<T> const&) {
570 Future<T> getWaitTimeoutHelper(Future<T>* f, Duration dur) {
571 // TODO make and use variadic whenAny #5877971
573 auto token = std::make_shared<std::atomic<bool>>();
574 folly::Baton<> baton;
576 folly::detail::getTimekeeperSingleton()->after(dur)
577 .then([&,token](Try<void> const& t) {
578 if (token->exchange(true) == false) {
579 if (t.hasException()) {
580 p.setException(std::move(t.exception()));
582 p.setException(TimedOut());
588 f->then([&, token](Try<T>&& t) {
589 if (token->exchange(true) == false) {
590 p.fulfilTry(std::move(t));
596 return p.getFuture();
604 // Big assumption here: the then() call above, since it doesn't move out
605 // the value, leaves us with a value to return here. This would be a big
606 // no-no in user code, but I'm invoking internal developer privilege. This
607 // is slightly more efficient (save a move()) especially if there's an
608 // exception (save a throw).
609 return std::move(value());
613 inline void Future<void>::get() {
619 T Future<T>::get(Duration dur) {
620 return std::move(getWaitTimeoutHelper(this, dur).value());
624 inline void Future<void>::get(Duration dur) {
625 getWaitTimeoutHelper(this, dur).value();
629 T Future<T>::getVia(DrivableExecutor* e) {
633 return std::move(value());
637 inline void Future<void>::getVia(DrivableExecutor* e) {
645 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
646 return within(dur, TimedOut(), tk);
651 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
654 Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
657 std::atomic<bool> token;
659 auto ctx = std::make_shared<Context>(std::move(e));
662 tk = folly::detail::getTimekeeperSingleton();
666 .then([ctx](Try<void> const& t) {
667 if (ctx->token.exchange(true) == false) {
668 if (t.hasException()) {
669 ctx->promise.setException(std::move(t.exception()));
671 ctx->promise.setException(std::move(ctx->exception));
676 this->then([ctx](Try<T>&& t) {
677 if (ctx->token.exchange(true) == false) {
678 ctx->promise.fulfilTry(std::move(t));
682 return ctx->promise.getFuture();
686 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
687 return whenAll(*this, futures::sleep(dur, tk))
688 .then([](std::tuple<Try<T>, Try<void>> tup) {
689 Try<T>& t = std::get<0>(tup);
690 return makeFuture<T>(std::move(t));
697 void waitImpl(Future<T>& f) {
699 f = f.then([&](Try<T> t) {
701 return makeFuture(std::move(t));
704 // There's a race here between the return here and the actual finishing of
705 // the future. f is completed, but the setup may not have finished on done
706 // after the baton has posted.
707 while (!f.isReady()) {
708 std::this_thread::yield();
713 void waitImpl(Future<T>& f, Duration dur) {
714 auto baton = std::make_shared<Baton<>>();
715 f = f.then([baton](Try<T> t) {
717 return makeFuture(std::move(t));
719 // Let's preserve the invariant that if we did not timeout (timed_wait returns
720 // true), then the returned Future is complete when it is returned to the
721 // caller. We need to wait out the race for that Future to complete.
722 if (baton->timed_wait(std::chrono::system_clock::now() + dur)) {
723 while (!f.isReady()) {
724 std::this_thread::yield();
730 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
731 while (!f.isReady()) {
739 Future<T>& Future<T>::wait() & {
740 detail::waitImpl(*this);
745 Future<T>&& Future<T>::wait() && {
746 detail::waitImpl(*this);
747 return std::move(*this);
751 Future<T>& Future<T>::wait(Duration dur) & {
752 detail::waitImpl(*this, dur);
757 Future<T>&& Future<T>::wait(Duration dur) && {
758 detail::waitImpl(*this, dur);
759 return std::move(*this);
763 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
764 detail::waitViaImpl(*this, e);
769 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
770 detail::waitViaImpl(*this, e);
771 return std::move(*this);
778 Future<Z> chainHelper(Future<Z> f) {
782 template <class Z, class F, class Fn, class... Callbacks>
783 Future<Z> chainHelper(F f, Fn fn, Callbacks... fns) {
784 return chainHelper<Z>(f.then(fn), fns...);
788 template <class A, class Z, class... Callbacks>
789 std::function<Future<Z>(Try<A>)>
790 chain(Callbacks... fns) {
791 MoveWrapper<Promise<A>> pw;
792 MoveWrapper<Future<Z>> fw(chainHelper<Z>(pw->getFuture(), fns...));
793 return [=](Try<A> t) mutable {
794 pw->fulfilTry(std::move(t));
795 return std::move(*fw);
803 // I haven't included a Future<T&> specialization because I don't forsee us
804 // using it, however it is not difficult to add when needed. Refer to
805 // Future<void> for guidance. std::future and boost::future code would also be