Shift Future::then and Future::thenMulti into the class definition
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <algorithm>
20 #include <cassert>
21 #include <chrono>
22 #include <random>
23 #include <thread>
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
30
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
33 #else
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/fibers/Baton.h>
36 #endif
37
38 namespace folly {
39
40 class Timekeeper;
41
42 namespace detail {
43 #if FOLLY_FUTURE_USING_FIBER
44 typedef folly::fibers::Baton FutureBatonType;
45 #else
46 typedef folly::Baton<> FutureBatonType;
47 #endif
48 }
49
50 namespace detail {
51 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
52
53 //  Guarantees that the stored functor is destructed before the stored promise
54 //  may be fulfilled. Assumes the stored functor to be noexcept-destructible.
55 template <typename T, typename F>
56 class CoreCallbackState {
57  public:
58   template <typename FF>
59   CoreCallbackState(Promise<T>&& promise, FF&& func) noexcept(
60       noexcept(F(std::declval<FF>())))
61       : func_(std::forward<FF>(func)), promise_(std::move(promise)) {
62     assert(before_barrier());
63   }
64
65   CoreCallbackState(CoreCallbackState&& that) noexcept(
66       noexcept(F(std::declval<F>()))) {
67     if (that.before_barrier()) {
68       new (&func_) F(std::move(that.func_));
69       promise_ = that.stealPromise();
70     }
71   }
72
73   CoreCallbackState& operator=(CoreCallbackState&&) = delete;
74
75   ~CoreCallbackState() {
76     if (before_barrier()) {
77       stealPromise();
78     }
79   }
80
81   template <typename... Args>
82   auto invoke(Args&&... args) noexcept(
83       noexcept(std::declval<F&&>()(std::declval<Args&&>()...))) {
84     assert(before_barrier());
85     return std::move(func_)(std::forward<Args>(args)...);
86   }
87
88   template <typename... Args>
89   auto tryInvoke(Args&&... args) noexcept {
90     return makeTryWith([&] { return invoke(std::forward<Args>(args)...); });
91   }
92
93   void setTry(Try<T>&& t) {
94     stealPromise().setTry(std::move(t));
95   }
96
97   void setException(exception_wrapper&& ew) {
98     stealPromise().setException(std::move(ew));
99   }
100
101   Promise<T> stealPromise() noexcept {
102     assert(before_barrier());
103     func_.~F();
104     return std::move(promise_);
105   }
106
107  private:
108   bool before_barrier() const noexcept {
109     return !promise_.isFulfilled();
110   }
111
112   union {
113     F func_;
114   };
115   Promise<T> promise_{detail::EmptyConstruct{}};
116 };
117
118 template <typename T, typename F>
119 inline auto makeCoreCallbackState(Promise<T>&& p, F&& f) noexcept(
120     noexcept(CoreCallbackState<T, _t<std::decay<F>>>(
121         std::declval<Promise<T>&&>(),
122         std::declval<F&&>()))) {
123   return CoreCallbackState<T, _t<std::decay<F>>>(
124       std::move(p), std::forward<F>(f));
125 }
126 }
127
128 template <class T>
129 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
130   other.core_ = nullptr;
131 }
132
133 template <class T>
134 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
135   std::swap(core_, other.core_);
136   return *this;
137 }
138
139 template <class T>
140 template <class T2, typename>
141 Future<T>::Future(T2&& val)
142     : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
143
144 template <class T>
145 template <typename T2>
146 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
147     : core_(new detail::Core<T>(Try<T>(T()))) {}
148
149 template <class T>
150 Future<T>::~Future() {
151   detach();
152 }
153
154 template <class T>
155 void Future<T>::detach() {
156   if (core_) {
157     core_->detachFuture();
158     core_ = nullptr;
159   }
160 }
161
162 template <class T>
163 void Future<T>::throwIfInvalid() const {
164   if (!core_)
165     throw NoState();
166 }
167
168 template <class T>
169 template <class F>
170 void Future<T>::setCallback_(F&& func) {
171   throwIfInvalid();
172   core_->setCallback(std::forward<F>(func));
173 }
174
175 // unwrap
176
177 template <class T>
178 template <class F>
179 typename std::enable_if<isFuture<F>::value,
180                         Future<typename isFuture<T>::Inner>>::type
181 Future<T>::unwrap() {
182   return then([](Future<typename isFuture<T>::Inner> internal_future) {
183       return internal_future;
184   });
185 }
186
187 // then
188
189 // Variant: returns a value
190 // e.g. f.then([](Try<T>&& t){ return t.value(); });
191 template <class T>
192 template <typename F, typename R, bool isTry, typename... Args>
193 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
194 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
195   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
196   typedef typename R::ReturnsFuture::Inner B;
197
198   throwIfInvalid();
199
200   Promise<B> p;
201   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
202
203   // grab the Future now before we lose our handle on the Promise
204   auto f = p.getFuture();
205   f.core_->setExecutorNoLock(getExecutor());
206
207   /* This is a bit tricky.
208
209      We can't just close over *this in case this Future gets moved. So we
210      make a new dummy Future. We could figure out something more
211      sophisticated that avoids making a new Future object when it can, as an
212      optimization. But this is correct.
213
214      core_ can't be moved, it is explicitly disallowed (as is copying). But
215      if there's ever a reason to allow it, this is one place that makes that
216      assumption and would need to be fixed. We use a standard shared pointer
217      for core_ (by copying it in), which means in essence obj holds a shared
218      pointer to itself.  But this shouldn't leak because Promise will not
219      outlive the continuation, because Promise will setException() with a
220      broken Promise if it is destructed before completed. We could use a
221      weak pointer but it would have to be converted to a shared pointer when
222      func is executed (because the Future returned by func may possibly
223      persist beyond the callback, if it gets moved), and so it is an
224      optimization to just make it shared from the get-go.
225
226      Two subtle but important points about this design. detail::Core has no
227      back pointers to Future or Promise, so if Future or Promise get moved
228      (and they will be moved in performant code) we don't have to do
229      anything fancy. And because we store the continuation in the
230      detail::Core, not in the Future, we can execute the continuation even
231      after the Future has gone out of scope. This is an intentional design
232      decision. It is likely we will want to be able to cancel a continuation
233      in some circumstances, but I think it should be explicit not implicit
234      in the destruction of the Future used to create it.
235      */
236   setCallback_(
237       [state = detail::makeCoreCallbackState(
238            std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
239         if (!isTry && t.hasException()) {
240           state.setException(std::move(t.exception()));
241         } else {
242           state.setTry(makeTryWith(
243               [&] { return state.invoke(t.template get<isTry, Args>()...); }));
244         }
245       });
246
247   return f;
248 }
249
250 // Variant: returns a Future
251 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
252 template <class T>
253 template <typename F, typename R, bool isTry, typename... Args>
254 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
255 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
256   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
257   typedef typename R::ReturnsFuture::Inner B;
258
259   throwIfInvalid();
260
261   Promise<B> p;
262   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
263
264   // grab the Future now before we lose our handle on the Promise
265   auto f = p.getFuture();
266   f.core_->setExecutorNoLock(getExecutor());
267
268   setCallback_(
269       [state = detail::makeCoreCallbackState(
270            std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
271         if (!isTry && t.hasException()) {
272           state.setException(std::move(t.exception()));
273         } else {
274           auto tf2 = state.tryInvoke(t.template get<isTry, Args>()...);
275           if (tf2.hasException()) {
276             state.setException(std::move(tf2.exception()));
277           } else {
278             tf2->setCallback_([p = state.stealPromise()](Try<B> && b) mutable {
279               p.setTry(std::move(b));
280             });
281           }
282         }
283       });
284
285   return f;
286 }
287
288 template <typename T>
289 template <typename R, typename Caller, typename... Args>
290   Future<typename isFuture<R>::Inner>
291 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
292   typedef typename std::remove_cv<
293     typename std::remove_reference<
294       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
295   return then([instance, func](Try<T>&& t){
296     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
297   });
298 }
299
300 template <class T>
301 Future<Unit> Future<T>::then() {
302   return then([] () {});
303 }
304
305 // onError where the callback returns T
306 template <class T>
307 template <class F>
308 typename std::enable_if<
309   !detail::callableWith<F, exception_wrapper>::value &&
310   !detail::Extract<F>::ReturnsFuture::value,
311   Future<T>>::type
312 Future<T>::onError(F&& func) {
313   typedef std::remove_reference_t<typename detail::Extract<F>::FirstArg> Exn;
314   static_assert(
315       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
316       "Return type of onError callback must be T or Future<T>");
317
318   Promise<T> p;
319   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
320   auto f = p.getFuture();
321
322   setCallback_(
323       [state = detail::makeCoreCallbackState(
324            std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
325         if (auto e = t.template tryGetExceptionObject<Exn>()) {
326           state.setTry(makeTryWith([&] { return state.invoke(*e); }));
327         } else {
328           state.setTry(std::move(t));
329         }
330       });
331
332   return f;
333 }
334
335 // onError where the callback returns Future<T>
336 template <class T>
337 template <class F>
338 typename std::enable_if<
339   !detail::callableWith<F, exception_wrapper>::value &&
340   detail::Extract<F>::ReturnsFuture::value,
341   Future<T>>::type
342 Future<T>::onError(F&& func) {
343   static_assert(
344       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
345       "Return type of onError callback must be T or Future<T>");
346   typedef std::remove_reference_t<typename detail::Extract<F>::FirstArg> Exn;
347
348   Promise<T> p;
349   auto f = p.getFuture();
350
351   setCallback_(
352       [state = detail::makeCoreCallbackState(
353            std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
354         if (auto e = t.template tryGetExceptionObject<Exn>()) {
355           auto tf2 = state.tryInvoke(*e);
356           if (tf2.hasException()) {
357             state.setException(std::move(tf2.exception()));
358           } else {
359             tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
360               p.setTry(std::move(t3));
361             });
362           }
363         } else {
364           state.setTry(std::move(t));
365         }
366       });
367
368   return f;
369 }
370
371 template <class T>
372 template <class F>
373 Future<T> Future<T>::ensure(F&& func) {
374   return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
375     std::move(funcw)();
376     return makeFuture(std::move(t));
377   });
378 }
379
380 template <class T>
381 template <class F>
382 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
383   return within(dur, tk).onError([funcw = std::forward<F>(func)](
384       TimedOut const&) { return std::move(funcw)(); });
385 }
386
387 template <class T>
388 template <class F>
389 typename std::enable_if<detail::callableWith<F, exception_wrapper>::value &&
390                             detail::Extract<F>::ReturnsFuture::value,
391                         Future<T>>::type
392 Future<T>::onError(F&& func) {
393   static_assert(
394       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
395       "Return type of onError callback must be T or Future<T>");
396
397   Promise<T> p;
398   auto f = p.getFuture();
399   setCallback_(
400       [state = detail::makeCoreCallbackState(
401            std::move(p), std::forward<F>(func))](Try<T> t) mutable {
402         if (t.hasException()) {
403           auto tf2 = state.tryInvoke(std::move(t.exception()));
404           if (tf2.hasException()) {
405             state.setException(std::move(tf2.exception()));
406           } else {
407             tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
408               p.setTry(std::move(t3));
409             });
410           }
411         } else {
412           state.setTry(std::move(t));
413         }
414       });
415
416   return f;
417 }
418
419 // onError(exception_wrapper) that returns T
420 template <class T>
421 template <class F>
422 typename std::enable_if<
423   detail::callableWith<F, exception_wrapper>::value &&
424   !detail::Extract<F>::ReturnsFuture::value,
425   Future<T>>::type
426 Future<T>::onError(F&& func) {
427   static_assert(
428       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
429       "Return type of onError callback must be T or Future<T>");
430
431   Promise<T> p;
432   auto f = p.getFuture();
433   setCallback_(
434       [state = detail::makeCoreCallbackState(
435            std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
436         if (t.hasException()) {
437           state.setTry(makeTryWith(
438               [&] { return state.invoke(std::move(t.exception())); }));
439         } else {
440           state.setTry(std::move(t));
441         }
442       });
443
444   return f;
445 }
446
447 template <class T>
448 typename std::add_lvalue_reference<T>::type Future<T>::value() {
449   throwIfInvalid();
450
451   return core_->getTry().value();
452 }
453
454 template <class T>
455 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
456   throwIfInvalid();
457
458   return core_->getTry().value();
459 }
460
461 template <class T>
462 Try<T>& Future<T>::getTry() {
463   throwIfInvalid();
464
465   return core_->getTry();
466 }
467
468 template <class T>
469 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
470   return waitVia(e).getTry();
471 }
472
473 template <class T>
474 Optional<Try<T>> Future<T>::poll() {
475   Optional<Try<T>> o;
476   if (core_->ready()) {
477     o = std::move(core_->getTry());
478   }
479   return o;
480 }
481
482 template <class T>
483 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
484   throwIfInvalid();
485
486   setExecutor(executor, priority);
487
488   return std::move(*this);
489 }
490
491 template <class T>
492 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
493   throwIfInvalid();
494
495   Promise<T> p;
496   auto f = p.getFuture();
497   then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
498   return std::move(f).via(executor, priority);
499 }
500
501 template <class Func>
502 auto via(Executor* x, Func&& func)
503     -> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
504   // TODO make this actually more performant. :-P #7260175
505   return via(x).then(std::forward<Func>(func));
506 }
507
508 template <class T>
509 bool Future<T>::isReady() const {
510   throwIfInvalid();
511   return core_->ready();
512 }
513
514 template <class T>
515 bool Future<T>::hasValue() {
516   return getTry().hasValue();
517 }
518
519 template <class T>
520 bool Future<T>::hasException() {
521   return getTry().hasException();
522 }
523
524 template <class T>
525 void Future<T>::raise(exception_wrapper exception) {
526   core_->raise(std::move(exception));
527 }
528
529 // makeFuture
530
531 template <class T>
532 Future<typename std::decay<T>::type> makeFuture(T&& t) {
533   return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
534 }
535
536 inline // for multiple translation units
537 Future<Unit> makeFuture() {
538   return makeFuture(Unit{});
539 }
540
541 // makeFutureWith(Future<T>()) -> Future<T>
542 template <class F>
543 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
544                         typename std::result_of<F()>::type>::type
545 makeFutureWith(F&& func) {
546   using InnerType =
547       typename isFuture<typename std::result_of<F()>::type>::Inner;
548   try {
549     return std::forward<F>(func)();
550   } catch (std::exception& e) {
551     return makeFuture<InnerType>(
552         exception_wrapper(std::current_exception(), e));
553   } catch (...) {
554     return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
555   }
556 }
557
558 // makeFutureWith(T()) -> Future<T>
559 // makeFutureWith(void()) -> Future<Unit>
560 template <class F>
561 typename std::enable_if<
562     !(isFuture<typename std::result_of<F()>::type>::value),
563     Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
564 makeFutureWith(F&& func) {
565   using LiftedResult =
566       typename Unit::Lift<typename std::result_of<F()>::type>::type;
567   return makeFuture<LiftedResult>(
568       makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
569 }
570
571 template <class T>
572 Future<T> makeFuture(std::exception_ptr const& e) {
573   return makeFuture(Try<T>(e));
574 }
575
576 template <class T>
577 Future<T> makeFuture(exception_wrapper ew) {
578   return makeFuture(Try<T>(std::move(ew)));
579 }
580
581 template <class T, class E>
582 typename std::enable_if<std::is_base_of<std::exception, E>::value,
583                         Future<T>>::type
584 makeFuture(E const& e) {
585   return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
586 }
587
588 template <class T>
589 Future<T> makeFuture(Try<T>&& t) {
590   return Future<T>(new detail::Core<T>(std::move(t)));
591 }
592
593 // via
594 Future<Unit> via(Executor* executor, int8_t priority) {
595   return makeFuture().via(executor, priority);
596 }
597
598 // mapSetCallback calls func(i, Try<T>) when every future completes
599
600 template <class T, class InputIterator, class F>
601 void mapSetCallback(InputIterator first, InputIterator last, F func) {
602   for (size_t i = 0; first != last; ++first, ++i) {
603     first->setCallback_([func, i](Try<T>&& t) {
604       func(i, std::move(t));
605     });
606   }
607 }
608
609 // collectAll (variadic)
610
611 template <typename... Fs>
612 typename detail::CollectAllVariadicContext<
613   typename std::decay<Fs>::type::value_type...>::type
614 collectAll(Fs&&... fs) {
615   auto ctx = std::make_shared<detail::CollectAllVariadicContext<
616     typename std::decay<Fs>::type::value_type...>>();
617   detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
618       ctx, std::forward<Fs>(fs)...);
619   return ctx->p.getFuture();
620 }
621
622 // collectAll (iterator)
623
624 template <class InputIterator>
625 Future<
626   std::vector<
627   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
628 collectAll(InputIterator first, InputIterator last) {
629   typedef
630     typename std::iterator_traits<InputIterator>::value_type::value_type T;
631
632   struct CollectAllContext {
633     CollectAllContext(size_t n) : results(n) {}
634     ~CollectAllContext() {
635       p.setValue(std::move(results));
636     }
637     Promise<std::vector<Try<T>>> p;
638     std::vector<Try<T>> results;
639   };
640
641   auto ctx =
642       std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
643   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
644     ctx->results[i] = std::move(t);
645   });
646   return ctx->p.getFuture();
647 }
648
649 // collect (iterator)
650
651 namespace detail {
652
653 template <typename T>
654 struct CollectContext {
655   struct Nothing {
656     explicit Nothing(int /* n */) {}
657   };
658
659   using Result = typename std::conditional<
660     std::is_void<T>::value,
661     void,
662     std::vector<T>>::type;
663
664   using InternalResult = typename std::conditional<
665     std::is_void<T>::value,
666     Nothing,
667     std::vector<Optional<T>>>::type;
668
669   explicit CollectContext(size_t n) : result(n) {}
670   ~CollectContext() {
671     if (!threw.exchange(true)) {
672       // map Optional<T> -> T
673       std::vector<T> finalResult;
674       finalResult.reserve(result.size());
675       std::transform(result.begin(), result.end(),
676                      std::back_inserter(finalResult),
677                      [](Optional<T>& o) { return std::move(o.value()); });
678       p.setValue(std::move(finalResult));
679     }
680   }
681   inline void setPartialResult(size_t i, Try<T>& t) {
682     result[i] = std::move(t.value());
683   }
684   Promise<Result> p;
685   InternalResult result;
686   std::atomic<bool> threw {false};
687 };
688
689 }
690
691 template <class InputIterator>
692 Future<typename detail::CollectContext<
693   typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
694 collect(InputIterator first, InputIterator last) {
695   typedef
696     typename std::iterator_traits<InputIterator>::value_type::value_type T;
697
698   auto ctx = std::make_shared<detail::CollectContext<T>>(
699     std::distance(first, last));
700   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
701     if (t.hasException()) {
702        if (!ctx->threw.exchange(true)) {
703          ctx->p.setException(std::move(t.exception()));
704        }
705      } else if (!ctx->threw) {
706        ctx->setPartialResult(i, t);
707      }
708   });
709   return ctx->p.getFuture();
710 }
711
712 // collect (variadic)
713
714 template <typename... Fs>
715 typename detail::CollectVariadicContext<
716   typename std::decay<Fs>::type::value_type...>::type
717 collect(Fs&&... fs) {
718   auto ctx = std::make_shared<detail::CollectVariadicContext<
719     typename std::decay<Fs>::type::value_type...>>();
720   detail::collectVariadicHelper<detail::CollectVariadicContext>(
721       ctx, std::forward<Fs>(fs)...);
722   return ctx->p.getFuture();
723 }
724
725 // collectAny (iterator)
726
727 template <class InputIterator>
728 Future<
729   std::pair<size_t,
730             Try<
731               typename
732               std::iterator_traits<InputIterator>::value_type::value_type>>>
733 collectAny(InputIterator first, InputIterator last) {
734   typedef
735     typename std::iterator_traits<InputIterator>::value_type::value_type T;
736
737   struct CollectAnyContext {
738     CollectAnyContext() {}
739     Promise<std::pair<size_t, Try<T>>> p;
740     std::atomic<bool> done {false};
741   };
742
743   auto ctx = std::make_shared<CollectAnyContext>();
744   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
745     if (!ctx->done.exchange(true)) {
746       ctx->p.setValue(std::make_pair(i, std::move(t)));
747     }
748   });
749   return ctx->p.getFuture();
750 }
751
752 // collectAnyWithoutException (iterator)
753
754 template <class InputIterator>
755 Future<std::pair<
756     size_t,
757     typename std::iterator_traits<InputIterator>::value_type::value_type>>
758 collectAnyWithoutException(InputIterator first, InputIterator last) {
759   typedef
760       typename std::iterator_traits<InputIterator>::value_type::value_type T;
761
762   struct CollectAnyWithoutExceptionContext {
763     CollectAnyWithoutExceptionContext(){}
764     Promise<std::pair<size_t, T>> p;
765     std::atomic<bool> done{false};
766     std::atomic<size_t> nFulfilled{0};
767     size_t nTotal;
768   };
769
770   auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
771   ctx->nTotal = size_t(std::distance(first, last));
772
773   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
774     if (!t.hasException() && !ctx->done.exchange(true)) {
775       ctx->p.setValue(std::make_pair(i, std::move(t.value())));
776     } else if (++ctx->nFulfilled == ctx->nTotal) {
777       ctx->p.setException(t.exception());
778     }
779   });
780   return ctx->p.getFuture();
781 }
782
783 // collectN (iterator)
784
785 template <class InputIterator>
786 Future<std::vector<std::pair<size_t, Try<typename
787   std::iterator_traits<InputIterator>::value_type::value_type>>>>
788 collectN(InputIterator first, InputIterator last, size_t n) {
789   typedef typename
790     std::iterator_traits<InputIterator>::value_type::value_type T;
791   typedef std::vector<std::pair<size_t, Try<T>>> V;
792
793   struct CollectNContext {
794     V v;
795     std::atomic<size_t> completed = {0};
796     Promise<V> p;
797   };
798   auto ctx = std::make_shared<CollectNContext>();
799
800   if (size_t(std::distance(first, last)) < n) {
801     ctx->p.setException(std::runtime_error("Not enough futures"));
802   } else {
803     // for each completed Future, increase count and add to vector, until we
804     // have n completed futures at which point we fulfil our Promise with the
805     // vector
806     mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
807       auto c = ++ctx->completed;
808       if (c <= n) {
809         assert(ctx->v.size() < n);
810         ctx->v.emplace_back(i, std::move(t));
811         if (c == n) {
812           ctx->p.setTry(Try<V>(std::move(ctx->v)));
813         }
814       }
815     });
816   }
817
818   return ctx->p.getFuture();
819 }
820
821 // reduce (iterator)
822
823 template <class It, class T, class F>
824 Future<T> reduce(It first, It last, T&& initial, F&& func) {
825   if (first == last) {
826     return makeFuture(std::move(initial));
827   }
828
829   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
830   typedef
831       typename std::conditional<detail::callableWith<F, T&&, Try<ItT>&&>::value,
832                                 Try<ItT>,
833                                 ItT>::type Arg;
834   typedef isTry<Arg> IsTry;
835
836   auto sfunc = std::make_shared<F>(std::move(func));
837
838   auto f = first->then(
839       [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
840         return (*sfunc)(
841             std::move(minitial), head.template get<IsTry::value, Arg&&>());
842       });
843
844   for (++first; first != last; ++first) {
845     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
846       return (*sfunc)(std::move(std::get<0>(t).value()),
847                   // Either return a ItT&& or a Try<ItT>&& depending
848                   // on the type of the argument of func.
849                   std::get<1>(t).template get<IsTry::value, Arg&&>());
850     });
851   }
852
853   return f;
854 }
855
856 // window (collection)
857
858 template <class Collection, class F, class ItT, class Result>
859 std::vector<Future<Result>>
860 window(Collection input, F func, size_t n) {
861   struct WindowContext {
862     WindowContext(Collection&& i, F&& fn)
863         : input_(std::move(i)), promises_(input_.size()),
864           func_(std::move(fn))
865       {}
866     std::atomic<size_t> i_ {0};
867     Collection input_;
868     std::vector<Promise<Result>> promises_;
869     F func_;
870
871     static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
872       size_t i = ctx->i_++;
873       if (i < ctx->input_.size()) {
874         // Using setCallback_ directly since we don't need the Future
875         ctx->func_(std::move(ctx->input_[i])).setCallback_(
876           // ctx is captured by value
877           [ctx, i](Try<Result>&& t) {
878             ctx->promises_[i].setTry(std::move(t));
879             // Chain another future onto this one
880             spawn(std::move(ctx));
881           });
882       }
883     }
884   };
885
886   auto max = std::min(n, input.size());
887
888   auto ctx = std::make_shared<WindowContext>(
889     std::move(input), std::move(func));
890
891   for (size_t i = 0; i < max; ++i) {
892     // Start the first n Futures
893     WindowContext::spawn(ctx);
894   }
895
896   std::vector<Future<Result>> futures;
897   futures.reserve(ctx->promises_.size());
898   for (auto& promise : ctx->promises_) {
899     futures.emplace_back(promise.getFuture());
900   }
901
902   return futures;
903 }
904
905 // reduce
906
907 template <class T>
908 template <class I, class F>
909 Future<I> Future<T>::reduce(I&& initial, F&& func) {
910   return then([
911     minitial = std::forward<I>(initial),
912     mfunc = std::forward<F>(func)
913   ](T& vals) mutable {
914     auto ret = std::move(minitial);
915     for (auto& val : vals) {
916       ret = mfunc(std::move(ret), std::move(val));
917     }
918     return ret;
919   });
920 }
921
922 // unorderedReduce (iterator)
923
924 template <class It, class T, class F, class ItT, class Arg>
925 Future<T> unorderedReduce(It first, It last, T initial, F func) {
926   if (first == last) {
927     return makeFuture(std::move(initial));
928   }
929
930   typedef isTry<Arg> IsTry;
931
932   struct UnorderedReduceContext {
933     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
934         : lock_(), memo_(makeFuture<T>(std::move(memo))),
935           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
936       {}
937     folly::MicroSpinLock lock_; // protects memo_ and numThens_
938     Future<T> memo_;
939     F func_;
940     size_t numThens_; // how many Futures completed and called .then()
941     size_t numFutures_; // how many Futures in total
942     Promise<T> promise_;
943   };
944
945   auto ctx = std::make_shared<UnorderedReduceContext>(
946     std::move(initial), std::move(func), std::distance(first, last));
947
948   mapSetCallback<ItT>(
949       first,
950       last,
951       [ctx](size_t /* i */, Try<ItT>&& t) {
952         // Futures can be completed in any order, simultaneously.
953         // To make this non-blocking, we create a new Future chain in
954         // the order of completion to reduce the values.
955         // The spinlock just protects chaining a new Future, not actually
956         // executing the reduce, which should be really fast.
957         folly::MSLGuard lock(ctx->lock_);
958         ctx->memo_ =
959             ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
960               // Either return a ItT&& or a Try<ItT>&& depending
961               // on the type of the argument of func.
962               return ctx->func_(std::move(v),
963                                 mt.template get<IsTry::value, Arg&&>());
964             });
965         if (++ctx->numThens_ == ctx->numFutures_) {
966           // After reducing the value of the last Future, fulfill the Promise
967           ctx->memo_.setCallback_(
968               [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
969         }
970       });
971
972   return ctx->promise_.getFuture();
973 }
974
975 // within
976
977 template <class T>
978 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
979   return within(dur, TimedOut(), tk);
980 }
981
982 template <class T>
983 template <class E>
984 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
985
986   struct Context {
987     Context(E ex) : exception(std::move(ex)), promise() {}
988     E exception;
989     Future<Unit> thisFuture;
990     Promise<T> promise;
991     std::atomic<bool> token {false};
992   };
993
994   std::shared_ptr<Timekeeper> tks;
995   if (!tk) {
996     tks = folly::detail::getTimekeeperSingleton();
997     tk = DCHECK_NOTNULL(tks.get());
998   }
999
1000   auto ctx = std::make_shared<Context>(std::move(e));
1001
1002   ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
1003     // TODO: "this" completed first, cancel "after"
1004     if (ctx->token.exchange(true) == false) {
1005       ctx->promise.setTry(std::move(t));
1006     }
1007   });
1008
1009   tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
1010     // "after" completed first, cancel "this"
1011     ctx->thisFuture.raise(TimedOut());
1012     if (ctx->token.exchange(true) == false) {
1013       if (t.hasException()) {
1014         ctx->promise.setException(std::move(t.exception()));
1015       } else {
1016         ctx->promise.setException(std::move(ctx->exception));
1017       }
1018     }
1019   });
1020
1021   return ctx->promise.getFuture().via(getExecutor());
1022 }
1023
1024 // delayed
1025
1026 template <class T>
1027 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
1028   return collectAll(*this, futures::sleep(dur, tk))
1029     .then([](std::tuple<Try<T>, Try<Unit>> tup) {
1030       Try<T>& t = std::get<0>(tup);
1031       return makeFuture<T>(std::move(t));
1032     });
1033 }
1034
1035 namespace detail {
1036
1037 template <class T>
1038 void waitImpl(Future<T>& f) {
1039   // short-circuit if there's nothing to do
1040   if (f.isReady()) return;
1041
1042   FutureBatonType baton;
1043   f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1044   baton.wait();
1045   assert(f.isReady());
1046 }
1047
1048 template <class T>
1049 void waitImpl(Future<T>& f, Duration dur) {
1050   // short-circuit if there's nothing to do
1051   if (f.isReady()) {
1052     return;
1053   }
1054
1055   Promise<T> promise;
1056   auto ret = promise.getFuture();
1057   auto baton = std::make_shared<FutureBatonType>();
1058   f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1059     promise.setTry(std::move(t));
1060     baton->post();
1061   });
1062   f = std::move(ret);
1063   if (baton->timed_wait(dur)) {
1064     assert(f.isReady());
1065   }
1066 }
1067
1068 template <class T>
1069 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1070   // Set callback so to ensure that the via executor has something on it
1071   // so that once the preceding future triggers this callback, drive will
1072   // always have a callback to satisfy it
1073   if (f.isReady())
1074     return;
1075   f = f.via(e).then([](T&& t) { return std::move(t); });
1076   while (!f.isReady()) {
1077     e->drive();
1078   }
1079   assert(f.isReady());
1080 }
1081
1082 } // detail
1083
1084 template <class T>
1085 Future<T>& Future<T>::wait() & {
1086   detail::waitImpl(*this);
1087   return *this;
1088 }
1089
1090 template <class T>
1091 Future<T>&& Future<T>::wait() && {
1092   detail::waitImpl(*this);
1093   return std::move(*this);
1094 }
1095
1096 template <class T>
1097 Future<T>& Future<T>::wait(Duration dur) & {
1098   detail::waitImpl(*this, dur);
1099   return *this;
1100 }
1101
1102 template <class T>
1103 Future<T>&& Future<T>::wait(Duration dur) && {
1104   detail::waitImpl(*this, dur);
1105   return std::move(*this);
1106 }
1107
1108 template <class T>
1109 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1110   detail::waitViaImpl(*this, e);
1111   return *this;
1112 }
1113
1114 template <class T>
1115 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1116   detail::waitViaImpl(*this, e);
1117   return std::move(*this);
1118 }
1119
1120 template <class T>
1121 T Future<T>::get() {
1122   return std::move(wait().value());
1123 }
1124
1125 template <class T>
1126 T Future<T>::get(Duration dur) {
1127   wait(dur);
1128   if (isReady()) {
1129     return std::move(value());
1130   } else {
1131     throw TimedOut();
1132   }
1133 }
1134
1135 template <class T>
1136 T Future<T>::getVia(DrivableExecutor* e) {
1137   return std::move(waitVia(e).value());
1138 }
1139
1140 namespace detail {
1141   template <class T>
1142   struct TryEquals {
1143     static bool equals(const Try<T>& t1, const Try<T>& t2) {
1144       return t1.value() == t2.value();
1145     }
1146   };
1147 }
1148
1149 template <class T>
1150 Future<bool> Future<T>::willEqual(Future<T>& f) {
1151   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1152     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1153       return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1154     } else {
1155       return false;
1156       }
1157   });
1158 }
1159
1160 template <class T>
1161 template <class F>
1162 Future<T> Future<T>::filter(F&& predicate) {
1163   return this->then([p = std::forward<F>(predicate)](T val) {
1164     T const& valConstRef = val;
1165     if (!p(valConstRef)) {
1166       throw PredicateDoesNotObtain();
1167     }
1168     return val;
1169   });
1170 }
1171
1172 template <class F>
1173 inline Future<Unit> when(bool p, F&& thunk) {
1174   return p ? std::forward<F>(thunk)().unit() : makeFuture();
1175 }
1176
1177 template <class P, class F>
1178 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1179   if (predicate()) {
1180     auto future = thunk();
1181     return future.then([
1182       predicate = std::forward<P>(predicate),
1183       thunk = std::forward<F>(thunk)
1184     ]() mutable {
1185       return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1186     });
1187   }
1188   return makeFuture();
1189 }
1190
1191 template <class F>
1192 Future<Unit> times(const int n, F&& thunk) {
1193   return folly::whileDo(
1194       [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
1195         return count->fetch_add(1) < n;
1196       },
1197       std::forward<F>(thunk));
1198 }
1199
1200 namespace futures {
1201   template <class It, class F, class ItT, class Result>
1202   std::vector<Future<Result>> map(It first, It last, F func) {
1203     std::vector<Future<Result>> results;
1204     for (auto it = first; it != last; it++) {
1205       results.push_back(it->then(func));
1206     }
1207     return results;
1208   }
1209 }
1210
1211 namespace futures {
1212
1213 namespace detail {
1214
1215 struct retrying_policy_raw_tag {};
1216 struct retrying_policy_fut_tag {};
1217
1218 template <class Policy>
1219 struct retrying_policy_traits {
1220   using ew = exception_wrapper;
1221   FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1222   template <class Ret>
1223   using has_op = typename std::integral_constant<bool,
1224         has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1225         has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1226   using is_raw = has_op<bool>;
1227   using is_fut = has_op<Future<bool>>;
1228   using tag = typename std::conditional<
1229         is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1230         is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1231 };
1232
1233 template <class Policy, class FF, class Prom>
1234 void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) {
1235   using F = typename std::result_of<FF(size_t)>::type;
1236   using T = typename F::value_type;
1237   auto f = makeFutureWith([&] { return ff(k++); });
1238   f.then([
1239     k,
1240     prom = std::move(prom),
1241     pm = std::forward<Policy>(p),
1242     ffm = std::forward<FF>(ff)
1243   ](Try<T> && t) mutable {
1244     if (t.hasValue()) {
1245       prom.setValue(std::move(t).value());
1246       return;
1247     }
1248     auto& x = t.exception();
1249     auto q = pm(k, x);
1250     q.then([
1251       k,
1252       prom = std::move(prom),
1253       xm = std::move(x),
1254       pm = std::move(pm),
1255       ffm = std::move(ffm)
1256     ](bool shouldRetry) mutable {
1257       if (shouldRetry) {
1258         retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom));
1259       } else {
1260         prom.setException(std::move(xm));
1261       };
1262     });
1263   });
1264 }
1265
1266 template <class Policy, class FF>
1267 typename std::result_of<FF(size_t)>::type
1268 retrying(size_t k, Policy&& p, FF&& ff) {
1269   using F = typename std::result_of<FF(size_t)>::type;
1270   using T = typename F::value_type;
1271   auto prom = Promise<T>();
1272   auto f = prom.getFuture();
1273   retryingImpl(
1274       k, std::forward<Policy>(p), std::forward<FF>(ff), std::move(prom));
1275   return f;
1276 }
1277
1278 template <class Policy, class FF>
1279 typename std::result_of<FF(size_t)>::type
1280 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1281   auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
1282     return makeFuture<bool>(pm(k, x));
1283   };
1284   return retrying(0, std::move(q), std::forward<FF>(ff));
1285 }
1286
1287 template <class Policy, class FF>
1288 typename std::result_of<FF(size_t)>::type
1289 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1290   return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1291 }
1292
1293 //  jittered exponential backoff, clamped to [backoff_min, backoff_max]
1294 template <class URNG>
1295 Duration retryingJitteredExponentialBackoffDur(
1296     size_t n,
1297     Duration backoff_min,
1298     Duration backoff_max,
1299     double jitter_param,
1300     URNG& rng) {
1301   using d = Duration;
1302   auto dist = std::normal_distribution<double>(0.0, jitter_param);
1303   auto jitter = std::exp(dist(rng));
1304   auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1305   return std::max(backoff_min, std::min(backoff_max, backoff));
1306 }
1307
1308 template <class Policy, class URNG>
1309 std::function<Future<bool>(size_t, const exception_wrapper&)>
1310 retryingPolicyCappedJitteredExponentialBackoff(
1311     size_t max_tries,
1312     Duration backoff_min,
1313     Duration backoff_max,
1314     double jitter_param,
1315     URNG&& rng,
1316     Policy&& p) {
1317   return [
1318     pm = std::forward<Policy>(p),
1319     max_tries,
1320     backoff_min,
1321     backoff_max,
1322     jitter_param,
1323     rngp = std::forward<URNG>(rng)
1324   ](size_t n, const exception_wrapper& ex) mutable {
1325     if (n == max_tries) {
1326       return makeFuture(false);
1327     }
1328     return pm(n, ex).then(
1329         [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
1330             bool v) mutable {
1331           if (!v) {
1332             return makeFuture(false);
1333           }
1334           auto backoff = detail::retryingJitteredExponentialBackoffDur(
1335               n, backoff_min, backoff_max, jitter_param, rngp);
1336           return futures::sleep(backoff).then([] { return true; });
1337         });
1338   };
1339 }
1340
1341 template <class Policy, class URNG>
1342 std::function<Future<bool>(size_t, const exception_wrapper&)>
1343 retryingPolicyCappedJitteredExponentialBackoff(
1344     size_t max_tries,
1345     Duration backoff_min,
1346     Duration backoff_max,
1347     double jitter_param,
1348     URNG&& rng,
1349     Policy&& p,
1350     retrying_policy_raw_tag) {
1351   auto q = [pm = std::forward<Policy>(p)](
1352       size_t n, const exception_wrapper& e) {
1353     return makeFuture(pm(n, e));
1354   };
1355   return retryingPolicyCappedJitteredExponentialBackoff(
1356       max_tries,
1357       backoff_min,
1358       backoff_max,
1359       jitter_param,
1360       std::forward<URNG>(rng),
1361       std::move(q));
1362 }
1363
1364 template <class Policy, class URNG>
1365 std::function<Future<bool>(size_t, const exception_wrapper&)>
1366 retryingPolicyCappedJitteredExponentialBackoff(
1367     size_t max_tries,
1368     Duration backoff_min,
1369     Duration backoff_max,
1370     double jitter_param,
1371     URNG&& rng,
1372     Policy&& p,
1373     retrying_policy_fut_tag) {
1374   return retryingPolicyCappedJitteredExponentialBackoff(
1375       max_tries,
1376       backoff_min,
1377       backoff_max,
1378       jitter_param,
1379       std::forward<URNG>(rng),
1380       std::forward<Policy>(p));
1381 }
1382 }
1383
1384 template <class Policy, class FF>
1385 typename std::result_of<FF(size_t)>::type
1386 retrying(Policy&& p, FF&& ff) {
1387   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1388   return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1389 }
1390
1391 inline
1392 std::function<bool(size_t, const exception_wrapper&)>
1393 retryingPolicyBasic(
1394     size_t max_tries) {
1395   return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1396 }
1397
1398 template <class Policy, class URNG>
1399 std::function<Future<bool>(size_t, const exception_wrapper&)>
1400 retryingPolicyCappedJitteredExponentialBackoff(
1401     size_t max_tries,
1402     Duration backoff_min,
1403     Duration backoff_max,
1404     double jitter_param,
1405     URNG&& rng,
1406     Policy&& p) {
1407   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1408   return detail::retryingPolicyCappedJitteredExponentialBackoff(
1409       max_tries,
1410       backoff_min,
1411       backoff_max,
1412       jitter_param,
1413       std::forward<URNG>(rng),
1414       std::forward<Policy>(p),
1415       tag());
1416 }
1417
1418 inline
1419 std::function<Future<bool>(size_t, const exception_wrapper&)>
1420 retryingPolicyCappedJitteredExponentialBackoff(
1421     size_t max_tries,
1422     Duration backoff_min,
1423     Duration backoff_max,
1424     double jitter_param) {
1425   auto p = [](size_t, const exception_wrapper&) { return true; };
1426   return retryingPolicyCappedJitteredExponentialBackoff(
1427       max_tries,
1428       backoff_min,
1429       backoff_max,
1430       jitter_param,
1431       ThreadLocalPRNG(),
1432       std::move(p));
1433 }
1434
1435 }
1436
1437 // Instantiate the most common Future types to save compile time
1438 extern template class Future<Unit>;
1439 extern template class Future<bool>;
1440 extern template class Future<int>;
1441 extern template class Future<int64_t>;
1442 extern template class Future<std::string>;
1443 extern template class Future<double>;
1444
1445 } // namespace folly