2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
22 #include <folly/experimental/fibers/Baton.h>
23 #include <folly/Optional.h>
24 #include <folly/futures/detail/Core.h>
25 #include <folly/futures/Timekeeper.h>
// Declaration of the accessor that Future<T>::within() calls (spelled there
// as folly::detail::getTimekeeperSingleton()) to obtain a Timekeeper*.
32 Timekeeper* getTimekeeperSingleton();
// Move constructor: adopts other's core_ pointer, then nulls other.core_ so
// the moved-from Future no longer refers to that core.
36 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
37 other.core_ = nullptr;
// Move assignment: exchanges core_ with other via std::swap, so `other`
// leaves holding whatever core_ this Future had before the assignment.
41 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
42 std::swap(core_, other.core_);
// Value constructor: begins with a null core_, fulfils `p` with the
// perfectly-forwarded val, and then move-assigns p's Future into *this.
48 Future<T>::Future(T2&& val) : core_(nullptr) {
50 p.setValue(std::forward<T2>(val));
51 *this = p.getFuture();
// Default constructor for Future<void>. The enable_if on the template
// parameter list requires std::is_void<F>. core_ starts null and is then
// replaced by move-assigning the Future obtained from `p`.
56 typename std::enable_if<std::is_void<F>::value, int>::type>
57 Future<void>::Future() : core_(nullptr) {
60 *this = p.getFuture();
// Destructor for Future<T>.
// NOTE(review): presumably hands off to detach() to release core_ — confirm.
65 Future<T>::~Future() {
// Notifies the Core that this Future is letting go of it by calling
// core_->detachFuture().
70 void Future<T>::detach() {
72 core_->detachFuture();
// Const validity check for this Future.
// NOTE(review): presumably throws when the Future has no core_ — confirm.
78 void Future<T>::throwIfInvalid() const {
// Installs func as the callback held by core_. func is handed to
// Core::setCallback through std::move.
85 void Future<T>::setCallback_(F&& func) {
87 core_->setCallback(std::move(func));
// Only participates when isFuture<F>::value is true; the result type is
// Future<isFuture<T>::Inner>. Implemented by chaining a then() continuation
// that receives the inner Future and returns it unchanged.
94 typename std::enable_if<isFuture<F>::value,
95 Future<typename isFuture<T>::Inner>>::type
97 return then([](Future<typename isFuture<T>::Inner> internal_future) {
98 return internal_future;
// Continuation plumbing for callbacks that return a plain value; this
// overload is selected when R::ReturnsFuture::value is false.
//   func  - the user callback; wrapped in a MoveWrapper so the lambda below
//           can capture it without a copy.
//   Args  - zero or one argument type (enforced by the static_assert).
//   B     - R::ReturnsFuture::Inner, the value type of the Promise made here.
// Return type is R::Return.
104 // Variant: returns a value
105 // e.g. f.then([](Try<T>&& t){ return t.value(); });
107 template <typename F, typename R, bool isTry, typename... Args>
108 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
109 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
110 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
111 typedef typename R::ReturnsFuture::Inner B;
115 // wrap these so we can move them into the lambda
116 folly::MoveWrapper<Promise<B>> p;
117 folly::MoveWrapper<F> funcm(std::forward<F>(func));
119 // grab the Future now before we lose our handle on the Promise
120 auto f = p->getFuture();
// The new Future runs on the same executor as this one.
122 f.setExecutor(getExecutor());
125 /* This is a bit tricky.
127 We can't just close over *this in case this Future gets moved. So we
128 make a new dummy Future. We could figure out something more
129 sophisticated that avoids making a new Future object when it can, as an
130 optimization. But this is correct.
132 core_ can't be moved, it is explicitly disallowed (as is copying). But
133 if there's ever a reason to allow it, this is one place that makes that
134 assumption and would need to be fixed. We use a standard shared pointer
135 for core_ (by copying it in), which means in essence obj holds a shared
136 pointer to itself. But this shouldn't leak because Promise will not
137 outlive the continuation, because Promise will setException() with a
138 broken Promise if it is destructed before completed. We could use a
139 weak pointer but it would have to be converted to a shared pointer when
140 func is executed (because the Future returned by func may possibly
141 persist beyond the callback, if it gets moved), and so it is an
142 optimization to just make it shared from the get-go.
144 We have to move in the Promise and func using the MoveWrapper
145 hack. (func could be copied but it's a big drag on perf).
147 Two subtle but important points about this design. detail::Core has no
148 back pointers to Future or Promise, so if Future or Promise get moved
149 (and they will be moved in performant code) we don't have to do
150 anything fancy. And because we store the continuation in the
151 detail::Core, not in the Future, we can execute the continuation even
152 after the Future has gone out of scope. This is an intentional design
153 decision. It is likely we will want to be able to cancel a continuation
154 in some circumstances, but I think it should be explicit not implicit
155 in the destruction of the Future used to create it.
// Callback given this Future's result. When the user callback does not take
// a Try (!isTry) and t holds an exception, that exception goes straight to p
// without calling func; the visible non-error path invokes func with
// t.get<isTry, Args>().
158 [p, funcm](Try<T>&& t) mutable {
159 if (!isTry && t.hasException()) {
160 p->setException(std::move(t.exception()));
163 return (*funcm)(t.template get<isTry, Args>()...);
// Continuation plumbing for callbacks that themselves return a Future; this
// overload is selected when R::ReturnsFuture::value is true.
//   func  - the user callback, wrapped in a MoveWrapper for lambda capture.
//   Args  - zero or one argument type (enforced by the static_assert).
//   B     - R::ReturnsFuture::Inner, the value type of the Promise made here.
// Return type is R::Return.
171 // Variant: returns a Future
172 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
174 template <typename F, typename R, bool isTry, typename... Args>
175 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
176 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
177 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
178 typedef typename R::ReturnsFuture::Inner B;
182 // wrap these so we can move them into the lambda
183 folly::MoveWrapper<Promise<B>> p;
184 folly::MoveWrapper<F> funcm(std::forward<F>(func));
186 // grab the Future now before we lose our handle on the Promise
187 auto f = p->getFuture();
// The new Future runs on the same executor as this one.
189 f.setExecutor(getExecutor());
// Callback given this Future's result. An exception in t bypasses func when
// the callback does not take a Try (!isTry).
193 [p, funcm](Try<T>&& t) mutable {
194 if (!isTry && t.hasException()) {
195 p->setException(std::move(t.exception()));
// Run func to get the inner Future f2, then forward f2's eventual Try to p.
198 auto f2 = (*funcm)(t.template get<isTry, Args>()...);
199 // that didn't throw, now we can steal p
200 f2.setCallback_([p](Try<B>&& b) mutable {
201 p->setTry(std::move(b));
// A std::exception is wrapped together with the current exception_ptr.
203 } catch (const std::exception& e) {
204 p->setException(exception_wrapper(std::current_exception(), e));
// NOTE(review): presumably the catch-all handler — confirm. It wraps the
// current exception_ptr alone.
206 p->setException(exception_wrapper(std::current_exception()));
// then() overload taking a pointer-to-member-function plus the object to call
// it on. FirstArg is the member's first parameter type with reference and cv
// qualifiers stripped; isTry<FirstArg>::value is passed to Try::get to select
// what the member receives. Returns Future<isFuture<R>::Inner>.
// `instance` is captured as a raw pointer.
// NOTE(review): the caller presumably must keep *instance alive until the
// continuation has run — confirm.
214 template <typename T>
215 template <typename R, typename Caller, typename... Args>
216 Future<typename isFuture<R>::Inner>
217 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
218 typedef typename std::remove_cv<
219 typename std::remove_reference<
220 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
221 return then([instance, func](Try<T>&& t){
222 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
// then() overload that names an Executor. The return type is whatever the
// regular then(args...) returns. The current executor is remembered in oldX,
// args are forwarded to the regular then(), and the resulting Future is
// routed back through oldX with via().
229 template <class... Args>
230 auto Future<T>::then(Executor* x, Args&&... args)
231 -> decltype(this->then(std::forward<Args>(args)...))
233 auto oldX = getExecutor();
235 return this->then(std::forward<Args>(args)...).via(oldX);
// Argument-less then(): produces a Future<void> by chaining a continuation
// that accepts the Try<T> and does nothing with it.
240 Future<void> Future<T>::then() {
241 return then([] (Try<T>&& t) {});
244 // onError where the callback returns T
// Selected when F is NOT callable with exception_wrapper and does NOT return
// a Future. Exn is the callback's first argument type; the static_assert
// requires Extract<F>::RawReturn to be exactly T.
247 typename std::enable_if<
248 !detail::callableWith<F, exception_wrapper>::value &&
249 !detail::Extract<F>::ReturnsFuture::value,
251 Future<T>::onError(F&& func) {
252 typedef typename detail::Extract<F>::FirstArg Exn;
254 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
255 "Return type of onError callback must be T or Future<T>");
// Take the Future first, then wrap the Promise and func so the callback can
// capture them by move.
258 auto f = p.getFuture();
259 auto pm = folly::makeMoveWrapper(std::move(p));
260 auto funcm = folly::makeMoveWrapper(std::move(func));
// When withException<Exn> reports false, the original Try is passed on to pm
// untouched.
261 setCallback_([pm, funcm](Try<T>&& t) mutable {
262 if (!t.template withException<Exn>([&] (Exn& e) {
267 pm->setTry(std::move(t));
274 // onError where the callback returns Future<T>
// Selected when F is NOT callable with exception_wrapper and DOES return a
// Future. The static_assert requires Extract<F>::Return to be Future<T>; Exn
// is the callback's first argument type.
277 typename std::enable_if<
278 !detail::callableWith<F, exception_wrapper>::value &&
279 detail::Extract<F>::ReturnsFuture::value,
281 Future<T>::onError(F&& func) {
283 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
284 "Return type of onError callback must be T or Future<T>");
285 typedef typename detail::Extract<F>::FirstArg Exn;
288 auto f = p.getFuture();
289 auto pm = folly::makeMoveWrapper(std::move(p));
290 auto funcm = folly::makeMoveWrapper(std::move(func));
291 setCallback_([pm, funcm](Try<T>&& t) mutable {
292 if (!t.template withException<Exn>([&] (Exn& e) {
// The handler yields a Future f2; its eventual Try is forwarded to pm.
294 auto f2 = (*funcm)(e);
295 f2.setCallback_([pm](Try<T>&& t2) mutable {
296 pm->setTry(std::move(t2));
// Exceptions thrown while running the handler become pm's result.
298 } catch (const std::exception& e2) {
299 pm->setException(exception_wrapper(std::current_exception(), e2));
301 pm->setException(exception_wrapper(std::current_exception()));
// withException<Exn> reported false: pass the original Try through.
304 pm->setTry(std::move(t));
// Wraps func in a MoveWrapper and chains a continuation that captures it.
// The continuation re-emits the incoming Try via makeFuture, so the returned
// Future<T> carries the same result as this one.
313 Future<T> Future<T>::ensure(F func) {
314 MoveWrapper<F> funcw(std::move(func));
315 return this->then([funcw](Try<T>&& t) {
317 return makeFuture(std::move(t));
// Applies within(dur, tk) to this Future and attaches an onError handler for
// TimedOut whose result is whatever calling func (with no arguments) returns.
323 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
324 auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
325 return within(dur, tk)
326 .onError([funcw](TimedOut const&) { return (*funcw)(); });
// onError overload selected when F IS callable with exception_wrapper and
// returns a Future. The static_assert requires Extract<F>::Return to be
// Future<T>.
331 typename std::enable_if<
332 detail::callableWith<F, exception_wrapper>::value &&
333 detail::Extract<F>::ReturnsFuture::value,
335 Future<T>::onError(F&& func) {
337 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
338 "Return type of onError callback must be T or Future<T>");
341 auto f = p.getFuture();
342 auto pm = folly::makeMoveWrapper(std::move(p));
343 auto funcm = folly::makeMoveWrapper(std::move(func));
344 setCallback_([pm, funcm](Try<T> t) mutable {
345 if (t.hasException()) {
// The handler receives the exception_wrapper and yields a Future f2 whose
// eventual Try is forwarded to pm.
347 auto f2 = (*funcm)(std::move(t.exception()));
348 f2.setCallback_([pm](Try<T> t2) mutable {
349 pm->setTry(std::move(t2));
// Exceptions thrown while running the handler become pm's result.
351 } catch (const std::exception& e2) {
352 pm->setException(exception_wrapper(std::current_exception(), e2));
354 pm->setException(exception_wrapper(std::current_exception()));
// No-exception path: forward the original Try.
357 pm->setTry(std::move(t));
364 // onError(exception_wrapper) that returns T
// Selected when F IS callable with exception_wrapper and does NOT return a
// Future. The static_assert compares Extract<F>::Return against Future<T>.
367 typename std::enable_if<
368 detail::callableWith<F, exception_wrapper>::value &&
369 !detail::Extract<F>::ReturnsFuture::value,
371 Future<T>::onError(F&& func) {
373 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
374 "Return type of onError callback must be T or Future<T>");
377 auto f = p.getFuture();
378 auto pm = folly::makeMoveWrapper(std::move(p));
379 auto funcm = folly::makeMoveWrapper(std::move(func));
380 setCallback_([pm, funcm](Try<T> t) mutable {
381 if (t.hasException()) {
// The handler is called with the exception_wrapper moved out of t.
383 return (*funcm)(std::move(t.exception()));
// No-exception path: forward the original Try.
386 pm->setTry(std::move(t));
// Returns an lvalue reference to the value held in core_'s Try, as produced
// by Try::value().
394 typename std::add_lvalue_reference<T>::type Future<T>::value() {
397 return core_->getTry().value();
// Const overload of value(): returns a reference-to-const to the value held
// in core_'s Try.
401 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
404 return core_->getTry().value();
// Returns a reference to the Try<T> stored in core_.
408 Try<T>& Future<T>::getTry() {
411 return core_->getTry();
// Non-blocking probe that yields an Optional<Try<T>>. When core_ reports
// ready, the Try is MOVED out of core_ into `o`.
415 Optional<Try<T>> Future<T>::poll() {
417 if (core_->ready()) {
418 o = std::move(core_->getTry());
// Rvalue-qualified via(): sets `executor` on this Future and returns the same
// Future by move.
424 template <typename Executor>
425 inline Future<T> Future<T>::via(Executor* executor) && {
428 setExecutor(executor);
430 return std::move(*this);
// Lvalue-qualified via(): creates a fresh Promise<T>, chains a continuation
// on this Future that forwards its Try into that Promise, and returns the
// Promise's Future routed through `executor` using the rvalue overload.
434 template <typename Executor>
435 inline Future<T> Future<T>::via(Executor* executor) & {
438 MoveWrapper<Promise<T>> p;
439 auto f = p->getFuture();
440 then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
441 return std::move(f).via(executor);
// Reports whether core_ is ready, as answered by Core::ready().
445 bool Future<T>::isReady() const {
447 return core_->ready();
// Passes `exception` on to core_->raise(), moving it.
451 void Future<T>::raise(exception_wrapper exception) {
452 core_->raise(std::move(exception));
// Builds a Future<std::decay<T>::type> that already holds a value: a local
// Promise is fulfilled with the forwarded t and its Future is returned.
458 Future<typename std::decay<T>::type> makeFuture(T&& t) {
459 Promise<typename std::decay<T>::type> p;
460 p.setValue(std::forward<T>(t));
461 return p.getFuture();
// No-argument makeFuture: returns the Future<void> obtained from `p`.
464 inline // for multiple translation units
465 Future<void> makeFuture() {
468 return p.getFuture();
// Overload restricted to non-reference F by the enable_if carried on the
// `sdf` parameter. Returns a Future<decltype(func())> taken from a local
// Promise of that type.
474 typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
475 -> Future<decltype(func())> {
476 Promise<decltype(func())> p;
481 return p.getFuture();
// Const-lvalue overload: delegates to makeFutureWith with std::move(copy), so
// the callee receives an rvalue rather than the caller's func.
485 auto makeFutureWith(F const& func) -> Future<decltype(func())> {
487 return makeFutureWith(std::move(copy));
// makeFuture from a std::exception_ptr: returns the Future obtained from `p`.
// NOTE(review): presumably p is failed with e beforehand — confirm.
491 Future<T> makeFuture(std::exception_ptr const& e) {
494 return p.getFuture();
// makeFuture from an exception_wrapper: moves ew into p.setException and
// returns p's Future.
498 Future<T> makeFuture(exception_wrapper ew) {
500 p.setException(std::move(ew));
501 return p.getFuture();
// makeFuture from an exception object. Only enabled when E derives from
// std::exception; e is wrapped with make_exception_wrapper<E> and set on p.
504 template <class T, class E>
505 typename std::enable_if<std::is_base_of<std::exception, E>::value,
507 makeFuture(E const& e) {
509 p.setException(make_exception_wrapper<E>(e));
510 return p.getFuture();
// makeFuture from a Try<T>: the Try (value or exception) is moved into a
// local Promise via setTry and the Promise's Future is returned.
514 Future<T> makeFuture(Try<T>&& t) {
515 Promise<typename std::decay<T>::type> p;
516 p.setTry(std::move(t));
517 return p.getFuture();
// Try<void> overload: when t holds an exception, returns a Future<void>
// built from that exception.
521 inline Future<void> makeFuture(Try<void>&& t) {
522 if (t.hasException()) {
523 return makeFuture<void>(std::move(t.exception()));
// Free-function via(): shorthand for makeFuture().via(executor).
530 template <typename Executor>
531 Future<void> via(Executor* executor) {
532 return makeFuture().via(executor);
// Variadic collectAll over futures of possibly different value types.
// A VariadicContext is allocated with raw `new`, its `total` is set to the
// number of futures, and the context's Future is taken BEFORE the context and
// the futures are handed to collectAllVariadicHelper.
// NOTE(review): ownership of ctx presumably passes to the helper — confirm
// where it is deleted.
537 template <typename... Fs>
538 typename detail::VariadicContext<
539 typename std::decay<Fs>::type::value_type...>::type
540 collectAll(Fs&&... fs) {
542 new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
543 ctx->total = sizeof...(fs);
544 auto f_saved = ctx->p.getFuture();
545 detail::collectAllVariadicHelper(ctx,
546 std::forward<typename std::decay<Fs>::type>(fs)...);
// Iterator-range collectAll. T is the value type of the futures in the range.
// An early-return path yields an already-completed Future holding an empty
// vector. Otherwise a WhenAllContext<T> is allocated with raw `new`, its
// results vector is sized to n = distance(first, last), and every future gets
// a callback that stores its Try at its own index.
552 template <class InputIterator>
555 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
556 collectAll(InputIterator first, InputIterator last) {
558 typename std::iterator_traits<InputIterator>::value_type::value_type T;
561 return makeFuture(std::vector<Try<T>>());
563 size_t n = std::distance(first, last);
565 auto ctx = new detail::WhenAllContext<T>();
567 ctx->results.resize(n);
// Grab the Future before any callback can complete the Promise.
569 auto f_saved = ctx->p.getFuture();
571 for (size_t i = 0; first != last; ++first, ++i) {
// The callback whose increment brings count to n fulfils the Promise with
// the whole results vector.
574 f.setCallback_([ctx, i, n](Try<T> t) {
575 ctx->results[i] = std::move(t);
576 if (++ctx->count == n) {
577 ctx->p.setValue(std::move(ctx->results));
// Primary template, declared only; the third parameter is the enable_if slot
// that the two partial specializations below select on.
588 template <class, class, typename = void> struct CollectContextHelper;
// Specialization for the case where the stored element type VecT is T
// itself: getResults simply casts the vector to an rvalue, no copying.
590 template <class T, class VecT>
591 struct CollectContextHelper<T, VecT,
592 typename std::enable_if<std::is_same<T, VecT>::value>::type> {
593 static inline std::vector<T>&& getResults(std::vector<VecT>& results) {
594 return std::move(results);
// Specialization for the case where VecT differs from T: getResults builds a
// new std::vector<T>, reserving results.size() slots and moving each
// element's .value() into it.
598 template <class T, class VecT>
599 struct CollectContextHelper<T, VecT,
600 typename std::enable_if<!std::is_same<T, VecT>::value>::type> {
601 static inline std::vector<T> getResults(std::vector<VecT>& results) {
602 std::vector<T> finalResults;
603 finalResults.reserve(results.size());
604 for (auto& opt : results) {
605 finalResults.push_back(std::move(opt.value()));
// Shared state used by collect() for non-void T.
//   p       - Promise for the final std::vector<T>.
//   results - per-future storage, element type VecT (chosen with
//             std::conditional on std::is_default_constructible<T>).
//   count   - atomic counter, starts at 0.
//   threw   - atomic flag, starts false; collect() sets it on the first
//             exception.
611 template <typename T>
612 struct CollectContext {
614 typedef typename std::conditional<
615 std::is_default_constructible<T>::value,
620 explicit CollectContext(int n) : count(0), threw(false) {
624 Promise<std::vector<T>> p;
625 std::vector<VecT> results;
626 std::atomic<size_t> count;
627 std::atomic_bool threw;
629 typedef std::vector<T> result_type;
// Future returned by collect() on its early-return path: an already
// completed empty vector.
631 static inline Future<std::vector<T>> makeEmptyFuture() {
632 return makeFuture(std::vector<T>());
// Fulfils p with the results converted by CollectContextHelper.
635 inline void setValue() {
636 p.setValue(CollectContextHelper<T, VecT>::getResults(results));
// Moves the value out of t into slot i.
639 inline void addResult(int i, Try<T>& t) {
640 results[i] = std::move(t.value());
// Specialization of CollectContext for Future<void> inputs. It exposes the
// same members that collect() uses (count, threw, result_type,
// makeEmptyFuture, setValue, addResult); result_type is void.
645 struct CollectContext<void> {
647 explicit CollectContext(int n) : count(0), threw(false) {}
650 std::atomic<size_t> count;
651 std::atomic_bool threw;
653 typedef void result_type;
655 static inline Future<void> makeEmptyFuture() {
659 inline void setValue() {
663 inline void addResult(int i, Try<void>& t) {
// collect(): combines a range of futures into one Future of
// CollectContext<T>::result_type. T is the value type of the input futures.
// An early-return path hands back CollectContext<T>::makeEmptyFuture().
// Otherwise a CollectContext<T> is allocated with raw `new` for
// n = distance(first, last) futures.
// NOTE(review): where ctx is deleted is not established here — confirm.
670 template <class InputIterator>
671 Future<typename detail::CollectContext<
672 typename std::iterator_traits<InputIterator>::value_type::value_type
674 collect(InputIterator first, InputIterator last) {
676 typename std::iterator_traits<InputIterator>::value_type::value_type T;
679 return detail::CollectContext<T>::makeEmptyFuture();
682 size_t n = std::distance(first, last);
683 auto ctx = new detail::CollectContext<T>(n);
684 auto f_saved = ctx->p.getFuture();
686 for (size_t i = 0; first != last; ++first, ++i) {
// Per-future callback. Every invocation bumps count. Only the first
// exception (the one that flips `threw` from false) is set on the Promise;
// values are recorded only while no exception has been seen.
689 f.setCallback_([ctx, i, n](Try<T> t) {
690 auto c = ++ctx->count;
692 if (t.hasException()) {
693 if (!ctx->threw.exchange(true)) {
694 ctx->p.setException(std::move(t.exception()));
696 } else if (!ctx->threw) {
697 ctx->addResult(i, t);
// collectAny(): the result is fulfilled by whichever input future completes
// first. A WhenAnyContext<T> is allocated with raw `new`, constructed with
// the size of the range.
// NOTE(review): where ctx is deleted is not established here — confirm.
712 template <class InputIterator>
717 std::iterator_traits<InputIterator>::value_type::value_type> > >
718 collectAny(InputIterator first, InputIterator last) {
720 typename std::iterator_traits<InputIterator>::value_type::value_type T;
722 auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
723 auto f_saved = ctx->p.getFuture();
725 for (size_t i = 0; first != last; first++, i++) {
// Only the callback that flips `done` from false gets to fulfil the Promise,
// with the pair (index of the future, its Try).
727 f.setCallback_([i, ctx](Try<T>&& t) {
728 if (!ctx->done.exchange(true)) {
729 ctx->p.setValue(std::make_pair(i, std::move(t)));
// collectN(): yields a vector of (index, Try<T>) pairs for completed input
// futures. The shared state lives in a ctx_t held by std::shared_ptr and
// captured by every continuation.
// NOTE(review): `completed` is incremented from each continuation; its type
// is not established here — confirm it is safe when futures complete on
// different threads.
738 template <class InputIterator>
739 Future<std::vector<std::pair<size_t, Try<typename
740 std::iterator_traits<InputIterator>::value_type::value_type>>>>
741 collectN(InputIterator first, InputIterator last, size_t n) {
743 std::iterator_traits<InputIterator>::value_type::value_type T;
744 typedef std::vector<std::pair<size_t, Try<T>>> V;
751 auto ctx = std::make_shared<ctx_t>();
754 // for each completed Future, increase count and add to vector, until we
755 // have n completed futures at which point we fulfill our Promise with the
760 it->then([ctx, n, i](Try<T>&& t) {
762 auto c = ++ctx->completed;
// Sanity check before appending: the vector must still have room below n.
764 assert(ctx->v.size() < n);
765 v.push_back(std::make_pair(i, std::move(t)));
767 ctx->p.setTry(Try<V>(std::move(v)));
// Failure path: the Promise is failed with a std::runtime_error.
777 ctx->p.setException(std::runtime_error("Not enough futures"));
780 return ctx->p.getFuture();
// reduce() overload for a func whose result is NOT a Future (selected by
// !isFutureResult<F, T, Arg>). An early-return path yields
// makeFuture(std::move(initial)). Otherwise all inputs are gathered with
// collectAll and folded in order: initial = func(initial, element).
783 template <class It, class T, class F, class ItT, class Arg>
784 typename std::enable_if<!isFutureResult<F, T, Arg>::value, Future<T>>::type
785 reduce(It first, It last, T initial, F func) {
787 return makeFuture(std::move(initial));
790 typedef isTry<Arg> IsTry;
// The lambda is `mutable` because it reassigns its captured copy of initial.
792 return collectAll(first, last)
793 .then([initial, func](std::vector<Try<ItT>>& vals) mutable {
794 for (auto& val : vals) {
795 initial = func(std::move(initial),
796 // Either return a ItT&& or a Try<ItT>&& depending
797 // on the type of the argument of func.
798 val.template get<IsTry::value, Arg&&>());
// reduce() overload for a func whose result IS a Future (selected by
// isFutureResult<F, T, Arg>). An early-return path yields
// makeFuture(std::move(initial)). Otherwise the fold is built as a chain:
// the first future is combined with initial, and each later future is joined
// to the running Future f with collectAll and fed to func.
804 template <class It, class T, class F, class ItT, class Arg>
805 typename std::enable_if<isFutureResult<F, T, Arg>::value, Future<T>>::type
806 reduce(It first, It last, T initial, F func) {
808 return makeFuture(std::move(initial));
811 typedef isTry<Arg> IsTry;
// Seed the chain with the first element.
813 auto f = first->then([initial, func](Try<ItT>& head) mutable {
814 return func(std::move(initial),
815 head.template get<IsTry::value, Arg&&>());
// Each step takes the accumulated value from tuple slot 0 via .value() and
// the next element from slot 1.
818 for (++first; first != last; ++first) {
819 f = collectAll(f, *first).then([func](std::tuple<Try<T>, Try<ItT>>& t) {
820 return func(std::move(std::get<0>(t).value()),
821 // Either return a ItT&& or a Try<ItT>&& depending
822 // on the type of the argument of func.
823 std::get<1>(t).template get<IsTry::value, Arg&&>());
// Convenience overload: within() using a default-constructed TimedOut as the
// exception.
831 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
832 return within(dur, TimedOut(), tk);
// within() with a caller-supplied exception e. A shared Context holds the
// exception, a promise, and an atomic `token`. Two continuations race on
// token.exchange(true); only the one that sees `false` touches the promise.
837 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
840 Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
843 std::atomic<bool> token;
845 auto ctx = std::make_shared<Context>(std::move(e));
// Fallback Timekeeper taken from the detail singleton.
// NOTE(review): presumably used only when the caller passed no tk — confirm.
848 tk = folly::detail::getTimekeeperSingleton();
// Continuation on a Try<void>.
// NOTE(review): presumably attached to the timer future obtained from tk —
// confirm. If that Try itself carries an exception it is forwarded;
// otherwise the stored exception is set.
852 .then([ctx](Try<void> const& t) {
853 if (ctx->token.exchange(true) == false) {
854 if (t.hasException()) {
855 ctx->promise.setException(std::move(t.exception()));
857 ctx->promise.setException(std::move(ctx->exception));
// Continuation on this Future: if it wins the token, its Try becomes the
// result.
862 this->then([ctx](Try<T>&& t) {
863 if (ctx->token.exchange(true) == false) {
864 ctx->promise.setTry(std::move(t));
868 return ctx->promise.getFuture();
// Joins this Future with futures::sleep(dur, tk) through collectAll, then
// returns a Future built from tuple element 0, i.e. this Future's own Try.
872 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
873 return collectAll(*this, futures::sleep(dur, tk))
874 .then([](std::tuple<Try<T>, Try<void>> tup) {
875 Try<T>& t = std::get<0>(tup);
876 return makeFuture<T>(std::move(t));
// Untimed wait helper. f is REPLACED by f.then(...), whose continuation
// captures by reference ([&]) and re-emits the incoming Try via makeFuture.
// NOTE(review): the by-reference capture presumably refers to the stack-local
// `baton` — confirm it cannot outlive this frame.
883 void waitImpl(Future<T>& f) {
884 // short-circuit if there's nothing to do
885 if (f.isReady()) return;
887 folly::fibers::Baton baton;
888 f = f.then([&](Try<T> t) {
890 return makeFuture(std::move(t));
894 // There's a race here between the return here and the actual finishing of
895 // the future. f is completed, but the setup may not have finished on done
896 // after the baton has posted.
// Spin, yielding the thread, until the replacement Future reports ready.
897 while (!f.isReady()) {
898 std::this_thread::yield();
// Timed wait helper. The Baton is held by std::shared_ptr and captured by
// value in the continuation. f is REPLACED by f.then(...), which re-emits
// the incoming Try via makeFuture.
903 void waitImpl(Future<T>& f, Duration dur) {
904 // short-circuit if there's nothing to do
905 if (f.isReady()) return;
907 auto baton = std::make_shared<folly::fibers::Baton>();
908 f = f.then([baton](Try<T> t) {
910 return makeFuture(std::move(t));
913 // Let's preserve the invariant that if we did not timeout (timed_wait returns
914 // true), then the returned Future is complete when it is returned to the
915 // caller. We need to wait out the race for that Future to complete.
916 if (baton->timed_wait(dur)) {
917 while (!f.isReady()) {
918 std::this_thread::yield();
// Loops for as long as f is not ready.
// NOTE(review): presumably drives the DrivableExecutor e on each iteration —
// confirm.
924 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
925 while (!f.isReady()) {
// Lvalue wait(): delegates to detail::waitImpl on this Future.
933 Future<T>& Future<T>::wait() & {
934 detail::waitImpl(*this);
// Rvalue wait(): delegates to detail::waitImpl, then returns this Future as
// an rvalue reference.
939 Future<T>&& Future<T>::wait() && {
940 detail::waitImpl(*this);
941 return std::move(*this);
// Lvalue timed wait: delegates to detail::waitImpl with the duration.
945 Future<T>& Future<T>::wait(Duration dur) & {
946 detail::waitImpl(*this, dur);
// Rvalue timed wait: delegates to detail::waitImpl with the duration, then
// returns this Future as an rvalue reference.
951 Future<T>&& Future<T>::wait(Duration dur) && {
952 detail::waitImpl(*this, dur);
953 return std::move(*this);
// Lvalue waitVia(): delegates to detail::waitViaImpl with executor e.
957 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
958 detail::waitViaImpl(*this, e);
// Rvalue waitVia(): delegates to detail::waitViaImpl with executor e, then
// returns this Future as an rvalue reference.
963 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
964 detail::waitViaImpl(*this, e);
965 return std::move(*this);
// Waits via wait(), then MOVES the value out of the completed Future.
// NOTE(review): presumably the body of Future<T>::get() — confirm.
970 return std::move(wait().value());
// get() for Future<void>; returns nothing.
974 inline void Future<void>::get() {
// Timed get(): the visible return path MOVES the value out of this Future.
// NOTE(review): presumably waits up to dur first and fails on timeout —
// confirm.
979 T Future<T>::get(Duration dur) {
982 return std::move(value());
// Timed get() for Future<void>; returns nothing.
989 inline void Future<void>::get(Duration dur) {
// Calls waitVia(e), then MOVES the value out of the resulting Future.
999 T Future<T>::getVia(DrivableExecutor* e) {
1000 return std::move(waitVia(e).value());
// getVia() for Future<void>; returns nothing.
1004 inline void Future<void>::getVia(DrivableExecutor* e) {
// Yields a Future<bool> computed once both this Future and f have completed
// (joined with collectAll). When both Trys hold values, the result is their
// operator== comparison.
1009 Future<bool> Future<T>::willEqual(Future<T>& f) {
1010 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1011 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1012 return std::get<0>(t).value() == std::get<1>(t).value();
// Chains a continuation that tests the value against `predicate`. The
// predicate is invoked with a const reference to the value; when it returns
// false the continuation throws PredicateDoesNotObtain.
1021 Future<T> Future<T>::filter(F predicate) {
1022 auto p = folly::makeMoveWrapper(std::move(predicate));
1023 return this->then([p](T val) {
1024 T const& valConstRef = val;
1025 if (!(*p)(valConstRef)) {
1026 throw PredicateDoesNotObtain();
// Base case of chainHelper: no callbacks left, only a Future<Z> remains.
1035 Future<Z> chainHelper(Future<Z> f) {
// Recursive case: attaches fn to f with then() and recurses on the remaining
// callbacks, so the callbacks end up chained in argument order.
1039 template <class Z, class F, class Fn, class... Callbacks>
1040 Future<Z> chainHelper(F f, Fn fn, Callbacks... fns) {
1041 return chainHelper<Z>(f.then(fn), fns...);
// Packages a sequence of callbacks into one std::function<Future<Z>(Try<A>)>.
// A Promise<A> and the Future produced by chaining fns onto it are created up
// front. The returned function feeds its argument into the Promise and hands
// back the pre-built Future by move.
// NOTE(review): because *fw is moved out on the first call, the returned
// function is presumably meant to be invoked only once — confirm.
1045 template <class A, class Z, class... Callbacks>
1046 std::function<Future<Z>(Try<A>)>
1047 chain(Callbacks... fns) {
1048 MoveWrapper<Promise<A>> pw;
1049 MoveWrapper<Future<Z>> fw(chainHelper<Z>(pw->getFuture(), fns...));
1050 return [=](Try<A> t) mutable {
1051 pw->setTry(std::move(t));
1052 return std::move(*fw);
// Applies func to every future in [first, last) via then() and collects the
// resulting futures, in iteration order, into a std::vector<Future<Result>>.
1056 template <class It, class F, class ItT, class Result>
1057 std::vector<Future<Result>> map(It first, It last, F func) {
1058 std::vector<Future<Result>> results;
1059 for (auto it = first; it != last; it++) {
1060 results.push_back(it->then(func));
1066 } // namespace folly
1068 // I haven't included a Future<T&> specialization because I don't foresee us
1069 // using it, however it is not difficult to add when needed. Refer to
1070 // Future<void> for guidance. std::future and boost::future code would also be