Nuke Future<void> (folly/futures)
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <chrono>
20 #include <thread>
21
22 #include <folly/experimental/fibers/Baton.h>
23 #include <folly/Optional.h>
24 #include <folly/futures/detail/Core.h>
25 #include <folly/futures/Timekeeper.h>
26
27 namespace folly {
28
29 class Timekeeper;
30
31 namespace detail {
32   Timekeeper* getTimekeeperSingleton();
33 }
34
35 template <class T>
36 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
37   other.core_ = nullptr;
38 }
39
40 template <class T>
41 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
42   std::swap(core_, other.core_);
43   return *this;
44 }
45
46 template <class T>
47 template <class T2, typename>
48 Future<T>::Future(T2&& val)
49   : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
50
51 template <class T>
52 template <typename, typename>
53 Future<T>::Future()
54   : core_(new detail::Core<T>(Try<T>(T()))) {}
55
56 template <class T>
57 Future<T>::~Future() {
58   detach();
59 }
60
61 template <class T>
62 void Future<T>::detach() {
63   if (core_) {
64     core_->detachFuture();
65     core_ = nullptr;
66   }
67 }
68
69 template <class T>
70 void Future<T>::throwIfInvalid() const {
71   if (!core_)
72     throw NoState();
73 }
74
75 template <class T>
76 template <class F>
77 void Future<T>::setCallback_(F&& func) {
78   throwIfInvalid();
79   core_->setCallback(std::move(func));
80 }
81
82 // unwrap
83
84 template <class T>
85 template <class F>
86 typename std::enable_if<isFuture<F>::value,
87                         Future<typename isFuture<T>::Inner>>::type
88 Future<T>::unwrap() {
89   return then([](Future<typename isFuture<T>::Inner> internal_future) {
90       return internal_future;
91   });
92 }
93
94 // then
95
96 // Variant: returns a value
97 // e.g. f.then([](Try<T>&& t){ return t.value(); });
98 template <class T>
99 template <typename F, typename R, bool isTry, typename... Args>
100 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
101 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
102   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
103   typedef typename R::ReturnsFuture::Inner B;
104
105   throwIfInvalid();
106
107   // wrap these so we can move them into the lambda
108   folly::MoveWrapper<Promise<B>> p;
109   p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
110   folly::MoveWrapper<F> funcm(std::forward<F>(func));
111
112   // grab the Future now before we lose our handle on the Promise
113   auto f = p->getFuture();
114   f.core_->setExecutorNoLock(getExecutor());
115
116   /* This is a bit tricky.
117
118      We can't just close over *this in case this Future gets moved. So we
119      make a new dummy Future. We could figure out something more
120      sophisticated that avoids making a new Future object when it can, as an
121      optimization. But this is correct.
122
123      core_ can't be moved, it is explicitly disallowed (as is copying). But
124      if there's ever a reason to allow it, this is one place that makes that
125      assumption and would need to be fixed. We use a standard shared pointer
126      for core_ (by copying it in), which means in essence obj holds a shared
127      pointer to itself.  But this shouldn't leak because Promise will not
128      outlive the continuation, because Promise will setException() with a
129      broken Promise if it is destructed before completed. We could use a
130      weak pointer but it would have to be converted to a shared pointer when
131      func is executed (because the Future returned by func may possibly
132      persist beyond the callback, if it gets moved), and so it is an
133      optimization to just make it shared from the get-go.
134
135      We have to move in the Promise and func using the MoveWrapper
136      hack. (func could be copied but it's a big drag on perf).
137
138      Two subtle but important points about this design. detail::Core has no
139      back pointers to Future or Promise, so if Future or Promise get moved
140      (and they will be moved in performant code) we don't have to do
141      anything fancy. And because we store the continuation in the
142      detail::Core, not in the Future, we can execute the continuation even
143      after the Future has gone out of scope. This is an intentional design
144      decision. It is likely we will want to be able to cancel a continuation
145      in some circumstances, but I think it should be explicit not implicit
146      in the destruction of the Future used to create it.
147      */
148   setCallback_(
149     [p, funcm](Try<T>&& t) mutable {
150       if (!isTry && t.hasException()) {
151         p->setException(std::move(t.exception()));
152       } else {
153         p->setWith([&]() {
154           return (*funcm)(t.template get<isTry, Args>()...);
155         });
156       }
157     });
158
159   return f;
160 }
161
162 // Variant: returns a Future
163 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
164 template <class T>
165 template <typename F, typename R, bool isTry, typename... Args>
166 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
167 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
168   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
169   typedef typename R::ReturnsFuture::Inner B;
170
171   throwIfInvalid();
172
173   // wrap these so we can move them into the lambda
174   folly::MoveWrapper<Promise<B>> p;
175   p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
176   folly::MoveWrapper<F> funcm(std::forward<F>(func));
177
178   // grab the Future now before we lose our handle on the Promise
179   auto f = p->getFuture();
180   f.core_->setExecutorNoLock(getExecutor());
181
182   setCallback_(
183     [p, funcm](Try<T>&& t) mutable {
184       if (!isTry && t.hasException()) {
185         p->setException(std::move(t.exception()));
186       } else {
187         try {
188           auto f2 = (*funcm)(t.template get<isTry, Args>()...);
189           // that didn't throw, now we can steal p
190           f2.setCallback_([p](Try<B>&& b) mutable {
191             p->setTry(std::move(b));
192           });
193         } catch (const std::exception& e) {
194           p->setException(exception_wrapper(std::current_exception(), e));
195         } catch (...) {
196           p->setException(exception_wrapper(std::current_exception()));
197         }
198       }
199     });
200
201   return f;
202 }
203
204 template <typename T>
205 template <typename R, typename Caller, typename... Args>
206   Future<typename isFuture<R>::Inner>
207 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
208   typedef typename std::remove_cv<
209     typename std::remove_reference<
210       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
211   return then([instance, func](Try<T>&& t){
212     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
213   });
214 }
215
216 template <class T>
217 template <class Executor, class Arg, class... Args>
218 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
219   -> decltype(this->then(std::forward<Arg>(arg),
220                          std::forward<Args>(args)...))
221 {
222   auto oldX = getExecutor();
223   setExecutor(x);
224   return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
225                via(oldX);
226 }
227
228 template <class T>
229 Future<Unit> Future<T>::then() {
230   return then([] () {});
231 }
232
233 // onError where the callback returns T
234 template <class T>
235 template <class F>
236 typename std::enable_if<
237   !detail::callableWith<F, exception_wrapper>::value &&
238   !detail::Extract<F>::ReturnsFuture::value,
239   Future<T>>::type
240 Future<T>::onError(F&& func) {
241   typedef typename detail::Extract<F>::FirstArg Exn;
242   static_assert(
243       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
244       "Return type of onError callback must be T or Future<T>");
245
246   Promise<T> p;
247   auto f = p.getFuture();
248   auto pm = folly::makeMoveWrapper(std::move(p));
249   auto funcm = folly::makeMoveWrapper(std::move(func));
250   setCallback_([pm, funcm](Try<T>&& t) mutable {
251     if (!t.template withException<Exn>([&] (Exn& e) {
252           pm->setWith([&]{
253             return (*funcm)(e);
254           });
255         })) {
256       pm->setTry(std::move(t));
257     }
258   });
259
260   return f;
261 }
262
263 // onError where the callback returns Future<T>
264 template <class T>
265 template <class F>
266 typename std::enable_if<
267   !detail::callableWith<F, exception_wrapper>::value &&
268   detail::Extract<F>::ReturnsFuture::value,
269   Future<T>>::type
270 Future<T>::onError(F&& func) {
271   static_assert(
272       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
273       "Return type of onError callback must be T or Future<T>");
274   typedef typename detail::Extract<F>::FirstArg Exn;
275
276   Promise<T> p;
277   auto f = p.getFuture();
278   auto pm = folly::makeMoveWrapper(std::move(p));
279   auto funcm = folly::makeMoveWrapper(std::move(func));
280   setCallback_([pm, funcm](Try<T>&& t) mutable {
281     if (!t.template withException<Exn>([&] (Exn& e) {
282           try {
283             auto f2 = (*funcm)(e);
284             f2.setCallback_([pm](Try<T>&& t2) mutable {
285               pm->setTry(std::move(t2));
286             });
287           } catch (const std::exception& e2) {
288             pm->setException(exception_wrapper(std::current_exception(), e2));
289           } catch (...) {
290             pm->setException(exception_wrapper(std::current_exception()));
291           }
292         })) {
293       pm->setTry(std::move(t));
294     }
295   });
296
297   return f;
298 }
299
300 template <class T>
301 template <class F>
302 Future<T> Future<T>::ensure(F func) {
303   MoveWrapper<F> funcw(std::move(func));
304   return this->then([funcw](Try<T>&& t) {
305     (*funcw)();
306     return makeFuture(std::move(t));
307   });
308 }
309
310 template <class T>
311 template <class F>
312 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
313   auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
314   return within(dur, tk)
315     .onError([funcw](TimedOut const&) { return (*funcw)(); });
316 }
317
318 template <class T>
319 template <class F>
320 typename std::enable_if<
321   detail::callableWith<F, exception_wrapper>::value &&
322   detail::Extract<F>::ReturnsFuture::value,
323   Future<T>>::type
324 Future<T>::onError(F&& func) {
325   static_assert(
326       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
327       "Return type of onError callback must be T or Future<T>");
328
329   Promise<T> p;
330   auto f = p.getFuture();
331   auto pm = folly::makeMoveWrapper(std::move(p));
332   auto funcm = folly::makeMoveWrapper(std::move(func));
333   setCallback_([pm, funcm](Try<T> t) mutable {
334     if (t.hasException()) {
335       try {
336         auto f2 = (*funcm)(std::move(t.exception()));
337         f2.setCallback_([pm](Try<T> t2) mutable {
338           pm->setTry(std::move(t2));
339         });
340       } catch (const std::exception& e2) {
341         pm->setException(exception_wrapper(std::current_exception(), e2));
342       } catch (...) {
343         pm->setException(exception_wrapper(std::current_exception()));
344       }
345     } else {
346       pm->setTry(std::move(t));
347     }
348   });
349
350   return f;
351 }
352
353 // onError(exception_wrapper) that returns T
354 template <class T>
355 template <class F>
356 typename std::enable_if<
357   detail::callableWith<F, exception_wrapper>::value &&
358   !detail::Extract<F>::ReturnsFuture::value,
359   Future<T>>::type
360 Future<T>::onError(F&& func) {
361   static_assert(
362       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
363       "Return type of onError callback must be T or Future<T>");
364
365   Promise<T> p;
366   auto f = p.getFuture();
367   auto pm = folly::makeMoveWrapper(std::move(p));
368   auto funcm = folly::makeMoveWrapper(std::move(func));
369   setCallback_([pm, funcm](Try<T> t) mutable {
370     if (t.hasException()) {
371       pm->setWith([&]{
372         return (*funcm)(std::move(t.exception()));
373       });
374     } else {
375       pm->setTry(std::move(t));
376     }
377   });
378
379   return f;
380 }
381
382 template <class T>
383 typename std::add_lvalue_reference<T>::type Future<T>::value() {
384   throwIfInvalid();
385
386   return core_->getTry().value();
387 }
388
389 template <class T>
390 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
391   throwIfInvalid();
392
393   return core_->getTry().value();
394 }
395
396 template <class T>
397 Try<T>& Future<T>::getTry() {
398   throwIfInvalid();
399
400   return core_->getTry();
401 }
402
403 template <class T>
404 Optional<Try<T>> Future<T>::poll() {
405   Optional<Try<T>> o;
406   if (core_->ready()) {
407     o = std::move(core_->getTry());
408   }
409   return o;
410 }
411
412 template <class T>
413 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
414   throwIfInvalid();
415
416   setExecutor(executor, priority);
417
418   return std::move(*this);
419 }
420
421 template <class T>
422 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
423   throwIfInvalid();
424
425   MoveWrapper<Promise<T>> p;
426   auto f = p->getFuture();
427   then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
428   return std::move(f).via(executor, priority);
429 }
430
431
432 template <class Func>
433 auto via(Executor* x, Func func)
434   -> Future<typename isFuture<decltype(func())>::Inner>
435 {
436   // TODO make this actually more performant. :-P #7260175
437   return via(x).then(func);
438 }
439
440 template <class T>
441 bool Future<T>::isReady() const {
442   throwIfInvalid();
443   return core_->ready();
444 }
445
446 template <class T>
447 bool Future<T>::hasValue() {
448   return getTry().hasValue();
449 }
450
451 template <class T>
452 bool Future<T>::hasException() {
453   return getTry().hasException();
454 }
455
456 template <class T>
457 void Future<T>::raise(exception_wrapper exception) {
458   core_->raise(std::move(exception));
459 }
460
461 // makeFuture
462
463 template <class T>
464 Future<typename std::decay<T>::type> makeFuture(T&& t) {
465   return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
466 }
467
468 inline // for multiple translation units
469 Future<Unit> makeFuture() {
470   return makeFuture(Unit{});
471 }
472
473 // XXX why is the dual necessary here? Can't we just use perfect forwarding
474 // and capture func by reference always?
475 template <class F>
476 auto makeFutureWith(
477     F&& func,
478     typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
479     -> Future<typename Unit::Lift<decltype(func())>::type> {
480   using LiftedResult = typename Unit::Lift<decltype(func())>::type;
481   return makeFuture<LiftedResult>(makeTryWith([&func]() {
482     return (func)();
483   }));
484 }
485
486 template <class F>
487 auto makeFutureWith(F const& func)
488     -> Future<typename Unit::Lift<decltype(func())>::type> {
489   F copy = func;
490   using LiftedResult = typename Unit::Lift<decltype(func())>::type;
491   return makeFuture<LiftedResult>(makeTryWith(std::move(copy)));
492 }
493
494 template <class T>
495 Future<T> makeFuture(std::exception_ptr const& e) {
496   return makeFuture(Try<T>(e));
497 }
498
499 template <class T>
500 Future<T> makeFuture(exception_wrapper ew) {
501   return makeFuture(Try<T>(std::move(ew)));
502 }
503
504 template <class T, class E>
505 typename std::enable_if<std::is_base_of<std::exception, E>::value,
506                         Future<T>>::type
507 makeFuture(E const& e) {
508   return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
509 }
510
511 template <class T>
512 Future<T> makeFuture(Try<T>&& t) {
513   return Future<T>(new detail::Core<T>(std::move(t)));
514 }
515
516 // via
517 Future<Unit> via(Executor* executor, int8_t priority) {
518   return makeFuture().via(executor, priority);
519 }
520
521 // mapSetCallback calls func(i, Try<T>) when every future completes
522
523 template <class T, class InputIterator, class F>
524 void mapSetCallback(InputIterator first, InputIterator last, F func) {
525   for (size_t i = 0; first != last; ++first, ++i) {
526     first->setCallback_([func, i](Try<T>&& t) {
527       func(i, std::move(t));
528     });
529   }
530 }
531
532 // collectAll (variadic)
533
534 template <typename... Fs>
535 typename detail::CollectAllVariadicContext<
536   typename std::decay<Fs>::type::value_type...>::type
537 collectAll(Fs&&... fs) {
538   auto ctx = std::make_shared<detail::CollectAllVariadicContext<
539     typename std::decay<Fs>::type::value_type...>>();
540   detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
541     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
542   return ctx->p.getFuture();
543 }
544
545 // collectAll (iterator)
546
547 template <class InputIterator>
548 Future<
549   std::vector<
550   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
551 collectAll(InputIterator first, InputIterator last) {
552   typedef
553     typename std::iterator_traits<InputIterator>::value_type::value_type T;
554
555   struct CollectAllContext {
556     CollectAllContext(int n) : results(n) {}
557     ~CollectAllContext() {
558       p.setValue(std::move(results));
559     }
560     Promise<std::vector<Try<T>>> p;
561     std::vector<Try<T>> results;
562   };
563
564   auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
565   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
566     ctx->results[i] = std::move(t);
567   });
568   return ctx->p.getFuture();
569 }
570
571 // collect (iterator)
572
573 namespace detail {
574
575 template <typename T>
576 struct CollectContext {
577   struct Nothing { explicit Nothing(int n) {} };
578
579   using Result = typename std::conditional<
580     std::is_void<T>::value,
581     void,
582     std::vector<T>>::type;
583
584   using InternalResult = typename std::conditional<
585     std::is_void<T>::value,
586     Nothing,
587     std::vector<Optional<T>>>::type;
588
589   explicit CollectContext(int n) : result(n) {}
590   ~CollectContext() {
591     if (!threw.exchange(true)) {
592       // map Optional<T> -> T
593       std::vector<T> finalResult;
594       finalResult.reserve(result.size());
595       std::transform(result.begin(), result.end(),
596                      std::back_inserter(finalResult),
597                      [](Optional<T>& o) { return std::move(o.value()); });
598       p.setValue(std::move(finalResult));
599     }
600   }
601   inline void setPartialResult(size_t i, Try<T>& t) {
602     result[i] = std::move(t.value());
603   }
604   Promise<Result> p;
605   InternalResult result;
606   std::atomic<bool> threw {false};
607 };
608
609 }
610
611 template <class InputIterator>
612 Future<typename detail::CollectContext<
613   typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
614 collect(InputIterator first, InputIterator last) {
615   typedef
616     typename std::iterator_traits<InputIterator>::value_type::value_type T;
617
618   auto ctx = std::make_shared<detail::CollectContext<T>>(
619     std::distance(first, last));
620   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
621     if (t.hasException()) {
622        if (!ctx->threw.exchange(true)) {
623          ctx->p.setException(std::move(t.exception()));
624        }
625      } else if (!ctx->threw) {
626        ctx->setPartialResult(i, t);
627      }
628   });
629   return ctx->p.getFuture();
630 }
631
632 // collect (variadic)
633
634 template <typename... Fs>
635 typename detail::CollectVariadicContext<
636   typename std::decay<Fs>::type::value_type...>::type
637 collect(Fs&&... fs) {
638   auto ctx = std::make_shared<detail::CollectVariadicContext<
639     typename std::decay<Fs>::type::value_type...>>();
640   detail::collectVariadicHelper<detail::CollectVariadicContext>(
641     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
642   return ctx->p.getFuture();
643 }
644
645 // collectAny (iterator)
646
647 template <class InputIterator>
648 Future<
649   std::pair<size_t,
650             Try<
651               typename
652               std::iterator_traits<InputIterator>::value_type::value_type>>>
653 collectAny(InputIterator first, InputIterator last) {
654   typedef
655     typename std::iterator_traits<InputIterator>::value_type::value_type T;
656
657   struct CollectAnyContext {
658     CollectAnyContext() {};
659     Promise<std::pair<size_t, Try<T>>> p;
660     std::atomic<bool> done {false};
661   };
662
663   auto ctx = std::make_shared<CollectAnyContext>();
664   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
665     if (!ctx->done.exchange(true)) {
666       ctx->p.setValue(std::make_pair(i, std::move(t)));
667     }
668   });
669   return ctx->p.getFuture();
670 }
671
672 // collectN (iterator)
673
674 template <class InputIterator>
675 Future<std::vector<std::pair<size_t, Try<typename
676   std::iterator_traits<InputIterator>::value_type::value_type>>>>
677 collectN(InputIterator first, InputIterator last, size_t n) {
678   typedef typename
679     std::iterator_traits<InputIterator>::value_type::value_type T;
680   typedef std::vector<std::pair<size_t, Try<T>>> V;
681
682   struct CollectNContext {
683     V v;
684     std::atomic<size_t> completed = {0};
685     Promise<V> p;
686   };
687   auto ctx = std::make_shared<CollectNContext>();
688
689   if (size_t(std::distance(first, last)) < n) {
690     ctx->p.setException(std::runtime_error("Not enough futures"));
691   } else {
692     // for each completed Future, increase count and add to vector, until we
693     // have n completed futures at which point we fulfil our Promise with the
694     // vector
695     mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
696       auto c = ++ctx->completed;
697       if (c <= n) {
698         assert(ctx->v.size() < n);
699         ctx->v.emplace_back(i, std::move(t));
700         if (c == n) {
701           ctx->p.setTry(Try<V>(std::move(ctx->v)));
702         }
703       }
704     });
705   }
706
707   return ctx->p.getFuture();
708 }
709
710 // reduce (iterator)
711
712 template <class It, class T, class F>
713 Future<T> reduce(It first, It last, T&& initial, F&& func) {
714   if (first == last) {
715     return makeFuture(std::move(initial));
716   }
717
718   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
719   typedef typename std::conditional<
720     detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type Arg;
721   typedef isTry<Arg> IsTry;
722
723   folly::MoveWrapper<T> minitial(std::move(initial));
724   auto sfunc = std::make_shared<F>(std::move(func));
725
726   auto f = first->then([minitial, sfunc](Try<ItT>& head) mutable {
727     return (*sfunc)(std::move(*minitial),
728                 head.template get<IsTry::value, Arg&&>());
729   });
730
731   for (++first; first != last; ++first) {
732     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
733       return (*sfunc)(std::move(std::get<0>(t).value()),
734                   // Either return a ItT&& or a Try<ItT>&& depending
735                   // on the type of the argument of func.
736                   std::get<1>(t).template get<IsTry::value, Arg&&>());
737     });
738   }
739
740   return f;
741 }
742
743 // window (collection)
744
745 template <class Collection, class F, class ItT, class Result>
746 std::vector<Future<Result>>
747 window(Collection input, F func, size_t n) {
748   struct WindowContext {
749     WindowContext(Collection&& i, F&& fn)
750         : input_(std::move(i)), promises_(input_.size()),
751           func_(std::move(fn))
752       {}
753     std::atomic<size_t> i_ {0};
754     Collection input_;
755     std::vector<Promise<Result>> promises_;
756     F func_;
757
758     static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
759       size_t i = ctx->i_++;
760       if (i < ctx->input_.size()) {
761         // Using setCallback_ directly since we don't need the Future
762         ctx->func_(std::move(ctx->input_[i])).setCallback_(
763           // ctx is captured by value
764           [ctx, i](Try<Result>&& t) {
765             ctx->promises_[i].setTry(std::move(t));
766             // Chain another future onto this one
767             spawn(std::move(ctx));
768           });
769       }
770     }
771   };
772
773   auto max = std::min(n, input.size());
774
775   auto ctx = std::make_shared<WindowContext>(
776     std::move(input), std::move(func));
777
778   for (size_t i = 0; i < max; ++i) {
779     // Start the first n Futures
780     WindowContext::spawn(ctx);
781   }
782
783   std::vector<Future<Result>> futures;
784   futures.reserve(ctx->promises_.size());
785   for (auto& promise : ctx->promises_) {
786     futures.emplace_back(promise.getFuture());
787   }
788
789   return futures;
790 }
791
792 // reduce
793
794 template <class T>
795 template <class I, class F>
796 Future<I> Future<T>::reduce(I&& initial, F&& func) {
797   folly::MoveWrapper<I> minitial(std::move(initial));
798   folly::MoveWrapper<F> mfunc(std::move(func));
799   return then([minitial, mfunc](T& vals) mutable {
800     auto ret = std::move(*minitial);
801     for (auto& val : vals) {
802       ret = (*mfunc)(std::move(ret), std::move(val));
803     }
804     return ret;
805   });
806 }
807
808 // unorderedReduce (iterator)
809
810 template <class It, class T, class F, class ItT, class Arg>
811 Future<T> unorderedReduce(It first, It last, T initial, F func) {
812   if (first == last) {
813     return makeFuture(std::move(initial));
814   }
815
816   typedef isTry<Arg> IsTry;
817
818   struct UnorderedReduceContext {
819     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
820         : lock_(), memo_(makeFuture<T>(std::move(memo))),
821           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
822       {};
823     folly::MicroSpinLock lock_; // protects memo_ and numThens_
824     Future<T> memo_;
825     F func_;
826     size_t numThens_; // how many Futures completed and called .then()
827     size_t numFutures_; // how many Futures in total
828     Promise<T> promise_;
829   };
830
831   auto ctx = std::make_shared<UnorderedReduceContext>(
832     std::move(initial), std::move(func), std::distance(first, last));
833
834   mapSetCallback<ItT>(first, last, [ctx](size_t i, Try<ItT>&& t) {
835     folly::MoveWrapper<Try<ItT>> mt(std::move(t));
836     // Futures can be completed in any order, simultaneously.
837     // To make this non-blocking, we create a new Future chain in
838     // the order of completion to reduce the values.
839     // The spinlock just protects chaining a new Future, not actually
840     // executing the reduce, which should be really fast.
841     folly::MSLGuard lock(ctx->lock_);
842     ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
843       // Either return a ItT&& or a Try<ItT>&& depending
844       // on the type of the argument of func.
845       return ctx->func_(std::move(v), mt->template get<IsTry::value, Arg&&>());
846     });
847     if (++ctx->numThens_ == ctx->numFutures_) {
848       // After reducing the value of the last Future, fulfill the Promise
849       ctx->memo_.setCallback_([ctx](Try<T>&& t2) {
850         ctx->promise_.setValue(std::move(t2));
851       });
852     }
853   });
854
855   return ctx->promise_.getFuture();
856 }
857
858 // within
859
860 template <class T>
861 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
862   return within(dur, TimedOut(), tk);
863 }
864
865 template <class T>
866 template <class E>
867 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
868
869   struct Context {
870     Context(E ex) : exception(std::move(ex)), promise() {}
871     E exception;
872     Promise<T> promise;
873     std::atomic<bool> token {false};
874   };
875   auto ctx = std::make_shared<Context>(std::move(e));
876
877   if (!tk) {
878     tk = folly::detail::getTimekeeperSingleton();
879   }
880
881   tk->after(dur)
882     .then([ctx](Try<Unit> const& t) {
883       if (ctx->token.exchange(true) == false) {
884         if (t.hasException()) {
885           ctx->promise.setException(std::move(t.exception()));
886         } else {
887           ctx->promise.setException(std::move(ctx->exception));
888         }
889       }
890     });
891
892   this->then([ctx](Try<T>&& t) {
893     if (ctx->token.exchange(true) == false) {
894       ctx->promise.setTry(std::move(t));
895     }
896   });
897
898   return ctx->promise.getFuture().via(getExecutor());
899 }
900
901 // delayed
902
903 template <class T>
904 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
905   return collectAll(*this, futures::sleep(dur, tk))
906     .then([](std::tuple<Try<T>, Try<Unit>> tup) {
907       Try<T>& t = std::get<0>(tup);
908       return makeFuture<T>(std::move(t));
909     });
910 }
911
912 namespace detail {
913
914 template <class T>
915 void waitImpl(Future<T>& f) {
916   // short-circuit if there's nothing to do
917   if (f.isReady()) return;
918
919   folly::fibers::Baton baton;
920   f = f.then([&](Try<T> t) {
921     baton.post();
922     return makeFuture(std::move(t));
923   });
924   baton.wait();
925
926   // There's a race here between the return here and the actual finishing of
927   // the future. f is completed, but the setup may not have finished on done
928   // after the baton has posted.
929   while (!f.isReady()) {
930     std::this_thread::yield();
931   }
932 }
933
934 template <class T>
935 void waitImpl(Future<T>& f, Duration dur) {
936   // short-circuit if there's nothing to do
937   if (f.isReady()) return;
938
939   auto baton = std::make_shared<folly::fibers::Baton>();
940   f = f.then([baton](Try<T> t) {
941     baton->post();
942     return makeFuture(std::move(t));
943   });
944
945   // Let's preserve the invariant that if we did not timeout (timed_wait returns
946   // true), then the returned Future is complete when it is returned to the
947   // caller. We need to wait out the race for that Future to complete.
948   if (baton->timed_wait(dur)) {
949     while (!f.isReady()) {
950       std::this_thread::yield();
951     }
952   }
953 }
954
955 template <class T>
956 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
957   while (!f.isReady()) {
958     e->drive();
959   }
960 }
961
962 } // detail
963
964 template <class T>
965 Future<T>& Future<T>::wait() & {
966   detail::waitImpl(*this);
967   return *this;
968 }
969
970 template <class T>
971 Future<T>&& Future<T>::wait() && {
972   detail::waitImpl(*this);
973   return std::move(*this);
974 }
975
976 template <class T>
977 Future<T>& Future<T>::wait(Duration dur) & {
978   detail::waitImpl(*this, dur);
979   return *this;
980 }
981
982 template <class T>
983 Future<T>&& Future<T>::wait(Duration dur) && {
984   detail::waitImpl(*this, dur);
985   return std::move(*this);
986 }
987
988 template <class T>
989 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
990   detail::waitViaImpl(*this, e);
991   return *this;
992 }
993
994 template <class T>
995 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
996   detail::waitViaImpl(*this, e);
997   return std::move(*this);
998 }
999
1000 template <class T>
1001 T Future<T>::get() {
1002   return std::move(wait().value());
1003 }
1004
1005 template <class T>
1006 T Future<T>::get(Duration dur) {
1007   wait(dur);
1008   if (isReady()) {
1009     return std::move(value());
1010   } else {
1011     throw TimedOut();
1012   }
1013 }
1014
1015 template <class T>
1016 T Future<T>::getVia(DrivableExecutor* e) {
1017   return std::move(waitVia(e).value());
1018 }
1019
1020 namespace detail {
1021   template <class T>
1022   struct TryEquals {
1023     static bool equals(const Try<T>& t1, const Try<T>& t2) {
1024       return t1.value() == t2.value();
1025     }
1026   };
1027 }
1028
1029 template <class T>
1030 Future<bool> Future<T>::willEqual(Future<T>& f) {
1031   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1032     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1033       return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1034     } else {
1035       return false;
1036       }
1037   });
1038 }
1039
1040 template <class T>
1041 template <class F>
1042 Future<T> Future<T>::filter(F predicate) {
1043   auto p = folly::makeMoveWrapper(std::move(predicate));
1044   return this->then([p](T val) {
1045     T const& valConstRef = val;
1046     if (!(*p)(valConstRef)) {
1047       throw PredicateDoesNotObtain();
1048     }
1049     return val;
1050   });
1051 }
1052
1053 template <class T>
1054 template <class Callback>
1055 auto Future<T>::thenMulti(Callback&& fn)
1056     -> decltype(this->then(std::forward<Callback>(fn))) {
1057   // thenMulti with one callback is just a then
1058   return then(std::forward<Callback>(fn));
1059 }
1060
1061 template <class T>
1062 template <class Callback, class... Callbacks>
1063 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1064     -> decltype(this->then(std::forward<Callback>(fn)).
1065                       thenMulti(std::forward<Callbacks>(fns)...)) {
1066   // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1067   return then(std::forward<Callback>(fn)).
1068          thenMulti(std::forward<Callbacks>(fns)...);
1069 }
1070
1071 template <class T>
1072 template <class Callback, class... Callbacks>
1073 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1074                                       Callbacks&&... fns)
1075     -> decltype(this->then(std::forward<Callback>(fn)).
1076                       thenMulti(std::forward<Callbacks>(fns)...)) {
1077   // thenMultiExecutor with two callbacks is
1078   // via(x).then(a).thenMulti(b, ...).via(oldX)
1079   auto oldX = getExecutor();
1080   setExecutor(x);
1081   return then(std::forward<Callback>(fn)).
1082          thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1083 }
1084
1085 template <class T>
1086 template <class Callback>
1087 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1088     -> decltype(this->then(std::forward<Callback>(fn))) {
1089   // thenMulti with one callback is just a then with an executor
1090   return then(x, std::forward<Callback>(fn));
1091 }
1092
1093 namespace futures {
1094   template <class It, class F, class ItT, class Result>
1095   std::vector<Future<Result>> map(It first, It last, F func) {
1096     std::vector<Future<Result>> results;
1097     for (auto it = first; it != last; it++) {
1098       results.push_back(it->then(func));
1099     }
1100     return results;
1101   }
1102 }
1103
1104 // Instantiate the most common Future types to save compile time
1105 extern template class Future<Unit>;
1106 extern template class Future<bool>;
1107 extern template class Future<int>;
1108 extern template class Future<int64_t>;
1109 extern template class Future<std::string>;
1110 extern template class Future<double>;
1111
1112 } // namespace folly