2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
22 #include <folly/experimental/fibers/Baton.h>
23 #include <folly/Optional.h>
24 #include <folly/futures/detail/Core.h>
25 #include <folly/futures/Timekeeper.h>
32 Timekeeper* getTimekeeperSingleton();
36 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
37 other.core_ = nullptr;
41 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
42 std::swap(core_, other.core_);
47 template <class T2, typename>
48 Future<T>::Future(T2&& val) : core_(nullptr) {
50 p.setValue(std::forward<T2>(val));
51 *this = p.getFuture();
56 typename std::enable_if<
57 folly::is_void_or_unit<T2>::value,
59 Future<T>::Future() : core_(nullptr) {
62 *this = p.getFuture();
67 Future<T>::~Future() {
72 void Future<T>::detach() {
74 core_->detachFuture();
80 void Future<T>::throwIfInvalid() const {
87 void Future<T>::setCallback_(F&& func) {
89 core_->setCallback(std::move(func));
96 typename std::enable_if<isFuture<F>::value,
97 Future<typename isFuture<T>::Inner>>::type
99 return then([](Future<typename isFuture<T>::Inner> internal_future) {
100 return internal_future;
106 // Variant: returns a value
107 // e.g. f.then([](Try<T>&& t){ return t.value(); });
109 template <typename F, typename R, bool isTry, typename... Args>
110 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
111 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
112 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
113 typedef typename R::ReturnsFuture::Inner B;
117 // wrap these so we can move them into the lambda
118 folly::MoveWrapper<Promise<B>> p;
119 p->setInterruptHandler(core_->getInterruptHandler());
120 folly::MoveWrapper<F> funcm(std::forward<F>(func));
122 // grab the Future now before we lose our handle on the Promise
123 auto f = p->getFuture();
125 f.setExecutor(getExecutor());
128 /* This is a bit tricky.
130 We can't just close over *this in case this Future gets moved. So we
131 make a new dummy Future. We could figure out something more
132 sophisticated that avoids making a new Future object when it can, as an
133 optimization. But this is correct.
135 core_ can't be moved, it is explicitly disallowed (as is copying). But
136 if there's ever a reason to allow it, this is one place that makes that
137 assumption and would need to be fixed. We use a standard shared pointer
138 for core_ (by copying it in), which means in essence obj holds a shared
139 pointer to itself. But this shouldn't leak because Promise will not
140 outlive the continuation, because Promise will setException() with a
141 broken Promise if it is destructed before completed. We could use a
142 weak pointer but it would have to be converted to a shared pointer when
143 func is executed (because the Future returned by func may possibly
144 persist beyond the callback, if it gets moved), and so it is an
145 optimization to just make it shared from the get-go.
147 We have to move in the Promise and func using the MoveWrapper
148 hack. (func could be copied but it's a big drag on perf).
150 Two subtle but important points about this design. detail::Core has no
151 back pointers to Future or Promise, so if Future or Promise get moved
152 (and they will be moved in performant code) we don't have to do
153 anything fancy. And because we store the continuation in the
154 detail::Core, not in the Future, we can execute the continuation even
155 after the Future has gone out of scope. This is an intentional design
156 decision. It is likely we will want to be able to cancel a continuation
157 in some circumstances, but I think it should be explicit not implicit
158 in the destruction of the Future used to create it.
161 [p, funcm](Try<T>&& t) mutable {
162 if (!isTry && t.hasException()) {
163 p->setException(std::move(t.exception()));
166 return (*funcm)(t.template get<isTry, Args>()...);
174 // Variant: returns a Future
175 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
177 template <typename F, typename R, bool isTry, typename... Args>
178 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
179 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
180 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
181 typedef typename R::ReturnsFuture::Inner B;
185 // wrap these so we can move them into the lambda
186 folly::MoveWrapper<Promise<B>> p;
187 folly::MoveWrapper<F> funcm(std::forward<F>(func));
189 // grab the Future now before we lose our handle on the Promise
190 auto f = p->getFuture();
192 f.setExecutor(getExecutor());
196 [p, funcm](Try<T>&& t) mutable {
197 if (!isTry && t.hasException()) {
198 p->setException(std::move(t.exception()));
201 auto f2 = (*funcm)(t.template get<isTry, Args>()...);
202 // that didn't throw, now we can steal p
203 f2.setCallback_([p](Try<B>&& b) mutable {
204 p->setTry(std::move(b));
206 } catch (const std::exception& e) {
207 p->setException(exception_wrapper(std::current_exception(), e));
209 p->setException(exception_wrapper(std::current_exception()));
217 template <typename T>
218 template <typename R, typename Caller, typename... Args>
219 Future<typename isFuture<R>::Inner>
220 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
221 typedef typename std::remove_cv<
222 typename std::remove_reference<
223 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
224 return then([instance, func](Try<T>&& t){
225 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
230 template <class Executor, class Arg, class... Args>
231 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
232 -> decltype(this->then(std::forward<Arg>(arg),
233 std::forward<Args>(args)...))
235 auto oldX = getExecutor();
237 return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
242 Future<void> Future<T>::then() {
243 return then([] (Try<T>&& t) {});
246 // onError where the callback returns T
249 typename std::enable_if<
250 !detail::callableWith<F, exception_wrapper>::value &&
251 !detail::Extract<F>::ReturnsFuture::value,
253 Future<T>::onError(F&& func) {
254 typedef typename detail::Extract<F>::FirstArg Exn;
256 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
257 "Return type of onError callback must be T or Future<T>");
260 auto f = p.getFuture();
261 auto pm = folly::makeMoveWrapper(std::move(p));
262 auto funcm = folly::makeMoveWrapper(std::move(func));
263 setCallback_([pm, funcm](Try<T>&& t) mutable {
264 if (!t.template withException<Exn>([&] (Exn& e) {
269 pm->setTry(std::move(t));
276 // onError where the callback returns Future<T>
279 typename std::enable_if<
280 !detail::callableWith<F, exception_wrapper>::value &&
281 detail::Extract<F>::ReturnsFuture::value,
283 Future<T>::onError(F&& func) {
285 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
286 "Return type of onError callback must be T or Future<T>");
287 typedef typename detail::Extract<F>::FirstArg Exn;
290 auto f = p.getFuture();
291 auto pm = folly::makeMoveWrapper(std::move(p));
292 auto funcm = folly::makeMoveWrapper(std::move(func));
293 setCallback_([pm, funcm](Try<T>&& t) mutable {
294 if (!t.template withException<Exn>([&] (Exn& e) {
296 auto f2 = (*funcm)(e);
297 f2.setCallback_([pm](Try<T>&& t2) mutable {
298 pm->setTry(std::move(t2));
300 } catch (const std::exception& e2) {
301 pm->setException(exception_wrapper(std::current_exception(), e2));
303 pm->setException(exception_wrapper(std::current_exception()));
306 pm->setTry(std::move(t));
315 Future<T> Future<T>::ensure(F func) {
316 MoveWrapper<F> funcw(std::move(func));
317 return this->then([funcw](Try<T>&& t) {
319 return makeFuture(std::move(t));
325 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
326 auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
327 return within(dur, tk)
328 .onError([funcw](TimedOut const&) { return (*funcw)(); });
333 typename std::enable_if<
334 detail::callableWith<F, exception_wrapper>::value &&
335 detail::Extract<F>::ReturnsFuture::value,
337 Future<T>::onError(F&& func) {
339 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
340 "Return type of onError callback must be T or Future<T>");
343 auto f = p.getFuture();
344 auto pm = folly::makeMoveWrapper(std::move(p));
345 auto funcm = folly::makeMoveWrapper(std::move(func));
346 setCallback_([pm, funcm](Try<T> t) mutable {
347 if (t.hasException()) {
349 auto f2 = (*funcm)(std::move(t.exception()));
350 f2.setCallback_([pm](Try<T> t2) mutable {
351 pm->setTry(std::move(t2));
353 } catch (const std::exception& e2) {
354 pm->setException(exception_wrapper(std::current_exception(), e2));
356 pm->setException(exception_wrapper(std::current_exception()));
359 pm->setTry(std::move(t));
366 // onError(exception_wrapper) that returns T
369 typename std::enable_if<
370 detail::callableWith<F, exception_wrapper>::value &&
371 !detail::Extract<F>::ReturnsFuture::value,
373 Future<T>::onError(F&& func) {
375 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
376 "Return type of onError callback must be T or Future<T>");
379 auto f = p.getFuture();
380 auto pm = folly::makeMoveWrapper(std::move(p));
381 auto funcm = folly::makeMoveWrapper(std::move(func));
382 setCallback_([pm, funcm](Try<T> t) mutable {
383 if (t.hasException()) {
385 return (*funcm)(std::move(t.exception()));
388 pm->setTry(std::move(t));
396 typename std::add_lvalue_reference<T>::type Future<T>::value() {
399 return core_->getTry().value();
403 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
406 return core_->getTry().value();
410 Try<T>& Future<T>::getTry() {
413 return core_->getTry();
417 Optional<Try<T>> Future<T>::poll() {
419 if (core_->ready()) {
420 o = std::move(core_->getTry());
426 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
429 setExecutor(executor, priority);
431 return std::move(*this);
435 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
438 MoveWrapper<Promise<T>> p;
439 auto f = p->getFuture();
440 then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
441 return std::move(f).via(executor, priority);
445 bool Future<T>::isReady() const {
447 return core_->ready();
451 void Future<T>::raise(exception_wrapper exception) {
452 core_->raise(std::move(exception));
458 Future<typename std::decay<T>::type> makeFuture(T&& t) {
459 Promise<typename std::decay<T>::type> p;
460 p.setValue(std::forward<T>(t));
461 return p.getFuture();
464 inline // for multiple translation units
465 Future<void> makeFuture() {
468 return p.getFuture();
474 typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
475 -> Future<decltype(func())> {
476 Promise<decltype(func())> p;
481 return p.getFuture();
485 auto makeFutureWith(F const& func) -> Future<decltype(func())> {
487 return makeFutureWith(std::move(copy));
491 Future<T> makeFuture(std::exception_ptr const& e) {
494 return p.getFuture();
498 Future<T> makeFuture(exception_wrapper ew) {
500 p.setException(std::move(ew));
501 return p.getFuture();
504 template <class T, class E>
505 typename std::enable_if<std::is_base_of<std::exception, E>::value,
507 makeFuture(E const& e) {
509 p.setException(make_exception_wrapper<E>(e));
510 return p.getFuture();
514 Future<T> makeFuture(Try<T>&& t) {
515 Promise<typename std::decay<T>::type> p;
516 p.setTry(std::move(t));
517 return p.getFuture();
521 inline Future<void> makeFuture(Try<void>&& t) {
522 if (t.hasException()) {
523 return makeFuture<void>(std::move(t.exception()));
530 Future<void> via(Executor* executor, int8_t priority) {
531 return makeFuture().via(executor, priority);
534 // mapSetCallback calls func(i, Try<T>) when every future completes
536 template <class T, class InputIterator, class F>
537 void mapSetCallback(InputIterator first, InputIterator last, F func) {
538 for (size_t i = 0; first != last; ++first, ++i) {
539 first->setCallback_([func, i](Try<T>&& t) {
540 func(i, std::move(t));
545 // collectAll (variadic)
547 template <typename... Fs>
548 typename detail::CollectAllVariadicContext<
549 typename std::decay<Fs>::type::value_type...>::type
550 collectAll(Fs&&... fs) {
551 auto ctx = std::make_shared<detail::CollectAllVariadicContext<
552 typename std::decay<Fs>::type::value_type...>>();
553 detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
554 ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
555 return ctx->p.getFuture();
558 // collectAll (iterator)
560 template <class InputIterator>
563 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
564 collectAll(InputIterator first, InputIterator last) {
566 typename std::iterator_traits<InputIterator>::value_type::value_type T;
568 struct CollectAllContext {
569 CollectAllContext(int n) : results(n) {}
570 ~CollectAllContext() {
571 p.setValue(std::move(results));
573 Promise<std::vector<Try<T>>> p;
574 std::vector<Try<T>> results;
577 auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
578 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
579 ctx->results[i] = std::move(t);
581 return ctx->p.getFuture();
584 // collect (iterator)
588 template <typename T>
589 struct CollectContext {
590 struct Nothing { explicit Nothing(int n) {} };
592 using Result = typename std::conditional<
593 std::is_void<T>::value,
595 std::vector<T>>::type;
597 using InternalResult = typename std::conditional<
598 std::is_void<T>::value,
600 std::vector<Optional<T>>>::type;
602 explicit CollectContext(int n) : result(n) {}
604 if (!threw.exchange(true)) {
605 // map Optional<T> -> T
606 std::vector<T> finalResult;
607 finalResult.reserve(result.size());
608 std::transform(result.begin(), result.end(),
609 std::back_inserter(finalResult),
610 [](Optional<T>& o) { return std::move(o.value()); });
611 p.setValue(std::move(finalResult));
614 inline void setPartialResult(size_t i, Try<T>& t) {
615 result[i] = std::move(t.value());
618 InternalResult result;
619 std::atomic<bool> threw;
622 // Specialize for void (implementations in Future.cpp)
625 CollectContext<void>::~CollectContext();
628 void CollectContext<void>::setPartialResult(size_t i, Try<void>& t);
632 template <class InputIterator>
633 Future<typename detail::CollectContext<
634 typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
635 collect(InputIterator first, InputIterator last) {
637 typename std::iterator_traits<InputIterator>::value_type::value_type T;
639 auto ctx = std::make_shared<detail::CollectContext<T>>(
640 std::distance(first, last));
641 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
642 if (t.hasException()) {
643 if (!ctx->threw.exchange(true)) {
644 ctx->p.setException(std::move(t.exception()));
646 } else if (!ctx->threw) {
647 ctx->setPartialResult(i, t);
650 return ctx->p.getFuture();
653 // collect (variadic)
655 template <typename... Fs>
656 typename detail::CollectVariadicContext<
657 typename std::decay<Fs>::type::value_type...>::type
658 collect(Fs&&... fs) {
659 auto ctx = std::make_shared<detail::CollectVariadicContext<
660 typename std::decay<Fs>::type::value_type...>>();
661 detail::collectVariadicHelper<detail::CollectVariadicContext>(
662 ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
663 return ctx->p.getFuture();
666 // collectAny (iterator)
668 template <class InputIterator>
673 std::iterator_traits<InputIterator>::value_type::value_type>>>
674 collectAny(InputIterator first, InputIterator last) {
676 typename std::iterator_traits<InputIterator>::value_type::value_type T;
678 struct CollectAnyContext {
679 CollectAnyContext(size_t n) : done(false) {};
680 Promise<std::pair<size_t, Try<T>>> p;
681 std::atomic<bool> done;
684 auto ctx = std::make_shared<CollectAnyContext>(std::distance(first, last));
685 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
686 if (!ctx->done.exchange(true)) {
687 ctx->p.setValue(std::make_pair(i, std::move(t)));
690 return ctx->p.getFuture();
693 // collectN (iterator)
695 template <class InputIterator>
696 Future<std::vector<std::pair<size_t, Try<typename
697 std::iterator_traits<InputIterator>::value_type::value_type>>>>
698 collectN(InputIterator first, InputIterator last, size_t n) {
700 std::iterator_traits<InputIterator>::value_type::value_type T;
701 typedef std::vector<std::pair<size_t, Try<T>>> V;
703 struct CollectNContext {
705 std::atomic<size_t> completed = {0};
708 auto ctx = std::make_shared<CollectNContext>();
710 if (size_t(std::distance(first, last)) < n) {
711 ctx->p.setException(std::runtime_error("Not enough futures"));
713 // for each completed Future, increase count and add to vector, until we
714 // have n completed futures at which point we fulfil our Promise with the
716 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
717 auto c = ++ctx->completed;
719 assert(ctx->v.size() < n);
720 ctx->v.push_back(std::make_pair(i, std::move(t)));
722 ctx->p.setTry(Try<V>(std::move(ctx->v)));
728 return ctx->p.getFuture();
733 template <class It, class T, class F>
734 Future<T> reduce(It first, It last, T&& initial, F&& func) {
736 return makeFuture(std::move(initial));
739 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
740 typedef typename std::conditional<
741 detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type Arg;
742 typedef isTry<Arg> IsTry;
744 folly::MoveWrapper<T> minitial(std::move(initial));
745 auto sfunc = std::make_shared<F>(std::move(func));
747 auto f = first->then([minitial, sfunc](Try<ItT>& head) mutable {
748 return (*sfunc)(std::move(*minitial),
749 head.template get<IsTry::value, Arg&&>());
752 for (++first; first != last; ++first) {
753 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
754 return (*sfunc)(std::move(std::get<0>(t).value()),
755 // Either return a ItT&& or a Try<ItT>&& depending
756 // on the type of the argument of func.
757 std::get<1>(t).template get<IsTry::value, Arg&&>());
764 // window (collection)
766 template <class Collection, class F, class ItT, class Result>
767 std::vector<Future<Result>>
768 window(Collection input, F func, size_t n) {
769 struct WindowContext {
770 WindowContext(Collection&& i, F&& fn)
771 : i_(0), input_(std::move(i)), promises_(input_.size()),
774 std::atomic<size_t> i_;
776 std::vector<Promise<Result>> promises_;
779 static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
780 size_t i = ctx->i_++;
781 if (i < ctx->input_.size()) {
782 // Using setCallback_ directly since we don't need the Future
783 ctx->func_(std::move(ctx->input_[i])).setCallback_(
784 // ctx is captured by value
785 [ctx, i](Try<Result>&& t) {
786 ctx->promises_[i].setTry(std::move(t));
787 // Chain another future onto this one
788 spawn(std::move(ctx));
794 auto max = std::min(n, input.size());
796 auto ctx = std::make_shared<WindowContext>(
797 std::move(input), std::move(func));
799 for (size_t i = 0; i < max; ++i) {
800 // Start the first n Futures
801 WindowContext::spawn(ctx);
804 std::vector<Future<Result>> futures;
805 futures.reserve(ctx->promises_.size());
806 for (auto& promise : ctx->promises_) {
807 futures.emplace_back(promise.getFuture());
816 template <class I, class F>
817 Future<I> Future<T>::reduce(I&& initial, F&& func) {
818 folly::MoveWrapper<I> minitial(std::move(initial));
819 folly::MoveWrapper<F> mfunc(std::move(func));
820 return then([minitial, mfunc](T& vals) mutable {
821 auto ret = std::move(*minitial);
822 for (auto& val : vals) {
823 ret = (*mfunc)(std::move(ret), std::move(val));
829 // unorderedReduce (iterator)
831 template <class It, class T, class F, class ItT, class Arg>
832 Future<T> unorderedReduce(It first, It last, T initial, F func) {
834 return makeFuture(std::move(initial));
837 typedef isTry<Arg> IsTry;
839 struct UnorderedReduceContext {
840 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
841 : lock_(), memo_(makeFuture<T>(std::move(memo))),
842 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
844 folly::MicroSpinLock lock_; // protects memo_ and numThens_
847 size_t numThens_; // how many Futures completed and called .then()
848 size_t numFutures_; // how many Futures in total
852 auto ctx = std::make_shared<UnorderedReduceContext>(
853 std::move(initial), std::move(func), std::distance(first, last));
855 mapSetCallback<ItT>(first, last, [ctx](size_t i, Try<ItT>&& t) {
856 folly::MoveWrapper<Try<ItT>> mt(std::move(t));
857 // Futures can be completed in any order, simultaneously.
858 // To make this non-blocking, we create a new Future chain in
859 // the order of completion to reduce the values.
860 // The spinlock just protects chaining a new Future, not actually
861 // executing the reduce, which should be really fast.
862 folly::MSLGuard lock(ctx->lock_);
863 ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
864 // Either return a ItT&& or a Try<ItT>&& depending
865 // on the type of the argument of func.
866 return ctx->func_(std::move(v), mt->template get<IsTry::value, Arg&&>());
868 if (++ctx->numThens_ == ctx->numFutures_) {
869 // After reducing the value of the last Future, fulfill the Promise
870 ctx->memo_.setCallback_([ctx](Try<T>&& t2) {
871 ctx->promise_.setValue(std::move(t2));
876 return ctx->promise_.getFuture();
882 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
883 return within(dur, TimedOut(), tk);
888 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
891 Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
894 std::atomic<bool> token;
896 auto ctx = std::make_shared<Context>(std::move(e));
899 tk = folly::detail::getTimekeeperSingleton();
903 .then([ctx](Try<void> const& t) {
904 if (ctx->token.exchange(true) == false) {
905 if (t.hasException()) {
906 ctx->promise.setException(std::move(t.exception()));
908 ctx->promise.setException(std::move(ctx->exception));
913 this->then([ctx](Try<T>&& t) {
914 if (ctx->token.exchange(true) == false) {
915 ctx->promise.setTry(std::move(t));
919 return ctx->promise.getFuture().via(getExecutor());
925 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
926 return collectAll(*this, futures::sleep(dur, tk))
927 .then([](std::tuple<Try<T>, Try<void>> tup) {
928 Try<T>& t = std::get<0>(tup);
929 return makeFuture<T>(std::move(t));
936 void waitImpl(Future<T>& f) {
937 // short-circuit if there's nothing to do
938 if (f.isReady()) return;
940 folly::fibers::Baton baton;
941 f = f.then([&](Try<T> t) {
943 return makeFuture(std::move(t));
947 // There's a race here between the return here and the actual finishing of
948 // the future. f is completed, but the setup may not have finished on done
949 // after the baton has posted.
950 while (!f.isReady()) {
951 std::this_thread::yield();
956 void waitImpl(Future<T>& f, Duration dur) {
957 // short-circuit if there's nothing to do
958 if (f.isReady()) return;
960 auto baton = std::make_shared<folly::fibers::Baton>();
961 f = f.then([baton](Try<T> t) {
963 return makeFuture(std::move(t));
966 // Let's preserve the invariant that if we did not timeout (timed_wait returns
967 // true), then the returned Future is complete when it is returned to the
968 // caller. We need to wait out the race for that Future to complete.
969 if (baton->timed_wait(dur)) {
970 while (!f.isReady()) {
971 std::this_thread::yield();
977 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
978 while (!f.isReady()) {
986 Future<T>& Future<T>::wait() & {
987 detail::waitImpl(*this);
992 Future<T>&& Future<T>::wait() && {
993 detail::waitImpl(*this);
994 return std::move(*this);
998 Future<T>& Future<T>::wait(Duration dur) & {
999 detail::waitImpl(*this, dur);
1004 Future<T>&& Future<T>::wait(Duration dur) && {
1005 detail::waitImpl(*this, dur);
1006 return std::move(*this);
1010 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1011 detail::waitViaImpl(*this, e);
1016 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1017 detail::waitViaImpl(*this, e);
1018 return std::move(*this);
1022 T Future<T>::get() {
1023 return std::move(wait().value());
1027 inline void Future<void>::get() {
1032 T Future<T>::get(Duration dur) {
1035 return std::move(value());
1042 inline void Future<void>::get(Duration dur) {
1052 T Future<T>::getVia(DrivableExecutor* e) {
1053 return std::move(waitVia(e).value());
1057 inline void Future<void>::getVia(DrivableExecutor* e) {
1064 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1065 return t1.value() == t2.value();
1070 struct TryEquals<void> {
1071 static bool equals(const Try<void>& t1, const Try<void>& t2) {
1078 Future<bool> Future<T>::willEqual(Future<T>& f) {
1079 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1080 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1081 return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1090 Future<T> Future<T>::filter(F predicate) {
1091 auto p = folly::makeMoveWrapper(std::move(predicate));
1092 return this->then([p](T val) {
1093 T const& valConstRef = val;
1094 if (!(*p)(valConstRef)) {
1095 throw PredicateDoesNotObtain();
1102 template <class Callback>
1103 auto Future<T>::thenMulti(Callback&& fn)
1104 -> decltype(this->then(std::forward<Callback>(fn))) {
1105 // thenMulti with one callback is just a then
1106 return then(std::forward<Callback>(fn));
1110 template <class Callback, class... Callbacks>
1111 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1112 -> decltype(this->then(std::forward<Callback>(fn)).
1113 thenMulti(std::forward<Callbacks>(fns)...)) {
1114 // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1115 return then(std::forward<Callback>(fn)).
1116 thenMulti(std::forward<Callbacks>(fns)...);
1120 template <class Callback, class... Callbacks>
1121 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1123 -> decltype(this->then(std::forward<Callback>(fn)).
1124 thenMulti(std::forward<Callbacks>(fns)...)) {
1125 // thenMultiExecutor with two callbacks is
1126 // via(x).then(a).thenMulti(b, ...).via(oldX)
1127 auto oldX = getExecutor();
1129 return then(std::forward<Callback>(fn)).
1130 thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1134 template <class Callback>
1135 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1136 -> decltype(this->then(std::forward<Callback>(fn))) {
1137 // thenMulti with one callback is just a then with an executor
1138 return then(x, std::forward<Callback>(fn));
1142 template <class It, class F, class ItT, class Result>
1143 std::vector<Future<Result>> map(It first, It last, F func) {
1144 std::vector<Future<Result>> results;
1145 for (auto it = first; it != last; it++) {
1146 results.push_back(it->then(func));
1152 // Instantiate the most common Future types to save compile time
1153 extern template class Future<void>;
1154 extern template class Future<bool>;
1155 extern template class Future<int>;
1156 extern template class Future<int64_t>;
1157 extern template class Future<std::string>;
1158 extern template class Future<double>;
1160 } // namespace folly
1162 // I haven't included a Future<T&> specialization because I don't forsee us
1163 // using it, however it is not difficult to add when needed. Refer to
1164 // Future<void> for guidance. std::future and boost::future code would also be