4b2c7477675cb22c94c4ef3a530b84f603dbf28b
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <chrono>
20 #include <thread>
21
22 #include <folly/experimental/fibers/Baton.h>
23 #include <folly/Optional.h>
24 #include <folly/futures/detail/Core.h>
25 #include <folly/futures/Timekeeper.h>
26
27 namespace folly {
28
29 class Timekeeper;
30
31 namespace detail {
32   Timekeeper* getTimekeeperSingleton();
33 }
34
35 template <class T>
36 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
37   other.core_ = nullptr;
38 }
39
40 template <class T>
41 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
42   std::swap(core_, other.core_);
43   return *this;
44 }
45
46 template <class T>
47 template <class T2, typename>
48 Future<T>::Future(T2&& val) : core_(nullptr) {
49   Promise<T> p;
50   p.setValue(std::forward<T2>(val));
51   *this = p.getFuture();
52 }
53
54 template <class T>
55 template <class T2,
56           typename std::enable_if<
57             folly::is_void_or_unit<T2>::value,
58             int>::type>
59 Future<T>::Future() : core_(nullptr) {
60   Promise<T> p;
61   p.setValue();
62   *this = p.getFuture();
63 }
64
65
66 template <class T>
67 Future<T>::~Future() {
68   detach();
69 }
70
71 template <class T>
72 void Future<T>::detach() {
73   if (core_) {
74     core_->detachFuture();
75     core_ = nullptr;
76   }
77 }
78
79 template <class T>
80 void Future<T>::throwIfInvalid() const {
81   if (!core_)
82     throw NoState();
83 }
84
85 template <class T>
86 template <class F>
87 void Future<T>::setCallback_(F&& func) {
88   throwIfInvalid();
89   core_->setCallback(std::move(func));
90 }
91
92 // unwrap
93
94 template <class T>
95 template <class F>
96 typename std::enable_if<isFuture<F>::value,
97                         Future<typename isFuture<T>::Inner>>::type
98 Future<T>::unwrap() {
99   return then([](Future<typename isFuture<T>::Inner> internal_future) {
100       return internal_future;
101   });
102 }
103
104 // then
105
106 // Variant: returns a value
107 // e.g. f.then([](Try<T>&& t){ return t.value(); });
108 template <class T>
109 template <typename F, typename R, bool isTry, typename... Args>
110 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
111 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
112   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
113   typedef typename R::ReturnsFuture::Inner B;
114
115   throwIfInvalid();
116
117   // wrap these so we can move them into the lambda
118   folly::MoveWrapper<Promise<B>> p;
119   p->setInterruptHandler(core_->getInterruptHandler());
120   folly::MoveWrapper<F> funcm(std::forward<F>(func));
121
122   // grab the Future now before we lose our handle on the Promise
123   auto f = p->getFuture();
124   if (getExecutor()) {
125     f.setExecutor(getExecutor());
126   }
127
128   /* This is a bit tricky.
129
130      We can't just close over *this in case this Future gets moved. So we
131      make a new dummy Future. We could figure out something more
132      sophisticated that avoids making a new Future object when it can, as an
133      optimization. But this is correct.
134
135      core_ can't be moved, it is explicitly disallowed (as is copying). But
136      if there's ever a reason to allow it, this is one place that makes that
137      assumption and would need to be fixed. We use a standard shared pointer
138      for core_ (by copying it in), which means in essence obj holds a shared
139      pointer to itself.  But this shouldn't leak because Promise will not
140      outlive the continuation, because Promise will setException() with a
141      broken Promise if it is destructed before completed. We could use a
142      weak pointer but it would have to be converted to a shared pointer when
143      func is executed (because the Future returned by func may possibly
144      persist beyond the callback, if it gets moved), and so it is an
145      optimization to just make it shared from the get-go.
146
147      We have to move in the Promise and func using the MoveWrapper
148      hack. (func could be copied but it's a big drag on perf).
149
150      Two subtle but important points about this design. detail::Core has no
151      back pointers to Future or Promise, so if Future or Promise get moved
152      (and they will be moved in performant code) we don't have to do
153      anything fancy. And because we store the continuation in the
154      detail::Core, not in the Future, we can execute the continuation even
155      after the Future has gone out of scope. This is an intentional design
156      decision. It is likely we will want to be able to cancel a continuation
157      in some circumstances, but I think it should be explicit not implicit
158      in the destruction of the Future used to create it.
159      */
160   setCallback_(
161     [p, funcm](Try<T>&& t) mutable {
162       if (!isTry && t.hasException()) {
163         p->setException(std::move(t.exception()));
164       } else {
165         p->setWith([&]() {
166           return (*funcm)(t.template get<isTry, Args>()...);
167         });
168       }
169     });
170
171   return f;
172 }
173
174 // Variant: returns a Future
175 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
176 template <class T>
177 template <typename F, typename R, bool isTry, typename... Args>
178 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
179 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
180   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
181   typedef typename R::ReturnsFuture::Inner B;
182
183   throwIfInvalid();
184
185   // wrap these so we can move them into the lambda
186   folly::MoveWrapper<Promise<B>> p;
187   folly::MoveWrapper<F> funcm(std::forward<F>(func));
188
189   // grab the Future now before we lose our handle on the Promise
190   auto f = p->getFuture();
191   if (getExecutor()) {
192     f.setExecutor(getExecutor());
193   }
194
195   setCallback_(
196     [p, funcm](Try<T>&& t) mutable {
197       if (!isTry && t.hasException()) {
198         p->setException(std::move(t.exception()));
199       } else {
200         try {
201           auto f2 = (*funcm)(t.template get<isTry, Args>()...);
202           // that didn't throw, now we can steal p
203           f2.setCallback_([p](Try<B>&& b) mutable {
204             p->setTry(std::move(b));
205           });
206         } catch (const std::exception& e) {
207           p->setException(exception_wrapper(std::current_exception(), e));
208         } catch (...) {
209           p->setException(exception_wrapper(std::current_exception()));
210         }
211       }
212     });
213
214   return f;
215 }
216
217 template <typename T>
218 template <typename R, typename Caller, typename... Args>
219   Future<typename isFuture<R>::Inner>
220 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
221   typedef typename std::remove_cv<
222     typename std::remove_reference<
223       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
224   return then([instance, func](Try<T>&& t){
225     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
226   });
227 }
228
229 template <class T>
230 template <class Executor, class Arg, class... Args>
231 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
232   -> decltype(this->then(std::forward<Arg>(arg),
233                          std::forward<Args>(args)...))
234 {
235   auto oldX = getExecutor();
236   setExecutor(x);
237   return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
238                via(oldX);
239 }
240
241 template <class T>
242 Future<void> Future<T>::then() {
243   return then([] (Try<T>&& t) {});
244 }
245
246 // onError where the callback returns T
247 template <class T>
248 template <class F>
249 typename std::enable_if<
250   !detail::callableWith<F, exception_wrapper>::value &&
251   !detail::Extract<F>::ReturnsFuture::value,
252   Future<T>>::type
253 Future<T>::onError(F&& func) {
254   typedef typename detail::Extract<F>::FirstArg Exn;
255   static_assert(
256       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
257       "Return type of onError callback must be T or Future<T>");
258
259   Promise<T> p;
260   auto f = p.getFuture();
261   auto pm = folly::makeMoveWrapper(std::move(p));
262   auto funcm = folly::makeMoveWrapper(std::move(func));
263   setCallback_([pm, funcm](Try<T>&& t) mutable {
264     if (!t.template withException<Exn>([&] (Exn& e) {
265           pm->setWith([&]{
266             return (*funcm)(e);
267           });
268         })) {
269       pm->setTry(std::move(t));
270     }
271   });
272
273   return f;
274 }
275
276 // onError where the callback returns Future<T>
277 template <class T>
278 template <class F>
279 typename std::enable_if<
280   !detail::callableWith<F, exception_wrapper>::value &&
281   detail::Extract<F>::ReturnsFuture::value,
282   Future<T>>::type
283 Future<T>::onError(F&& func) {
284   static_assert(
285       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
286       "Return type of onError callback must be T or Future<T>");
287   typedef typename detail::Extract<F>::FirstArg Exn;
288
289   Promise<T> p;
290   auto f = p.getFuture();
291   auto pm = folly::makeMoveWrapper(std::move(p));
292   auto funcm = folly::makeMoveWrapper(std::move(func));
293   setCallback_([pm, funcm](Try<T>&& t) mutable {
294     if (!t.template withException<Exn>([&] (Exn& e) {
295           try {
296             auto f2 = (*funcm)(e);
297             f2.setCallback_([pm](Try<T>&& t2) mutable {
298               pm->setTry(std::move(t2));
299             });
300           } catch (const std::exception& e2) {
301             pm->setException(exception_wrapper(std::current_exception(), e2));
302           } catch (...) {
303             pm->setException(exception_wrapper(std::current_exception()));
304           }
305         })) {
306       pm->setTry(std::move(t));
307     }
308   });
309
310   return f;
311 }
312
313 template <class T>
314 template <class F>
315 Future<T> Future<T>::ensure(F func) {
316   MoveWrapper<F> funcw(std::move(func));
317   return this->then([funcw](Try<T>&& t) {
318     (*funcw)();
319     return makeFuture(std::move(t));
320   });
321 }
322
323 template <class T>
324 template <class F>
325 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
326   auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
327   return within(dur, tk)
328     .onError([funcw](TimedOut const&) { return (*funcw)(); });
329 }
330
331 template <class T>
332 template <class F>
333 typename std::enable_if<
334   detail::callableWith<F, exception_wrapper>::value &&
335   detail::Extract<F>::ReturnsFuture::value,
336   Future<T>>::type
337 Future<T>::onError(F&& func) {
338   static_assert(
339       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
340       "Return type of onError callback must be T or Future<T>");
341
342   Promise<T> p;
343   auto f = p.getFuture();
344   auto pm = folly::makeMoveWrapper(std::move(p));
345   auto funcm = folly::makeMoveWrapper(std::move(func));
346   setCallback_([pm, funcm](Try<T> t) mutable {
347     if (t.hasException()) {
348       try {
349         auto f2 = (*funcm)(std::move(t.exception()));
350         f2.setCallback_([pm](Try<T> t2) mutable {
351           pm->setTry(std::move(t2));
352         });
353       } catch (const std::exception& e2) {
354         pm->setException(exception_wrapper(std::current_exception(), e2));
355       } catch (...) {
356         pm->setException(exception_wrapper(std::current_exception()));
357       }
358     } else {
359       pm->setTry(std::move(t));
360     }
361   });
362
363   return f;
364 }
365
366 // onError(exception_wrapper) that returns T
367 template <class T>
368 template <class F>
369 typename std::enable_if<
370   detail::callableWith<F, exception_wrapper>::value &&
371   !detail::Extract<F>::ReturnsFuture::value,
372   Future<T>>::type
373 Future<T>::onError(F&& func) {
374   static_assert(
375       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
376       "Return type of onError callback must be T or Future<T>");
377
378   Promise<T> p;
379   auto f = p.getFuture();
380   auto pm = folly::makeMoveWrapper(std::move(p));
381   auto funcm = folly::makeMoveWrapper(std::move(func));
382   setCallback_([pm, funcm](Try<T> t) mutable {
383     if (t.hasException()) {
384       pm->setWith([&]{
385         return (*funcm)(std::move(t.exception()));
386       });
387     } else {
388       pm->setTry(std::move(t));
389     }
390   });
391
392   return f;
393 }
394
395 template <class T>
396 typename std::add_lvalue_reference<T>::type Future<T>::value() {
397   throwIfInvalid();
398
399   return core_->getTry().value();
400 }
401
402 template <class T>
403 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
404   throwIfInvalid();
405
406   return core_->getTry().value();
407 }
408
409 template <class T>
410 Try<T>& Future<T>::getTry() {
411   throwIfInvalid();
412
413   return core_->getTry();
414 }
415
416 template <class T>
417 Optional<Try<T>> Future<T>::poll() {
418   Optional<Try<T>> o;
419   if (core_->ready()) {
420     o = std::move(core_->getTry());
421   }
422   return o;
423 }
424
425 template <class T>
426 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
427   throwIfInvalid();
428
429   setExecutor(executor, priority);
430
431   return std::move(*this);
432 }
433
434 template <class T>
435 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
436   throwIfInvalid();
437
438   MoveWrapper<Promise<T>> p;
439   auto f = p->getFuture();
440   then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
441   return std::move(f).via(executor, priority);
442 }
443
444 template <class T>
445 bool Future<T>::isReady() const {
446   throwIfInvalid();
447   return core_->ready();
448 }
449
450 template <class T>
451 void Future<T>::raise(exception_wrapper exception) {
452   core_->raise(std::move(exception));
453 }
454
455 // makeFuture
456
457 template <class T>
458 Future<typename std::decay<T>::type> makeFuture(T&& t) {
459   Promise<typename std::decay<T>::type> p;
460   p.setValue(std::forward<T>(t));
461   return p.getFuture();
462 }
463
464 inline // for multiple translation units
465 Future<void> makeFuture() {
466   Promise<void> p;
467   p.setValue();
468   return p.getFuture();
469 }
470
471 template <class F>
472 auto makeFutureWith(
473     F&& func,
474     typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
475     -> Future<decltype(func())> {
476   Promise<decltype(func())> p;
477   p.setWith(
478     [&func]() {
479       return (func)();
480     });
481   return p.getFuture();
482 }
483
484 template <class F>
485 auto makeFutureWith(F const& func) -> Future<decltype(func())> {
486   F copy = func;
487   return makeFutureWith(std::move(copy));
488 }
489
490 template <class T>
491 Future<T> makeFuture(std::exception_ptr const& e) {
492   Promise<T> p;
493   p.setException(e);
494   return p.getFuture();
495 }
496
497 template <class T>
498 Future<T> makeFuture(exception_wrapper ew) {
499   Promise<T> p;
500   p.setException(std::move(ew));
501   return p.getFuture();
502 }
503
504 template <class T, class E>
505 typename std::enable_if<std::is_base_of<std::exception, E>::value,
506                         Future<T>>::type
507 makeFuture(E const& e) {
508   Promise<T> p;
509   p.setException(make_exception_wrapper<E>(e));
510   return p.getFuture();
511 }
512
513 template <class T>
514 Future<T> makeFuture(Try<T>&& t) {
515   Promise<typename std::decay<T>::type> p;
516   p.setTry(std::move(t));
517   return p.getFuture();
518 }
519
520 template <>
521 inline Future<void> makeFuture(Try<void>&& t) {
522   if (t.hasException()) {
523     return makeFuture<void>(std::move(t.exception()));
524   } else {
525     return makeFuture();
526   }
527 }
528
529 // via
530 Future<void> via(Executor* executor, int8_t priority) {
531   return makeFuture().via(executor, priority);
532 }
533
534 // mapSetCallback calls func(i, Try<T>) when every future completes
535
536 template <class T, class InputIterator, class F>
537 void mapSetCallback(InputIterator first, InputIterator last, F func) {
538   for (size_t i = 0; first != last; ++first, ++i) {
539     first->setCallback_([func, i](Try<T>&& t) {
540       func(i, std::move(t));
541     });
542   }
543 }
544
545 // collectAll (variadic)
546
547 template <typename... Fs>
548 typename detail::CollectAllVariadicContext<
549   typename std::decay<Fs>::type::value_type...>::type
550 collectAll(Fs&&... fs) {
551   auto ctx = std::make_shared<detail::CollectAllVariadicContext<
552     typename std::decay<Fs>::type::value_type...>>();
553   detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
554     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
555   return ctx->p.getFuture();
556 }
557
558 // collectAll (iterator)
559
560 template <class InputIterator>
561 Future<
562   std::vector<
563   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
564 collectAll(InputIterator first, InputIterator last) {
565   typedef
566     typename std::iterator_traits<InputIterator>::value_type::value_type T;
567
568   struct CollectAllContext {
569     CollectAllContext(int n) : results(n) {}
570     ~CollectAllContext() {
571       p.setValue(std::move(results));
572     }
573     Promise<std::vector<Try<T>>> p;
574     std::vector<Try<T>> results;
575   };
576
577   auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
578   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
579     ctx->results[i] = std::move(t);
580   });
581   return ctx->p.getFuture();
582 }
583
584 // collect (iterator)
585
586 namespace detail {
587
588 template <typename T>
589 struct CollectContext {
590   struct Nothing { explicit Nothing(int n) {} };
591
592   using Result = typename std::conditional<
593     std::is_void<T>::value,
594     void,
595     std::vector<T>>::type;
596
597   using InternalResult = typename std::conditional<
598     std::is_void<T>::value,
599     Nothing,
600     std::vector<Optional<T>>>::type;
601
602   explicit CollectContext(int n) : result(n) {}
603   ~CollectContext() {
604     if (!threw.exchange(true)) {
605       // map Optional<T> -> T
606       std::vector<T> finalResult;
607       finalResult.reserve(result.size());
608       std::transform(result.begin(), result.end(),
609                      std::back_inserter(finalResult),
610                      [](Optional<T>& o) { return std::move(o.value()); });
611       p.setValue(std::move(finalResult));
612     }
613   }
614   inline void setPartialResult(size_t i, Try<T>& t) {
615     result[i] = std::move(t.value());
616   }
617   Promise<Result> p;
618   InternalResult result;
619   std::atomic<bool> threw;
620 };
621
622 // Specialize for void (implementations in Future.cpp)
623
624 template <>
625 CollectContext<void>::~CollectContext();
626
627 template <>
628 void CollectContext<void>::setPartialResult(size_t i, Try<void>& t);
629
630 }
631
632 template <class InputIterator>
633 Future<typename detail::CollectContext<
634   typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
635 collect(InputIterator first, InputIterator last) {
636   typedef
637     typename std::iterator_traits<InputIterator>::value_type::value_type T;
638
639   auto ctx = std::make_shared<detail::CollectContext<T>>(
640     std::distance(first, last));
641   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
642     if (t.hasException()) {
643        if (!ctx->threw.exchange(true)) {
644          ctx->p.setException(std::move(t.exception()));
645        }
646      } else if (!ctx->threw) {
647        ctx->setPartialResult(i, t);
648      }
649   });
650   return ctx->p.getFuture();
651 }
652
653 // collect (variadic)
654
655 template <typename... Fs>
656 typename detail::CollectVariadicContext<
657   typename std::decay<Fs>::type::value_type...>::type
658 collect(Fs&&... fs) {
659   auto ctx = std::make_shared<detail::CollectVariadicContext<
660     typename std::decay<Fs>::type::value_type...>>();
661   detail::collectVariadicHelper<detail::CollectVariadicContext>(
662     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
663   return ctx->p.getFuture();
664 }
665
666 // collectAny (iterator)
667
668 template <class InputIterator>
669 Future<
670   std::pair<size_t,
671             Try<
672               typename
673               std::iterator_traits<InputIterator>::value_type::value_type>>>
674 collectAny(InputIterator first, InputIterator last) {
675   typedef
676     typename std::iterator_traits<InputIterator>::value_type::value_type T;
677
678   struct CollectAnyContext {
679     CollectAnyContext(size_t n) : done(false) {};
680     Promise<std::pair<size_t, Try<T>>> p;
681     std::atomic<bool> done;
682   };
683
684   auto ctx = std::make_shared<CollectAnyContext>(std::distance(first, last));
685   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
686     if (!ctx->done.exchange(true)) {
687       ctx->p.setValue(std::make_pair(i, std::move(t)));
688     }
689   });
690   return ctx->p.getFuture();
691 }
692
693 // collectN (iterator)
694
695 template <class InputIterator>
696 Future<std::vector<std::pair<size_t, Try<typename
697   std::iterator_traits<InputIterator>::value_type::value_type>>>>
698 collectN(InputIterator first, InputIterator last, size_t n) {
699   typedef typename
700     std::iterator_traits<InputIterator>::value_type::value_type T;
701   typedef std::vector<std::pair<size_t, Try<T>>> V;
702
703   struct CollectNContext {
704     V v;
705     std::atomic<size_t> completed = {0};
706     Promise<V> p;
707   };
708   auto ctx = std::make_shared<CollectNContext>();
709
710   if (size_t(std::distance(first, last)) < n) {
711     ctx->p.setException(std::runtime_error("Not enough futures"));
712   } else {
713     // for each completed Future, increase count and add to vector, until we
714     // have n completed futures at which point we fulfil our Promise with the
715     // vector
716     mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
717       auto c = ++ctx->completed;
718       if (c <= n) {
719         assert(ctx->v.size() < n);
720         ctx->v.push_back(std::make_pair(i, std::move(t)));
721         if (c == n) {
722           ctx->p.setTry(Try<V>(std::move(ctx->v)));
723         }
724       }
725     });
726   }
727
728   return ctx->p.getFuture();
729 }
730
731 // reduce (iterator)
732
733 template <class It, class T, class F>
734 Future<T> reduce(It first, It last, T&& initial, F&& func) {
735   if (first == last) {
736     return makeFuture(std::move(initial));
737   }
738
739   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
740   typedef typename std::conditional<
741     detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type Arg;
742   typedef isTry<Arg> IsTry;
743
744   folly::MoveWrapper<T> minitial(std::move(initial));
745   auto sfunc = std::make_shared<F>(std::move(func));
746
747   auto f = first->then([minitial, sfunc](Try<ItT>& head) mutable {
748     return (*sfunc)(std::move(*minitial),
749                 head.template get<IsTry::value, Arg&&>());
750   });
751
752   for (++first; first != last; ++first) {
753     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
754       return (*sfunc)(std::move(std::get<0>(t).value()),
755                   // Either return a ItT&& or a Try<ItT>&& depending
756                   // on the type of the argument of func.
757                   std::get<1>(t).template get<IsTry::value, Arg&&>());
758     });
759   }
760
761   return f;
762 }
763
764 // window (collection)
765
766 template <class Collection, class F, class ItT, class Result>
767 std::vector<Future<Result>>
768 window(Collection input, F func, size_t n) {
769   struct WindowContext {
770     WindowContext(Collection&& i, F&& fn)
771         : i_(0), input_(std::move(i)), promises_(input_.size()),
772           func_(std::move(fn))
773       {}
774     std::atomic<size_t> i_;
775     Collection input_;
776     std::vector<Promise<Result>> promises_;
777     F func_;
778
779     static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
780       size_t i = ctx->i_++;
781       if (i < ctx->input_.size()) {
782         // Using setCallback_ directly since we don't need the Future
783         ctx->func_(std::move(ctx->input_[i])).setCallback_(
784           // ctx is captured by value
785           [ctx, i](Try<Result>&& t) {
786             ctx->promises_[i].setTry(std::move(t));
787             // Chain another future onto this one
788             spawn(std::move(ctx));
789           });
790       }
791     }
792   };
793
794   auto max = std::min(n, input.size());
795
796   auto ctx = std::make_shared<WindowContext>(
797     std::move(input), std::move(func));
798
799   for (size_t i = 0; i < max; ++i) {
800     // Start the first n Futures
801     WindowContext::spawn(ctx);
802   }
803
804   std::vector<Future<Result>> futures;
805   futures.reserve(ctx->promises_.size());
806   for (auto& promise : ctx->promises_) {
807     futures.emplace_back(promise.getFuture());
808   }
809
810   return futures;
811 }
812
813 // reduce
814
815 template <class T>
816 template <class I, class F>
817 Future<I> Future<T>::reduce(I&& initial, F&& func) {
818   folly::MoveWrapper<I> minitial(std::move(initial));
819   folly::MoveWrapper<F> mfunc(std::move(func));
820   return then([minitial, mfunc](T& vals) mutable {
821     auto ret = std::move(*minitial);
822     for (auto& val : vals) {
823       ret = (*mfunc)(std::move(ret), std::move(val));
824     }
825     return ret;
826   });
827 }
828
829 // unorderedReduce (iterator)
830
831 template <class It, class T, class F, class ItT, class Arg>
832 Future<T> unorderedReduce(It first, It last, T initial, F func) {
833   if (first == last) {
834     return makeFuture(std::move(initial));
835   }
836
837   typedef isTry<Arg> IsTry;
838
839   struct UnorderedReduceContext {
840     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
841         : lock_(), memo_(makeFuture<T>(std::move(memo))),
842           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
843       {};
844     folly::MicroSpinLock lock_; // protects memo_ and numThens_
845     Future<T> memo_;
846     F func_;
847     size_t numThens_; // how many Futures completed and called .then()
848     size_t numFutures_; // how many Futures in total
849     Promise<T> promise_;
850   };
851
852   auto ctx = std::make_shared<UnorderedReduceContext>(
853     std::move(initial), std::move(func), std::distance(first, last));
854
855   mapSetCallback<ItT>(first, last, [ctx](size_t i, Try<ItT>&& t) {
856     folly::MoveWrapper<Try<ItT>> mt(std::move(t));
857     // Futures can be completed in any order, simultaneously.
858     // To make this non-blocking, we create a new Future chain in
859     // the order of completion to reduce the values.
860     // The spinlock just protects chaining a new Future, not actually
861     // executing the reduce, which should be really fast.
862     folly::MSLGuard lock(ctx->lock_);
863     ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
864       // Either return a ItT&& or a Try<ItT>&& depending
865       // on the type of the argument of func.
866       return ctx->func_(std::move(v), mt->template get<IsTry::value, Arg&&>());
867     });
868     if (++ctx->numThens_ == ctx->numFutures_) {
869       // After reducing the value of the last Future, fulfill the Promise
870       ctx->memo_.setCallback_([ctx](Try<T>&& t2) {
871         ctx->promise_.setValue(std::move(t2));
872       });
873     }
874   });
875
876   return ctx->promise_.getFuture();
877 }
878
879 // within
880
881 template <class T>
882 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
883   return within(dur, TimedOut(), tk);
884 }
885
886 template <class T>
887 template <class E>
888 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
889
890   struct Context {
891     Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
892     E exception;
893     Promise<T> promise;
894     std::atomic<bool> token;
895   };
896   auto ctx = std::make_shared<Context>(std::move(e));
897
898   if (!tk) {
899     tk = folly::detail::getTimekeeperSingleton();
900   }
901
902   tk->after(dur)
903     .then([ctx](Try<void> const& t) {
904       if (ctx->token.exchange(true) == false) {
905         if (t.hasException()) {
906           ctx->promise.setException(std::move(t.exception()));
907         } else {
908           ctx->promise.setException(std::move(ctx->exception));
909         }
910       }
911     });
912
913   this->then([ctx](Try<T>&& t) {
914     if (ctx->token.exchange(true) == false) {
915       ctx->promise.setTry(std::move(t));
916     }
917   });
918
919   return ctx->promise.getFuture().via(getExecutor());
920 }
921
922 // delayed
923
924 template <class T>
925 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
926   return collectAll(*this, futures::sleep(dur, tk))
927     .then([](std::tuple<Try<T>, Try<void>> tup) {
928       Try<T>& t = std::get<0>(tup);
929       return makeFuture<T>(std::move(t));
930     });
931 }
932
933 namespace detail {
934
935 template <class T>
936 void waitImpl(Future<T>& f) {
937   // short-circuit if there's nothing to do
938   if (f.isReady()) return;
939
940   folly::fibers::Baton baton;
941   f = f.then([&](Try<T> t) {
942     baton.post();
943     return makeFuture(std::move(t));
944   });
945   baton.wait();
946
947   // There's a race here between the return here and the actual finishing of
948   // the future. f is completed, but the setup may not have finished on done
949   // after the baton has posted.
950   while (!f.isReady()) {
951     std::this_thread::yield();
952   }
953 }
954
955 template <class T>
956 void waitImpl(Future<T>& f, Duration dur) {
957   // short-circuit if there's nothing to do
958   if (f.isReady()) return;
959
960   auto baton = std::make_shared<folly::fibers::Baton>();
961   f = f.then([baton](Try<T> t) {
962     baton->post();
963     return makeFuture(std::move(t));
964   });
965
966   // Let's preserve the invariant that if we did not timeout (timed_wait returns
967   // true), then the returned Future is complete when it is returned to the
968   // caller. We need to wait out the race for that Future to complete.
969   if (baton->timed_wait(dur)) {
970     while (!f.isReady()) {
971       std::this_thread::yield();
972     }
973   }
974 }
975
976 template <class T>
977 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
978   while (!f.isReady()) {
979     e->drive();
980   }
981 }
982
983 } // detail
984
985 template <class T>
986 Future<T>& Future<T>::wait() & {
987   detail::waitImpl(*this);
988   return *this;
989 }
990
991 template <class T>
992 Future<T>&& Future<T>::wait() && {
993   detail::waitImpl(*this);
994   return std::move(*this);
995 }
996
997 template <class T>
998 Future<T>& Future<T>::wait(Duration dur) & {
999   detail::waitImpl(*this, dur);
1000   return *this;
1001 }
1002
1003 template <class T>
1004 Future<T>&& Future<T>::wait(Duration dur) && {
1005   detail::waitImpl(*this, dur);
1006   return std::move(*this);
1007 }
1008
1009 template <class T>
1010 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1011   detail::waitViaImpl(*this, e);
1012   return *this;
1013 }
1014
1015 template <class T>
1016 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1017   detail::waitViaImpl(*this, e);
1018   return std::move(*this);
1019 }
1020
1021 template <class T>
1022 T Future<T>::get() {
1023   return std::move(wait().value());
1024 }
1025
1026 template <>
1027 inline void Future<void>::get() {
1028   wait().value();
1029 }
1030
1031 template <class T>
1032 T Future<T>::get(Duration dur) {
1033   wait(dur);
1034   if (isReady()) {
1035     return std::move(value());
1036   } else {
1037     throw TimedOut();
1038   }
1039 }
1040
1041 template <>
1042 inline void Future<void>::get(Duration dur) {
1043   wait(dur);
1044   if (isReady()) {
1045     return;
1046   } else {
1047     throw TimedOut();
1048   }
1049 }
1050
1051 template <class T>
1052 T Future<T>::getVia(DrivableExecutor* e) {
1053   return std::move(waitVia(e).value());
1054 }
1055
1056 template <>
1057 inline void Future<void>::getVia(DrivableExecutor* e) {
1058   waitVia(e).value();
1059 }
1060
1061 namespace detail {
1062   template <class T>
1063   struct TryEquals {
1064     static bool equals(const Try<T>& t1, const Try<T>& t2) {
1065       return t1.value() == t2.value();
1066     }
1067   };
1068
1069   template <>
1070   struct TryEquals<void> {
1071     static bool equals(const Try<void>& t1, const Try<void>& t2) {
1072       return true;
1073     }
1074   };
1075 }
1076
1077 template <class T>
1078 Future<bool> Future<T>::willEqual(Future<T>& f) {
1079   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1080     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1081       return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1082     } else {
1083       return false;
1084       }
1085   });
1086 }
1087
1088 template <class T>
1089 template <class F>
1090 Future<T> Future<T>::filter(F predicate) {
1091   auto p = folly::makeMoveWrapper(std::move(predicate));
1092   return this->then([p](T val) {
1093     T const& valConstRef = val;
1094     if (!(*p)(valConstRef)) {
1095       throw PredicateDoesNotObtain();
1096     }
1097     return val;
1098   });
1099 }
1100
1101 template <class T>
1102 template <class Callback>
1103 auto Future<T>::thenMulti(Callback&& fn)
1104     -> decltype(this->then(std::forward<Callback>(fn))) {
1105   // thenMulti with one callback is just a then
1106   return then(std::forward<Callback>(fn));
1107 }
1108
1109 template <class T>
1110 template <class Callback, class... Callbacks>
1111 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1112     -> decltype(this->then(std::forward<Callback>(fn)).
1113                       thenMulti(std::forward<Callbacks>(fns)...)) {
1114   // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1115   return then(std::forward<Callback>(fn)).
1116          thenMulti(std::forward<Callbacks>(fns)...);
1117 }
1118
1119 template <class T>
1120 template <class Callback, class... Callbacks>
1121 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1122                                       Callbacks&&... fns)
1123     -> decltype(this->then(std::forward<Callback>(fn)).
1124                       thenMulti(std::forward<Callbacks>(fns)...)) {
1125   // thenMultiExecutor with two callbacks is
1126   // via(x).then(a).thenMulti(b, ...).via(oldX)
1127   auto oldX = getExecutor();
1128   setExecutor(x);
1129   return then(std::forward<Callback>(fn)).
1130          thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1131 }
1132
1133 template <class T>
1134 template <class Callback>
1135 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1136     -> decltype(this->then(std::forward<Callback>(fn))) {
1137   // thenMulti with one callback is just a then with an executor
1138   return then(x, std::forward<Callback>(fn));
1139 }
1140
1141 namespace futures {
1142   template <class It, class F, class ItT, class Result>
1143   std::vector<Future<Result>> map(It first, It last, F func) {
1144     std::vector<Future<Result>> results;
1145     for (auto it = first; it != last; it++) {
1146       results.push_back(it->then(func));
1147     }
1148     return results;
1149   }
1150 }
1151
1152 // Instantiate the most common Future types to save compile time
1153 extern template class Future<void>;
1154 extern template class Future<bool>;
1155 extern template class Future<int>;
1156 extern template class Future<int64_t>;
1157 extern template class Future<std::string>;
1158 extern template class Future<double>;
1159
1160 } // namespace folly
1161
1162 // I haven't included a Future<T&> specialization because I don't forsee us
1163 // using it, however it is not difficult to add when needed. Refer to
1164 // Future<void> for guidance. std::future and boost::future code would also be
1165 // instructive.