Future<typename>::getTryVia
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <algorithm>
20 #include <cassert>
21 #include <chrono>
22 #include <random>
23 #include <thread>
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
30
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
33 #else
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/fibers/Baton.h>
36 #endif
37
38 namespace folly {
39
40 class Timekeeper;
41
42 namespace detail {
43 #if FOLLY_FUTURE_USING_FIBER
44 typedef folly::fibers::Baton FutureBatonType;
45 #else
46 typedef folly::Baton<> FutureBatonType;
47 #endif
48 }
49
50 namespace detail {
51   std::shared_ptr<Timekeeper> getTimekeeperSingleton();
52 }
53
54 template <class T>
55 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
56   other.core_ = nullptr;
57 }
58
59 template <class T>
60 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
61   std::swap(core_, other.core_);
62   return *this;
63 }
64
65 template <class T>
66 template <typename U, typename>
67 Future<T>::Future(Future<U>&& other) noexcept
68     : core_(detail::Core<T>::convert(other.core_)) {
69   other.core_ = nullptr;
70 }
71
72 template <class T>
73 template <typename U, typename>
74 Future<T>& Future<T>::operator=(Future<U>&& other) noexcept {
75   std::swap(core_, detail::Core<T>::convert(other.core_));
76   return *this;
77 }
78
79 template <class T>
80 template <class T2, typename>
81 Future<T>::Future(T2&& val)
82     : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
83
84 template <class T>
85 template <typename T2>
86 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
87     : core_(new detail::Core<T>(Try<T>(T()))) {}
88
89 template <class T>
90 Future<T>::~Future() {
91   detach();
92 }
93
94 template <class T>
95 void Future<T>::detach() {
96   if (core_) {
97     core_->detachFuture();
98     core_ = nullptr;
99   }
100 }
101
102 template <class T>
103 void Future<T>::throwIfInvalid() const {
104   if (!core_)
105     throw NoState();
106 }
107
108 template <class T>
109 template <class F>
110 void Future<T>::setCallback_(F&& func) {
111   throwIfInvalid();
112   core_->setCallback(std::forward<F>(func));
113 }
114
115 // unwrap
116
117 template <class T>
118 template <class F>
119 typename std::enable_if<isFuture<F>::value,
120                         Future<typename isFuture<T>::Inner>>::type
121 Future<T>::unwrap() {
122   return then([](Future<typename isFuture<T>::Inner> internal_future) {
123       return internal_future;
124   });
125 }
126
127 // then
128
129 // Variant: returns a value
130 // e.g. f.then([](Try<T>&& t){ return t.value(); });
131 template <class T>
132 template <typename F, typename R, bool isTry, typename... Args>
133 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
134 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
135   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
136   typedef typename R::ReturnsFuture::Inner B;
137
138   throwIfInvalid();
139
140   Promise<B> p;
141   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
142
143   // grab the Future now before we lose our handle on the Promise
144   auto f = p.getFuture();
145   f.core_->setExecutorNoLock(getExecutor());
146
147   /* This is a bit tricky.
148
149      We can't just close over *this in case this Future gets moved. So we
150      make a new dummy Future. We could figure out something more
151      sophisticated that avoids making a new Future object when it can, as an
152      optimization. But this is correct.
153
154      core_ can't be moved, it is explicitly disallowed (as is copying). But
155      if there's ever a reason to allow it, this is one place that makes that
156      assumption and would need to be fixed. We use a standard shared pointer
157      for core_ (by copying it in), which means in essence obj holds a shared
158      pointer to itself.  But this shouldn't leak because Promise will not
159      outlive the continuation, because Promise will setException() with a
160      broken Promise if it is destructed before completed. We could use a
161      weak pointer but it would have to be converted to a shared pointer when
162      func is executed (because the Future returned by func may possibly
163      persist beyond the callback, if it gets moved), and so it is an
164      optimization to just make it shared from the get-go.
165
166      Two subtle but important points about this design. detail::Core has no
167      back pointers to Future or Promise, so if Future or Promise get moved
168      (and they will be moved in performant code) we don't have to do
169      anything fancy. And because we store the continuation in the
170      detail::Core, not in the Future, we can execute the continuation even
171      after the Future has gone out of scope. This is an intentional design
172      decision. It is likely we will want to be able to cancel a continuation
173      in some circumstances, but I think it should be explicit not implicit
174      in the destruction of the Future used to create it.
175      */
176   setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
177       Try<T> && t) mutable {
178     if (!isTry && t.hasException()) {
179       pm.setException(std::move(t.exception()));
180     } else {
181       pm.setWith([&]() { return funcm(t.template get<isTry, Args>()...); });
182     }
183   });
184
185   return f;
186 }
187
188 // Variant: returns a Future
189 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
190 template <class T>
191 template <typename F, typename R, bool isTry, typename... Args>
192 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
193 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
194   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
195   typedef typename R::ReturnsFuture::Inner B;
196
197   throwIfInvalid();
198
199   Promise<B> p;
200   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
201
202   // grab the Future now before we lose our handle on the Promise
203   auto f = p.getFuture();
204   f.core_->setExecutorNoLock(getExecutor());
205
206   setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
207       Try<T> && t) mutable {
208     if (!isTry && t.hasException()) {
209       pm.setException(std::move(t.exception()));
210     } else {
211       try {
212         auto f2 = funcm(t.template get<isTry, Args>()...);
213         // that didn't throw, now we can steal p
214         f2.setCallback_([p = std::move(pm)](Try<B> && b) mutable {
215           p.setTry(std::move(b));
216         });
217       } catch (const std::exception& e) {
218         pm.setException(exception_wrapper(std::current_exception(), e));
219       } catch (...) {
220         pm.setException(exception_wrapper(std::current_exception()));
221       }
222     }
223   });
224
225   return f;
226 }
227
228 template <typename T>
229 template <typename R, typename Caller, typename... Args>
230   Future<typename isFuture<R>::Inner>
231 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
232   typedef typename std::remove_cv<
233     typename std::remove_reference<
234       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
235   return then([instance, func](Try<T>&& t){
236     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
237   });
238 }
239
240 template <class T>
241 template <class Executor, class Arg, class... Args>
242 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
243   -> decltype(this->then(std::forward<Arg>(arg),
244                          std::forward<Args>(args)...))
245 {
246   auto oldX = getExecutor();
247   setExecutor(x);
248   return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
249                via(oldX);
250 }
251
252 template <class T>
253 Future<Unit> Future<T>::then() {
254   return then([] () {});
255 }
256
257 // onError where the callback returns T
258 template <class T>
259 template <class F>
260 typename std::enable_if<
261   !detail::callableWith<F, exception_wrapper>::value &&
262   !detail::Extract<F>::ReturnsFuture::value,
263   Future<T>>::type
264 Future<T>::onError(F&& func) {
265   typedef typename detail::Extract<F>::FirstArg Exn;
266   static_assert(
267       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
268       "Return type of onError callback must be T or Future<T>");
269
270   Promise<T> p;
271   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
272   auto f = p.getFuture();
273
274   setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
275       Try<T> && t) mutable {
276     if (!t.template withException<Exn>(
277             [&](Exn& e) { pm.setWith([&] { return funcm(e); }); })) {
278       pm.setTry(std::move(t));
279     }
280   });
281
282   return f;
283 }
284
285 // onError where the callback returns Future<T>
286 template <class T>
287 template <class F>
288 typename std::enable_if<
289   !detail::callableWith<F, exception_wrapper>::value &&
290   detail::Extract<F>::ReturnsFuture::value,
291   Future<T>>::type
292 Future<T>::onError(F&& func) {
293   static_assert(
294       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
295       "Return type of onError callback must be T or Future<T>");
296   typedef typename detail::Extract<F>::FirstArg Exn;
297
298   Promise<T> p;
299   auto f = p.getFuture();
300
301   setCallback_([ pm = std::move(p), funcm = std::forward<F>(func) ](
302       Try<T> && t) mutable {
303     if (!t.template withException<Exn>([&](Exn& e) {
304           try {
305             auto f2 = funcm(e);
306             f2.setCallback_([pm = std::move(pm)](Try<T> && t2) mutable {
307               pm.setTry(std::move(t2));
308             });
309           } catch (const std::exception& e2) {
310             pm.setException(exception_wrapper(std::current_exception(), e2));
311           } catch (...) {
312             pm.setException(exception_wrapper(std::current_exception()));
313           }
314         })) {
315       pm.setTry(std::move(t));
316     }
317   });
318
319   return f;
320 }
321
322 template <class T>
323 template <class F>
324 Future<T> Future<T>::ensure(F&& func) {
325   return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
326     funcw();
327     return makeFuture(std::move(t));
328   });
329 }
330
331 template <class T>
332 template <class F>
333 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
334   return within(dur, tk).onError([funcw = std::forward<F>(func)](
335       TimedOut const&) { return funcw(); });
336 }
337
338 template <class T>
339 template <class F>
340 typename std::enable_if<detail::callableWith<F, exception_wrapper>::value &&
341                             detail::Extract<F>::ReturnsFuture::value,
342                         Future<T>>::type
343 Future<T>::onError(F&& func) {
344   static_assert(
345       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
346       "Return type of onError callback must be T or Future<T>");
347
348   Promise<T> p;
349   auto f = p.getFuture();
350   setCallback_(
351       [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
352         if (t.hasException()) {
353           try {
354             auto f2 = funcm(std::move(t.exception()));
355             f2.setCallback_([pm = std::move(pm)](Try<T> t2) mutable {
356               pm.setTry(std::move(t2));
357             });
358           } catch (const std::exception& e2) {
359             pm.setException(exception_wrapper(std::current_exception(), e2));
360           } catch (...) {
361             pm.setException(exception_wrapper(std::current_exception()));
362           }
363         } else {
364           pm.setTry(std::move(t));
365         }
366       });
367
368   return f;
369 }
370
371 // onError(exception_wrapper) that returns T
372 template <class T>
373 template <class F>
374 typename std::enable_if<
375   detail::callableWith<F, exception_wrapper>::value &&
376   !detail::Extract<F>::ReturnsFuture::value,
377   Future<T>>::type
378 Future<T>::onError(F&& func) {
379   static_assert(
380       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
381       "Return type of onError callback must be T or Future<T>");
382
383   Promise<T> p;
384   auto f = p.getFuture();
385   setCallback_(
386       [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
387         if (t.hasException()) {
388           pm.setWith([&] { return funcm(std::move(t.exception())); });
389         } else {
390           pm.setTry(std::move(t));
391         }
392       });
393
394   return f;
395 }
396
397 template <class T>
398 typename std::add_lvalue_reference<T>::type Future<T>::value() {
399   throwIfInvalid();
400
401   return core_->getTry().value();
402 }
403
404 template <class T>
405 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
406   throwIfInvalid();
407
408   return core_->getTry().value();
409 }
410
411 template <class T>
412 Try<T>& Future<T>::getTry() {
413   throwIfInvalid();
414
415   return core_->getTry();
416 }
417
418 template <class T>
419 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
420   return waitVia(e).getTry();
421 }
422
423 template <class T>
424 Optional<Try<T>> Future<T>::poll() {
425   Optional<Try<T>> o;
426   if (core_->ready()) {
427     o = std::move(core_->getTry());
428   }
429   return o;
430 }
431
432 template <class T>
433 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
434   throwIfInvalid();
435
436   setExecutor(executor, priority);
437
438   return std::move(*this);
439 }
440
441 template <class T>
442 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
443   throwIfInvalid();
444
445   Promise<T> p;
446   auto f = p.getFuture();
447   then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
448   return std::move(f).via(executor, priority);
449 }
450
451 template <class Func>
452 auto via(Executor* x, Func&& func)
453   -> Future<typename isFuture<decltype(func())>::Inner>
454 {
455   // TODO make this actually more performant. :-P #7260175
456   return via(x).then(std::forward<Func>(func));
457 }
458
459 template <class T>
460 bool Future<T>::isReady() const {
461   throwIfInvalid();
462   return core_->ready();
463 }
464
465 template <class T>
466 bool Future<T>::hasValue() {
467   return getTry().hasValue();
468 }
469
470 template <class T>
471 bool Future<T>::hasException() {
472   return getTry().hasException();
473 }
474
475 template <class T>
476 void Future<T>::raise(exception_wrapper exception) {
477   core_->raise(std::move(exception));
478 }
479
480 // makeFuture
481
482 template <class T>
483 Future<typename std::decay<T>::type> makeFuture(T&& t) {
484   return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
485 }
486
487 inline // for multiple translation units
488 Future<Unit> makeFuture() {
489   return makeFuture(Unit{});
490 }
491
492 // makeFutureWith(Future<T>()) -> Future<T>
493 template <class F>
494 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
495                         typename std::result_of<F()>::type>::type
496 makeFutureWith(F&& func) {
497   using InnerType =
498       typename isFuture<typename std::result_of<F()>::type>::Inner;
499   try {
500     return func();
501   } catch (std::exception& e) {
502     return makeFuture<InnerType>(
503         exception_wrapper(std::current_exception(), e));
504   } catch (...) {
505     return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
506   }
507 }
508
509 // makeFutureWith(T()) -> Future<T>
510 // makeFutureWith(void()) -> Future<Unit>
511 template <class F>
512 typename std::enable_if<
513     !(isFuture<typename std::result_of<F()>::type>::value),
514     Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
515 makeFutureWith(F&& func) {
516   using LiftedResult =
517       typename Unit::Lift<typename std::result_of<F()>::type>::type;
518   return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
519     return func();
520   }));
521 }
522
523 template <class T>
524 Future<T> makeFuture(std::exception_ptr const& e) {
525   return makeFuture(Try<T>(e));
526 }
527
528 template <class T>
529 Future<T> makeFuture(exception_wrapper ew) {
530   return makeFuture(Try<T>(std::move(ew)));
531 }
532
533 template <class T, class E>
534 typename std::enable_if<std::is_base_of<std::exception, E>::value,
535                         Future<T>>::type
536 makeFuture(E const& e) {
537   return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
538 }
539
540 template <class T>
541 Future<T> makeFuture(Try<T>&& t) {
542   return Future<T>(new detail::Core<T>(std::move(t)));
543 }
544
545 // via
546 Future<Unit> via(Executor* executor, int8_t priority) {
547   return makeFuture().via(executor, priority);
548 }
549
550 // mapSetCallback calls func(i, Try<T>) when every future completes
551
552 template <class T, class InputIterator, class F>
553 void mapSetCallback(InputIterator first, InputIterator last, F func) {
554   for (size_t i = 0; first != last; ++first, ++i) {
555     first->setCallback_([func, i](Try<T>&& t) {
556       func(i, std::move(t));
557     });
558   }
559 }
560
561 // collectAll (variadic)
562
563 template <typename... Fs>
564 typename detail::CollectAllVariadicContext<
565   typename std::decay<Fs>::type::value_type...>::type
566 collectAll(Fs&&... fs) {
567   auto ctx = std::make_shared<detail::CollectAllVariadicContext<
568     typename std::decay<Fs>::type::value_type...>>();
569   detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
570     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
571   return ctx->p.getFuture();
572 }
573
574 // collectAll (iterator)
575
576 template <class InputIterator>
577 Future<
578   std::vector<
579   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
580 collectAll(InputIterator first, InputIterator last) {
581   typedef
582     typename std::iterator_traits<InputIterator>::value_type::value_type T;
583
584   struct CollectAllContext {
585     CollectAllContext(int n) : results(n) {}
586     ~CollectAllContext() {
587       p.setValue(std::move(results));
588     }
589     Promise<std::vector<Try<T>>> p;
590     std::vector<Try<T>> results;
591   };
592
593   auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
594   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
595     ctx->results[i] = std::move(t);
596   });
597   return ctx->p.getFuture();
598 }
599
600 // collect (iterator)
601
602 namespace detail {
603
604 template <typename T>
605 struct CollectContext {
606   struct Nothing {
607     explicit Nothing(int /* n */) {}
608   };
609
610   using Result = typename std::conditional<
611     std::is_void<T>::value,
612     void,
613     std::vector<T>>::type;
614
615   using InternalResult = typename std::conditional<
616     std::is_void<T>::value,
617     Nothing,
618     std::vector<Optional<T>>>::type;
619
620   explicit CollectContext(int n) : result(n) {}
621   ~CollectContext() {
622     if (!threw.exchange(true)) {
623       // map Optional<T> -> T
624       std::vector<T> finalResult;
625       finalResult.reserve(result.size());
626       std::transform(result.begin(), result.end(),
627                      std::back_inserter(finalResult),
628                      [](Optional<T>& o) { return std::move(o.value()); });
629       p.setValue(std::move(finalResult));
630     }
631   }
632   inline void setPartialResult(size_t i, Try<T>& t) {
633     result[i] = std::move(t.value());
634   }
635   Promise<Result> p;
636   InternalResult result;
637   std::atomic<bool> threw {false};
638 };
639
640 }
641
642 template <class InputIterator>
643 Future<typename detail::CollectContext<
644   typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
645 collect(InputIterator first, InputIterator last) {
646   typedef
647     typename std::iterator_traits<InputIterator>::value_type::value_type T;
648
649   auto ctx = std::make_shared<detail::CollectContext<T>>(
650     std::distance(first, last));
651   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
652     if (t.hasException()) {
653        if (!ctx->threw.exchange(true)) {
654          ctx->p.setException(std::move(t.exception()));
655        }
656      } else if (!ctx->threw) {
657        ctx->setPartialResult(i, t);
658      }
659   });
660   return ctx->p.getFuture();
661 }
662
663 // collect (variadic)
664
665 template <typename... Fs>
666 typename detail::CollectVariadicContext<
667   typename std::decay<Fs>::type::value_type...>::type
668 collect(Fs&&... fs) {
669   auto ctx = std::make_shared<detail::CollectVariadicContext<
670     typename std::decay<Fs>::type::value_type...>>();
671   detail::collectVariadicHelper<detail::CollectVariadicContext>(
672     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
673   return ctx->p.getFuture();
674 }
675
676 // collectAny (iterator)
677
678 template <class InputIterator>
679 Future<
680   std::pair<size_t,
681             Try<
682               typename
683               std::iterator_traits<InputIterator>::value_type::value_type>>>
684 collectAny(InputIterator first, InputIterator last) {
685   typedef
686     typename std::iterator_traits<InputIterator>::value_type::value_type T;
687
688   struct CollectAnyContext {
689     CollectAnyContext() {};
690     Promise<std::pair<size_t, Try<T>>> p;
691     std::atomic<bool> done {false};
692   };
693
694   auto ctx = std::make_shared<CollectAnyContext>();
695   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
696     if (!ctx->done.exchange(true)) {
697       ctx->p.setValue(std::make_pair(i, std::move(t)));
698     }
699   });
700   return ctx->p.getFuture();
701 }
702
703 // collectAnyWithoutException (iterator)
704
705 template <class InputIterator>
706 Future<std::pair<
707     size_t,
708     typename std::iterator_traits<InputIterator>::value_type::value_type>>
709 collectAnyWithoutException(InputIterator first, InputIterator last) {
710   typedef
711       typename std::iterator_traits<InputIterator>::value_type::value_type T;
712
713   struct CollectAnyWithoutExceptionContext {
714     CollectAnyWithoutExceptionContext(){};
715     Promise<std::pair<size_t, T>> p;
716     std::atomic<bool> done{false};
717     std::atomic<size_t> nFulfilled{0};
718     size_t nTotal;
719   };
720
721   auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
722   ctx->nTotal = std::distance(first, last);
723
724   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
725     if (!t.hasException() && !ctx->done.exchange(true)) {
726       ctx->p.setValue(std::make_pair(i, std::move(t.value())));
727     } else if (++ctx->nFulfilled == ctx->nTotal) {
728       ctx->p.setException(t.exception());
729     }
730   });
731   return ctx->p.getFuture();
732 }
733
734 // collectN (iterator)
735
736 template <class InputIterator>
737 Future<std::vector<std::pair<size_t, Try<typename
738   std::iterator_traits<InputIterator>::value_type::value_type>>>>
739 collectN(InputIterator first, InputIterator last, size_t n) {
740   typedef typename
741     std::iterator_traits<InputIterator>::value_type::value_type T;
742   typedef std::vector<std::pair<size_t, Try<T>>> V;
743
744   struct CollectNContext {
745     V v;
746     std::atomic<size_t> completed = {0};
747     Promise<V> p;
748   };
749   auto ctx = std::make_shared<CollectNContext>();
750
751   if (size_t(std::distance(first, last)) < n) {
752     ctx->p.setException(std::runtime_error("Not enough futures"));
753   } else {
754     // for each completed Future, increase count and add to vector, until we
755     // have n completed futures at which point we fulfil our Promise with the
756     // vector
757     mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
758       auto c = ++ctx->completed;
759       if (c <= n) {
760         assert(ctx->v.size() < n);
761         ctx->v.emplace_back(i, std::move(t));
762         if (c == n) {
763           ctx->p.setTry(Try<V>(std::move(ctx->v)));
764         }
765       }
766     });
767   }
768
769   return ctx->p.getFuture();
770 }
771
772 // reduce (iterator)
773
774 template <class It, class T, class F>
775 Future<T> reduce(It first, It last, T&& initial, F&& func) {
776   if (first == last) {
777     return makeFuture(std::move(initial));
778   }
779
780   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
781   typedef
782       typename std::conditional<detail::callableWith<F, T&&, Try<ItT>&&>::value,
783                                 Try<ItT>,
784                                 ItT>::type Arg;
785   typedef isTry<Arg> IsTry;
786
787   auto sfunc = std::make_shared<F>(std::move(func));
788
789   auto f = first->then(
790       [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
791         return (*sfunc)(
792             std::move(minitial), head.template get<IsTry::value, Arg&&>());
793       });
794
795   for (++first; first != last; ++first) {
796     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
797       return (*sfunc)(std::move(std::get<0>(t).value()),
798                   // Either return a ItT&& or a Try<ItT>&& depending
799                   // on the type of the argument of func.
800                   std::get<1>(t).template get<IsTry::value, Arg&&>());
801     });
802   }
803
804   return f;
805 }
806
807 // window (collection)
808
809 template <class Collection, class F, class ItT, class Result>
810 std::vector<Future<Result>>
811 window(Collection input, F func, size_t n) {
812   struct WindowContext {
813     WindowContext(Collection&& i, F&& fn)
814         : input_(std::move(i)), promises_(input_.size()),
815           func_(std::move(fn))
816       {}
817     std::atomic<size_t> i_ {0};
818     Collection input_;
819     std::vector<Promise<Result>> promises_;
820     F func_;
821
822     static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
823       size_t i = ctx->i_++;
824       if (i < ctx->input_.size()) {
825         // Using setCallback_ directly since we don't need the Future
826         ctx->func_(std::move(ctx->input_[i])).setCallback_(
827           // ctx is captured by value
828           [ctx, i](Try<Result>&& t) {
829             ctx->promises_[i].setTry(std::move(t));
830             // Chain another future onto this one
831             spawn(std::move(ctx));
832           });
833       }
834     }
835   };
836
837   auto max = std::min(n, input.size());
838
839   auto ctx = std::make_shared<WindowContext>(
840     std::move(input), std::move(func));
841
842   for (size_t i = 0; i < max; ++i) {
843     // Start the first n Futures
844     WindowContext::spawn(ctx);
845   }
846
847   std::vector<Future<Result>> futures;
848   futures.reserve(ctx->promises_.size());
849   for (auto& promise : ctx->promises_) {
850     futures.emplace_back(promise.getFuture());
851   }
852
853   return futures;
854 }
855
856 // reduce
857
858 template <class T>
859 template <class I, class F>
860 Future<I> Future<T>::reduce(I&& initial, F&& func) {
861   return then([
862     minitial = std::forward<I>(initial),
863     mfunc = std::forward<F>(func)
864   ](T& vals) mutable {
865     auto ret = std::move(minitial);
866     for (auto& val : vals) {
867       ret = mfunc(std::move(ret), std::move(val));
868     }
869     return ret;
870   });
871 }
872
873 // unorderedReduce (iterator)
874
875 template <class It, class T, class F, class ItT, class Arg>
876 Future<T> unorderedReduce(It first, It last, T initial, F func) {
877   if (first == last) {
878     return makeFuture(std::move(initial));
879   }
880
881   typedef isTry<Arg> IsTry;
882
883   struct UnorderedReduceContext {
884     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
885         : lock_(), memo_(makeFuture<T>(std::move(memo))),
886           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
887       {};
888     folly::MicroSpinLock lock_; // protects memo_ and numThens_
889     Future<T> memo_;
890     F func_;
891     size_t numThens_; // how many Futures completed and called .then()
892     size_t numFutures_; // how many Futures in total
893     Promise<T> promise_;
894   };
895
896   auto ctx = std::make_shared<UnorderedReduceContext>(
897     std::move(initial), std::move(func), std::distance(first, last));
898
899   mapSetCallback<ItT>(
900       first,
901       last,
902       [ctx](size_t /* i */, Try<ItT>&& t) {
903         // Futures can be completed in any order, simultaneously.
904         // To make this non-blocking, we create a new Future chain in
905         // the order of completion to reduce the values.
906         // The spinlock just protects chaining a new Future, not actually
907         // executing the reduce, which should be really fast.
908         folly::MSLGuard lock(ctx->lock_);
909         ctx->memo_ =
910             ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
911               // Either return a ItT&& or a Try<ItT>&& depending
912               // on the type of the argument of func.
913               return ctx->func_(std::move(v),
914                                 mt.template get<IsTry::value, Arg&&>());
915             });
916         if (++ctx->numThens_ == ctx->numFutures_) {
917           // After reducing the value of the last Future, fulfill the Promise
918           ctx->memo_.setCallback_(
919               [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
920         }
921       });
922
923   return ctx->promise_.getFuture();
924 }
925
926 // within
927
928 template <class T>
929 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
930   return within(dur, TimedOut(), tk);
931 }
932
933 template <class T>
934 template <class E>
935 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
936
937   struct Context {
938     Context(E ex) : exception(std::move(ex)), promise() {}
939     E exception;
940     Future<Unit> thisFuture;
941     Promise<T> promise;
942     std::atomic<bool> token {false};
943   };
944
945   std::shared_ptr<Timekeeper> tks;
946   if (!tk) {
947     tks = folly::detail::getTimekeeperSingleton();
948     tk = DCHECK_NOTNULL(tks.get());
949   }
950
951   auto ctx = std::make_shared<Context>(std::move(e));
952
953   ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
954     // TODO: "this" completed first, cancel "after"
955     if (ctx->token.exchange(true) == false) {
956       ctx->promise.setTry(std::move(t));
957     }
958   });
959
960   tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
961     // "after" completed first, cancel "this"
962     ctx->thisFuture.raise(TimedOut());
963     if (ctx->token.exchange(true) == false) {
964       if (t.hasException()) {
965         ctx->promise.setException(std::move(t.exception()));
966       } else {
967         ctx->promise.setException(std::move(ctx->exception));
968       }
969     }
970   });
971
972   return ctx->promise.getFuture().via(getExecutor());
973 }
974
975 // delayed
976
977 template <class T>
978 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
979   return collectAll(*this, futures::sleep(dur, tk))
980     .then([](std::tuple<Try<T>, Try<Unit>> tup) {
981       Try<T>& t = std::get<0>(tup);
982       return makeFuture<T>(std::move(t));
983     });
984 }
985
986 namespace detail {
987
988 template <class T>
989 void waitImpl(Future<T>& f) {
990   // short-circuit if there's nothing to do
991   if (f.isReady()) return;
992
993   FutureBatonType baton;
994   f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
995   baton.wait();
996   assert(f.isReady());
997 }
998
999 template <class T>
1000 void waitImpl(Future<T>& f, Duration dur) {
1001   // short-circuit if there's nothing to do
1002   if (f.isReady()) {
1003     return;
1004   }
1005
1006   Promise<T> promise;
1007   auto ret = promise.getFuture();
1008   auto baton = std::make_shared<FutureBatonType>();
1009   f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1010     promise.setTry(std::move(t));
1011     baton->post();
1012   });
1013   f = std::move(ret);
1014   if (baton->timed_wait(dur)) {
1015     assert(f.isReady());
1016   }
1017 }
1018
1019 template <class T>
1020 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1021   // Set callback so to ensure that the via executor has something on it
1022   // so that once the preceding future triggers this callback, drive will
1023   // always have a callback to satisfy it
1024   if (f.isReady())
1025     return;
1026   f = f.via(e).then([](T&& t) { return std::move(t); });
1027   while (!f.isReady()) {
1028     e->drive();
1029   }
1030   assert(f.isReady());
1031 }
1032
1033 } // detail
1034
1035 template <class T>
1036 Future<T>& Future<T>::wait() & {
1037   detail::waitImpl(*this);
1038   return *this;
1039 }
1040
1041 template <class T>
1042 Future<T>&& Future<T>::wait() && {
1043   detail::waitImpl(*this);
1044   return std::move(*this);
1045 }
1046
1047 template <class T>
1048 Future<T>& Future<T>::wait(Duration dur) & {
1049   detail::waitImpl(*this, dur);
1050   return *this;
1051 }
1052
1053 template <class T>
1054 Future<T>&& Future<T>::wait(Duration dur) && {
1055   detail::waitImpl(*this, dur);
1056   return std::move(*this);
1057 }
1058
1059 template <class T>
1060 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1061   detail::waitViaImpl(*this, e);
1062   return *this;
1063 }
1064
1065 template <class T>
1066 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1067   detail::waitViaImpl(*this, e);
1068   return std::move(*this);
1069 }
1070
1071 template <class T>
1072 T Future<T>::get() {
1073   return std::move(wait().value());
1074 }
1075
1076 template <class T>
1077 T Future<T>::get(Duration dur) {
1078   wait(dur);
1079   if (isReady()) {
1080     return std::move(value());
1081   } else {
1082     throw TimedOut();
1083   }
1084 }
1085
1086 template <class T>
1087 T Future<T>::getVia(DrivableExecutor* e) {
1088   return std::move(waitVia(e).value());
1089 }
1090
1091 namespace detail {
1092   template <class T>
1093   struct TryEquals {
1094     static bool equals(const Try<T>& t1, const Try<T>& t2) {
1095       return t1.value() == t2.value();
1096     }
1097   };
1098 }
1099
1100 template <class T>
1101 Future<bool> Future<T>::willEqual(Future<T>& f) {
1102   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1103     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1104       return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1105     } else {
1106       return false;
1107       }
1108   });
1109 }
1110
1111 template <class T>
1112 template <class F>
1113 Future<T> Future<T>::filter(F&& predicate) {
1114   return this->then([p = std::forward<F>(predicate)](T val) {
1115     T const& valConstRef = val;
1116     if (!p(valConstRef)) {
1117       throw PredicateDoesNotObtain();
1118     }
1119     return val;
1120   });
1121 }
1122
1123 template <class T>
1124 template <class Callback>
1125 auto Future<T>::thenMulti(Callback&& fn)
1126     -> decltype(this->then(std::forward<Callback>(fn))) {
1127   // thenMulti with one callback is just a then
1128   return then(std::forward<Callback>(fn));
1129 }
1130
1131 template <class T>
1132 template <class Callback, class... Callbacks>
1133 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1134     -> decltype(this->then(std::forward<Callback>(fn)).
1135                       thenMulti(std::forward<Callbacks>(fns)...)) {
1136   // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1137   return then(std::forward<Callback>(fn)).
1138          thenMulti(std::forward<Callbacks>(fns)...);
1139 }
1140
1141 template <class T>
1142 template <class Callback, class... Callbacks>
1143 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1144                                       Callbacks&&... fns)
1145     -> decltype(this->then(std::forward<Callback>(fn)).
1146                       thenMulti(std::forward<Callbacks>(fns)...)) {
1147   // thenMultiExecutor with two callbacks is
1148   // via(x).then(a).thenMulti(b, ...).via(oldX)
1149   auto oldX = getExecutor();
1150   setExecutor(x);
1151   return then(std::forward<Callback>(fn)).
1152          thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1153 }
1154
1155 template <class T>
1156 template <class Callback>
1157 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1158     -> decltype(this->then(std::forward<Callback>(fn))) {
1159   // thenMulti with one callback is just a then with an executor
1160   return then(x, std::forward<Callback>(fn));
1161 }
1162
1163 template <class F>
1164 inline Future<Unit> when(bool p, F&& thunk) {
1165   return p ? std::forward<F>(thunk)().unit() : makeFuture();
1166 }
1167
1168 template <class P, class F>
1169 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1170   if (predicate()) {
1171     auto future = thunk();
1172     return future.then([
1173       predicate = std::forward<P>(predicate),
1174       thunk = std::forward<F>(thunk)
1175     ]() mutable {
1176       return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1177     });
1178   }
1179   return makeFuture();
1180 }
1181
1182 template <class F>
1183 Future<Unit> times(const int n, F&& thunk) {
1184   return folly::whileDo(
1185       [ n, count = folly::make_unique<std::atomic<int>>(0) ]() mutable {
1186         return count->fetch_add(1) < n;
1187       },
1188       std::forward<F>(thunk));
1189 }
1190
1191 namespace futures {
1192   template <class It, class F, class ItT, class Result>
1193   std::vector<Future<Result>> map(It first, It last, F func) {
1194     std::vector<Future<Result>> results;
1195     for (auto it = first; it != last; it++) {
1196       results.push_back(it->then(func));
1197     }
1198     return results;
1199   }
1200 }
1201
1202 namespace futures {
1203
1204 namespace detail {
1205
1206 struct retrying_policy_raw_tag {};
1207 struct retrying_policy_fut_tag {};
1208
1209 template <class Policy>
1210 struct retrying_policy_traits {
1211   using ew = exception_wrapper;
1212   FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1213   template <class Ret>
1214   using has_op = typename std::integral_constant<bool,
1215         has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1216         has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1217   using is_raw = has_op<bool>;
1218   using is_fut = has_op<Future<bool>>;
1219   using tag = typename std::conditional<
1220         is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1221         is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1222 };
1223
1224 template <class Policy, class FF>
1225 typename std::result_of<FF(size_t)>::type
1226 retrying(size_t k, Policy&& p, FF&& ff) {
1227   using F = typename std::result_of<FF(size_t)>::type;
1228   using T = typename F::value_type;
1229   auto f = ff(k++);
1230   return f.onError(
1231       [ k, pm = std::forward<Policy>(p), ffm = std::forward<FF>(ff) ](
1232           exception_wrapper x) mutable {
1233         auto q = pm(k, x);
1234         return q.then(
1235             [ k, xm = std::move(x), pm = std::move(pm), ffm = std::move(ffm) ](
1236                 bool r) mutable {
1237               return r ? retrying(k, std::move(pm), std::move(ffm))
1238                        : makeFuture<T>(std::move(xm));
1239             });
1240       });
1241 }
1242
1243 template <class Policy, class FF>
1244 typename std::result_of<FF(size_t)>::type
1245 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1246   auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
1247     return makeFuture<bool>(pm(k, x));
1248   };
1249   return retrying(0, std::move(q), std::forward<FF>(ff));
1250 }
1251
1252 template <class Policy, class FF>
1253 typename std::result_of<FF(size_t)>::type
1254 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1255   return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1256 }
1257
1258 //  jittered exponential backoff, clamped to [backoff_min, backoff_max]
1259 template <class URNG>
1260 Duration retryingJitteredExponentialBackoffDur(
1261     size_t n,
1262     Duration backoff_min,
1263     Duration backoff_max,
1264     double jitter_param,
1265     URNG& rng) {
1266   using d = Duration;
1267   auto dist = std::normal_distribution<double>(0.0, jitter_param);
1268   auto jitter = std::exp(dist(rng));
1269   auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1270   return std::max(backoff_min, std::min(backoff_max, backoff));
1271 }
1272
1273 template <class Policy, class URNG>
1274 std::function<Future<bool>(size_t, const exception_wrapper&)>
1275 retryingPolicyCappedJitteredExponentialBackoff(
1276     size_t max_tries,
1277     Duration backoff_min,
1278     Duration backoff_max,
1279     double jitter_param,
1280     URNG&& rng,
1281     Policy&& p) {
1282   return [
1283     pm = std::forward<Policy>(p),
1284     max_tries,
1285     backoff_min,
1286     backoff_max,
1287     jitter_param,
1288     rngp = std::forward<URNG>(rng)
1289   ](size_t n, const exception_wrapper& ex) mutable {
1290     if (n == max_tries) {
1291       return makeFuture(false);
1292     }
1293     return pm(n, ex).then(
1294         [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
1295             bool v) mutable {
1296           if (!v) {
1297             return makeFuture(false);
1298           }
1299           auto backoff = detail::retryingJitteredExponentialBackoffDur(
1300               n, backoff_min, backoff_max, jitter_param, rngp);
1301           return futures::sleep(backoff).then([] { return true; });
1302         });
1303   };
1304 }
1305
1306 template <class Policy, class URNG>
1307 std::function<Future<bool>(size_t, const exception_wrapper&)>
1308 retryingPolicyCappedJitteredExponentialBackoff(
1309     size_t max_tries,
1310     Duration backoff_min,
1311     Duration backoff_max,
1312     double jitter_param,
1313     URNG&& rng,
1314     Policy&& p,
1315     retrying_policy_raw_tag) {
1316   auto q = [pm = std::forward<Policy>(p)](
1317       size_t n, const exception_wrapper& e) {
1318     return makeFuture(pm(n, e));
1319   };
1320   return retryingPolicyCappedJitteredExponentialBackoff(
1321       max_tries,
1322       backoff_min,
1323       backoff_max,
1324       jitter_param,
1325       std::forward<URNG>(rng),
1326       std::move(q));
1327 }
1328
1329 template <class Policy, class URNG>
1330 std::function<Future<bool>(size_t, const exception_wrapper&)>
1331 retryingPolicyCappedJitteredExponentialBackoff(
1332     size_t max_tries,
1333     Duration backoff_min,
1334     Duration backoff_max,
1335     double jitter_param,
1336     URNG&& rng,
1337     Policy&& p,
1338     retrying_policy_fut_tag) {
1339   return retryingPolicyCappedJitteredExponentialBackoff(
1340       max_tries,
1341       backoff_min,
1342       backoff_max,
1343       jitter_param,
1344       std::forward<URNG>(rng),
1345       std::forward<Policy>(p));
1346 }
1347 }
1348
1349 template <class Policy, class FF>
1350 typename std::result_of<FF(size_t)>::type
1351 retrying(Policy&& p, FF&& ff) {
1352   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1353   return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1354 }
1355
1356 inline
1357 std::function<bool(size_t, const exception_wrapper&)>
1358 retryingPolicyBasic(
1359     size_t max_tries) {
1360   return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1361 }
1362
1363 template <class Policy, class URNG>
1364 std::function<Future<bool>(size_t, const exception_wrapper&)>
1365 retryingPolicyCappedJitteredExponentialBackoff(
1366     size_t max_tries,
1367     Duration backoff_min,
1368     Duration backoff_max,
1369     double jitter_param,
1370     URNG&& rng,
1371     Policy&& p) {
1372   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1373   return detail::retryingPolicyCappedJitteredExponentialBackoff(
1374       max_tries,
1375       backoff_min,
1376       backoff_max,
1377       jitter_param,
1378       std::forward<URNG>(rng),
1379       std::forward<Policy>(p),
1380       tag());
1381 }
1382
1383 inline
1384 std::function<Future<bool>(size_t, const exception_wrapper&)>
1385 retryingPolicyCappedJitteredExponentialBackoff(
1386     size_t max_tries,
1387     Duration backoff_min,
1388     Duration backoff_max,
1389     double jitter_param) {
1390   auto p = [](size_t, const exception_wrapper&) { return true; };
1391   return retryingPolicyCappedJitteredExponentialBackoff(
1392       max_tries,
1393       backoff_min,
1394       backoff_max,
1395       jitter_param,
1396       ThreadLocalPRNG(),
1397       std::move(p));
1398 }
1399
1400 }
1401
1402 // Instantiate the most common Future types to save compile time
1403 extern template class Future<Unit>;
1404 extern template class Future<bool>;
1405 extern template class Future<int>;
1406 extern template class Future<int64_t>;
1407 extern template class Future<std::string>;
1408 extern template class Future<double>;
1409
1410 } // namespace folly