Added futures helpers times, when, and whileDo
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <algorithm>
20 #include <chrono>
21 #include <random>
22 #include <thread>
23
24 #include <folly/experimental/fibers/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
30
31 namespace folly {
32
33 class Timekeeper;
34
35 namespace detail {
36   Timekeeper* getTimekeeperSingleton();
37 }
38
39 template <class T>
40 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
41   other.core_ = nullptr;
42 }
43
44 template <class T>
45 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
46   std::swap(core_, other.core_);
47   return *this;
48 }
49
50 template <class T>
51 template <class T2, typename>
52 Future<T>::Future(T2&& val)
53   : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
54
55 template <class T>
56 template <typename, typename>
57 Future<T>::Future()
58   : core_(new detail::Core<T>(Try<T>(T()))) {}
59
60 template <class T>
61 Future<T>::~Future() {
62   detach();
63 }
64
65 template <class T>
66 void Future<T>::detach() {
67   if (core_) {
68     core_->detachFuture();
69     core_ = nullptr;
70   }
71 }
72
73 template <class T>
74 void Future<T>::throwIfInvalid() const {
75   if (!core_)
76     throw NoState();
77 }
78
79 template <class T>
80 template <class F>
81 void Future<T>::setCallback_(F&& func) {
82   throwIfInvalid();
83   core_->setCallback(std::move(func));
84 }
85
86 // unwrap
87
88 template <class T>
89 template <class F>
90 typename std::enable_if<isFuture<F>::value,
91                         Future<typename isFuture<T>::Inner>>::type
92 Future<T>::unwrap() {
93   return then([](Future<typename isFuture<T>::Inner> internal_future) {
94       return internal_future;
95   });
96 }
97
98 // then
99
100 // Variant: returns a value
101 // e.g. f.then([](Try<T>&& t){ return t.value(); });
102 template <class T>
103 template <typename F, typename R, bool isTry, typename... Args>
104 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
105 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
106   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
107   typedef typename R::ReturnsFuture::Inner B;
108
109   throwIfInvalid();
110
111   // wrap these so we can move them into the lambda
112   folly::MoveWrapper<Promise<B>> p;
113   p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
114   folly::MoveWrapper<F> funcm(std::forward<F>(func));
115
116   // grab the Future now before we lose our handle on the Promise
117   auto f = p->getFuture();
118   f.core_->setExecutorNoLock(getExecutor());
119
120   /* This is a bit tricky.
121
122      We can't just close over *this in case this Future gets moved. So we
123      make a new dummy Future. We could figure out something more
124      sophisticated that avoids making a new Future object when it can, as an
125      optimization. But this is correct.
126
127      core_ can't be moved, it is explicitly disallowed (as is copying). But
128      if there's ever a reason to allow it, this is one place that makes that
129      assumption and would need to be fixed. We use a standard shared pointer
130      for core_ (by copying it in), which means in essence obj holds a shared
131      pointer to itself.  But this shouldn't leak because Promise will not
132      outlive the continuation, because Promise will setException() with a
133      broken Promise if it is destructed before completed. We could use a
134      weak pointer but it would have to be converted to a shared pointer when
135      func is executed (because the Future returned by func may possibly
136      persist beyond the callback, if it gets moved), and so it is an
137      optimization to just make it shared from the get-go.
138
139      We have to move in the Promise and func using the MoveWrapper
140      hack. (func could be copied but it's a big drag on perf).
141
142      Two subtle but important points about this design. detail::Core has no
143      back pointers to Future or Promise, so if Future or Promise get moved
144      (and they will be moved in performant code) we don't have to do
145      anything fancy. And because we store the continuation in the
146      detail::Core, not in the Future, we can execute the continuation even
147      after the Future has gone out of scope. This is an intentional design
148      decision. It is likely we will want to be able to cancel a continuation
149      in some circumstances, but I think it should be explicit not implicit
150      in the destruction of the Future used to create it.
151      */
152   setCallback_(
153     [p, funcm](Try<T>&& t) mutable {
154       if (!isTry && t.hasException()) {
155         p->setException(std::move(t.exception()));
156       } else {
157         p->setWith([&]() {
158           return (*funcm)(t.template get<isTry, Args>()...);
159         });
160       }
161     });
162
163   return f;
164 }
165
166 // Variant: returns a Future
167 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
168 template <class T>
169 template <typename F, typename R, bool isTry, typename... Args>
170 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
171 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
172   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
173   typedef typename R::ReturnsFuture::Inner B;
174
175   throwIfInvalid();
176
177   // wrap these so we can move them into the lambda
178   folly::MoveWrapper<Promise<B>> p;
179   p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
180   folly::MoveWrapper<F> funcm(std::forward<F>(func));
181
182   // grab the Future now before we lose our handle on the Promise
183   auto f = p->getFuture();
184   f.core_->setExecutorNoLock(getExecutor());
185
186   setCallback_(
187     [p, funcm](Try<T>&& t) mutable {
188       if (!isTry && t.hasException()) {
189         p->setException(std::move(t.exception()));
190       } else {
191         try {
192           auto f2 = (*funcm)(t.template get<isTry, Args>()...);
193           // that didn't throw, now we can steal p
194           f2.setCallback_([p](Try<B>&& b) mutable {
195             p->setTry(std::move(b));
196           });
197         } catch (const std::exception& e) {
198           p->setException(exception_wrapper(std::current_exception(), e));
199         } catch (...) {
200           p->setException(exception_wrapper(std::current_exception()));
201         }
202       }
203     });
204
205   return f;
206 }
207
208 template <typename T>
209 template <typename R, typename Caller, typename... Args>
210   Future<typename isFuture<R>::Inner>
211 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
212   typedef typename std::remove_cv<
213     typename std::remove_reference<
214       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
215   return then([instance, func](Try<T>&& t){
216     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
217   });
218 }
219
220 template <class T>
221 template <class Executor, class Arg, class... Args>
222 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
223   -> decltype(this->then(std::forward<Arg>(arg),
224                          std::forward<Args>(args)...))
225 {
226   auto oldX = getExecutor();
227   setExecutor(x);
228   return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
229                via(oldX);
230 }
231
232 template <class T>
233 Future<Unit> Future<T>::then() {
234   return then([] () {});
235 }
236
237 // onError where the callback returns T
238 template <class T>
239 template <class F>
240 typename std::enable_if<
241   !detail::callableWith<F, exception_wrapper>::value &&
242   !detail::Extract<F>::ReturnsFuture::value,
243   Future<T>>::type
244 Future<T>::onError(F&& func) {
245   typedef typename detail::Extract<F>::FirstArg Exn;
246   static_assert(
247       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
248       "Return type of onError callback must be T or Future<T>");
249
250   Promise<T> p;
251   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
252   auto f = p.getFuture();
253   auto pm = folly::makeMoveWrapper(std::move(p));
254   auto funcm = folly::makeMoveWrapper(std::move(func));
255   setCallback_([pm, funcm](Try<T>&& t) mutable {
256     if (!t.template withException<Exn>([&] (Exn& e) {
257           pm->setWith([&]{
258             return (*funcm)(e);
259           });
260         })) {
261       pm->setTry(std::move(t));
262     }
263   });
264
265   return f;
266 }
267
268 // onError where the callback returns Future<T>
269 template <class T>
270 template <class F>
271 typename std::enable_if<
272   !detail::callableWith<F, exception_wrapper>::value &&
273   detail::Extract<F>::ReturnsFuture::value,
274   Future<T>>::type
275 Future<T>::onError(F&& func) {
276   static_assert(
277       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
278       "Return type of onError callback must be T or Future<T>");
279   typedef typename detail::Extract<F>::FirstArg Exn;
280
281   Promise<T> p;
282   auto f = p.getFuture();
283   auto pm = folly::makeMoveWrapper(std::move(p));
284   auto funcm = folly::makeMoveWrapper(std::move(func));
285   setCallback_([pm, funcm](Try<T>&& t) mutable {
286     if (!t.template withException<Exn>([&] (Exn& e) {
287           try {
288             auto f2 = (*funcm)(e);
289             f2.setCallback_([pm](Try<T>&& t2) mutable {
290               pm->setTry(std::move(t2));
291             });
292           } catch (const std::exception& e2) {
293             pm->setException(exception_wrapper(std::current_exception(), e2));
294           } catch (...) {
295             pm->setException(exception_wrapper(std::current_exception()));
296           }
297         })) {
298       pm->setTry(std::move(t));
299     }
300   });
301
302   return f;
303 }
304
305 template <class T>
306 template <class F>
307 Future<T> Future<T>::ensure(F func) {
308   MoveWrapper<F> funcw(std::move(func));
309   return this->then([funcw](Try<T>&& t) mutable {
310     (*funcw)();
311     return makeFuture(std::move(t));
312   });
313 }
314
315 template <class T>
316 template <class F>
317 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
318   auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
319   return within(dur, tk)
320     .onError([funcw](TimedOut const&) { return (*funcw)(); });
321 }
322
323 template <class T>
324 template <class F>
325 typename std::enable_if<
326   detail::callableWith<F, exception_wrapper>::value &&
327   detail::Extract<F>::ReturnsFuture::value,
328   Future<T>>::type
329 Future<T>::onError(F&& func) {
330   static_assert(
331       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
332       "Return type of onError callback must be T or Future<T>");
333
334   Promise<T> p;
335   auto f = p.getFuture();
336   auto pm = folly::makeMoveWrapper(std::move(p));
337   auto funcm = folly::makeMoveWrapper(std::move(func));
338   setCallback_([pm, funcm](Try<T> t) mutable {
339     if (t.hasException()) {
340       try {
341         auto f2 = (*funcm)(std::move(t.exception()));
342         f2.setCallback_([pm](Try<T> t2) mutable {
343           pm->setTry(std::move(t2));
344         });
345       } catch (const std::exception& e2) {
346         pm->setException(exception_wrapper(std::current_exception(), e2));
347       } catch (...) {
348         pm->setException(exception_wrapper(std::current_exception()));
349       }
350     } else {
351       pm->setTry(std::move(t));
352     }
353   });
354
355   return f;
356 }
357
358 // onError(exception_wrapper) that returns T
359 template <class T>
360 template <class F>
361 typename std::enable_if<
362   detail::callableWith<F, exception_wrapper>::value &&
363   !detail::Extract<F>::ReturnsFuture::value,
364   Future<T>>::type
365 Future<T>::onError(F&& func) {
366   static_assert(
367       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
368       "Return type of onError callback must be T or Future<T>");
369
370   Promise<T> p;
371   auto f = p.getFuture();
372   auto pm = folly::makeMoveWrapper(std::move(p));
373   auto funcm = folly::makeMoveWrapper(std::move(func));
374   setCallback_([pm, funcm](Try<T> t) mutable {
375     if (t.hasException()) {
376       pm->setWith([&]{
377         return (*funcm)(std::move(t.exception()));
378       });
379     } else {
380       pm->setTry(std::move(t));
381     }
382   });
383
384   return f;
385 }
386
387 template <class T>
388 typename std::add_lvalue_reference<T>::type Future<T>::value() {
389   throwIfInvalid();
390
391   return core_->getTry().value();
392 }
393
394 template <class T>
395 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
396   throwIfInvalid();
397
398   return core_->getTry().value();
399 }
400
401 template <class T>
402 Try<T>& Future<T>::getTry() {
403   throwIfInvalid();
404
405   return core_->getTry();
406 }
407
408 template <class T>
409 Optional<Try<T>> Future<T>::poll() {
410   Optional<Try<T>> o;
411   if (core_->ready()) {
412     o = std::move(core_->getTry());
413   }
414   return o;
415 }
416
417 template <class T>
418 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
419   throwIfInvalid();
420
421   setExecutor(executor, priority);
422
423   return std::move(*this);
424 }
425
426 template <class T>
427 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
428   throwIfInvalid();
429
430   MoveWrapper<Promise<T>> p;
431   auto f = p->getFuture();
432   then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
433   return std::move(f).via(executor, priority);
434 }
435
436
437 template <class Func>
438 auto via(Executor* x, Func func)
439   -> Future<typename isFuture<decltype(func())>::Inner>
440 {
441   // TODO make this actually more performant. :-P #7260175
442   return via(x).then(func);
443 }
444
445 template <class T>
446 bool Future<T>::isReady() const {
447   throwIfInvalid();
448   return core_->ready();
449 }
450
451 template <class T>
452 bool Future<T>::hasValue() {
453   return getTry().hasValue();
454 }
455
456 template <class T>
457 bool Future<T>::hasException() {
458   return getTry().hasException();
459 }
460
461 template <class T>
462 void Future<T>::raise(exception_wrapper exception) {
463   core_->raise(std::move(exception));
464 }
465
466 // makeFuture
467
468 template <class T>
469 Future<typename std::decay<T>::type> makeFuture(T&& t) {
470   return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
471 }
472
473 inline // for multiple translation units
474 Future<Unit> makeFuture() {
475   return makeFuture(Unit{});
476 }
477
478 template <class F>
479 auto makeFutureWith(F&& func)
480     -> Future<typename Unit::Lift<decltype(func())>::type> {
481   using LiftedResult = typename Unit::Lift<decltype(func())>::type;
482   return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
483     return func();
484   }));
485 }
486
487 template <class T>
488 Future<T> makeFuture(std::exception_ptr const& e) {
489   return makeFuture(Try<T>(e));
490 }
491
492 template <class T>
493 Future<T> makeFuture(exception_wrapper ew) {
494   return makeFuture(Try<T>(std::move(ew)));
495 }
496
497 template <class T, class E>
498 typename std::enable_if<std::is_base_of<std::exception, E>::value,
499                         Future<T>>::type
500 makeFuture(E const& e) {
501   return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
502 }
503
504 template <class T>
505 Future<T> makeFuture(Try<T>&& t) {
506   return Future<T>(new detail::Core<T>(std::move(t)));
507 }
508
509 // via
510 Future<Unit> via(Executor* executor, int8_t priority) {
511   return makeFuture().via(executor, priority);
512 }
513
514 // mapSetCallback calls func(i, Try<T>) when every future completes
515
516 template <class T, class InputIterator, class F>
517 void mapSetCallback(InputIterator first, InputIterator last, F func) {
518   for (size_t i = 0; first != last; ++first, ++i) {
519     first->setCallback_([func, i](Try<T>&& t) {
520       func(i, std::move(t));
521     });
522   }
523 }
524
525 // collectAll (variadic)
526
527 template <typename... Fs>
528 typename detail::CollectAllVariadicContext<
529   typename std::decay<Fs>::type::value_type...>::type
530 collectAll(Fs&&... fs) {
531   auto ctx = std::make_shared<detail::CollectAllVariadicContext<
532     typename std::decay<Fs>::type::value_type...>>();
533   detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
534     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
535   return ctx->p.getFuture();
536 }
537
538 // collectAll (iterator)
539
540 template <class InputIterator>
541 Future<
542   std::vector<
543   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
544 collectAll(InputIterator first, InputIterator last) {
545   typedef
546     typename std::iterator_traits<InputIterator>::value_type::value_type T;
547
548   struct CollectAllContext {
549     CollectAllContext(int n) : results(n) {}
550     ~CollectAllContext() {
551       p.setValue(std::move(results));
552     }
553     Promise<std::vector<Try<T>>> p;
554     std::vector<Try<T>> results;
555   };
556
557   auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
558   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
559     ctx->results[i] = std::move(t);
560   });
561   return ctx->p.getFuture();
562 }
563
564 // collect (iterator)
565
566 namespace detail {
567
568 template <typename T>
569 struct CollectContext {
570   struct Nothing { explicit Nothing(int n) {} };
571
572   using Result = typename std::conditional<
573     std::is_void<T>::value,
574     void,
575     std::vector<T>>::type;
576
577   using InternalResult = typename std::conditional<
578     std::is_void<T>::value,
579     Nothing,
580     std::vector<Optional<T>>>::type;
581
582   explicit CollectContext(int n) : result(n) {}
583   ~CollectContext() {
584     if (!threw.exchange(true)) {
585       // map Optional<T> -> T
586       std::vector<T> finalResult;
587       finalResult.reserve(result.size());
588       std::transform(result.begin(), result.end(),
589                      std::back_inserter(finalResult),
590                      [](Optional<T>& o) { return std::move(o.value()); });
591       p.setValue(std::move(finalResult));
592     }
593   }
594   inline void setPartialResult(size_t i, Try<T>& t) {
595     result[i] = std::move(t.value());
596   }
597   Promise<Result> p;
598   InternalResult result;
599   std::atomic<bool> threw {false};
600 };
601
602 }
603
604 template <class InputIterator>
605 Future<typename detail::CollectContext<
606   typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
607 collect(InputIterator first, InputIterator last) {
608   typedef
609     typename std::iterator_traits<InputIterator>::value_type::value_type T;
610
611   auto ctx = std::make_shared<detail::CollectContext<T>>(
612     std::distance(first, last));
613   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
614     if (t.hasException()) {
615        if (!ctx->threw.exchange(true)) {
616          ctx->p.setException(std::move(t.exception()));
617        }
618      } else if (!ctx->threw) {
619        ctx->setPartialResult(i, t);
620      }
621   });
622   return ctx->p.getFuture();
623 }
624
625 // collect (variadic)
626
627 template <typename... Fs>
628 typename detail::CollectVariadicContext<
629   typename std::decay<Fs>::type::value_type...>::type
630 collect(Fs&&... fs) {
631   auto ctx = std::make_shared<detail::CollectVariadicContext<
632     typename std::decay<Fs>::type::value_type...>>();
633   detail::collectVariadicHelper<detail::CollectVariadicContext>(
634     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
635   return ctx->p.getFuture();
636 }
637
638 // collectAny (iterator)
639
640 template <class InputIterator>
641 Future<
642   std::pair<size_t,
643             Try<
644               typename
645               std::iterator_traits<InputIterator>::value_type::value_type>>>
646 collectAny(InputIterator first, InputIterator last) {
647   typedef
648     typename std::iterator_traits<InputIterator>::value_type::value_type T;
649
650   struct CollectAnyContext {
651     CollectAnyContext() {};
652     Promise<std::pair<size_t, Try<T>>> p;
653     std::atomic<bool> done {false};
654   };
655
656   auto ctx = std::make_shared<CollectAnyContext>();
657   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
658     if (!ctx->done.exchange(true)) {
659       ctx->p.setValue(std::make_pair(i, std::move(t)));
660     }
661   });
662   return ctx->p.getFuture();
663 }
664
665 // collectN (iterator)
666
667 template <class InputIterator>
668 Future<std::vector<std::pair<size_t, Try<typename
669   std::iterator_traits<InputIterator>::value_type::value_type>>>>
670 collectN(InputIterator first, InputIterator last, size_t n) {
671   typedef typename
672     std::iterator_traits<InputIterator>::value_type::value_type T;
673   typedef std::vector<std::pair<size_t, Try<T>>> V;
674
675   struct CollectNContext {
676     V v;
677     std::atomic<size_t> completed = {0};
678     Promise<V> p;
679   };
680   auto ctx = std::make_shared<CollectNContext>();
681
682   if (size_t(std::distance(first, last)) < n) {
683     ctx->p.setException(std::runtime_error("Not enough futures"));
684   } else {
685     // for each completed Future, increase count and add to vector, until we
686     // have n completed futures at which point we fulfil our Promise with the
687     // vector
688     mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
689       auto c = ++ctx->completed;
690       if (c <= n) {
691         assert(ctx->v.size() < n);
692         ctx->v.emplace_back(i, std::move(t));
693         if (c == n) {
694           ctx->p.setTry(Try<V>(std::move(ctx->v)));
695         }
696       }
697     });
698   }
699
700   return ctx->p.getFuture();
701 }
702
703 // reduce (iterator)
704
705 template <class It, class T, class F>
706 Future<T> reduce(It first, It last, T&& initial, F&& func) {
707   if (first == last) {
708     return makeFuture(std::move(initial));
709   }
710
711   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
712   typedef typename std::conditional<
713     detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type Arg;
714   typedef isTry<Arg> IsTry;
715
716   folly::MoveWrapper<T> minitial(std::move(initial));
717   auto sfunc = std::make_shared<F>(std::move(func));
718
719   auto f = first->then([minitial, sfunc](Try<ItT>& head) mutable {
720     return (*sfunc)(std::move(*minitial),
721                 head.template get<IsTry::value, Arg&&>());
722   });
723
724   for (++first; first != last; ++first) {
725     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
726       return (*sfunc)(std::move(std::get<0>(t).value()),
727                   // Either return a ItT&& or a Try<ItT>&& depending
728                   // on the type of the argument of func.
729                   std::get<1>(t).template get<IsTry::value, Arg&&>());
730     });
731   }
732
733   return f;
734 }
735
736 // window (collection)
737
738 template <class Collection, class F, class ItT, class Result>
739 std::vector<Future<Result>>
740 window(Collection input, F func, size_t n) {
741   struct WindowContext {
742     WindowContext(Collection&& i, F&& fn)
743         : input_(std::move(i)), promises_(input_.size()),
744           func_(std::move(fn))
745       {}
746     std::atomic<size_t> i_ {0};
747     Collection input_;
748     std::vector<Promise<Result>> promises_;
749     F func_;
750
751     static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
752       size_t i = ctx->i_++;
753       if (i < ctx->input_.size()) {
754         // Using setCallback_ directly since we don't need the Future
755         ctx->func_(std::move(ctx->input_[i])).setCallback_(
756           // ctx is captured by value
757           [ctx, i](Try<Result>&& t) {
758             ctx->promises_[i].setTry(std::move(t));
759             // Chain another future onto this one
760             spawn(std::move(ctx));
761           });
762       }
763     }
764   };
765
766   auto max = std::min(n, input.size());
767
768   auto ctx = std::make_shared<WindowContext>(
769     std::move(input), std::move(func));
770
771   for (size_t i = 0; i < max; ++i) {
772     // Start the first n Futures
773     WindowContext::spawn(ctx);
774   }
775
776   std::vector<Future<Result>> futures;
777   futures.reserve(ctx->promises_.size());
778   for (auto& promise : ctx->promises_) {
779     futures.emplace_back(promise.getFuture());
780   }
781
782   return futures;
783 }
784
785 // reduce
786
787 template <class T>
788 template <class I, class F>
789 Future<I> Future<T>::reduce(I&& initial, F&& func) {
790   folly::MoveWrapper<I> minitial(std::move(initial));
791   folly::MoveWrapper<F> mfunc(std::move(func));
792   return then([minitial, mfunc](T& vals) mutable {
793     auto ret = std::move(*minitial);
794     for (auto& val : vals) {
795       ret = (*mfunc)(std::move(ret), std::move(val));
796     }
797     return ret;
798   });
799 }
800
801 // unorderedReduce (iterator)
802
803 template <class It, class T, class F, class ItT, class Arg>
804 Future<T> unorderedReduce(It first, It last, T initial, F func) {
805   if (first == last) {
806     return makeFuture(std::move(initial));
807   }
808
809   typedef isTry<Arg> IsTry;
810
811   struct UnorderedReduceContext {
812     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
813         : lock_(), memo_(makeFuture<T>(std::move(memo))),
814           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
815       {};
816     folly::MicroSpinLock lock_; // protects memo_ and numThens_
817     Future<T> memo_;
818     F func_;
819     size_t numThens_; // how many Futures completed and called .then()
820     size_t numFutures_; // how many Futures in total
821     Promise<T> promise_;
822   };
823
824   auto ctx = std::make_shared<UnorderedReduceContext>(
825     std::move(initial), std::move(func), std::distance(first, last));
826
827   mapSetCallback<ItT>(first, last, [ctx](size_t i, Try<ItT>&& t) {
828     folly::MoveWrapper<Try<ItT>> mt(std::move(t));
829     // Futures can be completed in any order, simultaneously.
830     // To make this non-blocking, we create a new Future chain in
831     // the order of completion to reduce the values.
832     // The spinlock just protects chaining a new Future, not actually
833     // executing the reduce, which should be really fast.
834     folly::MSLGuard lock(ctx->lock_);
835     ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
836       // Either return a ItT&& or a Try<ItT>&& depending
837       // on the type of the argument of func.
838       return ctx->func_(std::move(v), mt->template get<IsTry::value, Arg&&>());
839     });
840     if (++ctx->numThens_ == ctx->numFutures_) {
841       // After reducing the value of the last Future, fulfill the Promise
842       ctx->memo_.setCallback_([ctx](Try<T>&& t2) {
843         ctx->promise_.setValue(std::move(t2));
844       });
845     }
846   });
847
848   return ctx->promise_.getFuture();
849 }
850
851 // within
852
853 template <class T>
854 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
855   return within(dur, TimedOut(), tk);
856 }
857
858 template <class T>
859 template <class E>
860 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
861
862   struct Context {
863     Context(E ex) : exception(std::move(ex)), promise() {}
864     E exception;
865     Future<Unit> thisFuture;
866     Promise<T> promise;
867     std::atomic<bool> token {false};
868   };
869
870   if (!tk) {
871     tk = folly::detail::getTimekeeperSingleton();
872   }
873
874   auto ctx = std::make_shared<Context>(std::move(e));
875
876   ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
877     // TODO: "this" completed first, cancel "after"
878     if (ctx->token.exchange(true) == false) {
879       ctx->promise.setTry(std::move(t));
880     }
881   });
882
883   tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
884     // "after" completed first, cancel "this"
885     ctx->thisFuture.raise(TimedOut());
886     if (ctx->token.exchange(true) == false) {
887       if (t.hasException()) {
888         ctx->promise.setException(std::move(t.exception()));
889       } else {
890         ctx->promise.setException(std::move(ctx->exception));
891       }
892     }
893   });
894
895   return ctx->promise.getFuture().via(getExecutor());
896 }
897
898 // delayed
899
900 template <class T>
901 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
902   return collectAll(*this, futures::sleep(dur, tk))
903     .then([](std::tuple<Try<T>, Try<Unit>> tup) {
904       Try<T>& t = std::get<0>(tup);
905       return makeFuture<T>(std::move(t));
906     });
907 }
908
909 namespace detail {
910
911 template <class T>
912 void waitImpl(Future<T>& f) {
913   // short-circuit if there's nothing to do
914   if (f.isReady()) return;
915
916   folly::fibers::Baton baton;
917   f = f.then([&](Try<T> t) {
918     baton.post();
919     return makeFuture(std::move(t));
920   });
921   baton.wait();
922
923   // There's a race here between the return here and the actual finishing of
924   // the future. f is completed, but the setup may not have finished on done
925   // after the baton has posted.
926   while (!f.isReady()) {
927     std::this_thread::yield();
928   }
929 }
930
931 template <class T>
932 void waitImpl(Future<T>& f, Duration dur) {
933   // short-circuit if there's nothing to do
934   if (f.isReady()) return;
935
936   auto baton = std::make_shared<folly::fibers::Baton>();
937   f = f.then([baton](Try<T> t) {
938     baton->post();
939     return makeFuture(std::move(t));
940   });
941
942   // Let's preserve the invariant that if we did not timeout (timed_wait returns
943   // true), then the returned Future is complete when it is returned to the
944   // caller. We need to wait out the race for that Future to complete.
945   if (baton->timed_wait(dur)) {
946     while (!f.isReady()) {
947       std::this_thread::yield();
948     }
949   }
950 }
951
952 template <class T>
953 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
954   while (!f.isReady()) {
955     e->drive();
956   }
957 }
958
959 } // detail
960
961 template <class T>
962 Future<T>& Future<T>::wait() & {
963   detail::waitImpl(*this);
964   return *this;
965 }
966
967 template <class T>
968 Future<T>&& Future<T>::wait() && {
969   detail::waitImpl(*this);
970   return std::move(*this);
971 }
972
973 template <class T>
974 Future<T>& Future<T>::wait(Duration dur) & {
975   detail::waitImpl(*this, dur);
976   return *this;
977 }
978
979 template <class T>
980 Future<T>&& Future<T>::wait(Duration dur) && {
981   detail::waitImpl(*this, dur);
982   return std::move(*this);
983 }
984
985 template <class T>
986 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
987   detail::waitViaImpl(*this, e);
988   return *this;
989 }
990
991 template <class T>
992 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
993   detail::waitViaImpl(*this, e);
994   return std::move(*this);
995 }
996
997 template <class T>
998 T Future<T>::get() {
999   return std::move(wait().value());
1000 }
1001
1002 template <class T>
1003 T Future<T>::get(Duration dur) {
1004   wait(dur);
1005   if (isReady()) {
1006     return std::move(value());
1007   } else {
1008     throw TimedOut();
1009   }
1010 }
1011
1012 template <class T>
1013 T Future<T>::getVia(DrivableExecutor* e) {
1014   return std::move(waitVia(e).value());
1015 }
1016
1017 namespace detail {
1018   template <class T>
1019   struct TryEquals {
1020     static bool equals(const Try<T>& t1, const Try<T>& t2) {
1021       return t1.value() == t2.value();
1022     }
1023   };
1024 }
1025
1026 template <class T>
1027 Future<bool> Future<T>::willEqual(Future<T>& f) {
1028   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1029     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1030       return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1031     } else {
1032       return false;
1033       }
1034   });
1035 }
1036
1037 template <class T>
1038 template <class F>
1039 Future<T> Future<T>::filter(F predicate) {
1040   auto p = folly::makeMoveWrapper(std::move(predicate));
1041   return this->then([p](T val) {
1042     T const& valConstRef = val;
1043     if (!(*p)(valConstRef)) {
1044       throw PredicateDoesNotObtain();
1045     }
1046     return val;
1047   });
1048 }
1049
1050 template <class T>
1051 template <class Callback>
1052 auto Future<T>::thenMulti(Callback&& fn)
1053     -> decltype(this->then(std::forward<Callback>(fn))) {
1054   // thenMulti with one callback is just a then
1055   return then(std::forward<Callback>(fn));
1056 }
1057
1058 template <class T>
1059 template <class Callback, class... Callbacks>
1060 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1061     -> decltype(this->then(std::forward<Callback>(fn)).
1062                       thenMulti(std::forward<Callbacks>(fns)...)) {
1063   // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1064   return then(std::forward<Callback>(fn)).
1065          thenMulti(std::forward<Callbacks>(fns)...);
1066 }
1067
1068 template <class T>
1069 template <class Callback, class... Callbacks>
1070 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1071                                       Callbacks&&... fns)
1072     -> decltype(this->then(std::forward<Callback>(fn)).
1073                       thenMulti(std::forward<Callbacks>(fns)...)) {
1074   // thenMultiExecutor with two callbacks is
1075   // via(x).then(a).thenMulti(b, ...).via(oldX)
1076   auto oldX = getExecutor();
1077   setExecutor(x);
1078   return then(std::forward<Callback>(fn)).
1079          thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1080 }
1081
1082 template <class T>
1083 template <class Callback>
1084 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1085     -> decltype(this->then(std::forward<Callback>(fn))) {
1086   // thenMulti with one callback is just a then with an executor
1087   return then(x, std::forward<Callback>(fn));
1088 }
1089
1090 template <class F>
1091 inline Future<Unit> when(bool p, F thunk) {
1092   return p ? thunk().unit() : makeFuture();
1093 }
1094
1095 template <class P, class F>
1096 Future<Unit> whileDo(P predicate, F thunk) {
1097   if (predicate()) {
1098     return thunk().then([=] {
1099       return whileDo(predicate, thunk);
1100     });
1101   }
1102   return makeFuture();
1103 }
1104
1105 template <class F>
1106 Future<Unit> times(const int n, F thunk) {
1107   auto count = folly::makeMoveWrapper(
1108     std::unique_ptr<std::atomic<int>>(new std::atomic<int>(0))
1109   );
1110   return folly::whileDo([=]() mutable {
1111       return (*count)->fetch_add(1) < n;
1112     }, thunk);
1113 }
1114
1115 namespace futures {
1116   template <class It, class F, class ItT, class Result>
1117   std::vector<Future<Result>> map(It first, It last, F func) {
1118     std::vector<Future<Result>> results;
1119     for (auto it = first; it != last; it++) {
1120       results.push_back(it->then(func));
1121     }
1122     return results;
1123   }
1124 }
1125
1126 namespace futures {
1127
1128 namespace detail {
1129
1130 struct retrying_policy_raw_tag {};
1131 struct retrying_policy_fut_tag {};
1132
1133 template <class Policy>
1134 struct retrying_policy_traits {
1135   using ew = exception_wrapper;
1136   FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1137   template <class Ret>
1138   using has_op = typename std::integral_constant<bool,
1139         has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1140         has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1141   using is_raw = has_op<bool>;
1142   using is_fut = has_op<Future<bool>>;
1143   using tag = typename std::conditional<
1144         is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1145         is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1146 };
1147
1148 template <class Policy, class FF>
1149 typename std::result_of<FF(size_t)>::type
1150 retrying(size_t k, Policy&& p, FF&& ff) {
1151   using F = typename std::result_of<FF(size_t)>::type;
1152   using T = typename F::value_type;
1153   auto f = ff(k++);
1154   auto pm = makeMoveWrapper(p);
1155   auto ffm = makeMoveWrapper(ff);
1156   return f.onError([=](exception_wrapper x) mutable {
1157       auto q = (*pm)(k, x);
1158       auto xm = makeMoveWrapper(std::move(x));
1159       return q.then([=](bool r) mutable {
1160           return r
1161             ? retrying(k, pm.move(), ffm.move())
1162             : makeFuture<T>(xm.move());
1163       });
1164   });
1165 }
1166
1167 template <class Policy, class FF>
1168 typename std::result_of<FF(size_t)>::type
1169 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1170   auto pm = makeMoveWrapper(std::move(p));
1171   auto q = [=](size_t k, exception_wrapper x) {
1172     return makeFuture<bool>((*pm)(k, x));
1173   };
1174   return retrying(0, std::move(q), std::forward<FF>(ff));
1175 }
1176
1177 template <class Policy, class FF>
1178 typename std::result_of<FF(size_t)>::type
1179 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1180   return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1181 }
1182
1183 //  jittered exponential backoff, clamped to [backoff_min, backoff_max]
1184 template <class URNG>
1185 Duration retryingJitteredExponentialBackoffDur(
1186     size_t n,
1187     Duration backoff_min,
1188     Duration backoff_max,
1189     double jitter_param,
1190     URNG& rng) {
1191   using d = Duration;
1192   auto dist = std::normal_distribution<double>(0.0, jitter_param);
1193   auto jitter = std::exp(dist(rng));
1194   auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1195   return std::max(backoff_min, std::min(backoff_max, backoff));
1196 }
1197
1198 template <class Policy, class URNG>
1199 std::function<Future<bool>(size_t, const exception_wrapper&)>
1200 retryingPolicyCappedJitteredExponentialBackoff(
1201     size_t max_tries,
1202     Duration backoff_min,
1203     Duration backoff_max,
1204     double jitter_param,
1205     URNG rng,
1206     Policy&& p) {
1207   auto pm = makeMoveWrapper(std::move(p));
1208   auto rngp = std::make_shared<URNG>(std::move(rng));
1209   return [=](size_t n, const exception_wrapper& ex) mutable {
1210     if (n == max_tries) { return makeFuture(false); }
1211     return (*pm)(n, ex).then([=](bool v) {
1212         if (!v) { return makeFuture(false); }
1213         auto backoff = detail::retryingJitteredExponentialBackoffDur(
1214             n, backoff_min, backoff_max, jitter_param, *rngp);
1215         return futures::sleep(backoff).then([] { return true; });
1216     });
1217   };
1218 }
1219
1220 template <class Policy, class URNG>
1221 std::function<Future<bool>(size_t, const exception_wrapper&)>
1222 retryingPolicyCappedJitteredExponentialBackoff(
1223     size_t max_tries,
1224     Duration backoff_min,
1225     Duration backoff_max,
1226     double jitter_param,
1227     URNG rng,
1228     Policy&& p,
1229     retrying_policy_raw_tag) {
1230   auto pm = makeMoveWrapper(std::move(p));
1231   auto q = [=](size_t n, const exception_wrapper& e) {
1232     return makeFuture((*pm)(n, e));
1233   };
1234   return retryingPolicyCappedJitteredExponentialBackoff(
1235       max_tries,
1236       backoff_min,
1237       backoff_max,
1238       jitter_param,
1239       std::move(rng),
1240       std::move(q));
1241 }
1242
1243 template <class Policy, class URNG>
1244 std::function<Future<bool>(size_t, const exception_wrapper&)>
1245 retryingPolicyCappedJitteredExponentialBackoff(
1246     size_t max_tries,
1247     Duration backoff_min,
1248     Duration backoff_max,
1249     double jitter_param,
1250     URNG rng,
1251     Policy&& p,
1252     retrying_policy_fut_tag) {
1253   return retryingPolicyCappedJitteredExponentialBackoff(
1254       max_tries,
1255       backoff_min,
1256       backoff_max,
1257       jitter_param,
1258       std::move(rng),
1259       std::move(p));
1260 }
1261
1262 }
1263
1264 template <class Policy, class FF>
1265 typename std::result_of<FF(size_t)>::type
1266 retrying(Policy&& p, FF&& ff) {
1267   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1268   return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1269 }
1270
1271 inline
1272 std::function<bool(size_t, const exception_wrapper&)>
1273 retryingPolicyBasic(
1274     size_t max_tries) {
1275   return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1276 }
1277
1278 template <class Policy, class URNG>
1279 std::function<Future<bool>(size_t, const exception_wrapper&)>
1280 retryingPolicyCappedJitteredExponentialBackoff(
1281     size_t max_tries,
1282     Duration backoff_min,
1283     Duration backoff_max,
1284     double jitter_param,
1285     URNG rng,
1286     Policy&& p) {
1287   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1288   return detail::retryingPolicyCappedJitteredExponentialBackoff(
1289       max_tries,
1290       backoff_min,
1291       backoff_max,
1292       jitter_param,
1293       std::move(rng),
1294       std::move(p),
1295       tag());
1296 }
1297
1298 inline
1299 std::function<Future<bool>(size_t, const exception_wrapper&)>
1300 retryingPolicyCappedJitteredExponentialBackoff(
1301     size_t max_tries,
1302     Duration backoff_min,
1303     Duration backoff_max,
1304     double jitter_param) {
1305   auto p = [](size_t, const exception_wrapper&) { return true; };
1306   return retryingPolicyCappedJitteredExponentialBackoff(
1307       max_tries,
1308       backoff_min,
1309       backoff_max,
1310       jitter_param,
1311       ThreadLocalPRNG(),
1312       std::move(p));
1313 }
1314
1315 }
1316
1317 // Instantiate the most common Future types to save compile time
1318 extern template class Future<Unit>;
1319 extern template class Future<bool>;
1320 extern template class Future<int>;
1321 extern template class Future<int64_t>;
1322 extern template class Future<std::string>;
1323 extern template class Future<double>;
1324
1325 } // namespace folly