2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/executors/InlineExecutor.h>
27 #include <folly/futures/Timekeeper.h>
28 #include <folly/futures/detail/Core.h>
30 #ifndef FOLLY_FUTURE_USING_FIBER
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/fibers/Baton.h>
45 #if FOLLY_FUTURE_USING_FIBER
46 typedef folly::fibers::Baton FutureBatonType;
48 typedef folly::Baton<> FutureBatonType;
51 } // namespace futures
54 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
59 // Guarantees that the stored functor is destructed before the stored promise
60 // may be fulfilled. Assumes the stored functor to be noexcept-destructible.
61 template <typename T, typename F>
62 class CoreCallbackState {
64 template <typename FF>
65 CoreCallbackState(Promise<T>&& promise, FF&& func) noexcept(
66 noexcept(F(std::declval<FF>())))
67 : func_(std::forward<FF>(func)), promise_(std::move(promise)) {
68 assert(before_barrier());
71 CoreCallbackState(CoreCallbackState&& that) noexcept(
72 noexcept(F(std::declval<F>()))) {
73 if (that.before_barrier()) {
74 new (&func_) F(std::move(that.func_));
75 promise_ = that.stealPromise();
79 CoreCallbackState& operator=(CoreCallbackState&&) = delete;
81 ~CoreCallbackState() {
82 if (before_barrier()) {
87 template <typename... Args>
88 auto invoke(Args&&... args) noexcept(
89 noexcept(std::declval<F&&>()(std::declval<Args&&>()...))) {
90 assert(before_barrier());
91 return std::move(func_)(std::forward<Args>(args)...);
94 template <typename... Args>
95 auto tryInvoke(Args&&... args) noexcept {
96 return makeTryWith([&] { return invoke(std::forward<Args>(args)...); });
99 void setTry(Try<T>&& t) {
100 stealPromise().setTry(std::move(t));
103 void setException(exception_wrapper&& ew) {
104 stealPromise().setException(std::move(ew));
107 Promise<T> stealPromise() noexcept {
108 assert(before_barrier());
110 return std::move(promise_);
114 bool before_barrier() const noexcept {
115 return !promise_.isFulfilled();
121 Promise<T> promise_{Promise<T>::makeEmpty()};
124 template <typename T, typename F>
125 inline auto makeCoreCallbackState(Promise<T>&& p, F&& f) noexcept(
126 noexcept(CoreCallbackState<T, _t<std::decay<F>>>(
127 std::declval<Promise<T>&&>(),
128 std::declval<F&&>()))) {
129 return CoreCallbackState<T, _t<std::decay<F>>>(
130 std::move(p), std::forward<F>(f));
134 FutureBase<T>::FutureBase(SemiFuture<T>&& other) noexcept : core_(other.core_) {
135 other.core_ = nullptr;
139 FutureBase<T>::FutureBase(Future<T>&& other) noexcept : core_(other.core_) {
140 other.core_ = nullptr;
144 template <class T2, typename>
145 FutureBase<T>::FutureBase(T2&& val)
146 : core_(new futures::detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
149 template <typename T2>
150 FutureBase<T>::FutureBase(
151 typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
152 : core_(new futures::detail::Core<T>(Try<T>(T()))) {}
157 typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
159 FutureBase<T>::FutureBase(in_place_t, Args&&... args)
161 new futures::detail::Core<T>(in_place, std::forward<Args>(args)...)) {
165 template <class FutureType>
166 void FutureBase<T>::assign(FutureType& other) noexcept {
167 std::swap(core_, other.core_);
171 FutureBase<T>::~FutureBase() {
176 T& FutureBase<T>::value() & {
179 return core_->getTry().value();
183 T const& FutureBase<T>::value() const& {
186 return core_->getTry().value();
190 T&& FutureBase<T>::value() && {
193 return std::move(core_->getTry().value());
197 T const&& FutureBase<T>::value() const&& {
200 return std::move(core_->getTry().value());
204 bool FutureBase<T>::isReady() const {
206 return core_->ready();
210 bool FutureBase<T>::hasValue() {
211 return getTry().hasValue();
215 bool FutureBase<T>::hasException() {
216 return getTry().hasException();
220 void FutureBase<T>::detach() {
222 core_->detachFuture();
228 Try<T>& FutureBase<T>::getTry() {
231 return core_->getTry();
235 void FutureBase<T>::throwIfInvalid() const {
242 Optional<Try<T>> FutureBase<T>::poll() {
244 if (core_->ready()) {
245 o = std::move(core_->getTry());
251 void FutureBase<T>::raise(exception_wrapper exception) {
252 core_->raise(std::move(exception));
257 void FutureBase<T>::setCallback_(F&& func) {
259 core_->setCallback(std::forward<F>(func));
263 FutureBase<T>::FutureBase(futures::detail::EmptyConstruct) noexcept
268 // Variant: returns a value
269 // e.g. f.then([](Try<T>&& t){ return t.value(); });
271 template <typename F, typename R, bool isTry, typename... Args>
272 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
273 FutureBase<T>::thenImplementation(
275 futures::detail::argResult<isTry, F, Args...>) {
276 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
277 typedef typename R::ReturnsFuture::Inner B;
279 this->throwIfInvalid();
282 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
284 // grab the Future now before we lose our handle on the Promise
285 auto f = p.getFuture();
286 f.core_->setExecutorNoLock(this->getExecutor());
288 /* This is a bit tricky.
290 We can't just close over *this in case this Future gets moved. So we
291 make a new dummy Future. We could figure out something more
292 sophisticated that avoids making a new Future object when it can, as an
293 optimization. But this is correct.
295 core_ can't be moved, it is explicitly disallowed (as is copying). But
296 if there's ever a reason to allow it, this is one place that makes that
297 assumption and would need to be fixed. We use a standard shared pointer
298 for core_ (by copying it in), which means in essence obj holds a shared
299 pointer to itself. But this shouldn't leak because Promise will not
300 outlive the continuation, because Promise will setException() with a
301 broken Promise if it is destructed before completed. We could use a
302 weak pointer but it would have to be converted to a shared pointer when
303 func is executed (because the Future returned by func may possibly
304 persist beyond the callback, if it gets moved), and so it is an
305 optimization to just make it shared from the get-go.
307 Two subtle but important points about this design. futures::detail::Core
308 has no back pointers to Future or Promise, so if Future or Promise get
309 moved (and they will be moved in performant code) we don't have to do
310 anything fancy. And because we store the continuation in the
311 futures::detail::Core, not in the Future, we can execute the continuation
312 even after the Future has gone out of scope. This is an intentional design
313 decision. It is likely we will want to be able to cancel a continuation
314 in some circumstances, but I think it should be explicit not implicit
315 in the destruction of the Future used to create it.
318 [state = futures::detail::makeCoreCallbackState(
319 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
321 if (!isTry && t.hasException()) {
322 state.setException(std::move(t.exception()));
324 state.setTry(makeTryWith(
325 [&] { return state.invoke(t.template get<isTry, Args>()...); }));
331 // Variant: returns a Future
332 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
334 template <typename F, typename R, bool isTry, typename... Args>
335 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
336 FutureBase<T>::thenImplementation(
338 futures::detail::argResult<isTry, F, Args...>) {
339 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
340 typedef typename R::ReturnsFuture::Inner B;
341 this->throwIfInvalid();
344 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
346 // grab the Future now before we lose our handle on the Promise
347 auto f = p.getFuture();
348 f.core_->setExecutorNoLock(this->getExecutor());
351 [state = futures::detail::makeCoreCallbackState(
352 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
353 if (!isTry && t.hasException()) {
354 state.setException(std::move(t.exception()));
356 auto tf2 = state.tryInvoke(t.template get<isTry, Args>()...);
357 if (tf2.hasException()) {
358 state.setException(std::move(tf2.exception()));
360 tf2->setCallback_([p = state.stealPromise()](Try<B> && b) mutable {
361 p.setTry(std::move(b));
369 } // namespace detail
370 } // namespace futures
373 SemiFuture<typename std::decay<T>::type> makeSemiFuture(T&& t) {
374 return makeSemiFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
377 // makeSemiFutureWith(SemiFuture<T>()) -> SemiFuture<T>
379 typename std::enable_if<
380 isSemiFuture<typename std::result_of<F()>::type>::value,
381 typename std::result_of<F()>::type>::type
382 makeSemiFutureWith(F&& func) {
384 typename isSemiFuture<typename std::result_of<F()>::type>::Inner;
386 return std::forward<F>(func)();
387 } catch (std::exception& e) {
388 return makeSemiFuture<InnerType>(
389 exception_wrapper(std::current_exception(), e));
391 return makeSemiFuture<InnerType>(
392 exception_wrapper(std::current_exception()));
396 // makeSemiFutureWith(T()) -> SemiFuture<T>
397 // makeSemiFutureWith(void()) -> SemiFuture<Unit>
399 typename std::enable_if<
400 !(isSemiFuture<typename std::result_of<F()>::type>::value),
401 SemiFuture<Unit::LiftT<typename std::result_of<F()>::type>>>::type
402 makeSemiFutureWith(F&& func) {
403 using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
404 return makeSemiFuture<LiftedResult>(
405 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
409 SemiFuture<T> makeSemiFuture(std::exception_ptr const& e) {
410 return makeSemiFuture(Try<T>(e));
414 SemiFuture<T> makeSemiFuture(exception_wrapper ew) {
415 return makeSemiFuture(Try<T>(std::move(ew)));
418 template <class T, class E>
420 enable_if<std::is_base_of<std::exception, E>::value, SemiFuture<T>>::type
421 makeSemiFuture(E const& e) {
422 return makeSemiFuture(Try<T>(make_exception_wrapper<E>(e)));
426 SemiFuture<T> makeSemiFuture(Try<T>&& t) {
427 return SemiFuture<T>(new futures::detail::Core<T>(std::move(t)));
430 // This must be defined after the constructors to avoid a bug in MSVC
431 // https://connect.microsoft.com/VisualStudio/feedback/details/3142777/out-of-line-constructor-definition-after-implicit-reference-causes-incorrect-c2244
432 inline SemiFuture<Unit> makeSemiFuture() {
433 return makeSemiFuture(Unit{});
437 SemiFuture<T> SemiFuture<T>::makeEmpty() {
438 return SemiFuture<T>(futures::detail::EmptyConstruct{});
442 SemiFuture<T>::SemiFuture(SemiFuture<T>&& other) noexcept
443 : futures::detail::FutureBase<T>(std::move(other)) {}
446 SemiFuture<T>::SemiFuture(Future<T>&& other) noexcept
447 : futures::detail::FutureBase<T>(std::move(other)) {
448 // SemiFuture should not have an executor on construction
450 this->setExecutor(nullptr);
455 SemiFuture<T>& SemiFuture<T>::operator=(SemiFuture<T>&& other) noexcept {
461 SemiFuture<T>& SemiFuture<T>::operator=(Future<T>&& other) noexcept {
463 // SemiFuture should not have an executor on construction
465 this->setExecutor(nullptr);
471 void SemiFuture<T>::boost_() {
472 // If a SemiFuture has an executor it should be deferred, so boost it
473 if (auto e = this->getExecutor()) {
474 // We know in a SemiFuture that if we have an executor it should be
475 // DeferredExecutor. Verify this in debug mode.
476 DCHECK(dynamic_cast<DeferredExecutor*>(e));
478 auto ka = static_cast<DeferredExecutor*>(e)->getKeepAliveToken();
479 static_cast<DeferredExecutor*>(e)->boost();
484 inline Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) && {
487 // If current executor is deferred, boost block to ensure that work
488 // progresses and is run on the new executor.
489 auto oldExecutor = this->getExecutor();
490 if (oldExecutor && executor && (executor != oldExecutor)) {
491 // We know in a SemiFuture that if we have an executor it should be
492 // DeferredExecutor. Verify this in debug mode.
493 DCHECK(dynamic_cast<DeferredExecutor*>(this->getExecutor()));
494 if (static_cast<DeferredExecutor*>(oldExecutor)) {
495 executor->add([oldExecutorKA = oldExecutor->getKeepAliveToken()]() {
496 static_cast<DeferredExecutor*>(oldExecutorKA.get())->boost();
501 this->setExecutor(executor, priority);
503 auto newFuture = Future<T>(this->core_);
504 this->core_ = nullptr;
509 template <typename F>
510 SemiFuture<typename futures::detail::callableResult<T, F>::Return::value_type>
511 SemiFuture<T>::defer(F&& func) && {
512 // If we already have a deferred executor, use it, otherwise create one
513 auto defKeepAlive = this->getExecutor()
514 ? this->getExecutor()->getKeepAliveToken()
515 : DeferredExecutor::create();
516 auto e = defKeepAlive.get();
517 // We know in a SemiFuture that if we have an executor it should be
518 // DeferredExecutor (either it was that way before, or we just created it).
519 // Verify this in debug mode.
520 DCHECK(dynamic_cast<DeferredExecutor*>(e));
521 // Convert to a folly::future with a deferred executor
522 // Will be low-cost if this is not a new executor as via optimises for that
527 // Then add the work, with a wrapper function that captures the
528 // keepAlive so the executor is destroyed at the right time.
530 DeferredExecutor::wrap(std::move(defKeepAlive), std::move(func)))
531 // Finally, convert back o a folly::SemiFuture to hide the executor
533 // Carry deferred executor through chain as constructor from Future will
540 Future<T> Future<T>::makeEmpty() {
541 return Future<T>(futures::detail::EmptyConstruct{});
545 Future<T>::Future(Future<T>&& other) noexcept
546 : futures::detail::FutureBase<T>(std::move(other)) {}
549 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
557 typename std::enable_if<
558 !std::is_same<T, typename std::decay<T2>::type>::value &&
559 std::is_constructible<T, T2&&>::value &&
560 std::is_convertible<T2&&, T>::value,
562 Future<T>::Future(Future<T2>&& other)
563 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
568 typename std::enable_if<
569 !std::is_same<T, typename std::decay<T2>::type>::value &&
570 std::is_constructible<T, T2&&>::value &&
571 !std::is_convertible<T2&&, T>::value,
573 Future<T>::Future(Future<T2>&& other)
574 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
579 typename std::enable_if<
580 !std::is_same<T, typename std::decay<T2>::type>::value &&
581 std::is_constructible<T, T2&&>::value,
583 Future<T>& Future<T>::operator=(Future<T2>&& other) {
585 std::move(other).then([](T2&& v) { return T(std::move(v)); }));
593 enable_if<isFuture<F>::value, Future<typename isFuture<T>::Inner>>::type
594 Future<T>::unwrap() {
595 return then([](Future<typename isFuture<T>::Inner> internal_future) {
596 return internal_future;
601 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
602 this->throwIfInvalid();
604 this->setExecutor(executor, priority);
606 auto newFuture = Future<T>(this->core_);
607 this->core_ = nullptr;
612 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
613 this->throwIfInvalid();
615 auto f = p.getFuture();
616 auto func = [p = std::move(p)](Try<T>&& t) mutable {
617 p.setTry(std::move(t));
619 using R = futures::detail::callableResult<T, decltype(func)>;
620 this->template thenImplementation<decltype(func), R>(
621 std::move(func), typename R::Arg());
622 return std::move(f).via(executor, priority);
625 template <typename T>
626 template <typename R, typename Caller, typename... Args>
627 Future<typename isFuture<R>::Inner>
628 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
629 typedef typename std::remove_cv<typename std::remove_reference<
630 typename futures::detail::ArgType<Args...>::FirstArg>::type>::type
633 return then([instance, func](Try<T>&& t){
634 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
639 Future<Unit> Future<T>::then() {
640 return then([] () {});
643 // onError where the callback returns T
646 typename std::enable_if<
647 !futures::detail::callableWith<F, exception_wrapper>::value &&
648 !futures::detail::callableWith<F, exception_wrapper&>::value &&
649 !futures::detail::Extract<F>::ReturnsFuture::value,
651 Future<T>::onError(F&& func) {
652 typedef std::remove_reference_t<
653 typename futures::detail::Extract<F>::FirstArg>
656 std::is_same<typename futures::detail::Extract<F>::RawReturn, T>::value,
657 "Return type of onError callback must be T or Future<T>");
660 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
661 auto f = p.getFuture();
664 [state = futures::detail::makeCoreCallbackState(
665 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
666 if (auto e = t.template tryGetExceptionObject<Exn>()) {
667 state.setTry(makeTryWith([&] { return state.invoke(*e); }));
669 state.setTry(std::move(t));
676 // onError where the callback returns Future<T>
679 typename std::enable_if<
680 !futures::detail::callableWith<F, exception_wrapper>::value &&
681 !futures::detail::callableWith<F, exception_wrapper&>::value &&
682 futures::detail::Extract<F>::ReturnsFuture::value,
684 Future<T>::onError(F&& func) {
686 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
688 "Return type of onError callback must be T or Future<T>");
689 typedef std::remove_reference_t<
690 typename futures::detail::Extract<F>::FirstArg>
694 auto f = p.getFuture();
697 [state = futures::detail::makeCoreCallbackState(
698 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
699 if (auto e = t.template tryGetExceptionObject<Exn>()) {
700 auto tf2 = state.tryInvoke(*e);
701 if (tf2.hasException()) {
702 state.setException(std::move(tf2.exception()));
704 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
705 p.setTry(std::move(t3));
709 state.setTry(std::move(t));
718 Future<T> Future<T>::ensure(F&& func) {
719 return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
721 return makeFuture(std::move(t));
727 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
728 return within(dur, tk).onError([funcw = std::forward<F>(func)](
729 TimedOut const&) { return std::move(funcw)(); });
734 typename std::enable_if<
735 futures::detail::callableWith<F, exception_wrapper>::value &&
736 futures::detail::Extract<F>::ReturnsFuture::value,
738 Future<T>::onError(F&& func) {
740 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
742 "Return type of onError callback must be T or Future<T>");
745 auto f = p.getFuture();
747 [state = futures::detail::makeCoreCallbackState(
748 std::move(p), std::forward<F>(func))](Try<T> t) mutable {
749 if (t.hasException()) {
750 auto tf2 = state.tryInvoke(std::move(t.exception()));
751 if (tf2.hasException()) {
752 state.setException(std::move(tf2.exception()));
754 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
755 p.setTry(std::move(t3));
759 state.setTry(std::move(t));
766 // onError(exception_wrapper) that returns T
769 typename std::enable_if<
770 futures::detail::callableWith<F, exception_wrapper>::value &&
771 !futures::detail::Extract<F>::ReturnsFuture::value,
773 Future<T>::onError(F&& func) {
775 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
777 "Return type of onError callback must be T or Future<T>");
780 auto f = p.getFuture();
782 [state = futures::detail::makeCoreCallbackState(
783 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
784 if (t.hasException()) {
785 state.setTry(makeTryWith(
786 [&] { return state.invoke(std::move(t.exception())); }));
788 state.setTry(std::move(t));
796 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
797 return waitVia(e).getTry();
800 template <class Func>
801 auto via(Executor* x, Func&& func)
802 -> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
803 // TODO make this actually more performant. :-P #7260175
804 return via(x).then(std::forward<Func>(func));
810 Future<typename std::decay<T>::type> makeFuture(T&& t) {
811 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
814 inline Future<Unit> makeFuture() {
815 return makeFuture(Unit{});
818 // makeFutureWith(Future<T>()) -> Future<T>
820 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
821 typename std::result_of<F()>::type>::type
822 makeFutureWith(F&& func) {
824 typename isFuture<typename std::result_of<F()>::type>::Inner;
826 return std::forward<F>(func)();
827 } catch (std::exception& e) {
828 return makeFuture<InnerType>(
829 exception_wrapper(std::current_exception(), e));
831 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
835 // makeFutureWith(T()) -> Future<T>
836 // makeFutureWith(void()) -> Future<Unit>
838 typename std::enable_if<
839 !(isFuture<typename std::result_of<F()>::type>::value),
840 Future<Unit::LiftT<typename std::result_of<F()>::type>>>::type
841 makeFutureWith(F&& func) {
842 using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
843 return makeFuture<LiftedResult>(
844 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
848 Future<T> makeFuture(std::exception_ptr const& e) {
849 return makeFuture(Try<T>(e));
853 Future<T> makeFuture(exception_wrapper ew) {
854 return makeFuture(Try<T>(std::move(ew)));
857 template <class T, class E>
858 typename std::enable_if<std::is_base_of<std::exception, E>::value,
860 makeFuture(E const& e) {
861 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
865 Future<T> makeFuture(Try<T>&& t) {
866 return Future<T>(new futures::detail::Core<T>(std::move(t)));
870 Future<Unit> via(Executor* executor, int8_t priority) {
871 return makeFuture().via(executor, priority);
874 // mapSetCallback calls func(i, Try<T>) when every future completes
876 template <class T, class InputIterator, class F>
877 void mapSetCallback(InputIterator first, InputIterator last, F func) {
878 for (size_t i = 0; first != last; ++first, ++i) {
879 first->setCallback_([func, i](Try<T>&& t) {
880 func(i, std::move(t));
885 // collectAll (variadic)
887 template <typename... Fs>
888 typename futures::detail::CollectAllVariadicContext<
889 typename std::decay<Fs>::type::value_type...>::type
890 collectAll(Fs&&... fs) {
891 auto ctx = std::make_shared<futures::detail::CollectAllVariadicContext<
892 typename std::decay<Fs>::type::value_type...>>();
893 futures::detail::collectVariadicHelper<
894 futures::detail::CollectAllVariadicContext>(ctx, std::forward<Fs>(fs)...);
895 return ctx->p.getFuture();
898 // collectAll (iterator)
900 template <class InputIterator>
903 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
904 collectAll(InputIterator first, InputIterator last) {
906 typename std::iterator_traits<InputIterator>::value_type::value_type T;
908 struct CollectAllContext {
909 CollectAllContext(size_t n) : results(n) {}
910 ~CollectAllContext() {
911 p.setValue(std::move(results));
913 Promise<std::vector<Try<T>>> p;
914 std::vector<Try<T>> results;
918 std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
919 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
920 ctx->results[i] = std::move(t);
922 return ctx->p.getFuture();
925 // collect (iterator)
930 template <typename T>
931 struct CollectContext {
933 explicit Nothing(int /* n */) {}
936 using Result = typename std::conditional<
937 std::is_void<T>::value,
939 std::vector<T>>::type;
941 using InternalResult = typename std::conditional<
942 std::is_void<T>::value,
944 std::vector<Optional<T>>>::type;
946 explicit CollectContext(size_t n) : result(n) {}
948 if (!threw.exchange(true)) {
949 // map Optional<T> -> T
950 std::vector<T> finalResult;
951 finalResult.reserve(result.size());
952 std::transform(result.begin(), result.end(),
953 std::back_inserter(finalResult),
954 [](Optional<T>& o) { return std::move(o.value()); });
955 p.setValue(std::move(finalResult));
958 inline void setPartialResult(size_t i, Try<T>& t) {
959 result[i] = std::move(t.value());
962 InternalResult result;
963 std::atomic<bool> threw {false};
966 } // namespace detail
967 } // namespace futures
969 template <class InputIterator>
970 Future<typename futures::detail::CollectContext<typename std::iterator_traits<
971 InputIterator>::value_type::value_type>::Result>
972 collect(InputIterator first, InputIterator last) {
974 typename std::iterator_traits<InputIterator>::value_type::value_type T;
976 auto ctx = std::make_shared<futures::detail::CollectContext<T>>(
977 std::distance(first, last));
978 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
979 if (t.hasException()) {
980 if (!ctx->threw.exchange(true)) {
981 ctx->p.setException(std::move(t.exception()));
983 } else if (!ctx->threw) {
984 ctx->setPartialResult(i, t);
987 return ctx->p.getFuture();
990 // collect (variadic)
992 template <typename... Fs>
993 typename futures::detail::CollectVariadicContext<
994 typename std::decay<Fs>::type::value_type...>::type
995 collect(Fs&&... fs) {
996 auto ctx = std::make_shared<futures::detail::CollectVariadicContext<
997 typename std::decay<Fs>::type::value_type...>>();
998 futures::detail::collectVariadicHelper<
999 futures::detail::CollectVariadicContext>(ctx, std::forward<Fs>(fs)...);
1000 return ctx->p.getFuture();
1003 // collectAny (iterator)
1005 template <class InputIterator>
1010 std::iterator_traits<InputIterator>::value_type::value_type>>>
1011 collectAny(InputIterator first, InputIterator last) {
1013 typename std::iterator_traits<InputIterator>::value_type::value_type T;
1015 struct CollectAnyContext {
1016 CollectAnyContext() {}
1017 Promise<std::pair<size_t, Try<T>>> p;
1018 std::atomic<bool> done {false};
1021 auto ctx = std::make_shared<CollectAnyContext>();
1022 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
1023 if (!ctx->done.exchange(true)) {
1024 ctx->p.setValue(std::make_pair(i, std::move(t)));
1027 return ctx->p.getFuture();
1030 // collectAnyWithoutException (iterator)
1032 template <class InputIterator>
1035 typename std::iterator_traits<InputIterator>::value_type::value_type>>
1036 collectAnyWithoutException(InputIterator first, InputIterator last) {
1038 typename std::iterator_traits<InputIterator>::value_type::value_type T;
1040 struct CollectAnyWithoutExceptionContext {
1041 CollectAnyWithoutExceptionContext(){}
1042 Promise<std::pair<size_t, T>> p;
1043 std::atomic<bool> done{false};
1044 std::atomic<size_t> nFulfilled{0};
1048 auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
1049 ctx->nTotal = size_t(std::distance(first, last));
1051 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
1052 if (!t.hasException() && !ctx->done.exchange(true)) {
1053 ctx->p.setValue(std::make_pair(i, std::move(t.value())));
1054 } else if (++ctx->nFulfilled == ctx->nTotal) {
1055 ctx->p.setException(t.exception());
1058 return ctx->p.getFuture();
1061 // collectN (iterator)
1063 template <class InputIterator>
1064 Future<std::vector<std::pair<size_t, Try<typename
1065 std::iterator_traits<InputIterator>::value_type::value_type>>>>
1066 collectN(InputIterator first, InputIterator last, size_t n) {
1068 std::iterator_traits<InputIterator>::value_type::value_type T;
1069 typedef std::vector<std::pair<size_t, Try<T>>> V;
1071 struct CollectNContext {
1073 std::atomic<size_t> completed = {0};
1076 auto ctx = std::make_shared<CollectNContext>();
1078 if (size_t(std::distance(first, last)) < n) {
1079 ctx->p.setException(std::runtime_error("Not enough futures"));
1081 // for each completed Future, increase count and add to vector, until we
1082 // have n completed futures at which point we fulfil our Promise with the
1084 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
1085 auto c = ++ctx->completed;
1087 assert(ctx->v.size() < n);
1088 ctx->v.emplace_back(i, std::move(t));
1090 ctx->p.setTry(Try<V>(std::move(ctx->v)));
1096 return ctx->p.getFuture();
1099 // reduce (iterator)
1101 template <class It, class T, class F>
1102 Future<T> reduce(It first, It last, T&& initial, F&& func) {
1103 if (first == last) {
1104 return makeFuture(std::move(initial));
1107 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
1108 typedef typename std::conditional<
1109 futures::detail::callableWith<F, T&&, Try<ItT>&&>::value,
1112 typedef isTry<Arg> IsTry;
1114 auto sfunc = std::make_shared<F>(std::move(func));
1116 auto f = first->then(
1117 [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
1119 std::move(minitial), head.template get<IsTry::value, Arg&&>());
1122 for (++first; first != last; ++first) {
1123 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
1124 return (*sfunc)(std::move(std::get<0>(t).value()),
1125 // Either return a ItT&& or a Try<ItT>&& depending
1126 // on the type of the argument of func.
1127 std::get<1>(t).template get<IsTry::value, Arg&&>());
1134 // window (collection)
1136 template <class Collection, class F, class ItT, class Result>
1137 std::vector<Future<Result>>
1138 window(Collection input, F func, size_t n) {
1139 // Use global inline executor singleton
1140 auto executor = &InlineExecutor::instance();
1141 return window(executor, std::move(input), std::move(func), n);
1144 template <class Collection, class F, class ItT, class Result>
1145 std::vector<Future<Result>>
1146 window(Executor* executor, Collection input, F func, size_t n) {
1147 struct WindowContext {
1148 WindowContext(Executor* executor_, Collection&& input_, F&& func_)
1149 : executor(executor_),
1150 input(std::move(input_)),
1151 promises(input.size()),
1152 func(std::move(func_)) {}
1153 std::atomic<size_t> i{0};
1156 std::vector<Promise<Result>> promises;
1159 static inline void spawn(std::shared_ptr<WindowContext> ctx) {
1160 size_t i = ctx->i++;
1161 if (i < ctx->input.size()) {
1162 auto fut = ctx->func(std::move(ctx->input[i]));
1163 fut.setCallback_([ctx = std::move(ctx), i](Try<Result>&& t) mutable {
1164 const auto executor_ = ctx->executor;
1165 executor_->add([ctx = std::move(ctx), i, t = std::move(t)]() mutable {
1166 ctx->promises[i].setTry(std::move(t));
1167 // Chain another future onto this one
1168 spawn(std::move(ctx));
1175 auto max = std::min(n, input.size());
1177 auto ctx = std::make_shared<WindowContext>(
1178 executor, std::move(input), std::move(func));
1180 // Start the first n Futures
1181 for (size_t i = 0; i < max; ++i) {
1182 executor->add([ctx]() mutable { WindowContext::spawn(std::move(ctx)); });
1185 std::vector<Future<Result>> futures;
1186 futures.reserve(ctx->promises.size());
1187 for (auto& promise : ctx->promises) {
1188 futures.emplace_back(promise.getFuture());
1197 template <class I, class F>
1198 Future<I> Future<T>::reduce(I&& initial, F&& func) {
1200 minitial = std::forward<I>(initial),
1201 mfunc = std::forward<F>(func)
1202 ](T& vals) mutable {
1203 auto ret = std::move(minitial);
1204 for (auto& val : vals) {
1205 ret = mfunc(std::move(ret), std::move(val));
1211 // unorderedReduce (iterator)
1213 template <class It, class T, class F, class ItT, class Arg>
1214 Future<T> unorderedReduce(It first, It last, T initial, F func) {
1215 if (first == last) {
1216 return makeFuture(std::move(initial));
1219 typedef isTry<Arg> IsTry;
1221 struct UnorderedReduceContext {
1222 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
1223 : lock_(), memo_(makeFuture<T>(std::move(memo))),
1224 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
1226 folly::MicroSpinLock lock_; // protects memo_ and numThens_
1229 size_t numThens_; // how many Futures completed and called .then()
1230 size_t numFutures_; // how many Futures in total
1231 Promise<T> promise_;
1234 auto ctx = std::make_shared<UnorderedReduceContext>(
1235 std::move(initial), std::move(func), std::distance(first, last));
1237 mapSetCallback<ItT>(
1240 [ctx](size_t /* i */, Try<ItT>&& t) {
1241 // Futures can be completed in any order, simultaneously.
1242 // To make this non-blocking, we create a new Future chain in
1243 // the order of completion to reduce the values.
1244 // The spinlock just protects chaining a new Future, not actually
1245 // executing the reduce, which should be really fast.
1246 folly::MSLGuard lock(ctx->lock_);
1248 ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
1249 // Either return a ItT&& or a Try<ItT>&& depending
1250 // on the type of the argument of func.
1251 return ctx->func_(std::move(v),
1252 mt.template get<IsTry::value, Arg&&>());
1254 if (++ctx->numThens_ == ctx->numFutures_) {
1255 // After reducing the value of the last Future, fulfill the Promise
1256 ctx->memo_.setCallback_(
1257 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
1261 return ctx->promise_.getFuture();
1267 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
1268 return within(dur, TimedOut(), tk);
1273 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
1276 Context(E ex) : exception(std::move(ex)), promise() {}
1278 Future<Unit> thisFuture;
1280 std::atomic<bool> token {false};
1283 if (this->isReady()) {
1284 return std::move(*this);
1287 std::shared_ptr<Timekeeper> tks;
1289 tks = folly::detail::getTimekeeperSingleton();
1293 if (UNLIKELY(!tk)) {
1294 return makeFuture<T>(NoTimekeeper());
1297 auto ctx = std::make_shared<Context>(std::move(e));
1299 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
1300 if (ctx->token.exchange(true) == false) {
1301 ctx->promise.setTry(std::move(t));
1305 // Have time keeper use a weak ptr to hold ctx,
1306 // so that ctx can be deallocated as soon as the future job finished.
1307 tk->after(dur).then([weakCtx = to_weak_ptr(ctx)](Try<Unit> const& t) mutable {
1308 auto lockedCtx = weakCtx.lock();
1310 // ctx already released. "this" completed first, cancel "after"
1313 // "after" completed first, cancel "this"
1314 lockedCtx->thisFuture.raise(TimedOut());
1315 if (lockedCtx->token.exchange(true) == false) {
1316 if (t.hasException()) {
1317 lockedCtx->promise.setException(std::move(t.exception()));
1319 lockedCtx->promise.setException(std::move(lockedCtx->exception));
1324 return ctx->promise.getFuture().via(this->getExecutor());
1330 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
1331 return collectAll(*this, futures::sleep(dur, tk))
1332 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
1333 Try<T>& t = std::get<0>(tup);
1334 return makeFuture<T>(std::move(t));
1342 void doBoost(folly::Future<T>& /* usused */) {}
1345 void doBoost(folly::SemiFuture<T>& f) {
1349 template <class FutureType, typename T = typename FutureType::value_type>
1350 void waitImpl(FutureType& f) {
1351 // short-circuit if there's nothing to do
1356 FutureBatonType baton;
1357 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1360 assert(f.isReady());
1363 template <class FutureType, typename T = typename FutureType::value_type>
1364 void waitImpl(FutureType& f, Duration dur) {
1365 // short-circuit if there's nothing to do
1371 auto ret = promise.getFuture();
1372 auto baton = std::make_shared<FutureBatonType>();
1373 f.setCallback_([baton, promise = std::move(promise)](Try<T>&& t) mutable {
1374 promise.setTry(std::move(t));
1379 if (baton->timed_wait(dur)) {
1380 assert(f.isReady());
1385 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1386 // Set callback so to ensure that the via executor has something on it
1387 // so that once the preceding future triggers this callback, drive will
1388 // always have a callback to satisfy it
1392 f = f.via(e).then([](T&& t) { return std::move(t); });
1393 while (!f.isReady()) {
1396 assert(f.isReady());
1399 } // namespace detail
1400 } // namespace futures
1403 SemiFuture<T>& SemiFuture<T>::wait() & {
1404 futures::detail::waitImpl(*this);
1409 SemiFuture<T>&& SemiFuture<T>::wait() && {
1410 futures::detail::waitImpl(*this);
1411 return std::move(*this);
1415 SemiFuture<T>& SemiFuture<T>::wait(Duration dur) & {
1416 futures::detail::waitImpl(*this, dur);
1421 SemiFuture<T>&& SemiFuture<T>::wait(Duration dur) && {
1422 futures::detail::waitImpl(*this, dur);
1423 return std::move(*this);
1427 T SemiFuture<T>::get() && {
1428 return std::move(wait().value());
1432 T SemiFuture<T>::get(Duration dur) && {
1434 if (this->isReady()) {
1435 return std::move(this->value());
1442 Future<T>& Future<T>::wait() & {
1443 futures::detail::waitImpl(*this);
1448 Future<T>&& Future<T>::wait() && {
1449 futures::detail::waitImpl(*this);
1450 return std::move(*this);
1454 Future<T>& Future<T>::wait(Duration dur) & {
1455 futures::detail::waitImpl(*this, dur);
1460 Future<T>&& Future<T>::wait(Duration dur) && {
1461 futures::detail::waitImpl(*this, dur);
1462 return std::move(*this);
1466 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1467 futures::detail::waitViaImpl(*this, e);
1472 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1473 futures::detail::waitViaImpl(*this, e);
1474 return std::move(*this);
1478 T Future<T>::get() {
1479 return std::move(wait().value());
1483 T Future<T>::get(Duration dur) {
1485 if (this->isReady()) {
1486 return std::move(this->value());
1493 T Future<T>::getVia(DrivableExecutor* e) {
1494 return std::move(waitVia(e).value());
1501 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1502 return t1.value() == t2.value();
1505 } // namespace detail
1506 } // namespace futures
1509 Future<bool> Future<T>::willEqual(Future<T>& f) {
1510 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1511 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1512 return futures::detail::TryEquals<T>::equals(
1513 std::get<0>(t), std::get<1>(t));
1522 Future<T> Future<T>::filter(F&& predicate) {
1523 return this->then([p = std::forward<F>(predicate)](T val) {
1524 T const& valConstRef = val;
1525 if (!p(valConstRef)) {
1526 throwPredicateDoesNotObtain();
1533 inline Future<Unit> when(bool p, F&& thunk) {
1534 return p ? std::forward<F>(thunk)().unit() : makeFuture();
1537 template <class P, class F>
1538 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1540 auto future = thunk();
1541 return future.then([
1542 predicate = std::forward<P>(predicate),
1543 thunk = std::forward<F>(thunk)
1545 return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1548 return makeFuture();
1552 Future<Unit> times(const int n, F&& thunk) {
1553 return folly::whileDo(
1554 [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
1555 return count->fetch_add(1) < n;
1557 std::forward<F>(thunk));
1561 template <class It, class F, class ItT, class Result>
1562 std::vector<Future<Result>> map(It first, It last, F func) {
1563 std::vector<Future<Result>> results;
1564 for (auto it = first; it != last; it++) {
1565 results.push_back(it->then(func));
1569 } // namespace futures
1571 // Instantiate the most common Future types to save compile time
1572 extern template class Future<Unit>;
1573 extern template class Future<bool>;
1574 extern template class Future<int>;
1575 extern template class Future<int64_t>;
1576 extern template class Future<std::string>;
1577 extern template class Future<double>;
1578 } // namespace folly