fd2c71532d35951a37e20290d77873c599157109
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <algorithm>
20 #include <cassert>
21 #include <chrono>
22 #include <random>
23 #include <thread>
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
30
31 #if defined(__ANDROID__) || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
33 #else
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/experimental/fibers/Baton.h>
36 #endif
37
38 namespace folly {
39
40 class Timekeeper;
41
42 namespace detail {
43 #if FOLLY_FUTURE_USING_FIBER
44 typedef folly::fibers::Baton FutureBatonType;
45 #else
46 typedef folly::Baton<> FutureBatonType;
47 #endif
48 }
49
50 namespace detail {
51   std::shared_ptr<Timekeeper> getTimekeeperSingleton();
52 }
53
54 template <class T>
55 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
56   other.core_ = nullptr;
57 }
58
59 template <class T>
60 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
61   std::swap(core_, other.core_);
62   return *this;
63 }
64
65 template <class T>
66 template <class T2, typename>
67 Future<T>::Future(T2&& val)
68   : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
69
70 template <class T>
71 template <typename, typename>
72 Future<T>::Future()
73   : core_(new detail::Core<T>(Try<T>(T()))) {}
74
75 template <class T>
76 Future<T>::~Future() {
77   detach();
78 }
79
80 template <class T>
81 void Future<T>::detach() {
82   if (core_) {
83     core_->detachFuture();
84     core_ = nullptr;
85   }
86 }
87
88 template <class T>
89 void Future<T>::throwIfInvalid() const {
90   if (!core_)
91     throw NoState();
92 }
93
94 template <class T>
95 template <class F>
96 void Future<T>::setCallback_(F&& func) {
97   throwIfInvalid();
98   core_->setCallback(std::move(func));
99 }
100
101 // unwrap
102
103 template <class T>
104 template <class F>
105 typename std::enable_if<isFuture<F>::value,
106                         Future<typename isFuture<T>::Inner>>::type
107 Future<T>::unwrap() {
108   return then([](Future<typename isFuture<T>::Inner> internal_future) {
109       return internal_future;
110   });
111 }
112
113 // then
114
115 // Variant: returns a value
116 // e.g. f.then([](Try<T>&& t){ return t.value(); });
117 template <class T>
118 template <typename F, typename R, bool isTry, typename... Args>
119 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
120 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
121   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
122   typedef typename R::ReturnsFuture::Inner B;
123
124   throwIfInvalid();
125
126   // wrap these so we can move them into the lambda
127   folly::MoveWrapper<Promise<B>> p;
128   p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
129   folly::MoveWrapper<F> funcm(std::forward<F>(func));
130
131   // grab the Future now before we lose our handle on the Promise
132   auto f = p->getFuture();
133   f.core_->setExecutorNoLock(getExecutor());
134
135   /* This is a bit tricky.
136
137      We can't just close over *this in case this Future gets moved. So we
138      make a new dummy Future. We could figure out something more
139      sophisticated that avoids making a new Future object when it can, as an
140      optimization. But this is correct.
141
142      core_ can't be moved, it is explicitly disallowed (as is copying). But
143      if there's ever a reason to allow it, this is one place that makes that
144      assumption and would need to be fixed. We use a standard shared pointer
145      for core_ (by copying it in), which means in essence obj holds a shared
146      pointer to itself.  But this shouldn't leak because Promise will not
147      outlive the continuation, because Promise will setException() with a
148      broken Promise if it is destructed before completed. We could use a
149      weak pointer but it would have to be converted to a shared pointer when
150      func is executed (because the Future returned by func may possibly
151      persist beyond the callback, if it gets moved), and so it is an
152      optimization to just make it shared from the get-go.
153
154      We have to move in the Promise and func using the MoveWrapper
155      hack. (func could be copied but it's a big drag on perf).
156
157      Two subtle but important points about this design. detail::Core has no
158      back pointers to Future or Promise, so if Future or Promise get moved
159      (and they will be moved in performant code) we don't have to do
160      anything fancy. And because we store the continuation in the
161      detail::Core, not in the Future, we can execute the continuation even
162      after the Future has gone out of scope. This is an intentional design
163      decision. It is likely we will want to be able to cancel a continuation
164      in some circumstances, but I think it should be explicit not implicit
165      in the destruction of the Future used to create it.
166      */
167   setCallback_(
168     [p, funcm](Try<T>&& t) mutable {
169       if (!isTry && t.hasException()) {
170         p->setException(std::move(t.exception()));
171       } else {
172         p->setWith([&]() {
173           return (*funcm)(t.template get<isTry, Args>()...);
174         });
175       }
176     });
177
178   return f;
179 }
180
181 // Variant: returns a Future
182 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
183 template <class T>
184 template <typename F, typename R, bool isTry, typename... Args>
185 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
186 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
187   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
188   typedef typename R::ReturnsFuture::Inner B;
189
190   throwIfInvalid();
191
192   // wrap these so we can move them into the lambda
193   folly::MoveWrapper<Promise<B>> p;
194   p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
195   folly::MoveWrapper<F> funcm(std::forward<F>(func));
196
197   // grab the Future now before we lose our handle on the Promise
198   auto f = p->getFuture();
199   f.core_->setExecutorNoLock(getExecutor());
200
201   setCallback_(
202     [p, funcm](Try<T>&& t) mutable {
203       if (!isTry && t.hasException()) {
204         p->setException(std::move(t.exception()));
205       } else {
206         try {
207           auto f2 = (*funcm)(t.template get<isTry, Args>()...);
208           // that didn't throw, now we can steal p
209           f2.setCallback_([p](Try<B>&& b) mutable {
210             p->setTry(std::move(b));
211           });
212         } catch (const std::exception& e) {
213           p->setException(exception_wrapper(std::current_exception(), e));
214         } catch (...) {
215           p->setException(exception_wrapper(std::current_exception()));
216         }
217       }
218     });
219
220   return f;
221 }
222
223 template <typename T>
224 template <typename R, typename Caller, typename... Args>
225   Future<typename isFuture<R>::Inner>
226 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
227   typedef typename std::remove_cv<
228     typename std::remove_reference<
229       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
230   return then([instance, func](Try<T>&& t){
231     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
232   });
233 }
234
235 template <class T>
236 template <class Executor, class Arg, class... Args>
237 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
238   -> decltype(this->then(std::forward<Arg>(arg),
239                          std::forward<Args>(args)...))
240 {
241   auto oldX = getExecutor();
242   setExecutor(x);
243   return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
244                via(oldX);
245 }
246
247 template <class T>
248 Future<Unit> Future<T>::then() {
249   return then([] () {});
250 }
251
252 // onError where the callback returns T
253 template <class T>
254 template <class F>
255 typename std::enable_if<
256   !detail::callableWith<F, exception_wrapper>::value &&
257   !detail::Extract<F>::ReturnsFuture::value,
258   Future<T>>::type
259 Future<T>::onError(F&& func) {
260   typedef typename detail::Extract<F>::FirstArg Exn;
261   static_assert(
262       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
263       "Return type of onError callback must be T or Future<T>");
264
265   Promise<T> p;
266   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
267   auto f = p.getFuture();
268   auto pm = folly::makeMoveWrapper(std::move(p));
269   auto funcm = folly::makeMoveWrapper(std::move(func));
270   setCallback_([pm, funcm](Try<T>&& t) mutable {
271     if (!t.template withException<Exn>([&] (Exn& e) {
272           pm->setWith([&]{
273             return (*funcm)(e);
274           });
275         })) {
276       pm->setTry(std::move(t));
277     }
278   });
279
280   return f;
281 }
282
283 // onError where the callback returns Future<T>
284 template <class T>
285 template <class F>
286 typename std::enable_if<
287   !detail::callableWith<F, exception_wrapper>::value &&
288   detail::Extract<F>::ReturnsFuture::value,
289   Future<T>>::type
290 Future<T>::onError(F&& func) {
291   static_assert(
292       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
293       "Return type of onError callback must be T or Future<T>");
294   typedef typename detail::Extract<F>::FirstArg Exn;
295
296   Promise<T> p;
297   auto f = p.getFuture();
298   auto pm = folly::makeMoveWrapper(std::move(p));
299   auto funcm = folly::makeMoveWrapper(std::move(func));
300   setCallback_([pm, funcm](Try<T>&& t) mutable {
301     if (!t.template withException<Exn>([&] (Exn& e) {
302           try {
303             auto f2 = (*funcm)(e);
304             f2.setCallback_([pm](Try<T>&& t2) mutable {
305               pm->setTry(std::move(t2));
306             });
307           } catch (const std::exception& e2) {
308             pm->setException(exception_wrapper(std::current_exception(), e2));
309           } catch (...) {
310             pm->setException(exception_wrapper(std::current_exception()));
311           }
312         })) {
313       pm->setTry(std::move(t));
314     }
315   });
316
317   return f;
318 }
319
320 template <class T>
321 template <class F>
322 Future<T> Future<T>::ensure(F func) {
323   MoveWrapper<F> funcw(std::move(func));
324   return this->then([funcw](Try<T>&& t) mutable {
325     (*funcw)();
326     return makeFuture(std::move(t));
327   });
328 }
329
330 template <class T>
331 template <class F>
332 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
333   auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
334   return within(dur, tk)
335     .onError([funcw](TimedOut const&) { return (*funcw)(); });
336 }
337
338 template <class T>
339 template <class F>
340 typename std::enable_if<
341   detail::callableWith<F, exception_wrapper>::value &&
342   detail::Extract<F>::ReturnsFuture::value,
343   Future<T>>::type
344 Future<T>::onError(F&& func) {
345   static_assert(
346       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
347       "Return type of onError callback must be T or Future<T>");
348
349   Promise<T> p;
350   auto f = p.getFuture();
351   auto pm = folly::makeMoveWrapper(std::move(p));
352   auto funcm = folly::makeMoveWrapper(std::move(func));
353   setCallback_([pm, funcm](Try<T> t) mutable {
354     if (t.hasException()) {
355       try {
356         auto f2 = (*funcm)(std::move(t.exception()));
357         f2.setCallback_([pm](Try<T> t2) mutable {
358           pm->setTry(std::move(t2));
359         });
360       } catch (const std::exception& e2) {
361         pm->setException(exception_wrapper(std::current_exception(), e2));
362       } catch (...) {
363         pm->setException(exception_wrapper(std::current_exception()));
364       }
365     } else {
366       pm->setTry(std::move(t));
367     }
368   });
369
370   return f;
371 }
372
373 // onError(exception_wrapper) that returns T
374 template <class T>
375 template <class F>
376 typename std::enable_if<
377   detail::callableWith<F, exception_wrapper>::value &&
378   !detail::Extract<F>::ReturnsFuture::value,
379   Future<T>>::type
380 Future<T>::onError(F&& func) {
381   static_assert(
382       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
383       "Return type of onError callback must be T or Future<T>");
384
385   Promise<T> p;
386   auto f = p.getFuture();
387   auto pm = folly::makeMoveWrapper(std::move(p));
388   auto funcm = folly::makeMoveWrapper(std::move(func));
389   setCallback_([pm, funcm](Try<T> t) mutable {
390     if (t.hasException()) {
391       pm->setWith([&]{
392         return (*funcm)(std::move(t.exception()));
393       });
394     } else {
395       pm->setTry(std::move(t));
396     }
397   });
398
399   return f;
400 }
401
402 template <class T>
403 typename std::add_lvalue_reference<T>::type Future<T>::value() {
404   throwIfInvalid();
405
406   return core_->getTry().value();
407 }
408
409 template <class T>
410 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
411   throwIfInvalid();
412
413   return core_->getTry().value();
414 }
415
416 template <class T>
417 Try<T>& Future<T>::getTry() {
418   throwIfInvalid();
419
420   return core_->getTry();
421 }
422
423 template <class T>
424 Optional<Try<T>> Future<T>::poll() {
425   Optional<Try<T>> o;
426   if (core_->ready()) {
427     o = std::move(core_->getTry());
428   }
429   return o;
430 }
431
432 template <class T>
433 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
434   throwIfInvalid();
435
436   setExecutor(executor, priority);
437
438   return std::move(*this);
439 }
440
441 template <class T>
442 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
443   throwIfInvalid();
444
445   MoveWrapper<Promise<T>> p;
446   auto f = p->getFuture();
447   then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
448   return std::move(f).via(executor, priority);
449 }
450
451
452 template <class Func>
453 auto via(Executor* x, Func func)
454   -> Future<typename isFuture<decltype(func())>::Inner>
455 {
456   // TODO make this actually more performant. :-P #7260175
457   return via(x).then(func);
458 }
459
460 template <class T>
461 bool Future<T>::isReady() const {
462   throwIfInvalid();
463   return core_->ready();
464 }
465
466 template <class T>
467 bool Future<T>::hasValue() {
468   return getTry().hasValue();
469 }
470
471 template <class T>
472 bool Future<T>::hasException() {
473   return getTry().hasException();
474 }
475
476 template <class T>
477 void Future<T>::raise(exception_wrapper exception) {
478   core_->raise(std::move(exception));
479 }
480
481 // makeFuture
482
483 template <class T>
484 Future<typename std::decay<T>::type> makeFuture(T&& t) {
485   return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
486 }
487
488 inline // for multiple translation units
489 Future<Unit> makeFuture() {
490   return makeFuture(Unit{});
491 }
492
493 // makeFutureWith(Future<T>()) -> Future<T>
494 template <class F>
495 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
496                         typename std::result_of<F()>::type>::type
497 makeFutureWith(F&& func) {
498   using InnerType =
499       typename isFuture<typename std::result_of<F()>::type>::Inner;
500   try {
501     return func();
502   } catch (std::exception& e) {
503     return makeFuture<InnerType>(
504         exception_wrapper(std::current_exception(), e));
505   } catch (...) {
506     return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
507   }
508 }
509
510 // makeFutureWith(T()) -> Future<T>
511 // makeFutureWith(void()) -> Future<Unit>
512 template <class F>
513 typename std::enable_if<
514     !(isFuture<typename std::result_of<F()>::type>::value),
515     Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
516 makeFutureWith(F&& func) {
517   using LiftedResult =
518       typename Unit::Lift<typename std::result_of<F()>::type>::type;
519   return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
520     return func();
521   }));
522 }
523
524 template <class T>
525 Future<T> makeFuture(std::exception_ptr const& e) {
526   return makeFuture(Try<T>(e));
527 }
528
529 template <class T>
530 Future<T> makeFuture(exception_wrapper ew) {
531   return makeFuture(Try<T>(std::move(ew)));
532 }
533
534 template <class T, class E>
535 typename std::enable_if<std::is_base_of<std::exception, E>::value,
536                         Future<T>>::type
537 makeFuture(E const& e) {
538   return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
539 }
540
541 template <class T>
542 Future<T> makeFuture(Try<T>&& t) {
543   return Future<T>(new detail::Core<T>(std::move(t)));
544 }
545
546 // via
547 Future<Unit> via(Executor* executor, int8_t priority) {
548   return makeFuture().via(executor, priority);
549 }
550
551 // mapSetCallback calls func(i, Try<T>) when every future completes
552
553 template <class T, class InputIterator, class F>
554 void mapSetCallback(InputIterator first, InputIterator last, F func) {
555   for (size_t i = 0; first != last; ++first, ++i) {
556     first->setCallback_([func, i](Try<T>&& t) {
557       func(i, std::move(t));
558     });
559   }
560 }
561
562 // collectAll (variadic)
563
564 template <typename... Fs>
565 typename detail::CollectAllVariadicContext<
566   typename std::decay<Fs>::type::value_type...>::type
567 collectAll(Fs&&... fs) {
568   auto ctx = std::make_shared<detail::CollectAllVariadicContext<
569     typename std::decay<Fs>::type::value_type...>>();
570   detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
571     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
572   return ctx->p.getFuture();
573 }
574
575 // collectAll (iterator)
576
577 template <class InputIterator>
578 Future<
579   std::vector<
580   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
581 collectAll(InputIterator first, InputIterator last) {
582   typedef
583     typename std::iterator_traits<InputIterator>::value_type::value_type T;
584
585   struct CollectAllContext {
586     CollectAllContext(int n) : results(n) {}
587     ~CollectAllContext() {
588       p.setValue(std::move(results));
589     }
590     Promise<std::vector<Try<T>>> p;
591     std::vector<Try<T>> results;
592   };
593
594   auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
595   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
596     ctx->results[i] = std::move(t);
597   });
598   return ctx->p.getFuture();
599 }
600
601 // collect (iterator)
602
603 namespace detail {
604
605 template <typename T>
606 struct CollectContext {
607   struct Nothing { explicit Nothing(int n) {} };
608
609   using Result = typename std::conditional<
610     std::is_void<T>::value,
611     void,
612     std::vector<T>>::type;
613
614   using InternalResult = typename std::conditional<
615     std::is_void<T>::value,
616     Nothing,
617     std::vector<Optional<T>>>::type;
618
619   explicit CollectContext(int n) : result(n) {}
620   ~CollectContext() {
621     if (!threw.exchange(true)) {
622       // map Optional<T> -> T
623       std::vector<T> finalResult;
624       finalResult.reserve(result.size());
625       std::transform(result.begin(), result.end(),
626                      std::back_inserter(finalResult),
627                      [](Optional<T>& o) { return std::move(o.value()); });
628       p.setValue(std::move(finalResult));
629     }
630   }
631   inline void setPartialResult(size_t i, Try<T>& t) {
632     result[i] = std::move(t.value());
633   }
634   Promise<Result> p;
635   InternalResult result;
636   std::atomic<bool> threw {false};
637 };
638
639 }
640
641 template <class InputIterator>
642 Future<typename detail::CollectContext<
643   typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
644 collect(InputIterator first, InputIterator last) {
645   typedef
646     typename std::iterator_traits<InputIterator>::value_type::value_type T;
647
648   auto ctx = std::make_shared<detail::CollectContext<T>>(
649     std::distance(first, last));
650   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
651     if (t.hasException()) {
652        if (!ctx->threw.exchange(true)) {
653          ctx->p.setException(std::move(t.exception()));
654        }
655      } else if (!ctx->threw) {
656        ctx->setPartialResult(i, t);
657      }
658   });
659   return ctx->p.getFuture();
660 }
661
662 // collect (variadic)
663
664 template <typename... Fs>
665 typename detail::CollectVariadicContext<
666   typename std::decay<Fs>::type::value_type...>::type
667 collect(Fs&&... fs) {
668   auto ctx = std::make_shared<detail::CollectVariadicContext<
669     typename std::decay<Fs>::type::value_type...>>();
670   detail::collectVariadicHelper<detail::CollectVariadicContext>(
671     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
672   return ctx->p.getFuture();
673 }
674
675 // collectAny (iterator)
676
677 template <class InputIterator>
678 Future<
679   std::pair<size_t,
680             Try<
681               typename
682               std::iterator_traits<InputIterator>::value_type::value_type>>>
683 collectAny(InputIterator first, InputIterator last) {
684   typedef
685     typename std::iterator_traits<InputIterator>::value_type::value_type T;
686
687   struct CollectAnyContext {
688     CollectAnyContext() {};
689     Promise<std::pair<size_t, Try<T>>> p;
690     std::atomic<bool> done {false};
691   };
692
693   auto ctx = std::make_shared<CollectAnyContext>();
694   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
695     if (!ctx->done.exchange(true)) {
696       ctx->p.setValue(std::make_pair(i, std::move(t)));
697     }
698   });
699   return ctx->p.getFuture();
700 }
701
702 // collectN (iterator)
703
704 template <class InputIterator>
705 Future<std::vector<std::pair<size_t, Try<typename
706   std::iterator_traits<InputIterator>::value_type::value_type>>>>
707 collectN(InputIterator first, InputIterator last, size_t n) {
708   typedef typename
709     std::iterator_traits<InputIterator>::value_type::value_type T;
710   typedef std::vector<std::pair<size_t, Try<T>>> V;
711
712   struct CollectNContext {
713     V v;
714     std::atomic<size_t> completed = {0};
715     Promise<V> p;
716   };
717   auto ctx = std::make_shared<CollectNContext>();
718
719   if (size_t(std::distance(first, last)) < n) {
720     ctx->p.setException(std::runtime_error("Not enough futures"));
721   } else {
722     // for each completed Future, increase count and add to vector, until we
723     // have n completed futures at which point we fulfil our Promise with the
724     // vector
725     mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
726       auto c = ++ctx->completed;
727       if (c <= n) {
728         assert(ctx->v.size() < n);
729         ctx->v.emplace_back(i, std::move(t));
730         if (c == n) {
731           ctx->p.setTry(Try<V>(std::move(ctx->v)));
732         }
733       }
734     });
735   }
736
737   return ctx->p.getFuture();
738 }
739
740 // reduce (iterator)
741
742 template <class It, class T, class F>
743 Future<T> reduce(It first, It last, T&& initial, F&& func) {
744   if (first == last) {
745     return makeFuture(std::move(initial));
746   }
747
748   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
749   typedef typename std::conditional<
750     detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type Arg;
751   typedef isTry<Arg> IsTry;
752
753   folly::MoveWrapper<T> minitial(std::move(initial));
754   auto sfunc = std::make_shared<F>(std::move(func));
755
756   auto f = first->then([minitial, sfunc](Try<ItT>& head) mutable {
757     return (*sfunc)(std::move(*minitial),
758                 head.template get<IsTry::value, Arg&&>());
759   });
760
761   for (++first; first != last; ++first) {
762     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
763       return (*sfunc)(std::move(std::get<0>(t).value()),
764                   // Either return a ItT&& or a Try<ItT>&& depending
765                   // on the type of the argument of func.
766                   std::get<1>(t).template get<IsTry::value, Arg&&>());
767     });
768   }
769
770   return f;
771 }
772
773 // window (collection)
774
775 template <class Collection, class F, class ItT, class Result>
776 std::vector<Future<Result>>
777 window(Collection input, F func, size_t n) {
778   struct WindowContext {
779     WindowContext(Collection&& i, F&& fn)
780         : input_(std::move(i)), promises_(input_.size()),
781           func_(std::move(fn))
782       {}
783     std::atomic<size_t> i_ {0};
784     Collection input_;
785     std::vector<Promise<Result>> promises_;
786     F func_;
787
788     static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
789       size_t i = ctx->i_++;
790       if (i < ctx->input_.size()) {
791         // Using setCallback_ directly since we don't need the Future
792         ctx->func_(std::move(ctx->input_[i])).setCallback_(
793           // ctx is captured by value
794           [ctx, i](Try<Result>&& t) {
795             ctx->promises_[i].setTry(std::move(t));
796             // Chain another future onto this one
797             spawn(std::move(ctx));
798           });
799       }
800     }
801   };
802
803   auto max = std::min(n, input.size());
804
805   auto ctx = std::make_shared<WindowContext>(
806     std::move(input), std::move(func));
807
808   for (size_t i = 0; i < max; ++i) {
809     // Start the first n Futures
810     WindowContext::spawn(ctx);
811   }
812
813   std::vector<Future<Result>> futures;
814   futures.reserve(ctx->promises_.size());
815   for (auto& promise : ctx->promises_) {
816     futures.emplace_back(promise.getFuture());
817   }
818
819   return futures;
820 }
821
822 // reduce
823
824 template <class T>
825 template <class I, class F>
826 Future<I> Future<T>::reduce(I&& initial, F&& func) {
827   folly::MoveWrapper<I> minitial(std::move(initial));
828   folly::MoveWrapper<F> mfunc(std::move(func));
829   return then([minitial, mfunc](T& vals) mutable {
830     auto ret = std::move(*minitial);
831     for (auto& val : vals) {
832       ret = (*mfunc)(std::move(ret), std::move(val));
833     }
834     return ret;
835   });
836 }
837
838 // unorderedReduce (iterator)
839
840 template <class It, class T, class F, class ItT, class Arg>
841 Future<T> unorderedReduce(It first, It last, T initial, F func) {
842   if (first == last) {
843     return makeFuture(std::move(initial));
844   }
845
846   typedef isTry<Arg> IsTry;
847
848   struct UnorderedReduceContext {
849     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
850         : lock_(), memo_(makeFuture<T>(std::move(memo))),
851           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
852       {};
853     folly::MicroSpinLock lock_; // protects memo_ and numThens_
854     Future<T> memo_;
855     F func_;
856     size_t numThens_; // how many Futures completed and called .then()
857     size_t numFutures_; // how many Futures in total
858     Promise<T> promise_;
859   };
860
861   auto ctx = std::make_shared<UnorderedReduceContext>(
862     std::move(initial), std::move(func), std::distance(first, last));
863
864   mapSetCallback<ItT>(first, last, [ctx](size_t i, Try<ItT>&& t) {
865     folly::MoveWrapper<Try<ItT>> mt(std::move(t));
866     // Futures can be completed in any order, simultaneously.
867     // To make this non-blocking, we create a new Future chain in
868     // the order of completion to reduce the values.
869     // The spinlock just protects chaining a new Future, not actually
870     // executing the reduce, which should be really fast.
871     folly::MSLGuard lock(ctx->lock_);
872     ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
873       // Either return a ItT&& or a Try<ItT>&& depending
874       // on the type of the argument of func.
875       return ctx->func_(std::move(v), mt->template get<IsTry::value, Arg&&>());
876     });
877     if (++ctx->numThens_ == ctx->numFutures_) {
878       // After reducing the value of the last Future, fulfill the Promise
879       ctx->memo_.setCallback_([ctx](Try<T>&& t2) {
880         ctx->promise_.setValue(std::move(t2));
881       });
882     }
883   });
884
885   return ctx->promise_.getFuture();
886 }
887
888 // within
889
890 template <class T>
891 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
892   return within(dur, TimedOut(), tk);
893 }
894
895 template <class T>
896 template <class E>
897 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
898
899   struct Context {
900     Context(E ex) : exception(std::move(ex)), promise() {}
901     E exception;
902     Future<Unit> thisFuture;
903     Promise<T> promise;
904     std::atomic<bool> token {false};
905   };
906
907   std::shared_ptr<Timekeeper> tks;
908   if (!tk) {
909     tks = folly::detail::getTimekeeperSingleton();
910     tk = DCHECK_NOTNULL(tks.get());
911   }
912
913   auto ctx = std::make_shared<Context>(std::move(e));
914
915   ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
916     // TODO: "this" completed first, cancel "after"
917     if (ctx->token.exchange(true) == false) {
918       ctx->promise.setTry(std::move(t));
919     }
920   });
921
922   tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
923     // "after" completed first, cancel "this"
924     ctx->thisFuture.raise(TimedOut());
925     if (ctx->token.exchange(true) == false) {
926       if (t.hasException()) {
927         ctx->promise.setException(std::move(t.exception()));
928       } else {
929         ctx->promise.setException(std::move(ctx->exception));
930       }
931     }
932   });
933
934   return ctx->promise.getFuture().via(getExecutor());
935 }
936
937 // delayed
938
939 template <class T>
940 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
941   return collectAll(*this, futures::sleep(dur, tk))
942     .then([](std::tuple<Try<T>, Try<Unit>> tup) {
943       Try<T>& t = std::get<0>(tup);
944       return makeFuture<T>(std::move(t));
945     });
946 }
947
948 namespace detail {
949
950 template <class T>
951 void waitImpl(Future<T>& f) {
952   // short-circuit if there's nothing to do
953   if (f.isReady()) return;
954
955   FutureBatonType baton;
956   f.setCallback_([&](const Try<T>& t) { baton.post(); });
957   baton.wait();
958   assert(f.isReady());
959 }
960
961 template <class T>
962 void waitImpl(Future<T>& f, Duration dur) {
963   // short-circuit if there's nothing to do
964   if (f.isReady()) return;
965
966   folly::MoveWrapper<Promise<T>> promise;
967   auto ret = promise->getFuture();
968   auto baton = std::make_shared<FutureBatonType>();
969   f.setCallback_([baton, promise](Try<T>&& t) mutable {
970     promise->setTry(std::move(t));
971     baton->post();
972   });
973   f = std::move(ret);
974   if (baton->timed_wait(dur)) {
975     assert(f.isReady());
976   }
977 }
978
979 template <class T>
980 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
981   while (!f.isReady()) {
982     e->drive();
983   }
984 }
985
986 } // detail
987
988 template <class T>
989 Future<T>& Future<T>::wait() & {
990   detail::waitImpl(*this);
991   return *this;
992 }
993
994 template <class T>
995 Future<T>&& Future<T>::wait() && {
996   detail::waitImpl(*this);
997   return std::move(*this);
998 }
999
1000 template <class T>
1001 Future<T>& Future<T>::wait(Duration dur) & {
1002   detail::waitImpl(*this, dur);
1003   return *this;
1004 }
1005
1006 template <class T>
1007 Future<T>&& Future<T>::wait(Duration dur) && {
1008   detail::waitImpl(*this, dur);
1009   return std::move(*this);
1010 }
1011
1012 template <class T>
1013 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1014   detail::waitViaImpl(*this, e);
1015   return *this;
1016 }
1017
1018 template <class T>
1019 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1020   detail::waitViaImpl(*this, e);
1021   return std::move(*this);
1022 }
1023
1024 template <class T>
1025 T Future<T>::get() {
1026   return std::move(wait().value());
1027 }
1028
1029 template <class T>
1030 T Future<T>::get(Duration dur) {
1031   wait(dur);
1032   if (isReady()) {
1033     return std::move(value());
1034   } else {
1035     throw TimedOut();
1036   }
1037 }
1038
1039 template <class T>
1040 T Future<T>::getVia(DrivableExecutor* e) {
1041   return std::move(waitVia(e).value());
1042 }
1043
1044 namespace detail {
1045   template <class T>
1046   struct TryEquals {
1047     static bool equals(const Try<T>& t1, const Try<T>& t2) {
1048       return t1.value() == t2.value();
1049     }
1050   };
1051 }
1052
1053 template <class T>
1054 Future<bool> Future<T>::willEqual(Future<T>& f) {
1055   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1056     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1057       return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1058     } else {
1059       return false;
1060       }
1061   });
1062 }
1063
1064 template <class T>
1065 template <class F>
1066 Future<T> Future<T>::filter(F predicate) {
1067   auto p = folly::makeMoveWrapper(std::move(predicate));
1068   return this->then([p](T val) {
1069     T const& valConstRef = val;
1070     if (!(*p)(valConstRef)) {
1071       throw PredicateDoesNotObtain();
1072     }
1073     return val;
1074   });
1075 }
1076
1077 template <class T>
1078 template <class Callback>
1079 auto Future<T>::thenMulti(Callback&& fn)
1080     -> decltype(this->then(std::forward<Callback>(fn))) {
1081   // thenMulti with one callback is just a then
1082   return then(std::forward<Callback>(fn));
1083 }
1084
1085 template <class T>
1086 template <class Callback, class... Callbacks>
1087 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1088     -> decltype(this->then(std::forward<Callback>(fn)).
1089                       thenMulti(std::forward<Callbacks>(fns)...)) {
1090   // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1091   return then(std::forward<Callback>(fn)).
1092          thenMulti(std::forward<Callbacks>(fns)...);
1093 }
1094
1095 template <class T>
1096 template <class Callback, class... Callbacks>
1097 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1098                                       Callbacks&&... fns)
1099     -> decltype(this->then(std::forward<Callback>(fn)).
1100                       thenMulti(std::forward<Callbacks>(fns)...)) {
1101   // thenMultiExecutor with two callbacks is
1102   // via(x).then(a).thenMulti(b, ...).via(oldX)
1103   auto oldX = getExecutor();
1104   setExecutor(x);
1105   return then(std::forward<Callback>(fn)).
1106          thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1107 }
1108
1109 template <class T>
1110 template <class Callback>
1111 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1112     -> decltype(this->then(std::forward<Callback>(fn))) {
1113   // thenMulti with one callback is just a then with an executor
1114   return then(x, std::forward<Callback>(fn));
1115 }
1116
1117 template <class F>
1118 inline Future<Unit> when(bool p, F thunk) {
1119   return p ? thunk().unit() : makeFuture();
1120 }
1121
1122 template <class P, class F>
1123 Future<Unit> whileDo(P predicate, F thunk) {
1124   if (predicate()) {
1125     return thunk().then([=] {
1126       return whileDo(predicate, thunk);
1127     });
1128   }
1129   return makeFuture();
1130 }
1131
1132 template <class F>
1133 Future<Unit> times(const int n, F thunk) {
1134   auto count = folly::makeMoveWrapper(
1135     std::unique_ptr<std::atomic<int>>(new std::atomic<int>(0))
1136   );
1137   return folly::whileDo([=]() mutable {
1138       return (*count)->fetch_add(1) < n;
1139     }, thunk);
1140 }
1141
1142 namespace futures {
1143   template <class It, class F, class ItT, class Result>
1144   std::vector<Future<Result>> map(It first, It last, F func) {
1145     std::vector<Future<Result>> results;
1146     for (auto it = first; it != last; it++) {
1147       results.push_back(it->then(func));
1148     }
1149     return results;
1150   }
1151 }
1152
1153 namespace futures {
1154
1155 namespace detail {
1156
1157 struct retrying_policy_raw_tag {};
1158 struct retrying_policy_fut_tag {};
1159
1160 template <class Policy>
1161 struct retrying_policy_traits {
1162   using ew = exception_wrapper;
1163   FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1164   template <class Ret>
1165   using has_op = typename std::integral_constant<bool,
1166         has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1167         has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1168   using is_raw = has_op<bool>;
1169   using is_fut = has_op<Future<bool>>;
1170   using tag = typename std::conditional<
1171         is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1172         is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1173 };
1174
1175 template <class Policy, class FF>
1176 typename std::result_of<FF(size_t)>::type
1177 retrying(size_t k, Policy&& p, FF&& ff) {
1178   using F = typename std::result_of<FF(size_t)>::type;
1179   using T = typename F::value_type;
1180   auto f = ff(k++);
1181   auto pm = makeMoveWrapper(p);
1182   auto ffm = makeMoveWrapper(ff);
1183   return f.onError([=](exception_wrapper x) mutable {
1184       auto q = (*pm)(k, x);
1185       auto xm = makeMoveWrapper(std::move(x));
1186       return q.then([=](bool r) mutable {
1187           return r
1188             ? retrying(k, pm.move(), ffm.move())
1189             : makeFuture<T>(xm.move());
1190       });
1191   });
1192 }
1193
1194 template <class Policy, class FF>
1195 typename std::result_of<FF(size_t)>::type
1196 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1197   auto pm = makeMoveWrapper(std::move(p));
1198   auto q = [=](size_t k, exception_wrapper x) {
1199     return makeFuture<bool>((*pm)(k, x));
1200   };
1201   return retrying(0, std::move(q), std::forward<FF>(ff));
1202 }
1203
1204 template <class Policy, class FF>
1205 typename std::result_of<FF(size_t)>::type
1206 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1207   return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1208 }
1209
1210 //  jittered exponential backoff, clamped to [backoff_min, backoff_max]
1211 template <class URNG>
1212 Duration retryingJitteredExponentialBackoffDur(
1213     size_t n,
1214     Duration backoff_min,
1215     Duration backoff_max,
1216     double jitter_param,
1217     URNG& rng) {
1218   using d = Duration;
1219   auto dist = std::normal_distribution<double>(0.0, jitter_param);
1220   auto jitter = std::exp(dist(rng));
1221   auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1222   return std::max(backoff_min, std::min(backoff_max, backoff));
1223 }
1224
1225 template <class Policy, class URNG>
1226 std::function<Future<bool>(size_t, const exception_wrapper&)>
1227 retryingPolicyCappedJitteredExponentialBackoff(
1228     size_t max_tries,
1229     Duration backoff_min,
1230     Duration backoff_max,
1231     double jitter_param,
1232     URNG rng,
1233     Policy&& p) {
1234   auto pm = makeMoveWrapper(std::move(p));
1235   auto rngp = std::make_shared<URNG>(std::move(rng));
1236   return [=](size_t n, const exception_wrapper& ex) mutable {
1237     if (n == max_tries) { return makeFuture(false); }
1238     return (*pm)(n, ex).then([=](bool v) {
1239         if (!v) { return makeFuture(false); }
1240         auto backoff = detail::retryingJitteredExponentialBackoffDur(
1241             n, backoff_min, backoff_max, jitter_param, *rngp);
1242         return futures::sleep(backoff).then([] { return true; });
1243     });
1244   };
1245 }
1246
1247 template <class Policy, class URNG>
1248 std::function<Future<bool>(size_t, const exception_wrapper&)>
1249 retryingPolicyCappedJitteredExponentialBackoff(
1250     size_t max_tries,
1251     Duration backoff_min,
1252     Duration backoff_max,
1253     double jitter_param,
1254     URNG rng,
1255     Policy&& p,
1256     retrying_policy_raw_tag) {
1257   auto pm = makeMoveWrapper(std::move(p));
1258   auto q = [=](size_t n, const exception_wrapper& e) {
1259     return makeFuture((*pm)(n, e));
1260   };
1261   return retryingPolicyCappedJitteredExponentialBackoff(
1262       max_tries,
1263       backoff_min,
1264       backoff_max,
1265       jitter_param,
1266       std::move(rng),
1267       std::move(q));
1268 }
1269
1270 template <class Policy, class URNG>
1271 std::function<Future<bool>(size_t, const exception_wrapper&)>
1272 retryingPolicyCappedJitteredExponentialBackoff(
1273     size_t max_tries,
1274     Duration backoff_min,
1275     Duration backoff_max,
1276     double jitter_param,
1277     URNG rng,
1278     Policy&& p,
1279     retrying_policy_fut_tag) {
1280   return retryingPolicyCappedJitteredExponentialBackoff(
1281       max_tries,
1282       backoff_min,
1283       backoff_max,
1284       jitter_param,
1285       std::move(rng),
1286       std::move(p));
1287 }
1288
1289 }
1290
1291 template <class Policy, class FF>
1292 typename std::result_of<FF(size_t)>::type
1293 retrying(Policy&& p, FF&& ff) {
1294   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1295   return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1296 }
1297
1298 inline
1299 std::function<bool(size_t, const exception_wrapper&)>
1300 retryingPolicyBasic(
1301     size_t max_tries) {
1302   return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1303 }
1304
1305 template <class Policy, class URNG>
1306 std::function<Future<bool>(size_t, const exception_wrapper&)>
1307 retryingPolicyCappedJitteredExponentialBackoff(
1308     size_t max_tries,
1309     Duration backoff_min,
1310     Duration backoff_max,
1311     double jitter_param,
1312     URNG rng,
1313     Policy&& p) {
1314   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1315   return detail::retryingPolicyCappedJitteredExponentialBackoff(
1316       max_tries,
1317       backoff_min,
1318       backoff_max,
1319       jitter_param,
1320       std::move(rng),
1321       std::move(p),
1322       tag());
1323 }
1324
1325 inline
1326 std::function<Future<bool>(size_t, const exception_wrapper&)>
1327 retryingPolicyCappedJitteredExponentialBackoff(
1328     size_t max_tries,
1329     Duration backoff_min,
1330     Duration backoff_max,
1331     double jitter_param) {
1332   auto p = [](size_t, const exception_wrapper&) { return true; };
1333   return retryingPolicyCappedJitteredExponentialBackoff(
1334       max_tries,
1335       backoff_min,
1336       backoff_max,
1337       jitter_param,
1338       ThreadLocalPRNG(),
1339       std::move(p));
1340 }
1341
1342 }
1343
1344 // Instantiate the most common Future types to save compile time
1345 extern template class Future<Unit>;
1346 extern template class Future<bool>;
1347 extern template class Future<int>;
1348 extern template class Future<int64_t>;
1349 extern template class Future<std::string>;
1350 extern template class Future<double>;
1351
1352 } // namespace folly