Future: some fixes re: handling of universal references
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <algorithm>
20 #include <cassert>
21 #include <chrono>
22 #include <random>
23 #include <thread>
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
30
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
33 #else
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/fibers/Baton.h>
36 #endif
37
38 namespace folly {
39
40 class Timekeeper;
41
42 namespace detail {
43 #if FOLLY_FUTURE_USING_FIBER
44 typedef folly::fibers::Baton FutureBatonType;
45 #else
46 typedef folly::Baton<> FutureBatonType;
47 #endif
48 }
49
50 namespace detail {
51   std::shared_ptr<Timekeeper> getTimekeeperSingleton();
52 }
53
54 template <class T>
55 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
56   other.core_ = nullptr;
57 }
58
59 template <class T>
60 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
61   std::swap(core_, other.core_);
62   return *this;
63 }
64
65 template <class T>
66 template <class T2, typename>
67 Future<T>::Future(T2&& val)
68     : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
69
70 template <class T>
71 template <typename T2>
72 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
73     : core_(new detail::Core<T>(Try<T>(T()))) {}
74
75 template <class T>
76 Future<T>::~Future() {
77   detach();
78 }
79
80 template <class T>
81 void Future<T>::detach() {
82   if (core_) {
83     core_->detachFuture();
84     core_ = nullptr;
85   }
86 }
87
88 template <class T>
89 void Future<T>::throwIfInvalid() const {
90   if (!core_)
91     throw NoState();
92 }
93
94 template <class T>
95 template <class F>
96 void Future<T>::setCallback_(F&& func) {
97   throwIfInvalid();
98   core_->setCallback(std::forward<F>(func));
99 }
100
101 // unwrap
102
103 template <class T>
104 template <class F>
105 typename std::enable_if<isFuture<F>::value,
106                         Future<typename isFuture<T>::Inner>>::type
107 Future<T>::unwrap() {
108   return then([](Future<typename isFuture<T>::Inner> internal_future) {
109       return internal_future;
110   });
111 }
112
113 // then
114
115 // Variant: returns a value
116 // e.g. f.then([](Try<T>&& t){ return t.value(); });
117 template <class T>
118 template <typename F, typename R, bool isTry, typename... Args>
119 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
120 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
121   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
122   typedef typename R::ReturnsFuture::Inner B;
123
124   throwIfInvalid();
125
126   Promise<B> p;
127   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
128
129   // grab the Future now before we lose our handle on the Promise
130   auto f = p.getFuture();
131   f.core_->setExecutorNoLock(getExecutor());
132
133   /* This is a bit tricky.
134
135      We can't just close over *this in case this Future gets moved. So we
136      make a new dummy Future. We could figure out something more
137      sophisticated that avoids making a new Future object when it can, as an
138      optimization. But this is correct.
139
140      core_ can't be moved, it is explicitly disallowed (as is copying). But
141      if there's ever a reason to allow it, this is one place that makes that
142      assumption and would need to be fixed. We use a standard shared pointer
143      for core_ (by copying it in), which means in essence obj holds a shared
144      pointer to itself.  But this shouldn't leak because Promise will not
145      outlive the continuation, because Promise will setException() with a
146      broken Promise if it is destructed before completed. We could use a
147      weak pointer but it would have to be converted to a shared pointer when
148      func is executed (because the Future returned by func may possibly
149      persist beyond the callback, if it gets moved), and so it is an
150      optimization to just make it shared from the get-go.
151
152      Two subtle but important points about this design. detail::Core has no
153      back pointers to Future or Promise, so if Future or Promise get moved
154      (and they will be moved in performant code) we don't have to do
155      anything fancy. And because we store the continuation in the
156      detail::Core, not in the Future, we can execute the continuation even
157      after the Future has gone out of scope. This is an intentional design
158      decision. It is likely we will want to be able to cancel a continuation
159      in some circumstances, but I think it should be explicit not implicit
160      in the destruction of the Future used to create it.
161      */
162   setCallback_(
163       [ func = std::forward<F>(func), pm = std::move(p) ](Try<T> && t) mutable {
164         if (!isTry && t.hasException()) {
165           pm.setException(std::move(t.exception()));
166         } else {
167           pm.setWith([&]() {
168             return std::move(func)(t.template get<isTry, Args>()...);
169           });
170         }
171       });
172
173   return f;
174 }
175
176 // Variant: returns a Future
177 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
178 template <class T>
179 template <typename F, typename R, bool isTry, typename... Args>
180 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
181 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
182   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
183   typedef typename R::ReturnsFuture::Inner B;
184
185   throwIfInvalid();
186
187   Promise<B> p;
188   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
189
190   // grab the Future now before we lose our handle on the Promise
191   auto f = p.getFuture();
192   f.core_->setExecutorNoLock(getExecutor());
193
194   setCallback_([ func = std::forward<F>(func), pm = std::move(p) ](
195       Try<T> && t) mutable {
196     auto ew = [&] {
197       if (!isTry && t.hasException()) {
198         return std::move(t.exception());
199       } else {
200         try {
201           auto f2 = std::move(func)(t.template get<isTry, Args>()...);
202           // that didn't throw, now we can steal p
203           f2.setCallback_([p = std::move(pm)](Try<B> && b) mutable {
204             p.setTry(std::move(b));
205           });
206           return exception_wrapper();
207         } catch (const std::exception& e) {
208           return exception_wrapper(std::current_exception(), e);
209         } catch (...) {
210           return exception_wrapper(std::current_exception());
211         }
212       }
213     }();
214     if (ew) {
215       pm.setException(std::move(ew));
216     }
217   });
218
219   return f;
220 }
221
222 template <typename T>
223 template <typename R, typename Caller, typename... Args>
224   Future<typename isFuture<R>::Inner>
225 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
226   typedef typename std::remove_cv<
227     typename std::remove_reference<
228       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
229   return then([instance, func](Try<T>&& t){
230     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
231   });
232 }
233
234 template <class T>
235 template <class Executor, class Arg, class... Args>
236 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
237   -> decltype(this->then(std::forward<Arg>(arg),
238                          std::forward<Args>(args)...))
239 {
240   auto oldX = getExecutor();
241   setExecutor(x);
242   return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
243                via(oldX);
244 }
245
246 template <class T>
247 Future<Unit> Future<T>::then() {
248   return then([] () {});
249 }
250
251 // onError where the callback returns T
252 template <class T>
253 template <class F>
254 typename std::enable_if<
255   !detail::callableWith<F, exception_wrapper>::value &&
256   !detail::Extract<F>::ReturnsFuture::value,
257   Future<T>>::type
258 Future<T>::onError(F&& func) {
259   typedef typename detail::Extract<F>::FirstArg Exn;
260   static_assert(
261       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
262       "Return type of onError callback must be T or Future<T>");
263
264   Promise<T> p;
265   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
266   auto f = p.getFuture();
267
268   setCallback_(
269       [ func = std::forward<F>(func), pm = std::move(p) ](Try<T> && t) mutable {
270         if (!t.template withException<Exn>([&](Exn& e) {
271               pm.setWith([&] { return std::move(func)(e); });
272             })) {
273           pm.setTry(std::move(t));
274         }
275       });
276
277   return f;
278 }
279
280 // onError where the callback returns Future<T>
281 template <class T>
282 template <class F>
283 typename std::enable_if<
284   !detail::callableWith<F, exception_wrapper>::value &&
285   detail::Extract<F>::ReturnsFuture::value,
286   Future<T>>::type
287 Future<T>::onError(F&& func) {
288   static_assert(
289       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
290       "Return type of onError callback must be T or Future<T>");
291   typedef typename detail::Extract<F>::FirstArg Exn;
292
293   Promise<T> p;
294   auto f = p.getFuture();
295
296   setCallback_([ pm = std::move(p), func = std::forward<F>(func) ](
297       Try<T> && t) mutable {
298     if (!t.template withException<Exn>([&](Exn& e) {
299           auto ew = [&] {
300             try {
301               auto f2 = std::move(func)(e);
302               f2.setCallback_([pm = std::move(pm)](Try<T> && t2) mutable {
303                 pm.setTry(std::move(t2));
304               });
305               return exception_wrapper();
306             } catch (const std::exception& e2) {
307               return exception_wrapper(std::current_exception(), e2);
308             } catch (...) {
309               return exception_wrapper(std::current_exception());
310             }
311           }();
312           if (ew) {
313             pm.setException(std::move(ew));
314           }
315         })) {
316       pm.setTry(std::move(t));
317     }
318   });
319
320   return f;
321 }
322
323 template <class T>
324 template <class F>
325 Future<T> Future<T>::ensure(F&& func) {
326   return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
327     std::move(funcw)();
328     return makeFuture(std::move(t));
329   });
330 }
331
332 template <class T>
333 template <class F>
334 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
335   return within(dur, tk).onError([funcw = std::forward<F>(func)](
336       TimedOut const&) { return std::move(funcw)(); });
337 }
338
339 template <class T>
340 template <class F>
341 typename std::enable_if<detail::callableWith<F, exception_wrapper>::value &&
342                             detail::Extract<F>::ReturnsFuture::value,
343                         Future<T>>::type
344 Future<T>::onError(F&& func) {
345   static_assert(
346       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
347       "Return type of onError callback must be T or Future<T>");
348
349   Promise<T> p;
350   auto f = p.getFuture();
351   setCallback_(
352       [ pm = std::move(p), func = std::forward<F>(func) ](Try<T> t) mutable {
353         if (t.hasException()) {
354           auto ew = [&] {
355             try {
356               auto f2 = std::move(func)(std::move(t.exception()));
357               f2.setCallback_([pm = std::move(pm)](Try<T> t2) mutable {
358                 pm.setTry(std::move(t2));
359               });
360               return exception_wrapper();
361             } catch (const std::exception& e2) {
362               return exception_wrapper(std::current_exception(), e2);
363             } catch (...) {
364               return exception_wrapper(std::current_exception());
365             }
366           }();
367           if (ew) {
368             pm.setException(std::move(ew));
369           }
370         } else {
371           pm.setTry(std::move(t));
372         }
373       });
374
375   return f;
376 }
377
378 // onError(exception_wrapper) that returns T
379 template <class T>
380 template <class F>
381 typename std::enable_if<
382   detail::callableWith<F, exception_wrapper>::value &&
383   !detail::Extract<F>::ReturnsFuture::value,
384   Future<T>>::type
385 Future<T>::onError(F&& func) {
386   static_assert(
387       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
388       "Return type of onError callback must be T or Future<T>");
389
390   Promise<T> p;
391   auto f = p.getFuture();
392   setCallback_(
393       [ pm = std::move(p), func = std::forward<F>(func) ](Try<T> t) mutable {
394         if (t.hasException()) {
395           pm.setWith([&] { return std::move(func)(std::move(t.exception())); });
396         } else {
397           pm.setTry(std::move(t));
398         }
399       });
400
401   return f;
402 }
403
404 template <class T>
405 typename std::add_lvalue_reference<T>::type Future<T>::value() {
406   throwIfInvalid();
407
408   return core_->getTry().value();
409 }
410
411 template <class T>
412 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
413   throwIfInvalid();
414
415   return core_->getTry().value();
416 }
417
418 template <class T>
419 Try<T>& Future<T>::getTry() {
420   throwIfInvalid();
421
422   return core_->getTry();
423 }
424
425 template <class T>
426 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
427   return waitVia(e).getTry();
428 }
429
430 template <class T>
431 Optional<Try<T>> Future<T>::poll() {
432   Optional<Try<T>> o;
433   if (core_->ready()) {
434     o = std::move(core_->getTry());
435   }
436   return o;
437 }
438
439 template <class T>
440 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
441   throwIfInvalid();
442
443   setExecutor(executor, priority);
444
445   return std::move(*this);
446 }
447
448 template <class T>
449 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
450   throwIfInvalid();
451
452   Promise<T> p;
453   auto f = p.getFuture();
454   then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
455   return std::move(f).via(executor, priority);
456 }
457
458 template <class Func>
459 auto via(Executor* x, Func&& func)
460     -> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
461   // TODO make this actually more performant. :-P #7260175
462   return via(x).then(std::forward<Func>(func));
463 }
464
465 template <class T>
466 bool Future<T>::isReady() const {
467   throwIfInvalid();
468   return core_->ready();
469 }
470
471 template <class T>
472 bool Future<T>::hasValue() {
473   return getTry().hasValue();
474 }
475
476 template <class T>
477 bool Future<T>::hasException() {
478   return getTry().hasException();
479 }
480
481 template <class T>
482 void Future<T>::raise(exception_wrapper exception) {
483   core_->raise(std::move(exception));
484 }
485
486 // makeFuture
487
488 template <class T>
489 Future<typename std::decay<T>::type> makeFuture(T&& t) {
490   return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
491 }
492
493 inline // for multiple translation units
494 Future<Unit> makeFuture() {
495   return makeFuture(Unit{});
496 }
497
498 // makeFutureWith(Future<T>()) -> Future<T>
499 template <class F>
500 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
501                         typename std::result_of<F()>::type>::type
502 makeFutureWith(F&& func) {
503   using InnerType =
504       typename isFuture<typename std::result_of<F()>::type>::Inner;
505   try {
506     return std::forward<F>(func)();
507   } catch (std::exception& e) {
508     return makeFuture<InnerType>(
509         exception_wrapper(std::current_exception(), e));
510   } catch (...) {
511     return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
512   }
513 }
514
515 // makeFutureWith(T()) -> Future<T>
516 // makeFutureWith(void()) -> Future<Unit>
517 template <class F>
518 typename std::enable_if<
519     !(isFuture<typename std::result_of<F()>::type>::value),
520     Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
521 makeFutureWith(F&& func) {
522   using LiftedResult =
523       typename Unit::Lift<typename std::result_of<F()>::type>::type;
524   return makeFuture<LiftedResult>(
525       makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
526 }
527
528 template <class T>
529 Future<T> makeFuture(std::exception_ptr const& e) {
530   return makeFuture(Try<T>(e));
531 }
532
533 template <class T>
534 Future<T> makeFuture(exception_wrapper ew) {
535   return makeFuture(Try<T>(std::move(ew)));
536 }
537
538 template <class T, class E>
539 typename std::enable_if<std::is_base_of<std::exception, E>::value,
540                         Future<T>>::type
541 makeFuture(E const& e) {
542   return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
543 }
544
545 template <class T>
546 Future<T> makeFuture(Try<T>&& t) {
547   return Future<T>(new detail::Core<T>(std::move(t)));
548 }
549
550 // via
551 Future<Unit> via(Executor* executor, int8_t priority) {
552   return makeFuture().via(executor, priority);
553 }
554
555 // mapSetCallback calls func(i, Try<T>) when every future completes
556
557 template <class T, class InputIterator, class F>
558 void mapSetCallback(InputIterator first, InputIterator last, F func) {
559   for (size_t i = 0; first != last; ++first, ++i) {
560     first->setCallback_([func, i](Try<T>&& t) {
561       func(i, std::move(t));
562     });
563   }
564 }
565
566 // collectAll (variadic)
567
568 template <typename... Fs>
569 typename detail::CollectAllVariadicContext<
570   typename std::decay<Fs>::type::value_type...>::type
571 collectAll(Fs&&... fs) {
572   auto ctx = std::make_shared<detail::CollectAllVariadicContext<
573     typename std::decay<Fs>::type::value_type...>>();
574   detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
575       ctx, std::forward<Fs>(fs)...);
576   return ctx->p.getFuture();
577 }
578
579 // collectAll (iterator)
580
581 template <class InputIterator>
582 Future<
583   std::vector<
584   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
585 collectAll(InputIterator first, InputIterator last) {
586   typedef
587     typename std::iterator_traits<InputIterator>::value_type::value_type T;
588
589   struct CollectAllContext {
590     CollectAllContext(size_t n) : results(n) {}
591     ~CollectAllContext() {
592       p.setValue(std::move(results));
593     }
594     Promise<std::vector<Try<T>>> p;
595     std::vector<Try<T>> results;
596   };
597
598   auto ctx =
599       std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
600   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
601     ctx->results[i] = std::move(t);
602   });
603   return ctx->p.getFuture();
604 }
605
606 // collect (iterator)
607
608 namespace detail {
609
610 template <typename T>
611 struct CollectContext {
612   struct Nothing {
613     explicit Nothing(int /* n */) {}
614   };
615
616   using Result = typename std::conditional<
617     std::is_void<T>::value,
618     void,
619     std::vector<T>>::type;
620
621   using InternalResult = typename std::conditional<
622     std::is_void<T>::value,
623     Nothing,
624     std::vector<Optional<T>>>::type;
625
626   explicit CollectContext(size_t n) : result(n) {}
627   ~CollectContext() {
628     if (!threw.exchange(true)) {
629       // map Optional<T> -> T
630       std::vector<T> finalResult;
631       finalResult.reserve(result.size());
632       std::transform(result.begin(), result.end(),
633                      std::back_inserter(finalResult),
634                      [](Optional<T>& o) { return std::move(o.value()); });
635       p.setValue(std::move(finalResult));
636     }
637   }
638   inline void setPartialResult(size_t i, Try<T>& t) {
639     result[i] = std::move(t.value());
640   }
641   Promise<Result> p;
642   InternalResult result;
643   std::atomic<bool> threw {false};
644 };
645
646 }
647
648 template <class InputIterator>
649 Future<typename detail::CollectContext<
650   typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
651 collect(InputIterator first, InputIterator last) {
652   typedef
653     typename std::iterator_traits<InputIterator>::value_type::value_type T;
654
655   auto ctx = std::make_shared<detail::CollectContext<T>>(
656     std::distance(first, last));
657   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
658     if (t.hasException()) {
659        if (!ctx->threw.exchange(true)) {
660          ctx->p.setException(std::move(t.exception()));
661        }
662      } else if (!ctx->threw) {
663        ctx->setPartialResult(i, t);
664      }
665   });
666   return ctx->p.getFuture();
667 }
668
669 // collect (variadic)
670
671 template <typename... Fs>
672 typename detail::CollectVariadicContext<
673   typename std::decay<Fs>::type::value_type...>::type
674 collect(Fs&&... fs) {
675   auto ctx = std::make_shared<detail::CollectVariadicContext<
676     typename std::decay<Fs>::type::value_type...>>();
677   detail::collectVariadicHelper<detail::CollectVariadicContext>(
678       ctx, std::forward<Fs>(fs)...);
679   return ctx->p.getFuture();
680 }
681
682 // collectAny (iterator)
683
684 template <class InputIterator>
685 Future<
686   std::pair<size_t,
687             Try<
688               typename
689               std::iterator_traits<InputIterator>::value_type::value_type>>>
690 collectAny(InputIterator first, InputIterator last) {
691   typedef
692     typename std::iterator_traits<InputIterator>::value_type::value_type T;
693
694   struct CollectAnyContext {
695     CollectAnyContext() {}
696     Promise<std::pair<size_t, Try<T>>> p;
697     std::atomic<bool> done {false};
698   };
699
700   auto ctx = std::make_shared<CollectAnyContext>();
701   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
702     if (!ctx->done.exchange(true)) {
703       ctx->p.setValue(std::make_pair(i, std::move(t)));
704     }
705   });
706   return ctx->p.getFuture();
707 }
708
709 // collectAnyWithoutException (iterator)
710
711 template <class InputIterator>
712 Future<std::pair<
713     size_t,
714     typename std::iterator_traits<InputIterator>::value_type::value_type>>
715 collectAnyWithoutException(InputIterator first, InputIterator last) {
716   typedef
717       typename std::iterator_traits<InputIterator>::value_type::value_type T;
718
719   struct CollectAnyWithoutExceptionContext {
720     CollectAnyWithoutExceptionContext(){}
721     Promise<std::pair<size_t, T>> p;
722     std::atomic<bool> done{false};
723     std::atomic<size_t> nFulfilled{0};
724     size_t nTotal;
725   };
726
727   auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
728   ctx->nTotal = size_t(std::distance(first, last));
729
730   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
731     if (!t.hasException() && !ctx->done.exchange(true)) {
732       ctx->p.setValue(std::make_pair(i, std::move(t.value())));
733     } else if (++ctx->nFulfilled == ctx->nTotal) {
734       ctx->p.setException(t.exception());
735     }
736   });
737   return ctx->p.getFuture();
738 }
739
740 // collectN (iterator)
741
742 template <class InputIterator>
743 Future<std::vector<std::pair<size_t, Try<typename
744   std::iterator_traits<InputIterator>::value_type::value_type>>>>
745 collectN(InputIterator first, InputIterator last, size_t n) {
746   typedef typename
747     std::iterator_traits<InputIterator>::value_type::value_type T;
748   typedef std::vector<std::pair<size_t, Try<T>>> V;
749
750   struct CollectNContext {
751     V v;
752     std::atomic<size_t> completed = {0};
753     Promise<V> p;
754   };
755   auto ctx = std::make_shared<CollectNContext>();
756
757   if (size_t(std::distance(first, last)) < n) {
758     ctx->p.setException(std::runtime_error("Not enough futures"));
759   } else {
760     // for each completed Future, increase count and add to vector, until we
761     // have n completed futures at which point we fulfil our Promise with the
762     // vector
763     mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
764       auto c = ++ctx->completed;
765       if (c <= n) {
766         assert(ctx->v.size() < n);
767         ctx->v.emplace_back(i, std::move(t));
768         if (c == n) {
769           ctx->p.setTry(Try<V>(std::move(ctx->v)));
770         }
771       }
772     });
773   }
774
775   return ctx->p.getFuture();
776 }
777
778 // reduce (iterator)
779
780 template <class It, class T, class F>
781 Future<T> reduce(It first, It last, T&& initial, F&& func) {
782   if (first == last) {
783     return makeFuture(std::move(initial));
784   }
785
786   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
787   typedef
788       typename std::conditional<detail::callableWith<F, T&&, Try<ItT>&&>::value,
789                                 Try<ItT>,
790                                 ItT>::type Arg;
791   typedef isTry<Arg> IsTry;
792
793   auto sfunc = std::make_shared<F>(std::move(func));
794
795   auto f = first->then(
796       [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
797         return (*sfunc)(
798             std::move(minitial), head.template get<IsTry::value, Arg&&>());
799       });
800
801   for (++first; first != last; ++first) {
802     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
803       return (*sfunc)(std::move(std::get<0>(t).value()),
804                   // Either return a ItT&& or a Try<ItT>&& depending
805                   // on the type of the argument of func.
806                   std::get<1>(t).template get<IsTry::value, Arg&&>());
807     });
808   }
809
810   return f;
811 }
812
813 // window (collection)
814
815 template <class Collection, class F, class ItT, class Result>
816 std::vector<Future<Result>>
817 window(Collection input, F func, size_t n) {
818   struct WindowContext {
819     WindowContext(Collection&& i, F&& fn)
820         : input_(std::move(i)), promises_(input_.size()),
821           func_(std::move(fn))
822       {}
823     std::atomic<size_t> i_ {0};
824     Collection input_;
825     std::vector<Promise<Result>> promises_;
826     F func_;
827
828     static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
829       size_t i = ctx->i_++;
830       if (i < ctx->input_.size()) {
831         // Using setCallback_ directly since we don't need the Future
832         ctx->func_(std::move(ctx->input_[i])).setCallback_(
833           // ctx is captured by value
834           [ctx, i](Try<Result>&& t) {
835             ctx->promises_[i].setTry(std::move(t));
836             // Chain another future onto this one
837             spawn(std::move(ctx));
838           });
839       }
840     }
841   };
842
843   auto max = std::min(n, input.size());
844
845   auto ctx = std::make_shared<WindowContext>(
846     std::move(input), std::move(func));
847
848   for (size_t i = 0; i < max; ++i) {
849     // Start the first n Futures
850     WindowContext::spawn(ctx);
851   }
852
853   std::vector<Future<Result>> futures;
854   futures.reserve(ctx->promises_.size());
855   for (auto& promise : ctx->promises_) {
856     futures.emplace_back(promise.getFuture());
857   }
858
859   return futures;
860 }
861
862 // reduce
863
864 template <class T>
865 template <class I, class F>
866 Future<I> Future<T>::reduce(I&& initial, F&& func) {
867   return then([
868     minitial = std::forward<I>(initial),
869     mfunc = std::forward<F>(func)
870   ](T& vals) mutable {
871     auto ret = std::move(minitial);
872     for (auto& val : vals) {
873       ret = mfunc(std::move(ret), std::move(val));
874     }
875     return ret;
876   });
877 }
878
879 // unorderedReduce (iterator)
880
881 template <class It, class T, class F, class ItT, class Arg>
882 Future<T> unorderedReduce(It first, It last, T initial, F func) {
883   if (first == last) {
884     return makeFuture(std::move(initial));
885   }
886
887   typedef isTry<Arg> IsTry;
888
889   struct UnorderedReduceContext {
890     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
891         : lock_(), memo_(makeFuture<T>(std::move(memo))),
892           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
893       {}
894     folly::MicroSpinLock lock_; // protects memo_ and numThens_
895     Future<T> memo_;
896     F func_;
897     size_t numThens_; // how many Futures completed and called .then()
898     size_t numFutures_; // how many Futures in total
899     Promise<T> promise_;
900   };
901
902   auto ctx = std::make_shared<UnorderedReduceContext>(
903     std::move(initial), std::move(func), std::distance(first, last));
904
905   mapSetCallback<ItT>(
906       first,
907       last,
908       [ctx](size_t /* i */, Try<ItT>&& t) {
909         // Futures can be completed in any order, simultaneously.
910         // To make this non-blocking, we create a new Future chain in
911         // the order of completion to reduce the values.
912         // The spinlock just protects chaining a new Future, not actually
913         // executing the reduce, which should be really fast.
914         folly::MSLGuard lock(ctx->lock_);
915         ctx->memo_ =
916             ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
917               // Either return a ItT&& or a Try<ItT>&& depending
918               // on the type of the argument of func.
919               return ctx->func_(std::move(v),
920                                 mt.template get<IsTry::value, Arg&&>());
921             });
922         if (++ctx->numThens_ == ctx->numFutures_) {
923           // After reducing the value of the last Future, fulfill the Promise
924           ctx->memo_.setCallback_(
925               [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
926         }
927       });
928
929   return ctx->promise_.getFuture();
930 }
931
932 // within
933
934 template <class T>
935 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
936   return within(dur, TimedOut(), tk);
937 }
938
939 template <class T>
940 template <class E>
941 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
942
943   struct Context {
944     Context(E ex) : exception(std::move(ex)), promise() {}
945     E exception;
946     Future<Unit> thisFuture;
947     Promise<T> promise;
948     std::atomic<bool> token {false};
949   };
950
951   std::shared_ptr<Timekeeper> tks;
952   if (!tk) {
953     tks = folly::detail::getTimekeeperSingleton();
954     tk = DCHECK_NOTNULL(tks.get());
955   }
956
957   auto ctx = std::make_shared<Context>(std::move(e));
958
959   ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
960     // TODO: "this" completed first, cancel "after"
961     if (ctx->token.exchange(true) == false) {
962       ctx->promise.setTry(std::move(t));
963     }
964   });
965
966   tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
967     // "after" completed first, cancel "this"
968     ctx->thisFuture.raise(TimedOut());
969     if (ctx->token.exchange(true) == false) {
970       if (t.hasException()) {
971         ctx->promise.setException(std::move(t.exception()));
972       } else {
973         ctx->promise.setException(std::move(ctx->exception));
974       }
975     }
976   });
977
978   return ctx->promise.getFuture().via(getExecutor());
979 }
980
981 // delayed
982
983 template <class T>
984 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
985   return collectAll(*this, futures::sleep(dur, tk))
986     .then([](std::tuple<Try<T>, Try<Unit>> tup) {
987       Try<T>& t = std::get<0>(tup);
988       return makeFuture<T>(std::move(t));
989     });
990 }
991
992 namespace detail {
993
994 template <class T>
995 void waitImpl(Future<T>& f) {
996   // short-circuit if there's nothing to do
997   if (f.isReady()) return;
998
999   FutureBatonType baton;
1000   f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1001   baton.wait();
1002   assert(f.isReady());
1003 }
1004
1005 template <class T>
1006 void waitImpl(Future<T>& f, Duration dur) {
1007   // short-circuit if there's nothing to do
1008   if (f.isReady()) {
1009     return;
1010   }
1011
1012   Promise<T> promise;
1013   auto ret = promise.getFuture();
1014   auto baton = std::make_shared<FutureBatonType>();
1015   f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1016     promise.setTry(std::move(t));
1017     baton->post();
1018   });
1019   f = std::move(ret);
1020   if (baton->timed_wait(dur)) {
1021     assert(f.isReady());
1022   }
1023 }
1024
1025 template <class T>
1026 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1027   // Set callback so to ensure that the via executor has something on it
1028   // so that once the preceding future triggers this callback, drive will
1029   // always have a callback to satisfy it
1030   if (f.isReady())
1031     return;
1032   f = f.via(e).then([](T&& t) { return std::move(t); });
1033   while (!f.isReady()) {
1034     e->drive();
1035   }
1036   assert(f.isReady());
1037 }
1038
1039 } // detail
1040
1041 template <class T>
1042 Future<T>& Future<T>::wait() & {
1043   detail::waitImpl(*this);
1044   return *this;
1045 }
1046
1047 template <class T>
1048 Future<T>&& Future<T>::wait() && {
1049   detail::waitImpl(*this);
1050   return std::move(*this);
1051 }
1052
1053 template <class T>
1054 Future<T>& Future<T>::wait(Duration dur) & {
1055   detail::waitImpl(*this, dur);
1056   return *this;
1057 }
1058
1059 template <class T>
1060 Future<T>&& Future<T>::wait(Duration dur) && {
1061   detail::waitImpl(*this, dur);
1062   return std::move(*this);
1063 }
1064
1065 template <class T>
1066 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1067   detail::waitViaImpl(*this, e);
1068   return *this;
1069 }
1070
1071 template <class T>
1072 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1073   detail::waitViaImpl(*this, e);
1074   return std::move(*this);
1075 }
1076
1077 template <class T>
1078 T Future<T>::get() {
1079   return std::move(wait().value());
1080 }
1081
1082 template <class T>
1083 T Future<T>::get(Duration dur) {
1084   wait(dur);
1085   if (isReady()) {
1086     return std::move(value());
1087   } else {
1088     throw TimedOut();
1089   }
1090 }
1091
1092 template <class T>
1093 T Future<T>::getVia(DrivableExecutor* e) {
1094   return std::move(waitVia(e).value());
1095 }
1096
1097 namespace detail {
1098   template <class T>
1099   struct TryEquals {
1100     static bool equals(const Try<T>& t1, const Try<T>& t2) {
1101       return t1.value() == t2.value();
1102     }
1103   };
1104 }
1105
1106 template <class T>
1107 Future<bool> Future<T>::willEqual(Future<T>& f) {
1108   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1109     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1110       return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1111     } else {
1112       return false;
1113       }
1114   });
1115 }
1116
1117 template <class T>
1118 template <class F>
1119 Future<T> Future<T>::filter(F&& predicate) {
1120   return this->then([p = std::forward<F>(predicate)](T val) {
1121     T const& valConstRef = val;
1122     if (!p(valConstRef)) {
1123       throw PredicateDoesNotObtain();
1124     }
1125     return val;
1126   });
1127 }
1128
1129 template <class T>
1130 template <class Callback>
1131 auto Future<T>::thenMulti(Callback&& fn)
1132     -> decltype(this->then(std::forward<Callback>(fn))) {
1133   // thenMulti with one callback is just a then
1134   return then(std::forward<Callback>(fn));
1135 }
1136
1137 template <class T>
1138 template <class Callback, class... Callbacks>
1139 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1140     -> decltype(this->then(std::forward<Callback>(fn)).
1141                       thenMulti(std::forward<Callbacks>(fns)...)) {
1142   // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1143   return then(std::forward<Callback>(fn)).
1144          thenMulti(std::forward<Callbacks>(fns)...);
1145 }
1146
1147 template <class T>
1148 template <class Callback, class... Callbacks>
1149 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1150                                       Callbacks&&... fns)
1151     -> decltype(this->then(std::forward<Callback>(fn)).
1152                       thenMulti(std::forward<Callbacks>(fns)...)) {
1153   // thenMultiExecutor with two callbacks is
1154   // via(x).then(a).thenMulti(b, ...).via(oldX)
1155   auto oldX = getExecutor();
1156   setExecutor(x);
1157   return then(std::forward<Callback>(fn)).
1158          thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1159 }
1160
1161 template <class T>
1162 template <class Callback>
1163 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1164     -> decltype(this->then(std::forward<Callback>(fn))) {
1165   // thenMulti with one callback is just a then with an executor
1166   return then(x, std::forward<Callback>(fn));
1167 }
1168
1169 template <class F>
1170 inline Future<Unit> when(bool p, F&& thunk) {
1171   return p ? std::forward<F>(thunk)().unit() : makeFuture();
1172 }
1173
1174 template <class P, class F>
1175 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1176   if (predicate()) {
1177     auto future = thunk();
1178     return future.then([
1179       predicate = std::forward<P>(predicate),
1180       thunk = std::forward<F>(thunk)
1181     ]() mutable {
1182       return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1183     });
1184   }
1185   return makeFuture();
1186 }
1187
1188 template <class F>
1189 Future<Unit> times(const int n, F&& thunk) {
1190   return folly::whileDo(
1191       [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
1192         return count->fetch_add(1) < n;
1193       },
1194       std::forward<F>(thunk));
1195 }
1196
1197 namespace futures {
1198   template <class It, class F, class ItT, class Result>
1199   std::vector<Future<Result>> map(It first, It last, F func) {
1200     std::vector<Future<Result>> results;
1201     for (auto it = first; it != last; it++) {
1202       results.push_back(it->then(func));
1203     }
1204     return results;
1205   }
1206 }
1207
1208 namespace futures {
1209
1210 namespace detail {
1211
1212 struct retrying_policy_raw_tag {};
1213 struct retrying_policy_fut_tag {};
1214
1215 template <class Policy>
1216 struct retrying_policy_traits {
1217   using ew = exception_wrapper;
1218   FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1219   template <class Ret>
1220   using has_op = typename std::integral_constant<bool,
1221         has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1222         has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1223   using is_raw = has_op<bool>;
1224   using is_fut = has_op<Future<bool>>;
1225   using tag = typename std::conditional<
1226         is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1227         is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1228 };
1229
1230 template <class Policy, class FF, class Prom>
1231 void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) {
1232   using F = typename std::result_of<FF(size_t)>::type;
1233   using T = typename F::value_type;
1234   auto f = makeFutureWith([&] { return ff(k++); });
1235   f.then([
1236     k,
1237     prom = std::move(prom),
1238     pm = std::forward<Policy>(p),
1239     ffm = std::forward<FF>(ff)
1240   ](Try<T> && t) mutable {
1241     if (t.hasValue()) {
1242       prom.setValue(std::move(t).value());
1243       return;
1244     }
1245     auto& x = t.exception();
1246     auto q = pm(k, x);
1247     q.then([
1248       k,
1249       prom = std::move(prom),
1250       xm = std::move(x),
1251       pm = std::move(pm),
1252       ffm = std::move(ffm)
1253     ](bool shouldRetry) mutable {
1254       if (shouldRetry) {
1255         retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom));
1256       } else {
1257         prom.setException(std::move(xm));
1258       };
1259     });
1260   });
1261 }
1262
1263 template <class Policy, class FF>
1264 typename std::result_of<FF(size_t)>::type
1265 retrying(size_t k, Policy&& p, FF&& ff) {
1266   using F = typename std::result_of<FF(size_t)>::type;
1267   using T = typename F::value_type;
1268   auto prom = Promise<T>();
1269   auto f = prom.getFuture();
1270   retryingImpl(
1271       k, std::forward<Policy>(p), std::forward<FF>(ff), std::move(prom));
1272   return f;
1273 }
1274
1275 template <class Policy, class FF>
1276 typename std::result_of<FF(size_t)>::type
1277 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1278   auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
1279     return makeFuture<bool>(pm(k, x));
1280   };
1281   return retrying(0, std::move(q), std::forward<FF>(ff));
1282 }
1283
1284 template <class Policy, class FF>
1285 typename std::result_of<FF(size_t)>::type
1286 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1287   return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1288 }
1289
1290 //  jittered exponential backoff, clamped to [backoff_min, backoff_max]
1291 template <class URNG>
1292 Duration retryingJitteredExponentialBackoffDur(
1293     size_t n,
1294     Duration backoff_min,
1295     Duration backoff_max,
1296     double jitter_param,
1297     URNG& rng) {
1298   using d = Duration;
1299   auto dist = std::normal_distribution<double>(0.0, jitter_param);
1300   auto jitter = std::exp(dist(rng));
1301   auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1302   return std::max(backoff_min, std::min(backoff_max, backoff));
1303 }
1304
1305 template <class Policy, class URNG>
1306 std::function<Future<bool>(size_t, const exception_wrapper&)>
1307 retryingPolicyCappedJitteredExponentialBackoff(
1308     size_t max_tries,
1309     Duration backoff_min,
1310     Duration backoff_max,
1311     double jitter_param,
1312     URNG&& rng,
1313     Policy&& p) {
1314   return [
1315     pm = std::forward<Policy>(p),
1316     max_tries,
1317     backoff_min,
1318     backoff_max,
1319     jitter_param,
1320     rngp = std::forward<URNG>(rng)
1321   ](size_t n, const exception_wrapper& ex) mutable {
1322     if (n == max_tries) {
1323       return makeFuture(false);
1324     }
1325     return pm(n, ex).then(
1326         [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
1327             bool v) mutable {
1328           if (!v) {
1329             return makeFuture(false);
1330           }
1331           auto backoff = detail::retryingJitteredExponentialBackoffDur(
1332               n, backoff_min, backoff_max, jitter_param, rngp);
1333           return futures::sleep(backoff).then([] { return true; });
1334         });
1335   };
1336 }
1337
1338 template <class Policy, class URNG>
1339 std::function<Future<bool>(size_t, const exception_wrapper&)>
1340 retryingPolicyCappedJitteredExponentialBackoff(
1341     size_t max_tries,
1342     Duration backoff_min,
1343     Duration backoff_max,
1344     double jitter_param,
1345     URNG&& rng,
1346     Policy&& p,
1347     retrying_policy_raw_tag) {
1348   auto q = [pm = std::forward<Policy>(p)](
1349       size_t n, const exception_wrapper& e) {
1350     return makeFuture(pm(n, e));
1351   };
1352   return retryingPolicyCappedJitteredExponentialBackoff(
1353       max_tries,
1354       backoff_min,
1355       backoff_max,
1356       jitter_param,
1357       std::forward<URNG>(rng),
1358       std::move(q));
1359 }
1360
1361 template <class Policy, class URNG>
1362 std::function<Future<bool>(size_t, const exception_wrapper&)>
1363 retryingPolicyCappedJitteredExponentialBackoff(
1364     size_t max_tries,
1365     Duration backoff_min,
1366     Duration backoff_max,
1367     double jitter_param,
1368     URNG&& rng,
1369     Policy&& p,
1370     retrying_policy_fut_tag) {
1371   return retryingPolicyCappedJitteredExponentialBackoff(
1372       max_tries,
1373       backoff_min,
1374       backoff_max,
1375       jitter_param,
1376       std::forward<URNG>(rng),
1377       std::forward<Policy>(p));
1378 }
1379 }
1380
1381 template <class Policy, class FF>
1382 typename std::result_of<FF(size_t)>::type
1383 retrying(Policy&& p, FF&& ff) {
1384   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1385   return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1386 }
1387
1388 inline
1389 std::function<bool(size_t, const exception_wrapper&)>
1390 retryingPolicyBasic(
1391     size_t max_tries) {
1392   return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1393 }
1394
1395 template <class Policy, class URNG>
1396 std::function<Future<bool>(size_t, const exception_wrapper&)>
1397 retryingPolicyCappedJitteredExponentialBackoff(
1398     size_t max_tries,
1399     Duration backoff_min,
1400     Duration backoff_max,
1401     double jitter_param,
1402     URNG&& rng,
1403     Policy&& p) {
1404   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1405   return detail::retryingPolicyCappedJitteredExponentialBackoff(
1406       max_tries,
1407       backoff_min,
1408       backoff_max,
1409       jitter_param,
1410       std::forward<URNG>(rng),
1411       std::forward<Policy>(p),
1412       tag());
1413 }
1414
1415 inline
1416 std::function<Future<bool>(size_t, const exception_wrapper&)>
1417 retryingPolicyCappedJitteredExponentialBackoff(
1418     size_t max_tries,
1419     Duration backoff_min,
1420     Duration backoff_max,
1421     double jitter_param) {
1422   auto p = [](size_t, const exception_wrapper&) { return true; };
1423   return retryingPolicyCappedJitteredExponentialBackoff(
1424       max_tries,
1425       backoff_min,
1426       backoff_max,
1427       jitter_param,
1428       ThreadLocalPRNG(),
1429       std::move(p));
1430 }
1431
1432 }
1433
1434 // Instantiate the most common Future types to save compile time
1435 extern template class Future<Unit>;
1436 extern template class Future<bool>;
1437 extern template class Future<int>;
1438 extern template class Future<int64_t>;
1439 extern template class Future<std::string>;
1440 extern template class Future<double>;
1441
1442 } // namespace folly