Add pre received data API to AsyncSSLSocket.
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <algorithm>
20 #include <cassert>
21 #include <chrono>
22 #include <random>
23 #include <thread>
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
30
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
33 #else
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/fibers/Baton.h>
36 #endif
37
38 namespace folly {
39
40 class Timekeeper;
41
42 namespace detail {
43 #if FOLLY_FUTURE_USING_FIBER
44 typedef folly::fibers::Baton FutureBatonType;
45 #else
46 typedef folly::Baton<> FutureBatonType;
47 #endif
48 }
49
50 namespace detail {
51   std::shared_ptr<Timekeeper> getTimekeeperSingleton();
52 }
53
54 template <class T>
55 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
56   other.core_ = nullptr;
57 }
58
59 template <class T>
60 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
61   std::swap(core_, other.core_);
62   return *this;
63 }
64
65 template <class T>
66 template <class T2, typename>
67 Future<T>::Future(T2&& val)
68     : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
69
70 template <class T>
71 template <typename T2>
72 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
73     : core_(new detail::Core<T>(Try<T>(T()))) {}
74
75 template <class T>
76 Future<T>::~Future() {
77   detach();
78 }
79
80 template <class T>
81 void Future<T>::detach() {
82   if (core_) {
83     core_->detachFuture();
84     core_ = nullptr;
85   }
86 }
87
88 template <class T>
89 void Future<T>::throwIfInvalid() const {
90   if (!core_)
91     throw NoState();
92 }
93
94 template <class T>
95 template <class F>
96 void Future<T>::setCallback_(F&& func) {
97   throwIfInvalid();
98   core_->setCallback(std::forward<F>(func));
99 }
100
101 // unwrap
102
103 template <class T>
104 template <class F>
105 typename std::enable_if<isFuture<F>::value,
106                         Future<typename isFuture<T>::Inner>>::type
107 Future<T>::unwrap() {
108   return then([](Future<typename isFuture<T>::Inner> internal_future) {
109       return internal_future;
110   });
111 }
112
113 // then
114
115 // Variant: returns a value
116 // e.g. f.then([](Try<T>&& t){ return t.value(); });
117 template <class T>
118 template <typename F, typename R, bool isTry, typename... Args>
119 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
120 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
121   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
122   typedef typename R::ReturnsFuture::Inner B;
123
124   throwIfInvalid();
125
126   Promise<B> p;
127   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
128
129   // grab the Future now before we lose our handle on the Promise
130   auto f = p.getFuture();
131   f.core_->setExecutorNoLock(getExecutor());
132
133   /* This is a bit tricky.
134
135      We can't just close over *this in case this Future gets moved. So we
136      make a new dummy Future. We could figure out something more
137      sophisticated that avoids making a new Future object when it can, as an
138      optimization. But this is correct.
139
140      core_ can't be moved, it is explicitly disallowed (as is copying). But
141      if there's ever a reason to allow it, this is one place that makes that
142      assumption and would need to be fixed. We use a standard shared pointer
143      for core_ (by copying it in), which means in essence obj holds a shared
144      pointer to itself.  But this shouldn't leak because Promise will not
145      outlive the continuation, because Promise will setException() with a
146      broken Promise if it is destructed before completed. We could use a
147      weak pointer but it would have to be converted to a shared pointer when
148      func is executed (because the Future returned by func may possibly
149      persist beyond the callback, if it gets moved), and so it is an
150      optimization to just make it shared from the get-go.
151
152      Two subtle but important points about this design. detail::Core has no
153      back pointers to Future or Promise, so if Future or Promise get moved
154      (and they will be moved in performant code) we don't have to do
155      anything fancy. And because we store the continuation in the
156      detail::Core, not in the Future, we can execute the continuation even
157      after the Future has gone out of scope. This is an intentional design
158      decision. It is likely we will want to be able to cancel a continuation
159      in some circumstances, but I think it should be explicit not implicit
160      in the destruction of the Future used to create it.
161      */
162   setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
163       Try<T> && t) mutable {
164     if (!isTry && t.hasException()) {
165       pm.setException(std::move(t.exception()));
166     } else {
167       pm.setWith([&]() { return funcm(t.template get<isTry, Args>()...); });
168     }
169   });
170
171   return f;
172 }
173
174 // Variant: returns a Future
175 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
176 template <class T>
177 template <typename F, typename R, bool isTry, typename... Args>
178 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
179 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
180   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
181   typedef typename R::ReturnsFuture::Inner B;
182
183   throwIfInvalid();
184
185   Promise<B> p;
186   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
187
188   // grab the Future now before we lose our handle on the Promise
189   auto f = p.getFuture();
190   f.core_->setExecutorNoLock(getExecutor());
191
192   setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
193       Try<T> && t) mutable {
194     auto ew = [&] {
195       if (!isTry && t.hasException()) {
196         return std::move(t.exception());
197       } else {
198         try {
199           auto f2 = funcm(t.template get<isTry, Args>()...);
200           // that didn't throw, now we can steal p
201           f2.setCallback_([p = std::move(pm)](Try<B> && b) mutable {
202             p.setTry(std::move(b));
203           });
204           return exception_wrapper();
205         } catch (const std::exception& e) {
206           return exception_wrapper(std::current_exception(), e);
207         } catch (...) {
208           return exception_wrapper(std::current_exception());
209         }
210       }
211     }();
212     if (ew) {
213       pm.setException(std::move(ew));
214     }
215   });
216
217   return f;
218 }
219
220 template <typename T>
221 template <typename R, typename Caller, typename... Args>
222   Future<typename isFuture<R>::Inner>
223 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
224   typedef typename std::remove_cv<
225     typename std::remove_reference<
226       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
227   return then([instance, func](Try<T>&& t){
228     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
229   });
230 }
231
232 template <class T>
233 template <class Executor, class Arg, class... Args>
234 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
235   -> decltype(this->then(std::forward<Arg>(arg),
236                          std::forward<Args>(args)...))
237 {
238   auto oldX = getExecutor();
239   setExecutor(x);
240   return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
241                via(oldX);
242 }
243
244 template <class T>
245 Future<Unit> Future<T>::then() {
246   return then([] () {});
247 }
248
249 // onError where the callback returns T
250 template <class T>
251 template <class F>
252 typename std::enable_if<
253   !detail::callableWith<F, exception_wrapper>::value &&
254   !detail::Extract<F>::ReturnsFuture::value,
255   Future<T>>::type
256 Future<T>::onError(F&& func) {
257   typedef typename detail::Extract<F>::FirstArg Exn;
258   static_assert(
259       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
260       "Return type of onError callback must be T or Future<T>");
261
262   Promise<T> p;
263   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
264   auto f = p.getFuture();
265
266   setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
267       Try<T> && t) mutable {
268     if (!t.template withException<Exn>(
269             [&](Exn& e) { pm.setWith([&] { return funcm(e); }); })) {
270       pm.setTry(std::move(t));
271     }
272   });
273
274   return f;
275 }
276
277 // onError where the callback returns Future<T>
278 template <class T>
279 template <class F>
280 typename std::enable_if<
281   !detail::callableWith<F, exception_wrapper>::value &&
282   detail::Extract<F>::ReturnsFuture::value,
283   Future<T>>::type
284 Future<T>::onError(F&& func) {
285   static_assert(
286       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
287       "Return type of onError callback must be T or Future<T>");
288   typedef typename detail::Extract<F>::FirstArg Exn;
289
290   Promise<T> p;
291   auto f = p.getFuture();
292
293   setCallback_([ pm = std::move(p), funcm = std::forward<F>(func) ](
294       Try<T> && t) mutable {
295     if (!t.template withException<Exn>([&](Exn& e) {
296           auto ew = [&] {
297             try {
298               auto f2 = funcm(e);
299               f2.setCallback_([pm = std::move(pm)](Try<T> && t2) mutable {
300                 pm.setTry(std::move(t2));
301               });
302               return exception_wrapper();
303             } catch (const std::exception& e2) {
304               return exception_wrapper(std::current_exception(), e2);
305             } catch (...) {
306               return exception_wrapper(std::current_exception());
307             }
308           }();
309           if (ew) {
310             pm.setException(std::move(ew));
311           }
312         })) {
313       pm.setTry(std::move(t));
314     }
315   });
316
317   return f;
318 }
319
320 template <class T>
321 template <class F>
322 Future<T> Future<T>::ensure(F&& func) {
323   return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
324     funcw();
325     return makeFuture(std::move(t));
326   });
327 }
328
329 template <class T>
330 template <class F>
331 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
332   return within(dur, tk).onError([funcw = std::forward<F>(func)](
333       TimedOut const&) { return funcw(); });
334 }
335
336 template <class T>
337 template <class F>
338 typename std::enable_if<detail::callableWith<F, exception_wrapper>::value &&
339                             detail::Extract<F>::ReturnsFuture::value,
340                         Future<T>>::type
341 Future<T>::onError(F&& func) {
342   static_assert(
343       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
344       "Return type of onError callback must be T or Future<T>");
345
346   Promise<T> p;
347   auto f = p.getFuture();
348   setCallback_(
349       [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
350         if (t.hasException()) {
351           auto ew = [&] {
352             try {
353               auto f2 = funcm(std::move(t.exception()));
354               f2.setCallback_([pm = std::move(pm)](Try<T> t2) mutable {
355                 pm.setTry(std::move(t2));
356               });
357               return exception_wrapper();
358             } catch (const std::exception& e2) {
359               return exception_wrapper(std::current_exception(), e2);
360             } catch (...) {
361               return exception_wrapper(std::current_exception());
362             }
363           }();
364           if (ew) {
365             pm.setException(std::move(ew));
366           }
367         } else {
368           pm.setTry(std::move(t));
369         }
370       });
371
372   return f;
373 }
374
375 // onError(exception_wrapper) that returns T
376 template <class T>
377 template <class F>
378 typename std::enable_if<
379   detail::callableWith<F, exception_wrapper>::value &&
380   !detail::Extract<F>::ReturnsFuture::value,
381   Future<T>>::type
382 Future<T>::onError(F&& func) {
383   static_assert(
384       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
385       "Return type of onError callback must be T or Future<T>");
386
387   Promise<T> p;
388   auto f = p.getFuture();
389   setCallback_(
390       [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
391         if (t.hasException()) {
392           pm.setWith([&] { return funcm(std::move(t.exception())); });
393         } else {
394           pm.setTry(std::move(t));
395         }
396       });
397
398   return f;
399 }
400
401 template <class T>
402 typename std::add_lvalue_reference<T>::type Future<T>::value() {
403   throwIfInvalid();
404
405   return core_->getTry().value();
406 }
407
408 template <class T>
409 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
410   throwIfInvalid();
411
412   return core_->getTry().value();
413 }
414
415 template <class T>
416 Try<T>& Future<T>::getTry() {
417   throwIfInvalid();
418
419   return core_->getTry();
420 }
421
422 template <class T>
423 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
424   return waitVia(e).getTry();
425 }
426
427 template <class T>
428 Optional<Try<T>> Future<T>::poll() {
429   Optional<Try<T>> o;
430   if (core_->ready()) {
431     o = std::move(core_->getTry());
432   }
433   return o;
434 }
435
436 template <class T>
437 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
438   throwIfInvalid();
439
440   setExecutor(executor, priority);
441
442   return std::move(*this);
443 }
444
445 template <class T>
446 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
447   throwIfInvalid();
448
449   Promise<T> p;
450   auto f = p.getFuture();
451   then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
452   return std::move(f).via(executor, priority);
453 }
454
455 template <class Func>
456 auto via(Executor* x, Func&& func)
457   -> Future<typename isFuture<decltype(func())>::Inner>
458 {
459   // TODO make this actually more performant. :-P #7260175
460   return via(x).then(std::forward<Func>(func));
461 }
462
463 template <class T>
464 bool Future<T>::isReady() const {
465   throwIfInvalid();
466   return core_->ready();
467 }
468
469 template <class T>
470 bool Future<T>::hasValue() {
471   return getTry().hasValue();
472 }
473
474 template <class T>
475 bool Future<T>::hasException() {
476   return getTry().hasException();
477 }
478
479 template <class T>
480 void Future<T>::raise(exception_wrapper exception) {
481   core_->raise(std::move(exception));
482 }
483
484 // makeFuture
485
486 template <class T>
487 Future<typename std::decay<T>::type> makeFuture(T&& t) {
488   return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
489 }
490
491 inline // for multiple translation units
492 Future<Unit> makeFuture() {
493   return makeFuture(Unit{});
494 }
495
496 // makeFutureWith(Future<T>()) -> Future<T>
497 template <class F>
498 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
499                         typename std::result_of<F()>::type>::type
500 makeFutureWith(F&& func) {
501   using InnerType =
502       typename isFuture<typename std::result_of<F()>::type>::Inner;
503   try {
504     return func();
505   } catch (std::exception& e) {
506     return makeFuture<InnerType>(
507         exception_wrapper(std::current_exception(), e));
508   } catch (...) {
509     return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
510   }
511 }
512
513 // makeFutureWith(T()) -> Future<T>
514 // makeFutureWith(void()) -> Future<Unit>
515 template <class F>
516 typename std::enable_if<
517     !(isFuture<typename std::result_of<F()>::type>::value),
518     Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
519 makeFutureWith(F&& func) {
520   using LiftedResult =
521       typename Unit::Lift<typename std::result_of<F()>::type>::type;
522   return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
523     return func();
524   }));
525 }
526
527 template <class T>
528 Future<T> makeFuture(std::exception_ptr const& e) {
529   return makeFuture(Try<T>(e));
530 }
531
532 template <class T>
533 Future<T> makeFuture(exception_wrapper ew) {
534   return makeFuture(Try<T>(std::move(ew)));
535 }
536
537 template <class T, class E>
538 typename std::enable_if<std::is_base_of<std::exception, E>::value,
539                         Future<T>>::type
540 makeFuture(E const& e) {
541   return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
542 }
543
544 template <class T>
545 Future<T> makeFuture(Try<T>&& t) {
546   return Future<T>(new detail::Core<T>(std::move(t)));
547 }
548
549 // via
550 Future<Unit> via(Executor* executor, int8_t priority) {
551   return makeFuture().via(executor, priority);
552 }
553
554 // mapSetCallback calls func(i, Try<T>) when every future completes
555
556 template <class T, class InputIterator, class F>
557 void mapSetCallback(InputIterator first, InputIterator last, F func) {
558   for (size_t i = 0; first != last; ++first, ++i) {
559     first->setCallback_([func, i](Try<T>&& t) {
560       func(i, std::move(t));
561     });
562   }
563 }
564
565 // collectAll (variadic)
566
567 template <typename... Fs>
568 typename detail::CollectAllVariadicContext<
569   typename std::decay<Fs>::type::value_type...>::type
570 collectAll(Fs&&... fs) {
571   auto ctx = std::make_shared<detail::CollectAllVariadicContext<
572     typename std::decay<Fs>::type::value_type...>>();
573   detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
574     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
575   return ctx->p.getFuture();
576 }
577
578 // collectAll (iterator)
579
580 template <class InputIterator>
581 Future<
582   std::vector<
583   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
584 collectAll(InputIterator first, InputIterator last) {
585   typedef
586     typename std::iterator_traits<InputIterator>::value_type::value_type T;
587
588   struct CollectAllContext {
589     CollectAllContext(int n) : results(n) {}
590     ~CollectAllContext() {
591       p.setValue(std::move(results));
592     }
593     Promise<std::vector<Try<T>>> p;
594     std::vector<Try<T>> results;
595   };
596
597   auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
598   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
599     ctx->results[i] = std::move(t);
600   });
601   return ctx->p.getFuture();
602 }
603
604 // collect (iterator)
605
606 namespace detail {
607
608 template <typename T>
609 struct CollectContext {
610   struct Nothing {
611     explicit Nothing(int /* n */) {}
612   };
613
614   using Result = typename std::conditional<
615     std::is_void<T>::value,
616     void,
617     std::vector<T>>::type;
618
619   using InternalResult = typename std::conditional<
620     std::is_void<T>::value,
621     Nothing,
622     std::vector<Optional<T>>>::type;
623
624   explicit CollectContext(int n) : result(n) {}
625   ~CollectContext() {
626     if (!threw.exchange(true)) {
627       // map Optional<T> -> T
628       std::vector<T> finalResult;
629       finalResult.reserve(result.size());
630       std::transform(result.begin(), result.end(),
631                      std::back_inserter(finalResult),
632                      [](Optional<T>& o) { return std::move(o.value()); });
633       p.setValue(std::move(finalResult));
634     }
635   }
636   inline void setPartialResult(size_t i, Try<T>& t) {
637     result[i] = std::move(t.value());
638   }
639   Promise<Result> p;
640   InternalResult result;
641   std::atomic<bool> threw {false};
642 };
643
644 }
645
646 template <class InputIterator>
647 Future<typename detail::CollectContext<
648   typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
649 collect(InputIterator first, InputIterator last) {
650   typedef
651     typename std::iterator_traits<InputIterator>::value_type::value_type T;
652
653   auto ctx = std::make_shared<detail::CollectContext<T>>(
654     std::distance(first, last));
655   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
656     if (t.hasException()) {
657        if (!ctx->threw.exchange(true)) {
658          ctx->p.setException(std::move(t.exception()));
659        }
660      } else if (!ctx->threw) {
661        ctx->setPartialResult(i, t);
662      }
663   });
664   return ctx->p.getFuture();
665 }
666
667 // collect (variadic)
668
669 template <typename... Fs>
670 typename detail::CollectVariadicContext<
671   typename std::decay<Fs>::type::value_type...>::type
672 collect(Fs&&... fs) {
673   auto ctx = std::make_shared<detail::CollectVariadicContext<
674     typename std::decay<Fs>::type::value_type...>>();
675   detail::collectVariadicHelper<detail::CollectVariadicContext>(
676     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
677   return ctx->p.getFuture();
678 }
679
680 // collectAny (iterator)
681
682 template <class InputIterator>
683 Future<
684   std::pair<size_t,
685             Try<
686               typename
687               std::iterator_traits<InputIterator>::value_type::value_type>>>
688 collectAny(InputIterator first, InputIterator last) {
689   typedef
690     typename std::iterator_traits<InputIterator>::value_type::value_type T;
691
692   struct CollectAnyContext {
693     CollectAnyContext() {}
694     Promise<std::pair<size_t, Try<T>>> p;
695     std::atomic<bool> done {false};
696   };
697
698   auto ctx = std::make_shared<CollectAnyContext>();
699   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
700     if (!ctx->done.exchange(true)) {
701       ctx->p.setValue(std::make_pair(i, std::move(t)));
702     }
703   });
704   return ctx->p.getFuture();
705 }
706
707 // collectAnyWithoutException (iterator)
708
709 template <class InputIterator>
710 Future<std::pair<
711     size_t,
712     typename std::iterator_traits<InputIterator>::value_type::value_type>>
713 collectAnyWithoutException(InputIterator first, InputIterator last) {
714   typedef
715       typename std::iterator_traits<InputIterator>::value_type::value_type T;
716
717   struct CollectAnyWithoutExceptionContext {
718     CollectAnyWithoutExceptionContext(){}
719     Promise<std::pair<size_t, T>> p;
720     std::atomic<bool> done{false};
721     std::atomic<size_t> nFulfilled{0};
722     size_t nTotal;
723   };
724
725   auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
726   ctx->nTotal = std::distance(first, last);
727
728   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
729     if (!t.hasException() && !ctx->done.exchange(true)) {
730       ctx->p.setValue(std::make_pair(i, std::move(t.value())));
731     } else if (++ctx->nFulfilled == ctx->nTotal) {
732       ctx->p.setException(t.exception());
733     }
734   });
735   return ctx->p.getFuture();
736 }
737
738 // collectN (iterator)
739
740 template <class InputIterator>
741 Future<std::vector<std::pair<size_t, Try<typename
742   std::iterator_traits<InputIterator>::value_type::value_type>>>>
743 collectN(InputIterator first, InputIterator last, size_t n) {
744   typedef typename
745     std::iterator_traits<InputIterator>::value_type::value_type T;
746   typedef std::vector<std::pair<size_t, Try<T>>> V;
747
748   struct CollectNContext {
749     V v;
750     std::atomic<size_t> completed = {0};
751     Promise<V> p;
752   };
753   auto ctx = std::make_shared<CollectNContext>();
754
755   if (size_t(std::distance(first, last)) < n) {
756     ctx->p.setException(std::runtime_error("Not enough futures"));
757   } else {
758     // for each completed Future, increase count and add to vector, until we
759     // have n completed futures at which point we fulfil our Promise with the
760     // vector
761     mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
762       auto c = ++ctx->completed;
763       if (c <= n) {
764         assert(ctx->v.size() < n);
765         ctx->v.emplace_back(i, std::move(t));
766         if (c == n) {
767           ctx->p.setTry(Try<V>(std::move(ctx->v)));
768         }
769       }
770     });
771   }
772
773   return ctx->p.getFuture();
774 }
775
776 // reduce (iterator)
777
778 template <class It, class T, class F>
779 Future<T> reduce(It first, It last, T&& initial, F&& func) {
780   if (first == last) {
781     return makeFuture(std::move(initial));
782   }
783
784   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
785   typedef
786       typename std::conditional<detail::callableWith<F, T&&, Try<ItT>&&>::value,
787                                 Try<ItT>,
788                                 ItT>::type Arg;
789   typedef isTry<Arg> IsTry;
790
791   auto sfunc = std::make_shared<F>(std::move(func));
792
793   auto f = first->then(
794       [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
795         return (*sfunc)(
796             std::move(minitial), head.template get<IsTry::value, Arg&&>());
797       });
798
799   for (++first; first != last; ++first) {
800     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
801       return (*sfunc)(std::move(std::get<0>(t).value()),
802                   // Either return a ItT&& or a Try<ItT>&& depending
803                   // on the type of the argument of func.
804                   std::get<1>(t).template get<IsTry::value, Arg&&>());
805     });
806   }
807
808   return f;
809 }
810
811 // window (collection)
812
813 template <class Collection, class F, class ItT, class Result>
814 std::vector<Future<Result>>
815 window(Collection input, F func, size_t n) {
816   struct WindowContext {
817     WindowContext(Collection&& i, F&& fn)
818         : input_(std::move(i)), promises_(input_.size()),
819           func_(std::move(fn))
820       {}
821     std::atomic<size_t> i_ {0};
822     Collection input_;
823     std::vector<Promise<Result>> promises_;
824     F func_;
825
826     static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
827       size_t i = ctx->i_++;
828       if (i < ctx->input_.size()) {
829         // Using setCallback_ directly since we don't need the Future
830         ctx->func_(std::move(ctx->input_[i])).setCallback_(
831           // ctx is captured by value
832           [ctx, i](Try<Result>&& t) {
833             ctx->promises_[i].setTry(std::move(t));
834             // Chain another future onto this one
835             spawn(std::move(ctx));
836           });
837       }
838     }
839   };
840
841   auto max = std::min(n, input.size());
842
843   auto ctx = std::make_shared<WindowContext>(
844     std::move(input), std::move(func));
845
846   for (size_t i = 0; i < max; ++i) {
847     // Start the first n Futures
848     WindowContext::spawn(ctx);
849   }
850
851   std::vector<Future<Result>> futures;
852   futures.reserve(ctx->promises_.size());
853   for (auto& promise : ctx->promises_) {
854     futures.emplace_back(promise.getFuture());
855   }
856
857   return futures;
858 }
859
860 // reduce
861
862 template <class T>
863 template <class I, class F>
864 Future<I> Future<T>::reduce(I&& initial, F&& func) {
865   return then([
866     minitial = std::forward<I>(initial),
867     mfunc = std::forward<F>(func)
868   ](T& vals) mutable {
869     auto ret = std::move(minitial);
870     for (auto& val : vals) {
871       ret = mfunc(std::move(ret), std::move(val));
872     }
873     return ret;
874   });
875 }
876
877 // unorderedReduce (iterator)
878
879 template <class It, class T, class F, class ItT, class Arg>
880 Future<T> unorderedReduce(It first, It last, T initial, F func) {
881   if (first == last) {
882     return makeFuture(std::move(initial));
883   }
884
885   typedef isTry<Arg> IsTry;
886
887   struct UnorderedReduceContext {
888     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
889         : lock_(), memo_(makeFuture<T>(std::move(memo))),
890           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
891       {}
892     folly::MicroSpinLock lock_; // protects memo_ and numThens_
893     Future<T> memo_;
894     F func_;
895     size_t numThens_; // how many Futures completed and called .then()
896     size_t numFutures_; // how many Futures in total
897     Promise<T> promise_;
898   };
899
900   auto ctx = std::make_shared<UnorderedReduceContext>(
901     std::move(initial), std::move(func), std::distance(first, last));
902
903   mapSetCallback<ItT>(
904       first,
905       last,
906       [ctx](size_t /* i */, Try<ItT>&& t) {
907         // Futures can be completed in any order, simultaneously.
908         // To make this non-blocking, we create a new Future chain in
909         // the order of completion to reduce the values.
910         // The spinlock just protects chaining a new Future, not actually
911         // executing the reduce, which should be really fast.
912         folly::MSLGuard lock(ctx->lock_);
913         ctx->memo_ =
914             ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
915               // Either return a ItT&& or a Try<ItT>&& depending
916               // on the type of the argument of func.
917               return ctx->func_(std::move(v),
918                                 mt.template get<IsTry::value, Arg&&>());
919             });
920         if (++ctx->numThens_ == ctx->numFutures_) {
921           // After reducing the value of the last Future, fulfill the Promise
922           ctx->memo_.setCallback_(
923               [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
924         }
925       });
926
927   return ctx->promise_.getFuture();
928 }
929
930 // within
931
932 template <class T>
933 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
934   return within(dur, TimedOut(), tk);
935 }
936
937 template <class T>
938 template <class E>
939 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
940
941   struct Context {
942     Context(E ex) : exception(std::move(ex)), promise() {}
943     E exception;
944     Future<Unit> thisFuture;
945     Promise<T> promise;
946     std::atomic<bool> token {false};
947   };
948
949   std::shared_ptr<Timekeeper> tks;
950   if (!tk) {
951     tks = folly::detail::getTimekeeperSingleton();
952     tk = DCHECK_NOTNULL(tks.get());
953   }
954
955   auto ctx = std::make_shared<Context>(std::move(e));
956
957   ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
958     // TODO: "this" completed first, cancel "after"
959     if (ctx->token.exchange(true) == false) {
960       ctx->promise.setTry(std::move(t));
961     }
962   });
963
964   tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
965     // "after" completed first, cancel "this"
966     ctx->thisFuture.raise(TimedOut());
967     if (ctx->token.exchange(true) == false) {
968       if (t.hasException()) {
969         ctx->promise.setException(std::move(t.exception()));
970       } else {
971         ctx->promise.setException(std::move(ctx->exception));
972       }
973     }
974   });
975
976   return ctx->promise.getFuture().via(getExecutor());
977 }
978
979 // delayed
980
981 template <class T>
982 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
983   return collectAll(*this, futures::sleep(dur, tk))
984     .then([](std::tuple<Try<T>, Try<Unit>> tup) {
985       Try<T>& t = std::get<0>(tup);
986       return makeFuture<T>(std::move(t));
987     });
988 }
989
990 namespace detail {
991
992 template <class T>
993 void waitImpl(Future<T>& f) {
994   // short-circuit if there's nothing to do
995   if (f.isReady()) return;
996
997   FutureBatonType baton;
998   f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
999   baton.wait();
1000   assert(f.isReady());
1001 }
1002
1003 template <class T>
1004 void waitImpl(Future<T>& f, Duration dur) {
1005   // short-circuit if there's nothing to do
1006   if (f.isReady()) {
1007     return;
1008   }
1009
1010   Promise<T> promise;
1011   auto ret = promise.getFuture();
1012   auto baton = std::make_shared<FutureBatonType>();
1013   f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1014     promise.setTry(std::move(t));
1015     baton->post();
1016   });
1017   f = std::move(ret);
1018   if (baton->timed_wait(dur)) {
1019     assert(f.isReady());
1020   }
1021 }
1022
1023 template <class T>
1024 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1025   // Set callback so to ensure that the via executor has something on it
1026   // so that once the preceding future triggers this callback, drive will
1027   // always have a callback to satisfy it
1028   if (f.isReady())
1029     return;
1030   f = f.via(e).then([](T&& t) { return std::move(t); });
1031   while (!f.isReady()) {
1032     e->drive();
1033   }
1034   assert(f.isReady());
1035 }
1036
1037 } // detail
1038
1039 template <class T>
1040 Future<T>& Future<T>::wait() & {
1041   detail::waitImpl(*this);
1042   return *this;
1043 }
1044
1045 template <class T>
1046 Future<T>&& Future<T>::wait() && {
1047   detail::waitImpl(*this);
1048   return std::move(*this);
1049 }
1050
1051 template <class T>
1052 Future<T>& Future<T>::wait(Duration dur) & {
1053   detail::waitImpl(*this, dur);
1054   return *this;
1055 }
1056
1057 template <class T>
1058 Future<T>&& Future<T>::wait(Duration dur) && {
1059   detail::waitImpl(*this, dur);
1060   return std::move(*this);
1061 }
1062
1063 template <class T>
1064 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1065   detail::waitViaImpl(*this, e);
1066   return *this;
1067 }
1068
1069 template <class T>
1070 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1071   detail::waitViaImpl(*this, e);
1072   return std::move(*this);
1073 }
1074
1075 template <class T>
1076 T Future<T>::get() {
1077   return std::move(wait().value());
1078 }
1079
1080 template <class T>
1081 T Future<T>::get(Duration dur) {
1082   wait(dur);
1083   if (isReady()) {
1084     return std::move(value());
1085   } else {
1086     throw TimedOut();
1087   }
1088 }
1089
1090 template <class T>
1091 T Future<T>::getVia(DrivableExecutor* e) {
1092   return std::move(waitVia(e).value());
1093 }
1094
1095 namespace detail {
1096   template <class T>
1097   struct TryEquals {
1098     static bool equals(const Try<T>& t1, const Try<T>& t2) {
1099       return t1.value() == t2.value();
1100     }
1101   };
1102 }
1103
1104 template <class T>
1105 Future<bool> Future<T>::willEqual(Future<T>& f) {
1106   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1107     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1108       return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1109     } else {
1110       return false;
1111       }
1112   });
1113 }
1114
1115 template <class T>
1116 template <class F>
1117 Future<T> Future<T>::filter(F&& predicate) {
1118   return this->then([p = std::forward<F>(predicate)](T val) {
1119     T const& valConstRef = val;
1120     if (!p(valConstRef)) {
1121       throw PredicateDoesNotObtain();
1122     }
1123     return val;
1124   });
1125 }
1126
1127 template <class T>
1128 template <class Callback>
1129 auto Future<T>::thenMulti(Callback&& fn)
1130     -> decltype(this->then(std::forward<Callback>(fn))) {
1131   // thenMulti with one callback is just a then
1132   return then(std::forward<Callback>(fn));
1133 }
1134
1135 template <class T>
1136 template <class Callback, class... Callbacks>
1137 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1138     -> decltype(this->then(std::forward<Callback>(fn)).
1139                       thenMulti(std::forward<Callbacks>(fns)...)) {
1140   // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1141   return then(std::forward<Callback>(fn)).
1142          thenMulti(std::forward<Callbacks>(fns)...);
1143 }
1144
1145 template <class T>
1146 template <class Callback, class... Callbacks>
1147 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1148                                       Callbacks&&... fns)
1149     -> decltype(this->then(std::forward<Callback>(fn)).
1150                       thenMulti(std::forward<Callbacks>(fns)...)) {
1151   // thenMultiExecutor with two callbacks is
1152   // via(x).then(a).thenMulti(b, ...).via(oldX)
1153   auto oldX = getExecutor();
1154   setExecutor(x);
1155   return then(std::forward<Callback>(fn)).
1156          thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1157 }
1158
1159 template <class T>
1160 template <class Callback>
1161 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1162     -> decltype(this->then(std::forward<Callback>(fn))) {
1163   // thenMulti with one callback is just a then with an executor
1164   return then(x, std::forward<Callback>(fn));
1165 }
1166
1167 template <class F>
1168 inline Future<Unit> when(bool p, F&& thunk) {
1169   return p ? std::forward<F>(thunk)().unit() : makeFuture();
1170 }
1171
1172 template <class P, class F>
1173 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1174   if (predicate()) {
1175     auto future = thunk();
1176     return future.then([
1177       predicate = std::forward<P>(predicate),
1178       thunk = std::forward<F>(thunk)
1179     ]() mutable {
1180       return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1181     });
1182   }
1183   return makeFuture();
1184 }
1185
1186 template <class F>
1187 Future<Unit> times(const int n, F&& thunk) {
1188   return folly::whileDo(
1189       [ n, count = folly::make_unique<std::atomic<int>>(0) ]() mutable {
1190         return count->fetch_add(1) < n;
1191       },
1192       std::forward<F>(thunk));
1193 }
1194
1195 namespace futures {
1196   template <class It, class F, class ItT, class Result>
1197   std::vector<Future<Result>> map(It first, It last, F func) {
1198     std::vector<Future<Result>> results;
1199     for (auto it = first; it != last; it++) {
1200       results.push_back(it->then(func));
1201     }
1202     return results;
1203   }
1204 }
1205
1206 namespace futures {
1207
1208 namespace detail {
1209
1210 struct retrying_policy_raw_tag {};
1211 struct retrying_policy_fut_tag {};
1212
1213 template <class Policy>
1214 struct retrying_policy_traits {
1215   using ew = exception_wrapper;
1216   FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1217   template <class Ret>
1218   using has_op = typename std::integral_constant<bool,
1219         has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1220         has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1221   using is_raw = has_op<bool>;
1222   using is_fut = has_op<Future<bool>>;
1223   using tag = typename std::conditional<
1224         is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1225         is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1226 };
1227
1228 template <class Policy, class FF>
1229 typename std::result_of<FF(size_t)>::type
1230 retrying(size_t k, Policy&& p, FF&& ff) {
1231   using F = typename std::result_of<FF(size_t)>::type;
1232   using T = typename F::value_type;
1233   auto f = ff(k++);
1234   return f.onError(
1235       [ k, pm = std::forward<Policy>(p), ffm = std::forward<FF>(ff) ](
1236           exception_wrapper x) mutable {
1237         auto q = pm(k, x);
1238         return q.then(
1239             [ k, xm = std::move(x), pm = std::move(pm), ffm = std::move(ffm) ](
1240                 bool r) mutable {
1241               return r ? retrying(k, std::move(pm), std::move(ffm))
1242                        : makeFuture<T>(std::move(xm));
1243             });
1244       });
1245 }
1246
1247 template <class Policy, class FF>
1248 typename std::result_of<FF(size_t)>::type
1249 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1250   auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
1251     return makeFuture<bool>(pm(k, x));
1252   };
1253   return retrying(0, std::move(q), std::forward<FF>(ff));
1254 }
1255
1256 template <class Policy, class FF>
1257 typename std::result_of<FF(size_t)>::type
1258 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1259   return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1260 }
1261
1262 //  jittered exponential backoff, clamped to [backoff_min, backoff_max]
1263 template <class URNG>
1264 Duration retryingJitteredExponentialBackoffDur(
1265     size_t n,
1266     Duration backoff_min,
1267     Duration backoff_max,
1268     double jitter_param,
1269     URNG& rng) {
1270   using d = Duration;
1271   auto dist = std::normal_distribution<double>(0.0, jitter_param);
1272   auto jitter = std::exp(dist(rng));
1273   auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1274   return std::max(backoff_min, std::min(backoff_max, backoff));
1275 }
1276
1277 template <class Policy, class URNG>
1278 std::function<Future<bool>(size_t, const exception_wrapper&)>
1279 retryingPolicyCappedJitteredExponentialBackoff(
1280     size_t max_tries,
1281     Duration backoff_min,
1282     Duration backoff_max,
1283     double jitter_param,
1284     URNG&& rng,
1285     Policy&& p) {
1286   return [
1287     pm = std::forward<Policy>(p),
1288     max_tries,
1289     backoff_min,
1290     backoff_max,
1291     jitter_param,
1292     rngp = std::forward<URNG>(rng)
1293   ](size_t n, const exception_wrapper& ex) mutable {
1294     if (n == max_tries) {
1295       return makeFuture(false);
1296     }
1297     return pm(n, ex).then(
1298         [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
1299             bool v) mutable {
1300           if (!v) {
1301             return makeFuture(false);
1302           }
1303           auto backoff = detail::retryingJitteredExponentialBackoffDur(
1304               n, backoff_min, backoff_max, jitter_param, rngp);
1305           return futures::sleep(backoff).then([] { return true; });
1306         });
1307   };
1308 }
1309
1310 template <class Policy, class URNG>
1311 std::function<Future<bool>(size_t, const exception_wrapper&)>
1312 retryingPolicyCappedJitteredExponentialBackoff(
1313     size_t max_tries,
1314     Duration backoff_min,
1315     Duration backoff_max,
1316     double jitter_param,
1317     URNG&& rng,
1318     Policy&& p,
1319     retrying_policy_raw_tag) {
1320   auto q = [pm = std::forward<Policy>(p)](
1321       size_t n, const exception_wrapper& e) {
1322     return makeFuture(pm(n, e));
1323   };
1324   return retryingPolicyCappedJitteredExponentialBackoff(
1325       max_tries,
1326       backoff_min,
1327       backoff_max,
1328       jitter_param,
1329       std::forward<URNG>(rng),
1330       std::move(q));
1331 }
1332
1333 template <class Policy, class URNG>
1334 std::function<Future<bool>(size_t, const exception_wrapper&)>
1335 retryingPolicyCappedJitteredExponentialBackoff(
1336     size_t max_tries,
1337     Duration backoff_min,
1338     Duration backoff_max,
1339     double jitter_param,
1340     URNG&& rng,
1341     Policy&& p,
1342     retrying_policy_fut_tag) {
1343   return retryingPolicyCappedJitteredExponentialBackoff(
1344       max_tries,
1345       backoff_min,
1346       backoff_max,
1347       jitter_param,
1348       std::forward<URNG>(rng),
1349       std::forward<Policy>(p));
1350 }
1351 }
1352
1353 template <class Policy, class FF>
1354 typename std::result_of<FF(size_t)>::type
1355 retrying(Policy&& p, FF&& ff) {
1356   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1357   return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1358 }
1359
1360 inline
1361 std::function<bool(size_t, const exception_wrapper&)>
1362 retryingPolicyBasic(
1363     size_t max_tries) {
1364   return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1365 }
1366
1367 template <class Policy, class URNG>
1368 std::function<Future<bool>(size_t, const exception_wrapper&)>
1369 retryingPolicyCappedJitteredExponentialBackoff(
1370     size_t max_tries,
1371     Duration backoff_min,
1372     Duration backoff_max,
1373     double jitter_param,
1374     URNG&& rng,
1375     Policy&& p) {
1376   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1377   return detail::retryingPolicyCappedJitteredExponentialBackoff(
1378       max_tries,
1379       backoff_min,
1380       backoff_max,
1381       jitter_param,
1382       std::forward<URNG>(rng),
1383       std::forward<Policy>(p),
1384       tag());
1385 }
1386
1387 inline
1388 std::function<Future<bool>(size_t, const exception_wrapper&)>
1389 retryingPolicyCappedJitteredExponentialBackoff(
1390     size_t max_tries,
1391     Duration backoff_min,
1392     Duration backoff_max,
1393     double jitter_param) {
1394   auto p = [](size_t, const exception_wrapper&) { return true; };
1395   return retryingPolicyCappedJitteredExponentialBackoff(
1396       max_tries,
1397       backoff_min,
1398       backoff_max,
1399       jitter_param,
1400       ThreadLocalPRNG(),
1401       std::move(p));
1402 }
1403
1404 }
1405
1406 // Instantiate the most common Future types to save compile time
1407 extern template class Future<Unit>;
1408 extern template class Future<bool>;
1409 extern template class Future<int>;
1410 extern template class Future<int64_t>;
1411 extern template class Future<std::string>;
1412 extern template class Future<double>;
1413
1414 } // namespace folly