Avoid deprecated Singleton<T>::get() in folly/futures
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <algorithm>
20 #include <cassert>
21 #include <chrono>
22 #include <random>
23 #include <thread>
24 #include <folly/experimental/fibers/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
30
31 namespace folly {
32
33 class Timekeeper;
34
35 namespace detail {
36   std::shared_ptr<Timekeeper> getTimekeeperSingleton();
37 }
38
39 template <class T>
40 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
41   other.core_ = nullptr;
42 }
43
44 template <class T>
45 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
46   std::swap(core_, other.core_);
47   return *this;
48 }
49
50 template <class T>
51 template <class T2, typename>
52 Future<T>::Future(T2&& val)
53   : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
54
55 template <class T>
56 template <typename, typename>
57 Future<T>::Future()
58   : core_(new detail::Core<T>(Try<T>(T()))) {}
59
60 template <class T>
61 Future<T>::~Future() {
62   detach();
63 }
64
65 template <class T>
66 void Future<T>::detach() {
67   if (core_) {
68     core_->detachFuture();
69     core_ = nullptr;
70   }
71 }
72
73 template <class T>
74 void Future<T>::throwIfInvalid() const {
75   if (!core_)
76     throw NoState();
77 }
78
79 template <class T>
80 template <class F>
81 void Future<T>::setCallback_(F&& func) {
82   throwIfInvalid();
83   core_->setCallback(std::move(func));
84 }
85
86 // unwrap
87
88 template <class T>
89 template <class F>
90 typename std::enable_if<isFuture<F>::value,
91                         Future<typename isFuture<T>::Inner>>::type
92 Future<T>::unwrap() {
93   return then([](Future<typename isFuture<T>::Inner> internal_future) {
94       return internal_future;
95   });
96 }
97
98 // then
99
100 // Variant: returns a value
101 // e.g. f.then([](Try<T>&& t){ return t.value(); });
102 template <class T>
103 template <typename F, typename R, bool isTry, typename... Args>
104 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
105 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
106   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
107   typedef typename R::ReturnsFuture::Inner B;
108
109   throwIfInvalid();
110
111   // wrap these so we can move them into the lambda
112   folly::MoveWrapper<Promise<B>> p;
113   p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
114   folly::MoveWrapper<F> funcm(std::forward<F>(func));
115
116   // grab the Future now before we lose our handle on the Promise
117   auto f = p->getFuture();
118   f.core_->setExecutorNoLock(getExecutor());
119
120   /* This is a bit tricky.
121
122      We can't just close over *this in case this Future gets moved. So we
123      make a new dummy Future. We could figure out something more
124      sophisticated that avoids making a new Future object when it can, as an
125      optimization. But this is correct.
126
127      core_ can't be moved, it is explicitly disallowed (as is copying). But
128      if there's ever a reason to allow it, this is one place that makes that
129      assumption and would need to be fixed. We use a standard shared pointer
130      for core_ (by copying it in), which means in essence obj holds a shared
131      pointer to itself.  But this shouldn't leak because Promise will not
132      outlive the continuation, because Promise will setException() with a
133      broken Promise if it is destructed before completed. We could use a
134      weak pointer but it would have to be converted to a shared pointer when
135      func is executed (because the Future returned by func may possibly
136      persist beyond the callback, if it gets moved), and so it is an
137      optimization to just make it shared from the get-go.
138
139      We have to move in the Promise and func using the MoveWrapper
140      hack. (func could be copied but it's a big drag on perf).
141
142      Two subtle but important points about this design. detail::Core has no
143      back pointers to Future or Promise, so if Future or Promise get moved
144      (and they will be moved in performant code) we don't have to do
145      anything fancy. And because we store the continuation in the
146      detail::Core, not in the Future, we can execute the continuation even
147      after the Future has gone out of scope. This is an intentional design
148      decision. It is likely we will want to be able to cancel a continuation
149      in some circumstances, but I think it should be explicit not implicit
150      in the destruction of the Future used to create it.
151      */
152   setCallback_(
153     [p, funcm](Try<T>&& t) mutable {
154       if (!isTry && t.hasException()) {
155         p->setException(std::move(t.exception()));
156       } else {
157         p->setWith([&]() {
158           return (*funcm)(t.template get<isTry, Args>()...);
159         });
160       }
161     });
162
163   return f;
164 }
165
166 // Variant: returns a Future
167 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
168 template <class T>
169 template <typename F, typename R, bool isTry, typename... Args>
170 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
171 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
172   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
173   typedef typename R::ReturnsFuture::Inner B;
174
175   throwIfInvalid();
176
177   // wrap these so we can move them into the lambda
178   folly::MoveWrapper<Promise<B>> p;
179   p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
180   folly::MoveWrapper<F> funcm(std::forward<F>(func));
181
182   // grab the Future now before we lose our handle on the Promise
183   auto f = p->getFuture();
184   f.core_->setExecutorNoLock(getExecutor());
185
186   setCallback_(
187     [p, funcm](Try<T>&& t) mutable {
188       if (!isTry && t.hasException()) {
189         p->setException(std::move(t.exception()));
190       } else {
191         try {
192           auto f2 = (*funcm)(t.template get<isTry, Args>()...);
193           // that didn't throw, now we can steal p
194           f2.setCallback_([p](Try<B>&& b) mutable {
195             p->setTry(std::move(b));
196           });
197         } catch (const std::exception& e) {
198           p->setException(exception_wrapper(std::current_exception(), e));
199         } catch (...) {
200           p->setException(exception_wrapper(std::current_exception()));
201         }
202       }
203     });
204
205   return f;
206 }
207
208 template <typename T>
209 template <typename R, typename Caller, typename... Args>
210   Future<typename isFuture<R>::Inner>
211 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
212   typedef typename std::remove_cv<
213     typename std::remove_reference<
214       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
215   return then([instance, func](Try<T>&& t){
216     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
217   });
218 }
219
220 template <class T>
221 template <class Executor, class Arg, class... Args>
222 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
223   -> decltype(this->then(std::forward<Arg>(arg),
224                          std::forward<Args>(args)...))
225 {
226   auto oldX = getExecutor();
227   setExecutor(x);
228   return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
229                via(oldX);
230 }
231
232 template <class T>
233 Future<Unit> Future<T>::then() {
234   return then([] () {});
235 }
236
237 // onError where the callback returns T
238 template <class T>
239 template <class F>
240 typename std::enable_if<
241   !detail::callableWith<F, exception_wrapper>::value &&
242   !detail::Extract<F>::ReturnsFuture::value,
243   Future<T>>::type
244 Future<T>::onError(F&& func) {
245   typedef typename detail::Extract<F>::FirstArg Exn;
246   static_assert(
247       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
248       "Return type of onError callback must be T or Future<T>");
249
250   Promise<T> p;
251   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
252   auto f = p.getFuture();
253   auto pm = folly::makeMoveWrapper(std::move(p));
254   auto funcm = folly::makeMoveWrapper(std::move(func));
255   setCallback_([pm, funcm](Try<T>&& t) mutable {
256     if (!t.template withException<Exn>([&] (Exn& e) {
257           pm->setWith([&]{
258             return (*funcm)(e);
259           });
260         })) {
261       pm->setTry(std::move(t));
262     }
263   });
264
265   return f;
266 }
267
268 // onError where the callback returns Future<T>
269 template <class T>
270 template <class F>
271 typename std::enable_if<
272   !detail::callableWith<F, exception_wrapper>::value &&
273   detail::Extract<F>::ReturnsFuture::value,
274   Future<T>>::type
275 Future<T>::onError(F&& func) {
276   static_assert(
277       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
278       "Return type of onError callback must be T or Future<T>");
279   typedef typename detail::Extract<F>::FirstArg Exn;
280
281   Promise<T> p;
282   auto f = p.getFuture();
283   auto pm = folly::makeMoveWrapper(std::move(p));
284   auto funcm = folly::makeMoveWrapper(std::move(func));
285   setCallback_([pm, funcm](Try<T>&& t) mutable {
286     if (!t.template withException<Exn>([&] (Exn& e) {
287           try {
288             auto f2 = (*funcm)(e);
289             f2.setCallback_([pm](Try<T>&& t2) mutable {
290               pm->setTry(std::move(t2));
291             });
292           } catch (const std::exception& e2) {
293             pm->setException(exception_wrapper(std::current_exception(), e2));
294           } catch (...) {
295             pm->setException(exception_wrapper(std::current_exception()));
296           }
297         })) {
298       pm->setTry(std::move(t));
299     }
300   });
301
302   return f;
303 }
304
305 template <class T>
306 template <class F>
307 Future<T> Future<T>::ensure(F func) {
308   MoveWrapper<F> funcw(std::move(func));
309   return this->then([funcw](Try<T>&& t) mutable {
310     (*funcw)();
311     return makeFuture(std::move(t));
312   });
313 }
314
315 template <class T>
316 template <class F>
317 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
318   auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
319   return within(dur, tk)
320     .onError([funcw](TimedOut const&) { return (*funcw)(); });
321 }
322
323 template <class T>
324 template <class F>
325 typename std::enable_if<
326   detail::callableWith<F, exception_wrapper>::value &&
327   detail::Extract<F>::ReturnsFuture::value,
328   Future<T>>::type
329 Future<T>::onError(F&& func) {
330   static_assert(
331       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
332       "Return type of onError callback must be T or Future<T>");
333
334   Promise<T> p;
335   auto f = p.getFuture();
336   auto pm = folly::makeMoveWrapper(std::move(p));
337   auto funcm = folly::makeMoveWrapper(std::move(func));
338   setCallback_([pm, funcm](Try<T> t) mutable {
339     if (t.hasException()) {
340       try {
341         auto f2 = (*funcm)(std::move(t.exception()));
342         f2.setCallback_([pm](Try<T> t2) mutable {
343           pm->setTry(std::move(t2));
344         });
345       } catch (const std::exception& e2) {
346         pm->setException(exception_wrapper(std::current_exception(), e2));
347       } catch (...) {
348         pm->setException(exception_wrapper(std::current_exception()));
349       }
350     } else {
351       pm->setTry(std::move(t));
352     }
353   });
354
355   return f;
356 }
357
358 // onError(exception_wrapper) that returns T
359 template <class T>
360 template <class F>
361 typename std::enable_if<
362   detail::callableWith<F, exception_wrapper>::value &&
363   !detail::Extract<F>::ReturnsFuture::value,
364   Future<T>>::type
365 Future<T>::onError(F&& func) {
366   static_assert(
367       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
368       "Return type of onError callback must be T or Future<T>");
369
370   Promise<T> p;
371   auto f = p.getFuture();
372   auto pm = folly::makeMoveWrapper(std::move(p));
373   auto funcm = folly::makeMoveWrapper(std::move(func));
374   setCallback_([pm, funcm](Try<T> t) mutable {
375     if (t.hasException()) {
376       pm->setWith([&]{
377         return (*funcm)(std::move(t.exception()));
378       });
379     } else {
380       pm->setTry(std::move(t));
381     }
382   });
383
384   return f;
385 }
386
387 template <class T>
388 typename std::add_lvalue_reference<T>::type Future<T>::value() {
389   throwIfInvalid();
390
391   return core_->getTry().value();
392 }
393
394 template <class T>
395 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
396   throwIfInvalid();
397
398   return core_->getTry().value();
399 }
400
401 template <class T>
402 Try<T>& Future<T>::getTry() {
403   throwIfInvalid();
404
405   return core_->getTry();
406 }
407
408 template <class T>
409 Optional<Try<T>> Future<T>::poll() {
410   Optional<Try<T>> o;
411   if (core_->ready()) {
412     o = std::move(core_->getTry());
413   }
414   return o;
415 }
416
417 template <class T>
418 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
419   throwIfInvalid();
420
421   setExecutor(executor, priority);
422
423   return std::move(*this);
424 }
425
426 template <class T>
427 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
428   throwIfInvalid();
429
430   MoveWrapper<Promise<T>> p;
431   auto f = p->getFuture();
432   then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
433   return std::move(f).via(executor, priority);
434 }
435
436
437 template <class Func>
438 auto via(Executor* x, Func func)
439   -> Future<typename isFuture<decltype(func())>::Inner>
440 {
441   // TODO make this actually more performant. :-P #7260175
442   return via(x).then(func);
443 }
444
445 template <class T>
446 bool Future<T>::isReady() const {
447   throwIfInvalid();
448   return core_->ready();
449 }
450
451 template <class T>
452 bool Future<T>::hasValue() {
453   return getTry().hasValue();
454 }
455
456 template <class T>
457 bool Future<T>::hasException() {
458   return getTry().hasException();
459 }
460
461 template <class T>
462 void Future<T>::raise(exception_wrapper exception) {
463   core_->raise(std::move(exception));
464 }
465
466 // makeFuture
467
468 template <class T>
469 Future<typename std::decay<T>::type> makeFuture(T&& t) {
470   return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
471 }
472
473 inline // for multiple translation units
474 Future<Unit> makeFuture() {
475   return makeFuture(Unit{});
476 }
477
478 // makeFutureWith(Future<T>()) -> Future<T>
479 template <class F>
480 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
481                         typename std::result_of<F()>::type>::type
482 makeFutureWith(F&& func) {
483   using InnerType =
484       typename isFuture<typename std::result_of<F()>::type>::Inner;
485   try {
486     return func();
487   } catch (std::exception& e) {
488     return makeFuture<InnerType>(
489         exception_wrapper(std::current_exception(), e));
490   } catch (...) {
491     return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
492   }
493 }
494
495 // makeFutureWith(T()) -> Future<T>
496 // makeFutureWith(void()) -> Future<Unit>
497 template <class F>
498 typename std::enable_if<
499     !(isFuture<typename std::result_of<F()>::type>::value),
500     Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
501 makeFutureWith(F&& func) {
502   using LiftedResult =
503       typename Unit::Lift<typename std::result_of<F()>::type>::type;
504   return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
505     return func();
506   }));
507 }
508
509 template <class T>
510 Future<T> makeFuture(std::exception_ptr const& e) {
511   return makeFuture(Try<T>(e));
512 }
513
514 template <class T>
515 Future<T> makeFuture(exception_wrapper ew) {
516   return makeFuture(Try<T>(std::move(ew)));
517 }
518
519 template <class T, class E>
520 typename std::enable_if<std::is_base_of<std::exception, E>::value,
521                         Future<T>>::type
522 makeFuture(E const& e) {
523   return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
524 }
525
526 template <class T>
527 Future<T> makeFuture(Try<T>&& t) {
528   return Future<T>(new detail::Core<T>(std::move(t)));
529 }
530
531 // via
532 Future<Unit> via(Executor* executor, int8_t priority) {
533   return makeFuture().via(executor, priority);
534 }
535
536 // mapSetCallback calls func(i, Try<T>) when every future completes
537
538 template <class T, class InputIterator, class F>
539 void mapSetCallback(InputIterator first, InputIterator last, F func) {
540   for (size_t i = 0; first != last; ++first, ++i) {
541     first->setCallback_([func, i](Try<T>&& t) {
542       func(i, std::move(t));
543     });
544   }
545 }
546
547 // collectAll (variadic)
548
549 template <typename... Fs>
550 typename detail::CollectAllVariadicContext<
551   typename std::decay<Fs>::type::value_type...>::type
552 collectAll(Fs&&... fs) {
553   auto ctx = std::make_shared<detail::CollectAllVariadicContext<
554     typename std::decay<Fs>::type::value_type...>>();
555   detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
556     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
557   return ctx->p.getFuture();
558 }
559
560 // collectAll (iterator)
561
562 template <class InputIterator>
563 Future<
564   std::vector<
565   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
566 collectAll(InputIterator first, InputIterator last) {
567   typedef
568     typename std::iterator_traits<InputIterator>::value_type::value_type T;
569
570   struct CollectAllContext {
571     CollectAllContext(int n) : results(n) {}
572     ~CollectAllContext() {
573       p.setValue(std::move(results));
574     }
575     Promise<std::vector<Try<T>>> p;
576     std::vector<Try<T>> results;
577   };
578
579   auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
580   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
581     ctx->results[i] = std::move(t);
582   });
583   return ctx->p.getFuture();
584 }
585
586 // collect (iterator)
587
588 namespace detail {
589
590 template <typename T>
591 struct CollectContext {
592   struct Nothing { explicit Nothing(int n) {} };
593
594   using Result = typename std::conditional<
595     std::is_void<T>::value,
596     void,
597     std::vector<T>>::type;
598
599   using InternalResult = typename std::conditional<
600     std::is_void<T>::value,
601     Nothing,
602     std::vector<Optional<T>>>::type;
603
604   explicit CollectContext(int n) : result(n) {}
605   ~CollectContext() {
606     if (!threw.exchange(true)) {
607       // map Optional<T> -> T
608       std::vector<T> finalResult;
609       finalResult.reserve(result.size());
610       std::transform(result.begin(), result.end(),
611                      std::back_inserter(finalResult),
612                      [](Optional<T>& o) { return std::move(o.value()); });
613       p.setValue(std::move(finalResult));
614     }
615   }
616   inline void setPartialResult(size_t i, Try<T>& t) {
617     result[i] = std::move(t.value());
618   }
619   Promise<Result> p;
620   InternalResult result;
621   std::atomic<bool> threw {false};
622 };
623
624 }
625
626 template <class InputIterator>
627 Future<typename detail::CollectContext<
628   typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
629 collect(InputIterator first, InputIterator last) {
630   typedef
631     typename std::iterator_traits<InputIterator>::value_type::value_type T;
632
633   auto ctx = std::make_shared<detail::CollectContext<T>>(
634     std::distance(first, last));
635   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
636     if (t.hasException()) {
637        if (!ctx->threw.exchange(true)) {
638          ctx->p.setException(std::move(t.exception()));
639        }
640      } else if (!ctx->threw) {
641        ctx->setPartialResult(i, t);
642      }
643   });
644   return ctx->p.getFuture();
645 }
646
647 // collect (variadic)
648
649 template <typename... Fs>
650 typename detail::CollectVariadicContext<
651   typename std::decay<Fs>::type::value_type...>::type
652 collect(Fs&&... fs) {
653   auto ctx = std::make_shared<detail::CollectVariadicContext<
654     typename std::decay<Fs>::type::value_type...>>();
655   detail::collectVariadicHelper<detail::CollectVariadicContext>(
656     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
657   return ctx->p.getFuture();
658 }
659
660 // collectAny (iterator)
661
662 template <class InputIterator>
663 Future<
664   std::pair<size_t,
665             Try<
666               typename
667               std::iterator_traits<InputIterator>::value_type::value_type>>>
668 collectAny(InputIterator first, InputIterator last) {
669   typedef
670     typename std::iterator_traits<InputIterator>::value_type::value_type T;
671
672   struct CollectAnyContext {
673     CollectAnyContext() {};
674     Promise<std::pair<size_t, Try<T>>> p;
675     std::atomic<bool> done {false};
676   };
677
678   auto ctx = std::make_shared<CollectAnyContext>();
679   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
680     if (!ctx->done.exchange(true)) {
681       ctx->p.setValue(std::make_pair(i, std::move(t)));
682     }
683   });
684   return ctx->p.getFuture();
685 }
686
687 // collectN (iterator)
688
689 template <class InputIterator>
690 Future<std::vector<std::pair<size_t, Try<typename
691   std::iterator_traits<InputIterator>::value_type::value_type>>>>
692 collectN(InputIterator first, InputIterator last, size_t n) {
693   typedef typename
694     std::iterator_traits<InputIterator>::value_type::value_type T;
695   typedef std::vector<std::pair<size_t, Try<T>>> V;
696
697   struct CollectNContext {
698     V v;
699     std::atomic<size_t> completed = {0};
700     Promise<V> p;
701   };
702   auto ctx = std::make_shared<CollectNContext>();
703
704   if (size_t(std::distance(first, last)) < n) {
705     ctx->p.setException(std::runtime_error("Not enough futures"));
706   } else {
707     // for each completed Future, increase count and add to vector, until we
708     // have n completed futures at which point we fulfil our Promise with the
709     // vector
710     mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
711       auto c = ++ctx->completed;
712       if (c <= n) {
713         assert(ctx->v.size() < n);
714         ctx->v.emplace_back(i, std::move(t));
715         if (c == n) {
716           ctx->p.setTry(Try<V>(std::move(ctx->v)));
717         }
718       }
719     });
720   }
721
722   return ctx->p.getFuture();
723 }
724
725 // reduce (iterator)
726
727 template <class It, class T, class F>
728 Future<T> reduce(It first, It last, T&& initial, F&& func) {
729   if (first == last) {
730     return makeFuture(std::move(initial));
731   }
732
733   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
734   typedef typename std::conditional<
735     detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type Arg;
736   typedef isTry<Arg> IsTry;
737
738   folly::MoveWrapper<T> minitial(std::move(initial));
739   auto sfunc = std::make_shared<F>(std::move(func));
740
741   auto f = first->then([minitial, sfunc](Try<ItT>& head) mutable {
742     return (*sfunc)(std::move(*minitial),
743                 head.template get<IsTry::value, Arg&&>());
744   });
745
746   for (++first; first != last; ++first) {
747     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
748       return (*sfunc)(std::move(std::get<0>(t).value()),
749                   // Either return a ItT&& or a Try<ItT>&& depending
750                   // on the type of the argument of func.
751                   std::get<1>(t).template get<IsTry::value, Arg&&>());
752     });
753   }
754
755   return f;
756 }
757
758 // window (collection)
759
760 template <class Collection, class F, class ItT, class Result>
761 std::vector<Future<Result>>
762 window(Collection input, F func, size_t n) {
763   struct WindowContext {
764     WindowContext(Collection&& i, F&& fn)
765         : input_(std::move(i)), promises_(input_.size()),
766           func_(std::move(fn))
767       {}
768     std::atomic<size_t> i_ {0};
769     Collection input_;
770     std::vector<Promise<Result>> promises_;
771     F func_;
772
773     static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
774       size_t i = ctx->i_++;
775       if (i < ctx->input_.size()) {
776         // Using setCallback_ directly since we don't need the Future
777         ctx->func_(std::move(ctx->input_[i])).setCallback_(
778           // ctx is captured by value
779           [ctx, i](Try<Result>&& t) {
780             ctx->promises_[i].setTry(std::move(t));
781             // Chain another future onto this one
782             spawn(std::move(ctx));
783           });
784       }
785     }
786   };
787
788   auto max = std::min(n, input.size());
789
790   auto ctx = std::make_shared<WindowContext>(
791     std::move(input), std::move(func));
792
793   for (size_t i = 0; i < max; ++i) {
794     // Start the first n Futures
795     WindowContext::spawn(ctx);
796   }
797
798   std::vector<Future<Result>> futures;
799   futures.reserve(ctx->promises_.size());
800   for (auto& promise : ctx->promises_) {
801     futures.emplace_back(promise.getFuture());
802   }
803
804   return futures;
805 }
806
807 // reduce
808
809 template <class T>
810 template <class I, class F>
811 Future<I> Future<T>::reduce(I&& initial, F&& func) {
812   folly::MoveWrapper<I> minitial(std::move(initial));
813   folly::MoveWrapper<F> mfunc(std::move(func));
814   return then([minitial, mfunc](T& vals) mutable {
815     auto ret = std::move(*minitial);
816     for (auto& val : vals) {
817       ret = (*mfunc)(std::move(ret), std::move(val));
818     }
819     return ret;
820   });
821 }
822
823 // unorderedReduce (iterator)
824
825 template <class It, class T, class F, class ItT, class Arg>
826 Future<T> unorderedReduce(It first, It last, T initial, F func) {
827   if (first == last) {
828     return makeFuture(std::move(initial));
829   }
830
831   typedef isTry<Arg> IsTry;
832
833   struct UnorderedReduceContext {
834     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
835         : lock_(), memo_(makeFuture<T>(std::move(memo))),
836           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
837       {};
838     folly::MicroSpinLock lock_; // protects memo_ and numThens_
839     Future<T> memo_;
840     F func_;
841     size_t numThens_; // how many Futures completed and called .then()
842     size_t numFutures_; // how many Futures in total
843     Promise<T> promise_;
844   };
845
846   auto ctx = std::make_shared<UnorderedReduceContext>(
847     std::move(initial), std::move(func), std::distance(first, last));
848
849   mapSetCallback<ItT>(first, last, [ctx](size_t i, Try<ItT>&& t) {
850     folly::MoveWrapper<Try<ItT>> mt(std::move(t));
851     // Futures can be completed in any order, simultaneously.
852     // To make this non-blocking, we create a new Future chain in
853     // the order of completion to reduce the values.
854     // The spinlock just protects chaining a new Future, not actually
855     // executing the reduce, which should be really fast.
856     folly::MSLGuard lock(ctx->lock_);
857     ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
858       // Either return a ItT&& or a Try<ItT>&& depending
859       // on the type of the argument of func.
860       return ctx->func_(std::move(v), mt->template get<IsTry::value, Arg&&>());
861     });
862     if (++ctx->numThens_ == ctx->numFutures_) {
863       // After reducing the value of the last Future, fulfill the Promise
864       ctx->memo_.setCallback_([ctx](Try<T>&& t2) {
865         ctx->promise_.setValue(std::move(t2));
866       });
867     }
868   });
869
870   return ctx->promise_.getFuture();
871 }
872
873 // within
874
875 template <class T>
876 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
877   return within(dur, TimedOut(), tk);
878 }
879
880 template <class T>
881 template <class E>
882 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
883
884   struct Context {
885     Context(E ex) : exception(std::move(ex)), promise() {}
886     E exception;
887     Future<Unit> thisFuture;
888     Promise<T> promise;
889     std::atomic<bool> token {false};
890   };
891
892   std::shared_ptr<Timekeeper> tks;
893   if (!tk) {
894     tks = folly::detail::getTimekeeperSingleton();
895     tk = DCHECK_NOTNULL(tks.get());
896   }
897
898   auto ctx = std::make_shared<Context>(std::move(e));
899
900   ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
901     // TODO: "this" completed first, cancel "after"
902     if (ctx->token.exchange(true) == false) {
903       ctx->promise.setTry(std::move(t));
904     }
905   });
906
907   tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
908     // "after" completed first, cancel "this"
909     ctx->thisFuture.raise(TimedOut());
910     if (ctx->token.exchange(true) == false) {
911       if (t.hasException()) {
912         ctx->promise.setException(std::move(t.exception()));
913       } else {
914         ctx->promise.setException(std::move(ctx->exception));
915       }
916     }
917   });
918
919   return ctx->promise.getFuture().via(getExecutor());
920 }
921
922 // delayed
923
924 template <class T>
925 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
926   return collectAll(*this, futures::sleep(dur, tk))
927     .then([](std::tuple<Try<T>, Try<Unit>> tup) {
928       Try<T>& t = std::get<0>(tup);
929       return makeFuture<T>(std::move(t));
930     });
931 }
932
933 namespace detail {
934
935 template <class T>
936 void waitImpl(Future<T>& f) {
937   // short-circuit if there's nothing to do
938   if (f.isReady()) return;
939
940   folly::fibers::Baton baton;
941   f.setCallback_([&](const Try<T>& t) { baton.post(); });
942   baton.wait();
943   assert(f.isReady());
944 }
945
946 template <class T>
947 void waitImpl(Future<T>& f, Duration dur) {
948   // short-circuit if there's nothing to do
949   if (f.isReady()) return;
950
951   folly::MoveWrapper<Promise<T>> promise;
952   auto ret = promise->getFuture();
953   auto baton = std::make_shared<folly::fibers::Baton>();
954   f.setCallback_([baton, promise](Try<T>&& t) mutable {
955     promise->setTry(std::move(t));
956     baton->post();
957   });
958   f = std::move(ret);
959   if (baton->timed_wait(dur)) {
960     assert(f.isReady());
961   }
962 }
963
964 template <class T>
965 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
966   while (!f.isReady()) {
967     e->drive();
968   }
969 }
970
971 } // detail
972
973 template <class T>
974 Future<T>& Future<T>::wait() & {
975   detail::waitImpl(*this);
976   return *this;
977 }
978
979 template <class T>
980 Future<T>&& Future<T>::wait() && {
981   detail::waitImpl(*this);
982   return std::move(*this);
983 }
984
985 template <class T>
986 Future<T>& Future<T>::wait(Duration dur) & {
987   detail::waitImpl(*this, dur);
988   return *this;
989 }
990
991 template <class T>
992 Future<T>&& Future<T>::wait(Duration dur) && {
993   detail::waitImpl(*this, dur);
994   return std::move(*this);
995 }
996
997 template <class T>
998 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
999   detail::waitViaImpl(*this, e);
1000   return *this;
1001 }
1002
1003 template <class T>
1004 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1005   detail::waitViaImpl(*this, e);
1006   return std::move(*this);
1007 }
1008
1009 template <class T>
1010 T Future<T>::get() {
1011   return std::move(wait().value());
1012 }
1013
1014 template <class T>
1015 T Future<T>::get(Duration dur) {
1016   wait(dur);
1017   if (isReady()) {
1018     return std::move(value());
1019   } else {
1020     throw TimedOut();
1021   }
1022 }
1023
1024 template <class T>
1025 T Future<T>::getVia(DrivableExecutor* e) {
1026   return std::move(waitVia(e).value());
1027 }
1028
1029 namespace detail {
1030   template <class T>
1031   struct TryEquals {
1032     static bool equals(const Try<T>& t1, const Try<T>& t2) {
1033       return t1.value() == t2.value();
1034     }
1035   };
1036 }
1037
1038 template <class T>
1039 Future<bool> Future<T>::willEqual(Future<T>& f) {
1040   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1041     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1042       return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1043     } else {
1044       return false;
1045       }
1046   });
1047 }
1048
1049 template <class T>
1050 template <class F>
1051 Future<T> Future<T>::filter(F predicate) {
1052   auto p = folly::makeMoveWrapper(std::move(predicate));
1053   return this->then([p](T val) {
1054     T const& valConstRef = val;
1055     if (!(*p)(valConstRef)) {
1056       throw PredicateDoesNotObtain();
1057     }
1058     return val;
1059   });
1060 }
1061
1062 template <class T>
1063 template <class Callback>
1064 auto Future<T>::thenMulti(Callback&& fn)
1065     -> decltype(this->then(std::forward<Callback>(fn))) {
1066   // thenMulti with one callback is just a then
1067   return then(std::forward<Callback>(fn));
1068 }
1069
1070 template <class T>
1071 template <class Callback, class... Callbacks>
1072 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1073     -> decltype(this->then(std::forward<Callback>(fn)).
1074                       thenMulti(std::forward<Callbacks>(fns)...)) {
1075   // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1076   return then(std::forward<Callback>(fn)).
1077          thenMulti(std::forward<Callbacks>(fns)...);
1078 }
1079
1080 template <class T>
1081 template <class Callback, class... Callbacks>
1082 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1083                                       Callbacks&&... fns)
1084     -> decltype(this->then(std::forward<Callback>(fn)).
1085                       thenMulti(std::forward<Callbacks>(fns)...)) {
1086   // thenMultiExecutor with two callbacks is
1087   // via(x).then(a).thenMulti(b, ...).via(oldX)
1088   auto oldX = getExecutor();
1089   setExecutor(x);
1090   return then(std::forward<Callback>(fn)).
1091          thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1092 }
1093
1094 template <class T>
1095 template <class Callback>
1096 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1097     -> decltype(this->then(std::forward<Callback>(fn))) {
1098   // thenMulti with one callback is just a then with an executor
1099   return then(x, std::forward<Callback>(fn));
1100 }
1101
1102 template <class F>
1103 inline Future<Unit> when(bool p, F thunk) {
1104   return p ? thunk().unit() : makeFuture();
1105 }
1106
1107 template <class P, class F>
1108 Future<Unit> whileDo(P predicate, F thunk) {
1109   if (predicate()) {
1110     return thunk().then([=] {
1111       return whileDo(predicate, thunk);
1112     });
1113   }
1114   return makeFuture();
1115 }
1116
1117 template <class F>
1118 Future<Unit> times(const int n, F thunk) {
1119   auto count = folly::makeMoveWrapper(
1120     std::unique_ptr<std::atomic<int>>(new std::atomic<int>(0))
1121   );
1122   return folly::whileDo([=]() mutable {
1123       return (*count)->fetch_add(1) < n;
1124     }, thunk);
1125 }
1126
1127 namespace futures {
1128   template <class It, class F, class ItT, class Result>
1129   std::vector<Future<Result>> map(It first, It last, F func) {
1130     std::vector<Future<Result>> results;
1131     for (auto it = first; it != last; it++) {
1132       results.push_back(it->then(func));
1133     }
1134     return results;
1135   }
1136 }
1137
1138 namespace futures {
1139
1140 namespace detail {
1141
1142 struct retrying_policy_raw_tag {};
1143 struct retrying_policy_fut_tag {};
1144
1145 template <class Policy>
1146 struct retrying_policy_traits {
1147   using ew = exception_wrapper;
1148   FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1149   template <class Ret>
1150   using has_op = typename std::integral_constant<bool,
1151         has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1152         has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1153   using is_raw = has_op<bool>;
1154   using is_fut = has_op<Future<bool>>;
1155   using tag = typename std::conditional<
1156         is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1157         is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1158 };
1159
1160 template <class Policy, class FF>
1161 typename std::result_of<FF(size_t)>::type
1162 retrying(size_t k, Policy&& p, FF&& ff) {
1163   using F = typename std::result_of<FF(size_t)>::type;
1164   using T = typename F::value_type;
1165   auto f = ff(k++);
1166   auto pm = makeMoveWrapper(p);
1167   auto ffm = makeMoveWrapper(ff);
1168   return f.onError([=](exception_wrapper x) mutable {
1169       auto q = (*pm)(k, x);
1170       auto xm = makeMoveWrapper(std::move(x));
1171       return q.then([=](bool r) mutable {
1172           return r
1173             ? retrying(k, pm.move(), ffm.move())
1174             : makeFuture<T>(xm.move());
1175       });
1176   });
1177 }
1178
1179 template <class Policy, class FF>
1180 typename std::result_of<FF(size_t)>::type
1181 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1182   auto pm = makeMoveWrapper(std::move(p));
1183   auto q = [=](size_t k, exception_wrapper x) {
1184     return makeFuture<bool>((*pm)(k, x));
1185   };
1186   return retrying(0, std::move(q), std::forward<FF>(ff));
1187 }
1188
1189 template <class Policy, class FF>
1190 typename std::result_of<FF(size_t)>::type
1191 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1192   return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1193 }
1194
1195 //  jittered exponential backoff, clamped to [backoff_min, backoff_max]
1196 template <class URNG>
1197 Duration retryingJitteredExponentialBackoffDur(
1198     size_t n,
1199     Duration backoff_min,
1200     Duration backoff_max,
1201     double jitter_param,
1202     URNG& rng) {
1203   using d = Duration;
1204   auto dist = std::normal_distribution<double>(0.0, jitter_param);
1205   auto jitter = std::exp(dist(rng));
1206   auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1207   return std::max(backoff_min, std::min(backoff_max, backoff));
1208 }
1209
1210 template <class Policy, class URNG>
1211 std::function<Future<bool>(size_t, const exception_wrapper&)>
1212 retryingPolicyCappedJitteredExponentialBackoff(
1213     size_t max_tries,
1214     Duration backoff_min,
1215     Duration backoff_max,
1216     double jitter_param,
1217     URNG rng,
1218     Policy&& p) {
1219   auto pm = makeMoveWrapper(std::move(p));
1220   auto rngp = std::make_shared<URNG>(std::move(rng));
1221   return [=](size_t n, const exception_wrapper& ex) mutable {
1222     if (n == max_tries) { return makeFuture(false); }
1223     return (*pm)(n, ex).then([=](bool v) {
1224         if (!v) { return makeFuture(false); }
1225         auto backoff = detail::retryingJitteredExponentialBackoffDur(
1226             n, backoff_min, backoff_max, jitter_param, *rngp);
1227         return futures::sleep(backoff).then([] { return true; });
1228     });
1229   };
1230 }
1231
1232 template <class Policy, class URNG>
1233 std::function<Future<bool>(size_t, const exception_wrapper&)>
1234 retryingPolicyCappedJitteredExponentialBackoff(
1235     size_t max_tries,
1236     Duration backoff_min,
1237     Duration backoff_max,
1238     double jitter_param,
1239     URNG rng,
1240     Policy&& p,
1241     retrying_policy_raw_tag) {
1242   auto pm = makeMoveWrapper(std::move(p));
1243   auto q = [=](size_t n, const exception_wrapper& e) {
1244     return makeFuture((*pm)(n, e));
1245   };
1246   return retryingPolicyCappedJitteredExponentialBackoff(
1247       max_tries,
1248       backoff_min,
1249       backoff_max,
1250       jitter_param,
1251       std::move(rng),
1252       std::move(q));
1253 }
1254
1255 template <class Policy, class URNG>
1256 std::function<Future<bool>(size_t, const exception_wrapper&)>
1257 retryingPolicyCappedJitteredExponentialBackoff(
1258     size_t max_tries,
1259     Duration backoff_min,
1260     Duration backoff_max,
1261     double jitter_param,
1262     URNG rng,
1263     Policy&& p,
1264     retrying_policy_fut_tag) {
1265   return retryingPolicyCappedJitteredExponentialBackoff(
1266       max_tries,
1267       backoff_min,
1268       backoff_max,
1269       jitter_param,
1270       std::move(rng),
1271       std::move(p));
1272 }
1273
1274 }
1275
1276 template <class Policy, class FF>
1277 typename std::result_of<FF(size_t)>::type
1278 retrying(Policy&& p, FF&& ff) {
1279   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1280   return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1281 }
1282
1283 inline
1284 std::function<bool(size_t, const exception_wrapper&)>
1285 retryingPolicyBasic(
1286     size_t max_tries) {
1287   return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1288 }
1289
1290 template <class Policy, class URNG>
1291 std::function<Future<bool>(size_t, const exception_wrapper&)>
1292 retryingPolicyCappedJitteredExponentialBackoff(
1293     size_t max_tries,
1294     Duration backoff_min,
1295     Duration backoff_max,
1296     double jitter_param,
1297     URNG rng,
1298     Policy&& p) {
1299   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1300   return detail::retryingPolicyCappedJitteredExponentialBackoff(
1301       max_tries,
1302       backoff_min,
1303       backoff_max,
1304       jitter_param,
1305       std::move(rng),
1306       std::move(p),
1307       tag());
1308 }
1309
1310 inline
1311 std::function<Future<bool>(size_t, const exception_wrapper&)>
1312 retryingPolicyCappedJitteredExponentialBackoff(
1313     size_t max_tries,
1314     Duration backoff_min,
1315     Duration backoff_max,
1316     double jitter_param) {
1317   auto p = [](size_t, const exception_wrapper&) { return true; };
1318   return retryingPolicyCappedJitteredExponentialBackoff(
1319       max_tries,
1320       backoff_min,
1321       backoff_max,
1322       jitter_param,
1323       ThreadLocalPRNG(),
1324       std::move(p));
1325 }
1326
1327 }
1328
1329 // Instantiate the most common Future types to save compile time
1330 extern template class Future<Unit>;
1331 extern template class Future<bool>;
1332 extern template class Future<int>;
1333 extern template class Future<int64_t>;
1334 extern template class Future<std::string>;
1335 extern template class Future<double>;
1336
1337 } // namespace folly