Modification to futures to remove deadlock in certain use cases for getVia(executor).
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <algorithm>
20 #include <cassert>
21 #include <chrono>
22 #include <random>
23 #include <thread>
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
30
31 #if defined(__ANDROID__) || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
33 #else
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/experimental/fibers/Baton.h>
36 #endif
37
38 namespace folly {
39
40 class Timekeeper;
41
42 namespace detail {
43 #if FOLLY_FUTURE_USING_FIBER
44 typedef folly::fibers::Baton FutureBatonType;
45 #else
46 typedef folly::Baton<> FutureBatonType;
47 #endif
48 }
49
50 namespace detail {
51   std::shared_ptr<Timekeeper> getTimekeeperSingleton();
52 }
53
54 template <class T>
55 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
56   other.core_ = nullptr;
57 }
58
59 template <class T>
60 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
61   std::swap(core_, other.core_);
62   return *this;
63 }
64
65 template <class T>
66 template <class T2, typename>
67 Future<T>::Future(T2&& val)
68   : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
69
70 template <class T>
71 template <typename, typename>
72 Future<T>::Future()
73   : core_(new detail::Core<T>(Try<T>(T()))) {}
74
75 template <class T>
76 Future<T>::~Future() {
77   detach();
78 }
79
80 template <class T>
81 void Future<T>::detach() {
82   if (core_) {
83     core_->detachFuture();
84     core_ = nullptr;
85   }
86 }
87
88 template <class T>
89 void Future<T>::throwIfInvalid() const {
90   if (!core_)
91     throw NoState();
92 }
93
94 template <class T>
95 template <class F>
96 void Future<T>::setCallback_(F&& func) {
97   throwIfInvalid();
98   core_->setCallback(std::move(func));
99 }
100
101 // unwrap
102
103 template <class T>
104 template <class F>
105 typename std::enable_if<isFuture<F>::value,
106                         Future<typename isFuture<T>::Inner>>::type
107 Future<T>::unwrap() {
108   return then([](Future<typename isFuture<T>::Inner> internal_future) {
109       return internal_future;
110   });
111 }
112
113 // then
114
115 // Variant: returns a value
116 // e.g. f.then([](Try<T>&& t){ return t.value(); });
117 template <class T>
118 template <typename F, typename R, bool isTry, typename... Args>
119 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
120 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
121   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
122   typedef typename R::ReturnsFuture::Inner B;
123
124   throwIfInvalid();
125
126   // wrap these so we can move them into the lambda
127   folly::MoveWrapper<Promise<B>> p;
128   p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
129   folly::MoveWrapper<F> funcm(std::forward<F>(func));
130
131   // grab the Future now before we lose our handle on the Promise
132   auto f = p->getFuture();
133   f.core_->setExecutorNoLock(getExecutor());
134
135   /* This is a bit tricky.
136
137      We can't just close over *this in case this Future gets moved. So we
138      make a new dummy Future. We could figure out something more
139      sophisticated that avoids making a new Future object when it can, as an
140      optimization. But this is correct.
141
142      core_ can't be moved, it is explicitly disallowed (as is copying). But
143      if there's ever a reason to allow it, this is one place that makes that
144      assumption and would need to be fixed. We use a standard shared pointer
145      for core_ (by copying it in), which means in essence obj holds a shared
146      pointer to itself.  But this shouldn't leak because Promise will not
147      outlive the continuation, because Promise will setException() with a
148      broken Promise if it is destructed before completed. We could use a
149      weak pointer but it would have to be converted to a shared pointer when
150      func is executed (because the Future returned by func may possibly
151      persist beyond the callback, if it gets moved), and so it is an
152      optimization to just make it shared from the get-go.
153
154      We have to move in the Promise and func using the MoveWrapper
155      hack. (func could be copied but it's a big drag on perf).
156
157      Two subtle but important points about this design. detail::Core has no
158      back pointers to Future or Promise, so if Future or Promise get moved
159      (and they will be moved in performant code) we don't have to do
160      anything fancy. And because we store the continuation in the
161      detail::Core, not in the Future, we can execute the continuation even
162      after the Future has gone out of scope. This is an intentional design
163      decision. It is likely we will want to be able to cancel a continuation
164      in some circumstances, but I think it should be explicit not implicit
165      in the destruction of the Future used to create it.
166      */
167   setCallback_(
168     [p, funcm](Try<T>&& t) mutable {
169       if (!isTry && t.hasException()) {
170         p->setException(std::move(t.exception()));
171       } else {
172         p->setWith([&]() {
173           return (*funcm)(t.template get<isTry, Args>()...);
174         });
175       }
176     });
177
178   return f;
179 }
180
181 // Variant: returns a Future
182 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
183 template <class T>
184 template <typename F, typename R, bool isTry, typename... Args>
185 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
186 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
187   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
188   typedef typename R::ReturnsFuture::Inner B;
189
190   throwIfInvalid();
191
192   // wrap these so we can move them into the lambda
193   folly::MoveWrapper<Promise<B>> p;
194   p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
195   folly::MoveWrapper<F> funcm(std::forward<F>(func));
196
197   // grab the Future now before we lose our handle on the Promise
198   auto f = p->getFuture();
199   f.core_->setExecutorNoLock(getExecutor());
200
201   setCallback_(
202     [p, funcm](Try<T>&& t) mutable {
203       if (!isTry && t.hasException()) {
204         p->setException(std::move(t.exception()));
205       } else {
206         try {
207           auto f2 = (*funcm)(t.template get<isTry, Args>()...);
208           // that didn't throw, now we can steal p
209           f2.setCallback_([p](Try<B>&& b) mutable {
210             p->setTry(std::move(b));
211           });
212         } catch (const std::exception& e) {
213           p->setException(exception_wrapper(std::current_exception(), e));
214         } catch (...) {
215           p->setException(exception_wrapper(std::current_exception()));
216         }
217       }
218     });
219
220   return f;
221 }
222
223 template <typename T>
224 template <typename R, typename Caller, typename... Args>
225   Future<typename isFuture<R>::Inner>
226 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
227   typedef typename std::remove_cv<
228     typename std::remove_reference<
229       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
230   return then([instance, func](Try<T>&& t){
231     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
232   });
233 }
234
235 template <class T>
236 template <class Executor, class Arg, class... Args>
237 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
238   -> decltype(this->then(std::forward<Arg>(arg),
239                          std::forward<Args>(args)...))
240 {
241   auto oldX = getExecutor();
242   setExecutor(x);
243   return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
244                via(oldX);
245 }
246
247 template <class T>
248 Future<Unit> Future<T>::then() {
249   return then([] () {});
250 }
251
252 // onError where the callback returns T
253 template <class T>
254 template <class F>
255 typename std::enable_if<
256   !detail::callableWith<F, exception_wrapper>::value &&
257   !detail::Extract<F>::ReturnsFuture::value,
258   Future<T>>::type
259 Future<T>::onError(F&& func) {
260   typedef typename detail::Extract<F>::FirstArg Exn;
261   static_assert(
262       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
263       "Return type of onError callback must be T or Future<T>");
264
265   Promise<T> p;
266   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
267   auto f = p.getFuture();
268   auto pm = folly::makeMoveWrapper(std::move(p));
269   auto funcm = folly::makeMoveWrapper(std::move(func));
270   setCallback_([pm, funcm](Try<T>&& t) mutable {
271     if (!t.template withException<Exn>([&] (Exn& e) {
272           pm->setWith([&]{
273             return (*funcm)(e);
274           });
275         })) {
276       pm->setTry(std::move(t));
277     }
278   });
279
280   return f;
281 }
282
283 // onError where the callback returns Future<T>
284 template <class T>
285 template <class F>
286 typename std::enable_if<
287   !detail::callableWith<F, exception_wrapper>::value &&
288   detail::Extract<F>::ReturnsFuture::value,
289   Future<T>>::type
290 Future<T>::onError(F&& func) {
291   static_assert(
292       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
293       "Return type of onError callback must be T or Future<T>");
294   typedef typename detail::Extract<F>::FirstArg Exn;
295
296   Promise<T> p;
297   auto f = p.getFuture();
298   auto pm = folly::makeMoveWrapper(std::move(p));
299   auto funcm = folly::makeMoveWrapper(std::move(func));
300   setCallback_([pm, funcm](Try<T>&& t) mutable {
301     if (!t.template withException<Exn>([&] (Exn& e) {
302           try {
303             auto f2 = (*funcm)(e);
304             f2.setCallback_([pm](Try<T>&& t2) mutable {
305               pm->setTry(std::move(t2));
306             });
307           } catch (const std::exception& e2) {
308             pm->setException(exception_wrapper(std::current_exception(), e2));
309           } catch (...) {
310             pm->setException(exception_wrapper(std::current_exception()));
311           }
312         })) {
313       pm->setTry(std::move(t));
314     }
315   });
316
317   return f;
318 }
319
320 template <class T>
321 template <class F>
322 Future<T> Future<T>::ensure(F func) {
323   MoveWrapper<F> funcw(std::move(func));
324   return this->then([funcw](Try<T>&& t) mutable {
325     (*funcw)();
326     return makeFuture(std::move(t));
327   });
328 }
329
330 template <class T>
331 template <class F>
332 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
333   auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
334   return within(dur, tk)
335     .onError([funcw](TimedOut const&) { return (*funcw)(); });
336 }
337
338 template <class T>
339 template <class F>
340 typename std::enable_if<
341   detail::callableWith<F, exception_wrapper>::value &&
342   detail::Extract<F>::ReturnsFuture::value,
343   Future<T>>::type
344 Future<T>::onError(F&& func) {
345   static_assert(
346       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
347       "Return type of onError callback must be T or Future<T>");
348
349   Promise<T> p;
350   auto f = p.getFuture();
351   auto pm = folly::makeMoveWrapper(std::move(p));
352   auto funcm = folly::makeMoveWrapper(std::move(func));
353   setCallback_([pm, funcm](Try<T> t) mutable {
354     if (t.hasException()) {
355       try {
356         auto f2 = (*funcm)(std::move(t.exception()));
357         f2.setCallback_([pm](Try<T> t2) mutable {
358           pm->setTry(std::move(t2));
359         });
360       } catch (const std::exception& e2) {
361         pm->setException(exception_wrapper(std::current_exception(), e2));
362       } catch (...) {
363         pm->setException(exception_wrapper(std::current_exception()));
364       }
365     } else {
366       pm->setTry(std::move(t));
367     }
368   });
369
370   return f;
371 }
372
373 // onError(exception_wrapper) that returns T
374 template <class T>
375 template <class F>
376 typename std::enable_if<
377   detail::callableWith<F, exception_wrapper>::value &&
378   !detail::Extract<F>::ReturnsFuture::value,
379   Future<T>>::type
380 Future<T>::onError(F&& func) {
381   static_assert(
382       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
383       "Return type of onError callback must be T or Future<T>");
384
385   Promise<T> p;
386   auto f = p.getFuture();
387   auto pm = folly::makeMoveWrapper(std::move(p));
388   auto funcm = folly::makeMoveWrapper(std::move(func));
389   setCallback_([pm, funcm](Try<T> t) mutable {
390     if (t.hasException()) {
391       pm->setWith([&]{
392         return (*funcm)(std::move(t.exception()));
393       });
394     } else {
395       pm->setTry(std::move(t));
396     }
397   });
398
399   return f;
400 }
401
402 template <class T>
403 typename std::add_lvalue_reference<T>::type Future<T>::value() {
404   throwIfInvalid();
405
406   return core_->getTry().value();
407 }
408
409 template <class T>
410 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
411   throwIfInvalid();
412
413   return core_->getTry().value();
414 }
415
416 template <class T>
417 Try<T>& Future<T>::getTry() {
418   throwIfInvalid();
419
420   return core_->getTry();
421 }
422
423 template <class T>
424 Optional<Try<T>> Future<T>::poll() {
425   Optional<Try<T>> o;
426   if (core_->ready()) {
427     o = std::move(core_->getTry());
428   }
429   return o;
430 }
431
432 template <class T>
433 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
434   throwIfInvalid();
435
436   setExecutor(executor, priority);
437
438   return std::move(*this);
439 }
440
441 template <class T>
442 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
443   throwIfInvalid();
444
445   MoveWrapper<Promise<T>> p;
446   auto f = p->getFuture();
447   then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
448   return std::move(f).via(executor, priority);
449 }
450
451
452 template <class Func>
453 auto via(Executor* x, Func func)
454   -> Future<typename isFuture<decltype(func())>::Inner>
455 {
456   // TODO make this actually more performant. :-P #7260175
457   return via(x).then(func);
458 }
459
460 template <class T>
461 bool Future<T>::isReady() const {
462   throwIfInvalid();
463   return core_->ready();
464 }
465
466 template <class T>
467 bool Future<T>::hasValue() {
468   return getTry().hasValue();
469 }
470
471 template <class T>
472 bool Future<T>::hasException() {
473   return getTry().hasException();
474 }
475
476 template <class T>
477 void Future<T>::raise(exception_wrapper exception) {
478   core_->raise(std::move(exception));
479 }
480
481 // makeFuture
482
483 template <class T>
484 Future<typename std::decay<T>::type> makeFuture(T&& t) {
485   return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
486 }
487
488 inline // for multiple translation units
489 Future<Unit> makeFuture() {
490   return makeFuture(Unit{});
491 }
492
493 // makeFutureWith(Future<T>()) -> Future<T>
494 template <class F>
495 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
496                         typename std::result_of<F()>::type>::type
497 makeFutureWith(F&& func) {
498   using InnerType =
499       typename isFuture<typename std::result_of<F()>::type>::Inner;
500   try {
501     return func();
502   } catch (std::exception& e) {
503     return makeFuture<InnerType>(
504         exception_wrapper(std::current_exception(), e));
505   } catch (...) {
506     return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
507   }
508 }
509
510 // makeFutureWith(T()) -> Future<T>
511 // makeFutureWith(void()) -> Future<Unit>
512 template <class F>
513 typename std::enable_if<
514     !(isFuture<typename std::result_of<F()>::type>::value),
515     Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
516 makeFutureWith(F&& func) {
517   using LiftedResult =
518       typename Unit::Lift<typename std::result_of<F()>::type>::type;
519   return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
520     return func();
521   }));
522 }
523
524 template <class T>
525 Future<T> makeFuture(std::exception_ptr const& e) {
526   return makeFuture(Try<T>(e));
527 }
528
529 template <class T>
530 Future<T> makeFuture(exception_wrapper ew) {
531   return makeFuture(Try<T>(std::move(ew)));
532 }
533
534 template <class T, class E>
535 typename std::enable_if<std::is_base_of<std::exception, E>::value,
536                         Future<T>>::type
537 makeFuture(E const& e) {
538   return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
539 }
540
541 template <class T>
542 Future<T> makeFuture(Try<T>&& t) {
543   return Future<T>(new detail::Core<T>(std::move(t)));
544 }
545
546 // via
547 Future<Unit> via(Executor* executor, int8_t priority) {
548   return makeFuture().via(executor, priority);
549 }
550
551 // mapSetCallback calls func(i, Try<T>) when every future completes
552
553 template <class T, class InputIterator, class F>
554 void mapSetCallback(InputIterator first, InputIterator last, F func) {
555   for (size_t i = 0; first != last; ++first, ++i) {
556     first->setCallback_([func, i](Try<T>&& t) {
557       func(i, std::move(t));
558     });
559   }
560 }
561
562 // collectAll (variadic)
563
564 template <typename... Fs>
565 typename detail::CollectAllVariadicContext<
566   typename std::decay<Fs>::type::value_type...>::type
567 collectAll(Fs&&... fs) {
568   auto ctx = std::make_shared<detail::CollectAllVariadicContext<
569     typename std::decay<Fs>::type::value_type...>>();
570   detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
571     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
572   return ctx->p.getFuture();
573 }
574
575 // collectAll (iterator)
576
577 template <class InputIterator>
578 Future<
579   std::vector<
580   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
581 collectAll(InputIterator first, InputIterator last) {
582   typedef
583     typename std::iterator_traits<InputIterator>::value_type::value_type T;
584
585   struct CollectAllContext {
586     CollectAllContext(int n) : results(n) {}
587     ~CollectAllContext() {
588       p.setValue(std::move(results));
589     }
590     Promise<std::vector<Try<T>>> p;
591     std::vector<Try<T>> results;
592   };
593
594   auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
595   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
596     ctx->results[i] = std::move(t);
597   });
598   return ctx->p.getFuture();
599 }
600
601 // collect (iterator)
602
603 namespace detail {
604
605 template <typename T>
606 struct CollectContext {
607   struct Nothing {
608     explicit Nothing(int /* n */) {}
609   };
610
611   using Result = typename std::conditional<
612     std::is_void<T>::value,
613     void,
614     std::vector<T>>::type;
615
616   using InternalResult = typename std::conditional<
617     std::is_void<T>::value,
618     Nothing,
619     std::vector<Optional<T>>>::type;
620
621   explicit CollectContext(int n) : result(n) {}
622   ~CollectContext() {
623     if (!threw.exchange(true)) {
624       // map Optional<T> -> T
625       std::vector<T> finalResult;
626       finalResult.reserve(result.size());
627       std::transform(result.begin(), result.end(),
628                      std::back_inserter(finalResult),
629                      [](Optional<T>& o) { return std::move(o.value()); });
630       p.setValue(std::move(finalResult));
631     }
632   }
633   inline void setPartialResult(size_t i, Try<T>& t) {
634     result[i] = std::move(t.value());
635   }
636   Promise<Result> p;
637   InternalResult result;
638   std::atomic<bool> threw {false};
639 };
640
641 }
642
643 template <class InputIterator>
644 Future<typename detail::CollectContext<
645   typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
646 collect(InputIterator first, InputIterator last) {
647   typedef
648     typename std::iterator_traits<InputIterator>::value_type::value_type T;
649
650   auto ctx = std::make_shared<detail::CollectContext<T>>(
651     std::distance(first, last));
652   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
653     if (t.hasException()) {
654        if (!ctx->threw.exchange(true)) {
655          ctx->p.setException(std::move(t.exception()));
656        }
657      } else if (!ctx->threw) {
658        ctx->setPartialResult(i, t);
659      }
660   });
661   return ctx->p.getFuture();
662 }
663
664 // collect (variadic)
665
666 template <typename... Fs>
667 typename detail::CollectVariadicContext<
668   typename std::decay<Fs>::type::value_type...>::type
669 collect(Fs&&... fs) {
670   auto ctx = std::make_shared<detail::CollectVariadicContext<
671     typename std::decay<Fs>::type::value_type...>>();
672   detail::collectVariadicHelper<detail::CollectVariadicContext>(
673     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
674   return ctx->p.getFuture();
675 }
676
677 // collectAny (iterator)
678
679 template <class InputIterator>
680 Future<
681   std::pair<size_t,
682             Try<
683               typename
684               std::iterator_traits<InputIterator>::value_type::value_type>>>
685 collectAny(InputIterator first, InputIterator last) {
686   typedef
687     typename std::iterator_traits<InputIterator>::value_type::value_type T;
688
689   struct CollectAnyContext {
690     CollectAnyContext() {};
691     Promise<std::pair<size_t, Try<T>>> p;
692     std::atomic<bool> done {false};
693   };
694
695   auto ctx = std::make_shared<CollectAnyContext>();
696   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
697     if (!ctx->done.exchange(true)) {
698       ctx->p.setValue(std::make_pair(i, std::move(t)));
699     }
700   });
701   return ctx->p.getFuture();
702 }
703
704 // collectN (iterator)
705
706 template <class InputIterator>
707 Future<std::vector<std::pair<size_t, Try<typename
708   std::iterator_traits<InputIterator>::value_type::value_type>>>>
709 collectN(InputIterator first, InputIterator last, size_t n) {
710   typedef typename
711     std::iterator_traits<InputIterator>::value_type::value_type T;
712   typedef std::vector<std::pair<size_t, Try<T>>> V;
713
714   struct CollectNContext {
715     V v;
716     std::atomic<size_t> completed = {0};
717     Promise<V> p;
718   };
719   auto ctx = std::make_shared<CollectNContext>();
720
721   if (size_t(std::distance(first, last)) < n) {
722     ctx->p.setException(std::runtime_error("Not enough futures"));
723   } else {
724     // for each completed Future, increase count and add to vector, until we
725     // have n completed futures at which point we fulfil our Promise with the
726     // vector
727     mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
728       auto c = ++ctx->completed;
729       if (c <= n) {
730         assert(ctx->v.size() < n);
731         ctx->v.emplace_back(i, std::move(t));
732         if (c == n) {
733           ctx->p.setTry(Try<V>(std::move(ctx->v)));
734         }
735       }
736     });
737   }
738
739   return ctx->p.getFuture();
740 }
741
742 // reduce (iterator)
743
744 template <class It, class T, class F>
745 Future<T> reduce(It first, It last, T&& initial, F&& func) {
746   if (first == last) {
747     return makeFuture(std::move(initial));
748   }
749
750   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
751   typedef typename std::conditional<
752     detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type Arg;
753   typedef isTry<Arg> IsTry;
754
755   folly::MoveWrapper<T> minitial(std::move(initial));
756   auto sfunc = std::make_shared<F>(std::move(func));
757
758   auto f = first->then([minitial, sfunc](Try<ItT>& head) mutable {
759     return (*sfunc)(std::move(*minitial),
760                 head.template get<IsTry::value, Arg&&>());
761   });
762
763   for (++first; first != last; ++first) {
764     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
765       return (*sfunc)(std::move(std::get<0>(t).value()),
766                   // Either return a ItT&& or a Try<ItT>&& depending
767                   // on the type of the argument of func.
768                   std::get<1>(t).template get<IsTry::value, Arg&&>());
769     });
770   }
771
772   return f;
773 }
774
775 // window (collection)
776
777 template <class Collection, class F, class ItT, class Result>
778 std::vector<Future<Result>>
779 window(Collection input, F func, size_t n) {
780   struct WindowContext {
781     WindowContext(Collection&& i, F&& fn)
782         : input_(std::move(i)), promises_(input_.size()),
783           func_(std::move(fn))
784       {}
785     std::atomic<size_t> i_ {0};
786     Collection input_;
787     std::vector<Promise<Result>> promises_;
788     F func_;
789
790     static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
791       size_t i = ctx->i_++;
792       if (i < ctx->input_.size()) {
793         // Using setCallback_ directly since we don't need the Future
794         ctx->func_(std::move(ctx->input_[i])).setCallback_(
795           // ctx is captured by value
796           [ctx, i](Try<Result>&& t) {
797             ctx->promises_[i].setTry(std::move(t));
798             // Chain another future onto this one
799             spawn(std::move(ctx));
800           });
801       }
802     }
803   };
804
805   auto max = std::min(n, input.size());
806
807   auto ctx = std::make_shared<WindowContext>(
808     std::move(input), std::move(func));
809
810   for (size_t i = 0; i < max; ++i) {
811     // Start the first n Futures
812     WindowContext::spawn(ctx);
813   }
814
815   std::vector<Future<Result>> futures;
816   futures.reserve(ctx->promises_.size());
817   for (auto& promise : ctx->promises_) {
818     futures.emplace_back(promise.getFuture());
819   }
820
821   return futures;
822 }
823
824 // reduce
825
826 template <class T>
827 template <class I, class F>
828 Future<I> Future<T>::reduce(I&& initial, F&& func) {
829   folly::MoveWrapper<I> minitial(std::move(initial));
830   folly::MoveWrapper<F> mfunc(std::move(func));
831   return then([minitial, mfunc](T& vals) mutable {
832     auto ret = std::move(*minitial);
833     for (auto& val : vals) {
834       ret = (*mfunc)(std::move(ret), std::move(val));
835     }
836     return ret;
837   });
838 }
839
840 // unorderedReduce (iterator)
841
842 template <class It, class T, class F, class ItT, class Arg>
843 Future<T> unorderedReduce(It first, It last, T initial, F func) {
844   if (first == last) {
845     return makeFuture(std::move(initial));
846   }
847
848   typedef isTry<Arg> IsTry;
849
850   struct UnorderedReduceContext {
851     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
852         : lock_(), memo_(makeFuture<T>(std::move(memo))),
853           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
854       {};
855     folly::MicroSpinLock lock_; // protects memo_ and numThens_
856     Future<T> memo_;
857     F func_;
858     size_t numThens_; // how many Futures completed and called .then()
859     size_t numFutures_; // how many Futures in total
860     Promise<T> promise_;
861   };
862
863   auto ctx = std::make_shared<UnorderedReduceContext>(
864     std::move(initial), std::move(func), std::distance(first, last));
865
866   mapSetCallback<ItT>(
867       first,
868       last,
869       [ctx](size_t /* i */, Try<ItT>&& t) {
870         folly::MoveWrapper<Try<ItT>> mt(std::move(t));
871         // Futures can be completed in any order, simultaneously.
872         // To make this non-blocking, we create a new Future chain in
873         // the order of completion to reduce the values.
874         // The spinlock just protects chaining a new Future, not actually
875         // executing the reduce, which should be really fast.
876         folly::MSLGuard lock(ctx->lock_);
877         ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
878           // Either return a ItT&& or a Try<ItT>&& depending
879           // on the type of the argument of func.
880           return ctx->func_(std::move(v),
881                             mt->template get<IsTry::value, Arg&&>());
882         });
883         if (++ctx->numThens_ == ctx->numFutures_) {
884           // After reducing the value of the last Future, fulfill the Promise
885           ctx->memo_.setCallback_(
886               [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
887         }
888       });
889
890   return ctx->promise_.getFuture();
891 }
892
893 // within
894
895 template <class T>
896 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
897   return within(dur, TimedOut(), tk);
898 }
899
900 template <class T>
901 template <class E>
902 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
903
904   struct Context {
905     Context(E ex) : exception(std::move(ex)), promise() {}
906     E exception;
907     Future<Unit> thisFuture;
908     Promise<T> promise;
909     std::atomic<bool> token {false};
910   };
911
912   std::shared_ptr<Timekeeper> tks;
913   if (!tk) {
914     tks = folly::detail::getTimekeeperSingleton();
915     tk = DCHECK_NOTNULL(tks.get());
916   }
917
918   auto ctx = std::make_shared<Context>(std::move(e));
919
920   ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
921     // TODO: "this" completed first, cancel "after"
922     if (ctx->token.exchange(true) == false) {
923       ctx->promise.setTry(std::move(t));
924     }
925   });
926
927   tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
928     // "after" completed first, cancel "this"
929     ctx->thisFuture.raise(TimedOut());
930     if (ctx->token.exchange(true) == false) {
931       if (t.hasException()) {
932         ctx->promise.setException(std::move(t.exception()));
933       } else {
934         ctx->promise.setException(std::move(ctx->exception));
935       }
936     }
937   });
938
939   return ctx->promise.getFuture().via(getExecutor());
940 }
941
942 // delayed
943
944 template <class T>
945 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
946   return collectAll(*this, futures::sleep(dur, tk))
947     .then([](std::tuple<Try<T>, Try<Unit>> tup) {
948       Try<T>& t = std::get<0>(tup);
949       return makeFuture<T>(std::move(t));
950     });
951 }
952
953 namespace detail {
954
955 template <class T>
956 void waitImpl(Future<T>& f) {
957   // short-circuit if there's nothing to do
958   if (f.isReady()) return;
959
960   FutureBatonType baton;
961   f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
962   baton.wait();
963   assert(f.isReady());
964 }
965
966 template <class T>
967 void waitImpl(Future<T>& f, Duration dur) {
968   // short-circuit if there's nothing to do
969   if (f.isReady()) return;
970
971   folly::MoveWrapper<Promise<T>> promise;
972   auto ret = promise->getFuture();
973   auto baton = std::make_shared<FutureBatonType>();
974   f.setCallback_([baton, promise](Try<T>&& t) mutable {
975     promise->setTry(std::move(t));
976     baton->post();
977   });
978   f = std::move(ret);
979   if (baton->timed_wait(dur)) {
980     assert(f.isReady());
981   }
982 }
983
984 template <class T>
985 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
986   // Set callback so to ensure that the via executor has something on it
987   // so that once the preceding future triggers this callback, drive will
988   // always have a callback to satisfy it
989   if (f.isReady())
990     return;
991   f = f.then([](T&& t) { return std::move(t); });
992   while (!f.isReady()) {
993     e->drive();
994   }
995   assert(f.isReady());
996 }
997
998 } // detail
999
1000 template <class T>
1001 Future<T>& Future<T>::wait() & {
1002   detail::waitImpl(*this);
1003   return *this;
1004 }
1005
1006 template <class T>
1007 Future<T>&& Future<T>::wait() && {
1008   detail::waitImpl(*this);
1009   return std::move(*this);
1010 }
1011
1012 template <class T>
1013 Future<T>& Future<T>::wait(Duration dur) & {
1014   detail::waitImpl(*this, dur);
1015   return *this;
1016 }
1017
1018 template <class T>
1019 Future<T>&& Future<T>::wait(Duration dur) && {
1020   detail::waitImpl(*this, dur);
1021   return std::move(*this);
1022 }
1023
1024 template <class T>
1025 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1026   detail::waitViaImpl(*this, e);
1027   return *this;
1028 }
1029
1030 template <class T>
1031 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1032   detail::waitViaImpl(*this, e);
1033   return std::move(*this);
1034 }
1035
1036 template <class T>
1037 T Future<T>::get() {
1038   return std::move(wait().value());
1039 }
1040
1041 template <class T>
1042 T Future<T>::get(Duration dur) {
1043   wait(dur);
1044   if (isReady()) {
1045     return std::move(value());
1046   } else {
1047     throw TimedOut();
1048   }
1049 }
1050
1051 template <class T>
1052 T Future<T>::getVia(DrivableExecutor* e) {
1053   return std::move(waitVia(e).value());
1054 }
1055
1056 namespace detail {
1057   template <class T>
1058   struct TryEquals {
1059     static bool equals(const Try<T>& t1, const Try<T>& t2) {
1060       return t1.value() == t2.value();
1061     }
1062   };
1063 }
1064
1065 template <class T>
1066 Future<bool> Future<T>::willEqual(Future<T>& f) {
1067   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1068     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1069       return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1070     } else {
1071       return false;
1072       }
1073   });
1074 }
1075
1076 template <class T>
1077 template <class F>
1078 Future<T> Future<T>::filter(F predicate) {
1079   auto p = folly::makeMoveWrapper(std::move(predicate));
1080   return this->then([p](T val) {
1081     T const& valConstRef = val;
1082     if (!(*p)(valConstRef)) {
1083       throw PredicateDoesNotObtain();
1084     }
1085     return val;
1086   });
1087 }
1088
1089 template <class T>
1090 template <class Callback>
1091 auto Future<T>::thenMulti(Callback&& fn)
1092     -> decltype(this->then(std::forward<Callback>(fn))) {
1093   // thenMulti with one callback is just a then
1094   return then(std::forward<Callback>(fn));
1095 }
1096
1097 template <class T>
1098 template <class Callback, class... Callbacks>
1099 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1100     -> decltype(this->then(std::forward<Callback>(fn)).
1101                       thenMulti(std::forward<Callbacks>(fns)...)) {
1102   // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1103   return then(std::forward<Callback>(fn)).
1104          thenMulti(std::forward<Callbacks>(fns)...);
1105 }
1106
1107 template <class T>
1108 template <class Callback, class... Callbacks>
1109 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1110                                       Callbacks&&... fns)
1111     -> decltype(this->then(std::forward<Callback>(fn)).
1112                       thenMulti(std::forward<Callbacks>(fns)...)) {
1113   // thenMultiExecutor with two callbacks is
1114   // via(x).then(a).thenMulti(b, ...).via(oldX)
1115   auto oldX = getExecutor();
1116   setExecutor(x);
1117   return then(std::forward<Callback>(fn)).
1118          thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1119 }
1120
1121 template <class T>
1122 template <class Callback>
1123 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1124     -> decltype(this->then(std::forward<Callback>(fn))) {
1125   // thenMulti with one callback is just a then with an executor
1126   return then(x, std::forward<Callback>(fn));
1127 }
1128
1129 template <class F>
1130 inline Future<Unit> when(bool p, F thunk) {
1131   return p ? thunk().unit() : makeFuture();
1132 }
1133
1134 template <class P, class F>
1135 Future<Unit> whileDo(P predicate, F thunk) {
1136   if (predicate()) {
1137     return thunk().then([=] {
1138       return whileDo(predicate, thunk);
1139     });
1140   }
1141   return makeFuture();
1142 }
1143
1144 template <class F>
1145 Future<Unit> times(const int n, F thunk) {
1146   auto count = folly::makeMoveWrapper(
1147     std::unique_ptr<std::atomic<int>>(new std::atomic<int>(0))
1148   );
1149   return folly::whileDo([=]() mutable {
1150       return (*count)->fetch_add(1) < n;
1151     }, thunk);
1152 }
1153
1154 namespace futures {
1155   template <class It, class F, class ItT, class Result>
1156   std::vector<Future<Result>> map(It first, It last, F func) {
1157     std::vector<Future<Result>> results;
1158     for (auto it = first; it != last; it++) {
1159       results.push_back(it->then(func));
1160     }
1161     return results;
1162   }
1163 }
1164
1165 namespace futures {
1166
1167 namespace detail {
1168
1169 struct retrying_policy_raw_tag {};
1170 struct retrying_policy_fut_tag {};
1171
1172 template <class Policy>
1173 struct retrying_policy_traits {
1174   using ew = exception_wrapper;
1175   FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1176   template <class Ret>
1177   using has_op = typename std::integral_constant<bool,
1178         has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1179         has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1180   using is_raw = has_op<bool>;
1181   using is_fut = has_op<Future<bool>>;
1182   using tag = typename std::conditional<
1183         is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1184         is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1185 };
1186
1187 template <class Policy, class FF>
1188 typename std::result_of<FF(size_t)>::type
1189 retrying(size_t k, Policy&& p, FF&& ff) {
1190   using F = typename std::result_of<FF(size_t)>::type;
1191   using T = typename F::value_type;
1192   auto f = ff(k++);
1193   auto pm = makeMoveWrapper(p);
1194   auto ffm = makeMoveWrapper(ff);
1195   return f.onError([=](exception_wrapper x) mutable {
1196       auto q = (*pm)(k, x);
1197       auto xm = makeMoveWrapper(std::move(x));
1198       return q.then([=](bool r) mutable {
1199           return r
1200             ? retrying(k, pm.move(), ffm.move())
1201             : makeFuture<T>(xm.move());
1202       });
1203   });
1204 }
1205
1206 template <class Policy, class FF>
1207 typename std::result_of<FF(size_t)>::type
1208 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1209   auto pm = makeMoveWrapper(std::move(p));
1210   auto q = [=](size_t k, exception_wrapper x) {
1211     return makeFuture<bool>((*pm)(k, x));
1212   };
1213   return retrying(0, std::move(q), std::forward<FF>(ff));
1214 }
1215
1216 template <class Policy, class FF>
1217 typename std::result_of<FF(size_t)>::type
1218 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1219   return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1220 }
1221
1222 //  jittered exponential backoff, clamped to [backoff_min, backoff_max]
1223 template <class URNG>
1224 Duration retryingJitteredExponentialBackoffDur(
1225     size_t n,
1226     Duration backoff_min,
1227     Duration backoff_max,
1228     double jitter_param,
1229     URNG& rng) {
1230   using d = Duration;
1231   auto dist = std::normal_distribution<double>(0.0, jitter_param);
1232   auto jitter = std::exp(dist(rng));
1233   auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1234   return std::max(backoff_min, std::min(backoff_max, backoff));
1235 }
1236
1237 template <class Policy, class URNG>
1238 std::function<Future<bool>(size_t, const exception_wrapper&)>
1239 retryingPolicyCappedJitteredExponentialBackoff(
1240     size_t max_tries,
1241     Duration backoff_min,
1242     Duration backoff_max,
1243     double jitter_param,
1244     URNG rng,
1245     Policy&& p) {
1246   auto pm = makeMoveWrapper(std::move(p));
1247   auto rngp = std::make_shared<URNG>(std::move(rng));
1248   return [=](size_t n, const exception_wrapper& ex) mutable {
1249     if (n == max_tries) { return makeFuture(false); }
1250     return (*pm)(n, ex).then([=](bool v) {
1251         if (!v) { return makeFuture(false); }
1252         auto backoff = detail::retryingJitteredExponentialBackoffDur(
1253             n, backoff_min, backoff_max, jitter_param, *rngp);
1254         return futures::sleep(backoff).then([] { return true; });
1255     });
1256   };
1257 }
1258
1259 template <class Policy, class URNG>
1260 std::function<Future<bool>(size_t, const exception_wrapper&)>
1261 retryingPolicyCappedJitteredExponentialBackoff(
1262     size_t max_tries,
1263     Duration backoff_min,
1264     Duration backoff_max,
1265     double jitter_param,
1266     URNG rng,
1267     Policy&& p,
1268     retrying_policy_raw_tag) {
1269   auto pm = makeMoveWrapper(std::move(p));
1270   auto q = [=](size_t n, const exception_wrapper& e) {
1271     return makeFuture((*pm)(n, e));
1272   };
1273   return retryingPolicyCappedJitteredExponentialBackoff(
1274       max_tries,
1275       backoff_min,
1276       backoff_max,
1277       jitter_param,
1278       std::move(rng),
1279       std::move(q));
1280 }
1281
1282 template <class Policy, class URNG>
1283 std::function<Future<bool>(size_t, const exception_wrapper&)>
1284 retryingPolicyCappedJitteredExponentialBackoff(
1285     size_t max_tries,
1286     Duration backoff_min,
1287     Duration backoff_max,
1288     double jitter_param,
1289     URNG rng,
1290     Policy&& p,
1291     retrying_policy_fut_tag) {
1292   return retryingPolicyCappedJitteredExponentialBackoff(
1293       max_tries,
1294       backoff_min,
1295       backoff_max,
1296       jitter_param,
1297       std::move(rng),
1298       std::move(p));
1299 }
1300
1301 }
1302
1303 template <class Policy, class FF>
1304 typename std::result_of<FF(size_t)>::type
1305 retrying(Policy&& p, FF&& ff) {
1306   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1307   return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1308 }
1309
1310 inline
1311 std::function<bool(size_t, const exception_wrapper&)>
1312 retryingPolicyBasic(
1313     size_t max_tries) {
1314   return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1315 }
1316
1317 template <class Policy, class URNG>
1318 std::function<Future<bool>(size_t, const exception_wrapper&)>
1319 retryingPolicyCappedJitteredExponentialBackoff(
1320     size_t max_tries,
1321     Duration backoff_min,
1322     Duration backoff_max,
1323     double jitter_param,
1324     URNG rng,
1325     Policy&& p) {
1326   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1327   return detail::retryingPolicyCappedJitteredExponentialBackoff(
1328       max_tries,
1329       backoff_min,
1330       backoff_max,
1331       jitter_param,
1332       std::move(rng),
1333       std::move(p),
1334       tag());
1335 }
1336
1337 inline
1338 std::function<Future<bool>(size_t, const exception_wrapper&)>
1339 retryingPolicyCappedJitteredExponentialBackoff(
1340     size_t max_tries,
1341     Duration backoff_min,
1342     Duration backoff_max,
1343     double jitter_param) {
1344   auto p = [](size_t, const exception_wrapper&) { return true; };
1345   return retryingPolicyCappedJitteredExponentialBackoff(
1346       max_tries,
1347       backoff_min,
1348       backoff_max,
1349       jitter_param,
1350       ThreadLocalPRNG(),
1351       std::move(p));
1352 }
1353
1354 }
1355
1356 // Instantiate the most common Future types to save compile time
1357 extern template class Future<Unit>;
1358 extern template class Future<bool>;
1359 extern template class Future<int>;
1360 extern template class Future<int64_t>;
1361 extern template class Future<std::string>;
1362 extern template class Future<double>;
1363
1364 } // namespace folly