2 * Copyright 2014 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
22 #include <folly/Baton.h>
23 #include <folly/futures/detail/Core.h>
24 #include <folly/futures/Timekeeper.h>
// Declaration only; no definition is visible in this excerpt. Code later in
// this file calls it as folly::detail::getTimekeeperSingleton() to obtain a
// Timekeeper when the caller did not supply one.
31 Timekeeper* getTimekeeperSingleton();
// isFuture trait: `value` is true for the Future<T> specialization below.
// The first line, with value = false, presumably belongs to the primary
// template, whose header is not visible in this excerpt.
// The then() overloads use isFuture<...>::value inside enable_if to choose
// between callbacks that return a plain value and callbacks that return a
// Future.
36 static const bool value = false;
40 struct isFuture<Future<T> > {
41 static const bool value = true;
// Move constructor: initializes core_ to nullptr, then delegates to the move
// assignment operator, which swaps core_ with `other`.
45 Future<T>::Future(Future<T>&& other) noexcept : core_(nullptr) {
46 *this = std::move(other);
// Move assignment: swaps core_ with other.core_, so `other` ends up holding
// whatever core_ this Future had before the call.
// NOTE(review): the move constructor is declared noexcept and calls this
// operator, which is not marked noexcept here. Confirm that is intended.
50 Future<T>& Future<T>::operator=(Future<T>&& other) {
51 std::swap(core_, other.core_);
// Destructor. NOTE(review): the body is not visible in this excerpt.
56 Future<T>::~Future() {
// Calls detachFuture() on the underlying core.
// NOTE(review): any guard on core_ being null, and any reset of core_
// afterwards, is not visible in this excerpt.
61 void Future<T>::detach() {
63 core_->detachFuture();
// Validity guard. Presumably throws when this Future has no core_.
// The body is not visible in this excerpt (TODO confirm).
69 void Future<T>::throwIfInvalid() const {
// Installs `func` as the callback on the underlying core.
// NOTE(review): `func` is declared F&& but is handed on with std::move rather
// than std::forward<F>. If F is a deduced template parameter and a caller
// passes an lvalue, that lvalue is moved from. Confirm callers only pass
// rvalues.
76 void Future<T>::setCallback_(F&& func) {
78 core_->setCallback(std::move(func));
// then() overload selected when the callback takes Try<T>&& and its result
// type B is not a Future. It wraps a new Promise<B> and the callback in
// MoveWrappers, takes the Promise future up front, and builds a continuation
// lambda that invokes the callback with the incoming Try.
// NOTE(review): several lines of this definition are not visible in this
// excerpt, including the call that registers the lambda, the code that
// fulfils the Promise, the final return, and the terminator of the long
// block comment below.
81 // Variant: f.then([](Try<T>&& t){ return t.value(); });
84 typename std::enable_if<
85 !isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
86 Future<typename std::result_of<F(Try<T>&&)>::type> >::type
87 Future<T>::then(F&& func) {
88 typedef typename std::result_of<F(Try<T>&&)>::type B;
92 // wrap these so we can move them into the lambda
93 folly::MoveWrapper<Promise<B>> p;
94 folly::MoveWrapper<F> funcm(std::forward<F>(func));
96 // grab the Future now before we lose our handle on the Promise
97 auto f = p->getFuture();
99 /* This is a bit tricky.
101 We can't just close over *this in case this Future gets moved. So we
102 make a new dummy Future. We could figure out something more
103 sophisticated that avoids making a new Future object when it can, as an
104 optimization. But this is correct.
106 core_ can't be moved, it is explicitly disallowed (as is copying). But
107 if there's ever a reason to allow it, this is one place that makes that
108 assumption and would need to be fixed. We use a standard shared pointer
109 for core_ (by copying it in), which means in essence obj holds a shared
110 pointer to itself. But this shouldn't leak because Promise will not
111 outlive the continuation, because Promise will setException() with a
112 broken Promise if it is destructed before completed. We could use a
113 weak pointer but it would have to be converted to a shared pointer when
114 func is executed (because the Future returned by func may possibly
115 persist beyond the callback, if it gets moved), and so it is an
116 optimization to just make it shared from the get-go.
118 We have to move in the Promise and func using the MoveWrapper
119 hack. (func could be copied but it's a big drag on perf).
121 Two subtle but important points about this design. detail::Core has no
122 back pointers to Future or Promise, so if Future or Promise get moved
123 (and they will be moved in performant code) we don't have to do
124 anything fancy. And because we store the continuation in the
125 detail::Core, not in the Future, we can execute the continuation even
126 after the Future has gone out of scope. This is an intentional design
127 decision. It is likely we will want to be able to cancel a continuation
128 in some circumstances, but I think it should be explicit not implicit
129 in the destruction of the Future used to create it.
132 [p, funcm](Try<T>&& t) mutable {
134 return (*funcm)(std::move(t));
// then() overload for non-void T, selected when the callback takes T&& and
// returns a non-Future type B. Inside the continuation, an exception held by
// the incoming Try is moved into the new Promise, and the callback is invoked
// with the value moved out of the Try.
// NOTE(review): the lines that register the lambda and fulfil the Promise
// with the callback result are not visible in this excerpt.
141 // Variant: f.then([](T&& t){ return t; });
144 typename std::enable_if<
145 !std::is_same<T, void>::value &&
146 !isFuture<typename std::result_of<
147 F(typename detail::AliasIfVoid<T>::type&&)>::type>::value,
148 Future<typename std::result_of<
149 F(typename detail::AliasIfVoid<T>::type&&)>::type> >::type
150 Future<T>::then(F&& func) {
151 typedef typename std::result_of<F(T&&)>::type B;
// Promise and callback are wrapped so the lambda can capture them by move.
155 folly::MoveWrapper<Promise<B>> p;
156 folly::MoveWrapper<F> funcm(std::forward<F>(func));
157 auto f = p->getFuture();
160 [p, funcm](Try<T>&& t) mutable {
161 if (t.hasException()) {
162 p->setException(std::move(t.exception()));
165 return (*funcm)(std::move(t.value()));
// then() overload for T = void, selected when the callback takes no
// arguments and returns a non-Future type B. Inside the continuation, an
// exception held by the incoming Try is moved into the new Promise.
// NOTE(review): the path that invokes the callback when there is no
// exception is not visible in this excerpt.
173 // Variant: f.then([](){ return; });
176 typename std::enable_if<
177 std::is_same<T, void>::value &&
178 !isFuture<typename std::result_of<F()>::type>::value,
179 Future<typename std::result_of<F()>::type> >::type
180 Future<T>::then(F&& func) {
181 typedef typename std::result_of<F()>::type B;
185 folly::MoveWrapper<Promise<B>> p;
186 folly::MoveWrapper<F> funcm(std::forward<F>(func));
187 auto f = p->getFuture();
190 [p, funcm](Try<T>&& t) mutable {
191 if (t.hasException()) {
192 p->setException(std::move(t.exception()));
// then() overload selected when the callback takes Try<T>&& and returns a
// Future. B is the value_type of that returned Future. The continuation calls
// the callback to obtain f2, then gives f2 its own callback that fulfils the
// outer Promise with the Try produced by f2. Exceptions caught around the
// callback invocation are stored in the outer Promise instead.
// NOTE(review): the try keyword, the catch-all clause header and the line
// that registers the outer lambda are not visible in this excerpt.
203 // Variant: f.then([](Try<T>&& t){ return makeFuture<T>(t.value()); });
206 typename std::enable_if<
207 isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
208 Future<typename std::result_of<F(Try<T>&&)>::type::value_type> >::type
209 Future<T>::then(F&& func) {
210 typedef typename std::result_of<F(Try<T>&&)>::type::value_type B;
214 // wrap these so we can move them into the lambda
215 folly::MoveWrapper<Promise<B>> p;
216 folly::MoveWrapper<F> funcm(std::forward<F>(func));
218 // grab the Future now before we lose our handle on the Promise
219 auto f = p->getFuture();
222 [p, funcm](Try<T>&& t) mutable {
224 auto f2 = (*funcm)(std::move(t));
225 // that didn't throw, now we can steal p
226 f2.setCallback_([p](Try<B>&& b) mutable {
227 p->fulfilTry(std::move(b));
229 } catch (const std::exception& e) {
230 p->setException(exception_wrapper(std::current_exception(), e));
232 p->setException(exception_wrapper(std::current_exception()));
// then() overload for non-void T, selected when the callback takes T&& and
// returns a Future. B is the value_type of that returned Future. An exception
// in the incoming Try is moved straight into the outer Promise. The callback
// is invoked with the value moved out of the Try, and the Future it returns
// (f2) forwards its result into the outer Promise. Exceptions caught around
// the callback invocation are stored in the outer Promise.
// NOTE(review): the else/try lines and the registration call are not visible
// in this excerpt.
239 // Variant: f.then([](T&& t){ return makeFuture<T>(t); });
242 typename std::enable_if<
243 !std::is_same<T, void>::value &&
244 isFuture<typename std::result_of<
245 F(typename detail::AliasIfVoid<T>::type&&)>::type>::value,
246 Future<typename std::result_of<
247 F(typename detail::AliasIfVoid<T>::type&&)>::type::value_type> >::type
248 Future<T>::then(F&& func) {
249 typedef typename std::result_of<F(T&&)>::type::value_type B;
253 folly::MoveWrapper<Promise<B>> p;
254 folly::MoveWrapper<F> funcm(std::forward<F>(func));
255 auto f = p->getFuture();
258 [p, funcm](Try<T>&& t) mutable {
259 if (t.hasException()) {
260 p->setException(std::move(t.exception()));
263 auto f2 = (*funcm)(std::move(t.value()));
// Only p is captured here; the result of f2 is forwarded to the outer Promise.
264 f2.setCallback_([p](Try<B>&& b) mutable {
265 p->fulfilTry(std::move(b));
267 } catch (const std::exception& e) {
268 p->setException(exception_wrapper(std::current_exception(), e));
270 p->setException(exception_wrapper(std::current_exception()));
// then() overload for T = void, selected when the callback takes no
// arguments and returns a Future. B is the value_type of that returned
// Future. An exception in the incoming Try goes to the outer Promise. The
// callback is invoked with no arguments and the Future it returns (f2)
// forwards its result into the outer Promise. Exceptions caught around the
// callback invocation are stored in the outer Promise.
// NOTE(review): this overload passes t.exception() to setException without
// std::move, while the sibling overloads move it. Consider aligning them.
278 // Variant: f.then([](){ return makeFuture(); });
281 typename std::enable_if<
282 std::is_same<T, void>::value &&
283 isFuture<typename std::result_of<F()>::type>::value,
284 Future<typename std::result_of<F()>::type::value_type> >::type
285 Future<T>::then(F&& func) {
286 typedef typename std::result_of<F()>::type::value_type B;
290 folly::MoveWrapper<Promise<B>> p;
291 folly::MoveWrapper<F> funcm(std::forward<F>(func));
293 auto f = p->getFuture();
296 [p, funcm](Try<T>&& t) mutable {
297 if (t.hasException()) {
298 p->setException(t.exception());
301 auto f2 = (*funcm)();
302 f2.setCallback_([p](Try<B>&& b) mutable {
303 p->fulfilTry(std::move(b));
305 } catch (const std::exception& e) {
306 p->setException(exception_wrapper(std::current_exception(), e));
308 p->setException(exception_wrapper(std::current_exception()));
// Argument-less then(): chains a callback that accepts the Try and does
// nothing with it, yielding a Future<void>.
317 Future<void> Future<T>::then() {
318 return then([] (Try<T>&& t) {});
// onError overload selected when Extract<F>::ReturnsFuture is false, i.e. the
// handler returns a value directly. Exn is the type of the first argument of
// the handler. The message on the static assertion below requires the handler
// to return T or Future<T>.
// The continuation asks the Try to run an inner lambda via withException<Exn>.
// When that call reports false, the original Try is forwarded unchanged to
// the new Promise.
// NOTE(review): the body of the inner lambda (which presumably runs the
// handler) and the Promise declaration are not visible in this excerpt.
// NOTE(review): `func` is an F&& parameter but is wrapped with std::move, not
// std::forward<F>. Confirm callers only pass rvalues.
321 // onError where the callback returns T
324 typename std::enable_if<
325 !detail::Extract<F>::ReturnsFuture::value,
327 Future<T>::onError(F&& func) {
328 typedef typename detail::Extract<F>::FirstArg Exn;
330 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
331 "Return type of onError callback must be T or Future<T>");
334 auto f = p.getFuture();
335 auto pm = folly::makeMoveWrapper(std::move(p));
336 auto funcm = folly::makeMoveWrapper(std::move(func));
337 setCallback_([pm, funcm](Try<T>&& t) mutable {
338 if (!t.template withException<Exn>([&] (Exn& e) {
343 pm->fulfilTry(std::move(t));
// onError overload selected when Extract<F>::ReturnsFuture is true, i.e. the
// handler returns a Future. The static assertion requires that return type to
// be exactly Future<T>. Exn is the type of the first argument of the handler.
// When the Try holds an Exn, the handler is called with it and the Future it
// returns (f2) forwards its result into the new Promise. Exceptions caught
// around that call are stored in the Promise. When withException<Exn> reports
// false, the original Try is forwarded unchanged.
// NOTE(review): the try keyword, the catch-all header and the Promise
// declaration are not visible in this excerpt.
350 // onError where the callback returns Future<T>
353 typename std::enable_if<
354 detail::Extract<F>::ReturnsFuture::value,
356 Future<T>::onError(F&& func) {
358 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
359 "Return type of onError callback must be T or Future<T>");
360 typedef typename detail::Extract<F>::FirstArg Exn;
363 auto f = p.getFuture();
364 auto pm = folly::makeMoveWrapper(std::move(p));
365 auto funcm = folly::makeMoveWrapper(std::move(func));
366 setCallback_([pm, funcm](Try<T>&& t) mutable {
367 if (!t.template withException<Exn>([&] (Exn& e) {
369 auto f2 = (*funcm)(e);
370 f2.setCallback_([pm](Try<T>&& t2) mutable {
371 pm->fulfilTry(std::move(t2));
373 } catch (const std::exception& e2) {
374 pm->setException(exception_wrapper(std::current_exception(), e2));
376 pm->setException(exception_wrapper(std::current_exception()));
379 pm->fulfilTry(std::move(t));
// Returns an lvalue reference to the value stored in the Try held by the core.
// NOTE(review): the lines between the signature and the return are not
// visible in this excerpt (presumably a validity check, TODO confirm).
387 typename std::add_lvalue_reference<T>::type Future<T>::value() {
390 return core_->getTry().value();
// Const overload of value(): returns a reference-to-const to the value stored
// in the Try held by the core.
// NOTE(review): the lines between the signature and the return are not
// visible in this excerpt.
394 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
397 return core_->getTry().value();
// Returns a reference to the Try held by the core, without unwrapping it.
// NOTE(review): the lines between the signature and the return are not
// visible in this excerpt.
401 Try<T>& Future<T>::getTry() {
404 return core_->getTry();
// Rvalue-qualified via(): sets `executor` on the core and returns this Future
// by move, so the caller continues with the same shared state.
408 template <typename Executor>
409 inline Future<T> Future<T>::via(Executor* executor) && {
413 core_->setExecutor(executor);
415 return std::move(*this);
// Lvalue-qualified via(): creates a fresh Promise/Future pair, chains a
// continuation on this Future that forwards its Try into the new Promise, and
// returns the new Future after applying the rvalue via(executor) to it.
419 template <typename Executor>
420 inline Future<T> Future<T>::via(Executor* executor) & {
423 MoveWrapper<Promise<T>> p;
424 auto f = p->getFuture();
425 then([p](Try<T>&& t) mutable { p->fulfilTry(std::move(t)); });
426 return std::move(f).via(executor);
// Reports whether the core is ready, as given by core_->ready().
// NOTE(review): one line between the signature and the return is not visible
// in this excerpt.
430 bool Future<T>::isReady() const {
432 return core_->ready();
// Forwards `exception` to the core by moving it into core_->raise().
436 void Future<T>::raise(exception_wrapper exception) {
437 core_->raise(std::move(exception));
// Builds a Future of the decayed type of T: creates a Promise, sets its value
// from the forwarded argument, and returns the Future of that Promise.
443 Future<typename std::decay<T>::type> makeFuture(T&& t) {
444 Promise<typename std::decay<T>::type> p;
445 p.setValue(std::forward<T>(t));
446 return p.getFuture();
// Argument-less makeFuture(): returns a Future<void> obtained from a local
// Promise named p.
// NOTE(review): the declaration of p and the call that completes it are not
// visible in this excerpt.
449 inline // for multiple translation units
450 Future<void> makeFuture() {
453 return p.getFuture();
// Overload taking a callable `func` (the first line of this signature is not
// visible in this excerpt). The enable_if parameter restricts it to F that is
// not a reference type. The result is a Future of whatever func() returns,
// taken from a local Promise.
// NOTE(review): the lines that invoke func and fulfil the Promise are not
// visible here. Presumably the name is makeFutureTry, matching the call in
// the const-reference overload (TODO confirm).
459 typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
460 -> Future<decltype(func())> {
461 Promise<decltype(func())> p;
466 return p.getFuture();
// Const-reference overload: delegates to makeFutureTry with a moved local
// named `copy`.
// NOTE(review): the declaration of `copy` is not visible in this excerpt.
// Presumably it is a copy of func (TODO confirm).
470 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
472 return makeFutureTry(std::move(copy));
// Builds a Future<T> from a std::exception_ptr, returning the Future of a
// local Promise.
// NOTE(review): the Promise declaration and the call that stores `e` in it
// are not visible in this excerpt.
476 Future<T> makeFuture(std::exception_ptr const& e) {
479 return p.getFuture();
// Builds a Future<T> that holds the given exception_wrapper: moves `ew` into
// a local Promise via setException and returns the Future of that Promise.
483 Future<T> makeFuture(exception_wrapper ew) {
485 p.setException(std::move(ew));
486 return p.getFuture();
// Overload enabled only when E derives from std::exception. Wraps a copy of
// `e` with make_exception_wrapper<E>, stores it in a local Promise, and
// returns the Future of that Promise.
// NOTE(review): the second line of the enable_if (the result type) and the
// Promise declaration are not visible in this excerpt.
489 template <class T, class E>
490 typename std::enable_if<std::is_base_of<std::exception, E>::value,
492 makeFuture(E const& e) {
494 p.setException(make_exception_wrapper<E>(e));
495 return p.getFuture();
// Builds a Future<T> from an existing Try<T>: moves the Try into a local
// Promise via fulfilTry and returns the Future of that Promise.
499 Future<T> makeFuture(Try<T>&& t) {
500 Promise<typename std::decay<T>::type> p;
501 p.fulfilTry(std::move(t));
502 return p.getFuture();
// Try<void> overload: when the Try holds an exception, returns a Future<void>
// built from that exception.
// NOTE(review): the branch taken when there is no exception is not visible in
// this excerpt.
506 inline Future<void> makeFuture(Try<void>&& t) {
507 if (t.hasException()) {
508 return makeFuture<void>(std::move(t.exception()));
// Free function: creates a Future<void> with makeFuture() and attaches
// `executor` to it through via().
515 template <typename Executor>
516 Future<void> via(Executor* executor) {
517 return makeFuture().via(executor);
// Variadic overload (presumably whenAll, based on the helper it calls; the
// signature line is not visible in this excerpt). It allocates a
// detail::VariadicContext for the value types of the given futures, records
// how many futures were passed in ctx->total, takes the Future of the context
// Promise, and hands the context plus the futures to
// detail::whenAllVariadicHelper.
// NOTE(review): ctx is allocated with a raw `new`; the matching release is
// not visible here. Confirm the helper or context owns it.
522 template <typename... Fs>
523 typename detail::VariadicContext<
524 typename std::decay<Fs>::type::value_type...>::type
528 new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
529 ctx->total = sizeof...(fs);
530 auto f_saved = ctx->p.getFuture();
531 detail::whenAllVariadicHelper(ctx,
532 std::forward<typename std::decay<Fs>::type>(fs)...);
// Iterator-range whenAll: T is the value_type of the futures in the range.
// One visible path returns an already-completed Future holding an empty
// vector (its condition is not visible; presumably the empty range).
// Otherwise it allocates a WhenAllContext, sizes ctx->results to the number
// of futures, and gives each future a callback that stores its Try at index
// i. The callback that brings ctx->count up to n fulfils the context Promise
// with the whole results vector.
// NOTE(review): ctx is allocated with a raw `new`; its release is not visible
// in this excerpt. The line defining `f` inside the loop is also not visible.
538 template <class InputIterator>
541 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
542 whenAll(InputIterator first, InputIterator last)
545 typename std::iterator_traits<InputIterator>::value_type::value_type T;
548 return makeFuture(std::vector<Try<T>>());
550 size_t n = std::distance(first, last);
552 auto ctx = new detail::WhenAllContext<T>();
554 ctx->results.resize(n);
556 auto f_saved = ctx->p.getFuture();
558 for (size_t i = 0; first != last; ++first, ++i) {
561 f.setCallback_([ctx, i, n](Try<T>&& t) {
562 ctx->results[i] = std::move(t);
563 if (++ctx->count == n) {
564 ctx->p.setValue(std::move(ctx->results));
// Iterator-range whenAny: T is the value_type of the futures in the range.
// It allocates a WhenAnyContext constructed with the number of futures and
// gives each future a callback. The first callback to flip ctx->done from
// false to true fulfils the context Promise with a pair of that future's
// index and its Try; later callbacks skip that step.
// NOTE(review): ctx is allocated with a raw `new`; its release is not visible
// in this excerpt. The line defining `f` inside the loop is also not visible.
573 template <class InputIterator>
578 std::iterator_traits<InputIterator>::value_type::value_type> > >
579 whenAny(InputIterator first, InputIterator last) {
581 typename std::iterator_traits<InputIterator>::value_type::value_type T;
583 auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
584 auto f_saved = ctx->p.getFuture();
586 for (size_t i = 0; first != last; first++, i++) {
588 f.setCallback_([i, ctx](Try<T>&& t) {
589 if (!ctx->done.exchange(true)) {
590 ctx->p.setValue(std::make_pair(i, std::move(t)));
// whenN: yields a Future of a vector of (index, Try) pairs. The context is
// held in a shared_ptr and captured by each continuation. Each continuation
// increments ctx->completed and appends its (index, Try) pair to the vector,
// and the context Promise is fulfilled with that vector. One visible path
// sets a std::runtime_error with the text shown below on the Promise.
// NOTE(review): the conditions guarding the push_back, the fulfilTry and the
// runtime_error path, the definition of ctx_t, and the loop header are not
// visible in this excerpt.
599 template <class InputIterator>
600 Future<std::vector<std::pair<size_t, Try<typename
601 std::iterator_traits<InputIterator>::value_type::value_type>>>>
602 whenN(InputIterator first, InputIterator last, size_t n) {
604 std::iterator_traits<InputIterator>::value_type::value_type T;
605 typedef std::vector<std::pair<size_t, Try<T>>> V;
612 auto ctx = std::make_shared<ctx_t>();
615 // for each completed Future, increase count and add to vector, until we
616 // have n completed futures at which point we fulfil our Promise with the
621 it->then([ctx, n, i](Try<T>&& t) {
623 auto c = ++ctx->completed;
625 assert(ctx->v.size() < n);
626 v.push_back(std::make_pair(i, std::move(t)));
628 ctx->p.fulfilTry(Try<V>(std::move(v)));
638 ctx->p.setException(std::runtime_error("Not enough futures"));
641 return ctx->p.getFuture();
// waitWithSemaphore for Future<T>: chains a continuation on f whose result is
// the value moved out of the Try, producing `done`. The lambda captures by
// reference. After the wait, it spins with std::this_thread::yield() until
// `done` itself reports ready, for the reason given in the comment below.
// NOTE(review): the return type, the baton declaration, the post inside the
// lambda, the blocking wait and the final return are not visible in this
// excerpt.
644 template <typename T>
646 waitWithSemaphore(Future<T>&& f) {
648 auto done = f.then([&](Try<T> &&t) {
650 return std::move(t.value());
653 while (!done.isReady()) {
654 // There's a race here between the return here and the actual finishing of
655 // the future. f is completed, but the setup may not have finished on done
656 // after the baton has posted.
657 std::this_thread::yield();
// Future<void> specialization of waitWithSemaphore: chains a continuation on
// f (capturing by reference) to produce `done`, then spins with
// std::this_thread::yield() until `done` reports ready.
// NOTE(review): the baton declaration, the body of the lambda, the blocking
// wait and the final return are not visible in this excerpt.
663 inline Future<void> waitWithSemaphore<void>(Future<void>&& f) {
665 auto done = f.then([&](Try<void> &&t) {
670 while (!done.isReady()) {
671 // There's a race here between the return here and the actual finishing of
672 // the future. f is completed, but the setup may not have finished on done
673 // after the baton has posted.
674 std::this_thread::yield();
// Timed waitWithSemaphore for Future<T>: the Baton lives in a shared_ptr that
// the continuation captures by value. The continuation yields the value moved
// out of the Try. The caller side waits on the baton until
// system_clock::now() + timeout.
// NOTE(review): the return type, the post inside the lambda and the final
// return are not visible in this excerpt.
679 template <typename T, class Dur>
681 waitWithSemaphore(Future<T>&& f, Dur timeout) {
682 auto baton = std::make_shared<Baton<>>();
683 auto done = f.then([baton](Try<T> &&t) {
685 return std::move(t.value());
687 baton->timed_wait(std::chrono::system_clock::now() + timeout);
// Timed waitWithSemaphore for Future<void>: the Baton lives in a shared_ptr
// that the continuation captures by value. The caller side waits on the baton
// until system_clock::now() + timeout.
// NOTE(review): the template header, the return type, the body of the lambda
// and the final return are not visible in this excerpt.
693 waitWithSemaphore(Future<void>&& f, Dur timeout) {
694 auto baton = std::make_shared<Baton<>>();
695 auto done = f.then([baton](Try<void> &&t) {
699 baton->timed_wait(std::chrono::system_clock::now() + timeout);
// Helper for the blocking get(): uses a stack-allocated Baton and chains a
// continuation on *f that captures by reference and takes the Try by const
// reference.
// NOTE(review): the early-out for an already-complete future, the body of the
// lambda and the wait on the baton are not visible in this excerpt.
705 void getWaitHelper(Future<T>* f) {
706 // If we already have a value do the cheap thing
711 folly::Baton<> baton;
712 f->then([&](Try<T> const&) {
// Helper for get(Duration): sets up two competing continuations that share an
// atomic<bool> token, one on a timer obtained from the Timekeeper singleton
// via after(dur) and one on *f. Each does its work only if token->exchange
// (true) returned false, i.e. it was first. The timer side stores TimedOut
// (or a caught exception) in p; the future side forwards its Try into p. The
// result is the Future of p.
// NOTE(review): both lambdas capture locals by reference (p, baton). The
// token check is what keeps the loser from touching p. The declaration of p
// and the baton post/wait lines are not visible in this excerpt.
719 Future<T> getWaitTimeoutHelper(Future<T>* f, Duration dur) {
720 // TODO make and use variadic whenAny #5877971
722 auto token = std::make_shared<std::atomic<bool>>();
723 folly::Baton<> baton;
725 folly::detail::getTimekeeperSingleton()->after(dur)
726 .then([&,token](Try<void> const& t) {
727 if (token->exchange(true) == false) {
730 p.setException(TimedOut());
731 } catch (std::exception const& e) {
732 p.setException(exception_wrapper(std::current_exception(), e));
734 p.setException(exception_wrapper(std::current_exception()));
740 f->then([&, token](Try<T>&& t) {
741 if (token->exchange(true) == false) {
742 p.fulfilTry(std::move(t));
748 return p.getFuture();
// Tail of a blocking getter (presumably Future<T>::get(); its signature and
// the preceding wait call are not visible in this excerpt). It returns the
// stored value by moving out of value().
756 // Big assumption here: the then() call above, since it doesn't move out
757 // the value, leaves us with a value to return here. This would be a big
758 // no-no in user code, but I'm invoking internal developer privilege. This
759 // is slightly more efficient (save a move()) especially if there's an
760 // exception (save a throw).
761 return std::move(value());
// Future<void> specialization of the blocking get().
// NOTE(review): the body is not visible in this excerpt.
765 inline void Future<void>::get() {
// Timed get(): delegates to getWaitTimeoutHelper(this, dur) and returns the
// value moved out of the Future it produces.
771 T Future<T>::get(Duration dur) {
772 return std::move(getWaitTimeoutHelper(this, dur).value());
// Future<void> specialization of the timed get(): delegates to
// getWaitTimeoutHelper(this, dur) and calls value() on the result, discarding
// the return.
776 inline void Future<void>::get(Duration dur) {
777 getWaitTimeoutHelper(this, dur).value();
// getVia(): returns the stored value by moving out of value().
// NOTE(review): the lines that use the DrivableExecutor `e` before the return
// are not visible in this excerpt.
781 T Future<T>::getVia(DrivableExecutor* e) {
785 return std::move(value());
// Future<void> specialization of getVia().
// NOTE(review): the body is not visible in this excerpt.
789 inline void Future<void>::getVia(DrivableExecutor* e) {
// Convenience overload: forwards to the three-argument within() with a
// default-constructed TimedOut as the exception.
797 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
798 return within(dur, TimedOut(), tk);
// within(dur, e, tk): yields the Future of a Promise held in a shared Context
// together with the exception `e` and an atomic<bool> token (initially
// false). Two continuations share the Context, one on a timer and one on this
// Future. Each acts only if ctx->token.exchange(true) returned false, i.e. it
// was first. The timer side moves the stored exception (or a caught one) into
// the Promise; the future side forwards its Try into the Promise.
// One visible line assigns the Timekeeper singleton to tk; its guard is not
// visible, presumably a null check on tk (TODO confirm).
// NOTE(review): the Context struct header, its other member declarations, the
// timer call preceding .then and the try/catch-all headers are not visible in
// this excerpt.
803 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
806 Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
809 std::atomic<bool> token;
811 auto ctx = std::make_shared<Context>(std::move(e));
814 tk = folly::detail::getTimekeeperSingleton();
818 .then([ctx](Try<void> const& t) {
819 if (ctx->token.exchange(true) == false) {
822 ctx->promise.setException(std::move(ctx->exception));
823 } catch (std::exception const& e2) {
824 ctx->promise.setException(
825 exception_wrapper(std::current_exception(), e2));
827 ctx->promise.setException(
828 exception_wrapper(std::current_exception()));
833 this->then([ctx](Try<T>&& t) {
834 if (ctx->token.exchange(true) == false) {
835 ctx->promise.fulfilTry(std::move(t));
839 return ctx->promise.getFuture();
// delayed(): combines this Future with futures::sleep(dur, tk) through
// whenAll, then returns a Future built from the first element of the tuple,
// i.e. the Try of this Future. The sleep result is ignored.
843 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
844 return whenAll(*this, futures::sleep(dur, tk))
845 .then([](std::tuple<Try<T>, Try<void>> tup) {
846 Try<T>& t = std::get<0>(tup);
847 return makeFuture<T>(std::move(t));
// wait(): chains a continuation (capturing by reference) that re-wraps the
// Try with makeFuture, producing `done`. It then spins with
// std::this_thread::yield() until `done` reports ready, for the reason given
// in the comment below.
// NOTE(review): the baton declaration, the post inside the lambda, the
// blocking wait and the final return are not visible in this excerpt.
852 Future<T> Future<T>::wait() {
854 auto done = then([&](Try<T> t) {
856 return makeFuture(std::move(t));
859 while (!done.isReady()) {
860 // There's a race here between the return here and the actual finishing of
861 // the future. f is completed, but the setup may not have finished on done
862 // after the baton has posted.
863 std::this_thread::yield();
// wait(dur): the Baton lives in a shared_ptr that the continuation captures
// by value. The continuation re-wraps the Try with makeFuture. The caller
// side waits on the baton until system_clock::now() + dur.
// NOTE(review): the post inside the lambda and the final return are not
// visible in this excerpt.
869 Future<T> Future<T>::wait(Duration dur) {
870 auto baton = std::make_shared<Baton<>>();
871 auto done = then([baton](Try<T> t) {
873 return makeFuture(std::move(t));
875 baton->timed_wait(std::chrono::system_clock::now() + dur);
// Lvalue-qualified waitVia(): declared to return a reference to a Future.
// NOTE(review): the body is not visible in this excerpt.
880 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
// Rvalue-qualified waitVia(): returns this Future by move.
// NOTE(review): the lines that use the DrivableExecutor `e` before the return
// are not visible in this excerpt.
888 Future<T> Future<T>::waitVia(DrivableExecutor* e) && {
892 return std::move(*this);
897 // I haven't included a Future<T&> specialization because I don't foresee us
898 // using it, however it is not difficult to add when needed. Refer to
899 // Future<void> for guidance. std::future and boost::future code would also be