(Wangle) Implement collect* using mapSetCallback and shared_ptrs
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <chrono>
20 #include <thread>
21
22 #include <folly/experimental/fibers/Baton.h>
23 #include <folly/Optional.h>
24 #include <folly/futures/detail/Core.h>
25 #include <folly/futures/Timekeeper.h>
26
27 namespace folly {
28
29 class Timekeeper;
30
31 namespace detail {
32   Timekeeper* getTimekeeperSingleton();
33 }
34
35 template <class T>
36 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
37   other.core_ = nullptr;
38 }
39
40 template <class T>
41 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
42   std::swap(core_, other.core_);
43   return *this;
44 }
45
46 template <class T>
47 template <class T2,
48           typename std::enable_if<!isFuture<T2>::value, void*>::type>
49 Future<T>::Future(T2&& val) : core_(nullptr) {
50   Promise<T> p;
51   p.setValue(std::forward<T2>(val));
52   *this = p.getFuture();
53 }
54
55 template <class T>
56 template <class T2,
57           typename std::enable_if<
58             folly::is_void_or_unit<T2>::value,
59             int>::type>
60 Future<T>::Future() : core_(nullptr) {
61   Promise<T> p;
62   p.setValue();
63   *this = p.getFuture();
64 }
65
66
67 template <class T>
68 Future<T>::~Future() {
69   detach();
70 }
71
72 template <class T>
73 void Future<T>::detach() {
74   if (core_) {
75     core_->detachFuture();
76     core_ = nullptr;
77   }
78 }
79
80 template <class T>
81 void Future<T>::throwIfInvalid() const {
82   if (!core_)
83     throw NoState();
84 }
85
86 template <class T>
87 template <class F>
88 void Future<T>::setCallback_(F&& func) {
89   throwIfInvalid();
90   core_->setCallback(std::move(func));
91 }
92
93 // unwrap
94
95 template <class T>
96 template <class F>
97 typename std::enable_if<isFuture<F>::value,
98                         Future<typename isFuture<T>::Inner>>::type
99 Future<T>::unwrap() {
100   return then([](Future<typename isFuture<T>::Inner> internal_future) {
101       return internal_future;
102   });
103 }
104
105 // then
106
107 // Variant: returns a value
108 // e.g. f.then([](Try<T>&& t){ return t.value(); });
109 template <class T>
110 template <typename F, typename R, bool isTry, typename... Args>
111 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
112 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
113   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
114   typedef typename R::ReturnsFuture::Inner B;
115
116   throwIfInvalid();
117
118   // wrap these so we can move them into the lambda
119   folly::MoveWrapper<Promise<B>> p;
120   folly::MoveWrapper<F> funcm(std::forward<F>(func));
121
122   // grab the Future now before we lose our handle on the Promise
123   auto f = p->getFuture();
124   if (getExecutor()) {
125     f.setExecutor(getExecutor());
126   }
127
128   /* This is a bit tricky.
129
130      We can't just close over *this in case this Future gets moved. So we
131      make a new dummy Future. We could figure out something more
132      sophisticated that avoids making a new Future object when it can, as an
133      optimization. But this is correct.
134
135      core_ can't be moved, it is explicitly disallowed (as is copying). But
136      if there's ever a reason to allow it, this is one place that makes that
137      assumption and would need to be fixed. We use a standard shared pointer
138      for core_ (by copying it in), which means in essence obj holds a shared
139      pointer to itself.  But this shouldn't leak because Promise will not
140      outlive the continuation, because Promise will setException() with a
141      broken Promise if it is destructed before completed. We could use a
142      weak pointer but it would have to be converted to a shared pointer when
143      func is executed (because the Future returned by func may possibly
144      persist beyond the callback, if it gets moved), and so it is an
145      optimization to just make it shared from the get-go.
146
147      We have to move in the Promise and func using the MoveWrapper
148      hack. (func could be copied but it's a big drag on perf).
149
150      Two subtle but important points about this design. detail::Core has no
151      back pointers to Future or Promise, so if Future or Promise get moved
152      (and they will be moved in performant code) we don't have to do
153      anything fancy. And because we store the continuation in the
154      detail::Core, not in the Future, we can execute the continuation even
155      after the Future has gone out of scope. This is an intentional design
156      decision. It is likely we will want to be able to cancel a continuation
157      in some circumstances, but I think it should be explicit not implicit
158      in the destruction of the Future used to create it.
159      */
160   setCallback_(
161     [p, funcm](Try<T>&& t) mutable {
162       if (!isTry && t.hasException()) {
163         p->setException(std::move(t.exception()));
164       } else {
165         p->setWith([&]() {
166           return (*funcm)(t.template get<isTry, Args>()...);
167         });
168       }
169     });
170
171   return f;
172 }
173
174 // Variant: returns a Future
175 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
176 template <class T>
177 template <typename F, typename R, bool isTry, typename... Args>
178 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
179 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
180   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
181   typedef typename R::ReturnsFuture::Inner B;
182
183   throwIfInvalid();
184
185   // wrap these so we can move them into the lambda
186   folly::MoveWrapper<Promise<B>> p;
187   folly::MoveWrapper<F> funcm(std::forward<F>(func));
188
189   // grab the Future now before we lose our handle on the Promise
190   auto f = p->getFuture();
191   if (getExecutor()) {
192     f.setExecutor(getExecutor());
193   }
194
195   setCallback_(
196     [p, funcm](Try<T>&& t) mutable {
197       if (!isTry && t.hasException()) {
198         p->setException(std::move(t.exception()));
199       } else {
200         try {
201           auto f2 = (*funcm)(t.template get<isTry, Args>()...);
202           // that didn't throw, now we can steal p
203           f2.setCallback_([p](Try<B>&& b) mutable {
204             p->setTry(std::move(b));
205           });
206         } catch (const std::exception& e) {
207           p->setException(exception_wrapper(std::current_exception(), e));
208         } catch (...) {
209           p->setException(exception_wrapper(std::current_exception()));
210         }
211       }
212     });
213
214   return f;
215 }
216
217 template <typename T>
218 template <typename R, typename Caller, typename... Args>
219   Future<typename isFuture<R>::Inner>
220 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
221   typedef typename std::remove_cv<
222     typename std::remove_reference<
223       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
224   return then([instance, func](Try<T>&& t){
225     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
226   });
227 }
228
229 template <class T>
230 template <class Executor, class Arg, class... Args>
231 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
232   -> decltype(this->then(std::forward<Arg>(arg),
233                          std::forward<Args>(args)...))
234 {
235   auto oldX = getExecutor();
236   setExecutor(x);
237   return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
238                via(oldX);
239 }
240
241 template <class T>
242 Future<void> Future<T>::then() {
243   return then([] (Try<T>&& t) {});
244 }
245
246 // onError where the callback returns T
247 template <class T>
248 template <class F>
249 typename std::enable_if<
250   !detail::callableWith<F, exception_wrapper>::value &&
251   !detail::Extract<F>::ReturnsFuture::value,
252   Future<T>>::type
253 Future<T>::onError(F&& func) {
254   typedef typename detail::Extract<F>::FirstArg Exn;
255   static_assert(
256       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
257       "Return type of onError callback must be T or Future<T>");
258
259   Promise<T> p;
260   auto f = p.getFuture();
261   auto pm = folly::makeMoveWrapper(std::move(p));
262   auto funcm = folly::makeMoveWrapper(std::move(func));
263   setCallback_([pm, funcm](Try<T>&& t) mutable {
264     if (!t.template withException<Exn>([&] (Exn& e) {
265           pm->setWith([&]{
266             return (*funcm)(e);
267           });
268         })) {
269       pm->setTry(std::move(t));
270     }
271   });
272
273   return f;
274 }
275
276 // onError where the callback returns Future<T>
277 template <class T>
278 template <class F>
279 typename std::enable_if<
280   !detail::callableWith<F, exception_wrapper>::value &&
281   detail::Extract<F>::ReturnsFuture::value,
282   Future<T>>::type
283 Future<T>::onError(F&& func) {
284   static_assert(
285       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
286       "Return type of onError callback must be T or Future<T>");
287   typedef typename detail::Extract<F>::FirstArg Exn;
288
289   Promise<T> p;
290   auto f = p.getFuture();
291   auto pm = folly::makeMoveWrapper(std::move(p));
292   auto funcm = folly::makeMoveWrapper(std::move(func));
293   setCallback_([pm, funcm](Try<T>&& t) mutable {
294     if (!t.template withException<Exn>([&] (Exn& e) {
295           try {
296             auto f2 = (*funcm)(e);
297             f2.setCallback_([pm](Try<T>&& t2) mutable {
298               pm->setTry(std::move(t2));
299             });
300           } catch (const std::exception& e2) {
301             pm->setException(exception_wrapper(std::current_exception(), e2));
302           } catch (...) {
303             pm->setException(exception_wrapper(std::current_exception()));
304           }
305         })) {
306       pm->setTry(std::move(t));
307     }
308   });
309
310   return f;
311 }
312
313 template <class T>
314 template <class F>
315 Future<T> Future<T>::ensure(F func) {
316   MoveWrapper<F> funcw(std::move(func));
317   return this->then([funcw](Try<T>&& t) {
318     (*funcw)();
319     return makeFuture(std::move(t));
320   });
321 }
322
323 template <class T>
324 template <class F>
325 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
326   auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
327   return within(dur, tk)
328     .onError([funcw](TimedOut const&) { return (*funcw)(); });
329 }
330
331 template <class T>
332 template <class F>
333 typename std::enable_if<
334   detail::callableWith<F, exception_wrapper>::value &&
335   detail::Extract<F>::ReturnsFuture::value,
336   Future<T>>::type
337 Future<T>::onError(F&& func) {
338   static_assert(
339       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
340       "Return type of onError callback must be T or Future<T>");
341
342   Promise<T> p;
343   auto f = p.getFuture();
344   auto pm = folly::makeMoveWrapper(std::move(p));
345   auto funcm = folly::makeMoveWrapper(std::move(func));
346   setCallback_([pm, funcm](Try<T> t) mutable {
347     if (t.hasException()) {
348       try {
349         auto f2 = (*funcm)(std::move(t.exception()));
350         f2.setCallback_([pm](Try<T> t2) mutable {
351           pm->setTry(std::move(t2));
352         });
353       } catch (const std::exception& e2) {
354         pm->setException(exception_wrapper(std::current_exception(), e2));
355       } catch (...) {
356         pm->setException(exception_wrapper(std::current_exception()));
357       }
358     } else {
359       pm->setTry(std::move(t));
360     }
361   });
362
363   return f;
364 }
365
366 // onError(exception_wrapper) that returns T
367 template <class T>
368 template <class F>
369 typename std::enable_if<
370   detail::callableWith<F, exception_wrapper>::value &&
371   !detail::Extract<F>::ReturnsFuture::value,
372   Future<T>>::type
373 Future<T>::onError(F&& func) {
374   static_assert(
375       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
376       "Return type of onError callback must be T or Future<T>");
377
378   Promise<T> p;
379   auto f = p.getFuture();
380   auto pm = folly::makeMoveWrapper(std::move(p));
381   auto funcm = folly::makeMoveWrapper(std::move(func));
382   setCallback_([pm, funcm](Try<T> t) mutable {
383     if (t.hasException()) {
384       pm->setWith([&]{
385         return (*funcm)(std::move(t.exception()));
386       });
387     } else {
388       pm->setTry(std::move(t));
389     }
390   });
391
392   return f;
393 }
394
395 template <class T>
396 typename std::add_lvalue_reference<T>::type Future<T>::value() {
397   throwIfInvalid();
398
399   return core_->getTry().value();
400 }
401
402 template <class T>
403 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
404   throwIfInvalid();
405
406   return core_->getTry().value();
407 }
408
409 template <class T>
410 Try<T>& Future<T>::getTry() {
411   throwIfInvalid();
412
413   return core_->getTry();
414 }
415
416 template <class T>
417 Optional<Try<T>> Future<T>::poll() {
418   Optional<Try<T>> o;
419   if (core_->ready()) {
420     o = std::move(core_->getTry());
421   }
422   return o;
423 }
424
425 template <class T>
426 inline Future<T> Future<T>::via(Executor* executor) && {
427   throwIfInvalid();
428
429   setExecutor(executor);
430
431   return std::move(*this);
432 }
433
434 template <class T>
435 inline Future<T> Future<T>::via(Executor* executor) & {
436   throwIfInvalid();
437
438   MoveWrapper<Promise<T>> p;
439   auto f = p->getFuture();
440   then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
441   return std::move(f).via(executor);
442 }
443
444 template <class T>
445 bool Future<T>::isReady() const {
446   throwIfInvalid();
447   return core_->ready();
448 }
449
450 template <class T>
451 void Future<T>::raise(exception_wrapper exception) {
452   core_->raise(std::move(exception));
453 }
454
455 // makeFuture
456
457 template <class T>
458 Future<typename std::decay<T>::type> makeFuture(T&& t) {
459   Promise<typename std::decay<T>::type> p;
460   p.setValue(std::forward<T>(t));
461   return p.getFuture();
462 }
463
464 inline // for multiple translation units
465 Future<void> makeFuture() {
466   Promise<void> p;
467   p.setValue();
468   return p.getFuture();
469 }
470
471 template <class F>
472 auto makeFutureWith(
473     F&& func,
474     typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
475     -> Future<decltype(func())> {
476   Promise<decltype(func())> p;
477   p.setWith(
478     [&func]() {
479       return (func)();
480     });
481   return p.getFuture();
482 }
483
484 template <class F>
485 auto makeFutureWith(F const& func) -> Future<decltype(func())> {
486   F copy = func;
487   return makeFutureWith(std::move(copy));
488 }
489
490 template <class T>
491 Future<T> makeFuture(std::exception_ptr const& e) {
492   Promise<T> p;
493   p.setException(e);
494   return p.getFuture();
495 }
496
497 template <class T>
498 Future<T> makeFuture(exception_wrapper ew) {
499   Promise<T> p;
500   p.setException(std::move(ew));
501   return p.getFuture();
502 }
503
504 template <class T, class E>
505 typename std::enable_if<std::is_base_of<std::exception, E>::value,
506                         Future<T>>::type
507 makeFuture(E const& e) {
508   Promise<T> p;
509   p.setException(make_exception_wrapper<E>(e));
510   return p.getFuture();
511 }
512
513 template <class T>
514 Future<T> makeFuture(Try<T>&& t) {
515   Promise<typename std::decay<T>::type> p;
516   p.setTry(std::move(t));
517   return p.getFuture();
518 }
519
520 template <>
521 inline Future<void> makeFuture(Try<void>&& t) {
522   if (t.hasException()) {
523     return makeFuture<void>(std::move(t.exception()));
524   } else {
525     return makeFuture();
526   }
527 }
528
529 // via
530 inline Future<void> via(Executor* executor) {
531   return makeFuture().via(executor);
532 }
533
534 // mapSetCallback calls func(i, Try<T>) when every future completes
535
536 template <class T, class InputIterator, class F>
537 void mapSetCallback(InputIterator first, InputIterator last, F func) {
538   for (size_t i = 0; first != last; ++first, ++i) {
539     first->setCallback_([func, i](Try<T>&& t) {
540       func(i, std::move(t));
541     });
542   }
543 }
544
545 // collectAll (variadic)
546
547 template <typename... Fs>
548 typename detail::VariadicContext<
549   typename std::decay<Fs>::type::value_type...>::type
550 collectAll(Fs&&... fs) {
551   auto ctx = std::make_shared<detail::VariadicContext<
552     typename std::decay<Fs>::type::value_type...>>();
553   detail::collectAllVariadicHelper(ctx,
554     std::forward<typename std::decay<Fs>::type>(fs)...);
555   return ctx->p.getFuture();
556 }
557
558 // collectAll (iterator)
559
560 template <class InputIterator>
561 Future<
562   std::vector<
563   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
564 collectAll(InputIterator first, InputIterator last) {
565   typedef
566     typename std::iterator_traits<InputIterator>::value_type::value_type T;
567
568   struct CollectAllContext {
569     CollectAllContext(int n) : results(n) {}
570     ~CollectAllContext() {
571       p.setValue(std::move(results));
572     }
573     Promise<std::vector<Try<T>>> p;
574     std::vector<Try<T>> results;
575   };
576
577   auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
578   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
579     ctx->results[i] = std::move(t);
580   });
581   return ctx->p.getFuture();
582 }
583
584 namespace detail {
585
586 template <typename T>
587 struct CollectContext {
588   struct Nothing { explicit Nothing(int n) {} };
589
590   using Result = typename std::conditional<
591     std::is_void<T>::value,
592     void,
593     std::vector<T>>::type;
594
595   using InternalResult = typename std::conditional<
596     std::is_void<T>::value,
597     Nothing,
598     std::vector<Optional<T>>>::type;
599
600   explicit CollectContext(int n) : result(n) {}
601   ~CollectContext() {
602     if (!threw.exchange(true)) {
603       // map Optional<T> -> T
604       std::vector<T> finalResult;
605       finalResult.reserve(result.size());
606       std::transform(result.begin(), result.end(),
607                      std::back_inserter(finalResult),
608                      [](Optional<T>& o) { return std::move(o.value()); });
609       p.setValue(std::move(finalResult));
610     }
611   }
612   inline void setPartialResult(size_t i, Try<T>& t) {
613     result[i] = std::move(t.value());
614   }
615   Promise<Result> p;
616   InternalResult result;
617   std::atomic<bool> threw;
618 };
619
620 // Specialize for void (implementations in Future.cpp)
621
622 template <>
623 CollectContext<void>::~CollectContext();
624
625 template <>
626 void CollectContext<void>::setPartialResult(size_t i, Try<void>& t);
627
628 }
629
630 template <class InputIterator>
631 Future<typename detail::CollectContext<
632   typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
633 collect(InputIterator first, InputIterator last) {
634   typedef
635     typename std::iterator_traits<InputIterator>::value_type::value_type T;
636
637   auto ctx = std::make_shared<detail::CollectContext<T>>(
638     std::distance(first, last));
639   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
640     if (t.hasException()) {
641        if (!ctx->threw.exchange(true)) {
642          ctx->p.setException(std::move(t.exception()));
643        }
644      } else if (!ctx->threw) {
645        ctx->setPartialResult(i, t);
646      }
647   });
648   return ctx->p.getFuture();
649 }
650
651 template <class InputIterator>
652 Future<
653   std::pair<size_t,
654             Try<
655               typename
656               std::iterator_traits<InputIterator>::value_type::value_type>>>
657 collectAny(InputIterator first, InputIterator last) {
658   typedef
659     typename std::iterator_traits<InputIterator>::value_type::value_type T;
660
661   struct CollectAnyContext {
662     CollectAnyContext(size_t n) : done(false) {};
663     Promise<std::pair<size_t, Try<T>>> p;
664     std::atomic<bool> done;
665   };
666
667   auto ctx = std::make_shared<CollectAnyContext>(std::distance(first, last));
668   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
669     if (!ctx->done.exchange(true)) {
670       ctx->p.setValue(std::make_pair(i, std::move(t)));
671     }
672   });
673   return ctx->p.getFuture();
674 }
675
676 template <class InputIterator>
677 Future<std::vector<std::pair<size_t, Try<typename
678   std::iterator_traits<InputIterator>::value_type::value_type>>>>
679 collectN(InputIterator first, InputIterator last, size_t n) {
680   typedef typename
681     std::iterator_traits<InputIterator>::value_type::value_type T;
682   typedef std::vector<std::pair<size_t, Try<T>>> V;
683
684   struct CollectNContext {
685     V v;
686     std::atomic<size_t> completed = {0};
687     Promise<V> p;
688   };
689   auto ctx = std::make_shared<CollectNContext>();
690
691   if (std::distance(first, last) < n) {
692     ctx->p.setException(std::runtime_error("Not enough futures"));
693   } else {
694     // for each completed Future, increase count and add to vector, until we
695     // have n completed futures at which point we fulfil our Promise with the
696     // vector
697     mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
698       auto c = ++ctx->completed;
699       if (c <= n) {
700         assert(ctx->v.size() < n);
701         ctx->v.push_back(std::make_pair(i, std::move(t)));
702         if (c == n) {
703           ctx->p.setTry(Try<V>(std::move(ctx->v)));
704         }
705       }
706     });
707   }
708
709   return ctx->p.getFuture();
710 }
711
712 template <class It, class T, class F>
713 Future<T> reduce(It first, It last, T&& initial, F&& func) {
714   if (first == last) {
715     return makeFuture(std::move(initial));
716   }
717
718   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
719   typedef typename std::conditional<
720     detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type Arg;
721   typedef isTry<Arg> IsTry;
722
723   folly::MoveWrapper<T> minitial(std::move(initial));
724   auto sfunc = std::make_shared<F>(std::move(func));
725
726   auto f = first->then([minitial, sfunc](Try<ItT>& head) mutable {
727     return (*sfunc)(std::move(*minitial),
728                 head.template get<IsTry::value, Arg&&>());
729   });
730
731   for (++first; first != last; ++first) {
732     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
733       return (*sfunc)(std::move(std::get<0>(t).value()),
734                   // Either return a ItT&& or a Try<ItT>&& depending
735                   // on the type of the argument of func.
736                   std::get<1>(t).template get<IsTry::value, Arg&&>());
737     });
738   }
739
740   return f;
741 }
742
743 template <class T>
744 template <class I, class F>
745 Future<I> Future<T>::reduce(I&& initial, F&& func) {
746   folly::MoveWrapper<I> minitial(std::move(initial));
747   folly::MoveWrapper<F> mfunc(std::move(func));
748   return then([minitial, mfunc](T& vals) mutable {
749     auto ret = std::move(*minitial);
750     for (auto& val : vals) {
751       ret = (*mfunc)(std::move(ret), std::move(val));
752     }
753     return ret;
754   });
755 }
756
757 template <class T>
758 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
759   return within(dur, TimedOut(), tk);
760 }
761
762 template <class T>
763 template <class E>
764 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
765
766   struct Context {
767     Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
768     E exception;
769     Promise<T> promise;
770     std::atomic<bool> token;
771   };
772   auto ctx = std::make_shared<Context>(std::move(e));
773
774   if (!tk) {
775     tk = folly::detail::getTimekeeperSingleton();
776   }
777
778   tk->after(dur)
779     .then([ctx](Try<void> const& t) {
780       if (ctx->token.exchange(true) == false) {
781         if (t.hasException()) {
782           ctx->promise.setException(std::move(t.exception()));
783         } else {
784           ctx->promise.setException(std::move(ctx->exception));
785         }
786       }
787     });
788
789   this->then([ctx](Try<T>&& t) {
790     if (ctx->token.exchange(true) == false) {
791       ctx->promise.setTry(std::move(t));
792     }
793   });
794
795   return ctx->promise.getFuture();
796 }
797
798 template <class T>
799 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
800   return collectAll(*this, futures::sleep(dur, tk))
801     .then([](std::tuple<Try<T>, Try<void>> tup) {
802       Try<T>& t = std::get<0>(tup);
803       return makeFuture<T>(std::move(t));
804     });
805 }
806
807 namespace detail {
808
809 template <class T>
810 void waitImpl(Future<T>& f) {
811   // short-circuit if there's nothing to do
812   if (f.isReady()) return;
813
814   folly::fibers::Baton baton;
815   f = f.then([&](Try<T> t) {
816     baton.post();
817     return makeFuture(std::move(t));
818   });
819   baton.wait();
820
821   // There's a race here between the return here and the actual finishing of
822   // the future. f is completed, but the setup may not have finished on done
823   // after the baton has posted.
824   while (!f.isReady()) {
825     std::this_thread::yield();
826   }
827 }
828
829 template <class T>
830 void waitImpl(Future<T>& f, Duration dur) {
831   // short-circuit if there's nothing to do
832   if (f.isReady()) return;
833
834   auto baton = std::make_shared<folly::fibers::Baton>();
835   f = f.then([baton](Try<T> t) {
836     baton->post();
837     return makeFuture(std::move(t));
838   });
839
840   // Let's preserve the invariant that if we did not timeout (timed_wait returns
841   // true), then the returned Future is complete when it is returned to the
842   // caller. We need to wait out the race for that Future to complete.
843   if (baton->timed_wait(dur)) {
844     while (!f.isReady()) {
845       std::this_thread::yield();
846     }
847   }
848 }
849
850 template <class T>
851 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
852   while (!f.isReady()) {
853     e->drive();
854   }
855 }
856
857 } // detail
858
859 template <class T>
860 Future<T>& Future<T>::wait() & {
861   detail::waitImpl(*this);
862   return *this;
863 }
864
865 template <class T>
866 Future<T>&& Future<T>::wait() && {
867   detail::waitImpl(*this);
868   return std::move(*this);
869 }
870
871 template <class T>
872 Future<T>& Future<T>::wait(Duration dur) & {
873   detail::waitImpl(*this, dur);
874   return *this;
875 }
876
877 template <class T>
878 Future<T>&& Future<T>::wait(Duration dur) && {
879   detail::waitImpl(*this, dur);
880   return std::move(*this);
881 }
882
883 template <class T>
884 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
885   detail::waitViaImpl(*this, e);
886   return *this;
887 }
888
889 template <class T>
890 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
891   detail::waitViaImpl(*this, e);
892   return std::move(*this);
893 }
894
895 template <class T>
896 T Future<T>::get() {
897   return std::move(wait().value());
898 }
899
900 template <>
901 inline void Future<void>::get() {
902   wait().value();
903 }
904
905 template <class T>
906 T Future<T>::get(Duration dur) {
907   wait(dur);
908   if (isReady()) {
909     return std::move(value());
910   } else {
911     throw TimedOut();
912   }
913 }
914
915 template <>
916 inline void Future<void>::get(Duration dur) {
917   wait(dur);
918   if (isReady()) {
919     return;
920   } else {
921     throw TimedOut();
922   }
923 }
924
925 template <class T>
926 T Future<T>::getVia(DrivableExecutor* e) {
927   return std::move(waitVia(e).value());
928 }
929
930 template <>
931 inline void Future<void>::getVia(DrivableExecutor* e) {
932   waitVia(e).value();
933 }
934
935 namespace detail {
936   template <class T>
937   struct TryEquals {
938     static bool equals(const Try<T>& t1, const Try<T>& t2) {
939       return t1.value() == t2.value();
940     }
941   };
942
943   template <>
944   struct TryEquals<void> {
945     static bool equals(const Try<void>& t1, const Try<void>& t2) {
946       return true;
947     }
948   };
949 }
950
951 template <class T>
952 Future<bool> Future<T>::willEqual(Future<T>& f) {
953   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
954     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
955       return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
956     } else {
957       return false;
958       }
959   });
960 }
961
962 template <class T>
963 template <class F>
964 Future<T> Future<T>::filter(F predicate) {
965   auto p = folly::makeMoveWrapper(std::move(predicate));
966   return this->then([p](T val) {
967     T const& valConstRef = val;
968     if (!(*p)(valConstRef)) {
969       throw PredicateDoesNotObtain();
970     }
971     return val;
972   });
973 }
974
975 namespace futures {
976   namespace {
977     template <class Z>
978     Future<Z> chainHelper(Future<Z> f) {
979       return f;
980     }
981
982     template <class Z, class F, class Fn, class... Callbacks>
983     Future<Z> chainHelper(F f, Fn fn, Callbacks... fns) {
984       return chainHelper<Z>(f.then(fn), fns...);
985     }
986   }
987
988   template <class A, class Z, class... Callbacks>
989   std::function<Future<Z>(Try<A>)>
990   chain(Callbacks... fns) {
991     MoveWrapper<Promise<A>> pw;
992     MoveWrapper<Future<Z>> fw(chainHelper<Z>(pw->getFuture(), fns...));
993     return [=](Try<A> t) mutable {
994       pw->setTry(std::move(t));
995       return std::move(*fw);
996     };
997   }
998
999   template <class It, class F, class ItT, class Result>
1000   std::vector<Future<Result>> map(It first, It last, F func) {
1001     std::vector<Future<Result>> results;
1002     for (auto it = first; it != last; it++) {
1003       results.push_back(it->then(func));
1004     }
1005     return results;
1006   }
1007 }
1008
1009 // Instantiate the most common Future types to save compile time
1010 extern template class Future<void>;
1011 extern template class Future<bool>;
1012 extern template class Future<int>;
1013 extern template class Future<int64_t>;
1014 extern template class Future<std::string>;
1015 extern template class Future<double>;
1016
1017 } // namespace folly
1018
1019 // I haven't included a Future<T&> specialization because I don't forsee us
1020 // using it, however it is not difficult to add when needed. Refer to
1021 // Future<void> for guidance. std::future and boost::future code would also be
1022 // instructive.