use previous Executor after delayed
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <algorithm>
20 #include <cassert>
21 #include <chrono>
22 #include <random>
23 #include <thread>
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
30
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
33 #else
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/fibers/Baton.h>
36 #endif
37
38 namespace folly {
39
40 class Timekeeper;
41
42 namespace detail {
43 #if FOLLY_FUTURE_USING_FIBER
44 typedef folly::fibers::Baton FutureBatonType;
45 #else
46 typedef folly::Baton<> FutureBatonType;
47 #endif
48 }
49
50 namespace detail {
51   std::shared_ptr<Timekeeper> getTimekeeperSingleton();
52 }
53
54 template <class T>
55 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
56   other.core_ = nullptr;
57 }
58
59 template <class T>
60 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
61   std::swap(core_, other.core_);
62   return *this;
63 }
64
65 template <class T>
66 template <typename U, typename>
67 Future<T>::Future(Future<U>&& other) noexcept
68     : core_(detail::Core<T>::convert(other.core_)) {
69   other.core_ = nullptr;
70 }
71
72 template <class T>
73 template <typename U, typename>
74 Future<T>& Future<T>::operator=(Future<U>&& other) noexcept {
75   std::swap(core_, detail::Core<T>::convert(other.core_));
76   return *this;
77 }
78
79 template <class T>
80 template <class T2, typename>
81 Future<T>::Future(T2&& val)
82     : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
83
84 template <class T>
85 template <typename T2>
86 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
87     : core_(new detail::Core<T>(Try<T>(T()))) {}
88
89 template <class T>
90 Future<T>::~Future() {
91   detach();
92 }
93
94 template <class T>
95 void Future<T>::detach() {
96   if (core_) {
97     core_->detachFuture();
98     core_ = nullptr;
99   }
100 }
101
102 template <class T>
103 void Future<T>::throwIfInvalid() const {
104   if (!core_)
105     throw NoState();
106 }
107
108 template <class T>
109 template <class F>
110 void Future<T>::setCallback_(F&& func) {
111   throwIfInvalid();
112   core_->setCallback(std::forward<F>(func));
113 }
114
115 // unwrap
116
117 template <class T>
118 template <class F>
119 typename std::enable_if<isFuture<F>::value,
120                         Future<typename isFuture<T>::Inner>>::type
121 Future<T>::unwrap() {
122   return then([](Future<typename isFuture<T>::Inner> internal_future) {
123       return internal_future;
124   });
125 }
126
127 // then
128
129 // Variant: returns a value
130 // e.g. f.then([](Try<T>&& t){ return t.value(); });
131 template <class T>
132 template <typename F, typename R, bool isTry, typename... Args>
133 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
134 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
135   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
136   typedef typename R::ReturnsFuture::Inner B;
137
138   throwIfInvalid();
139
140   Promise<B> p;
141   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
142
143   // grab the Future now before we lose our handle on the Promise
144   auto f = p.getFuture();
145   f.core_->setExecutorNoLock(getExecutor());
146
147   /* This is a bit tricky.
148
149      We can't just close over *this in case this Future gets moved. So we
150      make a new dummy Future. We could figure out something more
151      sophisticated that avoids making a new Future object when it can, as an
152      optimization. But this is correct.
153
154      core_ can't be moved, it is explicitly disallowed (as is copying). But
155      if there's ever a reason to allow it, this is one place that makes that
156      assumption and would need to be fixed. We use a standard shared pointer
157      for core_ (by copying it in), which means in essence obj holds a shared
158      pointer to itself.  But this shouldn't leak because Promise will not
159      outlive the continuation, because Promise will setException() with a
160      broken Promise if it is destructed before completed. We could use a
161      weak pointer but it would have to be converted to a shared pointer when
162      func is executed (because the Future returned by func may possibly
163      persist beyond the callback, if it gets moved), and so it is an
164      optimization to just make it shared from the get-go.
165
166      Two subtle but important points about this design. detail::Core has no
167      back pointers to Future or Promise, so if Future or Promise get moved
168      (and they will be moved in performant code) we don't have to do
169      anything fancy. And because we store the continuation in the
170      detail::Core, not in the Future, we can execute the continuation even
171      after the Future has gone out of scope. This is an intentional design
172      decision. It is likely we will want to be able to cancel a continuation
173      in some circumstances, but I think it should be explicit not implicit
174      in the destruction of the Future used to create it.
175      */
176   setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
177       Try<T> && t) mutable {
178     if (!isTry && t.hasException()) {
179       pm.setException(std::move(t.exception()));
180     } else {
181       pm.setWith([&]() { return funcm(t.template get<isTry, Args>()...); });
182     }
183   });
184
185   return f;
186 }
187
188 // Variant: returns a Future
189 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
190 template <class T>
191 template <typename F, typename R, bool isTry, typename... Args>
192 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
193 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
194   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
195   typedef typename R::ReturnsFuture::Inner B;
196
197   throwIfInvalid();
198
199   Promise<B> p;
200   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
201
202   // grab the Future now before we lose our handle on the Promise
203   auto f = p.getFuture();
204   f.core_->setExecutorNoLock(getExecutor());
205
206   setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
207       Try<T> && t) mutable {
208     if (!isTry && t.hasException()) {
209       pm.setException(std::move(t.exception()));
210     } else {
211       try {
212         auto f2 = funcm(t.template get<isTry, Args>()...);
213         // that didn't throw, now we can steal p
214         f2.setCallback_([p = std::move(pm)](Try<B> && b) mutable {
215           p.setTry(std::move(b));
216         });
217       } catch (const std::exception& e) {
218         pm.setException(exception_wrapper(std::current_exception(), e));
219       } catch (...) {
220         pm.setException(exception_wrapper(std::current_exception()));
221       }
222     }
223   });
224
225   return f;
226 }
227
228 template <typename T>
229 template <typename R, typename Caller, typename... Args>
230   Future<typename isFuture<R>::Inner>
231 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
232   typedef typename std::remove_cv<
233     typename std::remove_reference<
234       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
235   return then([instance, func](Try<T>&& t){
236     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
237   });
238 }
239
240 template <class T>
241 template <class Executor, class Arg, class... Args>
242 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
243   -> decltype(this->then(std::forward<Arg>(arg),
244                          std::forward<Args>(args)...))
245 {
246   auto oldX = getExecutor();
247   setExecutor(x);
248   return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
249                via(oldX);
250 }
251
252 template <class T>
253 Future<Unit> Future<T>::then() {
254   return then([] () {});
255 }
256
257 // onError where the callback returns T
258 template <class T>
259 template <class F>
260 typename std::enable_if<
261   !detail::callableWith<F, exception_wrapper>::value &&
262   !detail::Extract<F>::ReturnsFuture::value,
263   Future<T>>::type
264 Future<T>::onError(F&& func) {
265   typedef typename detail::Extract<F>::FirstArg Exn;
266   static_assert(
267       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
268       "Return type of onError callback must be T or Future<T>");
269
270   Promise<T> p;
271   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
272   auto f = p.getFuture();
273
274   setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
275       Try<T> && t) mutable {
276     if (!t.template withException<Exn>(
277             [&](Exn& e) { pm.setWith([&] { return funcm(e); }); })) {
278       pm.setTry(std::move(t));
279     }
280   });
281
282   return f;
283 }
284
285 // onError where the callback returns Future<T>
286 template <class T>
287 template <class F>
288 typename std::enable_if<
289   !detail::callableWith<F, exception_wrapper>::value &&
290   detail::Extract<F>::ReturnsFuture::value,
291   Future<T>>::type
292 Future<T>::onError(F&& func) {
293   static_assert(
294       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
295       "Return type of onError callback must be T or Future<T>");
296   typedef typename detail::Extract<F>::FirstArg Exn;
297
298   Promise<T> p;
299   auto f = p.getFuture();
300
301   setCallback_([ pm = std::move(p), funcm = std::forward<F>(func) ](
302       Try<T> && t) mutable {
303     if (!t.template withException<Exn>([&](Exn& e) {
304           try {
305             auto f2 = funcm(e);
306             f2.setCallback_([pm = std::move(pm)](Try<T> && t2) mutable {
307               pm.setTry(std::move(t2));
308             });
309           } catch (const std::exception& e2) {
310             pm.setException(exception_wrapper(std::current_exception(), e2));
311           } catch (...) {
312             pm.setException(exception_wrapper(std::current_exception()));
313           }
314         })) {
315       pm.setTry(std::move(t));
316     }
317   });
318
319   return f;
320 }
321
322 template <class T>
323 template <class F>
324 Future<T> Future<T>::ensure(F&& func) {
325   return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
326     funcw();
327     return makeFuture(std::move(t));
328   });
329 }
330
331 template <class T>
332 template <class F>
333 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
334   return within(dur, tk).onError([funcw = std::forward<F>(func)](
335       TimedOut const&) { return funcw(); });
336 }
337
338 template <class T>
339 template <class F>
340 typename std::enable_if<detail::callableWith<F, exception_wrapper>::value &&
341                             detail::Extract<F>::ReturnsFuture::value,
342                         Future<T>>::type
343 Future<T>::onError(F&& func) {
344   static_assert(
345       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
346       "Return type of onError callback must be T or Future<T>");
347
348   Promise<T> p;
349   auto f = p.getFuture();
350   setCallback_(
351       [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
352         if (t.hasException()) {
353           try {
354             auto f2 = funcm(std::move(t.exception()));
355             f2.setCallback_([pm = std::move(pm)](Try<T> t2) mutable {
356               pm.setTry(std::move(t2));
357             });
358           } catch (const std::exception& e2) {
359             pm.setException(exception_wrapper(std::current_exception(), e2));
360           } catch (...) {
361             pm.setException(exception_wrapper(std::current_exception()));
362           }
363         } else {
364           pm.setTry(std::move(t));
365         }
366       });
367
368   return f;
369 }
370
371 // onError(exception_wrapper) that returns T
372 template <class T>
373 template <class F>
374 typename std::enable_if<
375   detail::callableWith<F, exception_wrapper>::value &&
376   !detail::Extract<F>::ReturnsFuture::value,
377   Future<T>>::type
378 Future<T>::onError(F&& func) {
379   static_assert(
380       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
381       "Return type of onError callback must be T or Future<T>");
382
383   Promise<T> p;
384   auto f = p.getFuture();
385   setCallback_(
386       [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
387         if (t.hasException()) {
388           pm.setWith([&] { return funcm(std::move(t.exception())); });
389         } else {
390           pm.setTry(std::move(t));
391         }
392       });
393
394   return f;
395 }
396
397 template <class T>
398 typename std::add_lvalue_reference<T>::type Future<T>::value() {
399   throwIfInvalid();
400
401   return core_->getTry().value();
402 }
403
404 template <class T>
405 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
406   throwIfInvalid();
407
408   return core_->getTry().value();
409 }
410
411 template <class T>
412 Try<T>& Future<T>::getTry() {
413   throwIfInvalid();
414
415   return core_->getTry();
416 }
417
418 template <class T>
419 Optional<Try<T>> Future<T>::poll() {
420   Optional<Try<T>> o;
421   if (core_->ready()) {
422     o = std::move(core_->getTry());
423   }
424   return o;
425 }
426
427 template <class T>
428 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
429   throwIfInvalid();
430
431   setExecutor(executor, priority);
432
433   return std::move(*this);
434 }
435
436 template <class T>
437 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
438   throwIfInvalid();
439
440   Promise<T> p;
441   auto f = p.getFuture();
442   then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
443   return std::move(f).via(executor, priority);
444 }
445
446 template <class Func>
447 auto via(Executor* x, Func&& func)
448   -> Future<typename isFuture<decltype(func())>::Inner>
449 {
450   // TODO make this actually more performant. :-P #7260175
451   return via(x).then(std::forward<Func>(func));
452 }
453
454 template <class T>
455 bool Future<T>::isReady() const {
456   throwIfInvalid();
457   return core_->ready();
458 }
459
460 template <class T>
461 bool Future<T>::hasValue() {
462   return getTry().hasValue();
463 }
464
465 template <class T>
466 bool Future<T>::hasException() {
467   return getTry().hasException();
468 }
469
470 template <class T>
471 void Future<T>::raise(exception_wrapper exception) {
472   core_->raise(std::move(exception));
473 }
474
475 // makeFuture
476
477 template <class T>
478 Future<typename std::decay<T>::type> makeFuture(T&& t) {
479   return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
480 }
481
482 inline // for multiple translation units
483 Future<Unit> makeFuture() {
484   return makeFuture(Unit{});
485 }
486
487 // makeFutureWith(Future<T>()) -> Future<T>
488 template <class F>
489 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
490                         typename std::result_of<F()>::type>::type
491 makeFutureWith(F&& func) {
492   using InnerType =
493       typename isFuture<typename std::result_of<F()>::type>::Inner;
494   try {
495     return func();
496   } catch (std::exception& e) {
497     return makeFuture<InnerType>(
498         exception_wrapper(std::current_exception(), e));
499   } catch (...) {
500     return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
501   }
502 }
503
504 // makeFutureWith(T()) -> Future<T>
505 // makeFutureWith(void()) -> Future<Unit>
506 template <class F>
507 typename std::enable_if<
508     !(isFuture<typename std::result_of<F()>::type>::value),
509     Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
510 makeFutureWith(F&& func) {
511   using LiftedResult =
512       typename Unit::Lift<typename std::result_of<F()>::type>::type;
513   return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
514     return func();
515   }));
516 }
517
518 template <class T>
519 Future<T> makeFuture(std::exception_ptr const& e) {
520   return makeFuture(Try<T>(e));
521 }
522
523 template <class T>
524 Future<T> makeFuture(exception_wrapper ew) {
525   return makeFuture(Try<T>(std::move(ew)));
526 }
527
528 template <class T, class E>
529 typename std::enable_if<std::is_base_of<std::exception, E>::value,
530                         Future<T>>::type
531 makeFuture(E const& e) {
532   return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
533 }
534
535 template <class T>
536 Future<T> makeFuture(Try<T>&& t) {
537   return Future<T>(new detail::Core<T>(std::move(t)));
538 }
539
540 // via
541 Future<Unit> via(Executor* executor, int8_t priority) {
542   return makeFuture().via(executor, priority);
543 }
544
545 // mapSetCallback calls func(i, Try<T>) when every future completes
546
547 template <class T, class InputIterator, class F>
548 void mapSetCallback(InputIterator first, InputIterator last, F func) {
549   for (size_t i = 0; first != last; ++first, ++i) {
550     first->setCallback_([func, i](Try<T>&& t) {
551       func(i, std::move(t));
552     });
553   }
554 }
555
556 // collectAll (variadic)
557
558 template <typename... Fs>
559 typename detail::CollectAllVariadicContext<
560   typename std::decay<Fs>::type::value_type...>::type
561 collectAll(Fs&&... fs) {
562   auto ctx = std::make_shared<detail::CollectAllVariadicContext<
563     typename std::decay<Fs>::type::value_type...>>();
564   detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
565     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
566   return ctx->p.getFuture();
567 }
568
569 // collectAll (iterator)
570
571 template <class InputIterator>
572 Future<
573   std::vector<
574   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
575 collectAll(InputIterator first, InputIterator last) {
576   typedef
577     typename std::iterator_traits<InputIterator>::value_type::value_type T;
578
579   struct CollectAllContext {
580     CollectAllContext(int n) : results(n) {}
581     ~CollectAllContext() {
582       p.setValue(std::move(results));
583     }
584     Promise<std::vector<Try<T>>> p;
585     std::vector<Try<T>> results;
586   };
587
588   auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
589   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
590     ctx->results[i] = std::move(t);
591   });
592   return ctx->p.getFuture();
593 }
594
595 // collect (iterator)
596
597 namespace detail {
598
599 template <typename T>
600 struct CollectContext {
601   struct Nothing {
602     explicit Nothing(int /* n */) {}
603   };
604
605   using Result = typename std::conditional<
606     std::is_void<T>::value,
607     void,
608     std::vector<T>>::type;
609
610   using InternalResult = typename std::conditional<
611     std::is_void<T>::value,
612     Nothing,
613     std::vector<Optional<T>>>::type;
614
615   explicit CollectContext(int n) : result(n) {}
616   ~CollectContext() {
617     if (!threw.exchange(true)) {
618       // map Optional<T> -> T
619       std::vector<T> finalResult;
620       finalResult.reserve(result.size());
621       std::transform(result.begin(), result.end(),
622                      std::back_inserter(finalResult),
623                      [](Optional<T>& o) { return std::move(o.value()); });
624       p.setValue(std::move(finalResult));
625     }
626   }
627   inline void setPartialResult(size_t i, Try<T>& t) {
628     result[i] = std::move(t.value());
629   }
630   Promise<Result> p;
631   InternalResult result;
632   std::atomic<bool> threw {false};
633 };
634
635 }
636
637 template <class InputIterator>
638 Future<typename detail::CollectContext<
639   typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
640 collect(InputIterator first, InputIterator last) {
641   typedef
642     typename std::iterator_traits<InputIterator>::value_type::value_type T;
643
644   auto ctx = std::make_shared<detail::CollectContext<T>>(
645     std::distance(first, last));
646   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
647     if (t.hasException()) {
648        if (!ctx->threw.exchange(true)) {
649          ctx->p.setException(std::move(t.exception()));
650        }
651      } else if (!ctx->threw) {
652        ctx->setPartialResult(i, t);
653      }
654   });
655   return ctx->p.getFuture();
656 }
657
658 // collect (variadic)
659
660 template <typename... Fs>
661 typename detail::CollectVariadicContext<
662   typename std::decay<Fs>::type::value_type...>::type
663 collect(Fs&&... fs) {
664   auto ctx = std::make_shared<detail::CollectVariadicContext<
665     typename std::decay<Fs>::type::value_type...>>();
666   detail::collectVariadicHelper<detail::CollectVariadicContext>(
667     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
668   return ctx->p.getFuture();
669 }
670
671 // collectAny (iterator)
672
673 template <class InputIterator>
674 Future<
675   std::pair<size_t,
676             Try<
677               typename
678               std::iterator_traits<InputIterator>::value_type::value_type>>>
679 collectAny(InputIterator first, InputIterator last) {
680   typedef
681     typename std::iterator_traits<InputIterator>::value_type::value_type T;
682
683   struct CollectAnyContext {
684     CollectAnyContext() {};
685     Promise<std::pair<size_t, Try<T>>> p;
686     std::atomic<bool> done {false};
687   };
688
689   auto ctx = std::make_shared<CollectAnyContext>();
690   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
691     if (!ctx->done.exchange(true)) {
692       ctx->p.setValue(std::make_pair(i, std::move(t)));
693     }
694   });
695   return ctx->p.getFuture();
696 }
697
698 // collectAnyWithoutException (iterator)
699
700 template <class InputIterator>
701 Future<std::pair<
702     size_t,
703     typename std::iterator_traits<InputIterator>::value_type::value_type>>
704 collectAnyWithoutException(InputIterator first, InputIterator last) {
705   typedef
706       typename std::iterator_traits<InputIterator>::value_type::value_type T;
707
708   struct CollectAnyWithoutExceptionContext {
709     CollectAnyWithoutExceptionContext(){};
710     Promise<std::pair<size_t, T>> p;
711     std::atomic<bool> done{false};
712     std::atomic<size_t> nFulfilled{0};
713     size_t nTotal;
714   };
715
716   auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
717   ctx->nTotal = std::distance(first, last);
718
719   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
720     if (!t.hasException() && !ctx->done.exchange(true)) {
721       ctx->p.setValue(std::make_pair(i, std::move(t.value())));
722     } else if (++ctx->nFulfilled == ctx->nTotal) {
723       ctx->p.setException(t.exception());
724     }
725   });
726   return ctx->p.getFuture();
727 }
728
729 // collectN (iterator)
730
731 template <class InputIterator>
732 Future<std::vector<std::pair<size_t, Try<typename
733   std::iterator_traits<InputIterator>::value_type::value_type>>>>
734 collectN(InputIterator first, InputIterator last, size_t n) {
735   typedef typename
736     std::iterator_traits<InputIterator>::value_type::value_type T;
737   typedef std::vector<std::pair<size_t, Try<T>>> V;
738
739   struct CollectNContext {
740     V v;
741     std::atomic<size_t> completed = {0};
742     Promise<V> p;
743   };
744   auto ctx = std::make_shared<CollectNContext>();
745
746   if (size_t(std::distance(first, last)) < n) {
747     ctx->p.setException(std::runtime_error("Not enough futures"));
748   } else {
749     // for each completed Future, increase count and add to vector, until we
750     // have n completed futures at which point we fulfil our Promise with the
751     // vector
752     mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
753       auto c = ++ctx->completed;
754       if (c <= n) {
755         assert(ctx->v.size() < n);
756         ctx->v.emplace_back(i, std::move(t));
757         if (c == n) {
758           ctx->p.setTry(Try<V>(std::move(ctx->v)));
759         }
760       }
761     });
762   }
763
764   return ctx->p.getFuture();
765 }
766
767 // reduce (iterator)
768
769 template <class It, class T, class F>
770 Future<T> reduce(It first, It last, T&& initial, F&& func) {
771   if (first == last) {
772     return makeFuture(std::move(initial));
773   }
774
775   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
776   typedef
777       typename std::conditional<detail::callableWith<F, T&&, Try<ItT>&&>::value,
778                                 Try<ItT>,
779                                 ItT>::type Arg;
780   typedef isTry<Arg> IsTry;
781
782   auto sfunc = std::make_shared<F>(std::move(func));
783
784   auto f = first->then(
785       [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
786         return (*sfunc)(
787             std::move(minitial), head.template get<IsTry::value, Arg&&>());
788       });
789
790   for (++first; first != last; ++first) {
791     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
792       return (*sfunc)(std::move(std::get<0>(t).value()),
793                   // Either return a ItT&& or a Try<ItT>&& depending
794                   // on the type of the argument of func.
795                   std::get<1>(t).template get<IsTry::value, Arg&&>());
796     });
797   }
798
799   return f;
800 }
801
802 // window (collection)
803
804 template <class Collection, class F, class ItT, class Result>
805 std::vector<Future<Result>>
806 window(Collection input, F func, size_t n) {
807   struct WindowContext {
808     WindowContext(Collection&& i, F&& fn)
809         : input_(std::move(i)), promises_(input_.size()),
810           func_(std::move(fn))
811       {}
812     std::atomic<size_t> i_ {0};
813     Collection input_;
814     std::vector<Promise<Result>> promises_;
815     F func_;
816
817     static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
818       size_t i = ctx->i_++;
819       if (i < ctx->input_.size()) {
820         // Using setCallback_ directly since we don't need the Future
821         ctx->func_(std::move(ctx->input_[i])).setCallback_(
822           // ctx is captured by value
823           [ctx, i](Try<Result>&& t) {
824             ctx->promises_[i].setTry(std::move(t));
825             // Chain another future onto this one
826             spawn(std::move(ctx));
827           });
828       }
829     }
830   };
831
832   auto max = std::min(n, input.size());
833
834   auto ctx = std::make_shared<WindowContext>(
835     std::move(input), std::move(func));
836
837   for (size_t i = 0; i < max; ++i) {
838     // Start the first n Futures
839     WindowContext::spawn(ctx);
840   }
841
842   std::vector<Future<Result>> futures;
843   futures.reserve(ctx->promises_.size());
844   for (auto& promise : ctx->promises_) {
845     futures.emplace_back(promise.getFuture());
846   }
847
848   return futures;
849 }
850
851 // reduce
852
853 template <class T>
854 template <class I, class F>
855 Future<I> Future<T>::reduce(I&& initial, F&& func) {
856   return then([
857     minitial = std::forward<I>(initial),
858     mfunc = std::forward<F>(func)
859   ](T& vals) mutable {
860     auto ret = std::move(minitial);
861     for (auto& val : vals) {
862       ret = mfunc(std::move(ret), std::move(val));
863     }
864     return ret;
865   });
866 }
867
868 // unorderedReduce (iterator)
869
870 template <class It, class T, class F, class ItT, class Arg>
871 Future<T> unorderedReduce(It first, It last, T initial, F func) {
872   if (first == last) {
873     return makeFuture(std::move(initial));
874   }
875
876   typedef isTry<Arg> IsTry;
877
878   struct UnorderedReduceContext {
879     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
880         : lock_(), memo_(makeFuture<T>(std::move(memo))),
881           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
882       {};
883     folly::MicroSpinLock lock_; // protects memo_ and numThens_
884     Future<T> memo_;
885     F func_;
886     size_t numThens_; // how many Futures completed and called .then()
887     size_t numFutures_; // how many Futures in total
888     Promise<T> promise_;
889   };
890
891   auto ctx = std::make_shared<UnorderedReduceContext>(
892     std::move(initial), std::move(func), std::distance(first, last));
893
894   mapSetCallback<ItT>(
895       first,
896       last,
897       [ctx](size_t /* i */, Try<ItT>&& t) {
898         // Futures can be completed in any order, simultaneously.
899         // To make this non-blocking, we create a new Future chain in
900         // the order of completion to reduce the values.
901         // The spinlock just protects chaining a new Future, not actually
902         // executing the reduce, which should be really fast.
903         folly::MSLGuard lock(ctx->lock_);
904         ctx->memo_ =
905             ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
906               // Either return a ItT&& or a Try<ItT>&& depending
907               // on the type of the argument of func.
908               return ctx->func_(std::move(v),
909                                 mt.template get<IsTry::value, Arg&&>());
910             });
911         if (++ctx->numThens_ == ctx->numFutures_) {
912           // After reducing the value of the last Future, fulfill the Promise
913           ctx->memo_.setCallback_(
914               [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
915         }
916       });
917
918   return ctx->promise_.getFuture();
919 }
920
921 // within
922
923 template <class T>
924 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
925   return within(dur, TimedOut(), tk);
926 }
927
928 template <class T>
929 template <class E>
930 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
931
932   struct Context {
933     Context(E ex) : exception(std::move(ex)), promise() {}
934     E exception;
935     Future<Unit> thisFuture;
936     Promise<T> promise;
937     std::atomic<bool> token {false};
938   };
939
940   std::shared_ptr<Timekeeper> tks;
941   if (!tk) {
942     tks = folly::detail::getTimekeeperSingleton();
943     tk = DCHECK_NOTNULL(tks.get());
944   }
945
946   auto ctx = std::make_shared<Context>(std::move(e));
947
948   ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
949     // TODO: "this" completed first, cancel "after"
950     if (ctx->token.exchange(true) == false) {
951       ctx->promise.setTry(std::move(t));
952     }
953   });
954
955   tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
956     // "after" completed first, cancel "this"
957     ctx->thisFuture.raise(TimedOut());
958     if (ctx->token.exchange(true) == false) {
959       if (t.hasException()) {
960         ctx->promise.setException(std::move(t.exception()));
961       } else {
962         ctx->promise.setException(std::move(ctx->exception));
963       }
964     }
965   });
966
967   return ctx->promise.getFuture().via(getExecutor());
968 }
969
970 // delayed
971
972 template <class T>
973 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
974   return collectAll(*this, futures::sleep(dur, tk))
975       .then([](std::tuple<Try<T>, Try<Unit>> tup) {
976         Try<T>& t = std::get<0>(tup);
977         return makeFuture<T>(std::move(t));
978       })
979       .via(getExecutor());
980 }
981
982 namespace detail {
983
984 template <class T>
985 void waitImpl(Future<T>& f) {
986   // short-circuit if there's nothing to do
987   if (f.isReady()) return;
988
989   FutureBatonType baton;
990   f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
991   baton.wait();
992   assert(f.isReady());
993 }
994
995 template <class T>
996 void waitImpl(Future<T>& f, Duration dur) {
997   // short-circuit if there's nothing to do
998   if (f.isReady()) {
999     return;
1000   }
1001
1002   Promise<T> promise;
1003   auto ret = promise.getFuture();
1004   auto baton = std::make_shared<FutureBatonType>();
1005   f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1006     promise.setTry(std::move(t));
1007     baton->post();
1008   });
1009   f = std::move(ret);
1010   if (baton->timed_wait(dur)) {
1011     assert(f.isReady());
1012   }
1013 }
1014
1015 template <class T>
1016 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1017   // Set callback so to ensure that the via executor has something on it
1018   // so that once the preceding future triggers this callback, drive will
1019   // always have a callback to satisfy it
1020   if (f.isReady())
1021     return;
1022   f = f.via(e).then([](T&& t) { return std::move(t); });
1023   while (!f.isReady()) {
1024     e->drive();
1025   }
1026   assert(f.isReady());
1027 }
1028
1029 } // detail
1030
1031 template <class T>
1032 Future<T>& Future<T>::wait() & {
1033   detail::waitImpl(*this);
1034   return *this;
1035 }
1036
1037 template <class T>
1038 Future<T>&& Future<T>::wait() && {
1039   detail::waitImpl(*this);
1040   return std::move(*this);
1041 }
1042
1043 template <class T>
1044 Future<T>& Future<T>::wait(Duration dur) & {
1045   detail::waitImpl(*this, dur);
1046   return *this;
1047 }
1048
1049 template <class T>
1050 Future<T>&& Future<T>::wait(Duration dur) && {
1051   detail::waitImpl(*this, dur);
1052   return std::move(*this);
1053 }
1054
1055 template <class T>
1056 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1057   detail::waitViaImpl(*this, e);
1058   return *this;
1059 }
1060
1061 template <class T>
1062 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1063   detail::waitViaImpl(*this, e);
1064   return std::move(*this);
1065 }
1066
1067 template <class T>
1068 T Future<T>::get() {
1069   return std::move(wait().value());
1070 }
1071
1072 template <class T>
1073 T Future<T>::get(Duration dur) {
1074   wait(dur);
1075   if (isReady()) {
1076     return std::move(value());
1077   } else {
1078     throw TimedOut();
1079   }
1080 }
1081
1082 template <class T>
1083 T Future<T>::getVia(DrivableExecutor* e) {
1084   return std::move(waitVia(e).value());
1085 }
1086
1087 namespace detail {
1088   template <class T>
1089   struct TryEquals {
1090     static bool equals(const Try<T>& t1, const Try<T>& t2) {
1091       return t1.value() == t2.value();
1092     }
1093   };
1094 }
1095
1096 template <class T>
1097 Future<bool> Future<T>::willEqual(Future<T>& f) {
1098   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1099     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1100       return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1101     } else {
1102       return false;
1103       }
1104   });
1105 }
1106
1107 template <class T>
1108 template <class F>
1109 Future<T> Future<T>::filter(F&& predicate) {
1110   return this->then([p = std::forward<F>(predicate)](T val) {
1111     T const& valConstRef = val;
1112     if (!p(valConstRef)) {
1113       throw PredicateDoesNotObtain();
1114     }
1115     return val;
1116   });
1117 }
1118
1119 template <class T>
1120 template <class Callback>
1121 auto Future<T>::thenMulti(Callback&& fn)
1122     -> decltype(this->then(std::forward<Callback>(fn))) {
1123   // thenMulti with one callback is just a then
1124   return then(std::forward<Callback>(fn));
1125 }
1126
1127 template <class T>
1128 template <class Callback, class... Callbacks>
1129 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1130     -> decltype(this->then(std::forward<Callback>(fn)).
1131                       thenMulti(std::forward<Callbacks>(fns)...)) {
1132   // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1133   return then(std::forward<Callback>(fn)).
1134          thenMulti(std::forward<Callbacks>(fns)...);
1135 }
1136
1137 template <class T>
1138 template <class Callback, class... Callbacks>
1139 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1140                                       Callbacks&&... fns)
1141     -> decltype(this->then(std::forward<Callback>(fn)).
1142                       thenMulti(std::forward<Callbacks>(fns)...)) {
1143   // thenMultiExecutor with two callbacks is
1144   // via(x).then(a).thenMulti(b, ...).via(oldX)
1145   auto oldX = getExecutor();
1146   setExecutor(x);
1147   return then(std::forward<Callback>(fn)).
1148          thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1149 }
1150
1151 template <class T>
1152 template <class Callback>
1153 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1154     -> decltype(this->then(std::forward<Callback>(fn))) {
1155   // thenMulti with one callback is just a then with an executor
1156   return then(x, std::forward<Callback>(fn));
1157 }
1158
1159 template <class F>
1160 inline Future<Unit> when(bool p, F&& thunk) {
1161   return p ? std::forward<F>(thunk)().unit() : makeFuture();
1162 }
1163
1164 template <class P, class F>
1165 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1166   if (predicate()) {
1167     auto future = thunk();
1168     return future.then([
1169       predicate = std::forward<P>(predicate),
1170       thunk = std::forward<F>(thunk)
1171     ]() mutable {
1172       return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1173     });
1174   }
1175   return makeFuture();
1176 }
1177
1178 template <class F>
1179 Future<Unit> times(const int n, F&& thunk) {
1180   return folly::whileDo(
1181       [ n, count = folly::make_unique<std::atomic<int>>(0) ]() mutable {
1182         return count->fetch_add(1) < n;
1183       },
1184       std::forward<F>(thunk));
1185 }
1186
1187 namespace futures {
1188   template <class It, class F, class ItT, class Result>
1189   std::vector<Future<Result>> map(It first, It last, F func) {
1190     std::vector<Future<Result>> results;
1191     for (auto it = first; it != last; it++) {
1192       results.push_back(it->then(func));
1193     }
1194     return results;
1195   }
1196 }
1197
1198 namespace futures {
1199
1200 namespace detail {
1201
1202 struct retrying_policy_raw_tag {};
1203 struct retrying_policy_fut_tag {};
1204
1205 template <class Policy>
1206 struct retrying_policy_traits {
1207   using ew = exception_wrapper;
1208   FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1209   template <class Ret>
1210   using has_op = typename std::integral_constant<bool,
1211         has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1212         has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1213   using is_raw = has_op<bool>;
1214   using is_fut = has_op<Future<bool>>;
1215   using tag = typename std::conditional<
1216         is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1217         is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1218 };
1219
1220 template <class Policy, class FF>
1221 typename std::result_of<FF(size_t)>::type
1222 retrying(size_t k, Policy&& p, FF&& ff) {
1223   using F = typename std::result_of<FF(size_t)>::type;
1224   using T = typename F::value_type;
1225   auto f = ff(k++);
1226   return f.onError(
1227       [ k, pm = std::forward<Policy>(p), ffm = std::forward<FF>(ff) ](
1228           exception_wrapper x) mutable {
1229         auto q = pm(k, x);
1230         return q.then(
1231             [ k, xm = std::move(x), pm = std::move(pm), ffm = std::move(ffm) ](
1232                 bool r) mutable {
1233               return r ? retrying(k, std::move(pm), std::move(ffm))
1234                        : makeFuture<T>(std::move(xm));
1235             });
1236       });
1237 }
1238
1239 template <class Policy, class FF>
1240 typename std::result_of<FF(size_t)>::type
1241 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1242   auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
1243     return makeFuture<bool>(pm(k, x));
1244   };
1245   return retrying(0, std::move(q), std::forward<FF>(ff));
1246 }
1247
1248 template <class Policy, class FF>
1249 typename std::result_of<FF(size_t)>::type
1250 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1251   return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1252 }
1253
1254 //  jittered exponential backoff, clamped to [backoff_min, backoff_max]
1255 template <class URNG>
1256 Duration retryingJitteredExponentialBackoffDur(
1257     size_t n,
1258     Duration backoff_min,
1259     Duration backoff_max,
1260     double jitter_param,
1261     URNG& rng) {
1262   using d = Duration;
1263   auto dist = std::normal_distribution<double>(0.0, jitter_param);
1264   auto jitter = std::exp(dist(rng));
1265   auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1266   return std::max(backoff_min, std::min(backoff_max, backoff));
1267 }
1268
1269 template <class Policy, class URNG>
1270 std::function<Future<bool>(size_t, const exception_wrapper&)>
1271 retryingPolicyCappedJitteredExponentialBackoff(
1272     size_t max_tries,
1273     Duration backoff_min,
1274     Duration backoff_max,
1275     double jitter_param,
1276     URNG&& rng,
1277     Policy&& p) {
1278   return [
1279     pm = std::forward<Policy>(p),
1280     max_tries,
1281     backoff_min,
1282     backoff_max,
1283     jitter_param,
1284     rngp = std::forward<URNG>(rng)
1285   ](size_t n, const exception_wrapper& ex) mutable {
1286     if (n == max_tries) {
1287       return makeFuture(false);
1288     }
1289     return pm(n, ex).then(
1290         [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
1291             bool v) mutable {
1292           if (!v) {
1293             return makeFuture(false);
1294           }
1295           auto backoff = detail::retryingJitteredExponentialBackoffDur(
1296               n, backoff_min, backoff_max, jitter_param, rngp);
1297           return futures::sleep(backoff).then([] { return true; });
1298         });
1299   };
1300 }
1301
1302 template <class Policy, class URNG>
1303 std::function<Future<bool>(size_t, const exception_wrapper&)>
1304 retryingPolicyCappedJitteredExponentialBackoff(
1305     size_t max_tries,
1306     Duration backoff_min,
1307     Duration backoff_max,
1308     double jitter_param,
1309     URNG&& rng,
1310     Policy&& p,
1311     retrying_policy_raw_tag) {
1312   auto q = [pm = std::forward<Policy>(p)](
1313       size_t n, const exception_wrapper& e) {
1314     return makeFuture(pm(n, e));
1315   };
1316   return retryingPolicyCappedJitteredExponentialBackoff(
1317       max_tries,
1318       backoff_min,
1319       backoff_max,
1320       jitter_param,
1321       std::forward<URNG>(rng),
1322       std::move(q));
1323 }
1324
1325 template <class Policy, class URNG>
1326 std::function<Future<bool>(size_t, const exception_wrapper&)>
1327 retryingPolicyCappedJitteredExponentialBackoff(
1328     size_t max_tries,
1329     Duration backoff_min,
1330     Duration backoff_max,
1331     double jitter_param,
1332     URNG&& rng,
1333     Policy&& p,
1334     retrying_policy_fut_tag) {
1335   return retryingPolicyCappedJitteredExponentialBackoff(
1336       max_tries,
1337       backoff_min,
1338       backoff_max,
1339       jitter_param,
1340       std::forward<URNG>(rng),
1341       std::forward<Policy>(p));
1342 }
1343 }
1344
1345 template <class Policy, class FF>
1346 typename std::result_of<FF(size_t)>::type
1347 retrying(Policy&& p, FF&& ff) {
1348   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1349   return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1350 }
1351
1352 inline
1353 std::function<bool(size_t, const exception_wrapper&)>
1354 retryingPolicyBasic(
1355     size_t max_tries) {
1356   return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1357 }
1358
1359 template <class Policy, class URNG>
1360 std::function<Future<bool>(size_t, const exception_wrapper&)>
1361 retryingPolicyCappedJitteredExponentialBackoff(
1362     size_t max_tries,
1363     Duration backoff_min,
1364     Duration backoff_max,
1365     double jitter_param,
1366     URNG&& rng,
1367     Policy&& p) {
1368   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1369   return detail::retryingPolicyCappedJitteredExponentialBackoff(
1370       max_tries,
1371       backoff_min,
1372       backoff_max,
1373       jitter_param,
1374       std::forward<URNG>(rng),
1375       std::forward<Policy>(p),
1376       tag());
1377 }
1378
1379 inline
1380 std::function<Future<bool>(size_t, const exception_wrapper&)>
1381 retryingPolicyCappedJitteredExponentialBackoff(
1382     size_t max_tries,
1383     Duration backoff_min,
1384     Duration backoff_max,
1385     double jitter_param) {
1386   auto p = [](size_t, const exception_wrapper&) { return true; };
1387   return retryingPolicyCappedJitteredExponentialBackoff(
1388       max_tries,
1389       backoff_min,
1390       backoff_max,
1391       jitter_param,
1392       ThreadLocalPRNG(),
1393       std::move(p));
1394 }
1395
1396 }
1397
1398 // Instantiate the most common Future types to save compile time
1399 extern template class Future<Unit>;
1400 extern template class Future<bool>;
1401 extern template class Future<int>;
1402 extern template class Future<int64_t>;
1403 extern template class Future<std::string>;
1404 extern template class Future<double>;
1405
1406 } // namespace folly