Merge commit '64f2f2734ad853784bdd260bcf31e065c47c0741' into fix-configure-pthread...
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <chrono>
20 #include <thread>
21
22 #include <folly/Baton.h>
23 #include <folly/futures/detail/Core.h>
24 #include <folly/futures/Timekeeper.h>
25
26 namespace folly {
27
28 class Timekeeper;
29
30 namespace detail {
31   Timekeeper* getTimekeeperSingleton();
32 }
33
34 template <class T>
35 Future<T>::Future(Future<T>&& other) noexcept : core_(nullptr) {
36   *this = std::move(other);
37 }
38
39 template <class T>
40 Future<T>& Future<T>::operator=(Future<T>&& other) {
41   std::swap(core_, other.core_);
42   return *this;
43 }
44
45 template <class T>
46 template <class F>
47 Future<T>::Future(
48   const typename std::enable_if<!std::is_void<F>::value, F>::type& val)
49     : core_(nullptr) {
50   Promise<F> p;
51   p.setValue(val);
52   *this = p.getFuture();
53 }
54
55 template <class T>
56 template <class F>
57 Future<T>::Future(
58   typename std::enable_if<!std::is_void<F>::value, F>::type&& val)
59     : core_(nullptr) {
60   Promise<F> p;
61   p.setValue(std::forward<F>(val));
62   *this = p.getFuture();
63 }
64
65 template <>
66 template <class F,
67           typename std::enable_if<std::is_void<F>::value, int>::type>
68 Future<void>::Future() : core_(nullptr) {
69   Promise<void> p;
70   p.setValue();
71   *this = p.getFuture();
72 }
73
74
75 template <class T>
76 Future<T>::~Future() {
77   detach();
78 }
79
80 template <class T>
81 void Future<T>::detach() {
82   if (core_) {
83     core_->detachFuture();
84     core_ = nullptr;
85   }
86 }
87
88 template <class T>
89 void Future<T>::throwIfInvalid() const {
90   if (!core_)
91     throw NoState();
92 }
93
94 template <class T>
95 template <class F>
96 void Future<T>::setCallback_(F&& func) {
97   throwIfInvalid();
98   core_->setCallback(std::move(func));
99 }
100
101 // Variant: returns a value
102 // e.g. f.then([](Try<T>&& t){ return t.value(); });
103 template <class T>
104 template <typename F, typename R, bool isTry, typename... Args>
105 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
106 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
107   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
108   typedef typename R::ReturnsFuture::Inner B;
109
110   throwIfInvalid();
111
112   // wrap these so we can move them into the lambda
113   folly::MoveWrapper<Promise<B>> p;
114   folly::MoveWrapper<F> funcm(std::forward<F>(func));
115
116   // grab the Future now before we lose our handle on the Promise
117   auto f = p->getFuture();
118
119   /* This is a bit tricky.
120
121      We can't just close over *this in case this Future gets moved. So we
122      make a new dummy Future. We could figure out something more
123      sophisticated that avoids making a new Future object when it can, as an
124      optimization. But this is correct.
125
126      core_ can't be moved, it is explicitly disallowed (as is copying). But
127      if there's ever a reason to allow it, this is one place that makes that
128      assumption and would need to be fixed. We use a standard shared pointer
129      for core_ (by copying it in), which means in essence obj holds a shared
130      pointer to itself.  But this shouldn't leak because Promise will not
131      outlive the continuation, because Promise will setException() with a
132      broken Promise if it is destructed before completed. We could use a
133      weak pointer but it would have to be converted to a shared pointer when
134      func is executed (because the Future returned by func may possibly
135      persist beyond the callback, if it gets moved), and so it is an
136      optimization to just make it shared from the get-go.
137
138      We have to move in the Promise and func using the MoveWrapper
139      hack. (func could be copied but it's a big drag on perf).
140
141      Two subtle but important points about this design. detail::Core has no
142      back pointers to Future or Promise, so if Future or Promise get moved
143      (and they will be moved in performant code) we don't have to do
144      anything fancy. And because we store the continuation in the
145      detail::Core, not in the Future, we can execute the continuation even
146      after the Future has gone out of scope. This is an intentional design
147      decision. It is likely we will want to be able to cancel a continuation
148      in some circumstances, but I think it should be explicit not implicit
149      in the destruction of the Future used to create it.
150      */
151   setCallback_(
152     [p, funcm](Try<T>&& t) mutable {
153       if (!isTry && t.hasException()) {
154         p->setException(std::move(t.exception()));
155       } else {
156         p->fulfil([&]() {
157           return (*funcm)(t.template get<isTry, Args>()...);
158         });
159       }
160     });
161
162   return f;
163 }
164
165 // Variant: returns a Future
166 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
167 template <class T>
168 template <typename F, typename R, bool isTry, typename... Args>
169 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
170 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
171   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
172   typedef typename R::ReturnsFuture::Inner B;
173
174   throwIfInvalid();
175
176   // wrap these so we can move them into the lambda
177   folly::MoveWrapper<Promise<B>> p;
178   folly::MoveWrapper<F> funcm(std::forward<F>(func));
179
180   // grab the Future now before we lose our handle on the Promise
181   auto f = p->getFuture();
182
183   setCallback_(
184     [p, funcm](Try<T>&& t) mutable {
185       if (!isTry && t.hasException()) {
186         p->setException(std::move(t.exception()));
187       } else {
188         try {
189           auto f2 = (*funcm)(t.template get<isTry, Args>()...);
190           // that didn't throw, now we can steal p
191           f2.setCallback_([p](Try<B>&& b) mutable {
192             p->fulfilTry(std::move(b));
193           });
194         } catch (const std::exception& e) {
195           p->setException(exception_wrapper(std::current_exception()));
196         }
197       }
198     });
199
200   return f;
201 }
202
203 template <typename T>
204 template <typename Caller, typename R, typename... Args>
205   Future<typename isFuture<R>::Inner>
206 Future<T>::then(Caller *instance, R(Caller::*func)(Args...)) {
207   typedef typename std::remove_cv<
208     typename std::remove_reference<
209       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
210   return then([instance, func](Try<T>&& t){
211     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
212   });
213 }
214
215 template <class T>
216 Future<void> Future<T>::then() {
217   return then([] (Try<T>&& t) {});
218 }
219
220 // onError where the callback returns T
221 template <class T>
222 template <class F>
223 typename std::enable_if<
224   !detail::Extract<F>::ReturnsFuture::value,
225   Future<T>>::type
226 Future<T>::onError(F&& func) {
227   typedef typename detail::Extract<F>::FirstArg Exn;
228   static_assert(
229       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
230       "Return type of onError callback must be T or Future<T>");
231
232   Promise<T> p;
233   auto f = p.getFuture();
234   auto pm = folly::makeMoveWrapper(std::move(p));
235   auto funcm = folly::makeMoveWrapper(std::move(func));
236   setCallback_([pm, funcm](Try<T>&& t) mutable {
237     if (!t.template withException<Exn>([&] (Exn& e) {
238           pm->fulfil([&]{
239             return (*funcm)(e);
240           });
241         })) {
242       pm->fulfilTry(std::move(t));
243     }
244   });
245
246   return f;
247 }
248
249 // onError where the callback returns Future<T>
250 template <class T>
251 template <class F>
252 typename std::enable_if<
253   detail::Extract<F>::ReturnsFuture::value,
254   Future<T>>::type
255 Future<T>::onError(F&& func) {
256   static_assert(
257       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
258       "Return type of onError callback must be T or Future<T>");
259   typedef typename detail::Extract<F>::FirstArg Exn;
260
261   Promise<T> p;
262   auto f = p.getFuture();
263   auto pm = folly::makeMoveWrapper(std::move(p));
264   auto funcm = folly::makeMoveWrapper(std::move(func));
265   setCallback_([pm, funcm](Try<T>&& t) mutable {
266     if (!t.template withException<Exn>([&] (Exn& e) {
267           try {
268             auto f2 = (*funcm)(e);
269             f2.setCallback_([pm](Try<T>&& t2) mutable {
270               pm->fulfilTry(std::move(t2));
271             });
272           } catch (const std::exception& e2) {
273             pm->setException(exception_wrapper(std::current_exception(), e2));
274           } catch (...) {
275             pm->setException(exception_wrapper(std::current_exception()));
276           }
277         })) {
278       pm->fulfilTry(std::move(t));
279     }
280   });
281
282   return f;
283 }
284
285 template <class T>
286 typename std::add_lvalue_reference<T>::type Future<T>::value() {
287   throwIfInvalid();
288
289   return core_->getTry().value();
290 }
291
292 template <class T>
293 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
294   throwIfInvalid();
295
296   return core_->getTry().value();
297 }
298
299 template <class T>
300 Try<T>& Future<T>::getTry() {
301   throwIfInvalid();
302
303   return core_->getTry();
304 }
305
306 template <class T>
307 template <typename Executor>
308 inline Future<T> Future<T>::via(Executor* executor) && {
309   throwIfInvalid();
310
311   this->deactivate();
312   core_->setExecutor(executor);
313
314   return std::move(*this);
315 }
316
317 template <class T>
318 template <typename Executor>
319 inline Future<T> Future<T>::via(Executor* executor) & {
320   throwIfInvalid();
321
322   MoveWrapper<Promise<T>> p;
323   auto f = p->getFuture();
324   then([p](Try<T>&& t) mutable { p->fulfilTry(std::move(t)); });
325   return std::move(f).via(executor);
326 }
327
328 template <class T>
329 bool Future<T>::isReady() const {
330   throwIfInvalid();
331   return core_->ready();
332 }
333
334 template <class T>
335 void Future<T>::raise(exception_wrapper exception) {
336   core_->raise(std::move(exception));
337 }
338
339 // makeFuture
340
341 template <class T>
342 Future<typename std::decay<T>::type> makeFuture(T&& t) {
343   Promise<typename std::decay<T>::type> p;
344   p.setValue(std::forward<T>(t));
345   return p.getFuture();
346 }
347
348 inline // for multiple translation units
349 Future<void> makeFuture() {
350   Promise<void> p;
351   p.setValue();
352   return p.getFuture();
353 }
354
355 template <class F>
356 auto makeFutureTry(
357     F&& func,
358     typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
359     -> Future<decltype(func())> {
360   Promise<decltype(func())> p;
361   p.fulfil(
362     [&func]() {
363       return (func)();
364     });
365   return p.getFuture();
366 }
367
368 template <class F>
369 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
370   F copy = func;
371   return makeFutureTry(std::move(copy));
372 }
373
374 template <class T>
375 Future<T> makeFuture(std::exception_ptr const& e) {
376   Promise<T> p;
377   p.setException(e);
378   return p.getFuture();
379 }
380
381 template <class T>
382 Future<T> makeFuture(exception_wrapper ew) {
383   Promise<T> p;
384   p.setException(std::move(ew));
385   return p.getFuture();
386 }
387
388 template <class T, class E>
389 typename std::enable_if<std::is_base_of<std::exception, E>::value,
390                         Future<T>>::type
391 makeFuture(E const& e) {
392   Promise<T> p;
393   p.setException(make_exception_wrapper<E>(e));
394   return p.getFuture();
395 }
396
397 template <class T>
398 Future<T> makeFuture(Try<T>&& t) {
399   Promise<typename std::decay<T>::type> p;
400   p.fulfilTry(std::move(t));
401   return p.getFuture();
402 }
403
404 template <>
405 inline Future<void> makeFuture(Try<void>&& t) {
406   if (t.hasException()) {
407     return makeFuture<void>(std::move(t.exception()));
408   } else {
409     return makeFuture();
410   }
411 }
412
413 // via
414 template <typename Executor>
415 Future<void> via(Executor* executor) {
416   return makeFuture().via(executor);
417 }
418
419 // when (variadic)
420
421 template <typename... Fs>
422 typename detail::VariadicContext<
423   typename std::decay<Fs>::type::value_type...>::type
424 whenAll(Fs&&... fs)
425 {
426   auto ctx =
427     new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
428   ctx->total = sizeof...(fs);
429   auto f_saved = ctx->p.getFuture();
430   detail::whenAllVariadicHelper(ctx,
431     std::forward<typename std::decay<Fs>::type>(fs)...);
432   return f_saved;
433 }
434
435 // when (iterator)
436
437 template <class InputIterator>
438 Future<
439   std::vector<
440   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
441 whenAll(InputIterator first, InputIterator last)
442 {
443   typedef
444     typename std::iterator_traits<InputIterator>::value_type::value_type T;
445
446   if (first >= last) {
447     return makeFuture(std::vector<Try<T>>());
448   }
449   size_t n = std::distance(first, last);
450
451   auto ctx = new detail::WhenAllContext<T>();
452
453   ctx->results.resize(n);
454
455   auto f_saved = ctx->p.getFuture();
456
457   for (size_t i = 0; first != last; ++first, ++i) {
458      assert(i < n);
459      auto& f = *first;
460      f.setCallback_([ctx, i, n](Try<T>&& t) {
461          ctx->results[i] = std::move(t);
462          if (++ctx->count == n) {
463            ctx->p.setValue(std::move(ctx->results));
464            delete ctx;
465          }
466        });
467   }
468
469   return f_saved;
470 }
471
472 template <class InputIterator>
473 Future<
474   std::pair<size_t,
475             Try<
476               typename
477               std::iterator_traits<InputIterator>::value_type::value_type> > >
478 whenAny(InputIterator first, InputIterator last) {
479   typedef
480     typename std::iterator_traits<InputIterator>::value_type::value_type T;
481
482   auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
483   auto f_saved = ctx->p.getFuture();
484
485   for (size_t i = 0; first != last; first++, i++) {
486     auto& f = *first;
487     f.setCallback_([i, ctx](Try<T>&& t) {
488       if (!ctx->done.exchange(true)) {
489         ctx->p.setValue(std::make_pair(i, std::move(t)));
490       }
491       ctx->decref();
492     });
493   }
494
495   return f_saved;
496 }
497
498 template <class InputIterator>
499 Future<std::vector<std::pair<size_t, Try<typename
500   std::iterator_traits<InputIterator>::value_type::value_type>>>>
501 whenN(InputIterator first, InputIterator last, size_t n) {
502   typedef typename
503     std::iterator_traits<InputIterator>::value_type::value_type T;
504   typedef std::vector<std::pair<size_t, Try<T>>> V;
505
506   struct ctx_t {
507     V v;
508     size_t completed;
509     Promise<V> p;
510   };
511   auto ctx = std::make_shared<ctx_t>();
512   ctx->completed = 0;
513
514   // for each completed Future, increase count and add to vector, until we
515   // have n completed futures at which point we fulfil our Promise with the
516   // vector
517   auto it = first;
518   size_t i = 0;
519   while (it != last) {
520     it->then([ctx, n, i](Try<T>&& t) {
521       auto& v = ctx->v;
522       auto c = ++ctx->completed;
523       if (c <= n) {
524         assert(ctx->v.size() < n);
525         v.push_back(std::make_pair(i, std::move(t)));
526         if (c == n) {
527           ctx->p.fulfilTry(Try<V>(std::move(v)));
528         }
529       }
530     });
531
532     it++;
533     i++;
534   }
535
536   if (i < n) {
537     ctx->p.setException(std::runtime_error("Not enough futures"));
538   }
539
540   return ctx->p.getFuture();
541 }
542
543 namespace {
544   template <class T>
545   void getWaitHelper(Future<T>* f) {
546     // If we already have a value do the cheap thing
547     if (f->isReady()) {
548       return;
549     }
550
551     folly::Baton<> baton;
552     f->then([&](Try<T> const&) {
553       baton.post();
554     });
555     baton.wait();
556   }
557
558   template <class T>
559   Future<T> getWaitTimeoutHelper(Future<T>* f, Duration dur) {
560     // TODO make and use variadic whenAny #5877971
561     Promise<T> p;
562     auto token = std::make_shared<std::atomic<bool>>();
563     folly::Baton<> baton;
564
565     folly::detail::getTimekeeperSingleton()->after(dur)
566       .then([&,token](Try<void> const& t) {
567         if (token->exchange(true) == false) {
568           if (t.hasException()) {
569             p.setException(std::move(t.exception()));
570           } else {
571             p.setException(TimedOut());
572           }
573           baton.post();
574         }
575       });
576
577     f->then([&, token](Try<T>&& t) {
578       if (token->exchange(true) == false) {
579         p.fulfilTry(std::move(t));
580         baton.post();
581       }
582     });
583
584     baton.wait();
585     return p.getFuture();
586   }
587 }
588
589 template <class T>
590 T Future<T>::get() {
591   getWaitHelper(this);
592
593   // Big assumption here: the then() call above, since it doesn't move out
594   // the value, leaves us with a value to return here. This would be a big
595   // no-no in user code, but I'm invoking internal developer privilege. This
596   // is slightly more efficient (save a move()) especially if there's an
597   // exception (save a throw).
598   return std::move(value());
599 }
600
601 template <>
602 inline void Future<void>::get() {
603   getWaitHelper(this);
604   value();
605 }
606
607 template <class T>
608 T Future<T>::get(Duration dur) {
609   return std::move(getWaitTimeoutHelper(this, dur).value());
610 }
611
612 template <>
613 inline void Future<void>::get(Duration dur) {
614   getWaitTimeoutHelper(this, dur).value();
615 }
616
617 template <class T>
618 T Future<T>::getVia(DrivableExecutor* e) {
619   while (!isReady()) {
620     e->drive();
621   }
622   return std::move(value());
623 }
624
625 template <>
626 inline void Future<void>::getVia(DrivableExecutor* e) {
627   while (!isReady()) {
628     e->drive();
629   }
630   value();
631 }
632
633 template <class T>
634 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
635   return within(dur, TimedOut(), tk);
636 }
637
638 template <class T>
639 template <class E>
640 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
641
642   struct Context {
643     Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
644     E exception;
645     Promise<T> promise;
646     std::atomic<bool> token;
647   };
648   auto ctx = std::make_shared<Context>(std::move(e));
649
650   if (!tk) {
651     tk = folly::detail::getTimekeeperSingleton();
652   }
653
654   tk->after(dur)
655     .then([ctx](Try<void> const& t) {
656       if (ctx->token.exchange(true) == false) {
657         if (t.hasException()) {
658           ctx->promise.setException(std::move(t.exception()));
659         } else {
660           ctx->promise.setException(std::move(ctx->exception));
661         }
662       }
663     });
664
665   this->then([ctx](Try<T>&& t) {
666     if (ctx->token.exchange(true) == false) {
667       ctx->promise.fulfilTry(std::move(t));
668     }
669   });
670
671   return ctx->promise.getFuture();
672 }
673
674 template <class T>
675 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
676   return whenAll(*this, futures::sleep(dur, tk))
677     .then([](std::tuple<Try<T>, Try<void>> tup) {
678       Try<T>& t = std::get<0>(tup);
679       return makeFuture<T>(std::move(t));
680     });
681 }
682
683 template <class T>
684 Future<T> Future<T>::wait() {
685   Baton<> baton;
686   auto done = then([&](Try<T> t) {
687     baton.post();
688     return makeFuture(std::move(t));
689   });
690   baton.wait();
691   while (!done.isReady()) {
692     // There's a race here between the return here and the actual finishing of
693     // the future. f is completed, but the setup may not have finished on done
694     // after the baton has posted.
695     std::this_thread::yield();
696   }
697   return done;
698 }
699
700 template <class T>
701 Future<T> Future<T>::wait(Duration dur) {
702   auto baton = std::make_shared<Baton<>>();
703   auto done = then([baton](Try<T> t) {
704     baton->post();
705     return makeFuture(std::move(t));
706   });
707   // Let's preserve the invariant that if we did not timeout (timed_wait returns
708   // true), then the returned Future is complete when it is returned to the
709   // caller. We need to wait out the race for that Future to complete.
710   if (baton->timed_wait(std::chrono::system_clock::now() + dur)) {
711     while (!done.isReady()) {
712       std::this_thread::yield();
713     }
714   }
715   return done;
716 }
717
718 template <class T>
719 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
720   while (!isReady()) {
721     e->drive();
722   }
723   return *this;
724 }
725
726 template <class T>
727 Future<T> Future<T>::waitVia(DrivableExecutor* e) && {
728   while (!isReady()) {
729     e->drive();
730   }
731   return std::move(*this);
732 }
733
734 }
735
736 // I haven't included a Future<T&> specialization because I don't forsee us
737 // using it, however it is not difficult to add when needed. Refer to
738 // Future<void> for guidance. std::future and boost::future code would also be
739 // instructive.