Chain executor in timeout functions
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <chrono>
20 #include <thread>
21
22 #include <folly/experimental/fibers/Baton.h>
23 #include <folly/Optional.h>
24 #include <folly/futures/detail/Core.h>
25 #include <folly/futures/Timekeeper.h>
26
27 namespace folly {
28
29 class Timekeeper;
30
31 namespace detail {
32   Timekeeper* getTimekeeperSingleton();
33 }
34
35 template <class T>
36 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
37   other.core_ = nullptr;
38 }
39
40 template <class T>
41 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
42   std::swap(core_, other.core_);
43   return *this;
44 }
45
46 template <class T>
47 template <class T2, typename>
48 Future<T>::Future(T2&& val) : core_(nullptr) {
49   Promise<T> p;
50   p.setValue(std::forward<T2>(val));
51   *this = p.getFuture();
52 }
53
54 template <class T>
55 template <class T2,
56           typename std::enable_if<
57             folly::is_void_or_unit<T2>::value,
58             int>::type>
59 Future<T>::Future() : core_(nullptr) {
60   Promise<T> p;
61   p.setValue();
62   *this = p.getFuture();
63 }
64
65
66 template <class T>
67 Future<T>::~Future() {
68   detach();
69 }
70
71 template <class T>
72 void Future<T>::detach() {
73   if (core_) {
74     core_->detachFuture();
75     core_ = nullptr;
76   }
77 }
78
79 template <class T>
80 void Future<T>::throwIfInvalid() const {
81   if (!core_)
82     throw NoState();
83 }
84
85 template <class T>
86 template <class F>
87 void Future<T>::setCallback_(F&& func) {
88   throwIfInvalid();
89   core_->setCallback(std::move(func));
90 }
91
92 // unwrap
93
94 template <class T>
95 template <class F>
96 typename std::enable_if<isFuture<F>::value,
97                         Future<typename isFuture<T>::Inner>>::type
98 Future<T>::unwrap() {
99   return then([](Future<typename isFuture<T>::Inner> internal_future) {
100       return internal_future;
101   });
102 }
103
104 // then
105
106 // Variant: returns a value
107 // e.g. f.then([](Try<T>&& t){ return t.value(); });
108 template <class T>
109 template <typename F, typename R, bool isTry, typename... Args>
110 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
111 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
112   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
113   typedef typename R::ReturnsFuture::Inner B;
114
115   throwIfInvalid();
116
117   // wrap these so we can move them into the lambda
118   folly::MoveWrapper<Promise<B>> p;
119   p->setInterruptHandler(core_->getInterruptHandler());
120   folly::MoveWrapper<F> funcm(std::forward<F>(func));
121
122   // grab the Future now before we lose our handle on the Promise
123   auto f = p->getFuture();
124   if (getExecutor()) {
125     f.setExecutor(getExecutor());
126   }
127
128   /* This is a bit tricky.
129
130      We can't just close over *this in case this Future gets moved. So we
131      make a new dummy Future. We could figure out something more
132      sophisticated that avoids making a new Future object when it can, as an
133      optimization. But this is correct.
134
135      core_ can't be moved, it is explicitly disallowed (as is copying). But
136      if there's ever a reason to allow it, this is one place that makes that
137      assumption and would need to be fixed. We use a standard shared pointer
138      for core_ (by copying it in), which means in essence obj holds a shared
139      pointer to itself.  But this shouldn't leak because Promise will not
140      outlive the continuation, because Promise will setException() with a
141      broken Promise if it is destructed before completed. We could use a
142      weak pointer but it would have to be converted to a shared pointer when
143      func is executed (because the Future returned by func may possibly
144      persist beyond the callback, if it gets moved), and so it is an
145      optimization to just make it shared from the get-go.
146
147      We have to move in the Promise and func using the MoveWrapper
148      hack. (func could be copied but it's a big drag on perf).
149
150      Two subtle but important points about this design. detail::Core has no
151      back pointers to Future or Promise, so if Future or Promise get moved
152      (and they will be moved in performant code) we don't have to do
153      anything fancy. And because we store the continuation in the
154      detail::Core, not in the Future, we can execute the continuation even
155      after the Future has gone out of scope. This is an intentional design
156      decision. It is likely we will want to be able to cancel a continuation
157      in some circumstances, but I think it should be explicit not implicit
158      in the destruction of the Future used to create it.
159      */
160   setCallback_(
161     [p, funcm](Try<T>&& t) mutable {
162       if (!isTry && t.hasException()) {
163         p->setException(std::move(t.exception()));
164       } else {
165         p->setWith([&]() {
166           return (*funcm)(t.template get<isTry, Args>()...);
167         });
168       }
169     });
170
171   return f;
172 }
173
174 // Variant: returns a Future
175 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
176 template <class T>
177 template <typename F, typename R, bool isTry, typename... Args>
178 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
179 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
180   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
181   typedef typename R::ReturnsFuture::Inner B;
182
183   throwIfInvalid();
184
185   // wrap these so we can move them into the lambda
186   folly::MoveWrapper<Promise<B>> p;
187   folly::MoveWrapper<F> funcm(std::forward<F>(func));
188
189   // grab the Future now before we lose our handle on the Promise
190   auto f = p->getFuture();
191   if (getExecutor()) {
192     f.setExecutor(getExecutor());
193   }
194
195   setCallback_(
196     [p, funcm](Try<T>&& t) mutable {
197       if (!isTry && t.hasException()) {
198         p->setException(std::move(t.exception()));
199       } else {
200         try {
201           auto f2 = (*funcm)(t.template get<isTry, Args>()...);
202           // that didn't throw, now we can steal p
203           f2.setCallback_([p](Try<B>&& b) mutable {
204             p->setTry(std::move(b));
205           });
206         } catch (const std::exception& e) {
207           p->setException(exception_wrapper(std::current_exception(), e));
208         } catch (...) {
209           p->setException(exception_wrapper(std::current_exception()));
210         }
211       }
212     });
213
214   return f;
215 }
216
217 template <typename T>
218 template <typename R, typename Caller, typename... Args>
219   Future<typename isFuture<R>::Inner>
220 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
221   typedef typename std::remove_cv<
222     typename std::remove_reference<
223       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
224   return then([instance, func](Try<T>&& t){
225     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
226   });
227 }
228
229 template <class T>
230 template <class Executor, class Arg, class... Args>
231 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
232   -> decltype(this->then(std::forward<Arg>(arg),
233                          std::forward<Args>(args)...))
234 {
235   auto oldX = getExecutor();
236   setExecutor(x);
237   return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
238                via(oldX);
239 }
240
241 template <class T>
242 Future<void> Future<T>::then() {
243   return then([] (Try<T>&& t) {});
244 }
245
246 // onError where the callback returns T
247 template <class T>
248 template <class F>
249 typename std::enable_if<
250   !detail::callableWith<F, exception_wrapper>::value &&
251   !detail::Extract<F>::ReturnsFuture::value,
252   Future<T>>::type
253 Future<T>::onError(F&& func) {
254   typedef typename detail::Extract<F>::FirstArg Exn;
255   static_assert(
256       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
257       "Return type of onError callback must be T or Future<T>");
258
259   Promise<T> p;
260   auto f = p.getFuture();
261   auto pm = folly::makeMoveWrapper(std::move(p));
262   auto funcm = folly::makeMoveWrapper(std::move(func));
263   setCallback_([pm, funcm](Try<T>&& t) mutable {
264     if (!t.template withException<Exn>([&] (Exn& e) {
265           pm->setWith([&]{
266             return (*funcm)(e);
267           });
268         })) {
269       pm->setTry(std::move(t));
270     }
271   });
272
273   return f;
274 }
275
276 // onError where the callback returns Future<T>
277 template <class T>
278 template <class F>
279 typename std::enable_if<
280   !detail::callableWith<F, exception_wrapper>::value &&
281   detail::Extract<F>::ReturnsFuture::value,
282   Future<T>>::type
283 Future<T>::onError(F&& func) {
284   static_assert(
285       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
286       "Return type of onError callback must be T or Future<T>");
287   typedef typename detail::Extract<F>::FirstArg Exn;
288
289   Promise<T> p;
290   auto f = p.getFuture();
291   auto pm = folly::makeMoveWrapper(std::move(p));
292   auto funcm = folly::makeMoveWrapper(std::move(func));
293   setCallback_([pm, funcm](Try<T>&& t) mutable {
294     if (!t.template withException<Exn>([&] (Exn& e) {
295           try {
296             auto f2 = (*funcm)(e);
297             f2.setCallback_([pm](Try<T>&& t2) mutable {
298               pm->setTry(std::move(t2));
299             });
300           } catch (const std::exception& e2) {
301             pm->setException(exception_wrapper(std::current_exception(), e2));
302           } catch (...) {
303             pm->setException(exception_wrapper(std::current_exception()));
304           }
305         })) {
306       pm->setTry(std::move(t));
307     }
308   });
309
310   return f;
311 }
312
313 template <class T>
314 template <class F>
315 Future<T> Future<T>::ensure(F func) {
316   MoveWrapper<F> funcw(std::move(func));
317   return this->then([funcw](Try<T>&& t) {
318     (*funcw)();
319     return makeFuture(std::move(t));
320   });
321 }
322
323 template <class T>
324 template <class F>
325 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
326   auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
327   return within(dur, tk)
328     .onError([funcw](TimedOut const&) { return (*funcw)(); });
329 }
330
331 template <class T>
332 template <class F>
333 typename std::enable_if<
334   detail::callableWith<F, exception_wrapper>::value &&
335   detail::Extract<F>::ReturnsFuture::value,
336   Future<T>>::type
337 Future<T>::onError(F&& func) {
338   static_assert(
339       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
340       "Return type of onError callback must be T or Future<T>");
341
342   Promise<T> p;
343   auto f = p.getFuture();
344   auto pm = folly::makeMoveWrapper(std::move(p));
345   auto funcm = folly::makeMoveWrapper(std::move(func));
346   setCallback_([pm, funcm](Try<T> t) mutable {
347     if (t.hasException()) {
348       try {
349         auto f2 = (*funcm)(std::move(t.exception()));
350         f2.setCallback_([pm](Try<T> t2) mutable {
351           pm->setTry(std::move(t2));
352         });
353       } catch (const std::exception& e2) {
354         pm->setException(exception_wrapper(std::current_exception(), e2));
355       } catch (...) {
356         pm->setException(exception_wrapper(std::current_exception()));
357       }
358     } else {
359       pm->setTry(std::move(t));
360     }
361   });
362
363   return f;
364 }
365
366 // onError(exception_wrapper) that returns T
367 template <class T>
368 template <class F>
369 typename std::enable_if<
370   detail::callableWith<F, exception_wrapper>::value &&
371   !detail::Extract<F>::ReturnsFuture::value,
372   Future<T>>::type
373 Future<T>::onError(F&& func) {
374   static_assert(
375       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
376       "Return type of onError callback must be T or Future<T>");
377
378   Promise<T> p;
379   auto f = p.getFuture();
380   auto pm = folly::makeMoveWrapper(std::move(p));
381   auto funcm = folly::makeMoveWrapper(std::move(func));
382   setCallback_([pm, funcm](Try<T> t) mutable {
383     if (t.hasException()) {
384       pm->setWith([&]{
385         return (*funcm)(std::move(t.exception()));
386       });
387     } else {
388       pm->setTry(std::move(t));
389     }
390   });
391
392   return f;
393 }
394
395 template <class T>
396 typename std::add_lvalue_reference<T>::type Future<T>::value() {
397   throwIfInvalid();
398
399   return core_->getTry().value();
400 }
401
402 template <class T>
403 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
404   throwIfInvalid();
405
406   return core_->getTry().value();
407 }
408
409 template <class T>
410 Try<T>& Future<T>::getTry() {
411   throwIfInvalid();
412
413   return core_->getTry();
414 }
415
416 template <class T>
417 Optional<Try<T>> Future<T>::poll() {
418   Optional<Try<T>> o;
419   if (core_->ready()) {
420     o = std::move(core_->getTry());
421   }
422   return o;
423 }
424
425 template <class T>
426 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
427   throwIfInvalid();
428
429   setExecutor(executor, priority);
430
431   return std::move(*this);
432 }
433
434 template <class T>
435 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
436   throwIfInvalid();
437
438   MoveWrapper<Promise<T>> p;
439   auto f = p->getFuture();
440   then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
441   return std::move(f).via(executor, priority);
442 }
443
444 template <class T>
445 bool Future<T>::isReady() const {
446   throwIfInvalid();
447   return core_->ready();
448 }
449
450 template <class T>
451 void Future<T>::raise(exception_wrapper exception) {
452   core_->raise(std::move(exception));
453 }
454
455 // makeFuture
456
457 template <class T>
458 Future<typename std::decay<T>::type> makeFuture(T&& t) {
459   Promise<typename std::decay<T>::type> p;
460   p.setValue(std::forward<T>(t));
461   return p.getFuture();
462 }
463
464 inline // for multiple translation units
465 Future<void> makeFuture() {
466   Promise<void> p;
467   p.setValue();
468   return p.getFuture();
469 }
470
471 template <class F>
472 auto makeFutureWith(
473     F&& func,
474     typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
475     -> Future<decltype(func())> {
476   Promise<decltype(func())> p;
477   p.setWith(
478     [&func]() {
479       return (func)();
480     });
481   return p.getFuture();
482 }
483
484 template <class F>
485 auto makeFutureWith(F const& func) -> Future<decltype(func())> {
486   F copy = func;
487   return makeFutureWith(std::move(copy));
488 }
489
490 template <class T>
491 Future<T> makeFuture(std::exception_ptr const& e) {
492   Promise<T> p;
493   p.setException(e);
494   return p.getFuture();
495 }
496
497 template <class T>
498 Future<T> makeFuture(exception_wrapper ew) {
499   Promise<T> p;
500   p.setException(std::move(ew));
501   return p.getFuture();
502 }
503
504 template <class T, class E>
505 typename std::enable_if<std::is_base_of<std::exception, E>::value,
506                         Future<T>>::type
507 makeFuture(E const& e) {
508   Promise<T> p;
509   p.setException(make_exception_wrapper<E>(e));
510   return p.getFuture();
511 }
512
513 template <class T>
514 Future<T> makeFuture(Try<T>&& t) {
515   Promise<typename std::decay<T>::type> p;
516   p.setTry(std::move(t));
517   return p.getFuture();
518 }
519
520 template <>
521 inline Future<void> makeFuture(Try<void>&& t) {
522   if (t.hasException()) {
523     return makeFuture<void>(std::move(t.exception()));
524   } else {
525     return makeFuture();
526   }
527 }
528
529 // via
530 Future<void> via(Executor* executor, int8_t priority) {
531   return makeFuture().via(executor, priority);
532 }
533
534 // mapSetCallback calls func(i, Try<T>) when every future completes
535
536 template <class T, class InputIterator, class F>
537 void mapSetCallback(InputIterator first, InputIterator last, F func) {
538   for (size_t i = 0; first != last; ++first, ++i) {
539     first->setCallback_([func, i](Try<T>&& t) {
540       func(i, std::move(t));
541     });
542   }
543 }
544
545 // collectAll (variadic)
546
547 template <typename... Fs>
548 typename detail::VariadicContext<
549   typename std::decay<Fs>::type::value_type...>::type
550 collectAll(Fs&&... fs) {
551   auto ctx = std::make_shared<detail::VariadicContext<
552     typename std::decay<Fs>::type::value_type...>>();
553   detail::collectAllVariadicHelper(ctx,
554     std::forward<typename std::decay<Fs>::type>(fs)...);
555   return ctx->p.getFuture();
556 }
557
558 // collectAll (iterator)
559
560 template <class InputIterator>
561 Future<
562   std::vector<
563   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
564 collectAll(InputIterator first, InputIterator last) {
565   typedef
566     typename std::iterator_traits<InputIterator>::value_type::value_type T;
567
568   struct CollectAllContext {
569     CollectAllContext(int n) : results(n) {}
570     ~CollectAllContext() {
571       p.setValue(std::move(results));
572     }
573     Promise<std::vector<Try<T>>> p;
574     std::vector<Try<T>> results;
575   };
576
577   auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
578   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
579     ctx->results[i] = std::move(t);
580   });
581   return ctx->p.getFuture();
582 }
583
584 namespace detail {
585
586 template <typename T>
587 struct CollectContext {
588   struct Nothing { explicit Nothing(int n) {} };
589
590   using Result = typename std::conditional<
591     std::is_void<T>::value,
592     void,
593     std::vector<T>>::type;
594
595   using InternalResult = typename std::conditional<
596     std::is_void<T>::value,
597     Nothing,
598     std::vector<Optional<T>>>::type;
599
600   explicit CollectContext(int n) : result(n) {}
601   ~CollectContext() {
602     if (!threw.exchange(true)) {
603       // map Optional<T> -> T
604       std::vector<T> finalResult;
605       finalResult.reserve(result.size());
606       std::transform(result.begin(), result.end(),
607                      std::back_inserter(finalResult),
608                      [](Optional<T>& o) { return std::move(o.value()); });
609       p.setValue(std::move(finalResult));
610     }
611   }
612   inline void setPartialResult(size_t i, Try<T>& t) {
613     result[i] = std::move(t.value());
614   }
615   Promise<Result> p;
616   InternalResult result;
617   std::atomic<bool> threw;
618 };
619
620 // Specialize for void (implementations in Future.cpp)
621
622 template <>
623 CollectContext<void>::~CollectContext();
624
625 template <>
626 void CollectContext<void>::setPartialResult(size_t i, Try<void>& t);
627
628 }
629
630 template <class InputIterator>
631 Future<typename detail::CollectContext<
632   typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
633 collect(InputIterator first, InputIterator last) {
634   typedef
635     typename std::iterator_traits<InputIterator>::value_type::value_type T;
636
637   auto ctx = std::make_shared<detail::CollectContext<T>>(
638     std::distance(first, last));
639   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
640     if (t.hasException()) {
641        if (!ctx->threw.exchange(true)) {
642          ctx->p.setException(std::move(t.exception()));
643        }
644      } else if (!ctx->threw) {
645        ctx->setPartialResult(i, t);
646      }
647   });
648   return ctx->p.getFuture();
649 }
650
651 template <class InputIterator>
652 Future<
653   std::pair<size_t,
654             Try<
655               typename
656               std::iterator_traits<InputIterator>::value_type::value_type>>>
657 collectAny(InputIterator first, InputIterator last) {
658   typedef
659     typename std::iterator_traits<InputIterator>::value_type::value_type T;
660
661   struct CollectAnyContext {
662     CollectAnyContext(size_t n) : done(false) {};
663     Promise<std::pair<size_t, Try<T>>> p;
664     std::atomic<bool> done;
665   };
666
667   auto ctx = std::make_shared<CollectAnyContext>(std::distance(first, last));
668   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
669     if (!ctx->done.exchange(true)) {
670       ctx->p.setValue(std::make_pair(i, std::move(t)));
671     }
672   });
673   return ctx->p.getFuture();
674 }
675
676 template <class InputIterator>
677 Future<std::vector<std::pair<size_t, Try<typename
678   std::iterator_traits<InputIterator>::value_type::value_type>>>>
679 collectN(InputIterator first, InputIterator last, size_t n) {
680   typedef typename
681     std::iterator_traits<InputIterator>::value_type::value_type T;
682   typedef std::vector<std::pair<size_t, Try<T>>> V;
683
684   struct CollectNContext {
685     V v;
686     std::atomic<size_t> completed = {0};
687     Promise<V> p;
688   };
689   auto ctx = std::make_shared<CollectNContext>();
690
691   if (std::distance(first, last) < n) {
692     ctx->p.setException(std::runtime_error("Not enough futures"));
693   } else {
694     // for each completed Future, increase count and add to vector, until we
695     // have n completed futures at which point we fulfil our Promise with the
696     // vector
697     mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
698       auto c = ++ctx->completed;
699       if (c <= n) {
700         assert(ctx->v.size() < n);
701         ctx->v.push_back(std::make_pair(i, std::move(t)));
702         if (c == n) {
703           ctx->p.setTry(Try<V>(std::move(ctx->v)));
704         }
705       }
706     });
707   }
708
709   return ctx->p.getFuture();
710 }
711
712 template <class It, class T, class F>
713 Future<T> reduce(It first, It last, T&& initial, F&& func) {
714   if (first == last) {
715     return makeFuture(std::move(initial));
716   }
717
718   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
719   typedef typename std::conditional<
720     detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type Arg;
721   typedef isTry<Arg> IsTry;
722
723   folly::MoveWrapper<T> minitial(std::move(initial));
724   auto sfunc = std::make_shared<F>(std::move(func));
725
726   auto f = first->then([minitial, sfunc](Try<ItT>& head) mutable {
727     return (*sfunc)(std::move(*minitial),
728                 head.template get<IsTry::value, Arg&&>());
729   });
730
731   for (++first; first != last; ++first) {
732     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
733       return (*sfunc)(std::move(std::get<0>(t).value()),
734                   // Either return a ItT&& or a Try<ItT>&& depending
735                   // on the type of the argument of func.
736                   std::get<1>(t).template get<IsTry::value, Arg&&>());
737     });
738   }
739
740   return f;
741 }
742
743 template <class Collection, class F, class ItT, class Result>
744 std::vector<Future<Result>>
745 window(Collection input, F func, size_t n) {
746   struct WindowContext {
747     WindowContext(Collection&& i, F&& fn)
748         : i_(0), input_(std::move(i)), promises_(input_.size()),
749           func_(std::move(fn))
750       {}
751     std::atomic<size_t> i_;
752     Collection input_;
753     std::vector<Promise<Result>> promises_;
754     F func_;
755
756     static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
757       size_t i = ctx->i_++;
758       if (i < ctx->input_.size()) {
759         // Using setCallback_ directly since we don't need the Future
760         ctx->func_(std::move(ctx->input_[i])).setCallback_(
761           // ctx is captured by value
762           [ctx, i](Try<Result>&& t) {
763             ctx->promises_[i].setTry(std::move(t));
764             // Chain another future onto this one
765             spawn(std::move(ctx));
766           });
767       }
768     }
769   };
770
771   auto max = std::min(n, input.size());
772
773   auto ctx = std::make_shared<WindowContext>(
774     std::move(input), std::move(func));
775
776   for (size_t i = 0; i < max; ++i) {
777     // Start the first n Futures
778     WindowContext::spawn(ctx);
779   }
780
781   std::vector<Future<Result>> futures;
782   futures.reserve(ctx->promises_.size());
783   for (auto& promise : ctx->promises_) {
784     futures.emplace_back(promise.getFuture());
785   }
786
787   return futures;
788 }
789
790 template <class T>
791 template <class I, class F>
792 Future<I> Future<T>::reduce(I&& initial, F&& func) {
793   folly::MoveWrapper<I> minitial(std::move(initial));
794   folly::MoveWrapper<F> mfunc(std::move(func));
795   return then([minitial, mfunc](T& vals) mutable {
796     auto ret = std::move(*minitial);
797     for (auto& val : vals) {
798       ret = (*mfunc)(std::move(ret), std::move(val));
799     }
800     return ret;
801   });
802 }
803
804 template <class It, class T, class F, class ItT, class Arg>
805 Future<T> unorderedReduce(It first, It last, T initial, F func) {
806   if (first == last) {
807     return makeFuture(std::move(initial));
808   }
809
810   typedef isTry<Arg> IsTry;
811
812   struct UnorderedReduceContext {
813     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
814         : lock_(), memo_(makeFuture<T>(std::move(memo))),
815           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
816       {};
817     folly::MicroSpinLock lock_; // protects memo_ and numThens_
818     Future<T> memo_;
819     F func_;
820     size_t numThens_; // how many Futures completed and called .then()
821     size_t numFutures_; // how many Futures in total
822     Promise<T> promise_;
823   };
824
825   auto ctx = std::make_shared<UnorderedReduceContext>(
826     std::move(initial), std::move(func), std::distance(first, last));
827
828   mapSetCallback<ItT>(first, last, [ctx](size_t i, Try<ItT>&& t) {
829     folly::MoveWrapper<Try<ItT>> mt(std::move(t));
830     // Futures can be completed in any order, simultaneously.
831     // To make this non-blocking, we create a new Future chain in
832     // the order of completion to reduce the values.
833     // The spinlock just protects chaining a new Future, not actually
834     // executing the reduce, which should be really fast.
835     folly::MSLGuard lock(ctx->lock_);
836     ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
837       // Either return a ItT&& or a Try<ItT>&& depending
838       // on the type of the argument of func.
839       return ctx->func_(std::move(v), mt->template get<IsTry::value, Arg&&>());
840     });
841     if (++ctx->numThens_ == ctx->numFutures_) {
842       // After reducing the value of the last Future, fulfill the Promise
843       ctx->memo_.setCallback_([ctx](Try<T>&& t2) {
844         ctx->promise_.setValue(std::move(t2));
845       });
846     }
847   });
848
849   return ctx->promise_.getFuture();
850 }
851
852 template <class T>
853 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
854   return within(dur, TimedOut(), tk);
855 }
856
857 template <class T>
858 template <class E>
859 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
860
861   struct Context {
862     Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
863     E exception;
864     Promise<T> promise;
865     std::atomic<bool> token;
866   };
867   auto ctx = std::make_shared<Context>(std::move(e));
868
869   if (!tk) {
870     tk = folly::detail::getTimekeeperSingleton();
871   }
872
873   tk->after(dur)
874     .then([ctx](Try<void> const& t) {
875       if (ctx->token.exchange(true) == false) {
876         if (t.hasException()) {
877           ctx->promise.setException(std::move(t.exception()));
878         } else {
879           ctx->promise.setException(std::move(ctx->exception));
880         }
881       }
882     });
883
884   this->then([ctx](Try<T>&& t) {
885     if (ctx->token.exchange(true) == false) {
886       ctx->promise.setTry(std::move(t));
887     }
888   });
889
890   return ctx->promise.getFuture().via(getExecutor());
891 }
892
893 template <class T>
894 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
895   return collectAll(*this, futures::sleep(dur, tk))
896     .then([](std::tuple<Try<T>, Try<void>> tup) {
897       Try<T>& t = std::get<0>(tup);
898       return makeFuture<T>(std::move(t));
899     });
900 }
901
902 namespace detail {
903
904 template <class T>
905 void waitImpl(Future<T>& f) {
906   // short-circuit if there's nothing to do
907   if (f.isReady()) return;
908
909   folly::fibers::Baton baton;
910   f = f.then([&](Try<T> t) {
911     baton.post();
912     return makeFuture(std::move(t));
913   });
914   baton.wait();
915
916   // There's a race here between the return here and the actual finishing of
917   // the future. f is completed, but the setup may not have finished on done
918   // after the baton has posted.
919   while (!f.isReady()) {
920     std::this_thread::yield();
921   }
922 }
923
924 template <class T>
925 void waitImpl(Future<T>& f, Duration dur) {
926   // short-circuit if there's nothing to do
927   if (f.isReady()) return;
928
929   auto baton = std::make_shared<folly::fibers::Baton>();
930   f = f.then([baton](Try<T> t) {
931     baton->post();
932     return makeFuture(std::move(t));
933   });
934
935   // Let's preserve the invariant that if we did not timeout (timed_wait returns
936   // true), then the returned Future is complete when it is returned to the
937   // caller. We need to wait out the race for that Future to complete.
938   if (baton->timed_wait(dur)) {
939     while (!f.isReady()) {
940       std::this_thread::yield();
941     }
942   }
943 }
944
945 template <class T>
946 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
947   while (!f.isReady()) {
948     e->drive();
949   }
950 }
951
952 } // detail
953
954 template <class T>
955 Future<T>& Future<T>::wait() & {
956   detail::waitImpl(*this);
957   return *this;
958 }
959
960 template <class T>
961 Future<T>&& Future<T>::wait() && {
962   detail::waitImpl(*this);
963   return std::move(*this);
964 }
965
966 template <class T>
967 Future<T>& Future<T>::wait(Duration dur) & {
968   detail::waitImpl(*this, dur);
969   return *this;
970 }
971
972 template <class T>
973 Future<T>&& Future<T>::wait(Duration dur) && {
974   detail::waitImpl(*this, dur);
975   return std::move(*this);
976 }
977
978 template <class T>
979 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
980   detail::waitViaImpl(*this, e);
981   return *this;
982 }
983
984 template <class T>
985 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
986   detail::waitViaImpl(*this, e);
987   return std::move(*this);
988 }
989
990 template <class T>
991 T Future<T>::get() {
992   return std::move(wait().value());
993 }
994
995 template <>
996 inline void Future<void>::get() {
997   wait().value();
998 }
999
1000 template <class T>
1001 T Future<T>::get(Duration dur) {
1002   wait(dur);
1003   if (isReady()) {
1004     return std::move(value());
1005   } else {
1006     throw TimedOut();
1007   }
1008 }
1009
1010 template <>
1011 inline void Future<void>::get(Duration dur) {
1012   wait(dur);
1013   if (isReady()) {
1014     return;
1015   } else {
1016     throw TimedOut();
1017   }
1018 }
1019
1020 template <class T>
1021 T Future<T>::getVia(DrivableExecutor* e) {
1022   return std::move(waitVia(e).value());
1023 }
1024
1025 template <>
1026 inline void Future<void>::getVia(DrivableExecutor* e) {
1027   waitVia(e).value();
1028 }
1029
1030 namespace detail {
1031   template <class T>
1032   struct TryEquals {
1033     static bool equals(const Try<T>& t1, const Try<T>& t2) {
1034       return t1.value() == t2.value();
1035     }
1036   };
1037
1038   template <>
1039   struct TryEquals<void> {
1040     static bool equals(const Try<void>& t1, const Try<void>& t2) {
1041       return true;
1042     }
1043   };
1044 }
1045
1046 template <class T>
1047 Future<bool> Future<T>::willEqual(Future<T>& f) {
1048   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1049     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1050       return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1051     } else {
1052       return false;
1053       }
1054   });
1055 }
1056
1057 template <class T>
1058 template <class F>
1059 Future<T> Future<T>::filter(F predicate) {
1060   auto p = folly::makeMoveWrapper(std::move(predicate));
1061   return this->then([p](T val) {
1062     T const& valConstRef = val;
1063     if (!(*p)(valConstRef)) {
1064       throw PredicateDoesNotObtain();
1065     }
1066     return val;
1067   });
1068 }
1069
1070 template <class T>
1071 template <class Callback>
1072 auto Future<T>::thenMulti(Callback&& fn)
1073     -> decltype(this->then(std::forward<Callback>(fn))) {
1074   // thenMulti with one callback is just a then
1075   return then(std::forward<Callback>(fn));
1076 }
1077
1078 template <class T>
1079 template <class Callback, class... Callbacks>
1080 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1081     -> decltype(this->then(std::forward<Callback>(fn)).
1082                       thenMulti(std::forward<Callbacks>(fns)...)) {
1083   // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1084   return then(std::forward<Callback>(fn)).
1085          thenMulti(std::forward<Callbacks>(fns)...);
1086 }
1087
1088 template <class T>
1089 template <class Callback, class... Callbacks>
1090 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1091                                       Callbacks&&... fns)
1092     -> decltype(this->then(std::forward<Callback>(fn)).
1093                       thenMulti(std::forward<Callbacks>(fns)...)) {
1094   // thenMultiExecutor with two callbacks is
1095   // via(x).then(a).thenMulti(b, ...).via(oldX)
1096   auto oldX = getExecutor();
1097   setExecutor(x);
1098   return then(std::forward<Callback>(fn)).
1099          thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1100 }
1101
1102 template <class T>
1103 template <class Callback>
1104 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1105     -> decltype(this->then(std::forward<Callback>(fn))) {
1106   // thenMulti with one callback is just a then with an executor
1107   return then(x, std::forward<Callback>(fn));
1108 }
1109
1110 namespace futures {
1111   template <class It, class F, class ItT, class Result>
1112   std::vector<Future<Result>> map(It first, It last, F func) {
1113     std::vector<Future<Result>> results;
1114     for (auto it = first; it != last; it++) {
1115       results.push_back(it->then(func));
1116     }
1117     return results;
1118   }
1119 }
1120
1121 // Instantiate the most common Future types to save compile time
1122 extern template class Future<void>;
1123 extern template class Future<bool>;
1124 extern template class Future<int>;
1125 extern template class Future<int64_t>;
1126 extern template class Future<std::string>;
1127 extern template class Future<double>;
1128
1129 } // namespace folly
1130
1131 // I haven't included a Future<T&> specialization because I don't forsee us
1132 // using it, however it is not difficult to add when needed. Refer to
1133 // Future<void> for guidance. std::future and boost::future code would also be
1134 // instructive.