2 * Copyright 2014 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
22 #include <folly/Baton.h>
23 #include <folly/futures/detail/Core.h>
24 #include <folly/futures/Timekeeper.h>
31 Timekeeper* getTimekeeperSingleton();
36 static const bool value = false;
40 struct isFuture<Future<T> > {
41 static const bool value = true;
45 Future<T>::Future(Future<T>&& other) noexcept : core_(nullptr) {
46 *this = std::move(other);
50 Future<T>& Future<T>::operator=(Future<T>&& other) {
51 std::swap(core_, other.core_);
56 Future<T>::~Future() {
61 void Future<T>::detach() {
63 core_->detachFuture();
69 void Future<T>::throwIfInvalid() const {
76 void Future<T>::setCallback_(F&& func) {
78 core_->setCallback(std::move(func));
81 // Variant: f.then([](Try<T>&& t){ return t.value(); });
84 typename std::enable_if<
85 !isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
86 Future<typename std::result_of<F(Try<T>&&)>::type> >::type
87 Future<T>::then(F&& func) {
88 typedef typename std::result_of<F(Try<T>&&)>::type B;
92 // wrap these so we can move them into the lambda
93 folly::MoveWrapper<Promise<B>> p;
94 folly::MoveWrapper<F> funcm(std::forward<F>(func));
96 // grab the Future now before we lose our handle on the Promise
97 auto f = p->getFuture();
99 /* This is a bit tricky.
101 We can't just close over *this in case this Future gets moved. So we
102 make a new dummy Future. We could figure out something more
103 sophisticated that avoids making a new Future object when it can, as an
104 optimization. But this is correct.
106 core_ can't be moved, it is explicitly disallowed (as is copying). But
107 if there's ever a reason to allow it, this is one place that makes that
108 assumption and would need to be fixed. We use a standard shared pointer
109 for core_ (by copying it in), which means in essence obj holds a shared
110 pointer to itself. But this shouldn't leak because Promise will not
111 outlive the continuation, because Promise will setException() with a
112 broken Promise if it is destructed before completed. We could use a
113 weak pointer but it would have to be converted to a shared pointer when
114 func is executed (because the Future returned by func may possibly
115 persist beyond the callback, if it gets moved), and so it is an
116 optimization to just make it shared from the get-go.
118 We have to move in the Promise and func using the MoveWrapper
119 hack. (func could be copied but it's a big drag on perf).
121 Two subtle but important points about this design. detail::Core has no
122 back pointers to Future or Promise, so if Future or Promise get moved
123 (and they will be moved in performant code) we don't have to do
124 anything fancy. And because we store the continuation in the
125 detail::Core, not in the Future, we can execute the continuation even
126 after the Future has gone out of scope. This is an intentional design
127 decision. It is likely we will want to be able to cancel a continuation
128 in some circumstances, but I think it should be explicit not implicit
129 in the destruction of the Future used to create it.
132 [p, funcm](Try<T>&& t) mutable {
134 return (*funcm)(std::move(t));
141 // Variant: f.then([](T&& t){ return t; });
144 typename std::enable_if<
145 !std::is_same<T, void>::value &&
146 !isFuture<typename std::result_of<
147 F(typename detail::AliasIfVoid<T>::type&&)>::type>::value,
148 Future<typename std::result_of<
149 F(typename detail::AliasIfVoid<T>::type&&)>::type> >::type
150 Future<T>::then(F&& func) {
151 typedef typename std::result_of<F(T&&)>::type B;
155 folly::MoveWrapper<Promise<B>> p;
156 folly::MoveWrapper<F> funcm(std::forward<F>(func));
157 auto f = p->getFuture();
160 [p, funcm](Try<T>&& t) mutable {
161 if (t.hasException()) {
162 p->setException(std::move(t.exception()));
165 return (*funcm)(std::move(t.value()));
173 // Variant: f.then([](){ return; });
176 typename std::enable_if<
177 std::is_same<T, void>::value &&
178 !isFuture<typename std::result_of<F()>::type>::value,
179 Future<typename std::result_of<F()>::type> >::type
180 Future<T>::then(F&& func) {
181 typedef typename std::result_of<F()>::type B;
185 folly::MoveWrapper<Promise<B>> p;
186 folly::MoveWrapper<F> funcm(std::forward<F>(func));
187 auto f = p->getFuture();
190 [p, funcm](Try<T>&& t) mutable {
191 if (t.hasException()) {
192 p->setException(std::move(t.exception()));
203 // Variant: f.then([](Try<T>&& t){ return makeFuture<T>(t.value()); });
206 typename std::enable_if<
207 isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
208 Future<typename std::result_of<F(Try<T>&&)>::type::value_type> >::type
209 Future<T>::then(F&& func) {
210 typedef typename std::result_of<F(Try<T>&&)>::type::value_type B;
214 // wrap these so we can move them into the lambda
215 folly::MoveWrapper<Promise<B>> p;
216 folly::MoveWrapper<F> funcm(std::forward<F>(func));
218 // grab the Future now before we lose our handle on the Promise
219 auto f = p->getFuture();
222 [p, funcm](Try<T>&& t) mutable {
224 auto f2 = (*funcm)(std::move(t));
225 // that didn't throw, now we can steal p
226 f2.setCallback_([p](Try<B>&& b) mutable {
227 p->fulfilTry(std::move(b));
229 } catch (const std::exception& e) {
230 p->setException(exception_wrapper(std::current_exception(), e));
232 p->setException(exception_wrapper(std::current_exception()));
239 // Variant: f.then([](T&& t){ return makeFuture<T>(t); });
242 typename std::enable_if<
243 !std::is_same<T, void>::value &&
244 isFuture<typename std::result_of<
245 F(typename detail::AliasIfVoid<T>::type&&)>::type>::value,
246 Future<typename std::result_of<
247 F(typename detail::AliasIfVoid<T>::type&&)>::type::value_type> >::type
248 Future<T>::then(F&& func) {
249 typedef typename std::result_of<F(T&&)>::type::value_type B;
253 folly::MoveWrapper<Promise<B>> p;
254 folly::MoveWrapper<F> funcm(std::forward<F>(func));
255 auto f = p->getFuture();
258 [p, funcm](Try<T>&& t) mutable {
259 if (t.hasException()) {
260 p->setException(std::move(t.exception()));
263 auto f2 = (*funcm)(std::move(t.value()));
264 f2.setCallback_([p](Try<B>&& b) mutable {
265 p->fulfilTry(std::move(b));
267 } catch (const std::exception& e) {
268 p->setException(exception_wrapper(std::current_exception(), e));
270 p->setException(exception_wrapper(std::current_exception()));
278 // Variant: f.then([](){ return makeFuture(); });
281 typename std::enable_if<
282 std::is_same<T, void>::value &&
283 isFuture<typename std::result_of<F()>::type>::value,
284 Future<typename std::result_of<F()>::type::value_type> >::type
285 Future<T>::then(F&& func) {
286 typedef typename std::result_of<F()>::type::value_type B;
290 folly::MoveWrapper<Promise<B>> p;
291 folly::MoveWrapper<F> funcm(std::forward<F>(func));
293 auto f = p->getFuture();
296 [p, funcm](Try<T>&& t) mutable {
297 if (t.hasException()) {
298 p->setException(t.exception());
301 auto f2 = (*funcm)();
302 f2.setCallback_([p](Try<B>&& b) mutable {
303 p->fulfilTry(std::move(b));
305 } catch (const std::exception& e) {
306 p->setException(exception_wrapper(std::current_exception(), e));
308 p->setException(exception_wrapper(std::current_exception()));
317 Future<void> Future<T>::then() {
318 return then([] (Try<T>&& t) {});
321 // onError where the callback returns T
324 typename std::enable_if<
325 !detail::Extract<F>::ReturnsFuture::value,
327 Future<T>::onError(F&& func) {
328 typedef typename detail::Extract<F>::FirstArg Exn;
330 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
331 "Return type of onError callback must be T or Future<T>");
334 auto f = p.getFuture();
335 auto pm = folly::makeMoveWrapper(std::move(p));
336 auto funcm = folly::makeMoveWrapper(std::move(func));
337 setCallback_([pm, funcm](Try<T>&& t) mutable {
338 if (!t.template withException<Exn>([&] (Exn& e) {
343 pm->fulfilTry(std::move(t));
350 // onError where the callback returns Future<T>
353 typename std::enable_if<
354 detail::Extract<F>::ReturnsFuture::value,
356 Future<T>::onError(F&& func) {
358 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
359 "Return type of onError callback must be T or Future<T>");
360 typedef typename detail::Extract<F>::FirstArg Exn;
363 auto f = p.getFuture();
364 auto pm = folly::makeMoveWrapper(std::move(p));
365 auto funcm = folly::makeMoveWrapper(std::move(func));
366 setCallback_([pm, funcm](Try<T>&& t) mutable {
367 if (!t.template withException<Exn>([&] (Exn& e) {
369 auto f2 = (*funcm)(e);
370 f2.setCallback_([pm](Try<T>&& t2) mutable {
371 pm->fulfilTry(std::move(t2));
373 } catch (const std::exception& e2) {
374 pm->setException(exception_wrapper(std::current_exception(), e2));
376 pm->setException(exception_wrapper(std::current_exception()));
379 pm->fulfilTry(std::move(t));
387 typename std::add_lvalue_reference<T>::type Future<T>::value() {
390 return core_->getTry().value();
394 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
397 return core_->getTry().value();
401 Try<T>& Future<T>::getTry() {
404 return core_->getTry();
408 template <typename Executor>
409 inline Future<T> Future<T>::via(Executor* executor) && {
413 core_->setExecutor(executor);
415 return std::move(*this);
419 template <typename Executor>
420 inline Future<T> Future<T>::via(Executor* executor) & {
423 MoveWrapper<Promise<T>> p;
424 auto f = p->getFuture();
425 then([p](Try<T>&& t) mutable { p->fulfilTry(std::move(t)); });
426 return std::move(f).via(executor);
430 bool Future<T>::isReady() const {
432 return core_->ready();
436 void Future<T>::raise(exception_wrapper exception) {
437 core_->raise(std::move(exception));
443 Future<typename std::decay<T>::type> makeFuture(T&& t) {
444 Promise<typename std::decay<T>::type> p;
445 p.setValue(std::forward<T>(t));
446 return p.getFuture();
449 inline // for multiple translation units
450 Future<void> makeFuture() {
453 return p.getFuture();
459 typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
460 -> Future<decltype(func())> {
461 Promise<decltype(func())> p;
466 return p.getFuture();
470 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
472 return makeFutureTry(std::move(copy));
476 Future<T> makeFuture(std::exception_ptr const& e) {
479 return p.getFuture();
483 Future<T> makeFuture(exception_wrapper ew) {
485 p.setException(std::move(ew));
486 return p.getFuture();
489 template <class T, class E>
490 typename std::enable_if<std::is_base_of<std::exception, E>::value,
492 makeFuture(E const& e) {
494 p.setException(make_exception_wrapper<E>(e));
495 return p.getFuture();
499 Future<T> makeFuture(Try<T>&& t) {
500 if (t.hasException()) {
501 return makeFuture<T>(std::move(t.exception()));
503 return makeFuture<T>(std::move(t.value()));
508 inline Future<void> makeFuture(Try<void>&& t) {
509 if (t.hasException()) {
510 return makeFuture<void>(std::move(t.exception()));
517 template <typename Executor>
518 Future<void> via(Executor* executor) {
519 return makeFuture().via(executor);
524 template <typename... Fs>
525 typename detail::VariadicContext<
526 typename std::decay<Fs>::type::value_type...>::type
530 new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
531 ctx->total = sizeof...(fs);
532 auto f_saved = ctx->p.getFuture();
533 detail::whenAllVariadicHelper(ctx,
534 std::forward<typename std::decay<Fs>::type>(fs)...);
540 template <class InputIterator>
543 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
544 whenAll(InputIterator first, InputIterator last)
547 typename std::iterator_traits<InputIterator>::value_type::value_type T;
550 return makeFuture(std::vector<Try<T>>());
552 size_t n = std::distance(first, last);
554 auto ctx = new detail::WhenAllContext<T>();
556 ctx->results.resize(n);
558 auto f_saved = ctx->p.getFuture();
560 for (size_t i = 0; first != last; ++first, ++i) {
563 f.setCallback_([ctx, i, n](Try<T>&& t) {
564 ctx->results[i] = std::move(t);
565 if (++ctx->count == n) {
566 ctx->p.setValue(std::move(ctx->results));
575 template <class InputIterator>
580 std::iterator_traits<InputIterator>::value_type::value_type> > >
581 whenAny(InputIterator first, InputIterator last) {
583 typename std::iterator_traits<InputIterator>::value_type::value_type T;
585 auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
586 auto f_saved = ctx->p.getFuture();
588 for (size_t i = 0; first != last; first++, i++) {
590 f.setCallback_([i, ctx](Try<T>&& t) {
591 if (!ctx->done.exchange(true)) {
592 ctx->p.setValue(std::make_pair(i, std::move(t)));
601 template <class InputIterator>
602 Future<std::vector<std::pair<size_t, Try<typename
603 std::iterator_traits<InputIterator>::value_type::value_type>>>>
604 whenN(InputIterator first, InputIterator last, size_t n) {
606 std::iterator_traits<InputIterator>::value_type::value_type T;
607 typedef std::vector<std::pair<size_t, Try<T>>> V;
614 auto ctx = std::make_shared<ctx_t>();
617 // for each completed Future, increase count and add to vector, until we
618 // have n completed futures at which point we fulfil our Promise with the
623 it->then([ctx, n, i](Try<T>&& t) {
625 auto c = ++ctx->completed;
627 assert(ctx->v.size() < n);
628 v.push_back(std::make_pair(i, std::move(t)));
630 ctx->p.fulfilTry(Try<V>(std::move(v)));
640 ctx->p.setException(std::runtime_error("Not enough futures"));
643 return ctx->p.getFuture();
646 template <typename T>
648 waitWithSemaphore(Future<T>&& f) {
650 auto done = f.then([&](Try<T> &&t) {
652 return std::move(t.value());
655 while (!done.isReady()) {
656 // There's a race here between the return here and the actual finishing of
657 // the future. f is completed, but the setup may not have finished on done
658 // after the baton has posted.
659 std::this_thread::yield();
665 inline Future<void> waitWithSemaphore<void>(Future<void>&& f) {
667 auto done = f.then([&](Try<void> &&t) {
672 while (!done.isReady()) {
673 // There's a race here between the return here and the actual finishing of
674 // the future. f is completed, but the setup may not have finished on done
675 // after the baton has posted.
676 std::this_thread::yield();
681 template <typename T, class Dur>
683 waitWithSemaphore(Future<T>&& f, Dur timeout) {
684 auto baton = std::make_shared<Baton<>>();
685 auto done = f.then([baton](Try<T> &&t) {
687 return std::move(t.value());
689 baton->timed_wait(std::chrono::system_clock::now() + timeout);
695 waitWithSemaphore(Future<void>&& f, Dur timeout) {
696 auto baton = std::make_shared<Baton<>>();
697 auto done = f.then([baton](Try<void> &&t) {
701 baton->timed_wait(std::chrono::system_clock::now() + timeout);
707 void getWaitHelper(Future<T>* f) {
708 // If we already have a value do the cheap thing
713 folly::Baton<> baton;
714 f->then([&](Try<T> const&) {
721 Future<T> getWaitTimeoutHelper(Future<T>* f, Duration dur) {
722 // TODO make and use variadic whenAny #5877971
724 auto token = std::make_shared<std::atomic<bool>>();
725 folly::Baton<> baton;
727 folly::detail::getTimekeeperSingleton()->after(dur)
728 .then([&,token](Try<void> const& t) {
729 if (token->exchange(true) == false) {
732 p.setException(TimedOut());
733 } catch (std::exception const& e) {
734 p.setException(exception_wrapper(std::current_exception(), e));
736 p.setException(exception_wrapper(std::current_exception()));
742 f->then([&, token](Try<T>&& t) {
743 if (token->exchange(true) == false) {
744 p.fulfilTry(std::move(t));
750 return p.getFuture();
758 // Big assumption here: the then() call above, since it doesn't move out
759 // the value, leaves us with a value to return here. This would be a big
760 // no-no in user code, but I'm invoking internal developer privilege. This
761 // is slightly more efficient (save a move()) especially if there's an
762 // exception (save a throw).
763 return std::move(value());
767 inline void Future<void>::get() {
773 T Future<T>::get(Duration dur) {
774 return std::move(getWaitTimeoutHelper(this, dur).value());
778 inline void Future<void>::get(Duration dur) {
779 getWaitTimeoutHelper(this, dur).value();
783 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
784 return within(dur, TimedOut(), tk);
789 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
792 Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
795 std::atomic<bool> token;
797 auto ctx = std::make_shared<Context>(std::move(e));
800 tk = folly::detail::getTimekeeperSingleton();
804 .then([ctx](Try<void> const& t) {
805 if (ctx->token.exchange(true) == false) {
808 ctx->promise.setException(std::move(ctx->exception));
809 } catch (std::exception const& e2) {
810 ctx->promise.setException(
811 exception_wrapper(std::current_exception(), e2));
813 ctx->promise.setException(
814 exception_wrapper(std::current_exception()));
819 this->then([ctx](Try<T>&& t) {
820 if (ctx->token.exchange(true) == false) {
821 ctx->promise.fulfilTry(std::move(t));
825 return ctx->promise.getFuture();
829 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
830 return whenAll(*this, futures::sleep(dur, tk))
831 .then([](std::tuple<Try<T>, Try<void>> tup) {
832 Try<T>& t = std::get<0>(tup);
833 return makeFuture<T>(std::move(t));
839 // I haven't included a Future<T&> specialization because I don't forsee us
840 // using it, however it is not difficult to add when needed. Refer to
841 // Future<void> for guidance. std::future and boost::future code would also be