Adding DeferredExecutor to support deferred execution of tasks on a future returned...
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <algorithm>
20 #include <cassert>
21 #include <chrono>
22 #include <thread>
23
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/executors/InlineExecutor.h>
27 #include <folly/futures/Timekeeper.h>
28 #include <folly/futures/detail/Core.h>
29
30 #ifndef FOLLY_FUTURE_USING_FIBER
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
33 #else
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/fibers/Baton.h>
36 #endif
37 #endif
38
39 namespace folly {
40
41 class Timekeeper;
42
43 namespace futures {
44 namespace detail {
45 #if FOLLY_FUTURE_USING_FIBER
46 typedef folly::fibers::Baton FutureBatonType;
47 #else
48 typedef folly::Baton<> FutureBatonType;
49 #endif
50 } // namespace detail
51 } // namespace futures
52
53 namespace detail {
54 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
55 } // namespace detail
56
57 namespace futures {
58 namespace detail {
59 //  Guarantees that the stored functor is destructed before the stored promise
60 //  may be fulfilled. Assumes the stored functor to be noexcept-destructible.
61 template <typename T, typename F>
62 class CoreCallbackState {
63  public:
64   template <typename FF>
65   CoreCallbackState(Promise<T>&& promise, FF&& func) noexcept(
66       noexcept(F(std::declval<FF>())))
67       : func_(std::forward<FF>(func)), promise_(std::move(promise)) {
68     assert(before_barrier());
69   }
70
71   CoreCallbackState(CoreCallbackState&& that) noexcept(
72       noexcept(F(std::declval<F>()))) {
73     if (that.before_barrier()) {
74       new (&func_) F(std::move(that.func_));
75       promise_ = that.stealPromise();
76     }
77   }
78
79   CoreCallbackState& operator=(CoreCallbackState&&) = delete;
80
81   ~CoreCallbackState() {
82     if (before_barrier()) {
83       stealPromise();
84     }
85   }
86
87   template <typename... Args>
88   auto invoke(Args&&... args) noexcept(
89       noexcept(std::declval<F&&>()(std::declval<Args&&>()...))) {
90     assert(before_barrier());
91     return std::move(func_)(std::forward<Args>(args)...);
92   }
93
94   template <typename... Args>
95   auto tryInvoke(Args&&... args) noexcept {
96     return makeTryWith([&] { return invoke(std::forward<Args>(args)...); });
97   }
98
99   void setTry(Try<T>&& t) {
100     stealPromise().setTry(std::move(t));
101   }
102
103   void setException(exception_wrapper&& ew) {
104     stealPromise().setException(std::move(ew));
105   }
106
107   Promise<T> stealPromise() noexcept {
108     assert(before_barrier());
109     func_.~F();
110     return std::move(promise_);
111   }
112
113  private:
114   bool before_barrier() const noexcept {
115     return !promise_.isFulfilled();
116   }
117
118   union {
119     F func_;
120   };
121   Promise<T> promise_{Promise<T>::makeEmpty()};
122 };
123
124 template <typename T, typename F>
125 inline auto makeCoreCallbackState(Promise<T>&& p, F&& f) noexcept(
126     noexcept(CoreCallbackState<T, _t<std::decay<F>>>(
127         std::declval<Promise<T>&&>(),
128         std::declval<F&&>()))) {
129   return CoreCallbackState<T, _t<std::decay<F>>>(
130       std::move(p), std::forward<F>(f));
131 }
132
133 template <class T>
134 FutureBase<T>::FutureBase(SemiFuture<T>&& other) noexcept : core_(other.core_) {
135   other.core_ = nullptr;
136 }
137
138 template <class T>
139 FutureBase<T>::FutureBase(Future<T>&& other) noexcept : core_(other.core_) {
140   other.core_ = nullptr;
141 }
142
143 template <class T>
144 template <class T2, typename>
145 FutureBase<T>::FutureBase(T2&& val)
146     : core_(new futures::detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
147
148 template <class T>
149 template <typename T2>
150 FutureBase<T>::FutureBase(
151     typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
152     : core_(new futures::detail::Core<T>(Try<T>(T()))) {}
153
154 template <class T>
155 template <
156     class... Args,
157     typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
158         type>
159 FutureBase<T>::FutureBase(in_place_t, Args&&... args)
160     : core_(
161           new futures::detail::Core<T>(in_place, std::forward<Args>(args)...)) {
162 }
163
164 template <class T>
165 template <class FutureType>
166 void FutureBase<T>::assign(FutureType& other) noexcept {
167   std::swap(core_, other.core_);
168 }
169
170 template <class T>
171 FutureBase<T>::~FutureBase() {
172   detach();
173 }
174
175 template <class T>
176 T& FutureBase<T>::value() & {
177   throwIfInvalid();
178
179   return core_->getTry().value();
180 }
181
182 template <class T>
183 T const& FutureBase<T>::value() const& {
184   throwIfInvalid();
185
186   return core_->getTry().value();
187 }
188
189 template <class T>
190 T&& FutureBase<T>::value() && {
191   throwIfInvalid();
192
193   return std::move(core_->getTry().value());
194 }
195
196 template <class T>
197 T const&& FutureBase<T>::value() const&& {
198   throwIfInvalid();
199
200   return std::move(core_->getTry().value());
201 }
202
203 template <class T>
204 bool FutureBase<T>::isReady() const {
205   throwIfInvalid();
206   return core_->ready();
207 }
208
209 template <class T>
210 bool FutureBase<T>::hasValue() {
211   return getTry().hasValue();
212 }
213
214 template <class T>
215 bool FutureBase<T>::hasException() {
216   return getTry().hasException();
217 }
218
219 template <class T>
220 void FutureBase<T>::detach() {
221   if (core_) {
222     core_->detachFuture();
223     core_ = nullptr;
224   }
225 }
226
227 template <class T>
228 Try<T>& FutureBase<T>::getTry() {
229   throwIfInvalid();
230
231   return core_->getTry();
232 }
233
234 template <class T>
235 void FutureBase<T>::throwIfInvalid() const {
236   if (!core_) {
237     throwNoState();
238   }
239 }
240
241 template <class T>
242 Optional<Try<T>> FutureBase<T>::poll() {
243   Optional<Try<T>> o;
244   if (core_->ready()) {
245     o = std::move(core_->getTry());
246   }
247   return o;
248 }
249
250 template <class T>
251 void FutureBase<T>::raise(exception_wrapper exception) {
252   core_->raise(std::move(exception));
253 }
254
255 template <class T>
256 template <class F>
257 void FutureBase<T>::setCallback_(F&& func) {
258   throwIfInvalid();
259   core_->setCallback(std::forward<F>(func));
260 }
261
262 template <class T>
263 FutureBase<T>::FutureBase(futures::detail::EmptyConstruct) noexcept
264     : core_(nullptr) {}
265
266 // then
267
268 // Variant: returns a value
269 // e.g. f.then([](Try<T>&& t){ return t.value(); });
270 template <class T>
271 template <typename F, typename R, bool isTry, typename... Args>
272 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
273 FutureBase<T>::thenImplementation(
274     F&& func,
275     futures::detail::argResult<isTry, F, Args...>) {
276   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
277   typedef typename R::ReturnsFuture::Inner B;
278
279   this->throwIfInvalid();
280
281   Promise<B> p;
282   p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
283
284   // grab the Future now before we lose our handle on the Promise
285   auto f = p.getFuture();
286   f.core_->setExecutorNoLock(this->getExecutor());
287
288   /* This is a bit tricky.
289
290      We can't just close over *this in case this Future gets moved. So we
291      make a new dummy Future. We could figure out something more
292      sophisticated that avoids making a new Future object when it can, as an
293      optimization. But this is correct.
294
295      core_ can't be moved, it is explicitly disallowed (as is copying). But
296      if there's ever a reason to allow it, this is one place that makes that
297      assumption and would need to be fixed. We use a standard shared pointer
298      for core_ (by copying it in), which means in essence obj holds a shared
299      pointer to itself.  But this shouldn't leak because Promise will not
300      outlive the continuation, because Promise will setException() with a
301      broken Promise if it is destructed before completed. We could use a
302      weak pointer but it would have to be converted to a shared pointer when
303      func is executed (because the Future returned by func may possibly
304      persist beyond the callback, if it gets moved), and so it is an
305      optimization to just make it shared from the get-go.
306
307      Two subtle but important points about this design. futures::detail::Core
308      has no back pointers to Future or Promise, so if Future or Promise get
309      moved (and they will be moved in performant code) we don't have to do
310      anything fancy. And because we store the continuation in the
311      futures::detail::Core, not in the Future, we can execute the continuation
312      even after the Future has gone out of scope. This is an intentional design
313      decision. It is likely we will want to be able to cancel a continuation
314      in some circumstances, but I think it should be explicit not implicit
315      in the destruction of the Future used to create it.
316      */
317   this->setCallback_(
318       [state = futures::detail::makeCoreCallbackState(
319            std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
320
321         if (!isTry && t.hasException()) {
322           state.setException(std::move(t.exception()));
323         } else {
324           state.setTry(makeTryWith(
325               [&] { return state.invoke(t.template get<isTry, Args>()...); }));
326         }
327       });
328   return f;
329 }
330
331 // Variant: returns a Future
332 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
333 template <class T>
334 template <typename F, typename R, bool isTry, typename... Args>
335 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
336 FutureBase<T>::thenImplementation(
337     F&& func,
338     futures::detail::argResult<isTry, F, Args...>) {
339   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
340   typedef typename R::ReturnsFuture::Inner B;
341   this->throwIfInvalid();
342
343   Promise<B> p;
344   p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
345
346   // grab the Future now before we lose our handle on the Promise
347   auto f = p.getFuture();
348   f.core_->setExecutorNoLock(this->getExecutor());
349
350   this->setCallback_(
351       [state = futures::detail::makeCoreCallbackState(
352            std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
353         if (!isTry && t.hasException()) {
354           state.setException(std::move(t.exception()));
355         } else {
356           auto tf2 = state.tryInvoke(t.template get<isTry, Args>()...);
357           if (tf2.hasException()) {
358             state.setException(std::move(tf2.exception()));
359           } else {
360             tf2->setCallback_([p = state.stealPromise()](Try<B> && b) mutable {
361               p.setTry(std::move(b));
362             });
363           }
364         }
365       });
366
367   return f;
368 }
369 } // namespace detail
370 } // namespace futures
371
372 template <class T>
373 SemiFuture<typename std::decay<T>::type> makeSemiFuture(T&& t) {
374   return makeSemiFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
375 }
376
377 // makeSemiFutureWith(SemiFuture<T>()) -> SemiFuture<T>
378 template <class F>
379 typename std::enable_if<
380     isSemiFuture<typename std::result_of<F()>::type>::value,
381     typename std::result_of<F()>::type>::type
382 makeSemiFutureWith(F&& func) {
383   using InnerType =
384       typename isSemiFuture<typename std::result_of<F()>::type>::Inner;
385   try {
386     return std::forward<F>(func)();
387   } catch (std::exception& e) {
388     return makeSemiFuture<InnerType>(
389         exception_wrapper(std::current_exception(), e));
390   } catch (...) {
391     return makeSemiFuture<InnerType>(
392         exception_wrapper(std::current_exception()));
393   }
394 }
395
396 // makeSemiFutureWith(T()) -> SemiFuture<T>
397 // makeSemiFutureWith(void()) -> SemiFuture<Unit>
398 template <class F>
399 typename std::enable_if<
400     !(isSemiFuture<typename std::result_of<F()>::type>::value),
401     SemiFuture<Unit::LiftT<typename std::result_of<F()>::type>>>::type
402 makeSemiFutureWith(F&& func) {
403   using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
404   return makeSemiFuture<LiftedResult>(
405       makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
406 }
407
408 template <class T>
409 SemiFuture<T> makeSemiFuture(std::exception_ptr const& e) {
410   return makeSemiFuture(Try<T>(e));
411 }
412
413 template <class T>
414 SemiFuture<T> makeSemiFuture(exception_wrapper ew) {
415   return makeSemiFuture(Try<T>(std::move(ew)));
416 }
417
418 template <class T, class E>
419 typename std::
420     enable_if<std::is_base_of<std::exception, E>::value, SemiFuture<T>>::type
421     makeSemiFuture(E const& e) {
422   return makeSemiFuture(Try<T>(make_exception_wrapper<E>(e)));
423 }
424
425 template <class T>
426 SemiFuture<T> makeSemiFuture(Try<T>&& t) {
427   return SemiFuture<T>(new futures::detail::Core<T>(std::move(t)));
428 }
429
430 // This must be defined after the constructors to avoid a bug in MSVC
431 // https://connect.microsoft.com/VisualStudio/feedback/details/3142777/out-of-line-constructor-definition-after-implicit-reference-causes-incorrect-c2244
432 inline SemiFuture<Unit> makeSemiFuture() {
433   return makeSemiFuture(Unit{});
434 }
435
436 template <class T>
437 SemiFuture<T> SemiFuture<T>::makeEmpty() {
438   return SemiFuture<T>(futures::detail::EmptyConstruct{});
439 }
440
441 template <class T>
442 SemiFuture<T>::SemiFuture(SemiFuture<T>&& other) noexcept
443     : futures::detail::FutureBase<T>(std::move(other)) {}
444
445 template <class T>
446 SemiFuture<T>::SemiFuture(Future<T>&& other) noexcept
447     : futures::detail::FutureBase<T>(std::move(other)) {
448   // SemiFuture should not have an executor on construction
449   if (this->core_) {
450     this->setExecutor(nullptr);
451   }
452 }
453
454 template <class T>
455 SemiFuture<T>& SemiFuture<T>::operator=(SemiFuture<T>&& other) noexcept {
456   this->assign(other);
457   return *this;
458 }
459
460 template <class T>
461 SemiFuture<T>& SemiFuture<T>::operator=(Future<T>&& other) noexcept {
462   this->assign(other);
463   // SemiFuture should not have an executor on construction
464   if (this->core_) {
465     this->setExecutor(nullptr);
466   }
467   return *this;
468 }
469
470 template <class T>
471 void SemiFuture<T>::boost_() {
472   // If a SemiFuture has an executor it should be deferred, so boost it
473   if (auto e = this->getExecutor()) {
474     // We know in a SemiFuture that if we have an executor it should be
475     // DeferredExecutor. Verify this in debug mode.
476     DCHECK(dynamic_cast<DeferredExecutor*>(e));
477
478     auto ka = static_cast<DeferredExecutor*>(e)->getKeepAliveToken();
479     static_cast<DeferredExecutor*>(e)->boost();
480   }
481 }
482
483 template <class T>
484 inline Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) && {
485   throwIfInvalid();
486
487   // If current executor is deferred, boost block to ensure that work
488   // progresses and is run on the new executor.
489   auto oldExecutor = this->getExecutor();
490   if (oldExecutor && executor && (executor != oldExecutor)) {
491     // We know in a SemiFuture that if we have an executor it should be
492     // DeferredExecutor. Verify this in debug mode.
493     DCHECK(dynamic_cast<DeferredExecutor*>(this->getExecutor()));
494     if (static_cast<DeferredExecutor*>(oldExecutor)) {
495       executor->add([oldExecutorKA = oldExecutor->getKeepAliveToken()]() {
496         static_cast<DeferredExecutor*>(oldExecutorKA.get())->boost();
497       });
498     }
499   }
500
501   this->setExecutor(executor, priority);
502
503   auto newFuture = Future<T>(this->core_);
504   this->core_ = nullptr;
505   return newFuture;
506 }
507
508 template <class T>
509 template <typename F>
510 SemiFuture<typename futures::detail::callableResult<T, F>::Return::value_type>
511 SemiFuture<T>::defer(F&& func) && {
512   // If we already have a deferred executor, use it, otherwise create one
513   auto defKeepAlive = this->getExecutor()
514       ? this->getExecutor()->getKeepAliveToken()
515       : DeferredExecutor::create();
516   auto e = defKeepAlive.get();
517   // We know in a SemiFuture that if we have an executor it should be
518   // DeferredExecutor (either it was that way before, or we just created it).
519   // Verify this in debug mode.
520   DCHECK(dynamic_cast<DeferredExecutor*>(e));
521   // Convert to a folly::future with a deferred executor
522   // Will be low-cost if this is not a new executor as via optimises for that
523   // case
524   auto sf =
525       std::move(*this)
526           .via(defKeepAlive.get())
527           // Then add the work, with a wrapper function that captures the
528           // keepAlive so the executor is destroyed at the right time.
529           .then(
530               DeferredExecutor::wrap(std::move(defKeepAlive), std::move(func)))
531           // Finally, convert back o a folly::SemiFuture to hide the executor
532           .semi();
533   // Carry deferred executor through chain as constructor from Future will
534   // nullify it
535   sf.setExecutor(e);
536   return sf;
537 }
538
539 template <class T>
540 Future<T> Future<T>::makeEmpty() {
541   return Future<T>(futures::detail::EmptyConstruct{});
542 }
543
544 template <class T>
545 Future<T>::Future(Future<T>&& other) noexcept
546     : futures::detail::FutureBase<T>(std::move(other)) {}
547
548 template <class T>
549 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
550   this->assign(other);
551   return *this;
552 }
553
554 template <class T>
555 template <
556     class T2,
557     typename std::enable_if<
558         !std::is_same<T, typename std::decay<T2>::type>::value &&
559             std::is_constructible<T, T2&&>::value &&
560             std::is_convertible<T2&&, T>::value,
561         int>::type>
562 Future<T>::Future(Future<T2>&& other)
563     : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
564
565 template <class T>
566 template <
567     class T2,
568     typename std::enable_if<
569         !std::is_same<T, typename std::decay<T2>::type>::value &&
570             std::is_constructible<T, T2&&>::value &&
571             !std::is_convertible<T2&&, T>::value,
572         int>::type>
573 Future<T>::Future(Future<T2>&& other)
574     : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
575
576 template <class T>
577 template <
578     class T2,
579     typename std::enable_if<
580         !std::is_same<T, typename std::decay<T2>::type>::value &&
581             std::is_constructible<T, T2&&>::value,
582         int>::type>
583 Future<T>& Future<T>::operator=(Future<T2>&& other) {
584   return operator=(
585       std::move(other).then([](T2&& v) { return T(std::move(v)); }));
586 }
587
588 // unwrap
589
590 template <class T>
591 template <class F>
592 typename std::
593     enable_if<isFuture<F>::value, Future<typename isFuture<T>::Inner>>::type
594     Future<T>::unwrap() {
595   return then([](Future<typename isFuture<T>::Inner> internal_future) {
596     return internal_future;
597   });
598 }
599
600 template <class T>
601 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
602   this->throwIfInvalid();
603
604   this->setExecutor(executor, priority);
605
606   auto newFuture = Future<T>(this->core_);
607   this->core_ = nullptr;
608   return newFuture;
609 }
610
611 template <class T>
612 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
613   this->throwIfInvalid();
614   Promise<T> p;
615   auto f = p.getFuture();
616   auto func = [p = std::move(p)](Try<T>&& t) mutable {
617     p.setTry(std::move(t));
618   };
619   using R = futures::detail::callableResult<T, decltype(func)>;
620   this->template thenImplementation<decltype(func), R>(
621       std::move(func), typename R::Arg());
622   return std::move(f).via(executor, priority);
623 }
624
625 template <typename T>
626 template <typename R, typename Caller, typename... Args>
627   Future<typename isFuture<R>::Inner>
628 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
629   typedef typename std::remove_cv<typename std::remove_reference<
630       typename futures::detail::ArgType<Args...>::FirstArg>::type>::type
631       FirstArg;
632
633   return then([instance, func](Try<T>&& t){
634     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
635   });
636 }
637
638 template <class T>
639 Future<Unit> Future<T>::then() {
640   return then([] () {});
641 }
642
643 // onError where the callback returns T
644 template <class T>
645 template <class F>
646 typename std::enable_if<
647     !futures::detail::callableWith<F, exception_wrapper>::value &&
648         !futures::detail::callableWith<F, exception_wrapper&>::value &&
649         !futures::detail::Extract<F>::ReturnsFuture::value,
650     Future<T>>::type
651 Future<T>::onError(F&& func) {
652   typedef std::remove_reference_t<
653       typename futures::detail::Extract<F>::FirstArg>
654       Exn;
655   static_assert(
656       std::is_same<typename futures::detail::Extract<F>::RawReturn, T>::value,
657       "Return type of onError callback must be T or Future<T>");
658
659   Promise<T> p;
660   p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
661   auto f = p.getFuture();
662
663   this->setCallback_(
664       [state = futures::detail::makeCoreCallbackState(
665            std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
666         if (auto e = t.template tryGetExceptionObject<Exn>()) {
667           state.setTry(makeTryWith([&] { return state.invoke(*e); }));
668         } else {
669           state.setTry(std::move(t));
670         }
671       });
672
673   return f;
674 }
675
676 // onError where the callback returns Future<T>
677 template <class T>
678 template <class F>
679 typename std::enable_if<
680     !futures::detail::callableWith<F, exception_wrapper>::value &&
681         !futures::detail::callableWith<F, exception_wrapper&>::value &&
682         futures::detail::Extract<F>::ReturnsFuture::value,
683     Future<T>>::type
684 Future<T>::onError(F&& func) {
685   static_assert(
686       std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
687           value,
688       "Return type of onError callback must be T or Future<T>");
689   typedef std::remove_reference_t<
690       typename futures::detail::Extract<F>::FirstArg>
691       Exn;
692
693   Promise<T> p;
694   auto f = p.getFuture();
695
696   this->setCallback_(
697       [state = futures::detail::makeCoreCallbackState(
698            std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
699         if (auto e = t.template tryGetExceptionObject<Exn>()) {
700           auto tf2 = state.tryInvoke(*e);
701           if (tf2.hasException()) {
702             state.setException(std::move(tf2.exception()));
703           } else {
704             tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
705               p.setTry(std::move(t3));
706             });
707           }
708         } else {
709           state.setTry(std::move(t));
710         }
711       });
712
713   return f;
714 }
715
716 template <class T>
717 template <class F>
718 Future<T> Future<T>::ensure(F&& func) {
719   return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
720     std::move(funcw)();
721     return makeFuture(std::move(t));
722   });
723 }
724
725 template <class T>
726 template <class F>
727 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
728   return within(dur, tk).onError([funcw = std::forward<F>(func)](
729       TimedOut const&) { return std::move(funcw)(); });
730 }
731
732 template <class T>
733 template <class F>
734 typename std::enable_if<
735     futures::detail::callableWith<F, exception_wrapper>::value &&
736         futures::detail::Extract<F>::ReturnsFuture::value,
737     Future<T>>::type
738 Future<T>::onError(F&& func) {
739   static_assert(
740       std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
741           value,
742       "Return type of onError callback must be T or Future<T>");
743
744   Promise<T> p;
745   auto f = p.getFuture();
746   this->setCallback_(
747       [state = futures::detail::makeCoreCallbackState(
748            std::move(p), std::forward<F>(func))](Try<T> t) mutable {
749         if (t.hasException()) {
750           auto tf2 = state.tryInvoke(std::move(t.exception()));
751           if (tf2.hasException()) {
752             state.setException(std::move(tf2.exception()));
753           } else {
754             tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
755               p.setTry(std::move(t3));
756             });
757           }
758         } else {
759           state.setTry(std::move(t));
760         }
761       });
762
763   return f;
764 }
765
766 // onError(exception_wrapper) that returns T
767 template <class T>
768 template <class F>
769 typename std::enable_if<
770     futures::detail::callableWith<F, exception_wrapper>::value &&
771         !futures::detail::Extract<F>::ReturnsFuture::value,
772     Future<T>>::type
773 Future<T>::onError(F&& func) {
774   static_assert(
775       std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
776           value,
777       "Return type of onError callback must be T or Future<T>");
778
779   Promise<T> p;
780   auto f = p.getFuture();
781   this->setCallback_(
782       [state = futures::detail::makeCoreCallbackState(
783            std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
784         if (t.hasException()) {
785           state.setTry(makeTryWith(
786               [&] { return state.invoke(std::move(t.exception())); }));
787         } else {
788           state.setTry(std::move(t));
789         }
790       });
791
792   return f;
793 }
794
795 template <class T>
796 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
797   return waitVia(e).getTry();
798 }
799
800 template <class Func>
801 auto via(Executor* x, Func&& func)
802     -> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
803   // TODO make this actually more performant. :-P #7260175
804   return via(x).then(std::forward<Func>(func));
805 }
806
807 // makeFuture
808
809 template <class T>
810 Future<typename std::decay<T>::type> makeFuture(T&& t) {
811   return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
812 }
813
814 inline Future<Unit> makeFuture() {
815   return makeFuture(Unit{});
816 }
817
818 // makeFutureWith(Future<T>()) -> Future<T>
819 template <class F>
820 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
821                         typename std::result_of<F()>::type>::type
822 makeFutureWith(F&& func) {
823   using InnerType =
824       typename isFuture<typename std::result_of<F()>::type>::Inner;
825   try {
826     return std::forward<F>(func)();
827   } catch (std::exception& e) {
828     return makeFuture<InnerType>(
829         exception_wrapper(std::current_exception(), e));
830   } catch (...) {
831     return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
832   }
833 }
834
835 // makeFutureWith(T()) -> Future<T>
836 // makeFutureWith(void()) -> Future<Unit>
837 template <class F>
838 typename std::enable_if<
839     !(isFuture<typename std::result_of<F()>::type>::value),
840     Future<Unit::LiftT<typename std::result_of<F()>::type>>>::type
841 makeFutureWith(F&& func) {
842   using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
843   return makeFuture<LiftedResult>(
844       makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
845 }
846
847 template <class T>
848 Future<T> makeFuture(std::exception_ptr const& e) {
849   return makeFuture(Try<T>(e));
850 }
851
852 template <class T>
853 Future<T> makeFuture(exception_wrapper ew) {
854   return makeFuture(Try<T>(std::move(ew)));
855 }
856
857 template <class T, class E>
858 typename std::enable_if<std::is_base_of<std::exception, E>::value,
859                         Future<T>>::type
860 makeFuture(E const& e) {
861   return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
862 }
863
864 template <class T>
865 Future<T> makeFuture(Try<T>&& t) {
866   return Future<T>(new futures::detail::Core<T>(std::move(t)));
867 }
868
869 // via
870 Future<Unit> via(Executor* executor, int8_t priority) {
871   return makeFuture().via(executor, priority);
872 }
873
874 // mapSetCallback calls func(i, Try<T>) when every future completes
875
876 template <class T, class InputIterator, class F>
877 void mapSetCallback(InputIterator first, InputIterator last, F func) {
878   for (size_t i = 0; first != last; ++first, ++i) {
879     first->setCallback_([func, i](Try<T>&& t) {
880       func(i, std::move(t));
881     });
882   }
883 }
884
885 // collectAll (variadic)
886
887 template <typename... Fs>
888 typename futures::detail::CollectAllVariadicContext<
889     typename std::decay<Fs>::type::value_type...>::type
890 collectAll(Fs&&... fs) {
891   auto ctx = std::make_shared<futures::detail::CollectAllVariadicContext<
892       typename std::decay<Fs>::type::value_type...>>();
893   futures::detail::collectVariadicHelper<
894       futures::detail::CollectAllVariadicContext>(ctx, std::forward<Fs>(fs)...);
895   return ctx->p.getFuture();
896 }
897
898 // collectAll (iterator)
899
900 template <class InputIterator>
901 Future<
902   std::vector<
903   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
904 collectAll(InputIterator first, InputIterator last) {
905   typedef
906     typename std::iterator_traits<InputIterator>::value_type::value_type T;
907
908   struct CollectAllContext {
909     CollectAllContext(size_t n) : results(n) {}
910     ~CollectAllContext() {
911       p.setValue(std::move(results));
912     }
913     Promise<std::vector<Try<T>>> p;
914     std::vector<Try<T>> results;
915   };
916
917   auto ctx =
918       std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
919   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
920     ctx->results[i] = std::move(t);
921   });
922   return ctx->p.getFuture();
923 }
924
925 // collect (iterator)
926
927 namespace futures {
928 namespace detail {
929
930 template <typename T>
931 struct CollectContext {
932   struct Nothing {
933     explicit Nothing(int /* n */) {}
934   };
935
936   using Result = typename std::conditional<
937     std::is_void<T>::value,
938     void,
939     std::vector<T>>::type;
940
941   using InternalResult = typename std::conditional<
942     std::is_void<T>::value,
943     Nothing,
944     std::vector<Optional<T>>>::type;
945
946   explicit CollectContext(size_t n) : result(n) {}
947   ~CollectContext() {
948     if (!threw.exchange(true)) {
949       // map Optional<T> -> T
950       std::vector<T> finalResult;
951       finalResult.reserve(result.size());
952       std::transform(result.begin(), result.end(),
953                      std::back_inserter(finalResult),
954                      [](Optional<T>& o) { return std::move(o.value()); });
955       p.setValue(std::move(finalResult));
956     }
957   }
958   inline void setPartialResult(size_t i, Try<T>& t) {
959     result[i] = std::move(t.value());
960   }
961   Promise<Result> p;
962   InternalResult result;
963   std::atomic<bool> threw {false};
964 };
965
966 } // namespace detail
967 } // namespace futures
968
969 template <class InputIterator>
970 Future<typename futures::detail::CollectContext<typename std::iterator_traits<
971     InputIterator>::value_type::value_type>::Result>
972 collect(InputIterator first, InputIterator last) {
973   typedef
974     typename std::iterator_traits<InputIterator>::value_type::value_type T;
975
976   auto ctx = std::make_shared<futures::detail::CollectContext<T>>(
977       std::distance(first, last));
978   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
979     if (t.hasException()) {
980        if (!ctx->threw.exchange(true)) {
981          ctx->p.setException(std::move(t.exception()));
982        }
983      } else if (!ctx->threw) {
984        ctx->setPartialResult(i, t);
985      }
986   });
987   return ctx->p.getFuture();
988 }
989
990 // collect (variadic)
991
992 template <typename... Fs>
993 typename futures::detail::CollectVariadicContext<
994     typename std::decay<Fs>::type::value_type...>::type
995 collect(Fs&&... fs) {
996   auto ctx = std::make_shared<futures::detail::CollectVariadicContext<
997       typename std::decay<Fs>::type::value_type...>>();
998   futures::detail::collectVariadicHelper<
999       futures::detail::CollectVariadicContext>(ctx, std::forward<Fs>(fs)...);
1000   return ctx->p.getFuture();
1001 }
1002
1003 // collectAny (iterator)
1004
1005 template <class InputIterator>
1006 Future<
1007   std::pair<size_t,
1008             Try<
1009               typename
1010               std::iterator_traits<InputIterator>::value_type::value_type>>>
1011 collectAny(InputIterator first, InputIterator last) {
1012   typedef
1013     typename std::iterator_traits<InputIterator>::value_type::value_type T;
1014
1015   struct CollectAnyContext {
1016     CollectAnyContext() {}
1017     Promise<std::pair<size_t, Try<T>>> p;
1018     std::atomic<bool> done {false};
1019   };
1020
1021   auto ctx = std::make_shared<CollectAnyContext>();
1022   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
1023     if (!ctx->done.exchange(true)) {
1024       ctx->p.setValue(std::make_pair(i, std::move(t)));
1025     }
1026   });
1027   return ctx->p.getFuture();
1028 }
1029
1030 // collectAnyWithoutException (iterator)
1031
1032 template <class InputIterator>
1033 Future<std::pair<
1034     size_t,
1035     typename std::iterator_traits<InputIterator>::value_type::value_type>>
1036 collectAnyWithoutException(InputIterator first, InputIterator last) {
1037   typedef
1038       typename std::iterator_traits<InputIterator>::value_type::value_type T;
1039
1040   struct CollectAnyWithoutExceptionContext {
1041     CollectAnyWithoutExceptionContext(){}
1042     Promise<std::pair<size_t, T>> p;
1043     std::atomic<bool> done{false};
1044     std::atomic<size_t> nFulfilled{0};
1045     size_t nTotal;
1046   };
1047
1048   auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
1049   ctx->nTotal = size_t(std::distance(first, last));
1050
1051   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
1052     if (!t.hasException() && !ctx->done.exchange(true)) {
1053       ctx->p.setValue(std::make_pair(i, std::move(t.value())));
1054     } else if (++ctx->nFulfilled == ctx->nTotal) {
1055       ctx->p.setException(t.exception());
1056     }
1057   });
1058   return ctx->p.getFuture();
1059 }
1060
1061 // collectN (iterator)
1062
1063 template <class InputIterator>
1064 Future<std::vector<std::pair<size_t, Try<typename
1065   std::iterator_traits<InputIterator>::value_type::value_type>>>>
1066 collectN(InputIterator first, InputIterator last, size_t n) {
1067   typedef typename
1068     std::iterator_traits<InputIterator>::value_type::value_type T;
1069   typedef std::vector<std::pair<size_t, Try<T>>> V;
1070
1071   struct CollectNContext {
1072     V v;
1073     std::atomic<size_t> completed = {0};
1074     Promise<V> p;
1075   };
1076   auto ctx = std::make_shared<CollectNContext>();
1077
1078   if (size_t(std::distance(first, last)) < n) {
1079     ctx->p.setException(std::runtime_error("Not enough futures"));
1080   } else {
1081     // for each completed Future, increase count and add to vector, until we
1082     // have n completed futures at which point we fulfil our Promise with the
1083     // vector
1084     mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
1085       auto c = ++ctx->completed;
1086       if (c <= n) {
1087         assert(ctx->v.size() < n);
1088         ctx->v.emplace_back(i, std::move(t));
1089         if (c == n) {
1090           ctx->p.setTry(Try<V>(std::move(ctx->v)));
1091         }
1092       }
1093     });
1094   }
1095
1096   return ctx->p.getFuture();
1097 }
1098
1099 // reduce (iterator)
1100
1101 template <class It, class T, class F>
1102 Future<T> reduce(It first, It last, T&& initial, F&& func) {
1103   if (first == last) {
1104     return makeFuture(std::move(initial));
1105   }
1106
1107   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
1108   typedef typename std::conditional<
1109       futures::detail::callableWith<F, T&&, Try<ItT>&&>::value,
1110       Try<ItT>,
1111       ItT>::type Arg;
1112   typedef isTry<Arg> IsTry;
1113
1114   auto sfunc = std::make_shared<F>(std::move(func));
1115
1116   auto f = first->then(
1117       [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
1118         return (*sfunc)(
1119             std::move(minitial), head.template get<IsTry::value, Arg&&>());
1120       });
1121
1122   for (++first; first != last; ++first) {
1123     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
1124       return (*sfunc)(std::move(std::get<0>(t).value()),
1125                   // Either return a ItT&& or a Try<ItT>&& depending
1126                   // on the type of the argument of func.
1127                   std::get<1>(t).template get<IsTry::value, Arg&&>());
1128     });
1129   }
1130
1131   return f;
1132 }
1133
1134 // window (collection)
1135
1136 template <class Collection, class F, class ItT, class Result>
1137 std::vector<Future<Result>>
1138 window(Collection input, F func, size_t n) {
1139   // Use global inline executor singleton
1140   auto executor = &InlineExecutor::instance();
1141   return window(executor, std::move(input), std::move(func), n);
1142 }
1143
1144 template <class Collection, class F, class ItT, class Result>
1145 std::vector<Future<Result>>
1146 window(Executor* executor, Collection input, F func, size_t n) {
1147   struct WindowContext {
1148     WindowContext(Executor* executor_, Collection&& input_, F&& func_)
1149         : executor(executor_),
1150           input(std::move(input_)),
1151           promises(input.size()),
1152           func(std::move(func_)) {}
1153     std::atomic<size_t> i{0};
1154     Executor* executor;
1155     Collection input;
1156     std::vector<Promise<Result>> promises;
1157     F func;
1158
1159     static inline void spawn(std::shared_ptr<WindowContext> ctx) {
1160       size_t i = ctx->i++;
1161       if (i < ctx->input.size()) {
1162         auto fut = ctx->func(std::move(ctx->input[i]));
1163         fut.setCallback_([ctx = std::move(ctx), i](Try<Result>&& t) mutable {
1164           const auto executor_ = ctx->executor;
1165           executor_->add([ctx = std::move(ctx), i, t = std::move(t)]() mutable {
1166             ctx->promises[i].setTry(std::move(t));
1167             // Chain another future onto this one
1168             spawn(std::move(ctx));
1169           });
1170         });
1171       }
1172     }
1173   };
1174
1175   auto max = std::min(n, input.size());
1176
1177   auto ctx = std::make_shared<WindowContext>(
1178       executor, std::move(input), std::move(func));
1179
1180   // Start the first n Futures
1181   for (size_t i = 0; i < max; ++i) {
1182     executor->add([ctx]() mutable { WindowContext::spawn(std::move(ctx)); });
1183   }
1184
1185   std::vector<Future<Result>> futures;
1186   futures.reserve(ctx->promises.size());
1187   for (auto& promise : ctx->promises) {
1188     futures.emplace_back(promise.getFuture());
1189   }
1190
1191   return futures;
1192 }
1193
1194 // reduce
1195
1196 template <class T>
1197 template <class I, class F>
1198 Future<I> Future<T>::reduce(I&& initial, F&& func) {
1199   return then([
1200     minitial = std::forward<I>(initial),
1201     mfunc = std::forward<F>(func)
1202   ](T& vals) mutable {
1203     auto ret = std::move(minitial);
1204     for (auto& val : vals) {
1205       ret = mfunc(std::move(ret), std::move(val));
1206     }
1207     return ret;
1208   });
1209 }
1210
1211 // unorderedReduce (iterator)
1212
1213 template <class It, class T, class F, class ItT, class Arg>
1214 Future<T> unorderedReduce(It first, It last, T initial, F func) {
1215   if (first == last) {
1216     return makeFuture(std::move(initial));
1217   }
1218
1219   typedef isTry<Arg> IsTry;
1220
1221   struct UnorderedReduceContext {
1222     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
1223         : lock_(), memo_(makeFuture<T>(std::move(memo))),
1224           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
1225       {}
1226     folly::MicroSpinLock lock_; // protects memo_ and numThens_
1227     Future<T> memo_;
1228     F func_;
1229     size_t numThens_; // how many Futures completed and called .then()
1230     size_t numFutures_; // how many Futures in total
1231     Promise<T> promise_;
1232   };
1233
1234   auto ctx = std::make_shared<UnorderedReduceContext>(
1235     std::move(initial), std::move(func), std::distance(first, last));
1236
1237   mapSetCallback<ItT>(
1238       first,
1239       last,
1240       [ctx](size_t /* i */, Try<ItT>&& t) {
1241         // Futures can be completed in any order, simultaneously.
1242         // To make this non-blocking, we create a new Future chain in
1243         // the order of completion to reduce the values.
1244         // The spinlock just protects chaining a new Future, not actually
1245         // executing the reduce, which should be really fast.
1246         folly::MSLGuard lock(ctx->lock_);
1247         ctx->memo_ =
1248             ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
1249               // Either return a ItT&& or a Try<ItT>&& depending
1250               // on the type of the argument of func.
1251               return ctx->func_(std::move(v),
1252                                 mt.template get<IsTry::value, Arg&&>());
1253             });
1254         if (++ctx->numThens_ == ctx->numFutures_) {
1255           // After reducing the value of the last Future, fulfill the Promise
1256           ctx->memo_.setCallback_(
1257               [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
1258         }
1259       });
1260
1261   return ctx->promise_.getFuture();
1262 }
1263
1264 // within
1265
1266 template <class T>
1267 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
1268   return within(dur, TimedOut(), tk);
1269 }
1270
1271 template <class T>
1272 template <class E>
1273 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
1274
1275   struct Context {
1276     Context(E ex) : exception(std::move(ex)), promise() {}
1277     E exception;
1278     Future<Unit> thisFuture;
1279     Promise<T> promise;
1280     std::atomic<bool> token {false};
1281   };
1282
1283   if (this->isReady()) {
1284     return std::move(*this);
1285   }
1286
1287   std::shared_ptr<Timekeeper> tks;
1288   if (LIKELY(!tk)) {
1289     tks = folly::detail::getTimekeeperSingleton();
1290     tk = tks.get();
1291   }
1292
1293   if (UNLIKELY(!tk)) {
1294     return makeFuture<T>(NoTimekeeper());
1295   }
1296
1297   auto ctx = std::make_shared<Context>(std::move(e));
1298
1299   ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
1300     if (ctx->token.exchange(true) == false) {
1301       ctx->promise.setTry(std::move(t));
1302     }
1303   });
1304
1305   // Have time keeper use a weak ptr to hold ctx,
1306   // so that ctx can be deallocated as soon as the future job finished.
1307   tk->after(dur).then([weakCtx = to_weak_ptr(ctx)](Try<Unit> const& t) mutable {
1308     auto lockedCtx = weakCtx.lock();
1309     if (!lockedCtx) {
1310       // ctx already released. "this" completed first, cancel "after"
1311       return;
1312     }
1313     // "after" completed first, cancel "this"
1314     lockedCtx->thisFuture.raise(TimedOut());
1315     if (lockedCtx->token.exchange(true) == false) {
1316       if (t.hasException()) {
1317         lockedCtx->promise.setException(std::move(t.exception()));
1318       } else {
1319         lockedCtx->promise.setException(std::move(lockedCtx->exception));
1320       }
1321     }
1322   });
1323
1324   return ctx->promise.getFuture().via(this->getExecutor());
1325 }
1326
1327 // delayed
1328
1329 template <class T>
1330 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
1331   return collectAll(*this, futures::sleep(dur, tk))
1332       .then([](std::tuple<Try<T>, Try<Unit>> tup) {
1333         Try<T>& t = std::get<0>(tup);
1334         return makeFuture<T>(std::move(t));
1335       });
1336 }
1337
1338 namespace futures {
1339 namespace detail {
1340
1341 template <class T>
1342 void doBoost(folly::Future<T>& /* usused */) {}
1343
1344 template <class T>
1345 void doBoost(folly::SemiFuture<T>& f) {
1346   f.boost_();
1347 }
1348
1349 template <class FutureType, typename T = typename FutureType::value_type>
1350 void waitImpl(FutureType& f) {
1351   // short-circuit if there's nothing to do
1352   if (f.isReady()) {
1353     return;
1354   }
1355
1356   FutureBatonType baton;
1357   f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1358   doBoost(f);
1359   baton.wait();
1360   assert(f.isReady());
1361 }
1362
1363 template <class FutureType, typename T = typename FutureType::value_type>
1364 void waitImpl(FutureType& f, Duration dur) {
1365   // short-circuit if there's nothing to do
1366   if (f.isReady()) {
1367     return;
1368   }
1369
1370   Promise<T> promise;
1371   auto ret = promise.getFuture();
1372   auto baton = std::make_shared<FutureBatonType>();
1373   f.setCallback_([baton, promise = std::move(promise)](Try<T>&& t) mutable {
1374     promise.setTry(std::move(t));
1375     baton->post();
1376   });
1377   doBoost(f);
1378   f = std::move(ret);
1379   if (baton->timed_wait(dur)) {
1380     assert(f.isReady());
1381   }
1382 }
1383
1384 template <class T>
1385 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1386   // Set callback so to ensure that the via executor has something on it
1387   // so that once the preceding future triggers this callback, drive will
1388   // always have a callback to satisfy it
1389   if (f.isReady()) {
1390     return;
1391   }
1392   f = f.via(e).then([](T&& t) { return std::move(t); });
1393   while (!f.isReady()) {
1394     e->drive();
1395   }
1396   assert(f.isReady());
1397 }
1398
1399 } // namespace detail
1400 } // namespace futures
1401
1402 template <class T>
1403 SemiFuture<T>& SemiFuture<T>::wait() & {
1404   futures::detail::waitImpl(*this);
1405   return *this;
1406 }
1407
1408 template <class T>
1409 SemiFuture<T>&& SemiFuture<T>::wait() && {
1410   futures::detail::waitImpl(*this);
1411   return std::move(*this);
1412 }
1413
1414 template <class T>
1415 SemiFuture<T>& SemiFuture<T>::wait(Duration dur) & {
1416   futures::detail::waitImpl(*this, dur);
1417   return *this;
1418 }
1419
1420 template <class T>
1421 SemiFuture<T>&& SemiFuture<T>::wait(Duration dur) && {
1422   futures::detail::waitImpl(*this, dur);
1423   return std::move(*this);
1424 }
1425
1426 template <class T>
1427 T SemiFuture<T>::get() && {
1428   return std::move(wait().value());
1429 }
1430
1431 template <class T>
1432 T SemiFuture<T>::get(Duration dur) && {
1433   wait(dur);
1434   if (this->isReady()) {
1435     return std::move(this->value());
1436   } else {
1437     throwTimedOut();
1438   }
1439 }
1440
1441 template <class T>
1442 Future<T>& Future<T>::wait() & {
1443   futures::detail::waitImpl(*this);
1444   return *this;
1445 }
1446
1447 template <class T>
1448 Future<T>&& Future<T>::wait() && {
1449   futures::detail::waitImpl(*this);
1450   return std::move(*this);
1451 }
1452
1453 template <class T>
1454 Future<T>& Future<T>::wait(Duration dur) & {
1455   futures::detail::waitImpl(*this, dur);
1456   return *this;
1457 }
1458
1459 template <class T>
1460 Future<T>&& Future<T>::wait(Duration dur) && {
1461   futures::detail::waitImpl(*this, dur);
1462   return std::move(*this);
1463 }
1464
1465 template <class T>
1466 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1467   futures::detail::waitViaImpl(*this, e);
1468   return *this;
1469 }
1470
1471 template <class T>
1472 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1473   futures::detail::waitViaImpl(*this, e);
1474   return std::move(*this);
1475 }
1476
1477 template <class T>
1478 T Future<T>::get() {
1479   return std::move(wait().value());
1480 }
1481
1482 template <class T>
1483 T Future<T>::get(Duration dur) {
1484   wait(dur);
1485   if (this->isReady()) {
1486     return std::move(this->value());
1487   } else {
1488     throwTimedOut();
1489   }
1490 }
1491
1492 template <class T>
1493 T Future<T>::getVia(DrivableExecutor* e) {
1494   return std::move(waitVia(e).value());
1495 }
1496
1497 namespace futures {
1498 namespace detail {
1499 template <class T>
1500 struct TryEquals {
1501   static bool equals(const Try<T>& t1, const Try<T>& t2) {
1502     return t1.value() == t2.value();
1503   }
1504 };
1505 } // namespace detail
1506 } // namespace futures
1507
1508 template <class T>
1509 Future<bool> Future<T>::willEqual(Future<T>& f) {
1510   return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1511     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1512       return futures::detail::TryEquals<T>::equals(
1513           std::get<0>(t), std::get<1>(t));
1514     } else {
1515       return false;
1516       }
1517   });
1518 }
1519
1520 template <class T>
1521 template <class F>
1522 Future<T> Future<T>::filter(F&& predicate) {
1523   return this->then([p = std::forward<F>(predicate)](T val) {
1524     T const& valConstRef = val;
1525     if (!p(valConstRef)) {
1526       throwPredicateDoesNotObtain();
1527     }
1528     return val;
1529   });
1530 }
1531
1532 template <class F>
1533 inline Future<Unit> when(bool p, F&& thunk) {
1534   return p ? std::forward<F>(thunk)().unit() : makeFuture();
1535 }
1536
1537 template <class P, class F>
1538 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1539   if (predicate()) {
1540     auto future = thunk();
1541     return future.then([
1542       predicate = std::forward<P>(predicate),
1543       thunk = std::forward<F>(thunk)
1544     ]() mutable {
1545       return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1546     });
1547   }
1548   return makeFuture();
1549 }
1550
1551 template <class F>
1552 Future<Unit> times(const int n, F&& thunk) {
1553   return folly::whileDo(
1554       [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
1555         return count->fetch_add(1) < n;
1556       },
1557       std::forward<F>(thunk));
1558 }
1559
1560 namespace futures {
1561 template <class It, class F, class ItT, class Result>
1562 std::vector<Future<Result>> map(It first, It last, F func) {
1563   std::vector<Future<Result>> results;
1564   for (auto it = first; it != last; it++) {
1565     results.push_back(it->then(func));
1566   }
1567   return results;
1568 }
1569 } // namespace futures
1570
1571 // Instantiate the most common Future types to save compile time
1572 extern template class Future<Unit>;
1573 extern template class Future<bool>;
1574 extern template class Future<int>;
1575 extern template class Future<int64_t>;
1576 extern template class Future<std::string>;
1577 extern template class Future<double>;
1578 } // namespace folly