Fix race condition in collect(..)
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <chrono>
20 #include <thread>
21
22 #include <folly/experimental/fibers/Baton.h>
23 #include <folly/Optional.h>
24 #include <folly/futures/detail/Core.h>
25 #include <folly/futures/Timekeeper.h>
26
27 namespace folly {
28
29 class Timekeeper;
30
31 namespace detail {
32   Timekeeper* getTimekeeperSingleton();
33 }
34
35 template <class T>
36 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
37   other.core_ = nullptr;
38 }
39
40 template <class T>
41 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
42   std::swap(core_, other.core_);
43   return *this;
44 }
45
46 template <class T>
47 template <class T2,
48           typename std::enable_if<!isFuture<T2>::value, void*>::type>
49 Future<T>::Future(T2&& val) : core_(nullptr) {
50   Promise<T> p;
51   p.setValue(std::forward<T2>(val));
52   *this = p.getFuture();
53 }
54
55 template <class T>
56 template <class T2,
57           typename std::enable_if<
58             folly::is_void_or_unit<T2>::value,
59             int>::type>
60 Future<T>::Future() : core_(nullptr) {
61   Promise<T> p;
62   p.setValue();
63   *this = p.getFuture();
64 }
65
66
67 template <class T>
68 Future<T>::~Future() {
69   detach();
70 }
71
72 template <class T>
73 void Future<T>::detach() {
74   if (core_) {
75     core_->detachFuture();
76     core_ = nullptr;
77   }
78 }
79
80 template <class T>
81 void Future<T>::throwIfInvalid() const {
82   if (!core_)
83     throw NoState();
84 }
85
86 template <class T>
87 template <class F>
88 void Future<T>::setCallback_(F&& func) {
89   throwIfInvalid();
90   core_->setCallback(std::move(func));
91 }
92
93 // unwrap
94
95 template <class T>
96 template <class F>
97 typename std::enable_if<isFuture<F>::value,
98                         Future<typename isFuture<T>::Inner>>::type
99 Future<T>::unwrap() {
100   return then([](Future<typename isFuture<T>::Inner> internal_future) {
101       return internal_future;
102   });
103 }
104
105 // then
106
107 // Variant: returns a value
108 // e.g. f.then([](Try<T>&& t){ return t.value(); });
109 template <class T>
110 template <typename F, typename R, bool isTry, typename... Args>
111 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
112 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
113   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
114   typedef typename R::ReturnsFuture::Inner B;
115
116   throwIfInvalid();
117
118   // wrap these so we can move them into the lambda
119   folly::MoveWrapper<Promise<B>> p;
120   folly::MoveWrapper<F> funcm(std::forward<F>(func));
121
122   // grab the Future now before we lose our handle on the Promise
123   auto f = p->getFuture();
124   if (getExecutor()) {
125     f.setExecutor(getExecutor());
126   }
127
128   /* This is a bit tricky.
129
130      We can't just close over *this in case this Future gets moved. So we
131      make a new dummy Future. We could figure out something more
132      sophisticated that avoids making a new Future object when it can, as an
133      optimization. But this is correct.
134
135      core_ can't be moved, it is explicitly disallowed (as is copying). But
136      if there's ever a reason to allow it, this is one place that makes that
137      assumption and would need to be fixed. We use a standard shared pointer
138      for core_ (by copying it in), which means in essence obj holds a shared
139      pointer to itself.  But this shouldn't leak because Promise will not
140      outlive the continuation, because Promise will setException() with a
141      broken Promise if it is destructed before completed. We could use a
142      weak pointer but it would have to be converted to a shared pointer when
143      func is executed (because the Future returned by func may possibly
144      persist beyond the callback, if it gets moved), and so it is an
145      optimization to just make it shared from the get-go.
146
147      We have to move in the Promise and func using the MoveWrapper
148      hack. (func could be copied but it's a big drag on perf).
149
150      Two subtle but important points about this design. detail::Core has no
151      back pointers to Future or Promise, so if Future or Promise get moved
152      (and they will be moved in performant code) we don't have to do
153      anything fancy. And because we store the continuation in the
154      detail::Core, not in the Future, we can execute the continuation even
155      after the Future has gone out of scope. This is an intentional design
156      decision. It is likely we will want to be able to cancel a continuation
157      in some circumstances, but I think it should be explicit not implicit
158      in the destruction of the Future used to create it.
159      */
160   setCallback_(
161     [p, funcm](Try<T>&& t) mutable {
162       if (!isTry && t.hasException()) {
163         p->setException(std::move(t.exception()));
164       } else {
165         p->setWith([&]() {
166           return (*funcm)(t.template get<isTry, Args>()...);
167         });
168       }
169     });
170
171   return f;
172 }
173
174 // Variant: returns a Future
175 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
176 template <class T>
177 template <typename F, typename R, bool isTry, typename... Args>
178 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
179 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
180   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
181   typedef typename R::ReturnsFuture::Inner B;
182
183   throwIfInvalid();
184
185   // wrap these so we can move them into the lambda
186   folly::MoveWrapper<Promise<B>> p;
187   folly::MoveWrapper<F> funcm(std::forward<F>(func));
188
189   // grab the Future now before we lose our handle on the Promise
190   auto f = p->getFuture();
191   if (getExecutor()) {
192     f.setExecutor(getExecutor());
193   }
194
195   setCallback_(
196     [p, funcm](Try<T>&& t) mutable {
197       if (!isTry && t.hasException()) {
198         p->setException(std::move(t.exception()));
199       } else {
200         try {
201           auto f2 = (*funcm)(t.template get<isTry, Args>()...);
202           // that didn't throw, now we can steal p
203           f2.setCallback_([p](Try<B>&& b) mutable {
204             p->setTry(std::move(b));
205           });
206         } catch (const std::exception& e) {
207           p->setException(exception_wrapper(std::current_exception(), e));
208         } catch (...) {
209           p->setException(exception_wrapper(std::current_exception()));
210         }
211       }
212     });
213
214   return f;
215 }
216
217 template <typename T>
218 template <typename R, typename Caller, typename... Args>
219   Future<typename isFuture<R>::Inner>
220 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
221   typedef typename std::remove_cv<
222     typename std::remove_reference<
223       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
224   return then([instance, func](Try<T>&& t){
225     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
226   });
227 }
228
229 // TODO(6838553)
230 #ifndef __clang__
231 template <class T>
232 template <class... Args>
233 auto Future<T>::then(Executor* x, Args&&... args)
234   -> decltype(this->then(std::forward<Args>(args)...))
235 {
236   auto oldX = getExecutor();
237   setExecutor(x);
238   return this->then(std::forward<Args>(args)...).via(oldX);
239 }
240 #endif
241
242 template <class T>
243 Future<void> Future<T>::then() {
244   return then([] (Try<T>&& t) {});
245 }
246
247 // onError where the callback returns T
248 template <class T>
249 template <class F>
250 typename std::enable_if<
251   !detail::callableWith<F, exception_wrapper>::value &&
252   !detail::Extract<F>::ReturnsFuture::value,
253   Future<T>>::type
254 Future<T>::onError(F&& func) {
255   typedef typename detail::Extract<F>::FirstArg Exn;
256   static_assert(
257       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
258       "Return type of onError callback must be T or Future<T>");
259
260   Promise<T> p;
261   auto f = p.getFuture();
262   auto pm = folly::makeMoveWrapper(std::move(p));
263   auto funcm = folly::makeMoveWrapper(std::move(func));
264   setCallback_([pm, funcm](Try<T>&& t) mutable {
265     if (!t.template withException<Exn>([&] (Exn& e) {
266           pm->setWith([&]{
267             return (*funcm)(e);
268           });
269         })) {
270       pm->setTry(std::move(t));
271     }
272   });
273
274   return f;
275 }
276
277 // onError where the callback returns Future<T>
278 template <class T>
279 template <class F>
280 typename std::enable_if<
281   !detail::callableWith<F, exception_wrapper>::value &&
282   detail::Extract<F>::ReturnsFuture::value,
283   Future<T>>::type
284 Future<T>::onError(F&& func) {
285   static_assert(
286       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
287       "Return type of onError callback must be T or Future<T>");
288   typedef typename detail::Extract<F>::FirstArg Exn;
289
290   Promise<T> p;
291   auto f = p.getFuture();
292   auto pm = folly::makeMoveWrapper(std::move(p));
293   auto funcm = folly::makeMoveWrapper(std::move(func));
294   setCallback_([pm, funcm](Try<T>&& t) mutable {
295     if (!t.template withException<Exn>([&] (Exn& e) {
296           try {
297             auto f2 = (*funcm)(e);
298             f2.setCallback_([pm](Try<T>&& t2) mutable {
299               pm->setTry(std::move(t2));
300             });
301           } catch (const std::exception& e2) {
302             pm->setException(exception_wrapper(std::current_exception(), e2));
303           } catch (...) {
304             pm->setException(exception_wrapper(std::current_exception()));
305           }
306         })) {
307       pm->setTry(std::move(t));
308     }
309   });
310
311   return f;
312 }
313
314 template <class T>
315 template <class F>
316 Future<T> Future<T>::ensure(F func) {
317   MoveWrapper<F> funcw(std::move(func));
318   return this->then([funcw](Try<T>&& t) {
319     (*funcw)();
320     return makeFuture(std::move(t));
321   });
322 }
323
324 template <class T>
325 template <class F>
326 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
327   auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
328   return within(dur, tk)
329     .onError([funcw](TimedOut const&) { return (*funcw)(); });
330 }
331
332 template <class T>
333 template <class F>
334 typename std::enable_if<
335   detail::callableWith<F, exception_wrapper>::value &&
336   detail::Extract<F>::ReturnsFuture::value,
337   Future<T>>::type
338 Future<T>::onError(F&& func) {
339   static_assert(
340       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
341       "Return type of onError callback must be T or Future<T>");
342
343   Promise<T> p;
344   auto f = p.getFuture();
345   auto pm = folly::makeMoveWrapper(std::move(p));
346   auto funcm = folly::makeMoveWrapper(std::move(func));
347   setCallback_([pm, funcm](Try<T> t) mutable {
348     if (t.hasException()) {
349       try {
350         auto f2 = (*funcm)(std::move(t.exception()));
351         f2.setCallback_([pm](Try<T> t2) mutable {
352           pm->setTry(std::move(t2));
353         });
354       } catch (const std::exception& e2) {
355         pm->setException(exception_wrapper(std::current_exception(), e2));
356       } catch (...) {
357         pm->setException(exception_wrapper(std::current_exception()));
358       }
359     } else {
360       pm->setTry(std::move(t));
361     }
362   });
363
364   return f;
365 }
366
367 // onError(exception_wrapper) that returns T
368 template <class T>
369 template <class F>
370 typename std::enable_if<
371   detail::callableWith<F, exception_wrapper>::value &&
372   !detail::Extract<F>::ReturnsFuture::value,
373   Future<T>>::type
374 Future<T>::onError(F&& func) {
375   static_assert(
376       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
377       "Return type of onError callback must be T or Future<T>");
378
379   Promise<T> p;
380   auto f = p.getFuture();
381   auto pm = folly::makeMoveWrapper(std::move(p));
382   auto funcm = folly::makeMoveWrapper(std::move(func));
383   setCallback_([pm, funcm](Try<T> t) mutable {
384     if (t.hasException()) {
385       pm->setWith([&]{
386         return (*funcm)(std::move(t.exception()));
387       });
388     } else {
389       pm->setTry(std::move(t));
390     }
391   });
392
393   return f;
394 }
395
396 template <class T>
397 typename std::add_lvalue_reference<T>::type Future<T>::value() {
398   throwIfInvalid();
399
400   return core_->getTry().value();
401 }
402
403 template <class T>
404 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
405   throwIfInvalid();
406
407   return core_->getTry().value();
408 }
409
410 template <class T>
411 Try<T>& Future<T>::getTry() {
412   throwIfInvalid();
413
414   return core_->getTry();
415 }
416
417 template <class T>
418 Optional<Try<T>> Future<T>::poll() {
419   Optional<Try<T>> o;
420   if (core_->ready()) {
421     o = std::move(core_->getTry());
422   }
423   return o;
424 }
425
426 template <class T>
427 template <typename Executor>
428 inline Future<T> Future<T>::via(Executor* executor) && {
429   throwIfInvalid();
430
431   setExecutor(executor);
432
433   return std::move(*this);
434 }
435
436 template <class T>
437 template <typename Executor>
438 inline Future<T> Future<T>::via(Executor* executor) & {
439   throwIfInvalid();
440
441   MoveWrapper<Promise<T>> p;
442   auto f = p->getFuture();
443   then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
444   return std::move(f).via(executor);
445 }
446
447 template <class T>
448 bool Future<T>::isReady() const {
449   throwIfInvalid();
450   return core_->ready();
451 }
452
453 template <class T>
454 void Future<T>::raise(exception_wrapper exception) {
455   core_->raise(std::move(exception));
456 }
457
458 // makeFuture
459
460 template <class T>
461 Future<typename std::decay<T>::type> makeFuture(T&& t) {
462   Promise<typename std::decay<T>::type> p;
463   p.setValue(std::forward<T>(t));
464   return p.getFuture();
465 }
466
467 inline // for multiple translation units
468 Future<void> makeFuture() {
469   Promise<void> p;
470   p.setValue();
471   return p.getFuture();
472 }
473
474 template <class F>
475 auto makeFutureWith(
476     F&& func,
477     typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
478     -> Future<decltype(func())> {
479   Promise<decltype(func())> p;
480   p.setWith(
481     [&func]() {
482       return (func)();
483     });
484   return p.getFuture();
485 }
486
487 template <class F>
488 auto makeFutureWith(F const& func) -> Future<decltype(func())> {
489   F copy = func;
490   return makeFutureWith(std::move(copy));
491 }
492
493 template <class T>
494 Future<T> makeFuture(std::exception_ptr const& e) {
495   Promise<T> p;
496   p.setException(e);
497   return p.getFuture();
498 }
499
500 template <class T>
501 Future<T> makeFuture(exception_wrapper ew) {
502   Promise<T> p;
503   p.setException(std::move(ew));
504   return p.getFuture();
505 }
506
507 template <class T, class E>
508 typename std::enable_if<std::is_base_of<std::exception, E>::value,
509                         Future<T>>::type
510 makeFuture(E const& e) {
511   Promise<T> p;
512   p.setException(make_exception_wrapper<E>(e));
513   return p.getFuture();
514 }
515
516 template <class T>
517 Future<T> makeFuture(Try<T>&& t) {
518   Promise<typename std::decay<T>::type> p;
519   p.setTry(std::move(t));
520   return p.getFuture();
521 }
522
523 template <>
524 inline Future<void> makeFuture(Try<void>&& t) {
525   if (t.hasException()) {
526     return makeFuture<void>(std::move(t.exception()));
527   } else {
528     return makeFuture();
529   }
530 }
531
532 // via
533 template <typename Executor>
534 Future<void> via(Executor* executor) {
535   return makeFuture().via(executor);
536 }
537
538 // when (variadic)
539
540 template <typename... Fs>
541 typename detail::VariadicContext<
542   typename std::decay<Fs>::type::value_type...>::type
543 collectAll(Fs&&... fs) {
544   auto ctx =
545     new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
546   ctx->total = sizeof...(fs);
547   auto f_saved = ctx->p.getFuture();
548   detail::collectAllVariadicHelper(ctx,
549     std::forward<typename std::decay<Fs>::type>(fs)...);
550   return f_saved;
551 }
552
553 // when (iterator)
554
555 template <class InputIterator>
556 Future<
557   std::vector<
558   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
559 collectAll(InputIterator first, InputIterator last) {
560   typedef
561     typename std::iterator_traits<InputIterator>::value_type::value_type T;
562
563   if (first >= last) {
564     return makeFuture(std::vector<Try<T>>());
565   }
566   size_t n = std::distance(first, last);
567
568   auto ctx = new detail::WhenAllContext<T>();
569
570   ctx->results.resize(n);
571
572   auto f_saved = ctx->p.getFuture();
573
574   for (size_t i = 0; first != last; ++first, ++i) {
575      assert(i < n);
576      auto& f = *first;
577      f.setCallback_([ctx, i, n](Try<T> t) {
578        ctx->results[i] = std::move(t);
579        if (++ctx->count == n) {
580          ctx->p.setValue(std::move(ctx->results));
581          delete ctx;
582        }
583      });
584   }
585
586   return f_saved;
587 }
588
589 namespace detail {
590
591 template <class, class, typename = void> struct CollectContextHelper;
592
593 template <class T, class VecT>
594 struct CollectContextHelper<T, VecT,
595     typename std::enable_if<std::is_same<T, VecT>::value>::type> {
596   static inline std::vector<T>&& getResults(std::vector<VecT>& results) {
597     return std::move(results);
598   }
599 };
600
601 template <class T, class VecT>
602 struct CollectContextHelper<T, VecT,
603     typename std::enable_if<!std::is_same<T, VecT>::value>::type> {
604   static inline std::vector<T> getResults(std::vector<VecT>& results) {
605     std::vector<T> finalResults;
606     finalResults.reserve(results.size());
607     for (auto& opt : results) {
608       finalResults.push_back(std::move(opt.value()));
609     }
610     return finalResults;
611   }
612 };
613
614 template <typename T>
615 struct CollectContext {
616
617   typedef typename std::conditional<
618     std::is_default_constructible<T>::value,
619     T,
620     Optional<T>
621    >::type VecT;
622
623   explicit CollectContext(int n) : count(0), success_count(0), threw(false) {
624     results.resize(n);
625   }
626
627   Promise<std::vector<T>> p;
628   std::vector<VecT> results;
629   std::atomic<size_t> count, success_count;
630   std::atomic_bool threw;
631
632   typedef std::vector<T> result_type;
633
634   static inline Future<std::vector<T>> makeEmptyFuture() {
635     return makeFuture(std::vector<T>());
636   }
637
638   inline void setValue() {
639     p.setValue(CollectContextHelper<T, VecT>::getResults(results));
640   }
641
642   inline void addResult(int i, Try<T>& t) {
643     results[i] = std::move(t.value());
644   }
645 };
646
647 template <>
648 struct CollectContext<void> {
649
650   explicit CollectContext(int n) : count(0), success_count(0), threw(false) {}
651
652   Promise<void> p;
653   std::atomic<size_t> count, success_count;
654   std::atomic_bool threw;
655
656   typedef void result_type;
657
658   static inline Future<void> makeEmptyFuture() {
659     return makeFuture();
660   }
661
662   inline void setValue() {
663     p.setValue();
664   }
665
666   inline void addResult(int i, Try<void>& t) {
667     // do nothing
668   }
669 };
670
671 } // detail
672
673 template <class InputIterator>
674 Future<typename detail::CollectContext<
675   typename std::iterator_traits<InputIterator>::value_type::value_type
676 >::result_type>
677 collect(InputIterator first, InputIterator last) {
678   typedef
679     typename std::iterator_traits<InputIterator>::value_type::value_type T;
680
681   if (first >= last) {
682     return detail::CollectContext<T>::makeEmptyFuture();
683   }
684
685   size_t n = std::distance(first, last);
686   auto ctx = new detail::CollectContext<T>(n);
687   auto f_saved = ctx->p.getFuture();
688
689   for (size_t i = 0; first != last; ++first, ++i) {
690      assert(i < n);
691      auto& f = *first;
692      f.setCallback_([ctx, i, n](Try<T> t) {
693
694        if (t.hasException()) {
695          if (!ctx->threw.exchange(true)) {
696            ctx->p.setException(std::move(t.exception()));
697          }
698        } else if (!ctx->threw) {
699          ctx->addResult(i, t);
700          if (++ctx->success_count == n) {
701            ctx->setValue();
702          }
703        }
704
705        if (++ctx->count == n) {
706          delete ctx;
707        }
708      });
709   }
710
711   return f_saved;
712 }
713
714 template <class InputIterator>
715 Future<
716   std::pair<size_t,
717             Try<
718               typename
719               std::iterator_traits<InputIterator>::value_type::value_type> > >
720 collectAny(InputIterator first, InputIterator last) {
721   typedef
722     typename std::iterator_traits<InputIterator>::value_type::value_type T;
723
724   auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
725   auto f_saved = ctx->p.getFuture();
726
727   for (size_t i = 0; first != last; first++, i++) {
728     auto& f = *first;
729     f.setCallback_([i, ctx](Try<T>&& t) {
730       if (!ctx->done.exchange(true)) {
731         ctx->p.setValue(std::make_pair(i, std::move(t)));
732       }
733       ctx->decref();
734     });
735   }
736
737   return f_saved;
738 }
739
740 template <class InputIterator>
741 Future<std::vector<std::pair<size_t, Try<typename
742   std::iterator_traits<InputIterator>::value_type::value_type>>>>
743 collectN(InputIterator first, InputIterator last, size_t n) {
744   typedef typename
745     std::iterator_traits<InputIterator>::value_type::value_type T;
746   typedef std::vector<std::pair<size_t, Try<T>>> V;
747
748   struct ctx_t {
749     V v;
750     size_t completed;
751     Promise<V> p;
752   };
753   auto ctx = std::make_shared<ctx_t>();
754   ctx->completed = 0;
755
756   // for each completed Future, increase count and add to vector, until we
757   // have n completed futures at which point we fulfill our Promise with the
758   // vector
759   auto it = first;
760   size_t i = 0;
761   while (it != last) {
762     it->then([ctx, n, i](Try<T>&& t) {
763       auto& v = ctx->v;
764       auto c = ++ctx->completed;
765       if (c <= n) {
766         assert(ctx->v.size() < n);
767         v.push_back(std::make_pair(i, std::move(t)));
768         if (c == n) {
769           ctx->p.setTry(Try<V>(std::move(v)));
770         }
771       }
772     });
773
774     it++;
775     i++;
776   }
777
778   if (i < n) {
779     ctx->p.setException(std::runtime_error("Not enough futures"));
780   }
781
782   return ctx->p.getFuture();
783 }
784
785 template <class It, class T, class F, class ItT, class Arg>
786 typename std::enable_if<!isFutureResult<F, T, Arg>::value, Future<T>>::type
787 reduce(It first, It last, T initial, F func) {
788   if (first == last) {
789     return makeFuture(std::move(initial));
790   }
791
792   typedef isTry<Arg> IsTry;
793
794   return collectAll(first, last)
795     .then([initial, func](std::vector<Try<ItT>>& vals) mutable {
796       for (auto& val : vals) {
797         initial = func(std::move(initial),
798                        // Either return a ItT&& or a Try<ItT>&& depending
799                        // on the type of the argument of func.
800                        val.template get<IsTry::value, Arg&&>());
801       }
802       return initial;
803     });
804 }
805
806 template <class It, class T, class F, class ItT, class Arg>
807 typename std::enable_if<isFutureResult<F, T, Arg>::value, Future<T>>::type
808 reduce(It first, It last, T initial, F func) {
809   if (first == last) {
810     return makeFuture(std::move(initial));
811   }
812
813   typedef isTry<Arg> IsTry;
814
815   auto f = first->then([initial, func](Try<ItT>& head) mutable {
816     return func(std::move(initial),
817                 head.template get<IsTry::value, Arg&&>());
818   });
819
820   for (++first; first != last; ++first) {
821     f = collectAll(f, *first).then([func](std::tuple<Try<T>, Try<ItT>>& t) {
822       return func(std::move(std::get<0>(t).value()),
823                   // Either return a ItT&& or a Try<ItT>&& depending
824                   // on the type of the argument of func.
825                   std::get<1>(t).template get<IsTry::value, Arg&&>());
826     });
827   }
828
829   return f;
830 }
831
832 template <class T>
833 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
834   return within(dur, TimedOut(), tk);
835 }
836
837 template <class T>
838 template <class E>
839 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
840
841   struct Context {
842     Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
843     E exception;
844     Promise<T> promise;
845     std::atomic<bool> token;
846   };
847   auto ctx = std::make_shared<Context>(std::move(e));
848
849   if (!tk) {
850     tk = folly::detail::getTimekeeperSingleton();
851   }
852
853   tk->after(dur)
854     .then([ctx](Try<void> const& t) {
855       if (ctx->token.exchange(true) == false) {
856         if (t.hasException()) {
857           ctx->promise.setException(std::move(t.exception()));
858         } else {
859           ctx->promise.setException(std::move(ctx->exception));
860         }
861       }
862     });
863
864   this->then([ctx](Try<T>&& t) {
865     if (ctx->token.exchange(true) == false) {
866       ctx->promise.setTry(std::move(t));
867     }
868   });
869
870   return ctx->promise.getFuture();
871 }
872
873 template <class T>
874 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
875   return collectAll(*this, futures::sleep(dur, tk))
876     .then([](std::tuple<Try<T>, Try<void>> tup) {
877       Try<T>& t = std::get<0>(tup);
878       return makeFuture<T>(std::move(t));
879     });
880 }
881
882 namespace detail {
883
884 template <class T>
885 void waitImpl(Future<T>& f) {
886   // short-circuit if there's nothing to do
887   if (f.isReady()) return;
888
889   folly::fibers::Baton baton;
890   f = f.then([&](Try<T> t) {
891     baton.post();
892     return makeFuture(std::move(t));
893   });
894   baton.wait();
895
896   // There's a race here between the return here and the actual finishing of
897   // the future. f is completed, but the setup may not have finished on done
898   // after the baton has posted.
899   while (!f.isReady()) {
900     std::this_thread::yield();
901   }
902 }
903
904 template <class T>
905 void waitImpl(Future<T>& f, Duration dur) {
906   // short-circuit if there's nothing to do
907   if (f.isReady()) return;
908
909   auto baton = std::make_shared<folly::fibers::Baton>();
910   f = f.then([baton](Try<T> t) {
911     baton->post();
912     return makeFuture(std::move(t));
913   });
914
915   // Let's preserve the invariant that if we did not timeout (timed_wait returns
916   // true), then the returned Future is complete when it is returned to the
917   // caller. We need to wait out the race for that Future to complete.
918   if (baton->timed_wait(dur)) {
919     while (!f.isReady()) {
920       std::this_thread::yield();
921     }
922   }
923 }
924
925 template <class T>
926 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
927   while (!f.isReady()) {
928     e->drive();
929   }
930 }
931
932 } // detail
933
934 template <class T>
935 Future<T>& Future<T>::wait() & {
936   detail::waitImpl(*this);
937   return *this;
938 }
939
940 template <class T>
941 Future<T>&& Future<T>::wait() && {
942   detail::waitImpl(*this);
943   return std::move(*this);
944 }
945
946 template <class T>
947 Future<T>& Future<T>::wait(Duration dur) & {
948   detail::waitImpl(*this, dur);
949   return *this;
950 }
951
952 template <class T>
953 Future<T>&& Future<T>::wait(Duration dur) && {
954   detail::waitImpl(*this, dur);
955   return std::move(*this);
956 }
957
958 template <class T>
959 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
960   detail::waitViaImpl(*this, e);
961   return *this;
962 }
963
964 template <class T>
965 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
966   detail::waitViaImpl(*this, e);
967   return std::move(*this);
968 }
969
970 template <class T>
971 T Future<T>::get() {
972   return std::move(wait().value());
973 }
974
975 template <>
976 inline void Future<void>::get() {
977   wait().value();
978 }
979
980 template <class T>
981 T Future<T>::get(Duration dur) {
982   wait(dur);
983   if (isReady()) {
984     return std::move(value());
985   } else {
986     throw TimedOut();
987   }
988 }
989
990 template <>
991 inline void Future<void>::get(Duration dur) {
992   wait(dur);
993   if (isReady()) {
994     return;
995   } else {
996     throw TimedOut();
997   }
998 }
999
1000 template <class T>
1001 T Future<T>::getVia(DrivableExecutor* e) {
1002   return std::move(waitVia(e).value());
1003 }
1004
1005 template <>
1006 inline void Future<void>::getVia(DrivableExecutor* e) {
1007   waitVia(e).value();
1008 }
1009
1010 template <class T>
1011 Future<bool> Future<T>::willEqual(Future<T>& f) {
1012   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1013     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1014       return std::get<0>(t).value() == std::get<1>(t).value();
1015     } else {
1016       return false;
1017       }
1018   });
1019 }
1020
1021 template <class T>
1022 template <class F>
1023 Future<T> Future<T>::filter(F predicate) {
1024   auto p = folly::makeMoveWrapper(std::move(predicate));
1025   return this->then([p](T val) {
1026     T const& valConstRef = val;
1027     if (!(*p)(valConstRef)) {
1028       throw PredicateDoesNotObtain();
1029     }
1030     return val;
1031   });
1032 }
1033
1034 namespace futures {
1035   namespace {
1036     template <class Z>
1037     Future<Z> chainHelper(Future<Z> f) {
1038       return f;
1039     }
1040
1041     template <class Z, class F, class Fn, class... Callbacks>
1042     Future<Z> chainHelper(F f, Fn fn, Callbacks... fns) {
1043       return chainHelper<Z>(f.then(fn), fns...);
1044     }
1045   }
1046
1047   template <class A, class Z, class... Callbacks>
1048   std::function<Future<Z>(Try<A>)>
1049   chain(Callbacks... fns) {
1050     MoveWrapper<Promise<A>> pw;
1051     MoveWrapper<Future<Z>> fw(chainHelper<Z>(pw->getFuture(), fns...));
1052     return [=](Try<A> t) mutable {
1053       pw->setTry(std::move(t));
1054       return std::move(*fw);
1055     };
1056   }
1057
1058   template <class It, class F, class ItT, class Result>
1059   std::vector<Future<Result>> map(It first, It last, F func) {
1060     std::vector<Future<Result>> results;
1061     for (auto it = first; it != last; it++) {
1062       results.push_back(it->then(func));
1063     }
1064     return results;
1065   }
1066 }
1067
1068 } // namespace folly
1069
1070 // I haven't included a Future<T&> specialization because I don't forsee us
1071 // using it, however it is not difficult to add when needed. Refer to
1072 // Future<void> for guidance. std::future and boost::future code would also be
1073 // instructive.