2017
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <algorithm>
20 #include <cassert>
21 #include <chrono>
22 #include <random>
23 #include <thread>
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
30
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
33 #else
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/fibers/Baton.h>
36 #endif
37
38 namespace folly {
39
40 class Timekeeper;
41
42 namespace detail {
43 #if FOLLY_FUTURE_USING_FIBER
44 typedef folly::fibers::Baton FutureBatonType;
45 #else
46 typedef folly::Baton<> FutureBatonType;
47 #endif
48 }
49
50 namespace detail {
51   std::shared_ptr<Timekeeper> getTimekeeperSingleton();
52 }
53
54 template <class T>
55 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
56   other.core_ = nullptr;
57 }
58
59 template <class T>
60 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
61   std::swap(core_, other.core_);
62   return *this;
63 }
64
65 template <class T>
66 template <typename U, typename>
67 Future<T>::Future(Future<U>&& other) noexcept
68     : core_(detail::Core<T>::convert(other.core_)) {
69   other.core_ = nullptr;
70 }
71
72 template <class T>
73 template <typename U, typename>
74 Future<T>& Future<T>::operator=(Future<U>&& other) noexcept {
75   std::swap(core_, detail::Core<T>::convert(other.core_));
76   return *this;
77 }
78
79 template <class T>
80 template <class T2, typename>
81 Future<T>::Future(T2&& val)
82     : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
83
84 template <class T>
85 template <typename T2>
86 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
87     : core_(new detail::Core<T>(Try<T>(T()))) {}
88
89 template <class T>
90 Future<T>::~Future() {
91   detach();
92 }
93
94 template <class T>
95 void Future<T>::detach() {
96   if (core_) {
97     core_->detachFuture();
98     core_ = nullptr;
99   }
100 }
101
102 template <class T>
103 void Future<T>::throwIfInvalid() const {
104   if (!core_)
105     throw NoState();
106 }
107
108 template <class T>
109 template <class F>
110 void Future<T>::setCallback_(F&& func) {
111   throwIfInvalid();
112   core_->setCallback(std::forward<F>(func));
113 }
114
115 // unwrap
116
117 template <class T>
118 template <class F>
119 typename std::enable_if<isFuture<F>::value,
120                         Future<typename isFuture<T>::Inner>>::type
121 Future<T>::unwrap() {
122   return then([](Future<typename isFuture<T>::Inner> internal_future) {
123       return internal_future;
124   });
125 }
126
127 // then
128
129 // Variant: returns a value
130 // e.g. f.then([](Try<T>&& t){ return t.value(); });
131 template <class T>
132 template <typename F, typename R, bool isTry, typename... Args>
133 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
134 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
135   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
136   typedef typename R::ReturnsFuture::Inner B;
137
138   throwIfInvalid();
139
140   Promise<B> p;
141   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
142
143   // grab the Future now before we lose our handle on the Promise
144   auto f = p.getFuture();
145   f.core_->setExecutorNoLock(getExecutor());
146
147   /* This is a bit tricky.
148
149      We can't just close over *this in case this Future gets moved. So we
150      make a new dummy Future. We could figure out something more
151      sophisticated that avoids making a new Future object when it can, as an
152      optimization. But this is correct.
153
154      core_ can't be moved, it is explicitly disallowed (as is copying). But
155      if there's ever a reason to allow it, this is one place that makes that
156      assumption and would need to be fixed. We use a standard shared pointer
157      for core_ (by copying it in), which means in essence obj holds a shared
158      pointer to itself.  But this shouldn't leak because Promise will not
159      outlive the continuation, because Promise will setException() with a
160      broken Promise if it is destructed before completed. We could use a
161      weak pointer but it would have to be converted to a shared pointer when
162      func is executed (because the Future returned by func may possibly
163      persist beyond the callback, if it gets moved), and so it is an
164      optimization to just make it shared from the get-go.
165
166      Two subtle but important points about this design. detail::Core has no
167      back pointers to Future or Promise, so if Future or Promise get moved
168      (and they will be moved in performant code) we don't have to do
169      anything fancy. And because we store the continuation in the
170      detail::Core, not in the Future, we can execute the continuation even
171      after the Future has gone out of scope. This is an intentional design
172      decision. It is likely we will want to be able to cancel a continuation
173      in some circumstances, but I think it should be explicit not implicit
174      in the destruction of the Future used to create it.
175      */
176   setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
177       Try<T> && t) mutable {
178     if (!isTry && t.hasException()) {
179       pm.setException(std::move(t.exception()));
180     } else {
181       pm.setWith([&]() { return funcm(t.template get<isTry, Args>()...); });
182     }
183   });
184
185   return f;
186 }
187
188 // Variant: returns a Future
189 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
190 template <class T>
191 template <typename F, typename R, bool isTry, typename... Args>
192 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
193 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
194   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
195   typedef typename R::ReturnsFuture::Inner B;
196
197   throwIfInvalid();
198
199   Promise<B> p;
200   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
201
202   // grab the Future now before we lose our handle on the Promise
203   auto f = p.getFuture();
204   f.core_->setExecutorNoLock(getExecutor());
205
206   setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
207       Try<T> && t) mutable {
208     auto ew = [&] {
209       if (!isTry && t.hasException()) {
210         return std::move(t.exception());
211       } else {
212         try {
213           auto f2 = funcm(t.template get<isTry, Args>()...);
214           // that didn't throw, now we can steal p
215           f2.setCallback_([p = std::move(pm)](Try<B> && b) mutable {
216             p.setTry(std::move(b));
217           });
218           return exception_wrapper();
219         } catch (const std::exception& e) {
220           return exception_wrapper(std::current_exception(), e);
221         } catch (...) {
222           return exception_wrapper(std::current_exception());
223         }
224       }
225     }();
226     if (ew) {
227       pm.setException(std::move(ew));
228     }
229   });
230
231   return f;
232 }
233
234 template <typename T>
235 template <typename R, typename Caller, typename... Args>
236   Future<typename isFuture<R>::Inner>
237 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
238   typedef typename std::remove_cv<
239     typename std::remove_reference<
240       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
241   return then([instance, func](Try<T>&& t){
242     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
243   });
244 }
245
246 template <class T>
247 template <class Executor, class Arg, class... Args>
248 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
249   -> decltype(this->then(std::forward<Arg>(arg),
250                          std::forward<Args>(args)...))
251 {
252   auto oldX = getExecutor();
253   setExecutor(x);
254   return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
255                via(oldX);
256 }
257
258 template <class T>
259 Future<Unit> Future<T>::then() {
260   return then([] () {});
261 }
262
263 // onError where the callback returns T
264 template <class T>
265 template <class F>
266 typename std::enable_if<
267   !detail::callableWith<F, exception_wrapper>::value &&
268   !detail::Extract<F>::ReturnsFuture::value,
269   Future<T>>::type
270 Future<T>::onError(F&& func) {
271   typedef typename detail::Extract<F>::FirstArg Exn;
272   static_assert(
273       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
274       "Return type of onError callback must be T or Future<T>");
275
276   Promise<T> p;
277   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
278   auto f = p.getFuture();
279
280   setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
281       Try<T> && t) mutable {
282     if (!t.template withException<Exn>(
283             [&](Exn& e) { pm.setWith([&] { return funcm(e); }); })) {
284       pm.setTry(std::move(t));
285     }
286   });
287
288   return f;
289 }
290
291 // onError where the callback returns Future<T>
292 template <class T>
293 template <class F>
294 typename std::enable_if<
295   !detail::callableWith<F, exception_wrapper>::value &&
296   detail::Extract<F>::ReturnsFuture::value,
297   Future<T>>::type
298 Future<T>::onError(F&& func) {
299   static_assert(
300       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
301       "Return type of onError callback must be T or Future<T>");
302   typedef typename detail::Extract<F>::FirstArg Exn;
303
304   Promise<T> p;
305   auto f = p.getFuture();
306
307   setCallback_([ pm = std::move(p), funcm = std::forward<F>(func) ](
308       Try<T> && t) mutable {
309     if (!t.template withException<Exn>([&](Exn& e) {
310           auto ew = [&] {
311             try {
312               auto f2 = funcm(e);
313               f2.setCallback_([pm = std::move(pm)](Try<T> && t2) mutable {
314                 pm.setTry(std::move(t2));
315               });
316               return exception_wrapper();
317             } catch (const std::exception& e2) {
318               return exception_wrapper(std::current_exception(), e2);
319             } catch (...) {
320               return exception_wrapper(std::current_exception());
321             }
322           }();
323           if (ew) {
324             pm.setException(std::move(ew));
325           }
326         })) {
327       pm.setTry(std::move(t));
328     }
329   });
330
331   return f;
332 }
333
334 template <class T>
335 template <class F>
336 Future<T> Future<T>::ensure(F&& func) {
337   return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
338     funcw();
339     return makeFuture(std::move(t));
340   });
341 }
342
343 template <class T>
344 template <class F>
345 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
346   return within(dur, tk).onError([funcw = std::forward<F>(func)](
347       TimedOut const&) { return funcw(); });
348 }
349
350 template <class T>
351 template <class F>
352 typename std::enable_if<detail::callableWith<F, exception_wrapper>::value &&
353                             detail::Extract<F>::ReturnsFuture::value,
354                         Future<T>>::type
355 Future<T>::onError(F&& func) {
356   static_assert(
357       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
358       "Return type of onError callback must be T or Future<T>");
359
360   Promise<T> p;
361   auto f = p.getFuture();
362   setCallback_(
363       [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
364         if (t.hasException()) {
365           auto ew = [&] {
366             try {
367               auto f2 = funcm(std::move(t.exception()));
368               f2.setCallback_([pm = std::move(pm)](Try<T> t2) mutable {
369                 pm.setTry(std::move(t2));
370               });
371               return exception_wrapper();
372             } catch (const std::exception& e2) {
373               return exception_wrapper(std::current_exception(), e2);
374             } catch (...) {
375               return exception_wrapper(std::current_exception());
376             }
377           }();
378           if (ew) {
379             pm.setException(std::move(ew));
380           }
381         } else {
382           pm.setTry(std::move(t));
383         }
384       });
385
386   return f;
387 }
388
389 // onError(exception_wrapper) that returns T
390 template <class T>
391 template <class F>
392 typename std::enable_if<
393   detail::callableWith<F, exception_wrapper>::value &&
394   !detail::Extract<F>::ReturnsFuture::value,
395   Future<T>>::type
396 Future<T>::onError(F&& func) {
397   static_assert(
398       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
399       "Return type of onError callback must be T or Future<T>");
400
401   Promise<T> p;
402   auto f = p.getFuture();
403   setCallback_(
404       [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
405         if (t.hasException()) {
406           pm.setWith([&] { return funcm(std::move(t.exception())); });
407         } else {
408           pm.setTry(std::move(t));
409         }
410       });
411
412   return f;
413 }
414
415 template <class T>
416 typename std::add_lvalue_reference<T>::type Future<T>::value() {
417   throwIfInvalid();
418
419   return core_->getTry().value();
420 }
421
422 template <class T>
423 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
424   throwIfInvalid();
425
426   return core_->getTry().value();
427 }
428
429 template <class T>
430 Try<T>& Future<T>::getTry() {
431   throwIfInvalid();
432
433   return core_->getTry();
434 }
435
436 template <class T>
437 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
438   return waitVia(e).getTry();
439 }
440
441 template <class T>
442 Optional<Try<T>> Future<T>::poll() {
443   Optional<Try<T>> o;
444   if (core_->ready()) {
445     o = std::move(core_->getTry());
446   }
447   return o;
448 }
449
450 template <class T>
451 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
452   throwIfInvalid();
453
454   setExecutor(executor, priority);
455
456   return std::move(*this);
457 }
458
459 template <class T>
460 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
461   throwIfInvalid();
462
463   Promise<T> p;
464   auto f = p.getFuture();
465   then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
466   return std::move(f).via(executor, priority);
467 }
468
469 template <class Func>
470 auto via(Executor* x, Func&& func)
471   -> Future<typename isFuture<decltype(func())>::Inner>
472 {
473   // TODO make this actually more performant. :-P #7260175
474   return via(x).then(std::forward<Func>(func));
475 }
476
477 template <class T>
478 bool Future<T>::isReady() const {
479   throwIfInvalid();
480   return core_->ready();
481 }
482
483 template <class T>
484 bool Future<T>::hasValue() {
485   return getTry().hasValue();
486 }
487
488 template <class T>
489 bool Future<T>::hasException() {
490   return getTry().hasException();
491 }
492
493 template <class T>
494 void Future<T>::raise(exception_wrapper exception) {
495   core_->raise(std::move(exception));
496 }
497
498 // makeFuture
499
500 template <class T>
501 Future<typename std::decay<T>::type> makeFuture(T&& t) {
502   return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
503 }
504
505 inline // for multiple translation units
506 Future<Unit> makeFuture() {
507   return makeFuture(Unit{});
508 }
509
510 // makeFutureWith(Future<T>()) -> Future<T>
511 template <class F>
512 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
513                         typename std::result_of<F()>::type>::type
514 makeFutureWith(F&& func) {
515   using InnerType =
516       typename isFuture<typename std::result_of<F()>::type>::Inner;
517   try {
518     return func();
519   } catch (std::exception& e) {
520     return makeFuture<InnerType>(
521         exception_wrapper(std::current_exception(), e));
522   } catch (...) {
523     return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
524   }
525 }
526
527 // makeFutureWith(T()) -> Future<T>
528 // makeFutureWith(void()) -> Future<Unit>
529 template <class F>
530 typename std::enable_if<
531     !(isFuture<typename std::result_of<F()>::type>::value),
532     Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
533 makeFutureWith(F&& func) {
534   using LiftedResult =
535       typename Unit::Lift<typename std::result_of<F()>::type>::type;
536   return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
537     return func();
538   }));
539 }
540
541 template <class T>
542 Future<T> makeFuture(std::exception_ptr const& e) {
543   return makeFuture(Try<T>(e));
544 }
545
546 template <class T>
547 Future<T> makeFuture(exception_wrapper ew) {
548   return makeFuture(Try<T>(std::move(ew)));
549 }
550
551 template <class T, class E>
552 typename std::enable_if<std::is_base_of<std::exception, E>::value,
553                         Future<T>>::type
554 makeFuture(E const& e) {
555   return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
556 }
557
558 template <class T>
559 Future<T> makeFuture(Try<T>&& t) {
560   return Future<T>(new detail::Core<T>(std::move(t)));
561 }
562
563 // via
564 Future<Unit> via(Executor* executor, int8_t priority) {
565   return makeFuture().via(executor, priority);
566 }
567
568 // mapSetCallback calls func(i, Try<T>) when every future completes
569
570 template <class T, class InputIterator, class F>
571 void mapSetCallback(InputIterator first, InputIterator last, F func) {
572   for (size_t i = 0; first != last; ++first, ++i) {
573     first->setCallback_([func, i](Try<T>&& t) {
574       func(i, std::move(t));
575     });
576   }
577 }
578
579 // collectAll (variadic)
580
581 template <typename... Fs>
582 typename detail::CollectAllVariadicContext<
583   typename std::decay<Fs>::type::value_type...>::type
584 collectAll(Fs&&... fs) {
585   auto ctx = std::make_shared<detail::CollectAllVariadicContext<
586     typename std::decay<Fs>::type::value_type...>>();
587   detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
588     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
589   return ctx->p.getFuture();
590 }
591
592 // collectAll (iterator)
593
594 template <class InputIterator>
595 Future<
596   std::vector<
597   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
598 collectAll(InputIterator first, InputIterator last) {
599   typedef
600     typename std::iterator_traits<InputIterator>::value_type::value_type T;
601
602   struct CollectAllContext {
603     CollectAllContext(int n) : results(n) {}
604     ~CollectAllContext() {
605       p.setValue(std::move(results));
606     }
607     Promise<std::vector<Try<T>>> p;
608     std::vector<Try<T>> results;
609   };
610
611   auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
612   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
613     ctx->results[i] = std::move(t);
614   });
615   return ctx->p.getFuture();
616 }
617
618 // collect (iterator)
619
620 namespace detail {
621
622 template <typename T>
623 struct CollectContext {
624   struct Nothing {
625     explicit Nothing(int /* n */) {}
626   };
627
628   using Result = typename std::conditional<
629     std::is_void<T>::value,
630     void,
631     std::vector<T>>::type;
632
633   using InternalResult = typename std::conditional<
634     std::is_void<T>::value,
635     Nothing,
636     std::vector<Optional<T>>>::type;
637
638   explicit CollectContext(int n) : result(n) {}
639   ~CollectContext() {
640     if (!threw.exchange(true)) {
641       // map Optional<T> -> T
642       std::vector<T> finalResult;
643       finalResult.reserve(result.size());
644       std::transform(result.begin(), result.end(),
645                      std::back_inserter(finalResult),
646                      [](Optional<T>& o) { return std::move(o.value()); });
647       p.setValue(std::move(finalResult));
648     }
649   }
650   inline void setPartialResult(size_t i, Try<T>& t) {
651     result[i] = std::move(t.value());
652   }
653   Promise<Result> p;
654   InternalResult result;
655   std::atomic<bool> threw {false};
656 };
657
658 }
659
660 template <class InputIterator>
661 Future<typename detail::CollectContext<
662   typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
663 collect(InputIterator first, InputIterator last) {
664   typedef
665     typename std::iterator_traits<InputIterator>::value_type::value_type T;
666
667   auto ctx = std::make_shared<detail::CollectContext<T>>(
668     std::distance(first, last));
669   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
670     if (t.hasException()) {
671        if (!ctx->threw.exchange(true)) {
672          ctx->p.setException(std::move(t.exception()));
673        }
674      } else if (!ctx->threw) {
675        ctx->setPartialResult(i, t);
676      }
677   });
678   return ctx->p.getFuture();
679 }
680
681 // collect (variadic)
682
683 template <typename... Fs>
684 typename detail::CollectVariadicContext<
685   typename std::decay<Fs>::type::value_type...>::type
686 collect(Fs&&... fs) {
687   auto ctx = std::make_shared<detail::CollectVariadicContext<
688     typename std::decay<Fs>::type::value_type...>>();
689   detail::collectVariadicHelper<detail::CollectVariadicContext>(
690     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
691   return ctx->p.getFuture();
692 }
693
694 // collectAny (iterator)
695
696 template <class InputIterator>
697 Future<
698   std::pair<size_t,
699             Try<
700               typename
701               std::iterator_traits<InputIterator>::value_type::value_type>>>
702 collectAny(InputIterator first, InputIterator last) {
703   typedef
704     typename std::iterator_traits<InputIterator>::value_type::value_type T;
705
706   struct CollectAnyContext {
707     CollectAnyContext() {}
708     Promise<std::pair<size_t, Try<T>>> p;
709     std::atomic<bool> done {false};
710   };
711
712   auto ctx = std::make_shared<CollectAnyContext>();
713   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
714     if (!ctx->done.exchange(true)) {
715       ctx->p.setValue(std::make_pair(i, std::move(t)));
716     }
717   });
718   return ctx->p.getFuture();
719 }
720
721 // collectAnyWithoutException (iterator)
722
723 template <class InputIterator>
724 Future<std::pair<
725     size_t,
726     typename std::iterator_traits<InputIterator>::value_type::value_type>>
727 collectAnyWithoutException(InputIterator first, InputIterator last) {
728   typedef
729       typename std::iterator_traits<InputIterator>::value_type::value_type T;
730
731   struct CollectAnyWithoutExceptionContext {
732     CollectAnyWithoutExceptionContext(){}
733     Promise<std::pair<size_t, T>> p;
734     std::atomic<bool> done{false};
735     std::atomic<size_t> nFulfilled{0};
736     size_t nTotal;
737   };
738
739   auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
740   ctx->nTotal = std::distance(first, last);
741
742   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
743     if (!t.hasException() && !ctx->done.exchange(true)) {
744       ctx->p.setValue(std::make_pair(i, std::move(t.value())));
745     } else if (++ctx->nFulfilled == ctx->nTotal) {
746       ctx->p.setException(t.exception());
747     }
748   });
749   return ctx->p.getFuture();
750 }
751
752 // collectN (iterator)
753
754 template <class InputIterator>
755 Future<std::vector<std::pair<size_t, Try<typename
756   std::iterator_traits<InputIterator>::value_type::value_type>>>>
757 collectN(InputIterator first, InputIterator last, size_t n) {
758   typedef typename
759     std::iterator_traits<InputIterator>::value_type::value_type T;
760   typedef std::vector<std::pair<size_t, Try<T>>> V;
761
762   struct CollectNContext {
763     V v;
764     std::atomic<size_t> completed = {0};
765     Promise<V> p;
766   };
767   auto ctx = std::make_shared<CollectNContext>();
768
769   if (size_t(std::distance(first, last)) < n) {
770     ctx->p.setException(std::runtime_error("Not enough futures"));
771   } else {
772     // for each completed Future, increase count and add to vector, until we
773     // have n completed futures at which point we fulfil our Promise with the
774     // vector
775     mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
776       auto c = ++ctx->completed;
777       if (c <= n) {
778         assert(ctx->v.size() < n);
779         ctx->v.emplace_back(i, std::move(t));
780         if (c == n) {
781           ctx->p.setTry(Try<V>(std::move(ctx->v)));
782         }
783       }
784     });
785   }
786
787   return ctx->p.getFuture();
788 }
789
790 // reduce (iterator)
791
792 template <class It, class T, class F>
793 Future<T> reduce(It first, It last, T&& initial, F&& func) {
794   if (first == last) {
795     return makeFuture(std::move(initial));
796   }
797
798   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
799   typedef
800       typename std::conditional<detail::callableWith<F, T&&, Try<ItT>&&>::value,
801                                 Try<ItT>,
802                                 ItT>::type Arg;
803   typedef isTry<Arg> IsTry;
804
805   auto sfunc = std::make_shared<F>(std::move(func));
806
807   auto f = first->then(
808       [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
809         return (*sfunc)(
810             std::move(minitial), head.template get<IsTry::value, Arg&&>());
811       });
812
813   for (++first; first != last; ++first) {
814     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
815       return (*sfunc)(std::move(std::get<0>(t).value()),
816                   // Either return a ItT&& or a Try<ItT>&& depending
817                   // on the type of the argument of func.
818                   std::get<1>(t).template get<IsTry::value, Arg&&>());
819     });
820   }
821
822   return f;
823 }
824
825 // window (collection)
826
827 template <class Collection, class F, class ItT, class Result>
828 std::vector<Future<Result>>
829 window(Collection input, F func, size_t n) {
830   struct WindowContext {
831     WindowContext(Collection&& i, F&& fn)
832         : input_(std::move(i)), promises_(input_.size()),
833           func_(std::move(fn))
834       {}
835     std::atomic<size_t> i_ {0};
836     Collection input_;
837     std::vector<Promise<Result>> promises_;
838     F func_;
839
840     static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
841       size_t i = ctx->i_++;
842       if (i < ctx->input_.size()) {
843         // Using setCallback_ directly since we don't need the Future
844         ctx->func_(std::move(ctx->input_[i])).setCallback_(
845           // ctx is captured by value
846           [ctx, i](Try<Result>&& t) {
847             ctx->promises_[i].setTry(std::move(t));
848             // Chain another future onto this one
849             spawn(std::move(ctx));
850           });
851       }
852     }
853   };
854
855   auto max = std::min(n, input.size());
856
857   auto ctx = std::make_shared<WindowContext>(
858     std::move(input), std::move(func));
859
860   for (size_t i = 0; i < max; ++i) {
861     // Start the first n Futures
862     WindowContext::spawn(ctx);
863   }
864
865   std::vector<Future<Result>> futures;
866   futures.reserve(ctx->promises_.size());
867   for (auto& promise : ctx->promises_) {
868     futures.emplace_back(promise.getFuture());
869   }
870
871   return futures;
872 }
873
874 // reduce
875
876 template <class T>
877 template <class I, class F>
878 Future<I> Future<T>::reduce(I&& initial, F&& func) {
879   return then([
880     minitial = std::forward<I>(initial),
881     mfunc = std::forward<F>(func)
882   ](T& vals) mutable {
883     auto ret = std::move(minitial);
884     for (auto& val : vals) {
885       ret = mfunc(std::move(ret), std::move(val));
886     }
887     return ret;
888   });
889 }
890
891 // unorderedReduce (iterator)
892
893 template <class It, class T, class F, class ItT, class Arg>
894 Future<T> unorderedReduce(It first, It last, T initial, F func) {
895   if (first == last) {
896     return makeFuture(std::move(initial));
897   }
898
899   typedef isTry<Arg> IsTry;
900
901   struct UnorderedReduceContext {
902     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
903         : lock_(), memo_(makeFuture<T>(std::move(memo))),
904           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
905       {}
906     folly::MicroSpinLock lock_; // protects memo_ and numThens_
907     Future<T> memo_;
908     F func_;
909     size_t numThens_; // how many Futures completed and called .then()
910     size_t numFutures_; // how many Futures in total
911     Promise<T> promise_;
912   };
913
914   auto ctx = std::make_shared<UnorderedReduceContext>(
915     std::move(initial), std::move(func), std::distance(first, last));
916
917   mapSetCallback<ItT>(
918       first,
919       last,
920       [ctx](size_t /* i */, Try<ItT>&& t) {
921         // Futures can be completed in any order, simultaneously.
922         // To make this non-blocking, we create a new Future chain in
923         // the order of completion to reduce the values.
924         // The spinlock just protects chaining a new Future, not actually
925         // executing the reduce, which should be really fast.
926         folly::MSLGuard lock(ctx->lock_);
927         ctx->memo_ =
928             ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
929               // Either return a ItT&& or a Try<ItT>&& depending
930               // on the type of the argument of func.
931               return ctx->func_(std::move(v),
932                                 mt.template get<IsTry::value, Arg&&>());
933             });
934         if (++ctx->numThens_ == ctx->numFutures_) {
935           // After reducing the value of the last Future, fulfill the Promise
936           ctx->memo_.setCallback_(
937               [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
938         }
939       });
940
941   return ctx->promise_.getFuture();
942 }
943
944 // within
945
946 template <class T>
947 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
948   return within(dur, TimedOut(), tk);
949 }
950
951 template <class T>
952 template <class E>
953 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
954
955   struct Context {
956     Context(E ex) : exception(std::move(ex)), promise() {}
957     E exception;
958     Future<Unit> thisFuture;
959     Promise<T> promise;
960     std::atomic<bool> token {false};
961   };
962
963   std::shared_ptr<Timekeeper> tks;
964   if (!tk) {
965     tks = folly::detail::getTimekeeperSingleton();
966     tk = DCHECK_NOTNULL(tks.get());
967   }
968
969   auto ctx = std::make_shared<Context>(std::move(e));
970
971   ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
972     // TODO: "this" completed first, cancel "after"
973     if (ctx->token.exchange(true) == false) {
974       ctx->promise.setTry(std::move(t));
975     }
976   });
977
978   tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
979     // "after" completed first, cancel "this"
980     ctx->thisFuture.raise(TimedOut());
981     if (ctx->token.exchange(true) == false) {
982       if (t.hasException()) {
983         ctx->promise.setException(std::move(t.exception()));
984       } else {
985         ctx->promise.setException(std::move(ctx->exception));
986       }
987     }
988   });
989
990   return ctx->promise.getFuture().via(getExecutor());
991 }
992
993 // delayed
994
995 template <class T>
996 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
997   return collectAll(*this, futures::sleep(dur, tk))
998     .then([](std::tuple<Try<T>, Try<Unit>> tup) {
999       Try<T>& t = std::get<0>(tup);
1000       return makeFuture<T>(std::move(t));
1001     });
1002 }
1003
1004 namespace detail {
1005
1006 template <class T>
1007 void waitImpl(Future<T>& f) {
1008   // short-circuit if there's nothing to do
1009   if (f.isReady()) return;
1010
1011   FutureBatonType baton;
1012   f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1013   baton.wait();
1014   assert(f.isReady());
1015 }
1016
1017 template <class T>
1018 void waitImpl(Future<T>& f, Duration dur) {
1019   // short-circuit if there's nothing to do
1020   if (f.isReady()) {
1021     return;
1022   }
1023
1024   Promise<T> promise;
1025   auto ret = promise.getFuture();
1026   auto baton = std::make_shared<FutureBatonType>();
1027   f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1028     promise.setTry(std::move(t));
1029     baton->post();
1030   });
1031   f = std::move(ret);
1032   if (baton->timed_wait(dur)) {
1033     assert(f.isReady());
1034   }
1035 }
1036
1037 template <class T>
1038 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1039   // Set callback so to ensure that the via executor has something on it
1040   // so that once the preceding future triggers this callback, drive will
1041   // always have a callback to satisfy it
1042   if (f.isReady())
1043     return;
1044   f = f.via(e).then([](T&& t) { return std::move(t); });
1045   while (!f.isReady()) {
1046     e->drive();
1047   }
1048   assert(f.isReady());
1049 }
1050
1051 } // detail
1052
1053 template <class T>
1054 Future<T>& Future<T>::wait() & {
1055   detail::waitImpl(*this);
1056   return *this;
1057 }
1058
1059 template <class T>
1060 Future<T>&& Future<T>::wait() && {
1061   detail::waitImpl(*this);
1062   return std::move(*this);
1063 }
1064
1065 template <class T>
1066 Future<T>& Future<T>::wait(Duration dur) & {
1067   detail::waitImpl(*this, dur);
1068   return *this;
1069 }
1070
1071 template <class T>
1072 Future<T>&& Future<T>::wait(Duration dur) && {
1073   detail::waitImpl(*this, dur);
1074   return std::move(*this);
1075 }
1076
1077 template <class T>
1078 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1079   detail::waitViaImpl(*this, e);
1080   return *this;
1081 }
1082
1083 template <class T>
1084 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1085   detail::waitViaImpl(*this, e);
1086   return std::move(*this);
1087 }
1088
1089 template <class T>
1090 T Future<T>::get() {
1091   return std::move(wait().value());
1092 }
1093
1094 template <class T>
1095 T Future<T>::get(Duration dur) {
1096   wait(dur);
1097   if (isReady()) {
1098     return std::move(value());
1099   } else {
1100     throw TimedOut();
1101   }
1102 }
1103
1104 template <class T>
1105 T Future<T>::getVia(DrivableExecutor* e) {
1106   return std::move(waitVia(e).value());
1107 }
1108
1109 namespace detail {
1110   template <class T>
1111   struct TryEquals {
1112     static bool equals(const Try<T>& t1, const Try<T>& t2) {
1113       return t1.value() == t2.value();
1114     }
1115   };
1116 }
1117
1118 template <class T>
1119 Future<bool> Future<T>::willEqual(Future<T>& f) {
1120   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1121     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1122       return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1123     } else {
1124       return false;
1125       }
1126   });
1127 }
1128
1129 template <class T>
1130 template <class F>
1131 Future<T> Future<T>::filter(F&& predicate) {
1132   return this->then([p = std::forward<F>(predicate)](T val) {
1133     T const& valConstRef = val;
1134     if (!p(valConstRef)) {
1135       throw PredicateDoesNotObtain();
1136     }
1137     return val;
1138   });
1139 }
1140
1141 template <class T>
1142 template <class Callback>
1143 auto Future<T>::thenMulti(Callback&& fn)
1144     -> decltype(this->then(std::forward<Callback>(fn))) {
1145   // thenMulti with one callback is just a then
1146   return then(std::forward<Callback>(fn));
1147 }
1148
1149 template <class T>
1150 template <class Callback, class... Callbacks>
1151 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1152     -> decltype(this->then(std::forward<Callback>(fn)).
1153                       thenMulti(std::forward<Callbacks>(fns)...)) {
1154   // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1155   return then(std::forward<Callback>(fn)).
1156          thenMulti(std::forward<Callbacks>(fns)...);
1157 }
1158
1159 template <class T>
1160 template <class Callback, class... Callbacks>
1161 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1162                                       Callbacks&&... fns)
1163     -> decltype(this->then(std::forward<Callback>(fn)).
1164                       thenMulti(std::forward<Callbacks>(fns)...)) {
1165   // thenMultiExecutor with two callbacks is
1166   // via(x).then(a).thenMulti(b, ...).via(oldX)
1167   auto oldX = getExecutor();
1168   setExecutor(x);
1169   return then(std::forward<Callback>(fn)).
1170          thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1171 }
1172
1173 template <class T>
1174 template <class Callback>
1175 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1176     -> decltype(this->then(std::forward<Callback>(fn))) {
1177   // thenMulti with one callback is just a then with an executor
1178   return then(x, std::forward<Callback>(fn));
1179 }
1180
1181 template <class F>
1182 inline Future<Unit> when(bool p, F&& thunk) {
1183   return p ? std::forward<F>(thunk)().unit() : makeFuture();
1184 }
1185
1186 template <class P, class F>
1187 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1188   if (predicate()) {
1189     auto future = thunk();
1190     return future.then([
1191       predicate = std::forward<P>(predicate),
1192       thunk = std::forward<F>(thunk)
1193     ]() mutable {
1194       return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1195     });
1196   }
1197   return makeFuture();
1198 }
1199
1200 template <class F>
1201 Future<Unit> times(const int n, F&& thunk) {
1202   return folly::whileDo(
1203       [ n, count = folly::make_unique<std::atomic<int>>(0) ]() mutable {
1204         return count->fetch_add(1) < n;
1205       },
1206       std::forward<F>(thunk));
1207 }
1208
1209 namespace futures {
1210   template <class It, class F, class ItT, class Result>
1211   std::vector<Future<Result>> map(It first, It last, F func) {
1212     std::vector<Future<Result>> results;
1213     for (auto it = first; it != last; it++) {
1214       results.push_back(it->then(func));
1215     }
1216     return results;
1217   }
1218 }
1219
1220 namespace futures {
1221
1222 namespace detail {
1223
1224 struct retrying_policy_raw_tag {};
1225 struct retrying_policy_fut_tag {};
1226
1227 template <class Policy>
1228 struct retrying_policy_traits {
1229   using ew = exception_wrapper;
1230   FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1231   template <class Ret>
1232   using has_op = typename std::integral_constant<bool,
1233         has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1234         has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1235   using is_raw = has_op<bool>;
1236   using is_fut = has_op<Future<bool>>;
1237   using tag = typename std::conditional<
1238         is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1239         is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1240 };
1241
1242 template <class Policy, class FF>
1243 typename std::result_of<FF(size_t)>::type
1244 retrying(size_t k, Policy&& p, FF&& ff) {
1245   using F = typename std::result_of<FF(size_t)>::type;
1246   using T = typename F::value_type;
1247   auto f = ff(k++);
1248   return f.onError(
1249       [ k, pm = std::forward<Policy>(p), ffm = std::forward<FF>(ff) ](
1250           exception_wrapper x) mutable {
1251         auto q = pm(k, x);
1252         return q.then(
1253             [ k, xm = std::move(x), pm = std::move(pm), ffm = std::move(ffm) ](
1254                 bool r) mutable {
1255               return r ? retrying(k, std::move(pm), std::move(ffm))
1256                        : makeFuture<T>(std::move(xm));
1257             });
1258       });
1259 }
1260
1261 template <class Policy, class FF>
1262 typename std::result_of<FF(size_t)>::type
1263 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1264   auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
1265     return makeFuture<bool>(pm(k, x));
1266   };
1267   return retrying(0, std::move(q), std::forward<FF>(ff));
1268 }
1269
1270 template <class Policy, class FF>
1271 typename std::result_of<FF(size_t)>::type
1272 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1273   return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1274 }
1275
1276 //  jittered exponential backoff, clamped to [backoff_min, backoff_max]
1277 template <class URNG>
1278 Duration retryingJitteredExponentialBackoffDur(
1279     size_t n,
1280     Duration backoff_min,
1281     Duration backoff_max,
1282     double jitter_param,
1283     URNG& rng) {
1284   using d = Duration;
1285   auto dist = std::normal_distribution<double>(0.0, jitter_param);
1286   auto jitter = std::exp(dist(rng));
1287   auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1288   return std::max(backoff_min, std::min(backoff_max, backoff));
1289 }
1290
1291 template <class Policy, class URNG>
1292 std::function<Future<bool>(size_t, const exception_wrapper&)>
1293 retryingPolicyCappedJitteredExponentialBackoff(
1294     size_t max_tries,
1295     Duration backoff_min,
1296     Duration backoff_max,
1297     double jitter_param,
1298     URNG&& rng,
1299     Policy&& p) {
1300   return [
1301     pm = std::forward<Policy>(p),
1302     max_tries,
1303     backoff_min,
1304     backoff_max,
1305     jitter_param,
1306     rngp = std::forward<URNG>(rng)
1307   ](size_t n, const exception_wrapper& ex) mutable {
1308     if (n == max_tries) {
1309       return makeFuture(false);
1310     }
1311     return pm(n, ex).then(
1312         [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
1313             bool v) mutable {
1314           if (!v) {
1315             return makeFuture(false);
1316           }
1317           auto backoff = detail::retryingJitteredExponentialBackoffDur(
1318               n, backoff_min, backoff_max, jitter_param, rngp);
1319           return futures::sleep(backoff).then([] { return true; });
1320         });
1321   };
1322 }
1323
1324 template <class Policy, class URNG>
1325 std::function<Future<bool>(size_t, const exception_wrapper&)>
1326 retryingPolicyCappedJitteredExponentialBackoff(
1327     size_t max_tries,
1328     Duration backoff_min,
1329     Duration backoff_max,
1330     double jitter_param,
1331     URNG&& rng,
1332     Policy&& p,
1333     retrying_policy_raw_tag) {
1334   auto q = [pm = std::forward<Policy>(p)](
1335       size_t n, const exception_wrapper& e) {
1336     return makeFuture(pm(n, e));
1337   };
1338   return retryingPolicyCappedJitteredExponentialBackoff(
1339       max_tries,
1340       backoff_min,
1341       backoff_max,
1342       jitter_param,
1343       std::forward<URNG>(rng),
1344       std::move(q));
1345 }
1346
1347 template <class Policy, class URNG>
1348 std::function<Future<bool>(size_t, const exception_wrapper&)>
1349 retryingPolicyCappedJitteredExponentialBackoff(
1350     size_t max_tries,
1351     Duration backoff_min,
1352     Duration backoff_max,
1353     double jitter_param,
1354     URNG&& rng,
1355     Policy&& p,
1356     retrying_policy_fut_tag) {
1357   return retryingPolicyCappedJitteredExponentialBackoff(
1358       max_tries,
1359       backoff_min,
1360       backoff_max,
1361       jitter_param,
1362       std::forward<URNG>(rng),
1363       std::forward<Policy>(p));
1364 }
1365 }
1366
1367 template <class Policy, class FF>
1368 typename std::result_of<FF(size_t)>::type
1369 retrying(Policy&& p, FF&& ff) {
1370   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1371   return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1372 }
1373
1374 inline
1375 std::function<bool(size_t, const exception_wrapper&)>
1376 retryingPolicyBasic(
1377     size_t max_tries) {
1378   return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1379 }
1380
1381 template <class Policy, class URNG>
1382 std::function<Future<bool>(size_t, const exception_wrapper&)>
1383 retryingPolicyCappedJitteredExponentialBackoff(
1384     size_t max_tries,
1385     Duration backoff_min,
1386     Duration backoff_max,
1387     double jitter_param,
1388     URNG&& rng,
1389     Policy&& p) {
1390   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1391   return detail::retryingPolicyCappedJitteredExponentialBackoff(
1392       max_tries,
1393       backoff_min,
1394       backoff_max,
1395       jitter_param,
1396       std::forward<URNG>(rng),
1397       std::forward<Policy>(p),
1398       tag());
1399 }
1400
1401 inline
1402 std::function<Future<bool>(size_t, const exception_wrapper&)>
1403 retryingPolicyCappedJitteredExponentialBackoff(
1404     size_t max_tries,
1405     Duration backoff_min,
1406     Duration backoff_max,
1407     double jitter_param) {
1408   auto p = [](size_t, const exception_wrapper&) { return true; };
1409   return retryingPolicyCappedJitteredExponentialBackoff(
1410       max_tries,
1411       backoff_min,
1412       backoff_max,
1413       jitter_param,
1414       ThreadLocalPRNG(),
1415       std::move(p));
1416 }
1417
1418 }
1419
1420 // Instantiate the most common Future types to save compile time
1421 extern template class Future<Unit>;
1422 extern template class Future<bool>;
1423 extern template class Future<int>;
1424 extern template class Future<int64_t>;
1425 extern template class Future<std::string>;
1426 extern template class Future<double>;
1427
1428 } // namespace folly