2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
22 #include <folly/Baton.h>
23 #include <folly/futures/detail/Core.h>
24 #include <folly/futures/Timekeeper.h>
// Forward declaration of the Timekeeper accessor that Future<T>::within()
// calls (as folly::detail::getTimekeeperSingleton()) later in this file.
// No definition is visible in this excerpt.
31 Timekeeper* getTimekeeperSingleton();
// Move constructor: initialises core_ to nullptr and then delegates to the
// move assignment operator to take over other's core_.
35 Future<T>::Future(Future<T>&& other) noexcept : core_(nullptr) {
36 *this = std::move(other);
// Move assignment: exchanges the core_ pointers of *this and `other` with
// std::swap, so `other` is left holding whatever core_ this Future had.
// NOTE(review): the return statement is not visible in this excerpt.
40 Future<T>& Future<T>::operator=(Future<T>&& other) {
41 std::swap(core_, other.core_);
// Constructor overload taking `val` by const reference; the enable_if
// restricts it to non-void F.
// NOTE(review): the constructor header and the lines that create and fill
// the local Promise `p` are not visible in this excerpt; only the final step,
// adopting p.getFuture() through move assignment, is shown.
48 const typename std::enable_if<!std::is_void<F>::value, F>::type& val)
52 *this = p.getFuture();
// Constructor overload taking `val` by rvalue reference; the enable_if
// restricts it to non-void F. The value is forwarded into a local Promise `p`
// and this Future then adopts p.getFuture() through move assignment.
// NOTE(review): the constructor header and the declaration of `p` are not
// visible in this excerpt.
58 typename std::enable_if<!std::is_void<F>::value, F>::type&& val)
61 p.setValue(std::forward<F>(val));
62 *this = p.getFuture();
// Default constructor for Future<void> (enabled only when F is void). Starts
// with a null core_ and then adopts the Future of a local Promise `p`.
// NOTE(review): the declaration of `p` and whatever completes it are not
// visible in this excerpt.
67 typename std::enable_if<std::is_void<F>::value, int>::type>
68 Future<void>::Future() : core_(nullptr) {
71 *this = p.getFuture();
// Destructor.
// NOTE(review): the body is not visible in this excerpt; presumably it
// releases core_ via detach() - confirm against the full file.
76 Future<T>::~Future() {
// Tells the shared core that the Future side is letting go by calling
// core_->detachFuture().
// NOTE(review): any null check on core_ and any reset of core_ afterwards
// are on lines not visible in this excerpt.
81 void Future<T>::detach() {
83 core_->detachFuture();
// Validity check used before touching core_.
// NOTE(review): the body is not visible in this excerpt; presumably it
// throws when core_ is null - confirm against the full file.
89 void Future<T>::throwIfInvalid() const {
// Installs `func` as the continuation on the shared core by moving it into
// core_->setCallback().
// NOTE(review): any validity check preceding the call is not visible here.
96 void Future<T>::setCallback_(F&& func) {
98 core_->setCallback(std::move(func));
// Flattens a Future whose value is itself a Future (enabled only when
// isFuture<F>::value holds). Implemented with then(): the continuation
// receives the inner Future by value and returns it unchanged, so the result
// type is Future<isFuture<T>::Inner>.
105 typename std::enable_if<isFuture<F>::value,
106 Future<typename isFuture<T>::Inner>>::type
107 Future<T>::unwrap() {
108 return then([](Future<typename isFuture<T>::Inner> internal_future) {
109 return internal_future;
115 // Variant: returns a value
116 // e.g. f.then([](Try<T>&& t){ return t.value(); });
// Selected when R::ReturnsFuture::value is false. Steps visible below:
//  - the static_assert limits the callback to zero or one argument;
//  - a Promise<B> and the callback are placed in MoveWrappers so the
//    continuation lambda can capture them;
//  - the Future `f` is taken from the Promise before the Promise is handed
//    to the lambda, and inherits this Future's executor;
//  - in the lambda, when the callback does not take a Try (!isTry) and the
//    incoming Try holds an exception, that exception is passed straight to
//    the Promise; otherwise the callback is invoked with
//    t.get<isTry, Args>()... .
// NOTE(review): the setCallback_ call that installs the lambda, the code
// that delivers the callback's result to the Promise, and the final return
// are on lines not visible in this excerpt. The closing "*/" of the long
// comment below is also missing from the excerpt.
118 template <typename F, typename R, bool isTry, typename... Args>
119 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
120 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
121 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
122 typedef typename R::ReturnsFuture::Inner B;
126 // wrap these so we can move them into the lambda
127 folly::MoveWrapper<Promise<B>> p;
128 folly::MoveWrapper<F> funcm(std::forward<F>(func));
130 // grab the Future now before we lose our handle on the Promise
131 auto f = p->getFuture();
133 f.setExecutor(getExecutor());
136 /* This is a bit tricky.
138 We can't just close over *this in case this Future gets moved. So we
139 make a new dummy Future. We could figure out something more
140 sophisticated that avoids making a new Future object when it can, as an
141 optimization. But this is correct.
143 core_ can't be moved, it is explicitly disallowed (as is copying). But
144 if there's ever a reason to allow it, this is one place that makes that
145 assumption and would need to be fixed. We use a standard shared pointer
146 for core_ (by copying it in), which means in essence obj holds a shared
147 pointer to itself. But this shouldn't leak because Promise will not
148 outlive the continuation, because Promise will setException() with a
149 broken Promise if it is destructed before completed. We could use a
150 weak pointer but it would have to be converted to a shared pointer when
151 func is executed (because the Future returned by func may possibly
152 persist beyond the callback, if it gets moved), and so it is an
153 optimization to just make it shared from the get-go.
155 We have to move in the Promise and func using the MoveWrapper
156 hack. (func could be copied but it's a big drag on perf).
158 Two subtle but important points about this design. detail::Core has no
159 back pointers to Future or Promise, so if Future or Promise get moved
160 (and they will be moved in performant code) we don't have to do
161 anything fancy. And because we store the continuation in the
162 detail::Core, not in the Future, we can execute the continuation even
163 after the Future has gone out of scope. This is an intentional design
164 decision. It is likely we will want to be able to cancel a continuation
165 in some circumstances, but I think it should be explicit not implicit
166 in the destruction of the Future used to create it.
169 [p, funcm](Try<T>&& t) mutable {
170 if (!isTry && t.hasException()) {
171 p->setException(std::move(t.exception()));
174 return (*funcm)(t.template get<isTry, Args>()...);
182 // Variant: returns a Future
183 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
// Selected when R::ReturnsFuture::value is true. The setup matches the
// value-returning variant: Promise<B> and callback go into MoveWrappers, the
// result Future `f` is taken from the Promise up front, and `f` inherits
// this Future's executor.
// NOTE(review): the setCallback_ call installing the outer lambda, the
// `try {` / `catch (...)` lines and the final return are not visible in this
// excerpt.
185 template <typename F, typename R, bool isTry, typename... Args>
186 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
187 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
188 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
189 typedef typename R::ReturnsFuture::Inner B;
193 // wrap these so we can move them into the lambda
194 folly::MoveWrapper<Promise<B>> p;
195 folly::MoveWrapper<F> funcm(std::forward<F>(func));
197 // grab the Future now before we lose our handle on the Promise
198 auto f = p->getFuture();
200 f.setExecutor(getExecutor());
204 [p, funcm](Try<T>&& t) mutable {
// A callback that does not accept a Try never sees a failed result: the
// exception is forwarded to the Promise instead.
205 if (!isTry && t.hasException()) {
206 p->setException(std::move(t.exception()));
// Run the callback to obtain the inner Future f2.
209 auto f2 = (*funcm)(t.template get<isTry, Args>()...);
210 // that didn't throw, now we can steal p
// Chain: when f2 completes, its Try is forwarded into the outer Promise.
211 f2.setCallback_([p](Try<B>&& b) mutable {
212 p->fulfilTry(std::move(b));
// Exceptions escaping the callback are captured into the Promise; the
// std::exception case keeps the typed reference alongside the exception_ptr.
214 } catch (const std::exception& e) {
215 p->setException(exception_wrapper(std::current_exception(), e));
// Presumably the catch-all branch (its `catch` line is not visible).
217 p->setException(exception_wrapper(std::current_exception()));
// then() overload for a pointer-to-member-function plus the object to call
// it on. FirstArg is the first parameter type of `func` with reference and
// cv qualifiers removed; isTry<FirstArg> decides whether the member function
// receives the Try itself or the unwrapped value.
// The continuation captures `func` and the raw `instance` pointer by copy;
// nothing visible here extends the lifetime of *instance, so the caller must
// keep it alive until the continuation has run.
225 template <typename T>
226 template <typename R, typename Caller, typename... Args>
227 Future<typename isFuture<R>::Inner>
228 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
229 typedef typename std::remove_cv<
230 typename std::remove_reference<
231 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
232 return then([instance, func](Try<T>&& t){
233 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
// Argument-less then(): attaches a continuation with an empty body that
// ignores the incoming Try<T>, giving a Future<void>.
238 Future<void> Future<T>::then() {
239 return then([] (Try<T>&& t) {});
242 // onError where the callback returns T
// Enabled when the callback does not return a Future. Exn is the callback's
// first parameter type, i.e. the exception type it handles; the
// static_assert requires the callback's raw return type to be exactly T.
// A fresh Promise/Future pair is created, and the Promise and callback are
// moved into MoveWrappers for capture by the continuation.
// NOTE(review): the declaration of `p`, the body of the withException
// handler and the final return are not visible in this excerpt.
245 typename std::enable_if<
246 !detail::Extract<F>::ReturnsFuture::value,
248 Future<T>::onError(F&& func) {
249 typedef typename detail::Extract<F>::FirstArg Exn;
251 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
252 "Return type of onError callback must be T or Future<T>");
255 auto f = p.getFuture();
256 auto pm = folly::makeMoveWrapper(std::move(p));
257 auto funcm = folly::makeMoveWrapper(std::move(func));
258 setCallback_([pm, funcm](Try<T>&& t) mutable {
259 if (!t.template withException<Exn>([&] (Exn& e) {
// Reached when withException returned false (presumably: no exception of
// type Exn was held): the original Try is passed through unchanged.
264 pm->fulfilTry(std::move(t));
271 // onError where the callback returns Future<T>
// Enabled when the callback returns a Future; the static_assert requires
// that return type to be exactly Future<T>. Exn is the callback's first
// parameter type, i.e. the exception type it handles.
// NOTE(review): the declaration of `p`, the `try {` / `catch (...)` lines
// and the final return are not visible in this excerpt.
274 typename std::enable_if<
275 detail::Extract<F>::ReturnsFuture::value,
277 Future<T>::onError(F&& func) {
279 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
280 "Return type of onError callback must be T or Future<T>");
281 typedef typename detail::Extract<F>::FirstArg Exn;
284 auto f = p.getFuture();
285 auto pm = folly::makeMoveWrapper(std::move(p));
286 auto funcm = folly::makeMoveWrapper(std::move(func));
287 setCallback_([pm, funcm](Try<T>&& t) mutable {
288 if (!t.template withException<Exn>([&] (Exn& e) {
// Run the handler, then forward the result of the Future it returns into
// the outer Promise.
290 auto f2 = (*funcm)(e);
291 f2.setCallback_([pm](Try<T>&& t2) mutable {
292 pm->fulfilTry(std::move(t2));
// Exceptions thrown by the handler itself are captured into the Promise.
294 } catch (const std::exception& e2) {
295 pm->setException(exception_wrapper(std::current_exception(), e2));
// Presumably the catch-all branch (its `catch` line is not visible).
297 pm->setException(exception_wrapper(std::current_exception()));
// Reached when withException returned false: pass the original Try through.
300 pm->fulfilTry(std::move(t));
// Attaches a continuation that captures `func` (held in a MoveWrapper) and
// then re-emits the incoming Try unchanged as a new Future via makeFuture.
// NOTE(review): the line that actually invokes the wrapped callback is not
// visible in this excerpt.
309 Future<T> Future<T>::ensure(F func) {
310 MoveWrapper<F> funcw(std::move(func));
311 return this->then([funcw](Try<T>&& t) {
313 return makeFuture(std::move(t));
// Combines within() and onError(): the Future is bounded by `dur` using
// timekeeper `tk`, and if that produces a TimedOut error the result is
// replaced by whatever `func()` returns.
319 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
320 auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
321 return within(dur, tk)
322 .onError([funcw](TimedOut const&) { return (*funcw)(); });
// Returns an lvalue reference to the value stored in the core's Try.
// NOTE(review): any validity check before the access is not visible in this
// excerpt.
326 typename std::add_lvalue_reference<T>::type Future<T>::value() {
329 return core_->getTry().value();
// Const overload: returns a const lvalue reference to the value stored in
// the core's Try.
// NOTE(review): any validity check before the access is not visible in this
// excerpt.
333 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
336 return core_->getTry().value();
// Returns a reference to the Try<T> held by the shared core.
// NOTE(review): any validity check before the access is not visible in this
// excerpt.
340 Try<T>& Future<T>::getTry() {
343 return core_->getTry();
// Rvalue-qualified via(): sets `executor` on this Future and returns the
// Future itself by move, so no new Promise/Future pair is created.
// NOTE(review): any validity check before setExecutor is not visible here.
347 template <typename Executor>
348 inline Future<T> Future<T>::via(Executor* executor) && {
351 setExecutor(executor);
353 return std::move(*this);
// Lvalue-qualified via(): creates a new Promise/Future pair, attaches a
// continuation to this Future that forwards its Try into that Promise, and
// returns the new Future with `executor` applied through the rvalue
// overload.
357 template <typename Executor>
358 inline Future<T> Future<T>::via(Executor* executor) & {
361 MoveWrapper<Promise<T>> p;
362 auto f = p->getFuture();
363 then([p](Try<T>&& t) mutable { p->fulfilTry(std::move(t)); });
364 return std::move(f).via(executor);
// Reports whether the shared core already holds a result, via
// core_->ready().
// NOTE(review): any validity check before the access is not visible here.
368 bool Future<T>::isReady() const {
370 return core_->ready();
// Hands `exception` to the shared core by moving it into core_->raise().
374 void Future<T>::raise(exception_wrapper exception) {
375 core_->raise(std::move(exception));
// Builds a Future from a value: a Promise of the decayed type receives the
// perfectly-forwarded `t` and its Future is returned.
381 Future<typename std::decay<T>::type> makeFuture(T&& t) {
382 Promise<typename std::decay<T>::type> p;
383 p.setValue(std::forward<T>(t));
384 return p.getFuture();
// Non-template overload producing a Future<void>; declared inline because
// this definition is included from multiple translation units.
// NOTE(review): the declaration of `p` and the call that completes it are
// not visible in this excerpt.
387 inline // for multiple translation units
388 Future<void> makeFuture() {
391 return p.getFuture();
// makeFutureTry overload restricted by enable_if to non-reference F; the
// extra `sdf` parameter exists only to carry that constraint. The result is
// a Future of whatever type `func()` returns.
// NOTE(review): the function header and the lines that run `func` and
// complete the Promise are not visible in this excerpt.
397 typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
398 -> Future<decltype(func())> {
399 Promise<decltype(func())> p;
404 return p.getFuture();
// Const-reference overload: delegates to the rvalue overload with a moved
// local named `copy`.
// NOTE(review): the declaration of `copy` is not visible in this excerpt;
// presumably it is a copy of `func`.
408 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
410 return makeFutureTry(std::move(copy));
// Builds a Future<T> from a std::exception_ptr.
// NOTE(review): the declaration of `p` and the call that stores `e` in it
// are not visible in this excerpt.
414 Future<T> makeFuture(std::exception_ptr const& e) {
417 return p.getFuture();
// Builds a Future<T> holding an error: `ew` is moved into a Promise via
// setException and that Promise's Future is returned.
// NOTE(review): the declaration of `p` is not visible in this excerpt.
421 Future<T> makeFuture(exception_wrapper ew) {
423 p.setException(std::move(ew));
424 return p.getFuture();
// Builds a failed Future<T> from an exception object. Enabled only when E
// derives from std::exception; `e` is wrapped with
// make_exception_wrapper<E> before being stored in the Promise.
// NOTE(review): the return-type line of the enable_if and the declaration
// of `p` are not visible in this excerpt.
427 template <class T, class E>
428 typename std::enable_if<std::is_base_of<std::exception, E>::value,
430 makeFuture(E const& e) {
432 p.setException(make_exception_wrapper<E>(e));
433 return p.getFuture();
// Builds a Future<T> from an existing Try<T>: the Try (value or exception)
// is moved into a Promise via fulfilTry and that Promise's Future returned.
437 Future<T> makeFuture(Try<T>&& t) {
438 Promise<typename std::decay<T>::type> p;
439 p.fulfilTry(std::move(t));
440 return p.getFuture();
// Try<void> overload: when the Try holds an exception, that exception is
// moved into a failed Future<void>.
// NOTE(review): the branch for a Try without an exception is not visible in
// this excerpt.
444 inline Future<void> makeFuture(Try<void>&& t) {
445 if (t.hasException()) {
446 return makeFuture<void>(std::move(t.exception()));
// Free-function via(): starts from makeFuture() (a Future<void>) and applies
// `executor` to it with Future::via.
453 template <typename Executor>
454 Future<void> via(Executor* executor) {
455 return makeFuture().via(executor);
// Variadic whenAll: allocates a detail::VariadicContext for the value types
// of the given futures, records how many futures there are in ctx->total,
// takes the result Future from the context's Promise, and passes the context
// and the futures to detail::whenAllVariadicHelper.
// NOTE(review): `ctx` is created with a raw `new`; where it is deleted is
// not visible in this excerpt - confirm the helper/context owns and frees
// it. The function header and the return of f_saved are also not visible.
460 template <typename... Fs>
461 typename detail::VariadicContext<
462 typename std::decay<Fs>::type::value_type...>::type
466 new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
467 ctx->total = sizeof...(fs);
468 auto f_saved = ctx->p.getFuture();
469 detail::whenAllVariadicHelper(ctx,
470 std::forward<typename std::decay<Fs>::type>(fs)...);
// Iterator-range whenAll: produces a Future of std::vector<Try<T>>, where T
// is the value type of the futures in [first, last).
// NOTE(review): the condition guarding the early empty-vector return, the
// line binding `f` to *first, any deletion of `ctx` (raw `new`) and the
// final return are not visible in this excerpt.
476 template <class InputIterator>
479 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
480 whenAll(InputIterator first, InputIterator last)
483 typename std::iterator_traits<InputIterator>::value_type::value_type T;
// Early exit yielding an already-built empty result.
486 return makeFuture(std::vector<Try<T>>());
488 size_t n = std::distance(first, last);
490 auto ctx = new detail::WhenAllContext<T>();
// Pre-size the result vector so each callback can write its own slot.
492 ctx->results.resize(n);
494 auto f_saved = ctx->p.getFuture();
496 for (size_t i = 0; first != last; ++first, ++i) {
// Each future stores its Try at its own index; the callback that brings the
// completion count to n publishes the whole vector through the Promise.
499 f.setCallback_([ctx, i, n](Try<T>&& t) {
500 ctx->results[i] = std::move(t);
501 if (++ctx->count == n) {
502 ctx->p.setValue(std::move(ctx->results));
// whenAny: yields a (index, Try<T>) pair for one future in [first, last).
// Every future gets a callback; the first one to flip ctx->done from false
// to true publishes its index and Try through the context's Promise, and
// later completions skip the setValue.
// NOTE(review): the line binding `f` to *first, any deletion of `ctx` (raw
// `new`) and the final return are not visible in this excerpt.
511 template <class InputIterator>
516 std::iterator_traits<InputIterator>::value_type::value_type> > >
517 whenAny(InputIterator first, InputIterator last) {
519 typename std::iterator_traits<InputIterator>::value_type::value_type T;
521 auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
522 auto f_saved = ctx->p.getFuture();
524 for (size_t i = 0; first != last; first++, i++) {
526 f.setCallback_([i, ctx](Try<T>&& t) {
527 if (!ctx->done.exchange(true)) {
528 ctx->p.setValue(std::make_pair(i, std::move(t)));
// whenN: yields a vector of (index, Try<T>) pairs for `n` of the futures in
// [first, last). State is shared between callbacks through a shared_ptr
// context.
// NOTE(review): the definition of ctx_t, the loop header, the conditions
// around the push_back / fulfilTry / setException calls and the binding of
// `v` are not visible in this excerpt.
537 template <class InputIterator>
538 Future<std::vector<std::pair<size_t, Try<typename
539 std::iterator_traits<InputIterator>::value_type::value_type>>>>
540 whenN(InputIterator first, InputIterator last, size_t n) {
542 std::iterator_traits<InputIterator>::value_type::value_type T;
543 typedef std::vector<std::pair<size_t, Try<T>>> V;
550 auto ctx = std::make_shared<ctx_t>();
553 // for each completed Future, increase count and add to vector, until we
554 // have n completed futures at which point we fulfil our Promise with the
559 it->then([ctx, n, i](Try<T>&& t) {
// `c` is this callback's position in completion order.
561 auto c = ++ctx->completed;
563 assert(ctx->v.size() < n);
564 v.push_back(std::make_pair(i, std::move(t)));
566 ctx->p.fulfilTry(Try<V>(std::move(v)));
// Error path with the message "Not enough futures"; the condition that
// selects it is not visible here.
576 ctx->p.setException(std::runtime_error("Not enough futures"));
579 return ctx->p.getFuture();
// Convenience overload: delegates to within(dur, e, tk) with a
// default-constructed TimedOut as the exception.
583 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
584 return within(dur, TimedOut(), tk);
// Returns a Future completed by whichever happens first: this Future
// finishing, or the timekeeper's callback firing. Both callbacks share a
// Context (exception, promise, atomic token); token.exchange(true) lets
// exactly one of them touch the promise.
// NOTE(review): the struct header and remaining members of Context, the
// condition under which `tk` is replaced by the singleton, the timekeeper
// call that the first .then() is chained on, and the `else` between the two
// setException calls are not visible in this excerpt.
589 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
592 Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
595 std::atomic<bool> token;
597 auto ctx = std::make_shared<Context>(std::move(e));
600 tk = folly::detail::getTimekeeperSingleton();
// Timer side: if it wins the token, fail the promise - with the timer's own
// exception when it has one, otherwise (presumably) with the caller's.
604 .then([ctx](Try<void> const& t) {
605 if (ctx->token.exchange(true) == false) {
606 if (t.hasException()) {
607 ctx->promise.setException(std::move(t.exception()));
609 ctx->promise.setException(std::move(ctx->exception));
// Result side: if it wins the token, forward this Future's Try.
614 this->then([ctx](Try<T>&& t) {
615 if (ctx->token.exchange(true) == false) {
616 ctx->promise.fulfilTry(std::move(t));
620 return ctx->promise.getFuture();
// Returns a Future carrying this Future's result that completes only once
// both this Future and futures::sleep(dur, tk) have completed: whenAll joins
// the two, and the continuation re-emits element 0 of the tuple (this
// Future's Try).
624 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
625 return whenAll(*this, futures::sleep(dur, tk))
626 .then([](std::tuple<Try<T>, Try<void>> tup) {
627 Try<T>& t = std::get<0>(tup);
628 return makeFuture<T>(std::move(t));
// Blocks until `f` is ready. `f` is replaced by a continuation Future that
// re-emits the same Try; the continuation captures by reference.
// NOTE(review): the Baton declaration, the post() inside the continuation
// and the wait() call are not visible in this excerpt.
635 void waitImpl(Future<T>& f) {
636 // short-circuit if there's nothing to do
637 if (f.isReady()) return;
640 f = f.then([&](Try<T> t) {
642 return makeFuture(std::move(t));
646 // There's a race here between the return here and the actual finishing of
647 // the future. f is completed, but the setup may not have finished on done
648 // after the baton has posted.
// Spin, yielding the thread, until the replacement Future reports ready.
649 while (!f.isReady()) {
650 std::this_thread::yield();
// Timed variant: waits for `f` for at most `dur`. The Baton lives in a
// shared_ptr captured by value, so the continuation still has a valid Baton
// if this function returns on timeout before the continuation runs.
// NOTE(review): the post() call inside the continuation is not visible in
// this excerpt.
655 void waitImpl(Future<T>& f, Duration dur) {
656 // short-circuit if there's nothing to do
657 if (f.isReady()) return;
659 auto baton = std::make_shared<Baton<>>();
660 f = f.then([baton](Try<T> t) {
662 return makeFuture(std::move(t));
665 // Let's preserve the invariant that if we did not timeout (timed_wait returns
666 // true), then the returned Future is complete when it is returned to the
667 // caller. We need to wait out the race for that Future to complete.
// The deadline is computed from system_clock::now() + dur.
668 if (baton->timed_wait(std::chrono::system_clock::now() + dur)) {
669 while (!f.isReady()) {
670 std::this_thread::yield();
// Loops until `f` is ready.
// NOTE(review): the loop body is not visible in this excerpt; presumably it
// drives the DrivableExecutor `e` on each iteration - confirm.
676 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
677 while (!f.isReady()) {
// Lvalue-qualified wait(): blocks in detail::waitImpl until this Future is
// ready.
// NOTE(review): the return statement is not visible in this excerpt; given
// the Future<T>& return type it presumably returns *this.
685 Future<T>& Future<T>::wait() & {
686 detail::waitImpl(*this);
// Rvalue-qualified wait(): blocks in detail::waitImpl until this Future is
// ready, then returns *this as an rvalue reference.
691 Future<T>&& Future<T>::wait() && {
692 detail::waitImpl(*this);
693 return std::move(*this);
// Lvalue-qualified timed wait: delegates to the timed detail::waitImpl with
// `dur`.
// NOTE(review): the return statement is not visible in this excerpt; given
// the Future<T>& return type it presumably returns *this.
697 Future<T>& Future<T>::wait(Duration dur) & {
698 detail::waitImpl(*this, dur);
// Rvalue-qualified timed wait: delegates to the timed detail::waitImpl with
// `dur`, then returns *this as an rvalue reference.
703 Future<T>&& Future<T>::wait(Duration dur) && {
704 detail::waitImpl(*this, dur);
705 return std::move(*this);
// Lvalue-qualified waitVia(): delegates to detail::waitViaImpl with the
// given DrivableExecutor.
// NOTE(review): the return statement is not visible in this excerpt; given
// the Future<T>& return type it presumably returns *this.
709 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
710 detail::waitViaImpl(*this, e);
// Rvalue-qualified waitVia(): delegates to detail::waitViaImpl with the
// given DrivableExecutor, then returns *this as an rvalue reference.
715 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
716 detail::waitViaImpl(*this, e);
717 return std::move(*this);
// Waits for completion and moves the stored value out to the caller.
// NOTE(review): the enclosing function header is not visible in this
// excerpt; presumably this is the body of Future<T>::get().
722 return std::move(wait().value());
// Future<void> specialisation of get().
// NOTE(review): the body is not visible in this excerpt.
726 inline void Future<void>::get() {
// Timed get(): the visible path moves the stored value out to the caller.
// NOTE(review): the wait on `dur`, the readiness check and the not-ready
// path are not visible in this excerpt.
731 T Future<T>::get(Duration dur) {
734 return std::move(value());
// Future<void> specialisation of the timed get().
// NOTE(review): the body is not visible in this excerpt.
741 inline void Future<void>::get(Duration dur) {
// Waits for completion through waitVia(e) and moves the stored value out to
// the caller.
751 T Future<T>::getVia(DrivableExecutor* e) {
752 return std::move(waitVia(e).value());
// Future<void> specialisation of getVia().
// NOTE(review): the body is not visible in this excerpt.
756 inline void Future<void>::getVia(DrivableExecutor* e) {
// Returns a Future<bool> comparing the eventual results of this Future and
// `f`. whenAll joins the two; when both Trys hold values they are compared
// with operator==.
// NOTE(review): the result when either Try lacks a value is on a line not
// visible in this excerpt.
761 Future<bool> Future<T>::willEqual(Future<T>& f) {
762 return whenAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
763 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
764 return std::get<0>(t).value() == std::get<1>(t).value();
// Base case of the chainHelper recursion: no callbacks remain.
// NOTE(review): the body is not visible in this excerpt; presumably it
// returns `f` unchanged.
774 Future<Z> chainHelper(Future<Z> f) {
// Recursive case: attaches the first callback `fn` to `f` with then() and
// recurses on the resulting Future with the remaining callbacks, until the
// single-argument overload ends the recursion with a Future<Z>.
778 template <class Z, class F, class Fn, class... Callbacks>
779 Future<Z> chainHelper(F f, Fn fn, Callbacks... fns) {
780 return chainHelper<Z>(f.then(fn), fns...);
// Packages a sequence of callbacks into a single std::function from Try<A>
// to Future<Z>. The whole chain is built eagerly on the Future of a fresh
// Promise<A>; the returned function fulfils that Promise with its argument
// and hands back the Future at the end of the chain.
// The returned lambda moves the Future out of `fw`, so the end-of-chain
// Future is handed out by the first call.
784 template <class A, class Z, class... Callbacks>
785 std::function<Future<Z>(Try<A>)>
786 chain(Callbacks... fns) {
787 MoveWrapper<Promise<A>> pw;
788 MoveWrapper<Future<Z>> fw(chainHelper<Z>(pw->getFuture(), fns...));
789 return [=](Try<A> t) mutable {
790 pw->fulfilTry(std::move(t));
791 return std::move(*fw);
799 // I haven't included a Future<T&> specialization because I don't foresee us
800 // using it, however it is not difficult to add when needed. Refer to
801 // Future<void> for guidance. std::future and boost::future code would also be