Swap a few APIs to reduce sign and implicit truncations required to work with it
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <algorithm>
20 #include <cassert>
21 #include <chrono>
22 #include <random>
23 #include <thread>
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
30
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
33 #else
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/fibers/Baton.h>
36 #endif
37
38 namespace folly {
39
40 class Timekeeper;
41
42 namespace detail {
43 #if FOLLY_FUTURE_USING_FIBER
44 typedef folly::fibers::Baton FutureBatonType;
45 #else
46 typedef folly::Baton<> FutureBatonType;
47 #endif
48 }
49
50 namespace detail {
51   std::shared_ptr<Timekeeper> getTimekeeperSingleton();
52 }
53
54 template <class T>
55 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
56   other.core_ = nullptr;
57 }
58
59 template <class T>
60 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
61   std::swap(core_, other.core_);
62   return *this;
63 }
64
65 template <class T>
66 template <class T2, typename>
67 Future<T>::Future(T2&& val)
68     : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
69
70 template <class T>
71 template <typename T2>
72 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
73     : core_(new detail::Core<T>(Try<T>(T()))) {}
74
75 template <class T>
76 Future<T>::~Future() {
77   detach();
78 }
79
80 template <class T>
81 void Future<T>::detach() {
82   if (core_) {
83     core_->detachFuture();
84     core_ = nullptr;
85   }
86 }
87
88 template <class T>
89 void Future<T>::throwIfInvalid() const {
90   if (!core_)
91     throw NoState();
92 }
93
94 template <class T>
95 template <class F>
96 void Future<T>::setCallback_(F&& func) {
97   throwIfInvalid();
98   core_->setCallback(std::forward<F>(func));
99 }
100
101 // unwrap
102
103 template <class T>
104 template <class F>
105 typename std::enable_if<isFuture<F>::value,
106                         Future<typename isFuture<T>::Inner>>::type
107 Future<T>::unwrap() {
108   return then([](Future<typename isFuture<T>::Inner> internal_future) {
109       return internal_future;
110   });
111 }
112
113 // then
114
115 // Variant: returns a value
116 // e.g. f.then([](Try<T>&& t){ return t.value(); });
117 template <class T>
118 template <typename F, typename R, bool isTry, typename... Args>
119 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
120 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
121   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
122   typedef typename R::ReturnsFuture::Inner B;
123
124   throwIfInvalid();
125
126   Promise<B> p;
127   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
128
129   // grab the Future now before we lose our handle on the Promise
130   auto f = p.getFuture();
131   f.core_->setExecutorNoLock(getExecutor());
132
133   /* This is a bit tricky.
134
135      We can't just close over *this in case this Future gets moved. So we
136      make a new dummy Future. We could figure out something more
137      sophisticated that avoids making a new Future object when it can, as an
138      optimization. But this is correct.
139
140      core_ can't be moved, it is explicitly disallowed (as is copying). But
141      if there's ever a reason to allow it, this is one place that makes that
142      assumption and would need to be fixed. We use a standard shared pointer
143      for core_ (by copying it in), which means in essence obj holds a shared
144      pointer to itself.  But this shouldn't leak because Promise will not
145      outlive the continuation, because Promise will setException() with a
146      broken Promise if it is destructed before completed. We could use a
147      weak pointer but it would have to be converted to a shared pointer when
148      func is executed (because the Future returned by func may possibly
149      persist beyond the callback, if it gets moved), and so it is an
150      optimization to just make it shared from the get-go.
151
152      Two subtle but important points about this design. detail::Core has no
153      back pointers to Future or Promise, so if Future or Promise get moved
154      (and they will be moved in performant code) we don't have to do
155      anything fancy. And because we store the continuation in the
156      detail::Core, not in the Future, we can execute the continuation even
157      after the Future has gone out of scope. This is an intentional design
158      decision. It is likely we will want to be able to cancel a continuation
159      in some circumstances, but I think it should be explicit not implicit
160      in the destruction of the Future used to create it.
161      */
162   setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
163       Try<T> && t) mutable {
164     if (!isTry && t.hasException()) {
165       pm.setException(std::move(t.exception()));
166     } else {
167       pm.setWith([&]() { return funcm(t.template get<isTry, Args>()...); });
168     }
169   });
170
171   return f;
172 }
173
174 // Variant: returns a Future
175 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
176 template <class T>
177 template <typename F, typename R, bool isTry, typename... Args>
178 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
179 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
180   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
181   typedef typename R::ReturnsFuture::Inner B;
182
183   throwIfInvalid();
184
185   Promise<B> p;
186   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
187
188   // grab the Future now before we lose our handle on the Promise
189   auto f = p.getFuture();
190   f.core_->setExecutorNoLock(getExecutor());
191
192   setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
193       Try<T> && t) mutable {
194     auto ew = [&] {
195       if (!isTry && t.hasException()) {
196         return std::move(t.exception());
197       } else {
198         try {
199           auto f2 = funcm(t.template get<isTry, Args>()...);
200           // that didn't throw, now we can steal p
201           f2.setCallback_([p = std::move(pm)](Try<B> && b) mutable {
202             p.setTry(std::move(b));
203           });
204           return exception_wrapper();
205         } catch (const std::exception& e) {
206           return exception_wrapper(std::current_exception(), e);
207         } catch (...) {
208           return exception_wrapper(std::current_exception());
209         }
210       }
211     }();
212     if (ew) {
213       pm.setException(std::move(ew));
214     }
215   });
216
217   return f;
218 }
219
220 template <typename T>
221 template <typename R, typename Caller, typename... Args>
222   Future<typename isFuture<R>::Inner>
223 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
224   typedef typename std::remove_cv<
225     typename std::remove_reference<
226       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
227   return then([instance, func](Try<T>&& t){
228     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
229   });
230 }
231
232 template <class T>
233 template <class Executor, class Arg, class... Args>
234 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
235   -> decltype(this->then(std::forward<Arg>(arg),
236                          std::forward<Args>(args)...))
237 {
238   auto oldX = getExecutor();
239   setExecutor(x);
240   return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
241                via(oldX);
242 }
243
244 template <class T>
245 Future<Unit> Future<T>::then() {
246   return then([] () {});
247 }
248
249 // onError where the callback returns T
250 template <class T>
251 template <class F>
252 typename std::enable_if<
253   !detail::callableWith<F, exception_wrapper>::value &&
254   !detail::Extract<F>::ReturnsFuture::value,
255   Future<T>>::type
256 Future<T>::onError(F&& func) {
257   typedef typename detail::Extract<F>::FirstArg Exn;
258   static_assert(
259       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
260       "Return type of onError callback must be T or Future<T>");
261
262   Promise<T> p;
263   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
264   auto f = p.getFuture();
265
266   setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
267       Try<T> && t) mutable {
268     if (!t.template withException<Exn>(
269             [&](Exn& e) { pm.setWith([&] { return funcm(e); }); })) {
270       pm.setTry(std::move(t));
271     }
272   });
273
274   return f;
275 }
276
277 // onError where the callback returns Future<T>
278 template <class T>
279 template <class F>
280 typename std::enable_if<
281   !detail::callableWith<F, exception_wrapper>::value &&
282   detail::Extract<F>::ReturnsFuture::value,
283   Future<T>>::type
284 Future<T>::onError(F&& func) {
285   static_assert(
286       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
287       "Return type of onError callback must be T or Future<T>");
288   typedef typename detail::Extract<F>::FirstArg Exn;
289
290   Promise<T> p;
291   auto f = p.getFuture();
292
293   setCallback_([ pm = std::move(p), funcm = std::forward<F>(func) ](
294       Try<T> && t) mutable {
295     if (!t.template withException<Exn>([&](Exn& e) {
296           auto ew = [&] {
297             try {
298               auto f2 = funcm(e);
299               f2.setCallback_([pm = std::move(pm)](Try<T> && t2) mutable {
300                 pm.setTry(std::move(t2));
301               });
302               return exception_wrapper();
303             } catch (const std::exception& e2) {
304               return exception_wrapper(std::current_exception(), e2);
305             } catch (...) {
306               return exception_wrapper(std::current_exception());
307             }
308           }();
309           if (ew) {
310             pm.setException(std::move(ew));
311           }
312         })) {
313       pm.setTry(std::move(t));
314     }
315   });
316
317   return f;
318 }
319
320 template <class T>
321 template <class F>
322 Future<T> Future<T>::ensure(F&& func) {
323   return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
324     funcw();
325     return makeFuture(std::move(t));
326   });
327 }
328
329 template <class T>
330 template <class F>
331 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
332   return within(dur, tk).onError([funcw = std::forward<F>(func)](
333       TimedOut const&) { return funcw(); });
334 }
335
336 template <class T>
337 template <class F>
338 typename std::enable_if<detail::callableWith<F, exception_wrapper>::value &&
339                             detail::Extract<F>::ReturnsFuture::value,
340                         Future<T>>::type
341 Future<T>::onError(F&& func) {
342   static_assert(
343       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
344       "Return type of onError callback must be T or Future<T>");
345
346   Promise<T> p;
347   auto f = p.getFuture();
348   setCallback_(
349       [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
350         if (t.hasException()) {
351           auto ew = [&] {
352             try {
353               auto f2 = funcm(std::move(t.exception()));
354               f2.setCallback_([pm = std::move(pm)](Try<T> t2) mutable {
355                 pm.setTry(std::move(t2));
356               });
357               return exception_wrapper();
358             } catch (const std::exception& e2) {
359               return exception_wrapper(std::current_exception(), e2);
360             } catch (...) {
361               return exception_wrapper(std::current_exception());
362             }
363           }();
364           if (ew) {
365             pm.setException(std::move(ew));
366           }
367         } else {
368           pm.setTry(std::move(t));
369         }
370       });
371
372   return f;
373 }
374
375 // onError(exception_wrapper) that returns T
376 template <class T>
377 template <class F>
378 typename std::enable_if<
379   detail::callableWith<F, exception_wrapper>::value &&
380   !detail::Extract<F>::ReturnsFuture::value,
381   Future<T>>::type
382 Future<T>::onError(F&& func) {
383   static_assert(
384       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
385       "Return type of onError callback must be T or Future<T>");
386
387   Promise<T> p;
388   auto f = p.getFuture();
389   setCallback_(
390       [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
391         if (t.hasException()) {
392           pm.setWith([&] { return funcm(std::move(t.exception())); });
393         } else {
394           pm.setTry(std::move(t));
395         }
396       });
397
398   return f;
399 }
400
401 template <class T>
402 typename std::add_lvalue_reference<T>::type Future<T>::value() {
403   throwIfInvalid();
404
405   return core_->getTry().value();
406 }
407
408 template <class T>
409 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
410   throwIfInvalid();
411
412   return core_->getTry().value();
413 }
414
415 template <class T>
416 Try<T>& Future<T>::getTry() {
417   throwIfInvalid();
418
419   return core_->getTry();
420 }
421
422 template <class T>
423 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
424   return waitVia(e).getTry();
425 }
426
427 template <class T>
428 Optional<Try<T>> Future<T>::poll() {
429   Optional<Try<T>> o;
430   if (core_->ready()) {
431     o = std::move(core_->getTry());
432   }
433   return o;
434 }
435
436 template <class T>
437 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
438   throwIfInvalid();
439
440   setExecutor(executor, priority);
441
442   return std::move(*this);
443 }
444
445 template <class T>
446 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
447   throwIfInvalid();
448
449   Promise<T> p;
450   auto f = p.getFuture();
451   then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
452   return std::move(f).via(executor, priority);
453 }
454
455 template <class Func>
456 auto via(Executor* x, Func&& func)
457   -> Future<typename isFuture<decltype(func())>::Inner>
458 {
459   // TODO make this actually more performant. :-P #7260175
460   return via(x).then(std::forward<Func>(func));
461 }
462
463 template <class T>
464 bool Future<T>::isReady() const {
465   throwIfInvalid();
466   return core_->ready();
467 }
468
469 template <class T>
470 bool Future<T>::hasValue() {
471   return getTry().hasValue();
472 }
473
474 template <class T>
475 bool Future<T>::hasException() {
476   return getTry().hasException();
477 }
478
479 template <class T>
480 void Future<T>::raise(exception_wrapper exception) {
481   core_->raise(std::move(exception));
482 }
483
484 // makeFuture
485
486 template <class T>
487 Future<typename std::decay<T>::type> makeFuture(T&& t) {
488   return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
489 }
490
491 inline // for multiple translation units
492 Future<Unit> makeFuture() {
493   return makeFuture(Unit{});
494 }
495
496 // makeFutureWith(Future<T>()) -> Future<T>
497 template <class F>
498 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
499                         typename std::result_of<F()>::type>::type
500 makeFutureWith(F&& func) {
501   using InnerType =
502       typename isFuture<typename std::result_of<F()>::type>::Inner;
503   try {
504     return func();
505   } catch (std::exception& e) {
506     return makeFuture<InnerType>(
507         exception_wrapper(std::current_exception(), e));
508   } catch (...) {
509     return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
510   }
511 }
512
513 // makeFutureWith(T()) -> Future<T>
514 // makeFutureWith(void()) -> Future<Unit>
515 template <class F>
516 typename std::enable_if<
517     !(isFuture<typename std::result_of<F()>::type>::value),
518     Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
519 makeFutureWith(F&& func) {
520   using LiftedResult =
521       typename Unit::Lift<typename std::result_of<F()>::type>::type;
522   return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
523     return func();
524   }));
525 }
526
527 template <class T>
528 Future<T> makeFuture(std::exception_ptr const& e) {
529   return makeFuture(Try<T>(e));
530 }
531
532 template <class T>
533 Future<T> makeFuture(exception_wrapper ew) {
534   return makeFuture(Try<T>(std::move(ew)));
535 }
536
537 template <class T, class E>
538 typename std::enable_if<std::is_base_of<std::exception, E>::value,
539                         Future<T>>::type
540 makeFuture(E const& e) {
541   return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
542 }
543
544 template <class T>
545 Future<T> makeFuture(Try<T>&& t) {
546   return Future<T>(new detail::Core<T>(std::move(t)));
547 }
548
549 // via
550 Future<Unit> via(Executor* executor, int8_t priority) {
551   return makeFuture().via(executor, priority);
552 }
553
554 // mapSetCallback calls func(i, Try<T>) when every future completes
555
556 template <class T, class InputIterator, class F>
557 void mapSetCallback(InputIterator first, InputIterator last, F func) {
558   for (size_t i = 0; first != last; ++first, ++i) {
559     first->setCallback_([func, i](Try<T>&& t) {
560       func(i, std::move(t));
561     });
562   }
563 }
564
565 // collectAll (variadic)
566
567 template <typename... Fs>
568 typename detail::CollectAllVariadicContext<
569   typename std::decay<Fs>::type::value_type...>::type
570 collectAll(Fs&&... fs) {
571   auto ctx = std::make_shared<detail::CollectAllVariadicContext<
572     typename std::decay<Fs>::type::value_type...>>();
573   detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
574     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
575   return ctx->p.getFuture();
576 }
577
578 // collectAll (iterator)
579
580 template <class InputIterator>
581 Future<
582   std::vector<
583   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
584 collectAll(InputIterator first, InputIterator last) {
585   typedef
586     typename std::iterator_traits<InputIterator>::value_type::value_type T;
587
588   struct CollectAllContext {
589     CollectAllContext(size_t n) : results(n) {}
590     ~CollectAllContext() {
591       p.setValue(std::move(results));
592     }
593     Promise<std::vector<Try<T>>> p;
594     std::vector<Try<T>> results;
595   };
596
597   auto ctx =
598       std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
599   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
600     ctx->results[i] = std::move(t);
601   });
602   return ctx->p.getFuture();
603 }
604
605 // collect (iterator)
606
607 namespace detail {
608
609 template <typename T>
610 struct CollectContext {
611   struct Nothing {
612     explicit Nothing(int /* n */) {}
613   };
614
615   using Result = typename std::conditional<
616     std::is_void<T>::value,
617     void,
618     std::vector<T>>::type;
619
620   using InternalResult = typename std::conditional<
621     std::is_void<T>::value,
622     Nothing,
623     std::vector<Optional<T>>>::type;
624
625   explicit CollectContext(size_t n) : result(n) {}
626   ~CollectContext() {
627     if (!threw.exchange(true)) {
628       // map Optional<T> -> T
629       std::vector<T> finalResult;
630       finalResult.reserve(result.size());
631       std::transform(result.begin(), result.end(),
632                      std::back_inserter(finalResult),
633                      [](Optional<T>& o) { return std::move(o.value()); });
634       p.setValue(std::move(finalResult));
635     }
636   }
637   inline void setPartialResult(size_t i, Try<T>& t) {
638     result[i] = std::move(t.value());
639   }
640   Promise<Result> p;
641   InternalResult result;
642   std::atomic<bool> threw {false};
643 };
644
645 }
646
647 template <class InputIterator>
648 Future<typename detail::CollectContext<
649   typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
650 collect(InputIterator first, InputIterator last) {
651   typedef
652     typename std::iterator_traits<InputIterator>::value_type::value_type T;
653
654   auto ctx = std::make_shared<detail::CollectContext<T>>(
655     std::distance(first, last));
656   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
657     if (t.hasException()) {
658        if (!ctx->threw.exchange(true)) {
659          ctx->p.setException(std::move(t.exception()));
660        }
661      } else if (!ctx->threw) {
662        ctx->setPartialResult(i, t);
663      }
664   });
665   return ctx->p.getFuture();
666 }
667
668 // collect (variadic)
669
670 template <typename... Fs>
671 typename detail::CollectVariadicContext<
672   typename std::decay<Fs>::type::value_type...>::type
673 collect(Fs&&... fs) {
674   auto ctx = std::make_shared<detail::CollectVariadicContext<
675     typename std::decay<Fs>::type::value_type...>>();
676   detail::collectVariadicHelper<detail::CollectVariadicContext>(
677     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
678   return ctx->p.getFuture();
679 }
680
681 // collectAny (iterator)
682
683 template <class InputIterator>
684 Future<
685   std::pair<size_t,
686             Try<
687               typename
688               std::iterator_traits<InputIterator>::value_type::value_type>>>
689 collectAny(InputIterator first, InputIterator last) {
690   typedef
691     typename std::iterator_traits<InputIterator>::value_type::value_type T;
692
693   struct CollectAnyContext {
694     CollectAnyContext() {}
695     Promise<std::pair<size_t, Try<T>>> p;
696     std::atomic<bool> done {false};
697   };
698
699   auto ctx = std::make_shared<CollectAnyContext>();
700   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
701     if (!ctx->done.exchange(true)) {
702       ctx->p.setValue(std::make_pair(i, std::move(t)));
703     }
704   });
705   return ctx->p.getFuture();
706 }
707
708 // collectAnyWithoutException (iterator)
709
710 template <class InputIterator>
711 Future<std::pair<
712     size_t,
713     typename std::iterator_traits<InputIterator>::value_type::value_type>>
714 collectAnyWithoutException(InputIterator first, InputIterator last) {
715   typedef
716       typename std::iterator_traits<InputIterator>::value_type::value_type T;
717
718   struct CollectAnyWithoutExceptionContext {
719     CollectAnyWithoutExceptionContext(){}
720     Promise<std::pair<size_t, T>> p;
721     std::atomic<bool> done{false};
722     std::atomic<size_t> nFulfilled{0};
723     size_t nTotal;
724   };
725
726   auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
727   ctx->nTotal = size_t(std::distance(first, last));
728
729   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
730     if (!t.hasException() && !ctx->done.exchange(true)) {
731       ctx->p.setValue(std::make_pair(i, std::move(t.value())));
732     } else if (++ctx->nFulfilled == ctx->nTotal) {
733       ctx->p.setException(t.exception());
734     }
735   });
736   return ctx->p.getFuture();
737 }
738
739 // collectN (iterator)
740
741 template <class InputIterator>
742 Future<std::vector<std::pair<size_t, Try<typename
743   std::iterator_traits<InputIterator>::value_type::value_type>>>>
744 collectN(InputIterator first, InputIterator last, size_t n) {
745   typedef typename
746     std::iterator_traits<InputIterator>::value_type::value_type T;
747   typedef std::vector<std::pair<size_t, Try<T>>> V;
748
749   struct CollectNContext {
750     V v;
751     std::atomic<size_t> completed = {0};
752     Promise<V> p;
753   };
754   auto ctx = std::make_shared<CollectNContext>();
755
756   if (size_t(std::distance(first, last)) < n) {
757     ctx->p.setException(std::runtime_error("Not enough futures"));
758   } else {
759     // for each completed Future, increase count and add to vector, until we
760     // have n completed futures at which point we fulfil our Promise with the
761     // vector
762     mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
763       auto c = ++ctx->completed;
764       if (c <= n) {
765         assert(ctx->v.size() < n);
766         ctx->v.emplace_back(i, std::move(t));
767         if (c == n) {
768           ctx->p.setTry(Try<V>(std::move(ctx->v)));
769         }
770       }
771     });
772   }
773
774   return ctx->p.getFuture();
775 }
776
777 // reduce (iterator)
778
779 template <class It, class T, class F>
780 Future<T> reduce(It first, It last, T&& initial, F&& func) {
781   if (first == last) {
782     return makeFuture(std::move(initial));
783   }
784
785   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
786   typedef
787       typename std::conditional<detail::callableWith<F, T&&, Try<ItT>&&>::value,
788                                 Try<ItT>,
789                                 ItT>::type Arg;
790   typedef isTry<Arg> IsTry;
791
792   auto sfunc = std::make_shared<F>(std::move(func));
793
794   auto f = first->then(
795       [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
796         return (*sfunc)(
797             std::move(minitial), head.template get<IsTry::value, Arg&&>());
798       });
799
800   for (++first; first != last; ++first) {
801     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
802       return (*sfunc)(std::move(std::get<0>(t).value()),
803                   // Either return a ItT&& or a Try<ItT>&& depending
804                   // on the type of the argument of func.
805                   std::get<1>(t).template get<IsTry::value, Arg&&>());
806     });
807   }
808
809   return f;
810 }
811
812 // window (collection)
813
814 template <class Collection, class F, class ItT, class Result>
815 std::vector<Future<Result>>
816 window(Collection input, F func, size_t n) {
817   struct WindowContext {
818     WindowContext(Collection&& i, F&& fn)
819         : input_(std::move(i)), promises_(input_.size()),
820           func_(std::move(fn))
821       {}
822     std::atomic<size_t> i_ {0};
823     Collection input_;
824     std::vector<Promise<Result>> promises_;
825     F func_;
826
827     static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
828       size_t i = ctx->i_++;
829       if (i < ctx->input_.size()) {
830         // Using setCallback_ directly since we don't need the Future
831         ctx->func_(std::move(ctx->input_[i])).setCallback_(
832           // ctx is captured by value
833           [ctx, i](Try<Result>&& t) {
834             ctx->promises_[i].setTry(std::move(t));
835             // Chain another future onto this one
836             spawn(std::move(ctx));
837           });
838       }
839     }
840   };
841
842   auto max = std::min(n, input.size());
843
844   auto ctx = std::make_shared<WindowContext>(
845     std::move(input), std::move(func));
846
847   for (size_t i = 0; i < max; ++i) {
848     // Start the first n Futures
849     WindowContext::spawn(ctx);
850   }
851
852   std::vector<Future<Result>> futures;
853   futures.reserve(ctx->promises_.size());
854   for (auto& promise : ctx->promises_) {
855     futures.emplace_back(promise.getFuture());
856   }
857
858   return futures;
859 }
860
861 // reduce
862
863 template <class T>
864 template <class I, class F>
865 Future<I> Future<T>::reduce(I&& initial, F&& func) {
866   return then([
867     minitial = std::forward<I>(initial),
868     mfunc = std::forward<F>(func)
869   ](T& vals) mutable {
870     auto ret = std::move(minitial);
871     for (auto& val : vals) {
872       ret = mfunc(std::move(ret), std::move(val));
873     }
874     return ret;
875   });
876 }
877
878 // unorderedReduce (iterator)
879
880 template <class It, class T, class F, class ItT, class Arg>
881 Future<T> unorderedReduce(It first, It last, T initial, F func) {
882   if (first == last) {
883     return makeFuture(std::move(initial));
884   }
885
886   typedef isTry<Arg> IsTry;
887
888   struct UnorderedReduceContext {
889     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
890         : lock_(), memo_(makeFuture<T>(std::move(memo))),
891           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
892       {}
893     folly::MicroSpinLock lock_; // protects memo_ and numThens_
894     Future<T> memo_;
895     F func_;
896     size_t numThens_; // how many Futures completed and called .then()
897     size_t numFutures_; // how many Futures in total
898     Promise<T> promise_;
899   };
900
901   auto ctx = std::make_shared<UnorderedReduceContext>(
902     std::move(initial), std::move(func), std::distance(first, last));
903
904   mapSetCallback<ItT>(
905       first,
906       last,
907       [ctx](size_t /* i */, Try<ItT>&& t) {
908         // Futures can be completed in any order, simultaneously.
909         // To make this non-blocking, we create a new Future chain in
910         // the order of completion to reduce the values.
911         // The spinlock just protects chaining a new Future, not actually
912         // executing the reduce, which should be really fast.
913         folly::MSLGuard lock(ctx->lock_);
914         ctx->memo_ =
915             ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
916               // Either return a ItT&& or a Try<ItT>&& depending
917               // on the type of the argument of func.
918               return ctx->func_(std::move(v),
919                                 mt.template get<IsTry::value, Arg&&>());
920             });
921         if (++ctx->numThens_ == ctx->numFutures_) {
922           // After reducing the value of the last Future, fulfill the Promise
923           ctx->memo_.setCallback_(
924               [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
925         }
926       });
927
928   return ctx->promise_.getFuture();
929 }
930
931 // within
932
933 template <class T>
934 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
935   return within(dur, TimedOut(), tk);
936 }
937
938 template <class T>
939 template <class E>
940 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
941
942   struct Context {
943     Context(E ex) : exception(std::move(ex)), promise() {}
944     E exception;
945     Future<Unit> thisFuture;
946     Promise<T> promise;
947     std::atomic<bool> token {false};
948   };
949
950   std::shared_ptr<Timekeeper> tks;
951   if (!tk) {
952     tks = folly::detail::getTimekeeperSingleton();
953     tk = DCHECK_NOTNULL(tks.get());
954   }
955
956   auto ctx = std::make_shared<Context>(std::move(e));
957
958   ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
959     // TODO: "this" completed first, cancel "after"
960     if (ctx->token.exchange(true) == false) {
961       ctx->promise.setTry(std::move(t));
962     }
963   });
964
965   tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
966     // "after" completed first, cancel "this"
967     ctx->thisFuture.raise(TimedOut());
968     if (ctx->token.exchange(true) == false) {
969       if (t.hasException()) {
970         ctx->promise.setException(std::move(t.exception()));
971       } else {
972         ctx->promise.setException(std::move(ctx->exception));
973       }
974     }
975   });
976
977   return ctx->promise.getFuture().via(getExecutor());
978 }
979
980 // delayed
981
982 template <class T>
983 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
984   return collectAll(*this, futures::sleep(dur, tk))
985     .then([](std::tuple<Try<T>, Try<Unit>> tup) {
986       Try<T>& t = std::get<0>(tup);
987       return makeFuture<T>(std::move(t));
988     });
989 }
990
991 namespace detail {
992
993 template <class T>
994 void waitImpl(Future<T>& f) {
995   // short-circuit if there's nothing to do
996   if (f.isReady()) return;
997
998   FutureBatonType baton;
999   f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1000   baton.wait();
1001   assert(f.isReady());
1002 }
1003
1004 template <class T>
1005 void waitImpl(Future<T>& f, Duration dur) {
1006   // short-circuit if there's nothing to do
1007   if (f.isReady()) {
1008     return;
1009   }
1010
1011   Promise<T> promise;
1012   auto ret = promise.getFuture();
1013   auto baton = std::make_shared<FutureBatonType>();
1014   f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1015     promise.setTry(std::move(t));
1016     baton->post();
1017   });
1018   f = std::move(ret);
1019   if (baton->timed_wait(dur)) {
1020     assert(f.isReady());
1021   }
1022 }
1023
1024 template <class T>
1025 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1026   // Set callback so to ensure that the via executor has something on it
1027   // so that once the preceding future triggers this callback, drive will
1028   // always have a callback to satisfy it
1029   if (f.isReady())
1030     return;
1031   f = f.via(e).then([](T&& t) { return std::move(t); });
1032   while (!f.isReady()) {
1033     e->drive();
1034   }
1035   assert(f.isReady());
1036 }
1037
1038 } // detail
1039
1040 template <class T>
1041 Future<T>& Future<T>::wait() & {
1042   detail::waitImpl(*this);
1043   return *this;
1044 }
1045
1046 template <class T>
1047 Future<T>&& Future<T>::wait() && {
1048   detail::waitImpl(*this);
1049   return std::move(*this);
1050 }
1051
1052 template <class T>
1053 Future<T>& Future<T>::wait(Duration dur) & {
1054   detail::waitImpl(*this, dur);
1055   return *this;
1056 }
1057
1058 template <class T>
1059 Future<T>&& Future<T>::wait(Duration dur) && {
1060   detail::waitImpl(*this, dur);
1061   return std::move(*this);
1062 }
1063
1064 template <class T>
1065 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1066   detail::waitViaImpl(*this, e);
1067   return *this;
1068 }
1069
1070 template <class T>
1071 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1072   detail::waitViaImpl(*this, e);
1073   return std::move(*this);
1074 }
1075
1076 template <class T>
1077 T Future<T>::get() {
1078   return std::move(wait().value());
1079 }
1080
1081 template <class T>
1082 T Future<T>::get(Duration dur) {
1083   wait(dur);
1084   if (isReady()) {
1085     return std::move(value());
1086   } else {
1087     throw TimedOut();
1088   }
1089 }
1090
1091 template <class T>
1092 T Future<T>::getVia(DrivableExecutor* e) {
1093   return std::move(waitVia(e).value());
1094 }
1095
1096 namespace detail {
1097   template <class T>
1098   struct TryEquals {
1099     static bool equals(const Try<T>& t1, const Try<T>& t2) {
1100       return t1.value() == t2.value();
1101     }
1102   };
1103 }
1104
1105 template <class T>
1106 Future<bool> Future<T>::willEqual(Future<T>& f) {
1107   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1108     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1109       return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1110     } else {
1111       return false;
1112       }
1113   });
1114 }
1115
1116 template <class T>
1117 template <class F>
1118 Future<T> Future<T>::filter(F&& predicate) {
1119   return this->then([p = std::forward<F>(predicate)](T val) {
1120     T const& valConstRef = val;
1121     if (!p(valConstRef)) {
1122       throw PredicateDoesNotObtain();
1123     }
1124     return val;
1125   });
1126 }
1127
1128 template <class T>
1129 template <class Callback>
1130 auto Future<T>::thenMulti(Callback&& fn)
1131     -> decltype(this->then(std::forward<Callback>(fn))) {
1132   // thenMulti with one callback is just a then
1133   return then(std::forward<Callback>(fn));
1134 }
1135
1136 template <class T>
1137 template <class Callback, class... Callbacks>
1138 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1139     -> decltype(this->then(std::forward<Callback>(fn)).
1140                       thenMulti(std::forward<Callbacks>(fns)...)) {
1141   // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1142   return then(std::forward<Callback>(fn)).
1143          thenMulti(std::forward<Callbacks>(fns)...);
1144 }
1145
1146 template <class T>
1147 template <class Callback, class... Callbacks>
1148 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1149                                       Callbacks&&... fns)
1150     -> decltype(this->then(std::forward<Callback>(fn)).
1151                       thenMulti(std::forward<Callbacks>(fns)...)) {
1152   // thenMultiExecutor with two callbacks is
1153   // via(x).then(a).thenMulti(b, ...).via(oldX)
1154   auto oldX = getExecutor();
1155   setExecutor(x);
1156   return then(std::forward<Callback>(fn)).
1157          thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1158 }
1159
1160 template <class T>
1161 template <class Callback>
1162 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1163     -> decltype(this->then(std::forward<Callback>(fn))) {
1164   // thenMulti with one callback is just a then with an executor
1165   return then(x, std::forward<Callback>(fn));
1166 }
1167
1168 template <class F>
1169 inline Future<Unit> when(bool p, F&& thunk) {
1170   return p ? std::forward<F>(thunk)().unit() : makeFuture();
1171 }
1172
1173 template <class P, class F>
1174 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1175   if (predicate()) {
1176     auto future = thunk();
1177     return future.then([
1178       predicate = std::forward<P>(predicate),
1179       thunk = std::forward<F>(thunk)
1180     ]() mutable {
1181       return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1182     });
1183   }
1184   return makeFuture();
1185 }
1186
1187 template <class F>
1188 Future<Unit> times(const int n, F&& thunk) {
1189   return folly::whileDo(
1190       [ n, count = folly::make_unique<std::atomic<int>>(0) ]() mutable {
1191         return count->fetch_add(1) < n;
1192       },
1193       std::forward<F>(thunk));
1194 }
1195
1196 namespace futures {
1197   template <class It, class F, class ItT, class Result>
1198   std::vector<Future<Result>> map(It first, It last, F func) {
1199     std::vector<Future<Result>> results;
1200     for (auto it = first; it != last; it++) {
1201       results.push_back(it->then(func));
1202     }
1203     return results;
1204   }
1205 }
1206
1207 namespace futures {
1208
1209 namespace detail {
1210
1211 struct retrying_policy_raw_tag {};
1212 struct retrying_policy_fut_tag {};
1213
1214 template <class Policy>
1215 struct retrying_policy_traits {
1216   using ew = exception_wrapper;
1217   FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1218   template <class Ret>
1219   using has_op = typename std::integral_constant<bool,
1220         has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1221         has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1222   using is_raw = has_op<bool>;
1223   using is_fut = has_op<Future<bool>>;
1224   using tag = typename std::conditional<
1225         is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1226         is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1227 };
1228
1229 template <class Policy, class FF>
1230 typename std::result_of<FF(size_t)>::type
1231 retrying(size_t k, Policy&& p, FF&& ff) {
1232   using F = typename std::result_of<FF(size_t)>::type;
1233   using T = typename F::value_type;
1234   auto f = ff(k++);
1235   return f.onError(
1236       [ k, pm = std::forward<Policy>(p), ffm = std::forward<FF>(ff) ](
1237           exception_wrapper x) mutable {
1238         auto q = pm(k, x);
1239         return q.then(
1240             [ k, xm = std::move(x), pm = std::move(pm), ffm = std::move(ffm) ](
1241                 bool r) mutable {
1242               return r ? retrying(k, std::move(pm), std::move(ffm))
1243                        : makeFuture<T>(std::move(xm));
1244             });
1245       });
1246 }
1247
1248 template <class Policy, class FF>
1249 typename std::result_of<FF(size_t)>::type
1250 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1251   auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
1252     return makeFuture<bool>(pm(k, x));
1253   };
1254   return retrying(0, std::move(q), std::forward<FF>(ff));
1255 }
1256
1257 template <class Policy, class FF>
1258 typename std::result_of<FF(size_t)>::type
1259 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1260   return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1261 }
1262
1263 //  jittered exponential backoff, clamped to [backoff_min, backoff_max]
1264 template <class URNG>
1265 Duration retryingJitteredExponentialBackoffDur(
1266     size_t n,
1267     Duration backoff_min,
1268     Duration backoff_max,
1269     double jitter_param,
1270     URNG& rng) {
1271   using d = Duration;
1272   auto dist = std::normal_distribution<double>(0.0, jitter_param);
1273   auto jitter = std::exp(dist(rng));
1274   auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1275   return std::max(backoff_min, std::min(backoff_max, backoff));
1276 }
1277
1278 template <class Policy, class URNG>
1279 std::function<Future<bool>(size_t, const exception_wrapper&)>
1280 retryingPolicyCappedJitteredExponentialBackoff(
1281     size_t max_tries,
1282     Duration backoff_min,
1283     Duration backoff_max,
1284     double jitter_param,
1285     URNG&& rng,
1286     Policy&& p) {
1287   return [
1288     pm = std::forward<Policy>(p),
1289     max_tries,
1290     backoff_min,
1291     backoff_max,
1292     jitter_param,
1293     rngp = std::forward<URNG>(rng)
1294   ](size_t n, const exception_wrapper& ex) mutable {
1295     if (n == max_tries) {
1296       return makeFuture(false);
1297     }
1298     return pm(n, ex).then(
1299         [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
1300             bool v) mutable {
1301           if (!v) {
1302             return makeFuture(false);
1303           }
1304           auto backoff = detail::retryingJitteredExponentialBackoffDur(
1305               n, backoff_min, backoff_max, jitter_param, rngp);
1306           return futures::sleep(backoff).then([] { return true; });
1307         });
1308   };
1309 }
1310
1311 template <class Policy, class URNG>
1312 std::function<Future<bool>(size_t, const exception_wrapper&)>
1313 retryingPolicyCappedJitteredExponentialBackoff(
1314     size_t max_tries,
1315     Duration backoff_min,
1316     Duration backoff_max,
1317     double jitter_param,
1318     URNG&& rng,
1319     Policy&& p,
1320     retrying_policy_raw_tag) {
1321   auto q = [pm = std::forward<Policy>(p)](
1322       size_t n, const exception_wrapper& e) {
1323     return makeFuture(pm(n, e));
1324   };
1325   return retryingPolicyCappedJitteredExponentialBackoff(
1326       max_tries,
1327       backoff_min,
1328       backoff_max,
1329       jitter_param,
1330       std::forward<URNG>(rng),
1331       std::move(q));
1332 }
1333
1334 template <class Policy, class URNG>
1335 std::function<Future<bool>(size_t, const exception_wrapper&)>
1336 retryingPolicyCappedJitteredExponentialBackoff(
1337     size_t max_tries,
1338     Duration backoff_min,
1339     Duration backoff_max,
1340     double jitter_param,
1341     URNG&& rng,
1342     Policy&& p,
1343     retrying_policy_fut_tag) {
1344   return retryingPolicyCappedJitteredExponentialBackoff(
1345       max_tries,
1346       backoff_min,
1347       backoff_max,
1348       jitter_param,
1349       std::forward<URNG>(rng),
1350       std::forward<Policy>(p));
1351 }
1352 }
1353
1354 template <class Policy, class FF>
1355 typename std::result_of<FF(size_t)>::type
1356 retrying(Policy&& p, FF&& ff) {
1357   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1358   return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1359 }
1360
1361 inline
1362 std::function<bool(size_t, const exception_wrapper&)>
1363 retryingPolicyBasic(
1364     size_t max_tries) {
1365   return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1366 }
1367
1368 template <class Policy, class URNG>
1369 std::function<Future<bool>(size_t, const exception_wrapper&)>
1370 retryingPolicyCappedJitteredExponentialBackoff(
1371     size_t max_tries,
1372     Duration backoff_min,
1373     Duration backoff_max,
1374     double jitter_param,
1375     URNG&& rng,
1376     Policy&& p) {
1377   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1378   return detail::retryingPolicyCappedJitteredExponentialBackoff(
1379       max_tries,
1380       backoff_min,
1381       backoff_max,
1382       jitter_param,
1383       std::forward<URNG>(rng),
1384       std::forward<Policy>(p),
1385       tag());
1386 }
1387
1388 inline
1389 std::function<Future<bool>(size_t, const exception_wrapper&)>
1390 retryingPolicyCappedJitteredExponentialBackoff(
1391     size_t max_tries,
1392     Duration backoff_min,
1393     Duration backoff_max,
1394     double jitter_param) {
1395   auto p = [](size_t, const exception_wrapper&) { return true; };
1396   return retryingPolicyCappedJitteredExponentialBackoff(
1397       max_tries,
1398       backoff_min,
1399       backoff_max,
1400       jitter_param,
1401       ThreadLocalPRNG(),
1402       std::move(p));
1403 }
1404
1405 }
1406
1407 // Instantiate the most common Future types to save compile time
1408 extern template class Future<Unit>;
1409 extern template class Future<bool>;
1410 extern template class Future<int>;
1411 extern template class Future<int64_t>;
1412 extern template class Future<std::string>;
1413 extern template class Future<double>;
1414
1415 } // namespace folly