2 * Copyright 2014 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
22 #include <folly/Baton.h>
23 #include <folly/futures/detail/Core.h>
24 #include <folly/futures/Timekeeper.h>
31 Timekeeper* getTimekeeperSingleton();
35 Future<T>::Future(Future<T>&& other) noexcept : core_(nullptr) {
36 *this = std::move(other);
40 Future<T>& Future<T>::operator=(Future<T>&& other) {
41 std::swap(core_, other.core_);
46 Future<T>::~Future() {
51 void Future<T>::detach() {
53 core_->detachFuture();
59 void Future<T>::throwIfInvalid() const {
66 void Future<T>::setCallback_(F&& func) {
68 core_->setCallback(std::move(func));
71 // Variant: returns a value
72 // e.g. f.then([](Try<T>&& t){ return t.value(); });
74 template <typename F, typename R, bool isTry, typename... Args>
75 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
76 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
77 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
78 typedef typename R::ReturnsFuture::Inner B;
82 // wrap these so we can move them into the lambda
83 folly::MoveWrapper<Promise<B>> p;
84 folly::MoveWrapper<F> funcm(std::forward<F>(func));
86 // grab the Future now before we lose our handle on the Promise
87 auto f = p->getFuture();
89 /* This is a bit tricky.
91 We can't just close over *this in case this Future gets moved. So we
92 make a new dummy Future. We could figure out something more
93 sophisticated that avoids making a new Future object when it can, as an
94 optimization. But this is correct.
96 core_ can't be moved, it is explicitly disallowed (as is copying). But
97 if there's ever a reason to allow it, this is one place that makes that
98 assumption and would need to be fixed. We use a standard shared pointer
99 for core_ (by copying it in), which means in essence obj holds a shared
100 pointer to itself. But this shouldn't leak because Promise will not
101 outlive the continuation, because Promise will setException() with a
102 broken Promise if it is destructed before completed. We could use a
103 weak pointer but it would have to be converted to a shared pointer when
104 func is executed (because the Future returned by func may possibly
105 persist beyond the callback, if it gets moved), and so it is an
106 optimization to just make it shared from the get-go.
108 We have to move in the Promise and func using the MoveWrapper
109 hack. (func could be copied but it's a big drag on perf).
111 Two subtle but important points about this design. detail::Core has no
112 back pointers to Future or Promise, so if Future or Promise get moved
113 (and they will be moved in performant code) we don't have to do
114 anything fancy. And because we store the continuation in the
115 detail::Core, not in the Future, we can execute the continuation even
116 after the Future has gone out of scope. This is an intentional design
117 decision. It is likely we will want to be able to cancel a continuation
118 in some circumstances, but I think it should be explicit not implicit
119 in the destruction of the Future used to create it.
122 [p, funcm](Try<T>&& t) mutable {
123 if (!isTry && t.hasException()) {
124 p->setException(std::move(t.exception()));
127 return (*funcm)(t.template get<isTry, Args>()...);
135 // Variant: returns a Future
136 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
138 template <typename F, typename R, bool isTry, typename... Args>
139 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
140 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
141 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
142 typedef typename R::ReturnsFuture::Inner B;
146 // wrap these so we can move them into the lambda
147 folly::MoveWrapper<Promise<B>> p;
148 folly::MoveWrapper<F> funcm(std::forward<F>(func));
150 // grab the Future now before we lose our handle on the Promise
151 auto f = p->getFuture();
154 [p, funcm](Try<T>&& t) mutable {
155 if (!isTry && t.hasException()) {
156 p->setException(std::move(t.exception()));
159 auto f2 = (*funcm)(t.template get<isTry, Args>()...);
160 // that didn't throw, now we can steal p
161 f2.setCallback_([p](Try<B>&& b) mutable {
162 p->fulfilTry(std::move(b));
164 } catch (const std::exception& e) {
165 p->setException(exception_wrapper(std::current_exception()));
173 template <typename T>
174 template <typename Caller, typename R, typename... Args>
175 Future<typename isFuture<R>::Inner>
176 Future<T>::then(Caller *instance, R(Caller::*func)(Args...)) {
177 typedef typename std::remove_cv<
178 typename std::remove_reference<
179 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
180 return then([instance, func](Try<T>&& t){
181 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
186 Future<void> Future<T>::then() {
187 return then([] (Try<T>&& t) {});
190 // onError where the callback returns T
193 typename std::enable_if<
194 !detail::Extract<F>::ReturnsFuture::value,
196 Future<T>::onError(F&& func) {
197 typedef typename detail::Extract<F>::FirstArg Exn;
199 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
200 "Return type of onError callback must be T or Future<T>");
203 auto f = p.getFuture();
204 auto pm = folly::makeMoveWrapper(std::move(p));
205 auto funcm = folly::makeMoveWrapper(std::move(func));
206 setCallback_([pm, funcm](Try<T>&& t) mutable {
207 if (!t.template withException<Exn>([&] (Exn& e) {
212 pm->fulfilTry(std::move(t));
219 // onError where the callback returns Future<T>
222 typename std::enable_if<
223 detail::Extract<F>::ReturnsFuture::value,
225 Future<T>::onError(F&& func) {
227 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
228 "Return type of onError callback must be T or Future<T>");
229 typedef typename detail::Extract<F>::FirstArg Exn;
232 auto f = p.getFuture();
233 auto pm = folly::makeMoveWrapper(std::move(p));
234 auto funcm = folly::makeMoveWrapper(std::move(func));
235 setCallback_([pm, funcm](Try<T>&& t) mutable {
236 if (!t.template withException<Exn>([&] (Exn& e) {
238 auto f2 = (*funcm)(e);
239 f2.setCallback_([pm](Try<T>&& t2) mutable {
240 pm->fulfilTry(std::move(t2));
242 } catch (const std::exception& e2) {
243 pm->setException(exception_wrapper(std::current_exception(), e2));
245 pm->setException(exception_wrapper(std::current_exception()));
248 pm->fulfilTry(std::move(t));
256 typename std::add_lvalue_reference<T>::type Future<T>::value() {
259 return core_->getTry().value();
263 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
266 return core_->getTry().value();
270 Try<T>& Future<T>::getTry() {
273 return core_->getTry();
277 template <typename Executor>
278 inline Future<T> Future<T>::via(Executor* executor) && {
282 core_->setExecutor(executor);
284 return std::move(*this);
288 template <typename Executor>
289 inline Future<T> Future<T>::via(Executor* executor) & {
292 MoveWrapper<Promise<T>> p;
293 auto f = p->getFuture();
294 then([p](Try<T>&& t) mutable { p->fulfilTry(std::move(t)); });
295 return std::move(f).via(executor);
299 bool Future<T>::isReady() const {
301 return core_->ready();
305 void Future<T>::raise(exception_wrapper exception) {
306 core_->raise(std::move(exception));
312 Future<typename std::decay<T>::type> makeFuture(T&& t) {
313 Promise<typename std::decay<T>::type> p;
314 p.setValue(std::forward<T>(t));
315 return p.getFuture();
318 inline // for multiple translation units
319 Future<void> makeFuture() {
322 return p.getFuture();
328 typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
329 -> Future<decltype(func())> {
330 Promise<decltype(func())> p;
335 return p.getFuture();
339 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
341 return makeFutureTry(std::move(copy));
345 Future<T> makeFuture(std::exception_ptr const& e) {
348 return p.getFuture();
352 Future<T> makeFuture(exception_wrapper ew) {
354 p.setException(std::move(ew));
355 return p.getFuture();
358 template <class T, class E>
359 typename std::enable_if<std::is_base_of<std::exception, E>::value,
361 makeFuture(E const& e) {
363 p.setException(make_exception_wrapper<E>(e));
364 return p.getFuture();
368 Future<T> makeFuture(Try<T>&& t) {
369 Promise<typename std::decay<T>::type> p;
370 p.fulfilTry(std::move(t));
371 return p.getFuture();
375 inline Future<void> makeFuture(Try<void>&& t) {
376 if (t.hasException()) {
377 return makeFuture<void>(std::move(t.exception()));
384 template <typename Executor>
385 Future<void> via(Executor* executor) {
386 return makeFuture().via(executor);
391 template <typename... Fs>
392 typename detail::VariadicContext<
393 typename std::decay<Fs>::type::value_type...>::type
397 new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
398 ctx->total = sizeof...(fs);
399 auto f_saved = ctx->p.getFuture();
400 detail::whenAllVariadicHelper(ctx,
401 std::forward<typename std::decay<Fs>::type>(fs)...);
407 template <class InputIterator>
410 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
411 whenAll(InputIterator first, InputIterator last)
414 typename std::iterator_traits<InputIterator>::value_type::value_type T;
417 return makeFuture(std::vector<Try<T>>());
419 size_t n = std::distance(first, last);
421 auto ctx = new detail::WhenAllContext<T>();
423 ctx->results.resize(n);
425 auto f_saved = ctx->p.getFuture();
427 for (size_t i = 0; first != last; ++first, ++i) {
430 f.setCallback_([ctx, i, n](Try<T>&& t) {
431 ctx->results[i] = std::move(t);
432 if (++ctx->count == n) {
433 ctx->p.setValue(std::move(ctx->results));
442 template <class InputIterator>
447 std::iterator_traits<InputIterator>::value_type::value_type> > >
448 whenAny(InputIterator first, InputIterator last) {
450 typename std::iterator_traits<InputIterator>::value_type::value_type T;
452 auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
453 auto f_saved = ctx->p.getFuture();
455 for (size_t i = 0; first != last; first++, i++) {
457 f.setCallback_([i, ctx](Try<T>&& t) {
458 if (!ctx->done.exchange(true)) {
459 ctx->p.setValue(std::make_pair(i, std::move(t)));
468 template <class InputIterator>
469 Future<std::vector<std::pair<size_t, Try<typename
470 std::iterator_traits<InputIterator>::value_type::value_type>>>>
471 whenN(InputIterator first, InputIterator last, size_t n) {
473 std::iterator_traits<InputIterator>::value_type::value_type T;
474 typedef std::vector<std::pair<size_t, Try<T>>> V;
481 auto ctx = std::make_shared<ctx_t>();
484 // for each completed Future, increase count and add to vector, until we
485 // have n completed futures at which point we fulfil our Promise with the
490 it->then([ctx, n, i](Try<T>&& t) {
492 auto c = ++ctx->completed;
494 assert(ctx->v.size() < n);
495 v.push_back(std::make_pair(i, std::move(t)));
497 ctx->p.fulfilTry(Try<V>(std::move(v)));
507 ctx->p.setException(std::runtime_error("Not enough futures"));
510 return ctx->p.getFuture();
515 void getWaitHelper(Future<T>* f) {
516 // If we already have a value do the cheap thing
521 folly::Baton<> baton;
522 f->then([&](Try<T> const&) {
529 Future<T> getWaitTimeoutHelper(Future<T>* f, Duration dur) {
530 // TODO make and use variadic whenAny #5877971
532 auto token = std::make_shared<std::atomic<bool>>();
533 folly::Baton<> baton;
535 folly::detail::getTimekeeperSingleton()->after(dur)
536 .then([&,token](Try<void> const& t) {
537 if (token->exchange(true) == false) {
538 if (t.hasException()) {
539 p.setException(std::move(t.exception()));
541 p.setException(TimedOut());
547 f->then([&, token](Try<T>&& t) {
548 if (token->exchange(true) == false) {
549 p.fulfilTry(std::move(t));
555 return p.getFuture();
563 // Big assumption here: the then() call above, since it doesn't move out
564 // the value, leaves us with a value to return here. This would be a big
565 // no-no in user code, but I'm invoking internal developer privilege. This
566 // is slightly more efficient (save a move()) especially if there's an
567 // exception (save a throw).
568 return std::move(value());
572 inline void Future<void>::get() {
578 T Future<T>::get(Duration dur) {
579 return std::move(getWaitTimeoutHelper(this, dur).value());
583 inline void Future<void>::get(Duration dur) {
584 getWaitTimeoutHelper(this, dur).value();
588 T Future<T>::getVia(DrivableExecutor* e) {
592 return std::move(value());
596 inline void Future<void>::getVia(DrivableExecutor* e) {
604 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
605 return within(dur, TimedOut(), tk);
610 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
613 Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
616 std::atomic<bool> token;
618 auto ctx = std::make_shared<Context>(std::move(e));
621 tk = folly::detail::getTimekeeperSingleton();
625 .then([ctx](Try<void> const& t) {
626 if (ctx->token.exchange(true) == false) {
627 if (t.hasException()) {
628 ctx->promise.setException(std::move(t.exception()));
630 ctx->promise.setException(std::move(ctx->exception));
635 this->then([ctx](Try<T>&& t) {
636 if (ctx->token.exchange(true) == false) {
637 ctx->promise.fulfilTry(std::move(t));
641 return ctx->promise.getFuture();
645 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
646 return whenAll(*this, futures::sleep(dur, tk))
647 .then([](std::tuple<Try<T>, Try<void>> tup) {
648 Try<T>& t = std::get<0>(tup);
649 return makeFuture<T>(std::move(t));
654 Future<T> Future<T>::wait() {
656 auto done = then([&](Try<T> t) {
658 return makeFuture(std::move(t));
661 while (!done.isReady()) {
662 // There's a race here between the return here and the actual finishing of
663 // the future. f is completed, but the setup may not have finished on done
664 // after the baton has posted.
665 std::this_thread::yield();
671 Future<T> Future<T>::wait(Duration dur) {
672 auto baton = std::make_shared<Baton<>>();
673 auto done = then([baton](Try<T> t) {
675 return makeFuture(std::move(t));
677 // Let's preserve the invariant that if we did not timeout (timed_wait returns
678 // true), then the returned Future is complete when it is returned to the
679 // caller. We need to wait out the race for that Future to complete.
680 if (baton->timed_wait(std::chrono::system_clock::now() + dur)) {
681 while (!done.isReady()) {
682 std::this_thread::yield();
689 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
697 Future<T> Future<T>::waitVia(DrivableExecutor* e) && {
701 return std::move(*this);
706 // I haven't included a Future<T&> specialization because I don't forsee us
707 // using it, however it is not difficult to add when needed. Refer to
708 // Future<void> for guidance. std::future and boost::future code would also be