Allow for mutable lambdas with Future::ensure()
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <chrono>
20 #include <thread>
21
22 #include <folly/experimental/fibers/Baton.h>
23 #include <folly/Optional.h>
24 #include <folly/futures/detail/Core.h>
25 #include <folly/futures/Timekeeper.h>
26
27 namespace folly {
28
29 class Timekeeper;
30
31 namespace detail {
32   Timekeeper* getTimekeeperSingleton();
33 }
34
35 template <class T>
36 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
37   other.core_ = nullptr;
38 }
39
40 template <class T>
41 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
42   std::swap(core_, other.core_);
43   return *this;
44 }
45
46 template <class T>
47 template <class T2, typename>
48 Future<T>::Future(T2&& val)
49   : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
50
51 template <class T>
52 template <typename, typename>
53 Future<T>::Future()
54   : core_(new detail::Core<T>(Try<T>(T()))) {}
55
56 template <class T>
57 Future<T>::~Future() {
58   detach();
59 }
60
61 template <class T>
62 void Future<T>::detach() {
63   if (core_) {
64     core_->detachFuture();
65     core_ = nullptr;
66   }
67 }
68
69 template <class T>
70 void Future<T>::throwIfInvalid() const {
71   if (!core_)
72     throw NoState();
73 }
74
75 template <class T>
76 template <class F>
77 void Future<T>::setCallback_(F&& func) {
78   throwIfInvalid();
79   core_->setCallback(std::move(func));
80 }
81
82 // unwrap
83
84 template <class T>
85 template <class F>
86 typename std::enable_if<isFuture<F>::value,
87                         Future<typename isFuture<T>::Inner>>::type
88 Future<T>::unwrap() {
89   return then([](Future<typename isFuture<T>::Inner> internal_future) {
90       return internal_future;
91   });
92 }
93
94 // then
95
96 // Variant: returns a value
97 // e.g. f.then([](Try<T>&& t){ return t.value(); });
98 template <class T>
99 template <typename F, typename R, bool isTry, typename... Args>
100 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
101 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
102   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
103   typedef typename R::ReturnsFuture::Inner B;
104
105   throwIfInvalid();
106
107   // wrap these so we can move them into the lambda
108   folly::MoveWrapper<Promise<B>> p;
109   p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
110   folly::MoveWrapper<F> funcm(std::forward<F>(func));
111
112   // grab the Future now before we lose our handle on the Promise
113   auto f = p->getFuture();
114   f.core_->setExecutorNoLock(getExecutor());
115
116   /* This is a bit tricky.
117
118      We can't just close over *this in case this Future gets moved. So we
119      make a new dummy Future. We could figure out something more
120      sophisticated that avoids making a new Future object when it can, as an
121      optimization. But this is correct.
122
123      core_ can't be moved, it is explicitly disallowed (as is copying). But
124      if there's ever a reason to allow it, this is one place that makes that
125      assumption and would need to be fixed. We use a standard shared pointer
126      for core_ (by copying it in), which means in essence obj holds a shared
127      pointer to itself.  But this shouldn't leak because Promise will not
128      outlive the continuation, because Promise will setException() with a
129      broken Promise if it is destructed before completed. We could use a
130      weak pointer but it would have to be converted to a shared pointer when
131      func is executed (because the Future returned by func may possibly
132      persist beyond the callback, if it gets moved), and so it is an
133      optimization to just make it shared from the get-go.
134
135      We have to move in the Promise and func using the MoveWrapper
136      hack. (func could be copied but it's a big drag on perf).
137
138      Two subtle but important points about this design. detail::Core has no
139      back pointers to Future or Promise, so if Future or Promise get moved
140      (and they will be moved in performant code) we don't have to do
141      anything fancy. And because we store the continuation in the
142      detail::Core, not in the Future, we can execute the continuation even
143      after the Future has gone out of scope. This is an intentional design
144      decision. It is likely we will want to be able to cancel a continuation
145      in some circumstances, but I think it should be explicit not implicit
146      in the destruction of the Future used to create it.
147      */
148   setCallback_(
149     [p, funcm](Try<T>&& t) mutable {
150       if (!isTry && t.hasException()) {
151         p->setException(std::move(t.exception()));
152       } else {
153         p->setWith([&]() {
154           return (*funcm)(t.template get<isTry, Args>()...);
155         });
156       }
157     });
158
159   return f;
160 }
161
162 // Variant: returns a Future
163 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
164 template <class T>
165 template <typename F, typename R, bool isTry, typename... Args>
166 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
167 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
168   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
169   typedef typename R::ReturnsFuture::Inner B;
170
171   throwIfInvalid();
172
173   // wrap these so we can move them into the lambda
174   folly::MoveWrapper<Promise<B>> p;
175   p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
176   folly::MoveWrapper<F> funcm(std::forward<F>(func));
177
178   // grab the Future now before we lose our handle on the Promise
179   auto f = p->getFuture();
180   f.core_->setExecutorNoLock(getExecutor());
181
182   setCallback_(
183     [p, funcm](Try<T>&& t) mutable {
184       if (!isTry && t.hasException()) {
185         p->setException(std::move(t.exception()));
186       } else {
187         try {
188           auto f2 = (*funcm)(t.template get<isTry, Args>()...);
189           // that didn't throw, now we can steal p
190           f2.setCallback_([p](Try<B>&& b) mutable {
191             p->setTry(std::move(b));
192           });
193         } catch (const std::exception& e) {
194           p->setException(exception_wrapper(std::current_exception(), e));
195         } catch (...) {
196           p->setException(exception_wrapper(std::current_exception()));
197         }
198       }
199     });
200
201   return f;
202 }
203
204 template <typename T>
205 template <typename R, typename Caller, typename... Args>
206   Future<typename isFuture<R>::Inner>
207 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
208   typedef typename std::remove_cv<
209     typename std::remove_reference<
210       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
211   return then([instance, func](Try<T>&& t){
212     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
213   });
214 }
215
216 template <class T>
217 template <class Executor, class Arg, class... Args>
218 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
219   -> decltype(this->then(std::forward<Arg>(arg),
220                          std::forward<Args>(args)...))
221 {
222   auto oldX = getExecutor();
223   setExecutor(x);
224   return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
225                via(oldX);
226 }
227
228 template <class T>
229 Future<Unit> Future<T>::then() {
230   return then([] () {});
231 }
232
233 // onError where the callback returns T
234 template <class T>
235 template <class F>
236 typename std::enable_if<
237   !detail::callableWith<F, exception_wrapper>::value &&
238   !detail::Extract<F>::ReturnsFuture::value,
239   Future<T>>::type
240 Future<T>::onError(F&& func) {
241   typedef typename detail::Extract<F>::FirstArg Exn;
242   static_assert(
243       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
244       "Return type of onError callback must be T or Future<T>");
245
246   Promise<T> p;
247   auto f = p.getFuture();
248   auto pm = folly::makeMoveWrapper(std::move(p));
249   auto funcm = folly::makeMoveWrapper(std::move(func));
250   setCallback_([pm, funcm](Try<T>&& t) mutable {
251     if (!t.template withException<Exn>([&] (Exn& e) {
252           pm->setWith([&]{
253             return (*funcm)(e);
254           });
255         })) {
256       pm->setTry(std::move(t));
257     }
258   });
259
260   return f;
261 }
262
263 // onError where the callback returns Future<T>
264 template <class T>
265 template <class F>
266 typename std::enable_if<
267   !detail::callableWith<F, exception_wrapper>::value &&
268   detail::Extract<F>::ReturnsFuture::value,
269   Future<T>>::type
270 Future<T>::onError(F&& func) {
271   static_assert(
272       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
273       "Return type of onError callback must be T or Future<T>");
274   typedef typename detail::Extract<F>::FirstArg Exn;
275
276   Promise<T> p;
277   auto f = p.getFuture();
278   auto pm = folly::makeMoveWrapper(std::move(p));
279   auto funcm = folly::makeMoveWrapper(std::move(func));
280   setCallback_([pm, funcm](Try<T>&& t) mutable {
281     if (!t.template withException<Exn>([&] (Exn& e) {
282           try {
283             auto f2 = (*funcm)(e);
284             f2.setCallback_([pm](Try<T>&& t2) mutable {
285               pm->setTry(std::move(t2));
286             });
287           } catch (const std::exception& e2) {
288             pm->setException(exception_wrapper(std::current_exception(), e2));
289           } catch (...) {
290             pm->setException(exception_wrapper(std::current_exception()));
291           }
292         })) {
293       pm->setTry(std::move(t));
294     }
295   });
296
297   return f;
298 }
299
300 template <class T>
301 template <class F>
302 Future<T> Future<T>::ensure(F func) {
303   MoveWrapper<F> funcw(std::move(func));
304   return this->then([funcw](Try<T>&& t) mutable {
305     (*funcw)();
306     return makeFuture(std::move(t));
307   });
308 }
309
310 template <class T>
311 template <class F>
312 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
313   auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
314   return within(dur, tk)
315     .onError([funcw](TimedOut const&) { return (*funcw)(); });
316 }
317
318 template <class T>
319 template <class F>
320 typename std::enable_if<
321   detail::callableWith<F, exception_wrapper>::value &&
322   detail::Extract<F>::ReturnsFuture::value,
323   Future<T>>::type
324 Future<T>::onError(F&& func) {
325   static_assert(
326       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
327       "Return type of onError callback must be T or Future<T>");
328
329   Promise<T> p;
330   auto f = p.getFuture();
331   auto pm = folly::makeMoveWrapper(std::move(p));
332   auto funcm = folly::makeMoveWrapper(std::move(func));
333   setCallback_([pm, funcm](Try<T> t) mutable {
334     if (t.hasException()) {
335       try {
336         auto f2 = (*funcm)(std::move(t.exception()));
337         f2.setCallback_([pm](Try<T> t2) mutable {
338           pm->setTry(std::move(t2));
339         });
340       } catch (const std::exception& e2) {
341         pm->setException(exception_wrapper(std::current_exception(), e2));
342       } catch (...) {
343         pm->setException(exception_wrapper(std::current_exception()));
344       }
345     } else {
346       pm->setTry(std::move(t));
347     }
348   });
349
350   return f;
351 }
352
353 // onError(exception_wrapper) that returns T
354 template <class T>
355 template <class F>
356 typename std::enable_if<
357   detail::callableWith<F, exception_wrapper>::value &&
358   !detail::Extract<F>::ReturnsFuture::value,
359   Future<T>>::type
360 Future<T>::onError(F&& func) {
361   static_assert(
362       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
363       "Return type of onError callback must be T or Future<T>");
364
365   Promise<T> p;
366   auto f = p.getFuture();
367   auto pm = folly::makeMoveWrapper(std::move(p));
368   auto funcm = folly::makeMoveWrapper(std::move(func));
369   setCallback_([pm, funcm](Try<T> t) mutable {
370     if (t.hasException()) {
371       pm->setWith([&]{
372         return (*funcm)(std::move(t.exception()));
373       });
374     } else {
375       pm->setTry(std::move(t));
376     }
377   });
378
379   return f;
380 }
381
382 template <class T>
383 typename std::add_lvalue_reference<T>::type Future<T>::value() {
384   throwIfInvalid();
385
386   return core_->getTry().value();
387 }
388
389 template <class T>
390 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
391   throwIfInvalid();
392
393   return core_->getTry().value();
394 }
395
396 template <class T>
397 Try<T>& Future<T>::getTry() {
398   throwIfInvalid();
399
400   return core_->getTry();
401 }
402
403 template <class T>
404 Optional<Try<T>> Future<T>::poll() {
405   Optional<Try<T>> o;
406   if (core_->ready()) {
407     o = std::move(core_->getTry());
408   }
409   return o;
410 }
411
412 template <class T>
413 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
414   throwIfInvalid();
415
416   setExecutor(executor, priority);
417
418   return std::move(*this);
419 }
420
421 template <class T>
422 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
423   throwIfInvalid();
424
425   MoveWrapper<Promise<T>> p;
426   auto f = p->getFuture();
427   then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
428   return std::move(f).via(executor, priority);
429 }
430
431
432 template <class Func>
433 auto via(Executor* x, Func func)
434   -> Future<typename isFuture<decltype(func())>::Inner>
435 {
436   // TODO make this actually more performant. :-P #7260175
437   return via(x).then(func);
438 }
439
440 template <class T>
441 bool Future<T>::isReady() const {
442   throwIfInvalid();
443   return core_->ready();
444 }
445
446 template <class T>
447 bool Future<T>::hasValue() {
448   return getTry().hasValue();
449 }
450
451 template <class T>
452 bool Future<T>::hasException() {
453   return getTry().hasException();
454 }
455
456 template <class T>
457 void Future<T>::raise(exception_wrapper exception) {
458   core_->raise(std::move(exception));
459 }
460
461 // makeFuture
462
463 template <class T>
464 Future<typename std::decay<T>::type> makeFuture(T&& t) {
465   return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
466 }
467
468 inline // for multiple translation units
469 Future<Unit> makeFuture() {
470   return makeFuture(Unit{});
471 }
472
473 template <class F>
474 auto makeFutureWith(F&& func)
475     -> Future<typename Unit::Lift<decltype(func())>::type> {
476   using LiftedResult = typename Unit::Lift<decltype(func())>::type;
477   return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
478     return func();
479   }));
480 }
481
482 template <class T>
483 Future<T> makeFuture(std::exception_ptr const& e) {
484   return makeFuture(Try<T>(e));
485 }
486
487 template <class T>
488 Future<T> makeFuture(exception_wrapper ew) {
489   return makeFuture(Try<T>(std::move(ew)));
490 }
491
492 template <class T, class E>
493 typename std::enable_if<std::is_base_of<std::exception, E>::value,
494                         Future<T>>::type
495 makeFuture(E const& e) {
496   return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
497 }
498
499 template <class T>
500 Future<T> makeFuture(Try<T>&& t) {
501   return Future<T>(new detail::Core<T>(std::move(t)));
502 }
503
504 // via
505 Future<Unit> via(Executor* executor, int8_t priority) {
506   return makeFuture().via(executor, priority);
507 }
508
509 // mapSetCallback calls func(i, Try<T>) when every future completes
510
511 template <class T, class InputIterator, class F>
512 void mapSetCallback(InputIterator first, InputIterator last, F func) {
513   for (size_t i = 0; first != last; ++first, ++i) {
514     first->setCallback_([func, i](Try<T>&& t) {
515       func(i, std::move(t));
516     });
517   }
518 }
519
520 // collectAll (variadic)
521
522 template <typename... Fs>
523 typename detail::CollectAllVariadicContext<
524   typename std::decay<Fs>::type::value_type...>::type
525 collectAll(Fs&&... fs) {
526   auto ctx = std::make_shared<detail::CollectAllVariadicContext<
527     typename std::decay<Fs>::type::value_type...>>();
528   detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
529     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
530   return ctx->p.getFuture();
531 }
532
533 // collectAll (iterator)
534
535 template <class InputIterator>
536 Future<
537   std::vector<
538   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
539 collectAll(InputIterator first, InputIterator last) {
540   typedef
541     typename std::iterator_traits<InputIterator>::value_type::value_type T;
542
543   struct CollectAllContext {
544     CollectAllContext(int n) : results(n) {}
545     ~CollectAllContext() {
546       p.setValue(std::move(results));
547     }
548     Promise<std::vector<Try<T>>> p;
549     std::vector<Try<T>> results;
550   };
551
552   auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
553   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
554     ctx->results[i] = std::move(t);
555   });
556   return ctx->p.getFuture();
557 }
558
559 // collect (iterator)
560
561 namespace detail {
562
563 template <typename T>
564 struct CollectContext {
565   struct Nothing { explicit Nothing(int n) {} };
566
567   using Result = typename std::conditional<
568     std::is_void<T>::value,
569     void,
570     std::vector<T>>::type;
571
572   using InternalResult = typename std::conditional<
573     std::is_void<T>::value,
574     Nothing,
575     std::vector<Optional<T>>>::type;
576
577   explicit CollectContext(int n) : result(n) {}
578   ~CollectContext() {
579     if (!threw.exchange(true)) {
580       // map Optional<T> -> T
581       std::vector<T> finalResult;
582       finalResult.reserve(result.size());
583       std::transform(result.begin(), result.end(),
584                      std::back_inserter(finalResult),
585                      [](Optional<T>& o) { return std::move(o.value()); });
586       p.setValue(std::move(finalResult));
587     }
588   }
589   inline void setPartialResult(size_t i, Try<T>& t) {
590     result[i] = std::move(t.value());
591   }
592   Promise<Result> p;
593   InternalResult result;
594   std::atomic<bool> threw {false};
595 };
596
597 }
598
599 template <class InputIterator>
600 Future<typename detail::CollectContext<
601   typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
602 collect(InputIterator first, InputIterator last) {
603   typedef
604     typename std::iterator_traits<InputIterator>::value_type::value_type T;
605
606   auto ctx = std::make_shared<detail::CollectContext<T>>(
607     std::distance(first, last));
608   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
609     if (t.hasException()) {
610        if (!ctx->threw.exchange(true)) {
611          ctx->p.setException(std::move(t.exception()));
612        }
613      } else if (!ctx->threw) {
614        ctx->setPartialResult(i, t);
615      }
616   });
617   return ctx->p.getFuture();
618 }
619
620 // collect (variadic)
621
622 template <typename... Fs>
623 typename detail::CollectVariadicContext<
624   typename std::decay<Fs>::type::value_type...>::type
625 collect(Fs&&... fs) {
626   auto ctx = std::make_shared<detail::CollectVariadicContext<
627     typename std::decay<Fs>::type::value_type...>>();
628   detail::collectVariadicHelper<detail::CollectVariadicContext>(
629     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
630   return ctx->p.getFuture();
631 }
632
633 // collectAny (iterator)
634
635 template <class InputIterator>
636 Future<
637   std::pair<size_t,
638             Try<
639               typename
640               std::iterator_traits<InputIterator>::value_type::value_type>>>
641 collectAny(InputIterator first, InputIterator last) {
642   typedef
643     typename std::iterator_traits<InputIterator>::value_type::value_type T;
644
645   struct CollectAnyContext {
646     CollectAnyContext() {};
647     Promise<std::pair<size_t, Try<T>>> p;
648     std::atomic<bool> done {false};
649   };
650
651   auto ctx = std::make_shared<CollectAnyContext>();
652   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
653     if (!ctx->done.exchange(true)) {
654       ctx->p.setValue(std::make_pair(i, std::move(t)));
655     }
656   });
657   return ctx->p.getFuture();
658 }
659
660 // collectN (iterator)
661
662 template <class InputIterator>
663 Future<std::vector<std::pair<size_t, Try<typename
664   std::iterator_traits<InputIterator>::value_type::value_type>>>>
665 collectN(InputIterator first, InputIterator last, size_t n) {
666   typedef typename
667     std::iterator_traits<InputIterator>::value_type::value_type T;
668   typedef std::vector<std::pair<size_t, Try<T>>> V;
669
670   struct CollectNContext {
671     V v;
672     std::atomic<size_t> completed = {0};
673     Promise<V> p;
674   };
675   auto ctx = std::make_shared<CollectNContext>();
676
677   if (size_t(std::distance(first, last)) < n) {
678     ctx->p.setException(std::runtime_error("Not enough futures"));
679   } else {
680     // for each completed Future, increase count and add to vector, until we
681     // have n completed futures at which point we fulfil our Promise with the
682     // vector
683     mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
684       auto c = ++ctx->completed;
685       if (c <= n) {
686         assert(ctx->v.size() < n);
687         ctx->v.emplace_back(i, std::move(t));
688         if (c == n) {
689           ctx->p.setTry(Try<V>(std::move(ctx->v)));
690         }
691       }
692     });
693   }
694
695   return ctx->p.getFuture();
696 }
697
698 // reduce (iterator)
699
700 template <class It, class T, class F>
701 Future<T> reduce(It first, It last, T&& initial, F&& func) {
702   if (first == last) {
703     return makeFuture(std::move(initial));
704   }
705
706   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
707   typedef typename std::conditional<
708     detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type Arg;
709   typedef isTry<Arg> IsTry;
710
711   folly::MoveWrapper<T> minitial(std::move(initial));
712   auto sfunc = std::make_shared<F>(std::move(func));
713
714   auto f = first->then([minitial, sfunc](Try<ItT>& head) mutable {
715     return (*sfunc)(std::move(*minitial),
716                 head.template get<IsTry::value, Arg&&>());
717   });
718
719   for (++first; first != last; ++first) {
720     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
721       return (*sfunc)(std::move(std::get<0>(t).value()),
722                   // Either return a ItT&& or a Try<ItT>&& depending
723                   // on the type of the argument of func.
724                   std::get<1>(t).template get<IsTry::value, Arg&&>());
725     });
726   }
727
728   return f;
729 }
730
731 // window (collection)
732
733 template <class Collection, class F, class ItT, class Result>
734 std::vector<Future<Result>>
735 window(Collection input, F func, size_t n) {
736   struct WindowContext {
737     WindowContext(Collection&& i, F&& fn)
738         : input_(std::move(i)), promises_(input_.size()),
739           func_(std::move(fn))
740       {}
741     std::atomic<size_t> i_ {0};
742     Collection input_;
743     std::vector<Promise<Result>> promises_;
744     F func_;
745
746     static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
747       size_t i = ctx->i_++;
748       if (i < ctx->input_.size()) {
749         // Using setCallback_ directly since we don't need the Future
750         ctx->func_(std::move(ctx->input_[i])).setCallback_(
751           // ctx is captured by value
752           [ctx, i](Try<Result>&& t) {
753             ctx->promises_[i].setTry(std::move(t));
754             // Chain another future onto this one
755             spawn(std::move(ctx));
756           });
757       }
758     }
759   };
760
761   auto max = std::min(n, input.size());
762
763   auto ctx = std::make_shared<WindowContext>(
764     std::move(input), std::move(func));
765
766   for (size_t i = 0; i < max; ++i) {
767     // Start the first n Futures
768     WindowContext::spawn(ctx);
769   }
770
771   std::vector<Future<Result>> futures;
772   futures.reserve(ctx->promises_.size());
773   for (auto& promise : ctx->promises_) {
774     futures.emplace_back(promise.getFuture());
775   }
776
777   return futures;
778 }
779
780 // reduce
781
782 template <class T>
783 template <class I, class F>
784 Future<I> Future<T>::reduce(I&& initial, F&& func) {
785   folly::MoveWrapper<I> minitial(std::move(initial));
786   folly::MoveWrapper<F> mfunc(std::move(func));
787   return then([minitial, mfunc](T& vals) mutable {
788     auto ret = std::move(*minitial);
789     for (auto& val : vals) {
790       ret = (*mfunc)(std::move(ret), std::move(val));
791     }
792     return ret;
793   });
794 }
795
796 // unorderedReduce (iterator)
797
798 template <class It, class T, class F, class ItT, class Arg>
799 Future<T> unorderedReduce(It first, It last, T initial, F func) {
800   if (first == last) {
801     return makeFuture(std::move(initial));
802   }
803
804   typedef isTry<Arg> IsTry;
805
806   struct UnorderedReduceContext {
807     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
808         : lock_(), memo_(makeFuture<T>(std::move(memo))),
809           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
810       {};
811     folly::MicroSpinLock lock_; // protects memo_ and numThens_
812     Future<T> memo_;
813     F func_;
814     size_t numThens_; // how many Futures completed and called .then()
815     size_t numFutures_; // how many Futures in total
816     Promise<T> promise_;
817   };
818
819   auto ctx = std::make_shared<UnorderedReduceContext>(
820     std::move(initial), std::move(func), std::distance(first, last));
821
822   mapSetCallback<ItT>(first, last, [ctx](size_t i, Try<ItT>&& t) {
823     folly::MoveWrapper<Try<ItT>> mt(std::move(t));
824     // Futures can be completed in any order, simultaneously.
825     // To make this non-blocking, we create a new Future chain in
826     // the order of completion to reduce the values.
827     // The spinlock just protects chaining a new Future, not actually
828     // executing the reduce, which should be really fast.
829     folly::MSLGuard lock(ctx->lock_);
830     ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
831       // Either return a ItT&& or a Try<ItT>&& depending
832       // on the type of the argument of func.
833       return ctx->func_(std::move(v), mt->template get<IsTry::value, Arg&&>());
834     });
835     if (++ctx->numThens_ == ctx->numFutures_) {
836       // After reducing the value of the last Future, fulfill the Promise
837       ctx->memo_.setCallback_([ctx](Try<T>&& t2) {
838         ctx->promise_.setValue(std::move(t2));
839       });
840     }
841   });
842
843   return ctx->promise_.getFuture();
844 }
845
846 // within
847
848 template <class T>
849 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
850   return within(dur, TimedOut(), tk);
851 }
852
853 template <class T>
854 template <class E>
855 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
856
857   struct Context {
858     Context(E ex) : exception(std::move(ex)), promise() {}
859     E exception;
860     Future<Unit> thisFuture;
861     Promise<T> promise;
862     std::atomic<bool> token {false};
863   };
864
865   if (!tk) {
866     tk = folly::detail::getTimekeeperSingleton();
867   }
868
869   auto ctx = std::make_shared<Context>(std::move(e));
870
871   ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
872     // TODO: "this" completed first, cancel "after"
873     if (ctx->token.exchange(true) == false) {
874       ctx->promise.setTry(std::move(t));
875     }
876   });
877
878   tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
879     // "after" completed first, cancel "this"
880     ctx->thisFuture.raise(TimedOut());
881     if (ctx->token.exchange(true) == false) {
882       if (t.hasException()) {
883         ctx->promise.setException(std::move(t.exception()));
884       } else {
885         ctx->promise.setException(std::move(ctx->exception));
886       }
887     }
888   });
889
890   return ctx->promise.getFuture().via(getExecutor());
891 }
892
893 // delayed
894
895 template <class T>
896 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
897   return collectAll(*this, futures::sleep(dur, tk))
898     .then([](std::tuple<Try<T>, Try<Unit>> tup) {
899       Try<T>& t = std::get<0>(tup);
900       return makeFuture<T>(std::move(t));
901     });
902 }
903
904 namespace detail {
905
906 template <class T>
907 void waitImpl(Future<T>& f) {
908   // short-circuit if there's nothing to do
909   if (f.isReady()) return;
910
911   folly::fibers::Baton baton;
912   f = f.then([&](Try<T> t) {
913     baton.post();
914     return makeFuture(std::move(t));
915   });
916   baton.wait();
917
918   // There's a race here between the return here and the actual finishing of
919   // the future. f is completed, but the setup may not have finished on done
920   // after the baton has posted.
921   while (!f.isReady()) {
922     std::this_thread::yield();
923   }
924 }
925
926 template <class T>
927 void waitImpl(Future<T>& f, Duration dur) {
928   // short-circuit if there's nothing to do
929   if (f.isReady()) return;
930
931   auto baton = std::make_shared<folly::fibers::Baton>();
932   f = f.then([baton](Try<T> t) {
933     baton->post();
934     return makeFuture(std::move(t));
935   });
936
937   // Let's preserve the invariant that if we did not timeout (timed_wait returns
938   // true), then the returned Future is complete when it is returned to the
939   // caller. We need to wait out the race for that Future to complete.
940   if (baton->timed_wait(dur)) {
941     while (!f.isReady()) {
942       std::this_thread::yield();
943     }
944   }
945 }
946
947 template <class T>
948 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
949   while (!f.isReady()) {
950     e->drive();
951   }
952 }
953
954 } // detail
955
956 template <class T>
957 Future<T>& Future<T>::wait() & {
958   detail::waitImpl(*this);
959   return *this;
960 }
961
962 template <class T>
963 Future<T>&& Future<T>::wait() && {
964   detail::waitImpl(*this);
965   return std::move(*this);
966 }
967
968 template <class T>
969 Future<T>& Future<T>::wait(Duration dur) & {
970   detail::waitImpl(*this, dur);
971   return *this;
972 }
973
974 template <class T>
975 Future<T>&& Future<T>::wait(Duration dur) && {
976   detail::waitImpl(*this, dur);
977   return std::move(*this);
978 }
979
980 template <class T>
981 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
982   detail::waitViaImpl(*this, e);
983   return *this;
984 }
985
986 template <class T>
987 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
988   detail::waitViaImpl(*this, e);
989   return std::move(*this);
990 }
991
992 template <class T>
993 T Future<T>::get() {
994   return std::move(wait().value());
995 }
996
997 template <class T>
998 T Future<T>::get(Duration dur) {
999   wait(dur);
1000   if (isReady()) {
1001     return std::move(value());
1002   } else {
1003     throw TimedOut();
1004   }
1005 }
1006
1007 template <class T>
1008 T Future<T>::getVia(DrivableExecutor* e) {
1009   return std::move(waitVia(e).value());
1010 }
1011
1012 namespace detail {
1013   template <class T>
1014   struct TryEquals {
1015     static bool equals(const Try<T>& t1, const Try<T>& t2) {
1016       return t1.value() == t2.value();
1017     }
1018   };
1019 }
1020
1021 template <class T>
1022 Future<bool> Future<T>::willEqual(Future<T>& f) {
1023   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1024     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1025       return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1026     } else {
1027       return false;
1028       }
1029   });
1030 }
1031
1032 template <class T>
1033 template <class F>
1034 Future<T> Future<T>::filter(F predicate) {
1035   auto p = folly::makeMoveWrapper(std::move(predicate));
1036   return this->then([p](T val) {
1037     T const& valConstRef = val;
1038     if (!(*p)(valConstRef)) {
1039       throw PredicateDoesNotObtain();
1040     }
1041     return val;
1042   });
1043 }
1044
1045 template <class T>
1046 template <class Callback>
1047 auto Future<T>::thenMulti(Callback&& fn)
1048     -> decltype(this->then(std::forward<Callback>(fn))) {
1049   // thenMulti with one callback is just a then
1050   return then(std::forward<Callback>(fn));
1051 }
1052
1053 template <class T>
1054 template <class Callback, class... Callbacks>
1055 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1056     -> decltype(this->then(std::forward<Callback>(fn)).
1057                       thenMulti(std::forward<Callbacks>(fns)...)) {
1058   // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1059   return then(std::forward<Callback>(fn)).
1060          thenMulti(std::forward<Callbacks>(fns)...);
1061 }
1062
1063 template <class T>
1064 template <class Callback, class... Callbacks>
1065 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1066                                       Callbacks&&... fns)
1067     -> decltype(this->then(std::forward<Callback>(fn)).
1068                       thenMulti(std::forward<Callbacks>(fns)...)) {
1069   // thenMultiExecutor with two callbacks is
1070   // via(x).then(a).thenMulti(b, ...).via(oldX)
1071   auto oldX = getExecutor();
1072   setExecutor(x);
1073   return then(std::forward<Callback>(fn)).
1074          thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1075 }
1076
1077 template <class T>
1078 template <class Callback>
1079 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1080     -> decltype(this->then(std::forward<Callback>(fn))) {
1081   // thenMulti with one callback is just a then with an executor
1082   return then(x, std::forward<Callback>(fn));
1083 }
1084
1085 namespace futures {
1086   template <class It, class F, class ItT, class Result>
1087   std::vector<Future<Result>> map(It first, It last, F func) {
1088     std::vector<Future<Result>> results;
1089     for (auto it = first; it != last; it++) {
1090       results.push_back(it->then(func));
1091     }
1092     return results;
1093   }
1094 }
1095
1096 // Instantiate the most common Future types to save compile time
1097 extern template class Future<Unit>;
1098 extern template class Future<bool>;
1099 extern template class Future<int>;
1100 extern template class Future<int64_t>;
1101 extern template class Future<std::string>;
1102 extern template class Future<double>;
1103
1104 } // namespace folly