2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/executors/InlineExecutor.h>
27 #include <folly/futures/Timekeeper.h>
28 #include <folly/futures/detail/Core.h>
30 #ifndef FOLLY_FUTURE_USING_FIBER
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/fibers/Baton.h>
45 #if FOLLY_FUTURE_USING_FIBER
46 typedef folly::fibers::Baton FutureBatonType;
48 typedef folly::Baton<> FutureBatonType;
51 } // namespace futures
54 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
59 // Guarantees that the stored functor is destructed before the stored promise
60 // may be fulfilled. Assumes the stored functor to be noexcept-destructible.
61 template <typename T, typename F>
62 class CoreCallbackState {
64 template <typename FF>
65 CoreCallbackState(Promise<T>&& promise, FF&& func) noexcept(
66 noexcept(F(std::declval<FF>())))
67 : func_(std::forward<FF>(func)), promise_(std::move(promise)) {
68 assert(before_barrier());
71 CoreCallbackState(CoreCallbackState&& that) noexcept(
72 noexcept(F(std::declval<F>()))) {
73 if (that.before_barrier()) {
74 new (&func_) F(std::move(that.func_));
75 promise_ = that.stealPromise();
79 CoreCallbackState& operator=(CoreCallbackState&&) = delete;
81 ~CoreCallbackState() {
82 if (before_barrier()) {
87 template <typename... Args>
88 auto invoke(Args&&... args) noexcept(
89 noexcept(std::declval<F&&>()(std::declval<Args&&>()...))) {
90 assert(before_barrier());
91 return std::move(func_)(std::forward<Args>(args)...);
94 template <typename... Args>
95 auto tryInvoke(Args&&... args) noexcept {
96 return makeTryWith([&] { return invoke(std::forward<Args>(args)...); });
99 void setTry(Try<T>&& t) {
100 stealPromise().setTry(std::move(t));
103 void setException(exception_wrapper&& ew) {
104 stealPromise().setException(std::move(ew));
107 Promise<T> stealPromise() noexcept {
108 assert(before_barrier());
110 return std::move(promise_);
114 bool before_barrier() const noexcept {
115 return !promise_.isFulfilled();
121 Promise<T> promise_{Promise<T>::makeEmpty()};
124 template <typename T, typename F>
125 inline auto makeCoreCallbackState(Promise<T>&& p, F&& f) noexcept(
126 noexcept(CoreCallbackState<T, _t<std::decay<F>>>(
127 std::declval<Promise<T>&&>(),
128 std::declval<F&&>()))) {
129 return CoreCallbackState<T, _t<std::decay<F>>>(
130 std::move(p), std::forward<F>(f));
134 FutureBase<T>::FutureBase(SemiFuture<T>&& other) noexcept : core_(other.core_) {
135 other.core_ = nullptr;
139 FutureBase<T>::FutureBase(Future<T>&& other) noexcept : core_(other.core_) {
140 other.core_ = nullptr;
144 template <class T2, typename>
145 FutureBase<T>::FutureBase(T2&& val)
146 : core_(new futures::detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
149 template <typename T2>
150 FutureBase<T>::FutureBase(
151 typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
152 : core_(new futures::detail::Core<T>(Try<T>(T()))) {}
157 typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
159 FutureBase<T>::FutureBase(in_place_t, Args&&... args)
161 new futures::detail::Core<T>(in_place, std::forward<Args>(args)...)) {
165 template <class FutureType>
166 void FutureBase<T>::assign(FutureType& other) noexcept {
167 std::swap(core_, other.core_);
171 FutureBase<T>::~FutureBase() {
176 T& FutureBase<T>::value() & {
179 return core_->getTry().value();
183 T const& FutureBase<T>::value() const& {
186 return core_->getTry().value();
190 T&& FutureBase<T>::value() && {
193 return std::move(core_->getTry().value());
197 T const&& FutureBase<T>::value() const&& {
200 return std::move(core_->getTry().value());
204 bool FutureBase<T>::isReady() const {
206 return core_->ready();
210 bool FutureBase<T>::hasValue() {
211 return getTry().hasValue();
215 bool FutureBase<T>::hasException() {
216 return getTry().hasException();
220 void FutureBase<T>::detach() {
222 core_->detachFuture();
228 Try<T>& FutureBase<T>::getTry() {
231 return core_->getTry();
235 void FutureBase<T>::throwIfInvalid() const {
242 Optional<Try<T>> FutureBase<T>::poll() {
244 if (core_->ready()) {
245 o = std::move(core_->getTry());
251 void FutureBase<T>::raise(exception_wrapper exception) {
252 core_->raise(std::move(exception));
257 void FutureBase<T>::setCallback_(F&& func) {
259 core_->setCallback(std::forward<F>(func));
263 FutureBase<T>::FutureBase(futures::detail::EmptyConstruct) noexcept
268 // Variant: returns a value
269 // e.g. f.then([](Try<T>&& t){ return t.value(); });
271 template <typename F, typename R, bool isTry, typename... Args>
272 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
273 FutureBase<T>::thenImplementation(
275 futures::detail::argResult<isTry, F, Args...>) {
276 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
277 typedef typename R::ReturnsFuture::Inner B;
279 this->throwIfInvalid();
282 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
284 // grab the Future now before we lose our handle on the Promise
285 auto f = p.getFuture();
286 f.core_->setExecutorNoLock(this->getExecutor());
288 /* This is a bit tricky.
290 We can't just close over *this in case this Future gets moved. So we
291 make a new dummy Future. We could figure out something more
292 sophisticated that avoids making a new Future object when it can, as an
293 optimization. But this is correct.
295 core_ can't be moved, it is explicitly disallowed (as is copying). But
296 if there's ever a reason to allow it, this is one place that makes that
297 assumption and would need to be fixed. We use a standard shared pointer
298 for core_ (by copying it in), which means in essence obj holds a shared
299 pointer to itself. But this shouldn't leak because Promise will not
300 outlive the continuation, because Promise will setException() with a
301 broken Promise if it is destructed before completed. We could use a
302 weak pointer but it would have to be converted to a shared pointer when
303 func is executed (because the Future returned by func may possibly
304 persist beyond the callback, if it gets moved), and so it is an
305 optimization to just make it shared from the get-go.
307 Two subtle but important points about this design. futures::detail::Core
308 has no back pointers to Future or Promise, so if Future or Promise get
309 moved (and they will be moved in performant code) we don't have to do
310 anything fancy. And because we store the continuation in the
311 futures::detail::Core, not in the Future, we can execute the continuation
312 even after the Future has gone out of scope. This is an intentional design
313 decision. It is likely we will want to be able to cancel a continuation
314 in some circumstances, but I think it should be explicit not implicit
315 in the destruction of the Future used to create it.
318 [state = futures::detail::makeCoreCallbackState(
319 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
321 if (!isTry && t.hasException()) {
322 state.setException(std::move(t.exception()));
324 state.setTry(makeTryWith(
325 [&] { return state.invoke(t.template get<isTry, Args>()...); }));
331 // Variant: returns a Future
332 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
334 template <typename F, typename R, bool isTry, typename... Args>
335 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
336 FutureBase<T>::thenImplementation(
338 futures::detail::argResult<isTry, F, Args...>) {
339 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
340 typedef typename R::ReturnsFuture::Inner B;
341 this->throwIfInvalid();
344 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
346 // grab the Future now before we lose our handle on the Promise
347 auto f = p.getFuture();
348 f.core_->setExecutorNoLock(this->getExecutor());
351 [state = futures::detail::makeCoreCallbackState(
352 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
353 if (!isTry && t.hasException()) {
354 state.setException(std::move(t.exception()));
356 auto tf2 = state.tryInvoke(t.template get<isTry, Args>()...);
357 if (tf2.hasException()) {
358 state.setException(std::move(tf2.exception()));
360 tf2->setCallback_([p = state.stealPromise()](Try<B> && b) mutable {
361 p.setTry(std::move(b));
369 } // namespace detail
370 } // namespace futures
373 SemiFuture<typename std::decay<T>::type> makeSemiFuture(T&& t) {
374 return makeSemiFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
377 // makeSemiFutureWith(SemiFuture<T>()) -> SemiFuture<T>
379 typename std::enable_if<
380 isSemiFuture<typename std::result_of<F()>::type>::value,
381 typename std::result_of<F()>::type>::type
382 makeSemiFutureWith(F&& func) {
384 typename isSemiFuture<typename std::result_of<F()>::type>::Inner;
386 return std::forward<F>(func)();
387 } catch (std::exception& e) {
388 return makeSemiFuture<InnerType>(
389 exception_wrapper(std::current_exception(), e));
391 return makeSemiFuture<InnerType>(
392 exception_wrapper(std::current_exception()));
396 // makeSemiFutureWith(T()) -> SemiFuture<T>
397 // makeSemiFutureWith(void()) -> SemiFuture<Unit>
399 typename std::enable_if<
400 !(isSemiFuture<typename std::result_of<F()>::type>::value),
401 SemiFuture<Unit::LiftT<typename std::result_of<F()>::type>>>::type
402 makeSemiFutureWith(F&& func) {
403 using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
404 return makeSemiFuture<LiftedResult>(
405 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
409 SemiFuture<T> makeSemiFuture(std::exception_ptr const& e) {
410 return makeSemiFuture(Try<T>(e));
414 SemiFuture<T> makeSemiFuture(exception_wrapper ew) {
415 return makeSemiFuture(Try<T>(std::move(ew)));
418 template <class T, class E>
420 enable_if<std::is_base_of<std::exception, E>::value, SemiFuture<T>>::type
421 makeSemiFuture(E const& e) {
422 return makeSemiFuture(Try<T>(make_exception_wrapper<E>(e)));
426 SemiFuture<T> makeSemiFuture(Try<T>&& t) {
427 return SemiFuture<T>(new futures::detail::Core<T>(std::move(t)));
430 // This must be defined after the constructors to avoid a bug in MSVC
431 // https://connect.microsoft.com/VisualStudio/feedback/details/3142777/out-of-line-constructor-definition-after-implicit-reference-causes-incorrect-c2244
432 inline SemiFuture<Unit> makeSemiFuture() {
433 return makeSemiFuture(Unit{});
437 SemiFuture<T> SemiFuture<T>::makeEmpty() {
438 return SemiFuture<T>(futures::detail::EmptyConstruct{});
442 SemiFuture<T>::SemiFuture(SemiFuture<T>&& other) noexcept
443 : futures::detail::FutureBase<T>(std::move(other)) {}
446 SemiFuture<T>::SemiFuture(Future<T>&& other) noexcept
447 : futures::detail::FutureBase<T>(std::move(other)) {
448 // SemiFuture should not have an executor on construction
450 this->setExecutor(nullptr);
455 SemiFuture<T>& SemiFuture<T>::operator=(SemiFuture<T>&& other) noexcept {
461 SemiFuture<T>& SemiFuture<T>::operator=(Future<T>&& other) noexcept {
463 // SemiFuture should not have an executor on construction
465 this->setExecutor(nullptr);
471 void SemiFuture<T>::boost_() {
472 // If a SemiFuture has an executor it should be deferred, so boost it
473 if (auto e = this->getExecutor()) {
474 // We know in a SemiFuture that if we have an executor it should be
475 // DeferredExecutor. Verify this in debug mode.
476 DCHECK(nullptr != dynamic_cast<DeferredExecutor*>(e));
478 auto ka = static_cast<DeferredExecutor*>(e)->getKeepAliveToken();
479 static_cast<DeferredExecutor*>(e)->boost();
484 inline Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) && {
490 // If current executor is deferred, boost block to ensure that work
491 // progresses and is run on the new executor.
492 auto oldExecutor = this->getExecutor();
493 if (oldExecutor && executor && (executor != oldExecutor)) {
494 // We know in a SemiFuture that if we have an executor it should be
495 // DeferredExecutor. Verify this in debug mode.
496 DCHECK(nullptr != dynamic_cast<DeferredExecutor*>(this->getExecutor()));
497 if (static_cast<DeferredExecutor*>(oldExecutor)) {
498 executor->add([oldExecutorKA = oldExecutor->getKeepAliveToken()]() {
499 static_cast<DeferredExecutor*>(oldExecutorKA.get())->boost();
504 this->setExecutor(executor, priority);
506 auto newFuture = Future<T>(this->core_);
507 this->core_ = nullptr;
512 template <typename F>
513 SemiFuture<typename futures::detail::callableResult<T, F>::Return::value_type>
514 SemiFuture<T>::defer(F&& func) && {
515 // If we already have a deferred executor, use it, otherwise create one
516 auto defKeepAlive = this->getExecutor()
517 ? this->getExecutor()->getKeepAliveToken()
518 : DeferredExecutor::create();
519 auto e = defKeepAlive.get();
520 // We know in a SemiFuture that if we have an executor it should be
521 // DeferredExecutor (either it was that way before, or we just created it).
522 // Verify this in debug mode.
523 DCHECK(nullptr != dynamic_cast<DeferredExecutor*>(e));
524 // Convert to a folly::future with a deferred executor
525 // Will be low-cost if this is not a new executor as via optimises for that
530 // Then add the work, with a wrapper function that captures the
531 // keepAlive so the executor is destroyed at the right time.
533 DeferredExecutor::wrap(std::move(defKeepAlive), std::move(func)))
534 // Finally, convert back o a folly::SemiFuture to hide the executor
536 // Carry deferred executor through chain as constructor from Future will
543 Future<T> Future<T>::makeEmpty() {
544 return Future<T>(futures::detail::EmptyConstruct{});
548 Future<T>::Future(Future<T>&& other) noexcept
549 : futures::detail::FutureBase<T>(std::move(other)) {}
552 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
560 typename std::enable_if<
561 !std::is_same<T, typename std::decay<T2>::type>::value &&
562 std::is_constructible<T, T2&&>::value &&
563 std::is_convertible<T2&&, T>::value,
565 Future<T>::Future(Future<T2>&& other)
566 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
571 typename std::enable_if<
572 !std::is_same<T, typename std::decay<T2>::type>::value &&
573 std::is_constructible<T, T2&&>::value &&
574 !std::is_convertible<T2&&, T>::value,
576 Future<T>::Future(Future<T2>&& other)
577 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
582 typename std::enable_if<
583 !std::is_same<T, typename std::decay<T2>::type>::value &&
584 std::is_constructible<T, T2&&>::value,
586 Future<T>& Future<T>::operator=(Future<T2>&& other) {
588 std::move(other).then([](T2&& v) { return T(std::move(v)); }));
596 enable_if<isFuture<F>::value, Future<typename isFuture<T>::Inner>>::type
597 Future<T>::unwrap() {
598 return then([](Future<typename isFuture<T>::Inner> internal_future) {
599 return internal_future;
604 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
605 this->throwIfInvalid();
607 this->setExecutor(executor, priority);
609 auto newFuture = Future<T>(this->core_);
610 this->core_ = nullptr;
615 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
616 this->throwIfInvalid();
618 auto f = p.getFuture();
619 auto func = [p = std::move(p)](Try<T>&& t) mutable {
620 p.setTry(std::move(t));
622 using R = futures::detail::callableResult<T, decltype(func)>;
623 this->template thenImplementation<decltype(func), R>(
624 std::move(func), typename R::Arg());
625 return std::move(f).via(executor, priority);
628 template <typename T>
629 template <typename R, typename Caller, typename... Args>
630 Future<typename isFuture<R>::Inner>
631 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
632 typedef typename std::remove_cv<typename std::remove_reference<
633 typename futures::detail::ArgType<Args...>::FirstArg>::type>::type
636 return then([instance, func](Try<T>&& t){
637 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
642 Future<Unit> Future<T>::then() {
643 return then([] () {});
646 // onError where the callback returns T
649 typename std::enable_if<
650 !futures::detail::callableWith<F, exception_wrapper>::value &&
651 !futures::detail::callableWith<F, exception_wrapper&>::value &&
652 !futures::detail::Extract<F>::ReturnsFuture::value,
654 Future<T>::onError(F&& func) {
655 typedef std::remove_reference_t<
656 typename futures::detail::Extract<F>::FirstArg>
659 std::is_same<typename futures::detail::Extract<F>::RawReturn, T>::value,
660 "Return type of onError callback must be T or Future<T>");
663 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
664 auto f = p.getFuture();
667 [state = futures::detail::makeCoreCallbackState(
668 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
669 if (auto e = t.template tryGetExceptionObject<Exn>()) {
670 state.setTry(makeTryWith([&] { return state.invoke(*e); }));
672 state.setTry(std::move(t));
679 // onError where the callback returns Future<T>
682 typename std::enable_if<
683 !futures::detail::callableWith<F, exception_wrapper>::value &&
684 !futures::detail::callableWith<F, exception_wrapper&>::value &&
685 futures::detail::Extract<F>::ReturnsFuture::value,
687 Future<T>::onError(F&& func) {
689 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
691 "Return type of onError callback must be T or Future<T>");
692 typedef std::remove_reference_t<
693 typename futures::detail::Extract<F>::FirstArg>
697 auto f = p.getFuture();
700 [state = futures::detail::makeCoreCallbackState(
701 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
702 if (auto e = t.template tryGetExceptionObject<Exn>()) {
703 auto tf2 = state.tryInvoke(*e);
704 if (tf2.hasException()) {
705 state.setException(std::move(tf2.exception()));
707 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
708 p.setTry(std::move(t3));
712 state.setTry(std::move(t));
721 Future<T> Future<T>::ensure(F&& func) {
722 return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
724 return makeFuture(std::move(t));
730 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
731 return within(dur, tk).onError([funcw = std::forward<F>(func)](
732 TimedOut const&) { return std::move(funcw)(); });
737 typename std::enable_if<
738 futures::detail::callableWith<F, exception_wrapper>::value &&
739 futures::detail::Extract<F>::ReturnsFuture::value,
741 Future<T>::onError(F&& func) {
743 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
745 "Return type of onError callback must be T or Future<T>");
748 auto f = p.getFuture();
750 [state = futures::detail::makeCoreCallbackState(
751 std::move(p), std::forward<F>(func))](Try<T> t) mutable {
752 if (t.hasException()) {
753 auto tf2 = state.tryInvoke(std::move(t.exception()));
754 if (tf2.hasException()) {
755 state.setException(std::move(tf2.exception()));
757 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
758 p.setTry(std::move(t3));
762 state.setTry(std::move(t));
769 // onError(exception_wrapper) that returns T
772 typename std::enable_if<
773 futures::detail::callableWith<F, exception_wrapper>::value &&
774 !futures::detail::Extract<F>::ReturnsFuture::value,
776 Future<T>::onError(F&& func) {
778 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
780 "Return type of onError callback must be T or Future<T>");
783 auto f = p.getFuture();
785 [state = futures::detail::makeCoreCallbackState(
786 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
787 if (t.hasException()) {
788 state.setTry(makeTryWith(
789 [&] { return state.invoke(std::move(t.exception())); }));
791 state.setTry(std::move(t));
799 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
800 return waitVia(e).getTry();
803 template <class Func>
804 auto via(Executor* x, Func&& func)
805 -> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
806 // TODO make this actually more performant. :-P #7260175
807 return via(x).then(std::forward<Func>(func));
813 Future<typename std::decay<T>::type> makeFuture(T&& t) {
814 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
817 inline Future<Unit> makeFuture() {
818 return makeFuture(Unit{});
821 // makeFutureWith(Future<T>()) -> Future<T>
823 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
824 typename std::result_of<F()>::type>::type
825 makeFutureWith(F&& func) {
827 typename isFuture<typename std::result_of<F()>::type>::Inner;
829 return std::forward<F>(func)();
830 } catch (std::exception& e) {
831 return makeFuture<InnerType>(
832 exception_wrapper(std::current_exception(), e));
834 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
838 // makeFutureWith(T()) -> Future<T>
839 // makeFutureWith(void()) -> Future<Unit>
841 typename std::enable_if<
842 !(isFuture<typename std::result_of<F()>::type>::value),
843 Future<Unit::LiftT<typename std::result_of<F()>::type>>>::type
844 makeFutureWith(F&& func) {
845 using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
846 return makeFuture<LiftedResult>(
847 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
851 Future<T> makeFuture(std::exception_ptr const& e) {
852 return makeFuture(Try<T>(e));
856 Future<T> makeFuture(exception_wrapper ew) {
857 return makeFuture(Try<T>(std::move(ew)));
860 template <class T, class E>
861 typename std::enable_if<std::is_base_of<std::exception, E>::value,
863 makeFuture(E const& e) {
864 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
868 Future<T> makeFuture(Try<T>&& t) {
869 return Future<T>(new futures::detail::Core<T>(std::move(t)));
873 Future<Unit> via(Executor* executor, int8_t priority) {
874 return makeFuture().via(executor, priority);
877 // mapSetCallback calls func(i, Try<T>) when every future completes
879 template <class T, class InputIterator, class F>
880 void mapSetCallback(InputIterator first, InputIterator last, F func) {
881 for (size_t i = 0; first != last; ++first, ++i) {
882 first->setCallback_([func, i](Try<T>&& t) {
883 func(i, std::move(t));
888 // collectAll (variadic)
890 template <typename... Fs>
891 typename futures::detail::CollectAllVariadicContext<
892 typename std::decay<Fs>::type::value_type...>::type
893 collectAll(Fs&&... fs) {
894 auto ctx = std::make_shared<futures::detail::CollectAllVariadicContext<
895 typename std::decay<Fs>::type::value_type...>>();
896 futures::detail::collectVariadicHelper<
897 futures::detail::CollectAllVariadicContext>(ctx, std::forward<Fs>(fs)...);
898 return ctx->p.getFuture();
901 // collectAll (iterator)
903 template <class InputIterator>
906 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
907 collectAll(InputIterator first, InputIterator last) {
909 typename std::iterator_traits<InputIterator>::value_type::value_type T;
911 struct CollectAllContext {
912 CollectAllContext(size_t n) : results(n) {}
913 ~CollectAllContext() {
914 p.setValue(std::move(results));
916 Promise<std::vector<Try<T>>> p;
917 std::vector<Try<T>> results;
921 std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
922 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
923 ctx->results[i] = std::move(t);
925 return ctx->p.getFuture();
928 // collect (iterator)
933 template <typename T>
934 struct CollectContext {
936 explicit Nothing(int /* n */) {}
939 using Result = typename std::conditional<
940 std::is_void<T>::value,
942 std::vector<T>>::type;
944 using InternalResult = typename std::conditional<
945 std::is_void<T>::value,
947 std::vector<Optional<T>>>::type;
949 explicit CollectContext(size_t n) : result(n) {}
951 if (!threw.exchange(true)) {
952 // map Optional<T> -> T
953 std::vector<T> finalResult;
954 finalResult.reserve(result.size());
955 std::transform(result.begin(), result.end(),
956 std::back_inserter(finalResult),
957 [](Optional<T>& o) { return std::move(o.value()); });
958 p.setValue(std::move(finalResult));
961 inline void setPartialResult(size_t i, Try<T>& t) {
962 result[i] = std::move(t.value());
965 InternalResult result;
966 std::atomic<bool> threw {false};
969 } // namespace detail
970 } // namespace futures
972 template <class InputIterator>
973 Future<typename futures::detail::CollectContext<typename std::iterator_traits<
974 InputIterator>::value_type::value_type>::Result>
975 collect(InputIterator first, InputIterator last) {
977 typename std::iterator_traits<InputIterator>::value_type::value_type T;
979 auto ctx = std::make_shared<futures::detail::CollectContext<T>>(
980 std::distance(first, last));
981 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
982 if (t.hasException()) {
983 if (!ctx->threw.exchange(true)) {
984 ctx->p.setException(std::move(t.exception()));
986 } else if (!ctx->threw) {
987 ctx->setPartialResult(i, t);
990 return ctx->p.getFuture();
993 // collect (variadic)
995 template <typename... Fs>
996 typename futures::detail::CollectVariadicContext<
997 typename std::decay<Fs>::type::value_type...>::type
998 collect(Fs&&... fs) {
999 auto ctx = std::make_shared<futures::detail::CollectVariadicContext<
1000 typename std::decay<Fs>::type::value_type...>>();
1001 futures::detail::collectVariadicHelper<
1002 futures::detail::CollectVariadicContext>(ctx, std::forward<Fs>(fs)...);
1003 return ctx->p.getFuture();
1006 // collectAny (iterator)
1008 template <class InputIterator>
1013 std::iterator_traits<InputIterator>::value_type::value_type>>>
1014 collectAny(InputIterator first, InputIterator last) {
1016 typename std::iterator_traits<InputIterator>::value_type::value_type T;
1018 struct CollectAnyContext {
1019 CollectAnyContext() {}
1020 Promise<std::pair<size_t, Try<T>>> p;
1021 std::atomic<bool> done {false};
1024 auto ctx = std::make_shared<CollectAnyContext>();
1025 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
1026 if (!ctx->done.exchange(true)) {
1027 ctx->p.setValue(std::make_pair(i, std::move(t)));
1030 return ctx->p.getFuture();
1033 // collectAnyWithoutException (iterator)
1035 template <class InputIterator>
1038 typename std::iterator_traits<InputIterator>::value_type::value_type>>
1039 collectAnyWithoutException(InputIterator first, InputIterator last) {
1041 typename std::iterator_traits<InputIterator>::value_type::value_type T;
1043 struct CollectAnyWithoutExceptionContext {
1044 CollectAnyWithoutExceptionContext(){}
1045 Promise<std::pair<size_t, T>> p;
1046 std::atomic<bool> done{false};
1047 std::atomic<size_t> nFulfilled{0};
1051 auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
1052 ctx->nTotal = size_t(std::distance(first, last));
1054 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
1055 if (!t.hasException() && !ctx->done.exchange(true)) {
1056 ctx->p.setValue(std::make_pair(i, std::move(t.value())));
1057 } else if (++ctx->nFulfilled == ctx->nTotal) {
1058 ctx->p.setException(t.exception());
1061 return ctx->p.getFuture();
1064 // collectN (iterator)
1066 template <class InputIterator>
1067 Future<std::vector<std::pair<size_t, Try<typename
1068 std::iterator_traits<InputIterator>::value_type::value_type>>>>
1069 collectN(InputIterator first, InputIterator last, size_t n) {
1071 std::iterator_traits<InputIterator>::value_type::value_type T;
1072 typedef std::vector<std::pair<size_t, Try<T>>> V;
1074 struct CollectNContext {
1076 std::atomic<size_t> completed = {0};
1079 auto ctx = std::make_shared<CollectNContext>();
1081 if (size_t(std::distance(first, last)) < n) {
1082 ctx->p.setException(std::runtime_error("Not enough futures"));
1084 // for each completed Future, increase count and add to vector, until we
1085 // have n completed futures at which point we fulfil our Promise with the
1087 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
1088 auto c = ++ctx->completed;
1090 assert(ctx->v.size() < n);
1091 ctx->v.emplace_back(i, std::move(t));
1093 ctx->p.setTry(Try<V>(std::move(ctx->v)));
1099 return ctx->p.getFuture();
1102 // reduce (iterator)
1104 template <class It, class T, class F>
1105 Future<T> reduce(It first, It last, T&& initial, F&& func) {
1106 if (first == last) {
1107 return makeFuture(std::move(initial));
1110 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
1111 typedef typename std::conditional<
1112 futures::detail::callableWith<F, T&&, Try<ItT>&&>::value,
1115 typedef isTry<Arg> IsTry;
1117 auto sfunc = std::make_shared<F>(std::move(func));
1119 auto f = first->then(
1120 [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
1122 std::move(minitial), head.template get<IsTry::value, Arg&&>());
1125 for (++first; first != last; ++first) {
1126 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
1127 return (*sfunc)(std::move(std::get<0>(t).value()),
1128 // Either return a ItT&& or a Try<ItT>&& depending
1129 // on the type of the argument of func.
1130 std::get<1>(t).template get<IsTry::value, Arg&&>());
1137 // window (collection)
1139 template <class Collection, class F, class ItT, class Result>
1140 std::vector<Future<Result>>
1141 window(Collection input, F func, size_t n) {
1142 // Use global inline executor singleton
1143 auto executor = &InlineExecutor::instance();
1144 return window(executor, std::move(input), std::move(func), n);
1147 template <class Collection, class F, class ItT, class Result>
1148 std::vector<Future<Result>>
1149 window(Executor* executor, Collection input, F func, size_t n) {
1150 struct WindowContext {
1151 WindowContext(Executor* executor_, Collection&& input_, F&& func_)
1152 : executor(executor_),
1153 input(std::move(input_)),
1154 promises(input.size()),
1155 func(std::move(func_)) {}
1156 std::atomic<size_t> i{0};
1159 std::vector<Promise<Result>> promises;
1162 static inline void spawn(std::shared_ptr<WindowContext> ctx) {
1163 size_t i = ctx->i++;
1164 if (i < ctx->input.size()) {
1165 auto fut = ctx->func(std::move(ctx->input[i]));
1166 fut.setCallback_([ctx = std::move(ctx), i](Try<Result>&& t) mutable {
1167 const auto executor_ = ctx->executor;
1168 executor_->add([ctx = std::move(ctx), i, t = std::move(t)]() mutable {
1169 ctx->promises[i].setTry(std::move(t));
1170 // Chain another future onto this one
1171 spawn(std::move(ctx));
1178 auto max = std::min(n, input.size());
1180 auto ctx = std::make_shared<WindowContext>(
1181 executor, std::move(input), std::move(func));
1183 // Start the first n Futures
1184 for (size_t i = 0; i < max; ++i) {
1185 executor->add([ctx]() mutable { WindowContext::spawn(std::move(ctx)); });
1188 std::vector<Future<Result>> futures;
1189 futures.reserve(ctx->promises.size());
1190 for (auto& promise : ctx->promises) {
1191 futures.emplace_back(promise.getFuture());
1200 template <class I, class F>
1201 Future<I> Future<T>::reduce(I&& initial, F&& func) {
1203 minitial = std::forward<I>(initial),
1204 mfunc = std::forward<F>(func)
1205 ](T& vals) mutable {
1206 auto ret = std::move(minitial);
1207 for (auto& val : vals) {
1208 ret = mfunc(std::move(ret), std::move(val));
1214 // unorderedReduce (iterator)
1216 template <class It, class T, class F, class ItT, class Arg>
1217 Future<T> unorderedReduce(It first, It last, T initial, F func) {
1218 if (first == last) {
1219 return makeFuture(std::move(initial));
1222 typedef isTry<Arg> IsTry;
1224 struct UnorderedReduceContext {
1225 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
1226 : lock_(), memo_(makeFuture<T>(std::move(memo))),
1227 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
1229 folly::MicroSpinLock lock_; // protects memo_ and numThens_
1232 size_t numThens_; // how many Futures completed and called .then()
1233 size_t numFutures_; // how many Futures in total
1234 Promise<T> promise_;
1237 auto ctx = std::make_shared<UnorderedReduceContext>(
1238 std::move(initial), std::move(func), std::distance(first, last));
1240 mapSetCallback<ItT>(
1243 [ctx](size_t /* i */, Try<ItT>&& t) {
1244 // Futures can be completed in any order, simultaneously.
1245 // To make this non-blocking, we create a new Future chain in
1246 // the order of completion to reduce the values.
1247 // The spinlock just protects chaining a new Future, not actually
1248 // executing the reduce, which should be really fast.
1249 folly::MSLGuard lock(ctx->lock_);
1251 ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
1252 // Either return a ItT&& or a Try<ItT>&& depending
1253 // on the type of the argument of func.
1254 return ctx->func_(std::move(v),
1255 mt.template get<IsTry::value, Arg&&>());
1257 if (++ctx->numThens_ == ctx->numFutures_) {
1258 // After reducing the value of the last Future, fulfill the Promise
1259 ctx->memo_.setCallback_(
1260 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
1264 return ctx->promise_.getFuture();
1270 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
1271 return within(dur, TimedOut(), tk);
1276 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
1279 Context(E ex) : exception(std::move(ex)), promise() {}
1281 Future<Unit> thisFuture;
1283 std::atomic<bool> token {false};
1286 if (this->isReady()) {
1287 return std::move(*this);
1290 std::shared_ptr<Timekeeper> tks;
1292 tks = folly::detail::getTimekeeperSingleton();
1296 if (UNLIKELY(!tk)) {
1297 return makeFuture<T>(NoTimekeeper());
1300 auto ctx = std::make_shared<Context>(std::move(e));
1302 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
1303 if (ctx->token.exchange(true) == false) {
1304 ctx->promise.setTry(std::move(t));
1308 // Have time keeper use a weak ptr to hold ctx,
1309 // so that ctx can be deallocated as soon as the future job finished.
1310 tk->after(dur).then([weakCtx = to_weak_ptr(ctx)](Try<Unit> const& t) mutable {
1311 auto lockedCtx = weakCtx.lock();
1313 // ctx already released. "this" completed first, cancel "after"
1316 // "after" completed first, cancel "this"
1317 lockedCtx->thisFuture.raise(TimedOut());
1318 if (lockedCtx->token.exchange(true) == false) {
1319 if (t.hasException()) {
1320 lockedCtx->promise.setException(std::move(t.exception()));
1322 lockedCtx->promise.setException(std::move(lockedCtx->exception));
1327 return ctx->promise.getFuture().via(this->getExecutor());
1333 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
1334 return collectAll(*this, futures::sleep(dur, tk))
1335 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
1336 Try<T>& t = std::get<0>(tup);
1337 return makeFuture<T>(std::move(t));
1345 void doBoost(folly::Future<T>& /* usused */) {}
1348 void doBoost(folly::SemiFuture<T>& f) {
1352 template <class FutureType, typename T = typename FutureType::value_type>
1353 void waitImpl(FutureType& f) {
1354 // short-circuit if there's nothing to do
1359 FutureBatonType baton;
1360 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1363 assert(f.isReady());
1366 template <class FutureType, typename T = typename FutureType::value_type>
1367 void waitImpl(FutureType& f, Duration dur) {
1368 // short-circuit if there's nothing to do
1374 auto ret = promise.getFuture();
1375 auto baton = std::make_shared<FutureBatonType>();
1376 f.setCallback_([baton, promise = std::move(promise)](Try<T>&& t) mutable {
1377 promise.setTry(std::move(t));
1382 if (baton->timed_wait(dur)) {
1383 assert(f.isReady());
1388 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1389 // Set callback so to ensure that the via executor has something on it
1390 // so that once the preceding future triggers this callback, drive will
1391 // always have a callback to satisfy it
1395 f = f.via(e).then([](T&& t) { return std::move(t); });
1396 while (!f.isReady()) {
1399 assert(f.isReady());
1402 } // namespace detail
1403 } // namespace futures
1406 SemiFuture<T>& SemiFuture<T>::wait() & {
1407 futures::detail::waitImpl(*this);
1412 SemiFuture<T>&& SemiFuture<T>::wait() && {
1413 futures::detail::waitImpl(*this);
1414 return std::move(*this);
1418 SemiFuture<T>& SemiFuture<T>::wait(Duration dur) & {
1419 futures::detail::waitImpl(*this, dur);
1424 SemiFuture<T>&& SemiFuture<T>::wait(Duration dur) && {
1425 futures::detail::waitImpl(*this, dur);
1426 return std::move(*this);
1430 T SemiFuture<T>::get() && {
1431 return std::move(wait().value());
1435 T SemiFuture<T>::get(Duration dur) && {
1437 if (this->isReady()) {
1438 return std::move(this->value());
1445 Future<T>& Future<T>::wait() & {
1446 futures::detail::waitImpl(*this);
1451 Future<T>&& Future<T>::wait() && {
1452 futures::detail::waitImpl(*this);
1453 return std::move(*this);
1457 Future<T>& Future<T>::wait(Duration dur) & {
1458 futures::detail::waitImpl(*this, dur);
1463 Future<T>&& Future<T>::wait(Duration dur) && {
1464 futures::detail::waitImpl(*this, dur);
1465 return std::move(*this);
1469 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1470 futures::detail::waitViaImpl(*this, e);
1475 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1476 futures::detail::waitViaImpl(*this, e);
1477 return std::move(*this);
1481 T Future<T>::get() {
1482 return std::move(wait().value());
1486 T Future<T>::get(Duration dur) {
1488 if (this->isReady()) {
1489 return std::move(this->value());
1496 T Future<T>::getVia(DrivableExecutor* e) {
1497 return std::move(waitVia(e).value());
1504 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1505 return t1.value() == t2.value();
1508 } // namespace detail
1509 } // namespace futures
1512 Future<bool> Future<T>::willEqual(Future<T>& f) {
1513 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1514 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1515 return futures::detail::TryEquals<T>::equals(
1516 std::get<0>(t), std::get<1>(t));
1525 Future<T> Future<T>::filter(F&& predicate) {
1526 return this->then([p = std::forward<F>(predicate)](T val) {
1527 T const& valConstRef = val;
1528 if (!p(valConstRef)) {
1529 throwPredicateDoesNotObtain();
1536 inline Future<Unit> when(bool p, F&& thunk) {
1537 return p ? std::forward<F>(thunk)().unit() : makeFuture();
1540 template <class P, class F>
1541 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1543 auto future = thunk();
1544 return future.then([
1545 predicate = std::forward<P>(predicate),
1546 thunk = std::forward<F>(thunk)
1548 return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1551 return makeFuture();
1555 Future<Unit> times(const int n, F&& thunk) {
1556 return folly::whileDo(
1557 [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
1558 return count->fetch_add(1) < n;
1560 std::forward<F>(thunk));
1564 template <class It, class F, class ItT, class Result>
1565 std::vector<Future<Result>> map(It first, It last, F func) {
1566 std::vector<Future<Result>> results;
1567 for (auto it = first; it != last; it++) {
1568 results.push_back(it->then(func));
1572 } // namespace futures
1574 // Instantiate the most common Future types to save compile time
1575 extern template class Future<Unit>;
1576 extern template class Future<bool>;
1577 extern template class Future<int>;
1578 extern template class Future<int64_t>;
1579 extern template class Future<std::string>;
1580 extern template class Future<double>;
1581 } // namespace folly