folly copyright 2015 -> copyright 2016
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <algorithm>
20 #include <cassert>
21 #include <chrono>
22 #include <random>
23 #include <thread>
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
30
31 #if defined(__ANDROID__) || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
33 #else
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/experimental/fibers/Baton.h>
36 #endif
37
38 namespace folly {
39
40 class Timekeeper;
41
42 namespace detail {
43 #if FOLLY_FUTURE_USING_FIBER
44 typedef folly::fibers::Baton FutureBatonType;
45 #else
46 typedef folly::Baton<> FutureBatonType;
47 #endif
48 }
49
50 namespace detail {
51   std::shared_ptr<Timekeeper> getTimekeeperSingleton();
52 }
53
54 template <class T>
55 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
56   other.core_ = nullptr;
57 }
58
59 template <class T>
60 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
61   std::swap(core_, other.core_);
62   return *this;
63 }
64
65 template <class T>
66 template <class T2, typename>
67 Future<T>::Future(T2&& val)
68   : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
69
70 template <class T>
71 template <typename, typename>
72 Future<T>::Future()
73   : core_(new detail::Core<T>(Try<T>(T()))) {}
74
75 template <class T>
76 Future<T>::~Future() {
77   detach();
78 }
79
80 template <class T>
81 void Future<T>::detach() {
82   if (core_) {
83     core_->detachFuture();
84     core_ = nullptr;
85   }
86 }
87
88 template <class T>
89 void Future<T>::throwIfInvalid() const {
90   if (!core_)
91     throw NoState();
92 }
93
94 template <class T>
95 template <class F>
96 void Future<T>::setCallback_(F&& func) {
97   throwIfInvalid();
98   core_->setCallback(std::move(func));
99 }
100
101 // unwrap
102
103 template <class T>
104 template <class F>
105 typename std::enable_if<isFuture<F>::value,
106                         Future<typename isFuture<T>::Inner>>::type
107 Future<T>::unwrap() {
108   return then([](Future<typename isFuture<T>::Inner> internal_future) {
109       return internal_future;
110   });
111 }
112
113 // then
114
115 // Variant: returns a value
116 // e.g. f.then([](Try<T>&& t){ return t.value(); });
117 template <class T>
118 template <typename F, typename R, bool isTry, typename... Args>
119 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
120 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
121   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
122   typedef typename R::ReturnsFuture::Inner B;
123
124   throwIfInvalid();
125
126   // wrap these so we can move them into the lambda
127   folly::MoveWrapper<Promise<B>> p;
128   p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
129   folly::MoveWrapper<F> funcm(std::forward<F>(func));
130
131   // grab the Future now before we lose our handle on the Promise
132   auto f = p->getFuture();
133   f.core_->setExecutorNoLock(getExecutor());
134
135   /* This is a bit tricky.
136
137      We can't just close over *this in case this Future gets moved. So we
138      make a new dummy Future. We could figure out something more
139      sophisticated that avoids making a new Future object when it can, as an
140      optimization. But this is correct.
141
142      core_ can't be moved, it is explicitly disallowed (as is copying). But
143      if there's ever a reason to allow it, this is one place that makes that
144      assumption and would need to be fixed. We use a standard shared pointer
145      for core_ (by copying it in), which means in essence obj holds a shared
146      pointer to itself.  But this shouldn't leak because Promise will not
147      outlive the continuation, because Promise will setException() with a
148      broken Promise if it is destructed before completed. We could use a
149      weak pointer but it would have to be converted to a shared pointer when
150      func is executed (because the Future returned by func may possibly
151      persist beyond the callback, if it gets moved), and so it is an
152      optimization to just make it shared from the get-go.
153
154      We have to move in the Promise and func using the MoveWrapper
155      hack. (func could be copied but it's a big drag on perf).
156
157      Two subtle but important points about this design. detail::Core has no
158      back pointers to Future or Promise, so if Future or Promise get moved
159      (and they will be moved in performant code) we don't have to do
160      anything fancy. And because we store the continuation in the
161      detail::Core, not in the Future, we can execute the continuation even
162      after the Future has gone out of scope. This is an intentional design
163      decision. It is likely we will want to be able to cancel a continuation
164      in some circumstances, but I think it should be explicit not implicit
165      in the destruction of the Future used to create it.
166      */
167   setCallback_(
168     [p, funcm](Try<T>&& t) mutable {
169       if (!isTry && t.hasException()) {
170         p->setException(std::move(t.exception()));
171       } else {
172         p->setWith([&]() {
173           return (*funcm)(t.template get<isTry, Args>()...);
174         });
175       }
176     });
177
178   return f;
179 }
180
181 // Variant: returns a Future
182 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
183 template <class T>
184 template <typename F, typename R, bool isTry, typename... Args>
185 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
186 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
187   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
188   typedef typename R::ReturnsFuture::Inner B;
189
190   throwIfInvalid();
191
192   // wrap these so we can move them into the lambda
193   folly::MoveWrapper<Promise<B>> p;
194   p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
195   folly::MoveWrapper<F> funcm(std::forward<F>(func));
196
197   // grab the Future now before we lose our handle on the Promise
198   auto f = p->getFuture();
199   f.core_->setExecutorNoLock(getExecutor());
200
201   setCallback_(
202     [p, funcm](Try<T>&& t) mutable {
203       if (!isTry && t.hasException()) {
204         p->setException(std::move(t.exception()));
205       } else {
206         try {
207           auto f2 = (*funcm)(t.template get<isTry, Args>()...);
208           // that didn't throw, now we can steal p
209           f2.setCallback_([p](Try<B>&& b) mutable {
210             p->setTry(std::move(b));
211           });
212         } catch (const std::exception& e) {
213           p->setException(exception_wrapper(std::current_exception(), e));
214         } catch (...) {
215           p->setException(exception_wrapper(std::current_exception()));
216         }
217       }
218     });
219
220   return f;
221 }
222
223 template <typename T>
224 template <typename R, typename Caller, typename... Args>
225   Future<typename isFuture<R>::Inner>
226 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
227   typedef typename std::remove_cv<
228     typename std::remove_reference<
229       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
230   return then([instance, func](Try<T>&& t){
231     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
232   });
233 }
234
235 template <class T>
236 template <class Executor, class Arg, class... Args>
237 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
238   -> decltype(this->then(std::forward<Arg>(arg),
239                          std::forward<Args>(args)...))
240 {
241   auto oldX = getExecutor();
242   setExecutor(x);
243   return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
244                via(oldX);
245 }
246
247 template <class T>
248 Future<Unit> Future<T>::then() {
249   return then([] () {});
250 }
251
252 // onError where the callback returns T
253 template <class T>
254 template <class F>
255 typename std::enable_if<
256   !detail::callableWith<F, exception_wrapper>::value &&
257   !detail::Extract<F>::ReturnsFuture::value,
258   Future<T>>::type
259 Future<T>::onError(F&& func) {
260   typedef typename detail::Extract<F>::FirstArg Exn;
261   static_assert(
262       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
263       "Return type of onError callback must be T or Future<T>");
264
265   Promise<T> p;
266   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
267   auto f = p.getFuture();
268   auto pm = folly::makeMoveWrapper(std::move(p));
269   auto funcm = folly::makeMoveWrapper(std::move(func));
270   setCallback_([pm, funcm](Try<T>&& t) mutable {
271     if (!t.template withException<Exn>([&] (Exn& e) {
272           pm->setWith([&]{
273             return (*funcm)(e);
274           });
275         })) {
276       pm->setTry(std::move(t));
277     }
278   });
279
280   return f;
281 }
282
283 // onError where the callback returns Future<T>
284 template <class T>
285 template <class F>
286 typename std::enable_if<
287   !detail::callableWith<F, exception_wrapper>::value &&
288   detail::Extract<F>::ReturnsFuture::value,
289   Future<T>>::type
290 Future<T>::onError(F&& func) {
291   static_assert(
292       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
293       "Return type of onError callback must be T or Future<T>");
294   typedef typename detail::Extract<F>::FirstArg Exn;
295
296   Promise<T> p;
297   auto f = p.getFuture();
298   auto pm = folly::makeMoveWrapper(std::move(p));
299   auto funcm = folly::makeMoveWrapper(std::move(func));
300   setCallback_([pm, funcm](Try<T>&& t) mutable {
301     if (!t.template withException<Exn>([&] (Exn& e) {
302           try {
303             auto f2 = (*funcm)(e);
304             f2.setCallback_([pm](Try<T>&& t2) mutable {
305               pm->setTry(std::move(t2));
306             });
307           } catch (const std::exception& e2) {
308             pm->setException(exception_wrapper(std::current_exception(), e2));
309           } catch (...) {
310             pm->setException(exception_wrapper(std::current_exception()));
311           }
312         })) {
313       pm->setTry(std::move(t));
314     }
315   });
316
317   return f;
318 }
319
320 template <class T>
321 template <class F>
322 Future<T> Future<T>::ensure(F func) {
323   MoveWrapper<F> funcw(std::move(func));
324   return this->then([funcw](Try<T>&& t) mutable {
325     (*funcw)();
326     return makeFuture(std::move(t));
327   });
328 }
329
330 template <class T>
331 template <class F>
332 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
333   auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
334   return within(dur, tk)
335     .onError([funcw](TimedOut const&) { return (*funcw)(); });
336 }
337
338 template <class T>
339 template <class F>
340 typename std::enable_if<
341   detail::callableWith<F, exception_wrapper>::value &&
342   detail::Extract<F>::ReturnsFuture::value,
343   Future<T>>::type
344 Future<T>::onError(F&& func) {
345   static_assert(
346       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
347       "Return type of onError callback must be T or Future<T>");
348
349   Promise<T> p;
350   auto f = p.getFuture();
351   auto pm = folly::makeMoveWrapper(std::move(p));
352   auto funcm = folly::makeMoveWrapper(std::move(func));
353   setCallback_([pm, funcm](Try<T> t) mutable {
354     if (t.hasException()) {
355       try {
356         auto f2 = (*funcm)(std::move(t.exception()));
357         f2.setCallback_([pm](Try<T> t2) mutable {
358           pm->setTry(std::move(t2));
359         });
360       } catch (const std::exception& e2) {
361         pm->setException(exception_wrapper(std::current_exception(), e2));
362       } catch (...) {
363         pm->setException(exception_wrapper(std::current_exception()));
364       }
365     } else {
366       pm->setTry(std::move(t));
367     }
368   });
369
370   return f;
371 }
372
373 // onError(exception_wrapper) that returns T
374 template <class T>
375 template <class F>
376 typename std::enable_if<
377   detail::callableWith<F, exception_wrapper>::value &&
378   !detail::Extract<F>::ReturnsFuture::value,
379   Future<T>>::type
380 Future<T>::onError(F&& func) {
381   static_assert(
382       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
383       "Return type of onError callback must be T or Future<T>");
384
385   Promise<T> p;
386   auto f = p.getFuture();
387   auto pm = folly::makeMoveWrapper(std::move(p));
388   auto funcm = folly::makeMoveWrapper(std::move(func));
389   setCallback_([pm, funcm](Try<T> t) mutable {
390     if (t.hasException()) {
391       pm->setWith([&]{
392         return (*funcm)(std::move(t.exception()));
393       });
394     } else {
395       pm->setTry(std::move(t));
396     }
397   });
398
399   return f;
400 }
401
402 template <class T>
403 typename std::add_lvalue_reference<T>::type Future<T>::value() {
404   throwIfInvalid();
405
406   return core_->getTry().value();
407 }
408
409 template <class T>
410 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
411   throwIfInvalid();
412
413   return core_->getTry().value();
414 }
415
416 template <class T>
417 Try<T>& Future<T>::getTry() {
418   throwIfInvalid();
419
420   return core_->getTry();
421 }
422
423 template <class T>
424 Optional<Try<T>> Future<T>::poll() {
425   Optional<Try<T>> o;
426   if (core_->ready()) {
427     o = std::move(core_->getTry());
428   }
429   return o;
430 }
431
432 template <class T>
433 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
434   throwIfInvalid();
435
436   setExecutor(executor, priority);
437
438   return std::move(*this);
439 }
440
441 template <class T>
442 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
443   throwIfInvalid();
444
445   MoveWrapper<Promise<T>> p;
446   auto f = p->getFuture();
447   then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
448   return std::move(f).via(executor, priority);
449 }
450
451
452 template <class Func>
453 auto via(Executor* x, Func func)
454   -> Future<typename isFuture<decltype(func())>::Inner>
455 {
456   // TODO make this actually more performant. :-P #7260175
457   return via(x).then(func);
458 }
459
460 template <class T>
461 bool Future<T>::isReady() const {
462   throwIfInvalid();
463   return core_->ready();
464 }
465
466 template <class T>
467 bool Future<T>::hasValue() {
468   return getTry().hasValue();
469 }
470
471 template <class T>
472 bool Future<T>::hasException() {
473   return getTry().hasException();
474 }
475
476 template <class T>
477 void Future<T>::raise(exception_wrapper exception) {
478   core_->raise(std::move(exception));
479 }
480
481 // makeFuture
482
483 template <class T>
484 Future<typename std::decay<T>::type> makeFuture(T&& t) {
485   return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
486 }
487
488 inline // for multiple translation units
489 Future<Unit> makeFuture() {
490   return makeFuture(Unit{});
491 }
492
493 // makeFutureWith(Future<T>()) -> Future<T>
494 template <class F>
495 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
496                         typename std::result_of<F()>::type>::type
497 makeFutureWith(F&& func) {
498   using InnerType =
499       typename isFuture<typename std::result_of<F()>::type>::Inner;
500   try {
501     return func();
502   } catch (std::exception& e) {
503     return makeFuture<InnerType>(
504         exception_wrapper(std::current_exception(), e));
505   } catch (...) {
506     return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
507   }
508 }
509
510 // makeFutureWith(T()) -> Future<T>
511 // makeFutureWith(void()) -> Future<Unit>
512 template <class F>
513 typename std::enable_if<
514     !(isFuture<typename std::result_of<F()>::type>::value),
515     Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
516 makeFutureWith(F&& func) {
517   using LiftedResult =
518       typename Unit::Lift<typename std::result_of<F()>::type>::type;
519   return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
520     return func();
521   }));
522 }
523
524 template <class T>
525 Future<T> makeFuture(std::exception_ptr const& e) {
526   return makeFuture(Try<T>(e));
527 }
528
529 template <class T>
530 Future<T> makeFuture(exception_wrapper ew) {
531   return makeFuture(Try<T>(std::move(ew)));
532 }
533
534 template <class T, class E>
535 typename std::enable_if<std::is_base_of<std::exception, E>::value,
536                         Future<T>>::type
537 makeFuture(E const& e) {
538   return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
539 }
540
541 template <class T>
542 Future<T> makeFuture(Try<T>&& t) {
543   return Future<T>(new detail::Core<T>(std::move(t)));
544 }
545
546 // via
547 Future<Unit> via(Executor* executor, int8_t priority) {
548   return makeFuture().via(executor, priority);
549 }
550
551 // mapSetCallback calls func(i, Try<T>) when every future completes
552
553 template <class T, class InputIterator, class F>
554 void mapSetCallback(InputIterator first, InputIterator last, F func) {
555   for (size_t i = 0; first != last; ++first, ++i) {
556     first->setCallback_([func, i](Try<T>&& t) {
557       func(i, std::move(t));
558     });
559   }
560 }
561
562 // collectAll (variadic)
563
564 template <typename... Fs>
565 typename detail::CollectAllVariadicContext<
566   typename std::decay<Fs>::type::value_type...>::type
567 collectAll(Fs&&... fs) {
568   auto ctx = std::make_shared<detail::CollectAllVariadicContext<
569     typename std::decay<Fs>::type::value_type...>>();
570   detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
571     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
572   return ctx->p.getFuture();
573 }
574
575 // collectAll (iterator)
576
577 template <class InputIterator>
578 Future<
579   std::vector<
580   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
581 collectAll(InputIterator first, InputIterator last) {
582   typedef
583     typename std::iterator_traits<InputIterator>::value_type::value_type T;
584
585   struct CollectAllContext {
586     CollectAllContext(int n) : results(n) {}
587     ~CollectAllContext() {
588       p.setValue(std::move(results));
589     }
590     Promise<std::vector<Try<T>>> p;
591     std::vector<Try<T>> results;
592   };
593
594   auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
595   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
596     ctx->results[i] = std::move(t);
597   });
598   return ctx->p.getFuture();
599 }
600
601 // collect (iterator)
602
603 namespace detail {
604
605 template <typename T>
606 struct CollectContext {
607   struct Nothing {
608     explicit Nothing(int /* n */) {}
609   };
610
611   using Result = typename std::conditional<
612     std::is_void<T>::value,
613     void,
614     std::vector<T>>::type;
615
616   using InternalResult = typename std::conditional<
617     std::is_void<T>::value,
618     Nothing,
619     std::vector<Optional<T>>>::type;
620
621   explicit CollectContext(int n) : result(n) {}
622   ~CollectContext() {
623     if (!threw.exchange(true)) {
624       // map Optional<T> -> T
625       std::vector<T> finalResult;
626       finalResult.reserve(result.size());
627       std::transform(result.begin(), result.end(),
628                      std::back_inserter(finalResult),
629                      [](Optional<T>& o) { return std::move(o.value()); });
630       p.setValue(std::move(finalResult));
631     }
632   }
633   inline void setPartialResult(size_t i, Try<T>& t) {
634     result[i] = std::move(t.value());
635   }
636   Promise<Result> p;
637   InternalResult result;
638   std::atomic<bool> threw {false};
639 };
640
641 }
642
643 template <class InputIterator>
644 Future<typename detail::CollectContext<
645   typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
646 collect(InputIterator first, InputIterator last) {
647   typedef
648     typename std::iterator_traits<InputIterator>::value_type::value_type T;
649
650   auto ctx = std::make_shared<detail::CollectContext<T>>(
651     std::distance(first, last));
652   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
653     if (t.hasException()) {
654        if (!ctx->threw.exchange(true)) {
655          ctx->p.setException(std::move(t.exception()));
656        }
657      } else if (!ctx->threw) {
658        ctx->setPartialResult(i, t);
659      }
660   });
661   return ctx->p.getFuture();
662 }
663
664 // collect (variadic)
665
666 template <typename... Fs>
667 typename detail::CollectVariadicContext<
668   typename std::decay<Fs>::type::value_type...>::type
669 collect(Fs&&... fs) {
670   auto ctx = std::make_shared<detail::CollectVariadicContext<
671     typename std::decay<Fs>::type::value_type...>>();
672   detail::collectVariadicHelper<detail::CollectVariadicContext>(
673     ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
674   return ctx->p.getFuture();
675 }
676
677 // collectAny (iterator)
678
679 template <class InputIterator>
680 Future<
681   std::pair<size_t,
682             Try<
683               typename
684               std::iterator_traits<InputIterator>::value_type::value_type>>>
685 collectAny(InputIterator first, InputIterator last) {
686   typedef
687     typename std::iterator_traits<InputIterator>::value_type::value_type T;
688
689   struct CollectAnyContext {
690     CollectAnyContext() {};
691     Promise<std::pair<size_t, Try<T>>> p;
692     std::atomic<bool> done {false};
693   };
694
695   auto ctx = std::make_shared<CollectAnyContext>();
696   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
697     if (!ctx->done.exchange(true)) {
698       ctx->p.setValue(std::make_pair(i, std::move(t)));
699     }
700   });
701   return ctx->p.getFuture();
702 }
703
704 // collectN (iterator)
705
706 template <class InputIterator>
707 Future<std::vector<std::pair<size_t, Try<typename
708   std::iterator_traits<InputIterator>::value_type::value_type>>>>
709 collectN(InputIterator first, InputIterator last, size_t n) {
710   typedef typename
711     std::iterator_traits<InputIterator>::value_type::value_type T;
712   typedef std::vector<std::pair<size_t, Try<T>>> V;
713
714   struct CollectNContext {
715     V v;
716     std::atomic<size_t> completed = {0};
717     Promise<V> p;
718   };
719   auto ctx = std::make_shared<CollectNContext>();
720
721   if (size_t(std::distance(first, last)) < n) {
722     ctx->p.setException(std::runtime_error("Not enough futures"));
723   } else {
724     // for each completed Future, increase count and add to vector, until we
725     // have n completed futures at which point we fulfil our Promise with the
726     // vector
727     mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
728       auto c = ++ctx->completed;
729       if (c <= n) {
730         assert(ctx->v.size() < n);
731         ctx->v.emplace_back(i, std::move(t));
732         if (c == n) {
733           ctx->p.setTry(Try<V>(std::move(ctx->v)));
734         }
735       }
736     });
737   }
738
739   return ctx->p.getFuture();
740 }
741
742 // reduce (iterator)
743
744 template <class It, class T, class F>
745 Future<T> reduce(It first, It last, T&& initial, F&& func) {
746   if (first == last) {
747     return makeFuture(std::move(initial));
748   }
749
750   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
751   typedef typename std::conditional<
752     detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type Arg;
753   typedef isTry<Arg> IsTry;
754
755   folly::MoveWrapper<T> minitial(std::move(initial));
756   auto sfunc = std::make_shared<F>(std::move(func));
757
758   auto f = first->then([minitial, sfunc](Try<ItT>& head) mutable {
759     return (*sfunc)(std::move(*minitial),
760                 head.template get<IsTry::value, Arg&&>());
761   });
762
763   for (++first; first != last; ++first) {
764     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
765       return (*sfunc)(std::move(std::get<0>(t).value()),
766                   // Either return a ItT&& or a Try<ItT>&& depending
767                   // on the type of the argument of func.
768                   std::get<1>(t).template get<IsTry::value, Arg&&>());
769     });
770   }
771
772   return f;
773 }
774
775 // window (collection)
776
777 template <class Collection, class F, class ItT, class Result>
778 std::vector<Future<Result>>
779 window(Collection input, F func, size_t n) {
780   struct WindowContext {
781     WindowContext(Collection&& i, F&& fn)
782         : input_(std::move(i)), promises_(input_.size()),
783           func_(std::move(fn))
784       {}
785     std::atomic<size_t> i_ {0};
786     Collection input_;
787     std::vector<Promise<Result>> promises_;
788     F func_;
789
790     static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
791       size_t i = ctx->i_++;
792       if (i < ctx->input_.size()) {
793         // Using setCallback_ directly since we don't need the Future
794         ctx->func_(std::move(ctx->input_[i])).setCallback_(
795           // ctx is captured by value
796           [ctx, i](Try<Result>&& t) {
797             ctx->promises_[i].setTry(std::move(t));
798             // Chain another future onto this one
799             spawn(std::move(ctx));
800           });
801       }
802     }
803   };
804
805   auto max = std::min(n, input.size());
806
807   auto ctx = std::make_shared<WindowContext>(
808     std::move(input), std::move(func));
809
810   for (size_t i = 0; i < max; ++i) {
811     // Start the first n Futures
812     WindowContext::spawn(ctx);
813   }
814
815   std::vector<Future<Result>> futures;
816   futures.reserve(ctx->promises_.size());
817   for (auto& promise : ctx->promises_) {
818     futures.emplace_back(promise.getFuture());
819   }
820
821   return futures;
822 }
823
824 // reduce
825
826 template <class T>
827 template <class I, class F>
828 Future<I> Future<T>::reduce(I&& initial, F&& func) {
829   folly::MoveWrapper<I> minitial(std::move(initial));
830   folly::MoveWrapper<F> mfunc(std::move(func));
831   return then([minitial, mfunc](T& vals) mutable {
832     auto ret = std::move(*minitial);
833     for (auto& val : vals) {
834       ret = (*mfunc)(std::move(ret), std::move(val));
835     }
836     return ret;
837   });
838 }
839
840 // unorderedReduce (iterator)
841
842 template <class It, class T, class F, class ItT, class Arg>
843 Future<T> unorderedReduce(It first, It last, T initial, F func) {
844   if (first == last) {
845     return makeFuture(std::move(initial));
846   }
847
848   typedef isTry<Arg> IsTry;
849
850   struct UnorderedReduceContext {
851     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
852         : lock_(), memo_(makeFuture<T>(std::move(memo))),
853           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
854       {};
855     folly::MicroSpinLock lock_; // protects memo_ and numThens_
856     Future<T> memo_;
857     F func_;
858     size_t numThens_; // how many Futures completed and called .then()
859     size_t numFutures_; // how many Futures in total
860     Promise<T> promise_;
861   };
862
863   auto ctx = std::make_shared<UnorderedReduceContext>(
864     std::move(initial), std::move(func), std::distance(first, last));
865
866   mapSetCallback<ItT>(
867       first,
868       last,
869       [ctx](size_t /* i */, Try<ItT>&& t) {
870         folly::MoveWrapper<Try<ItT>> mt(std::move(t));
871         // Futures can be completed in any order, simultaneously.
872         // To make this non-blocking, we create a new Future chain in
873         // the order of completion to reduce the values.
874         // The spinlock just protects chaining a new Future, not actually
875         // executing the reduce, which should be really fast.
876         folly::MSLGuard lock(ctx->lock_);
877         ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
878           // Either return a ItT&& or a Try<ItT>&& depending
879           // on the type of the argument of func.
880           return ctx->func_(std::move(v),
881                             mt->template get<IsTry::value, Arg&&>());
882         });
883         if (++ctx->numThens_ == ctx->numFutures_) {
884           // After reducing the value of the last Future, fulfill the Promise
885           ctx->memo_.setCallback_(
886               [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
887         }
888       });
889
890   return ctx->promise_.getFuture();
891 }
892
893 // within
894
895 template <class T>
896 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
897   return within(dur, TimedOut(), tk);
898 }
899
900 template <class T>
901 template <class E>
902 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
903
904   struct Context {
905     Context(E ex) : exception(std::move(ex)), promise() {}
906     E exception;
907     Future<Unit> thisFuture;
908     Promise<T> promise;
909     std::atomic<bool> token {false};
910   };
911
912   std::shared_ptr<Timekeeper> tks;
913   if (!tk) {
914     tks = folly::detail::getTimekeeperSingleton();
915     tk = DCHECK_NOTNULL(tks.get());
916   }
917
918   auto ctx = std::make_shared<Context>(std::move(e));
919
920   ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
921     // TODO: "this" completed first, cancel "after"
922     if (ctx->token.exchange(true) == false) {
923       ctx->promise.setTry(std::move(t));
924     }
925   });
926
927   tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
928     // "after" completed first, cancel "this"
929     ctx->thisFuture.raise(TimedOut());
930     if (ctx->token.exchange(true) == false) {
931       if (t.hasException()) {
932         ctx->promise.setException(std::move(t.exception()));
933       } else {
934         ctx->promise.setException(std::move(ctx->exception));
935       }
936     }
937   });
938
939   return ctx->promise.getFuture().via(getExecutor());
940 }
941
942 // delayed
943
944 template <class T>
945 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
946   return collectAll(*this, futures::sleep(dur, tk))
947     .then([](std::tuple<Try<T>, Try<Unit>> tup) {
948       Try<T>& t = std::get<0>(tup);
949       return makeFuture<T>(std::move(t));
950     });
951 }
952
953 namespace detail {
954
955 template <class T>
956 void waitImpl(Future<T>& f) {
957   // short-circuit if there's nothing to do
958   if (f.isReady()) return;
959
960   FutureBatonType baton;
961   f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
962   baton.wait();
963   assert(f.isReady());
964 }
965
966 template <class T>
967 void waitImpl(Future<T>& f, Duration dur) {
968   // short-circuit if there's nothing to do
969   if (f.isReady()) return;
970
971   folly::MoveWrapper<Promise<T>> promise;
972   auto ret = promise->getFuture();
973   auto baton = std::make_shared<FutureBatonType>();
974   f.setCallback_([baton, promise](Try<T>&& t) mutable {
975     promise->setTry(std::move(t));
976     baton->post();
977   });
978   f = std::move(ret);
979   if (baton->timed_wait(dur)) {
980     assert(f.isReady());
981   }
982 }
983
984 template <class T>
985 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
986   while (!f.isReady()) {
987     e->drive();
988   }
989 }
990
991 } // detail
992
993 template <class T>
994 Future<T>& Future<T>::wait() & {
995   detail::waitImpl(*this);
996   return *this;
997 }
998
999 template <class T>
1000 Future<T>&& Future<T>::wait() && {
1001   detail::waitImpl(*this);
1002   return std::move(*this);
1003 }
1004
1005 template <class T>
1006 Future<T>& Future<T>::wait(Duration dur) & {
1007   detail::waitImpl(*this, dur);
1008   return *this;
1009 }
1010
1011 template <class T>
1012 Future<T>&& Future<T>::wait(Duration dur) && {
1013   detail::waitImpl(*this, dur);
1014   return std::move(*this);
1015 }
1016
1017 template <class T>
1018 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1019   detail::waitViaImpl(*this, e);
1020   return *this;
1021 }
1022
1023 template <class T>
1024 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1025   detail::waitViaImpl(*this, e);
1026   return std::move(*this);
1027 }
1028
1029 template <class T>
1030 T Future<T>::get() {
1031   return std::move(wait().value());
1032 }
1033
1034 template <class T>
1035 T Future<T>::get(Duration dur) {
1036   wait(dur);
1037   if (isReady()) {
1038     return std::move(value());
1039   } else {
1040     throw TimedOut();
1041   }
1042 }
1043
1044 template <class T>
1045 T Future<T>::getVia(DrivableExecutor* e) {
1046   return std::move(waitVia(e).value());
1047 }
1048
1049 namespace detail {
1050   template <class T>
1051   struct TryEquals {
1052     static bool equals(const Try<T>& t1, const Try<T>& t2) {
1053       return t1.value() == t2.value();
1054     }
1055   };
1056 }
1057
1058 template <class T>
1059 Future<bool> Future<T>::willEqual(Future<T>& f) {
1060   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1061     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1062       return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1063     } else {
1064       return false;
1065       }
1066   });
1067 }
1068
1069 template <class T>
1070 template <class F>
1071 Future<T> Future<T>::filter(F predicate) {
1072   auto p = folly::makeMoveWrapper(std::move(predicate));
1073   return this->then([p](T val) {
1074     T const& valConstRef = val;
1075     if (!(*p)(valConstRef)) {
1076       throw PredicateDoesNotObtain();
1077     }
1078     return val;
1079   });
1080 }
1081
1082 template <class T>
1083 template <class Callback>
1084 auto Future<T>::thenMulti(Callback&& fn)
1085     -> decltype(this->then(std::forward<Callback>(fn))) {
1086   // thenMulti with one callback is just a then
1087   return then(std::forward<Callback>(fn));
1088 }
1089
1090 template <class T>
1091 template <class Callback, class... Callbacks>
1092 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1093     -> decltype(this->then(std::forward<Callback>(fn)).
1094                       thenMulti(std::forward<Callbacks>(fns)...)) {
1095   // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1096   return then(std::forward<Callback>(fn)).
1097          thenMulti(std::forward<Callbacks>(fns)...);
1098 }
1099
1100 template <class T>
1101 template <class Callback, class... Callbacks>
1102 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1103                                       Callbacks&&... fns)
1104     -> decltype(this->then(std::forward<Callback>(fn)).
1105                       thenMulti(std::forward<Callbacks>(fns)...)) {
1106   // thenMultiExecutor with two callbacks is
1107   // via(x).then(a).thenMulti(b, ...).via(oldX)
1108   auto oldX = getExecutor();
1109   setExecutor(x);
1110   return then(std::forward<Callback>(fn)).
1111          thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1112 }
1113
1114 template <class T>
1115 template <class Callback>
1116 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1117     -> decltype(this->then(std::forward<Callback>(fn))) {
1118   // thenMulti with one callback is just a then with an executor
1119   return then(x, std::forward<Callback>(fn));
1120 }
1121
1122 template <class F>
1123 inline Future<Unit> when(bool p, F thunk) {
1124   return p ? thunk().unit() : makeFuture();
1125 }
1126
1127 template <class P, class F>
1128 Future<Unit> whileDo(P predicate, F thunk) {
1129   if (predicate()) {
1130     return thunk().then([=] {
1131       return whileDo(predicate, thunk);
1132     });
1133   }
1134   return makeFuture();
1135 }
1136
1137 template <class F>
1138 Future<Unit> times(const int n, F thunk) {
1139   auto count = folly::makeMoveWrapper(
1140     std::unique_ptr<std::atomic<int>>(new std::atomic<int>(0))
1141   );
1142   return folly::whileDo([=]() mutable {
1143       return (*count)->fetch_add(1) < n;
1144     }, thunk);
1145 }
1146
1147 namespace futures {
1148   template <class It, class F, class ItT, class Result>
1149   std::vector<Future<Result>> map(It first, It last, F func) {
1150     std::vector<Future<Result>> results;
1151     for (auto it = first; it != last; it++) {
1152       results.push_back(it->then(func));
1153     }
1154     return results;
1155   }
1156 }
1157
1158 namespace futures {
1159
1160 namespace detail {
1161
1162 struct retrying_policy_raw_tag {};
1163 struct retrying_policy_fut_tag {};
1164
1165 template <class Policy>
1166 struct retrying_policy_traits {
1167   using ew = exception_wrapper;
1168   FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1169   template <class Ret>
1170   using has_op = typename std::integral_constant<bool,
1171         has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1172         has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1173   using is_raw = has_op<bool>;
1174   using is_fut = has_op<Future<bool>>;
1175   using tag = typename std::conditional<
1176         is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1177         is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1178 };
1179
1180 template <class Policy, class FF>
1181 typename std::result_of<FF(size_t)>::type
1182 retrying(size_t k, Policy&& p, FF&& ff) {
1183   using F = typename std::result_of<FF(size_t)>::type;
1184   using T = typename F::value_type;
1185   auto f = ff(k++);
1186   auto pm = makeMoveWrapper(p);
1187   auto ffm = makeMoveWrapper(ff);
1188   return f.onError([=](exception_wrapper x) mutable {
1189       auto q = (*pm)(k, x);
1190       auto xm = makeMoveWrapper(std::move(x));
1191       return q.then([=](bool r) mutable {
1192           return r
1193             ? retrying(k, pm.move(), ffm.move())
1194             : makeFuture<T>(xm.move());
1195       });
1196   });
1197 }
1198
1199 template <class Policy, class FF>
1200 typename std::result_of<FF(size_t)>::type
1201 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1202   auto pm = makeMoveWrapper(std::move(p));
1203   auto q = [=](size_t k, exception_wrapper x) {
1204     return makeFuture<bool>((*pm)(k, x));
1205   };
1206   return retrying(0, std::move(q), std::forward<FF>(ff));
1207 }
1208
1209 template <class Policy, class FF>
1210 typename std::result_of<FF(size_t)>::type
1211 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1212   return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1213 }
1214
1215 //  jittered exponential backoff, clamped to [backoff_min, backoff_max]
1216 template <class URNG>
1217 Duration retryingJitteredExponentialBackoffDur(
1218     size_t n,
1219     Duration backoff_min,
1220     Duration backoff_max,
1221     double jitter_param,
1222     URNG& rng) {
1223   using d = Duration;
1224   auto dist = std::normal_distribution<double>(0.0, jitter_param);
1225   auto jitter = std::exp(dist(rng));
1226   auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1227   return std::max(backoff_min, std::min(backoff_max, backoff));
1228 }
1229
1230 template <class Policy, class URNG>
1231 std::function<Future<bool>(size_t, const exception_wrapper&)>
1232 retryingPolicyCappedJitteredExponentialBackoff(
1233     size_t max_tries,
1234     Duration backoff_min,
1235     Duration backoff_max,
1236     double jitter_param,
1237     URNG rng,
1238     Policy&& p) {
1239   auto pm = makeMoveWrapper(std::move(p));
1240   auto rngp = std::make_shared<URNG>(std::move(rng));
1241   return [=](size_t n, const exception_wrapper& ex) mutable {
1242     if (n == max_tries) { return makeFuture(false); }
1243     return (*pm)(n, ex).then([=](bool v) {
1244         if (!v) { return makeFuture(false); }
1245         auto backoff = detail::retryingJitteredExponentialBackoffDur(
1246             n, backoff_min, backoff_max, jitter_param, *rngp);
1247         return futures::sleep(backoff).then([] { return true; });
1248     });
1249   };
1250 }
1251
1252 template <class Policy, class URNG>
1253 std::function<Future<bool>(size_t, const exception_wrapper&)>
1254 retryingPolicyCappedJitteredExponentialBackoff(
1255     size_t max_tries,
1256     Duration backoff_min,
1257     Duration backoff_max,
1258     double jitter_param,
1259     URNG rng,
1260     Policy&& p,
1261     retrying_policy_raw_tag) {
1262   auto pm = makeMoveWrapper(std::move(p));
1263   auto q = [=](size_t n, const exception_wrapper& e) {
1264     return makeFuture((*pm)(n, e));
1265   };
1266   return retryingPolicyCappedJitteredExponentialBackoff(
1267       max_tries,
1268       backoff_min,
1269       backoff_max,
1270       jitter_param,
1271       std::move(rng),
1272       std::move(q));
1273 }
1274
1275 template <class Policy, class URNG>
1276 std::function<Future<bool>(size_t, const exception_wrapper&)>
1277 retryingPolicyCappedJitteredExponentialBackoff(
1278     size_t max_tries,
1279     Duration backoff_min,
1280     Duration backoff_max,
1281     double jitter_param,
1282     URNG rng,
1283     Policy&& p,
1284     retrying_policy_fut_tag) {
1285   return retryingPolicyCappedJitteredExponentialBackoff(
1286       max_tries,
1287       backoff_min,
1288       backoff_max,
1289       jitter_param,
1290       std::move(rng),
1291       std::move(p));
1292 }
1293
1294 }
1295
1296 template <class Policy, class FF>
1297 typename std::result_of<FF(size_t)>::type
1298 retrying(Policy&& p, FF&& ff) {
1299   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1300   return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1301 }
1302
1303 inline
1304 std::function<bool(size_t, const exception_wrapper&)>
1305 retryingPolicyBasic(
1306     size_t max_tries) {
1307   return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1308 }
1309
1310 template <class Policy, class URNG>
1311 std::function<Future<bool>(size_t, const exception_wrapper&)>
1312 retryingPolicyCappedJitteredExponentialBackoff(
1313     size_t max_tries,
1314     Duration backoff_min,
1315     Duration backoff_max,
1316     double jitter_param,
1317     URNG rng,
1318     Policy&& p) {
1319   using tag = typename detail::retrying_policy_traits<Policy>::tag;
1320   return detail::retryingPolicyCappedJitteredExponentialBackoff(
1321       max_tries,
1322       backoff_min,
1323       backoff_max,
1324       jitter_param,
1325       std::move(rng),
1326       std::move(p),
1327       tag());
1328 }
1329
1330 inline
1331 std::function<Future<bool>(size_t, const exception_wrapper&)>
1332 retryingPolicyCappedJitteredExponentialBackoff(
1333     size_t max_tries,
1334     Duration backoff_min,
1335     Duration backoff_max,
1336     double jitter_param) {
1337   auto p = [](size_t, const exception_wrapper&) { return true; };
1338   return retryingPolicyCappedJitteredExponentialBackoff(
1339       max_tries,
1340       backoff_min,
1341       backoff_max,
1342       jitter_param,
1343       ThreadLocalPRNG(),
1344       std::move(p));
1345 }
1346
1347 }
1348
1349 // Instantiate the most common Future types to save compile time
1350 extern template class Future<Unit>;
1351 extern template class Future<bool>;
1352 extern template class Future<int>;
1353 extern template class Future<int64_t>;
1354 extern template class Future<std::string>;
1355 extern template class Future<double>;
1356
1357 } // namespace folly