Future::unit()
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <chrono>
20 #include <thread>
21
22 #include <folly/experimental/fibers/Baton.h>
23 #include <folly/Optional.h>
24 #include <folly/futures/detail/Core.h>
25 #include <folly/futures/Timekeeper.h>
26
27 namespace folly {
28
29 class Timekeeper;
30
31 namespace detail {
32   Timekeeper* getTimekeeperSingleton();
33 }
34
35 template <class T>
36 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
37   other.core_ = nullptr;
38 }
39
40 template <class T>
41 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
42   std::swap(core_, other.core_);
43   return *this;
44 }
45
46 template <class T>
47 template <class T2, typename>
48 Future<T>::Future(T2&& val)
49   : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
50
51 template <class T>
52 template <typename, typename>
53 Future<T>::Future()
54   : core_(new detail::Core<T>(Try<T>())) {}
55
56 template <class T>
57 Future<T>::~Future() {
58   detach();
59 }
60
61 template <class T>
62 void Future<T>::detach() {
63   if (core_) {
64     core_->detachFuture();
65     core_ = nullptr;
66   }
67 }
68
69 template <class T>
70 void Future<T>::throwIfInvalid() const {
71   if (!core_)
72     throw NoState();
73 }
74
75 template <class T>
76 template <class F>
77 void Future<T>::setCallback_(F&& func) {
78   throwIfInvalid();
79   core_->setCallback(std::move(func));
80 }
81
82 // unwrap
83
84 template <class T>
85 template <class F>
86 typename std::enable_if<isFuture<F>::value,
87                         Future<typename isFuture<T>::Inner>>::type
88 Future<T>::unwrap() {
89   return then([](Future<typename isFuture<T>::Inner> internal_future) {
90       return internal_future;
91   });
92 }
93
94 // then
95
96 // Variant: returns a value
97 // e.g. f.then([](Try<T>&& t){ return t.value(); });
98 template <class T>
99 template <typename F, typename R, bool isTry, typename... Args>
100 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
101 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
102   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
103   typedef typename R::ReturnsFuture::Inner B;
104
105   throwIfInvalid();
106
107   // wrap these so we can move them into the lambda
108   folly::MoveWrapper<Promise<B>> p;
109   p->setInterruptHandler(core_->getInterruptHandler());
110   folly::MoveWrapper<F> funcm(std::forward<F>(func));
111
112   // grab the Future now before we lose our handle on the Promise
113   auto f = p->getFuture();
114   if (getExecutor()) {
115     f.setExecutor(getExecutor());
116   }
117
118   /* This is a bit tricky.
119
120      We can't just close over *this in case this Future gets moved. So we
121      make a new dummy Future. We could figure out something more
122      sophisticated that avoids making a new Future object when it can, as an
123      optimization. But this is correct.
124
125      core_ can't be moved, it is explicitly disallowed (as is copying). But
126      if there's ever a reason to allow it, this is one place that makes that
127      assumption and would need to be fixed. We use a standard shared pointer
128      for core_ (by copying it in), which means in essence obj holds a shared
129      pointer to itself.  But this shouldn't leak because Promise will not
130      outlive the continuation, because Promise will setException() with a
131      broken Promise if it is destructed before completed. We could use a
132      weak pointer but it would have to be converted to a shared pointer when
133      func is executed (because the Future returned by func may possibly
134      persist beyond the callback, if it gets moved), and so it is an
135      optimization to just make it shared from the get-go.
136
137      We have to move in the Promise and func using the MoveWrapper
138      hack. (func could be copied but it's a big drag on perf).
139
140      Two subtle but important points about this design. detail::Core has no
141      back pointers to Future or Promise, so if Future or Promise get moved
142      (and they will be moved in performant code) we don't have to do
143      anything fancy. And because we store the continuation in the
144      detail::Core, not in the Future, we can execute the continuation even
145      after the Future has gone out of scope. This is an intentional design
146      decision. It is likely we will want to be able to cancel a continuation
147      in some circumstances, but I think it should be explicit not implicit
148      in the destruction of the Future used to create it.
149      */
150   setCallback_(
151     [p, funcm](Try<T>&& t) mutable {
152       if (!isTry && t.hasException()) {
153         p->setException(std::move(t.exception()));
154       } else {
155         p->setWith([&]() {
156           return (*funcm)(t.template get<isTry, Args>()...);
157         });
158       }
159     });
160
161   return f;
162 }
163
164 // Variant: returns a Future
165 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
166 template <class T>
167 template <typename F, typename R, bool isTry, typename... Args>
168 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
169 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
170   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
171   typedef typename R::ReturnsFuture::Inner B;
172
173   throwIfInvalid();
174
175   // wrap these so we can move them into the lambda
176   folly::MoveWrapper<Promise<B>> p;
177   folly::MoveWrapper<F> funcm(std::forward<F>(func));
178
179   // grab the Future now before we lose our handle on the Promise
180   auto f = p->getFuture();
181   if (getExecutor()) {
182     f.setExecutor(getExecutor());
183   }
184
185   setCallback_(
186     [p, funcm](Try<T>&& t) mutable {
187       if (!isTry && t.hasException()) {
188         p->setException(std::move(t.exception()));
189       } else {
190         try {
191           auto f2 = (*funcm)(t.template get<isTry, Args>()...);
192           // that didn't throw, now we can steal p
193           f2.setCallback_([p](Try<B>&& b) mutable {
194             p->setTry(std::move(b));
195           });
196         } catch (const std::exception& e) {
197           p->setException(exception_wrapper(std::current_exception(), e));
198         } catch (...) {
199           p->setException(exception_wrapper(std::current_exception()));
200         }
201       }
202     });
203
204   return f;
205 }
206
207 template <typename T>
208 template <typename R, typename Caller, typename... Args>
209   Future<typename isFuture<R>::Inner>
210 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
211   typedef typename std::remove_cv<
212     typename std::remove_reference<
213       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
214   return then([instance, func](Try<T>&& t){
215     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
216   });
217 }
218
219 template <class T>
220 template <class Executor, class Arg, class... Args>
221 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
222   -> decltype(this->then(std::forward<Arg>(arg),
223                          std::forward<Args>(args)...))
224 {
225   auto oldX = getExecutor();
226   setExecutor(x);
227   return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
228                via(oldX);
229 }
230
231 template <class T>
232 Future<void> Future<T>::then() {
233   return then([] (Try<T>&& t) {});
234 }
235
236 // onError where the callback returns T
237 template <class T>
238 template <class F>
239 typename std::enable_if<
240   !detail::callableWith<F, exception_wrapper>::value &&
241   !detail::Extract<F>::ReturnsFuture::value,
242   Future<T>>::type
243 Future<T>::onError(F&& func) {
244   typedef typename detail::Extract<F>::FirstArg Exn;
245   static_assert(
246       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
247       "Return type of onError callback must be T or Future<T>");
248
249   Promise<T> p;
250   auto f = p.getFuture();
251   auto pm = folly::makeMoveWrapper(std::move(p));
252   auto funcm = folly::makeMoveWrapper(std::move(func));
253   setCallback_([pm, funcm](Try<T>&& t) mutable {
254     if (!t.template withException<Exn>([&] (Exn& e) {
255           pm->setWith([&]{
256             return (*funcm)(e);
257           });
258         })) {
259       pm->setTry(std::move(t));
260     }
261   });
262
263   return f;
264 }
265
266 // onError where the callback returns Future<T>
267 template <class T>
268 template <class F>
269 typename std::enable_if<
270   !detail::callableWith<F, exception_wrapper>::value &&
271   detail::Extract<F>::ReturnsFuture::value,
272   Future<T>>::type
273 Future<T>::onError(F&& func) {
274   static_assert(
275       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
276       "Return type of onError callback must be T or Future<T>");
277   typedef typename detail::Extract<F>::FirstArg Exn;
278
279   Promise<T> p;
280   auto f = p.getFuture();
281   auto pm = folly::makeMoveWrapper(std::move(p));
282   auto funcm = folly::makeMoveWrapper(std::move(func));
283   setCallback_([pm, funcm](Try<T>&& t) mutable {
284     if (!t.template withException<Exn>([&] (Exn& e) {
285           try {
286             auto f2 = (*funcm)(e);
287             f2.setCallback_([pm](Try<T>&& t2) mutable {
288               pm->setTry(std::move(t2));
289             });
290           } catch (const std::exception& e2) {
291             pm->setException(exception_wrapper(std::current_exception(), e2));
292           } catch (...) {
293             pm->setException(exception_wrapper(std::current_exception()));
294           }
295         })) {
296       pm->setTry(std::move(t));
297     }
298   });
299
300   return f;
301 }
302
303 template <class T>
304 template <class F>
305 Future<T> Future<T>::ensure(F func) {
306   MoveWrapper<F> funcw(std::move(func));
307   return this->then([funcw](Try<T>&& t) {
308     (*funcw)();
309     return makeFuture(std::move(t));
310   });
311 }
312
313 template <class T>
314 template <class F>
315 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
316   auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
317   return within(dur, tk)
318     .onError([funcw](TimedOut const&) { return (*funcw)(); });
319 }
320
321 template <class T>
322 template <class F>
323 typename std::enable_if<
324   detail::callableWith<F, exception_wrapper>::value &&
325   detail::Extract<F>::ReturnsFuture::value,
326   Future<T>>::type
327 Future<T>::onError(F&& func) {
328   static_assert(
329       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
330       "Return type of onError callback must be T or Future<T>");
331
332   Promise<T> p;
333   auto f = p.getFuture();
334   auto pm = folly::makeMoveWrapper(std::move(p));
335   auto funcm = folly::makeMoveWrapper(std::move(func));
336   setCallback_([pm, funcm](Try<T> t) mutable {
337     if (t.hasException()) {
338       try {
339         auto f2 = (*funcm)(std::move(t.exception()));
340         f2.setCallback_([pm](Try<T> t2) mutable {
341           pm->setTry(std::move(t2));
342         });
343       } catch (const std::exception& e2) {
344         pm->setException(exception_wrapper(std::current_exception(), e2));
345       } catch (...) {
346         pm->setException(exception_wrapper(std::current_exception()));
347       }
348     } else {
349       pm->setTry(std::move(t));
350     }
351   });
352
353   return f;
354 }
355
356 // onError(exception_wrapper) that returns T
357 template <class T>
358 template <class F>
359 typename std::enable_if<
360   detail::callableWith<F, exception_wrapper>::value &&
361   !detail::Extract<F>::ReturnsFuture::value,
362   Future<T>>::type
363 Future<T>::onError(F&& func) {
364   static_assert(
365       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
366       "Return type of onError callback must be T or Future<T>");
367
368   Promise<T> p;
369   auto f = p.getFuture();
370   auto pm = folly::makeMoveWrapper(std::move(p));
371   auto funcm = folly::makeMoveWrapper(std::move(func));
372   setCallback_([pm, funcm](Try<T> t) mutable {
373     if (t.hasException()) {
374       pm->setWith([&]{
375         return (*funcm)(std::move(t.exception()));
376       });
377     } else {
378       pm->setTry(std::move(t));
379     }
380   });
381
382   return f;
383 }
384
385 template <class T>
386 typename std::add_lvalue_reference<T>::type Future<T>::value() {
387   throwIfInvalid();
388
389   return core_->getTry().value();
390 }
391
392 template <class T>
393 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
394   throwIfInvalid();
395
396   return core_->getTry().value();
397 }
398
399 template <class T>
400 Try<T>& Future<T>::getTry() {
401   throwIfInvalid();
402
403   return core_->getTry();
404 }
405
406 template <class T>
407 Optional<Try<T>> Future<T>::poll() {
408   Optional<Try<T>> o;
409   if (core_->ready()) {
410     o = std::move(core_->getTry());
411   }
412   return o;
413 }
414
415 template <class T>
416 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
417   throwIfInvalid();
418
419   setExecutor(executor, priority);
420
421   return std::move(*this);
422 }
423
424 template <class T>
425 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
426   throwIfInvalid();
427
428   MoveWrapper<Promise<T>> p;
429   auto f = p->getFuture();
430   then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
431   return std::move(f).via(executor, priority);
432 }
433
434
435 template <class Func>
436 auto via(Executor* x, Func func)
437   -> Future<typename isFuture<decltype(func())>::Inner>
438 // this would work, if not for Future<void> :-/
439 // -> decltype(via(x).then(func))
440 {
441   // TODO make this actually more performant. :-P #7260175
442   return via(x).then(func);
443 }
444
445 template <class T>
446 bool Future<T>::isReady() const {
447   throwIfInvalid();
448   return core_->ready();
449 }
450
451 template <class T>
452 bool Future<T>::hasValue() {
453   return getTry().hasValue();
454 }
455
456 template <class T>
457 bool Future<T>::hasException() {
458   return getTry().hasException();
459 }
460
461 template <class T>
462 void Future<T>::raise(exception_wrapper exception) {
463   core_->raise(std::move(exception));
464 }
465
466 // makeFuture
467
468 template <class T>
469 Future<typename std::decay<T>::type> makeFuture(T&& t) {
470   return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
471 }
472
473 inline // for multiple translation units
474 Future<void> makeFuture() {
475   return makeFuture(Try<void>());
476 }
477
478 template <class F>
479 auto makeFutureWith(
480     F&& func,
481     typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
482     -> Future<decltype(func())> {
483   return makeFuture(makeTryWith([&func]() {
484     return (func)();
485   }));
486 }
487
488 template <class F>
489 auto makeFutureWith(F const& func) -> Future<decltype(func())> {
490   F copy = func;
491   return makeFuture(makeTryWith(std::move(copy)));
492 }
493
494 template <class T>
495 Future<T> makeFuture(std::exception_ptr const& e) {
496   return makeFuture(Try<T>(e));
497 }
498
499 template <class T>
500 Future<T> makeFuture(exception_wrapper ew) {
501   return makeFuture(Try<T>(std::move(ew)));
502 }
503
504 template <class T, class E>
505 typename std::enable_if<std::is_base_of<std::exception, E>::value,
506                         Future<T>>::type
507 makeFuture(E const& e) {
508   return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
509 }
510
511 template <class T>
512 Future<T> makeFuture(Try<T>&& t) {
513   return Future<T>(new detail::Core<T>(std::move(t)));
514 }
515
516 // via
517 Future<void> via(Executor* executor, int8_t priority) {
518   return makeFuture().via(executor, priority);
519 }
520
521 // mapSetCallback calls func(i, Try<T>) when every future completes
522
523 template <class T, class InputIterator, class F>
524 void mapSetCallback(InputIterator first, InputIterator last, F func) {
525   for (size_t i = 0; first != last; ++first, ++i) {
526     first->setCallback_([func, i](Try<T>&& t) {
527       func(i, std::move(t));
528     });
529   }
530 }
531
532 // collectAll (variadic)
533
534 template <typename... Fs>
535 typename detail::CollectAllVariadicContext<
536   typename std::decay<Fs>::type::value_type...>::type
537 collectAll(Fs&&... fs) {
538   auto ctx = std::make_shared<detail::CollectAllVariadicContext<
539     typename std::decay<Fs>::type::value_type...>>();
540   detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
541     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
542   return ctx->p.getFuture();
543 }
544
545 // collectAll (iterator)
546
547 template <class InputIterator>
548 Future<
549   std::vector<
550   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
551 collectAll(InputIterator first, InputIterator last) {
552   typedef
553     typename std::iterator_traits<InputIterator>::value_type::value_type T;
554
555   struct CollectAllContext {
556     CollectAllContext(int n) : results(n) {}
557     ~CollectAllContext() {
558       p.setValue(std::move(results));
559     }
560     Promise<std::vector<Try<T>>> p;
561     std::vector<Try<T>> results;
562   };
563
564   auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
565   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
566     ctx->results[i] = std::move(t);
567   });
568   return ctx->p.getFuture();
569 }
570
571 // collect (iterator)
572
573 namespace detail {
574
575 template <typename T>
576 struct CollectContext {
577   struct Nothing { explicit Nothing(int n) {} };
578
579   using Result = typename std::conditional<
580     std::is_void<T>::value,
581     void,
582     std::vector<T>>::type;
583
584   using InternalResult = typename std::conditional<
585     std::is_void<T>::value,
586     Nothing,
587     std::vector<Optional<T>>>::type;
588
589   explicit CollectContext(int n) : result(n) {}
590   ~CollectContext() {
591     if (!threw.exchange(true)) {
592       // map Optional<T> -> T
593       std::vector<T> finalResult;
594       finalResult.reserve(result.size());
595       std::transform(result.begin(), result.end(),
596                      std::back_inserter(finalResult),
597                      [](Optional<T>& o) { return std::move(o.value()); });
598       p.setValue(std::move(finalResult));
599     }
600   }
601   inline void setPartialResult(size_t i, Try<T>& t) {
602     result[i] = std::move(t.value());
603   }
604   Promise<Result> p;
605   InternalResult result;
606   std::atomic<bool> threw;
607 };
608
609 // Specialize for void (implementations in Future.cpp)
610
611 template <>
612 CollectContext<void>::~CollectContext();
613
614 template <>
615 void CollectContext<void>::setPartialResult(size_t i, Try<void>& t);
616
617 }
618
619 template <class InputIterator>
620 Future<typename detail::CollectContext<
621   typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
622 collect(InputIterator first, InputIterator last) {
623   typedef
624     typename std::iterator_traits<InputIterator>::value_type::value_type T;
625
626   auto ctx = std::make_shared<detail::CollectContext<T>>(
627     std::distance(first, last));
628   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
629     if (t.hasException()) {
630        if (!ctx->threw.exchange(true)) {
631          ctx->p.setException(std::move(t.exception()));
632        }
633      } else if (!ctx->threw) {
634        ctx->setPartialResult(i, t);
635      }
636   });
637   return ctx->p.getFuture();
638 }
639
640 // collect (variadic)
641
642 template <typename... Fs>
643 typename detail::CollectVariadicContext<
644   typename std::decay<Fs>::type::value_type...>::type
645 collect(Fs&&... fs) {
646   auto ctx = std::make_shared<detail::CollectVariadicContext<
647     typename std::decay<Fs>::type::value_type...>>();
648   detail::collectVariadicHelper<detail::CollectVariadicContext>(
649     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
650   return ctx->p.getFuture();
651 }
652
653 // collectAny (iterator)
654
655 template <class InputIterator>
656 Future<
657   std::pair<size_t,
658             Try<
659               typename
660               std::iterator_traits<InputIterator>::value_type::value_type>>>
661 collectAny(InputIterator first, InputIterator last) {
662   typedef
663     typename std::iterator_traits<InputIterator>::value_type::value_type T;
664
665   struct CollectAnyContext {
666     CollectAnyContext(size_t n) : done(false) {};
667     Promise<std::pair<size_t, Try<T>>> p;
668     std::atomic<bool> done;
669   };
670
671   auto ctx = std::make_shared<CollectAnyContext>(std::distance(first, last));
672   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
673     if (!ctx->done.exchange(true)) {
674       ctx->p.setValue(std::make_pair(i, std::move(t)));
675     }
676   });
677   return ctx->p.getFuture();
678 }
679
680 // collectN (iterator)
681
682 template <class InputIterator>
683 Future<std::vector<std::pair<size_t, Try<typename
684   std::iterator_traits<InputIterator>::value_type::value_type>>>>
685 collectN(InputIterator first, InputIterator last, size_t n) {
686   typedef typename
687     std::iterator_traits<InputIterator>::value_type::value_type T;
688   typedef std::vector<std::pair<size_t, Try<T>>> V;
689
690   struct CollectNContext {
691     V v;
692     std::atomic<size_t> completed = {0};
693     Promise<V> p;
694   };
695   auto ctx = std::make_shared<CollectNContext>();
696
697   if (size_t(std::distance(first, last)) < n) {
698     ctx->p.setException(std::runtime_error("Not enough futures"));
699   } else {
700     // for each completed Future, increase count and add to vector, until we
701     // have n completed futures at which point we fulfil our Promise with the
702     // vector
703     mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
704       auto c = ++ctx->completed;
705       if (c <= n) {
706         assert(ctx->v.size() < n);
707         ctx->v.push_back(std::make_pair(i, std::move(t)));
708         if (c == n) {
709           ctx->p.setTry(Try<V>(std::move(ctx->v)));
710         }
711       }
712     });
713   }
714
715   return ctx->p.getFuture();
716 }
717
718 // reduce (iterator)
719
720 template <class It, class T, class F>
721 Future<T> reduce(It first, It last, T&& initial, F&& func) {
722   if (first == last) {
723     return makeFuture(std::move(initial));
724   }
725
726   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
727   typedef typename std::conditional<
728     detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type Arg;
729   typedef isTry<Arg> IsTry;
730
731   folly::MoveWrapper<T> minitial(std::move(initial));
732   auto sfunc = std::make_shared<F>(std::move(func));
733
734   auto f = first->then([minitial, sfunc](Try<ItT>& head) mutable {
735     return (*sfunc)(std::move(*minitial),
736                 head.template get<IsTry::value, Arg&&>());
737   });
738
739   for (++first; first != last; ++first) {
740     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
741       return (*sfunc)(std::move(std::get<0>(t).value()),
742                   // Either return a ItT&& or a Try<ItT>&& depending
743                   // on the type of the argument of func.
744                   std::get<1>(t).template get<IsTry::value, Arg&&>());
745     });
746   }
747
748   return f;
749 }
750
751 // window (collection)
752
753 template <class Collection, class F, class ItT, class Result>
754 std::vector<Future<Result>>
755 window(Collection input, F func, size_t n) {
756   struct WindowContext {
757     WindowContext(Collection&& i, F&& fn)
758         : i_(0), input_(std::move(i)), promises_(input_.size()),
759           func_(std::move(fn))
760       {}
761     std::atomic<size_t> i_;
762     Collection input_;
763     std::vector<Promise<Result>> promises_;
764     F func_;
765
766     static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
767       size_t i = ctx->i_++;
768       if (i < ctx->input_.size()) {
769         // Using setCallback_ directly since we don't need the Future
770         ctx->func_(std::move(ctx->input_[i])).setCallback_(
771           // ctx is captured by value
772           [ctx, i](Try<Result>&& t) {
773             ctx->promises_[i].setTry(std::move(t));
774             // Chain another future onto this one
775             spawn(std::move(ctx));
776           });
777       }
778     }
779   };
780
781   auto max = std::min(n, input.size());
782
783   auto ctx = std::make_shared<WindowContext>(
784     std::move(input), std::move(func));
785
786   for (size_t i = 0; i < max; ++i) {
787     // Start the first n Futures
788     WindowContext::spawn(ctx);
789   }
790
791   std::vector<Future<Result>> futures;
792   futures.reserve(ctx->promises_.size());
793   for (auto& promise : ctx->promises_) {
794     futures.emplace_back(promise.getFuture());
795   }
796
797   return futures;
798 }
799
800 // reduce
801
802 template <class T>
803 template <class I, class F>
804 Future<I> Future<T>::reduce(I&& initial, F&& func) {
805   folly::MoveWrapper<I> minitial(std::move(initial));
806   folly::MoveWrapper<F> mfunc(std::move(func));
807   return then([minitial, mfunc](T& vals) mutable {
808     auto ret = std::move(*minitial);
809     for (auto& val : vals) {
810       ret = (*mfunc)(std::move(ret), std::move(val));
811     }
812     return ret;
813   });
814 }
815
816 // unorderedReduce (iterator)
817
818 template <class It, class T, class F, class ItT, class Arg>
819 Future<T> unorderedReduce(It first, It last, T initial, F func) {
820   if (first == last) {
821     return makeFuture(std::move(initial));
822   }
823
824   typedef isTry<Arg> IsTry;
825
826   struct UnorderedReduceContext {
827     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
828         : lock_(), memo_(makeFuture<T>(std::move(memo))),
829           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
830       {};
831     folly::MicroSpinLock lock_; // protects memo_ and numThens_
832     Future<T> memo_;
833     F func_;
834     size_t numThens_; // how many Futures completed and called .then()
835     size_t numFutures_; // how many Futures in total
836     Promise<T> promise_;
837   };
838
839   auto ctx = std::make_shared<UnorderedReduceContext>(
840     std::move(initial), std::move(func), std::distance(first, last));
841
842   mapSetCallback<ItT>(first, last, [ctx](size_t i, Try<ItT>&& t) {
843     folly::MoveWrapper<Try<ItT>> mt(std::move(t));
844     // Futures can be completed in any order, simultaneously.
845     // To make this non-blocking, we create a new Future chain in
846     // the order of completion to reduce the values.
847     // The spinlock just protects chaining a new Future, not actually
848     // executing the reduce, which should be really fast.
849     folly::MSLGuard lock(ctx->lock_);
850     ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
851       // Either return a ItT&& or a Try<ItT>&& depending
852       // on the type of the argument of func.
853       return ctx->func_(std::move(v), mt->template get<IsTry::value, Arg&&>());
854     });
855     if (++ctx->numThens_ == ctx->numFutures_) {
856       // After reducing the value of the last Future, fulfill the Promise
857       ctx->memo_.setCallback_([ctx](Try<T>&& t2) {
858         ctx->promise_.setValue(std::move(t2));
859       });
860     }
861   });
862
863   return ctx->promise_.getFuture();
864 }
865
866 // within
867
868 template <class T>
869 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
870   return within(dur, TimedOut(), tk);
871 }
872
873 template <class T>
874 template <class E>
875 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
876
877   struct Context {
878     Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
879     E exception;
880     Promise<T> promise;
881     std::atomic<bool> token;
882   };
883   auto ctx = std::make_shared<Context>(std::move(e));
884
885   if (!tk) {
886     tk = folly::detail::getTimekeeperSingleton();
887   }
888
889   tk->after(dur)
890     .then([ctx](Try<void> const& t) {
891       if (ctx->token.exchange(true) == false) {
892         if (t.hasException()) {
893           ctx->promise.setException(std::move(t.exception()));
894         } else {
895           ctx->promise.setException(std::move(ctx->exception));
896         }
897       }
898     });
899
900   this->then([ctx](Try<T>&& t) {
901     if (ctx->token.exchange(true) == false) {
902       ctx->promise.setTry(std::move(t));
903     }
904   });
905
906   return ctx->promise.getFuture().via(getExecutor());
907 }
908
909 // delayed
910
911 template <class T>
912 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
913   return collectAll(*this, futures::sleep(dur, tk))
914     .then([](std::tuple<Try<T>, Try<void>> tup) {
915       Try<T>& t = std::get<0>(tup);
916       return makeFuture<T>(std::move(t));
917     });
918 }
919
920 namespace detail {
921
922 template <class T>
923 void waitImpl(Future<T>& f) {
924   // short-circuit if there's nothing to do
925   if (f.isReady()) return;
926
927   folly::fibers::Baton baton;
928   f = f.then([&](Try<T> t) {
929     baton.post();
930     return makeFuture(std::move(t));
931   });
932   baton.wait();
933
934   // There's a race here between the return here and the actual finishing of
935   // the future. f is completed, but the setup may not have finished on done
936   // after the baton has posted.
937   while (!f.isReady()) {
938     std::this_thread::yield();
939   }
940 }
941
942 template <class T>
943 void waitImpl(Future<T>& f, Duration dur) {
944   // short-circuit if there's nothing to do
945   if (f.isReady()) return;
946
947   auto baton = std::make_shared<folly::fibers::Baton>();
948   f = f.then([baton](Try<T> t) {
949     baton->post();
950     return makeFuture(std::move(t));
951   });
952
953   // Let's preserve the invariant that if we did not timeout (timed_wait returns
954   // true), then the returned Future is complete when it is returned to the
955   // caller. We need to wait out the race for that Future to complete.
956   if (baton->timed_wait(dur)) {
957     while (!f.isReady()) {
958       std::this_thread::yield();
959     }
960   }
961 }
962
963 template <class T>
964 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
965   while (!f.isReady()) {
966     e->drive();
967   }
968 }
969
970 } // detail
971
972 template <class T>
973 Future<T>& Future<T>::wait() & {
974   detail::waitImpl(*this);
975   return *this;
976 }
977
978 template <class T>
979 Future<T>&& Future<T>::wait() && {
980   detail::waitImpl(*this);
981   return std::move(*this);
982 }
983
984 template <class T>
985 Future<T>& Future<T>::wait(Duration dur) & {
986   detail::waitImpl(*this, dur);
987   return *this;
988 }
989
990 template <class T>
991 Future<T>&& Future<T>::wait(Duration dur) && {
992   detail::waitImpl(*this, dur);
993   return std::move(*this);
994 }
995
996 template <class T>
997 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
998   detail::waitViaImpl(*this, e);
999   return *this;
1000 }
1001
1002 template <class T>
1003 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1004   detail::waitViaImpl(*this, e);
1005   return std::move(*this);
1006 }
1007
1008 template <class T>
1009 T Future<T>::get() {
1010   return std::move(wait().value());
1011 }
1012
1013 template <>
1014 inline void Future<void>::get() {
1015   wait().value();
1016 }
1017
1018 template <class T>
1019 T Future<T>::get(Duration dur) {
1020   wait(dur);
1021   if (isReady()) {
1022     return std::move(value());
1023   } else {
1024     throw TimedOut();
1025   }
1026 }
1027
1028 template <>
1029 inline void Future<void>::get(Duration dur) {
1030   wait(dur);
1031   if (isReady()) {
1032     return;
1033   } else {
1034     throw TimedOut();
1035   }
1036 }
1037
1038 template <class T>
1039 T Future<T>::getVia(DrivableExecutor* e) {
1040   return std::move(waitVia(e).value());
1041 }
1042
1043 template <>
1044 inline void Future<void>::getVia(DrivableExecutor* e) {
1045   waitVia(e).value();
1046 }
1047
1048 namespace detail {
1049   template <class T>
1050   struct TryEquals {
1051     static bool equals(const Try<T>& t1, const Try<T>& t2) {
1052       return t1.value() == t2.value();
1053     }
1054   };
1055
1056   template <>
1057   struct TryEquals<void> {
1058     static bool equals(const Try<void>& t1, const Try<void>& t2) {
1059       return true;
1060     }
1061   };
1062 }
1063
1064 template <class T>
1065 Future<bool> Future<T>::willEqual(Future<T>& f) {
1066   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1067     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1068       return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1069     } else {
1070       return false;
1071       }
1072   });
1073 }
1074
1075 template <class T>
1076 template <class F>
1077 Future<T> Future<T>::filter(F predicate) {
1078   auto p = folly::makeMoveWrapper(std::move(predicate));
1079   return this->then([p](T val) {
1080     T const& valConstRef = val;
1081     if (!(*p)(valConstRef)) {
1082       throw PredicateDoesNotObtain();
1083     }
1084     return val;
1085   });
1086 }
1087
1088 template <class T>
1089 template <class Callback>
1090 auto Future<T>::thenMulti(Callback&& fn)
1091     -> decltype(this->then(std::forward<Callback>(fn))) {
1092   // thenMulti with one callback is just a then
1093   return then(std::forward<Callback>(fn));
1094 }
1095
1096 template <class T>
1097 template <class Callback, class... Callbacks>
1098 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1099     -> decltype(this->then(std::forward<Callback>(fn)).
1100                       thenMulti(std::forward<Callbacks>(fns)...)) {
1101   // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1102   return then(std::forward<Callback>(fn)).
1103          thenMulti(std::forward<Callbacks>(fns)...);
1104 }
1105
1106 template <class T>
1107 template <class Callback, class... Callbacks>
1108 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1109                                       Callbacks&&... fns)
1110     -> decltype(this->then(std::forward<Callback>(fn)).
1111                       thenMulti(std::forward<Callbacks>(fns)...)) {
1112   // thenMultiExecutor with two callbacks is
1113   // via(x).then(a).thenMulti(b, ...).via(oldX)
1114   auto oldX = getExecutor();
1115   setExecutor(x);
1116   return then(std::forward<Callback>(fn)).
1117          thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1118 }
1119
1120 template <class T>
1121 template <class Callback>
1122 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1123     -> decltype(this->then(std::forward<Callback>(fn))) {
1124   // thenMulti with one callback is just a then with an executor
1125   return then(x, std::forward<Callback>(fn));
1126 }
1127
1128 namespace futures {
1129   template <class It, class F, class ItT, class Result>
1130   std::vector<Future<Result>> map(It first, It last, F func) {
1131     std::vector<Future<Result>> results;
1132     for (auto it = first; it != last; it++) {
1133       results.push_back(it->then(func));
1134     }
1135     return results;
1136   }
1137 }
1138
1139 // Instantiate the most common Future types to save compile time
1140 extern template class Future<void>;
1141 extern template class Future<bool>;
1142 extern template class Future<int>;
1143 extern template class Future<int64_t>;
1144 extern template class Future<std::string>;
1145 extern template class Future<double>;
1146
1147 } // namespace folly
1148
1149 // I haven't included a Future<T&> specialization because I don't forsee us
1150 // using it, however it is not difficult to add when needed. Refer to
1151 // Future<void> for guidance. std::future and boost::future code would also be
1152 // instructive.