2 * Copyright 2017-present Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
23 #include <folly/Optional.h>
24 #include <folly/executors/InlineExecutor.h>
25 #include <folly/futures/Timekeeper.h>
26 #include <folly/futures/detail/Core.h>
27 #include <folly/synchronization/Baton.h>
// Select the baton implementation used by futures. Unless the build already
// defines FOLLY_FUTURE_USING_FIBER, the macro is defined here: 0 is associated
// with the mobile/Apple condition and 1 (together with the fibers baton
// header) with the other case.
29 #ifndef FOLLY_FUTURE_USING_FIBER
30 #if FOLLY_MOBILE || defined(__APPLE__)
31 #define FOLLY_FUTURE_USING_FIBER 0
33 #define FOLLY_FUTURE_USING_FIBER 1
34 #include <folly/fibers/Baton.h>
// FutureBatonType aliases the fibers baton when FOLLY_FUTURE_USING_FIBER is
// non-zero and the plain folly::Baton<> in the alternative branch.
44 #if FOLLY_FUTURE_USING_FIBER
45 typedef folly::fibers::Baton FutureBatonType;
47 typedef folly::Baton<> FutureBatonType;
50 } // namespace futures
// Declaration only; the definition lives elsewhere.
// NOTE(review): presumably returns the shared default Timekeeper -- confirm
// against the definition.
53 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
58 // Guarantees that the stored functor is destructed before the stored promise
59 // may be fulfilled. Assumes the stored functor to be noexcept-destructible.
//
// Pairs a continuation functor (func_) with the Promise<T> it is meant to
// fulfil. "Before the barrier" means the promise has not been fulfilled yet:
// before_barrier() is exactly !promise_.isFulfilled().
60 template <typename T, typename F>
61 class CoreCallbackState {
// Stores the forwarded functor and takes over the promise. noexcept exactly
// when constructing F from FF is noexcept.
63 template <typename FF>
64 CoreCallbackState(Promise<T>&& promise, FF&& func) noexcept(
65 noexcept(F(std::declval<FF>())))
66 : func_(std::forward<FF>(func)), promise_(std::move(promise)) {
67 assert(before_barrier());
// Move constructor: only if `that` is still before its barrier is the functor
// move-constructed in place (placement new) and the promise stolen from
// `that`; otherwise promise_ keeps its default member initializer.
70 CoreCallbackState(CoreCallbackState&& that) noexcept(
71 noexcept(F(std::declval<F>()))) {
72 if (that.before_barrier()) {
73 new (&func_) F(std::move(that.func_));
74 promise_ = that.stealPromise();
// Move assignment is explicitly disabled.
78 CoreCallbackState& operator=(CoreCallbackState&&) = delete;
// Destructor: does work only while still before the barrier.
// NOTE(review): the guarded statement is not visible in this view -- confirm
// what cleanup it performs.
80 ~CoreCallbackState() {
81 if (before_barrier()) {
// Calls the stored functor as an rvalue with the forwarded arguments and
// returns its result. Must be called before the barrier (asserted).
86 template <typename... Args>
87 auto invoke(Args&&... args) noexcept(
88 noexcept(std::declval<F&&>()(std::declval<Args&&>()...))) {
89 assert(before_barrier());
90 return std::move(func_)(std::forward<Args>(args)...);
// Same call as invoke(), wrapped in makeTryWith.
// NOTE(review): presumably a thrown exception ends up in the returned Try,
// which would justify the noexcept -- confirm against makeTryWith.
93 template <typename... Args>
94 auto tryInvoke(Args&&... args) noexcept {
95 return makeTryWith([&] { return invoke(std::forward<Args>(args)...); });
// Fulfil the stored promise with a Try; goes through stealPromise().
98 void setTry(Try<T>&& t) {
99 stealPromise().setTry(std::move(t));
// Fulfil the stored promise with an exception; goes through stealPromise().
102 void setException(exception_wrapper&& ew) {
103 stealPromise().setException(std::move(ew));
// Moves the promise out to the caller. Requires before_barrier() (asserted).
// NOTE(review): a statement between the assert and the return is not visible
// here; given the class-level guarantee it presumably destroys func_ --
// confirm.
106 Promise<T> stealPromise() noexcept {
107 assert(before_barrier());
109 return std::move(promise_);
// True as long as the stored promise has not been fulfilled.
113 bool before_barrier() const noexcept {
114 return !promise_.isFulfilled();
// Default-initialised from Promise<T>::makeEmpty(); the move constructor
// relies on this default when it does not take over a promise.
120 Promise<T> promise_{Promise<T>::makeEmpty()};
// Factory for CoreCallbackState: deduces the functor type, decays it, and
// builds the state from the moved promise and the forwarded functor.
// noexcept exactly when that construction is noexcept.
123 template <typename T, typename F>
124 inline auto makeCoreCallbackState(Promise<T>&& p, F&& f) noexcept(
125 noexcept(CoreCallbackState<T, _t<std::decay<F>>>(
126 std::declval<Promise<T>&&>(),
127 std::declval<F&&>()))) {
128 return CoreCallbackState<T, _t<std::decay<F>>>(
129 std::move(p), std::forward<F>(f));
// Move-construct from a SemiFuture: copy its core pointer, then null
// other.core_ so that only this object refers to the core.
133 FutureBase<T>::FutureBase(SemiFuture<T>&& other) noexcept : core_(other.core_) {
134 other.core_ = nullptr;
// Move-construct from a Future: copy its core pointer, then null other.core_
// so that only this object refers to the core.
138 FutureBase<T>::FutureBase(Future<T>&& other) noexcept : core_(other.core_) {
139 other.core_ = nullptr;
// Value constructor: allocates a new Core<T> initialised with a Try<T> built
// from the forwarded `val`.
143 template <class T2, typename>
144 FutureBase<T>::FutureBase(T2&& val)
145 : core_(new futures::detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
// Constructor enabled only when T2 is Unit: allocates a new Core<T>
// initialised with a Try<T> holding a value-initialised T.
148 template <typename T2>
149 FutureBase<T>::FutureBase(
150 typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
151 : core_(new futures::detail::Core<T>(Try<T>(T()))) {}
// In-place constructor: allocates a new Core<T>, passing in_place and the
// forwarded arguments on to Core's constructor. The enable_if limits it to
// argument lists from which T is constructible.
// NOTE(review): the template header and the start of the initializer list are
// not visible in this view.
156 typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
158 FutureBase<T>::FutureBase(in_place_t, Args&&... args)
160 new futures::detail::Core<T>(in_place, std::forward<Args>(args)...)) {
// Shared implementation for move assignment: swaps the core pointers, so
// `other` ends up holding whatever core this object had before the call.
164 template <class FutureType>
165 void FutureBase<T>::assign(FutureType& other) noexcept {
166 std::swap(core_, other.core_);
// Destructor.
// NOTE(review): the body is not visible in this view; presumably it detaches
// from the core -- confirm.
170 FutureBase<T>::~FutureBase() {
// value() accessors, one per value category / constness of *this. Each one
// returns result().value(); the rvalue overloads additionally std::move it.
// Any failure behaviour comes from result() and Try::value().
175 T& FutureBase<T>::value() & {
176 return result().value();
180 T const& FutureBase<T>::value() const& {
181 return result().value();
185 T&& FutureBase<T>::value() && {
186 return std::move(result().value());
190 T const&& FutureBase<T>::value() const&& {
191 return std::move(result().value());
// result() accessors, one per value category / constness of *this. Each one
// returns the Try stored in the core (core_->getTry()); the rvalue overloads
// std::move it.
// NOTE(review): the statements preceding each return are not visible here;
// presumably they validate the future first -- confirm.
195 Try<T>& FutureBase<T>::result() & {
198 return core_->getTry();
202 Try<T> const& FutureBase<T>::result() const& {
205 return core_->getTry();
209 Try<T>&& FutureBase<T>::result() && {
212 return std::move(core_->getTry());
216 Try<T> const&& FutureBase<T>::result() const&& {
219 return std::move(core_->getTry());
// Reports whether the core is ready (core_->ready()).
// NOTE(review): one statement before the return is not visible here.
223 bool FutureBase<T>::isReady() const {
225 return core_->ready();
// True when the Try stored in the core holds a value. Reads the Try directly
// from the core.
229 bool FutureBase<T>::hasValue() {
230 return core_->getTry().hasValue();
// True when the Try stored in the core holds an exception. Reads the Try
// directly from the core.
234 bool FutureBase<T>::hasException() {
235 return core_->getTry().hasException();
// Tells the core that this future is letting go of it (core_->detachFuture()).
// NOTE(review): the surrounding statements are not visible here; presumably
// the call is guarded by a null check and core_ is reset afterwards -- confirm.
239 void FutureBase<T>::detach() {
241 core_->detachFuture();
// Validity check used by other members before touching the core.
// NOTE(review): the body is not visible in this view; presumably it throws
// when core_ is null -- confirm.
247 void FutureBase<T>::throwIfInvalid() const {
// Non-blocking check for a result: when the core is ready, its Try is moved
// into `o`.
// NOTE(review): the declaration of `o` and the return statement are not
// visible here; presumably `o` is the Optional that gets returned (empty when
// the core is not ready) -- confirm.
254 Optional<Try<T>> FutureBase<T>::poll() {
256 if (core_->ready()) {
257 o = std::move(core_->getTry());
// Passes the given exception on to the core (core_->raise).
// NOTE(review): presumably this triggers the interrupt handler installed on
// the promise side -- confirm against Core::raise.
263 void FutureBase<T>::raise(exception_wrapper exception) {
264 core_->raise(std::move(exception));
// Installs `func` as the core's callback by forwarding it to
// core_->setCallback.
// NOTE(review): one statement before the call is not visible here.
269 void FutureBase<T>::setCallback_(F&& func) {
271 core_->setCallback(std::forward<F>(func));
// Tag constructor taking EmptyConstruct; used by the makeEmpty() factories.
// NOTE(review): the initializer is not visible in this view; presumably it
// leaves core_ null -- confirm.
275 FutureBase<T>::FutureBase(futures::detail::EmptyConstruct) noexcept
280 // Variant: returns a value
281 // e.g. f.then([](Try<T>&& t){ return t.value(); });
//
// Summary of the visible steps: validate this future; give the new promise
// `p` this core's interrupt handler; take p's future `f` and give it this
// future's executor; then build a callback owning a CoreCallbackState made
// from `p` and `func`. When the callback does not take a Try and the incoming
// Try holds an exception, that exception is passed straight to the state's
// promise; otherwise the state's promise receives the result of invoking
// `func` via makeTryWith.
// NOTE(review): the declaration of `p`, the call that installs the callback
// and the final return are not visible in this view.
283 template <typename F, typename R, bool isTry, typename... Args>
284 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
285 FutureBase<T>::thenImplementation(
287 futures::detail::argResult<isTry, F, Args...>) {
288 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
289 typedef typename R::ReturnsFuture::Inner B;
291 this->throwIfInvalid();
294 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
296 // grab the Future now before we lose our handle on the Promise
297 auto f = p.getFuture();
298 f.core_->setExecutorNoLock(this->getExecutor());
300 /* This is a bit tricky.
302 We can't just close over *this in case this Future gets moved. So we
303 make a new dummy Future. We could figure out something more
304 sophisticated that avoids making a new Future object when it can, as an
305 optimization. But this is correct.
307 core_ can't be moved, it is explicitly disallowed (as is copying). But
308 if there's ever a reason to allow it, this is one place that makes that
309 assumption and would need to be fixed. We use a standard shared pointer
310 for core_ (by copying it in), which means in essence obj holds a shared
311 pointer to itself. But this shouldn't leak because Promise will not
312 outlive the continuation, because Promise will setException() with a
313 broken Promise if it is destructed before completed. We could use a
314 weak pointer but it would have to be converted to a shared pointer when
315 func is executed (because the Future returned by func may possibly
316 persist beyond the callback, if it gets moved), and so it is an
317 optimization to just make it shared from the get-go.
319 Two subtle but important points about this design. futures::detail::Core
320 has no back pointers to Future or Promise, so if Future or Promise get
321 moved (and they will be moved in performant code) we don't have to do
322 anything fancy. And because we store the continuation in the
323 futures::detail::Core, not in the Future, we can execute the continuation
324 even after the Future has gone out of scope. This is an intentional design
325 decision. It is likely we will want to be able to cancel a continuation
326 in some circumstances, but I think it should be explicit not implicit
327 in the destruction of the Future used to create it.
330 [state = futures::detail::makeCoreCallbackState(
331 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
333 if (!isTry && t.hasException()) {
334 state.setException(std::move(t.exception()));
336 state.setTry(makeTryWith(
337 [&] { return state.invoke(t.template get<isTry, Args>()...); }));
343 // Variant: returns a Future
344 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
//
// Same setup as the value-returning variant (validate, copy the interrupt
// handler to the new promise `p`, take its future `f`, copy the executor).
// The callback differs: `func` is run through tryInvoke, which yields a Try
// wrapping the future it returned.
// NOTE(review): the declaration of `p`, the call that installs the callback
// and the final return are not visible in this view.
346 template <typename F, typename R, bool isTry, typename... Args>
347 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
348 FutureBase<T>::thenImplementation(
350 futures::detail::argResult<isTry, F, Args...>) {
351 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
352 typedef typename R::ReturnsFuture::Inner B;
353 this->throwIfInvalid();
356 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
358 // grab the Future now before we lose our handle on the Promise
359 auto f = p.getFuture();
360 f.core_->setExecutorNoLock(this->getExecutor());
363 [state = futures::detail::makeCoreCallbackState(
364 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
// The callback does not accept a Try, so an upstream exception bypasses it
// and goes directly to the promise.
365 if (!isTry && t.hasException()) {
366 state.setException(std::move(t.exception()));
368 auto tf2 = state.tryInvoke(t.template get<isTry, Args>()...);
// Invoking the callback itself failed: report that exception.
369 if (tf2.hasException()) {
370 state.setException(std::move(tf2.exception()));
// Otherwise chain onto the returned future: when it completes, its Try is
// forwarded to the promise stolen from the state.
372 tf2->setCallback_([p = state.stealPromise()](Try<B> && b) mutable {
373 p.setTry(std::move(b));
381 } // namespace detail
382 } // namespace futures
// Builds a SemiFuture from a value: wraps the forwarded value in a Try of the
// decayed type and delegates to the Try-taking makeSemiFuture overload.
385 SemiFuture<typename std::decay<T>::type> makeSemiFuture(T&& t) {
386 return makeSemiFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
389 // makeSemiFutureWith(SemiFuture<T>()) -> SemiFuture<T>
//
// Overload for callables that already return a SemiFuture: calls `func` and
// returns its result. If the call throws a std::exception, a SemiFuture of
// the inner type is returned holding an exception_wrapper built from the
// current exception and `e`; the last return builds the wrapper from the
// current exception alone.
// NOTE(review): the try and the second catch clause are not visible here;
// the last return is presumably in a catch-all handler -- confirm.
391 typename std::enable_if<
392 isSemiFuture<typename std::result_of<F()>::type>::value,
393 typename std::result_of<F()>::type>::type
394 makeSemiFutureWith(F&& func) {
396 typename isSemiFuture<typename std::result_of<F()>::type>::Inner;
398 return std::forward<F>(func)();
399 } catch (std::exception& e) {
400 return makeSemiFuture<InnerType>(
401 exception_wrapper(std::current_exception(), e));
403 return makeSemiFuture<InnerType>(
404 exception_wrapper(std::current_exception()));
408 // makeSemiFutureWith(T()) -> SemiFuture<T>
409 // makeSemiFutureWith(void()) -> SemiFuture<Unit>
//
// Overload for callables that do not return a SemiFuture: runs `func` through
// makeTryWith and wraps the resulting Try in a SemiFuture. The result type is
// lifted with Unit::LiftT (see the void -> Unit mapping documented above).
411 typename std::enable_if<
412 !(isSemiFuture<typename std::result_of<F()>::type>::value),
413 SemiFuture<Unit::LiftT<typename std::result_of<F()>::type>>>::type
414 makeSemiFutureWith(F&& func) {
415 using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
416 return makeSemiFuture<LiftedResult>(
417 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
// Builds a SemiFuture<T> from a std::exception_ptr by wrapping it in a Try<T>.
421 SemiFuture<T> makeSemiFuture(std::exception_ptr const& e) {
422 return makeSemiFuture(Try<T>(e));
// Builds a SemiFuture<T> from an exception_wrapper, which is moved into a
// Try<T>.
426 SemiFuture<T> makeSemiFuture(exception_wrapper ew) {
427 return makeSemiFuture(Try<T>(std::move(ew)));
// Builds a SemiFuture<T> from an exception object. Enabled only for types
// derived from std::exception; the exception is wrapped with
// make_exception_wrapper<E> and stored in a Try<T>.
430 template <class T, class E>
432 enable_if<std::is_base_of<std::exception, E>::value, SemiFuture<T>>::type
433 makeSemiFuture(E const& e) {
434 return makeSemiFuture(Try<T>(make_exception_wrapper<E>(e)));
// Base overload used by the other makeSemiFuture variants: allocates a new
// Core<T> holding the moved Try and constructs the SemiFuture from that core.
438 SemiFuture<T> makeSemiFuture(Try<T>&& t) {
439 return SemiFuture<T>(new futures::detail::Core<T>(std::move(t)));
442 // This must be defined after the constructors to avoid a bug in MSVC
443 // https://connect.microsoft.com/VisualStudio/feedback/details/3142777/out-of-line-constructor-definition-after-implicit-reference-causes-incorrect-c2244
//
// Argument-less convenience overload: returns a SemiFuture<Unit> built from
// Unit{}.
444 inline SemiFuture<Unit> makeSemiFuture() {
445 return makeSemiFuture(Unit{});
// Factory that constructs a SemiFuture through the EmptyConstruct tag
// constructor.
449 SemiFuture<T> SemiFuture<T>::makeEmpty() {
450 return SemiFuture<T>(futures::detail::EmptyConstruct{});
// Move constructor: delegates to FutureBase's move-from-SemiFuture
// constructor.
454 SemiFuture<T>::SemiFuture(SemiFuture<T>&& other) noexcept
455 : futures::detail::FutureBase<T>(std::move(other)) {}
// Converting move constructor from Future: takes over the core via FutureBase
// and then clears the executor, since a freshly built SemiFuture should not
// carry one.
// NOTE(review): the statement guarding setExecutor is not visible here;
// presumably it skips the call when there is no core -- confirm.
458 SemiFuture<T>::SemiFuture(Future<T>&& other) noexcept
459 : futures::detail::FutureBase<T>(std::move(other)) {
460 // SemiFuture should not have an executor on construction
462 this->setExecutor(nullptr);
// Move assignment from another SemiFuture.
// NOTE(review): the body is not visible in this view; presumably it goes
// through FutureBase::assign and returns *this -- confirm.
467 SemiFuture<T>& SemiFuture<T>::operator=(SemiFuture<T>&& other) noexcept {
// Move assignment from a Future. After taking over the other side's state the
// executor is cleared, matching the converting constructor.
// NOTE(review): the assignment step, the guard around setExecutor and the
// return are not visible in this view -- confirm.
473 SemiFuture<T>& SemiFuture<T>::operator=(Future<T>&& other) noexcept {
475 // SemiFuture should not have an executor on construction
477 this->setExecutor(nullptr);
// Boosts deferred work attached to this SemiFuture. Nothing is done when no
// executor is attached.
483 void SemiFuture<T>::boost_() {
484 // If a SemiFuture has an executor it should be deferred, so boost it
485 if (auto e = this->getExecutor()) {
486 // We know in a SemiFuture that if we have an executor it should be
487 // DeferredExecutor. Verify this in debug mode.
488 DCHECK(nullptr != dynamic_cast<DeferredExecutor*>(e));
// A keep-alive token is taken before boost() is called.
// NOTE(review): presumably so the executor outlives the boost() call --
// confirm.
490 auto ka = static_cast<DeferredExecutor*>(e)->getKeepAliveToken();
491 static_cast<DeferredExecutor*>(e)->boost();
// Converts this SemiFuture into a Future bound to `executor`. If an executor
// is already attached and a different, non-null one is requested, a task is
// added to the new executor that boosts the old one while holding a
// keep-alive token on it. The core then receives the new executor and
// priority and is handed to the new Future, leaving this object's core null.
// NOTE(review): the opening statements and the final return are not visible
// in this view.
496 inline Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) && {
502 // If current executor is deferred, boost block to ensure that work
503 // progresses and is run on the new executor.
504 auto oldExecutor = this->getExecutor();
505 if (oldExecutor && executor && (executor != oldExecutor)) {
506 // We know in a SemiFuture that if we have an executor it should be
507 // DeferredExecutor. Verify this in debug mode.
508 DCHECK(nullptr != dynamic_cast<DeferredExecutor*>(this->getExecutor()));
// NOTE(review): oldExecutor was already checked non-null above and a
// static_cast does not turn a non-null pointer into null, so this condition
// looks always-true; a dynamic_cast may have been intended -- confirm.
509 if (static_cast<DeferredExecutor*>(oldExecutor)) {
510 executor->add([oldExecutorKA = oldExecutor->getKeepAliveToken()]() {
511 static_cast<DeferredExecutor*>(oldExecutorKA.get())->boost();
516 this->setExecutor(executor, priority);
// Transfer the core to the new Future and leave this object without one.
518 auto newFuture = Future<T>(this->core_);
519 this->core_ = nullptr;
// Attaches `func` as deferred work and returns a SemiFuture for its result.
// The visible steps: reuse the current executor's keep-alive token or create
// a new DeferredExecutor, then wrap `func` together with that keep-alive via
// DeferredExecutor::wrap.
// NOTE(review): the statements that switch to the deferred executor, chain
// the wrapped function and convert back to a SemiFuture are not visible in
// this view; the remaining comments below describe them.
524 template <typename F>
525 SemiFuture<typename futures::detail::callableResult<T, F>::Return::value_type>
526 SemiFuture<T>::defer(F&& func) && {
527 // If we already have a deferred executor, use it, otherwise create one
528 auto defKeepAlive = this->getExecutor()
529 ? this->getExecutor()->getKeepAliveToken()
530 : DeferredExecutor::create();
531 auto e = defKeepAlive.get();
532 // We know in a SemiFuture that if we have an executor it should be
533 // DeferredExecutor (either it was that way before, or we just created it).
534 // Verify this in debug mode.
535 DCHECK(nullptr != dynamic_cast<DeferredExecutor*>(e));
536 // Convert to a folly::future with a deferred executor
537 // Will be low-cost if this is not a new executor as via optimises for that
542 // Then add the work, with a wrapper function that captures the
543 // keepAlive so the executor is destroyed at the right time.
545 DeferredExecutor::wrap(std::move(defKeepAlive), std::move(func)))
546 // Finally, convert back to a folly::SemiFuture to hide the executor
548 // Carry deferred executor through chain as constructor from Future will
// Factory that constructs a Future through the EmptyConstruct tag
// constructor.
555 Future<T> Future<T>::makeEmpty() {
556 return Future<T>(futures::detail::EmptyConstruct{});
// Move constructor: delegates to FutureBase's move-from-Future constructor.
560 Future<T>::Future(Future<T>&& other) noexcept
561 : futures::detail::FutureBase<T>(std::move(other)) {}
// Move assignment from another Future.
// NOTE(review): the body is not visible in this view; presumably it goes
// through FutureBase::assign and returns *this -- confirm.
564 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
// Converting constructor from Future<T2> for the case where T2&& is
// convertible to T: chains a continuation on `other` that constructs a T from
// the moved T2 value and adopts the resulting future.
// NOTE(review): the template header and the tail of the enable_if are not
// visible in this view.
572 typename std::enable_if<
573 !std::is_same<T, typename std::decay<T2>::type>::value &&
574 std::is_constructible<T, T2&&>::value &&
575 std::is_convertible<T2&&, T>::value,
577 Future<T>::Future(Future<T2>&& other)
578 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
// Converting constructor from Future<T2> for the case where T is
// constructible from T2&& but T2&& is not convertible to T. The body is the
// same as in the convertible overload.
// NOTE(review): the template header and the tail of the enable_if are not
// visible; presumably this overload is declared explicit -- confirm.
583 typename std::enable_if<
584 !std::is_same<T, typename std::decay<T2>::type>::value &&
585 std::is_constructible<T, T2&&>::value &&
586 !std::is_convertible<T2&&, T>::value,
588 Future<T>::Future(Future<T2>&& other)
589 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
// Converting assignment from Future<T2>, enabled when T is constructible from
// T2&&. The visible expression chains a continuation on `other` that
// constructs a T from the moved T2 value.
// NOTE(review): the statement that consumes this expression is not visible;
// presumably it is passed to the move assignment operator -- confirm.
594 typename std::enable_if<
595 !std::is_same<T, typename std::decay<T2>::type>::value &&
596 std::is_constructible<T, T2&&>::value,
598 Future<T>& Future<T>::operator=(Future<T2>&& other) {
600 std::move(other).then([](T2&& v) { return T(std::move(v)); }));
// Flattens a Future whose value type is itself a Future: chains a
// continuation that simply returns the inner future. This selects the
// future-returning then() path and yields a Future of the inner type.
608 enable_if<isFuture<F>::value, Future<typename isFuture<T>::Inner>>::type
609 Future<T>::unwrap() {
610 return then([](Future<typename isFuture<T>::Inner> internal_future) {
611 return internal_future;
// Rvalue overload: validates this future, sets the executor and priority on
// it, then transfers the core to a new Future and leaves this object's core
// null.
// NOTE(review): the final return is not visible; presumably newFuture is
// returned -- confirm.
616 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
617 this->throwIfInvalid();
619 this->setExecutor(executor, priority);
621 auto newFuture = Future<T>(this->core_);
622 this->core_ = nullptr;
// Lvalue overload: does not give up this future's core. It takes the future
// `f` of a promise `p`, installs a continuation on this future (through
// thenImplementation) that forwards the completed Try to `p`, and returns `f`
// moved onto the requested executor.
// NOTE(review): the declaration of `p` is not visible in this view.
627 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
628 this->throwIfInvalid();
630 auto f = p.getFuture();
631 auto func = [p = std::move(p)](Try<T>&& t) mutable {
632 p.setTry(std::move(t));
634 using R = futures::detail::callableResult<T, decltype(func)>;
635 this->template thenImplementation<decltype(func), R>(
636 std::move(func), typename R::Arg());
637 return std::move(f).via(executor, priority);
// then() overload taking a pointer to member function and the instance to
// call it on. Chains a continuation that receives the Try and calls
// (instance->*func) with the argument extracted via Try::get; whether the Try
// itself or its value is passed depends on isTry<FirstArg>.
// `instance` is captured as a raw pointer, so it must remain valid until the
// continuation has run.
// NOTE(review): the name introduced by the typedef is on a line not visible
// here; presumably FirstArg -- confirm.
640 template <typename T>
641 template <typename R, typename Caller, typename... Args>
642 Future<typename isFuture<R>::Inner>
643 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
644 typedef typename std::remove_cv<typename std::remove_reference<
645 typename futures::detail::ArgType<Args...>::FirstArg>::type>::type
648 return then([instance, func](Try<T>&& t){
649 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
// Argument-less then(): chains an empty continuation, producing a
// Future<Unit>.
654 Future<Unit> Future<T>::then() {
655 return then([] () {});
658 // onError where the callback returns T
//
// Selected for callbacks that are not callable with exception_wrapper and do
// not return a Future. The installed callback asks the incoming Try for an
// exception object of the callback's argument type: if there is one, the
// promise receives the result of invoking `func` on it (through
// makeTryWith); otherwise the original Try is passed through unchanged.
// NOTE(review): the template header, the typedef name (used below as Exn),
// the promise declaration, the call installing the callback and the return
// are not visible in this view.
661 typename std::enable_if<
662 !futures::detail::callableWith<F, exception_wrapper>::value &&
663 !futures::detail::callableWith<F, exception_wrapper&>::value &&
664 !futures::detail::Extract<F>::ReturnsFuture::value,
666 Future<T>::onError(F&& func) {
667 typedef std::remove_reference_t<
668 typename futures::detail::Extract<F>::FirstArg>
671 std::is_same<typename futures::detail::Extract<F>::RawReturn, T>::value,
672 "Return type of onError callback must be T or Future<T>");
675 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
676 auto f = p.getFuture();
679 [state = futures::detail::makeCoreCallbackState(
680 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
681 if (auto e = t.template tryGetExceptionObject<Exn>()) {
682 state.setTry(makeTryWith([&] { return state.invoke(*e); }));
684 state.setTry(std::move(t));
691 // onError where the callback returns Future<T>
//
// Selected for callbacks that are not callable with exception_wrapper and
// that return a Future. On a matching exception, `func` is run through
// tryInvoke: if that itself produced an exception it is reported, otherwise
// the returned future is chained so its Try completes the stolen promise.
// When there is no matching exception the original Try is passed through.
// NOTE(review): the template header, the typedef name (used below as Exn),
// the promise declaration, the call installing the callback and the return
// are not visible in this view.
694 typename std::enable_if<
695 !futures::detail::callableWith<F, exception_wrapper>::value &&
696 !futures::detail::callableWith<F, exception_wrapper&>::value &&
697 futures::detail::Extract<F>::ReturnsFuture::value,
699 Future<T>::onError(F&& func) {
701 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
703 "Return type of onError callback must be T or Future<T>");
704 typedef std::remove_reference_t<
705 typename futures::detail::Extract<F>::FirstArg>
709 auto f = p.getFuture();
712 [state = futures::detail::makeCoreCallbackState(
713 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
714 if (auto e = t.template tryGetExceptionObject<Exn>()) {
715 auto tf2 = state.tryInvoke(*e);
716 if (tf2.hasException()) {
717 state.setException(std::move(tf2.exception()));
719 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
720 p.setTry(std::move(t3));
724 state.setTry(std::move(t));
// Chains a continuation that receives the Try regardless of outcome and then
// re-emits that same Try via makeFuture, so the original result (value or
// exception) is propagated.
// NOTE(review): the statement between the lambda header and the return is
// not visible here; presumably it invokes funcw -- confirm.
733 Future<T> Future<T>::ensure(F&& func) {
734 return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
736 return makeFuture(std::move(t));
// Applies within(dur, tk) and attaches an onError handler for TimedOut that
// calls `func` (with no arguments) and uses its result.
742 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
743 return within(dur, tk).onError([funcw = std::forward<F>(func)](
744 TimedOut const&) { return std::move(funcw)(); });
// onError overload for callbacks callable with exception_wrapper that return
// a Future. On any exception, `func` is run through tryInvoke with the moved
// exception_wrapper: a failure of the callback is reported, otherwise the
// returned future is chained so its Try completes the stolen promise. A Try
// without an exception is passed through unchanged.
// Note that this callback takes the Try by value, unlike its siblings.
// NOTE(review): the template header, the promise declaration, the call
// installing the callback and the return are not visible in this view.
749 typename std::enable_if<
750 futures::detail::callableWith<F, exception_wrapper>::value &&
751 futures::detail::Extract<F>::ReturnsFuture::value,
753 Future<T>::onError(F&& func) {
755 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
757 "Return type of onError callback must be T or Future<T>");
760 auto f = p.getFuture();
762 [state = futures::detail::makeCoreCallbackState(
763 std::move(p), std::forward<F>(func))](Try<T> t) mutable {
764 if (t.hasException()) {
765 auto tf2 = state.tryInvoke(std::move(t.exception()));
766 if (tf2.hasException()) {
767 state.setException(std::move(tf2.exception()));
769 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
770 p.setTry(std::move(t3));
774 state.setTry(std::move(t));
781 // onError(exception_wrapper) that returns T
//
// Selected for callbacks callable with exception_wrapper that do not return
// a Future. On any exception the promise receives the result of invoking
// `func` with the moved exception_wrapper (through makeTryWith); a Try
// without an exception is passed through unchanged.
// NOTE(review): the static_assert compares Extract<F>::Return with Future<T>
// although this overload is for non-Future callbacks; presumably Return is
// the Future-wrapped form of the raw return type -- confirm against Extract.
// NOTE(review): the template header, the promise declaration, the call
// installing the callback and the return are not visible in this view.
784 typename std::enable_if<
785 futures::detail::callableWith<F, exception_wrapper>::value &&
786 !futures::detail::Extract<F>::ReturnsFuture::value,
788 Future<T>::onError(F&& func) {
790 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
792 "Return type of onError callback must be T or Future<T>");
795 auto f = p.getFuture();
797 [state = futures::detail::makeCoreCallbackState(
798 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
799 if (t.hasException()) {
800 state.setTry(makeTryWith(
801 [&] { return state.invoke(std::move(t.exception())); }));
803 state.setTry(std::move(t));
// Free function: obtains a future bound to executor `x` via via(x) and chains
// `func` onto it, returning the resulting future.
810 template <class Func>
811 auto via(Executor* x, Func&& func)
812 -> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
813 // TODO make this actually more performant. :-P #7260175
814 return via(x).then(std::forward<Func>(func));
// Builds a Future from a value: wraps the forwarded value in a Try of the
// decayed type and delegates to the Try-taking makeFuture overload.
820 Future<typename std::decay<T>::type> makeFuture(T&& t) {
821 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
// Argument-less convenience overload: returns a Future<Unit> built from
// Unit{}.
824 inline Future<Unit> makeFuture() {
825 return makeFuture(Unit{});
828 // makeFutureWith(Future<T>()) -> Future<T>
//
// Overload for callables that already return a Future: calls `func` and
// returns its result. If the call throws a std::exception, a Future of the
// inner type is returned holding an exception_wrapper built from the current
// exception and `e`; the last return builds the wrapper from the current
// exception alone.
// NOTE(review): the try and the second catch clause are not visible here;
// the last return is presumably in a catch-all handler -- confirm.
830 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
831 typename std::result_of<F()>::type>::type
832 makeFutureWith(F&& func) {
834 typename isFuture<typename std::result_of<F()>::type>::Inner;
836 return std::forward<F>(func)();
837 } catch (std::exception& e) {
838 return makeFuture<InnerType>(
839 exception_wrapper(std::current_exception(), e));
841 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
845 // makeFutureWith(T()) -> Future<T>
846 // makeFutureWith(void()) -> Future<Unit>
//
// Overload for callables that do not return a Future: runs `func` through
// makeTryWith and wraps the resulting Try in a Future. The result type is
// lifted with Unit::LiftT (see the void -> Unit mapping documented above).
848 typename std::enable_if<
849 !(isFuture<typename std::result_of<F()>::type>::value),
850 Future<Unit::LiftT<typename std::result_of<F()>::type>>>::type
851 makeFutureWith(F&& func) {
852 using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
853 return makeFuture<LiftedResult>(
854 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
// Builds a Future<T> from a std::exception_ptr by wrapping it in a Try<T>.
858 Future<T> makeFuture(std::exception_ptr const& e) {
859 return makeFuture(Try<T>(e));
// Builds a Future<T> from an exception_wrapper, which is moved into a Try<T>.
863 Future<T> makeFuture(exception_wrapper ew) {
864 return makeFuture(Try<T>(std::move(ew)));
// Builds a Future<T> from an exception object. Enabled only for types derived
// from std::exception; the exception is wrapped with
// make_exception_wrapper<E> and stored in a Try<T>.
867 template <class T, class E>
868 typename std::enable_if<std::is_base_of<std::exception, E>::value,
870 makeFuture(E const& e) {
871 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
// Base overload used by the other makeFuture variants: allocates a new
// Core<T> holding the moved Try and constructs the Future from that core.
875 Future<T> makeFuture(Try<T>&& t) {
876 return Future<T>(new futures::detail::Core<T>(std::move(t)));
// Free function: creates a Future<Unit> from makeFuture() and binds it to the
// given executor and priority.
880 Future<Unit> via(Executor* executor, int8_t priority) {
881 return makeFuture().via(executor, priority);
884 // mapSetCallback calls func(i, Try<T>) when every future completes
//
// Walks [first, last) and installs a callback on each future. Each callback
// holds its own copy of `func` and the future's zero-based position `i`, and
// calls func(i, <moved Try>) when that particular future completes.
886 template <class T, class InputIterator, class F>
887 void mapSetCallback(InputIterator first, InputIterator last, F func) {
888 for (size_t i = 0; first != last; ++first, ++i) {
889 first->setCallback_([func, i](Try<T>&& t) {
890 func(i, std::move(t));
895 // collectAll (variadic)
//
// Creates a shared CollectAllVariadicContext over the value types of the
// given futures, passes it together with the forwarded futures to
// collectVariadicHelper, and returns the future of the context's promise.
897 template <typename... Fs>
898 typename futures::detail::CollectAllVariadicContext<
899 typename std::decay<Fs>::type::value_type...>::type
900 collectAll(Fs&&... fs) {
901 auto ctx = std::make_shared<futures::detail::CollectAllVariadicContext<
902 typename std::decay<Fs>::type::value_type...>>();
903 futures::detail::collectVariadicHelper<
904 futures::detail::CollectAllVariadicContext>(ctx, std::forward<Fs>(fs)...);
905 return ctx->p.getFuture();
908 // collectAll (iterator)
//
// Collects the Try of every future in [first, last) into a vector, in input
// order. A shared context owns the promise and a results vector pre-sized to
// the number of futures; each callback stores its Try at its own index.
// The promise is fulfilled from the context's destructor, i.e. once the last
// shared_ptr to the context (held by the callbacks and by this function) is
// released.
// NOTE(review): the start of the return type and of the typedef are not
// visible in this view.
910 template <class InputIterator>
913 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
914 collectAll(InputIterator first, InputIterator last) {
916 typename std::iterator_traits<InputIterator>::value_type::value_type T;
918 struct CollectAllContext {
919 CollectAllContext(size_t n) : results(n) {}
920 ~CollectAllContext() {
921 p.setValue(std::move(results));
923 Promise<std::vector<Try<T>>> p;
924 std::vector<Try<T>> results;
928 std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
929 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
930 ctx->results[i] = std::move(t);
932 return ctx->p.getFuture();
935 // collect (iterator)
//
// Shared state for the iterator form of collect(). Results are gathered as
// Optional<T> per slot and converted to a plain vector<T> at the end; for
// T = void, placeholder types are selected through std::conditional instead.
// `threw` is an atomic flag; collect() sets it on the first failure.
// NOTE(review): several members (the Nothing struct header, the promise `p`,
// and the header of the function containing the exchange below -- presumably
// the destructor) are not visible in this view.
940 template <typename T>
941 struct CollectContext {
943 explicit Nothing(int /* n */) {}
946 using Result = typename std::conditional<
947 std::is_void<T>::value,
949 std::vector<T>>::type;
951 using InternalResult = typename std::conditional<
952 std::is_void<T>::value,
954 std::vector<Optional<T>>>::type;
956 explicit CollectContext(size_t n) : result(n) {}
// Runs only if `threw` was not already set; the exchange also sets it.
958 if (!threw.exchange(true)) {
959 // map Optional<T> -> T
960 std::vector<T> finalResult;
961 finalResult.reserve(result.size());
962 std::transform(result.begin(), result.end(),
963 std::back_inserter(finalResult),
964 [](Optional<T>& o) { return std::move(o.value()); });
965 p.setValue(std::move(finalResult));
// Stores the value of a successful Try in slot i (moved out of `t`).
968 inline void setPartialResult(size_t i, Try<T>& t) {
969 result[i] = std::move(t.value());
972 InternalResult result;
973 std::atomic<bool> threw {false};
976 } // namespace detail
977 } // namespace futures
// Iterator form of collect(): gathers the values of all futures in
// [first, last) through a shared CollectContext. The first callback that sees
// an exception sets `threw` and fails the promise with that exception; later
// exceptions are ignored. Successful results are stored by index only while
// `threw` is still false.
// NOTE(review): the start of the typedef for T is not visible in this view;
// the success path that fulfils the promise lives in CollectContext.
979 template <class InputIterator>
980 Future<typename futures::detail::CollectContext<typename std::iterator_traits<
981 InputIterator>::value_type::value_type>::Result>
982 collect(InputIterator first, InputIterator last) {
984 typename std::iterator_traits<InputIterator>::value_type::value_type T;
986 auto ctx = std::make_shared<futures::detail::CollectContext<T>>(
987 std::distance(first, last));
988 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
989 if (t.hasException()) {
990 if (!ctx->threw.exchange(true)) {
991 ctx->p.setException(std::move(t.exception()));
993 } else if (!ctx->threw) {
994 ctx->setPartialResult(i, t);
997 return ctx->p.getFuture();
1000 // collect (variadic)
//
// Creates a shared CollectVariadicContext over the value types of the given
// futures, passes it together with the forwarded futures to
// collectVariadicHelper, and returns the future of the context's promise.
1002 template <typename... Fs>
1003 typename futures::detail::CollectVariadicContext<
1004 typename std::decay<Fs>::type::value_type...>::type
1005 collect(Fs&&... fs) {
1006 auto ctx = std::make_shared<futures::detail::CollectVariadicContext<
1007 typename std::decay<Fs>::type::value_type...>>();
1008 futures::detail::collectVariadicHelper<
1009 futures::detail::CollectVariadicContext>(ctx, std::forward<Fs>(fs)...);
1010 return ctx->p.getFuture();
1013 // collectAny (iterator)
//
// Completes with the first future of [first, last) to finish, whether it
// holds a value or an exception. The result is a pair of that future's index
// and its Try. The atomic `done` flag ensures only the first callback to flip
// it fulfils the promise; all later completions are dropped.
// NOTE(review): the start of the return type and of the typedef for T are not
// visible in this view.
1015 template <class InputIterator>
1020 std::iterator_traits<InputIterator>::value_type::value_type>>>
1021 collectAny(InputIterator first, InputIterator last) {
1023 typename std::iterator_traits<InputIterator>::value_type::value_type T;
1025 struct CollectAnyContext {
1026 CollectAnyContext() {}
1027 Promise<std::pair<size_t, Try<T>>> p;
1028 std::atomic<bool> done {false};
1031 auto ctx = std::make_shared<CollectAnyContext>();
1032 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
1033 if (!ctx->done.exchange(true)) {
1034 ctx->p.setValue(std::make_pair(i, std::move(t)));
1037 return ctx->p.getFuture();
1040 // collectAnyWithoutException (iterator)
//
// Completes with the index and value of the first future of [first, last)
// that finishes without an exception. Every completion that does not win
// (an exception, or a value arriving after `done` was set) bumps nFulfilled;
// when that count reaches nTotal the promise is failed with the exception of
// the Try that was being processed. The winning completion does not bump the
// counter, so after a success the count cannot reach nTotal.
// NOTE(review): the start of the return type, the typedef for T and the
// declaration of nTotal are not visible in this view.
1042 template <class InputIterator>
1045 typename std::iterator_traits<InputIterator>::value_type::value_type>>
1046 collectAnyWithoutException(InputIterator first, InputIterator last) {
1048 typename std::iterator_traits<InputIterator>::value_type::value_type T;
1050 struct CollectAnyWithoutExceptionContext {
1051 CollectAnyWithoutExceptionContext(){}
1052 Promise<std::pair<size_t, T>> p;
1053 std::atomic<bool> done{false};
1054 std::atomic<size_t> nFulfilled{0};
1058 auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
1059 ctx->nTotal = size_t(std::distance(first, last));
1061 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
1062 if (!t.hasException() && !ctx->done.exchange(true)) {
1063 ctx->p.setValue(std::make_pair(i, std::move(t.value())));
1064 } else if (++ctx->nFulfilled == ctx->nTotal) {
1065 ctx->p.setException(t.exception());
1068 return ctx->p.getFuture();
1071 // collectN (iterator)
//
// Completes once n of the futures in [first, last) have completed, yielding
// (index, Try) pairs in completion order. If the range holds fewer than n
// futures, the promise is failed immediately with
// std::runtime_error("Not enough futures").
// NOTE(review): only `completed` is atomic; ctx->v.emplace_back is called
// from the callbacks with no synchronisation visible here. Confirm that the
// callbacks cannot run concurrently, or that a lock exists on lines not
// visible in this view.
// NOTE(review): the members `v` and `p`, and the condition guarding the
// emplace_back (presumably c <= n), are on lines not visible here.
1073 template <class InputIterator>
1074 Future<std::vector<std::pair<size_t, Try<typename
1075 std::iterator_traits<InputIterator>::value_type::value_type>>>>
1076 collectN(InputIterator first, InputIterator last, size_t n) {
1078 std::iterator_traits<InputIterator>::value_type::value_type T;
1079 typedef std::vector<std::pair<size_t, Try<T>>> V;
1081 struct CollectNContext {
1083 std::atomic<size_t> completed = {0};
1086 auto ctx = std::make_shared<CollectNContext>();
1088 if (size_t(std::distance(first, last)) < n) {
1089 ctx->p.setException(std::runtime_error("Not enough futures"));
1091 // for each completed Future, increase count and add to vector, until we
1092 // have n completed futures at which point we fulfil our Promise with the
1094 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
1095 auto c = ++ctx->completed;
1097 assert(ctx->v.size() < n);
1098 ctx->v.emplace_back(i, std::move(t));
1100 ctx->p.setTry(Try<V>(std::move(ctx->v)));
1106 return ctx->p.getFuture();
1109 // reduce (iterator)
//
// Folds the results of the futures in [first, last) into one value, in
// iteration order. An empty range yields a ready future holding `initial`.
// Otherwise the first future is combined with `initial`, and each further
// future is joined with the running result through collectAll and then
// combined by `func`. `func` is shared across the continuations through a
// shared_ptr. Whether `func` receives a value or a Try is decided by IsTry.
// NOTE(review): F is a forwarding-reference template parameter; if it deduces
// to an lvalue reference type, std::make_shared<F> would not be valid.
// Presumably callers pass rvalue callables -- confirm.
// NOTE(review): the remainder of the Arg typedef, the call that applies
// *sfunc in the first continuation and the final return are not visible in
// this view.
1111 template <class It, class T, class F>
1112 Future<T> reduce(It first, It last, T&& initial, F&& func) {
1113 if (first == last) {
1114 return makeFuture(std::move(initial));
1117 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
1118 typedef typename std::conditional<
1119 futures::detail::callableWith<F, T&&, Try<ItT>&&>::value,
1122 typedef isTry<Arg> IsTry;
1124 auto sfunc = std::make_shared<F>(std::move(func));
1126 auto f = first->then(
1127 [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
1129 std::move(minitial), head.template get<IsTry::value, Arg&&>());
1132 for (++first; first != last; ++first) {
1133 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
1134 return (*sfunc)(std::move(std::get<0>(t).value()),
1135 // Either return a ItT&& or a Try<ItT>&& depending
1136 // on the type of the argument of func.
1137 std::get<1>(t).template get<IsTry::value, Arg&&>());
1144 // window (collection)
//
// Convenience overload without an explicit executor: forwards to the
// executor-taking window() using the InlineExecutor singleton.
1146 template <class Collection, class F, class ItT, class Result>
1147 std::vector<Future<Result>>
1148 window(Collection input, F func, size_t n) {
1149 // Use global inline executor singleton
1150 auto executor = &InlineExecutor::instance();
1151 return window(executor, std::move(input), std::move(func), n);
// Applies `func` to every element of `input`, starting at most n chains at
// once, and returns one future per input element (in input order).
// A shared WindowContext owns the input, the function, one promise per
// element and an atomic cursor `i`. spawn() claims the next index; if it is
// in range it calls func on that element and, when the resulting future
// completes, schedules a task on the executor that fulfils the matching
// promise and then spawns the next element.
// NOTE(review): the declarations of the context's executor, input and func
// members and the final return are not visible in this view.
1154 template <class Collection, class F, class ItT, class Result>
1155 std::vector<Future<Result>>
1156 window(Executor* executor, Collection input, F func, size_t n) {
1157 struct WindowContext {
1158 WindowContext(Executor* executor_, Collection&& input_, F&& func_)
1159 : executor(executor_),
1160 input(std::move(input_)),
1161 promises(input.size()),
1162 func(std::move(func_)) {}
1163 std::atomic<size_t> i{0};
1166 std::vector<Promise<Result>> promises;
1169 static inline void spawn(std::shared_ptr<WindowContext> ctx) {
1170 size_t i = ctx->i++;
1171 if (i < ctx->input.size()) {
1172 auto fut = ctx->func(std::move(ctx->input[i]));
1173 fut.setCallback_([ctx = std::move(ctx), i](Try<Result>&& t) mutable {
// Read the executor before ctx is moved into the task below.
1174 const auto executor_ = ctx->executor;
1175 executor_->add([ctx = std::move(ctx), i, t = std::move(t)]() mutable {
1176 ctx->promises[i].setTry(std::move(t));
1177 // Chain another future onto this one
1178 spawn(std::move(ctx));
// Computed before `input` is moved into the context.
1185 auto max = std::min(n, input.size());
1187 auto ctx = std::make_shared<WindowContext>(
1188 executor, std::move(input), std::move(func));
1190 // Start the first n Futures
1191 for (size_t i = 0; i < max; ++i) {
1192 executor->add([ctx]() mutable { WindowContext::spawn(std::move(ctx)); });
1195 std::vector<Future<Result>> futures;
1196 futures.reserve(ctx->promises.size());
1197 for (auto& promise : ctx->promises) {
1198 futures.emplace_back(promise.getFuture());
// Member reduce for a Future whose value is an iterable collection: the
// continuation starts from `initial` and folds every element into it with
// ret = func(std::move(ret), std::move(element)).
// NOTE(review): the statement that installs this lambda (presumably a then()
// call) and the lambda's return of `ret` are not visible in this view.
1207 template <class I, class F>
1208 Future<I> Future<T>::reduce(I&& initial, F&& func) {
1210 minitial = std::forward<I>(initial),
1211 mfunc = std::forward<F>(func)
1212 ](T& vals) mutable {
1213 auto ret = std::move(minitial);
1214 for (auto& val : vals) {
1215 ret = mfunc(std::move(ret), std::move(val));
1221 // unorderedReduce (iterator)
//
// Folds the results of the futures in [first, last) in the order in which
// they complete rather than in iteration order. An empty range yields a ready
// future holding `initial`. Otherwise a shared context keeps a running `memo_`
// future (seeded with `initial`), the reducer, a completion counter and the
// output promise. Each completion, under the spin lock, chains one more
// reduction step onto memo_; the completion that brings numThens_ up to
// numFutures_ also installs the callback that hands memo_'s final Try to the
// promise.
// NOTE(review): the declarations of memo_ and func_, the range arguments to
// mapSetCallback and the statement receiving the result of memo_.then(...)
// (presumably an assignment back to memo_) are not visible in this view.
1223 template <class It, class T, class F, class ItT, class Arg>
1224 Future<T> unorderedReduce(It first, It last, T initial, F func) {
1225 if (first == last) {
1226 return makeFuture(std::move(initial));
1229 typedef isTry<Arg> IsTry;
1231 struct UnorderedReduceContext {
1232 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
1233 : lock_(), memo_(makeFuture<T>(std::move(memo))),
1234 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
1236 folly::MicroSpinLock lock_; // protects memo_ and numThens_
1239 size_t numThens_; // how many Futures completed and called .then()
1240 size_t numFutures_; // how many Futures in total
1241 Promise<T> promise_;
1244 auto ctx = std::make_shared<UnorderedReduceContext>(
1245 std::move(initial), std::move(func), std::distance(first, last));
1247 mapSetCallback<ItT>(
1250 [ctx](size_t /* i */, Try<ItT>&& t) {
1251 // Futures can be completed in any order, simultaneously.
1252 // To make this non-blocking, we create a new Future chain in
1253 // the order of completion to reduce the values.
1254 // The spinlock just protects chaining a new Future, not actually
1255 // executing the reduce, which should be really fast.
1256 folly::MSLGuard lock(ctx->lock_);
1258 ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
1259 // Either return a ItT&& or a Try<ItT>&& depending
1260 // on the type of the argument of func.
1261 return ctx->func_(std::move(v),
1262 mt.template get<IsTry::value, Arg&&>());
1264 if (++ctx->numThens_ == ctx->numFutures_) {
1265 // After reducing the value of the last Future, fulfill the Promise
1266 ctx->memo_.setCallback_(
1267 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
1271 return ctx->promise_.getFuture();
// Convenience overload: forwards to within(dur, e, tk) with a
// default-constructed TimedOut as the exception.
1277 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
1278 return within(dur, TimedOut(), tk);
// Races this future against the timekeeper's after(dur) future.
//
// Both sides share a Context. `token` arbitrates between them: whichever
// callback is first to flip it from false to true fulfils ctx->promise, the
// other leaves the promise alone. If this future wins, the promise receives
// its Try. If the timer wins, the promise receives the timer's own exception
// when it has one, otherwise the caller-supplied `e`.
1283 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
1286 Context(E ex) : exception(std::move(ex)), promise() {}
// Continuation attached to this future; the timer side calls raise() on it.
1288 Future<Unit> thisFuture;
// false until one of the two callbacks claims the right to fulfil the promise.
1290 std::atomic<bool> token {false};
// Already complete: hand this future back without involving a timekeeper.
1293 if (this->isReady()) {
1294 return std::move(*this);
// tks holds shared ownership of the singleton timekeeper.
1297 std::shared_ptr<Timekeeper> tks;
1299 tks = folly::detail::getTimekeeperSingleton();
// No timekeeper available: the result is a future holding NoTimekeeper.
1303 if (UNLIKELY(!tk)) {
1304 return makeFuture<T>(NoTimekeeper());
1307 auto ctx = std::make_shared<Context>(std::move(e));
// This side captures ctx by value (a strong reference).
1309 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
1310 if (ctx->token.exchange(true) == false) {
1311 ctx->promise.setTry(std::move(t));
1315 // Have time keeper use a weak ptr to hold ctx,
1316 // so that ctx can be deallocated as soon as the future job finished.
1317 tk->after(dur).then([weakCtx = to_weak_ptr(ctx)](Try<Unit> const& t) mutable {
1318 auto lockedCtx = weakCtx.lock();
1320 // ctx already released. "this" completed first, cancel "after"
1323 // "after" completed first, cancel "this"
// The raise happens before the token check and always carries TimedOut,
// independent of the caller-supplied exception type E.
1324 lockedCtx->thisFuture.raise(TimedOut());
1325 if (lockedCtx->token.exchange(true) == false) {
1326 if (t.hasException()) {
// NOTE(review): `t` is a const reference, so this std::move likely degrades
// to a copy -- confirm what exception() returns on a const Try.
1327 lockedCtx->promise.setException(std::move(t.exception()));
1329 lockedCtx->promise.setException(std::move(lockedCtx->exception));
// The resulting future is routed through this future's executor.
1334 return ctx->promise.getFuture().via(this->getExecutor());
// Combines this future with futures::sleep(dur, tk) through collectAll and
// continues with this future's own Try<T>: the sleep's Try<Unit> (tuple
// element 1) is ignored, element 0 is moved into the future that is returned
// from the continuation.
// NOTE(review): presumably the continuation runs only once both inputs have
// completed -- confirm against collectAll's contract.
1340 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
1341 return collectAll(*this, futures::sleep(dur, tk))
1342 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
1343 Try<T>& t = std::get<0>(tup);
1344 return makeFuture<T>(std::move(t));
// Future overload of doBoost: deliberately a no-op, the argument is unused.
1352 void doBoost(folly::Future<T>& /* unused */) {}
// SemiFuture overload of doBoost; unlike the Future overload, whose body is
// empty, this one names its parameter `f`.
1355 void doBoost(folly::SemiFuture<T>& f) {
// Untimed wait helper shared by Future and SemiFuture (FutureType is the
// concrete future class, T defaults to its value_type).
//
// A baton with automatic storage is posted from a callback installed on `f`.
// The callback captures by reference, so the baton must outlive it.
// NOTE(review): presumably the baton is waited on before the assert below --
// confirm.
1359 template <class FutureType, typename T = typename FutureType::value_type>
1360 void waitImpl(FutureType& f) {
1361 // short-circuit if there's nothing to do
1366 FutureBatonType baton;
// The Try passed to the callback is ignored; only completion is signalled.
1367 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
// Debug-only check that the future is ready at this point.
1370 assert(f.isReady());
// Timed wait helper shared by Future and SemiFuture.
//
// The callback installed on `f` owns a promise and forwards the Try it
// receives into it; `ret` is that promise's future. The baton is held in a
// shared_ptr and copied into the callback, presumably so that it stays valid
// if the callback runs after try_wait_for(dur) has given up -- confirm.
1373 template <class FutureType, typename T = typename FutureType::value_type>
1374 void waitImpl(FutureType& f, Duration dur) {
1375 // short-circuit if there's nothing to do
1381 auto ret = promise.getFuture();
1382 auto baton = std::make_shared<FutureBatonType>();
// `mutable` because setTry is called on the captured promise.
1383 f.setCallback_([baton, promise = std::move(promise)](Try<T>&& t) mutable {
1384 promise.setTry(std::move(t));
// A true result is followed by a debug check that f is ready.
1389 if (baton->try_wait_for(dur)) {
1390 assert(f.isReady());
// Future overload: replaces `f` with f.via(e) followed by an identity
// continuation, then loops for as long as `f` is not ready.
// NOTE(review): presumably the loop body drives `e` -- confirm.
1395 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1396 // Set callback so to ensure that the via executor has something on it
1397 // so that once the preceding future triggers this callback, drive will
1398 // always have a callback to satisfy it
// The continuation only passes the value through unchanged.
1402 f = f.via(e).then([](T&& t) { return std::move(t); });
1403 while (!f.isReady()) {
// Debug-only check; the loop above exits only once f is ready.
1406 assert(f.isReady());
// SemiFuture overload: same shape as the Future overload, except that `f` is
// moved from before via(e) is applied.
// NOTE(review): presumably the loop body drives `e` -- confirm.
1410 void waitViaImpl(SemiFuture<T>& f, DrivableExecutor* e) {
1411 // Set callback so to ensure that the via executor has something on it
1412 // so that once the preceding future triggers this callback, drive will
1413 // always have a callback to satisfy it
// `f` is reassigned from the chain built on its own moved-from value.
1417 f = std::move(f).via(e).then([](T&& t) { return std::move(t); });
1418 while (!f.isReady()) {
// Debug-only check; the loop above exits only once f is ready.
1421 assert(f.isReady());
1424 } // namespace detail
1425 } // namespace futures
// Lvalue-qualified wait(): runs the untimed waitImpl on this object.
1428 SemiFuture<T>& SemiFuture<T>::wait() & {
1429 futures::detail::waitImpl(*this);
// Rvalue-qualified wait(): runs the untimed waitImpl, then returns this
// object as an rvalue reference so the call can be chained on a temporary.
1434 SemiFuture<T>&& SemiFuture<T>::wait() && {
1435 futures::detail::waitImpl(*this);
1436 return std::move(*this);
// Lvalue-qualified timed wait: runs waitImpl with the duration `dur`.
1440 SemiFuture<T>& SemiFuture<T>::wait(Duration dur) & {
1441 futures::detail::waitImpl(*this, dur);
// Rvalue-qualified timed wait: runs waitImpl with `dur`, then returns this
// object as an rvalue reference.
1446 SemiFuture<T>&& SemiFuture<T>::wait(Duration dur) && {
1447 futures::detail::waitImpl(*this, dur);
1448 return std::move(*this);
// Lvalue-qualified waitVia: delegates to waitViaImpl with the executor `e`.
1452 SemiFuture<T>& SemiFuture<T>::waitVia(DrivableExecutor* e) & {
1453 futures::detail::waitViaImpl(*this, e);
// Rvalue-qualified waitVia: delegates to waitViaImpl with `e`, then returns
// this object as an rvalue reference.
1458 SemiFuture<T>&& SemiFuture<T>::waitVia(DrivableExecutor* e) && {
1459 futures::detail::waitViaImpl(*this, e);
1460 return std::move(*this);
// Rvalue-only get(): waits, then calls value() on this object as an rvalue
// and returns the result by value.
1464 T SemiFuture<T>::get() && {
1465 return std::move(wait()).value();
// Rvalue-only timed get(): when the future is ready, its value is moved out
// and returned.
1469 T SemiFuture<T>::get(Duration dur) && {
1471 if (this->isReady()) {
1472 return std::move(this->value());
// Rvalue-only getTry(): waits, then returns result() of this object (taken
// as an rvalue) by value.
1479 Try<T> SemiFuture<T>::getTry() && {
1480 return std::move(wait()).result();
// Rvalue-only timed getTry(): when the future is ready, its result is moved
// out and returned.
1484 Try<T> SemiFuture<T>::getTry(Duration dur) && {
1486 if (this->isReady()) {
1487 return std::move(this->result());
// Rvalue-only getVia(): waitVia(e), then value() taken from this object as
// an rvalue.
1494 T SemiFuture<T>::getVia(DrivableExecutor* e) && {
1495 return std::move(waitVia(e)).value();
// Rvalue-only getTryVia(): waitVia(e), then result() taken from this object
// as an rvalue.
1499 Try<T> SemiFuture<T>::getTryVia(DrivableExecutor* e) && {
1500 return std::move(waitVia(e)).result();
// Lvalue-qualified wait(): runs the untimed waitImpl on this object.
1504 Future<T>& Future<T>::wait() & {
1505 futures::detail::waitImpl(*this);
// Rvalue-qualified wait(): runs the untimed waitImpl, then returns this
// object as an rvalue reference.
1510 Future<T>&& Future<T>::wait() && {
1511 futures::detail::waitImpl(*this);
1512 return std::move(*this);
// Lvalue-qualified timed wait: runs waitImpl with the duration `dur`.
1516 Future<T>& Future<T>::wait(Duration dur) & {
1517 futures::detail::waitImpl(*this, dur);
// Rvalue-qualified timed wait: runs waitImpl with `dur`, then returns this
// object as an rvalue reference.
1522 Future<T>&& Future<T>::wait(Duration dur) && {
1523 futures::detail::waitImpl(*this, dur);
1524 return std::move(*this);
// Lvalue-qualified waitVia: delegates to waitViaImpl with the executor `e`.
1528 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1529 futures::detail::waitViaImpl(*this, e);
// Rvalue-qualified waitVia: delegates to waitViaImpl with `e`, then returns
// this object as an rvalue reference.
1534 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1535 futures::detail::waitViaImpl(*this, e);
1536 return std::move(*this);
// Waits, then moves the value out of this future and returns it by value.
// Unlike SemiFuture<T>::get, this overload is not rvalue-qualified.
1540 T Future<T>::get() {
1541 return std::move(wait().value());
// Timed get(): when the future is ready, its value is moved out and
// returned.
1545 T Future<T>::get(Duration dur) {
1547 if (this->isReady()) {
1548 return std::move(this->value());
// Returns a reference to a Try<T>, in contrast to SemiFuture<T>::getTry,
// which returns the Try by value.
1555 Try<T>& Future<T>::getTry() {
// waitVia(e), then moves the value out of this future and returns it.
1560 T Future<T>::getVia(DrivableExecutor* e) {
1561 return std::move(waitVia(e).value());
// waitVia(e), then returns the reference obtained from getTry().
1565 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
1566 return waitVia(e).getTry();
// Compares the values held by two Trys with operator==.
// value() is called on both arguments unconditionally, so neither is checked
// for holding a value here; callers are expected to check first.
1573 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1574 return t1.value() == t2.value();
1577 } // namespace detail
1578 } // namespace futures
// Combines this future and `f` through collectAll and continues with a
// comparison of the two resulting Trys.
// When both Trys hold a value, the outcome is TryEquals<T>::equals of the
// pair; the hasValue() checks guard that call.
1581 Future<bool> Future<T>::willEqual(Future<T>& f) {
1582 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1583 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1584 return futures::detail::TryEquals<T>::equals(
1585 std::get<0>(t), std::get<1>(t));
// Attaches a continuation that tests the value against `predicate`.
// The predicate is forwarded into the closure as `p` and is called with a
// const reference to the value, so it cannot modify it. When the predicate
// returns false, throwPredicateDoesNotObtain() is called.
1594 Future<T> Future<T>::filter(F&& predicate) {
1595 return this->then([p = std::forward<F>(predicate)](T val) {
1596 T const& valConstRef = val;
1597 if (!p(valConstRef)) {
1598 throwPredicateDoesNotObtain();
// Conditionally runs `thunk`.
// If `p` is true, the thunk is invoked and unit() is applied to what it
// returns; if `p` is false, the thunk is not invoked and the result is
// makeFuture().
1605 inline Future<Unit> when(bool p, F&& thunk) {
1606 return p ? std::forward<F>(thunk)().unit() : makeFuture();
// Asynchronous loop built from `predicate` and `thunk`.
// One iteration invokes `thunk` and attaches a continuation to the future it
// returns; the continuation owns forwarded copies of both callables and
// calls whileDo again with them. The other exit returns makeFuture().
// NOTE(review): presumably `predicate` selects between the two exits --
// confirm.
1609 template <class P, class F>
1610 Future<Unit> whileDo(P&& predicate, F&& thunk) {
// `thunk` is invoked as an lvalue here, before being forwarded below.
1612 auto future = thunk();
1613 return future.then([
1614 predicate = std::forward<P>(predicate),
1615 thunk = std::forward<F>(thunk)
// Inside the closure these names denote the captures, not the parameters.
1617 return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1620 return makeFuture();
// Builds a whileDo loop around `thunk` with a counting predicate.
// The predicate owns a counter that starts at 0; each call returns whether
// the counter's value before the increment is below `n`, so it yields true
// for the first n calls when n is positive and never when n <= 0.
1624 Future<Unit> times(const int n, F&& thunk) {
1625 return folly::whileDo(
// The unique_ptr capture makes this closure move-only.
1626 [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
1627 return count->fetch_add(1) < n;
1629 std::forward<F>(thunk));
// Attaches `func` as a continuation to every future in [first, last).
// `results` receives the future produced by each it->then(func), in
// iteration order. `func` is passed to then() as an lvalue on every
// iteration, so the same callable is reused for each element.
1633 template <class It, class F, class ItT, class Result>
1634 std::vector<Future<Result>> map(It first, It last, F func) {
1635 std::vector<Future<Result>> results;
1636 for (auto it = first; it != last; it++) {
1637 results.push_back(it->then(func));
1641 } // namespace futures
1643 // Instantiate the most common Future types to save compile time
// These are explicit instantiation declarations: translation units that
// include this header do not implicitly instantiate these specializations,
// so a matching explicit instantiation definition must exist elsewhere.
1644 extern template class Future<Unit>;
1645 extern template class Future<bool>;
1646 extern template class Future<int>;
1647 extern template class Future<int64_t>;
1648 extern template class Future<std::string>;
1649 extern template class Future<double>;
1650 } // namespace folly