2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
22 #include <folly/experimental/fibers/Baton.h>
23 #include <folly/Optional.h>
24 #include <folly/futures/detail/Core.h>
25 #include <folly/futures/Timekeeper.h>
32 Timekeeper* getTimekeeperSingleton();
36 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
37 other.core_ = nullptr;
41 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
42 std::swap(core_, other.core_);
47 template <class T2, typename>
48 Future<T>::Future(T2&& val)
49 : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
52 template <typename, typename>
54 : core_(new detail::Core<T>(Try<T>())) {}
57 Future<T>::~Future() {
62 void Future<T>::detach() {
64 core_->detachFuture();
70 void Future<T>::throwIfInvalid() const {
77 void Future<T>::setCallback_(F&& func) {
79 core_->setCallback(std::move(func));
86 typename std::enable_if<isFuture<F>::value,
87 Future<typename isFuture<T>::Inner>>::type
89 return then([](Future<typename isFuture<T>::Inner> internal_future) {
90 return internal_future;
96 // Variant: returns a value
97 // e.g. f.then([](Try<T>&& t){ return t.value(); });
99 template <typename F, typename R, bool isTry, typename... Args>
100 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
101 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
102 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
103 typedef typename R::ReturnsFuture::Inner B;
107 // wrap these so we can move them into the lambda
108 folly::MoveWrapper<Promise<B>> p;
109 p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
110 folly::MoveWrapper<F> funcm(std::forward<F>(func));
112 // grab the Future now before we lose our handle on the Promise
113 auto f = p->getFuture();
114 f.core_->setExecutorNoLock(getExecutor());
116 /* This is a bit tricky.
118 We can't just close over *this in case this Future gets moved. So we
119 make a new dummy Future. We could figure out something more
120 sophisticated that avoids making a new Future object when it can, as an
121 optimization. But this is correct.
123 core_ can't be moved, it is explicitly disallowed (as is copying). But
124 if there's ever a reason to allow it, this is one place that makes that
125 assumption and would need to be fixed. We use a standard shared pointer
126 for core_ (by copying it in), which means in essence obj holds a shared
127 pointer to itself. But this shouldn't leak because Promise will not
128 outlive the continuation, because Promise will setException() with a
129 broken Promise if it is destructed before completed. We could use a
130 weak pointer but it would have to be converted to a shared pointer when
131 func is executed (because the Future returned by func may possibly
132 persist beyond the callback, if it gets moved), and so it is an
133 optimization to just make it shared from the get-go.
135 We have to move in the Promise and func using the MoveWrapper
136 hack. (func could be copied but it's a big drag on perf).
138 Two subtle but important points about this design. detail::Core has no
139 back pointers to Future or Promise, so if Future or Promise get moved
140 (and they will be moved in performant code) we don't have to do
141 anything fancy. And because we store the continuation in the
142 detail::Core, not in the Future, we can execute the continuation even
143 after the Future has gone out of scope. This is an intentional design
144 decision. It is likely we will want to be able to cancel a continuation
145 in some circumstances, but I think it should be explicit not implicit
146 in the destruction of the Future used to create it.
149 [p, funcm](Try<T>&& t) mutable {
150 if (!isTry && t.hasException()) {
151 p->setException(std::move(t.exception()));
154 return (*funcm)(t.template get<isTry, Args>()...);
162 // Variant: returns a Future
163 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
165 template <typename F, typename R, bool isTry, typename... Args>
166 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
167 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
168 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
169 typedef typename R::ReturnsFuture::Inner B;
173 // wrap these so we can move them into the lambda
174 folly::MoveWrapper<Promise<B>> p;
175 p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
176 folly::MoveWrapper<F> funcm(std::forward<F>(func));
178 // grab the Future now before we lose our handle on the Promise
179 auto f = p->getFuture();
180 f.core_->setExecutorNoLock(getExecutor());
183 [p, funcm](Try<T>&& t) mutable {
184 if (!isTry && t.hasException()) {
185 p->setException(std::move(t.exception()));
188 auto f2 = (*funcm)(t.template get<isTry, Args>()...);
189 // that didn't throw, now we can steal p
190 f2.setCallback_([p](Try<B>&& b) mutable {
191 p->setTry(std::move(b));
193 } catch (const std::exception& e) {
194 p->setException(exception_wrapper(std::current_exception(), e));
196 p->setException(exception_wrapper(std::current_exception()));
204 template <typename T>
205 template <typename R, typename Caller, typename... Args>
206 Future<typename isFuture<R>::Inner>
207 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
208 typedef typename std::remove_cv<
209 typename std::remove_reference<
210 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
211 return then([instance, func](Try<T>&& t){
212 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
217 template <class Executor, class Arg, class... Args>
218 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
219 -> decltype(this->then(std::forward<Arg>(arg),
220 std::forward<Args>(args)...))
222 auto oldX = getExecutor();
224 return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
229 Future<void> Future<T>::then() {
230 return then([] () {});
233 // onError where the callback returns T
236 typename std::enable_if<
237 !detail::callableWith<F, exception_wrapper>::value &&
238 !detail::Extract<F>::ReturnsFuture::value,
240 Future<T>::onError(F&& func) {
241 typedef typename detail::Extract<F>::FirstArg Exn;
243 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
244 "Return type of onError callback must be T or Future<T>");
247 auto f = p.getFuture();
248 auto pm = folly::makeMoveWrapper(std::move(p));
249 auto funcm = folly::makeMoveWrapper(std::move(func));
250 setCallback_([pm, funcm](Try<T>&& t) mutable {
251 if (!t.template withException<Exn>([&] (Exn& e) {
256 pm->setTry(std::move(t));
263 // onError where the callback returns Future<T>
266 typename std::enable_if<
267 !detail::callableWith<F, exception_wrapper>::value &&
268 detail::Extract<F>::ReturnsFuture::value,
270 Future<T>::onError(F&& func) {
272 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
273 "Return type of onError callback must be T or Future<T>");
274 typedef typename detail::Extract<F>::FirstArg Exn;
277 auto f = p.getFuture();
278 auto pm = folly::makeMoveWrapper(std::move(p));
279 auto funcm = folly::makeMoveWrapper(std::move(func));
280 setCallback_([pm, funcm](Try<T>&& t) mutable {
281 if (!t.template withException<Exn>([&] (Exn& e) {
283 auto f2 = (*funcm)(e);
284 f2.setCallback_([pm](Try<T>&& t2) mutable {
285 pm->setTry(std::move(t2));
287 } catch (const std::exception& e2) {
288 pm->setException(exception_wrapper(std::current_exception(), e2));
290 pm->setException(exception_wrapper(std::current_exception()));
293 pm->setTry(std::move(t));
302 Future<T> Future<T>::ensure(F func) {
303 MoveWrapper<F> funcw(std::move(func));
304 return this->then([funcw](Try<T>&& t) {
306 return makeFuture(std::move(t));
312 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
313 auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
314 return within(dur, tk)
315 .onError([funcw](TimedOut const&) { return (*funcw)(); });
320 typename std::enable_if<
321 detail::callableWith<F, exception_wrapper>::value &&
322 detail::Extract<F>::ReturnsFuture::value,
324 Future<T>::onError(F&& func) {
326 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
327 "Return type of onError callback must be T or Future<T>");
330 auto f = p.getFuture();
331 auto pm = folly::makeMoveWrapper(std::move(p));
332 auto funcm = folly::makeMoveWrapper(std::move(func));
333 setCallback_([pm, funcm](Try<T> t) mutable {
334 if (t.hasException()) {
336 auto f2 = (*funcm)(std::move(t.exception()));
337 f2.setCallback_([pm](Try<T> t2) mutable {
338 pm->setTry(std::move(t2));
340 } catch (const std::exception& e2) {
341 pm->setException(exception_wrapper(std::current_exception(), e2));
343 pm->setException(exception_wrapper(std::current_exception()));
346 pm->setTry(std::move(t));
353 // onError(exception_wrapper) that returns T
356 typename std::enable_if<
357 detail::callableWith<F, exception_wrapper>::value &&
358 !detail::Extract<F>::ReturnsFuture::value,
360 Future<T>::onError(F&& func) {
362 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
363 "Return type of onError callback must be T or Future<T>");
366 auto f = p.getFuture();
367 auto pm = folly::makeMoveWrapper(std::move(p));
368 auto funcm = folly::makeMoveWrapper(std::move(func));
369 setCallback_([pm, funcm](Try<T> t) mutable {
370 if (t.hasException()) {
372 return (*funcm)(std::move(t.exception()));
375 pm->setTry(std::move(t));
383 typename std::add_lvalue_reference<T>::type Future<T>::value() {
386 return core_->getTry().value();
390 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
393 return core_->getTry().value();
397 Try<T>& Future<T>::getTry() {
400 return core_->getTry();
404 Optional<Try<T>> Future<T>::poll() {
406 if (core_->ready()) {
407 o = std::move(core_->getTry());
413 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
416 setExecutor(executor, priority);
418 return std::move(*this);
422 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
425 MoveWrapper<Promise<T>> p;
426 auto f = p->getFuture();
427 then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
428 return std::move(f).via(executor, priority);
432 template <class Func>
433 auto via(Executor* x, Func func)
434 -> Future<typename isFuture<decltype(func())>::Inner>
435 // this would work, if not for Future<void> :-/
436 // -> decltype(via(x).then(func))
438 // TODO make this actually more performant. :-P #7260175
439 return via(x).then(func);
443 bool Future<T>::isReady() const {
445 return core_->ready();
449 bool Future<T>::hasValue() {
450 return getTry().hasValue();
454 bool Future<T>::hasException() {
455 return getTry().hasException();
459 void Future<T>::raise(exception_wrapper exception) {
460 core_->raise(std::move(exception));
466 Future<typename std::decay<T>::type> makeFuture(T&& t) {
467 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
470 inline // for multiple translation units
471 Future<void> makeFuture() {
472 return makeFuture(Try<void>());
478 typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
479 -> Future<decltype(func())> {
480 return makeFuture(makeTryWith([&func]() {
486 auto makeFutureWith(F const& func) -> Future<decltype(func())> {
488 return makeFuture(makeTryWith(std::move(copy)));
492 Future<T> makeFuture(std::exception_ptr const& e) {
493 return makeFuture(Try<T>(e));
497 Future<T> makeFuture(exception_wrapper ew) {
498 return makeFuture(Try<T>(std::move(ew)));
501 template <class T, class E>
502 typename std::enable_if<std::is_base_of<std::exception, E>::value,
504 makeFuture(E const& e) {
505 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
509 Future<T> makeFuture(Try<T>&& t) {
510 return Future<T>(new detail::Core<T>(std::move(t)));
514 Future<void> via(Executor* executor, int8_t priority) {
515 return makeFuture().via(executor, priority);
518 // mapSetCallback calls func(i, Try<T>) when every future completes
520 template <class T, class InputIterator, class F>
521 void mapSetCallback(InputIterator first, InputIterator last, F func) {
522 for (size_t i = 0; first != last; ++first, ++i) {
523 first->setCallback_([func, i](Try<T>&& t) {
524 func(i, std::move(t));
529 // collectAll (variadic)
531 template <typename... Fs>
532 typename detail::CollectAllVariadicContext<
533 typename std::decay<Fs>::type::value_type...>::type
534 collectAll(Fs&&... fs) {
535 auto ctx = std::make_shared<detail::CollectAllVariadicContext<
536 typename std::decay<Fs>::type::value_type...>>();
537 detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
538 ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
539 return ctx->p.getFuture();
542 // collectAll (iterator)
544 template <class InputIterator>
547 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
548 collectAll(InputIterator first, InputIterator last) {
550 typename std::iterator_traits<InputIterator>::value_type::value_type T;
552 struct CollectAllContext {
553 CollectAllContext(int n) : results(n) {}
554 ~CollectAllContext() {
555 p.setValue(std::move(results));
557 Promise<std::vector<Try<T>>> p;
558 std::vector<Try<T>> results;
561 auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
562 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
563 ctx->results[i] = std::move(t);
565 return ctx->p.getFuture();
568 // collect (iterator)
572 template <typename T>
573 struct CollectContext {
574 struct Nothing { explicit Nothing(int n) {} };
576 using Result = typename std::conditional<
577 std::is_void<T>::value,
579 std::vector<T>>::type;
581 using InternalResult = typename std::conditional<
582 std::is_void<T>::value,
584 std::vector<Optional<T>>>::type;
586 explicit CollectContext(int n) : result(n) {}
588 if (!threw.exchange(true)) {
589 // map Optional<T> -> T
590 std::vector<T> finalResult;
591 finalResult.reserve(result.size());
592 std::transform(result.begin(), result.end(),
593 std::back_inserter(finalResult),
594 [](Optional<T>& o) { return std::move(o.value()); });
595 p.setValue(std::move(finalResult));
598 inline void setPartialResult(size_t i, Try<T>& t) {
599 result[i] = std::move(t.value());
602 InternalResult result;
603 std::atomic<bool> threw {false};
606 // Specialize for void (implementations in Future.cpp)
609 CollectContext<void>::~CollectContext();
612 void CollectContext<void>::setPartialResult(size_t i, Try<void>& t);
616 template <class InputIterator>
617 Future<typename detail::CollectContext<
618 typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
619 collect(InputIterator first, InputIterator last) {
621 typename std::iterator_traits<InputIterator>::value_type::value_type T;
623 auto ctx = std::make_shared<detail::CollectContext<T>>(
624 std::distance(first, last));
625 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
626 if (t.hasException()) {
627 if (!ctx->threw.exchange(true)) {
628 ctx->p.setException(std::move(t.exception()));
630 } else if (!ctx->threw) {
631 ctx->setPartialResult(i, t);
634 return ctx->p.getFuture();
637 // collect (variadic)
639 template <typename... Fs>
640 typename detail::CollectVariadicContext<
641 typename std::decay<Fs>::type::value_type...>::type
642 collect(Fs&&... fs) {
643 auto ctx = std::make_shared<detail::CollectVariadicContext<
644 typename std::decay<Fs>::type::value_type...>>();
645 detail::collectVariadicHelper<detail::CollectVariadicContext>(
646 ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
647 return ctx->p.getFuture();
650 // collectAny (iterator)
652 template <class InputIterator>
657 std::iterator_traits<InputIterator>::value_type::value_type>>>
658 collectAny(InputIterator first, InputIterator last) {
660 typename std::iterator_traits<InputIterator>::value_type::value_type T;
662 struct CollectAnyContext {
663 CollectAnyContext() {};
664 Promise<std::pair<size_t, Try<T>>> p;
665 std::atomic<bool> done {false};
668 auto ctx = std::make_shared<CollectAnyContext>();
669 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
670 if (!ctx->done.exchange(true)) {
671 ctx->p.setValue(std::make_pair(i, std::move(t)));
674 return ctx->p.getFuture();
677 // collectN (iterator)
679 template <class InputIterator>
680 Future<std::vector<std::pair<size_t, Try<typename
681 std::iterator_traits<InputIterator>::value_type::value_type>>>>
682 collectN(InputIterator first, InputIterator last, size_t n) {
684 std::iterator_traits<InputIterator>::value_type::value_type T;
685 typedef std::vector<std::pair<size_t, Try<T>>> V;
687 struct CollectNContext {
689 std::atomic<size_t> completed = {0};
692 auto ctx = std::make_shared<CollectNContext>();
694 if (size_t(std::distance(first, last)) < n) {
695 ctx->p.setException(std::runtime_error("Not enough futures"));
697 // for each completed Future, increase count and add to vector, until we
698 // have n completed futures at which point we fulfil our Promise with the
700 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
701 auto c = ++ctx->completed;
703 assert(ctx->v.size() < n);
704 ctx->v.emplace_back(i, std::move(t));
706 ctx->p.setTry(Try<V>(std::move(ctx->v)));
712 return ctx->p.getFuture();
717 template <class It, class T, class F>
718 Future<T> reduce(It first, It last, T&& initial, F&& func) {
720 return makeFuture(std::move(initial));
723 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
724 typedef typename std::conditional<
725 detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type Arg;
726 typedef isTry<Arg> IsTry;
728 folly::MoveWrapper<T> minitial(std::move(initial));
729 auto sfunc = std::make_shared<F>(std::move(func));
731 auto f = first->then([minitial, sfunc](Try<ItT>& head) mutable {
732 return (*sfunc)(std::move(*minitial),
733 head.template get<IsTry::value, Arg&&>());
736 for (++first; first != last; ++first) {
737 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
738 return (*sfunc)(std::move(std::get<0>(t).value()),
739 // Either return a ItT&& or a Try<ItT>&& depending
740 // on the type of the argument of func.
741 std::get<1>(t).template get<IsTry::value, Arg&&>());
748 // window (collection)
750 template <class Collection, class F, class ItT, class Result>
751 std::vector<Future<Result>>
752 window(Collection input, F func, size_t n) {
753 struct WindowContext {
754 WindowContext(Collection&& i, F&& fn)
755 : input_(std::move(i)), promises_(input_.size()),
758 std::atomic<size_t> i_ {0};
760 std::vector<Promise<Result>> promises_;
763 static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
764 size_t i = ctx->i_++;
765 if (i < ctx->input_.size()) {
766 // Using setCallback_ directly since we don't need the Future
767 ctx->func_(std::move(ctx->input_[i])).setCallback_(
768 // ctx is captured by value
769 [ctx, i](Try<Result>&& t) {
770 ctx->promises_[i].setTry(std::move(t));
771 // Chain another future onto this one
772 spawn(std::move(ctx));
778 auto max = std::min(n, input.size());
780 auto ctx = std::make_shared<WindowContext>(
781 std::move(input), std::move(func));
783 for (size_t i = 0; i < max; ++i) {
784 // Start the first n Futures
785 WindowContext::spawn(ctx);
788 std::vector<Future<Result>> futures;
789 futures.reserve(ctx->promises_.size());
790 for (auto& promise : ctx->promises_) {
791 futures.emplace_back(promise.getFuture());
800 template <class I, class F>
801 Future<I> Future<T>::reduce(I&& initial, F&& func) {
802 folly::MoveWrapper<I> minitial(std::move(initial));
803 folly::MoveWrapper<F> mfunc(std::move(func));
804 return then([minitial, mfunc](T& vals) mutable {
805 auto ret = std::move(*minitial);
806 for (auto& val : vals) {
807 ret = (*mfunc)(std::move(ret), std::move(val));
813 // unorderedReduce (iterator)
815 template <class It, class T, class F, class ItT, class Arg>
816 Future<T> unorderedReduce(It first, It last, T initial, F func) {
818 return makeFuture(std::move(initial));
821 typedef isTry<Arg> IsTry;
823 struct UnorderedReduceContext {
824 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
825 : lock_(), memo_(makeFuture<T>(std::move(memo))),
826 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
828 folly::MicroSpinLock lock_; // protects memo_ and numThens_
831 size_t numThens_; // how many Futures completed and called .then()
832 size_t numFutures_; // how many Futures in total
836 auto ctx = std::make_shared<UnorderedReduceContext>(
837 std::move(initial), std::move(func), std::distance(first, last));
839 mapSetCallback<ItT>(first, last, [ctx](size_t i, Try<ItT>&& t) {
840 folly::MoveWrapper<Try<ItT>> mt(std::move(t));
841 // Futures can be completed in any order, simultaneously.
842 // To make this non-blocking, we create a new Future chain in
843 // the order of completion to reduce the values.
844 // The spinlock just protects chaining a new Future, not actually
845 // executing the reduce, which should be really fast.
846 folly::MSLGuard lock(ctx->lock_);
847 ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
848 // Either return a ItT&& or a Try<ItT>&& depending
849 // on the type of the argument of func.
850 return ctx->func_(std::move(v), mt->template get<IsTry::value, Arg&&>());
852 if (++ctx->numThens_ == ctx->numFutures_) {
853 // After reducing the value of the last Future, fulfill the Promise
854 ctx->memo_.setCallback_([ctx](Try<T>&& t2) {
855 ctx->promise_.setValue(std::move(t2));
860 return ctx->promise_.getFuture();
866 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
867 return within(dur, TimedOut(), tk);
872 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
875 Context(E ex) : exception(std::move(ex)), promise() {}
878 std::atomic<bool> token {false};
880 auto ctx = std::make_shared<Context>(std::move(e));
883 tk = folly::detail::getTimekeeperSingleton();
887 .then([ctx](Try<void> const& t) {
888 if (ctx->token.exchange(true) == false) {
889 if (t.hasException()) {
890 ctx->promise.setException(std::move(t.exception()));
892 ctx->promise.setException(std::move(ctx->exception));
897 this->then([ctx](Try<T>&& t) {
898 if (ctx->token.exchange(true) == false) {
899 ctx->promise.setTry(std::move(t));
903 return ctx->promise.getFuture().via(getExecutor());
909 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
910 return collectAll(*this, futures::sleep(dur, tk))
911 .then([](std::tuple<Try<T>, Try<void>> tup) {
912 Try<T>& t = std::get<0>(tup);
913 return makeFuture<T>(std::move(t));
920 void waitImpl(Future<T>& f) {
921 // short-circuit if there's nothing to do
922 if (f.isReady()) return;
924 folly::fibers::Baton baton;
925 f = f.then([&](Try<T> t) {
927 return makeFuture(std::move(t));
931 // There's a race here between the return here and the actual finishing of
932 // the future. f is completed, but the setup may not have finished on done
933 // after the baton has posted.
934 while (!f.isReady()) {
935 std::this_thread::yield();
940 void waitImpl(Future<T>& f, Duration dur) {
941 // short-circuit if there's nothing to do
942 if (f.isReady()) return;
944 auto baton = std::make_shared<folly::fibers::Baton>();
945 f = f.then([baton](Try<T> t) {
947 return makeFuture(std::move(t));
950 // Let's preserve the invariant that if we did not timeout (timed_wait returns
951 // true), then the returned Future is complete when it is returned to the
952 // caller. We need to wait out the race for that Future to complete.
953 if (baton->timed_wait(dur)) {
954 while (!f.isReady()) {
955 std::this_thread::yield();
961 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
962 while (!f.isReady()) {
970 Future<T>& Future<T>::wait() & {
971 detail::waitImpl(*this);
976 Future<T>&& Future<T>::wait() && {
977 detail::waitImpl(*this);
978 return std::move(*this);
982 Future<T>& Future<T>::wait(Duration dur) & {
983 detail::waitImpl(*this, dur);
988 Future<T>&& Future<T>::wait(Duration dur) && {
989 detail::waitImpl(*this, dur);
990 return std::move(*this);
994 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
995 detail::waitViaImpl(*this, e);
1000 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1001 detail::waitViaImpl(*this, e);
1002 return std::move(*this);
1006 T Future<T>::get() {
1007 return std::move(wait().value());
1011 inline void Future<void>::get() {
1016 T Future<T>::get(Duration dur) {
1019 return std::move(value());
1026 inline void Future<void>::get(Duration dur) {
1036 T Future<T>::getVia(DrivableExecutor* e) {
1037 return std::move(waitVia(e).value());
1041 inline void Future<void>::getVia(DrivableExecutor* e) {
1048 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1049 return t1.value() == t2.value();
1054 struct TryEquals<void> {
1055 static bool equals(const Try<void>& t1, const Try<void>& t2) {
1062 Future<bool> Future<T>::willEqual(Future<T>& f) {
1063 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1064 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1065 return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1074 Future<T> Future<T>::filter(F predicate) {
1075 auto p = folly::makeMoveWrapper(std::move(predicate));
1076 return this->then([p](T val) {
1077 T const& valConstRef = val;
1078 if (!(*p)(valConstRef)) {
1079 throw PredicateDoesNotObtain();
1086 template <class Callback>
1087 auto Future<T>::thenMulti(Callback&& fn)
1088 -> decltype(this->then(std::forward<Callback>(fn))) {
1089 // thenMulti with one callback is just a then
1090 return then(std::forward<Callback>(fn));
1094 template <class Callback, class... Callbacks>
1095 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1096 -> decltype(this->then(std::forward<Callback>(fn)).
1097 thenMulti(std::forward<Callbacks>(fns)...)) {
1098 // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1099 return then(std::forward<Callback>(fn)).
1100 thenMulti(std::forward<Callbacks>(fns)...);
1104 template <class Callback, class... Callbacks>
1105 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1107 -> decltype(this->then(std::forward<Callback>(fn)).
1108 thenMulti(std::forward<Callbacks>(fns)...)) {
1109 // thenMultiExecutor with two callbacks is
1110 // via(x).then(a).thenMulti(b, ...).via(oldX)
1111 auto oldX = getExecutor();
1113 return then(std::forward<Callback>(fn)).
1114 thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1118 template <class Callback>
1119 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1120 -> decltype(this->then(std::forward<Callback>(fn))) {
1121 // thenMulti with one callback is just a then with an executor
1122 return then(x, std::forward<Callback>(fn));
1126 template <class It, class F, class ItT, class Result>
1127 std::vector<Future<Result>> map(It first, It last, F func) {
1128 std::vector<Future<Result>> results;
1129 for (auto it = first; it != last; it++) {
1130 results.push_back(it->then(func));
1136 // Instantiate the most common Future types to save compile time
1137 extern template class Future<void>;
1138 extern template class Future<bool>;
1139 extern template class Future<int>;
1140 extern template class Future<int64_t>;
1141 extern template class Future<std::string>;
1142 extern template class Future<double>;
1144 } // namespace folly
1146 // I haven't included a Future<T&> specialization because I don't forsee us
1147 // using it, however it is not difficult to add when needed. Refer to
1148 // Future<void> for guidance. std::future and boost::future code would also be