Ensure getVia(eventbase) does not busy wait
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <algorithm>
20 #include <cassert>
21 #include <chrono>
22 #include <random>
23 #include <thread>
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
30
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
33 #else
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/fibers/Baton.h>
36 #endif
37
38 namespace folly {
39
40 class Timekeeper;
41
42 namespace detail {
43 #if FOLLY_FUTURE_USING_FIBER
44 typedef folly::fibers::Baton FutureBatonType;
45 #else
46 typedef folly::Baton<> FutureBatonType;
47 #endif
48 }
49
50 namespace detail {
51   std::shared_ptr<Timekeeper> getTimekeeperSingleton();
52 }
53
54 template <class T>
55 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
56   other.core_ = nullptr;
57 }
58
59 template <class T>
60 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
61   std::swap(core_, other.core_);
62   return *this;
63 }
64
65 template <class T>
66 template <typename U, typename>
67 Future<T>::Future(Future<U>&& other) noexcept
68     : core_(detail::Core<T>::convert(other.core_)) {
69   other.core_ = nullptr;
70 }
71
72 template <class T>
73 template <typename U, typename>
74 Future<T>& Future<T>::operator=(Future<U>&& other) noexcept {
75   std::swap(core_, detail::Core<T>::convert(other.core_));
76   return *this;
77 }
78
79 template <class T>
80 template <class T2, typename>
81 Future<T>::Future(T2&& val)
82     : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
83
84 template <class T>
85 template <typename T2>
86 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
87     : core_(new detail::Core<T>(Try<T>(T()))) {}
88
89 template <class T>
90 Future<T>::~Future() {
91   detach();
92 }
93
94 template <class T>
95 void Future<T>::detach() {
96   if (core_) {
97     core_->detachFuture();
98     core_ = nullptr;
99   }
100 }
101
102 template <class T>
103 void Future<T>::throwIfInvalid() const {
104   if (!core_)
105     throw NoState();
106 }
107
108 template <class T>
109 template <class F>
110 void Future<T>::setCallback_(F&& func) {
111   throwIfInvalid();
112   core_->setCallback(std::forward<F>(func));
113 }
114
115 // unwrap
116
117 template <class T>
118 template <class F>
119 typename std::enable_if<isFuture<F>::value,
120                         Future<typename isFuture<T>::Inner>>::type
121 Future<T>::unwrap() {
122   return then([](Future<typename isFuture<T>::Inner> internal_future) {
123       return internal_future;
124   });
125 }
126
127 // then
128
129 // Variant: returns a value
130 // e.g. f.then([](Try<T>&& t){ return t.value(); });
131 template <class T>
132 template <typename F, typename R, bool isTry, typename... Args>
133 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
134 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
135   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
136   typedef typename R::ReturnsFuture::Inner B;
137
138   throwIfInvalid();
139
140   Promise<B> p;
141   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
142
143   // grab the Future now before we lose our handle on the Promise
144   auto f = p.getFuture();
145   f.core_->setExecutorNoLock(getExecutor());
146
147   /* This is a bit tricky.
148
149      We can't just close over *this in case this Future gets moved. So we
150      make a new dummy Future. We could figure out something more
151      sophisticated that avoids making a new Future object when it can, as an
152      optimization. But this is correct.
153
154      core_ can't be moved, it is explicitly disallowed (as is copying). But
155      if there's ever a reason to allow it, this is one place that makes that
156      assumption and would need to be fixed. We use a standard shared pointer
157      for core_ (by copying it in), which means in essence obj holds a shared
158      pointer to itself.  But this shouldn't leak because Promise will not
159      outlive the continuation, because Promise will setException() with a
160      broken Promise if it is destructed before completed. We could use a
161      weak pointer but it would have to be converted to a shared pointer when
162      func is executed (because the Future returned by func may possibly
163      persist beyond the callback, if it gets moved), and so it is an
164      optimization to just make it shared from the get-go.
165
166      Two subtle but important points about this design. detail::Core has no
167      back pointers to Future or Promise, so if Future or Promise get moved
168      (and they will be moved in performant code) we don't have to do
169      anything fancy. And because we store the continuation in the
170      detail::Core, not in the Future, we can execute the continuation even
171      after the Future has gone out of scope. This is an intentional design
172      decision. It is likely we will want to be able to cancel a continuation
173      in some circumstances, but I think it should be explicit not implicit
174      in the destruction of the Future used to create it.
175      */
176   setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
177       Try<T> && t) mutable {
178     if (!isTry && t.hasException()) {
179       pm.setException(std::move(t.exception()));
180     } else {
181       pm.setWith([&]() { return funcm(t.template get<isTry, Args>()...); });
182     }
183   });
184
185   return f;
186 }
187
188 // Variant: returns a Future
189 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
190 template <class T>
191 template <typename F, typename R, bool isTry, typename... Args>
192 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
193 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
194   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
195   typedef typename R::ReturnsFuture::Inner B;
196
197   throwIfInvalid();
198
199   Promise<B> p;
200   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
201
202   // grab the Future now before we lose our handle on the Promise
203   auto f = p.getFuture();
204   f.core_->setExecutorNoLock(getExecutor());
205
206   setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
207       Try<T> && t) mutable {
208     if (!isTry && t.hasException()) {
209       pm.setException(std::move(t.exception()));
210     } else {
211       try {
212         auto f2 = funcm(t.template get<isTry, Args>()...);
213         // that didn't throw, now we can steal p
214         f2.setCallback_([p = std::move(pm)](Try<B> && b) mutable {
215           p.setTry(std::move(b));
216         });
217       } catch (const std::exception& e) {
218         pm.setException(exception_wrapper(std::current_exception(), e));
219       } catch (...) {
220         pm.setException(exception_wrapper(std::current_exception()));
221       }
222     }
223   });
224
225   return f;
226 }
227
228 template <typename T>
229 template <typename R, typename Caller, typename... Args>
230   Future<typename isFuture<R>::Inner>
231 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
232   typedef typename std::remove_cv<
233     typename std::remove_reference<
234       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
235   return then([instance, func](Try<T>&& t){
236     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
237   });
238 }
239
240 template <class T>
241 template <class Executor, class Arg, class... Args>
242 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
243   -> decltype(this->then(std::forward<Arg>(arg),
244                          std::forward<Args>(args)...))
245 {
246   auto oldX = getExecutor();
247   setExecutor(x);
248   return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
249                via(oldX);
250 }
251
252 template <class T>
253 Future<Unit> Future<T>::then() {
254   return then([] () {});
255 }
256
257 // onError where the callback returns T
258 template <class T>
259 template <class F>
260 typename std::enable_if<
261   !detail::callableWith<F, exception_wrapper>::value &&
262   !detail::Extract<F>::ReturnsFuture::value,
263   Future<T>>::type
264 Future<T>::onError(F&& func) {
265   typedef typename detail::Extract<F>::FirstArg Exn;
266   static_assert(
267       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
268       "Return type of onError callback must be T or Future<T>");
269
270   Promise<T> p;
271   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
272   auto f = p.getFuture();
273
274   setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
275       Try<T> && t) mutable {
276     if (!t.template withException<Exn>(
277             [&](Exn& e) { pm.setWith([&] { return funcm(e); }); })) {
278       pm.setTry(std::move(t));
279     }
280   });
281
282   return f;
283 }
284
285 // onError where the callback returns Future<T>
286 template <class T>
287 template <class F>
288 typename std::enable_if<
289   !detail::callableWith<F, exception_wrapper>::value &&
290   detail::Extract<F>::ReturnsFuture::value,
291   Future<T>>::type
292 Future<T>::onError(F&& func) {
293   static_assert(
294       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
295       "Return type of onError callback must be T or Future<T>");
296   typedef typename detail::Extract<F>::FirstArg Exn;
297
298   Promise<T> p;
299   auto f = p.getFuture();
300
301   setCallback_([ pm = std::move(p), funcm = std::forward<F>(func) ](
302       Try<T> && t) mutable {
303     if (!t.template withException<Exn>([&](Exn& e) {
304           try {
305             auto f2 = funcm(e);
306             f2.setCallback_([pm = std::move(pm)](Try<T> && t2) mutable {
307               pm.setTry(std::move(t2));
308             });
309           } catch (const std::exception& e2) {
310             pm.setException(exception_wrapper(std::current_exception(), e2));
311           } catch (...) {
312             pm.setException(exception_wrapper(std::current_exception()));
313           }
314         })) {
315       pm.setTry(std::move(t));
316     }
317   });
318
319   return f;
320 }
321
322 template <class T>
323 template <class F>
324 Future<T> Future<T>::ensure(F&& func) {
325   return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
326     funcw();
327     return makeFuture(std::move(t));
328   });
329 }
330
331 template <class T>
332 template <class F>
333 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
334   return within(dur, tk).onError([funcw = std::forward<F>(func)](
335       TimedOut const&) { return funcw(); });
336 }
337
338 template <class T>
339 template <class F>
340 typename std::enable_if<detail::callableWith<F, exception_wrapper>::value &&
341                             detail::Extract<F>::ReturnsFuture::value,
342                         Future<T>>::type
343 Future<T>::onError(F&& func) {
344   static_assert(
345       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
346       "Return type of onError callback must be T or Future<T>");
347
348   Promise<T> p;
349   auto f = p.getFuture();
350   setCallback_(
351       [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
352         if (t.hasException()) {
353           try {
354             auto f2 = funcm(std::move(t.exception()));
355             f2.setCallback_([pm = std::move(pm)](Try<T> t2) mutable {
356               pm.setTry(std::move(t2));
357             });
358           } catch (const std::exception& e2) {
359             pm.setException(exception_wrapper(std::current_exception(), e2));
360           } catch (...) {
361             pm.setException(exception_wrapper(std::current_exception()));
362           }
363         } else {
364           pm.setTry(std::move(t));
365         }
366       });
367
368   return f;
369 }
370
371 // onError(exception_wrapper) that returns T
372 template <class T>
373 template <class F>
374 typename std::enable_if<
375   detail::callableWith<F, exception_wrapper>::value &&
376   !detail::Extract<F>::ReturnsFuture::value,
377   Future<T>>::type
378 Future<T>::onError(F&& func) {
379   static_assert(
380       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
381       "Return type of onError callback must be T or Future<T>");
382
383   Promise<T> p;
384   auto f = p.getFuture();
385   setCallback_(
386       [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
387         if (t.hasException()) {
388           pm.setWith([&] { return funcm(std::move(t.exception())); });
389         } else {
390           pm.setTry(std::move(t));
391         }
392       });
393
394   return f;
395 }
396
397 template <class T>
398 typename std::add_lvalue_reference<T>::type Future<T>::value() {
399   throwIfInvalid();
400
401   return core_->getTry().value();
402 }
403
404 template <class T>
405 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
406   throwIfInvalid();
407
408   return core_->getTry().value();
409 }
410
411 template <class T>
412 Try<T>& Future<T>::getTry() {
413   throwIfInvalid();
414
415   return core_->getTry();
416 }
417
418 template <class T>
419 Optional<Try<T>> Future<T>::poll() {
420   Optional<Try<T>> o;
421   if (core_->ready()) {
422     o = std::move(core_->getTry());
423   }
424   return o;
425 }
426
427 template <class T>
428 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
429   throwIfInvalid();
430
431   setExecutor(executor, priority);
432
433   return std::move(*this);
434 }
435
436 template <class T>
437 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
438   throwIfInvalid();
439
440   Promise<T> p;
441   auto f = p.getFuture();
442   then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
443   return std::move(f).via(executor, priority);
444 }
445
446 template <class Func>
447 auto via(Executor* x, Func&& func)
448   -> Future<typename isFuture<decltype(func())>::Inner>
449 {
450   // TODO make this actually more performant. :-P #7260175
451   return via(x).then(std::forward<Func>(func));
452 }
453
454 template <class T>
455 bool Future<T>::isReady() const {
456   throwIfInvalid();
457   return core_->ready();
458 }
459
460 template <class T>
461 bool Future<T>::hasValue() {
462   return getTry().hasValue();
463 }
464
465 template <class T>
466 bool Future<T>::hasException() {
467   return getTry().hasException();
468 }
469
470 template <class T>
471 void Future<T>::raise(exception_wrapper exception) {
472   core_->raise(std::move(exception));
473 }
474
475 // makeFuture
476
477 template <class T>
478 Future<typename std::decay<T>::type> makeFuture(T&& t) {
479   return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
480 }
481
482 inline // for multiple translation units
483 Future<Unit> makeFuture() {
484   return makeFuture(Unit{});
485 }
486
487 // makeFutureWith(Future<T>()) -> Future<T>
488 template <class F>
489 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
490                         typename std::result_of<F()>::type>::type
491 makeFutureWith(F&& func) {
492   using InnerType =
493       typename isFuture<typename std::result_of<F()>::type>::Inner;
494   try {
495     return func();
496   } catch (std::exception& e) {
497     return makeFuture<InnerType>(
498         exception_wrapper(std::current_exception(), e));
499   } catch (...) {
500     return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
501   }
502 }
503
504 // makeFutureWith(T()) -> Future<T>
505 // makeFutureWith(void()) -> Future<Unit>
506 template <class F>
507 typename std::enable_if<
508     !(isFuture<typename std::result_of<F()>::type>::value),
509     Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
510 makeFutureWith(F&& func) {
511   using LiftedResult =
512       typename Unit::Lift<typename std::result_of<F()>::type>::type;
513   return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
514     return func();
515   }));
516 }
517
518 template <class T>
519 Future<T> makeFuture(std::exception_ptr const& e) {
520   return makeFuture(Try<T>(e));
521 }
522
523 template <class T>
524 Future<T> makeFuture(exception_wrapper ew) {
525   return makeFuture(Try<T>(std::move(ew)));
526 }
527
528 template <class T, class E>
529 typename std::enable_if<std::is_base_of<std::exception, E>::value,
530                         Future<T>>::type
531 makeFuture(E const& e) {
532   return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
533 }
534
535 template <class T>
536 Future<T> makeFuture(Try<T>&& t) {
537   return Future<T>(new detail::Core<T>(std::move(t)));
538 }
539
540 // via
541 Future<Unit> via(Executor* executor, int8_t priority) {
542   return makeFuture().via(executor, priority);
543 }
544
545 // mapSetCallback calls func(i, Try<T>) when every future completes
546
547 template <class T, class InputIterator, class F>
548 void mapSetCallback(InputIterator first, InputIterator last, F func) {
549   for (size_t i = 0; first != last; ++first, ++i) {
550     first->setCallback_([func, i](Try<T>&& t) {
551       func(i, std::move(t));
552     });
553   }
554 }
555
556 // collectAll (variadic)
557
558 template <typename... Fs>
559 typename detail::CollectAllVariadicContext<
560   typename std::decay<Fs>::type::value_type...>::type
561 collectAll(Fs&&... fs) {
562   auto ctx = std::make_shared<detail::CollectAllVariadicContext<
563     typename std::decay<Fs>::type::value_type...>>();
564   detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
565     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
566   return ctx->p.getFuture();
567 }
568
569 // collectAll (iterator)
570
571 template <class InputIterator>
572 Future<
573   std::vector<
574   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
575 collectAll(InputIterator first, InputIterator last) {
576   typedef
577     typename std::iterator_traits<InputIterator>::value_type::value_type T;
578
579   struct CollectAllContext {
580     CollectAllContext(int n) : results(n) {}
581     ~CollectAllContext() {
582       p.setValue(std::move(results));
583     }
584     Promise<std::vector<Try<T>>> p;
585     std::vector<Try<T>> results;
586   };
587
588   auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
589   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
590     ctx->results[i] = std::move(t);
591   });
592   return ctx->p.getFuture();
593 }
594
595 // collect (iterator)
596
597 namespace detail {
598
599 template <typename T>
600 struct CollectContext {
601   struct Nothing {
602     explicit Nothing(int /* n */) {}
603   };
604
605   using Result = typename std::conditional<
606     std::is_void<T>::value,
607     void,
608     std::vector<T>>::type;
609
610   using InternalResult = typename std::conditional<
611     std::is_void<T>::value,
612     Nothing,
613     std::vector<Optional<T>>>::type;
614
615   explicit CollectContext(int n) : result(n) {}
616   ~CollectContext() {
617     if (!threw.exchange(true)) {
618       // map Optional<T> -> T
619       std::vector<T> finalResult;
620       finalResult.reserve(result.size());
621       std::transform(result.begin(), result.end(),
622                      std::back_inserter(finalResult),
623                      [](Optional<T>& o) { return std::move(o.value()); });
624       p.setValue(std::move(finalResult));
625     }
626   }
627   inline void setPartialResult(size_t i, Try<T>& t) {
628     result[i] = std::move(t.value());
629   }
630   Promise<Result> p;
631   InternalResult result;
632   std::atomic<bool> threw {false};
633 };
634
635 }
636
637 template <class InputIterator>
638 Future<typename detail::CollectContext<
639   typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
640 collect(InputIterator first, InputIterator last) {
641   typedef
642     typename std::iterator_traits<InputIterator>::value_type::value_type T;
643
644   auto ctx = std::make_shared<detail::CollectContext<T>>(
645     std::distance(first, last));
646   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
647     if (t.hasException()) {
648        if (!ctx->threw.exchange(true)) {
649          ctx->p.setException(std::move(t.exception()));
650        }
651      } else if (!ctx->threw) {
652        ctx->setPartialResult(i, t);
653      }
654   });
655   return ctx->p.getFuture();
656 }
657
658 // collect (variadic)
659
660 template <typename... Fs>
661 typename detail::CollectVariadicContext<
662   typename std::decay<Fs>::type::value_type...>::type
663 collect(Fs&&... fs) {
664   auto ctx = std::make_shared<detail::CollectVariadicContext<
665     typename std::decay<Fs>::type::value_type...>>();
666   detail::collectVariadicHelper<detail::CollectVariadicContext>(
667     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
668   return ctx->p.getFuture();
669 }
670
671 // collectAny (iterator)
672
673 template <class InputIterator>
674 Future<
675   std::pair<size_t,
676             Try<
677               typename
678               std::iterator_traits<InputIterator>::value_type::value_type>>>
679 collectAny(InputIterator first, InputIterator last) {
680   typedef
681     typename std::iterator_traits<InputIterator>::value_type::value_type T;
682
683   struct CollectAnyContext {
684     CollectAnyContext() {};
685     Promise<std::pair<size_t, Try<T>>> p;
686     std::atomic<bool> done {false};
687   };
688
689   auto ctx = std::make_shared<CollectAnyContext>();
690   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
691     if (!ctx->done.exchange(true)) {
692       ctx->p.setValue(std::make_pair(i, std::move(t)));
693     }
694   });
695   return ctx->p.getFuture();
696 }
697
698 // collectN (iterator)
699
700 template <class InputIterator>
701 Future<std::vector<std::pair<size_t, Try<typename
702   std::iterator_traits<InputIterator>::value_type::value_type>>>>
703 collectN(InputIterator first, InputIterator last, size_t n) {
704   typedef typename
705     std::iterator_traits<InputIterator>::value_type::value_type T;
706   typedef std::vector<std::pair<size_t, Try<T>>> V;
707
708   struct CollectNContext {
709     V v;
710     std::atomic<size_t> completed = {0};
711     Promise<V> p;
712   };
713   auto ctx = std::make_shared<CollectNContext>();
714
715   if (size_t(std::distance(first, last)) < n) {
716     ctx->p.setException(std::runtime_error("Not enough futures"));
717   } else {
718     // for each completed Future, increase count and add to vector, until we
719     // have n completed futures at which point we fulfil our Promise with the
720     // vector
721     mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
722       auto c = ++ctx->completed;
723       if (c <= n) {
724         assert(ctx->v.size() < n);
725         ctx->v.emplace_back(i, std::move(t));
726         if (c == n) {
727           ctx->p.setTry(Try<V>(std::move(ctx->v)));
728         }
729       }
730     });
731   }
732
733   return ctx->p.getFuture();
734 }
735
736 // reduce (iterator)
737
738 template <class It, class T, class F>
739 Future<T> reduce(It first, It last, T&& initial, F&& func) {
740   if (first == last) {
741     return makeFuture(std::move(initial));
742   }
743
744   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
745   typedef
746       typename std::conditional<detail::callableWith<F, T&&, Try<ItT>&&>::value,
747                                 Try<ItT>,
748                                 ItT>::type Arg;
749   typedef isTry<Arg> IsTry;
750
751   auto sfunc = std::make_shared<F>(std::move(func));
752
753   auto f = first->then(
754       [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
755         return (*sfunc)(
756             std::move(minitial), head.template get<IsTry::value, Arg&&>());
757       });
758
759   for (++first; first != last; ++first) {
760     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
761       return (*sfunc)(std::move(std::get<0>(t).value()),
762                   // Either return a ItT&& or a Try<ItT>&& depending
763                   // on the type of the argument of func.
764                   std::get<1>(t).template get<IsTry::value, Arg&&>());
765     });
766   }
767
768   return f;
769 }
770
771 // window (collection)
772
773 template <class Collection, class F, class ItT, class Result>
774 std::vector<Future<Result>>
775 window(Collection input, F func, size_t n) {
776   struct WindowContext {
777     WindowContext(Collection&& i, F&& fn)
778         : input_(std::move(i)), promises_(input_.size()),
779           func_(std::move(fn))
780       {}
781     std::atomic<size_t> i_ {0};
782     Collection input_;
783     std::vector<Promise<Result>> promises_;
784     F func_;
785
786     static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
787       size_t i = ctx->i_++;
788       if (i < ctx->input_.size()) {
789         // Using setCallback_ directly since we don't need the Future
790         ctx->func_(std::move(ctx->input_[i])).setCallback_(
791           // ctx is captured by value
792           [ctx, i](Try<Result>&& t) {
793             ctx->promises_[i].setTry(std::move(t));
794             // Chain another future onto this one
795             spawn(std::move(ctx));
796           });
797       }
798     }
799   };
800
801   auto max = std::min(n, input.size());
802
803   auto ctx = std::make_shared<WindowContext>(
804     std::move(input), std::move(func));
805
806   for (size_t i = 0; i < max; ++i) {
807     // Start the first n Futures
808     WindowContext::spawn(ctx);
809   }
810
811   std::vector<Future<Result>> futures;
812   futures.reserve(ctx->promises_.size());
813   for (auto& promise : ctx->promises_) {
814     futures.emplace_back(promise.getFuture());
815   }
816
817   return futures;
818 }
819
820 // reduce
821
822 template <class T>
823 template <class I, class F>
824 Future<I> Future<T>::reduce(I&& initial, F&& func) {
825   return then([
826     minitial = std::forward<I>(initial),
827     mfunc = std::forward<F>(func)
828   ](T& vals) mutable {
829     auto ret = std::move(minitial);
830     for (auto& val : vals) {
831       ret = mfunc(std::move(ret), std::move(val));
832     }
833     return ret;
834   });
835 }
836
837 // unorderedReduce (iterator)
838
839 template <class It, class T, class F, class ItT, class Arg>
840 Future<T> unorderedReduce(It first, It last, T initial, F func) {
841   if (first == last) {
842     return makeFuture(std::move(initial));
843   }
844
845   typedef isTry<Arg> IsTry;
846
847   struct UnorderedReduceContext {
848     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
849         : lock_(), memo_(makeFuture<T>(std::move(memo))),
850           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
851       {};
852     folly::MicroSpinLock lock_; // protects memo_ and numThens_
853     Future<T> memo_;
854     F func_;
855     size_t numThens_; // how many Futures completed and called .then()
856     size_t numFutures_; // how many Futures in total
857     Promise<T> promise_;
858   };
859
860   auto ctx = std::make_shared<UnorderedReduceContext>(
861     std::move(initial), std::move(func), std::distance(first, last));
862
863   mapSetCallback<ItT>(
864       first,
865       last,
866       [ctx](size_t /* i */, Try<ItT>&& t) {
867         // Futures can be completed in any order, simultaneously.
868         // To make this non-blocking, we create a new Future chain in
869         // the order of completion to reduce the values.
870         // The spinlock just protects chaining a new Future, not actually
871         // executing the reduce, which should be really fast.
872         folly::MSLGuard lock(ctx->lock_);
873         ctx->memo_ =
874             ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
875               // Either return a ItT&& or a Try<ItT>&& depending
876               // on the type of the argument of func.
877               return ctx->func_(std::move(v),
878                                 mt.template get<IsTry::value, Arg&&>());
879             });
880         if (++ctx->numThens_ == ctx->numFutures_) {
881           // After reducing the value of the last Future, fulfill the Promise
882           ctx->memo_.setCallback_(
883               [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
884         }
885       });
886
887   return ctx->promise_.getFuture();
888 }
889
890 // within
891
892 template <class T>
893 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
894   return within(dur, TimedOut(), tk);
895 }
896
897 template <class T>
898 template <class E>
899 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
900
901   struct Context {
902     Context(E ex) : exception(std::move(ex)), promise() {}
903     E exception;
904     Future<Unit> thisFuture;
905     Promise<T> promise;
906     std::atomic<bool> token {false};
907   };
908
909   std::shared_ptr<Timekeeper> tks;
910   if (!tk) {
911     tks = folly::detail::getTimekeeperSingleton();
912     tk = DCHECK_NOTNULL(tks.get());
913   }
914
915   auto ctx = std::make_shared<Context>(std::move(e));
916
917   ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
918     // TODO: "this" completed first, cancel "after"
919     if (ctx->token.exchange(true) == false) {
920       ctx->promise.setTry(std::move(t));
921     }
922   });
923
924   tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
925     // "after" completed first, cancel "this"
926     ctx->thisFuture.raise(TimedOut());
927     if (ctx->token.exchange(true) == false) {
928       if (t.hasException()) {
929         ctx->promise.setException(std::move(t.exception()));
930       } else {
931         ctx->promise.setException(std::move(ctx->exception));
932       }
933     }
934   });
935
936   return ctx->promise.getFuture().via(getExecutor());
937 }
938
939 // delayed
940
941 template <class T>
942 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
943   return collectAll(*this, futures::sleep(dur, tk))
944     .then([](std::tuple<Try<T>, Try<Unit>> tup) {
945       Try<T>& t = std::get<0>(tup);
946       return makeFuture<T>(std::move(t));
947     });
948 }
949
950 namespace detail {
951
952 template <class T>
953 void waitImpl(Future<T>& f) {
954   // short-circuit if there's nothing to do
955   if (f.isReady()) return;
956
957   FutureBatonType baton;
958   f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
959   baton.wait();
960   assert(f.isReady());
961 }
962
963 template <class T>
964 void waitImpl(Future<T>& f, Duration dur) {
965   // short-circuit if there's nothing to do
966   if (f.isReady()) {
967     return;
968   }
969
970   Promise<T> promise;
971   auto ret = promise.getFuture();
972   auto baton = std::make_shared<FutureBatonType>();
973   f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
974     promise.setTry(std::move(t));
975     baton->post();
976   });
977   f = std::move(ret);
978   if (baton->timed_wait(dur)) {
979     assert(f.isReady());
980   }
981 }
982
983 template <class T>
984 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
985   // Set callback so to ensure that the via executor has something on it
986   // so that once the preceding future triggers this callback, drive will
987   // always have a callback to satisfy it
988   if (f.isReady())
989     return;
990   f = f.via(e).then([](T&& t) { return std::move(t); });
991   while (!f.isReady()) {
992     e->drive();
993   }
994   assert(f.isReady());
995 }
996
997 } // detail
998
999 template <class T>
1000 Future<T>& Future<T>::wait() & {
1001   detail::waitImpl(*this);
1002   return *this;
1003 }
1004
1005 template <class T>
1006 Future<T>&& Future<T>::wait() && {
1007   detail::waitImpl(*this);
1008   return std::move(*this);
1009 }
1010
1011 template <class T>
1012 Future<T>& Future<T>::wait(Duration dur) & {
1013   detail::waitImpl(*this, dur);
1014   return *this;
1015 }
1016
1017 template <class T>
1018 Future<T>&& Future<T>::wait(Duration dur) && {
1019   detail::waitImpl(*this, dur);
1020   return std::move(*this);
1021 }
1022
1023 template <class T>
1024 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1025   detail::waitViaImpl(*this, e);
1026   return *this;
1027 }
1028
1029 template <class T>
1030 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1031   detail::waitViaImpl(*this, e);
1032   return std::move(*this);
1033 }
1034
1035 template <class T>
1036 T Future<T>::get() {
1037   return std::move(wait().value());
1038 }
1039
1040 template <class T>
1041 T Future<T>::get(Duration dur) {
1042   wait(dur);
1043   if (isReady()) {
1044     return std::move(value());
1045   } else {
1046     throw TimedOut();
1047   }
1048 }
1049
1050 template <class T>
1051 T Future<T>::getVia(DrivableExecutor* e) {
1052   return std::move(waitVia(e).value());
1053 }
1054
1055 namespace detail {
1056   template <class T>
1057   struct TryEquals {
1058     static bool equals(const Try<T>& t1, const Try<T>& t2) {
1059       return t1.value() == t2.value();
1060     }
1061   };
1062 }
1063
1064 template <class T>
1065 Future<bool> Future<T>::willEqual(Future<T>& f) {
1066   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1067     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1068       return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1069     } else {
1070       return false;
1071       }
1072   });
1073 }
1074
1075 template <class T>
1076 template <class F>
1077 Future<T> Future<T>::filter(F&& predicate) {
1078   return this->then([p = std::forward<F>(predicate)](T val) {
1079     T const& valConstRef = val;
1080     if (!p(valConstRef)) {
1081       throw PredicateDoesNotObtain();
1082     }
1083     return val;
1084   });
1085 }
1086
1087 template <class T>
1088 template <class Callback>
1089 auto Future<T>::thenMulti(Callback&& fn)
1090     -> decltype(this->then(std::forward<Callback>(fn))) {
1091   // thenMulti with one callback is just a then
1092   return then(std::forward<Callback>(fn));
1093 }
1094
1095 template <class T>
1096 template <class Callback, class... Callbacks>
1097 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1098     -> decltype(this->then(std::forward<Callback>(fn)).
1099                       thenMulti(std::forward<Callbacks>(fns)...)) {
1100   // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1101   return then(std::forward<Callback>(fn)).
1102          thenMulti(std::forward<Callbacks>(fns)...);
1103 }
1104
1105 template <class T>
1106 template <class Callback, class... Callbacks>
1107 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1108                                       Callbacks&&... fns)
1109     -> decltype(this->then(std::forward<Callback>(fn)).
1110                       thenMulti(std::forward<Callbacks>(fns)...)) {
1111   // thenMultiExecutor with two callbacks is
1112   // via(x).then(a).thenMulti(b, ...).via(oldX)
1113   auto oldX = getExecutor();
1114   setExecutor(x);
1115   return then(std::forward<Callback>(fn)).
1116          thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1117 }
1118
1119 template <class T>
1120 template <class Callback>
1121 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1122     -> decltype(this->then(std::forward<Callback>(fn))) {
1123   // thenMulti with one callback is just a then with an executor
1124   return then(x, std::forward<Callback>(fn));
1125 }
1126
1127 template <class F>
1128 inline Future<Unit> when(bool p, F&& thunk) {
1129   return p ? std::forward<F>(thunk)().unit() : makeFuture();
1130 }
1131
1132 template <class P, class F>
1133 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1134   if (predicate()) {
1135     auto future = thunk();
1136     return future.then([
1137       predicate = std::forward<P>(predicate),
1138       thunk = std::forward<F>(thunk)
1139     ]() mutable {
1140       return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1141     });
1142   }
1143   return makeFuture();
1144 }
1145
1146 template <class F>
1147 Future<Unit> times(const int n, F&& thunk) {
1148   return folly::whileDo(
1149       [ n, count = folly::make_unique<std::atomic<int>>(0) ]() mutable {
1150         return count->fetch_add(1) < n;
1151       },
1152       std::forward<F>(thunk));
1153 }
1154
1155 namespace futures {
1156   template <class It, class F, class ItT, class Result>
1157   std::vector<Future<Result>> map(It first, It last, F func) {
1158     std::vector<Future<Result>> results;
1159     for (auto it = first; it != last; it++) {
1160       results.push_back(it->then(func));
1161     }
1162     return results;
1163   }
1164 }
1165
1166 namespace futures {
1167
1168 namespace detail {
1169
1170 struct retrying_policy_raw_tag {};
1171 struct retrying_policy_fut_tag {};
1172
1173 template <class Policy>
1174 struct retrying_policy_traits {
1175   using ew = exception_wrapper;
1176   FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1177   template <class Ret>
1178   using has_op = typename std::integral_constant<bool,
1179         has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1180         has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1181   using is_raw = has_op<bool>;
1182   using is_fut = has_op<Future<bool>>;
1183   using tag = typename std::conditional<
1184         is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1185         is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1186 };
1187
1188 template <class Policy, class FF>
1189 typename std::result_of<FF(size_t)>::type
1190 retrying(size_t k, Policy&& p, FF&& ff) {
1191   using F = typename std::result_of<FF(size_t)>::type;
1192   using T = typename F::value_type;
1193   auto f = ff(k++);
1194   return f.onError(
1195       [ k, pm = std::forward<Policy>(p), ffm = std::forward<FF>(ff) ](
1196           exception_wrapper x) mutable {
1197         auto q = pm(k, x);
1198         return q.then(
1199             [ k, xm = std::move(x), pm = std::move(pm), ffm = std::move(ffm) ](
1200                 bool r) mutable {
1201               return r ? retrying(k, std::move(pm), std::move(ffm))
1202                        : makeFuture<T>(std::move(xm));
1203             });
1204       });
1205 }
1206
1207 template <class Policy, class FF>
1208 typename std::result_of<FF(size_t)>::type
1209 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1210   auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
1211     return makeFuture<bool>(pm(k, x));
1212   };
1213   return retrying(0, std::move(q), std::forward<FF>(ff));
1214 }
1215
1216 template <class Policy, class FF>
1217 typename std::result_of<FF(size_t)>::type
1218 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1219   return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1220 }
1221
1222 //  jittered exponential backoff, clamped to [backoff_min, backoff_max]
1223 template <class URNG>
1224 Duration retryingJitteredExponentialBackoffDur(
1225     size_t n,
1226     Duration backoff_min,
1227     Duration backoff_max,
1228     double jitter_param,
1229     URNG& rng) {
1230   using d = Duration;
1231   auto dist = std::normal_distribution<double>(0.0, jitter_param);
1232   auto jitter = std::exp(dist(rng));
1233   auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1234   return std::max(backoff_min, std::min(backoff_max, backoff));
1235 }
1236
1237 template <class Policy, class URNG>
1238 std::function<Future<bool>(size_t, const exception_wrapper&)>
1239 retryingPolicyCappedJitteredExponentialBackoff(
1240     size_t max_tries,
1241     Duration backoff_min,
1242     Duration backoff_max,
1243     double jitter_param,
1244     URNG&& rng,
1245     Policy&& p) {
1246   return [
1247     pm = std::forward<Policy>(p),
1248     max_tries,
1249     backoff_min,
1250     backoff_max,
1251     jitter_param,
1252     rngp = std::forward<URNG>(rng)
1253   ](size_t n, const exception_wrapper& ex) mutable {
1254     if (n == max_tries) {
1255       return makeFuture(false);
1256     }
1257     return pm(n, ex).then(
1258         [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
1259             bool v) mutable {
1260           if (!v) {
1261             return makeFuture(false);
1262           }
1263           auto backoff = detail::retryingJitteredExponentialBackoffDur(
1264               n, backoff_min, backoff_max, jitter_param, rngp);
1265           return futures::sleep(backoff).then([] { return true; });
1266         });
1267   };
1268 }
1269
1270 template <class Policy, class URNG>
1271 std::function<Future<bool>(size_t, const exception_wrapper&)>
1272 retryingPolicyCappedJitteredExponentialBackoff(
1273     size_t max_tries,
1274     Duration backoff_min,
1275     Duration backoff_max,
1276     double jitter_param,
1277     URNG&& rng,
1278     Policy&& p,
1279     retrying_policy_raw_tag) {
1280   auto q = [pm = std::forward<Policy>(p)](
1281       size_t n, const exception_wrapper& e) {
1282     return makeFuture(pm(n, e));
1283   };
1284   return retryingPolicyCappedJitteredExponentialBackoff(
1285       max_tries,
1286       backoff_min,
1287       backoff_max,
1288       jitter_param,
1289       std::forward<URNG>(rng),
1290       std::move(q));
1291 }
1292
1293 template <class Policy, class URNG>
1294 std::function<Future<bool>(size_t, const exception_wrapper&)>
1295 retryingPolicyCappedJitteredExponentialBackoff(
1296     size_t max_tries,
1297     Duration backoff_min,
1298     Duration backoff_max,
1299     double jitter_param,
1300     URNG&& rng,
1301     Policy&& p,
1302     retrying_policy_fut_tag) {
1303   return retryingPolicyCappedJitteredExponentialBackoff(
1304       max_tries,
1305       backoff_min,
1306       backoff_max,
1307       jitter_param,
1308       std::forward<URNG>(rng),
1309       std::forward<Policy>(p));
1310 }
1311 }
1312
1313 template <class Policy, class FF>
1314 typename std::result_of<FF(size_t)>::type
1315 retrying(Policy&& p, FF&& ff) {
1316   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1317   return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1318 }
1319
1320 inline
1321 std::function<bool(size_t, const exception_wrapper&)>
1322 retryingPolicyBasic(
1323     size_t max_tries) {
1324   return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1325 }
1326
1327 template <class Policy, class URNG>
1328 std::function<Future<bool>(size_t, const exception_wrapper&)>
1329 retryingPolicyCappedJitteredExponentialBackoff(
1330     size_t max_tries,
1331     Duration backoff_min,
1332     Duration backoff_max,
1333     double jitter_param,
1334     URNG&& rng,
1335     Policy&& p) {
1336   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1337   return detail::retryingPolicyCappedJitteredExponentialBackoff(
1338       max_tries,
1339       backoff_min,
1340       backoff_max,
1341       jitter_param,
1342       std::forward<URNG>(rng),
1343       std::forward<Policy>(p),
1344       tag());
1345 }
1346
1347 inline
1348 std::function<Future<bool>(size_t, const exception_wrapper&)>
1349 retryingPolicyCappedJitteredExponentialBackoff(
1350     size_t max_tries,
1351     Duration backoff_min,
1352     Duration backoff_max,
1353     double jitter_param) {
1354   auto p = [](size_t, const exception_wrapper&) { return true; };
1355   return retryingPolicyCappedJitteredExponentialBackoff(
1356       max_tries,
1357       backoff_min,
1358       backoff_max,
1359       jitter_param,
1360       ThreadLocalPRNG(),
1361       std::move(p));
1362 }
1363
1364 }
1365
1366 // Instantiate the most common Future types to save compile time
1367 extern template class Future<Unit>;
1368 extern template class Future<bool>;
1369 extern template class Future<int>;
1370 extern template class Future<int64_t>;
1371 extern template class Future<std::string>;
1372 extern template class Future<double>;
1373
1374 } // namespace folly