Add SemiFuture class.
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <algorithm>
20 #include <cassert>
21 #include <chrono>
22 #include <random>
23 #include <thread>
24
25 #include <folly/Baton.h>
26 #include <folly/Optional.h>
27 #include <folly/Random.h>
28 #include <folly/futures/Timekeeper.h>
29 #include <folly/futures/detail/Core.h>
30
31 #ifndef FOLLY_FUTURE_USING_FIBER
32 #if FOLLY_MOBILE || defined(__APPLE__)
33 #define FOLLY_FUTURE_USING_FIBER 0
34 #else
35 #define FOLLY_FUTURE_USING_FIBER 1
36 #include <folly/fibers/Baton.h>
37 #endif
38 #endif
39
40 namespace folly {
41
42 class Timekeeper;
43
44 namespace futures {
45 namespace detail {
46 #if FOLLY_FUTURE_USING_FIBER
47 typedef folly::fibers::Baton FutureBatonType;
48 #else
49 typedef folly::Baton<> FutureBatonType;
50 #endif
51 } // namespace detail
52 } // namespace futures
53
54 namespace detail {
55 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
56 } // namespace detail
57
58 namespace futures {
59 namespace detail {
60 //  Guarantees that the stored functor is destructed before the stored promise
61 //  may be fulfilled. Assumes the stored functor to be noexcept-destructible.
62 template <typename T, typename F>
63 class CoreCallbackState {
64  public:
65   template <typename FF>
66   CoreCallbackState(Promise<T>&& promise, FF&& func) noexcept(
67       noexcept(F(std::declval<FF>())))
68       : func_(std::forward<FF>(func)), promise_(std::move(promise)) {
69     assert(before_barrier());
70   }
71
72   CoreCallbackState(CoreCallbackState&& that) noexcept(
73       noexcept(F(std::declval<F>()))) {
74     if (that.before_barrier()) {
75       new (&func_) F(std::move(that.func_));
76       promise_ = that.stealPromise();
77     }
78   }
79
80   CoreCallbackState& operator=(CoreCallbackState&&) = delete;
81
82   ~CoreCallbackState() {
83     if (before_barrier()) {
84       stealPromise();
85     }
86   }
87
88   template <typename... Args>
89   auto invoke(Args&&... args) noexcept(
90       noexcept(std::declval<F&&>()(std::declval<Args&&>()...))) {
91     assert(before_barrier());
92     return std::move(func_)(std::forward<Args>(args)...);
93   }
94
95   template <typename... Args>
96   auto tryInvoke(Args&&... args) noexcept {
97     return makeTryWith([&] { return invoke(std::forward<Args>(args)...); });
98   }
99
100   void setTry(Try<T>&& t) {
101     stealPromise().setTry(std::move(t));
102   }
103
104   void setException(exception_wrapper&& ew) {
105     stealPromise().setException(std::move(ew));
106   }
107
108   Promise<T> stealPromise() noexcept {
109     assert(before_barrier());
110     func_.~F();
111     return std::move(promise_);
112   }
113
114  private:
115   bool before_barrier() const noexcept {
116     return !promise_.isFulfilled();
117   }
118
119   union {
120     F func_;
121   };
122   Promise<T> promise_{Promise<T>::makeEmpty()};
123 };
124
125 template <typename T, typename F>
126 inline auto makeCoreCallbackState(Promise<T>&& p, F&& f) noexcept(
127     noexcept(CoreCallbackState<T, _t<std::decay<F>>>(
128         std::declval<Promise<T>&&>(),
129         std::declval<F&&>()))) {
130   return CoreCallbackState<T, _t<std::decay<F>>>(
131       std::move(p), std::forward<F>(f));
132 }
133 } // namespace detail
134 } // namespace futures
135
136 template <class T>
137 SemiFuture<typename std::decay<T>::type> makeSemiFuture(T&& t) {
138   return makeSemiFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
139 }
140
141 inline SemiFuture<Unit> makeSemiFuture() {
142   return makeSemiFuture(Unit{});
143 }
144
145 // makeSemiFutureWith(SemiFuture<T>()) -> SemiFuture<T>
146 template <class F>
147 typename std::enable_if<
148     isSemiFuture<typename std::result_of<F()>::type>::value,
149     typename std::result_of<F()>::type>::type
150 makeSemiFutureWith(F&& func) {
151   using InnerType =
152       typename isSemiFuture<typename std::result_of<F()>::type>::Inner;
153   try {
154     return std::forward<F>(func)();
155   } catch (std::exception& e) {
156     return makeSemiFuture<InnerType>(
157         exception_wrapper(std::current_exception(), e));
158   } catch (...) {
159     return makeSemiFuture<InnerType>(
160         exception_wrapper(std::current_exception()));
161   }
162 }
163
164 // makeSemiFutureWith(T()) -> SemiFuture<T>
165 // makeSemiFutureWith(void()) -> SemiFuture<Unit>
166 template <class F>
167 typename std::enable_if<
168     !(isSemiFuture<typename std::result_of<F()>::type>::value),
169     SemiFuture<Unit::LiftT<typename std::result_of<F()>::type>>>::type
170 makeSemiFutureWith(F&& func) {
171   using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
172   return makeSemiFuture<LiftedResult>(
173       makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
174 }
175
176 template <class T>
177 SemiFuture<T> makeSemiFuture(std::exception_ptr const& e) {
178   return makeSemiFuture(Try<T>(e));
179 }
180
181 template <class T>
182 SemiFuture<T> makeSemiFuture(exception_wrapper ew) {
183   return makeSemiFuture(Try<T>(std::move(ew)));
184 }
185
186 template <class T, class E>
187 typename std::
188     enable_if<std::is_base_of<std::exception, E>::value, SemiFuture<T>>::type
189     makeSemiFuture(E const& e) {
190   return makeSemiFuture(Try<T>(make_exception_wrapper<E>(e)));
191 }
192
193 template <class T>
194 SemiFuture<T> makeSemiFuture(Try<T>&& t) {
195   return SemiFuture<T>(new futures::detail::Core<T>(std::move(t)));
196 }
197
198 template <class T>
199 SemiFuture<T> SemiFuture<T>::makeEmpty() {
200   return SemiFuture<T>(futures::detail::EmptyConstruct{});
201 }
202
203 template <class T>
204 SemiFuture<T>::SemiFuture(SemiFuture<T>&& other) noexcept : core_(other.core_) {
205   other.core_ = nullptr;
206 }
207
208 template <class T>
209 SemiFuture<T>& SemiFuture<T>::operator=(SemiFuture<T>&& other) noexcept {
210   std::swap(core_, other.core_);
211   return *this;
212 }
213
214 template <class T>
215 SemiFuture<T>::SemiFuture(Future<T>&& other) noexcept : core_(other.core_) {
216   other.core_ = nullptr;
217 }
218
219 template <class T>
220 SemiFuture<T>& SemiFuture<T>::operator=(Future<T>&& other) noexcept {
221   std::swap(core_, other.core_);
222   return *this;
223 }
224
225 template <class T>
226 template <class T2, typename>
227 SemiFuture<T>::SemiFuture(T2&& val)
228     : core_(new futures::detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
229
230 template <class T>
231 template <typename T2>
232 SemiFuture<T>::SemiFuture(
233     typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
234     : core_(new futures::detail::Core<T>(Try<T>(T()))) {}
235
236 template <class T>
237 template <
238     class... Args,
239     typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
240         type>
241 SemiFuture<T>::SemiFuture(in_place_t, Args&&... args)
242     : core_(
243           new futures::detail::Core<T>(in_place, std::forward<Args>(args)...)) {
244 }
245
246 template <class T>
247 SemiFuture<T>::~SemiFuture() {
248   detach();
249 }
250
251 template <class T>
252 typename std::add_lvalue_reference<T>::type SemiFuture<T>::value() {
253   throwIfInvalid();
254
255   return core_->getTry().value();
256 }
257
258 template <class T>
259 typename std::add_lvalue_reference<const T>::type SemiFuture<T>::value() const {
260   throwIfInvalid();
261
262   return core_->getTry().value();
263 }
264
265 template <class T>
266 inline Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) && {
267   throwIfInvalid();
268
269   setExecutor(executor, priority);
270
271   auto newFuture = Future<T>(core_);
272   core_ = nullptr;
273   return newFuture;
274 }
275
276 template <class T>
277 inline Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) & {
278   throwIfInvalid();
279   Promise<T> p;
280   auto f = p.getFuture();
281   auto func = [p = std::move(p)](Try<T>&& t) mutable {
282     p.setTry(std::move(t));
283   };
284   using R = futures::detail::callableResult<T, decltype(func)>;
285   thenImplementation<decltype(func), R>(std::move(func), typename R::Arg());
286   return std::move(f).via(executor, priority);
287 }
288
289 template <class T>
290 bool SemiFuture<T>::isReady() const {
291   throwIfInvalid();
292   return core_->ready();
293 }
294
295 template <class T>
296 bool SemiFuture<T>::hasValue() {
297   return getTry().hasValue();
298 }
299
300 template <class T>
301 bool SemiFuture<T>::hasException() {
302   return getTry().hasException();
303 }
304
305 template <class T>
306 void SemiFuture<T>::detach() {
307   if (core_) {
308     core_->detachFuture();
309     core_ = nullptr;
310   }
311 }
312
313 template <class T>
314 Try<T>& SemiFuture<T>::getTry() {
315   throwIfInvalid();
316
317   return core_->getTry();
318 }
319
320 template <class T>
321 void SemiFuture<T>::throwIfInvalid() const {
322   if (!core_)
323     throwNoState();
324 }
325
326 template <class T>
327 Optional<Try<T>> SemiFuture<T>::poll() {
328   Optional<Try<T>> o;
329   if (core_->ready()) {
330     o = std::move(core_->getTry());
331   }
332   return o;
333 }
334
335 template <class T>
336 void SemiFuture<T>::raise(exception_wrapper exception) {
337   core_->raise(std::move(exception));
338 }
339
340 template <class T>
341 template <class F>
342 void SemiFuture<T>::setCallback_(F&& func) {
343   throwIfInvalid();
344   core_->setCallback(std::forward<F>(func));
345 }
346
347 template <class T>
348 SemiFuture<T>::SemiFuture(futures::detail::EmptyConstruct) noexcept
349     : core_(nullptr) {}
350
351 template <class T>
352 Future<T> Future<T>::makeEmpty() {
353   return Future<T>(futures::detail::EmptyConstruct{});
354 }
355
356 template <class T>
357 Future<T>::Future(Future<T>&& other) noexcept
358     : SemiFuture<T>(std::move(other)) {}
359
360 template <class T>
361 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
362   SemiFuture<T>::operator=(SemiFuture<T>{std::move(other)});
363   return *this;
364 }
365
366 template <class T>
367 template <
368     class T2,
369     typename std::enable_if<
370         !std::is_same<T, typename std::decay<T2>::type>::value &&
371             std::is_constructible<T, T2&&>::value &&
372             std::is_convertible<T2&&, T>::value,
373         int>::type>
374 Future<T>::Future(Future<T2>&& other)
375     : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
376
377 template <class T>
378 template <
379     class T2,
380     typename std::enable_if<
381         !std::is_same<T, typename std::decay<T2>::type>::value &&
382             std::is_constructible<T, T2&&>::value &&
383             !std::is_convertible<T2&&, T>::value,
384         int>::type>
385 Future<T>::Future(Future<T2>&& other)
386     : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
387
388 template <class T>
389 template <
390     class T2,
391     typename std::enable_if<
392         !std::is_same<T, typename std::decay<T2>::type>::value &&
393             std::is_constructible<T, T2&&>::value,
394         int>::type>
395 Future<T>& Future<T>::operator=(Future<T2>&& other) {
396   return operator=(
397       std::move(other).then([](T2&& v) { return T(std::move(v)); }));
398 }
399
400 // TODO: isSemiFuture
401 template <class T>
402 template <class T2, typename>
403 Future<T>::Future(T2&& val) : SemiFuture<T>(std::forward<T2>(val)) {}
404
405 template <class T>
406 template <typename T2>
407 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
408     : SemiFuture<T>() {}
409
410 template <class T>
411 template <
412     class... Args,
413     typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
414         type>
415 Future<T>::Future(in_place_t, Args&&... args)
416     : SemiFuture<T>(in_place, std::forward<Args>(args)...) {}
417
418 template <class T>
419 Future<T>::~Future() {
420 }
421
422 // unwrap
423
424 template <class T>
425 template <class F>
426 typename std::enable_if<isFuture<F>::value,
427                         Future<typename isFuture<T>::Inner>>::type
428 Future<T>::unwrap() {
429   return then([](Future<typename isFuture<T>::Inner> internal_future) {
430       return internal_future;
431   });
432 }
433
434 // then
435
436 // Variant: returns a value
437 // e.g. f.then([](Try<T>&& t){ return t.value(); });
438 template <class T>
439 template <typename F, typename R, bool isTry, typename... Args>
440 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
441 SemiFuture<T>::thenImplementation(
442     F&& func,
443     futures::detail::argResult<isTry, F, Args...>) {
444   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
445   typedef typename R::ReturnsFuture::Inner B;
446
447   this->throwIfInvalid();
448
449   Promise<B> p;
450   p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
451
452   // grab the Future now before we lose our handle on the Promise
453   auto f = p.getFuture();
454   f.core_->setExecutorNoLock(this->getExecutor());
455
456   /* This is a bit tricky.
457
458      We can't just close over *this in case this Future gets moved. So we
459      make a new dummy Future. We could figure out something more
460      sophisticated that avoids making a new Future object when it can, as an
461      optimization. But this is correct.
462
463      core_ can't be moved, it is explicitly disallowed (as is copying). But
464      if there's ever a reason to allow it, this is one place that makes that
465      assumption and would need to be fixed. We use a standard shared pointer
466      for core_ (by copying it in), which means in essence obj holds a shared
467      pointer to itself.  But this shouldn't leak because Promise will not
468      outlive the continuation, because Promise will setException() with a
469      broken Promise if it is destructed before completed. We could use a
470      weak pointer but it would have to be converted to a shared pointer when
471      func is executed (because the Future returned by func may possibly
472      persist beyond the callback, if it gets moved), and so it is an
473      optimization to just make it shared from the get-go.
474
475      Two subtle but important points about this design. futures::detail::Core
476      has no back pointers to Future or Promise, so if Future or Promise get
477      moved (and they will be moved in performant code) we don't have to do
478      anything fancy. And because we store the continuation in the
479      futures::detail::Core, not in the Future, we can execute the continuation
480      even after the Future has gone out of scope. This is an intentional design
481      decision. It is likely we will want to be able to cancel a continuation
482      in some circumstances, but I think it should be explicit not implicit
483      in the destruction of the Future used to create it.
484      */
485   this->setCallback_(
486       [state = futures::detail::makeCoreCallbackState(
487            std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
488
489         if (!isTry && t.hasException()) {
490           state.setException(std::move(t.exception()));
491         } else {
492           state.setTry(makeTryWith(
493               [&] { return state.invoke(t.template get<isTry, Args>()...); }));
494         }
495       });
496   return f;
497 }
498
499 // Variant: returns a Future
500 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
501 template <class T>
502 template <typename F, typename R, bool isTry, typename... Args>
503 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
504 SemiFuture<T>::thenImplementation(
505     F&& func,
506     futures::detail::argResult<isTry, F, Args...>) {
507   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
508   typedef typename R::ReturnsFuture::Inner B;
509   this->throwIfInvalid();
510
511   Promise<B> p;
512   p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
513
514   // grab the Future now before we lose our handle on the Promise
515   auto f = p.getFuture();
516   f.core_->setExecutorNoLock(this->getExecutor());
517
518   this->setCallback_(
519       [state = futures::detail::makeCoreCallbackState(
520            std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
521         if (!isTry && t.hasException()) {
522           state.setException(std::move(t.exception()));
523         } else {
524           auto tf2 = state.tryInvoke(t.template get<isTry, Args>()...);
525           if (tf2.hasException()) {
526             state.setException(std::move(tf2.exception()));
527           } else {
528             tf2->setCallback_([p = state.stealPromise()](Try<B> && b) mutable {
529               p.setTry(std::move(b));
530             });
531           }
532         }
533       });
534
535   return f;
536 }
537
538 template <typename T>
539 template <typename R, typename Caller, typename... Args>
540   Future<typename isFuture<R>::Inner>
541 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
542   typedef typename std::remove_cv<typename std::remove_reference<
543       typename futures::detail::ArgType<Args...>::FirstArg>::type>::type
544       FirstArg;
545
546   return then([instance, func](Try<T>&& t){
547     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
548   });
549 }
550
551 template <class T>
552 Future<Unit> Future<T>::then() {
553   return then([] () {});
554 }
555
556 // onError where the callback returns T
557 template <class T>
558 template <class F>
559 typename std::enable_if<
560     !futures::detail::callableWith<F, exception_wrapper>::value &&
561         !futures::detail::callableWith<F, exception_wrapper&>::value &&
562         !futures::detail::Extract<F>::ReturnsFuture::value,
563     Future<T>>::type
564 Future<T>::onError(F&& func) {
565   typedef std::remove_reference_t<
566       typename futures::detail::Extract<F>::FirstArg>
567       Exn;
568   static_assert(
569       std::is_same<typename futures::detail::Extract<F>::RawReturn, T>::value,
570       "Return type of onError callback must be T or Future<T>");
571
572   Promise<T> p;
573   p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
574   auto f = p.getFuture();
575
576   this->setCallback_(
577       [state = futures::detail::makeCoreCallbackState(
578            std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
579         if (auto e = t.template tryGetExceptionObject<Exn>()) {
580           state.setTry(makeTryWith([&] { return state.invoke(*e); }));
581         } else {
582           state.setTry(std::move(t));
583         }
584       });
585
586   return f;
587 }
588
589 // onError where the callback returns Future<T>
590 template <class T>
591 template <class F>
592 typename std::enable_if<
593     !futures::detail::callableWith<F, exception_wrapper>::value &&
594         !futures::detail::callableWith<F, exception_wrapper&>::value &&
595         futures::detail::Extract<F>::ReturnsFuture::value,
596     Future<T>>::type
597 Future<T>::onError(F&& func) {
598   static_assert(
599       std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
600           value,
601       "Return type of onError callback must be T or Future<T>");
602   typedef std::remove_reference_t<
603       typename futures::detail::Extract<F>::FirstArg>
604       Exn;
605
606   Promise<T> p;
607   auto f = p.getFuture();
608
609   this->setCallback_(
610       [state = futures::detail::makeCoreCallbackState(
611            std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
612         if (auto e = t.template tryGetExceptionObject<Exn>()) {
613           auto tf2 = state.tryInvoke(*e);
614           if (tf2.hasException()) {
615             state.setException(std::move(tf2.exception()));
616           } else {
617             tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
618               p.setTry(std::move(t3));
619             });
620           }
621         } else {
622           state.setTry(std::move(t));
623         }
624       });
625
626   return f;
627 }
628
629 template <class T>
630 template <class F>
631 Future<T> Future<T>::ensure(F&& func) {
632   return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
633     std::move(funcw)();
634     return makeFuture(std::move(t));
635   });
636 }
637
638 template <class T>
639 template <class F>
640 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
641   return within(dur, tk).onError([funcw = std::forward<F>(func)](
642       TimedOut const&) { return std::move(funcw)(); });
643 }
644
645 template <class T>
646 template <class F>
647 typename std::enable_if<
648     futures::detail::callableWith<F, exception_wrapper>::value &&
649         futures::detail::Extract<F>::ReturnsFuture::value,
650     Future<T>>::type
651 Future<T>::onError(F&& func) {
652   static_assert(
653       std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
654           value,
655       "Return type of onError callback must be T or Future<T>");
656
657   Promise<T> p;
658   auto f = p.getFuture();
659   this->setCallback_(
660       [state = futures::detail::makeCoreCallbackState(
661            std::move(p), std::forward<F>(func))](Try<T> t) mutable {
662         if (t.hasException()) {
663           auto tf2 = state.tryInvoke(std::move(t.exception()));
664           if (tf2.hasException()) {
665             state.setException(std::move(tf2.exception()));
666           } else {
667             tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
668               p.setTry(std::move(t3));
669             });
670           }
671         } else {
672           state.setTry(std::move(t));
673         }
674       });
675
676   return f;
677 }
678
679 // onError(exception_wrapper) that returns T
680 template <class T>
681 template <class F>
682 typename std::enable_if<
683     futures::detail::callableWith<F, exception_wrapper>::value &&
684         !futures::detail::Extract<F>::ReturnsFuture::value,
685     Future<T>>::type
686 Future<T>::onError(F&& func) {
687   static_assert(
688       std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
689           value,
690       "Return type of onError callback must be T or Future<T>");
691
692   Promise<T> p;
693   auto f = p.getFuture();
694   this->setCallback_(
695       [state = futures::detail::makeCoreCallbackState(
696            std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
697         if (t.hasException()) {
698           state.setTry(makeTryWith(
699               [&] { return state.invoke(std::move(t.exception())); }));
700         } else {
701           state.setTry(std::move(t));
702         }
703       });
704
705   return f;
706 }
707
708 template <class T>
709 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
710   return waitVia(e).getTry();
711 }
712
713 template <class Func>
714 auto via(Executor* x, Func&& func)
715     -> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
716   // TODO make this actually more performant. :-P #7260175
717   return via(x).then(std::forward<Func>(func));
718 }
719
720 template <class T>
721 Future<T>::Future(futures::detail::EmptyConstruct) noexcept
722     : SemiFuture<T>(futures::detail::EmptyConstruct{}) {}
723
724 // makeFuture
725
726 template <class T>
727 Future<typename std::decay<T>::type> makeFuture(T&& t) {
728   return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
729 }
730
731 inline Future<Unit> makeFuture() {
732   return makeFuture(Unit{});
733 }
734
735 // makeFutureWith(Future<T>()) -> Future<T>
736 template <class F>
737 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
738                         typename std::result_of<F()>::type>::type
739 makeFutureWith(F&& func) {
740   using InnerType =
741       typename isFuture<typename std::result_of<F()>::type>::Inner;
742   try {
743     return std::forward<F>(func)();
744   } catch (std::exception& e) {
745     return makeFuture<InnerType>(
746         exception_wrapper(std::current_exception(), e));
747   } catch (...) {
748     return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
749   }
750 }
751
752 // makeFutureWith(T()) -> Future<T>
753 // makeFutureWith(void()) -> Future<Unit>
754 template <class F>
755 typename std::enable_if<
756     !(isFuture<typename std::result_of<F()>::type>::value),
757     Future<Unit::LiftT<typename std::result_of<F()>::type>>>::type
758 makeFutureWith(F&& func) {
759   using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
760   return makeFuture<LiftedResult>(
761       makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
762 }
763
764 template <class T>
765 Future<T> makeFuture(std::exception_ptr const& e) {
766   return makeFuture(Try<T>(e));
767 }
768
769 template <class T>
770 Future<T> makeFuture(exception_wrapper ew) {
771   return makeFuture(Try<T>(std::move(ew)));
772 }
773
774 template <class T, class E>
775 typename std::enable_if<std::is_base_of<std::exception, E>::value,
776                         Future<T>>::type
777 makeFuture(E const& e) {
778   return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
779 }
780
781 template <class T>
782 Future<T> makeFuture(Try<T>&& t) {
783   return Future<T>(new futures::detail::Core<T>(std::move(t)));
784 }
785
786 // via
787 Future<Unit> via(Executor* executor, int8_t priority) {
788   return makeFuture().via(executor, priority);
789 }
790
791 // mapSetCallback calls func(i, Try<T>) when every future completes
792
793 template <class T, class InputIterator, class F>
794 void mapSetCallback(InputIterator first, InputIterator last, F func) {
795   for (size_t i = 0; first != last; ++first, ++i) {
796     first->setCallback_([func, i](Try<T>&& t) {
797       func(i, std::move(t));
798     });
799   }
800 }
801
802 // collectAll (variadic)
803
804 template <typename... Fs>
805 typename futures::detail::CollectAllVariadicContext<
806     typename std::decay<Fs>::type::value_type...>::type
807 collectAll(Fs&&... fs) {
808   auto ctx = std::make_shared<futures::detail::CollectAllVariadicContext<
809       typename std::decay<Fs>::type::value_type...>>();
810   futures::detail::collectVariadicHelper<
811       futures::detail::CollectAllVariadicContext>(ctx, std::forward<Fs>(fs)...);
812   return ctx->p.getFuture();
813 }
814
815 // collectAll (iterator)
816
817 template <class InputIterator>
818 Future<
819   std::vector<
820   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
821 collectAll(InputIterator first, InputIterator last) {
822   typedef
823     typename std::iterator_traits<InputIterator>::value_type::value_type T;
824
825   struct CollectAllContext {
826     CollectAllContext(size_t n) : results(n) {}
827     ~CollectAllContext() {
828       p.setValue(std::move(results));
829     }
830     Promise<std::vector<Try<T>>> p;
831     std::vector<Try<T>> results;
832   };
833
834   auto ctx =
835       std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
836   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
837     ctx->results[i] = std::move(t);
838   });
839   return ctx->p.getFuture();
840 }
841
842 // collect (iterator)
843
844 namespace futures {
845 namespace detail {
846
847 template <typename T>
848 struct CollectContext {
849   struct Nothing {
850     explicit Nothing(int /* n */) {}
851   };
852
853   using Result = typename std::conditional<
854     std::is_void<T>::value,
855     void,
856     std::vector<T>>::type;
857
858   using InternalResult = typename std::conditional<
859     std::is_void<T>::value,
860     Nothing,
861     std::vector<Optional<T>>>::type;
862
863   explicit CollectContext(size_t n) : result(n) {}
864   ~CollectContext() {
865     if (!threw.exchange(true)) {
866       // map Optional<T> -> T
867       std::vector<T> finalResult;
868       finalResult.reserve(result.size());
869       std::transform(result.begin(), result.end(),
870                      std::back_inserter(finalResult),
871                      [](Optional<T>& o) { return std::move(o.value()); });
872       p.setValue(std::move(finalResult));
873     }
874   }
875   inline void setPartialResult(size_t i, Try<T>& t) {
876     result[i] = std::move(t.value());
877   }
878   Promise<Result> p;
879   InternalResult result;
880   std::atomic<bool> threw {false};
881 };
882
883 } // namespace detail
884 } // namespace futures
885
886 template <class InputIterator>
887 Future<typename futures::detail::CollectContext<typename std::iterator_traits<
888     InputIterator>::value_type::value_type>::Result>
889 collect(InputIterator first, InputIterator last) {
890   typedef
891     typename std::iterator_traits<InputIterator>::value_type::value_type T;
892
893   auto ctx = std::make_shared<futures::detail::CollectContext<T>>(
894       std::distance(first, last));
895   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
896     if (t.hasException()) {
897        if (!ctx->threw.exchange(true)) {
898          ctx->p.setException(std::move(t.exception()));
899        }
900      } else if (!ctx->threw) {
901        ctx->setPartialResult(i, t);
902      }
903   });
904   return ctx->p.getFuture();
905 }
906
907 // collect (variadic)
908
909 template <typename... Fs>
910 typename futures::detail::CollectVariadicContext<
911     typename std::decay<Fs>::type::value_type...>::type
912 collect(Fs&&... fs) {
913   auto ctx = std::make_shared<futures::detail::CollectVariadicContext<
914       typename std::decay<Fs>::type::value_type...>>();
915   futures::detail::collectVariadicHelper<
916       futures::detail::CollectVariadicContext>(ctx, std::forward<Fs>(fs)...);
917   return ctx->p.getFuture();
918 }
919
920 // collectAny (iterator)
921
922 template <class InputIterator>
923 Future<
924   std::pair<size_t,
925             Try<
926               typename
927               std::iterator_traits<InputIterator>::value_type::value_type>>>
928 collectAny(InputIterator first, InputIterator last) {
929   typedef
930     typename std::iterator_traits<InputIterator>::value_type::value_type T;
931
932   struct CollectAnyContext {
933     CollectAnyContext() {}
934     Promise<std::pair<size_t, Try<T>>> p;
935     std::atomic<bool> done {false};
936   };
937
938   auto ctx = std::make_shared<CollectAnyContext>();
939   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
940     if (!ctx->done.exchange(true)) {
941       ctx->p.setValue(std::make_pair(i, std::move(t)));
942     }
943   });
944   return ctx->p.getFuture();
945 }
946
947 // collectAnyWithoutException (iterator)
948
949 template <class InputIterator>
950 Future<std::pair<
951     size_t,
952     typename std::iterator_traits<InputIterator>::value_type::value_type>>
953 collectAnyWithoutException(InputIterator first, InputIterator last) {
954   typedef
955       typename std::iterator_traits<InputIterator>::value_type::value_type T;
956
957   struct CollectAnyWithoutExceptionContext {
958     CollectAnyWithoutExceptionContext(){}
959     Promise<std::pair<size_t, T>> p;
960     std::atomic<bool> done{false};
961     std::atomic<size_t> nFulfilled{0};
962     size_t nTotal;
963   };
964
965   auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
966   ctx->nTotal = size_t(std::distance(first, last));
967
968   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
969     if (!t.hasException() && !ctx->done.exchange(true)) {
970       ctx->p.setValue(std::make_pair(i, std::move(t.value())));
971     } else if (++ctx->nFulfilled == ctx->nTotal) {
972       ctx->p.setException(t.exception());
973     }
974   });
975   return ctx->p.getFuture();
976 }
977
978 // collectN (iterator)
979
980 template <class InputIterator>
981 Future<std::vector<std::pair<size_t, Try<typename
982   std::iterator_traits<InputIterator>::value_type::value_type>>>>
983 collectN(InputIterator first, InputIterator last, size_t n) {
984   typedef typename
985     std::iterator_traits<InputIterator>::value_type::value_type T;
986   typedef std::vector<std::pair<size_t, Try<T>>> V;
987
988   struct CollectNContext {
989     V v;
990     std::atomic<size_t> completed = {0};
991     Promise<V> p;
992   };
993   auto ctx = std::make_shared<CollectNContext>();
994
995   if (size_t(std::distance(first, last)) < n) {
996     ctx->p.setException(std::runtime_error("Not enough futures"));
997   } else {
998     // for each completed Future, increase count and add to vector, until we
999     // have n completed futures at which point we fulfil our Promise with the
1000     // vector
1001     mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
1002       auto c = ++ctx->completed;
1003       if (c <= n) {
1004         assert(ctx->v.size() < n);
1005         ctx->v.emplace_back(i, std::move(t));
1006         if (c == n) {
1007           ctx->p.setTry(Try<V>(std::move(ctx->v)));
1008         }
1009       }
1010     });
1011   }
1012
1013   return ctx->p.getFuture();
1014 }
1015
1016 // reduce (iterator)
1017
1018 template <class It, class T, class F>
1019 Future<T> reduce(It first, It last, T&& initial, F&& func) {
1020   if (first == last) {
1021     return makeFuture(std::move(initial));
1022   }
1023
1024   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
1025   typedef typename std::conditional<
1026       futures::detail::callableWith<F, T&&, Try<ItT>&&>::value,
1027       Try<ItT>,
1028       ItT>::type Arg;
1029   typedef isTry<Arg> IsTry;
1030
1031   auto sfunc = std::make_shared<F>(std::move(func));
1032
1033   auto f = first->then(
1034       [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
1035         return (*sfunc)(
1036             std::move(minitial), head.template get<IsTry::value, Arg&&>());
1037       });
1038
1039   for (++first; first != last; ++first) {
1040     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
1041       return (*sfunc)(std::move(std::get<0>(t).value()),
1042                   // Either return a ItT&& or a Try<ItT>&& depending
1043                   // on the type of the argument of func.
1044                   std::get<1>(t).template get<IsTry::value, Arg&&>());
1045     });
1046   }
1047
1048   return f;
1049 }
1050
1051 // window (collection)
1052
1053 template <class Collection, class F, class ItT, class Result>
1054 std::vector<Future<Result>>
1055 window(Collection input, F func, size_t n) {
1056   struct WindowContext {
1057     WindowContext(Collection&& i, F&& fn)
1058         : input_(std::move(i)), promises_(input_.size()),
1059           func_(std::move(fn))
1060       {}
1061     std::atomic<size_t> i_ {0};
1062     Collection input_;
1063     std::vector<Promise<Result>> promises_;
1064     F func_;
1065
1066     static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
1067       size_t i = ctx->i_++;
1068       if (i < ctx->input_.size()) {
1069         // Using setCallback_ directly since we don't need the Future
1070         ctx->func_(std::move(ctx->input_[i])).setCallback_(
1071           // ctx is captured by value
1072           [ctx, i](Try<Result>&& t) {
1073             ctx->promises_[i].setTry(std::move(t));
1074             // Chain another future onto this one
1075             spawn(std::move(ctx));
1076           });
1077       }
1078     }
1079   };
1080
1081   auto max = std::min(n, input.size());
1082
1083   auto ctx = std::make_shared<WindowContext>(
1084     std::move(input), std::move(func));
1085
1086   for (size_t i = 0; i < max; ++i) {
1087     // Start the first n Futures
1088     WindowContext::spawn(ctx);
1089   }
1090
1091   std::vector<Future<Result>> futures;
1092   futures.reserve(ctx->promises_.size());
1093   for (auto& promise : ctx->promises_) {
1094     futures.emplace_back(promise.getFuture());
1095   }
1096
1097   return futures;
1098 }
1099
1100 // reduce
1101
1102 template <class T>
1103 template <class I, class F>
1104 Future<I> Future<T>::reduce(I&& initial, F&& func) {
1105   return then([
1106     minitial = std::forward<I>(initial),
1107     mfunc = std::forward<F>(func)
1108   ](T& vals) mutable {
1109     auto ret = std::move(minitial);
1110     for (auto& val : vals) {
1111       ret = mfunc(std::move(ret), std::move(val));
1112     }
1113     return ret;
1114   });
1115 }
1116
1117 // unorderedReduce (iterator)
1118
1119 template <class It, class T, class F, class ItT, class Arg>
1120 Future<T> unorderedReduce(It first, It last, T initial, F func) {
1121   if (first == last) {
1122     return makeFuture(std::move(initial));
1123   }
1124
1125   typedef isTry<Arg> IsTry;
1126
1127   struct UnorderedReduceContext {
1128     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
1129         : lock_(), memo_(makeFuture<T>(std::move(memo))),
1130           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
1131       {}
1132     folly::MicroSpinLock lock_; // protects memo_ and numThens_
1133     Future<T> memo_;
1134     F func_;
1135     size_t numThens_; // how many Futures completed and called .then()
1136     size_t numFutures_; // how many Futures in total
1137     Promise<T> promise_;
1138   };
1139
1140   auto ctx = std::make_shared<UnorderedReduceContext>(
1141     std::move(initial), std::move(func), std::distance(first, last));
1142
1143   mapSetCallback<ItT>(
1144       first,
1145       last,
1146       [ctx](size_t /* i */, Try<ItT>&& t) {
1147         // Futures can be completed in any order, simultaneously.
1148         // To make this non-blocking, we create a new Future chain in
1149         // the order of completion to reduce the values.
1150         // The spinlock just protects chaining a new Future, not actually
1151         // executing the reduce, which should be really fast.
1152         folly::MSLGuard lock(ctx->lock_);
1153         ctx->memo_ =
1154             ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
1155               // Either return a ItT&& or a Try<ItT>&& depending
1156               // on the type of the argument of func.
1157               return ctx->func_(std::move(v),
1158                                 mt.template get<IsTry::value, Arg&&>());
1159             });
1160         if (++ctx->numThens_ == ctx->numFutures_) {
1161           // After reducing the value of the last Future, fulfill the Promise
1162           ctx->memo_.setCallback_(
1163               [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
1164         }
1165       });
1166
1167   return ctx->promise_.getFuture();
1168 }
1169
1170 // within
1171
1172 template <class T>
1173 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
1174   return within(dur, TimedOut(), tk);
1175 }
1176
1177 template <class T>
1178 template <class E>
1179 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
1180
1181   struct Context {
1182     Context(E ex) : exception(std::move(ex)), promise() {}
1183     E exception;
1184     Future<Unit> thisFuture;
1185     Promise<T> promise;
1186     std::atomic<bool> token {false};
1187   };
1188
1189   if (this->isReady()) {
1190     return std::move(*this);
1191   }
1192
1193   std::shared_ptr<Timekeeper> tks;
1194   if (!tk) {
1195     tks = folly::detail::getTimekeeperSingleton();
1196     tk = DCHECK_NOTNULL(tks.get());
1197   }
1198
1199   auto ctx = std::make_shared<Context>(std::move(e));
1200
1201   ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
1202     if (ctx->token.exchange(true) == false) {
1203       ctx->promise.setTry(std::move(t));
1204     }
1205   });
1206
1207   // Have time keeper use a weak ptr to hold ctx,
1208   // so that ctx can be deallocated as soon as the future job finished.
1209   tk->after(dur).then([weakCtx = to_weak_ptr(ctx)](Try<Unit> const& t) mutable {
1210     auto lockedCtx = weakCtx.lock();
1211     if (!lockedCtx) {
1212       // ctx already released. "this" completed first, cancel "after"
1213       return;
1214     }
1215     // "after" completed first, cancel "this"
1216     lockedCtx->thisFuture.raise(TimedOut());
1217     if (lockedCtx->token.exchange(true) == false) {
1218       if (t.hasException()) {
1219         lockedCtx->promise.setException(std::move(t.exception()));
1220       } else {
1221         lockedCtx->promise.setException(std::move(lockedCtx->exception));
1222       }
1223     }
1224   });
1225
1226   return ctx->promise.getFuture().via(this->getExecutor());
1227 }
1228
1229 // delayed
1230
1231 template <class T>
1232 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
1233   return collectAll(*this, futures::sleep(dur, tk))
1234     .then([](std::tuple<Try<T>, Try<Unit>> tup) {
1235       Try<T>& t = std::get<0>(tup);
1236       return makeFuture<T>(std::move(t));
1237     });
1238 }
1239
1240 namespace futures {
1241 namespace detail {
1242
1243 template <class FutureType, typename T = typename FutureType::value_type>
1244 void waitImpl(FutureType& f) {
1245   // short-circuit if there's nothing to do
1246   if (f.isReady()) return;
1247
1248   FutureBatonType baton;
1249   f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1250   baton.wait();
1251   assert(f.isReady());
1252 }
1253
1254 template <class FutureType, typename T = typename FutureType::value_type>
1255 void waitImpl(FutureType& f, Duration dur) {
1256   // short-circuit if there's nothing to do
1257   if (f.isReady()) {
1258     return;
1259   }
1260
1261   Promise<T> promise;
1262   auto ret = promise.getFuture();
1263   auto baton = std::make_shared<FutureBatonType>();
1264   f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1265     promise.setTry(std::move(t));
1266     baton->post();
1267   });
1268   f = std::move(ret);
1269   if (baton->timed_wait(dur)) {
1270     assert(f.isReady());
1271   }
1272 }
1273
1274 template <class T>
1275 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1276   // Set callback so to ensure that the via executor has something on it
1277   // so that once the preceding future triggers this callback, drive will
1278   // always have a callback to satisfy it
1279   if (f.isReady())
1280     return;
1281   f = f.via(e).then([](T&& t) { return std::move(t); });
1282   while (!f.isReady()) {
1283     e->drive();
1284   }
1285   assert(f.isReady());
1286 }
1287
1288 } // namespace detail
1289 } // namespace futures
1290
1291 template <class T>
1292 SemiFuture<T>& SemiFuture<T>::wait() & {
1293   futures::detail::waitImpl(*this);
1294   return *this;
1295 }
1296
1297 template <class T>
1298 SemiFuture<T>&& SemiFuture<T>::wait() && {
1299   futures::detail::waitImpl(*this);
1300   return std::move(*this);
1301 }
1302
1303 template <class T>
1304 SemiFuture<T>& SemiFuture<T>::wait(Duration dur) & {
1305   futures::detail::waitImpl(*this, dur);
1306   return *this;
1307 }
1308
1309 template <class T>
1310 SemiFuture<T>&& SemiFuture<T>::wait(Duration dur) && {
1311   futures::detail::waitImpl(*this, dur);
1312   return std::move(*this);
1313 }
1314
1315 template <class T>
1316 T SemiFuture<T>::get() {
1317   return std::move(wait().value());
1318 }
1319
1320 template <class T>
1321 T SemiFuture<T>::get(Duration dur) {
1322   wait(dur);
1323   if (this->isReady()) {
1324     return std::move(this->value());
1325   } else {
1326     throwTimedOut();
1327   }
1328 }
1329
1330 template <class T>
1331 Future<T>& Future<T>::wait() & {
1332   futures::detail::waitImpl(*this);
1333   return *this;
1334 }
1335
1336 template <class T>
1337 Future<T>&& Future<T>::wait() && {
1338   futures::detail::waitImpl(*this);
1339   return std::move(*this);
1340 }
1341
1342 template <class T>
1343 Future<T>& Future<T>::wait(Duration dur) & {
1344   futures::detail::waitImpl(*this, dur);
1345   return *this;
1346 }
1347
1348 template <class T>
1349 Future<T>&& Future<T>::wait(Duration dur) && {
1350   futures::detail::waitImpl(*this, dur);
1351   return std::move(*this);
1352 }
1353
1354 template <class T>
1355 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1356   futures::detail::waitViaImpl(*this, e);
1357   return *this;
1358 }
1359
1360 template <class T>
1361 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1362   futures::detail::waitViaImpl(*this, e);
1363   return std::move(*this);
1364 }
1365
1366 template <class T>
1367 T Future<T>::getVia(DrivableExecutor* e) {
1368   return std::move(waitVia(e).value());
1369 }
1370
1371 namespace futures {
1372 namespace detail {
1373 template <class T>
1374 struct TryEquals {
1375   static bool equals(const Try<T>& t1, const Try<T>& t2) {
1376     return t1.value() == t2.value();
1377   }
1378 };
1379 } // namespace detail
1380 } // namespace futures
1381
1382 template <class T>
1383 Future<bool> Future<T>::willEqual(Future<T>& f) {
1384   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1385     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1386       return futures::detail::TryEquals<T>::equals(
1387           std::get<0>(t), std::get<1>(t));
1388     } else {
1389       return false;
1390       }
1391   });
1392 }
1393
1394 template <class T>
1395 template <class F>
1396 Future<T> Future<T>::filter(F&& predicate) {
1397   return this->then([p = std::forward<F>(predicate)](T val) {
1398     T const& valConstRef = val;
1399     if (!p(valConstRef)) {
1400       throwPredicateDoesNotObtain();
1401     }
1402     return val;
1403   });
1404 }
1405
1406 template <class F>
1407 inline Future<Unit> when(bool p, F&& thunk) {
1408   return p ? std::forward<F>(thunk)().unit() : makeFuture();
1409 }
1410
1411 template <class P, class F>
1412 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1413   if (predicate()) {
1414     auto future = thunk();
1415     return future.then([
1416       predicate = std::forward<P>(predicate),
1417       thunk = std::forward<F>(thunk)
1418     ]() mutable {
1419       return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1420     });
1421   }
1422   return makeFuture();
1423 }
1424
1425 template <class F>
1426 Future<Unit> times(const int n, F&& thunk) {
1427   return folly::whileDo(
1428       [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
1429         return count->fetch_add(1) < n;
1430       },
1431       std::forward<F>(thunk));
1432 }
1433
1434 namespace futures {
1435   template <class It, class F, class ItT, class Result>
1436   std::vector<Future<Result>> map(It first, It last, F func) {
1437     std::vector<Future<Result>> results;
1438     for (auto it = first; it != last; it++) {
1439       results.push_back(it->then(func));
1440     }
1441     return results;
1442   }
1443 }
1444
1445 namespace futures {
1446
1447 namespace detail {
1448
1449 struct retrying_policy_raw_tag {};
1450 struct retrying_policy_fut_tag {};
1451
1452 template <class Policy>
1453 struct retrying_policy_traits {
1454   using result = std::result_of_t<Policy(size_t, const exception_wrapper&)>;
1455   using is_raw = std::is_same<result, bool>;
1456   using is_fut = std::is_same<result, Future<bool>>;
1457   using tag = typename std::conditional<
1458         is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1459         is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1460 };
1461
1462 template <class Policy, class FF, class Prom>
1463 void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) {
1464   using F = typename std::result_of<FF(size_t)>::type;
1465   using T = typename F::value_type;
1466   auto f = makeFutureWith([&] { return ff(k++); });
1467   f.then([
1468     k,
1469     prom = std::move(prom),
1470     pm = std::forward<Policy>(p),
1471     ffm = std::forward<FF>(ff)
1472   ](Try<T> && t) mutable {
1473     if (t.hasValue()) {
1474       prom.setValue(std::move(t).value());
1475       return;
1476     }
1477     auto& x = t.exception();
1478     auto q = pm(k, x);
1479     q.then([
1480       k,
1481       prom = std::move(prom),
1482       xm = std::move(x),
1483       pm = std::move(pm),
1484       ffm = std::move(ffm)
1485     ](bool shouldRetry) mutable {
1486       if (shouldRetry) {
1487         retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom));
1488       } else {
1489         prom.setException(std::move(xm));
1490       };
1491     });
1492   });
1493 }
1494
1495 template <class Policy, class FF>
1496 typename std::result_of<FF(size_t)>::type
1497 retrying(size_t k, Policy&& p, FF&& ff) {
1498   using F = typename std::result_of<FF(size_t)>::type;
1499   using T = typename F::value_type;
1500   auto prom = Promise<T>();
1501   auto f = prom.getFuture();
1502   retryingImpl(
1503       k, std::forward<Policy>(p), std::forward<FF>(ff), std::move(prom));
1504   return f;
1505 }
1506
1507 template <class Policy, class FF>
1508 typename std::result_of<FF(size_t)>::type
1509 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1510   auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
1511     return makeFuture<bool>(pm(k, x));
1512   };
1513   return retrying(0, std::move(q), std::forward<FF>(ff));
1514 }
1515
1516 template <class Policy, class FF>
1517 typename std::result_of<FF(size_t)>::type
1518 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1519   return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1520 }
1521
1522 //  jittered exponential backoff, clamped to [backoff_min, backoff_max]
1523 template <class URNG>
1524 Duration retryingJitteredExponentialBackoffDur(
1525     size_t n,
1526     Duration backoff_min,
1527     Duration backoff_max,
1528     double jitter_param,
1529     URNG& rng) {
1530   using d = Duration;
1531   auto dist = std::normal_distribution<double>(0.0, jitter_param);
1532   auto jitter = std::exp(dist(rng));
1533   auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1534   return std::max(backoff_min, std::min(backoff_max, backoff));
1535 }
1536
1537 template <class Policy, class URNG>
1538 std::function<Future<bool>(size_t, const exception_wrapper&)>
1539 retryingPolicyCappedJitteredExponentialBackoff(
1540     size_t max_tries,
1541     Duration backoff_min,
1542     Duration backoff_max,
1543     double jitter_param,
1544     URNG&& rng,
1545     Policy&& p) {
1546   return [
1547     pm = std::forward<Policy>(p),
1548     max_tries,
1549     backoff_min,
1550     backoff_max,
1551     jitter_param,
1552     rngp = std::forward<URNG>(rng)
1553   ](size_t n, const exception_wrapper& ex) mutable {
1554     if (n == max_tries) {
1555       return makeFuture(false);
1556     }
1557     return pm(n, ex).then(
1558         [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
1559             bool v) mutable {
1560           if (!v) {
1561             return makeFuture(false);
1562           }
1563           auto backoff = detail::retryingJitteredExponentialBackoffDur(
1564               n, backoff_min, backoff_max, jitter_param, rngp);
1565           return futures::sleep(backoff).then([] { return true; });
1566         });
1567   };
1568 }
1569
1570 template <class Policy, class URNG>
1571 std::function<Future<bool>(size_t, const exception_wrapper&)>
1572 retryingPolicyCappedJitteredExponentialBackoff(
1573     size_t max_tries,
1574     Duration backoff_min,
1575     Duration backoff_max,
1576     double jitter_param,
1577     URNG&& rng,
1578     Policy&& p,
1579     retrying_policy_raw_tag) {
1580   auto q = [pm = std::forward<Policy>(p)](
1581       size_t n, const exception_wrapper& e) {
1582     return makeFuture(pm(n, e));
1583   };
1584   return retryingPolicyCappedJitteredExponentialBackoff(
1585       max_tries,
1586       backoff_min,
1587       backoff_max,
1588       jitter_param,
1589       std::forward<URNG>(rng),
1590       std::move(q));
1591 }
1592
1593 template <class Policy, class URNG>
1594 std::function<Future<bool>(size_t, const exception_wrapper&)>
1595 retryingPolicyCappedJitteredExponentialBackoff(
1596     size_t max_tries,
1597     Duration backoff_min,
1598     Duration backoff_max,
1599     double jitter_param,
1600     URNG&& rng,
1601     Policy&& p,
1602     retrying_policy_fut_tag) {
1603   return retryingPolicyCappedJitteredExponentialBackoff(
1604       max_tries,
1605       backoff_min,
1606       backoff_max,
1607       jitter_param,
1608       std::forward<URNG>(rng),
1609       std::forward<Policy>(p));
1610 }
1611 }
1612
1613 template <class Policy, class FF>
1614 typename std::result_of<FF(size_t)>::type
1615 retrying(Policy&& p, FF&& ff) {
1616   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1617   return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1618 }
1619
1620 inline
1621 std::function<bool(size_t, const exception_wrapper&)>
1622 retryingPolicyBasic(
1623     size_t max_tries) {
1624   return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1625 }
1626
1627 template <class Policy, class URNG>
1628 std::function<Future<bool>(size_t, const exception_wrapper&)>
1629 retryingPolicyCappedJitteredExponentialBackoff(
1630     size_t max_tries,
1631     Duration backoff_min,
1632     Duration backoff_max,
1633     double jitter_param,
1634     URNG&& rng,
1635     Policy&& p) {
1636   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1637   return detail::retryingPolicyCappedJitteredExponentialBackoff(
1638       max_tries,
1639       backoff_min,
1640       backoff_max,
1641       jitter_param,
1642       std::forward<URNG>(rng),
1643       std::forward<Policy>(p),
1644       tag());
1645 }
1646
1647 inline
1648 std::function<Future<bool>(size_t, const exception_wrapper&)>
1649 retryingPolicyCappedJitteredExponentialBackoff(
1650     size_t max_tries,
1651     Duration backoff_min,
1652     Duration backoff_max,
1653     double jitter_param) {
1654   auto p = [](size_t, const exception_wrapper&) { return true; };
1655   return retryingPolicyCappedJitteredExponentialBackoff(
1656       max_tries,
1657       backoff_min,
1658       backoff_max,
1659       jitter_param,
1660       ThreadLocalPRNG(),
1661       std::move(p));
1662 }
1663
1664 }
1665
1666 // Instantiate the most common Future types to save compile time
1667 extern template class Future<Unit>;
1668 extern template class Future<bool>;
1669 extern template class Future<int>;
1670 extern template class Future<int64_t>;
1671 extern template class Future<std::string>;
1672 extern template class Future<double>;
1673 } // namespace folly