fix collect() for move-only types
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <chrono>
20 #include <thread>
21
22 #include <folly/Baton.h>
23 #include <folly/Optional.h>
24 #include <folly/futures/detail/Core.h>
25 #include <folly/futures/Timekeeper.h>
26
27 namespace folly {
28
29 class Timekeeper;
30
31 namespace detail {
32   Timekeeper* getTimekeeperSingleton();
33 }
34
35 template <class T>
36 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
37   other.core_ = nullptr;
38 }
39
40 template <class T>
41 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
42   std::swap(core_, other.core_);
43   return *this;
44 }
45
46 template <class T>
47 template <class F>
48 Future<T>::Future(
49   const typename std::enable_if<!std::is_void<F>::value, F>::type& val)
50     : core_(nullptr) {
51   Promise<F> p;
52   p.setValue(val);
53   *this = p.getFuture();
54 }
55
56 template <class T>
57 template <class F>
58 Future<T>::Future(
59   typename std::enable_if<!std::is_void<F>::value, F>::type&& val)
60     : core_(nullptr) {
61   Promise<F> p;
62   p.setValue(std::forward<F>(val));
63   *this = p.getFuture();
64 }
65
66 template <>
67 template <class F,
68           typename std::enable_if<std::is_void<F>::value, int>::type>
69 Future<void>::Future() : core_(nullptr) {
70   Promise<void> p;
71   p.setValue();
72   *this = p.getFuture();
73 }
74
75
76 template <class T>
77 Future<T>::~Future() {
78   detach();
79 }
80
81 template <class T>
82 void Future<T>::detach() {
83   if (core_) {
84     core_->detachFuture();
85     core_ = nullptr;
86   }
87 }
88
89 template <class T>
90 void Future<T>::throwIfInvalid() const {
91   if (!core_)
92     throw NoState();
93 }
94
95 template <class T>
96 template <class F>
97 void Future<T>::setCallback_(F&& func) {
98   throwIfInvalid();
99   core_->setCallback(std::move(func));
100 }
101
102 // unwrap
103
104 template <class T>
105 template <class F>
106 typename std::enable_if<isFuture<F>::value,
107                         Future<typename isFuture<T>::Inner>>::type
108 Future<T>::unwrap() {
109   return then([](Future<typename isFuture<T>::Inner> internal_future) {
110       return internal_future;
111   });
112 }
113
114 // then
115
116 // Variant: returns a value
117 // e.g. f.then([](Try<T>&& t){ return t.value(); });
118 template <class T>
119 template <typename F, typename R, bool isTry, typename... Args>
120 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
121 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
122   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
123   typedef typename R::ReturnsFuture::Inner B;
124
125   throwIfInvalid();
126
127   // wrap these so we can move them into the lambda
128   folly::MoveWrapper<Promise<B>> p;
129   folly::MoveWrapper<F> funcm(std::forward<F>(func));
130
131   // grab the Future now before we lose our handle on the Promise
132   auto f = p->getFuture();
133   if (getExecutor()) {
134     f.setExecutor(getExecutor());
135   }
136
137   /* This is a bit tricky.
138
139      We can't just close over *this in case this Future gets moved. So we
140      make a new dummy Future. We could figure out something more
141      sophisticated that avoids making a new Future object when it can, as an
142      optimization. But this is correct.
143
144      core_ can't be moved, it is explicitly disallowed (as is copying). But
145      if there's ever a reason to allow it, this is one place that makes that
146      assumption and would need to be fixed. We use a standard shared pointer
147      for core_ (by copying it in), which means in essence obj holds a shared
148      pointer to itself.  But this shouldn't leak because Promise will not
149      outlive the continuation, because Promise will setException() with a
150      broken Promise if it is destructed before completed. We could use a
151      weak pointer but it would have to be converted to a shared pointer when
152      func is executed (because the Future returned by func may possibly
153      persist beyond the callback, if it gets moved), and so it is an
154      optimization to just make it shared from the get-go.
155
156      We have to move in the Promise and func using the MoveWrapper
157      hack. (func could be copied but it's a big drag on perf).
158
159      Two subtle but important points about this design. detail::Core has no
160      back pointers to Future or Promise, so if Future or Promise get moved
161      (and they will be moved in performant code) we don't have to do
162      anything fancy. And because we store the continuation in the
163      detail::Core, not in the Future, we can execute the continuation even
164      after the Future has gone out of scope. This is an intentional design
165      decision. It is likely we will want to be able to cancel a continuation
166      in some circumstances, but I think it should be explicit not implicit
167      in the destruction of the Future used to create it.
168      */
169   setCallback_(
170     [p, funcm](Try<T>&& t) mutable {
171       if (!isTry && t.hasException()) {
172         p->setException(std::move(t.exception()));
173       } else {
174         p->setWith([&]() {
175           return (*funcm)(t.template get<isTry, Args>()...);
176         });
177       }
178     });
179
180   return f;
181 }
182
183 // Variant: returns a Future
184 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
185 template <class T>
186 template <typename F, typename R, bool isTry, typename... Args>
187 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
188 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
189   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
190   typedef typename R::ReturnsFuture::Inner B;
191
192   throwIfInvalid();
193
194   // wrap these so we can move them into the lambda
195   folly::MoveWrapper<Promise<B>> p;
196   folly::MoveWrapper<F> funcm(std::forward<F>(func));
197
198   // grab the Future now before we lose our handle on the Promise
199   auto f = p->getFuture();
200   if (getExecutor()) {
201     f.setExecutor(getExecutor());
202   }
203
204   setCallback_(
205     [p, funcm](Try<T>&& t) mutable {
206       if (!isTry && t.hasException()) {
207         p->setException(std::move(t.exception()));
208       } else {
209         try {
210           auto f2 = (*funcm)(t.template get<isTry, Args>()...);
211           // that didn't throw, now we can steal p
212           f2.setCallback_([p](Try<B>&& b) mutable {
213             p->setTry(std::move(b));
214           });
215         } catch (const std::exception& e) {
216           p->setException(exception_wrapper(std::current_exception(), e));
217         } catch (...) {
218           p->setException(exception_wrapper(std::current_exception()));
219         }
220       }
221     });
222
223   return f;
224 }
225
226 template <typename T>
227 template <typename R, typename Caller, typename... Args>
228   Future<typename isFuture<R>::Inner>
229 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
230   typedef typename std::remove_cv<
231     typename std::remove_reference<
232       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
233   return then([instance, func](Try<T>&& t){
234     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
235   });
236 }
237
238 // TODO(6838553)
239 #ifndef __clang__
240 template <class T>
241 template <class... Args>
242 auto Future<T>::then(Executor* x, Args&&... args)
243   -> decltype(this->then(std::forward<Args>(args)...))
244 {
245   auto oldX = getExecutor();
246   setExecutor(x);
247   return this->then(std::forward<Args>(args)...).via(oldX);
248 }
249 #endif
250
251 template <class T>
252 Future<void> Future<T>::then() {
253   return then([] (Try<T>&& t) {});
254 }
255
256 // onError where the callback returns T
257 template <class T>
258 template <class F>
259 typename std::enable_if<
260   !detail::callableWith<F, exception_wrapper>::value &&
261   !detail::Extract<F>::ReturnsFuture::value,
262   Future<T>>::type
263 Future<T>::onError(F&& func) {
264   typedef typename detail::Extract<F>::FirstArg Exn;
265   static_assert(
266       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
267       "Return type of onError callback must be T or Future<T>");
268
269   Promise<T> p;
270   auto f = p.getFuture();
271   auto pm = folly::makeMoveWrapper(std::move(p));
272   auto funcm = folly::makeMoveWrapper(std::move(func));
273   setCallback_([pm, funcm](Try<T>&& t) mutable {
274     if (!t.template withException<Exn>([&] (Exn& e) {
275           pm->setWith([&]{
276             return (*funcm)(e);
277           });
278         })) {
279       pm->setTry(std::move(t));
280     }
281   });
282
283   return f;
284 }
285
286 // onError where the callback returns Future<T>
287 template <class T>
288 template <class F>
289 typename std::enable_if<
290   !detail::callableWith<F, exception_wrapper>::value &&
291   detail::Extract<F>::ReturnsFuture::value,
292   Future<T>>::type
293 Future<T>::onError(F&& func) {
294   static_assert(
295       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
296       "Return type of onError callback must be T or Future<T>");
297   typedef typename detail::Extract<F>::FirstArg Exn;
298
299   Promise<T> p;
300   auto f = p.getFuture();
301   auto pm = folly::makeMoveWrapper(std::move(p));
302   auto funcm = folly::makeMoveWrapper(std::move(func));
303   setCallback_([pm, funcm](Try<T>&& t) mutable {
304     if (!t.template withException<Exn>([&] (Exn& e) {
305           try {
306             auto f2 = (*funcm)(e);
307             f2.setCallback_([pm](Try<T>&& t2) mutable {
308               pm->setTry(std::move(t2));
309             });
310           } catch (const std::exception& e2) {
311             pm->setException(exception_wrapper(std::current_exception(), e2));
312           } catch (...) {
313             pm->setException(exception_wrapper(std::current_exception()));
314           }
315         })) {
316       pm->setTry(std::move(t));
317     }
318   });
319
320   return f;
321 }
322
323 template <class T>
324 template <class F>
325 Future<T> Future<T>::ensure(F func) {
326   MoveWrapper<F> funcw(std::move(func));
327   return this->then([funcw](Try<T>&& t) {
328     (*funcw)();
329     return makeFuture(std::move(t));
330   });
331 }
332
333 template <class T>
334 template <class F>
335 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
336   auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
337   return within(dur, tk)
338     .onError([funcw](TimedOut const&) { return (*funcw)(); });
339 }
340
341 template <class T>
342 template <class F>
343 typename std::enable_if<
344   detail::callableWith<F, exception_wrapper>::value &&
345   detail::Extract<F>::ReturnsFuture::value,
346   Future<T>>::type
347 Future<T>::onError(F&& func) {
348   static_assert(
349       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
350       "Return type of onError callback must be T or Future<T>");
351
352   Promise<T> p;
353   auto f = p.getFuture();
354   auto pm = folly::makeMoveWrapper(std::move(p));
355   auto funcm = folly::makeMoveWrapper(std::move(func));
356   setCallback_([pm, funcm](Try<T> t) mutable {
357     if (t.hasException()) {
358       try {
359         auto f2 = (*funcm)(std::move(t.exception()));
360         f2.setCallback_([pm](Try<T> t2) mutable {
361           pm->setTry(std::move(t2));
362         });
363       } catch (const std::exception& e2) {
364         pm->setException(exception_wrapper(std::current_exception(), e2));
365       } catch (...) {
366         pm->setException(exception_wrapper(std::current_exception()));
367       }
368     } else {
369       pm->setTry(std::move(t));
370     }
371   });
372
373   return f;
374 }
375
376 // onError(exception_wrapper) that returns T
377 template <class T>
378 template <class F>
379 typename std::enable_if<
380   detail::callableWith<F, exception_wrapper>::value &&
381   !detail::Extract<F>::ReturnsFuture::value,
382   Future<T>>::type
383 Future<T>::onError(F&& func) {
384   static_assert(
385       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
386       "Return type of onError callback must be T or Future<T>");
387
388   Promise<T> p;
389   auto f = p.getFuture();
390   auto pm = folly::makeMoveWrapper(std::move(p));
391   auto funcm = folly::makeMoveWrapper(std::move(func));
392   setCallback_([pm, funcm](Try<T> t) mutable {
393     if (t.hasException()) {
394       pm->setWith([&]{
395         return (*funcm)(std::move(t.exception()));
396       });
397     } else {
398       pm->setTry(std::move(t));
399     }
400   });
401
402   return f;
403 }
404
405 template <class T>
406 typename std::add_lvalue_reference<T>::type Future<T>::value() {
407   throwIfInvalid();
408
409   return core_->getTry().value();
410 }
411
412 template <class T>
413 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
414   throwIfInvalid();
415
416   return core_->getTry().value();
417 }
418
419 template <class T>
420 Try<T>& Future<T>::getTry() {
421   throwIfInvalid();
422
423   return core_->getTry();
424 }
425
426 template <class T>
427 Optional<Try<T>> Future<T>::poll() {
428   Optional<Try<T>> o;
429   if (core_->ready()) {
430     o = std::move(core_->getTry());
431   }
432   return o;
433 }
434
435 template <class T>
436 template <typename Executor>
437 inline Future<T> Future<T>::via(Executor* executor) && {
438   throwIfInvalid();
439
440   setExecutor(executor);
441
442   return std::move(*this);
443 }
444
445 template <class T>
446 template <typename Executor>
447 inline Future<T> Future<T>::via(Executor* executor) & {
448   throwIfInvalid();
449
450   MoveWrapper<Promise<T>> p;
451   auto f = p->getFuture();
452   then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
453   return std::move(f).via(executor);
454 }
455
456 template <class T>
457 bool Future<T>::isReady() const {
458   throwIfInvalid();
459   return core_->ready();
460 }
461
462 template <class T>
463 void Future<T>::raise(exception_wrapper exception) {
464   core_->raise(std::move(exception));
465 }
466
467 // makeFuture
468
469 template <class T>
470 Future<typename std::decay<T>::type> makeFuture(T&& t) {
471   Promise<typename std::decay<T>::type> p;
472   p.setValue(std::forward<T>(t));
473   return p.getFuture();
474 }
475
476 inline // for multiple translation units
477 Future<void> makeFuture() {
478   Promise<void> p;
479   p.setValue();
480   return p.getFuture();
481 }
482
483 template <class F>
484 auto makeFutureTry(
485     F&& func,
486     typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
487     -> Future<decltype(func())> {
488   Promise<decltype(func())> p;
489   p.setWith(
490     [&func]() {
491       return (func)();
492     });
493   return p.getFuture();
494 }
495
496 template <class F>
497 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
498   F copy = func;
499   return makeFutureTry(std::move(copy));
500 }
501
502 template <class T>
503 Future<T> makeFuture(std::exception_ptr const& e) {
504   Promise<T> p;
505   p.setException(e);
506   return p.getFuture();
507 }
508
509 template <class T>
510 Future<T> makeFuture(exception_wrapper ew) {
511   Promise<T> p;
512   p.setException(std::move(ew));
513   return p.getFuture();
514 }
515
516 template <class T, class E>
517 typename std::enable_if<std::is_base_of<std::exception, E>::value,
518                         Future<T>>::type
519 makeFuture(E const& e) {
520   Promise<T> p;
521   p.setException(make_exception_wrapper<E>(e));
522   return p.getFuture();
523 }
524
525 template <class T>
526 Future<T> makeFuture(Try<T>&& t) {
527   Promise<typename std::decay<T>::type> p;
528   p.setTry(std::move(t));
529   return p.getFuture();
530 }
531
532 template <>
533 inline Future<void> makeFuture(Try<void>&& t) {
534   if (t.hasException()) {
535     return makeFuture<void>(std::move(t.exception()));
536   } else {
537     return makeFuture();
538   }
539 }
540
541 // via
542 template <typename Executor>
543 Future<void> via(Executor* executor) {
544   return makeFuture().via(executor);
545 }
546
547 // when (variadic)
548
549 template <typename... Fs>
550 typename detail::VariadicContext<
551   typename std::decay<Fs>::type::value_type...>::type
552 whenAll(Fs&&... fs) {
553   auto ctx =
554     new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
555   ctx->total = sizeof...(fs);
556   auto f_saved = ctx->p.getFuture();
557   detail::whenAllVariadicHelper(ctx,
558     std::forward<typename std::decay<Fs>::type>(fs)...);
559   return f_saved;
560 }
561
562 // when (iterator)
563
564 template <class InputIterator>
565 Future<
566   std::vector<
567   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
568 whenAll(InputIterator first, InputIterator last) {
569   typedef
570     typename std::iterator_traits<InputIterator>::value_type::value_type T;
571
572   if (first >= last) {
573     return makeFuture(std::vector<Try<T>>());
574   }
575   size_t n = std::distance(first, last);
576
577   auto ctx = new detail::WhenAllContext<T>();
578
579   ctx->results.resize(n);
580
581   auto f_saved = ctx->p.getFuture();
582
583   for (size_t i = 0; first != last; ++first, ++i) {
584      assert(i < n);
585      auto& f = *first;
586      f.setCallback_([ctx, i, n](Try<T> t) {
587        ctx->results[i] = std::move(t);
588        if (++ctx->count == n) {
589          ctx->p.setValue(std::move(ctx->results));
590          delete ctx;
591        }
592      });
593   }
594
595   return f_saved;
596 }
597
598 namespace detail {
599
600 template <class, class, typename = void> struct CollectContextHelper;
601
602 template <class T, class VecT>
603 struct CollectContextHelper<T, VecT,
604     typename std::enable_if<std::is_same<T, VecT>::value>::type> {
605   static inline std::vector<T>&& getResults(std::vector<VecT>& results) {
606     return std::move(results);
607   }
608 };
609
610 template <class T, class VecT>
611 struct CollectContextHelper<T, VecT,
612     typename std::enable_if<!std::is_same<T, VecT>::value>::type> {
613   static inline std::vector<T> getResults(std::vector<VecT>& results) {
614     std::vector<T> finalResults;
615     finalResults.reserve(results.size());
616     for (auto& opt : results) {
617       finalResults.push_back(std::move(opt.value()));
618     }
619     return finalResults;
620   }
621 };
622
623 template <typename T>
624 struct CollectContext {
625
626   typedef typename std::conditional<
627     std::is_default_constructible<T>::value,
628     T,
629     Optional<T>
630    >::type VecT;
631
632   explicit CollectContext(int n) : count(0), threw(false) {
633     results.resize(n);
634   }
635
636   Promise<std::vector<T>> p;
637   std::vector<VecT> results;
638   std::atomic<size_t> count;
639   std::atomic_bool threw;
640
641   typedef std::vector<T> result_type;
642
643   static inline Future<std::vector<T>> makeEmptyFuture() {
644     return makeFuture(std::vector<T>());
645   }
646
647   inline void setValue() {
648     p.setValue(CollectContextHelper<T, VecT>::getResults(results));
649   }
650
651   inline void addResult(int i, Try<T>& t) {
652     results[i] = std::move(t.value());
653   }
654 };
655
656 template <>
657 struct CollectContext<void> {
658
659   explicit CollectContext(int n) : count(0), threw(false) {}
660
661   Promise<void> p;
662   std::atomic<size_t> count;
663   std::atomic_bool threw;
664
665   typedef void result_type;
666
667   static inline Future<void> makeEmptyFuture() {
668     return makeFuture();
669   }
670
671   inline void setValue() {
672     p.setValue();
673   }
674
675   inline void addResult(int i, Try<void>& t) {
676     // do nothing
677   }
678 };
679
680 } // detail
681
682 template <class InputIterator>
683 Future<typename detail::CollectContext<
684   typename std::iterator_traits<InputIterator>::value_type::value_type
685 >::result_type>
686 collect(InputIterator first, InputIterator last) {
687   typedef
688     typename std::iterator_traits<InputIterator>::value_type::value_type T;
689
690   if (first >= last) {
691     return detail::CollectContext<T>::makeEmptyFuture();
692   }
693
694   size_t n = std::distance(first, last);
695   auto ctx = new detail::CollectContext<T>(n);
696   auto f_saved = ctx->p.getFuture();
697
698   for (size_t i = 0; first != last; ++first, ++i) {
699      assert(i < n);
700      auto& f = *first;
701      f.setCallback_([ctx, i, n](Try<T> t) {
702        auto c = ++ctx->count;
703
704        if (t.hasException()) {
705          if (!ctx->threw.exchange(true)) {
706            ctx->p.setException(std::move(t.exception()));
707          }
708        } else if (!ctx->threw) {
709          ctx->addResult(i, t);
710          if (c == n) {
711            ctx->setValue();
712          }
713        }
714
715        if (c == n) {
716          delete ctx;
717        }
718      });
719   }
720
721   return f_saved;
722 }
723
724 template <class InputIterator>
725 Future<
726   std::pair<size_t,
727             Try<
728               typename
729               std::iterator_traits<InputIterator>::value_type::value_type> > >
730 whenAny(InputIterator first, InputIterator last) {
731   typedef
732     typename std::iterator_traits<InputIterator>::value_type::value_type T;
733
734   auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
735   auto f_saved = ctx->p.getFuture();
736
737   for (size_t i = 0; first != last; first++, i++) {
738     auto& f = *first;
739     f.setCallback_([i, ctx](Try<T>&& t) {
740       if (!ctx->done.exchange(true)) {
741         ctx->p.setValue(std::make_pair(i, std::move(t)));
742       }
743       ctx->decref();
744     });
745   }
746
747   return f_saved;
748 }
749
750 template <class InputIterator>
751 Future<std::vector<std::pair<size_t, Try<typename
752   std::iterator_traits<InputIterator>::value_type::value_type>>>>
753 whenN(InputIterator first, InputIterator last, size_t n) {
754   typedef typename
755     std::iterator_traits<InputIterator>::value_type::value_type T;
756   typedef std::vector<std::pair<size_t, Try<T>>> V;
757
758   struct ctx_t {
759     V v;
760     size_t completed;
761     Promise<V> p;
762   };
763   auto ctx = std::make_shared<ctx_t>();
764   ctx->completed = 0;
765
766   // for each completed Future, increase count and add to vector, until we
767   // have n completed futures at which point we fulfill our Promise with the
768   // vector
769   auto it = first;
770   size_t i = 0;
771   while (it != last) {
772     it->then([ctx, n, i](Try<T>&& t) {
773       auto& v = ctx->v;
774       auto c = ++ctx->completed;
775       if (c <= n) {
776         assert(ctx->v.size() < n);
777         v.push_back(std::make_pair(i, std::move(t)));
778         if (c == n) {
779           ctx->p.setTry(Try<V>(std::move(v)));
780         }
781       }
782     });
783
784     it++;
785     i++;
786   }
787
788   if (i < n) {
789     ctx->p.setException(std::runtime_error("Not enough futures"));
790   }
791
792   return ctx->p.getFuture();
793 }
794
795 template <class It, class T, class F, class ItT, class Arg>
796 typename std::enable_if<!isFutureResult<F, T, Arg>::value, Future<T>>::type
797 reduce(It first, It last, T initial, F func) {
798   if (first == last) {
799     return makeFuture(std::move(initial));
800   }
801
802   typedef isTry<Arg> IsTry;
803
804   return whenAll(first, last)
805     .then([initial, func](std::vector<Try<ItT>>& vals) mutable {
806       for (auto& val : vals) {
807         initial = func(std::move(initial),
808                        // Either return a ItT&& or a Try<ItT>&& depending
809                        // on the type of the argument of func.
810                        val.template get<IsTry::value, Arg&&>());
811       }
812       return initial;
813     });
814 }
815
816 template <class It, class T, class F, class ItT, class Arg>
817 typename std::enable_if<isFutureResult<F, T, Arg>::value, Future<T>>::type
818 reduce(It first, It last, T initial, F func) {
819   if (first == last) {
820     return makeFuture(std::move(initial));
821   }
822
823   typedef isTry<Arg> IsTry;
824
825   auto f = first->then([initial, func](Try<ItT>& head) mutable {
826     return func(std::move(initial),
827                 head.template get<IsTry::value, Arg&&>());
828   });
829
830   for (++first; first != last; ++first) {
831     f = whenAll(f, *first).then([func](std::tuple<Try<T>, Try<ItT>>& t) {
832       return func(std::move(std::get<0>(t).value()),
833                   // Either return a ItT&& or a Try<ItT>&& depending
834                   // on the type of the argument of func.
835                   std::get<1>(t).template get<IsTry::value, Arg&&>());
836     });
837   }
838
839   return f;
840 }
841
842 template <class T>
843 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
844   return within(dur, TimedOut(), tk);
845 }
846
847 template <class T>
848 template <class E>
849 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
850
851   struct Context {
852     Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
853     E exception;
854     Promise<T> promise;
855     std::atomic<bool> token;
856   };
857   auto ctx = std::make_shared<Context>(std::move(e));
858
859   if (!tk) {
860     tk = folly::detail::getTimekeeperSingleton();
861   }
862
863   tk->after(dur)
864     .then([ctx](Try<void> const& t) {
865       if (ctx->token.exchange(true) == false) {
866         if (t.hasException()) {
867           ctx->promise.setException(std::move(t.exception()));
868         } else {
869           ctx->promise.setException(std::move(ctx->exception));
870         }
871       }
872     });
873
874   this->then([ctx](Try<T>&& t) {
875     if (ctx->token.exchange(true) == false) {
876       ctx->promise.setTry(std::move(t));
877     }
878   });
879
880   return ctx->promise.getFuture();
881 }
882
883 template <class T>
884 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
885   return whenAll(*this, futures::sleep(dur, tk))
886     .then([](std::tuple<Try<T>, Try<void>> tup) {
887       Try<T>& t = std::get<0>(tup);
888       return makeFuture<T>(std::move(t));
889     });
890 }
891
892 namespace detail {
893
894 template <class T>
895 void waitImpl(Future<T>& f) {
896   // short-circuit if there's nothing to do
897   if (f.isReady()) return;
898
899   Baton<> baton;
900   f = f.then([&](Try<T> t) {
901     baton.post();
902     return makeFuture(std::move(t));
903   });
904   baton.wait();
905
906   // There's a race here between the return here and the actual finishing of
907   // the future. f is completed, but the setup may not have finished on done
908   // after the baton has posted.
909   while (!f.isReady()) {
910     std::this_thread::yield();
911   }
912 }
913
914 template <class T>
915 void waitImpl(Future<T>& f, Duration dur) {
916   // short-circuit if there's nothing to do
917   if (f.isReady()) return;
918
919   auto baton = std::make_shared<Baton<>>();
920   f = f.then([baton](Try<T> t) {
921     baton->post();
922     return makeFuture(std::move(t));
923   });
924
925   // Let's preserve the invariant that if we did not timeout (timed_wait returns
926   // true), then the returned Future is complete when it is returned to the
927   // caller. We need to wait out the race for that Future to complete.
928   if (baton->timed_wait(std::chrono::system_clock::now() + dur)) {
929     while (!f.isReady()) {
930       std::this_thread::yield();
931     }
932   }
933 }
934
935 template <class T>
936 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
937   while (!f.isReady()) {
938     e->drive();
939   }
940 }
941
942 } // detail
943
944 template <class T>
945 Future<T>& Future<T>::wait() & {
946   detail::waitImpl(*this);
947   return *this;
948 }
949
950 template <class T>
951 Future<T>&& Future<T>::wait() && {
952   detail::waitImpl(*this);
953   return std::move(*this);
954 }
955
956 template <class T>
957 Future<T>& Future<T>::wait(Duration dur) & {
958   detail::waitImpl(*this, dur);
959   return *this;
960 }
961
962 template <class T>
963 Future<T>&& Future<T>::wait(Duration dur) && {
964   detail::waitImpl(*this, dur);
965   return std::move(*this);
966 }
967
968 template <class T>
969 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
970   detail::waitViaImpl(*this, e);
971   return *this;
972 }
973
974 template <class T>
975 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
976   detail::waitViaImpl(*this, e);
977   return std::move(*this);
978 }
979
980 template <class T>
981 T Future<T>::get() {
982   return std::move(wait().value());
983 }
984
985 template <>
986 inline void Future<void>::get() {
987   wait().value();
988 }
989
990 template <class T>
991 T Future<T>::get(Duration dur) {
992   wait(dur);
993   if (isReady()) {
994     return std::move(value());
995   } else {
996     throw TimedOut();
997   }
998 }
999
1000 template <>
1001 inline void Future<void>::get(Duration dur) {
1002   wait(dur);
1003   if (isReady()) {
1004     return;
1005   } else {
1006     throw TimedOut();
1007   }
1008 }
1009
1010 template <class T>
1011 T Future<T>::getVia(DrivableExecutor* e) {
1012   return std::move(waitVia(e).value());
1013 }
1014
1015 template <>
1016 inline void Future<void>::getVia(DrivableExecutor* e) {
1017   waitVia(e).value();
1018 }
1019
1020 template <class T>
1021 Future<bool> Future<T>::willEqual(Future<T>& f) {
1022   return whenAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1023     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1024       return std::get<0>(t).value() == std::get<1>(t).value();
1025     } else {
1026       return false;
1027       }
1028   });
1029 }
1030
1031 template <class T>
1032 template <class F>
1033 Future<T> Future<T>::filter(F predicate) {
1034   auto p = folly::makeMoveWrapper(std::move(predicate));
1035   return this->then([p](T val) {
1036     T const& valConstRef = val;
1037     if (!(*p)(valConstRef)) {
1038       throw PredicateDoesNotObtain();
1039     }
1040     return val;
1041   });
1042 }
1043
1044 namespace futures {
1045   namespace {
1046     template <class Z>
1047     Future<Z> chainHelper(Future<Z> f) {
1048       return f;
1049     }
1050
1051     template <class Z, class F, class Fn, class... Callbacks>
1052     Future<Z> chainHelper(F f, Fn fn, Callbacks... fns) {
1053       return chainHelper<Z>(f.then(fn), fns...);
1054     }
1055   }
1056
1057   template <class A, class Z, class... Callbacks>
1058   std::function<Future<Z>(Try<A>)>
1059   chain(Callbacks... fns) {
1060     MoveWrapper<Promise<A>> pw;
1061     MoveWrapper<Future<Z>> fw(chainHelper<Z>(pw->getFuture(), fns...));
1062     return [=](Try<A> t) mutable {
1063       pw->setTry(std::move(t));
1064       return std::move(*fw);
1065     };
1066   }
1067
1068   template <class It, class F, class ItT, class Result>
1069   std::vector<Future<Result>> map(It first, It last, F func) {
1070     std::vector<Future<Result>> results;
1071     for (auto it = first; it != last; it++) {
1072       results.push_back(it->then(func));
1073     }
1074     return results;
1075   }
1076 }
1077
1078 } // namespace folly
1079
1080 // I haven't included a Future<T&> specialization because I don't forsee us
1081 // using it, however it is not difficult to add when needed. Refer to
1082 // Future<void> for guidance. std::future and boost::future code would also be
1083 // instructive.