(Wangle) Fix typo
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <chrono>
20 #include <thread>
21
22 #include <folly/experimental/fibers/Baton.h>
23 #include <folly/Optional.h>
24 #include <folly/futures/detail/Core.h>
25 #include <folly/futures/Timekeeper.h>
26
27 namespace folly {
28
29 class Timekeeper;
30
31 namespace detail {
32   Timekeeper* getTimekeeperSingleton();
33 }
34
35 template <class T>
36 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
37   other.core_ = nullptr;
38 }
39
40 template <class T>
41 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
42   std::swap(core_, other.core_);
43   return *this;
44 }
45
46 template <class T>
47 template <class T2, typename>
48 Future<T>::Future(T2&& val) : core_(nullptr) {
49   Promise<T> p;
50   p.setValue(std::forward<T2>(val));
51   *this = p.getFuture();
52 }
53
54 template <class T>
55 template <class T2,
56           typename std::enable_if<
57             folly::is_void_or_unit<T2>::value,
58             int>::type>
59 Future<T>::Future() : core_(nullptr) {
60   Promise<T> p;
61   p.setValue();
62   *this = p.getFuture();
63 }
64
65
66 template <class T>
67 Future<T>::~Future() {
68   detach();
69 }
70
71 template <class T>
72 void Future<T>::detach() {
73   if (core_) {
74     core_->detachFuture();
75     core_ = nullptr;
76   }
77 }
78
79 template <class T>
80 void Future<T>::throwIfInvalid() const {
81   if (!core_)
82     throw NoState();
83 }
84
85 template <class T>
86 template <class F>
87 void Future<T>::setCallback_(F&& func) {
88   throwIfInvalid();
89   core_->setCallback(std::move(func));
90 }
91
92 // unwrap
93
94 template <class T>
95 template <class F>
96 typename std::enable_if<isFuture<F>::value,
97                         Future<typename isFuture<T>::Inner>>::type
98 Future<T>::unwrap() {
99   return then([](Future<typename isFuture<T>::Inner> internal_future) {
100       return internal_future;
101   });
102 }
103
104 // then
105
106 // Variant: returns a value
107 // e.g. f.then([](Try<T>&& t){ return t.value(); });
108 template <class T>
109 template <typename F, typename R, bool isTry, typename... Args>
110 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
111 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
112   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
113   typedef typename R::ReturnsFuture::Inner B;
114
115   throwIfInvalid();
116
117   // wrap these so we can move them into the lambda
118   folly::MoveWrapper<Promise<B>> p;
119   folly::MoveWrapper<F> funcm(std::forward<F>(func));
120
121   // grab the Future now before we lose our handle on the Promise
122   auto f = p->getFuture();
123   if (getExecutor()) {
124     f.setExecutor(getExecutor());
125   }
126
127   /* This is a bit tricky.
128
129      We can't just close over *this in case this Future gets moved. So we
130      make a new dummy Future. We could figure out something more
131      sophisticated that avoids making a new Future object when it can, as an
132      optimization. But this is correct.
133
134      core_ can't be moved, it is explicitly disallowed (as is copying). But
135      if there's ever a reason to allow it, this is one place that makes that
136      assumption and would need to be fixed. We use a standard shared pointer
137      for core_ (by copying it in), which means in essence obj holds a shared
138      pointer to itself.  But this shouldn't leak because Promise will not
139      outlive the continuation, because Promise will setException() with a
140      broken Promise if it is destructed before completed. We could use a
141      weak pointer but it would have to be converted to a shared pointer when
142      func is executed (because the Future returned by func may possibly
143      persist beyond the callback, if it gets moved), and so it is an
144      optimization to just make it shared from the get-go.
145
146      We have to move in the Promise and func using the MoveWrapper
147      hack. (func could be copied but it's a big drag on perf).
148
149      Two subtle but important points about this design. detail::Core has no
150      back pointers to Future or Promise, so if Future or Promise get moved
151      (and they will be moved in performant code) we don't have to do
152      anything fancy. And because we store the continuation in the
153      detail::Core, not in the Future, we can execute the continuation even
154      after the Future has gone out of scope. This is an intentional design
155      decision. It is likely we will want to be able to cancel a continuation
156      in some circumstances, but I think it should be explicit not implicit
157      in the destruction of the Future used to create it.
158      */
159   setCallback_(
160     [p, funcm](Try<T>&& t) mutable {
161       if (!isTry && t.hasException()) {
162         p->setException(std::move(t.exception()));
163       } else {
164         p->setWith([&]() {
165           return (*funcm)(t.template get<isTry, Args>()...);
166         });
167       }
168     });
169
170   return f;
171 }
172
173 // Variant: returns a Future
174 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
175 template <class T>
176 template <typename F, typename R, bool isTry, typename... Args>
177 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
178 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
179   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
180   typedef typename R::ReturnsFuture::Inner B;
181
182   throwIfInvalid();
183
184   // wrap these so we can move them into the lambda
185   folly::MoveWrapper<Promise<B>> p;
186   folly::MoveWrapper<F> funcm(std::forward<F>(func));
187
188   // grab the Future now before we lose our handle on the Promise
189   auto f = p->getFuture();
190   if (getExecutor()) {
191     f.setExecutor(getExecutor());
192   }
193
194   setCallback_(
195     [p, funcm](Try<T>&& t) mutable {
196       if (!isTry && t.hasException()) {
197         p->setException(std::move(t.exception()));
198       } else {
199         try {
200           auto f2 = (*funcm)(t.template get<isTry, Args>()...);
201           // that didn't throw, now we can steal p
202           f2.setCallback_([p](Try<B>&& b) mutable {
203             p->setTry(std::move(b));
204           });
205         } catch (const std::exception& e) {
206           p->setException(exception_wrapper(std::current_exception(), e));
207         } catch (...) {
208           p->setException(exception_wrapper(std::current_exception()));
209         }
210       }
211     });
212
213   return f;
214 }
215
216 template <typename T>
217 template <typename R, typename Caller, typename... Args>
218   Future<typename isFuture<R>::Inner>
219 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
220   typedef typename std::remove_cv<
221     typename std::remove_reference<
222       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
223   return then([instance, func](Try<T>&& t){
224     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
225   });
226 }
227
228 template <class T>
229 template <class Executor, class Arg, class... Args>
230 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
231   -> decltype(this->then(std::forward<Arg>(arg),
232                          std::forward<Args>(args)...))
233 {
234   auto oldX = getExecutor();
235   setExecutor(x);
236   return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
237                via(oldX);
238 }
239
240 template <class T>
241 Future<void> Future<T>::then() {
242   return then([] (Try<T>&& t) {});
243 }
244
245 // onError where the callback returns T
246 template <class T>
247 template <class F>
248 typename std::enable_if<
249   !detail::callableWith<F, exception_wrapper>::value &&
250   !detail::Extract<F>::ReturnsFuture::value,
251   Future<T>>::type
252 Future<T>::onError(F&& func) {
253   typedef typename detail::Extract<F>::FirstArg Exn;
254   static_assert(
255       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
256       "Return type of onError callback must be T or Future<T>");
257
258   Promise<T> p;
259   auto f = p.getFuture();
260   auto pm = folly::makeMoveWrapper(std::move(p));
261   auto funcm = folly::makeMoveWrapper(std::move(func));
262   setCallback_([pm, funcm](Try<T>&& t) mutable {
263     if (!t.template withException<Exn>([&] (Exn& e) {
264           pm->setWith([&]{
265             return (*funcm)(e);
266           });
267         })) {
268       pm->setTry(std::move(t));
269     }
270   });
271
272   return f;
273 }
274
275 // onError where the callback returns Future<T>
276 template <class T>
277 template <class F>
278 typename std::enable_if<
279   !detail::callableWith<F, exception_wrapper>::value &&
280   detail::Extract<F>::ReturnsFuture::value,
281   Future<T>>::type
282 Future<T>::onError(F&& func) {
283   static_assert(
284       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
285       "Return type of onError callback must be T or Future<T>");
286   typedef typename detail::Extract<F>::FirstArg Exn;
287
288   Promise<T> p;
289   auto f = p.getFuture();
290   auto pm = folly::makeMoveWrapper(std::move(p));
291   auto funcm = folly::makeMoveWrapper(std::move(func));
292   setCallback_([pm, funcm](Try<T>&& t) mutable {
293     if (!t.template withException<Exn>([&] (Exn& e) {
294           try {
295             auto f2 = (*funcm)(e);
296             f2.setCallback_([pm](Try<T>&& t2) mutable {
297               pm->setTry(std::move(t2));
298             });
299           } catch (const std::exception& e2) {
300             pm->setException(exception_wrapper(std::current_exception(), e2));
301           } catch (...) {
302             pm->setException(exception_wrapper(std::current_exception()));
303           }
304         })) {
305       pm->setTry(std::move(t));
306     }
307   });
308
309   return f;
310 }
311
312 template <class T>
313 template <class F>
314 Future<T> Future<T>::ensure(F func) {
315   MoveWrapper<F> funcw(std::move(func));
316   return this->then([funcw](Try<T>&& t) {
317     (*funcw)();
318     return makeFuture(std::move(t));
319   });
320 }
321
322 template <class T>
323 template <class F>
324 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
325   auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
326   return within(dur, tk)
327     .onError([funcw](TimedOut const&) { return (*funcw)(); });
328 }
329
330 template <class T>
331 template <class F>
332 typename std::enable_if<
333   detail::callableWith<F, exception_wrapper>::value &&
334   detail::Extract<F>::ReturnsFuture::value,
335   Future<T>>::type
336 Future<T>::onError(F&& func) {
337   static_assert(
338       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
339       "Return type of onError callback must be T or Future<T>");
340
341   Promise<T> p;
342   auto f = p.getFuture();
343   auto pm = folly::makeMoveWrapper(std::move(p));
344   auto funcm = folly::makeMoveWrapper(std::move(func));
345   setCallback_([pm, funcm](Try<T> t) mutable {
346     if (t.hasException()) {
347       try {
348         auto f2 = (*funcm)(std::move(t.exception()));
349         f2.setCallback_([pm](Try<T> t2) mutable {
350           pm->setTry(std::move(t2));
351         });
352       } catch (const std::exception& e2) {
353         pm->setException(exception_wrapper(std::current_exception(), e2));
354       } catch (...) {
355         pm->setException(exception_wrapper(std::current_exception()));
356       }
357     } else {
358       pm->setTry(std::move(t));
359     }
360   });
361
362   return f;
363 }
364
365 // onError(exception_wrapper) that returns T
366 template <class T>
367 template <class F>
368 typename std::enable_if<
369   detail::callableWith<F, exception_wrapper>::value &&
370   !detail::Extract<F>::ReturnsFuture::value,
371   Future<T>>::type
372 Future<T>::onError(F&& func) {
373   static_assert(
374       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
375       "Return type of onError callback must be T or Future<T>");
376
377   Promise<T> p;
378   auto f = p.getFuture();
379   auto pm = folly::makeMoveWrapper(std::move(p));
380   auto funcm = folly::makeMoveWrapper(std::move(func));
381   setCallback_([pm, funcm](Try<T> t) mutable {
382     if (t.hasException()) {
383       pm->setWith([&]{
384         return (*funcm)(std::move(t.exception()));
385       });
386     } else {
387       pm->setTry(std::move(t));
388     }
389   });
390
391   return f;
392 }
393
394 template <class T>
395 typename std::add_lvalue_reference<T>::type Future<T>::value() {
396   throwIfInvalid();
397
398   return core_->getTry().value();
399 }
400
401 template <class T>
402 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
403   throwIfInvalid();
404
405   return core_->getTry().value();
406 }
407
408 template <class T>
409 Try<T>& Future<T>::getTry() {
410   throwIfInvalid();
411
412   return core_->getTry();
413 }
414
415 template <class T>
416 Optional<Try<T>> Future<T>::poll() {
417   Optional<Try<T>> o;
418   if (core_->ready()) {
419     o = std::move(core_->getTry());
420   }
421   return o;
422 }
423
424 template <class T>
425 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
426   throwIfInvalid();
427
428   setExecutor(executor, priority);
429
430   return std::move(*this);
431 }
432
433 template <class T>
434 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
435   throwIfInvalid();
436
437   MoveWrapper<Promise<T>> p;
438   auto f = p->getFuture();
439   then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
440   return std::move(f).via(executor, priority);
441 }
442
443 template <class T>
444 bool Future<T>::isReady() const {
445   throwIfInvalid();
446   return core_->ready();
447 }
448
449 template <class T>
450 void Future<T>::raise(exception_wrapper exception) {
451   core_->raise(std::move(exception));
452 }
453
454 // makeFuture
455
456 template <class T>
457 Future<typename std::decay<T>::type> makeFuture(T&& t) {
458   Promise<typename std::decay<T>::type> p;
459   p.setValue(std::forward<T>(t));
460   return p.getFuture();
461 }
462
463 inline // for multiple translation units
464 Future<void> makeFuture() {
465   Promise<void> p;
466   p.setValue();
467   return p.getFuture();
468 }
469
470 template <class F>
471 auto makeFutureWith(
472     F&& func,
473     typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
474     -> Future<decltype(func())> {
475   Promise<decltype(func())> p;
476   p.setWith(
477     [&func]() {
478       return (func)();
479     });
480   return p.getFuture();
481 }
482
483 template <class F>
484 auto makeFutureWith(F const& func) -> Future<decltype(func())> {
485   F copy = func;
486   return makeFutureWith(std::move(copy));
487 }
488
489 template <class T>
490 Future<T> makeFuture(std::exception_ptr const& e) {
491   Promise<T> p;
492   p.setException(e);
493   return p.getFuture();
494 }
495
496 template <class T>
497 Future<T> makeFuture(exception_wrapper ew) {
498   Promise<T> p;
499   p.setException(std::move(ew));
500   return p.getFuture();
501 }
502
503 template <class T, class E>
504 typename std::enable_if<std::is_base_of<std::exception, E>::value,
505                         Future<T>>::type
506 makeFuture(E const& e) {
507   Promise<T> p;
508   p.setException(make_exception_wrapper<E>(e));
509   return p.getFuture();
510 }
511
512 template <class T>
513 Future<T> makeFuture(Try<T>&& t) {
514   Promise<typename std::decay<T>::type> p;
515   p.setTry(std::move(t));
516   return p.getFuture();
517 }
518
519 template <>
520 inline Future<void> makeFuture(Try<void>&& t) {
521   if (t.hasException()) {
522     return makeFuture<void>(std::move(t.exception()));
523   } else {
524     return makeFuture();
525   }
526 }
527
528 // via
529 Future<void> via(Executor* executor, int8_t priority) {
530   return makeFuture().via(executor, priority);
531 }
532
533 // mapSetCallback calls func(i, Try<T>) when every future completes
534
535 template <class T, class InputIterator, class F>
536 void mapSetCallback(InputIterator first, InputIterator last, F func) {
537   for (size_t i = 0; first != last; ++first, ++i) {
538     first->setCallback_([func, i](Try<T>&& t) {
539       func(i, std::move(t));
540     });
541   }
542 }
543
544 // collectAll (variadic)
545
546 template <typename... Fs>
547 typename detail::VariadicContext<
548   typename std::decay<Fs>::type::value_type...>::type
549 collectAll(Fs&&... fs) {
550   auto ctx = std::make_shared<detail::VariadicContext<
551     typename std::decay<Fs>::type::value_type...>>();
552   detail::collectAllVariadicHelper(ctx,
553     std::forward<typename std::decay<Fs>::type>(fs)...);
554   return ctx->p.getFuture();
555 }
556
557 // collectAll (iterator)
558
559 template <class InputIterator>
560 Future<
561   std::vector<
562   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
563 collectAll(InputIterator first, InputIterator last) {
564   typedef
565     typename std::iterator_traits<InputIterator>::value_type::value_type T;
566
567   struct CollectAllContext {
568     CollectAllContext(int n) : results(n) {}
569     ~CollectAllContext() {
570       p.setValue(std::move(results));
571     }
572     Promise<std::vector<Try<T>>> p;
573     std::vector<Try<T>> results;
574   };
575
576   auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
577   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
578     ctx->results[i] = std::move(t);
579   });
580   return ctx->p.getFuture();
581 }
582
583 namespace detail {
584
585 template <typename T>
586 struct CollectContext {
587   struct Nothing { explicit Nothing(int n) {} };
588
589   using Result = typename std::conditional<
590     std::is_void<T>::value,
591     void,
592     std::vector<T>>::type;
593
594   using InternalResult = typename std::conditional<
595     std::is_void<T>::value,
596     Nothing,
597     std::vector<Optional<T>>>::type;
598
599   explicit CollectContext(int n) : result(n) {}
600   ~CollectContext() {
601     if (!threw.exchange(true)) {
602       // map Optional<T> -> T
603       std::vector<T> finalResult;
604       finalResult.reserve(result.size());
605       std::transform(result.begin(), result.end(),
606                      std::back_inserter(finalResult),
607                      [](Optional<T>& o) { return std::move(o.value()); });
608       p.setValue(std::move(finalResult));
609     }
610   }
611   inline void setPartialResult(size_t i, Try<T>& t) {
612     result[i] = std::move(t.value());
613   }
614   Promise<Result> p;
615   InternalResult result;
616   std::atomic<bool> threw;
617 };
618
619 // Specialize for void (implementations in Future.cpp)
620
621 template <>
622 CollectContext<void>::~CollectContext();
623
624 template <>
625 void CollectContext<void>::setPartialResult(size_t i, Try<void>& t);
626
627 }
628
629 template <class InputIterator>
630 Future<typename detail::CollectContext<
631   typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
632 collect(InputIterator first, InputIterator last) {
633   typedef
634     typename std::iterator_traits<InputIterator>::value_type::value_type T;
635
636   auto ctx = std::make_shared<detail::CollectContext<T>>(
637     std::distance(first, last));
638   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
639     if (t.hasException()) {
640        if (!ctx->threw.exchange(true)) {
641          ctx->p.setException(std::move(t.exception()));
642        }
643      } else if (!ctx->threw) {
644        ctx->setPartialResult(i, t);
645      }
646   });
647   return ctx->p.getFuture();
648 }
649
650 template <class InputIterator>
651 Future<
652   std::pair<size_t,
653             Try<
654               typename
655               std::iterator_traits<InputIterator>::value_type::value_type>>>
656 collectAny(InputIterator first, InputIterator last) {
657   typedef
658     typename std::iterator_traits<InputIterator>::value_type::value_type T;
659
660   struct CollectAnyContext {
661     CollectAnyContext(size_t n) : done(false) {};
662     Promise<std::pair<size_t, Try<T>>> p;
663     std::atomic<bool> done;
664   };
665
666   auto ctx = std::make_shared<CollectAnyContext>(std::distance(first, last));
667   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
668     if (!ctx->done.exchange(true)) {
669       ctx->p.setValue(std::make_pair(i, std::move(t)));
670     }
671   });
672   return ctx->p.getFuture();
673 }
674
675 template <class InputIterator>
676 Future<std::vector<std::pair<size_t, Try<typename
677   std::iterator_traits<InputIterator>::value_type::value_type>>>>
678 collectN(InputIterator first, InputIterator last, size_t n) {
679   typedef typename
680     std::iterator_traits<InputIterator>::value_type::value_type T;
681   typedef std::vector<std::pair<size_t, Try<T>>> V;
682
683   struct CollectNContext {
684     V v;
685     std::atomic<size_t> completed = {0};
686     Promise<V> p;
687   };
688   auto ctx = std::make_shared<CollectNContext>();
689
690   if (std::distance(first, last) < n) {
691     ctx->p.setException(std::runtime_error("Not enough futures"));
692   } else {
693     // for each completed Future, increase count and add to vector, until we
694     // have n completed futures at which point we fulfil our Promise with the
695     // vector
696     mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
697       auto c = ++ctx->completed;
698       if (c <= n) {
699         assert(ctx->v.size() < n);
700         ctx->v.push_back(std::make_pair(i, std::move(t)));
701         if (c == n) {
702           ctx->p.setTry(Try<V>(std::move(ctx->v)));
703         }
704       }
705     });
706   }
707
708   return ctx->p.getFuture();
709 }
710
711 template <class It, class T, class F>
712 Future<T> reduce(It first, It last, T&& initial, F&& func) {
713   if (first == last) {
714     return makeFuture(std::move(initial));
715   }
716
717   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
718   typedef typename std::conditional<
719     detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type Arg;
720   typedef isTry<Arg> IsTry;
721
722   folly::MoveWrapper<T> minitial(std::move(initial));
723   auto sfunc = std::make_shared<F>(std::move(func));
724
725   auto f = first->then([minitial, sfunc](Try<ItT>& head) mutable {
726     return (*sfunc)(std::move(*minitial),
727                 head.template get<IsTry::value, Arg&&>());
728   });
729
730   for (++first; first != last; ++first) {
731     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
732       return (*sfunc)(std::move(std::get<0>(t).value()),
733                   // Either return a ItT&& or a Try<ItT>&& depending
734                   // on the type of the argument of func.
735                   std::get<1>(t).template get<IsTry::value, Arg&&>());
736     });
737   }
738
739   return f;
740 }
741
742 template <class Collection, class F, class ItT, class Result>
743 std::vector<Future<Result>>
744 window(Collection input, F func, size_t n) {
745   struct WindowContext {
746     WindowContext(Collection&& i, F&& fn)
747         : i_(0), input_(std::move(i)), promises_(input_.size()),
748           func_(std::move(fn))
749       {}
750     std::atomic<size_t> i_;
751     Collection input_;
752     std::vector<Promise<Result>> promises_;
753     F func_;
754
755     static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
756       size_t i = ctx->i_++;
757       if (i < ctx->input_.size()) {
758         // Using setCallback_ directly since we don't need the Future
759         ctx->func_(std::move(ctx->input_[i])).setCallback_(
760           // ctx is captured by value
761           [ctx, i](Try<Result>&& t) {
762             ctx->promises_[i].setTry(std::move(t));
763             // Chain another future onto this one
764             spawn(std::move(ctx));
765           });
766       }
767     }
768   };
769
770   auto max = std::min(n, input.size());
771
772   auto ctx = std::make_shared<WindowContext>(
773     std::move(input), std::move(func));
774
775   for (size_t i = 0; i < max; ++i) {
776     // Start the first n Futures
777     WindowContext::spawn(ctx);
778   }
779
780   std::vector<Future<Result>> futures;
781   futures.reserve(ctx->promises_.size());
782   for (auto& promise : ctx->promises_) {
783     futures.emplace_back(promise.getFuture());
784   }
785
786   return futures;
787 }
788
789 template <class T>
790 template <class I, class F>
791 Future<I> Future<T>::reduce(I&& initial, F&& func) {
792   folly::MoveWrapper<I> minitial(std::move(initial));
793   folly::MoveWrapper<F> mfunc(std::move(func));
794   return then([minitial, mfunc](T& vals) mutable {
795     auto ret = std::move(*minitial);
796     for (auto& val : vals) {
797       ret = (*mfunc)(std::move(ret), std::move(val));
798     }
799     return ret;
800   });
801 }
802
803 template <class It, class T, class F, class ItT, class Arg>
804 Future<T> unorderedReduce(It first, It last, T initial, F func) {
805   if (first == last) {
806     return makeFuture(std::move(initial));
807   }
808
809   typedef isTry<Arg> IsTry;
810
811   struct UnorderedReduceContext {
812     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
813         : lock_(), memo_(makeFuture<T>(std::move(memo))),
814           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
815       {};
816     folly::MicroSpinLock lock_; // protects memo_ and numThens_
817     Future<T> memo_;
818     F func_;
819     size_t numThens_; // how many Futures completed and called .then()
820     size_t numFutures_; // how many Futures in total
821     Promise<T> promise_;
822   };
823
824   auto ctx = std::make_shared<UnorderedReduceContext>(
825     std::move(initial), std::move(func), std::distance(first, last));
826
827   mapSetCallback<ItT>(first, last, [ctx](size_t i, Try<ItT>&& t) {
828     folly::MoveWrapper<Try<ItT>> mt(std::move(t));
829     // Futures can be completed in any order, simultaneously.
830     // To make this non-blocking, we create a new Future chain in
831     // the order of completion to reduce the values.
832     // The spinlock just protects chaining a new Future, not actually
833     // executing the reduce, which should be really fast.
834     folly::MSLGuard lock(ctx->lock_);
835     ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
836       // Either return a ItT&& or a Try<ItT>&& depending
837       // on the type of the argument of func.
838       return ctx->func_(std::move(v), mt->template get<IsTry::value, Arg&&>());
839     });
840     if (++ctx->numThens_ == ctx->numFutures_) {
841       // After reducing the value of the last Future, fulfill the Promise
842       ctx->memo_.setCallback_([ctx](Try<T>&& t2) {
843         ctx->promise_.setValue(std::move(t2));
844       });
845     }
846   });
847
848   return ctx->promise_.getFuture();
849 }
850
851 template <class T>
852 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
853   return within(dur, TimedOut(), tk);
854 }
855
856 template <class T>
857 template <class E>
858 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
859
860   struct Context {
861     Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
862     E exception;
863     Promise<T> promise;
864     std::atomic<bool> token;
865   };
866   auto ctx = std::make_shared<Context>(std::move(e));
867
868   if (!tk) {
869     tk = folly::detail::getTimekeeperSingleton();
870   }
871
872   tk->after(dur)
873     .then([ctx](Try<void> const& t) {
874       if (ctx->token.exchange(true) == false) {
875         if (t.hasException()) {
876           ctx->promise.setException(std::move(t.exception()));
877         } else {
878           ctx->promise.setException(std::move(ctx->exception));
879         }
880       }
881     });
882
883   this->then([ctx](Try<T>&& t) {
884     if (ctx->token.exchange(true) == false) {
885       ctx->promise.setTry(std::move(t));
886     }
887   });
888
889   return ctx->promise.getFuture();
890 }
891
892 template <class T>
893 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
894   return collectAll(*this, futures::sleep(dur, tk))
895     .then([](std::tuple<Try<T>, Try<void>> tup) {
896       Try<T>& t = std::get<0>(tup);
897       return makeFuture<T>(std::move(t));
898     });
899 }
900
901 namespace detail {
902
903 template <class T>
904 void waitImpl(Future<T>& f) {
905   // short-circuit if there's nothing to do
906   if (f.isReady()) return;
907
908   folly::fibers::Baton baton;
909   f = f.then([&](Try<T> t) {
910     baton.post();
911     return makeFuture(std::move(t));
912   });
913   baton.wait();
914
915   // There's a race here between the return here and the actual finishing of
916   // the future. f is completed, but the setup may not have finished on done
917   // after the baton has posted.
918   while (!f.isReady()) {
919     std::this_thread::yield();
920   }
921 }
922
923 template <class T>
924 void waitImpl(Future<T>& f, Duration dur) {
925   // short-circuit if there's nothing to do
926   if (f.isReady()) return;
927
928   auto baton = std::make_shared<folly::fibers::Baton>();
929   f = f.then([baton](Try<T> t) {
930     baton->post();
931     return makeFuture(std::move(t));
932   });
933
934   // Let's preserve the invariant that if we did not timeout (timed_wait returns
935   // true), then the returned Future is complete when it is returned to the
936   // caller. We need to wait out the race for that Future to complete.
937   if (baton->timed_wait(dur)) {
938     while (!f.isReady()) {
939       std::this_thread::yield();
940     }
941   }
942 }
943
944 template <class T>
945 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
946   while (!f.isReady()) {
947     e->drive();
948   }
949 }
950
951 } // detail
952
953 template <class T>
954 Future<T>& Future<T>::wait() & {
955   detail::waitImpl(*this);
956   return *this;
957 }
958
959 template <class T>
960 Future<T>&& Future<T>::wait() && {
961   detail::waitImpl(*this);
962   return std::move(*this);
963 }
964
965 template <class T>
966 Future<T>& Future<T>::wait(Duration dur) & {
967   detail::waitImpl(*this, dur);
968   return *this;
969 }
970
971 template <class T>
972 Future<T>&& Future<T>::wait(Duration dur) && {
973   detail::waitImpl(*this, dur);
974   return std::move(*this);
975 }
976
977 template <class T>
978 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
979   detail::waitViaImpl(*this, e);
980   return *this;
981 }
982
983 template <class T>
984 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
985   detail::waitViaImpl(*this, e);
986   return std::move(*this);
987 }
988
989 template <class T>
990 T Future<T>::get() {
991   return std::move(wait().value());
992 }
993
994 template <>
995 inline void Future<void>::get() {
996   wait().value();
997 }
998
999 template <class T>
1000 T Future<T>::get(Duration dur) {
1001   wait(dur);
1002   if (isReady()) {
1003     return std::move(value());
1004   } else {
1005     throw TimedOut();
1006   }
1007 }
1008
1009 template <>
1010 inline void Future<void>::get(Duration dur) {
1011   wait(dur);
1012   if (isReady()) {
1013     return;
1014   } else {
1015     throw TimedOut();
1016   }
1017 }
1018
1019 template <class T>
1020 T Future<T>::getVia(DrivableExecutor* e) {
1021   return std::move(waitVia(e).value());
1022 }
1023
1024 template <>
1025 inline void Future<void>::getVia(DrivableExecutor* e) {
1026   waitVia(e).value();
1027 }
1028
1029 namespace detail {
1030   template <class T>
1031   struct TryEquals {
1032     static bool equals(const Try<T>& t1, const Try<T>& t2) {
1033       return t1.value() == t2.value();
1034     }
1035   };
1036
1037   template <>
1038   struct TryEquals<void> {
1039     static bool equals(const Try<void>& t1, const Try<void>& t2) {
1040       return true;
1041     }
1042   };
1043 }
1044
1045 template <class T>
1046 Future<bool> Future<T>::willEqual(Future<T>& f) {
1047   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1048     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1049       return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1050     } else {
1051       return false;
1052       }
1053   });
1054 }
1055
1056 template <class T>
1057 template <class F>
1058 Future<T> Future<T>::filter(F predicate) {
1059   auto p = folly::makeMoveWrapper(std::move(predicate));
1060   return this->then([p](T val) {
1061     T const& valConstRef = val;
1062     if (!(*p)(valConstRef)) {
1063       throw PredicateDoesNotObtain();
1064     }
1065     return val;
1066   });
1067 }
1068
1069 template <class T>
1070 template <class Callback>
1071 auto Future<T>::thenMulti(Callback&& fn)
1072     -> decltype(this->then(std::forward<Callback>(fn))) {
1073   // thenMulti with one callback is just a then
1074   return then(std::forward<Callback>(fn));
1075 }
1076
1077 template <class T>
1078 template <class Callback, class... Callbacks>
1079 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1080     -> decltype(this->then(std::forward<Callback>(fn)).
1081                       thenMulti(std::forward<Callbacks>(fns)...)) {
1082   // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1083   return then(std::forward<Callback>(fn)).
1084          thenMulti(std::forward<Callbacks>(fns)...);
1085 }
1086
1087 template <class T>
1088 template <class Callback, class... Callbacks>
1089 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1090                                       Callbacks&&... fns)
1091     -> decltype(this->then(std::forward<Callback>(fn)).
1092                       thenMulti(std::forward<Callbacks>(fns)...)) {
1093   // thenMultiExecutor with two callbacks is
1094   // via(x).then(a).thenMulti(b, ...).via(oldX)
1095   auto oldX = getExecutor();
1096   setExecutor(x);
1097   return then(std::forward<Callback>(fn)).
1098          thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1099 }
1100
1101 template <class T>
1102 template <class Callback>
1103 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1104     -> decltype(this->then(std::forward<Callback>(fn))) {
1105   // thenMulti with one callback is just a then with an executor
1106   return then(x, std::forward<Callback>(fn));
1107 }
1108
1109 namespace futures {
1110   template <class It, class F, class ItT, class Result>
1111   std::vector<Future<Result>> map(It first, It last, F func) {
1112     std::vector<Future<Result>> results;
1113     for (auto it = first; it != last; it++) {
1114       results.push_back(it->then(func));
1115     }
1116     return results;
1117   }
1118 }
1119
1120 // Instantiate the most common Future types to save compile time
1121 extern template class Future<void>;
1122 extern template class Future<bool>;
1123 extern template class Future<int>;
1124 extern template class Future<int64_t>;
1125 extern template class Future<std::string>;
1126 extern template class Future<double>;
1127
1128 } // namespace folly
1129
1130 // I haven't included a Future<T&> specialization because I don't forsee us
1131 // using it, however it is not difficult to add when needed. Refer to
1132 // Future<void> for guidance. std::future and boost::future code would also be
1133 // instructive.