Use FOLLY_MOBILE to split some functionality
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <algorithm>
20 #include <cassert>
21 #include <chrono>
22 #include <random>
23 #include <thread>
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
30
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
33 #else
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/experimental/fibers/Baton.h>
36 #endif
37
38 namespace folly {
39
40 class Timekeeper;
41
42 namespace detail {
43 #if FOLLY_FUTURE_USING_FIBER
44 typedef folly::fibers::Baton FutureBatonType;
45 #else
46 typedef folly::Baton<> FutureBatonType;
47 #endif
48 }
49
50 namespace detail {
51   std::shared_ptr<Timekeeper> getTimekeeperSingleton();
52 }
53
54 template <class T>
55 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
56   other.core_ = nullptr;
57 }
58
59 template <class T>
60 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
61   std::swap(core_, other.core_);
62   return *this;
63 }
64
65 template <class T>
66 template <typename U, typename>
67 Future<T>::Future(Future<U>&& other) noexcept
68     : core_(detail::Core<T>::convert(other.core_)) {
69   other.core_ = nullptr;
70 }
71
72 template <class T>
73 template <typename U, typename>
74 Future<T>& Future<T>::operator=(Future<U>&& other) noexcept {
75   std::swap(core_, detail::Core<T>::convert(other.core_));
76   return *this;
77 }
78
79 template <class T>
80 template <class T2, typename>
81 Future<T>::Future(T2&& val)
82     : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
83
84 template <class T>
85 template <typename, typename>
86 Future<T>::Future()
87   : core_(new detail::Core<T>(Try<T>(T()))) {}
88
89 template <class T>
90 Future<T>::~Future() {
91   detach();
92 }
93
94 template <class T>
95 void Future<T>::detach() {
96   if (core_) {
97     core_->detachFuture();
98     core_ = nullptr;
99   }
100 }
101
102 template <class T>
103 void Future<T>::throwIfInvalid() const {
104   if (!core_)
105     throw NoState();
106 }
107
108 template <class T>
109 template <class F>
110 void Future<T>::setCallback_(F&& func) {
111   throwIfInvalid();
112   core_->setCallback(std::move(func));
113 }
114
115 // unwrap
116
117 template <class T>
118 template <class F>
119 typename std::enable_if<isFuture<F>::value,
120                         Future<typename isFuture<T>::Inner>>::type
121 Future<T>::unwrap() {
122   return then([](Future<typename isFuture<T>::Inner> internal_future) {
123       return internal_future;
124   });
125 }
126
127 // then
128
129 // Variant: returns a value
130 // e.g. f.then([](Try<T>&& t){ return t.value(); });
131 template <class T>
132 template <typename F, typename R, bool isTry, typename... Args>
133 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
134 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
135   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
136   typedef typename R::ReturnsFuture::Inner B;
137
138   throwIfInvalid();
139
140   // wrap these so we can move them into the lambda
141   folly::MoveWrapper<Promise<B>> p;
142   p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
143   folly::MoveWrapper<F> funcm(std::forward<F>(func));
144
145   // grab the Future now before we lose our handle on the Promise
146   auto f = p->getFuture();
147   f.core_->setExecutorNoLock(getExecutor());
148
149   /* This is a bit tricky.
150
151      We can't just close over *this in case this Future gets moved. So we
152      make a new dummy Future. We could figure out something more
153      sophisticated that avoids making a new Future object when it can, as an
154      optimization. But this is correct.
155
156      core_ can't be moved, it is explicitly disallowed (as is copying). But
157      if there's ever a reason to allow it, this is one place that makes that
158      assumption and would need to be fixed. We use a standard shared pointer
159      for core_ (by copying it in), which means in essence obj holds a shared
160      pointer to itself.  But this shouldn't leak because Promise will not
161      outlive the continuation, because Promise will setException() with a
162      broken Promise if it is destructed before completed. We could use a
163      weak pointer but it would have to be converted to a shared pointer when
164      func is executed (because the Future returned by func may possibly
165      persist beyond the callback, if it gets moved), and so it is an
166      optimization to just make it shared from the get-go.
167
168      We have to move in the Promise and func using the MoveWrapper
169      hack. (func could be copied but it's a big drag on perf).
170
171      Two subtle but important points about this design. detail::Core has no
172      back pointers to Future or Promise, so if Future or Promise get moved
173      (and they will be moved in performant code) we don't have to do
174      anything fancy. And because we store the continuation in the
175      detail::Core, not in the Future, we can execute the continuation even
176      after the Future has gone out of scope. This is an intentional design
177      decision. It is likely we will want to be able to cancel a continuation
178      in some circumstances, but I think it should be explicit not implicit
179      in the destruction of the Future used to create it.
180      */
181   setCallback_(
182     [p, funcm](Try<T>&& t) mutable {
183       if (!isTry && t.hasException()) {
184         p->setException(std::move(t.exception()));
185       } else {
186         p->setWith([&]() {
187           return (*funcm)(t.template get<isTry, Args>()...);
188         });
189       }
190     });
191
192   return f;
193 }
194
195 // Variant: returns a Future
196 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
197 template <class T>
198 template <typename F, typename R, bool isTry, typename... Args>
199 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
200 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
201   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
202   typedef typename R::ReturnsFuture::Inner B;
203
204   throwIfInvalid();
205
206   // wrap these so we can move them into the lambda
207   folly::MoveWrapper<Promise<B>> p;
208   p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
209   folly::MoveWrapper<F> funcm(std::forward<F>(func));
210
211   // grab the Future now before we lose our handle on the Promise
212   auto f = p->getFuture();
213   f.core_->setExecutorNoLock(getExecutor());
214
215   setCallback_(
216     [p, funcm](Try<T>&& t) mutable {
217       if (!isTry && t.hasException()) {
218         p->setException(std::move(t.exception()));
219       } else {
220         try {
221           auto f2 = (*funcm)(t.template get<isTry, Args>()...);
222           // that didn't throw, now we can steal p
223           f2.setCallback_([p](Try<B>&& b) mutable {
224             p->setTry(std::move(b));
225           });
226         } catch (const std::exception& e) {
227           p->setException(exception_wrapper(std::current_exception(), e));
228         } catch (...) {
229           p->setException(exception_wrapper(std::current_exception()));
230         }
231       }
232     });
233
234   return f;
235 }
236
237 template <typename T>
238 template <typename R, typename Caller, typename... Args>
239   Future<typename isFuture<R>::Inner>
240 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
241   typedef typename std::remove_cv<
242     typename std::remove_reference<
243       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
244   return then([instance, func](Try<T>&& t){
245     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
246   });
247 }
248
249 template <class T>
250 template <class Executor, class Arg, class... Args>
251 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
252   -> decltype(this->then(std::forward<Arg>(arg),
253                          std::forward<Args>(args)...))
254 {
255   auto oldX = getExecutor();
256   setExecutor(x);
257   return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
258                via(oldX);
259 }
260
261 template <class T>
262 Future<Unit> Future<T>::then() {
263   return then([] () {});
264 }
265
266 // onError where the callback returns T
267 template <class T>
268 template <class F>
269 typename std::enable_if<
270   !detail::callableWith<F, exception_wrapper>::value &&
271   !detail::Extract<F>::ReturnsFuture::value,
272   Future<T>>::type
273 Future<T>::onError(F&& func) {
274   typedef typename detail::Extract<F>::FirstArg Exn;
275   static_assert(
276       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
277       "Return type of onError callback must be T or Future<T>");
278
279   Promise<T> p;
280   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
281   auto f = p.getFuture();
282   auto pm = folly::makeMoveWrapper(std::move(p));
283   auto funcm = folly::makeMoveWrapper(std::move(func));
284   setCallback_([pm, funcm](Try<T>&& t) mutable {
285     if (!t.template withException<Exn>([&] (Exn& e) {
286           pm->setWith([&]{
287             return (*funcm)(e);
288           });
289         })) {
290       pm->setTry(std::move(t));
291     }
292   });
293
294   return f;
295 }
296
297 // onError where the callback returns Future<T>
298 template <class T>
299 template <class F>
300 typename std::enable_if<
301   !detail::callableWith<F, exception_wrapper>::value &&
302   detail::Extract<F>::ReturnsFuture::value,
303   Future<T>>::type
304 Future<T>::onError(F&& func) {
305   static_assert(
306       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
307       "Return type of onError callback must be T or Future<T>");
308   typedef typename detail::Extract<F>::FirstArg Exn;
309
310   Promise<T> p;
311   auto f = p.getFuture();
312   auto pm = folly::makeMoveWrapper(std::move(p));
313   auto funcm = folly::makeMoveWrapper(std::move(func));
314   setCallback_([pm, funcm](Try<T>&& t) mutable {
315     if (!t.template withException<Exn>([&] (Exn& e) {
316           try {
317             auto f2 = (*funcm)(e);
318             f2.setCallback_([pm](Try<T>&& t2) mutable {
319               pm->setTry(std::move(t2));
320             });
321           } catch (const std::exception& e2) {
322             pm->setException(exception_wrapper(std::current_exception(), e2));
323           } catch (...) {
324             pm->setException(exception_wrapper(std::current_exception()));
325           }
326         })) {
327       pm->setTry(std::move(t));
328     }
329   });
330
331   return f;
332 }
333
334 template <class T>
335 template <class F>
336 Future<T> Future<T>::ensure(F func) {
337   MoveWrapper<F> funcw(std::move(func));
338   return this->then([funcw](Try<T>&& t) mutable {
339     (*funcw)();
340     return makeFuture(std::move(t));
341   });
342 }
343
344 template <class T>
345 template <class F>
346 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
347   auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
348   return within(dur, tk)
349     .onError([funcw](TimedOut const&) { return (*funcw)(); });
350 }
351
352 template <class T>
353 template <class F>
354 typename std::enable_if<
355   detail::callableWith<F, exception_wrapper>::value &&
356   detail::Extract<F>::ReturnsFuture::value,
357   Future<T>>::type
358 Future<T>::onError(F&& func) {
359   static_assert(
360       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
361       "Return type of onError callback must be T or Future<T>");
362
363   Promise<T> p;
364   auto f = p.getFuture();
365   auto pm = folly::makeMoveWrapper(std::move(p));
366   auto funcm = folly::makeMoveWrapper(std::move(func));
367   setCallback_([pm, funcm](Try<T> t) mutable {
368     if (t.hasException()) {
369       try {
370         auto f2 = (*funcm)(std::move(t.exception()));
371         f2.setCallback_([pm](Try<T> t2) mutable {
372           pm->setTry(std::move(t2));
373         });
374       } catch (const std::exception& e2) {
375         pm->setException(exception_wrapper(std::current_exception(), e2));
376       } catch (...) {
377         pm->setException(exception_wrapper(std::current_exception()));
378       }
379     } else {
380       pm->setTry(std::move(t));
381     }
382   });
383
384   return f;
385 }
386
387 // onError(exception_wrapper) that returns T
388 template <class T>
389 template <class F>
390 typename std::enable_if<
391   detail::callableWith<F, exception_wrapper>::value &&
392   !detail::Extract<F>::ReturnsFuture::value,
393   Future<T>>::type
394 Future<T>::onError(F&& func) {
395   static_assert(
396       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
397       "Return type of onError callback must be T or Future<T>");
398
399   Promise<T> p;
400   auto f = p.getFuture();
401   auto pm = folly::makeMoveWrapper(std::move(p));
402   auto funcm = folly::makeMoveWrapper(std::move(func));
403   setCallback_([pm, funcm](Try<T> t) mutable {
404     if (t.hasException()) {
405       pm->setWith([&]{
406         return (*funcm)(std::move(t.exception()));
407       });
408     } else {
409       pm->setTry(std::move(t));
410     }
411   });
412
413   return f;
414 }
415
416 template <class T>
417 typename std::add_lvalue_reference<T>::type Future<T>::value() {
418   throwIfInvalid();
419
420   return core_->getTry().value();
421 }
422
423 template <class T>
424 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
425   throwIfInvalid();
426
427   return core_->getTry().value();
428 }
429
430 template <class T>
431 Try<T>& Future<T>::getTry() {
432   throwIfInvalid();
433
434   return core_->getTry();
435 }
436
437 template <class T>
438 Optional<Try<T>> Future<T>::poll() {
439   Optional<Try<T>> o;
440   if (core_->ready()) {
441     o = std::move(core_->getTry());
442   }
443   return o;
444 }
445
446 template <class T>
447 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
448   throwIfInvalid();
449
450   setExecutor(executor, priority);
451
452   return std::move(*this);
453 }
454
455 template <class T>
456 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
457   throwIfInvalid();
458
459   MoveWrapper<Promise<T>> p;
460   auto f = p->getFuture();
461   then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
462   return std::move(f).via(executor, priority);
463 }
464
465
466 template <class Func>
467 auto via(Executor* x, Func func)
468   -> Future<typename isFuture<decltype(func())>::Inner>
469 {
470   // TODO make this actually more performant. :-P #7260175
471   return via(x).then(func);
472 }
473
474 template <class T>
475 bool Future<T>::isReady() const {
476   throwIfInvalid();
477   return core_->ready();
478 }
479
480 template <class T>
481 bool Future<T>::hasValue() {
482   return getTry().hasValue();
483 }
484
485 template <class T>
486 bool Future<T>::hasException() {
487   return getTry().hasException();
488 }
489
490 template <class T>
491 void Future<T>::raise(exception_wrapper exception) {
492   core_->raise(std::move(exception));
493 }
494
495 // makeFuture
496
497 template <class T>
498 Future<typename std::decay<T>::type> makeFuture(T&& t) {
499   return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
500 }
501
502 inline // for multiple translation units
503 Future<Unit> makeFuture() {
504   return makeFuture(Unit{});
505 }
506
507 // makeFutureWith(Future<T>()) -> Future<T>
508 template <class F>
509 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
510                         typename std::result_of<F()>::type>::type
511 makeFutureWith(F&& func) {
512   using InnerType =
513       typename isFuture<typename std::result_of<F()>::type>::Inner;
514   try {
515     return func();
516   } catch (std::exception& e) {
517     return makeFuture<InnerType>(
518         exception_wrapper(std::current_exception(), e));
519   } catch (...) {
520     return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
521   }
522 }
523
524 // makeFutureWith(T()) -> Future<T>
525 // makeFutureWith(void()) -> Future<Unit>
526 template <class F>
527 typename std::enable_if<
528     !(isFuture<typename std::result_of<F()>::type>::value),
529     Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
530 makeFutureWith(F&& func) {
531   using LiftedResult =
532       typename Unit::Lift<typename std::result_of<F()>::type>::type;
533   return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
534     return func();
535   }));
536 }
537
538 template <class T>
539 Future<T> makeFuture(std::exception_ptr const& e) {
540   return makeFuture(Try<T>(e));
541 }
542
543 template <class T>
544 Future<T> makeFuture(exception_wrapper ew) {
545   return makeFuture(Try<T>(std::move(ew)));
546 }
547
548 template <class T, class E>
549 typename std::enable_if<std::is_base_of<std::exception, E>::value,
550                         Future<T>>::type
551 makeFuture(E const& e) {
552   return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
553 }
554
555 template <class T>
556 Future<T> makeFuture(Try<T>&& t) {
557   return Future<T>(new detail::Core<T>(std::move(t)));
558 }
559
560 // via
561 Future<Unit> via(Executor* executor, int8_t priority) {
562   return makeFuture().via(executor, priority);
563 }
564
565 // mapSetCallback calls func(i, Try<T>) when every future completes
566
567 template <class T, class InputIterator, class F>
568 void mapSetCallback(InputIterator first, InputIterator last, F func) {
569   for (size_t i = 0; first != last; ++first, ++i) {
570     first->setCallback_([func, i](Try<T>&& t) {
571       func(i, std::move(t));
572     });
573   }
574 }
575
576 // collectAll (variadic)
577
578 template <typename... Fs>
579 typename detail::CollectAllVariadicContext<
580   typename std::decay<Fs>::type::value_type...>::type
581 collectAll(Fs&&... fs) {
582   auto ctx = std::make_shared<detail::CollectAllVariadicContext<
583     typename std::decay<Fs>::type::value_type...>>();
584   detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
585     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
586   return ctx->p.getFuture();
587 }
588
589 // collectAll (iterator)
590
591 template <class InputIterator>
592 Future<
593   std::vector<
594   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
595 collectAll(InputIterator first, InputIterator last) {
596   typedef
597     typename std::iterator_traits<InputIterator>::value_type::value_type T;
598
599   struct CollectAllContext {
600     CollectAllContext(int n) : results(n) {}
601     ~CollectAllContext() {
602       p.setValue(std::move(results));
603     }
604     Promise<std::vector<Try<T>>> p;
605     std::vector<Try<T>> results;
606   };
607
608   auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
609   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
610     ctx->results[i] = std::move(t);
611   });
612   return ctx->p.getFuture();
613 }
614
615 // collect (iterator)
616
617 namespace detail {
618
619 template <typename T>
620 struct CollectContext {
621   struct Nothing {
622     explicit Nothing(int /* n */) {}
623   };
624
625   using Result = typename std::conditional<
626     std::is_void<T>::value,
627     void,
628     std::vector<T>>::type;
629
630   using InternalResult = typename std::conditional<
631     std::is_void<T>::value,
632     Nothing,
633     std::vector<Optional<T>>>::type;
634
635   explicit CollectContext(int n) : result(n) {}
636   ~CollectContext() {
637     if (!threw.exchange(true)) {
638       // map Optional<T> -> T
639       std::vector<T> finalResult;
640       finalResult.reserve(result.size());
641       std::transform(result.begin(), result.end(),
642                      std::back_inserter(finalResult),
643                      [](Optional<T>& o) { return std::move(o.value()); });
644       p.setValue(std::move(finalResult));
645     }
646   }
647   inline void setPartialResult(size_t i, Try<T>& t) {
648     result[i] = std::move(t.value());
649   }
650   Promise<Result> p;
651   InternalResult result;
652   std::atomic<bool> threw {false};
653 };
654
655 }
656
657 template <class InputIterator>
658 Future<typename detail::CollectContext<
659   typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
660 collect(InputIterator first, InputIterator last) {
661   typedef
662     typename std::iterator_traits<InputIterator>::value_type::value_type T;
663
664   auto ctx = std::make_shared<detail::CollectContext<T>>(
665     std::distance(first, last));
666   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
667     if (t.hasException()) {
668        if (!ctx->threw.exchange(true)) {
669          ctx->p.setException(std::move(t.exception()));
670        }
671      } else if (!ctx->threw) {
672        ctx->setPartialResult(i, t);
673      }
674   });
675   return ctx->p.getFuture();
676 }
677
678 // collect (variadic)
679
680 template <typename... Fs>
681 typename detail::CollectVariadicContext<
682   typename std::decay<Fs>::type::value_type...>::type
683 collect(Fs&&... fs) {
684   auto ctx = std::make_shared<detail::CollectVariadicContext<
685     typename std::decay<Fs>::type::value_type...>>();
686   detail::collectVariadicHelper<detail::CollectVariadicContext>(
687     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
688   return ctx->p.getFuture();
689 }
690
691 // collectAny (iterator)
692
693 template <class InputIterator>
694 Future<
695   std::pair<size_t,
696             Try<
697               typename
698               std::iterator_traits<InputIterator>::value_type::value_type>>>
699 collectAny(InputIterator first, InputIterator last) {
700   typedef
701     typename std::iterator_traits<InputIterator>::value_type::value_type T;
702
703   struct CollectAnyContext {
704     CollectAnyContext() {};
705     Promise<std::pair<size_t, Try<T>>> p;
706     std::atomic<bool> done {false};
707   };
708
709   auto ctx = std::make_shared<CollectAnyContext>();
710   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
711     if (!ctx->done.exchange(true)) {
712       ctx->p.setValue(std::make_pair(i, std::move(t)));
713     }
714   });
715   return ctx->p.getFuture();
716 }
717
718 // collectN (iterator)
719
720 template <class InputIterator>
721 Future<std::vector<std::pair<size_t, Try<typename
722   std::iterator_traits<InputIterator>::value_type::value_type>>>>
723 collectN(InputIterator first, InputIterator last, size_t n) {
724   typedef typename
725     std::iterator_traits<InputIterator>::value_type::value_type T;
726   typedef std::vector<std::pair<size_t, Try<T>>> V;
727
728   struct CollectNContext {
729     V v;
730     std::atomic<size_t> completed = {0};
731     Promise<V> p;
732   };
733   auto ctx = std::make_shared<CollectNContext>();
734
735   if (size_t(std::distance(first, last)) < n) {
736     ctx->p.setException(std::runtime_error("Not enough futures"));
737   } else {
738     // for each completed Future, increase count and add to vector, until we
739     // have n completed futures at which point we fulfil our Promise with the
740     // vector
741     mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
742       auto c = ++ctx->completed;
743       if (c <= n) {
744         assert(ctx->v.size() < n);
745         ctx->v.emplace_back(i, std::move(t));
746         if (c == n) {
747           ctx->p.setTry(Try<V>(std::move(ctx->v)));
748         }
749       }
750     });
751   }
752
753   return ctx->p.getFuture();
754 }
755
756 // reduce (iterator)
757
758 template <class It, class T, class F>
759 Future<T> reduce(It first, It last, T&& initial, F&& func) {
760   if (first == last) {
761     return makeFuture(std::move(initial));
762   }
763
764   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
765   typedef typename std::conditional<
766     detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type Arg;
767   typedef isTry<Arg> IsTry;
768
769   folly::MoveWrapper<T> minitial(std::move(initial));
770   auto sfunc = std::make_shared<F>(std::move(func));
771
772   auto f = first->then([minitial, sfunc](Try<ItT>& head) mutable {
773     return (*sfunc)(std::move(*minitial),
774                 head.template get<IsTry::value, Arg&&>());
775   });
776
777   for (++first; first != last; ++first) {
778     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
779       return (*sfunc)(std::move(std::get<0>(t).value()),
780                   // Either return a ItT&& or a Try<ItT>&& depending
781                   // on the type of the argument of func.
782                   std::get<1>(t).template get<IsTry::value, Arg&&>());
783     });
784   }
785
786   return f;
787 }
788
789 // window (collection)
790
791 template <class Collection, class F, class ItT, class Result>
792 std::vector<Future<Result>>
793 window(Collection input, F func, size_t n) {
794   struct WindowContext {
795     WindowContext(Collection&& i, F&& fn)
796         : input_(std::move(i)), promises_(input_.size()),
797           func_(std::move(fn))
798       {}
799     std::atomic<size_t> i_ {0};
800     Collection input_;
801     std::vector<Promise<Result>> promises_;
802     F func_;
803
804     static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
805       size_t i = ctx->i_++;
806       if (i < ctx->input_.size()) {
807         // Using setCallback_ directly since we don't need the Future
808         ctx->func_(std::move(ctx->input_[i])).setCallback_(
809           // ctx is captured by value
810           [ctx, i](Try<Result>&& t) {
811             ctx->promises_[i].setTry(std::move(t));
812             // Chain another future onto this one
813             spawn(std::move(ctx));
814           });
815       }
816     }
817   };
818
819   auto max = std::min(n, input.size());
820
821   auto ctx = std::make_shared<WindowContext>(
822     std::move(input), std::move(func));
823
824   for (size_t i = 0; i < max; ++i) {
825     // Start the first n Futures
826     WindowContext::spawn(ctx);
827   }
828
829   std::vector<Future<Result>> futures;
830   futures.reserve(ctx->promises_.size());
831   for (auto& promise : ctx->promises_) {
832     futures.emplace_back(promise.getFuture());
833   }
834
835   return futures;
836 }
837
838 // reduce
839
840 template <class T>
841 template <class I, class F>
842 Future<I> Future<T>::reduce(I&& initial, F&& func) {
843   folly::MoveWrapper<I> minitial(std::move(initial));
844   folly::MoveWrapper<F> mfunc(std::move(func));
845   return then([minitial, mfunc](T& vals) mutable {
846     auto ret = std::move(*minitial);
847     for (auto& val : vals) {
848       ret = (*mfunc)(std::move(ret), std::move(val));
849     }
850     return ret;
851   });
852 }
853
854 // unorderedReduce (iterator)
855
856 template <class It, class T, class F, class ItT, class Arg>
857 Future<T> unorderedReduce(It first, It last, T initial, F func) {
858   if (first == last) {
859     return makeFuture(std::move(initial));
860   }
861
862   typedef isTry<Arg> IsTry;
863
864   struct UnorderedReduceContext {
865     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
866         : lock_(), memo_(makeFuture<T>(std::move(memo))),
867           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
868       {};
869     folly::MicroSpinLock lock_; // protects memo_ and numThens_
870     Future<T> memo_;
871     F func_;
872     size_t numThens_; // how many Futures completed and called .then()
873     size_t numFutures_; // how many Futures in total
874     Promise<T> promise_;
875   };
876
877   auto ctx = std::make_shared<UnorderedReduceContext>(
878     std::move(initial), std::move(func), std::distance(first, last));
879
880   mapSetCallback<ItT>(
881       first,
882       last,
883       [ctx](size_t /* i */, Try<ItT>&& t) {
884         folly::MoveWrapper<Try<ItT>> mt(std::move(t));
885         // Futures can be completed in any order, simultaneously.
886         // To make this non-blocking, we create a new Future chain in
887         // the order of completion to reduce the values.
888         // The spinlock just protects chaining a new Future, not actually
889         // executing the reduce, which should be really fast.
890         folly::MSLGuard lock(ctx->lock_);
891         ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
892           // Either return a ItT&& or a Try<ItT>&& depending
893           // on the type of the argument of func.
894           return ctx->func_(std::move(v),
895                             mt->template get<IsTry::value, Arg&&>());
896         });
897         if (++ctx->numThens_ == ctx->numFutures_) {
898           // After reducing the value of the last Future, fulfill the Promise
899           ctx->memo_.setCallback_(
900               [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
901         }
902       });
903
904   return ctx->promise_.getFuture();
905 }
906
907 // within
908
909 template <class T>
910 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
911   return within(dur, TimedOut(), tk);
912 }
913
914 template <class T>
915 template <class E>
916 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
917
918   struct Context {
919     Context(E ex) : exception(std::move(ex)), promise() {}
920     E exception;
921     Future<Unit> thisFuture;
922     Promise<T> promise;
923     std::atomic<bool> token {false};
924   };
925
926   std::shared_ptr<Timekeeper> tks;
927   if (!tk) {
928     tks = folly::detail::getTimekeeperSingleton();
929     tk = DCHECK_NOTNULL(tks.get());
930   }
931
932   auto ctx = std::make_shared<Context>(std::move(e));
933
934   ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
935     // TODO: "this" completed first, cancel "after"
936     if (ctx->token.exchange(true) == false) {
937       ctx->promise.setTry(std::move(t));
938     }
939   });
940
941   tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
942     // "after" completed first, cancel "this"
943     ctx->thisFuture.raise(TimedOut());
944     if (ctx->token.exchange(true) == false) {
945       if (t.hasException()) {
946         ctx->promise.setException(std::move(t.exception()));
947       } else {
948         ctx->promise.setException(std::move(ctx->exception));
949       }
950     }
951   });
952
953   return ctx->promise.getFuture().via(getExecutor());
954 }
955
956 // delayed
957
958 template <class T>
959 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
960   return collectAll(*this, futures::sleep(dur, tk))
961     .then([](std::tuple<Try<T>, Try<Unit>> tup) {
962       Try<T>& t = std::get<0>(tup);
963       return makeFuture<T>(std::move(t));
964     });
965 }
966
967 namespace detail {
968
969 template <class T>
970 void waitImpl(Future<T>& f) {
971   // short-circuit if there's nothing to do
972   if (f.isReady()) return;
973
974   FutureBatonType baton;
975   f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
976   baton.wait();
977   assert(f.isReady());
978 }
979
980 template <class T>
981 void waitImpl(Future<T>& f, Duration dur) {
982   // short-circuit if there's nothing to do
983   if (f.isReady()) return;
984
985   folly::MoveWrapper<Promise<T>> promise;
986   auto ret = promise->getFuture();
987   auto baton = std::make_shared<FutureBatonType>();
988   f.setCallback_([baton, promise](Try<T>&& t) mutable {
989     promise->setTry(std::move(t));
990     baton->post();
991   });
992   f = std::move(ret);
993   if (baton->timed_wait(dur)) {
994     assert(f.isReady());
995   }
996 }
997
998 template <class T>
999 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1000   // Set callback so to ensure that the via executor has something on it
1001   // so that once the preceding future triggers this callback, drive will
1002   // always have a callback to satisfy it
1003   if (f.isReady())
1004     return;
1005   f = f.then([](T&& t) { return std::move(t); });
1006   while (!f.isReady()) {
1007     e->drive();
1008   }
1009   assert(f.isReady());
1010 }
1011
1012 } // detail
1013
1014 template <class T>
1015 Future<T>& Future<T>::wait() & {
1016   detail::waitImpl(*this);
1017   return *this;
1018 }
1019
1020 template <class T>
1021 Future<T>&& Future<T>::wait() && {
1022   detail::waitImpl(*this);
1023   return std::move(*this);
1024 }
1025
1026 template <class T>
1027 Future<T>& Future<T>::wait(Duration dur) & {
1028   detail::waitImpl(*this, dur);
1029   return *this;
1030 }
1031
1032 template <class T>
1033 Future<T>&& Future<T>::wait(Duration dur) && {
1034   detail::waitImpl(*this, dur);
1035   return std::move(*this);
1036 }
1037
1038 template <class T>
1039 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1040   detail::waitViaImpl(*this, e);
1041   return *this;
1042 }
1043
1044 template <class T>
1045 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1046   detail::waitViaImpl(*this, e);
1047   return std::move(*this);
1048 }
1049
1050 template <class T>
1051 T Future<T>::get() {
1052   return std::move(wait().value());
1053 }
1054
1055 template <class T>
1056 T Future<T>::get(Duration dur) {
1057   wait(dur);
1058   if (isReady()) {
1059     return std::move(value());
1060   } else {
1061     throw TimedOut();
1062   }
1063 }
1064
1065 template <class T>
1066 T Future<T>::getVia(DrivableExecutor* e) {
1067   return std::move(waitVia(e).value());
1068 }
1069
1070 namespace detail {
1071   template <class T>
1072   struct TryEquals {
1073     static bool equals(const Try<T>& t1, const Try<T>& t2) {
1074       return t1.value() == t2.value();
1075     }
1076   };
1077 }
1078
1079 template <class T>
1080 Future<bool> Future<T>::willEqual(Future<T>& f) {
1081   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1082     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1083       return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1084     } else {
1085       return false;
1086       }
1087   });
1088 }
1089
1090 template <class T>
1091 template <class F>
1092 Future<T> Future<T>::filter(F predicate) {
1093   auto p = folly::makeMoveWrapper(std::move(predicate));
1094   return this->then([p](T val) {
1095     T const& valConstRef = val;
1096     if (!(*p)(valConstRef)) {
1097       throw PredicateDoesNotObtain();
1098     }
1099     return val;
1100   });
1101 }
1102
1103 template <class T>
1104 template <class Callback>
1105 auto Future<T>::thenMulti(Callback&& fn)
1106     -> decltype(this->then(std::forward<Callback>(fn))) {
1107   // thenMulti with one callback is just a then
1108   return then(std::forward<Callback>(fn));
1109 }
1110
1111 template <class T>
1112 template <class Callback, class... Callbacks>
1113 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1114     -> decltype(this->then(std::forward<Callback>(fn)).
1115                       thenMulti(std::forward<Callbacks>(fns)...)) {
1116   // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1117   return then(std::forward<Callback>(fn)).
1118          thenMulti(std::forward<Callbacks>(fns)...);
1119 }
1120
1121 template <class T>
1122 template <class Callback, class... Callbacks>
1123 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1124                                       Callbacks&&... fns)
1125     -> decltype(this->then(std::forward<Callback>(fn)).
1126                       thenMulti(std::forward<Callbacks>(fns)...)) {
1127   // thenMultiExecutor with two callbacks is
1128   // via(x).then(a).thenMulti(b, ...).via(oldX)
1129   auto oldX = getExecutor();
1130   setExecutor(x);
1131   return then(std::forward<Callback>(fn)).
1132          thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1133 }
1134
1135 template <class T>
1136 template <class Callback>
1137 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1138     -> decltype(this->then(std::forward<Callback>(fn))) {
1139   // thenMulti with one callback is just a then with an executor
1140   return then(x, std::forward<Callback>(fn));
1141 }
1142
1143 template <class F>
1144 inline Future<Unit> when(bool p, F thunk) {
1145   return p ? thunk().unit() : makeFuture();
1146 }
1147
1148 template <class P, class F>
1149 Future<Unit> whileDo(P predicate, F thunk) {
1150   if (predicate()) {
1151     return thunk().then([=] {
1152       return whileDo(predicate, thunk);
1153     });
1154   }
1155   return makeFuture();
1156 }
1157
1158 template <class F>
1159 Future<Unit> times(const int n, F thunk) {
1160   auto count = folly::makeMoveWrapper(
1161     std::unique_ptr<std::atomic<int>>(new std::atomic<int>(0))
1162   );
1163   return folly::whileDo([=]() mutable {
1164       return (*count)->fetch_add(1) < n;
1165     }, thunk);
1166 }
1167
1168 namespace futures {
1169   template <class It, class F, class ItT, class Result>
1170   std::vector<Future<Result>> map(It first, It last, F func) {
1171     std::vector<Future<Result>> results;
1172     for (auto it = first; it != last; it++) {
1173       results.push_back(it->then(func));
1174     }
1175     return results;
1176   }
1177 }
1178
1179 namespace futures {
1180
1181 namespace detail {
1182
1183 struct retrying_policy_raw_tag {};
1184 struct retrying_policy_fut_tag {};
1185
1186 template <class Policy>
1187 struct retrying_policy_traits {
1188   using ew = exception_wrapper;
1189   FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1190   template <class Ret>
1191   using has_op = typename std::integral_constant<bool,
1192         has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1193         has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1194   using is_raw = has_op<bool>;
1195   using is_fut = has_op<Future<bool>>;
1196   using tag = typename std::conditional<
1197         is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1198         is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1199 };
1200
1201 template <class Policy, class FF>
1202 typename std::result_of<FF(size_t)>::type
1203 retrying(size_t k, Policy&& p, FF&& ff) {
1204   using F = typename std::result_of<FF(size_t)>::type;
1205   using T = typename F::value_type;
1206   auto f = ff(k++);
1207   auto pm = makeMoveWrapper(p);
1208   auto ffm = makeMoveWrapper(ff);
1209   return f.onError([=](exception_wrapper x) mutable {
1210       auto q = (*pm)(k, x);
1211       auto xm = makeMoveWrapper(std::move(x));
1212       return q.then([=](bool r) mutable {
1213           return r
1214             ? retrying(k, pm.move(), ffm.move())
1215             : makeFuture<T>(xm.move());
1216       });
1217   });
1218 }
1219
1220 template <class Policy, class FF>
1221 typename std::result_of<FF(size_t)>::type
1222 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1223   auto pm = makeMoveWrapper(std::move(p));
1224   auto q = [=](size_t k, exception_wrapper x) {
1225     return makeFuture<bool>((*pm)(k, x));
1226   };
1227   return retrying(0, std::move(q), std::forward<FF>(ff));
1228 }
1229
1230 template <class Policy, class FF>
1231 typename std::result_of<FF(size_t)>::type
1232 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1233   return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1234 }
1235
1236 //  jittered exponential backoff, clamped to [backoff_min, backoff_max]
1237 template <class URNG>
1238 Duration retryingJitteredExponentialBackoffDur(
1239     size_t n,
1240     Duration backoff_min,
1241     Duration backoff_max,
1242     double jitter_param,
1243     URNG& rng) {
1244   using d = Duration;
1245   auto dist = std::normal_distribution<double>(0.0, jitter_param);
1246   auto jitter = std::exp(dist(rng));
1247   auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1248   return std::max(backoff_min, std::min(backoff_max, backoff));
1249 }
1250
1251 template <class Policy, class URNG>
1252 std::function<Future<bool>(size_t, const exception_wrapper&)>
1253 retryingPolicyCappedJitteredExponentialBackoff(
1254     size_t max_tries,
1255     Duration backoff_min,
1256     Duration backoff_max,
1257     double jitter_param,
1258     URNG rng,
1259     Policy&& p) {
1260   auto pm = makeMoveWrapper(std::move(p));
1261   auto rngp = std::make_shared<URNG>(std::move(rng));
1262   return [=](size_t n, const exception_wrapper& ex) mutable {
1263     if (n == max_tries) { return makeFuture(false); }
1264     return (*pm)(n, ex).then([=](bool v) {
1265         if (!v) { return makeFuture(false); }
1266         auto backoff = detail::retryingJitteredExponentialBackoffDur(
1267             n, backoff_min, backoff_max, jitter_param, *rngp);
1268         return futures::sleep(backoff).then([] { return true; });
1269     });
1270   };
1271 }
1272
1273 template <class Policy, class URNG>
1274 std::function<Future<bool>(size_t, const exception_wrapper&)>
1275 retryingPolicyCappedJitteredExponentialBackoff(
1276     size_t max_tries,
1277     Duration backoff_min,
1278     Duration backoff_max,
1279     double jitter_param,
1280     URNG rng,
1281     Policy&& p,
1282     retrying_policy_raw_tag) {
1283   auto pm = makeMoveWrapper(std::move(p));
1284   auto q = [=](size_t n, const exception_wrapper& e) {
1285     return makeFuture((*pm)(n, e));
1286   };
1287   return retryingPolicyCappedJitteredExponentialBackoff(
1288       max_tries,
1289       backoff_min,
1290       backoff_max,
1291       jitter_param,
1292       std::move(rng),
1293       std::move(q));
1294 }
1295
1296 template <class Policy, class URNG>
1297 std::function<Future<bool>(size_t, const exception_wrapper&)>
1298 retryingPolicyCappedJitteredExponentialBackoff(
1299     size_t max_tries,
1300     Duration backoff_min,
1301     Duration backoff_max,
1302     double jitter_param,
1303     URNG rng,
1304     Policy&& p,
1305     retrying_policy_fut_tag) {
1306   return retryingPolicyCappedJitteredExponentialBackoff(
1307       max_tries,
1308       backoff_min,
1309       backoff_max,
1310       jitter_param,
1311       std::move(rng),
1312       std::move(p));
1313 }
1314
1315 }
1316
1317 template <class Policy, class FF>
1318 typename std::result_of<FF(size_t)>::type
1319 retrying(Policy&& p, FF&& ff) {
1320   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1321   return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1322 }
1323
1324 inline
1325 std::function<bool(size_t, const exception_wrapper&)>
1326 retryingPolicyBasic(
1327     size_t max_tries) {
1328   return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1329 }
1330
1331 template <class Policy, class URNG>
1332 std::function<Future<bool>(size_t, const exception_wrapper&)>
1333 retryingPolicyCappedJitteredExponentialBackoff(
1334     size_t max_tries,
1335     Duration backoff_min,
1336     Duration backoff_max,
1337     double jitter_param,
1338     URNG rng,
1339     Policy&& p) {
1340   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1341   return detail::retryingPolicyCappedJitteredExponentialBackoff(
1342       max_tries,
1343       backoff_min,
1344       backoff_max,
1345       jitter_param,
1346       std::move(rng),
1347       std::move(p),
1348       tag());
1349 }
1350
1351 inline
1352 std::function<Future<bool>(size_t, const exception_wrapper&)>
1353 retryingPolicyCappedJitteredExponentialBackoff(
1354     size_t max_tries,
1355     Duration backoff_min,
1356     Duration backoff_max,
1357     double jitter_param) {
1358   auto p = [](size_t, const exception_wrapper&) { return true; };
1359   return retryingPolicyCappedJitteredExponentialBackoff(
1360       max_tries,
1361       backoff_min,
1362       backoff_max,
1363       jitter_param,
1364       ThreadLocalPRNG(),
1365       std::move(p));
1366 }
1367
1368 }
1369
1370 // Instantiate the most common Future types to save compile time
1371 extern template class Future<Unit>;
1372 extern template class Future<bool>;
1373 extern template class Future<int>;
1374 extern template class Future<int64_t>;
1375 extern template class Future<std::string>;
1376 extern template class Future<double>;
1377
1378 } // namespace folly