(Wangle) chain -> thenMulti + thenMultiWithExecutor
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <chrono>
20 #include <thread>
21
22 #include <folly/experimental/fibers/Baton.h>
23 #include <folly/Optional.h>
24 #include <folly/futures/detail/Core.h>
25 #include <folly/futures/Timekeeper.h>
26
27 namespace folly {
28
29 class Timekeeper;
30
31 namespace detail {
32   Timekeeper* getTimekeeperSingleton();
33 }
34
35 template <class T>
36 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
37   other.core_ = nullptr;
38 }
39
40 template <class T>
41 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
42   std::swap(core_, other.core_);
43   return *this;
44 }
45
46 template <class T>
47 template <class T2, typename>
48 Future<T>::Future(T2&& val) : core_(nullptr) {
49   Promise<T> p;
50   p.setValue(std::forward<T2>(val));
51   *this = p.getFuture();
52 }
53
54 template <class T>
55 template <class T2,
56           typename std::enable_if<
57             folly::is_void_or_unit<T2>::value,
58             int>::type>
59 Future<T>::Future() : core_(nullptr) {
60   Promise<T> p;
61   p.setValue();
62   *this = p.getFuture();
63 }
64
65
66 template <class T>
67 Future<T>::~Future() {
68   detach();
69 }
70
71 template <class T>
72 void Future<T>::detach() {
73   if (core_) {
74     core_->detachFuture();
75     core_ = nullptr;
76   }
77 }
78
79 template <class T>
80 void Future<T>::throwIfInvalid() const {
81   if (!core_)
82     throw NoState();
83 }
84
85 template <class T>
86 template <class F>
87 void Future<T>::setCallback_(F&& func) {
88   throwIfInvalid();
89   core_->setCallback(std::move(func));
90 }
91
92 // unwrap
93
94 template <class T>
95 template <class F>
96 typename std::enable_if<isFuture<F>::value,
97                         Future<typename isFuture<T>::Inner>>::type
98 Future<T>::unwrap() {
99   return then([](Future<typename isFuture<T>::Inner> internal_future) {
100       return internal_future;
101   });
102 }
103
104 // then
105
106 // Variant: returns a value
107 // e.g. f.then([](Try<T>&& t){ return t.value(); });
108 template <class T>
109 template <typename F, typename R, bool isTry, typename... Args>
110 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
111 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
112   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
113   typedef typename R::ReturnsFuture::Inner B;
114
115   throwIfInvalid();
116
117   // wrap these so we can move them into the lambda
118   folly::MoveWrapper<Promise<B>> p;
119   folly::MoveWrapper<F> funcm(std::forward<F>(func));
120
121   // grab the Future now before we lose our handle on the Promise
122   auto f = p->getFuture();
123   if (getExecutor()) {
124     f.setExecutor(getExecutor());
125   }
126
127   /* This is a bit tricky.
128
129      We can't just close over *this in case this Future gets moved. So we
130      make a new dummy Future. We could figure out something more
131      sophisticated that avoids making a new Future object when it can, as an
132      optimization. But this is correct.
133
134      core_ can't be moved, it is explicitly disallowed (as is copying). But
135      if there's ever a reason to allow it, this is one place that makes that
136      assumption and would need to be fixed. We use a standard shared pointer
137      for core_ (by copying it in), which means in essence obj holds a shared
138      pointer to itself.  But this shouldn't leak because Promise will not
139      outlive the continuation, because Promise will setException() with a
140      broken Promise if it is destructed before completed. We could use a
141      weak pointer but it would have to be converted to a shared pointer when
142      func is executed (because the Future returned by func may possibly
143      persist beyond the callback, if it gets moved), and so it is an
144      optimization to just make it shared from the get-go.
145
146      We have to move in the Promise and func using the MoveWrapper
147      hack. (func could be copied but it's a big drag on perf).
148
149      Two subtle but important points about this design. detail::Core has no
150      back pointers to Future or Promise, so if Future or Promise get moved
151      (and they will be moved in performant code) we don't have to do
152      anything fancy. And because we store the continuation in the
153      detail::Core, not in the Future, we can execute the continuation even
154      after the Future has gone out of scope. This is an intentional design
155      decision. It is likely we will want to be able to cancel a continuation
156      in some circumstances, but I think it should be explicit not implicit
157      in the destruction of the Future used to create it.
158      */
159   setCallback_(
160     [p, funcm](Try<T>&& t) mutable {
161       if (!isTry && t.hasException()) {
162         p->setException(std::move(t.exception()));
163       } else {
164         p->setWith([&]() {
165           return (*funcm)(t.template get<isTry, Args>()...);
166         });
167       }
168     });
169
170   return f;
171 }
172
173 // Variant: returns a Future
174 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
175 template <class T>
176 template <typename F, typename R, bool isTry, typename... Args>
177 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
178 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
179   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
180   typedef typename R::ReturnsFuture::Inner B;
181
182   throwIfInvalid();
183
184   // wrap these so we can move them into the lambda
185   folly::MoveWrapper<Promise<B>> p;
186   folly::MoveWrapper<F> funcm(std::forward<F>(func));
187
188   // grab the Future now before we lose our handle on the Promise
189   auto f = p->getFuture();
190   if (getExecutor()) {
191     f.setExecutor(getExecutor());
192   }
193
194   setCallback_(
195     [p, funcm](Try<T>&& t) mutable {
196       if (!isTry && t.hasException()) {
197         p->setException(std::move(t.exception()));
198       } else {
199         try {
200           auto f2 = (*funcm)(t.template get<isTry, Args>()...);
201           // that didn't throw, now we can steal p
202           f2.setCallback_([p](Try<B>&& b) mutable {
203             p->setTry(std::move(b));
204           });
205         } catch (const std::exception& e) {
206           p->setException(exception_wrapper(std::current_exception(), e));
207         } catch (...) {
208           p->setException(exception_wrapper(std::current_exception()));
209         }
210       }
211     });
212
213   return f;
214 }
215
216 template <typename T>
217 template <typename R, typename Caller, typename... Args>
218   Future<typename isFuture<R>::Inner>
219 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
220   typedef typename std::remove_cv<
221     typename std::remove_reference<
222       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
223   return then([instance, func](Try<T>&& t){
224     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
225   });
226 }
227
228 template <class T>
229 template <class Executor, class Arg, class... Args>
230 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
231   -> decltype(this->then(std::forward<Arg>(arg),
232                          std::forward<Args>(args)...))
233 {
234   auto oldX = getExecutor();
235   setExecutor(x);
236   return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
237                via(oldX);
238 }
239
240 template <class T>
241 Future<void> Future<T>::then() {
242   return then([] (Try<T>&& t) {});
243 }
244
245 // onError where the callback returns T
246 template <class T>
247 template <class F>
248 typename std::enable_if<
249   !detail::callableWith<F, exception_wrapper>::value &&
250   !detail::Extract<F>::ReturnsFuture::value,
251   Future<T>>::type
252 Future<T>::onError(F&& func) {
253   typedef typename detail::Extract<F>::FirstArg Exn;
254   static_assert(
255       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
256       "Return type of onError callback must be T or Future<T>");
257
258   Promise<T> p;
259   auto f = p.getFuture();
260   auto pm = folly::makeMoveWrapper(std::move(p));
261   auto funcm = folly::makeMoveWrapper(std::move(func));
262   setCallback_([pm, funcm](Try<T>&& t) mutable {
263     if (!t.template withException<Exn>([&] (Exn& e) {
264           pm->setWith([&]{
265             return (*funcm)(e);
266           });
267         })) {
268       pm->setTry(std::move(t));
269     }
270   });
271
272   return f;
273 }
274
275 // onError where the callback returns Future<T>
276 template <class T>
277 template <class F>
278 typename std::enable_if<
279   !detail::callableWith<F, exception_wrapper>::value &&
280   detail::Extract<F>::ReturnsFuture::value,
281   Future<T>>::type
282 Future<T>::onError(F&& func) {
283   static_assert(
284       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
285       "Return type of onError callback must be T or Future<T>");
286   typedef typename detail::Extract<F>::FirstArg Exn;
287
288   Promise<T> p;
289   auto f = p.getFuture();
290   auto pm = folly::makeMoveWrapper(std::move(p));
291   auto funcm = folly::makeMoveWrapper(std::move(func));
292   setCallback_([pm, funcm](Try<T>&& t) mutable {
293     if (!t.template withException<Exn>([&] (Exn& e) {
294           try {
295             auto f2 = (*funcm)(e);
296             f2.setCallback_([pm](Try<T>&& t2) mutable {
297               pm->setTry(std::move(t2));
298             });
299           } catch (const std::exception& e2) {
300             pm->setException(exception_wrapper(std::current_exception(), e2));
301           } catch (...) {
302             pm->setException(exception_wrapper(std::current_exception()));
303           }
304         })) {
305       pm->setTry(std::move(t));
306     }
307   });
308
309   return f;
310 }
311
312 template <class T>
313 template <class F>
314 Future<T> Future<T>::ensure(F func) {
315   MoveWrapper<F> funcw(std::move(func));
316   return this->then([funcw](Try<T>&& t) {
317     (*funcw)();
318     return makeFuture(std::move(t));
319   });
320 }
321
322 template <class T>
323 template <class F>
324 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
325   auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
326   return within(dur, tk)
327     .onError([funcw](TimedOut const&) { return (*funcw)(); });
328 }
329
330 template <class T>
331 template <class F>
332 typename std::enable_if<
333   detail::callableWith<F, exception_wrapper>::value &&
334   detail::Extract<F>::ReturnsFuture::value,
335   Future<T>>::type
336 Future<T>::onError(F&& func) {
337   static_assert(
338       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
339       "Return type of onError callback must be T or Future<T>");
340
341   Promise<T> p;
342   auto f = p.getFuture();
343   auto pm = folly::makeMoveWrapper(std::move(p));
344   auto funcm = folly::makeMoveWrapper(std::move(func));
345   setCallback_([pm, funcm](Try<T> t) mutable {
346     if (t.hasException()) {
347       try {
348         auto f2 = (*funcm)(std::move(t.exception()));
349         f2.setCallback_([pm](Try<T> t2) mutable {
350           pm->setTry(std::move(t2));
351         });
352       } catch (const std::exception& e2) {
353         pm->setException(exception_wrapper(std::current_exception(), e2));
354       } catch (...) {
355         pm->setException(exception_wrapper(std::current_exception()));
356       }
357     } else {
358       pm->setTry(std::move(t));
359     }
360   });
361
362   return f;
363 }
364
365 // onError(exception_wrapper) that returns T
366 template <class T>
367 template <class F>
368 typename std::enable_if<
369   detail::callableWith<F, exception_wrapper>::value &&
370   !detail::Extract<F>::ReturnsFuture::value,
371   Future<T>>::type
372 Future<T>::onError(F&& func) {
373   static_assert(
374       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
375       "Return type of onError callback must be T or Future<T>");
376
377   Promise<T> p;
378   auto f = p.getFuture();
379   auto pm = folly::makeMoveWrapper(std::move(p));
380   auto funcm = folly::makeMoveWrapper(std::move(func));
381   setCallback_([pm, funcm](Try<T> t) mutable {
382     if (t.hasException()) {
383       pm->setWith([&]{
384         return (*funcm)(std::move(t.exception()));
385       });
386     } else {
387       pm->setTry(std::move(t));
388     }
389   });
390
391   return f;
392 }
393
394 template <class T>
395 typename std::add_lvalue_reference<T>::type Future<T>::value() {
396   throwIfInvalid();
397
398   return core_->getTry().value();
399 }
400
401 template <class T>
402 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
403   throwIfInvalid();
404
405   return core_->getTry().value();
406 }
407
408 template <class T>
409 Try<T>& Future<T>::getTry() {
410   throwIfInvalid();
411
412   return core_->getTry();
413 }
414
415 template <class T>
416 Optional<Try<T>> Future<T>::poll() {
417   Optional<Try<T>> o;
418   if (core_->ready()) {
419     o = std::move(core_->getTry());
420   }
421   return o;
422 }
423
424 template <class T>
425 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
426   throwIfInvalid();
427
428   setExecutor(executor, priority);
429
430   return std::move(*this);
431 }
432
433 template <class T>
434 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
435   throwIfInvalid();
436
437   MoveWrapper<Promise<T>> p;
438   auto f = p->getFuture();
439   then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
440   return std::move(f).via(executor, priority);
441 }
442
443 template <class T>
444 bool Future<T>::isReady() const {
445   throwIfInvalid();
446   return core_->ready();
447 }
448
449 template <class T>
450 void Future<T>::raise(exception_wrapper exception) {
451   core_->raise(std::move(exception));
452 }
453
454 // makeFuture
455
456 template <class T>
457 Future<typename std::decay<T>::type> makeFuture(T&& t) {
458   Promise<typename std::decay<T>::type> p;
459   p.setValue(std::forward<T>(t));
460   return p.getFuture();
461 }
462
463 inline // for multiple translation units
464 Future<void> makeFuture() {
465   Promise<void> p;
466   p.setValue();
467   return p.getFuture();
468 }
469
470 template <class F>
471 auto makeFutureWith(
472     F&& func,
473     typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
474     -> Future<decltype(func())> {
475   Promise<decltype(func())> p;
476   p.setWith(
477     [&func]() {
478       return (func)();
479     });
480   return p.getFuture();
481 }
482
483 template <class F>
484 auto makeFutureWith(F const& func) -> Future<decltype(func())> {
485   F copy = func;
486   return makeFutureWith(std::move(copy));
487 }
488
489 template <class T>
490 Future<T> makeFuture(std::exception_ptr const& e) {
491   Promise<T> p;
492   p.setException(e);
493   return p.getFuture();
494 }
495
496 template <class T>
497 Future<T> makeFuture(exception_wrapper ew) {
498   Promise<T> p;
499   p.setException(std::move(ew));
500   return p.getFuture();
501 }
502
503 template <class T, class E>
504 typename std::enable_if<std::is_base_of<std::exception, E>::value,
505                         Future<T>>::type
506 makeFuture(E const& e) {
507   Promise<T> p;
508   p.setException(make_exception_wrapper<E>(e));
509   return p.getFuture();
510 }
511
512 template <class T>
513 Future<T> makeFuture(Try<T>&& t) {
514   Promise<typename std::decay<T>::type> p;
515   p.setTry(std::move(t));
516   return p.getFuture();
517 }
518
519 template <>
520 inline Future<void> makeFuture(Try<void>&& t) {
521   if (t.hasException()) {
522     return makeFuture<void>(std::move(t.exception()));
523   } else {
524     return makeFuture();
525   }
526 }
527
528 // via
529 Future<void> via(Executor* executor, int8_t priority) {
530   return makeFuture().via(executor, priority);
531 }
532
533 // mapSetCallback calls func(i, Try<T>) when every future completes
534
535 template <class T, class InputIterator, class F>
536 void mapSetCallback(InputIterator first, InputIterator last, F func) {
537   for (size_t i = 0; first != last; ++first, ++i) {
538     first->setCallback_([func, i](Try<T>&& t) {
539       func(i, std::move(t));
540     });
541   }
542 }
543
544 // collectAll (variadic)
545
546 template <typename... Fs>
547 typename detail::VariadicContext<
548   typename std::decay<Fs>::type::value_type...>::type
549 collectAll(Fs&&... fs) {
550   auto ctx = std::make_shared<detail::VariadicContext<
551     typename std::decay<Fs>::type::value_type...>>();
552   detail::collectAllVariadicHelper(ctx,
553     std::forward<typename std::decay<Fs>::type>(fs)...);
554   return ctx->p.getFuture();
555 }
556
557 // collectAll (iterator)
558
559 template <class InputIterator>
560 Future<
561   std::vector<
562   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
563 collectAll(InputIterator first, InputIterator last) {
564   typedef
565     typename std::iterator_traits<InputIterator>::value_type::value_type T;
566
567   struct CollectAllContext {
568     CollectAllContext(int n) : results(n) {}
569     ~CollectAllContext() {
570       p.setValue(std::move(results));
571     }
572     Promise<std::vector<Try<T>>> p;
573     std::vector<Try<T>> results;
574   };
575
576   auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
577   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
578     ctx->results[i] = std::move(t);
579   });
580   return ctx->p.getFuture();
581 }
582
583 namespace detail {
584
585 template <typename T>
586 struct CollectContext {
587   struct Nothing { explicit Nothing(int n) {} };
588
589   using Result = typename std::conditional<
590     std::is_void<T>::value,
591     void,
592     std::vector<T>>::type;
593
594   using InternalResult = typename std::conditional<
595     std::is_void<T>::value,
596     Nothing,
597     std::vector<Optional<T>>>::type;
598
599   explicit CollectContext(int n) : result(n) {}
600   ~CollectContext() {
601     if (!threw.exchange(true)) {
602       // map Optional<T> -> T
603       std::vector<T> finalResult;
604       finalResult.reserve(result.size());
605       std::transform(result.begin(), result.end(),
606                      std::back_inserter(finalResult),
607                      [](Optional<T>& o) { return std::move(o.value()); });
608       p.setValue(std::move(finalResult));
609     }
610   }
611   inline void setPartialResult(size_t i, Try<T>& t) {
612     result[i] = std::move(t.value());
613   }
614   Promise<Result> p;
615   InternalResult result;
616   std::atomic<bool> threw;
617 };
618
619 // Specialize for void (implementations in Future.cpp)
620
621 template <>
622 CollectContext<void>::~CollectContext();
623
624 template <>
625 void CollectContext<void>::setPartialResult(size_t i, Try<void>& t);
626
627 }
628
629 template <class InputIterator>
630 Future<typename detail::CollectContext<
631   typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
632 collect(InputIterator first, InputIterator last) {
633   typedef
634     typename std::iterator_traits<InputIterator>::value_type::value_type T;
635
636   auto ctx = std::make_shared<detail::CollectContext<T>>(
637     std::distance(first, last));
638   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
639     if (t.hasException()) {
640        if (!ctx->threw.exchange(true)) {
641          ctx->p.setException(std::move(t.exception()));
642        }
643      } else if (!ctx->threw) {
644        ctx->setPartialResult(i, t);
645      }
646   });
647   return ctx->p.getFuture();
648 }
649
650 template <class InputIterator>
651 Future<
652   std::pair<size_t,
653             Try<
654               typename
655               std::iterator_traits<InputIterator>::value_type::value_type>>>
656 collectAny(InputIterator first, InputIterator last) {
657   typedef
658     typename std::iterator_traits<InputIterator>::value_type::value_type T;
659
660   struct CollectAnyContext {
661     CollectAnyContext(size_t n) : done(false) {};
662     Promise<std::pair<size_t, Try<T>>> p;
663     std::atomic<bool> done;
664   };
665
666   auto ctx = std::make_shared<CollectAnyContext>(std::distance(first, last));
667   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
668     if (!ctx->done.exchange(true)) {
669       ctx->p.setValue(std::make_pair(i, std::move(t)));
670     }
671   });
672   return ctx->p.getFuture();
673 }
674
675 template <class InputIterator>
676 Future<std::vector<std::pair<size_t, Try<typename
677   std::iterator_traits<InputIterator>::value_type::value_type>>>>
678 collectN(InputIterator first, InputIterator last, size_t n) {
679   typedef typename
680     std::iterator_traits<InputIterator>::value_type::value_type T;
681   typedef std::vector<std::pair<size_t, Try<T>>> V;
682
683   struct CollectNContext {
684     V v;
685     std::atomic<size_t> completed = {0};
686     Promise<V> p;
687   };
688   auto ctx = std::make_shared<CollectNContext>();
689
690   if (std::distance(first, last) < n) {
691     ctx->p.setException(std::runtime_error("Not enough futures"));
692   } else {
693     // for each completed Future, increase count and add to vector, until we
694     // have n completed futures at which point we fulfil our Promise with the
695     // vector
696     mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
697       auto c = ++ctx->completed;
698       if (c <= n) {
699         assert(ctx->v.size() < n);
700         ctx->v.push_back(std::make_pair(i, std::move(t)));
701         if (c == n) {
702           ctx->p.setTry(Try<V>(std::move(ctx->v)));
703         }
704       }
705     });
706   }
707
708   return ctx->p.getFuture();
709 }
710
711 template <class It, class T, class F>
712 Future<T> reduce(It first, It last, T&& initial, F&& func) {
713   if (first == last) {
714     return makeFuture(std::move(initial));
715   }
716
717   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
718   typedef typename std::conditional<
719     detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type Arg;
720   typedef isTry<Arg> IsTry;
721
722   folly::MoveWrapper<T> minitial(std::move(initial));
723   auto sfunc = std::make_shared<F>(std::move(func));
724
725   auto f = first->then([minitial, sfunc](Try<ItT>& head) mutable {
726     return (*sfunc)(std::move(*minitial),
727                 head.template get<IsTry::value, Arg&&>());
728   });
729
730   for (++first; first != last; ++first) {
731     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
732       return (*sfunc)(std::move(std::get<0>(t).value()),
733                   // Either return a ItT&& or a Try<ItT>&& depending
734                   // on the type of the argument of func.
735                   std::get<1>(t).template get<IsTry::value, Arg&&>());
736     });
737   }
738
739   return f;
740 }
741
742 template <class Collection, class F, class ItT, class Result>
743 std::vector<Future<Result>>
744 window(Collection input, F func, size_t n) {
745   struct WindowContext {
746     WindowContext(Collection&& i, F&& fn)
747         : i_(0), input_(std::move(i)), promises_(input_.size()),
748           func_(std::move(fn))
749       {}
750     std::atomic<size_t> i_;
751     Collection input_;
752     std::vector<Promise<Result>> promises_;
753     F func_;
754
755     static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
756       size_t i = ctx->i_++;
757       if (i < ctx->input_.size()) {
758         // Using setCallback_ directly since we don't need the Future
759         ctx->func_(std::move(ctx->input_[i])).setCallback_(
760           // ctx is captured by value
761           [ctx, i](Try<ItT>&& t) {
762             ctx->promises_[i].setTry(std::move(t));
763             // Chain another future onto this one
764             spawn(std::move(ctx));
765           });
766       }
767     }
768   };
769
770   auto max = std::min(n, input.size());
771
772   auto ctx = std::make_shared<WindowContext>(
773     std::move(input), std::move(func));
774
775   for (size_t i = 0; i < max; ++i) {
776     // Start the first n Futures
777     WindowContext::spawn(ctx);
778   }
779
780   std::vector<Future<Result>> futures;
781   futures.reserve(ctx->promises_.size());
782   for (auto& promise : ctx->promises_) {
783     futures.emplace_back(promise.getFuture());
784   }
785
786   return futures;
787 }
788
789 template <class T>
790 template <class I, class F>
791 Future<I> Future<T>::reduce(I&& initial, F&& func) {
792   folly::MoveWrapper<I> minitial(std::move(initial));
793   folly::MoveWrapper<F> mfunc(std::move(func));
794   return then([minitial, mfunc](T& vals) mutable {
795     auto ret = std::move(*minitial);
796     for (auto& val : vals) {
797       ret = (*mfunc)(std::move(ret), std::move(val));
798     }
799     return ret;
800   });
801 }
802
803 template <class T>
804 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
805   return within(dur, TimedOut(), tk);
806 }
807
808 template <class T>
809 template <class E>
810 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
811
812   struct Context {
813     Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
814     E exception;
815     Promise<T> promise;
816     std::atomic<bool> token;
817   };
818   auto ctx = std::make_shared<Context>(std::move(e));
819
820   if (!tk) {
821     tk = folly::detail::getTimekeeperSingleton();
822   }
823
824   tk->after(dur)
825     .then([ctx](Try<void> const& t) {
826       if (ctx->token.exchange(true) == false) {
827         if (t.hasException()) {
828           ctx->promise.setException(std::move(t.exception()));
829         } else {
830           ctx->promise.setException(std::move(ctx->exception));
831         }
832       }
833     });
834
835   this->then([ctx](Try<T>&& t) {
836     if (ctx->token.exchange(true) == false) {
837       ctx->promise.setTry(std::move(t));
838     }
839   });
840
841   return ctx->promise.getFuture();
842 }
843
844 template <class T>
845 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
846   return collectAll(*this, futures::sleep(dur, tk))
847     .then([](std::tuple<Try<T>, Try<void>> tup) {
848       Try<T>& t = std::get<0>(tup);
849       return makeFuture<T>(std::move(t));
850     });
851 }
852
853 namespace detail {
854
855 template <class T>
856 void waitImpl(Future<T>& f) {
857   // short-circuit if there's nothing to do
858   if (f.isReady()) return;
859
860   folly::fibers::Baton baton;
861   f = f.then([&](Try<T> t) {
862     baton.post();
863     return makeFuture(std::move(t));
864   });
865   baton.wait();
866
867   // There's a race here between the return here and the actual finishing of
868   // the future. f is completed, but the setup may not have finished on done
869   // after the baton has posted.
870   while (!f.isReady()) {
871     std::this_thread::yield();
872   }
873 }
874
875 template <class T>
876 void waitImpl(Future<T>& f, Duration dur) {
877   // short-circuit if there's nothing to do
878   if (f.isReady()) return;
879
880   auto baton = std::make_shared<folly::fibers::Baton>();
881   f = f.then([baton](Try<T> t) {
882     baton->post();
883     return makeFuture(std::move(t));
884   });
885
886   // Let's preserve the invariant that if we did not timeout (timed_wait returns
887   // true), then the returned Future is complete when it is returned to the
888   // caller. We need to wait out the race for that Future to complete.
889   if (baton->timed_wait(dur)) {
890     while (!f.isReady()) {
891       std::this_thread::yield();
892     }
893   }
894 }
895
896 template <class T>
897 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
898   while (!f.isReady()) {
899     e->drive();
900   }
901 }
902
903 } // detail
904
905 template <class T>
906 Future<T>& Future<T>::wait() & {
907   detail::waitImpl(*this);
908   return *this;
909 }
910
911 template <class T>
912 Future<T>&& Future<T>::wait() && {
913   detail::waitImpl(*this);
914   return std::move(*this);
915 }
916
917 template <class T>
918 Future<T>& Future<T>::wait(Duration dur) & {
919   detail::waitImpl(*this, dur);
920   return *this;
921 }
922
923 template <class T>
924 Future<T>&& Future<T>::wait(Duration dur) && {
925   detail::waitImpl(*this, dur);
926   return std::move(*this);
927 }
928
929 template <class T>
930 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
931   detail::waitViaImpl(*this, e);
932   return *this;
933 }
934
935 template <class T>
936 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
937   detail::waitViaImpl(*this, e);
938   return std::move(*this);
939 }
940
941 template <class T>
942 T Future<T>::get() {
943   return std::move(wait().value());
944 }
945
946 template <>
947 inline void Future<void>::get() {
948   wait().value();
949 }
950
951 template <class T>
952 T Future<T>::get(Duration dur) {
953   wait(dur);
954   if (isReady()) {
955     return std::move(value());
956   } else {
957     throw TimedOut();
958   }
959 }
960
961 template <>
962 inline void Future<void>::get(Duration dur) {
963   wait(dur);
964   if (isReady()) {
965     return;
966   } else {
967     throw TimedOut();
968   }
969 }
970
971 template <class T>
972 T Future<T>::getVia(DrivableExecutor* e) {
973   return std::move(waitVia(e).value());
974 }
975
976 template <>
977 inline void Future<void>::getVia(DrivableExecutor* e) {
978   waitVia(e).value();
979 }
980
981 namespace detail {
982   template <class T>
983   struct TryEquals {
984     static bool equals(const Try<T>& t1, const Try<T>& t2) {
985       return t1.value() == t2.value();
986     }
987   };
988
989   template <>
990   struct TryEquals<void> {
991     static bool equals(const Try<void>& t1, const Try<void>& t2) {
992       return true;
993     }
994   };
995 }
996
997 template <class T>
998 Future<bool> Future<T>::willEqual(Future<T>& f) {
999   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1000     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1001       return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1002     } else {
1003       return false;
1004       }
1005   });
1006 }
1007
1008 template <class T>
1009 template <class F>
1010 Future<T> Future<T>::filter(F predicate) {
1011   auto p = folly::makeMoveWrapper(std::move(predicate));
1012   return this->then([p](T val) {
1013     T const& valConstRef = val;
1014     if (!(*p)(valConstRef)) {
1015       throw PredicateDoesNotObtain();
1016     }
1017     return val;
1018   });
1019 }
1020
1021 template <class T>
1022 template <class Callback>
1023 auto Future<T>::thenMulti(Callback&& fn)
1024     -> decltype(this->then(std::forward<Callback>(fn))) {
1025   // thenMulti with one callback is just a then
1026   return then(std::forward<Callback>(fn));
1027 }
1028
1029 template <class T>
1030 template <class Callback, class... Callbacks>
1031 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1032     -> decltype(this->then(std::forward<Callback>(fn)).
1033                       thenMulti(std::forward<Callbacks>(fns)...)) {
1034   // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1035   return then(std::forward<Callback>(fn)).
1036          thenMulti(std::forward<Callbacks>(fns)...);
1037 }
1038
1039 template <class T>
1040 template <class Callback, class... Callbacks>
1041 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1042                                       Callbacks&&... fns)
1043     -> decltype(this->then(std::forward<Callback>(fn)).
1044                       thenMulti(std::forward<Callbacks>(fns)...)) {
1045   // thenMultiExecutor with two callbacks is
1046   // via(x).then(a).thenMulti(b, ...).via(oldX)
1047   auto oldX = getExecutor();
1048   setExecutor(x);
1049   return then(std::forward<Callback>(fn)).
1050          thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1051 }
1052
1053 template <class T>
1054 template <class Callback>
1055 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1056     -> decltype(this->then(std::forward<Callback>(fn))) {
1057   // thenMulti with one callback is just a then with an executor
1058   return then(x, std::forward<Callback>(fn));
1059 }
1060
1061 namespace futures {
1062   template <class It, class F, class ItT, class Result>
1063   std::vector<Future<Result>> map(It first, It last, F func) {
1064     std::vector<Future<Result>> results;
1065     for (auto it = first; it != last; it++) {
1066       results.push_back(it->then(func));
1067     }
1068     return results;
1069   }
1070 }
1071
1072 // Instantiate the most common Future types to save compile time
1073 extern template class Future<void>;
1074 extern template class Future<bool>;
1075 extern template class Future<int>;
1076 extern template class Future<int64_t>;
1077 extern template class Future<std::string>;
1078 extern template class Future<double>;
1079
1080 } // namespace folly
1081
1082 // I haven't included a Future<T&> specialization because I don't forsee us
1083 // using it, however it is not difficult to add when needed. Refer to
1084 // Future<void> for guidance. std::future and boost::future code would also be
1085 // instructive.