6fb85a33e34f4b06d5e5da1d653b4e09ac20e1e4
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <algorithm>
20 #include <cassert>
21 #include <chrono>
22 #include <random>
23 #include <thread>
24 #include <folly/experimental/fibers/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
30
31 namespace folly {
32
33 class Timekeeper;
34
35 namespace detail {
36   Timekeeper* getTimekeeperSingleton();
37 }
38
39 template <class T>
40 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
41   other.core_ = nullptr;
42 }
43
44 template <class T>
45 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
46   std::swap(core_, other.core_);
47   return *this;
48 }
49
50 template <class T>
51 template <class T2, typename>
52 Future<T>::Future(T2&& val)
53   : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
54
55 template <class T>
56 template <typename, typename>
57 Future<T>::Future()
58   : core_(new detail::Core<T>(Try<T>(T()))) {}
59
60 template <class T>
61 Future<T>::~Future() {
62   detach();
63 }
64
65 template <class T>
66 void Future<T>::detach() {
67   if (core_) {
68     core_->detachFuture();
69     core_ = nullptr;
70   }
71 }
72
73 template <class T>
74 void Future<T>::throwIfInvalid() const {
75   if (!core_)
76     throw NoState();
77 }
78
79 template <class T>
80 template <class F>
81 void Future<T>::setCallback_(F&& func) {
82   throwIfInvalid();
83   core_->setCallback(std::move(func));
84 }
85
86 // unwrap
87
88 template <class T>
89 template <class F>
90 typename std::enable_if<isFuture<F>::value,
91                         Future<typename isFuture<T>::Inner>>::type
92 Future<T>::unwrap() {
93   return then([](Future<typename isFuture<T>::Inner> internal_future) {
94       return internal_future;
95   });
96 }
97
98 // then
99
100 // Variant: returns a value
101 // e.g. f.then([](Try<T>&& t){ return t.value(); });
102 template <class T>
103 template <typename F, typename R, bool isTry, typename... Args>
104 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
105 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
106   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
107   typedef typename R::ReturnsFuture::Inner B;
108
109   throwIfInvalid();
110
111   // wrap these so we can move them into the lambda
112   folly::MoveWrapper<Promise<B>> p;
113   p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
114   folly::MoveWrapper<F> funcm(std::forward<F>(func));
115
116   // grab the Future now before we lose our handle on the Promise
117   auto f = p->getFuture();
118   f.core_->setExecutorNoLock(getExecutor());
119
120   /* This is a bit tricky.
121
122      We can't just close over *this in case this Future gets moved. So we
123      make a new dummy Future. We could figure out something more
124      sophisticated that avoids making a new Future object when it can, as an
125      optimization. But this is correct.
126
127      core_ can't be moved, it is explicitly disallowed (as is copying). But
128      if there's ever a reason to allow it, this is one place that makes that
129      assumption and would need to be fixed. We use a standard shared pointer
130      for core_ (by copying it in), which means in essence obj holds a shared
131      pointer to itself.  But this shouldn't leak because Promise will not
132      outlive the continuation, because Promise will setException() with a
133      broken Promise if it is destructed before completed. We could use a
134      weak pointer but it would have to be converted to a shared pointer when
135      func is executed (because the Future returned by func may possibly
136      persist beyond the callback, if it gets moved), and so it is an
137      optimization to just make it shared from the get-go.
138
139      We have to move in the Promise and func using the MoveWrapper
140      hack. (func could be copied but it's a big drag on perf).
141
142      Two subtle but important points about this design. detail::Core has no
143      back pointers to Future or Promise, so if Future or Promise get moved
144      (and they will be moved in performant code) we don't have to do
145      anything fancy. And because we store the continuation in the
146      detail::Core, not in the Future, we can execute the continuation even
147      after the Future has gone out of scope. This is an intentional design
148      decision. It is likely we will want to be able to cancel a continuation
149      in some circumstances, but I think it should be explicit not implicit
150      in the destruction of the Future used to create it.
151      */
152   setCallback_(
153     [p, funcm](Try<T>&& t) mutable {
154       if (!isTry && t.hasException()) {
155         p->setException(std::move(t.exception()));
156       } else {
157         p->setWith([&]() {
158           return (*funcm)(t.template get<isTry, Args>()...);
159         });
160       }
161     });
162
163   return f;
164 }
165
166 // Variant: returns a Future
167 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
168 template <class T>
169 template <typename F, typename R, bool isTry, typename... Args>
170 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
171 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
172   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
173   typedef typename R::ReturnsFuture::Inner B;
174
175   throwIfInvalid();
176
177   // wrap these so we can move them into the lambda
178   folly::MoveWrapper<Promise<B>> p;
179   p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
180   folly::MoveWrapper<F> funcm(std::forward<F>(func));
181
182   // grab the Future now before we lose our handle on the Promise
183   auto f = p->getFuture();
184   f.core_->setExecutorNoLock(getExecutor());
185
186   setCallback_(
187     [p, funcm](Try<T>&& t) mutable {
188       if (!isTry && t.hasException()) {
189         p->setException(std::move(t.exception()));
190       } else {
191         try {
192           auto f2 = (*funcm)(t.template get<isTry, Args>()...);
193           // that didn't throw, now we can steal p
194           f2.setCallback_([p](Try<B>&& b) mutable {
195             p->setTry(std::move(b));
196           });
197         } catch (const std::exception& e) {
198           p->setException(exception_wrapper(std::current_exception(), e));
199         } catch (...) {
200           p->setException(exception_wrapper(std::current_exception()));
201         }
202       }
203     });
204
205   return f;
206 }
207
208 template <typename T>
209 template <typename R, typename Caller, typename... Args>
210   Future<typename isFuture<R>::Inner>
211 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
212   typedef typename std::remove_cv<
213     typename std::remove_reference<
214       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
215   return then([instance, func](Try<T>&& t){
216     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
217   });
218 }
219
220 template <class T>
221 template <class Executor, class Arg, class... Args>
222 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
223   -> decltype(this->then(std::forward<Arg>(arg),
224                          std::forward<Args>(args)...))
225 {
226   auto oldX = getExecutor();
227   setExecutor(x);
228   return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
229                via(oldX);
230 }
231
232 template <class T>
233 Future<Unit> Future<T>::then() {
234   return then([] () {});
235 }
236
237 // onError where the callback returns T
238 template <class T>
239 template <class F>
240 typename std::enable_if<
241   !detail::callableWith<F, exception_wrapper>::value &&
242   !detail::Extract<F>::ReturnsFuture::value,
243   Future<T>>::type
244 Future<T>::onError(F&& func) {
245   typedef typename detail::Extract<F>::FirstArg Exn;
246   static_assert(
247       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
248       "Return type of onError callback must be T or Future<T>");
249
250   Promise<T> p;
251   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
252   auto f = p.getFuture();
253   auto pm = folly::makeMoveWrapper(std::move(p));
254   auto funcm = folly::makeMoveWrapper(std::move(func));
255   setCallback_([pm, funcm](Try<T>&& t) mutable {
256     if (!t.template withException<Exn>([&] (Exn& e) {
257           pm->setWith([&]{
258             return (*funcm)(e);
259           });
260         })) {
261       pm->setTry(std::move(t));
262     }
263   });
264
265   return f;
266 }
267
268 // onError where the callback returns Future<T>
269 template <class T>
270 template <class F>
271 typename std::enable_if<
272   !detail::callableWith<F, exception_wrapper>::value &&
273   detail::Extract<F>::ReturnsFuture::value,
274   Future<T>>::type
275 Future<T>::onError(F&& func) {
276   static_assert(
277       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
278       "Return type of onError callback must be T or Future<T>");
279   typedef typename detail::Extract<F>::FirstArg Exn;
280
281   Promise<T> p;
282   auto f = p.getFuture();
283   auto pm = folly::makeMoveWrapper(std::move(p));
284   auto funcm = folly::makeMoveWrapper(std::move(func));
285   setCallback_([pm, funcm](Try<T>&& t) mutable {
286     if (!t.template withException<Exn>([&] (Exn& e) {
287           try {
288             auto f2 = (*funcm)(e);
289             f2.setCallback_([pm](Try<T>&& t2) mutable {
290               pm->setTry(std::move(t2));
291             });
292           } catch (const std::exception& e2) {
293             pm->setException(exception_wrapper(std::current_exception(), e2));
294           } catch (...) {
295             pm->setException(exception_wrapper(std::current_exception()));
296           }
297         })) {
298       pm->setTry(std::move(t));
299     }
300   });
301
302   return f;
303 }
304
305 template <class T>
306 template <class F>
307 Future<T> Future<T>::ensure(F func) {
308   MoveWrapper<F> funcw(std::move(func));
309   return this->then([funcw](Try<T>&& t) mutable {
310     (*funcw)();
311     return makeFuture(std::move(t));
312   });
313 }
314
315 template <class T>
316 template <class F>
317 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
318   auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
319   return within(dur, tk)
320     .onError([funcw](TimedOut const&) { return (*funcw)(); });
321 }
322
323 template <class T>
324 template <class F>
325 typename std::enable_if<
326   detail::callableWith<F, exception_wrapper>::value &&
327   detail::Extract<F>::ReturnsFuture::value,
328   Future<T>>::type
329 Future<T>::onError(F&& func) {
330   static_assert(
331       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
332       "Return type of onError callback must be T or Future<T>");
333
334   Promise<T> p;
335   auto f = p.getFuture();
336   auto pm = folly::makeMoveWrapper(std::move(p));
337   auto funcm = folly::makeMoveWrapper(std::move(func));
338   setCallback_([pm, funcm](Try<T> t) mutable {
339     if (t.hasException()) {
340       try {
341         auto f2 = (*funcm)(std::move(t.exception()));
342         f2.setCallback_([pm](Try<T> t2) mutable {
343           pm->setTry(std::move(t2));
344         });
345       } catch (const std::exception& e2) {
346         pm->setException(exception_wrapper(std::current_exception(), e2));
347       } catch (...) {
348         pm->setException(exception_wrapper(std::current_exception()));
349       }
350     } else {
351       pm->setTry(std::move(t));
352     }
353   });
354
355   return f;
356 }
357
358 // onError(exception_wrapper) that returns T
359 template <class T>
360 template <class F>
361 typename std::enable_if<
362   detail::callableWith<F, exception_wrapper>::value &&
363   !detail::Extract<F>::ReturnsFuture::value,
364   Future<T>>::type
365 Future<T>::onError(F&& func) {
366   static_assert(
367       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
368       "Return type of onError callback must be T or Future<T>");
369
370   Promise<T> p;
371   auto f = p.getFuture();
372   auto pm = folly::makeMoveWrapper(std::move(p));
373   auto funcm = folly::makeMoveWrapper(std::move(func));
374   setCallback_([pm, funcm](Try<T> t) mutable {
375     if (t.hasException()) {
376       pm->setWith([&]{
377         return (*funcm)(std::move(t.exception()));
378       });
379     } else {
380       pm->setTry(std::move(t));
381     }
382   });
383
384   return f;
385 }
386
387 template <class T>
388 typename std::add_lvalue_reference<T>::type Future<T>::value() {
389   throwIfInvalid();
390
391   return core_->getTry().value();
392 }
393
394 template <class T>
395 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
396   throwIfInvalid();
397
398   return core_->getTry().value();
399 }
400
401 template <class T>
402 Try<T>& Future<T>::getTry() {
403   throwIfInvalid();
404
405   return core_->getTry();
406 }
407
408 template <class T>
409 Optional<Try<T>> Future<T>::poll() {
410   Optional<Try<T>> o;
411   if (core_->ready()) {
412     o = std::move(core_->getTry());
413   }
414   return o;
415 }
416
417 template <class T>
418 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
419   throwIfInvalid();
420
421   setExecutor(executor, priority);
422
423   return std::move(*this);
424 }
425
426 template <class T>
427 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
428   throwIfInvalid();
429
430   MoveWrapper<Promise<T>> p;
431   auto f = p->getFuture();
432   then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
433   return std::move(f).via(executor, priority);
434 }
435
436
437 template <class Func>
438 auto via(Executor* x, Func func)
439   -> Future<typename isFuture<decltype(func())>::Inner>
440 {
441   // TODO make this actually more performant. :-P #7260175
442   return via(x).then(func);
443 }
444
445 template <class T>
446 bool Future<T>::isReady() const {
447   throwIfInvalid();
448   return core_->ready();
449 }
450
451 template <class T>
452 bool Future<T>::hasValue() {
453   return getTry().hasValue();
454 }
455
456 template <class T>
457 bool Future<T>::hasException() {
458   return getTry().hasException();
459 }
460
461 template <class T>
462 void Future<T>::raise(exception_wrapper exception) {
463   core_->raise(std::move(exception));
464 }
465
466 // makeFuture
467
468 template <class T>
469 Future<typename std::decay<T>::type> makeFuture(T&& t) {
470   return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
471 }
472
473 inline // for multiple translation units
474 Future<Unit> makeFuture() {
475   return makeFuture(Unit{});
476 }
477
478 // makeFutureWith(Future<T>()) -> Future<T>
479 template <class F>
480 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
481                         typename std::result_of<F()>::type>::type
482 makeFutureWith(F&& func) {
483   using InnerType =
484       typename isFuture<typename std::result_of<F()>::type>::Inner;
485   try {
486     return func();
487   } catch (std::exception& e) {
488     return makeFuture<InnerType>(
489         exception_wrapper(std::current_exception(), e));
490   } catch (...) {
491     return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
492   }
493 }
494
495 // makeFutureWith(T()) -> Future<T>
496 // makeFutureWith(void()) -> Future<Unit>
497 template <class F>
498 typename std::enable_if<
499     !(isFuture<typename std::result_of<F()>::type>::value),
500     Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
501 makeFutureWith(F&& func) {
502   using LiftedResult =
503       typename Unit::Lift<typename std::result_of<F()>::type>::type;
504   return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
505     return func();
506   }));
507 }
508
509 template <class T>
510 Future<T> makeFuture(std::exception_ptr const& e) {
511   return makeFuture(Try<T>(e));
512 }
513
514 template <class T>
515 Future<T> makeFuture(exception_wrapper ew) {
516   return makeFuture(Try<T>(std::move(ew)));
517 }
518
519 template <class T, class E>
520 typename std::enable_if<std::is_base_of<std::exception, E>::value,
521                         Future<T>>::type
522 makeFuture(E const& e) {
523   return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
524 }
525
526 template <class T>
527 Future<T> makeFuture(Try<T>&& t) {
528   return Future<T>(new detail::Core<T>(std::move(t)));
529 }
530
531 // via
532 Future<Unit> via(Executor* executor, int8_t priority) {
533   return makeFuture().via(executor, priority);
534 }
535
536 // mapSetCallback calls func(i, Try<T>) when every future completes
537
538 template <class T, class InputIterator, class F>
539 void mapSetCallback(InputIterator first, InputIterator last, F func) {
540   for (size_t i = 0; first != last; ++first, ++i) {
541     first->setCallback_([func, i](Try<T>&& t) {
542       func(i, std::move(t));
543     });
544   }
545 }
546
547 // collectAll (variadic)
548
549 template <typename... Fs>
550 typename detail::CollectAllVariadicContext<
551   typename std::decay<Fs>::type::value_type...>::type
552 collectAll(Fs&&... fs) {
553   auto ctx = std::make_shared<detail::CollectAllVariadicContext<
554     typename std::decay<Fs>::type::value_type...>>();
555   detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
556     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
557   return ctx->p.getFuture();
558 }
559
560 // collectAll (iterator)
561
562 template <class InputIterator>
563 Future<
564   std::vector<
565   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
566 collectAll(InputIterator first, InputIterator last) {
567   typedef
568     typename std::iterator_traits<InputIterator>::value_type::value_type T;
569
570   struct CollectAllContext {
571     CollectAllContext(int n) : results(n) {}
572     ~CollectAllContext() {
573       p.setValue(std::move(results));
574     }
575     Promise<std::vector<Try<T>>> p;
576     std::vector<Try<T>> results;
577   };
578
579   auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
580   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
581     ctx->results[i] = std::move(t);
582   });
583   return ctx->p.getFuture();
584 }
585
586 // collect (iterator)
587
588 namespace detail {
589
590 template <typename T>
591 struct CollectContext {
592   struct Nothing { explicit Nothing(int n) {} };
593
594   using Result = typename std::conditional<
595     std::is_void<T>::value,
596     void,
597     std::vector<T>>::type;
598
599   using InternalResult = typename std::conditional<
600     std::is_void<T>::value,
601     Nothing,
602     std::vector<Optional<T>>>::type;
603
604   explicit CollectContext(int n) : result(n) {}
605   ~CollectContext() {
606     if (!threw.exchange(true)) {
607       // map Optional<T> -> T
608       std::vector<T> finalResult;
609       finalResult.reserve(result.size());
610       std::transform(result.begin(), result.end(),
611                      std::back_inserter(finalResult),
612                      [](Optional<T>& o) { return std::move(o.value()); });
613       p.setValue(std::move(finalResult));
614     }
615   }
616   inline void setPartialResult(size_t i, Try<T>& t) {
617     result[i] = std::move(t.value());
618   }
619   Promise<Result> p;
620   InternalResult result;
621   std::atomic<bool> threw {false};
622 };
623
624 }
625
626 template <class InputIterator>
627 Future<typename detail::CollectContext<
628   typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
629 collect(InputIterator first, InputIterator last) {
630   typedef
631     typename std::iterator_traits<InputIterator>::value_type::value_type T;
632
633   auto ctx = std::make_shared<detail::CollectContext<T>>(
634     std::distance(first, last));
635   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
636     if (t.hasException()) {
637        if (!ctx->threw.exchange(true)) {
638          ctx->p.setException(std::move(t.exception()));
639        }
640      } else if (!ctx->threw) {
641        ctx->setPartialResult(i, t);
642      }
643   });
644   return ctx->p.getFuture();
645 }
646
647 // collect (variadic)
648
649 template <typename... Fs>
650 typename detail::CollectVariadicContext<
651   typename std::decay<Fs>::type::value_type...>::type
652 collect(Fs&&... fs) {
653   auto ctx = std::make_shared<detail::CollectVariadicContext<
654     typename std::decay<Fs>::type::value_type...>>();
655   detail::collectVariadicHelper<detail::CollectVariadicContext>(
656     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
657   return ctx->p.getFuture();
658 }
659
660 // collectAny (iterator)
661
662 template <class InputIterator>
663 Future<
664   std::pair<size_t,
665             Try<
666               typename
667               std::iterator_traits<InputIterator>::value_type::value_type>>>
668 collectAny(InputIterator first, InputIterator last) {
669   typedef
670     typename std::iterator_traits<InputIterator>::value_type::value_type T;
671
672   struct CollectAnyContext {
673     CollectAnyContext() {};
674     Promise<std::pair<size_t, Try<T>>> p;
675     std::atomic<bool> done {false};
676   };
677
678   auto ctx = std::make_shared<CollectAnyContext>();
679   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
680     if (!ctx->done.exchange(true)) {
681       ctx->p.setValue(std::make_pair(i, std::move(t)));
682     }
683   });
684   return ctx->p.getFuture();
685 }
686
687 // collectN (iterator)
688
689 template <class InputIterator>
690 Future<std::vector<std::pair<size_t, Try<typename
691   std::iterator_traits<InputIterator>::value_type::value_type>>>>
692 collectN(InputIterator first, InputIterator last, size_t n) {
693   typedef typename
694     std::iterator_traits<InputIterator>::value_type::value_type T;
695   typedef std::vector<std::pair<size_t, Try<T>>> V;
696
697   struct CollectNContext {
698     V v;
699     std::atomic<size_t> completed = {0};
700     Promise<V> p;
701   };
702   auto ctx = std::make_shared<CollectNContext>();
703
704   if (size_t(std::distance(first, last)) < n) {
705     ctx->p.setException(std::runtime_error("Not enough futures"));
706   } else {
707     // for each completed Future, increase count and add to vector, until we
708     // have n completed futures at which point we fulfil our Promise with the
709     // vector
710     mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
711       auto c = ++ctx->completed;
712       if (c <= n) {
713         assert(ctx->v.size() < n);
714         ctx->v.emplace_back(i, std::move(t));
715         if (c == n) {
716           ctx->p.setTry(Try<V>(std::move(ctx->v)));
717         }
718       }
719     });
720   }
721
722   return ctx->p.getFuture();
723 }
724
725 // reduce (iterator)
726
727 template <class It, class T, class F>
728 Future<T> reduce(It first, It last, T&& initial, F&& func) {
729   if (first == last) {
730     return makeFuture(std::move(initial));
731   }
732
733   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
734   typedef typename std::conditional<
735     detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type Arg;
736   typedef isTry<Arg> IsTry;
737
738   folly::MoveWrapper<T> minitial(std::move(initial));
739   auto sfunc = std::make_shared<F>(std::move(func));
740
741   auto f = first->then([minitial, sfunc](Try<ItT>& head) mutable {
742     return (*sfunc)(std::move(*minitial),
743                 head.template get<IsTry::value, Arg&&>());
744   });
745
746   for (++first; first != last; ++first) {
747     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
748       return (*sfunc)(std::move(std::get<0>(t).value()),
749                   // Either return a ItT&& or a Try<ItT>&& depending
750                   // on the type of the argument of func.
751                   std::get<1>(t).template get<IsTry::value, Arg&&>());
752     });
753   }
754
755   return f;
756 }
757
758 // window (collection)
759
760 template <class Collection, class F, class ItT, class Result>
761 std::vector<Future<Result>>
762 window(Collection input, F func, size_t n) {
763   struct WindowContext {
764     WindowContext(Collection&& i, F&& fn)
765         : input_(std::move(i)), promises_(input_.size()),
766           func_(std::move(fn))
767       {}
768     std::atomic<size_t> i_ {0};
769     Collection input_;
770     std::vector<Promise<Result>> promises_;
771     F func_;
772
773     static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
774       size_t i = ctx->i_++;
775       if (i < ctx->input_.size()) {
776         // Using setCallback_ directly since we don't need the Future
777         ctx->func_(std::move(ctx->input_[i])).setCallback_(
778           // ctx is captured by value
779           [ctx, i](Try<Result>&& t) {
780             ctx->promises_[i].setTry(std::move(t));
781             // Chain another future onto this one
782             spawn(std::move(ctx));
783           });
784       }
785     }
786   };
787
788   auto max = std::min(n, input.size());
789
790   auto ctx = std::make_shared<WindowContext>(
791     std::move(input), std::move(func));
792
793   for (size_t i = 0; i < max; ++i) {
794     // Start the first n Futures
795     WindowContext::spawn(ctx);
796   }
797
798   std::vector<Future<Result>> futures;
799   futures.reserve(ctx->promises_.size());
800   for (auto& promise : ctx->promises_) {
801     futures.emplace_back(promise.getFuture());
802   }
803
804   return futures;
805 }
806
807 // reduce
808
809 template <class T>
810 template <class I, class F>
811 Future<I> Future<T>::reduce(I&& initial, F&& func) {
812   folly::MoveWrapper<I> minitial(std::move(initial));
813   folly::MoveWrapper<F> mfunc(std::move(func));
814   return then([minitial, mfunc](T& vals) mutable {
815     auto ret = std::move(*minitial);
816     for (auto& val : vals) {
817       ret = (*mfunc)(std::move(ret), std::move(val));
818     }
819     return ret;
820   });
821 }
822
823 // unorderedReduce (iterator)
824
825 template <class It, class T, class F, class ItT, class Arg>
826 Future<T> unorderedReduce(It first, It last, T initial, F func) {
827   if (first == last) {
828     return makeFuture(std::move(initial));
829   }
830
831   typedef isTry<Arg> IsTry;
832
833   struct UnorderedReduceContext {
834     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
835         : lock_(), memo_(makeFuture<T>(std::move(memo))),
836           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
837       {};
838     folly::MicroSpinLock lock_; // protects memo_ and numThens_
839     Future<T> memo_;
840     F func_;
841     size_t numThens_; // how many Futures completed and called .then()
842     size_t numFutures_; // how many Futures in total
843     Promise<T> promise_;
844   };
845
846   auto ctx = std::make_shared<UnorderedReduceContext>(
847     std::move(initial), std::move(func), std::distance(first, last));
848
849   mapSetCallback<ItT>(first, last, [ctx](size_t i, Try<ItT>&& t) {
850     folly::MoveWrapper<Try<ItT>> mt(std::move(t));
851     // Futures can be completed in any order, simultaneously.
852     // To make this non-blocking, we create a new Future chain in
853     // the order of completion to reduce the values.
854     // The spinlock just protects chaining a new Future, not actually
855     // executing the reduce, which should be really fast.
856     folly::MSLGuard lock(ctx->lock_);
857     ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
858       // Either return a ItT&& or a Try<ItT>&& depending
859       // on the type of the argument of func.
860       return ctx->func_(std::move(v), mt->template get<IsTry::value, Arg&&>());
861     });
862     if (++ctx->numThens_ == ctx->numFutures_) {
863       // After reducing the value of the last Future, fulfill the Promise
864       ctx->memo_.setCallback_([ctx](Try<T>&& t2) {
865         ctx->promise_.setValue(std::move(t2));
866       });
867     }
868   });
869
870   return ctx->promise_.getFuture();
871 }
872
873 // within
874
875 template <class T>
876 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
877   return within(dur, TimedOut(), tk);
878 }
879
880 template <class T>
881 template <class E>
882 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
883
884   struct Context {
885     Context(E ex) : exception(std::move(ex)), promise() {}
886     E exception;
887     Future<Unit> thisFuture;
888     Promise<T> promise;
889     std::atomic<bool> token {false};
890   };
891
892   if (!tk) {
893     tk = folly::detail::getTimekeeperSingleton();
894   }
895
896   auto ctx = std::make_shared<Context>(std::move(e));
897
898   ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
899     // TODO: "this" completed first, cancel "after"
900     if (ctx->token.exchange(true) == false) {
901       ctx->promise.setTry(std::move(t));
902     }
903   });
904
905   tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
906     // "after" completed first, cancel "this"
907     ctx->thisFuture.raise(TimedOut());
908     if (ctx->token.exchange(true) == false) {
909       if (t.hasException()) {
910         ctx->promise.setException(std::move(t.exception()));
911       } else {
912         ctx->promise.setException(std::move(ctx->exception));
913       }
914     }
915   });
916
917   return ctx->promise.getFuture().via(getExecutor());
918 }
919
920 // delayed
921
922 template <class T>
923 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
924   return collectAll(*this, futures::sleep(dur, tk))
925     .then([](std::tuple<Try<T>, Try<Unit>> tup) {
926       Try<T>& t = std::get<0>(tup);
927       return makeFuture<T>(std::move(t));
928     });
929 }
930
931 namespace detail {
932
933 template <class T>
934 void waitImpl(Future<T>& f) {
935   // short-circuit if there's nothing to do
936   if (f.isReady()) return;
937
938   folly::fibers::Baton baton;
939   f.setCallback_([&](const Try<T>& t) { baton.post(); });
940   baton.wait();
941   assert(f.isReady());
942 }
943
944 template <class T>
945 void waitImpl(Future<T>& f, Duration dur) {
946   // short-circuit if there's nothing to do
947   if (f.isReady()) return;
948
949   folly::MoveWrapper<Promise<T>> promise;
950   auto ret = promise->getFuture();
951   auto baton = std::make_shared<folly::fibers::Baton>();
952   f.setCallback_([baton, promise](Try<T>&& t) mutable {
953     promise->setTry(std::move(t));
954     baton->post();
955   });
956   f = std::move(ret);
957   if (baton->timed_wait(dur)) {
958     assert(f.isReady());
959   }
960 }
961
962 template <class T>
963 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
964   while (!f.isReady()) {
965     e->drive();
966   }
967 }
968
969 } // detail
970
971 template <class T>
972 Future<T>& Future<T>::wait() & {
973   detail::waitImpl(*this);
974   return *this;
975 }
976
977 template <class T>
978 Future<T>&& Future<T>::wait() && {
979   detail::waitImpl(*this);
980   return std::move(*this);
981 }
982
983 template <class T>
984 Future<T>& Future<T>::wait(Duration dur) & {
985   detail::waitImpl(*this, dur);
986   return *this;
987 }
988
989 template <class T>
990 Future<T>&& Future<T>::wait(Duration dur) && {
991   detail::waitImpl(*this, dur);
992   return std::move(*this);
993 }
994
995 template <class T>
996 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
997   detail::waitViaImpl(*this, e);
998   return *this;
999 }
1000
1001 template <class T>
1002 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1003   detail::waitViaImpl(*this, e);
1004   return std::move(*this);
1005 }
1006
1007 template <class T>
1008 T Future<T>::get() {
1009   return std::move(wait().value());
1010 }
1011
1012 template <class T>
1013 T Future<T>::get(Duration dur) {
1014   wait(dur);
1015   if (isReady()) {
1016     return std::move(value());
1017   } else {
1018     throw TimedOut();
1019   }
1020 }
1021
1022 template <class T>
1023 T Future<T>::getVia(DrivableExecutor* e) {
1024   return std::move(waitVia(e).value());
1025 }
1026
1027 namespace detail {
1028   template <class T>
1029   struct TryEquals {
1030     static bool equals(const Try<T>& t1, const Try<T>& t2) {
1031       return t1.value() == t2.value();
1032     }
1033   };
1034 }
1035
1036 template <class T>
1037 Future<bool> Future<T>::willEqual(Future<T>& f) {
1038   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1039     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1040       return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1041     } else {
1042       return false;
1043       }
1044   });
1045 }
1046
1047 template <class T>
1048 template <class F>
1049 Future<T> Future<T>::filter(F predicate) {
1050   auto p = folly::makeMoveWrapper(std::move(predicate));
1051   return this->then([p](T val) {
1052     T const& valConstRef = val;
1053     if (!(*p)(valConstRef)) {
1054       throw PredicateDoesNotObtain();
1055     }
1056     return val;
1057   });
1058 }
1059
1060 template <class T>
1061 template <class Callback>
1062 auto Future<T>::thenMulti(Callback&& fn)
1063     -> decltype(this->then(std::forward<Callback>(fn))) {
1064   // thenMulti with one callback is just a then
1065   return then(std::forward<Callback>(fn));
1066 }
1067
1068 template <class T>
1069 template <class Callback, class... Callbacks>
1070 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1071     -> decltype(this->then(std::forward<Callback>(fn)).
1072                       thenMulti(std::forward<Callbacks>(fns)...)) {
1073   // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1074   return then(std::forward<Callback>(fn)).
1075          thenMulti(std::forward<Callbacks>(fns)...);
1076 }
1077
1078 template <class T>
1079 template <class Callback, class... Callbacks>
1080 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1081                                       Callbacks&&... fns)
1082     -> decltype(this->then(std::forward<Callback>(fn)).
1083                       thenMulti(std::forward<Callbacks>(fns)...)) {
1084   // thenMultiExecutor with two callbacks is
1085   // via(x).then(a).thenMulti(b, ...).via(oldX)
1086   auto oldX = getExecutor();
1087   setExecutor(x);
1088   return then(std::forward<Callback>(fn)).
1089          thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1090 }
1091
1092 template <class T>
1093 template <class Callback>
1094 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1095     -> decltype(this->then(std::forward<Callback>(fn))) {
1096   // thenMulti with one callback is just a then with an executor
1097   return then(x, std::forward<Callback>(fn));
1098 }
1099
1100 template <class F>
1101 inline Future<Unit> when(bool p, F thunk) {
1102   return p ? thunk().unit() : makeFuture();
1103 }
1104
1105 template <class P, class F>
1106 Future<Unit> whileDo(P predicate, F thunk) {
1107   if (predicate()) {
1108     return thunk().then([=] {
1109       return whileDo(predicate, thunk);
1110     });
1111   }
1112   return makeFuture();
1113 }
1114
1115 template <class F>
1116 Future<Unit> times(const int n, F thunk) {
1117   auto count = folly::makeMoveWrapper(
1118     std::unique_ptr<std::atomic<int>>(new std::atomic<int>(0))
1119   );
1120   return folly::whileDo([=]() mutable {
1121       return (*count)->fetch_add(1) < n;
1122     }, thunk);
1123 }
1124
1125 namespace futures {
1126   template <class It, class F, class ItT, class Result>
1127   std::vector<Future<Result>> map(It first, It last, F func) {
1128     std::vector<Future<Result>> results;
1129     for (auto it = first; it != last; it++) {
1130       results.push_back(it->then(func));
1131     }
1132     return results;
1133   }
1134 }
1135
1136 namespace futures {
1137
1138 namespace detail {
1139
1140 struct retrying_policy_raw_tag {};
1141 struct retrying_policy_fut_tag {};
1142
1143 template <class Policy>
1144 struct retrying_policy_traits {
1145   using ew = exception_wrapper;
1146   FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1147   template <class Ret>
1148   using has_op = typename std::integral_constant<bool,
1149         has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1150         has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1151   using is_raw = has_op<bool>;
1152   using is_fut = has_op<Future<bool>>;
1153   using tag = typename std::conditional<
1154         is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1155         is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1156 };
1157
1158 template <class Policy, class FF>
1159 typename std::result_of<FF(size_t)>::type
1160 retrying(size_t k, Policy&& p, FF&& ff) {
1161   using F = typename std::result_of<FF(size_t)>::type;
1162   using T = typename F::value_type;
1163   auto f = ff(k++);
1164   auto pm = makeMoveWrapper(p);
1165   auto ffm = makeMoveWrapper(ff);
1166   return f.onError([=](exception_wrapper x) mutable {
1167       auto q = (*pm)(k, x);
1168       auto xm = makeMoveWrapper(std::move(x));
1169       return q.then([=](bool r) mutable {
1170           return r
1171             ? retrying(k, pm.move(), ffm.move())
1172             : makeFuture<T>(xm.move());
1173       });
1174   });
1175 }
1176
1177 template <class Policy, class FF>
1178 typename std::result_of<FF(size_t)>::type
1179 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1180   auto pm = makeMoveWrapper(std::move(p));
1181   auto q = [=](size_t k, exception_wrapper x) {
1182     return makeFuture<bool>((*pm)(k, x));
1183   };
1184   return retrying(0, std::move(q), std::forward<FF>(ff));
1185 }
1186
1187 template <class Policy, class FF>
1188 typename std::result_of<FF(size_t)>::type
1189 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1190   return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1191 }
1192
1193 //  jittered exponential backoff, clamped to [backoff_min, backoff_max]
1194 template <class URNG>
1195 Duration retryingJitteredExponentialBackoffDur(
1196     size_t n,
1197     Duration backoff_min,
1198     Duration backoff_max,
1199     double jitter_param,
1200     URNG& rng) {
1201   using d = Duration;
1202   auto dist = std::normal_distribution<double>(0.0, jitter_param);
1203   auto jitter = std::exp(dist(rng));
1204   auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1205   return std::max(backoff_min, std::min(backoff_max, backoff));
1206 }
1207
1208 template <class Policy, class URNG>
1209 std::function<Future<bool>(size_t, const exception_wrapper&)>
1210 retryingPolicyCappedJitteredExponentialBackoff(
1211     size_t max_tries,
1212     Duration backoff_min,
1213     Duration backoff_max,
1214     double jitter_param,
1215     URNG rng,
1216     Policy&& p) {
1217   auto pm = makeMoveWrapper(std::move(p));
1218   auto rngp = std::make_shared<URNG>(std::move(rng));
1219   return [=](size_t n, const exception_wrapper& ex) mutable {
1220     if (n == max_tries) { return makeFuture(false); }
1221     return (*pm)(n, ex).then([=](bool v) {
1222         if (!v) { return makeFuture(false); }
1223         auto backoff = detail::retryingJitteredExponentialBackoffDur(
1224             n, backoff_min, backoff_max, jitter_param, *rngp);
1225         return futures::sleep(backoff).then([] { return true; });
1226     });
1227   };
1228 }
1229
1230 template <class Policy, class URNG>
1231 std::function<Future<bool>(size_t, const exception_wrapper&)>
1232 retryingPolicyCappedJitteredExponentialBackoff(
1233     size_t max_tries,
1234     Duration backoff_min,
1235     Duration backoff_max,
1236     double jitter_param,
1237     URNG rng,
1238     Policy&& p,
1239     retrying_policy_raw_tag) {
1240   auto pm = makeMoveWrapper(std::move(p));
1241   auto q = [=](size_t n, const exception_wrapper& e) {
1242     return makeFuture((*pm)(n, e));
1243   };
1244   return retryingPolicyCappedJitteredExponentialBackoff(
1245       max_tries,
1246       backoff_min,
1247       backoff_max,
1248       jitter_param,
1249       std::move(rng),
1250       std::move(q));
1251 }
1252
1253 template <class Policy, class URNG>
1254 std::function<Future<bool>(size_t, const exception_wrapper&)>
1255 retryingPolicyCappedJitteredExponentialBackoff(
1256     size_t max_tries,
1257     Duration backoff_min,
1258     Duration backoff_max,
1259     double jitter_param,
1260     URNG rng,
1261     Policy&& p,
1262     retrying_policy_fut_tag) {
1263   return retryingPolicyCappedJitteredExponentialBackoff(
1264       max_tries,
1265       backoff_min,
1266       backoff_max,
1267       jitter_param,
1268       std::move(rng),
1269       std::move(p));
1270 }
1271
1272 }
1273
1274 template <class Policy, class FF>
1275 typename std::result_of<FF(size_t)>::type
1276 retrying(Policy&& p, FF&& ff) {
1277   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1278   return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1279 }
1280
1281 inline
1282 std::function<bool(size_t, const exception_wrapper&)>
1283 retryingPolicyBasic(
1284     size_t max_tries) {
1285   return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1286 }
1287
1288 template <class Policy, class URNG>
1289 std::function<Future<bool>(size_t, const exception_wrapper&)>
1290 retryingPolicyCappedJitteredExponentialBackoff(
1291     size_t max_tries,
1292     Duration backoff_min,
1293     Duration backoff_max,
1294     double jitter_param,
1295     URNG rng,
1296     Policy&& p) {
1297   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1298   return detail::retryingPolicyCappedJitteredExponentialBackoff(
1299       max_tries,
1300       backoff_min,
1301       backoff_max,
1302       jitter_param,
1303       std::move(rng),
1304       std::move(p),
1305       tag());
1306 }
1307
1308 inline
1309 std::function<Future<bool>(size_t, const exception_wrapper&)>
1310 retryingPolicyCappedJitteredExponentialBackoff(
1311     size_t max_tries,
1312     Duration backoff_min,
1313     Duration backoff_max,
1314     double jitter_param) {
1315   auto p = [](size_t, const exception_wrapper&) { return true; };
1316   return retryingPolicyCappedJitteredExponentialBackoff(
1317       max_tries,
1318       backoff_min,
1319       backoff_max,
1320       jitter_param,
1321       ThreadLocalPRNG(),
1322       std::move(p));
1323 }
1324
1325 }
1326
1327 // Instantiate the most common Future types to save compile time
1328 extern template class Future<Unit>;
1329 extern template class Future<bool>;
1330 extern template class Future<int>;
1331 extern template class Future<int64_t>;
1332 extern template class Future<std::string>;
1333 extern template class Future<double>;
1334
1335 } // namespace folly