5979d183c35622bf079ea35762ae957cea1dc851
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <algorithm>
20 #include <chrono>
21 #include <random>
22 #include <thread>
23
24 #include <folly/experimental/fibers/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
30
31 namespace folly {
32
33 class Timekeeper;
34
35 namespace detail {
36   Timekeeper* getTimekeeperSingleton();
37 }
38
39 template <class T>
40 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
41   other.core_ = nullptr;
42 }
43
44 template <class T>
45 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
46   std::swap(core_, other.core_);
47   return *this;
48 }
49
50 template <class T>
51 template <class T2, typename>
52 Future<T>::Future(T2&& val)
53   : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
54
55 template <class T>
56 template <typename, typename>
57 Future<T>::Future()
58   : core_(new detail::Core<T>(Try<T>(T()))) {}
59
60 template <class T>
61 Future<T>::~Future() {
62   detach();
63 }
64
65 template <class T>
66 void Future<T>::detach() {
67   if (core_) {
68     core_->detachFuture();
69     core_ = nullptr;
70   }
71 }
72
73 template <class T>
74 void Future<T>::throwIfInvalid() const {
75   if (!core_)
76     throw NoState();
77 }
78
79 template <class T>
80 template <class F>
81 void Future<T>::setCallback_(F&& func) {
82   throwIfInvalid();
83   core_->setCallback(std::move(func));
84 }
85
86 // unwrap
87
88 template <class T>
89 template <class F>
90 typename std::enable_if<isFuture<F>::value,
91                         Future<typename isFuture<T>::Inner>>::type
92 Future<T>::unwrap() {
93   return then([](Future<typename isFuture<T>::Inner> internal_future) {
94       return internal_future;
95   });
96 }
97
98 // then
99
100 // Variant: returns a value
101 // e.g. f.then([](Try<T>&& t){ return t.value(); });
102 template <class T>
103 template <typename F, typename R, bool isTry, typename... Args>
104 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
105 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
106   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
107   typedef typename R::ReturnsFuture::Inner B;
108
109   throwIfInvalid();
110
111   // wrap these so we can move them into the lambda
112   folly::MoveWrapper<Promise<B>> p;
113   p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
114   folly::MoveWrapper<F> funcm(std::forward<F>(func));
115
116   // grab the Future now before we lose our handle on the Promise
117   auto f = p->getFuture();
118   f.core_->setExecutorNoLock(getExecutor());
119
120   /* This is a bit tricky.
121
122      We can't just close over *this in case this Future gets moved. So we
123      make a new dummy Future. We could figure out something more
124      sophisticated that avoids making a new Future object when it can, as an
125      optimization. But this is correct.
126
127      core_ can't be moved, it is explicitly disallowed (as is copying). But
128      if there's ever a reason to allow it, this is one place that makes that
129      assumption and would need to be fixed. We use a standard shared pointer
130      for core_ (by copying it in), which means in essence obj holds a shared
131      pointer to itself.  But this shouldn't leak because Promise will not
132      outlive the continuation, because Promise will setException() with a
133      broken Promise if it is destructed before completed. We could use a
134      weak pointer but it would have to be converted to a shared pointer when
135      func is executed (because the Future returned by func may possibly
136      persist beyond the callback, if it gets moved), and so it is an
137      optimization to just make it shared from the get-go.
138
139      We have to move in the Promise and func using the MoveWrapper
140      hack. (func could be copied but it's a big drag on perf).
141
142      Two subtle but important points about this design. detail::Core has no
143      back pointers to Future or Promise, so if Future or Promise get moved
144      (and they will be moved in performant code) we don't have to do
145      anything fancy. And because we store the continuation in the
146      detail::Core, not in the Future, we can execute the continuation even
147      after the Future has gone out of scope. This is an intentional design
148      decision. It is likely we will want to be able to cancel a continuation
149      in some circumstances, but I think it should be explicit not implicit
150      in the destruction of the Future used to create it.
151      */
152   setCallback_(
153     [p, funcm](Try<T>&& t) mutable {
154       if (!isTry && t.hasException()) {
155         p->setException(std::move(t.exception()));
156       } else {
157         p->setWith([&]() {
158           return (*funcm)(t.template get<isTry, Args>()...);
159         });
160       }
161     });
162
163   return f;
164 }
165
166 // Variant: returns a Future
167 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
168 template <class T>
169 template <typename F, typename R, bool isTry, typename... Args>
170 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
171 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
172   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
173   typedef typename R::ReturnsFuture::Inner B;
174
175   throwIfInvalid();
176
177   // wrap these so we can move them into the lambda
178   folly::MoveWrapper<Promise<B>> p;
179   p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
180   folly::MoveWrapper<F> funcm(std::forward<F>(func));
181
182   // grab the Future now before we lose our handle on the Promise
183   auto f = p->getFuture();
184   f.core_->setExecutorNoLock(getExecutor());
185
186   setCallback_(
187     [p, funcm](Try<T>&& t) mutable {
188       if (!isTry && t.hasException()) {
189         p->setException(std::move(t.exception()));
190       } else {
191         try {
192           auto f2 = (*funcm)(t.template get<isTry, Args>()...);
193           // that didn't throw, now we can steal p
194           f2.setCallback_([p](Try<B>&& b) mutable {
195             p->setTry(std::move(b));
196           });
197         } catch (const std::exception& e) {
198           p->setException(exception_wrapper(std::current_exception(), e));
199         } catch (...) {
200           p->setException(exception_wrapper(std::current_exception()));
201         }
202       }
203     });
204
205   return f;
206 }
207
208 template <typename T>
209 template <typename R, typename Caller, typename... Args>
210   Future<typename isFuture<R>::Inner>
211 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
212   typedef typename std::remove_cv<
213     typename std::remove_reference<
214       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
215   return then([instance, func](Try<T>&& t){
216     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
217   });
218 }
219
220 template <class T>
221 template <class Executor, class Arg, class... Args>
222 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
223   -> decltype(this->then(std::forward<Arg>(arg),
224                          std::forward<Args>(args)...))
225 {
226   auto oldX = getExecutor();
227   setExecutor(x);
228   return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
229                via(oldX);
230 }
231
232 template <class T>
233 Future<Unit> Future<T>::then() {
234   return then([] () {});
235 }
236
237 // onError where the callback returns T
238 template <class T>
239 template <class F>
240 typename std::enable_if<
241   !detail::callableWith<F, exception_wrapper>::value &&
242   !detail::Extract<F>::ReturnsFuture::value,
243   Future<T>>::type
244 Future<T>::onError(F&& func) {
245   typedef typename detail::Extract<F>::FirstArg Exn;
246   static_assert(
247       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
248       "Return type of onError callback must be T or Future<T>");
249
250   Promise<T> p;
251   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
252   auto f = p.getFuture();
253   auto pm = folly::makeMoveWrapper(std::move(p));
254   auto funcm = folly::makeMoveWrapper(std::move(func));
255   setCallback_([pm, funcm](Try<T>&& t) mutable {
256     if (!t.template withException<Exn>([&] (Exn& e) {
257           pm->setWith([&]{
258             return (*funcm)(e);
259           });
260         })) {
261       pm->setTry(std::move(t));
262     }
263   });
264
265   return f;
266 }
267
268 // onError where the callback returns Future<T>
269 template <class T>
270 template <class F>
271 typename std::enable_if<
272   !detail::callableWith<F, exception_wrapper>::value &&
273   detail::Extract<F>::ReturnsFuture::value,
274   Future<T>>::type
275 Future<T>::onError(F&& func) {
276   static_assert(
277       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
278       "Return type of onError callback must be T or Future<T>");
279   typedef typename detail::Extract<F>::FirstArg Exn;
280
281   Promise<T> p;
282   auto f = p.getFuture();
283   auto pm = folly::makeMoveWrapper(std::move(p));
284   auto funcm = folly::makeMoveWrapper(std::move(func));
285   setCallback_([pm, funcm](Try<T>&& t) mutable {
286     if (!t.template withException<Exn>([&] (Exn& e) {
287           try {
288             auto f2 = (*funcm)(e);
289             f2.setCallback_([pm](Try<T>&& t2) mutable {
290               pm->setTry(std::move(t2));
291             });
292           } catch (const std::exception& e2) {
293             pm->setException(exception_wrapper(std::current_exception(), e2));
294           } catch (...) {
295             pm->setException(exception_wrapper(std::current_exception()));
296           }
297         })) {
298       pm->setTry(std::move(t));
299     }
300   });
301
302   return f;
303 }
304
305 template <class T>
306 template <class F>
307 Future<T> Future<T>::ensure(F func) {
308   MoveWrapper<F> funcw(std::move(func));
309   return this->then([funcw](Try<T>&& t) mutable {
310     (*funcw)();
311     return makeFuture(std::move(t));
312   });
313 }
314
315 template <class T>
316 template <class F>
317 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
318   auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
319   return within(dur, tk)
320     .onError([funcw](TimedOut const&) { return (*funcw)(); });
321 }
322
323 template <class T>
324 template <class F>
325 typename std::enable_if<
326   detail::callableWith<F, exception_wrapper>::value &&
327   detail::Extract<F>::ReturnsFuture::value,
328   Future<T>>::type
329 Future<T>::onError(F&& func) {
330   static_assert(
331       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
332       "Return type of onError callback must be T or Future<T>");
333
334   Promise<T> p;
335   auto f = p.getFuture();
336   auto pm = folly::makeMoveWrapper(std::move(p));
337   auto funcm = folly::makeMoveWrapper(std::move(func));
338   setCallback_([pm, funcm](Try<T> t) mutable {
339     if (t.hasException()) {
340       try {
341         auto f2 = (*funcm)(std::move(t.exception()));
342         f2.setCallback_([pm](Try<T> t2) mutable {
343           pm->setTry(std::move(t2));
344         });
345       } catch (const std::exception& e2) {
346         pm->setException(exception_wrapper(std::current_exception(), e2));
347       } catch (...) {
348         pm->setException(exception_wrapper(std::current_exception()));
349       }
350     } else {
351       pm->setTry(std::move(t));
352     }
353   });
354
355   return f;
356 }
357
358 // onError(exception_wrapper) that returns T
359 template <class T>
360 template <class F>
361 typename std::enable_if<
362   detail::callableWith<F, exception_wrapper>::value &&
363   !detail::Extract<F>::ReturnsFuture::value,
364   Future<T>>::type
365 Future<T>::onError(F&& func) {
366   static_assert(
367       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
368       "Return type of onError callback must be T or Future<T>");
369
370   Promise<T> p;
371   auto f = p.getFuture();
372   auto pm = folly::makeMoveWrapper(std::move(p));
373   auto funcm = folly::makeMoveWrapper(std::move(func));
374   setCallback_([pm, funcm](Try<T> t) mutable {
375     if (t.hasException()) {
376       pm->setWith([&]{
377         return (*funcm)(std::move(t.exception()));
378       });
379     } else {
380       pm->setTry(std::move(t));
381     }
382   });
383
384   return f;
385 }
386
387 template <class T>
388 typename std::add_lvalue_reference<T>::type Future<T>::value() {
389   throwIfInvalid();
390
391   return core_->getTry().value();
392 }
393
394 template <class T>
395 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
396   throwIfInvalid();
397
398   return core_->getTry().value();
399 }
400
401 template <class T>
402 Try<T>& Future<T>::getTry() {
403   throwIfInvalid();
404
405   return core_->getTry();
406 }
407
408 template <class T>
409 Optional<Try<T>> Future<T>::poll() {
410   Optional<Try<T>> o;
411   if (core_->ready()) {
412     o = std::move(core_->getTry());
413   }
414   return o;
415 }
416
417 template <class T>
418 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
419   throwIfInvalid();
420
421   setExecutor(executor, priority);
422
423   return std::move(*this);
424 }
425
426 template <class T>
427 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
428   throwIfInvalid();
429
430   MoveWrapper<Promise<T>> p;
431   auto f = p->getFuture();
432   then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
433   return std::move(f).via(executor, priority);
434 }
435
436
437 template <class Func>
438 auto via(Executor* x, Func func)
439   -> Future<typename isFuture<decltype(func())>::Inner>
440 {
441   // TODO make this actually more performant. :-P #7260175
442   return via(x).then(func);
443 }
444
445 template <class T>
446 bool Future<T>::isReady() const {
447   throwIfInvalid();
448   return core_->ready();
449 }
450
451 template <class T>
452 bool Future<T>::hasValue() {
453   return getTry().hasValue();
454 }
455
456 template <class T>
457 bool Future<T>::hasException() {
458   return getTry().hasException();
459 }
460
461 template <class T>
462 void Future<T>::raise(exception_wrapper exception) {
463   core_->raise(std::move(exception));
464 }
465
466 // makeFuture
467
468 template <class T>
469 Future<typename std::decay<T>::type> makeFuture(T&& t) {
470   return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
471 }
472
473 inline // for multiple translation units
474 Future<Unit> makeFuture() {
475   return makeFuture(Unit{});
476 }
477
478 // makeFutureWith(Future<T>()) -> Future<T>
479 template <class F>
480 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
481                         typename std::result_of<F()>::type>::type
482 makeFutureWith(F&& func) {
483   using InnerType =
484       typename isFuture<typename std::result_of<F()>::type>::Inner;
485   try {
486     return func();
487   } catch (std::exception& e) {
488     return makeFuture<InnerType>(
489         exception_wrapper(std::current_exception(), e));
490   } catch (...) {
491     return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
492   }
493 }
494
495 // makeFutureWith(T()) -> Future<T>
496 // makeFutureWith(void()) -> Future<Unit>
497 template <class F>
498 typename std::enable_if<
499     !(isFuture<typename std::result_of<F()>::type>::value),
500     Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
501 makeFutureWith(F&& func) {
502   using LiftedResult =
503       typename Unit::Lift<typename std::result_of<F()>::type>::type;
504   return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
505     return func();
506   }));
507 }
508
509 template <class T>
510 Future<T> makeFuture(std::exception_ptr const& e) {
511   return makeFuture(Try<T>(e));
512 }
513
514 template <class T>
515 Future<T> makeFuture(exception_wrapper ew) {
516   return makeFuture(Try<T>(std::move(ew)));
517 }
518
519 template <class T, class E>
520 typename std::enable_if<std::is_base_of<std::exception, E>::value,
521                         Future<T>>::type
522 makeFuture(E const& e) {
523   return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
524 }
525
526 template <class T>
527 Future<T> makeFuture(Try<T>&& t) {
528   return Future<T>(new detail::Core<T>(std::move(t)));
529 }
530
531 // via
532 Future<Unit> via(Executor* executor, int8_t priority) {
533   return makeFuture().via(executor, priority);
534 }
535
536 // mapSetCallback calls func(i, Try<T>) when every future completes
537
538 template <class T, class InputIterator, class F>
539 void mapSetCallback(InputIterator first, InputIterator last, F func) {
540   for (size_t i = 0; first != last; ++first, ++i) {
541     first->setCallback_([func, i](Try<T>&& t) {
542       func(i, std::move(t));
543     });
544   }
545 }
546
547 // collectAll (variadic)
548
549 template <typename... Fs>
550 typename detail::CollectAllVariadicContext<
551   typename std::decay<Fs>::type::value_type...>::type
552 collectAll(Fs&&... fs) {
553   auto ctx = std::make_shared<detail::CollectAllVariadicContext<
554     typename std::decay<Fs>::type::value_type...>>();
555   detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
556     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
557   return ctx->p.getFuture();
558 }
559
560 // collectAll (iterator)
561
562 template <class InputIterator>
563 Future<
564   std::vector<
565   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
566 collectAll(InputIterator first, InputIterator last) {
567   typedef
568     typename std::iterator_traits<InputIterator>::value_type::value_type T;
569
570   struct CollectAllContext {
571     CollectAllContext(int n) : results(n) {}
572     ~CollectAllContext() {
573       p.setValue(std::move(results));
574     }
575     Promise<std::vector<Try<T>>> p;
576     std::vector<Try<T>> results;
577   };
578
579   auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
580   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
581     ctx->results[i] = std::move(t);
582   });
583   return ctx->p.getFuture();
584 }
585
586 // collect (iterator)
587
588 namespace detail {
589
590 template <typename T>
591 struct CollectContext {
592   struct Nothing { explicit Nothing(int n) {} };
593
594   using Result = typename std::conditional<
595     std::is_void<T>::value,
596     void,
597     std::vector<T>>::type;
598
599   using InternalResult = typename std::conditional<
600     std::is_void<T>::value,
601     Nothing,
602     std::vector<Optional<T>>>::type;
603
604   explicit CollectContext(int n) : result(n) {}
605   ~CollectContext() {
606     if (!threw.exchange(true)) {
607       // map Optional<T> -> T
608       std::vector<T> finalResult;
609       finalResult.reserve(result.size());
610       std::transform(result.begin(), result.end(),
611                      std::back_inserter(finalResult),
612                      [](Optional<T>& o) { return std::move(o.value()); });
613       p.setValue(std::move(finalResult));
614     }
615   }
616   inline void setPartialResult(size_t i, Try<T>& t) {
617     result[i] = std::move(t.value());
618   }
619   Promise<Result> p;
620   InternalResult result;
621   std::atomic<bool> threw {false};
622 };
623
624 }
625
626 template <class InputIterator>
627 Future<typename detail::CollectContext<
628   typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
629 collect(InputIterator first, InputIterator last) {
630   typedef
631     typename std::iterator_traits<InputIterator>::value_type::value_type T;
632
633   auto ctx = std::make_shared<detail::CollectContext<T>>(
634     std::distance(first, last));
635   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
636     if (t.hasException()) {
637        if (!ctx->threw.exchange(true)) {
638          ctx->p.setException(std::move(t.exception()));
639        }
640      } else if (!ctx->threw) {
641        ctx->setPartialResult(i, t);
642      }
643   });
644   return ctx->p.getFuture();
645 }
646
647 // collect (variadic)
648
649 template <typename... Fs>
650 typename detail::CollectVariadicContext<
651   typename std::decay<Fs>::type::value_type...>::type
652 collect(Fs&&... fs) {
653   auto ctx = std::make_shared<detail::CollectVariadicContext<
654     typename std::decay<Fs>::type::value_type...>>();
655   detail::collectVariadicHelper<detail::CollectVariadicContext>(
656     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
657   return ctx->p.getFuture();
658 }
659
660 // collectAny (iterator)
661
662 template <class InputIterator>
663 Future<
664   std::pair<size_t,
665             Try<
666               typename
667               std::iterator_traits<InputIterator>::value_type::value_type>>>
668 collectAny(InputIterator first, InputIterator last) {
669   typedef
670     typename std::iterator_traits<InputIterator>::value_type::value_type T;
671
672   struct CollectAnyContext {
673     CollectAnyContext() {};
674     Promise<std::pair<size_t, Try<T>>> p;
675     std::atomic<bool> done {false};
676   };
677
678   auto ctx = std::make_shared<CollectAnyContext>();
679   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
680     if (!ctx->done.exchange(true)) {
681       ctx->p.setValue(std::make_pair(i, std::move(t)));
682     }
683   });
684   return ctx->p.getFuture();
685 }
686
687 // collectN (iterator)
688
689 template <class InputIterator>
690 Future<std::vector<std::pair<size_t, Try<typename
691   std::iterator_traits<InputIterator>::value_type::value_type>>>>
692 collectN(InputIterator first, InputIterator last, size_t n) {
693   typedef typename
694     std::iterator_traits<InputIterator>::value_type::value_type T;
695   typedef std::vector<std::pair<size_t, Try<T>>> V;
696
697   struct CollectNContext {
698     V v;
699     std::atomic<size_t> completed = {0};
700     Promise<V> p;
701   };
702   auto ctx = std::make_shared<CollectNContext>();
703
704   if (size_t(std::distance(first, last)) < n) {
705     ctx->p.setException(std::runtime_error("Not enough futures"));
706   } else {
707     // for each completed Future, increase count and add to vector, until we
708     // have n completed futures at which point we fulfil our Promise with the
709     // vector
710     mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
711       auto c = ++ctx->completed;
712       if (c <= n) {
713         assert(ctx->v.size() < n);
714         ctx->v.emplace_back(i, std::move(t));
715         if (c == n) {
716           ctx->p.setTry(Try<V>(std::move(ctx->v)));
717         }
718       }
719     });
720   }
721
722   return ctx->p.getFuture();
723 }
724
725 // reduce (iterator)
726
727 template <class It, class T, class F>
728 Future<T> reduce(It first, It last, T&& initial, F&& func) {
729   if (first == last) {
730     return makeFuture(std::move(initial));
731   }
732
733   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
734   typedef typename std::conditional<
735     detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type Arg;
736   typedef isTry<Arg> IsTry;
737
738   folly::MoveWrapper<T> minitial(std::move(initial));
739   auto sfunc = std::make_shared<F>(std::move(func));
740
741   auto f = first->then([minitial, sfunc](Try<ItT>& head) mutable {
742     return (*sfunc)(std::move(*minitial),
743                 head.template get<IsTry::value, Arg&&>());
744   });
745
746   for (++first; first != last; ++first) {
747     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
748       return (*sfunc)(std::move(std::get<0>(t).value()),
749                   // Either return a ItT&& or a Try<ItT>&& depending
750                   // on the type of the argument of func.
751                   std::get<1>(t).template get<IsTry::value, Arg&&>());
752     });
753   }
754
755   return f;
756 }
757
758 // window (collection)
759
760 template <class Collection, class F, class ItT, class Result>
761 std::vector<Future<Result>>
762 window(Collection input, F func, size_t n) {
763   struct WindowContext {
764     WindowContext(Collection&& i, F&& fn)
765         : input_(std::move(i)), promises_(input_.size()),
766           func_(std::move(fn))
767       {}
768     std::atomic<size_t> i_ {0};
769     Collection input_;
770     std::vector<Promise<Result>> promises_;
771     F func_;
772
773     static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
774       size_t i = ctx->i_++;
775       if (i < ctx->input_.size()) {
776         // Using setCallback_ directly since we don't need the Future
777         ctx->func_(std::move(ctx->input_[i])).setCallback_(
778           // ctx is captured by value
779           [ctx, i](Try<Result>&& t) {
780             ctx->promises_[i].setTry(std::move(t));
781             // Chain another future onto this one
782             spawn(std::move(ctx));
783           });
784       }
785     }
786   };
787
788   auto max = std::min(n, input.size());
789
790   auto ctx = std::make_shared<WindowContext>(
791     std::move(input), std::move(func));
792
793   for (size_t i = 0; i < max; ++i) {
794     // Start the first n Futures
795     WindowContext::spawn(ctx);
796   }
797
798   std::vector<Future<Result>> futures;
799   futures.reserve(ctx->promises_.size());
800   for (auto& promise : ctx->promises_) {
801     futures.emplace_back(promise.getFuture());
802   }
803
804   return futures;
805 }
806
807 // reduce
808
809 template <class T>
810 template <class I, class F>
811 Future<I> Future<T>::reduce(I&& initial, F&& func) {
812   folly::MoveWrapper<I> minitial(std::move(initial));
813   folly::MoveWrapper<F> mfunc(std::move(func));
814   return then([minitial, mfunc](T& vals) mutable {
815     auto ret = std::move(*minitial);
816     for (auto& val : vals) {
817       ret = (*mfunc)(std::move(ret), std::move(val));
818     }
819     return ret;
820   });
821 }
822
823 // unorderedReduce (iterator)
824
825 template <class It, class T, class F, class ItT, class Arg>
826 Future<T> unorderedReduce(It first, It last, T initial, F func) {
827   if (first == last) {
828     return makeFuture(std::move(initial));
829   }
830
831   typedef isTry<Arg> IsTry;
832
833   struct UnorderedReduceContext {
834     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
835         : lock_(), memo_(makeFuture<T>(std::move(memo))),
836           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
837       {};
838     folly::MicroSpinLock lock_; // protects memo_ and numThens_
839     Future<T> memo_;
840     F func_;
841     size_t numThens_; // how many Futures completed and called .then()
842     size_t numFutures_; // how many Futures in total
843     Promise<T> promise_;
844   };
845
846   auto ctx = std::make_shared<UnorderedReduceContext>(
847     std::move(initial), std::move(func), std::distance(first, last));
848
849   mapSetCallback<ItT>(first, last, [ctx](size_t i, Try<ItT>&& t) {
850     folly::MoveWrapper<Try<ItT>> mt(std::move(t));
851     // Futures can be completed in any order, simultaneously.
852     // To make this non-blocking, we create a new Future chain in
853     // the order of completion to reduce the values.
854     // The spinlock just protects chaining a new Future, not actually
855     // executing the reduce, which should be really fast.
856     folly::MSLGuard lock(ctx->lock_);
857     ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
858       // Either return a ItT&& or a Try<ItT>&& depending
859       // on the type of the argument of func.
860       return ctx->func_(std::move(v), mt->template get<IsTry::value, Arg&&>());
861     });
862     if (++ctx->numThens_ == ctx->numFutures_) {
863       // After reducing the value of the last Future, fulfill the Promise
864       ctx->memo_.setCallback_([ctx](Try<T>&& t2) {
865         ctx->promise_.setValue(std::move(t2));
866       });
867     }
868   });
869
870   return ctx->promise_.getFuture();
871 }
872
873 // within
874
875 template <class T>
876 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
877   return within(dur, TimedOut(), tk);
878 }
879
880 template <class T>
881 template <class E>
882 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
883
884   struct Context {
885     Context(E ex) : exception(std::move(ex)), promise() {}
886     E exception;
887     Future<Unit> thisFuture;
888     Promise<T> promise;
889     std::atomic<bool> token {false};
890   };
891
892   if (!tk) {
893     tk = folly::detail::getTimekeeperSingleton();
894   }
895
896   auto ctx = std::make_shared<Context>(std::move(e));
897
898   ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
899     // TODO: "this" completed first, cancel "after"
900     if (ctx->token.exchange(true) == false) {
901       ctx->promise.setTry(std::move(t));
902     }
903   });
904
905   tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
906     // "after" completed first, cancel "this"
907     ctx->thisFuture.raise(TimedOut());
908     if (ctx->token.exchange(true) == false) {
909       if (t.hasException()) {
910         ctx->promise.setException(std::move(t.exception()));
911       } else {
912         ctx->promise.setException(std::move(ctx->exception));
913       }
914     }
915   });
916
917   return ctx->promise.getFuture().via(getExecutor());
918 }
919
920 // delayed
921
922 template <class T>
923 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
924   return collectAll(*this, futures::sleep(dur, tk))
925     .then([](std::tuple<Try<T>, Try<Unit>> tup) {
926       Try<T>& t = std::get<0>(tup);
927       return makeFuture<T>(std::move(t));
928     });
929 }
930
931 namespace detail {
932
933 template <class T>
934 void waitImpl(Future<T>& f) {
935   // short-circuit if there's nothing to do
936   if (f.isReady()) return;
937
938   folly::fibers::Baton baton;
939   f.setCallback_([&](const Try<T>& t) { baton.post(); });
940   baton.wait();
941 }
942
943 template <class T>
944 void waitImpl(Future<T>& f, Duration dur) {
945   // short-circuit if there's nothing to do
946   if (f.isReady()) return;
947
948   auto baton = std::make_shared<folly::fibers::Baton>();
949   f.setCallback_([baton](const Try<T>& t) {
950     baton->post();
951   });
952   baton->timed_wait(dur);
953 }
954
955 template <class T>
956 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
957   while (!f.isReady()) {
958     e->drive();
959   }
960 }
961
962 } // detail
963
964 template <class T>
965 Future<T>& Future<T>::wait() & {
966   detail::waitImpl(*this);
967   return *this;
968 }
969
970 template <class T>
971 Future<T>&& Future<T>::wait() && {
972   detail::waitImpl(*this);
973   return std::move(*this);
974 }
975
976 template <class T>
977 Future<T>& Future<T>::wait(Duration dur) & {
978   detail::waitImpl(*this, dur);
979   return *this;
980 }
981
982 template <class T>
983 Future<T>&& Future<T>::wait(Duration dur) && {
984   detail::waitImpl(*this, dur);
985   return std::move(*this);
986 }
987
988 template <class T>
989 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
990   detail::waitViaImpl(*this, e);
991   return *this;
992 }
993
994 template <class T>
995 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
996   detail::waitViaImpl(*this, e);
997   return std::move(*this);
998 }
999
1000 template <class T>
1001 T Future<T>::get() {
1002   return std::move(wait().value());
1003 }
1004
1005 template <class T>
1006 T Future<T>::get(Duration dur) {
1007   wait(dur);
1008   if (isReady()) {
1009     return std::move(value());
1010   } else {
1011     throw TimedOut();
1012   }
1013 }
1014
1015 template <class T>
1016 T Future<T>::getVia(DrivableExecutor* e) {
1017   return std::move(waitVia(e).value());
1018 }
1019
1020 namespace detail {
1021   template <class T>
1022   struct TryEquals {
1023     static bool equals(const Try<T>& t1, const Try<T>& t2) {
1024       return t1.value() == t2.value();
1025     }
1026   };
1027 }
1028
1029 template <class T>
1030 Future<bool> Future<T>::willEqual(Future<T>& f) {
1031   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1032     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1033       return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1034     } else {
1035       return false;
1036       }
1037   });
1038 }
1039
1040 template <class T>
1041 template <class F>
1042 Future<T> Future<T>::filter(F predicate) {
1043   auto p = folly::makeMoveWrapper(std::move(predicate));
1044   return this->then([p](T val) {
1045     T const& valConstRef = val;
1046     if (!(*p)(valConstRef)) {
1047       throw PredicateDoesNotObtain();
1048     }
1049     return val;
1050   });
1051 }
1052
1053 template <class T>
1054 template <class Callback>
1055 auto Future<T>::thenMulti(Callback&& fn)
1056     -> decltype(this->then(std::forward<Callback>(fn))) {
1057   // thenMulti with one callback is just a then
1058   return then(std::forward<Callback>(fn));
1059 }
1060
1061 template <class T>
1062 template <class Callback, class... Callbacks>
1063 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1064     -> decltype(this->then(std::forward<Callback>(fn)).
1065                       thenMulti(std::forward<Callbacks>(fns)...)) {
1066   // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1067   return then(std::forward<Callback>(fn)).
1068          thenMulti(std::forward<Callbacks>(fns)...);
1069 }
1070
1071 template <class T>
1072 template <class Callback, class... Callbacks>
1073 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1074                                       Callbacks&&... fns)
1075     -> decltype(this->then(std::forward<Callback>(fn)).
1076                       thenMulti(std::forward<Callbacks>(fns)...)) {
1077   // thenMultiExecutor with two callbacks is
1078   // via(x).then(a).thenMulti(b, ...).via(oldX)
1079   auto oldX = getExecutor();
1080   setExecutor(x);
1081   return then(std::forward<Callback>(fn)).
1082          thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1083 }
1084
1085 template <class T>
1086 template <class Callback>
1087 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1088     -> decltype(this->then(std::forward<Callback>(fn))) {
1089   // thenMulti with one callback is just a then with an executor
1090   return then(x, std::forward<Callback>(fn));
1091 }
1092
1093 template <class F>
1094 inline Future<Unit> when(bool p, F thunk) {
1095   return p ? thunk().unit() : makeFuture();
1096 }
1097
1098 template <class P, class F>
1099 Future<Unit> whileDo(P predicate, F thunk) {
1100   if (predicate()) {
1101     return thunk().then([=] {
1102       return whileDo(predicate, thunk);
1103     });
1104   }
1105   return makeFuture();
1106 }
1107
1108 template <class F>
1109 Future<Unit> times(const int n, F thunk) {
1110   auto count = folly::makeMoveWrapper(
1111     std::unique_ptr<std::atomic<int>>(new std::atomic<int>(0))
1112   );
1113   return folly::whileDo([=]() mutable {
1114       return (*count)->fetch_add(1) < n;
1115     }, thunk);
1116 }
1117
1118 namespace futures {
1119   template <class It, class F, class ItT, class Result>
1120   std::vector<Future<Result>> map(It first, It last, F func) {
1121     std::vector<Future<Result>> results;
1122     for (auto it = first; it != last; it++) {
1123       results.push_back(it->then(func));
1124     }
1125     return results;
1126   }
1127 }
1128
1129 namespace futures {
1130
1131 namespace detail {
1132
1133 struct retrying_policy_raw_tag {};
1134 struct retrying_policy_fut_tag {};
1135
1136 template <class Policy>
1137 struct retrying_policy_traits {
1138   using ew = exception_wrapper;
1139   FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1140   template <class Ret>
1141   using has_op = typename std::integral_constant<bool,
1142         has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1143         has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1144   using is_raw = has_op<bool>;
1145   using is_fut = has_op<Future<bool>>;
1146   using tag = typename std::conditional<
1147         is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1148         is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1149 };
1150
1151 template <class Policy, class FF>
1152 typename std::result_of<FF(size_t)>::type
1153 retrying(size_t k, Policy&& p, FF&& ff) {
1154   using F = typename std::result_of<FF(size_t)>::type;
1155   using T = typename F::value_type;
1156   auto f = ff(k++);
1157   auto pm = makeMoveWrapper(p);
1158   auto ffm = makeMoveWrapper(ff);
1159   return f.onError([=](exception_wrapper x) mutable {
1160       auto q = (*pm)(k, x);
1161       auto xm = makeMoveWrapper(std::move(x));
1162       return q.then([=](bool r) mutable {
1163           return r
1164             ? retrying(k, pm.move(), ffm.move())
1165             : makeFuture<T>(xm.move());
1166       });
1167   });
1168 }
1169
1170 template <class Policy, class FF>
1171 typename std::result_of<FF(size_t)>::type
1172 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1173   auto pm = makeMoveWrapper(std::move(p));
1174   auto q = [=](size_t k, exception_wrapper x) {
1175     return makeFuture<bool>((*pm)(k, x));
1176   };
1177   return retrying(0, std::move(q), std::forward<FF>(ff));
1178 }
1179
1180 template <class Policy, class FF>
1181 typename std::result_of<FF(size_t)>::type
1182 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1183   return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1184 }
1185
1186 //  jittered exponential backoff, clamped to [backoff_min, backoff_max]
1187 template <class URNG>
1188 Duration retryingJitteredExponentialBackoffDur(
1189     size_t n,
1190     Duration backoff_min,
1191     Duration backoff_max,
1192     double jitter_param,
1193     URNG& rng) {
1194   using d = Duration;
1195   auto dist = std::normal_distribution<double>(0.0, jitter_param);
1196   auto jitter = std::exp(dist(rng));
1197   auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1198   return std::max(backoff_min, std::min(backoff_max, backoff));
1199 }
1200
1201 template <class Policy, class URNG>
1202 std::function<Future<bool>(size_t, const exception_wrapper&)>
1203 retryingPolicyCappedJitteredExponentialBackoff(
1204     size_t max_tries,
1205     Duration backoff_min,
1206     Duration backoff_max,
1207     double jitter_param,
1208     URNG rng,
1209     Policy&& p) {
1210   auto pm = makeMoveWrapper(std::move(p));
1211   auto rngp = std::make_shared<URNG>(std::move(rng));
1212   return [=](size_t n, const exception_wrapper& ex) mutable {
1213     if (n == max_tries) { return makeFuture(false); }
1214     return (*pm)(n, ex).then([=](bool v) {
1215         if (!v) { return makeFuture(false); }
1216         auto backoff = detail::retryingJitteredExponentialBackoffDur(
1217             n, backoff_min, backoff_max, jitter_param, *rngp);
1218         return futures::sleep(backoff).then([] { return true; });
1219     });
1220   };
1221 }
1222
1223 template <class Policy, class URNG>
1224 std::function<Future<bool>(size_t, const exception_wrapper&)>
1225 retryingPolicyCappedJitteredExponentialBackoff(
1226     size_t max_tries,
1227     Duration backoff_min,
1228     Duration backoff_max,
1229     double jitter_param,
1230     URNG rng,
1231     Policy&& p,
1232     retrying_policy_raw_tag) {
1233   auto pm = makeMoveWrapper(std::move(p));
1234   auto q = [=](size_t n, const exception_wrapper& e) {
1235     return makeFuture((*pm)(n, e));
1236   };
1237   return retryingPolicyCappedJitteredExponentialBackoff(
1238       max_tries,
1239       backoff_min,
1240       backoff_max,
1241       jitter_param,
1242       std::move(rng),
1243       std::move(q));
1244 }
1245
1246 template <class Policy, class URNG>
1247 std::function<Future<bool>(size_t, const exception_wrapper&)>
1248 retryingPolicyCappedJitteredExponentialBackoff(
1249     size_t max_tries,
1250     Duration backoff_min,
1251     Duration backoff_max,
1252     double jitter_param,
1253     URNG rng,
1254     Policy&& p,
1255     retrying_policy_fut_tag) {
1256   return retryingPolicyCappedJitteredExponentialBackoff(
1257       max_tries,
1258       backoff_min,
1259       backoff_max,
1260       jitter_param,
1261       std::move(rng),
1262       std::move(p));
1263 }
1264
1265 }
1266
1267 template <class Policy, class FF>
1268 typename std::result_of<FF(size_t)>::type
1269 retrying(Policy&& p, FF&& ff) {
1270   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1271   return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1272 }
1273
1274 inline
1275 std::function<bool(size_t, const exception_wrapper&)>
1276 retryingPolicyBasic(
1277     size_t max_tries) {
1278   return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1279 }
1280
1281 template <class Policy, class URNG>
1282 std::function<Future<bool>(size_t, const exception_wrapper&)>
1283 retryingPolicyCappedJitteredExponentialBackoff(
1284     size_t max_tries,
1285     Duration backoff_min,
1286     Duration backoff_max,
1287     double jitter_param,
1288     URNG rng,
1289     Policy&& p) {
1290   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1291   return detail::retryingPolicyCappedJitteredExponentialBackoff(
1292       max_tries,
1293       backoff_min,
1294       backoff_max,
1295       jitter_param,
1296       std::move(rng),
1297       std::move(p),
1298       tag());
1299 }
1300
1301 inline
1302 std::function<Future<bool>(size_t, const exception_wrapper&)>
1303 retryingPolicyCappedJitteredExponentialBackoff(
1304     size_t max_tries,
1305     Duration backoff_min,
1306     Duration backoff_max,
1307     double jitter_param) {
1308   auto p = [](size_t, const exception_wrapper&) { return true; };
1309   return retryingPolicyCappedJitteredExponentialBackoff(
1310       max_tries,
1311       backoff_min,
1312       backoff_max,
1313       jitter_param,
1314       ThreadLocalPRNG(),
1315       std::move(p));
1316 }
1317
1318 }
1319
1320 // Instantiate the most common Future types to save compile time
1321 extern template class Future<Unit>;
1322 extern template class Future<bool>;
1323 extern template class Future<int>;
1324 extern template class Future<int64_t>;
1325 extern template class Future<std::string>;
1326 extern template class Future<double>;
1327
1328 } // namespace folly