2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
22 #include <folly/Baton.h>
23 #include <folly/Optional.h>
24 #include <folly/futures/detail/Core.h>
25 #include <folly/futures/Timekeeper.h>
// Forward declaration only; Future<T>::within() calls this to obtain a
// Timekeeper.
32 Timekeeper* getTimekeeperSingleton();
// Move constructor: copies other's core_ pointer and then nulls it in other,
// so only this object keeps referring to that core.
36 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
37 other.core_ = nullptr;
// Move assignment, implemented as a swap of the two core_ pointers: other
// ends up holding whatever core this future referred to before the call.
41 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
42 std::swap(core_, other.core_);
// Construct-from-value (const reference overload). The enable_if makes the
// parameter type well-formed only when F is not void.
49 const typename std::enable_if<!std::is_void<F>::value, F>::type& val)
// Take over the future obtained from p.
53 *this = p.getFuture();
// Construct-from-value (rvalue overload); well-formed only when F is not
// void.
59 typename std::enable_if<!std::is_void<F>::value, F>::type&& val)
// Forward val into p, then take over the future obtained from p.
62 p.setValue(std::forward<F>(val));
63 *this = p.getFuture();
// Default constructor for Future<void>; the enable_if restricts it to
// F == void. core_ starts out null and is replaced when the future obtained
// from p is move-assigned into *this.
68 typename std::enable_if<std::is_void<F>::value, int>::type>
69 Future<void>::Future() : core_(nullptr) {
72 *this = p.getFuture();
// Destructor for Future<T>.
77 Future<T>::~Future() {
// Detaches this future from its core by calling Core::detachFuture().
82 void Future<T>::detach() {
84 core_->detachFuture();
// Const validity check for this future.
// NOTE(review): presumably throws when core_ is null - confirm against the
// function body.
90 void Future<T>::throwIfInvalid() const {
// Installs func as the callback on the core. func is handed over with
// std::move, so the caller's callable is moved from.
97 void Future<T>::setCallback_(F&& func) {
99 core_->setCallback(std::move(func));
// Flattens a future-of-future: enabled only when F is itself a Future
// (isFuture<F>), and returns a Future of the inner type. Implemented by
// chaining a continuation that simply returns the inner future it receives.
106 typename std::enable_if<isFuture<F>::value,
107 Future<typename isFuture<T>::Inner>>::type
108 Future<T>::unwrap() {
109 return then([](Future<typename isFuture<T>::Inner> internal_future) {
110 return internal_future;
116 // Variant: returns a value
117 // e.g. f.then([](Try<T>&& t){ return t.value(); });
// Selected (via enable_if) when the callback's result is not a Future.
// The callback may take at most one argument (static_assert below). The
// future returned to the caller comes from a fresh Promise<B> and is given
// this future's executor. isTry selects whether func receives the Try<T>
// itself or the unwrapped value.
119 template <typename F, typename R, bool isTry, typename... Args>
120 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
121 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
122 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
123 typedef typename R::ReturnsFuture::Inner B;
127 // wrap these so we can move them into the lambda
128 folly::MoveWrapper<Promise<B>> p;
129 folly::MoveWrapper<F> funcm(std::forward<F>(func));
131 // grab the Future now before we lose our handle on the Promise
132 auto f = p->getFuture();
134 f.setExecutor(getExecutor());
137 /* This is a bit tricky.
139 We can't just close over *this in case this Future gets moved. So we
140 make a new dummy Future. We could figure out something more
141 sophisticated that avoids making a new Future object when it can, as an
142 optimization. But this is correct.
144 core_ can't be moved, it is explicitly disallowed (as is copying). But
145 if there's ever a reason to allow it, this is one place that makes that
146 assumption and would need to be fixed. We use a standard shared pointer
147 for core_ (by copying it in), which means in essence obj holds a shared
148 pointer to itself. But this shouldn't leak because Promise will not
149 outlive the continuation, because Promise will setException() with a
150 broken Promise if it is destructed before completed. We could use a
151 weak pointer but it would have to be converted to a shared pointer when
152 func is executed (because the Future returned by func may possibly
153 persist beyond the callback, if it gets moved), and so it is an
154 optimization to just make it shared from the get-go.
156 We have to move in the Promise and func using the MoveWrapper
157 hack. (func could be copied but it's a big drag on perf).
159 Two subtle but important points about this design. detail::Core has no
160 back pointers to Future or Promise, so if Future or Promise get moved
161 (and they will be moved in performant code) we don't have to do
162 anything fancy. And because we store the continuation in the
163 detail::Core, not in the Future, we can execute the continuation even
164 after the Future has gone out of scope. This is an intentional design
165 decision. It is likely we will want to be able to cancel a continuation
166 in some circumstances, but I think it should be explicit not implicit
167 in the destruction of the Future used to create it.
// When func takes a plain value (isTry is false) and t holds an exception,
// the exception is forwarded to the promise; func itself is invoked with
// t.get<isTry, Args>().
170 [p, funcm](Try<T>&& t) mutable {
171 if (!isTry && t.hasException()) {
172 p->setException(std::move(t.exception()));
175 return (*funcm)(t.template get<isTry, Args>()...);
183 // Variant: returns a Future
184 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
// Selected (via enable_if) when the callback's result is a Future. The
// future handed back to the caller comes from a fresh Promise<B>; the result
// of the future returned by func (f2) is piped into that promise.
186 template <typename F, typename R, bool isTry, typename... Args>
187 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
188 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
189 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
190 typedef typename R::ReturnsFuture::Inner B;
194 // wrap these so we can move them into the lambda
195 folly::MoveWrapper<Promise<B>> p;
196 folly::MoveWrapper<F> funcm(std::forward<F>(func));
198 // grab the Future now before we lose our handle on the Promise
199 auto f = p->getFuture();
201 f.setExecutor(getExecutor());
205 [p, funcm](Try<T>&& t) mutable {
// A value-taking func is never called with a failed Try: the exception goes
// straight to the promise instead.
206 if (!isTry && t.hasException()) {
207 p->setException(std::move(t.exception()));
210 auto f2 = (*funcm)(t.template get<isTry, Args>()...);
211 // that didn't throw, now we can steal p
212 f2.setCallback_([p](Try<B>&& b) mutable {
213 p->setTry(std::move(b));
// Exceptions derived from std::exception are wrapped together with the typed
// reference e; the other setException call wraps only current_exception().
215 } catch (const std::exception& e) {
216 p->setException(exception_wrapper(std::current_exception(), e));
218 p->setException(exception_wrapper(std::current_exception()));
// then() overload taking a pointer to member function plus the object to
// call it on. FirstArg is the member's first parameter type with reference
// and cv-qualifiers removed; isTry<FirstArg> decides whether the member
// receives the Try<T> or the unwrapped value.
// instance is captured as a raw pointer, so presumably the caller must keep
// it alive until the continuation has run - confirm with callers.
226 template <typename T>
227 template <typename R, typename Caller, typename... Args>
228 Future<typename isFuture<R>::Inner>
229 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
230 typedef typename std::remove_cv<
231 typename std::remove_reference<
232 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
233 return then([instance, func](Try<T>&& t){
234 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
// then() overload that takes an Executor first and forwards the remaining
// arguments to the regular then(). The executor in effect on entry is saved
// in oldX and the resulting future is moved back onto it with via(oldX).
// NOTE(review): x is presumably installed as the executor before chaining -
// confirm against the full body.
239 template <class... Args>
240 auto Future<T>::then(Executor* x, Args&&... args)
241 -> decltype(this->then(std::forward<Args>(args)...))
243 auto oldX = getExecutor();
245 return this->then(std::forward<Args>(args)...).via(oldX);
// Argument-less then(): chains a continuation that ignores the Try it is
// given, producing a Future<void>.
249 Future<void> Future<T>::then() {
250 return then([] (Try<T>&& t) {});
253 // onError where the callback returns T
// Selected when func is not callable with an exception_wrapper and does not
// return a Future. Exn is the type of func's first argument; the
// static_assert requires the callback's raw return type to be exactly T.
256 typename std::enable_if<
257 !detail::callableWith<F, exception_wrapper>::value &&
258 !detail::Extract<F>::ReturnsFuture::value,
260 Future<T>::onError(F&& func) {
261 typedef typename detail::Extract<F>::FirstArg Exn;
263 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
264 "Return type of onError callback must be T or Future<T>");
// Grab the future before p is moved into the wrapper below.
267 auto f = p.getFuture();
268 auto pm = folly::makeMoveWrapper(std::move(p));
269 auto funcm = folly::makeMoveWrapper(std::move(func));
270 setCallback_([pm, funcm](Try<T>&& t) mutable {
// withException<Exn> is handed a handler taking Exn&; when it reports false
// (presumably: t holds no Exn), the original Try is passed through.
271 if (!t.template withException<Exn>([&] (Exn& e) {
276 pm->setTry(std::move(t));
283 // onError where the callback returns Future<T>
// Selected when func is not callable with an exception_wrapper and does
// return a Future; the static_assert requires that future to be Future<T>.
286 typename std::enable_if<
287 !detail::callableWith<F, exception_wrapper>::value &&
288 detail::Extract<F>::ReturnsFuture::value,
290 Future<T>::onError(F&& func) {
292 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
293 "Return type of onError callback must be T or Future<T>");
294 typedef typename detail::Extract<F>::FirstArg Exn;
// Grab the future before p is moved into the wrapper below.
297 auto f = p.getFuture();
298 auto pm = folly::makeMoveWrapper(std::move(p));
299 auto funcm = folly::makeMoveWrapper(std::move(func));
300 setCallback_([pm, funcm](Try<T>&& t) mutable {
301 if (!t.template withException<Exn>([&] (Exn& e) {
// Run the handler and pipe the result of the future it returns into pm.
303 auto f2 = (*funcm)(e);
304 f2.setCallback_([pm](Try<T>&& t2) mutable {
305 pm->setTry(std::move(t2));
// Exceptions escaping the handler become the result of the returned future.
307 } catch (const std::exception& e2) {
308 pm->setException(exception_wrapper(std::current_exception(), e2));
310 pm->setException(exception_wrapper(std::current_exception()));
// Reached when withException reports false (presumably: no Exn stored):
// pass the original Try through.
313 pm->setTry(std::move(t));
// Chains a continuation whose result is the incoming Try re-wrapped with
// makeFuture, so the value or exception of this future is passed through.
// NOTE(review): func is presumably invoked inside the continuation before the
// Try is forwarded - confirm against the full body.
322 Future<T> Future<T>::ensure(F func) {
323 MoveWrapper<F> funcw(std::move(func));
324 return this->then([funcw](Try<T>&& t) {
326 return makeFuture(std::move(t));
// Applies within(dur, tk) and attaches an onError handler for TimedOut that
// calls func with no arguments and returns its result.
332 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
333 auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
334 return within(dur, tk)
335 .onError([funcw](TimedOut const&) { return (*funcw)(); });
// onError overload selected when func is callable with an exception_wrapper
// and returns a Future; the static_assert requires that future to be
// Future<T>.
340 typename std::enable_if<
341 detail::callableWith<F, exception_wrapper>::value &&
342 detail::Extract<F>::ReturnsFuture::value,
344 Future<T>::onError(F&& func) {
346 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
347 "Return type of onError callback must be T or Future<T>");
// Grab the future before p is moved into the wrapper below.
350 auto f = p.getFuture();
351 auto pm = folly::makeMoveWrapper(std::move(p));
352 auto funcm = folly::makeMoveWrapper(std::move(func));
353 setCallback_([pm, funcm](Try<T> t) mutable {
354 if (t.hasException()) {
// Hand the stored exception to the handler and pipe the result of the
// future it returns into pm.
356 auto f2 = (*funcm)(std::move(t.exception()));
357 f2.setCallback_([pm](Try<T> t2) mutable {
358 pm->setTry(std::move(t2));
// Exceptions escaping the handler become the result of the returned future.
360 } catch (const std::exception& e2) {
361 pm->setException(exception_wrapper(std::current_exception(), e2));
363 pm->setException(exception_wrapper(std::current_exception()));
// Forward the Try unchanged (presumably the no-exception path).
366 pm->setTry(std::move(t));
373 // onError(exception_wrapper) that returns T
// Selected when func is callable with an exception_wrapper and does not
// return a Future. The static_assert compares Extract<F>::Return (not
// RawReturn) against Future<T>.
376 typename std::enable_if<
377 detail::callableWith<F, exception_wrapper>::value &&
378 !detail::Extract<F>::ReturnsFuture::value,
380 Future<T>::onError(F&& func) {
382 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
383 "Return type of onError callback must be T or Future<T>");
// Grab the future before p is moved into the wrapper below.
386 auto f = p.getFuture();
387 auto pm = folly::makeMoveWrapper(std::move(p));
388 auto funcm = folly::makeMoveWrapper(std::move(func));
389 setCallback_([pm, funcm](Try<T> t) mutable {
390 if (t.hasException()) {
// The handler receives the stored exception_wrapper by move.
392 return (*funcm)(std::move(t.exception()));
// Forward the Try unchanged (presumably the no-exception path).
395 pm->setTry(std::move(t));
// Returns an lvalue reference to the value held by the core's Try, via
// Try::value().
403 typename std::add_lvalue_reference<T>::type Future<T>::value() {
406 return core_->getTry().value();
// Const overload of value(): returns a reference-to-const to the value held
// by the core's Try.
410 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
413 return core_->getTry().value();
// Returns a reference to the Try stored in the core.
417 Try<T>& Future<T>::getTry() {
420 return core_->getTry();
// Checks readiness without waiting: when the core reports ready, the stored
// Try is moved into the Optional o.
// NOTE(review): o is presumably returned empty when the core is not ready -
// confirm against the full body.
424 Optional<Try<T>> Future<T>::poll() {
426 if (core_->ready()) {
427 o = std::move(core_->getTry());
// Rvalue overload of via(): sets the executor on this very future and
// returns it by move, so no new future is created.
433 template <typename Executor>
434 inline Future<T> Future<T>::via(Executor* executor) && {
437 setExecutor(executor);
439 return std::move(*this);
// Lvalue overload of via(): chains a continuation onto this future that
// forwards its Try into a new promise p, then returns p's future switched to
// executor through the rvalue overload.
443 template <typename Executor>
444 inline Future<T> Future<T>::via(Executor* executor) & {
447 MoveWrapper<Promise<T>> p;
448 auto f = p->getFuture();
449 then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
450 return std::move(f).via(executor);
// Reports whether the core is ready (Core::ready()).
454 bool Future<T>::isReady() const {
456 return core_->ready();
// Passes exception on to the core's raise(); the wrapper is moved in.
460 void Future<T>::raise(exception_wrapper exception) {
461 core_->raise(std::move(exception));
// Builds a future holding t. The value type is the decayed T, and t is
// perfectly forwarded into the promise before its future is returned.
467 Future<typename std::decay<T>::type> makeFuture(T&& t) {
468 Promise<typename std::decay<T>::type> p;
469 p.setValue(std::forward<T>(t));
470 return p.getFuture();
// Argument-less makeFuture(): returns a Future<void> obtained from p.
473 inline // for multiple translation units
474 Future<void> makeFuture() {
477 return p.getFuture();
// The bool parameter sdf exists for overload selection: its type is only
// well-formed when F is not a reference type. It is not read in the lines
// below. The returned future carries the result type of calling func().
483 typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
484 -> Future<decltype(func())> {
485 Promise<decltype(func())> p;
490 return p.getFuture();
// const-reference overload: delegates to the other makeFutureTry overload,
// passing a moved `copy`.
494 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
496 return makeFutureTry(std::move(copy));
// makeFuture overload taking a std::exception_ptr; returns the future
// obtained from p.
// NOTE(review): p is presumably failed with e before this return - confirm.
500 Future<T> makeFuture(std::exception_ptr const& e) {
503 return p.getFuture();
// Builds a future that holds the given exception_wrapper; ew is taken by
// value and moved into the promise.
507 Future<T> makeFuture(exception_wrapper ew) {
509 p.setException(std::move(ew));
510 return p.getFuture();
// makeFuture overload for exception objects: enabled only when E derives
// from std::exception. e is wrapped with make_exception_wrapper<E> and set
// on the promise whose future is returned.
513 template <class T, class E>
514 typename std::enable_if<std::is_base_of<std::exception, E>::value,
516 makeFuture(E const& e) {
518 p.setException(make_exception_wrapper<E>(e));
519 return p.getFuture();
// Builds a future from an existing Try<T>: t (value or exception) is moved
// into the promise whose future is returned.
523 Future<T> makeFuture(Try<T>&& t) {
524 Promise<typename std::decay<T>::type> p;
525 p.setTry(std::move(t));
526 return p.getFuture();
// Try<void> overload: when t holds an exception, returns a Future<void>
// built from that exception.
530 inline Future<void> makeFuture(Try<void>&& t) {
531 if (t.hasException()) {
532 return makeFuture<void>(std::move(t.exception()));
// Free-function via(): creates a Future<void> with makeFuture() and switches
// it to executor.
539 template <typename Executor>
540 Future<void> via(Executor* executor) {
541 return makeFuture().via(executor);
// Variadic whenAll: combines a fixed set of futures of possibly different
// value types. The context is allocated with `new`.
// NOTE(review): its release is presumably handled in the helper - confirm.
546 template <typename... Fs>
547 typename detail::VariadicContext<
548 typename std::decay<Fs>::type::value_type...>::type
549 whenAll(Fs&&... fs) {
551 new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
552 ctx->total = sizeof...(fs);
// Take the result future before the inputs are handed to the helper.
553 auto f_saved = ctx->p.getFuture();
// std::forward with the decayed (non-reference) type yields an rvalue for
// every argument, so futures passed as lvalues are moved from as well.
554 detail::whenAllVariadicHelper(ctx,
555 std::forward<typename std::decay<Fs>::type>(fs)...);
// Iterator-range whenAll: yields a vector of Try<T>, one per input future,
// where T is the value type of the futures in [first, last).
561 template <class InputIterator>
564 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
565 whenAll(InputIterator first, InputIterator last) {
567 typename std::iterator_traits<InputIterator>::value_type::value_type T;
// Result for an empty input: a future holding an empty vector.
570 return makeFuture(std::vector<Try<T>>());
572 size_t n = std::distance(first, last);
// Context is allocated with `new`; results is pre-sized so each callback can
// write its slot by index.
574 auto ctx = new detail::WhenAllContext<T>();
576 ctx->results.resize(n);
578 auto f_saved = ctx->p.getFuture();
580 for (size_t i = 0; first != last; ++first, ++i) {
// Each input stores its Try at position i; the callback that brings the
// count up to n fulfils the promise with the whole vector.
583 f.setCallback_([ctx, i, n](Try<T> t) {
584 ctx->results[i] = std::move(t);
585 if (++ctx->count == n) {
586 ctx->p.setValue(std::move(ctx->results));
597 template <typename T>
598 struct CollectContext {
599 explicit CollectContext(int n) : count(0), threw(false) {}
601 Promise<std::vector<T>> p;
602 std::vector<T> results;
603 std::atomic<size_t> count;
604 std::atomic_bool threw;
606 typedef std::vector<T> result_type;
608 static inline Future<std::vector<T>> makeEmptyFuture() {
609 return makeFuture(std::vector<T>());
612 inline void setValue() {
613 p.setValue(std::move(results));
616 inline void addResult(int i, Try<T>& t) {
617 results[i] = std::move(t.value());
// Specialization of CollectContext for futures of void: there are no values
// to store, so it keeps only the counter and the exception flag and exposes
// the same member names as the primary template (result_type,
// makeEmptyFuture, setValue, addResult).
622 struct CollectContext<void> {
// n is accepted for interface parity; it is not used in the initializer list.
623 explicit CollectContext(int n) : count(0), threw(false) {}
625 std::atomic<size_t> count;
626 std::atomic_bool threw;
628 typedef void result_type;
630 static inline Future<void> makeEmptyFuture() {
634 inline void setValue() {
638 inline void addResult(int i, Try<void>& t) {
// collect(): combines the futures in [first, last) into one future of
// CollectContext<T>::result_type, where T is the value type of the inputs.
// The first input seen with an exception fails the result with that
// exception.
645 template <class InputIterator>
646 Future<typename detail::CollectContext<
647 typename std::iterator_traits<InputIterator>::value_type::value_type
649 collect(InputIterator first, InputIterator last) {
651 typename std::iterator_traits<InputIterator>::value_type::value_type T;
// Result for an empty input range.
654 return detail::CollectContext<T>::makeEmptyFuture();
657 size_t n = std::distance(first, last);
// Context is allocated with `new` and shared by all callbacks via raw
// pointer.
658 auto ctx = new detail::CollectContext<T>(n);
659 auto f_saved = ctx->p.getFuture();
661 for (size_t i = 0; first != last; ++first, ++i) {
664 f.setCallback_([ctx, i, n](Try<T> t) {
// c is this callback's position in completion order (1-based).
665 auto c = ++ctx->count;
667 if (t.hasException()) {
// exchange(true) returns the previous flag, so only the first failing
// callback sets the exception on the promise.
668 if (!ctx->threw.exchange(true)) {
669 ctx->p.setException(std::move(t.exception()));
// Values are recorded only while no exception has been seen.
671 } else if (!ctx->threw) {
672 ctx->addResult(i, t);
// whenAny(): the returned future is fulfilled with a pair of (index of the
// input future, its Try) for the first callback to win the `done` flag.
687 template <class InputIterator>
692 std::iterator_traits<InputIterator>::value_type::value_type> > >
693 whenAny(InputIterator first, InputIterator last) {
695 typename std::iterator_traits<InputIterator>::value_type::value_type T;
// Context is allocated with `new` and constructed with the number of inputs.
697 auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
698 auto f_saved = ctx->p.getFuture();
700 for (size_t i = 0; first != last; first++, i++) {
702 f.setCallback_([i, ctx](Try<T>&& t) {
// exchange(true) returns the previous value: only the first callback to get
// here sees false and sets the promise.
703 if (!ctx->done.exchange(true)) {
704 ctx->p.setValue(std::make_pair(i, std::move(t)));
// whenN(): the returned future yields a vector of (input index, Try<T>)
// pairs. The context is held in a shared_ptr that every continuation
// captures by value.
713 template <class InputIterator>
714 Future<std::vector<std::pair<size_t, Try<typename
715 std::iterator_traits<InputIterator>::value_type::value_type>>>>
716 whenN(InputIterator first, InputIterator last, size_t n) {
718 std::iterator_traits<InputIterator>::value_type::value_type T;
719 typedef std::vector<std::pair<size_t, Try<T>>> V;
726 auto ctx = std::make_shared<ctx_t>();
729 // for each completed Future, increase count and add to vector, until we
730 // have n completed futures at which point we fulfill our Promise with the
735 it->then([ctx, n, i](Try<T>&& t) {
// c is this continuation's position in completion order (1-based).
737 auto c = ++ctx->completed;
739 assert(ctx->v.size() < n);
740 v.push_back(std::make_pair(i, std::move(t)));
742 ctx->p.setTry(Try<V>(std::move(v)));
// Failure result with the message below (presumably used when fewer than n
// futures were supplied - confirm against the guarding condition).
752 ctx->p.setException(std::runtime_error("Not enough futures"));
755 return ctx->p.getFuture();
// reduce() overload selected when func's result is not a Future
// (isFutureResult is false). It waits for all inputs with whenAll and then
// folds the results into `initial` one by one.
758 template <class It, class T, class F, class ItT, class Arg>
759 typename std::enable_if<!isFutureResult<F, T, Arg>::value, Future<T>>::type
760 reduce(It first, It last, T initial, F func) {
// Early result: a future holding the initial value as-is.
762 return makeFuture(std::move(initial));
765 typedef isTry<Arg> IsTry;
767 return whenAll(first, last)
768 .then([initial, func](std::vector<Try<ItT>>& vals) mutable {
769 for (auto& val : vals) {
// The accumulator is moved into func and replaced by func's result.
770 initial = func(std::move(initial),
771 // Either return a ItT&& or a Try<ItT>&& depending
772 // on the type of the argument of func.
773 val.template get<IsTry::value, Arg&&>());
// reduce() overload selected when func's result is a Future (isFutureResult
// is true). The fold is built as a chain: each step combines the running
// future f with the next input through whenAll and applies func to the pair.
779 template <class It, class T, class F, class ItT, class Arg>
780 typename std::enable_if<isFutureResult<F, T, Arg>::value, Future<T>>::type
781 reduce(It first, It last, T initial, F func) {
// Early result: a future holding the initial value as-is.
783 return makeFuture(std::move(initial));
786 typedef isTry<Arg> IsTry;
// Seed the chain: combine `initial` with the first input.
788 auto f = first->then([initial, func](Try<ItT>& head) mutable {
789 return func(std::move(initial),
790 head.template get<IsTry::value, Arg&&>());
793 for (++first; first != last; ++first) {
// The accumulated side is always unwrapped with value(); only the new
// element honours IsTry.
794 f = whenAll(f, *first).then([func](std::tuple<Try<T>, Try<ItT>>& t) {
795 return func(std::move(std::get<0>(t).value()),
796 // Either return a ItT&& or a Try<ItT>&& depending
797 // on the type of the argument of func.
798 std::get<1>(t).template get<IsTry::value, Arg&&>());
// Convenience overload: within() using a default-constructed TimedOut as
// the exception.
806 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
807 return within(dur, TimedOut(), tk);
// within(dur, e, tk): races this future against a timer. Both continuations
// share a Context through a shared_ptr; whichever of them flips `token`
// first decides the result of the returned future.
812 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
815 Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
818 std::atomic<bool> token;
820 auto ctx = std::make_shared<Context>(std::move(e));
// Obtain a Timekeeper from the singleton accessor.
823 tk = folly::detail::getTimekeeperSingleton();
// Timer side: if it wins the token, the result is an exception - the
// timer's own if it has one, the caller-supplied one otherwise.
827 .then([ctx](Try<void> const& t) {
828 if (ctx->token.exchange(true) == false) {
829 if (t.hasException()) {
830 ctx->promise.setException(std::move(t.exception()));
832 ctx->promise.setException(std::move(ctx->exception));
// Result side: if this future wins the token, its Try is forwarded as-is.
837 this->then([ctx](Try<T>&& t) {
838 if (ctx->token.exchange(true) == false) {
839 ctx->promise.setTry(std::move(t));
843 return ctx->promise.getFuture();
// delayed(): combines this future with futures::sleep(dur, tk) via whenAll
// and then returns a future built from this future's own Try (tuple element
// 0); the sleep's Try is ignored.
847 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
848 return whenAll(*this, futures::sleep(dur, tk))
849 .then([](std::tuple<Try<T>, Try<void>> tup) {
850 Try<T>& t = std::get<0>(tup);
851 return makeFuture<T>(std::move(t));
// waitImpl(f): on return, f is ready. f is replaced by a continuation future
// that passes the original Try through unchanged.
858 void waitImpl(Future<T>& f) {
859 // short-circuit if there's nothing to do
860 if (f.isReady()) return;
// The lambda captures by reference ([&]), so anything it uses from this
// scope must stay alive until the continuation has run.
863 f = f.then([&](Try<T> t) {
865 return makeFuture(std::move(t));
869 // There's a race here between the return here and the actual finishing of
870 // the future. f is completed, but the setup may not have finished on done
871 // after the baton has posted.
872 while (!f.isReady()) {
873 std::this_thread::yield();
// waitImpl(f, dur): waits up to dur. The baton lives in a shared_ptr that
// the continuation captures by value, so it remains valid even if this
// function returns before the continuation runs.
878 void waitImpl(Future<T>& f, Duration dur) {
879 // short-circuit if there's nothing to do
880 if (f.isReady()) return;
882 auto baton = std::make_shared<Baton<>>();
883 f = f.then([baton](Try<T> t) {
885 return makeFuture(std::move(t));
888 // Let's preserve the invariant that if we did not timeout (timed_wait returns
889 // true), then the returned Future is complete when it is returned to the
890 // caller. We need to wait out the race for that Future to complete.
// The deadline is computed against system_clock at the time of the call.
891 if (baton->timed_wait(std::chrono::system_clock::now() + dur)) {
892 while (!f.isReady()) {
893 std::this_thread::yield();
// waitViaImpl(f, e): loops until f reports ready.
// NOTE(review): each iteration presumably drives the executor e - confirm
// against the loop body.
899 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
900 while (!f.isReady()) {
// Lvalue overload of wait(): delegates to detail::waitImpl(*this).
908 Future<T>& Future<T>::wait() & {
909 detail::waitImpl(*this);
// Rvalue overload of wait(): delegates to detail::waitImpl(*this), then
// returns this future as an rvalue reference.
914 Future<T>&& Future<T>::wait() && {
915 detail::waitImpl(*this);
916 return std::move(*this);
// Lvalue overload of wait(dur): delegates to detail::waitImpl(*this, dur).
920 Future<T>& Future<T>::wait(Duration dur) & {
921 detail::waitImpl(*this, dur);
// Rvalue overload of wait(dur): delegates to detail::waitImpl(*this, dur),
// then returns this future as an rvalue reference.
926 Future<T>&& Future<T>::wait(Duration dur) && {
927 detail::waitImpl(*this, dur);
928 return std::move(*this);
// Lvalue overload of waitVia(): delegates to detail::waitViaImpl(*this, e).
932 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
933 detail::waitViaImpl(*this, e);
// Rvalue overload of waitVia(): delegates to detail::waitViaImpl(*this, e),
// then returns this future as an rvalue reference.
938 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
939 detail::waitViaImpl(*this, e);
940 return std::move(*this);
// Wait via wait(), then move the stored value out to the caller.
945 return std::move(wait().value());
// get() for Future<void>: there is no value to return.
949 inline void Future<void>::get() {
// get(dur): returns the value by moving it out of this future.
// NOTE(review): presumably waits up to dur first and handles the timeout
// case before reaching this return - confirm against the full body.
954 T Future<T>::get(Duration dur) {
957 return std::move(value());
// get(dur) for Future<void>: there is no value to return.
964 inline void Future<void>::get(Duration dur) {
// getVia(e): waits with waitVia(e), then moves the stored value out to the
// caller.
974 T Future<T>::getVia(DrivableExecutor* e) {
975 return std::move(waitVia(e).value());
// getVia(e) for Future<void>: there is no value to return.
979 inline void Future<void>::getVia(DrivableExecutor* e) {
// willEqual(f): yields a Future<bool>. Once both this future and f have
// completed, and both hold values, the result is the outcome of comparing
// the two values with operator==.
// NOTE(review): the result when either side holds an exception is
// presumably false - confirm against the full body.
984 Future<bool> Future<T>::willEqual(Future<T>& f) {
985 return whenAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
986 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
987 return std::get<0>(t).value() == std::get<1>(t).value();
// filter(predicate): chains a continuation that evaluates predicate on the
// value and throws PredicateDoesNotObtain when the predicate returns false.
996 Future<T> Future<T>::filter(F predicate) {
997 auto p = folly::makeMoveWrapper(std::move(predicate));
998 return this->then([p](T val) {
// The predicate only ever sees the value through a const reference.
999 T const& valConstRef = val;
1000 if (!(*p)(valConstRef)) {
1001 throw PredicateDoesNotObtain();
// Base case of chainHelper: no callbacks left, only the future itself.
1010 Future<Z> chainHelper(Future<Z> f) {
// Recursive case of chainHelper: attaches fn to f with then() and recurses
// on the resulting future with the remaining callbacks.
1014 template <class Z, class F, class Fn, class... Callbacks>
1015 Future<Z> chainHelper(F f, Fn fn, Callbacks... fns) {
1016 return chainHelper<Z>(f.then(fn), fns...);
// chain<A, Z>(fns...): packages a sequence of callbacks into a single
// std::function taking a Try<A> and returning a Future<Z>. The callbacks are
// attached up front to the future of an unfulfilled Promise<A>; calling the
// returned function fulfils that promise with t and hands back the future at
// the end of the chain.
1020 template <class A, class Z, class... Callbacks>
1021 std::function<Future<Z>(Try<A>)>
1022 chain(Callbacks... fns) {
1023 MoveWrapper<Promise<A>> pw;
1024 MoveWrapper<Future<Z>> fw(chainHelper<Z>(pw->getFuture(), fns...));
// The returned function moves the final future out of fw, so only the
// first invocation hands back the chained future.
1025 return [=](Try<A> t) mutable {
1026 pw->setTry(std::move(t));
1027 return std::move(*fw);
1033 } // namespace folly
1035 // I haven't included a Future<T&> specialization because I don't foresee us
1036 // using it, however it is not difficult to add when needed. Refer to
1037 // Future<void> for guidance. std::future and boost::future code would also be