futures::retrying.
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <algorithm>
20 #include <chrono>
21 #include <random>
22 #include <thread>
23
24 #include <folly/experimental/fibers/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
30
31 namespace folly {
32
33 class Timekeeper;
34
35 namespace detail {
36   Timekeeper* getTimekeeperSingleton();
37 }
38
39 template <class T>
40 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
41   other.core_ = nullptr;
42 }
43
44 template <class T>
45 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
46   std::swap(core_, other.core_);
47   return *this;
48 }
49
50 template <class T>
51 template <class T2, typename>
52 Future<T>::Future(T2&& val)
53   : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
54
55 template <class T>
56 template <typename, typename>
57 Future<T>::Future()
58   : core_(new detail::Core<T>(Try<T>(T()))) {}
59
60 template <class T>
61 Future<T>::~Future() {
62   detach();
63 }
64
65 template <class T>
66 void Future<T>::detach() {
67   if (core_) {
68     core_->detachFuture();
69     core_ = nullptr;
70   }
71 }
72
73 template <class T>
74 void Future<T>::throwIfInvalid() const {
75   if (!core_)
76     throw NoState();
77 }
78
79 template <class T>
80 template <class F>
81 void Future<T>::setCallback_(F&& func) {
82   throwIfInvalid();
83   core_->setCallback(std::move(func));
84 }
85
86 // unwrap
87
88 template <class T>
89 template <class F>
90 typename std::enable_if<isFuture<F>::value,
91                         Future<typename isFuture<T>::Inner>>::type
92 Future<T>::unwrap() {
93   return then([](Future<typename isFuture<T>::Inner> internal_future) {
94       return internal_future;
95   });
96 }
97
98 // then
99
100 // Variant: returns a value
101 // e.g. f.then([](Try<T>&& t){ return t.value(); });
102 template <class T>
103 template <typename F, typename R, bool isTry, typename... Args>
104 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
105 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
106   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
107   typedef typename R::ReturnsFuture::Inner B;
108
109   throwIfInvalid();
110
111   // wrap these so we can move them into the lambda
112   folly::MoveWrapper<Promise<B>> p;
113   p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
114   folly::MoveWrapper<F> funcm(std::forward<F>(func));
115
116   // grab the Future now before we lose our handle on the Promise
117   auto f = p->getFuture();
118   f.core_->setExecutorNoLock(getExecutor());
119
120   /* This is a bit tricky.
121
122      We can't just close over *this in case this Future gets moved. So we
123      make a new dummy Future. We could figure out something more
124      sophisticated that avoids making a new Future object when it can, as an
125      optimization. But this is correct.
126
127      core_ can't be moved, it is explicitly disallowed (as is copying). But
128      if there's ever a reason to allow it, this is one place that makes that
129      assumption and would need to be fixed. We use a standard shared pointer
130      for core_ (by copying it in), which means in essence obj holds a shared
131      pointer to itself.  But this shouldn't leak because Promise will not
132      outlive the continuation, because Promise will setException() with a
133      broken Promise if it is destructed before completed. We could use a
134      weak pointer but it would have to be converted to a shared pointer when
135      func is executed (because the Future returned by func may possibly
136      persist beyond the callback, if it gets moved), and so it is an
137      optimization to just make it shared from the get-go.
138
139      We have to move in the Promise and func using the MoveWrapper
140      hack. (func could be copied but it's a big drag on perf).
141
142      Two subtle but important points about this design. detail::Core has no
143      back pointers to Future or Promise, so if Future or Promise get moved
144      (and they will be moved in performant code) we don't have to do
145      anything fancy. And because we store the continuation in the
146      detail::Core, not in the Future, we can execute the continuation even
147      after the Future has gone out of scope. This is an intentional design
148      decision. It is likely we will want to be able to cancel a continuation
149      in some circumstances, but I think it should be explicit not implicit
150      in the destruction of the Future used to create it.
151      */
152   setCallback_(
153     [p, funcm](Try<T>&& t) mutable {
154       if (!isTry && t.hasException()) {
155         p->setException(std::move(t.exception()));
156       } else {
157         p->setWith([&]() {
158           return (*funcm)(t.template get<isTry, Args>()...);
159         });
160       }
161     });
162
163   return f;
164 }
165
166 // Variant: returns a Future
167 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
168 template <class T>
169 template <typename F, typename R, bool isTry, typename... Args>
170 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
171 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
172   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
173   typedef typename R::ReturnsFuture::Inner B;
174
175   throwIfInvalid();
176
177   // wrap these so we can move them into the lambda
178   folly::MoveWrapper<Promise<B>> p;
179   p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
180   folly::MoveWrapper<F> funcm(std::forward<F>(func));
181
182   // grab the Future now before we lose our handle on the Promise
183   auto f = p->getFuture();
184   f.core_->setExecutorNoLock(getExecutor());
185
186   setCallback_(
187     [p, funcm](Try<T>&& t) mutable {
188       if (!isTry && t.hasException()) {
189         p->setException(std::move(t.exception()));
190       } else {
191         try {
192           auto f2 = (*funcm)(t.template get<isTry, Args>()...);
193           // that didn't throw, now we can steal p
194           f2.setCallback_([p](Try<B>&& b) mutable {
195             p->setTry(std::move(b));
196           });
197         } catch (const std::exception& e) {
198           p->setException(exception_wrapper(std::current_exception(), e));
199         } catch (...) {
200           p->setException(exception_wrapper(std::current_exception()));
201         }
202       }
203     });
204
205   return f;
206 }
207
208 template <typename T>
209 template <typename R, typename Caller, typename... Args>
210   Future<typename isFuture<R>::Inner>
211 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
212   typedef typename std::remove_cv<
213     typename std::remove_reference<
214       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
215   return then([instance, func](Try<T>&& t){
216     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
217   });
218 }
219
220 template <class T>
221 template <class Executor, class Arg, class... Args>
222 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
223   -> decltype(this->then(std::forward<Arg>(arg),
224                          std::forward<Args>(args)...))
225 {
226   auto oldX = getExecutor();
227   setExecutor(x);
228   return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
229                via(oldX);
230 }
231
232 template <class T>
233 Future<Unit> Future<T>::then() {
234   return then([] () {});
235 }
236
237 // onError where the callback returns T
238 template <class T>
239 template <class F>
240 typename std::enable_if<
241   !detail::callableWith<F, exception_wrapper>::value &&
242   !detail::Extract<F>::ReturnsFuture::value,
243   Future<T>>::type
244 Future<T>::onError(F&& func) {
245   typedef typename detail::Extract<F>::FirstArg Exn;
246   static_assert(
247       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
248       "Return type of onError callback must be T or Future<T>");
249
250   Promise<T> p;
251   auto f = p.getFuture();
252   auto pm = folly::makeMoveWrapper(std::move(p));
253   auto funcm = folly::makeMoveWrapper(std::move(func));
254   setCallback_([pm, funcm](Try<T>&& t) mutable {
255     if (!t.template withException<Exn>([&] (Exn& e) {
256           pm->setWith([&]{
257             return (*funcm)(e);
258           });
259         })) {
260       pm->setTry(std::move(t));
261     }
262   });
263
264   return f;
265 }
266
267 // onError where the callback returns Future<T>
268 template <class T>
269 template <class F>
270 typename std::enable_if<
271   !detail::callableWith<F, exception_wrapper>::value &&
272   detail::Extract<F>::ReturnsFuture::value,
273   Future<T>>::type
274 Future<T>::onError(F&& func) {
275   static_assert(
276       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
277       "Return type of onError callback must be T or Future<T>");
278   typedef typename detail::Extract<F>::FirstArg Exn;
279
280   Promise<T> p;
281   auto f = p.getFuture();
282   auto pm = folly::makeMoveWrapper(std::move(p));
283   auto funcm = folly::makeMoveWrapper(std::move(func));
284   setCallback_([pm, funcm](Try<T>&& t) mutable {
285     if (!t.template withException<Exn>([&] (Exn& e) {
286           try {
287             auto f2 = (*funcm)(e);
288             f2.setCallback_([pm](Try<T>&& t2) mutable {
289               pm->setTry(std::move(t2));
290             });
291           } catch (const std::exception& e2) {
292             pm->setException(exception_wrapper(std::current_exception(), e2));
293           } catch (...) {
294             pm->setException(exception_wrapper(std::current_exception()));
295           }
296         })) {
297       pm->setTry(std::move(t));
298     }
299   });
300
301   return f;
302 }
303
304 template <class T>
305 template <class F>
306 Future<T> Future<T>::ensure(F func) {
307   MoveWrapper<F> funcw(std::move(func));
308   return this->then([funcw](Try<T>&& t) mutable {
309     (*funcw)();
310     return makeFuture(std::move(t));
311   });
312 }
313
314 template <class T>
315 template <class F>
316 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
317   auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
318   return within(dur, tk)
319     .onError([funcw](TimedOut const&) { return (*funcw)(); });
320 }
321
322 template <class T>
323 template <class F>
324 typename std::enable_if<
325   detail::callableWith<F, exception_wrapper>::value &&
326   detail::Extract<F>::ReturnsFuture::value,
327   Future<T>>::type
328 Future<T>::onError(F&& func) {
329   static_assert(
330       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
331       "Return type of onError callback must be T or Future<T>");
332
333   Promise<T> p;
334   auto f = p.getFuture();
335   auto pm = folly::makeMoveWrapper(std::move(p));
336   auto funcm = folly::makeMoveWrapper(std::move(func));
337   setCallback_([pm, funcm](Try<T> t) mutable {
338     if (t.hasException()) {
339       try {
340         auto f2 = (*funcm)(std::move(t.exception()));
341         f2.setCallback_([pm](Try<T> t2) mutable {
342           pm->setTry(std::move(t2));
343         });
344       } catch (const std::exception& e2) {
345         pm->setException(exception_wrapper(std::current_exception(), e2));
346       } catch (...) {
347         pm->setException(exception_wrapper(std::current_exception()));
348       }
349     } else {
350       pm->setTry(std::move(t));
351     }
352   });
353
354   return f;
355 }
356
357 // onError(exception_wrapper) that returns T
358 template <class T>
359 template <class F>
360 typename std::enable_if<
361   detail::callableWith<F, exception_wrapper>::value &&
362   !detail::Extract<F>::ReturnsFuture::value,
363   Future<T>>::type
364 Future<T>::onError(F&& func) {
365   static_assert(
366       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
367       "Return type of onError callback must be T or Future<T>");
368
369   Promise<T> p;
370   auto f = p.getFuture();
371   auto pm = folly::makeMoveWrapper(std::move(p));
372   auto funcm = folly::makeMoveWrapper(std::move(func));
373   setCallback_([pm, funcm](Try<T> t) mutable {
374     if (t.hasException()) {
375       pm->setWith([&]{
376         return (*funcm)(std::move(t.exception()));
377       });
378     } else {
379       pm->setTry(std::move(t));
380     }
381   });
382
383   return f;
384 }
385
386 template <class T>
387 typename std::add_lvalue_reference<T>::type Future<T>::value() {
388   throwIfInvalid();
389
390   return core_->getTry().value();
391 }
392
393 template <class T>
394 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
395   throwIfInvalid();
396
397   return core_->getTry().value();
398 }
399
400 template <class T>
401 Try<T>& Future<T>::getTry() {
402   throwIfInvalid();
403
404   return core_->getTry();
405 }
406
407 template <class T>
408 Optional<Try<T>> Future<T>::poll() {
409   Optional<Try<T>> o;
410   if (core_->ready()) {
411     o = std::move(core_->getTry());
412   }
413   return o;
414 }
415
416 template <class T>
417 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
418   throwIfInvalid();
419
420   setExecutor(executor, priority);
421
422   return std::move(*this);
423 }
424
425 template <class T>
426 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
427   throwIfInvalid();
428
429   MoveWrapper<Promise<T>> p;
430   auto f = p->getFuture();
431   then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
432   return std::move(f).via(executor, priority);
433 }
434
435
436 template <class Func>
437 auto via(Executor* x, Func func)
438   -> Future<typename isFuture<decltype(func())>::Inner>
439 {
440   // TODO make this actually more performant. :-P #7260175
441   return via(x).then(func);
442 }
443
444 template <class T>
445 bool Future<T>::isReady() const {
446   throwIfInvalid();
447   return core_->ready();
448 }
449
450 template <class T>
451 bool Future<T>::hasValue() {
452   return getTry().hasValue();
453 }
454
455 template <class T>
456 bool Future<T>::hasException() {
457   return getTry().hasException();
458 }
459
460 template <class T>
461 void Future<T>::raise(exception_wrapper exception) {
462   core_->raise(std::move(exception));
463 }
464
465 // makeFuture
466
467 template <class T>
468 Future<typename std::decay<T>::type> makeFuture(T&& t) {
469   return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
470 }
471
472 inline // for multiple translation units
473 Future<Unit> makeFuture() {
474   return makeFuture(Unit{});
475 }
476
477 template <class F>
478 auto makeFutureWith(F&& func)
479     -> Future<typename Unit::Lift<decltype(func())>::type> {
480   using LiftedResult = typename Unit::Lift<decltype(func())>::type;
481   return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
482     return func();
483   }));
484 }
485
486 template <class T>
487 Future<T> makeFuture(std::exception_ptr const& e) {
488   return makeFuture(Try<T>(e));
489 }
490
491 template <class T>
492 Future<T> makeFuture(exception_wrapper ew) {
493   return makeFuture(Try<T>(std::move(ew)));
494 }
495
496 template <class T, class E>
497 typename std::enable_if<std::is_base_of<std::exception, E>::value,
498                         Future<T>>::type
499 makeFuture(E const& e) {
500   return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
501 }
502
503 template <class T>
504 Future<T> makeFuture(Try<T>&& t) {
505   return Future<T>(new detail::Core<T>(std::move(t)));
506 }
507
508 // via
509 Future<Unit> via(Executor* executor, int8_t priority) {
510   return makeFuture().via(executor, priority);
511 }
512
513 // mapSetCallback calls func(i, Try<T>) when every future completes
514
515 template <class T, class InputIterator, class F>
516 void mapSetCallback(InputIterator first, InputIterator last, F func) {
517   for (size_t i = 0; first != last; ++first, ++i) {
518     first->setCallback_([func, i](Try<T>&& t) {
519       func(i, std::move(t));
520     });
521   }
522 }
523
524 // collectAll (variadic)
525
526 template <typename... Fs>
527 typename detail::CollectAllVariadicContext<
528   typename std::decay<Fs>::type::value_type...>::type
529 collectAll(Fs&&... fs) {
530   auto ctx = std::make_shared<detail::CollectAllVariadicContext<
531     typename std::decay<Fs>::type::value_type...>>();
532   detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
533     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
534   return ctx->p.getFuture();
535 }
536
537 // collectAll (iterator)
538
539 template <class InputIterator>
540 Future<
541   std::vector<
542   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
543 collectAll(InputIterator first, InputIterator last) {
544   typedef
545     typename std::iterator_traits<InputIterator>::value_type::value_type T;
546
547   struct CollectAllContext {
548     CollectAllContext(int n) : results(n) {}
549     ~CollectAllContext() {
550       p.setValue(std::move(results));
551     }
552     Promise<std::vector<Try<T>>> p;
553     std::vector<Try<T>> results;
554   };
555
556   auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
557   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
558     ctx->results[i] = std::move(t);
559   });
560   return ctx->p.getFuture();
561 }
562
563 // collect (iterator)
564
565 namespace detail {
566
567 template <typename T>
568 struct CollectContext {
569   struct Nothing { explicit Nothing(int n) {} };
570
571   using Result = typename std::conditional<
572     std::is_void<T>::value,
573     void,
574     std::vector<T>>::type;
575
576   using InternalResult = typename std::conditional<
577     std::is_void<T>::value,
578     Nothing,
579     std::vector<Optional<T>>>::type;
580
581   explicit CollectContext(int n) : result(n) {}
582   ~CollectContext() {
583     if (!threw.exchange(true)) {
584       // map Optional<T> -> T
585       std::vector<T> finalResult;
586       finalResult.reserve(result.size());
587       std::transform(result.begin(), result.end(),
588                      std::back_inserter(finalResult),
589                      [](Optional<T>& o) { return std::move(o.value()); });
590       p.setValue(std::move(finalResult));
591     }
592   }
593   inline void setPartialResult(size_t i, Try<T>& t) {
594     result[i] = std::move(t.value());
595   }
596   Promise<Result> p;
597   InternalResult result;
598   std::atomic<bool> threw {false};
599 };
600
601 }
602
603 template <class InputIterator>
604 Future<typename detail::CollectContext<
605   typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
606 collect(InputIterator first, InputIterator last) {
607   typedef
608     typename std::iterator_traits<InputIterator>::value_type::value_type T;
609
610   auto ctx = std::make_shared<detail::CollectContext<T>>(
611     std::distance(first, last));
612   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
613     if (t.hasException()) {
614        if (!ctx->threw.exchange(true)) {
615          ctx->p.setException(std::move(t.exception()));
616        }
617      } else if (!ctx->threw) {
618        ctx->setPartialResult(i, t);
619      }
620   });
621   return ctx->p.getFuture();
622 }
623
624 // collect (variadic)
625
626 template <typename... Fs>
627 typename detail::CollectVariadicContext<
628   typename std::decay<Fs>::type::value_type...>::type
629 collect(Fs&&... fs) {
630   auto ctx = std::make_shared<detail::CollectVariadicContext<
631     typename std::decay<Fs>::type::value_type...>>();
632   detail::collectVariadicHelper<detail::CollectVariadicContext>(
633     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
634   return ctx->p.getFuture();
635 }
636
637 // collectAny (iterator)
638
639 template <class InputIterator>
640 Future<
641   std::pair<size_t,
642             Try<
643               typename
644               std::iterator_traits<InputIterator>::value_type::value_type>>>
645 collectAny(InputIterator first, InputIterator last) {
646   typedef
647     typename std::iterator_traits<InputIterator>::value_type::value_type T;
648
649   struct CollectAnyContext {
650     CollectAnyContext() {};
651     Promise<std::pair<size_t, Try<T>>> p;
652     std::atomic<bool> done {false};
653   };
654
655   auto ctx = std::make_shared<CollectAnyContext>();
656   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
657     if (!ctx->done.exchange(true)) {
658       ctx->p.setValue(std::make_pair(i, std::move(t)));
659     }
660   });
661   return ctx->p.getFuture();
662 }
663
664 // collectN (iterator)
665
666 template <class InputIterator>
667 Future<std::vector<std::pair<size_t, Try<typename
668   std::iterator_traits<InputIterator>::value_type::value_type>>>>
669 collectN(InputIterator first, InputIterator last, size_t n) {
670   typedef typename
671     std::iterator_traits<InputIterator>::value_type::value_type T;
672   typedef std::vector<std::pair<size_t, Try<T>>> V;
673
674   struct CollectNContext {
675     V v;
676     std::atomic<size_t> completed = {0};
677     Promise<V> p;
678   };
679   auto ctx = std::make_shared<CollectNContext>();
680
681   if (size_t(std::distance(first, last)) < n) {
682     ctx->p.setException(std::runtime_error("Not enough futures"));
683   } else {
684     // for each completed Future, increase count and add to vector, until we
685     // have n completed futures at which point we fulfil our Promise with the
686     // vector
687     mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
688       auto c = ++ctx->completed;
689       if (c <= n) {
690         assert(ctx->v.size() < n);
691         ctx->v.emplace_back(i, std::move(t));
692         if (c == n) {
693           ctx->p.setTry(Try<V>(std::move(ctx->v)));
694         }
695       }
696     });
697   }
698
699   return ctx->p.getFuture();
700 }
701
702 // reduce (iterator)
703
704 template <class It, class T, class F>
705 Future<T> reduce(It first, It last, T&& initial, F&& func) {
706   if (first == last) {
707     return makeFuture(std::move(initial));
708   }
709
710   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
711   typedef typename std::conditional<
712     detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type Arg;
713   typedef isTry<Arg> IsTry;
714
715   folly::MoveWrapper<T> minitial(std::move(initial));
716   auto sfunc = std::make_shared<F>(std::move(func));
717
718   auto f = first->then([minitial, sfunc](Try<ItT>& head) mutable {
719     return (*sfunc)(std::move(*minitial),
720                 head.template get<IsTry::value, Arg&&>());
721   });
722
723   for (++first; first != last; ++first) {
724     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
725       return (*sfunc)(std::move(std::get<0>(t).value()),
726                   // Either return a ItT&& or a Try<ItT>&& depending
727                   // on the type of the argument of func.
728                   std::get<1>(t).template get<IsTry::value, Arg&&>());
729     });
730   }
731
732   return f;
733 }
734
735 // window (collection)
736
737 template <class Collection, class F, class ItT, class Result>
738 std::vector<Future<Result>>
739 window(Collection input, F func, size_t n) {
740   struct WindowContext {
741     WindowContext(Collection&& i, F&& fn)
742         : input_(std::move(i)), promises_(input_.size()),
743           func_(std::move(fn))
744       {}
745     std::atomic<size_t> i_ {0};
746     Collection input_;
747     std::vector<Promise<Result>> promises_;
748     F func_;
749
750     static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
751       size_t i = ctx->i_++;
752       if (i < ctx->input_.size()) {
753         // Using setCallback_ directly since we don't need the Future
754         ctx->func_(std::move(ctx->input_[i])).setCallback_(
755           // ctx is captured by value
756           [ctx, i](Try<Result>&& t) {
757             ctx->promises_[i].setTry(std::move(t));
758             // Chain another future onto this one
759             spawn(std::move(ctx));
760           });
761       }
762     }
763   };
764
765   auto max = std::min(n, input.size());
766
767   auto ctx = std::make_shared<WindowContext>(
768     std::move(input), std::move(func));
769
770   for (size_t i = 0; i < max; ++i) {
771     // Start the first n Futures
772     WindowContext::spawn(ctx);
773   }
774
775   std::vector<Future<Result>> futures;
776   futures.reserve(ctx->promises_.size());
777   for (auto& promise : ctx->promises_) {
778     futures.emplace_back(promise.getFuture());
779   }
780
781   return futures;
782 }
783
784 // reduce
785
786 template <class T>
787 template <class I, class F>
788 Future<I> Future<T>::reduce(I&& initial, F&& func) {
789   folly::MoveWrapper<I> minitial(std::move(initial));
790   folly::MoveWrapper<F> mfunc(std::move(func));
791   return then([minitial, mfunc](T& vals) mutable {
792     auto ret = std::move(*minitial);
793     for (auto& val : vals) {
794       ret = (*mfunc)(std::move(ret), std::move(val));
795     }
796     return ret;
797   });
798 }
799
800 // unorderedReduce (iterator)
801
802 template <class It, class T, class F, class ItT, class Arg>
803 Future<T> unorderedReduce(It first, It last, T initial, F func) {
804   if (first == last) {
805     return makeFuture(std::move(initial));
806   }
807
808   typedef isTry<Arg> IsTry;
809
810   struct UnorderedReduceContext {
811     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
812         : lock_(), memo_(makeFuture<T>(std::move(memo))),
813           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
814       {};
815     folly::MicroSpinLock lock_; // protects memo_ and numThens_
816     Future<T> memo_;
817     F func_;
818     size_t numThens_; // how many Futures completed and called .then()
819     size_t numFutures_; // how many Futures in total
820     Promise<T> promise_;
821   };
822
823   auto ctx = std::make_shared<UnorderedReduceContext>(
824     std::move(initial), std::move(func), std::distance(first, last));
825
826   mapSetCallback<ItT>(first, last, [ctx](size_t i, Try<ItT>&& t) {
827     folly::MoveWrapper<Try<ItT>> mt(std::move(t));
828     // Futures can be completed in any order, simultaneously.
829     // To make this non-blocking, we create a new Future chain in
830     // the order of completion to reduce the values.
831     // The spinlock just protects chaining a new Future, not actually
832     // executing the reduce, which should be really fast.
833     folly::MSLGuard lock(ctx->lock_);
834     ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
835       // Either return a ItT&& or a Try<ItT>&& depending
836       // on the type of the argument of func.
837       return ctx->func_(std::move(v), mt->template get<IsTry::value, Arg&&>());
838     });
839     if (++ctx->numThens_ == ctx->numFutures_) {
840       // After reducing the value of the last Future, fulfill the Promise
841       ctx->memo_.setCallback_([ctx](Try<T>&& t2) {
842         ctx->promise_.setValue(std::move(t2));
843       });
844     }
845   });
846
847   return ctx->promise_.getFuture();
848 }
849
850 // within
851
852 template <class T>
853 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
854   return within(dur, TimedOut(), tk);
855 }
856
857 template <class T>
858 template <class E>
859 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
860
861   struct Context {
862     Context(E ex) : exception(std::move(ex)), promise() {}
863     E exception;
864     Future<Unit> thisFuture;
865     Promise<T> promise;
866     std::atomic<bool> token {false};
867   };
868
869   if (!tk) {
870     tk = folly::detail::getTimekeeperSingleton();
871   }
872
873   auto ctx = std::make_shared<Context>(std::move(e));
874
875   ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
876     // TODO: "this" completed first, cancel "after"
877     if (ctx->token.exchange(true) == false) {
878       ctx->promise.setTry(std::move(t));
879     }
880   });
881
882   tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
883     // "after" completed first, cancel "this"
884     ctx->thisFuture.raise(TimedOut());
885     if (ctx->token.exchange(true) == false) {
886       if (t.hasException()) {
887         ctx->promise.setException(std::move(t.exception()));
888       } else {
889         ctx->promise.setException(std::move(ctx->exception));
890       }
891     }
892   });
893
894   return ctx->promise.getFuture().via(getExecutor());
895 }
896
897 // delayed
898
899 template <class T>
900 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
901   return collectAll(*this, futures::sleep(dur, tk))
902     .then([](std::tuple<Try<T>, Try<Unit>> tup) {
903       Try<T>& t = std::get<0>(tup);
904       return makeFuture<T>(std::move(t));
905     });
906 }
907
908 namespace detail {
909
910 template <class T>
911 void waitImpl(Future<T>& f) {
912   // short-circuit if there's nothing to do
913   if (f.isReady()) return;
914
915   folly::fibers::Baton baton;
916   f = f.then([&](Try<T> t) {
917     baton.post();
918     return makeFuture(std::move(t));
919   });
920   baton.wait();
921
922   // There's a race here between the return here and the actual finishing of
923   // the future. f is completed, but the setup may not have finished on done
924   // after the baton has posted.
925   while (!f.isReady()) {
926     std::this_thread::yield();
927   }
928 }
929
930 template <class T>
931 void waitImpl(Future<T>& f, Duration dur) {
932   // short-circuit if there's nothing to do
933   if (f.isReady()) return;
934
935   auto baton = std::make_shared<folly::fibers::Baton>();
936   f = f.then([baton](Try<T> t) {
937     baton->post();
938     return makeFuture(std::move(t));
939   });
940
941   // Let's preserve the invariant that if we did not timeout (timed_wait returns
942   // true), then the returned Future is complete when it is returned to the
943   // caller. We need to wait out the race for that Future to complete.
944   if (baton->timed_wait(dur)) {
945     while (!f.isReady()) {
946       std::this_thread::yield();
947     }
948   }
949 }
950
951 template <class T>
952 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
953   while (!f.isReady()) {
954     e->drive();
955   }
956 }
957
958 } // detail
959
960 template <class T>
961 Future<T>& Future<T>::wait() & {
962   detail::waitImpl(*this);
963   return *this;
964 }
965
966 template <class T>
967 Future<T>&& Future<T>::wait() && {
968   detail::waitImpl(*this);
969   return std::move(*this);
970 }
971
972 template <class T>
973 Future<T>& Future<T>::wait(Duration dur) & {
974   detail::waitImpl(*this, dur);
975   return *this;
976 }
977
978 template <class T>
979 Future<T>&& Future<T>::wait(Duration dur) && {
980   detail::waitImpl(*this, dur);
981   return std::move(*this);
982 }
983
984 template <class T>
985 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
986   detail::waitViaImpl(*this, e);
987   return *this;
988 }
989
990 template <class T>
991 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
992   detail::waitViaImpl(*this, e);
993   return std::move(*this);
994 }
995
996 template <class T>
997 T Future<T>::get() {
998   return std::move(wait().value());
999 }
1000
1001 template <class T>
1002 T Future<T>::get(Duration dur) {
1003   wait(dur);
1004   if (isReady()) {
1005     return std::move(value());
1006   } else {
1007     throw TimedOut();
1008   }
1009 }
1010
1011 template <class T>
1012 T Future<T>::getVia(DrivableExecutor* e) {
1013   return std::move(waitVia(e).value());
1014 }
1015
1016 namespace detail {
1017   template <class T>
1018   struct TryEquals {
1019     static bool equals(const Try<T>& t1, const Try<T>& t2) {
1020       return t1.value() == t2.value();
1021     }
1022   };
1023 }
1024
1025 template <class T>
1026 Future<bool> Future<T>::willEqual(Future<T>& f) {
1027   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1028     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1029       return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1030     } else {
1031       return false;
1032       }
1033   });
1034 }
1035
1036 template <class T>
1037 template <class F>
1038 Future<T> Future<T>::filter(F predicate) {
1039   auto p = folly::makeMoveWrapper(std::move(predicate));
1040   return this->then([p](T val) {
1041     T const& valConstRef = val;
1042     if (!(*p)(valConstRef)) {
1043       throw PredicateDoesNotObtain();
1044     }
1045     return val;
1046   });
1047 }
1048
1049 template <class T>
1050 template <class Callback>
1051 auto Future<T>::thenMulti(Callback&& fn)
1052     -> decltype(this->then(std::forward<Callback>(fn))) {
1053   // thenMulti with one callback is just a then
1054   return then(std::forward<Callback>(fn));
1055 }
1056
1057 template <class T>
1058 template <class Callback, class... Callbacks>
1059 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1060     -> decltype(this->then(std::forward<Callback>(fn)).
1061                       thenMulti(std::forward<Callbacks>(fns)...)) {
1062   // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1063   return then(std::forward<Callback>(fn)).
1064          thenMulti(std::forward<Callbacks>(fns)...);
1065 }
1066
1067 template <class T>
1068 template <class Callback, class... Callbacks>
1069 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1070                                       Callbacks&&... fns)
1071     -> decltype(this->then(std::forward<Callback>(fn)).
1072                       thenMulti(std::forward<Callbacks>(fns)...)) {
1073   // thenMultiExecutor with two callbacks is
1074   // via(x).then(a).thenMulti(b, ...).via(oldX)
1075   auto oldX = getExecutor();
1076   setExecutor(x);
1077   return then(std::forward<Callback>(fn)).
1078          thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1079 }
1080
1081 template <class T>
1082 template <class Callback>
1083 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1084     -> decltype(this->then(std::forward<Callback>(fn))) {
1085   // thenMulti with one callback is just a then with an executor
1086   return then(x, std::forward<Callback>(fn));
1087 }
1088
1089 namespace futures {
1090   template <class It, class F, class ItT, class Result>
1091   std::vector<Future<Result>> map(It first, It last, F func) {
1092     std::vector<Future<Result>> results;
1093     for (auto it = first; it != last; it++) {
1094       results.push_back(it->then(func));
1095     }
1096     return results;
1097   }
1098 }
1099
1100 namespace futures {
1101
1102 namespace detail {
1103
1104 struct retrying_policy_raw_tag {};
1105 struct retrying_policy_fut_tag {};
1106
1107 template <class Policy>
1108 struct retrying_policy_traits {
1109   using ew = exception_wrapper;
1110   FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1111   template <class Ret>
1112   using has_op = typename std::integral_constant<bool,
1113         has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1114         has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1115   using is_raw = has_op<bool>;
1116   using is_fut = has_op<Future<bool>>;
1117   using tag = typename std::conditional<
1118         is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1119         is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1120 };
1121
1122 template <class Policy, class FF>
1123 typename std::result_of<FF(size_t)>::type
1124 retrying(size_t k, Policy&& p, FF&& ff) {
1125   using F = typename std::result_of<FF(size_t)>::type;
1126   using T = typename F::value_type;
1127   auto f = ff(k++);
1128   auto pm = makeMoveWrapper(p);
1129   auto ffm = makeMoveWrapper(ff);
1130   return f.onError([=](exception_wrapper x) mutable {
1131       auto q = (*pm)(k, x);
1132       auto xm = makeMoveWrapper(std::move(x));
1133       return q.then([=](bool r) mutable {
1134           return r
1135             ? retrying(k, pm.move(), ffm.move())
1136             : makeFuture<T>(xm.move());
1137       });
1138   });
1139 }
1140
1141 template <class Policy, class FF>
1142 typename std::result_of<FF(size_t)>::type
1143 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1144   auto pm = makeMoveWrapper(std::move(p));
1145   auto q = [=](size_t k, exception_wrapper x) {
1146     return makeFuture<bool>((*pm)(k, x));
1147   };
1148   return retrying(0, std::move(q), std::forward<FF>(ff));
1149 }
1150
1151 template <class Policy, class FF>
1152 typename std::result_of<FF(size_t)>::type
1153 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1154   return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1155 }
1156
1157 //  jittered exponential backoff, clamped to [backoff_min, backoff_max]
1158 template <class URNG>
1159 Duration retryingJitteredExponentialBackoffDur(
1160     size_t n,
1161     Duration backoff_min,
1162     Duration backoff_max,
1163     double jitter_param,
1164     URNG& rng) {
1165   using d = Duration;
1166   auto dist = std::normal_distribution<double>(0.0, jitter_param);
1167   auto jitter = std::exp(dist(rng));
1168   auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1169   return std::max(backoff_min, std::min(backoff_max, backoff));
1170 }
1171
1172 template <class Policy, class URNG>
1173 std::function<Future<bool>(size_t, const exception_wrapper&)>
1174 retryingPolicyCappedJitteredExponentialBackoff(
1175     size_t max_tries,
1176     Duration backoff_min,
1177     Duration backoff_max,
1178     double jitter_param,
1179     URNG rng,
1180     Policy&& p) {
1181   auto pm = makeMoveWrapper(std::move(p));
1182   auto rngp = std::make_shared<URNG>(std::move(rng));
1183   return [=](size_t n, const exception_wrapper& ex) mutable {
1184     if (n == max_tries) { return makeFuture(false); }
1185     return (*pm)(n, ex).then([=](bool v) {
1186         if (!v) { return makeFuture(false); }
1187         auto backoff = detail::retryingJitteredExponentialBackoffDur(
1188             n, backoff_min, backoff_max, jitter_param, *rngp);
1189         return futures::sleep(backoff).then([] { return true; });
1190     });
1191   };
1192 }
1193
1194 template <class Policy, class URNG>
1195 std::function<Future<bool>(size_t, const exception_wrapper&)>
1196 retryingPolicyCappedJitteredExponentialBackoff(
1197     size_t max_tries,
1198     Duration backoff_min,
1199     Duration backoff_max,
1200     double jitter_param,
1201     URNG rng,
1202     Policy&& p,
1203     retrying_policy_raw_tag) {
1204   auto pm = makeMoveWrapper(std::move(p));
1205   auto q = [=](size_t n, const exception_wrapper& e) {
1206     return makeFuture((*pm)(n, e));
1207   };
1208   return retryingPolicyCappedJitteredExponentialBackoff(
1209       max_tries,
1210       backoff_min,
1211       backoff_max,
1212       jitter_param,
1213       std::move(rng),
1214       std::move(q));
1215 }
1216
1217 template <class Policy, class URNG>
1218 std::function<Future<bool>(size_t, const exception_wrapper&)>
1219 retryingPolicyCappedJitteredExponentialBackoff(
1220     size_t max_tries,
1221     Duration backoff_min,
1222     Duration backoff_max,
1223     double jitter_param,
1224     URNG rng,
1225     Policy&& p,
1226     retrying_policy_fut_tag) {
1227   return retryingPolicyCappedJitteredExponentialBackoff(
1228       max_tries,
1229       backoff_min,
1230       backoff_max,
1231       jitter_param,
1232       std::move(rng),
1233       std::move(p));
1234 }
1235
1236 }
1237
1238 template <class Policy, class FF>
1239 typename std::result_of<FF(size_t)>::type
1240 retrying(Policy&& p, FF&& ff) {
1241   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1242   return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1243 }
1244
1245 inline
1246 std::function<bool(size_t, const exception_wrapper&)>
1247 retryingPolicyBasic(
1248     size_t max_tries) {
1249   return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1250 }
1251
1252 template <class Policy, class URNG>
1253 std::function<Future<bool>(size_t, const exception_wrapper&)>
1254 retryingPolicyCappedJitteredExponentialBackoff(
1255     size_t max_tries,
1256     Duration backoff_min,
1257     Duration backoff_max,
1258     double jitter_param,
1259     URNG rng,
1260     Policy&& p) {
1261   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1262   return detail::retryingPolicyCappedJitteredExponentialBackoff(
1263       max_tries,
1264       backoff_min,
1265       backoff_max,
1266       jitter_param,
1267       std::move(rng),
1268       std::move(p),
1269       tag());
1270 }
1271
1272 inline
1273 std::function<Future<bool>(size_t, const exception_wrapper&)>
1274 retryingPolicyCappedJitteredExponentialBackoff(
1275     size_t max_tries,
1276     Duration backoff_min,
1277     Duration backoff_max,
1278     double jitter_param) {
1279   auto p = [](size_t, const exception_wrapper&) { return true; };
1280   return retryingPolicyCappedJitteredExponentialBackoff(
1281       max_tries,
1282       backoff_min,
1283       backoff_max,
1284       jitter_param,
1285       ThreadLocalPRNG(),
1286       std::move(p));
1287 }
1288
1289 }
1290
1291 // Instantiate the most common Future types to save compile time
1292 extern template class Future<Unit>;
1293 extern template class Future<bool>;
1294 extern template class Future<int>;
1295 extern template class Future<int64_t>;
1296 extern template class Future<std::string>;
1297 extern template class Future<double>;
1298
1299 } // namespace folly