9cacf19db2afd47b56583ea9e062875f2db27083
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <chrono>
20 #include <thread>
21
22 #include <folly/experimental/fibers/Baton.h>
23 #include <folly/Optional.h>
24 #include <folly/futures/detail/Core.h>
25 #include <folly/futures/Timekeeper.h>
26
27 namespace folly {
28
29 class Timekeeper;
30
31 namespace detail {
32   Timekeeper* getTimekeeperSingleton();
33 }
34
35 template <class T>
36 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
37   other.core_ = nullptr;
38 }
39
40 template <class T>
41 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
42   std::swap(core_, other.core_);
43   return *this;
44 }
45
46 template <class T>
47 template <class T2, typename>
48 Future<T>::Future(T2&& val)
49   : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
50
51 template <class T>
52 template <typename, typename>
53 Future<T>::Future()
54   : core_(new detail::Core<T>(Try<T>())) {}
55
56 template <class T>
57 Future<T>::~Future() {
58   detach();
59 }
60
61 template <class T>
62 void Future<T>::detach() {
63   if (core_) {
64     core_->detachFuture();
65     core_ = nullptr;
66   }
67 }
68
69 template <class T>
70 void Future<T>::throwIfInvalid() const {
71   if (!core_)
72     throw NoState();
73 }
74
75 template <class T>
76 template <class F>
77 void Future<T>::setCallback_(F&& func) {
78   throwIfInvalid();
79   core_->setCallback(std::move(func));
80 }
81
82 // unwrap
83
84 template <class T>
85 template <class F>
86 typename std::enable_if<isFuture<F>::value,
87                         Future<typename isFuture<T>::Inner>>::type
88 Future<T>::unwrap() {
89   return then([](Future<typename isFuture<T>::Inner> internal_future) {
90       return internal_future;
91   });
92 }
93
94 // then
95
96 // Variant: returns a value
97 // e.g. f.then([](Try<T>&& t){ return t.value(); });
98 template <class T>
99 template <typename F, typename R, bool isTry, typename... Args>
100 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
101 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
102   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
103   typedef typename R::ReturnsFuture::Inner B;
104
105   throwIfInvalid();
106
107   // wrap these so we can move them into the lambda
108   folly::MoveWrapper<Promise<B>> p;
109   p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
110   folly::MoveWrapper<F> funcm(std::forward<F>(func));
111
112   // grab the Future now before we lose our handle on the Promise
113   auto f = p->getFuture();
114   f.core_->setExecutorNoLock(getExecutor());
115
116   /* This is a bit tricky.
117
118      We can't just close over *this in case this Future gets moved. So we
119      make a new dummy Future. We could figure out something more
120      sophisticated that avoids making a new Future object when it can, as an
121      optimization. But this is correct.
122
123      core_ can't be moved, it is explicitly disallowed (as is copying). But
124      if there's ever a reason to allow it, this is one place that makes that
125      assumption and would need to be fixed. We use a standard shared pointer
126      for core_ (by copying it in), which means in essence obj holds a shared
127      pointer to itself.  But this shouldn't leak because Promise will not
128      outlive the continuation, because Promise will setException() with a
129      broken Promise if it is destructed before completed. We could use a
130      weak pointer but it would have to be converted to a shared pointer when
131      func is executed (because the Future returned by func may possibly
132      persist beyond the callback, if it gets moved), and so it is an
133      optimization to just make it shared from the get-go.
134
135      We have to move in the Promise and func using the MoveWrapper
136      hack. (func could be copied but it's a big drag on perf).
137
138      Two subtle but important points about this design. detail::Core has no
139      back pointers to Future or Promise, so if Future or Promise get moved
140      (and they will be moved in performant code) we don't have to do
141      anything fancy. And because we store the continuation in the
142      detail::Core, not in the Future, we can execute the continuation even
143      after the Future has gone out of scope. This is an intentional design
144      decision. It is likely we will want to be able to cancel a continuation
145      in some circumstances, but I think it should be explicit not implicit
146      in the destruction of the Future used to create it.
147      */
148   setCallback_(
149     [p, funcm](Try<T>&& t) mutable {
150       if (!isTry && t.hasException()) {
151         p->setException(std::move(t.exception()));
152       } else {
153         p->setWith([&]() {
154           return (*funcm)(t.template get<isTry, Args>()...);
155         });
156       }
157     });
158
159   return f;
160 }
161
162 // Variant: returns a Future
163 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
164 template <class T>
165 template <typename F, typename R, bool isTry, typename... Args>
166 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
167 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
168   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
169   typedef typename R::ReturnsFuture::Inner B;
170
171   throwIfInvalid();
172
173   // wrap these so we can move them into the lambda
174   folly::MoveWrapper<Promise<B>> p;
175   p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
176   folly::MoveWrapper<F> funcm(std::forward<F>(func));
177
178   // grab the Future now before we lose our handle on the Promise
179   auto f = p->getFuture();
180   f.core_->setExecutorNoLock(getExecutor());
181
182   setCallback_(
183     [p, funcm](Try<T>&& t) mutable {
184       if (!isTry && t.hasException()) {
185         p->setException(std::move(t.exception()));
186       } else {
187         try {
188           auto f2 = (*funcm)(t.template get<isTry, Args>()...);
189           // that didn't throw, now we can steal p
190           f2.setCallback_([p](Try<B>&& b) mutable {
191             p->setTry(std::move(b));
192           });
193         } catch (const std::exception& e) {
194           p->setException(exception_wrapper(std::current_exception(), e));
195         } catch (...) {
196           p->setException(exception_wrapper(std::current_exception()));
197         }
198       }
199     });
200
201   return f;
202 }
203
204 template <typename T>
205 template <typename R, typename Caller, typename... Args>
206   Future<typename isFuture<R>::Inner>
207 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
208   typedef typename std::remove_cv<
209     typename std::remove_reference<
210       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
211   return then([instance, func](Try<T>&& t){
212     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
213   });
214 }
215
216 template <class T>
217 template <class Executor, class Arg, class... Args>
218 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
219   -> decltype(this->then(std::forward<Arg>(arg),
220                          std::forward<Args>(args)...))
221 {
222   auto oldX = getExecutor();
223   setExecutor(x);
224   return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
225                via(oldX);
226 }
227
228 template <class T>
229 Future<void> Future<T>::then() {
230   return then([] () {});
231 }
232
233 // onError where the callback returns T
234 template <class T>
235 template <class F>
236 typename std::enable_if<
237   !detail::callableWith<F, exception_wrapper>::value &&
238   !detail::Extract<F>::ReturnsFuture::value,
239   Future<T>>::type
240 Future<T>::onError(F&& func) {
241   typedef typename detail::Extract<F>::FirstArg Exn;
242   static_assert(
243       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
244       "Return type of onError callback must be T or Future<T>");
245
246   Promise<T> p;
247   auto f = p.getFuture();
248   auto pm = folly::makeMoveWrapper(std::move(p));
249   auto funcm = folly::makeMoveWrapper(std::move(func));
250   setCallback_([pm, funcm](Try<T>&& t) mutable {
251     if (!t.template withException<Exn>([&] (Exn& e) {
252           pm->setWith([&]{
253             return (*funcm)(e);
254           });
255         })) {
256       pm->setTry(std::move(t));
257     }
258   });
259
260   return f;
261 }
262
263 // onError where the callback returns Future<T>
264 template <class T>
265 template <class F>
266 typename std::enable_if<
267   !detail::callableWith<F, exception_wrapper>::value &&
268   detail::Extract<F>::ReturnsFuture::value,
269   Future<T>>::type
270 Future<T>::onError(F&& func) {
271   static_assert(
272       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
273       "Return type of onError callback must be T or Future<T>");
274   typedef typename detail::Extract<F>::FirstArg Exn;
275
276   Promise<T> p;
277   auto f = p.getFuture();
278   auto pm = folly::makeMoveWrapper(std::move(p));
279   auto funcm = folly::makeMoveWrapper(std::move(func));
280   setCallback_([pm, funcm](Try<T>&& t) mutable {
281     if (!t.template withException<Exn>([&] (Exn& e) {
282           try {
283             auto f2 = (*funcm)(e);
284             f2.setCallback_([pm](Try<T>&& t2) mutable {
285               pm->setTry(std::move(t2));
286             });
287           } catch (const std::exception& e2) {
288             pm->setException(exception_wrapper(std::current_exception(), e2));
289           } catch (...) {
290             pm->setException(exception_wrapper(std::current_exception()));
291           }
292         })) {
293       pm->setTry(std::move(t));
294     }
295   });
296
297   return f;
298 }
299
300 template <class T>
301 template <class F>
302 Future<T> Future<T>::ensure(F func) {
303   MoveWrapper<F> funcw(std::move(func));
304   return this->then([funcw](Try<T>&& t) {
305     (*funcw)();
306     return makeFuture(std::move(t));
307   });
308 }
309
310 template <class T>
311 template <class F>
312 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
313   auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
314   return within(dur, tk)
315     .onError([funcw](TimedOut const&) { return (*funcw)(); });
316 }
317
318 template <class T>
319 template <class F>
320 typename std::enable_if<
321   detail::callableWith<F, exception_wrapper>::value &&
322   detail::Extract<F>::ReturnsFuture::value,
323   Future<T>>::type
324 Future<T>::onError(F&& func) {
325   static_assert(
326       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
327       "Return type of onError callback must be T or Future<T>");
328
329   Promise<T> p;
330   auto f = p.getFuture();
331   auto pm = folly::makeMoveWrapper(std::move(p));
332   auto funcm = folly::makeMoveWrapper(std::move(func));
333   setCallback_([pm, funcm](Try<T> t) mutable {
334     if (t.hasException()) {
335       try {
336         auto f2 = (*funcm)(std::move(t.exception()));
337         f2.setCallback_([pm](Try<T> t2) mutable {
338           pm->setTry(std::move(t2));
339         });
340       } catch (const std::exception& e2) {
341         pm->setException(exception_wrapper(std::current_exception(), e2));
342       } catch (...) {
343         pm->setException(exception_wrapper(std::current_exception()));
344       }
345     } else {
346       pm->setTry(std::move(t));
347     }
348   });
349
350   return f;
351 }
352
353 // onError(exception_wrapper) that returns T
354 template <class T>
355 template <class F>
356 typename std::enable_if<
357   detail::callableWith<F, exception_wrapper>::value &&
358   !detail::Extract<F>::ReturnsFuture::value,
359   Future<T>>::type
360 Future<T>::onError(F&& func) {
361   static_assert(
362       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
363       "Return type of onError callback must be T or Future<T>");
364
365   Promise<T> p;
366   auto f = p.getFuture();
367   auto pm = folly::makeMoveWrapper(std::move(p));
368   auto funcm = folly::makeMoveWrapper(std::move(func));
369   setCallback_([pm, funcm](Try<T> t) mutable {
370     if (t.hasException()) {
371       pm->setWith([&]{
372         return (*funcm)(std::move(t.exception()));
373       });
374     } else {
375       pm->setTry(std::move(t));
376     }
377   });
378
379   return f;
380 }
381
382 template <class T>
383 typename std::add_lvalue_reference<T>::type Future<T>::value() {
384   throwIfInvalid();
385
386   return core_->getTry().value();
387 }
388
389 template <class T>
390 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
391   throwIfInvalid();
392
393   return core_->getTry().value();
394 }
395
396 template <class T>
397 Try<T>& Future<T>::getTry() {
398   throwIfInvalid();
399
400   return core_->getTry();
401 }
402
403 template <class T>
404 Optional<Try<T>> Future<T>::poll() {
405   Optional<Try<T>> o;
406   if (core_->ready()) {
407     o = std::move(core_->getTry());
408   }
409   return o;
410 }
411
412 template <class T>
413 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
414   throwIfInvalid();
415
416   setExecutor(executor, priority);
417
418   return std::move(*this);
419 }
420
421 template <class T>
422 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
423   throwIfInvalid();
424
425   MoveWrapper<Promise<T>> p;
426   auto f = p->getFuture();
427   then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
428   return std::move(f).via(executor, priority);
429 }
430
431
432 template <class Func>
433 auto via(Executor* x, Func func)
434   -> Future<typename isFuture<decltype(func())>::Inner>
435 // this would work, if not for Future<void> :-/
436 // -> decltype(via(x).then(func))
437 {
438   // TODO make this actually more performant. :-P #7260175
439   return via(x).then(func);
440 }
441
442 template <class T>
443 bool Future<T>::isReady() const {
444   throwIfInvalid();
445   return core_->ready();
446 }
447
448 template <class T>
449 bool Future<T>::hasValue() {
450   return getTry().hasValue();
451 }
452
453 template <class T>
454 bool Future<T>::hasException() {
455   return getTry().hasException();
456 }
457
458 template <class T>
459 void Future<T>::raise(exception_wrapper exception) {
460   core_->raise(std::move(exception));
461 }
462
463 // makeFuture
464
465 template <class T>
466 Future<typename std::decay<T>::type> makeFuture(T&& t) {
467   return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
468 }
469
470 inline // for multiple translation units
471 Future<void> makeFuture() {
472   return makeFuture(Try<void>());
473 }
474
475 template <class F>
476 auto makeFutureWith(
477     F&& func,
478     typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
479     -> Future<decltype(func())> {
480   return makeFuture(makeTryWith([&func]() {
481     return (func)();
482   }));
483 }
484
485 template <class F>
486 auto makeFutureWith(F const& func) -> Future<decltype(func())> {
487   F copy = func;
488   return makeFuture(makeTryWith(std::move(copy)));
489 }
490
491 template <class T>
492 Future<T> makeFuture(std::exception_ptr const& e) {
493   return makeFuture(Try<T>(e));
494 }
495
496 template <class T>
497 Future<T> makeFuture(exception_wrapper ew) {
498   return makeFuture(Try<T>(std::move(ew)));
499 }
500
501 template <class T, class E>
502 typename std::enable_if<std::is_base_of<std::exception, E>::value,
503                         Future<T>>::type
504 makeFuture(E const& e) {
505   return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
506 }
507
508 template <class T>
509 Future<T> makeFuture(Try<T>&& t) {
510   return Future<T>(new detail::Core<T>(std::move(t)));
511 }
512
513 // via
514 Future<void> via(Executor* executor, int8_t priority) {
515   return makeFuture().via(executor, priority);
516 }
517
518 // mapSetCallback calls func(i, Try<T>) when every future completes
519
520 template <class T, class InputIterator, class F>
521 void mapSetCallback(InputIterator first, InputIterator last, F func) {
522   for (size_t i = 0; first != last; ++first, ++i) {
523     first->setCallback_([func, i](Try<T>&& t) {
524       func(i, std::move(t));
525     });
526   }
527 }
528
529 // collectAll (variadic)
530
531 template <typename... Fs>
532 typename detail::CollectAllVariadicContext<
533   typename std::decay<Fs>::type::value_type...>::type
534 collectAll(Fs&&... fs) {
535   auto ctx = std::make_shared<detail::CollectAllVariadicContext<
536     typename std::decay<Fs>::type::value_type...>>();
537   detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
538     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
539   return ctx->p.getFuture();
540 }
541
542 // collectAll (iterator)
543
544 template <class InputIterator>
545 Future<
546   std::vector<
547   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
548 collectAll(InputIterator first, InputIterator last) {
549   typedef
550     typename std::iterator_traits<InputIterator>::value_type::value_type T;
551
552   struct CollectAllContext {
553     CollectAllContext(int n) : results(n) {}
554     ~CollectAllContext() {
555       p.setValue(std::move(results));
556     }
557     Promise<std::vector<Try<T>>> p;
558     std::vector<Try<T>> results;
559   };
560
561   auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
562   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
563     ctx->results[i] = std::move(t);
564   });
565   return ctx->p.getFuture();
566 }
567
568 // collect (iterator)
569
570 namespace detail {
571
572 template <typename T>
573 struct CollectContext {
574   struct Nothing { explicit Nothing(int n) {} };
575
576   using Result = typename std::conditional<
577     std::is_void<T>::value,
578     void,
579     std::vector<T>>::type;
580
581   using InternalResult = typename std::conditional<
582     std::is_void<T>::value,
583     Nothing,
584     std::vector<Optional<T>>>::type;
585
586   explicit CollectContext(int n) : result(n) {}
587   ~CollectContext() {
588     if (!threw.exchange(true)) {
589       // map Optional<T> -> T
590       std::vector<T> finalResult;
591       finalResult.reserve(result.size());
592       std::transform(result.begin(), result.end(),
593                      std::back_inserter(finalResult),
594                      [](Optional<T>& o) { return std::move(o.value()); });
595       p.setValue(std::move(finalResult));
596     }
597   }
598   inline void setPartialResult(size_t i, Try<T>& t) {
599     result[i] = std::move(t.value());
600   }
601   Promise<Result> p;
602   InternalResult result;
603   std::atomic<bool> threw {false};
604 };
605
606 // Specialize for void (implementations in Future.cpp)
607
608 template <>
609 CollectContext<void>::~CollectContext();
610
611 template <>
612 void CollectContext<void>::setPartialResult(size_t i, Try<void>& t);
613
614 }
615
616 template <class InputIterator>
617 Future<typename detail::CollectContext<
618   typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
619 collect(InputIterator first, InputIterator last) {
620   typedef
621     typename std::iterator_traits<InputIterator>::value_type::value_type T;
622
623   auto ctx = std::make_shared<detail::CollectContext<T>>(
624     std::distance(first, last));
625   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
626     if (t.hasException()) {
627        if (!ctx->threw.exchange(true)) {
628          ctx->p.setException(std::move(t.exception()));
629        }
630      } else if (!ctx->threw) {
631        ctx->setPartialResult(i, t);
632      }
633   });
634   return ctx->p.getFuture();
635 }
636
637 // collect (variadic)
638
639 template <typename... Fs>
640 typename detail::CollectVariadicContext<
641   typename std::decay<Fs>::type::value_type...>::type
642 collect(Fs&&... fs) {
643   auto ctx = std::make_shared<detail::CollectVariadicContext<
644     typename std::decay<Fs>::type::value_type...>>();
645   detail::collectVariadicHelper<detail::CollectVariadicContext>(
646     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
647   return ctx->p.getFuture();
648 }
649
650 // collectAny (iterator)
651
652 template <class InputIterator>
653 Future<
654   std::pair<size_t,
655             Try<
656               typename
657               std::iterator_traits<InputIterator>::value_type::value_type>>>
658 collectAny(InputIterator first, InputIterator last) {
659   typedef
660     typename std::iterator_traits<InputIterator>::value_type::value_type T;
661
662   struct CollectAnyContext {
663     CollectAnyContext() {};
664     Promise<std::pair<size_t, Try<T>>> p;
665     std::atomic<bool> done {false};
666   };
667
668   auto ctx = std::make_shared<CollectAnyContext>();
669   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
670     if (!ctx->done.exchange(true)) {
671       ctx->p.setValue(std::make_pair(i, std::move(t)));
672     }
673   });
674   return ctx->p.getFuture();
675 }
676
677 // collectN (iterator)
678
679 template <class InputIterator>
680 Future<std::vector<std::pair<size_t, Try<typename
681   std::iterator_traits<InputIterator>::value_type::value_type>>>>
682 collectN(InputIterator first, InputIterator last, size_t n) {
683   typedef typename
684     std::iterator_traits<InputIterator>::value_type::value_type T;
685   typedef std::vector<std::pair<size_t, Try<T>>> V;
686
687   struct CollectNContext {
688     V v;
689     std::atomic<size_t> completed = {0};
690     Promise<V> p;
691   };
692   auto ctx = std::make_shared<CollectNContext>();
693
694   if (size_t(std::distance(first, last)) < n) {
695     ctx->p.setException(std::runtime_error("Not enough futures"));
696   } else {
697     // for each completed Future, increase count and add to vector, until we
698     // have n completed futures at which point we fulfil our Promise with the
699     // vector
700     mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
701       auto c = ++ctx->completed;
702       if (c <= n) {
703         assert(ctx->v.size() < n);
704         ctx->v.emplace_back(i, std::move(t));
705         if (c == n) {
706           ctx->p.setTry(Try<V>(std::move(ctx->v)));
707         }
708       }
709     });
710   }
711
712   return ctx->p.getFuture();
713 }
714
715 // reduce (iterator)
716
717 template <class It, class T, class F>
718 Future<T> reduce(It first, It last, T&& initial, F&& func) {
719   if (first == last) {
720     return makeFuture(std::move(initial));
721   }
722
723   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
724   typedef typename std::conditional<
725     detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type Arg;
726   typedef isTry<Arg> IsTry;
727
728   folly::MoveWrapper<T> minitial(std::move(initial));
729   auto sfunc = std::make_shared<F>(std::move(func));
730
731   auto f = first->then([minitial, sfunc](Try<ItT>& head) mutable {
732     return (*sfunc)(std::move(*minitial),
733                 head.template get<IsTry::value, Arg&&>());
734   });
735
736   for (++first; first != last; ++first) {
737     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
738       return (*sfunc)(std::move(std::get<0>(t).value()),
739                   // Either return a ItT&& or a Try<ItT>&& depending
740                   // on the type of the argument of func.
741                   std::get<1>(t).template get<IsTry::value, Arg&&>());
742     });
743   }
744
745   return f;
746 }
747
748 // window (collection)
749
750 template <class Collection, class F, class ItT, class Result>
751 std::vector<Future<Result>>
752 window(Collection input, F func, size_t n) {
753   struct WindowContext {
754     WindowContext(Collection&& i, F&& fn)
755         : input_(std::move(i)), promises_(input_.size()),
756           func_(std::move(fn))
757       {}
758     std::atomic<size_t> i_ {0};
759     Collection input_;
760     std::vector<Promise<Result>> promises_;
761     F func_;
762
763     static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
764       size_t i = ctx->i_++;
765       if (i < ctx->input_.size()) {
766         // Using setCallback_ directly since we don't need the Future
767         ctx->func_(std::move(ctx->input_[i])).setCallback_(
768           // ctx is captured by value
769           [ctx, i](Try<Result>&& t) {
770             ctx->promises_[i].setTry(std::move(t));
771             // Chain another future onto this one
772             spawn(std::move(ctx));
773           });
774       }
775     }
776   };
777
778   auto max = std::min(n, input.size());
779
780   auto ctx = std::make_shared<WindowContext>(
781     std::move(input), std::move(func));
782
783   for (size_t i = 0; i < max; ++i) {
784     // Start the first n Futures
785     WindowContext::spawn(ctx);
786   }
787
788   std::vector<Future<Result>> futures;
789   futures.reserve(ctx->promises_.size());
790   for (auto& promise : ctx->promises_) {
791     futures.emplace_back(promise.getFuture());
792   }
793
794   return futures;
795 }
796
797 // reduce
798
799 template <class T>
800 template <class I, class F>
801 Future<I> Future<T>::reduce(I&& initial, F&& func) {
802   folly::MoveWrapper<I> minitial(std::move(initial));
803   folly::MoveWrapper<F> mfunc(std::move(func));
804   return then([minitial, mfunc](T& vals) mutable {
805     auto ret = std::move(*minitial);
806     for (auto& val : vals) {
807       ret = (*mfunc)(std::move(ret), std::move(val));
808     }
809     return ret;
810   });
811 }
812
813 // unorderedReduce (iterator)
814
815 template <class It, class T, class F, class ItT, class Arg>
816 Future<T> unorderedReduce(It first, It last, T initial, F func) {
817   if (first == last) {
818     return makeFuture(std::move(initial));
819   }
820
821   typedef isTry<Arg> IsTry;
822
823   struct UnorderedReduceContext {
824     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
825         : lock_(), memo_(makeFuture<T>(std::move(memo))),
826           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
827       {};
828     folly::MicroSpinLock lock_; // protects memo_ and numThens_
829     Future<T> memo_;
830     F func_;
831     size_t numThens_; // how many Futures completed and called .then()
832     size_t numFutures_; // how many Futures in total
833     Promise<T> promise_;
834   };
835
836   auto ctx = std::make_shared<UnorderedReduceContext>(
837     std::move(initial), std::move(func), std::distance(first, last));
838
839   mapSetCallback<ItT>(first, last, [ctx](size_t i, Try<ItT>&& t) {
840     folly::MoveWrapper<Try<ItT>> mt(std::move(t));
841     // Futures can be completed in any order, simultaneously.
842     // To make this non-blocking, we create a new Future chain in
843     // the order of completion to reduce the values.
844     // The spinlock just protects chaining a new Future, not actually
845     // executing the reduce, which should be really fast.
846     folly::MSLGuard lock(ctx->lock_);
847     ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
848       // Either return a ItT&& or a Try<ItT>&& depending
849       // on the type of the argument of func.
850       return ctx->func_(std::move(v), mt->template get<IsTry::value, Arg&&>());
851     });
852     if (++ctx->numThens_ == ctx->numFutures_) {
853       // After reducing the value of the last Future, fulfill the Promise
854       ctx->memo_.setCallback_([ctx](Try<T>&& t2) {
855         ctx->promise_.setValue(std::move(t2));
856       });
857     }
858   });
859
860   return ctx->promise_.getFuture();
861 }
862
863 // within
864
865 template <class T>
866 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
867   return within(dur, TimedOut(), tk);
868 }
869
870 template <class T>
871 template <class E>
872 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
873
874   struct Context {
875     Context(E ex) : exception(std::move(ex)), promise() {}
876     E exception;
877     Promise<T> promise;
878     std::atomic<bool> token {false};
879   };
880   auto ctx = std::make_shared<Context>(std::move(e));
881
882   if (!tk) {
883     tk = folly::detail::getTimekeeperSingleton();
884   }
885
886   tk->after(dur)
887     .then([ctx](Try<void> const& t) {
888       if (ctx->token.exchange(true) == false) {
889         if (t.hasException()) {
890           ctx->promise.setException(std::move(t.exception()));
891         } else {
892           ctx->promise.setException(std::move(ctx->exception));
893         }
894       }
895     });
896
897   this->then([ctx](Try<T>&& t) {
898     if (ctx->token.exchange(true) == false) {
899       ctx->promise.setTry(std::move(t));
900     }
901   });
902
903   return ctx->promise.getFuture().via(getExecutor());
904 }
905
906 // delayed
907
908 template <class T>
909 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
910   return collectAll(*this, futures::sleep(dur, tk))
911     .then([](std::tuple<Try<T>, Try<void>> tup) {
912       Try<T>& t = std::get<0>(tup);
913       return makeFuture<T>(std::move(t));
914     });
915 }
916
917 namespace detail {
918
919 template <class T>
920 void waitImpl(Future<T>& f) {
921   // short-circuit if there's nothing to do
922   if (f.isReady()) return;
923
924   folly::fibers::Baton baton;
925   f = f.then([&](Try<T> t) {
926     baton.post();
927     return makeFuture(std::move(t));
928   });
929   baton.wait();
930
931   // There's a race here between the return here and the actual finishing of
932   // the future. f is completed, but the setup may not have finished on done
933   // after the baton has posted.
934   while (!f.isReady()) {
935     std::this_thread::yield();
936   }
937 }
938
939 template <class T>
940 void waitImpl(Future<T>& f, Duration dur) {
941   // short-circuit if there's nothing to do
942   if (f.isReady()) return;
943
944   auto baton = std::make_shared<folly::fibers::Baton>();
945   f = f.then([baton](Try<T> t) {
946     baton->post();
947     return makeFuture(std::move(t));
948   });
949
950   // Let's preserve the invariant that if we did not timeout (timed_wait returns
951   // true), then the returned Future is complete when it is returned to the
952   // caller. We need to wait out the race for that Future to complete.
953   if (baton->timed_wait(dur)) {
954     while (!f.isReady()) {
955       std::this_thread::yield();
956     }
957   }
958 }
959
960 template <class T>
961 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
962   while (!f.isReady()) {
963     e->drive();
964   }
965 }
966
967 } // detail
968
969 template <class T>
970 Future<T>& Future<T>::wait() & {
971   detail::waitImpl(*this);
972   return *this;
973 }
974
975 template <class T>
976 Future<T>&& Future<T>::wait() && {
977   detail::waitImpl(*this);
978   return std::move(*this);
979 }
980
981 template <class T>
982 Future<T>& Future<T>::wait(Duration dur) & {
983   detail::waitImpl(*this, dur);
984   return *this;
985 }
986
987 template <class T>
988 Future<T>&& Future<T>::wait(Duration dur) && {
989   detail::waitImpl(*this, dur);
990   return std::move(*this);
991 }
992
993 template <class T>
994 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
995   detail::waitViaImpl(*this, e);
996   return *this;
997 }
998
999 template <class T>
1000 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1001   detail::waitViaImpl(*this, e);
1002   return std::move(*this);
1003 }
1004
1005 template <class T>
1006 T Future<T>::get() {
1007   return std::move(wait().value());
1008 }
1009
1010 template <>
1011 inline void Future<void>::get() {
1012   wait().value();
1013 }
1014
1015 template <class T>
1016 T Future<T>::get(Duration dur) {
1017   wait(dur);
1018   if (isReady()) {
1019     return std::move(value());
1020   } else {
1021     throw TimedOut();
1022   }
1023 }
1024
1025 template <>
1026 inline void Future<void>::get(Duration dur) {
1027   wait(dur);
1028   if (isReady()) {
1029     return;
1030   } else {
1031     throw TimedOut();
1032   }
1033 }
1034
1035 template <class T>
1036 T Future<T>::getVia(DrivableExecutor* e) {
1037   return std::move(waitVia(e).value());
1038 }
1039
1040 template <>
1041 inline void Future<void>::getVia(DrivableExecutor* e) {
1042   waitVia(e).value();
1043 }
1044
1045 namespace detail {
1046   template <class T>
1047   struct TryEquals {
1048     static bool equals(const Try<T>& t1, const Try<T>& t2) {
1049       return t1.value() == t2.value();
1050     }
1051   };
1052
1053   template <>
1054   struct TryEquals<void> {
1055     static bool equals(const Try<void>& t1, const Try<void>& t2) {
1056       return true;
1057     }
1058   };
1059 }
1060
1061 template <class T>
1062 Future<bool> Future<T>::willEqual(Future<T>& f) {
1063   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1064     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1065       return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1066     } else {
1067       return false;
1068       }
1069   });
1070 }
1071
1072 template <class T>
1073 template <class F>
1074 Future<T> Future<T>::filter(F predicate) {
1075   auto p = folly::makeMoveWrapper(std::move(predicate));
1076   return this->then([p](T val) {
1077     T const& valConstRef = val;
1078     if (!(*p)(valConstRef)) {
1079       throw PredicateDoesNotObtain();
1080     }
1081     return val;
1082   });
1083 }
1084
1085 template <class T>
1086 template <class Callback>
1087 auto Future<T>::thenMulti(Callback&& fn)
1088     -> decltype(this->then(std::forward<Callback>(fn))) {
1089   // thenMulti with one callback is just a then
1090   return then(std::forward<Callback>(fn));
1091 }
1092
1093 template <class T>
1094 template <class Callback, class... Callbacks>
1095 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1096     -> decltype(this->then(std::forward<Callback>(fn)).
1097                       thenMulti(std::forward<Callbacks>(fns)...)) {
1098   // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1099   return then(std::forward<Callback>(fn)).
1100          thenMulti(std::forward<Callbacks>(fns)...);
1101 }
1102
1103 template <class T>
1104 template <class Callback, class... Callbacks>
1105 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1106                                       Callbacks&&... fns)
1107     -> decltype(this->then(std::forward<Callback>(fn)).
1108                       thenMulti(std::forward<Callbacks>(fns)...)) {
1109   // thenMultiExecutor with two callbacks is
1110   // via(x).then(a).thenMulti(b, ...).via(oldX)
1111   auto oldX = getExecutor();
1112   setExecutor(x);
1113   return then(std::forward<Callback>(fn)).
1114          thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1115 }
1116
1117 template <class T>
1118 template <class Callback>
1119 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1120     -> decltype(this->then(std::forward<Callback>(fn))) {
1121   // thenMulti with one callback is just a then with an executor
1122   return then(x, std::forward<Callback>(fn));
1123 }
1124
1125 namespace futures {
1126   template <class It, class F, class ItT, class Result>
1127   std::vector<Future<Result>> map(It first, It last, F func) {
1128     std::vector<Future<Result>> results;
1129     for (auto it = first; it != last; it++) {
1130       results.push_back(it->then(func));
1131     }
1132     return results;
1133   }
1134 }
1135
1136 // Instantiate the most common Future types to save compile time
1137 extern template class Future<void>;
1138 extern template class Future<bool>;
1139 extern template class Future<int>;
1140 extern template class Future<int64_t>;
1141 extern template class Future<std::string>;
1142 extern template class Future<double>;
1143
1144 } // namespace folly
1145
1146 // I haven't included a Future<T&> specialization because I don't forsee us
1147 // using it, however it is not difficult to add when needed. Refer to
1148 // Future<void> for guidance. std::future and boost::future code would also be
1149 // instructive.