59498af986ef9768a745b502e7968af11e32a555
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <chrono>
20 #include <thread>
21
22 #include <folly/experimental/fibers/Baton.h>
23 #include <folly/Optional.h>
24 #include <folly/futures/detail/Core.h>
25 #include <folly/futures/Timekeeper.h>
26
27 namespace folly {
28
29 class Timekeeper;
30
31 namespace detail {
32   Timekeeper* getTimekeeperSingleton();
33 }
34
35 template <class T>
36 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
37   other.core_ = nullptr;
38 }
39
40 template <class T>
41 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
42   std::swap(core_, other.core_);
43   return *this;
44 }
45
46 template <class T>
47 template <class T2, typename>
48 Future<T>::Future(T2&& val)
49   : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
50
51 template <class T>
52 template <class T2,
53           typename std::enable_if<
54             folly::is_void_or_unit<T2>::value,
55             int>::type>
56 Future<T>::Future()
57   : core_(new detail::Core<T>(Try<T>())) {}
58
59
60 template <class T>
61 Future<T>::~Future() {
62   detach();
63 }
64
65 template <class T>
66 void Future<T>::detach() {
67   if (core_) {
68     core_->detachFuture();
69     core_ = nullptr;
70   }
71 }
72
73 template <class T>
74 void Future<T>::throwIfInvalid() const {
75   if (!core_)
76     throw NoState();
77 }
78
79 template <class T>
80 template <class F>
81 void Future<T>::setCallback_(F&& func) {
82   throwIfInvalid();
83   core_->setCallback(std::move(func));
84 }
85
86 // unwrap
87
88 template <class T>
89 template <class F>
90 typename std::enable_if<isFuture<F>::value,
91                         Future<typename isFuture<T>::Inner>>::type
92 Future<T>::unwrap() {
93   return then([](Future<typename isFuture<T>::Inner> internal_future) {
94       return internal_future;
95   });
96 }
97
98 // then
99
100 // Variant: returns a value
101 // e.g. f.then([](Try<T>&& t){ return t.value(); });
102 template <class T>
103 template <typename F, typename R, bool isTry, typename... Args>
104 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
105 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
106   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
107   typedef typename R::ReturnsFuture::Inner B;
108
109   throwIfInvalid();
110
111   // wrap these so we can move them into the lambda
112   folly::MoveWrapper<Promise<B>> p;
113   p->setInterruptHandler(core_->getInterruptHandler());
114   folly::MoveWrapper<F> funcm(std::forward<F>(func));
115
116   // grab the Future now before we lose our handle on the Promise
117   auto f = p->getFuture();
118   if (getExecutor()) {
119     f.setExecutor(getExecutor());
120   }
121
122   /* This is a bit tricky.
123
124      We can't just close over *this in case this Future gets moved. So we
125      make a new dummy Future. We could figure out something more
126      sophisticated that avoids making a new Future object when it can, as an
127      optimization. But this is correct.
128
129      core_ can't be moved, it is explicitly disallowed (as is copying). But
130      if there's ever a reason to allow it, this is one place that makes that
131      assumption and would need to be fixed. We use a standard shared pointer
132      for core_ (by copying it in), which means in essence obj holds a shared
133      pointer to itself.  But this shouldn't leak because Promise will not
134      outlive the continuation, because Promise will setException() with a
135      broken Promise if it is destructed before completed. We could use a
136      weak pointer but it would have to be converted to a shared pointer when
137      func is executed (because the Future returned by func may possibly
138      persist beyond the callback, if it gets moved), and so it is an
139      optimization to just make it shared from the get-go.
140
141      We have to move in the Promise and func using the MoveWrapper
142      hack. (func could be copied but it's a big drag on perf).
143
144      Two subtle but important points about this design. detail::Core has no
145      back pointers to Future or Promise, so if Future or Promise get moved
146      (and they will be moved in performant code) we don't have to do
147      anything fancy. And because we store the continuation in the
148      detail::Core, not in the Future, we can execute the continuation even
149      after the Future has gone out of scope. This is an intentional design
150      decision. It is likely we will want to be able to cancel a continuation
151      in some circumstances, but I think it should be explicit not implicit
152      in the destruction of the Future used to create it.
153      */
154   setCallback_(
155     [p, funcm](Try<T>&& t) mutable {
156       if (!isTry && t.hasException()) {
157         p->setException(std::move(t.exception()));
158       } else {
159         p->setWith([&]() {
160           return (*funcm)(t.template get<isTry, Args>()...);
161         });
162       }
163     });
164
165   return f;
166 }
167
168 // Variant: returns a Future
169 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
170 template <class T>
171 template <typename F, typename R, bool isTry, typename... Args>
172 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
173 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
174   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
175   typedef typename R::ReturnsFuture::Inner B;
176
177   throwIfInvalid();
178
179   // wrap these so we can move them into the lambda
180   folly::MoveWrapper<Promise<B>> p;
181   folly::MoveWrapper<F> funcm(std::forward<F>(func));
182
183   // grab the Future now before we lose our handle on the Promise
184   auto f = p->getFuture();
185   if (getExecutor()) {
186     f.setExecutor(getExecutor());
187   }
188
189   setCallback_(
190     [p, funcm](Try<T>&& t) mutable {
191       if (!isTry && t.hasException()) {
192         p->setException(std::move(t.exception()));
193       } else {
194         try {
195           auto f2 = (*funcm)(t.template get<isTry, Args>()...);
196           // that didn't throw, now we can steal p
197           f2.setCallback_([p](Try<B>&& b) mutable {
198             p->setTry(std::move(b));
199           });
200         } catch (const std::exception& e) {
201           p->setException(exception_wrapper(std::current_exception(), e));
202         } catch (...) {
203           p->setException(exception_wrapper(std::current_exception()));
204         }
205       }
206     });
207
208   return f;
209 }
210
211 template <typename T>
212 template <typename R, typename Caller, typename... Args>
213   Future<typename isFuture<R>::Inner>
214 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
215   typedef typename std::remove_cv<
216     typename std::remove_reference<
217       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
218   return then([instance, func](Try<T>&& t){
219     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
220   });
221 }
222
223 template <class T>
224 template <class Executor, class Arg, class... Args>
225 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
226   -> decltype(this->then(std::forward<Arg>(arg),
227                          std::forward<Args>(args)...))
228 {
229   auto oldX = getExecutor();
230   setExecutor(x);
231   return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
232                via(oldX);
233 }
234
235 template <class T>
236 Future<void> Future<T>::then() {
237   return then([] (Try<T>&& t) {});
238 }
239
240 // onError where the callback returns T
241 template <class T>
242 template <class F>
243 typename std::enable_if<
244   !detail::callableWith<F, exception_wrapper>::value &&
245   !detail::Extract<F>::ReturnsFuture::value,
246   Future<T>>::type
247 Future<T>::onError(F&& func) {
248   typedef typename detail::Extract<F>::FirstArg Exn;
249   static_assert(
250       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
251       "Return type of onError callback must be T or Future<T>");
252
253   Promise<T> p;
254   auto f = p.getFuture();
255   auto pm = folly::makeMoveWrapper(std::move(p));
256   auto funcm = folly::makeMoveWrapper(std::move(func));
257   setCallback_([pm, funcm](Try<T>&& t) mutable {
258     if (!t.template withException<Exn>([&] (Exn& e) {
259           pm->setWith([&]{
260             return (*funcm)(e);
261           });
262         })) {
263       pm->setTry(std::move(t));
264     }
265   });
266
267   return f;
268 }
269
270 // onError where the callback returns Future<T>
271 template <class T>
272 template <class F>
273 typename std::enable_if<
274   !detail::callableWith<F, exception_wrapper>::value &&
275   detail::Extract<F>::ReturnsFuture::value,
276   Future<T>>::type
277 Future<T>::onError(F&& func) {
278   static_assert(
279       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
280       "Return type of onError callback must be T or Future<T>");
281   typedef typename detail::Extract<F>::FirstArg Exn;
282
283   Promise<T> p;
284   auto f = p.getFuture();
285   auto pm = folly::makeMoveWrapper(std::move(p));
286   auto funcm = folly::makeMoveWrapper(std::move(func));
287   setCallback_([pm, funcm](Try<T>&& t) mutable {
288     if (!t.template withException<Exn>([&] (Exn& e) {
289           try {
290             auto f2 = (*funcm)(e);
291             f2.setCallback_([pm](Try<T>&& t2) mutable {
292               pm->setTry(std::move(t2));
293             });
294           } catch (const std::exception& e2) {
295             pm->setException(exception_wrapper(std::current_exception(), e2));
296           } catch (...) {
297             pm->setException(exception_wrapper(std::current_exception()));
298           }
299         })) {
300       pm->setTry(std::move(t));
301     }
302   });
303
304   return f;
305 }
306
307 template <class T>
308 template <class F>
309 Future<T> Future<T>::ensure(F func) {
310   MoveWrapper<F> funcw(std::move(func));
311   return this->then([funcw](Try<T>&& t) {
312     (*funcw)();
313     return makeFuture(std::move(t));
314   });
315 }
316
317 template <class T>
318 template <class F>
319 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
320   auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
321   return within(dur, tk)
322     .onError([funcw](TimedOut const&) { return (*funcw)(); });
323 }
324
325 template <class T>
326 template <class F>
327 typename std::enable_if<
328   detail::callableWith<F, exception_wrapper>::value &&
329   detail::Extract<F>::ReturnsFuture::value,
330   Future<T>>::type
331 Future<T>::onError(F&& func) {
332   static_assert(
333       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
334       "Return type of onError callback must be T or Future<T>");
335
336   Promise<T> p;
337   auto f = p.getFuture();
338   auto pm = folly::makeMoveWrapper(std::move(p));
339   auto funcm = folly::makeMoveWrapper(std::move(func));
340   setCallback_([pm, funcm](Try<T> t) mutable {
341     if (t.hasException()) {
342       try {
343         auto f2 = (*funcm)(std::move(t.exception()));
344         f2.setCallback_([pm](Try<T> t2) mutable {
345           pm->setTry(std::move(t2));
346         });
347       } catch (const std::exception& e2) {
348         pm->setException(exception_wrapper(std::current_exception(), e2));
349       } catch (...) {
350         pm->setException(exception_wrapper(std::current_exception()));
351       }
352     } else {
353       pm->setTry(std::move(t));
354     }
355   });
356
357   return f;
358 }
359
360 // onError(exception_wrapper) that returns T
361 template <class T>
362 template <class F>
363 typename std::enable_if<
364   detail::callableWith<F, exception_wrapper>::value &&
365   !detail::Extract<F>::ReturnsFuture::value,
366   Future<T>>::type
367 Future<T>::onError(F&& func) {
368   static_assert(
369       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
370       "Return type of onError callback must be T or Future<T>");
371
372   Promise<T> p;
373   auto f = p.getFuture();
374   auto pm = folly::makeMoveWrapper(std::move(p));
375   auto funcm = folly::makeMoveWrapper(std::move(func));
376   setCallback_([pm, funcm](Try<T> t) mutable {
377     if (t.hasException()) {
378       pm->setWith([&]{
379         return (*funcm)(std::move(t.exception()));
380       });
381     } else {
382       pm->setTry(std::move(t));
383     }
384   });
385
386   return f;
387 }
388
389 template <class T>
390 typename std::add_lvalue_reference<T>::type Future<T>::value() {
391   throwIfInvalid();
392
393   return core_->getTry().value();
394 }
395
396 template <class T>
397 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
398   throwIfInvalid();
399
400   return core_->getTry().value();
401 }
402
403 template <class T>
404 Try<T>& Future<T>::getTry() {
405   throwIfInvalid();
406
407   return core_->getTry();
408 }
409
410 template <class T>
411 Optional<Try<T>> Future<T>::poll() {
412   Optional<Try<T>> o;
413   if (core_->ready()) {
414     o = std::move(core_->getTry());
415   }
416   return o;
417 }
418
419 template <class T>
420 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
421   throwIfInvalid();
422
423   setExecutor(executor, priority);
424
425   return std::move(*this);
426 }
427
428 template <class T>
429 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
430   throwIfInvalid();
431
432   MoveWrapper<Promise<T>> p;
433   auto f = p->getFuture();
434   then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
435   return std::move(f).via(executor, priority);
436 }
437
438
439 template <class Func>
440 auto via(Executor* x, Func func)
441   -> Future<typename isFuture<decltype(func())>::Inner>
442 // this would work, if not for Future<void> :-/
443 // -> decltype(via(x).then(func))
444 {
445   // TODO make this actually more performant. :-P #7260175
446   return via(x).then(func);
447 }
448
449 template <class T>
450 bool Future<T>::isReady() const {
451   throwIfInvalid();
452   return core_->ready();
453 }
454
455 template <class T>
456 void Future<T>::raise(exception_wrapper exception) {
457   core_->raise(std::move(exception));
458 }
459
460 // makeFuture
461
462 template <class T>
463 Future<typename std::decay<T>::type> makeFuture(T&& t) {
464   return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
465 }
466
467 inline // for multiple translation units
468 Future<void> makeFuture() {
469   return makeFuture(Try<void>());
470 }
471
472 template <class F>
473 auto makeFutureWith(
474     F&& func,
475     typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
476     -> Future<decltype(func())> {
477   return makeFuture(makeTryWith([&func]() {
478     return (func)();
479   }));
480 }
481
482 template <class F>
483 auto makeFutureWith(F const& func) -> Future<decltype(func())> {
484   F copy = func;
485   return makeFuture(makeTryWith(std::move(copy)));
486 }
487
488 template <class T>
489 Future<T> makeFuture(std::exception_ptr const& e) {
490   return makeFuture(Try<T>(e));
491 }
492
493 template <class T>
494 Future<T> makeFuture(exception_wrapper ew) {
495   return makeFuture(Try<T>(std::move(ew)));
496 }
497
498 template <class T, class E>
499 typename std::enable_if<std::is_base_of<std::exception, E>::value,
500                         Future<T>>::type
501 makeFuture(E const& e) {
502   return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
503 }
504
505 template <class T>
506 Future<T> makeFuture(Try<T>&& t) {
507   return Future<T>(new detail::Core<T>(std::move(t)));
508 }
509
510 // via
511 Future<void> via(Executor* executor, int8_t priority) {
512   return makeFuture().via(executor, priority);
513 }
514
515 // mapSetCallback calls func(i, Try<T>) when every future completes
516
517 template <class T, class InputIterator, class F>
518 void mapSetCallback(InputIterator first, InputIterator last, F func) {
519   for (size_t i = 0; first != last; ++first, ++i) {
520     first->setCallback_([func, i](Try<T>&& t) {
521       func(i, std::move(t));
522     });
523   }
524 }
525
526 // collectAll (variadic)
527
528 template <typename... Fs>
529 typename detail::CollectAllVariadicContext<
530   typename std::decay<Fs>::type::value_type...>::type
531 collectAll(Fs&&... fs) {
532   auto ctx = std::make_shared<detail::CollectAllVariadicContext<
533     typename std::decay<Fs>::type::value_type...>>();
534   detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
535     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
536   return ctx->p.getFuture();
537 }
538
539 // collectAll (iterator)
540
541 template <class InputIterator>
542 Future<
543   std::vector<
544   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
545 collectAll(InputIterator first, InputIterator last) {
546   typedef
547     typename std::iterator_traits<InputIterator>::value_type::value_type T;
548
549   struct CollectAllContext {
550     CollectAllContext(int n) : results(n) {}
551     ~CollectAllContext() {
552       p.setValue(std::move(results));
553     }
554     Promise<std::vector<Try<T>>> p;
555     std::vector<Try<T>> results;
556   };
557
558   auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
559   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
560     ctx->results[i] = std::move(t);
561   });
562   return ctx->p.getFuture();
563 }
564
565 // collect (iterator)
566
567 namespace detail {
568
569 template <typename T>
570 struct CollectContext {
571   struct Nothing { explicit Nothing(int n) {} };
572
573   using Result = typename std::conditional<
574     std::is_void<T>::value,
575     void,
576     std::vector<T>>::type;
577
578   using InternalResult = typename std::conditional<
579     std::is_void<T>::value,
580     Nothing,
581     std::vector<Optional<T>>>::type;
582
583   explicit CollectContext(int n) : result(n) {}
584   ~CollectContext() {
585     if (!threw.exchange(true)) {
586       // map Optional<T> -> T
587       std::vector<T> finalResult;
588       finalResult.reserve(result.size());
589       std::transform(result.begin(), result.end(),
590                      std::back_inserter(finalResult),
591                      [](Optional<T>& o) { return std::move(o.value()); });
592       p.setValue(std::move(finalResult));
593     }
594   }
595   inline void setPartialResult(size_t i, Try<T>& t) {
596     result[i] = std::move(t.value());
597   }
598   Promise<Result> p;
599   InternalResult result;
600   std::atomic<bool> threw;
601 };
602
603 // Specialize for void (implementations in Future.cpp)
604
605 template <>
606 CollectContext<void>::~CollectContext();
607
608 template <>
609 void CollectContext<void>::setPartialResult(size_t i, Try<void>& t);
610
611 }
612
613 template <class InputIterator>
614 Future<typename detail::CollectContext<
615   typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
616 collect(InputIterator first, InputIterator last) {
617   typedef
618     typename std::iterator_traits<InputIterator>::value_type::value_type T;
619
620   auto ctx = std::make_shared<detail::CollectContext<T>>(
621     std::distance(first, last));
622   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
623     if (t.hasException()) {
624        if (!ctx->threw.exchange(true)) {
625          ctx->p.setException(std::move(t.exception()));
626        }
627      } else if (!ctx->threw) {
628        ctx->setPartialResult(i, t);
629      }
630   });
631   return ctx->p.getFuture();
632 }
633
634 // collect (variadic)
635
636 template <typename... Fs>
637 typename detail::CollectVariadicContext<
638   typename std::decay<Fs>::type::value_type...>::type
639 collect(Fs&&... fs) {
640   auto ctx = std::make_shared<detail::CollectVariadicContext<
641     typename std::decay<Fs>::type::value_type...>>();
642   detail::collectVariadicHelper<detail::CollectVariadicContext>(
643     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
644   return ctx->p.getFuture();
645 }
646
647 // collectAny (iterator)
648
649 template <class InputIterator>
650 Future<
651   std::pair<size_t,
652             Try<
653               typename
654               std::iterator_traits<InputIterator>::value_type::value_type>>>
655 collectAny(InputIterator first, InputIterator last) {
656   typedef
657     typename std::iterator_traits<InputIterator>::value_type::value_type T;
658
659   struct CollectAnyContext {
660     CollectAnyContext(size_t n) : done(false) {};
661     Promise<std::pair<size_t, Try<T>>> p;
662     std::atomic<bool> done;
663   };
664
665   auto ctx = std::make_shared<CollectAnyContext>(std::distance(first, last));
666   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
667     if (!ctx->done.exchange(true)) {
668       ctx->p.setValue(std::make_pair(i, std::move(t)));
669     }
670   });
671   return ctx->p.getFuture();
672 }
673
674 // collectN (iterator)
675
676 template <class InputIterator>
677 Future<std::vector<std::pair<size_t, Try<typename
678   std::iterator_traits<InputIterator>::value_type::value_type>>>>
679 collectN(InputIterator first, InputIterator last, size_t n) {
680   typedef typename
681     std::iterator_traits<InputIterator>::value_type::value_type T;
682   typedef std::vector<std::pair<size_t, Try<T>>> V;
683
684   struct CollectNContext {
685     V v;
686     std::atomic<size_t> completed = {0};
687     Promise<V> p;
688   };
689   auto ctx = std::make_shared<CollectNContext>();
690
691   if (size_t(std::distance(first, last)) < n) {
692     ctx->p.setException(std::runtime_error("Not enough futures"));
693   } else {
694     // for each completed Future, increase count and add to vector, until we
695     // have n completed futures at which point we fulfil our Promise with the
696     // vector
697     mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
698       auto c = ++ctx->completed;
699       if (c <= n) {
700         assert(ctx->v.size() < n);
701         ctx->v.push_back(std::make_pair(i, std::move(t)));
702         if (c == n) {
703           ctx->p.setTry(Try<V>(std::move(ctx->v)));
704         }
705       }
706     });
707   }
708
709   return ctx->p.getFuture();
710 }
711
712 // reduce (iterator)
713
714 template <class It, class T, class F>
715 Future<T> reduce(It first, It last, T&& initial, F&& func) {
716   if (first == last) {
717     return makeFuture(std::move(initial));
718   }
719
720   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
721   typedef typename std::conditional<
722     detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type Arg;
723   typedef isTry<Arg> IsTry;
724
725   folly::MoveWrapper<T> minitial(std::move(initial));
726   auto sfunc = std::make_shared<F>(std::move(func));
727
728   auto f = first->then([minitial, sfunc](Try<ItT>& head) mutable {
729     return (*sfunc)(std::move(*minitial),
730                 head.template get<IsTry::value, Arg&&>());
731   });
732
733   for (++first; first != last; ++first) {
734     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
735       return (*sfunc)(std::move(std::get<0>(t).value()),
736                   // Either return a ItT&& or a Try<ItT>&& depending
737                   // on the type of the argument of func.
738                   std::get<1>(t).template get<IsTry::value, Arg&&>());
739     });
740   }
741
742   return f;
743 }
744
745 // window (collection)
746
747 template <class Collection, class F, class ItT, class Result>
748 std::vector<Future<Result>>
749 window(Collection input, F func, size_t n) {
750   struct WindowContext {
751     WindowContext(Collection&& i, F&& fn)
752         : i_(0), input_(std::move(i)), promises_(input_.size()),
753           func_(std::move(fn))
754       {}
755     std::atomic<size_t> i_;
756     Collection input_;
757     std::vector<Promise<Result>> promises_;
758     F func_;
759
760     static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
761       size_t i = ctx->i_++;
762       if (i < ctx->input_.size()) {
763         // Using setCallback_ directly since we don't need the Future
764         ctx->func_(std::move(ctx->input_[i])).setCallback_(
765           // ctx is captured by value
766           [ctx, i](Try<Result>&& t) {
767             ctx->promises_[i].setTry(std::move(t));
768             // Chain another future onto this one
769             spawn(std::move(ctx));
770           });
771       }
772     }
773   };
774
775   auto max = std::min(n, input.size());
776
777   auto ctx = std::make_shared<WindowContext>(
778     std::move(input), std::move(func));
779
780   for (size_t i = 0; i < max; ++i) {
781     // Start the first n Futures
782     WindowContext::spawn(ctx);
783   }
784
785   std::vector<Future<Result>> futures;
786   futures.reserve(ctx->promises_.size());
787   for (auto& promise : ctx->promises_) {
788     futures.emplace_back(promise.getFuture());
789   }
790
791   return futures;
792 }
793
794 // reduce
795
796 template <class T>
797 template <class I, class F>
798 Future<I> Future<T>::reduce(I&& initial, F&& func) {
799   folly::MoveWrapper<I> minitial(std::move(initial));
800   folly::MoveWrapper<F> mfunc(std::move(func));
801   return then([minitial, mfunc](T& vals) mutable {
802     auto ret = std::move(*minitial);
803     for (auto& val : vals) {
804       ret = (*mfunc)(std::move(ret), std::move(val));
805     }
806     return ret;
807   });
808 }
809
810 // unorderedReduce (iterator)
811
812 template <class It, class T, class F, class ItT, class Arg>
813 Future<T> unorderedReduce(It first, It last, T initial, F func) {
814   if (first == last) {
815     return makeFuture(std::move(initial));
816   }
817
818   typedef isTry<Arg> IsTry;
819
820   struct UnorderedReduceContext {
821     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
822         : lock_(), memo_(makeFuture<T>(std::move(memo))),
823           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
824       {};
825     folly::MicroSpinLock lock_; // protects memo_ and numThens_
826     Future<T> memo_;
827     F func_;
828     size_t numThens_; // how many Futures completed and called .then()
829     size_t numFutures_; // how many Futures in total
830     Promise<T> promise_;
831   };
832
833   auto ctx = std::make_shared<UnorderedReduceContext>(
834     std::move(initial), std::move(func), std::distance(first, last));
835
836   mapSetCallback<ItT>(first, last, [ctx](size_t i, Try<ItT>&& t) {
837     folly::MoveWrapper<Try<ItT>> mt(std::move(t));
838     // Futures can be completed in any order, simultaneously.
839     // To make this non-blocking, we create a new Future chain in
840     // the order of completion to reduce the values.
841     // The spinlock just protects chaining a new Future, not actually
842     // executing the reduce, which should be really fast.
843     folly::MSLGuard lock(ctx->lock_);
844     ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
845       // Either return a ItT&& or a Try<ItT>&& depending
846       // on the type of the argument of func.
847       return ctx->func_(std::move(v), mt->template get<IsTry::value, Arg&&>());
848     });
849     if (++ctx->numThens_ == ctx->numFutures_) {
850       // After reducing the value of the last Future, fulfill the Promise
851       ctx->memo_.setCallback_([ctx](Try<T>&& t2) {
852         ctx->promise_.setValue(std::move(t2));
853       });
854     }
855   });
856
857   return ctx->promise_.getFuture();
858 }
859
860 // within
861
862 template <class T>
863 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
864   return within(dur, TimedOut(), tk);
865 }
866
867 template <class T>
868 template <class E>
869 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
870
871   struct Context {
872     Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
873     E exception;
874     Promise<T> promise;
875     std::atomic<bool> token;
876   };
877   auto ctx = std::make_shared<Context>(std::move(e));
878
879   if (!tk) {
880     tk = folly::detail::getTimekeeperSingleton();
881   }
882
883   tk->after(dur)
884     .then([ctx](Try<void> const& t) {
885       if (ctx->token.exchange(true) == false) {
886         if (t.hasException()) {
887           ctx->promise.setException(std::move(t.exception()));
888         } else {
889           ctx->promise.setException(std::move(ctx->exception));
890         }
891       }
892     });
893
894   this->then([ctx](Try<T>&& t) {
895     if (ctx->token.exchange(true) == false) {
896       ctx->promise.setTry(std::move(t));
897     }
898   });
899
900   return ctx->promise.getFuture().via(getExecutor());
901 }
902
903 // delayed
904
905 template <class T>
906 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
907   return collectAll(*this, futures::sleep(dur, tk))
908     .then([](std::tuple<Try<T>, Try<void>> tup) {
909       Try<T>& t = std::get<0>(tup);
910       return makeFuture<T>(std::move(t));
911     });
912 }
913
914 namespace detail {
915
916 template <class T>
917 void waitImpl(Future<T>& f) {
918   // short-circuit if there's nothing to do
919   if (f.isReady()) return;
920
921   folly::fibers::Baton baton;
922   f = f.then([&](Try<T> t) {
923     baton.post();
924     return makeFuture(std::move(t));
925   });
926   baton.wait();
927
928   // There's a race here between the return here and the actual finishing of
929   // the future. f is completed, but the setup may not have finished on done
930   // after the baton has posted.
931   while (!f.isReady()) {
932     std::this_thread::yield();
933   }
934 }
935
936 template <class T>
937 void waitImpl(Future<T>& f, Duration dur) {
938   // short-circuit if there's nothing to do
939   if (f.isReady()) return;
940
941   auto baton = std::make_shared<folly::fibers::Baton>();
942   f = f.then([baton](Try<T> t) {
943     baton->post();
944     return makeFuture(std::move(t));
945   });
946
947   // Let's preserve the invariant that if we did not timeout (timed_wait returns
948   // true), then the returned Future is complete when it is returned to the
949   // caller. We need to wait out the race for that Future to complete.
950   if (baton->timed_wait(dur)) {
951     while (!f.isReady()) {
952       std::this_thread::yield();
953     }
954   }
955 }
956
957 template <class T>
958 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
959   while (!f.isReady()) {
960     e->drive();
961   }
962 }
963
964 } // detail
965
966 template <class T>
967 Future<T>& Future<T>::wait() & {
968   detail::waitImpl(*this);
969   return *this;
970 }
971
972 template <class T>
973 Future<T>&& Future<T>::wait() && {
974   detail::waitImpl(*this);
975   return std::move(*this);
976 }
977
978 template <class T>
979 Future<T>& Future<T>::wait(Duration dur) & {
980   detail::waitImpl(*this, dur);
981   return *this;
982 }
983
984 template <class T>
985 Future<T>&& Future<T>::wait(Duration dur) && {
986   detail::waitImpl(*this, dur);
987   return std::move(*this);
988 }
989
990 template <class T>
991 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
992   detail::waitViaImpl(*this, e);
993   return *this;
994 }
995
996 template <class T>
997 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
998   detail::waitViaImpl(*this, e);
999   return std::move(*this);
1000 }
1001
1002 template <class T>
1003 T Future<T>::get() {
1004   return std::move(wait().value());
1005 }
1006
1007 template <>
1008 inline void Future<void>::get() {
1009   wait().value();
1010 }
1011
1012 template <class T>
1013 T Future<T>::get(Duration dur) {
1014   wait(dur);
1015   if (isReady()) {
1016     return std::move(value());
1017   } else {
1018     throw TimedOut();
1019   }
1020 }
1021
1022 template <>
1023 inline void Future<void>::get(Duration dur) {
1024   wait(dur);
1025   if (isReady()) {
1026     return;
1027   } else {
1028     throw TimedOut();
1029   }
1030 }
1031
1032 template <class T>
1033 T Future<T>::getVia(DrivableExecutor* e) {
1034   return std::move(waitVia(e).value());
1035 }
1036
1037 template <>
1038 inline void Future<void>::getVia(DrivableExecutor* e) {
1039   waitVia(e).value();
1040 }
1041
1042 namespace detail {
1043   template <class T>
1044   struct TryEquals {
1045     static bool equals(const Try<T>& t1, const Try<T>& t2) {
1046       return t1.value() == t2.value();
1047     }
1048   };
1049
1050   template <>
1051   struct TryEquals<void> {
1052     static bool equals(const Try<void>& t1, const Try<void>& t2) {
1053       return true;
1054     }
1055   };
1056 }
1057
1058 template <class T>
1059 Future<bool> Future<T>::willEqual(Future<T>& f) {
1060   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1061     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1062       return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1063     } else {
1064       return false;
1065       }
1066   });
1067 }
1068
1069 template <class T>
1070 template <class F>
1071 Future<T> Future<T>::filter(F predicate) {
1072   auto p = folly::makeMoveWrapper(std::move(predicate));
1073   return this->then([p](T val) {
1074     T const& valConstRef = val;
1075     if (!(*p)(valConstRef)) {
1076       throw PredicateDoesNotObtain();
1077     }
1078     return val;
1079   });
1080 }
1081
1082 template <class T>
1083 template <class Callback>
1084 auto Future<T>::thenMulti(Callback&& fn)
1085     -> decltype(this->then(std::forward<Callback>(fn))) {
1086   // thenMulti with one callback is just a then
1087   return then(std::forward<Callback>(fn));
1088 }
1089
1090 template <class T>
1091 template <class Callback, class... Callbacks>
1092 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1093     -> decltype(this->then(std::forward<Callback>(fn)).
1094                       thenMulti(std::forward<Callbacks>(fns)...)) {
1095   // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1096   return then(std::forward<Callback>(fn)).
1097          thenMulti(std::forward<Callbacks>(fns)...);
1098 }
1099
1100 template <class T>
1101 template <class Callback, class... Callbacks>
1102 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1103                                       Callbacks&&... fns)
1104     -> decltype(this->then(std::forward<Callback>(fn)).
1105                       thenMulti(std::forward<Callbacks>(fns)...)) {
1106   // thenMultiExecutor with two callbacks is
1107   // via(x).then(a).thenMulti(b, ...).via(oldX)
1108   auto oldX = getExecutor();
1109   setExecutor(x);
1110   return then(std::forward<Callback>(fn)).
1111          thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1112 }
1113
1114 template <class T>
1115 template <class Callback>
1116 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1117     -> decltype(this->then(std::forward<Callback>(fn))) {
1118   // thenMulti with one callback is just a then with an executor
1119   return then(x, std::forward<Callback>(fn));
1120 }
1121
1122 namespace futures {
1123   template <class It, class F, class ItT, class Result>
1124   std::vector<Future<Result>> map(It first, It last, F func) {
1125     std::vector<Future<Result>> results;
1126     for (auto it = first; it != last; it++) {
1127       results.push_back(it->then(func));
1128     }
1129     return results;
1130   }
1131 }
1132
1133 // Instantiate the most common Future types to save compile time
1134 extern template class Future<void>;
1135 extern template class Future<bool>;
1136 extern template class Future<int>;
1137 extern template class Future<int64_t>;
1138 extern template class Future<std::string>;
1139 extern template class Future<double>;
1140
1141 } // namespace folly
1142
1143 // I haven't included a Future<T&> specialization because I don't forsee us
1144 // using it, however it is not difficult to add when needed. Refer to
1145 // Future<void> for guidance. std::future and boost::future code would also be
1146 // instructive.