Codemod folly::make_unique to std::make_unique
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <algorithm>
20 #include <cassert>
21 #include <chrono>
22 #include <random>
23 #include <thread>
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
30
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
33 #else
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/fibers/Baton.h>
36 #endif
37
38 namespace folly {
39
40 class Timekeeper;
41
42 namespace detail {
43 #if FOLLY_FUTURE_USING_FIBER
44 typedef folly::fibers::Baton FutureBatonType;
45 #else
46 typedef folly::Baton<> FutureBatonType;
47 #endif
48 }
49
50 namespace detail {
51   std::shared_ptr<Timekeeper> getTimekeeperSingleton();
52 }
53
54 template <class T>
55 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
56   other.core_ = nullptr;
57 }
58
59 template <class T>
60 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
61   std::swap(core_, other.core_);
62   return *this;
63 }
64
65 template <class T>
66 template <class T2, typename>
67 Future<T>::Future(T2&& val)
68     : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
69
70 template <class T>
71 template <typename T2>
72 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
73     : core_(new detail::Core<T>(Try<T>(T()))) {}
74
75 template <class T>
76 Future<T>::~Future() {
77   detach();
78 }
79
80 template <class T>
81 void Future<T>::detach() {
82   if (core_) {
83     core_->detachFuture();
84     core_ = nullptr;
85   }
86 }
87
88 template <class T>
89 void Future<T>::throwIfInvalid() const {
90   if (!core_)
91     throw NoState();
92 }
93
94 template <class T>
95 template <class F>
96 void Future<T>::setCallback_(F&& func) {
97   throwIfInvalid();
98   core_->setCallback(std::forward<F>(func));
99 }
100
101 // unwrap
102
103 template <class T>
104 template <class F>
105 typename std::enable_if<isFuture<F>::value,
106                         Future<typename isFuture<T>::Inner>>::type
107 Future<T>::unwrap() {
108   return then([](Future<typename isFuture<T>::Inner> internal_future) {
109       return internal_future;
110   });
111 }
112
113 // then
114
115 // Variant: returns a value
116 // e.g. f.then([](Try<T>&& t){ return t.value(); });
117 template <class T>
118 template <typename F, typename R, bool isTry, typename... Args>
119 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
120 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
121   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
122   typedef typename R::ReturnsFuture::Inner B;
123
124   throwIfInvalid();
125
126   Promise<B> p;
127   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
128
129   // grab the Future now before we lose our handle on the Promise
130   auto f = p.getFuture();
131   f.core_->setExecutorNoLock(getExecutor());
132
133   /* This is a bit tricky.
134
135      We can't just close over *this in case this Future gets moved. So we
136      make a new dummy Future. We could figure out something more
137      sophisticated that avoids making a new Future object when it can, as an
138      optimization. But this is correct.
139
140      core_ can't be moved, it is explicitly disallowed (as is copying). But
141      if there's ever a reason to allow it, this is one place that makes that
142      assumption and would need to be fixed. We use a standard shared pointer
143      for core_ (by copying it in), which means in essence obj holds a shared
144      pointer to itself.  But this shouldn't leak because Promise will not
145      outlive the continuation, because Promise will setException() with a
146      broken Promise if it is destructed before completed. We could use a
147      weak pointer but it would have to be converted to a shared pointer when
148      func is executed (because the Future returned by func may possibly
149      persist beyond the callback, if it gets moved), and so it is an
150      optimization to just make it shared from the get-go.
151
152      Two subtle but important points about this design. detail::Core has no
153      back pointers to Future or Promise, so if Future or Promise get moved
154      (and they will be moved in performant code) we don't have to do
155      anything fancy. And because we store the continuation in the
156      detail::Core, not in the Future, we can execute the continuation even
157      after the Future has gone out of scope. This is an intentional design
158      decision. It is likely we will want to be able to cancel a continuation
159      in some circumstances, but I think it should be explicit not implicit
160      in the destruction of the Future used to create it.
161      */
162   setCallback_(
163       [ func = std::forward<F>(func), pm = std::move(p) ](Try<T> && t) mutable {
164         if (!isTry && t.hasException()) {
165           pm.setException(std::move(t.exception()));
166         } else {
167           pm.setWith([&]() {
168             return std::move(func)(t.template get<isTry, Args>()...);
169           });
170         }
171       });
172
173   return f;
174 }
175
176 // Variant: returns a Future
177 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
178 template <class T>
179 template <typename F, typename R, bool isTry, typename... Args>
180 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
181 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
182   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
183   typedef typename R::ReturnsFuture::Inner B;
184
185   throwIfInvalid();
186
187   Promise<B> p;
188   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
189
190   // grab the Future now before we lose our handle on the Promise
191   auto f = p.getFuture();
192   f.core_->setExecutorNoLock(getExecutor());
193
194   setCallback_([ func = std::forward<F>(func), pm = std::move(p) ](
195       Try<T> && t) mutable {
196     auto ew = [&] {
197       if (!isTry && t.hasException()) {
198         return std::move(t.exception());
199       } else {
200         try {
201           auto f2 = std::move(func)(t.template get<isTry, Args>()...);
202           // that didn't throw, now we can steal p
203           f2.setCallback_([p = std::move(pm)](Try<B> && b) mutable {
204             p.setTry(std::move(b));
205           });
206           return exception_wrapper();
207         } catch (const std::exception& e) {
208           return exception_wrapper(std::current_exception(), e);
209         } catch (...) {
210           return exception_wrapper(std::current_exception());
211         }
212       }
213     }();
214     if (ew) {
215       pm.setException(std::move(ew));
216     }
217   });
218
219   return f;
220 }
221
222 template <typename T>
223 template <typename R, typename Caller, typename... Args>
224   Future<typename isFuture<R>::Inner>
225 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
226   typedef typename std::remove_cv<
227     typename std::remove_reference<
228       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
229   return then([instance, func](Try<T>&& t){
230     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
231   });
232 }
233
234 template <class T>
235 template <class Executor, class Arg, class... Args>
236 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
237   -> decltype(this->then(std::forward<Arg>(arg),
238                          std::forward<Args>(args)...))
239 {
240   auto oldX = getExecutor();
241   setExecutor(x);
242   return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
243                via(oldX);
244 }
245
246 template <class T>
247 Future<Unit> Future<T>::then() {
248   return then([] () {});
249 }
250
251 // onError where the callback returns T
252 template <class T>
253 template <class F>
254 typename std::enable_if<
255   !detail::callableWith<F, exception_wrapper>::value &&
256   !detail::Extract<F>::ReturnsFuture::value,
257   Future<T>>::type
258 Future<T>::onError(F&& func) {
259   typedef typename detail::Extract<F>::FirstArg Exn;
260   static_assert(
261       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
262       "Return type of onError callback must be T or Future<T>");
263
264   Promise<T> p;
265   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
266   auto f = p.getFuture();
267
268   setCallback_(
269       [ func = std::forward<F>(func), pm = std::move(p) ](Try<T> && t) mutable {
270         if (!t.template withException<Exn>([&](Exn& e) {
271               pm.setWith([&] { return std::move(func)(e); });
272             })) {
273           pm.setTry(std::move(t));
274         }
275       });
276
277   return f;
278 }
279
280 // onError where the callback returns Future<T>
281 template <class T>
282 template <class F>
283 typename std::enable_if<
284   !detail::callableWith<F, exception_wrapper>::value &&
285   detail::Extract<F>::ReturnsFuture::value,
286   Future<T>>::type
287 Future<T>::onError(F&& func) {
288   static_assert(
289       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
290       "Return type of onError callback must be T or Future<T>");
291   typedef typename detail::Extract<F>::FirstArg Exn;
292
293   Promise<T> p;
294   auto f = p.getFuture();
295
296   setCallback_([ pm = std::move(p), func = std::forward<F>(func) ](
297       Try<T> && t) mutable {
298     if (!t.template withException<Exn>([&](Exn& e) {
299           auto ew = [&] {
300             try {
301               auto f2 = std::move(func)(e);
302               f2.setCallback_([pm = std::move(pm)](Try<T> && t2) mutable {
303                 pm.setTry(std::move(t2));
304               });
305               return exception_wrapper();
306             } catch (const std::exception& e2) {
307               return exception_wrapper(std::current_exception(), e2);
308             } catch (...) {
309               return exception_wrapper(std::current_exception());
310             }
311           }();
312           if (ew) {
313             pm.setException(std::move(ew));
314           }
315         })) {
316       pm.setTry(std::move(t));
317     }
318   });
319
320   return f;
321 }
322
323 template <class T>
324 template <class F>
325 Future<T> Future<T>::ensure(F&& func) {
326   return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
327     funcw();
328     return makeFuture(std::move(t));
329   });
330 }
331
332 template <class T>
333 template <class F>
334 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
335   return within(dur, tk).onError([funcw = std::forward<F>(func)](
336       TimedOut const&) { return funcw(); });
337 }
338
339 template <class T>
340 template <class F>
341 typename std::enable_if<detail::callableWith<F, exception_wrapper>::value &&
342                             detail::Extract<F>::ReturnsFuture::value,
343                         Future<T>>::type
344 Future<T>::onError(F&& func) {
345   static_assert(
346       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
347       "Return type of onError callback must be T or Future<T>");
348
349   Promise<T> p;
350   auto f = p.getFuture();
351   setCallback_(
352       [ pm = std::move(p), func = std::forward<F>(func) ](Try<T> t) mutable {
353         if (t.hasException()) {
354           auto ew = [&] {
355             try {
356               auto f2 = std::move(func)(std::move(t.exception()));
357               f2.setCallback_([pm = std::move(pm)](Try<T> t2) mutable {
358                 pm.setTry(std::move(t2));
359               });
360               return exception_wrapper();
361             } catch (const std::exception& e2) {
362               return exception_wrapper(std::current_exception(), e2);
363             } catch (...) {
364               return exception_wrapper(std::current_exception());
365             }
366           }();
367           if (ew) {
368             pm.setException(std::move(ew));
369           }
370         } else {
371           pm.setTry(std::move(t));
372         }
373       });
374
375   return f;
376 }
377
378 // onError(exception_wrapper) that returns T
379 template <class T>
380 template <class F>
381 typename std::enable_if<
382   detail::callableWith<F, exception_wrapper>::value &&
383   !detail::Extract<F>::ReturnsFuture::value,
384   Future<T>>::type
385 Future<T>::onError(F&& func) {
386   static_assert(
387       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
388       "Return type of onError callback must be T or Future<T>");
389
390   Promise<T> p;
391   auto f = p.getFuture();
392   setCallback_(
393       [ pm = std::move(p), func = std::forward<F>(func) ](Try<T> t) mutable {
394         if (t.hasException()) {
395           pm.setWith([&] { return std::move(func)(std::move(t.exception())); });
396         } else {
397           pm.setTry(std::move(t));
398         }
399       });
400
401   return f;
402 }
403
404 template <class T>
405 typename std::add_lvalue_reference<T>::type Future<T>::value() {
406   throwIfInvalid();
407
408   return core_->getTry().value();
409 }
410
411 template <class T>
412 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
413   throwIfInvalid();
414
415   return core_->getTry().value();
416 }
417
418 template <class T>
419 Try<T>& Future<T>::getTry() {
420   throwIfInvalid();
421
422   return core_->getTry();
423 }
424
425 template <class T>
426 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
427   return waitVia(e).getTry();
428 }
429
430 template <class T>
431 Optional<Try<T>> Future<T>::poll() {
432   Optional<Try<T>> o;
433   if (core_->ready()) {
434     o = std::move(core_->getTry());
435   }
436   return o;
437 }
438
439 template <class T>
440 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
441   throwIfInvalid();
442
443   setExecutor(executor, priority);
444
445   return std::move(*this);
446 }
447
448 template <class T>
449 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
450   throwIfInvalid();
451
452   Promise<T> p;
453   auto f = p.getFuture();
454   then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
455   return std::move(f).via(executor, priority);
456 }
457
458 template <class Func>
459 auto via(Executor* x, Func&& func)
460   -> Future<typename isFuture<decltype(func())>::Inner>
461 {
462   // TODO make this actually more performant. :-P #7260175
463   return via(x).then(std::forward<Func>(func));
464 }
465
466 template <class T>
467 bool Future<T>::isReady() const {
468   throwIfInvalid();
469   return core_->ready();
470 }
471
472 template <class T>
473 bool Future<T>::hasValue() {
474   return getTry().hasValue();
475 }
476
477 template <class T>
478 bool Future<T>::hasException() {
479   return getTry().hasException();
480 }
481
482 template <class T>
483 void Future<T>::raise(exception_wrapper exception) {
484   core_->raise(std::move(exception));
485 }
486
487 // makeFuture
488
489 template <class T>
490 Future<typename std::decay<T>::type> makeFuture(T&& t) {
491   return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
492 }
493
494 inline // for multiple translation units
495 Future<Unit> makeFuture() {
496   return makeFuture(Unit{});
497 }
498
499 // makeFutureWith(Future<T>()) -> Future<T>
500 template <class F>
501 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
502                         typename std::result_of<F()>::type>::type
503 makeFutureWith(F&& func) {
504   using InnerType =
505       typename isFuture<typename std::result_of<F()>::type>::Inner;
506   try {
507     return func();
508   } catch (std::exception& e) {
509     return makeFuture<InnerType>(
510         exception_wrapper(std::current_exception(), e));
511   } catch (...) {
512     return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
513   }
514 }
515
516 // makeFutureWith(T()) -> Future<T>
517 // makeFutureWith(void()) -> Future<Unit>
518 template <class F>
519 typename std::enable_if<
520     !(isFuture<typename std::result_of<F()>::type>::value),
521     Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
522 makeFutureWith(F&& func) {
523   using LiftedResult =
524       typename Unit::Lift<typename std::result_of<F()>::type>::type;
525   return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
526     return func();
527   }));
528 }
529
530 template <class T>
531 Future<T> makeFuture(std::exception_ptr const& e) {
532   return makeFuture(Try<T>(e));
533 }
534
535 template <class T>
536 Future<T> makeFuture(exception_wrapper ew) {
537   return makeFuture(Try<T>(std::move(ew)));
538 }
539
540 template <class T, class E>
541 typename std::enable_if<std::is_base_of<std::exception, E>::value,
542                         Future<T>>::type
543 makeFuture(E const& e) {
544   return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
545 }
546
547 template <class T>
548 Future<T> makeFuture(Try<T>&& t) {
549   return Future<T>(new detail::Core<T>(std::move(t)));
550 }
551
552 // via
553 Future<Unit> via(Executor* executor, int8_t priority) {
554   return makeFuture().via(executor, priority);
555 }
556
557 // mapSetCallback calls func(i, Try<T>) when every future completes
558
559 template <class T, class InputIterator, class F>
560 void mapSetCallback(InputIterator first, InputIterator last, F func) {
561   for (size_t i = 0; first != last; ++first, ++i) {
562     first->setCallback_([func, i](Try<T>&& t) {
563       func(i, std::move(t));
564     });
565   }
566 }
567
568 // collectAll (variadic)
569
570 template <typename... Fs>
571 typename detail::CollectAllVariadicContext<
572   typename std::decay<Fs>::type::value_type...>::type
573 collectAll(Fs&&... fs) {
574   auto ctx = std::make_shared<detail::CollectAllVariadicContext<
575     typename std::decay<Fs>::type::value_type...>>();
576   detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
577     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
578   return ctx->p.getFuture();
579 }
580
581 // collectAll (iterator)
582
583 template <class InputIterator>
584 Future<
585   std::vector<
586   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
587 collectAll(InputIterator first, InputIterator last) {
588   typedef
589     typename std::iterator_traits<InputIterator>::value_type::value_type T;
590
591   struct CollectAllContext {
592     CollectAllContext(size_t n) : results(n) {}
593     ~CollectAllContext() {
594       p.setValue(std::move(results));
595     }
596     Promise<std::vector<Try<T>>> p;
597     std::vector<Try<T>> results;
598   };
599
600   auto ctx =
601       std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
602   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
603     ctx->results[i] = std::move(t);
604   });
605   return ctx->p.getFuture();
606 }
607
608 // collect (iterator)
609
610 namespace detail {
611
612 template <typename T>
613 struct CollectContext {
614   struct Nothing {
615     explicit Nothing(int /* n */) {}
616   };
617
618   using Result = typename std::conditional<
619     std::is_void<T>::value,
620     void,
621     std::vector<T>>::type;
622
623   using InternalResult = typename std::conditional<
624     std::is_void<T>::value,
625     Nothing,
626     std::vector<Optional<T>>>::type;
627
628   explicit CollectContext(size_t n) : result(n) {}
629   ~CollectContext() {
630     if (!threw.exchange(true)) {
631       // map Optional<T> -> T
632       std::vector<T> finalResult;
633       finalResult.reserve(result.size());
634       std::transform(result.begin(), result.end(),
635                      std::back_inserter(finalResult),
636                      [](Optional<T>& o) { return std::move(o.value()); });
637       p.setValue(std::move(finalResult));
638     }
639   }
640   inline void setPartialResult(size_t i, Try<T>& t) {
641     result[i] = std::move(t.value());
642   }
643   Promise<Result> p;
644   InternalResult result;
645   std::atomic<bool> threw {false};
646 };
647
648 }
649
650 template <class InputIterator>
651 Future<typename detail::CollectContext<
652   typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
653 collect(InputIterator first, InputIterator last) {
654   typedef
655     typename std::iterator_traits<InputIterator>::value_type::value_type T;
656
657   auto ctx = std::make_shared<detail::CollectContext<T>>(
658     std::distance(first, last));
659   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
660     if (t.hasException()) {
661        if (!ctx->threw.exchange(true)) {
662          ctx->p.setException(std::move(t.exception()));
663        }
664      } else if (!ctx->threw) {
665        ctx->setPartialResult(i, t);
666      }
667   });
668   return ctx->p.getFuture();
669 }
670
671 // collect (variadic)
672
673 template <typename... Fs>
674 typename detail::CollectVariadicContext<
675   typename std::decay<Fs>::type::value_type...>::type
676 collect(Fs&&... fs) {
677   auto ctx = std::make_shared<detail::CollectVariadicContext<
678     typename std::decay<Fs>::type::value_type...>>();
679   detail::collectVariadicHelper<detail::CollectVariadicContext>(
680     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
681   return ctx->p.getFuture();
682 }
683
684 // collectAny (iterator)
685
686 template <class InputIterator>
687 Future<
688   std::pair<size_t,
689             Try<
690               typename
691               std::iterator_traits<InputIterator>::value_type::value_type>>>
692 collectAny(InputIterator first, InputIterator last) {
693   typedef
694     typename std::iterator_traits<InputIterator>::value_type::value_type T;
695
696   struct CollectAnyContext {
697     CollectAnyContext() {}
698     Promise<std::pair<size_t, Try<T>>> p;
699     std::atomic<bool> done {false};
700   };
701
702   auto ctx = std::make_shared<CollectAnyContext>();
703   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
704     if (!ctx->done.exchange(true)) {
705       ctx->p.setValue(std::make_pair(i, std::move(t)));
706     }
707   });
708   return ctx->p.getFuture();
709 }
710
711 // collectAnyWithoutException (iterator)
712
713 template <class InputIterator>
714 Future<std::pair<
715     size_t,
716     typename std::iterator_traits<InputIterator>::value_type::value_type>>
717 collectAnyWithoutException(InputIterator first, InputIterator last) {
718   typedef
719       typename std::iterator_traits<InputIterator>::value_type::value_type T;
720
721   struct CollectAnyWithoutExceptionContext {
722     CollectAnyWithoutExceptionContext(){}
723     Promise<std::pair<size_t, T>> p;
724     std::atomic<bool> done{false};
725     std::atomic<size_t> nFulfilled{0};
726     size_t nTotal;
727   };
728
729   auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
730   ctx->nTotal = size_t(std::distance(first, last));
731
732   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
733     if (!t.hasException() && !ctx->done.exchange(true)) {
734       ctx->p.setValue(std::make_pair(i, std::move(t.value())));
735     } else if (++ctx->nFulfilled == ctx->nTotal) {
736       ctx->p.setException(t.exception());
737     }
738   });
739   return ctx->p.getFuture();
740 }
741
742 // collectN (iterator)
743
744 template <class InputIterator>
745 Future<std::vector<std::pair<size_t, Try<typename
746   std::iterator_traits<InputIterator>::value_type::value_type>>>>
747 collectN(InputIterator first, InputIterator last, size_t n) {
748   typedef typename
749     std::iterator_traits<InputIterator>::value_type::value_type T;
750   typedef std::vector<std::pair<size_t, Try<T>>> V;
751
752   struct CollectNContext {
753     V v;
754     std::atomic<size_t> completed = {0};
755     Promise<V> p;
756   };
757   auto ctx = std::make_shared<CollectNContext>();
758
759   if (size_t(std::distance(first, last)) < n) {
760     ctx->p.setException(std::runtime_error("Not enough futures"));
761   } else {
762     // for each completed Future, increase count and add to vector, until we
763     // have n completed futures at which point we fulfil our Promise with the
764     // vector
765     mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
766       auto c = ++ctx->completed;
767       if (c <= n) {
768         assert(ctx->v.size() < n);
769         ctx->v.emplace_back(i, std::move(t));
770         if (c == n) {
771           ctx->p.setTry(Try<V>(std::move(ctx->v)));
772         }
773       }
774     });
775   }
776
777   return ctx->p.getFuture();
778 }
779
780 // reduce (iterator)
781
782 template <class It, class T, class F>
783 Future<T> reduce(It first, It last, T&& initial, F&& func) {
784   if (first == last) {
785     return makeFuture(std::move(initial));
786   }
787
788   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
789   typedef
790       typename std::conditional<detail::callableWith<F, T&&, Try<ItT>&&>::value,
791                                 Try<ItT>,
792                                 ItT>::type Arg;
793   typedef isTry<Arg> IsTry;
794
795   auto sfunc = std::make_shared<F>(std::move(func));
796
797   auto f = first->then(
798       [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
799         return (*sfunc)(
800             std::move(minitial), head.template get<IsTry::value, Arg&&>());
801       });
802
803   for (++first; first != last; ++first) {
804     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
805       return (*sfunc)(std::move(std::get<0>(t).value()),
806                   // Either return a ItT&& or a Try<ItT>&& depending
807                   // on the type of the argument of func.
808                   std::get<1>(t).template get<IsTry::value, Arg&&>());
809     });
810   }
811
812   return f;
813 }
814
815 // window (collection)
816
817 template <class Collection, class F, class ItT, class Result>
818 std::vector<Future<Result>>
819 window(Collection input, F func, size_t n) {
820   struct WindowContext {
821     WindowContext(Collection&& i, F&& fn)
822         : input_(std::move(i)), promises_(input_.size()),
823           func_(std::move(fn))
824       {}
825     std::atomic<size_t> i_ {0};
826     Collection input_;
827     std::vector<Promise<Result>> promises_;
828     F func_;
829
830     static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
831       size_t i = ctx->i_++;
832       if (i < ctx->input_.size()) {
833         // Using setCallback_ directly since we don't need the Future
834         ctx->func_(std::move(ctx->input_[i])).setCallback_(
835           // ctx is captured by value
836           [ctx, i](Try<Result>&& t) {
837             ctx->promises_[i].setTry(std::move(t));
838             // Chain another future onto this one
839             spawn(std::move(ctx));
840           });
841       }
842     }
843   };
844
845   auto max = std::min(n, input.size());
846
847   auto ctx = std::make_shared<WindowContext>(
848     std::move(input), std::move(func));
849
850   for (size_t i = 0; i < max; ++i) {
851     // Start the first n Futures
852     WindowContext::spawn(ctx);
853   }
854
855   std::vector<Future<Result>> futures;
856   futures.reserve(ctx->promises_.size());
857   for (auto& promise : ctx->promises_) {
858     futures.emplace_back(promise.getFuture());
859   }
860
861   return futures;
862 }
863
864 // reduce
865
866 template <class T>
867 template <class I, class F>
868 Future<I> Future<T>::reduce(I&& initial, F&& func) {
869   return then([
870     minitial = std::forward<I>(initial),
871     mfunc = std::forward<F>(func)
872   ](T& vals) mutable {
873     auto ret = std::move(minitial);
874     for (auto& val : vals) {
875       ret = mfunc(std::move(ret), std::move(val));
876     }
877     return ret;
878   });
879 }
880
881 // unorderedReduce (iterator)
882
883 template <class It, class T, class F, class ItT, class Arg>
884 Future<T> unorderedReduce(It first, It last, T initial, F func) {
885   if (first == last) {
886     return makeFuture(std::move(initial));
887   }
888
889   typedef isTry<Arg> IsTry;
890
891   struct UnorderedReduceContext {
892     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
893         : lock_(), memo_(makeFuture<T>(std::move(memo))),
894           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
895       {}
896     folly::MicroSpinLock lock_; // protects memo_ and numThens_
897     Future<T> memo_;
898     F func_;
899     size_t numThens_; // how many Futures completed and called .then()
900     size_t numFutures_; // how many Futures in total
901     Promise<T> promise_;
902   };
903
904   auto ctx = std::make_shared<UnorderedReduceContext>(
905     std::move(initial), std::move(func), std::distance(first, last));
906
907   mapSetCallback<ItT>(
908       first,
909       last,
910       [ctx](size_t /* i */, Try<ItT>&& t) {
911         // Futures can be completed in any order, simultaneously.
912         // To make this non-blocking, we create a new Future chain in
913         // the order of completion to reduce the values.
914         // The spinlock just protects chaining a new Future, not actually
915         // executing the reduce, which should be really fast.
916         folly::MSLGuard lock(ctx->lock_);
917         ctx->memo_ =
918             ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
919               // Either return a ItT&& or a Try<ItT>&& depending
920               // on the type of the argument of func.
921               return ctx->func_(std::move(v),
922                                 mt.template get<IsTry::value, Arg&&>());
923             });
924         if (++ctx->numThens_ == ctx->numFutures_) {
925           // After reducing the value of the last Future, fulfill the Promise
926           ctx->memo_.setCallback_(
927               [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
928         }
929       });
930
931   return ctx->promise_.getFuture();
932 }
933
934 // within
935
936 template <class T>
937 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
938   return within(dur, TimedOut(), tk);
939 }
940
941 template <class T>
942 template <class E>
943 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
944
945   struct Context {
946     Context(E ex) : exception(std::move(ex)), promise() {}
947     E exception;
948     Future<Unit> thisFuture;
949     Promise<T> promise;
950     std::atomic<bool> token {false};
951   };
952
953   std::shared_ptr<Timekeeper> tks;
954   if (!tk) {
955     tks = folly::detail::getTimekeeperSingleton();
956     tk = DCHECK_NOTNULL(tks.get());
957   }
958
959   auto ctx = std::make_shared<Context>(std::move(e));
960
961   ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
962     // TODO: "this" completed first, cancel "after"
963     if (ctx->token.exchange(true) == false) {
964       ctx->promise.setTry(std::move(t));
965     }
966   });
967
968   tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
969     // "after" completed first, cancel "this"
970     ctx->thisFuture.raise(TimedOut());
971     if (ctx->token.exchange(true) == false) {
972       if (t.hasException()) {
973         ctx->promise.setException(std::move(t.exception()));
974       } else {
975         ctx->promise.setException(std::move(ctx->exception));
976       }
977     }
978   });
979
980   return ctx->promise.getFuture().via(getExecutor());
981 }
982
983 // delayed
984
985 template <class T>
986 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
987   return collectAll(*this, futures::sleep(dur, tk))
988     .then([](std::tuple<Try<T>, Try<Unit>> tup) {
989       Try<T>& t = std::get<0>(tup);
990       return makeFuture<T>(std::move(t));
991     });
992 }
993
994 namespace detail {
995
996 template <class T>
997 void waitImpl(Future<T>& f) {
998   // short-circuit if there's nothing to do
999   if (f.isReady()) return;
1000
1001   FutureBatonType baton;
1002   f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1003   baton.wait();
1004   assert(f.isReady());
1005 }
1006
1007 template <class T>
1008 void waitImpl(Future<T>& f, Duration dur) {
1009   // short-circuit if there's nothing to do
1010   if (f.isReady()) {
1011     return;
1012   }
1013
1014   Promise<T> promise;
1015   auto ret = promise.getFuture();
1016   auto baton = std::make_shared<FutureBatonType>();
1017   f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1018     promise.setTry(std::move(t));
1019     baton->post();
1020   });
1021   f = std::move(ret);
1022   if (baton->timed_wait(dur)) {
1023     assert(f.isReady());
1024   }
1025 }
1026
1027 template <class T>
1028 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1029   // Set callback so to ensure that the via executor has something on it
1030   // so that once the preceding future triggers this callback, drive will
1031   // always have a callback to satisfy it
1032   if (f.isReady())
1033     return;
1034   f = f.via(e).then([](T&& t) { return std::move(t); });
1035   while (!f.isReady()) {
1036     e->drive();
1037   }
1038   assert(f.isReady());
1039 }
1040
1041 } // detail
1042
1043 template <class T>
1044 Future<T>& Future<T>::wait() & {
1045   detail::waitImpl(*this);
1046   return *this;
1047 }
1048
1049 template <class T>
1050 Future<T>&& Future<T>::wait() && {
1051   detail::waitImpl(*this);
1052   return std::move(*this);
1053 }
1054
1055 template <class T>
1056 Future<T>& Future<T>::wait(Duration dur) & {
1057   detail::waitImpl(*this, dur);
1058   return *this;
1059 }
1060
1061 template <class T>
1062 Future<T>&& Future<T>::wait(Duration dur) && {
1063   detail::waitImpl(*this, dur);
1064   return std::move(*this);
1065 }
1066
1067 template <class T>
1068 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1069   detail::waitViaImpl(*this, e);
1070   return *this;
1071 }
1072
1073 template <class T>
1074 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1075   detail::waitViaImpl(*this, e);
1076   return std::move(*this);
1077 }
1078
1079 template <class T>
1080 T Future<T>::get() {
1081   return std::move(wait().value());
1082 }
1083
1084 template <class T>
1085 T Future<T>::get(Duration dur) {
1086   wait(dur);
1087   if (isReady()) {
1088     return std::move(value());
1089   } else {
1090     throw TimedOut();
1091   }
1092 }
1093
1094 template <class T>
1095 T Future<T>::getVia(DrivableExecutor* e) {
1096   return std::move(waitVia(e).value());
1097 }
1098
1099 namespace detail {
1100   template <class T>
1101   struct TryEquals {
1102     static bool equals(const Try<T>& t1, const Try<T>& t2) {
1103       return t1.value() == t2.value();
1104     }
1105   };
1106 }
1107
1108 template <class T>
1109 Future<bool> Future<T>::willEqual(Future<T>& f) {
1110   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1111     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1112       return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1113     } else {
1114       return false;
1115       }
1116   });
1117 }
1118
1119 template <class T>
1120 template <class F>
1121 Future<T> Future<T>::filter(F&& predicate) {
1122   return this->then([p = std::forward<F>(predicate)](T val) {
1123     T const& valConstRef = val;
1124     if (!p(valConstRef)) {
1125       throw PredicateDoesNotObtain();
1126     }
1127     return val;
1128   });
1129 }
1130
1131 template <class T>
1132 template <class Callback>
1133 auto Future<T>::thenMulti(Callback&& fn)
1134     -> decltype(this->then(std::forward<Callback>(fn))) {
1135   // thenMulti with one callback is just a then
1136   return then(std::forward<Callback>(fn));
1137 }
1138
1139 template <class T>
1140 template <class Callback, class... Callbacks>
1141 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1142     -> decltype(this->then(std::forward<Callback>(fn)).
1143                       thenMulti(std::forward<Callbacks>(fns)...)) {
1144   // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1145   return then(std::forward<Callback>(fn)).
1146          thenMulti(std::forward<Callbacks>(fns)...);
1147 }
1148
1149 template <class T>
1150 template <class Callback, class... Callbacks>
1151 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1152                                       Callbacks&&... fns)
1153     -> decltype(this->then(std::forward<Callback>(fn)).
1154                       thenMulti(std::forward<Callbacks>(fns)...)) {
1155   // thenMultiExecutor with two callbacks is
1156   // via(x).then(a).thenMulti(b, ...).via(oldX)
1157   auto oldX = getExecutor();
1158   setExecutor(x);
1159   return then(std::forward<Callback>(fn)).
1160          thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1161 }
1162
1163 template <class T>
1164 template <class Callback>
1165 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1166     -> decltype(this->then(std::forward<Callback>(fn))) {
1167   // thenMulti with one callback is just a then with an executor
1168   return then(x, std::forward<Callback>(fn));
1169 }
1170
1171 template <class F>
1172 inline Future<Unit> when(bool p, F&& thunk) {
1173   return p ? std::forward<F>(thunk)().unit() : makeFuture();
1174 }
1175
1176 template <class P, class F>
1177 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1178   if (predicate()) {
1179     auto future = thunk();
1180     return future.then([
1181       predicate = std::forward<P>(predicate),
1182       thunk = std::forward<F>(thunk)
1183     ]() mutable {
1184       return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1185     });
1186   }
1187   return makeFuture();
1188 }
1189
1190 template <class F>
1191 Future<Unit> times(const int n, F&& thunk) {
1192   return folly::whileDo(
1193       [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
1194         return count->fetch_add(1) < n;
1195       },
1196       std::forward<F>(thunk));
1197 }
1198
1199 namespace futures {
1200   template <class It, class F, class ItT, class Result>
1201   std::vector<Future<Result>> map(It first, It last, F func) {
1202     std::vector<Future<Result>> results;
1203     for (auto it = first; it != last; it++) {
1204       results.push_back(it->then(func));
1205     }
1206     return results;
1207   }
1208 }
1209
1210 namespace futures {
1211
1212 namespace detail {
1213
1214 struct retrying_policy_raw_tag {};
1215 struct retrying_policy_fut_tag {};
1216
1217 template <class Policy>
1218 struct retrying_policy_traits {
1219   using ew = exception_wrapper;
1220   FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1221   template <class Ret>
1222   using has_op = typename std::integral_constant<bool,
1223         has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1224         has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1225   using is_raw = has_op<bool>;
1226   using is_fut = has_op<Future<bool>>;
1227   using tag = typename std::conditional<
1228         is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1229         is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1230 };
1231
1232 template <class Policy, class FF, class Prom>
1233 void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) {
1234   using F = typename std::result_of<FF(size_t)>::type;
1235   using T = typename F::value_type;
1236   auto f = ff(k++);
1237   f.then([
1238     k,
1239     prom = std::move(prom),
1240     pm = std::forward<Policy>(p),
1241     ffm = std::forward<FF>(ff)
1242   ](Try<T> && t) mutable {
1243     if (t.hasValue()) {
1244       prom.setValue(std::move(t).value());
1245       return;
1246     }
1247     auto& x = t.exception();
1248     auto q = pm(k, x);
1249     q.then([
1250       k,
1251       prom = std::move(prom),
1252       xm = std::move(x),
1253       pm = std::move(pm),
1254       ffm = std::move(ffm)
1255     ](bool shouldRetry) mutable {
1256       if (shouldRetry) {
1257         retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom));
1258       } else {
1259         prom.setException(std::move(xm));
1260       };
1261     });
1262   });
1263 }
1264
1265 template <class Policy, class FF>
1266 typename std::result_of<FF(size_t)>::type
1267 retrying(size_t k, Policy&& p, FF&& ff) {
1268   using F = typename std::result_of<FF(size_t)>::type;
1269   using T = typename F::value_type;
1270   auto prom = Promise<T>();
1271   auto f = prom.getFuture();
1272   retryingImpl(
1273       k, std::forward<Policy>(p), std::forward<FF>(ff), std::move(prom));
1274   return f;
1275 }
1276
1277 template <class Policy, class FF>
1278 typename std::result_of<FF(size_t)>::type
1279 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1280   auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
1281     return makeFuture<bool>(pm(k, x));
1282   };
1283   return retrying(0, std::move(q), std::forward<FF>(ff));
1284 }
1285
1286 template <class Policy, class FF>
1287 typename std::result_of<FF(size_t)>::type
1288 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1289   return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1290 }
1291
1292 //  jittered exponential backoff, clamped to [backoff_min, backoff_max]
1293 template <class URNG>
1294 Duration retryingJitteredExponentialBackoffDur(
1295     size_t n,
1296     Duration backoff_min,
1297     Duration backoff_max,
1298     double jitter_param,
1299     URNG& rng) {
1300   using d = Duration;
1301   auto dist = std::normal_distribution<double>(0.0, jitter_param);
1302   auto jitter = std::exp(dist(rng));
1303   auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1304   return std::max(backoff_min, std::min(backoff_max, backoff));
1305 }
1306
1307 template <class Policy, class URNG>
1308 std::function<Future<bool>(size_t, const exception_wrapper&)>
1309 retryingPolicyCappedJitteredExponentialBackoff(
1310     size_t max_tries,
1311     Duration backoff_min,
1312     Duration backoff_max,
1313     double jitter_param,
1314     URNG&& rng,
1315     Policy&& p) {
1316   return [
1317     pm = std::forward<Policy>(p),
1318     max_tries,
1319     backoff_min,
1320     backoff_max,
1321     jitter_param,
1322     rngp = std::forward<URNG>(rng)
1323   ](size_t n, const exception_wrapper& ex) mutable {
1324     if (n == max_tries) {
1325       return makeFuture(false);
1326     }
1327     return pm(n, ex).then(
1328         [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
1329             bool v) mutable {
1330           if (!v) {
1331             return makeFuture(false);
1332           }
1333           auto backoff = detail::retryingJitteredExponentialBackoffDur(
1334               n, backoff_min, backoff_max, jitter_param, rngp);
1335           return futures::sleep(backoff).then([] { return true; });
1336         });
1337   };
1338 }
1339
1340 template <class Policy, class URNG>
1341 std::function<Future<bool>(size_t, const exception_wrapper&)>
1342 retryingPolicyCappedJitteredExponentialBackoff(
1343     size_t max_tries,
1344     Duration backoff_min,
1345     Duration backoff_max,
1346     double jitter_param,
1347     URNG&& rng,
1348     Policy&& p,
1349     retrying_policy_raw_tag) {
1350   auto q = [pm = std::forward<Policy>(p)](
1351       size_t n, const exception_wrapper& e) {
1352     return makeFuture(pm(n, e));
1353   };
1354   return retryingPolicyCappedJitteredExponentialBackoff(
1355       max_tries,
1356       backoff_min,
1357       backoff_max,
1358       jitter_param,
1359       std::forward<URNG>(rng),
1360       std::move(q));
1361 }
1362
1363 template <class Policy, class URNG>
1364 std::function<Future<bool>(size_t, const exception_wrapper&)>
1365 retryingPolicyCappedJitteredExponentialBackoff(
1366     size_t max_tries,
1367     Duration backoff_min,
1368     Duration backoff_max,
1369     double jitter_param,
1370     URNG&& rng,
1371     Policy&& p,
1372     retrying_policy_fut_tag) {
1373   return retryingPolicyCappedJitteredExponentialBackoff(
1374       max_tries,
1375       backoff_min,
1376       backoff_max,
1377       jitter_param,
1378       std::forward<URNG>(rng),
1379       std::forward<Policy>(p));
1380 }
1381 }
1382
1383 template <class Policy, class FF>
1384 typename std::result_of<FF(size_t)>::type
1385 retrying(Policy&& p, FF&& ff) {
1386   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1387   return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1388 }
1389
1390 inline
1391 std::function<bool(size_t, const exception_wrapper&)>
1392 retryingPolicyBasic(
1393     size_t max_tries) {
1394   return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1395 }
1396
1397 template <class Policy, class URNG>
1398 std::function<Future<bool>(size_t, const exception_wrapper&)>
1399 retryingPolicyCappedJitteredExponentialBackoff(
1400     size_t max_tries,
1401     Duration backoff_min,
1402     Duration backoff_max,
1403     double jitter_param,
1404     URNG&& rng,
1405     Policy&& p) {
1406   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1407   return detail::retryingPolicyCappedJitteredExponentialBackoff(
1408       max_tries,
1409       backoff_min,
1410       backoff_max,
1411       jitter_param,
1412       std::forward<URNG>(rng),
1413       std::forward<Policy>(p),
1414       tag());
1415 }
1416
1417 inline
1418 std::function<Future<bool>(size_t, const exception_wrapper&)>
1419 retryingPolicyCappedJitteredExponentialBackoff(
1420     size_t max_tries,
1421     Duration backoff_min,
1422     Duration backoff_max,
1423     double jitter_param) {
1424   auto p = [](size_t, const exception_wrapper&) { return true; };
1425   return retryingPolicyCappedJitteredExponentialBackoff(
1426       max_tries,
1427       backoff_min,
1428       backoff_max,
1429       jitter_param,
1430       ThreadLocalPRNG(),
1431       std::move(p));
1432 }
1433
1434 }
1435
1436 // Instantiate the most common Future types to save compile time
1437 extern template class Future<Unit>;
1438 extern template class Future<bool>;
1439 extern template class Future<int>;
1440 extern template class Future<int64_t>;
1441 extern template class Future<std::string>;
1442 extern template class Future<double>;
1443
1444 } // namespace folly