0b2a301c5014c134f5d5ebed226971df79d7aa7e
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <chrono>
20 #include <thread>
21
22 #include <folly/experimental/fibers/Baton.h>
23 #include <folly/Optional.h>
24 #include <folly/futures/detail/Core.h>
25 #include <folly/futures/Timekeeper.h>
26
27 namespace folly {
28
29 class Timekeeper;
30
31 namespace detail {
32   Timekeeper* getTimekeeperSingleton();
33 }
34
35 template <class T>
36 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
37   other.core_ = nullptr;
38 }
39
40 template <class T>
41 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
42   std::swap(core_, other.core_);
43   return *this;
44 }
45
46 template <class T>
47 template <class T2, typename>
48 Future<T>::Future(T2&& val)
49   : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
50
51 template <class T>
52 template <class T2,
53           typename std::enable_if<
54             folly::is_void_or_unit<T2>::value,
55             int>::type>
56 Future<T>::Future()
57   : core_(new detail::Core<T>(Try<T>())) {}
58
59
60 template <class T>
61 Future<T>::~Future() {
62   detach();
63 }
64
65 template <class T>
66 void Future<T>::detach() {
67   if (core_) {
68     core_->detachFuture();
69     core_ = nullptr;
70   }
71 }
72
73 template <class T>
74 void Future<T>::throwIfInvalid() const {
75   if (!core_)
76     throw NoState();
77 }
78
79 template <class T>
80 template <class F>
81 void Future<T>::setCallback_(F&& func) {
82   throwIfInvalid();
83   core_->setCallback(std::move(func));
84 }
85
86 // unwrap
87
88 template <class T>
89 template <class F>
90 typename std::enable_if<isFuture<F>::value,
91                         Future<typename isFuture<T>::Inner>>::type
92 Future<T>::unwrap() {
93   return then([](Future<typename isFuture<T>::Inner> internal_future) {
94       return internal_future;
95   });
96 }
97
98 // then
99
100 // Variant: returns a value
101 // e.g. f.then([](Try<T>&& t){ return t.value(); });
102 template <class T>
103 template <typename F, typename R, bool isTry, typename... Args>
104 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
105 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
106   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
107   typedef typename R::ReturnsFuture::Inner B;
108
109   throwIfInvalid();
110
111   // wrap these so we can move them into the lambda
112   folly::MoveWrapper<Promise<B>> p;
113   p->setInterruptHandler(core_->getInterruptHandler());
114   folly::MoveWrapper<F> funcm(std::forward<F>(func));
115
116   // grab the Future now before we lose our handle on the Promise
117   auto f = p->getFuture();
118   if (getExecutor()) {
119     f.setExecutor(getExecutor());
120   }
121
122   /* This is a bit tricky.
123
124      We can't just close over *this in case this Future gets moved. So we
125      make a new dummy Future. We could figure out something more
126      sophisticated that avoids making a new Future object when it can, as an
127      optimization. But this is correct.
128
129      core_ can't be moved, it is explicitly disallowed (as is copying). But
130      if there's ever a reason to allow it, this is one place that makes that
131      assumption and would need to be fixed. We use a standard shared pointer
132      for core_ (by copying it in), which means in essence obj holds a shared
133      pointer to itself.  But this shouldn't leak because Promise will not
134      outlive the continuation, because Promise will setException() with a
135      broken Promise if it is destructed before completed. We could use a
136      weak pointer but it would have to be converted to a shared pointer when
137      func is executed (because the Future returned by func may possibly
138      persist beyond the callback, if it gets moved), and so it is an
139      optimization to just make it shared from the get-go.
140
141      We have to move in the Promise and func using the MoveWrapper
142      hack. (func could be copied but it's a big drag on perf).
143
144      Two subtle but important points about this design. detail::Core has no
145      back pointers to Future or Promise, so if Future or Promise get moved
146      (and they will be moved in performant code) we don't have to do
147      anything fancy. And because we store the continuation in the
148      detail::Core, not in the Future, we can execute the continuation even
149      after the Future has gone out of scope. This is an intentional design
150      decision. It is likely we will want to be able to cancel a continuation
151      in some circumstances, but I think it should be explicit not implicit
152      in the destruction of the Future used to create it.
153      */
154   setCallback_(
155     [p, funcm](Try<T>&& t) mutable {
156       if (!isTry && t.hasException()) {
157         p->setException(std::move(t.exception()));
158       } else {
159         p->setWith([&]() {
160           return (*funcm)(t.template get<isTry, Args>()...);
161         });
162       }
163     });
164
165   return f;
166 }
167
168 // Variant: returns a Future
169 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
170 template <class T>
171 template <typename F, typename R, bool isTry, typename... Args>
172 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
173 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
174   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
175   typedef typename R::ReturnsFuture::Inner B;
176
177   throwIfInvalid();
178
179   // wrap these so we can move them into the lambda
180   folly::MoveWrapper<Promise<B>> p;
181   folly::MoveWrapper<F> funcm(std::forward<F>(func));
182
183   // grab the Future now before we lose our handle on the Promise
184   auto f = p->getFuture();
185   if (getExecutor()) {
186     f.setExecutor(getExecutor());
187   }
188
189   setCallback_(
190     [p, funcm](Try<T>&& t) mutable {
191       if (!isTry && t.hasException()) {
192         p->setException(std::move(t.exception()));
193       } else {
194         try {
195           auto f2 = (*funcm)(t.template get<isTry, Args>()...);
196           // that didn't throw, now we can steal p
197           f2.setCallback_([p](Try<B>&& b) mutable {
198             p->setTry(std::move(b));
199           });
200         } catch (const std::exception& e) {
201           p->setException(exception_wrapper(std::current_exception(), e));
202         } catch (...) {
203           p->setException(exception_wrapper(std::current_exception()));
204         }
205       }
206     });
207
208   return f;
209 }
210
211 template <typename T>
212 template <typename R, typename Caller, typename... Args>
213   Future<typename isFuture<R>::Inner>
214 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
215   typedef typename std::remove_cv<
216     typename std::remove_reference<
217       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
218   return then([instance, func](Try<T>&& t){
219     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
220   });
221 }
222
223 template <class T>
224 template <class Executor, class Arg, class... Args>
225 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
226   -> decltype(this->then(std::forward<Arg>(arg),
227                          std::forward<Args>(args)...))
228 {
229   auto oldX = getExecutor();
230   setExecutor(x);
231   return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
232                via(oldX);
233 }
234
235 template <class T>
236 Future<void> Future<T>::then() {
237   return then([] (Try<T>&& t) {});
238 }
239
240 // onError where the callback returns T
241 template <class T>
242 template <class F>
243 typename std::enable_if<
244   !detail::callableWith<F, exception_wrapper>::value &&
245   !detail::Extract<F>::ReturnsFuture::value,
246   Future<T>>::type
247 Future<T>::onError(F&& func) {
248   typedef typename detail::Extract<F>::FirstArg Exn;
249   static_assert(
250       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
251       "Return type of onError callback must be T or Future<T>");
252
253   Promise<T> p;
254   auto f = p.getFuture();
255   auto pm = folly::makeMoveWrapper(std::move(p));
256   auto funcm = folly::makeMoveWrapper(std::move(func));
257   setCallback_([pm, funcm](Try<T>&& t) mutable {
258     if (!t.template withException<Exn>([&] (Exn& e) {
259           pm->setWith([&]{
260             return (*funcm)(e);
261           });
262         })) {
263       pm->setTry(std::move(t));
264     }
265   });
266
267   return f;
268 }
269
270 // onError where the callback returns Future<T>
271 template <class T>
272 template <class F>
273 typename std::enable_if<
274   !detail::callableWith<F, exception_wrapper>::value &&
275   detail::Extract<F>::ReturnsFuture::value,
276   Future<T>>::type
277 Future<T>::onError(F&& func) {
278   static_assert(
279       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
280       "Return type of onError callback must be T or Future<T>");
281   typedef typename detail::Extract<F>::FirstArg Exn;
282
283   Promise<T> p;
284   auto f = p.getFuture();
285   auto pm = folly::makeMoveWrapper(std::move(p));
286   auto funcm = folly::makeMoveWrapper(std::move(func));
287   setCallback_([pm, funcm](Try<T>&& t) mutable {
288     if (!t.template withException<Exn>([&] (Exn& e) {
289           try {
290             auto f2 = (*funcm)(e);
291             f2.setCallback_([pm](Try<T>&& t2) mutable {
292               pm->setTry(std::move(t2));
293             });
294           } catch (const std::exception& e2) {
295             pm->setException(exception_wrapper(std::current_exception(), e2));
296           } catch (...) {
297             pm->setException(exception_wrapper(std::current_exception()));
298           }
299         })) {
300       pm->setTry(std::move(t));
301     }
302   });
303
304   return f;
305 }
306
307 template <class T>
308 template <class F>
309 Future<T> Future<T>::ensure(F func) {
310   MoveWrapper<F> funcw(std::move(func));
311   return this->then([funcw](Try<T>&& t) {
312     (*funcw)();
313     return makeFuture(std::move(t));
314   });
315 }
316
317 template <class T>
318 template <class F>
319 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
320   auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
321   return within(dur, tk)
322     .onError([funcw](TimedOut const&) { return (*funcw)(); });
323 }
324
325 template <class T>
326 template <class F>
327 typename std::enable_if<
328   detail::callableWith<F, exception_wrapper>::value &&
329   detail::Extract<F>::ReturnsFuture::value,
330   Future<T>>::type
331 Future<T>::onError(F&& func) {
332   static_assert(
333       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
334       "Return type of onError callback must be T or Future<T>");
335
336   Promise<T> p;
337   auto f = p.getFuture();
338   auto pm = folly::makeMoveWrapper(std::move(p));
339   auto funcm = folly::makeMoveWrapper(std::move(func));
340   setCallback_([pm, funcm](Try<T> t) mutable {
341     if (t.hasException()) {
342       try {
343         auto f2 = (*funcm)(std::move(t.exception()));
344         f2.setCallback_([pm](Try<T> t2) mutable {
345           pm->setTry(std::move(t2));
346         });
347       } catch (const std::exception& e2) {
348         pm->setException(exception_wrapper(std::current_exception(), e2));
349       } catch (...) {
350         pm->setException(exception_wrapper(std::current_exception()));
351       }
352     } else {
353       pm->setTry(std::move(t));
354     }
355   });
356
357   return f;
358 }
359
360 // onError(exception_wrapper) that returns T
361 template <class T>
362 template <class F>
363 typename std::enable_if<
364   detail::callableWith<F, exception_wrapper>::value &&
365   !detail::Extract<F>::ReturnsFuture::value,
366   Future<T>>::type
367 Future<T>::onError(F&& func) {
368   static_assert(
369       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
370       "Return type of onError callback must be T or Future<T>");
371
372   Promise<T> p;
373   auto f = p.getFuture();
374   auto pm = folly::makeMoveWrapper(std::move(p));
375   auto funcm = folly::makeMoveWrapper(std::move(func));
376   setCallback_([pm, funcm](Try<T> t) mutable {
377     if (t.hasException()) {
378       pm->setWith([&]{
379         return (*funcm)(std::move(t.exception()));
380       });
381     } else {
382       pm->setTry(std::move(t));
383     }
384   });
385
386   return f;
387 }
388
389 template <class T>
390 typename std::add_lvalue_reference<T>::type Future<T>::value() {
391   throwIfInvalid();
392
393   return core_->getTry().value();
394 }
395
396 template <class T>
397 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
398   throwIfInvalid();
399
400   return core_->getTry().value();
401 }
402
403 template <class T>
404 Try<T>& Future<T>::getTry() {
405   throwIfInvalid();
406
407   return core_->getTry();
408 }
409
410 template <class T>
411 Optional<Try<T>> Future<T>::poll() {
412   Optional<Try<T>> o;
413   if (core_->ready()) {
414     o = std::move(core_->getTry());
415   }
416   return o;
417 }
418
419 template <class T>
420 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
421   throwIfInvalid();
422
423   setExecutor(executor, priority);
424
425   return std::move(*this);
426 }
427
428 template <class T>
429 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
430   throwIfInvalid();
431
432   MoveWrapper<Promise<T>> p;
433   auto f = p->getFuture();
434   then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
435   return std::move(f).via(executor, priority);
436 }
437
438 template <class T>
439 bool Future<T>::isReady() const {
440   throwIfInvalid();
441   return core_->ready();
442 }
443
444 template <class T>
445 void Future<T>::raise(exception_wrapper exception) {
446   core_->raise(std::move(exception));
447 }
448
449 // makeFuture
450
451 template <class T>
452 Future<typename std::decay<T>::type> makeFuture(T&& t) {
453   return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
454 }
455
456 inline // for multiple translation units
457 Future<void> makeFuture() {
458   return makeFuture(Try<void>());
459 }
460
461 template <class F>
462 auto makeFutureWith(
463     F&& func,
464     typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
465     -> Future<decltype(func())> {
466   return makeFuture(makeTryWith([&func]() {
467     return (func)();
468   }));
469 }
470
471 template <class F>
472 auto makeFutureWith(F const& func) -> Future<decltype(func())> {
473   F copy = func;
474   return makeFuture(makeTryWith(std::move(copy)));
475 }
476
477 template <class T>
478 Future<T> makeFuture(std::exception_ptr const& e) {
479   return makeFuture(Try<T>(e));
480 }
481
482 template <class T>
483 Future<T> makeFuture(exception_wrapper ew) {
484   return makeFuture(Try<T>(std::move(ew)));
485 }
486
487 template <class T, class E>
488 typename std::enable_if<std::is_base_of<std::exception, E>::value,
489                         Future<T>>::type
490 makeFuture(E const& e) {
491   return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
492 }
493
494 template <class T>
495 Future<T> makeFuture(Try<T>&& t) {
496   return Future<T>(new detail::Core<T>(std::move(t)));
497 }
498
499 // via
500 Future<void> via(Executor* executor, int8_t priority) {
501   return makeFuture().via(executor, priority);
502 }
503
504 // mapSetCallback calls func(i, Try<T>) when every future completes
505
506 template <class T, class InputIterator, class F>
507 void mapSetCallback(InputIterator first, InputIterator last, F func) {
508   for (size_t i = 0; first != last; ++first, ++i) {
509     first->setCallback_([func, i](Try<T>&& t) {
510       func(i, std::move(t));
511     });
512   }
513 }
514
515 // collectAll (variadic)
516
517 template <typename... Fs>
518 typename detail::CollectAllVariadicContext<
519   typename std::decay<Fs>::type::value_type...>::type
520 collectAll(Fs&&... fs) {
521   auto ctx = std::make_shared<detail::CollectAllVariadicContext<
522     typename std::decay<Fs>::type::value_type...>>();
523   detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
524     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
525   return ctx->p.getFuture();
526 }
527
528 // collectAll (iterator)
529
530 template <class InputIterator>
531 Future<
532   std::vector<
533   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
534 collectAll(InputIterator first, InputIterator last) {
535   typedef
536     typename std::iterator_traits<InputIterator>::value_type::value_type T;
537
538   struct CollectAllContext {
539     CollectAllContext(int n) : results(n) {}
540     ~CollectAllContext() {
541       p.setValue(std::move(results));
542     }
543     Promise<std::vector<Try<T>>> p;
544     std::vector<Try<T>> results;
545   };
546
547   auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
548   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
549     ctx->results[i] = std::move(t);
550   });
551   return ctx->p.getFuture();
552 }
553
554 // collect (iterator)
555
556 namespace detail {
557
558 template <typename T>
559 struct CollectContext {
560   struct Nothing { explicit Nothing(int n) {} };
561
562   using Result = typename std::conditional<
563     std::is_void<T>::value,
564     void,
565     std::vector<T>>::type;
566
567   using InternalResult = typename std::conditional<
568     std::is_void<T>::value,
569     Nothing,
570     std::vector<Optional<T>>>::type;
571
572   explicit CollectContext(int n) : result(n) {}
573   ~CollectContext() {
574     if (!threw.exchange(true)) {
575       // map Optional<T> -> T
576       std::vector<T> finalResult;
577       finalResult.reserve(result.size());
578       std::transform(result.begin(), result.end(),
579                      std::back_inserter(finalResult),
580                      [](Optional<T>& o) { return std::move(o.value()); });
581       p.setValue(std::move(finalResult));
582     }
583   }
584   inline void setPartialResult(size_t i, Try<T>& t) {
585     result[i] = std::move(t.value());
586   }
587   Promise<Result> p;
588   InternalResult result;
589   std::atomic<bool> threw;
590 };
591
592 // Specialize for void (implementations in Future.cpp)
593
594 template <>
595 CollectContext<void>::~CollectContext();
596
597 template <>
598 void CollectContext<void>::setPartialResult(size_t i, Try<void>& t);
599
600 }
601
602 template <class InputIterator>
603 Future<typename detail::CollectContext<
604   typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
605 collect(InputIterator first, InputIterator last) {
606   typedef
607     typename std::iterator_traits<InputIterator>::value_type::value_type T;
608
609   auto ctx = std::make_shared<detail::CollectContext<T>>(
610     std::distance(first, last));
611   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
612     if (t.hasException()) {
613        if (!ctx->threw.exchange(true)) {
614          ctx->p.setException(std::move(t.exception()));
615        }
616      } else if (!ctx->threw) {
617        ctx->setPartialResult(i, t);
618      }
619   });
620   return ctx->p.getFuture();
621 }
622
623 // collect (variadic)
624
625 template <typename... Fs>
626 typename detail::CollectVariadicContext<
627   typename std::decay<Fs>::type::value_type...>::type
628 collect(Fs&&... fs) {
629   auto ctx = std::make_shared<detail::CollectVariadicContext<
630     typename std::decay<Fs>::type::value_type...>>();
631   detail::collectVariadicHelper<detail::CollectVariadicContext>(
632     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
633   return ctx->p.getFuture();
634 }
635
636 // collectAny (iterator)
637
638 template <class InputIterator>
639 Future<
640   std::pair<size_t,
641             Try<
642               typename
643               std::iterator_traits<InputIterator>::value_type::value_type>>>
644 collectAny(InputIterator first, InputIterator last) {
645   typedef
646     typename std::iterator_traits<InputIterator>::value_type::value_type T;
647
648   struct CollectAnyContext {
649     CollectAnyContext(size_t n) : done(false) {};
650     Promise<std::pair<size_t, Try<T>>> p;
651     std::atomic<bool> done;
652   };
653
654   auto ctx = std::make_shared<CollectAnyContext>(std::distance(first, last));
655   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
656     if (!ctx->done.exchange(true)) {
657       ctx->p.setValue(std::make_pair(i, std::move(t)));
658     }
659   });
660   return ctx->p.getFuture();
661 }
662
663 // collectN (iterator)
664
665 template <class InputIterator>
666 Future<std::vector<std::pair<size_t, Try<typename
667   std::iterator_traits<InputIterator>::value_type::value_type>>>>
668 collectN(InputIterator first, InputIterator last, size_t n) {
669   typedef typename
670     std::iterator_traits<InputIterator>::value_type::value_type T;
671   typedef std::vector<std::pair<size_t, Try<T>>> V;
672
673   struct CollectNContext {
674     V v;
675     std::atomic<size_t> completed = {0};
676     Promise<V> p;
677   };
678   auto ctx = std::make_shared<CollectNContext>();
679
680   if (size_t(std::distance(first, last)) < n) {
681     ctx->p.setException(std::runtime_error("Not enough futures"));
682   } else {
683     // for each completed Future, increase count and add to vector, until we
684     // have n completed futures at which point we fulfil our Promise with the
685     // vector
686     mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
687       auto c = ++ctx->completed;
688       if (c <= n) {
689         assert(ctx->v.size() < n);
690         ctx->v.push_back(std::make_pair(i, std::move(t)));
691         if (c == n) {
692           ctx->p.setTry(Try<V>(std::move(ctx->v)));
693         }
694       }
695     });
696   }
697
698   return ctx->p.getFuture();
699 }
700
701 // reduce (iterator)
702
703 template <class It, class T, class F>
704 Future<T> reduce(It first, It last, T&& initial, F&& func) {
705   if (first == last) {
706     return makeFuture(std::move(initial));
707   }
708
709   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
710   typedef typename std::conditional<
711     detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type Arg;
712   typedef isTry<Arg> IsTry;
713
714   folly::MoveWrapper<T> minitial(std::move(initial));
715   auto sfunc = std::make_shared<F>(std::move(func));
716
717   auto f = first->then([minitial, sfunc](Try<ItT>& head) mutable {
718     return (*sfunc)(std::move(*minitial),
719                 head.template get<IsTry::value, Arg&&>());
720   });
721
722   for (++first; first != last; ++first) {
723     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
724       return (*sfunc)(std::move(std::get<0>(t).value()),
725                   // Either return a ItT&& or a Try<ItT>&& depending
726                   // on the type of the argument of func.
727                   std::get<1>(t).template get<IsTry::value, Arg&&>());
728     });
729   }
730
731   return f;
732 }
733
734 // window (collection)
735
736 template <class Collection, class F, class ItT, class Result>
737 std::vector<Future<Result>>
738 window(Collection input, F func, size_t n) {
739   struct WindowContext {
740     WindowContext(Collection&& i, F&& fn)
741         : i_(0), input_(std::move(i)), promises_(input_.size()),
742           func_(std::move(fn))
743       {}
744     std::atomic<size_t> i_;
745     Collection input_;
746     std::vector<Promise<Result>> promises_;
747     F func_;
748
749     static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
750       size_t i = ctx->i_++;
751       if (i < ctx->input_.size()) {
752         // Using setCallback_ directly since we don't need the Future
753         ctx->func_(std::move(ctx->input_[i])).setCallback_(
754           // ctx is captured by value
755           [ctx, i](Try<Result>&& t) {
756             ctx->promises_[i].setTry(std::move(t));
757             // Chain another future onto this one
758             spawn(std::move(ctx));
759           });
760       }
761     }
762   };
763
764   auto max = std::min(n, input.size());
765
766   auto ctx = std::make_shared<WindowContext>(
767     std::move(input), std::move(func));
768
769   for (size_t i = 0; i < max; ++i) {
770     // Start the first n Futures
771     WindowContext::spawn(ctx);
772   }
773
774   std::vector<Future<Result>> futures;
775   futures.reserve(ctx->promises_.size());
776   for (auto& promise : ctx->promises_) {
777     futures.emplace_back(promise.getFuture());
778   }
779
780   return futures;
781 }
782
783 // reduce
784
785 template <class T>
786 template <class I, class F>
787 Future<I> Future<T>::reduce(I&& initial, F&& func) {
788   folly::MoveWrapper<I> minitial(std::move(initial));
789   folly::MoveWrapper<F> mfunc(std::move(func));
790   return then([minitial, mfunc](T& vals) mutable {
791     auto ret = std::move(*minitial);
792     for (auto& val : vals) {
793       ret = (*mfunc)(std::move(ret), std::move(val));
794     }
795     return ret;
796   });
797 }
798
799 // unorderedReduce (iterator)
800
801 template <class It, class T, class F, class ItT, class Arg>
802 Future<T> unorderedReduce(It first, It last, T initial, F func) {
803   if (first == last) {
804     return makeFuture(std::move(initial));
805   }
806
807   typedef isTry<Arg> IsTry;
808
809   struct UnorderedReduceContext {
810     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
811         : lock_(), memo_(makeFuture<T>(std::move(memo))),
812           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
813       {};
814     folly::MicroSpinLock lock_; // protects memo_ and numThens_
815     Future<T> memo_;
816     F func_;
817     size_t numThens_; // how many Futures completed and called .then()
818     size_t numFutures_; // how many Futures in total
819     Promise<T> promise_;
820   };
821
822   auto ctx = std::make_shared<UnorderedReduceContext>(
823     std::move(initial), std::move(func), std::distance(first, last));
824
825   mapSetCallback<ItT>(first, last, [ctx](size_t i, Try<ItT>&& t) {
826     folly::MoveWrapper<Try<ItT>> mt(std::move(t));
827     // Futures can be completed in any order, simultaneously.
828     // To make this non-blocking, we create a new Future chain in
829     // the order of completion to reduce the values.
830     // The spinlock just protects chaining a new Future, not actually
831     // executing the reduce, which should be really fast.
832     folly::MSLGuard lock(ctx->lock_);
833     ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
834       // Either return a ItT&& or a Try<ItT>&& depending
835       // on the type of the argument of func.
836       return ctx->func_(std::move(v), mt->template get<IsTry::value, Arg&&>());
837     });
838     if (++ctx->numThens_ == ctx->numFutures_) {
839       // After reducing the value of the last Future, fulfill the Promise
840       ctx->memo_.setCallback_([ctx](Try<T>&& t2) {
841         ctx->promise_.setValue(std::move(t2));
842       });
843     }
844   });
845
846   return ctx->promise_.getFuture();
847 }
848
849 // within
850
851 template <class T>
852 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
853   return within(dur, TimedOut(), tk);
854 }
855
856 template <class T>
857 template <class E>
858 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
859
860   struct Context {
861     Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
862     E exception;
863     Promise<T> promise;
864     std::atomic<bool> token;
865   };
866   auto ctx = std::make_shared<Context>(std::move(e));
867
868   if (!tk) {
869     tk = folly::detail::getTimekeeperSingleton();
870   }
871
872   tk->after(dur)
873     .then([ctx](Try<void> const& t) {
874       if (ctx->token.exchange(true) == false) {
875         if (t.hasException()) {
876           ctx->promise.setException(std::move(t.exception()));
877         } else {
878           ctx->promise.setException(std::move(ctx->exception));
879         }
880       }
881     });
882
883   this->then([ctx](Try<T>&& t) {
884     if (ctx->token.exchange(true) == false) {
885       ctx->promise.setTry(std::move(t));
886     }
887   });
888
889   return ctx->promise.getFuture().via(getExecutor());
890 }
891
892 // delayed
893
894 template <class T>
895 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
896   return collectAll(*this, futures::sleep(dur, tk))
897     .then([](std::tuple<Try<T>, Try<void>> tup) {
898       Try<T>& t = std::get<0>(tup);
899       return makeFuture<T>(std::move(t));
900     });
901 }
902
903 namespace detail {
904
905 template <class T>
906 void waitImpl(Future<T>& f) {
907   // short-circuit if there's nothing to do
908   if (f.isReady()) return;
909
910   folly::fibers::Baton baton;
911   f = f.then([&](Try<T> t) {
912     baton.post();
913     return makeFuture(std::move(t));
914   });
915   baton.wait();
916
917   // There's a race here between the return here and the actual finishing of
918   // the future. f is completed, but the setup may not have finished on done
919   // after the baton has posted.
920   while (!f.isReady()) {
921     std::this_thread::yield();
922   }
923 }
924
925 template <class T>
926 void waitImpl(Future<T>& f, Duration dur) {
927   // short-circuit if there's nothing to do
928   if (f.isReady()) return;
929
930   auto baton = std::make_shared<folly::fibers::Baton>();
931   f = f.then([baton](Try<T> t) {
932     baton->post();
933     return makeFuture(std::move(t));
934   });
935
936   // Let's preserve the invariant that if we did not timeout (timed_wait returns
937   // true), then the returned Future is complete when it is returned to the
938   // caller. We need to wait out the race for that Future to complete.
939   if (baton->timed_wait(dur)) {
940     while (!f.isReady()) {
941       std::this_thread::yield();
942     }
943   }
944 }
945
946 template <class T>
947 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
948   while (!f.isReady()) {
949     e->drive();
950   }
951 }
952
953 } // detail
954
955 template <class T>
956 Future<T>& Future<T>::wait() & {
957   detail::waitImpl(*this);
958   return *this;
959 }
960
961 template <class T>
962 Future<T>&& Future<T>::wait() && {
963   detail::waitImpl(*this);
964   return std::move(*this);
965 }
966
967 template <class T>
968 Future<T>& Future<T>::wait(Duration dur) & {
969   detail::waitImpl(*this, dur);
970   return *this;
971 }
972
973 template <class T>
974 Future<T>&& Future<T>::wait(Duration dur) && {
975   detail::waitImpl(*this, dur);
976   return std::move(*this);
977 }
978
979 template <class T>
980 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
981   detail::waitViaImpl(*this, e);
982   return *this;
983 }
984
985 template <class T>
986 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
987   detail::waitViaImpl(*this, e);
988   return std::move(*this);
989 }
990
991 template <class T>
992 T Future<T>::get() {
993   return std::move(wait().value());
994 }
995
996 template <>
997 inline void Future<void>::get() {
998   wait().value();
999 }
1000
1001 template <class T>
1002 T Future<T>::get(Duration dur) {
1003   wait(dur);
1004   if (isReady()) {
1005     return std::move(value());
1006   } else {
1007     throw TimedOut();
1008   }
1009 }
1010
1011 template <>
1012 inline void Future<void>::get(Duration dur) {
1013   wait(dur);
1014   if (isReady()) {
1015     return;
1016   } else {
1017     throw TimedOut();
1018   }
1019 }
1020
1021 template <class T>
1022 T Future<T>::getVia(DrivableExecutor* e) {
1023   return std::move(waitVia(e).value());
1024 }
1025
1026 template <>
1027 inline void Future<void>::getVia(DrivableExecutor* e) {
1028   waitVia(e).value();
1029 }
1030
1031 namespace detail {
1032   template <class T>
1033   struct TryEquals {
1034     static bool equals(const Try<T>& t1, const Try<T>& t2) {
1035       return t1.value() == t2.value();
1036     }
1037   };
1038
1039   template <>
1040   struct TryEquals<void> {
1041     static bool equals(const Try<void>& t1, const Try<void>& t2) {
1042       return true;
1043     }
1044   };
1045 }
1046
1047 template <class T>
1048 Future<bool> Future<T>::willEqual(Future<T>& f) {
1049   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1050     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1051       return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1052     } else {
1053       return false;
1054       }
1055   });
1056 }
1057
1058 template <class T>
1059 template <class F>
1060 Future<T> Future<T>::filter(F predicate) {
1061   auto p = folly::makeMoveWrapper(std::move(predicate));
1062   return this->then([p](T val) {
1063     T const& valConstRef = val;
1064     if (!(*p)(valConstRef)) {
1065       throw PredicateDoesNotObtain();
1066     }
1067     return val;
1068   });
1069 }
1070
1071 template <class T>
1072 template <class Callback>
1073 auto Future<T>::thenMulti(Callback&& fn)
1074     -> decltype(this->then(std::forward<Callback>(fn))) {
1075   // thenMulti with one callback is just a then
1076   return then(std::forward<Callback>(fn));
1077 }
1078
1079 template <class T>
1080 template <class Callback, class... Callbacks>
1081 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1082     -> decltype(this->then(std::forward<Callback>(fn)).
1083                       thenMulti(std::forward<Callbacks>(fns)...)) {
1084   // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1085   return then(std::forward<Callback>(fn)).
1086          thenMulti(std::forward<Callbacks>(fns)...);
1087 }
1088
1089 template <class T>
1090 template <class Callback, class... Callbacks>
1091 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1092                                       Callbacks&&... fns)
1093     -> decltype(this->then(std::forward<Callback>(fn)).
1094                       thenMulti(std::forward<Callbacks>(fns)...)) {
1095   // thenMultiExecutor with two callbacks is
1096   // via(x).then(a).thenMulti(b, ...).via(oldX)
1097   auto oldX = getExecutor();
1098   setExecutor(x);
1099   return then(std::forward<Callback>(fn)).
1100          thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1101 }
1102
1103 template <class T>
1104 template <class Callback>
1105 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1106     -> decltype(this->then(std::forward<Callback>(fn))) {
1107   // thenMulti with one callback is just a then with an executor
1108   return then(x, std::forward<Callback>(fn));
1109 }
1110
1111 namespace futures {
1112   template <class It, class F, class ItT, class Result>
1113   std::vector<Future<Result>> map(It first, It last, F func) {
1114     std::vector<Future<Result>> results;
1115     for (auto it = first; it != last; it++) {
1116       results.push_back(it->then(func));
1117     }
1118     return results;
1119   }
1120 }
1121
1122 // Instantiate the most common Future types to save compile time
1123 extern template class Future<void>;
1124 extern template class Future<bool>;
1125 extern template class Future<int>;
1126 extern template class Future<int64_t>;
1127 extern template class Future<std::string>;
1128 extern template class Future<double>;
1129
1130 } // namespace folly
1131
1132 // I haven't included a Future<T&> specialization because I don't forsee us
1133 // using it, however it is not difficult to add when needed. Refer to
1134 // Future<void> for guidance. std::future and boost::future code would also be
1135 // instructive.