Revert my change (which broke down the cont build)
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <algorithm>
20 #include <chrono>
21 #include <random>
22 #include <thread>
23
24 #include <folly/experimental/fibers/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
30
31 namespace folly {
32
33 class Timekeeper;
34
35 namespace detail {
36   Timekeeper* getTimekeeperSingleton();
37 }
38
39 template <class T>
40 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
41   other.core_ = nullptr;
42 }
43
44 template <class T>
45 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
46   std::swap(core_, other.core_);
47   return *this;
48 }
49
50 template <class T>
51 template <class T2, typename>
52 Future<T>::Future(T2&& val)
53   : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
54
55 template <class T>
56 template <typename, typename>
57 Future<T>::Future()
58   : core_(new detail::Core<T>(Try<T>(T()))) {}
59
60 template <class T>
61 Future<T>::~Future() {
62   detach();
63 }
64
65 template <class T>
66 void Future<T>::detach() {
67   if (core_) {
68     core_->detachFuture();
69     core_ = nullptr;
70   }
71 }
72
73 template <class T>
74 void Future<T>::throwIfInvalid() const {
75   if (!core_)
76     throw NoState();
77 }
78
79 template <class T>
80 template <class F>
81 void Future<T>::setCallback_(F&& func) {
82   throwIfInvalid();
83   core_->setCallback(std::move(func));
84 }
85
86 // unwrap
87
88 template <class T>
89 template <class F>
90 typename std::enable_if<isFuture<F>::value,
91                         Future<typename isFuture<T>::Inner>>::type
92 Future<T>::unwrap() {
93   return then([](Future<typename isFuture<T>::Inner> internal_future) {
94       return internal_future;
95   });
96 }
97
98 // then
99
100 // Variant: returns a value
101 // e.g. f.then([](Try<T>&& t){ return t.value(); });
102 template <class T>
103 template <typename F, typename R, bool isTry, typename... Args>
104 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
105 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
106   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
107   typedef typename R::ReturnsFuture::Inner B;
108
109   throwIfInvalid();
110
111   // wrap these so we can move them into the lambda
112   folly::MoveWrapper<Promise<B>> p;
113   p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
114   folly::MoveWrapper<F> funcm(std::forward<F>(func));
115
116   // grab the Future now before we lose our handle on the Promise
117   auto f = p->getFuture();
118   f.core_->setExecutorNoLock(getExecutor());
119
120   /* This is a bit tricky.
121
122      We can't just close over *this in case this Future gets moved. So we
123      make a new dummy Future. We could figure out something more
124      sophisticated that avoids making a new Future object when it can, as an
125      optimization. But this is correct.
126
127      core_ can't be moved, it is explicitly disallowed (as is copying). But
128      if there's ever a reason to allow it, this is one place that makes that
129      assumption and would need to be fixed. We use a standard shared pointer
130      for core_ (by copying it in), which means in essence obj holds a shared
131      pointer to itself.  But this shouldn't leak because Promise will not
132      outlive the continuation, because Promise will setException() with a
133      broken Promise if it is destructed before completed. We could use a
134      weak pointer but it would have to be converted to a shared pointer when
135      func is executed (because the Future returned by func may possibly
136      persist beyond the callback, if it gets moved), and so it is an
137      optimization to just make it shared from the get-go.
138
139      We have to move in the Promise and func using the MoveWrapper
140      hack. (func could be copied but it's a big drag on perf).
141
142      Two subtle but important points about this design. detail::Core has no
143      back pointers to Future or Promise, so if Future or Promise get moved
144      (and they will be moved in performant code) we don't have to do
145      anything fancy. And because we store the continuation in the
146      detail::Core, not in the Future, we can execute the continuation even
147      after the Future has gone out of scope. This is an intentional design
148      decision. It is likely we will want to be able to cancel a continuation
149      in some circumstances, but I think it should be explicit not implicit
150      in the destruction of the Future used to create it.
151      */
152   setCallback_(
153     [p, funcm](Try<T>&& t) mutable {
154       if (!isTry && t.hasException()) {
155         p->setException(std::move(t.exception()));
156       } else {
157         p->setWith([&]() {
158           return (*funcm)(t.template get<isTry, Args>()...);
159         });
160       }
161     });
162
163   return f;
164 }
165
166 // Variant: returns a Future
167 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
168 template <class T>
169 template <typename F, typename R, bool isTry, typename... Args>
170 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
171 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
172   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
173   typedef typename R::ReturnsFuture::Inner B;
174
175   throwIfInvalid();
176
177   // wrap these so we can move them into the lambda
178   folly::MoveWrapper<Promise<B>> p;
179   p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
180   folly::MoveWrapper<F> funcm(std::forward<F>(func));
181
182   // grab the Future now before we lose our handle on the Promise
183   auto f = p->getFuture();
184   f.core_->setExecutorNoLock(getExecutor());
185
186   setCallback_(
187     [p, funcm](Try<T>&& t) mutable {
188       if (!isTry && t.hasException()) {
189         p->setException(std::move(t.exception()));
190       } else {
191         try {
192           auto f2 = (*funcm)(t.template get<isTry, Args>()...);
193           // that didn't throw, now we can steal p
194           f2.setCallback_([p](Try<B>&& b) mutable {
195             p->setTry(std::move(b));
196           });
197         } catch (const std::exception& e) {
198           p->setException(exception_wrapper(std::current_exception(), e));
199         } catch (...) {
200           p->setException(exception_wrapper(std::current_exception()));
201         }
202       }
203     });
204
205   return f;
206 }
207
208 template <typename T>
209 template <typename R, typename Caller, typename... Args>
210   Future<typename isFuture<R>::Inner>
211 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
212   typedef typename std::remove_cv<
213     typename std::remove_reference<
214       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
215   return then([instance, func](Try<T>&& t){
216     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
217   });
218 }
219
220 template <class T>
221 template <class Executor, class Arg, class... Args>
222 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
223   -> decltype(this->then(std::forward<Arg>(arg),
224                          std::forward<Args>(args)...))
225 {
226   auto oldX = getExecutor();
227   setExecutor(x);
228   return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
229                via(oldX);
230 }
231
232 template <class T>
233 Future<Unit> Future<T>::then() {
234   return then([] () {});
235 }
236
237 // onError where the callback returns T
238 template <class T>
239 template <class F>
240 typename std::enable_if<
241   !detail::callableWith<F, exception_wrapper>::value &&
242   !detail::Extract<F>::ReturnsFuture::value,
243   Future<T>>::type
244 Future<T>::onError(F&& func) {
245   typedef typename detail::Extract<F>::FirstArg Exn;
246   static_assert(
247       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
248       "Return type of onError callback must be T or Future<T>");
249
250   Promise<T> p;
251   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
252   auto f = p.getFuture();
253   auto pm = folly::makeMoveWrapper(std::move(p));
254   auto funcm = folly::makeMoveWrapper(std::move(func));
255   setCallback_([pm, funcm](Try<T>&& t) mutable {
256     if (!t.template withException<Exn>([&] (Exn& e) {
257           pm->setWith([&]{
258             return (*funcm)(e);
259           });
260         })) {
261       pm->setTry(std::move(t));
262     }
263   });
264
265   return f;
266 }
267
268 // onError where the callback returns Future<T>
269 template <class T>
270 template <class F>
271 typename std::enable_if<
272   !detail::callableWith<F, exception_wrapper>::value &&
273   detail::Extract<F>::ReturnsFuture::value,
274   Future<T>>::type
275 Future<T>::onError(F&& func) {
276   static_assert(
277       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
278       "Return type of onError callback must be T or Future<T>");
279   typedef typename detail::Extract<F>::FirstArg Exn;
280
281   Promise<T> p;
282   auto f = p.getFuture();
283   auto pm = folly::makeMoveWrapper(std::move(p));
284   auto funcm = folly::makeMoveWrapper(std::move(func));
285   setCallback_([pm, funcm](Try<T>&& t) mutable {
286     if (!t.template withException<Exn>([&] (Exn& e) {
287           try {
288             auto f2 = (*funcm)(e);
289             f2.setCallback_([pm](Try<T>&& t2) mutable {
290               pm->setTry(std::move(t2));
291             });
292           } catch (const std::exception& e2) {
293             pm->setException(exception_wrapper(std::current_exception(), e2));
294           } catch (...) {
295             pm->setException(exception_wrapper(std::current_exception()));
296           }
297         })) {
298       pm->setTry(std::move(t));
299     }
300   });
301
302   return f;
303 }
304
305 template <class T>
306 template <class F>
307 Future<T> Future<T>::ensure(F func) {
308   MoveWrapper<F> funcw(std::move(func));
309   return this->then([funcw](Try<T>&& t) mutable {
310     (*funcw)();
311     return makeFuture(std::move(t));
312   });
313 }
314
315 template <class T>
316 template <class F>
317 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
318   auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
319   return within(dur, tk)
320     .onError([funcw](TimedOut const&) { return (*funcw)(); });
321 }
322
323 template <class T>
324 template <class F>
325 typename std::enable_if<
326   detail::callableWith<F, exception_wrapper>::value &&
327   detail::Extract<F>::ReturnsFuture::value,
328   Future<T>>::type
329 Future<T>::onError(F&& func) {
330   static_assert(
331       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
332       "Return type of onError callback must be T or Future<T>");
333
334   Promise<T> p;
335   auto f = p.getFuture();
336   auto pm = folly::makeMoveWrapper(std::move(p));
337   auto funcm = folly::makeMoveWrapper(std::move(func));
338   setCallback_([pm, funcm](Try<T> t) mutable {
339     if (t.hasException()) {
340       try {
341         auto f2 = (*funcm)(std::move(t.exception()));
342         f2.setCallback_([pm](Try<T> t2) mutable {
343           pm->setTry(std::move(t2));
344         });
345       } catch (const std::exception& e2) {
346         pm->setException(exception_wrapper(std::current_exception(), e2));
347       } catch (...) {
348         pm->setException(exception_wrapper(std::current_exception()));
349       }
350     } else {
351       pm->setTry(std::move(t));
352     }
353   });
354
355   return f;
356 }
357
358 // onError(exception_wrapper) that returns T
359 template <class T>
360 template <class F>
361 typename std::enable_if<
362   detail::callableWith<F, exception_wrapper>::value &&
363   !detail::Extract<F>::ReturnsFuture::value,
364   Future<T>>::type
365 Future<T>::onError(F&& func) {
366   static_assert(
367       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
368       "Return type of onError callback must be T or Future<T>");
369
370   Promise<T> p;
371   auto f = p.getFuture();
372   auto pm = folly::makeMoveWrapper(std::move(p));
373   auto funcm = folly::makeMoveWrapper(std::move(func));
374   setCallback_([pm, funcm](Try<T> t) mutable {
375     if (t.hasException()) {
376       pm->setWith([&]{
377         return (*funcm)(std::move(t.exception()));
378       });
379     } else {
380       pm->setTry(std::move(t));
381     }
382   });
383
384   return f;
385 }
386
387 template <class T>
388 typename std::add_lvalue_reference<T>::type Future<T>::value() {
389   throwIfInvalid();
390
391   return core_->getTry().value();
392 }
393
394 template <class T>
395 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
396   throwIfInvalid();
397
398   return core_->getTry().value();
399 }
400
401 template <class T>
402 Try<T>& Future<T>::getTry() {
403   throwIfInvalid();
404
405   return core_->getTry();
406 }
407
408 template <class T>
409 Optional<Try<T>> Future<T>::poll() {
410   Optional<Try<T>> o;
411   if (core_->ready()) {
412     o = std::move(core_->getTry());
413   }
414   return o;
415 }
416
417 template <class T>
418 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
419   throwIfInvalid();
420
421   setExecutor(executor, priority);
422
423   return std::move(*this);
424 }
425
426 template <class T>
427 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
428   throwIfInvalid();
429
430   MoveWrapper<Promise<T>> p;
431   auto f = p->getFuture();
432   then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
433   return std::move(f).via(executor, priority);
434 }
435
436
437 template <class Func>
438 auto via(Executor* x, Func func)
439   -> Future<typename isFuture<decltype(func())>::Inner>
440 {
441   // TODO make this actually more performant. :-P #7260175
442   return via(x).then(func);
443 }
444
445 template <class T>
446 bool Future<T>::isReady() const {
447   throwIfInvalid();
448   return core_->ready();
449 }
450
451 template <class T>
452 bool Future<T>::hasValue() {
453   return getTry().hasValue();
454 }
455
456 template <class T>
457 bool Future<T>::hasException() {
458   return getTry().hasException();
459 }
460
461 template <class T>
462 void Future<T>::raise(exception_wrapper exception) {
463   core_->raise(std::move(exception));
464 }
465
466 // makeFuture
467
468 template <class T>
469 Future<typename std::decay<T>::type> makeFuture(T&& t) {
470   return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
471 }
472
473 inline // for multiple translation units
474 Future<Unit> makeFuture() {
475   return makeFuture(Unit{});
476 }
477
478 // makeFutureWith(Future<T>()) -> Future<T>
479 template <class F>
480 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
481                         typename std::result_of<F()>::type>::type
482 makeFutureWith(F&& func) {
483   using InnerType =
484       typename isFuture<typename std::result_of<F()>::type>::Inner;
485   try {
486     return func();
487   } catch (std::exception& e) {
488     return makeFuture<InnerType>(
489         exception_wrapper(std::current_exception(), e));
490   } catch (...) {
491     return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
492   }
493 }
494
495 // makeFutureWith(T()) -> Future<T>
496 // makeFutureWith(void()) -> Future<Unit>
497 template <class F>
498 typename std::enable_if<
499     !(isFuture<typename std::result_of<F()>::type>::value),
500     Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
501 makeFutureWith(F&& func) {
502   using LiftedResult =
503       typename Unit::Lift<typename std::result_of<F()>::type>::type;
504   return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
505     return func();
506   }));
507 }
508
509 template <class T>
510 Future<T> makeFuture(std::exception_ptr const& e) {
511   return makeFuture(Try<T>(e));
512 }
513
514 template <class T>
515 Future<T> makeFuture(exception_wrapper ew) {
516   return makeFuture(Try<T>(std::move(ew)));
517 }
518
519 template <class T, class E>
520 typename std::enable_if<std::is_base_of<std::exception, E>::value,
521                         Future<T>>::type
522 makeFuture(E const& e) {
523   return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
524 }
525
526 template <class T>
527 Future<T> makeFuture(Try<T>&& t) {
528   return Future<T>(new detail::Core<T>(std::move(t)));
529 }
530
531 // via
532 Future<Unit> via(Executor* executor, int8_t priority) {
533   return makeFuture().via(executor, priority);
534 }
535
536 // mapSetCallback calls func(i, Try<T>) when every future completes
537
538 template <class T, class InputIterator, class F>
539 void mapSetCallback(InputIterator first, InputIterator last, F func) {
540   for (size_t i = 0; first != last; ++first, ++i) {
541     first->setCallback_([func, i](Try<T>&& t) {
542       func(i, std::move(t));
543     });
544   }
545 }
546
547 // collectAll (variadic)
548
549 template <typename... Fs>
550 typename detail::CollectAllVariadicContext<
551   typename std::decay<Fs>::type::value_type...>::type
552 collectAll(Fs&&... fs) {
553   auto ctx = std::make_shared<detail::CollectAllVariadicContext<
554     typename std::decay<Fs>::type::value_type...>>();
555   detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
556     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
557   return ctx->p.getFuture();
558 }
559
560 // collectAll (iterator)
561
562 template <class InputIterator>
563 Future<
564   std::vector<
565   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
566 collectAll(InputIterator first, InputIterator last) {
567   typedef
568     typename std::iterator_traits<InputIterator>::value_type::value_type T;
569
570   struct CollectAllContext {
571     CollectAllContext(int n) : results(n) {}
572     ~CollectAllContext() {
573       p.setValue(std::move(results));
574     }
575     Promise<std::vector<Try<T>>> p;
576     std::vector<Try<T>> results;
577   };
578
579   auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
580   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
581     ctx->results[i] = std::move(t);
582   });
583   return ctx->p.getFuture();
584 }
585
586 // collect (iterator)
587
588 namespace detail {
589
590 template <typename T>
591 struct CollectContext {
592   struct Nothing { explicit Nothing(int n) {} };
593
594   using Result = typename std::conditional<
595     std::is_void<T>::value,
596     void,
597     std::vector<T>>::type;
598
599   using InternalResult = typename std::conditional<
600     std::is_void<T>::value,
601     Nothing,
602     std::vector<Optional<T>>>::type;
603
604   explicit CollectContext(int n) : result(n) {}
605   ~CollectContext() {
606     if (!threw.exchange(true)) {
607       // map Optional<T> -> T
608       std::vector<T> finalResult;
609       finalResult.reserve(result.size());
610       std::transform(result.begin(), result.end(),
611                      std::back_inserter(finalResult),
612                      [](Optional<T>& o) { return std::move(o.value()); });
613       p.setValue(std::move(finalResult));
614     }
615   }
616   inline void setPartialResult(size_t i, Try<T>& t) {
617     result[i] = std::move(t.value());
618   }
619   Promise<Result> p;
620   InternalResult result;
621   std::atomic<bool> threw {false};
622 };
623
624 }
625
626 template <class InputIterator>
627 Future<typename detail::CollectContext<
628   typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
629 collect(InputIterator first, InputIterator last) {
630   typedef
631     typename std::iterator_traits<InputIterator>::value_type::value_type T;
632
633   auto ctx = std::make_shared<detail::CollectContext<T>>(
634     std::distance(first, last));
635   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
636     if (t.hasException()) {
637        if (!ctx->threw.exchange(true)) {
638          ctx->p.setException(std::move(t.exception()));
639        }
640      } else if (!ctx->threw) {
641        ctx->setPartialResult(i, t);
642      }
643   });
644   return ctx->p.getFuture();
645 }
646
647 // collect (variadic)
648
649 template <typename... Fs>
650 typename detail::CollectVariadicContext<
651   typename std::decay<Fs>::type::value_type...>::type
652 collect(Fs&&... fs) {
653   auto ctx = std::make_shared<detail::CollectVariadicContext<
654     typename std::decay<Fs>::type::value_type...>>();
655   detail::collectVariadicHelper<detail::CollectVariadicContext>(
656     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
657   return ctx->p.getFuture();
658 }
659
660 // collectAny (iterator)
661
662 template <class InputIterator>
663 Future<
664   std::pair<size_t,
665             Try<
666               typename
667               std::iterator_traits<InputIterator>::value_type::value_type>>>
668 collectAny(InputIterator first, InputIterator last) {
669   typedef
670     typename std::iterator_traits<InputIterator>::value_type::value_type T;
671
672   struct CollectAnyContext {
673     CollectAnyContext() {};
674     Promise<std::pair<size_t, Try<T>>> p;
675     std::atomic<bool> done {false};
676   };
677
678   auto ctx = std::make_shared<CollectAnyContext>();
679   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
680     if (!ctx->done.exchange(true)) {
681       ctx->p.setValue(std::make_pair(i, std::move(t)));
682     }
683   });
684   return ctx->p.getFuture();
685 }
686
687 // collectN (iterator)
688
689 template <class InputIterator>
690 Future<std::vector<std::pair<size_t, Try<typename
691   std::iterator_traits<InputIterator>::value_type::value_type>>>>
692 collectN(InputIterator first, InputIterator last, size_t n) {
693   typedef typename
694     std::iterator_traits<InputIterator>::value_type::value_type T;
695   typedef std::vector<std::pair<size_t, Try<T>>> V;
696
697   struct CollectNContext {
698     V v;
699     std::atomic<size_t> completed = {0};
700     Promise<V> p;
701   };
702   auto ctx = std::make_shared<CollectNContext>();
703
704   if (size_t(std::distance(first, last)) < n) {
705     ctx->p.setException(std::runtime_error("Not enough futures"));
706   } else {
707     // for each completed Future, increase count and add to vector, until we
708     // have n completed futures at which point we fulfil our Promise with the
709     // vector
710     mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
711       auto c = ++ctx->completed;
712       if (c <= n) {
713         assert(ctx->v.size() < n);
714         ctx->v.emplace_back(i, std::move(t));
715         if (c == n) {
716           ctx->p.setTry(Try<V>(std::move(ctx->v)));
717         }
718       }
719     });
720   }
721
722   return ctx->p.getFuture();
723 }
724
725 // reduce (iterator)
726
727 template <class It, class T, class F>
728 Future<T> reduce(It first, It last, T&& initial, F&& func) {
729   if (first == last) {
730     return makeFuture(std::move(initial));
731   }
732
733   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
734   typedef typename std::conditional<
735     detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type Arg;
736   typedef isTry<Arg> IsTry;
737
738   folly::MoveWrapper<T> minitial(std::move(initial));
739   auto sfunc = std::make_shared<F>(std::move(func));
740
741   auto f = first->then([minitial, sfunc](Try<ItT>& head) mutable {
742     return (*sfunc)(std::move(*minitial),
743                 head.template get<IsTry::value, Arg&&>());
744   });
745
746   for (++first; first != last; ++first) {
747     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
748       return (*sfunc)(std::move(std::get<0>(t).value()),
749                   // Either return a ItT&& or a Try<ItT>&& depending
750                   // on the type of the argument of func.
751                   std::get<1>(t).template get<IsTry::value, Arg&&>());
752     });
753   }
754
755   return f;
756 }
757
758 // window (collection)
759
760 template <class Collection, class F, class ItT, class Result>
761 std::vector<Future<Result>>
762 window(Collection input, F func, size_t n) {
763   struct WindowContext {
764     WindowContext(Collection&& i, F&& fn)
765         : input_(std::move(i)), promises_(input_.size()),
766           func_(std::move(fn))
767       {}
768     std::atomic<size_t> i_ {0};
769     Collection input_;
770     std::vector<Promise<Result>> promises_;
771     F func_;
772
773     static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
774       size_t i = ctx->i_++;
775       if (i < ctx->input_.size()) {
776         // Using setCallback_ directly since we don't need the Future
777         ctx->func_(std::move(ctx->input_[i])).setCallback_(
778           // ctx is captured by value
779           [ctx, i](Try<Result>&& t) {
780             ctx->promises_[i].setTry(std::move(t));
781             // Chain another future onto this one
782             spawn(std::move(ctx));
783           });
784       }
785     }
786   };
787
788   auto max = std::min(n, input.size());
789
790   auto ctx = std::make_shared<WindowContext>(
791     std::move(input), std::move(func));
792
793   for (size_t i = 0; i < max; ++i) {
794     // Start the first n Futures
795     WindowContext::spawn(ctx);
796   }
797
798   std::vector<Future<Result>> futures;
799   futures.reserve(ctx->promises_.size());
800   for (auto& promise : ctx->promises_) {
801     futures.emplace_back(promise.getFuture());
802   }
803
804   return futures;
805 }
806
807 // reduce
808
809 template <class T>
810 template <class I, class F>
811 Future<I> Future<T>::reduce(I&& initial, F&& func) {
812   folly::MoveWrapper<I> minitial(std::move(initial));
813   folly::MoveWrapper<F> mfunc(std::move(func));
814   return then([minitial, mfunc](T& vals) mutable {
815     auto ret = std::move(*minitial);
816     for (auto& val : vals) {
817       ret = (*mfunc)(std::move(ret), std::move(val));
818     }
819     return ret;
820   });
821 }
822
823 // unorderedReduce (iterator)
824
825 template <class It, class T, class F, class ItT, class Arg>
826 Future<T> unorderedReduce(It first, It last, T initial, F func) {
827   if (first == last) {
828     return makeFuture(std::move(initial));
829   }
830
831   typedef isTry<Arg> IsTry;
832
833   struct UnorderedReduceContext {
834     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
835         : lock_(), memo_(makeFuture<T>(std::move(memo))),
836           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
837       {};
838     folly::MicroSpinLock lock_; // protects memo_ and numThens_
839     Future<T> memo_;
840     F func_;
841     size_t numThens_; // how many Futures completed and called .then()
842     size_t numFutures_; // how many Futures in total
843     Promise<T> promise_;
844   };
845
846   auto ctx = std::make_shared<UnorderedReduceContext>(
847     std::move(initial), std::move(func), std::distance(first, last));
848
849   mapSetCallback<ItT>(first, last, [ctx](size_t i, Try<ItT>&& t) {
850     folly::MoveWrapper<Try<ItT>> mt(std::move(t));
851     // Futures can be completed in any order, simultaneously.
852     // To make this non-blocking, we create a new Future chain in
853     // the order of completion to reduce the values.
854     // The spinlock just protects chaining a new Future, not actually
855     // executing the reduce, which should be really fast.
856     folly::MSLGuard lock(ctx->lock_);
857     ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
858       // Either return a ItT&& or a Try<ItT>&& depending
859       // on the type of the argument of func.
860       return ctx->func_(std::move(v), mt->template get<IsTry::value, Arg&&>());
861     });
862     if (++ctx->numThens_ == ctx->numFutures_) {
863       // After reducing the value of the last Future, fulfill the Promise
864       ctx->memo_.setCallback_([ctx](Try<T>&& t2) {
865         ctx->promise_.setValue(std::move(t2));
866       });
867     }
868   });
869
870   return ctx->promise_.getFuture();
871 }
872
873 // within
874
875 template <class T>
876 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
877   return within(dur, TimedOut(), tk);
878 }
879
880 template <class T>
881 template <class E>
882 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
883
884   struct Context {
885     Context(E ex) : exception(std::move(ex)), promise() {}
886     E exception;
887     Future<Unit> thisFuture;
888     Promise<T> promise;
889     std::atomic<bool> token {false};
890   };
891
892   if (!tk) {
893     tk = folly::detail::getTimekeeperSingleton();
894   }
895
896   auto ctx = std::make_shared<Context>(std::move(e));
897
898   ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
899     // TODO: "this" completed first, cancel "after"
900     if (ctx->token.exchange(true) == false) {
901       ctx->promise.setTry(std::move(t));
902     }
903   });
904
905   tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
906     // "after" completed first, cancel "this"
907     ctx->thisFuture.raise(TimedOut());
908     if (ctx->token.exchange(true) == false) {
909       if (t.hasException()) {
910         ctx->promise.setException(std::move(t.exception()));
911       } else {
912         ctx->promise.setException(std::move(ctx->exception));
913       }
914     }
915   });
916
917   return ctx->promise.getFuture().via(getExecutor());
918 }
919
920 // delayed
921
922 template <class T>
923 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
924   return collectAll(*this, futures::sleep(dur, tk))
925     .then([](std::tuple<Try<T>, Try<Unit>> tup) {
926       Try<T>& t = std::get<0>(tup);
927       return makeFuture<T>(std::move(t));
928     });
929 }
930
931 namespace detail {
932
933 template <class T>
934 void waitImpl(Future<T>& f) {
935   // short-circuit if there's nothing to do
936   if (f.isReady()) return;
937
938   folly::fibers::Baton baton;
939   f = f.then([&](Try<T> t) {
940     baton.post();
941     return makeFuture(std::move(t));
942   });
943   baton.wait();
944
945   // There's a race here between the return here and the actual finishing of
946   // the future. f is completed, but the setup may not have finished on done
947   // after the baton has posted.
948   while (!f.isReady()) {
949     std::this_thread::yield();
950   }
951 }
952
953 template <class T>
954 void waitImpl(Future<T>& f, Duration dur) {
955   // short-circuit if there's nothing to do
956   if (f.isReady()) return;
957
958   auto baton = std::make_shared<folly::fibers::Baton>();
959   f = f.then([baton](Try<T> t) {
960     baton->post();
961     return makeFuture(std::move(t));
962   });
963
964   // Let's preserve the invariant that if we did not timeout (timed_wait returns
965   // true), then the returned Future is complete when it is returned to the
966   // caller. We need to wait out the race for that Future to complete.
967   if (baton->timed_wait(dur)) {
968     while (!f.isReady()) {
969       std::this_thread::yield();
970     }
971   }
972 }
973
974 template <class T>
975 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
976   while (!f.isReady()) {
977     e->drive();
978   }
979 }
980
981 } // detail
982
983 template <class T>
984 Future<T>& Future<T>::wait() & {
985   detail::waitImpl(*this);
986   return *this;
987 }
988
989 template <class T>
990 Future<T>&& Future<T>::wait() && {
991   detail::waitImpl(*this);
992   return std::move(*this);
993 }
994
995 template <class T>
996 Future<T>& Future<T>::wait(Duration dur) & {
997   detail::waitImpl(*this, dur);
998   return *this;
999 }
1000
1001 template <class T>
1002 Future<T>&& Future<T>::wait(Duration dur) && {
1003   detail::waitImpl(*this, dur);
1004   return std::move(*this);
1005 }
1006
1007 template <class T>
1008 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1009   detail::waitViaImpl(*this, e);
1010   return *this;
1011 }
1012
1013 template <class T>
1014 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1015   detail::waitViaImpl(*this, e);
1016   return std::move(*this);
1017 }
1018
1019 template <class T>
1020 T Future<T>::get() {
1021   return std::move(wait().value());
1022 }
1023
1024 template <class T>
1025 T Future<T>::get(Duration dur) {
1026   wait(dur);
1027   if (isReady()) {
1028     return std::move(value());
1029   } else {
1030     throw TimedOut();
1031   }
1032 }
1033
1034 template <class T>
1035 T Future<T>::getVia(DrivableExecutor* e) {
1036   return std::move(waitVia(e).value());
1037 }
1038
1039 namespace detail {
1040   template <class T>
1041   struct TryEquals {
1042     static bool equals(const Try<T>& t1, const Try<T>& t2) {
1043       return t1.value() == t2.value();
1044     }
1045   };
1046 }
1047
1048 template <class T>
1049 Future<bool> Future<T>::willEqual(Future<T>& f) {
1050   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1051     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1052       return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1053     } else {
1054       return false;
1055       }
1056   });
1057 }
1058
1059 template <class T>
1060 template <class F>
1061 Future<T> Future<T>::filter(F predicate) {
1062   auto p = folly::makeMoveWrapper(std::move(predicate));
1063   return this->then([p](T val) {
1064     T const& valConstRef = val;
1065     if (!(*p)(valConstRef)) {
1066       throw PredicateDoesNotObtain();
1067     }
1068     return val;
1069   });
1070 }
1071
1072 template <class T>
1073 template <class Callback>
1074 auto Future<T>::thenMulti(Callback&& fn)
1075     -> decltype(this->then(std::forward<Callback>(fn))) {
1076   // thenMulti with one callback is just a then
1077   return then(std::forward<Callback>(fn));
1078 }
1079
1080 template <class T>
1081 template <class Callback, class... Callbacks>
1082 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1083     -> decltype(this->then(std::forward<Callback>(fn)).
1084                       thenMulti(std::forward<Callbacks>(fns)...)) {
1085   // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1086   return then(std::forward<Callback>(fn)).
1087          thenMulti(std::forward<Callbacks>(fns)...);
1088 }
1089
1090 template <class T>
1091 template <class Callback, class... Callbacks>
1092 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1093                                       Callbacks&&... fns)
1094     -> decltype(this->then(std::forward<Callback>(fn)).
1095                       thenMulti(std::forward<Callbacks>(fns)...)) {
1096   // thenMultiExecutor with two callbacks is
1097   // via(x).then(a).thenMulti(b, ...).via(oldX)
1098   auto oldX = getExecutor();
1099   setExecutor(x);
1100   return then(std::forward<Callback>(fn)).
1101          thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1102 }
1103
1104 template <class T>
1105 template <class Callback>
1106 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1107     -> decltype(this->then(std::forward<Callback>(fn))) {
1108   // thenMulti with one callback is just a then with an executor
1109   return then(x, std::forward<Callback>(fn));
1110 }
1111
1112 template <class F>
1113 inline Future<Unit> when(bool p, F thunk) {
1114   return p ? thunk().unit() : makeFuture();
1115 }
1116
1117 template <class P, class F>
1118 Future<Unit> whileDo(P predicate, F thunk) {
1119   if (predicate()) {
1120     return thunk().then([=] {
1121       return whileDo(predicate, thunk);
1122     });
1123   }
1124   return makeFuture();
1125 }
1126
1127 template <class F>
1128 Future<Unit> times(const int n, F thunk) {
1129   auto count = folly::makeMoveWrapper(
1130     std::unique_ptr<std::atomic<int>>(new std::atomic<int>(0))
1131   );
1132   return folly::whileDo([=]() mutable {
1133       return (*count)->fetch_add(1) < n;
1134     }, thunk);
1135 }
1136
1137 namespace futures {
1138   template <class It, class F, class ItT, class Result>
1139   std::vector<Future<Result>> map(It first, It last, F func) {
1140     std::vector<Future<Result>> results;
1141     for (auto it = first; it != last; it++) {
1142       results.push_back(it->then(func));
1143     }
1144     return results;
1145   }
1146 }
1147
1148 namespace futures {
1149
1150 namespace detail {
1151
1152 struct retrying_policy_raw_tag {};
1153 struct retrying_policy_fut_tag {};
1154
1155 template <class Policy>
1156 struct retrying_policy_traits {
1157   using ew = exception_wrapper;
1158   FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1159   template <class Ret>
1160   using has_op = typename std::integral_constant<bool,
1161         has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1162         has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1163   using is_raw = has_op<bool>;
1164   using is_fut = has_op<Future<bool>>;
1165   using tag = typename std::conditional<
1166         is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1167         is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1168 };
1169
1170 template <class Policy, class FF>
1171 typename std::result_of<FF(size_t)>::type
1172 retrying(size_t k, Policy&& p, FF&& ff) {
1173   using F = typename std::result_of<FF(size_t)>::type;
1174   using T = typename F::value_type;
1175   auto f = ff(k++);
1176   auto pm = makeMoveWrapper(p);
1177   auto ffm = makeMoveWrapper(ff);
1178   return f.onError([=](exception_wrapper x) mutable {
1179       auto q = (*pm)(k, x);
1180       auto xm = makeMoveWrapper(std::move(x));
1181       return q.then([=](bool r) mutable {
1182           return r
1183             ? retrying(k, pm.move(), ffm.move())
1184             : makeFuture<T>(xm.move());
1185       });
1186   });
1187 }
1188
1189 template <class Policy, class FF>
1190 typename std::result_of<FF(size_t)>::type
1191 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1192   auto pm = makeMoveWrapper(std::move(p));
1193   auto q = [=](size_t k, exception_wrapper x) {
1194     return makeFuture<bool>((*pm)(k, x));
1195   };
1196   return retrying(0, std::move(q), std::forward<FF>(ff));
1197 }
1198
1199 template <class Policy, class FF>
1200 typename std::result_of<FF(size_t)>::type
1201 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1202   return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1203 }
1204
1205 //  jittered exponential backoff, clamped to [backoff_min, backoff_max]
1206 template <class URNG>
1207 Duration retryingJitteredExponentialBackoffDur(
1208     size_t n,
1209     Duration backoff_min,
1210     Duration backoff_max,
1211     double jitter_param,
1212     URNG& rng) {
1213   using d = Duration;
1214   auto dist = std::normal_distribution<double>(0.0, jitter_param);
1215   auto jitter = std::exp(dist(rng));
1216   auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1217   return std::max(backoff_min, std::min(backoff_max, backoff));
1218 }
1219
1220 template <class Policy, class URNG>
1221 std::function<Future<bool>(size_t, const exception_wrapper&)>
1222 retryingPolicyCappedJitteredExponentialBackoff(
1223     size_t max_tries,
1224     Duration backoff_min,
1225     Duration backoff_max,
1226     double jitter_param,
1227     URNG rng,
1228     Policy&& p) {
1229   auto pm = makeMoveWrapper(std::move(p));
1230   auto rngp = std::make_shared<URNG>(std::move(rng));
1231   return [=](size_t n, const exception_wrapper& ex) mutable {
1232     if (n == max_tries) { return makeFuture(false); }
1233     return (*pm)(n, ex).then([=](bool v) {
1234         if (!v) { return makeFuture(false); }
1235         auto backoff = detail::retryingJitteredExponentialBackoffDur(
1236             n, backoff_min, backoff_max, jitter_param, *rngp);
1237         return futures::sleep(backoff).then([] { return true; });
1238     });
1239   };
1240 }
1241
1242 template <class Policy, class URNG>
1243 std::function<Future<bool>(size_t, const exception_wrapper&)>
1244 retryingPolicyCappedJitteredExponentialBackoff(
1245     size_t max_tries,
1246     Duration backoff_min,
1247     Duration backoff_max,
1248     double jitter_param,
1249     URNG rng,
1250     Policy&& p,
1251     retrying_policy_raw_tag) {
1252   auto pm = makeMoveWrapper(std::move(p));
1253   auto q = [=](size_t n, const exception_wrapper& e) {
1254     return makeFuture((*pm)(n, e));
1255   };
1256   return retryingPolicyCappedJitteredExponentialBackoff(
1257       max_tries,
1258       backoff_min,
1259       backoff_max,
1260       jitter_param,
1261       std::move(rng),
1262       std::move(q));
1263 }
1264
1265 template <class Policy, class URNG>
1266 std::function<Future<bool>(size_t, const exception_wrapper&)>
1267 retryingPolicyCappedJitteredExponentialBackoff(
1268     size_t max_tries,
1269     Duration backoff_min,
1270     Duration backoff_max,
1271     double jitter_param,
1272     URNG rng,
1273     Policy&& p,
1274     retrying_policy_fut_tag) {
1275   return retryingPolicyCappedJitteredExponentialBackoff(
1276       max_tries,
1277       backoff_min,
1278       backoff_max,
1279       jitter_param,
1280       std::move(rng),
1281       std::move(p));
1282 }
1283
1284 }
1285
1286 template <class Policy, class FF>
1287 typename std::result_of<FF(size_t)>::type
1288 retrying(Policy&& p, FF&& ff) {
1289   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1290   return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1291 }
1292
1293 inline
1294 std::function<bool(size_t, const exception_wrapper&)>
1295 retryingPolicyBasic(
1296     size_t max_tries) {
1297   return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1298 }
1299
1300 template <class Policy, class URNG>
1301 std::function<Future<bool>(size_t, const exception_wrapper&)>
1302 retryingPolicyCappedJitteredExponentialBackoff(
1303     size_t max_tries,
1304     Duration backoff_min,
1305     Duration backoff_max,
1306     double jitter_param,
1307     URNG rng,
1308     Policy&& p) {
1309   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1310   return detail::retryingPolicyCappedJitteredExponentialBackoff(
1311       max_tries,
1312       backoff_min,
1313       backoff_max,
1314       jitter_param,
1315       std::move(rng),
1316       std::move(p),
1317       tag());
1318 }
1319
1320 inline
1321 std::function<Future<bool>(size_t, const exception_wrapper&)>
1322 retryingPolicyCappedJitteredExponentialBackoff(
1323     size_t max_tries,
1324     Duration backoff_min,
1325     Duration backoff_max,
1326     double jitter_param) {
1327   auto p = [](size_t, const exception_wrapper&) { return true; };
1328   return retryingPolicyCappedJitteredExponentialBackoff(
1329       max_tries,
1330       backoff_min,
1331       backoff_max,
1332       jitter_param,
1333       ThreadLocalPRNG(),
1334       std::move(p));
1335 }
1336
1337 }
1338
1339 // Instantiate the most common Future types to save compile time
1340 extern template class Future<Unit>;
1341 extern template class Future<bool>;
1342 extern template class Future<int>;
1343 extern template class Future<int64_t>;
1344 extern template class Future<std::string>;
1345 extern template class Future<double>;
1346
1347 } // namespace folly