Reverted commit D3979179
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <algorithm>
20 #include <cassert>
21 #include <chrono>
22 #include <random>
23 #include <thread>
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
30
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
33 #else
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/fibers/Baton.h>
36 #endif
37
38 namespace folly {
39
40 class Timekeeper;
41
42 namespace detail {
43 #if FOLLY_FUTURE_USING_FIBER
44 typedef folly::fibers::Baton FutureBatonType;
45 #else
46 typedef folly::Baton<> FutureBatonType;
47 #endif
48 }
49
50 namespace detail {
51   std::shared_ptr<Timekeeper> getTimekeeperSingleton();
52 }
53
54 template <class T>
55 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
56   other.core_ = nullptr;
57 }
58
59 template <class T>
60 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
61   std::swap(core_, other.core_);
62   return *this;
63 }
64
65 template <class T>
66 template <typename U, typename>
67 Future<T>::Future(Future<U>&& other) noexcept
68     : core_(detail::Core<T>::convert(other.core_)) {
69   other.core_ = nullptr;
70 }
71
72 template <class T>
73 template <typename U, typename>
74 Future<T>& Future<T>::operator=(Future<U>&& other) noexcept {
75   std::swap(core_, detail::Core<T>::convert(other.core_));
76   return *this;
77 }
78
79 template <class T>
80 template <class T2, typename>
81 Future<T>::Future(T2&& val)
82     : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
83
84 template <class T>
85 template <typename T2>
86 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
87     : core_(new detail::Core<T>(Try<T>(T()))) {}
88
89 template <class T>
90 Future<T>::~Future() {
91   detach();
92 }
93
94 template <class T>
95 void Future<T>::detach() {
96   if (core_) {
97     core_->detachFuture();
98     core_ = nullptr;
99   }
100 }
101
102 template <class T>
103 void Future<T>::throwIfInvalid() const {
104   if (!core_)
105     throw NoState();
106 }
107
108 template <class T>
109 template <class F>
110 void Future<T>::setCallback_(F&& func) {
111   throwIfInvalid();
112   core_->setCallback(std::forward<F>(func));
113 }
114
115 // unwrap
116
117 template <class T>
118 template <class F>
119 typename std::enable_if<isFuture<F>::value,
120                         Future<typename isFuture<T>::Inner>>::type
121 Future<T>::unwrap() {
122   return then([](Future<typename isFuture<T>::Inner> internal_future) {
123       return internal_future;
124   });
125 }
126
127 // then
128
129 // Variant: returns a value
130 // e.g. f.then([](Try<T>&& t){ return t.value(); });
131 template <class T>
132 template <typename F, typename R, bool isTry, typename... Args>
133 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
134 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
135   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
136   typedef typename R::ReturnsFuture::Inner B;
137
138   throwIfInvalid();
139
140   Promise<B> p;
141   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
142
143   // grab the Future now before we lose our handle on the Promise
144   auto f = p.getFuture();
145   f.core_->setExecutorNoLock(getExecutor());
146
147   /* This is a bit tricky.
148
149      We can't just close over *this in case this Future gets moved. So we
150      make a new dummy Future. We could figure out something more
151      sophisticated that avoids making a new Future object when it can, as an
152      optimization. But this is correct.
153
154      core_ can't be moved, it is explicitly disallowed (as is copying). But
155      if there's ever a reason to allow it, this is one place that makes that
156      assumption and would need to be fixed. We use a standard shared pointer
157      for core_ (by copying it in), which means in essence obj holds a shared
158      pointer to itself.  But this shouldn't leak because Promise will not
159      outlive the continuation, because Promise will setException() with a
160      broken Promise if it is destructed before completed. We could use a
161      weak pointer but it would have to be converted to a shared pointer when
162      func is executed (because the Future returned by func may possibly
163      persist beyond the callback, if it gets moved), and so it is an
164      optimization to just make it shared from the get-go.
165
166      Two subtle but important points about this design. detail::Core has no
167      back pointers to Future or Promise, so if Future or Promise get moved
168      (and they will be moved in performant code) we don't have to do
169      anything fancy. And because we store the continuation in the
170      detail::Core, not in the Future, we can execute the continuation even
171      after the Future has gone out of scope. This is an intentional design
172      decision. It is likely we will want to be able to cancel a continuation
173      in some circumstances, but I think it should be explicit not implicit
174      in the destruction of the Future used to create it.
175      */
176   setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
177       Try<T> && t) mutable {
178     if (!isTry && t.hasException()) {
179       pm.setException(std::move(t.exception()));
180     } else {
181       pm.setWith([&]() { return funcm(t.template get<isTry, Args>()...); });
182     }
183   });
184
185   return f;
186 }
187
188 // Variant: returns a Future
189 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
190 template <class T>
191 template <typename F, typename R, bool isTry, typename... Args>
192 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
193 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
194   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
195   typedef typename R::ReturnsFuture::Inner B;
196
197   throwIfInvalid();
198
199   Promise<B> p;
200   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
201
202   // grab the Future now before we lose our handle on the Promise
203   auto f = p.getFuture();
204   f.core_->setExecutorNoLock(getExecutor());
205
206   setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
207       Try<T> && t) mutable {
208     if (!isTry && t.hasException()) {
209       pm.setException(std::move(t.exception()));
210     } else {
211       try {
212         auto f2 = funcm(t.template get<isTry, Args>()...);
213         // that didn't throw, now we can steal p
214         f2.setCallback_([p = std::move(pm)](Try<B> && b) mutable {
215           p.setTry(std::move(b));
216         });
217       } catch (const std::exception& e) {
218         pm.setException(exception_wrapper(std::current_exception(), e));
219       } catch (...) {
220         pm.setException(exception_wrapper(std::current_exception()));
221       }
222     }
223   });
224
225   return f;
226 }
227
228 template <typename T>
229 template <typename R, typename Caller, typename... Args>
230   Future<typename isFuture<R>::Inner>
231 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
232   typedef typename std::remove_cv<
233     typename std::remove_reference<
234       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
235   return then([instance, func](Try<T>&& t){
236     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
237   });
238 }
239
240 template <class T>
241 template <class Executor, class Arg, class... Args>
242 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
243   -> decltype(this->then(std::forward<Arg>(arg),
244                          std::forward<Args>(args)...))
245 {
246   auto oldX = getExecutor();
247   setExecutor(x);
248   return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
249                via(oldX);
250 }
251
252 template <class T>
253 Future<Unit> Future<T>::then() {
254   return then([] () {});
255 }
256
257 // onError where the callback returns T
258 template <class T>
259 template <class F>
260 typename std::enable_if<
261   !detail::callableWith<F, exception_wrapper>::value &&
262   !detail::Extract<F>::ReturnsFuture::value,
263   Future<T>>::type
264 Future<T>::onError(F&& func) {
265   typedef typename detail::Extract<F>::FirstArg Exn;
266   static_assert(
267       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
268       "Return type of onError callback must be T or Future<T>");
269
270   Promise<T> p;
271   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
272   auto f = p.getFuture();
273
274   setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
275       Try<T> && t) mutable {
276     if (!t.template withException<Exn>(
277             [&](Exn& e) { pm.setWith([&] { return funcm(e); }); })) {
278       pm.setTry(std::move(t));
279     }
280   });
281
282   return f;
283 }
284
285 // onError where the callback returns Future<T>
286 template <class T>
287 template <class F>
288 typename std::enable_if<
289   !detail::callableWith<F, exception_wrapper>::value &&
290   detail::Extract<F>::ReturnsFuture::value,
291   Future<T>>::type
292 Future<T>::onError(F&& func) {
293   static_assert(
294       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
295       "Return type of onError callback must be T or Future<T>");
296   typedef typename detail::Extract<F>::FirstArg Exn;
297
298   Promise<T> p;
299   auto f = p.getFuture();
300
301   setCallback_([ pm = std::move(p), funcm = std::forward<F>(func) ](
302       Try<T> && t) mutable {
303     if (!t.template withException<Exn>([&](Exn& e) {
304           try {
305             auto f2 = funcm(e);
306             f2.setCallback_([pm = std::move(pm)](Try<T> && t2) mutable {
307               pm.setTry(std::move(t2));
308             });
309           } catch (const std::exception& e2) {
310             pm.setException(exception_wrapper(std::current_exception(), e2));
311           } catch (...) {
312             pm.setException(exception_wrapper(std::current_exception()));
313           }
314         })) {
315       pm.setTry(std::move(t));
316     }
317   });
318
319   return f;
320 }
321
322 template <class T>
323 template <class F>
324 Future<T> Future<T>::ensure(F&& func) {
325   return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
326     funcw();
327     return makeFuture(std::move(t));
328   });
329 }
330
331 template <class T>
332 template <class F>
333 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
334   return within(dur, tk).onError([funcw = std::forward<F>(func)](
335       TimedOut const&) { return funcw(); });
336 }
337
338 template <class T>
339 template <class F>
340 typename std::enable_if<detail::callableWith<F, exception_wrapper>::value &&
341                             detail::Extract<F>::ReturnsFuture::value,
342                         Future<T>>::type
343 Future<T>::onError(F&& func) {
344   static_assert(
345       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
346       "Return type of onError callback must be T or Future<T>");
347
348   Promise<T> p;
349   auto f = p.getFuture();
350   setCallback_(
351       [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
352         if (t.hasException()) {
353           try {
354             auto f2 = funcm(std::move(t.exception()));
355             f2.setCallback_([pm = std::move(pm)](Try<T> t2) mutable {
356               pm.setTry(std::move(t2));
357             });
358           } catch (const std::exception& e2) {
359             pm.setException(exception_wrapper(std::current_exception(), e2));
360           } catch (...) {
361             pm.setException(exception_wrapper(std::current_exception()));
362           }
363         } else {
364           pm.setTry(std::move(t));
365         }
366       });
367
368   return f;
369 }
370
371 // onError(exception_wrapper) that returns T
372 template <class T>
373 template <class F>
374 typename std::enable_if<
375   detail::callableWith<F, exception_wrapper>::value &&
376   !detail::Extract<F>::ReturnsFuture::value,
377   Future<T>>::type
378 Future<T>::onError(F&& func) {
379   static_assert(
380       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
381       "Return type of onError callback must be T or Future<T>");
382
383   Promise<T> p;
384   auto f = p.getFuture();
385   setCallback_(
386       [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
387         if (t.hasException()) {
388           pm.setWith([&] { return funcm(std::move(t.exception())); });
389         } else {
390           pm.setTry(std::move(t));
391         }
392       });
393
394   return f;
395 }
396
397 template <class T>
398 typename std::add_lvalue_reference<T>::type Future<T>::value() {
399   throwIfInvalid();
400
401   return core_->getTry().value();
402 }
403
404 template <class T>
405 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
406   throwIfInvalid();
407
408   return core_->getTry().value();
409 }
410
411 template <class T>
412 Try<T>& Future<T>::getTry() {
413   throwIfInvalid();
414
415   return core_->getTry();
416 }
417
418 template <class T>
419 Optional<Try<T>> Future<T>::poll() {
420   Optional<Try<T>> o;
421   if (core_->ready()) {
422     o = std::move(core_->getTry());
423   }
424   return o;
425 }
426
427 template <class T>
428 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
429   throwIfInvalid();
430
431   setExecutor(executor, priority);
432
433   return std::move(*this);
434 }
435
436 template <class T>
437 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
438   throwIfInvalid();
439
440   Promise<T> p;
441   auto f = p.getFuture();
442   then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
443   return std::move(f).via(executor, priority);
444 }
445
446 template <class Func>
447 auto via(Executor* x, Func&& func)
448   -> Future<typename isFuture<decltype(func())>::Inner>
449 {
450   // TODO make this actually more performant. :-P #7260175
451   return via(x).then(std::forward<Func>(func));
452 }
453
454 template <class T>
455 bool Future<T>::isReady() const {
456   throwIfInvalid();
457   return core_->ready();
458 }
459
460 template <class T>
461 bool Future<T>::hasValue() {
462   return getTry().hasValue();
463 }
464
465 template <class T>
466 bool Future<T>::hasException() {
467   return getTry().hasException();
468 }
469
470 template <class T>
471 void Future<T>::raise(exception_wrapper exception) {
472   core_->raise(std::move(exception));
473 }
474
475 // makeFuture
476
477 template <class T>
478 Future<typename std::decay<T>::type> makeFuture(T&& t) {
479   return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
480 }
481
482 inline // for multiple translation units
483 Future<Unit> makeFuture() {
484   return makeFuture(Unit{});
485 }
486
487 // makeFutureWith(Future<T>()) -> Future<T>
488 template <class F>
489 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
490                         typename std::result_of<F()>::type>::type
491 makeFutureWith(F&& func) {
492   using InnerType =
493       typename isFuture<typename std::result_of<F()>::type>::Inner;
494   try {
495     return func();
496   } catch (std::exception& e) {
497     return makeFuture<InnerType>(
498         exception_wrapper(std::current_exception(), e));
499   } catch (...) {
500     return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
501   }
502 }
503
504 // makeFutureWith(T()) -> Future<T>
505 // makeFutureWith(void()) -> Future<Unit>
506 template <class F>
507 typename std::enable_if<
508     !(isFuture<typename std::result_of<F()>::type>::value),
509     Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
510 makeFutureWith(F&& func) {
511   using LiftedResult =
512       typename Unit::Lift<typename std::result_of<F()>::type>::type;
513   return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
514     return func();
515   }));
516 }
517
518 template <class T>
519 Future<T> makeFuture(std::exception_ptr const& e) {
520   return makeFuture(Try<T>(e));
521 }
522
523 template <class T>
524 Future<T> makeFuture(exception_wrapper ew) {
525   return makeFuture(Try<T>(std::move(ew)));
526 }
527
528 template <class T, class E>
529 typename std::enable_if<std::is_base_of<std::exception, E>::value,
530                         Future<T>>::type
531 makeFuture(E const& e) {
532   return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
533 }
534
535 template <class T>
536 Future<T> makeFuture(Try<T>&& t) {
537   return Future<T>(new detail::Core<T>(std::move(t)));
538 }
539
540 // via
541 Future<Unit> via(Executor* executor, int8_t priority) {
542   return makeFuture().via(executor, priority);
543 }
544
545 // mapSetCallback calls func(i, Try<T>) when every future completes
546
547 template <class T, class InputIterator, class F>
548 void mapSetCallback(InputIterator first, InputIterator last, F func) {
549   for (size_t i = 0; first != last; ++first, ++i) {
550     first->setCallback_([func, i](Try<T>&& t) {
551       func(i, std::move(t));
552     });
553   }
554 }
555
556 // collectAll (variadic)
557
558 template <typename... Fs>
559 typename detail::CollectAllVariadicContext<
560   typename std::decay<Fs>::type::value_type...>::type
561 collectAll(Fs&&... fs) {
562   auto ctx = std::make_shared<detail::CollectAllVariadicContext<
563     typename std::decay<Fs>::type::value_type...>>();
564   detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
565     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
566   return ctx->p.getFuture();
567 }
568
569 // collectAll (iterator)
570
571 template <class InputIterator>
572 Future<
573   std::vector<
574   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
575 collectAll(InputIterator first, InputIterator last) {
576   typedef
577     typename std::iterator_traits<InputIterator>::value_type::value_type T;
578
579   struct CollectAllContext {
580     CollectAllContext(int n) : results(n) {}
581     ~CollectAllContext() {
582       p.setValue(std::move(results));
583     }
584     Promise<std::vector<Try<T>>> p;
585     std::vector<Try<T>> results;
586   };
587
588   auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
589   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
590     ctx->results[i] = std::move(t);
591   });
592   return ctx->p.getFuture();
593 }
594
595 // collect (iterator)
596
597 namespace detail {
598
599 template <typename T>
600 struct CollectContext {
601   struct Nothing {
602     explicit Nothing(int /* n */) {}
603   };
604
605   using Result = typename std::conditional<
606     std::is_void<T>::value,
607     void,
608     std::vector<T>>::type;
609
610   using InternalResult = typename std::conditional<
611     std::is_void<T>::value,
612     Nothing,
613     std::vector<Optional<T>>>::type;
614
615   explicit CollectContext(int n) : result(n) {}
616   ~CollectContext() {
617     if (!threw.exchange(true)) {
618       // map Optional<T> -> T
619       std::vector<T> finalResult;
620       finalResult.reserve(result.size());
621       std::transform(result.begin(), result.end(),
622                      std::back_inserter(finalResult),
623                      [](Optional<T>& o) { return std::move(o.value()); });
624       p.setValue(std::move(finalResult));
625     }
626   }
627   inline void setPartialResult(size_t i, Try<T>& t) {
628     result[i] = std::move(t.value());
629   }
630   Promise<Result> p;
631   InternalResult result;
632   std::atomic<bool> threw {false};
633 };
634
635 }
636
637 template <class InputIterator>
638 Future<typename detail::CollectContext<
639   typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
640 collect(InputIterator first, InputIterator last) {
641   typedef
642     typename std::iterator_traits<InputIterator>::value_type::value_type T;
643
644   auto ctx = std::make_shared<detail::CollectContext<T>>(
645     std::distance(first, last));
646   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
647     if (t.hasException()) {
648        if (!ctx->threw.exchange(true)) {
649          ctx->p.setException(std::move(t.exception()));
650        }
651      } else if (!ctx->threw) {
652        ctx->setPartialResult(i, t);
653      }
654   });
655   return ctx->p.getFuture();
656 }
657
658 // collect (variadic)
659
660 template <typename... Fs>
661 typename detail::CollectVariadicContext<
662   typename std::decay<Fs>::type::value_type...>::type
663 collect(Fs&&... fs) {
664   auto ctx = std::make_shared<detail::CollectVariadicContext<
665     typename std::decay<Fs>::type::value_type...>>();
666   detail::collectVariadicHelper<detail::CollectVariadicContext>(
667     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
668   return ctx->p.getFuture();
669 }
670
671 // collectAny (iterator)
672
673 template <class InputIterator>
674 Future<
675   std::pair<size_t,
676             Try<
677               typename
678               std::iterator_traits<InputIterator>::value_type::value_type>>>
679 collectAny(InputIterator first, InputIterator last) {
680   typedef
681     typename std::iterator_traits<InputIterator>::value_type::value_type T;
682
683   struct CollectAnyContext {
684     CollectAnyContext() {};
685     Promise<std::pair<size_t, Try<T>>> p;
686     std::atomic<bool> done {false};
687   };
688
689   auto ctx = std::make_shared<CollectAnyContext>();
690   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
691     if (!ctx->done.exchange(true)) {
692       ctx->p.setValue(std::make_pair(i, std::move(t)));
693     }
694   });
695   return ctx->p.getFuture();
696 }
697
698 // collectAnyWithoutException (iterator)
699
700 template <class InputIterator>
701 Future<std::pair<
702     size_t,
703     typename std::iterator_traits<InputIterator>::value_type::value_type>>
704 collectAnyWithoutException(InputIterator first, InputIterator last) {
705   typedef
706       typename std::iterator_traits<InputIterator>::value_type::value_type T;
707
708   struct CollectAnyWithoutExceptionContext {
709     CollectAnyWithoutExceptionContext(){};
710     Promise<std::pair<size_t, T>> p;
711     std::atomic<bool> done{false};
712     std::atomic<size_t> nFulfilled{0};
713     size_t nTotal;
714   };
715
716   auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
717   ctx->nTotal = std::distance(first, last);
718
719   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
720     if (!t.hasException() && !ctx->done.exchange(true)) {
721       ctx->p.setValue(std::make_pair(i, std::move(t.value())));
722     } else if (++ctx->nFulfilled == ctx->nTotal) {
723       ctx->p.setException(t.exception());
724     }
725   });
726   return ctx->p.getFuture();
727 }
728
729 // collectN (iterator)
730
731 template <class InputIterator>
732 Future<std::vector<std::pair<size_t, Try<typename
733   std::iterator_traits<InputIterator>::value_type::value_type>>>>
734 collectN(InputIterator first, InputIterator last, size_t n) {
735   typedef typename
736     std::iterator_traits<InputIterator>::value_type::value_type T;
737   typedef std::vector<std::pair<size_t, Try<T>>> V;
738
739   struct CollectNContext {
740     V v;
741     std::atomic<size_t> completed = {0};
742     Promise<V> p;
743   };
744   auto ctx = std::make_shared<CollectNContext>();
745
746   if (size_t(std::distance(first, last)) < n) {
747     ctx->p.setException(std::runtime_error("Not enough futures"));
748   } else {
749     // for each completed Future, increase count and add to vector, until we
750     // have n completed futures at which point we fulfil our Promise with the
751     // vector
752     mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
753       auto c = ++ctx->completed;
754       if (c <= n) {
755         assert(ctx->v.size() < n);
756         ctx->v.emplace_back(i, std::move(t));
757         if (c == n) {
758           ctx->p.setTry(Try<V>(std::move(ctx->v)));
759         }
760       }
761     });
762   }
763
764   return ctx->p.getFuture();
765 }
766
767 // reduce (iterator)
768
769 template <class It, class T, class F>
770 Future<T> reduce(It first, It last, T&& initial, F&& func) {
771   if (first == last) {
772     return makeFuture(std::move(initial));
773   }
774
775   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
776   typedef
777       typename std::conditional<detail::callableWith<F, T&&, Try<ItT>&&>::value,
778                                 Try<ItT>,
779                                 ItT>::type Arg;
780   typedef isTry<Arg> IsTry;
781
782   auto sfunc = std::make_shared<F>(std::move(func));
783
784   auto f = first->then(
785       [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
786         return (*sfunc)(
787             std::move(minitial), head.template get<IsTry::value, Arg&&>());
788       });
789
790   for (++first; first != last; ++first) {
791     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
792       return (*sfunc)(std::move(std::get<0>(t).value()),
793                   // Either return a ItT&& or a Try<ItT>&& depending
794                   // on the type of the argument of func.
795                   std::get<1>(t).template get<IsTry::value, Arg&&>());
796     });
797   }
798
799   return f;
800 }
801
802 // window (collection)
803
804 template <class Collection, class F, class ItT, class Result>
805 std::vector<Future<Result>>
806 window(Collection input, F func, size_t n) {
807   struct WindowContext {
808     WindowContext(Collection&& i, F&& fn)
809         : input_(std::move(i)), promises_(input_.size()),
810           func_(std::move(fn))
811       {}
812     std::atomic<size_t> i_ {0};
813     Collection input_;
814     std::vector<Promise<Result>> promises_;
815     F func_;
816
817     static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
818       size_t i = ctx->i_++;
819       if (i < ctx->input_.size()) {
820         // Using setCallback_ directly since we don't need the Future
821         ctx->func_(std::move(ctx->input_[i])).setCallback_(
822           // ctx is captured by value
823           [ctx, i](Try<Result>&& t) {
824             ctx->promises_[i].setTry(std::move(t));
825             // Chain another future onto this one
826             spawn(std::move(ctx));
827           });
828       }
829     }
830   };
831
832   auto max = std::min(n, input.size());
833
834   auto ctx = std::make_shared<WindowContext>(
835     std::move(input), std::move(func));
836
837   for (size_t i = 0; i < max; ++i) {
838     // Start the first n Futures
839     WindowContext::spawn(ctx);
840   }
841
842   std::vector<Future<Result>> futures;
843   futures.reserve(ctx->promises_.size());
844   for (auto& promise : ctx->promises_) {
845     futures.emplace_back(promise.getFuture());
846   }
847
848   return futures;
849 }
850
851 // reduce
852
853 template <class T>
854 template <class I, class F>
855 Future<I> Future<T>::reduce(I&& initial, F&& func) {
856   return then([
857     minitial = std::forward<I>(initial),
858     mfunc = std::forward<F>(func)
859   ](T& vals) mutable {
860     auto ret = std::move(minitial);
861     for (auto& val : vals) {
862       ret = mfunc(std::move(ret), std::move(val));
863     }
864     return ret;
865   });
866 }
867
868 // unorderedReduce (iterator)
869
870 template <class It, class T, class F, class ItT, class Arg>
871 Future<T> unorderedReduce(It first, It last, T initial, F func) {
872   if (first == last) {
873     return makeFuture(std::move(initial));
874   }
875
876   typedef isTry<Arg> IsTry;
877
878   struct UnorderedReduceContext {
879     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
880         : lock_(), memo_(makeFuture<T>(std::move(memo))),
881           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
882       {};
883     folly::MicroSpinLock lock_; // protects memo_ and numThens_
884     Future<T> memo_;
885     F func_;
886     size_t numThens_; // how many Futures completed and called .then()
887     size_t numFutures_; // how many Futures in total
888     Promise<T> promise_;
889   };
890
891   auto ctx = std::make_shared<UnorderedReduceContext>(
892     std::move(initial), std::move(func), std::distance(first, last));
893
894   mapSetCallback<ItT>(
895       first,
896       last,
897       [ctx](size_t /* i */, Try<ItT>&& t) {
898         // Futures can be completed in any order, simultaneously.
899         // To make this non-blocking, we create a new Future chain in
900         // the order of completion to reduce the values.
901         // The spinlock just protects chaining a new Future, not actually
902         // executing the reduce, which should be really fast.
903         folly::MSLGuard lock(ctx->lock_);
904         ctx->memo_ =
905             ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
906               // Either return a ItT&& or a Try<ItT>&& depending
907               // on the type of the argument of func.
908               return ctx->func_(std::move(v),
909                                 mt.template get<IsTry::value, Arg&&>());
910             });
911         if (++ctx->numThens_ == ctx->numFutures_) {
912           // After reducing the value of the last Future, fulfill the Promise
913           ctx->memo_.setCallback_(
914               [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
915         }
916       });
917
918   return ctx->promise_.getFuture();
919 }
920
921 // within
922
923 template <class T>
924 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
925   return within(dur, TimedOut(), tk);
926 }
927
928 template <class T>
929 template <class E>
930 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
931
932   struct Context {
933     Context(E ex) : exception(std::move(ex)), promise() {}
934     E exception;
935     Future<Unit> thisFuture;
936     Promise<T> promise;
937     std::atomic<bool> token {false};
938   };
939
940   std::shared_ptr<Timekeeper> tks;
941   if (!tk) {
942     tks = folly::detail::getTimekeeperSingleton();
943     tk = DCHECK_NOTNULL(tks.get());
944   }
945
946   auto ctx = std::make_shared<Context>(std::move(e));
947
948   ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
949     // TODO: "this" completed first, cancel "after"
950     if (ctx->token.exchange(true) == false) {
951       ctx->promise.setTry(std::move(t));
952     }
953   });
954
955   tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
956     // "after" completed first, cancel "this"
957     ctx->thisFuture.raise(TimedOut());
958     if (ctx->token.exchange(true) == false) {
959       if (t.hasException()) {
960         ctx->promise.setException(std::move(t.exception()));
961       } else {
962         ctx->promise.setException(std::move(ctx->exception));
963       }
964     }
965   });
966
967   return ctx->promise.getFuture().via(getExecutor());
968 }
969
970 // delayed
971
972 template <class T>
973 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
974   return collectAll(*this, futures::sleep(dur, tk))
975     .then([](std::tuple<Try<T>, Try<Unit>> tup) {
976       Try<T>& t = std::get<0>(tup);
977       return makeFuture<T>(std::move(t));
978     });
979 }
980
981 namespace detail {
982
983 template <class T>
984 void waitImpl(Future<T>& f) {
985   // short-circuit if there's nothing to do
986   if (f.isReady()) return;
987
988   FutureBatonType baton;
989   f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
990   baton.wait();
991   assert(f.isReady());
992 }
993
994 template <class T>
995 void waitImpl(Future<T>& f, Duration dur) {
996   // short-circuit if there's nothing to do
997   if (f.isReady()) {
998     return;
999   }
1000
1001   Promise<T> promise;
1002   auto ret = promise.getFuture();
1003   auto baton = std::make_shared<FutureBatonType>();
1004   f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1005     promise.setTry(std::move(t));
1006     baton->post();
1007   });
1008   f = std::move(ret);
1009   if (baton->timed_wait(dur)) {
1010     assert(f.isReady());
1011   }
1012 }
1013
1014 template <class T>
1015 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1016   // Set callback so to ensure that the via executor has something on it
1017   // so that once the preceding future triggers this callback, drive will
1018   // always have a callback to satisfy it
1019   if (f.isReady())
1020     return;
1021   f = f.via(e).then([](T&& t) { return std::move(t); });
1022   while (!f.isReady()) {
1023     e->drive();
1024   }
1025   assert(f.isReady());
1026 }
1027
1028 } // detail
1029
1030 template <class T>
1031 Future<T>& Future<T>::wait() & {
1032   detail::waitImpl(*this);
1033   return *this;
1034 }
1035
1036 template <class T>
1037 Future<T>&& Future<T>::wait() && {
1038   detail::waitImpl(*this);
1039   return std::move(*this);
1040 }
1041
1042 template <class T>
1043 Future<T>& Future<T>::wait(Duration dur) & {
1044   detail::waitImpl(*this, dur);
1045   return *this;
1046 }
1047
1048 template <class T>
1049 Future<T>&& Future<T>::wait(Duration dur) && {
1050   detail::waitImpl(*this, dur);
1051   return std::move(*this);
1052 }
1053
1054 template <class T>
1055 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1056   detail::waitViaImpl(*this, e);
1057   return *this;
1058 }
1059
1060 template <class T>
1061 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1062   detail::waitViaImpl(*this, e);
1063   return std::move(*this);
1064 }
1065
1066 template <class T>
1067 T Future<T>::get() {
1068   return std::move(wait().value());
1069 }
1070
1071 template <class T>
1072 T Future<T>::get(Duration dur) {
1073   wait(dur);
1074   if (isReady()) {
1075     return std::move(value());
1076   } else {
1077     throw TimedOut();
1078   }
1079 }
1080
1081 template <class T>
1082 T Future<T>::getVia(DrivableExecutor* e) {
1083   return std::move(waitVia(e).value());
1084 }
1085
1086 namespace detail {
1087   template <class T>
1088   struct TryEquals {
1089     static bool equals(const Try<T>& t1, const Try<T>& t2) {
1090       return t1.value() == t2.value();
1091     }
1092   };
1093 }
1094
1095 template <class T>
1096 Future<bool> Future<T>::willEqual(Future<T>& f) {
1097   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1098     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1099       return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1100     } else {
1101       return false;
1102       }
1103   });
1104 }
1105
1106 template <class T>
1107 template <class F>
1108 Future<T> Future<T>::filter(F&& predicate) {
1109   return this->then([p = std::forward<F>(predicate)](T val) {
1110     T const& valConstRef = val;
1111     if (!p(valConstRef)) {
1112       throw PredicateDoesNotObtain();
1113     }
1114     return val;
1115   });
1116 }
1117
1118 template <class T>
1119 template <class Callback>
1120 auto Future<T>::thenMulti(Callback&& fn)
1121     -> decltype(this->then(std::forward<Callback>(fn))) {
1122   // thenMulti with one callback is just a then
1123   return then(std::forward<Callback>(fn));
1124 }
1125
1126 template <class T>
1127 template <class Callback, class... Callbacks>
1128 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1129     -> decltype(this->then(std::forward<Callback>(fn)).
1130                       thenMulti(std::forward<Callbacks>(fns)...)) {
1131   // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1132   return then(std::forward<Callback>(fn)).
1133          thenMulti(std::forward<Callbacks>(fns)...);
1134 }
1135
1136 template <class T>
1137 template <class Callback, class... Callbacks>
1138 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1139                                       Callbacks&&... fns)
1140     -> decltype(this->then(std::forward<Callback>(fn)).
1141                       thenMulti(std::forward<Callbacks>(fns)...)) {
1142   // thenMultiExecutor with two callbacks is
1143   // via(x).then(a).thenMulti(b, ...).via(oldX)
1144   auto oldX = getExecutor();
1145   setExecutor(x);
1146   return then(std::forward<Callback>(fn)).
1147          thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1148 }
1149
1150 template <class T>
1151 template <class Callback>
1152 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1153     -> decltype(this->then(std::forward<Callback>(fn))) {
1154   // thenMulti with one callback is just a then with an executor
1155   return then(x, std::forward<Callback>(fn));
1156 }
1157
1158 template <class F>
1159 inline Future<Unit> when(bool p, F&& thunk) {
1160   return p ? std::forward<F>(thunk)().unit() : makeFuture();
1161 }
1162
1163 template <class P, class F>
1164 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1165   if (predicate()) {
1166     auto future = thunk();
1167     return future.then([
1168       predicate = std::forward<P>(predicate),
1169       thunk = std::forward<F>(thunk)
1170     ]() mutable {
1171       return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1172     });
1173   }
1174   return makeFuture();
1175 }
1176
1177 template <class F>
1178 Future<Unit> times(const int n, F&& thunk) {
1179   return folly::whileDo(
1180       [ n, count = folly::make_unique<std::atomic<int>>(0) ]() mutable {
1181         return count->fetch_add(1) < n;
1182       },
1183       std::forward<F>(thunk));
1184 }
1185
1186 namespace futures {
1187   template <class It, class F, class ItT, class Result>
1188   std::vector<Future<Result>> map(It first, It last, F func) {
1189     std::vector<Future<Result>> results;
1190     for (auto it = first; it != last; it++) {
1191       results.push_back(it->then(func));
1192     }
1193     return results;
1194   }
1195 }
1196
1197 namespace futures {
1198
1199 namespace detail {
1200
1201 struct retrying_policy_raw_tag {};
1202 struct retrying_policy_fut_tag {};
1203
1204 template <class Policy>
1205 struct retrying_policy_traits {
1206   using ew = exception_wrapper;
1207   FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1208   template <class Ret>
1209   using has_op = typename std::integral_constant<bool,
1210         has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1211         has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1212   using is_raw = has_op<bool>;
1213   using is_fut = has_op<Future<bool>>;
1214   using tag = typename std::conditional<
1215         is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1216         is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1217 };
1218
1219 template <class Policy, class FF>
1220 typename std::result_of<FF(size_t)>::type
1221 retrying(size_t k, Policy&& p, FF&& ff) {
1222   using F = typename std::result_of<FF(size_t)>::type;
1223   using T = typename F::value_type;
1224   auto f = ff(k++);
1225   return f.onError(
1226       [ k, pm = std::forward<Policy>(p), ffm = std::forward<FF>(ff) ](
1227           exception_wrapper x) mutable {
1228         auto q = pm(k, x);
1229         return q.then(
1230             [ k, xm = std::move(x), pm = std::move(pm), ffm = std::move(ffm) ](
1231                 bool r) mutable {
1232               return r ? retrying(k, std::move(pm), std::move(ffm))
1233                        : makeFuture<T>(std::move(xm));
1234             });
1235       });
1236 }
1237
1238 template <class Policy, class FF>
1239 typename std::result_of<FF(size_t)>::type
1240 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1241   auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
1242     return makeFuture<bool>(pm(k, x));
1243   };
1244   return retrying(0, std::move(q), std::forward<FF>(ff));
1245 }
1246
1247 template <class Policy, class FF>
1248 typename std::result_of<FF(size_t)>::type
1249 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1250   return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1251 }
1252
1253 //  jittered exponential backoff, clamped to [backoff_min, backoff_max]
1254 template <class URNG>
1255 Duration retryingJitteredExponentialBackoffDur(
1256     size_t n,
1257     Duration backoff_min,
1258     Duration backoff_max,
1259     double jitter_param,
1260     URNG& rng) {
1261   using d = Duration;
1262   auto dist = std::normal_distribution<double>(0.0, jitter_param);
1263   auto jitter = std::exp(dist(rng));
1264   auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1265   return std::max(backoff_min, std::min(backoff_max, backoff));
1266 }
1267
1268 template <class Policy, class URNG>
1269 std::function<Future<bool>(size_t, const exception_wrapper&)>
1270 retryingPolicyCappedJitteredExponentialBackoff(
1271     size_t max_tries,
1272     Duration backoff_min,
1273     Duration backoff_max,
1274     double jitter_param,
1275     URNG&& rng,
1276     Policy&& p) {
1277   return [
1278     pm = std::forward<Policy>(p),
1279     max_tries,
1280     backoff_min,
1281     backoff_max,
1282     jitter_param,
1283     rngp = std::forward<URNG>(rng)
1284   ](size_t n, const exception_wrapper& ex) mutable {
1285     if (n == max_tries) {
1286       return makeFuture(false);
1287     }
1288     return pm(n, ex).then(
1289         [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
1290             bool v) mutable {
1291           if (!v) {
1292             return makeFuture(false);
1293           }
1294           auto backoff = detail::retryingJitteredExponentialBackoffDur(
1295               n, backoff_min, backoff_max, jitter_param, rngp);
1296           return futures::sleep(backoff).then([] { return true; });
1297         });
1298   };
1299 }
1300
1301 template <class Policy, class URNG>
1302 std::function<Future<bool>(size_t, const exception_wrapper&)>
1303 retryingPolicyCappedJitteredExponentialBackoff(
1304     size_t max_tries,
1305     Duration backoff_min,
1306     Duration backoff_max,
1307     double jitter_param,
1308     URNG&& rng,
1309     Policy&& p,
1310     retrying_policy_raw_tag) {
1311   auto q = [pm = std::forward<Policy>(p)](
1312       size_t n, const exception_wrapper& e) {
1313     return makeFuture(pm(n, e));
1314   };
1315   return retryingPolicyCappedJitteredExponentialBackoff(
1316       max_tries,
1317       backoff_min,
1318       backoff_max,
1319       jitter_param,
1320       std::forward<URNG>(rng),
1321       std::move(q));
1322 }
1323
1324 template <class Policy, class URNG>
1325 std::function<Future<bool>(size_t, const exception_wrapper&)>
1326 retryingPolicyCappedJitteredExponentialBackoff(
1327     size_t max_tries,
1328     Duration backoff_min,
1329     Duration backoff_max,
1330     double jitter_param,
1331     URNG&& rng,
1332     Policy&& p,
1333     retrying_policy_fut_tag) {
1334   return retryingPolicyCappedJitteredExponentialBackoff(
1335       max_tries,
1336       backoff_min,
1337       backoff_max,
1338       jitter_param,
1339       std::forward<URNG>(rng),
1340       std::forward<Policy>(p));
1341 }
1342 }
1343
1344 template <class Policy, class FF>
1345 typename std::result_of<FF(size_t)>::type
1346 retrying(Policy&& p, FF&& ff) {
1347   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1348   return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1349 }
1350
1351 inline
1352 std::function<bool(size_t, const exception_wrapper&)>
1353 retryingPolicyBasic(
1354     size_t max_tries) {
1355   return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1356 }
1357
1358 template <class Policy, class URNG>
1359 std::function<Future<bool>(size_t, const exception_wrapper&)>
1360 retryingPolicyCappedJitteredExponentialBackoff(
1361     size_t max_tries,
1362     Duration backoff_min,
1363     Duration backoff_max,
1364     double jitter_param,
1365     URNG&& rng,
1366     Policy&& p) {
1367   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1368   return detail::retryingPolicyCappedJitteredExponentialBackoff(
1369       max_tries,
1370       backoff_min,
1371       backoff_max,
1372       jitter_param,
1373       std::forward<URNG>(rng),
1374       std::forward<Policy>(p),
1375       tag());
1376 }
1377
1378 inline
1379 std::function<Future<bool>(size_t, const exception_wrapper&)>
1380 retryingPolicyCappedJitteredExponentialBackoff(
1381     size_t max_tries,
1382     Duration backoff_min,
1383     Duration backoff_max,
1384     double jitter_param) {
1385   auto p = [](size_t, const exception_wrapper&) { return true; };
1386   return retryingPolicyCappedJitteredExponentialBackoff(
1387       max_tries,
1388       backoff_min,
1389       backoff_max,
1390       jitter_param,
1391       ThreadLocalPRNG(),
1392       std::move(p));
1393 }
1394
1395 }
1396
1397 // Instantiate the most common Future types to save compile time
1398 extern template class Future<Unit>;
1399 extern template class Future<bool>;
1400 extern template class Future<int>;
1401 extern template class Future<int64_t>;
1402 extern template class Future<std::string>;
1403 extern template class Future<double>;
1404
1405 } // namespace folly