fix memory leak in case of large number of retries
[folly.git] / folly / futures / Future-inl.h
index 3a919ff489231c1b5a110cb1b2b3ad6571ec9077..b353ff81a411f932c28da12af08239ff7ba48e32 100644 (file)
@@ -1,5 +1,5 @@
 /*
 /*
- * Copyright 2015 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 #include <chrono>
 #include <random>
 #include <thread>
 #include <chrono>
 #include <random>
 #include <thread>
-#include <folly/experimental/fibers/Baton.h>
+#include <folly/Baton.h>
 #include <folly/Optional.h>
 #include <folly/Random.h>
 #include <folly/Traits.h>
 #include <folly/futures/detail/Core.h>
 #include <folly/futures/Timekeeper.h>
 
 #include <folly/Optional.h>
 #include <folly/Random.h>
 #include <folly/Traits.h>
 #include <folly/futures/detail/Core.h>
 #include <folly/futures/Timekeeper.h>
 
+#if FOLLY_MOBILE || defined(__APPLE__)
+#define FOLLY_FUTURE_USING_FIBER 0
+#else
+#define FOLLY_FUTURE_USING_FIBER 1
+#include <folly/fibers/Baton.h>
+#endif
+
 namespace folly {
 
 class Timekeeper;
 
 namespace folly {
 
 class Timekeeper;
 
+namespace detail {
+#if FOLLY_FUTURE_USING_FIBER
+typedef folly::fibers::Baton FutureBatonType;
+#else
+typedef folly::Baton<> FutureBatonType;
+#endif
+}
+
 namespace detail {
   std::shared_ptr<Timekeeper> getTimekeeperSingleton();
 }
 namespace detail {
   std::shared_ptr<Timekeeper> getTimekeeperSingleton();
 }
@@ -50,12 +65,12 @@ Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
 template <class T>
 template <class T2, typename>
 Future<T>::Future(T2&& val)
 template <class T>
 template <class T2, typename>
 Future<T>::Future(T2&& val)
-  : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
+    : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
 
 template <class T>
 
 template <class T>
-template <typename, typename>
-Future<T>::Future()
-  : core_(new detail::Core<T>(Try<T>(T()))) {}
+template <typename T2>
+Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
+    : core_(new detail::Core<T>(Try<T>(T()))) {}
 
 template <class T>
 Future<T>::~Future() {
 
 template <class T>
 Future<T>::~Future() {
@@ -80,7 +95,7 @@ template <class T>
 template <class F>
 void Future<T>::setCallback_(F&& func) {
   throwIfInvalid();
 template <class F>
 void Future<T>::setCallback_(F&& func) {
   throwIfInvalid();
-  core_->setCallback(std::move(func));
+  core_->setCallback(std::forward<F>(func));
 }
 
 // unwrap
 }
 
 // unwrap
@@ -102,19 +117,17 @@ Future<T>::unwrap() {
 template <class T>
 template <typename F, typename R, bool isTry, typename... Args>
 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
 template <class T>
 template <typename F, typename R, bool isTry, typename... Args>
 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
-Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
+Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
   typedef typename R::ReturnsFuture::Inner B;
 
   throwIfInvalid();
 
   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
   typedef typename R::ReturnsFuture::Inner B;
 
   throwIfInvalid();
 
-  // wrap these so we can move them into the lambda
-  folly::MoveWrapper<Promise<B>> p;
-  p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
-  folly::MoveWrapper<F> funcm(std::forward<F>(func));
+  Promise<B> p;
+  p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
 
   // grab the Future now before we lose our handle on the Promise
 
   // grab the Future now before we lose our handle on the Promise
-  auto f = p->getFuture();
+  auto f = p.getFuture();
   f.core_->setExecutorNoLock(getExecutor());
 
   /* This is a bit tricky.
   f.core_->setExecutorNoLock(getExecutor());
 
   /* This is a bit tricky.
@@ -136,9 +149,6 @@ Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
      persist beyond the callback, if it gets moved), and so it is an
      optimization to just make it shared from the get-go.
 
      persist beyond the callback, if it gets moved), and so it is an
      optimization to just make it shared from the get-go.
 
-     We have to move in the Promise and func using the MoveWrapper
-     hack. (func could be copied but it's a big drag on perf).
-
      Two subtle but important points about this design. detail::Core has no
      back pointers to Future or Promise, so if Future or Promise get moved
      (and they will be moved in performant code) we don't have to do
      Two subtle but important points about this design. detail::Core has no
      back pointers to Future or Promise, so if Future or Promise get moved
      (and they will be moved in performant code) we don't have to do
@@ -150,15 +160,15 @@ Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
      in the destruction of the Future used to create it.
      */
   setCallback_(
      in the destruction of the Future used to create it.
      */
   setCallback_(
-    [p, funcm](Try<T>&& t) mutable {
-      if (!isTry && t.hasException()) {
-        p->setException(std::move(t.exception()));
-      } else {
-        p->setWith([&]() {
-          return (*funcm)(t.template get<isTry, Args>()...);
-        });
-      }
-    });
+      [ func = std::forward<F>(func), pm = std::move(p) ](Try<T> && t) mutable {
+        if (!isTry && t.hasException()) {
+          pm.setException(std::move(t.exception()));
+        } else {
+          pm.setWith([&]() {
+            return std::move(func)(t.template get<isTry, Args>()...);
+          });
+        }
+      });
 
   return f;
 }
 
   return f;
 }
@@ -168,39 +178,43 @@ Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
 template <class T>
 template <typename F, typename R, bool isTry, typename... Args>
 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
 template <class T>
 template <typename F, typename R, bool isTry, typename... Args>
 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
-Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
+Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
   typedef typename R::ReturnsFuture::Inner B;
 
   throwIfInvalid();
 
   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
   typedef typename R::ReturnsFuture::Inner B;
 
   throwIfInvalid();
 
-  // wrap these so we can move them into the lambda
-  folly::MoveWrapper<Promise<B>> p;
-  p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
-  folly::MoveWrapper<F> funcm(std::forward<F>(func));
+  Promise<B> p;
+  p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
 
   // grab the Future now before we lose our handle on the Promise
 
   // grab the Future now before we lose our handle on the Promise
-  auto f = p->getFuture();
+  auto f = p.getFuture();
   f.core_->setExecutorNoLock(getExecutor());
 
   f.core_->setExecutorNoLock(getExecutor());
 
-  setCallback_(
-    [p, funcm](Try<T>&& t) mutable {
+  setCallback_([ func = std::forward<F>(func), pm = std::move(p) ](
+      Try<T> && t) mutable {
+    auto ew = [&] {
       if (!isTry && t.hasException()) {
       if (!isTry && t.hasException()) {
-        p->setException(std::move(t.exception()));
+        return std::move(t.exception());
       } else {
         try {
       } else {
         try {
-          auto f2 = (*funcm)(t.template get<isTry, Args>()...);
+          auto f2 = std::move(func)(t.template get<isTry, Args>()...);
           // that didn't throw, now we can steal p
           // that didn't throw, now we can steal p
-          f2.setCallback_([p](Try<B>&& b) mutable {
-            p->setTry(std::move(b));
+          f2.setCallback_([p = std::move(pm)](Try<B> && b) mutable {
+            p.setTry(std::move(b));
           });
           });
+          return exception_wrapper();
         } catch (const std::exception& e) {
         } catch (const std::exception& e) {
-          p->setException(exception_wrapper(std::current_exception(), e));
+          return exception_wrapper(std::current_exception(), e);
         } catch (...) {
         } catch (...) {
-          p->setException(exception_wrapper(std::current_exception()));
+          return exception_wrapper(std::current_exception());
         }
       }
         }
       }
-    });
+    }();
+    if (ew) {
+      pm.setException(std::move(ew));
+    }
+  });
 
   return f;
 }
 
   return f;
 }
@@ -250,17 +264,15 @@ Future<T>::onError(F&& func) {
   Promise<T> p;
   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
   auto f = p.getFuture();
   Promise<T> p;
   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
   auto f = p.getFuture();
-  auto pm = folly::makeMoveWrapper(std::move(p));
-  auto funcm = folly::makeMoveWrapper(std::move(func));
-  setCallback_([pm, funcm](Try<T>&& t) mutable {
-    if (!t.template withException<Exn>([&] (Exn& e) {
-          pm->setWith([&]{
-            return (*funcm)(e);
-          });
-        })) {
-      pm->setTry(std::move(t));
-    }
-  });
+
+  setCallback_(
+      [ func = std::forward<F>(func), pm = std::move(p) ](Try<T> && t) mutable {
+        if (!t.template withException<Exn>([&](Exn& e) {
+              pm.setWith([&] { return std::move(func)(e); });
+            })) {
+          pm.setTry(std::move(t));
+        }
+      });
 
   return f;
 }
 
   return f;
 }
@@ -280,22 +292,28 @@ Future<T>::onError(F&& func) {
 
   Promise<T> p;
   auto f = p.getFuture();
 
   Promise<T> p;
   auto f = p.getFuture();
-  auto pm = folly::makeMoveWrapper(std::move(p));
-  auto funcm = folly::makeMoveWrapper(std::move(func));
-  setCallback_([pm, funcm](Try<T>&& t) mutable {
-    if (!t.template withException<Exn>([&] (Exn& e) {
-          try {
-            auto f2 = (*funcm)(e);
-            f2.setCallback_([pm](Try<T>&& t2) mutable {
-              pm->setTry(std::move(t2));
-            });
-          } catch (const std::exception& e2) {
-            pm->setException(exception_wrapper(std::current_exception(), e2));
-          } catch (...) {
-            pm->setException(exception_wrapper(std::current_exception()));
+
+  setCallback_([ pm = std::move(p), func = std::forward<F>(func) ](
+      Try<T> && t) mutable {
+    if (!t.template withException<Exn>([&](Exn& e) {
+          auto ew = [&] {
+            try {
+              auto f2 = std::move(func)(e);
+              f2.setCallback_([pm = std::move(pm)](Try<T> && t2) mutable {
+                pm.setTry(std::move(t2));
+              });
+              return exception_wrapper();
+            } catch (const std::exception& e2) {
+              return exception_wrapper(std::current_exception(), e2);
+            } catch (...) {
+              return exception_wrapper(std::current_exception());
+            }
+          }();
+          if (ew) {
+            pm.setException(std::move(ew));
           }
         })) {
           }
         })) {
-      pm->setTry(std::move(t));
+      pm.setTry(std::move(t));
     }
   });
 
     }
   });
 
@@ -304,10 +322,9 @@ Future<T>::onError(F&& func) {
 
 template <class T>
 template <class F>
 
 template <class T>
 template <class F>
-Future<T> Future<T>::ensure(F func) {
-  MoveWrapper<F> funcw(std::move(func));
-  return this->then([funcw](Try<T>&& t) mutable {
-    (*funcw)();
+Future<T> Future<T>::ensure(F&& func) {
+  return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
+    funcw();
     return makeFuture(std::move(t));
   });
 }
     return makeFuture(std::move(t));
   });
 }
@@ -315,17 +332,15 @@ Future<T> Future<T>::ensure(F func) {
 template <class T>
 template <class F>
 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
 template <class T>
 template <class F>
 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
-  auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
-  return within(dur, tk)
-    .onError([funcw](TimedOut const&) { return (*funcw)(); });
+  return within(dur, tk).onError([funcw = std::forward<F>(func)](
+      TimedOut const&) { return funcw(); });
 }
 
 template <class T>
 template <class F>
 }
 
 template <class T>
 template <class F>
-typename std::enable_if<
-  detail::callableWith<F, exception_wrapper>::value &&
-  detail::Extract<F>::ReturnsFuture::value,
-  Future<T>>::type
+typename std::enable_if<detail::callableWith<F, exception_wrapper>::value &&
+                            detail::Extract<F>::ReturnsFuture::value,
+                        Future<T>>::type
 Future<T>::onError(F&& func) {
   static_assert(
       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
 Future<T>::onError(F&& func) {
   static_assert(
       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
@@ -333,24 +348,29 @@ Future<T>::onError(F&& func) {
 
   Promise<T> p;
   auto f = p.getFuture();
 
   Promise<T> p;
   auto f = p.getFuture();
-  auto pm = folly::makeMoveWrapper(std::move(p));
-  auto funcm = folly::makeMoveWrapper(std::move(func));
-  setCallback_([pm, funcm](Try<T> t) mutable {
-    if (t.hasException()) {
-      try {
-        auto f2 = (*funcm)(std::move(t.exception()));
-        f2.setCallback_([pm](Try<T> t2) mutable {
-          pm->setTry(std::move(t2));
-        });
-      } catch (const std::exception& e2) {
-        pm->setException(exception_wrapper(std::current_exception(), e2));
-      } catch (...) {
-        pm->setException(exception_wrapper(std::current_exception()));
-      }
-    } else {
-      pm->setTry(std::move(t));
-    }
-  });
+  setCallback_(
+      [ pm = std::move(p), func = std::forward<F>(func) ](Try<T> t) mutable {
+        if (t.hasException()) {
+          auto ew = [&] {
+            try {
+              auto f2 = std::move(func)(std::move(t.exception()));
+              f2.setCallback_([pm = std::move(pm)](Try<T> t2) mutable {
+                pm.setTry(std::move(t2));
+              });
+              return exception_wrapper();
+            } catch (const std::exception& e2) {
+              return exception_wrapper(std::current_exception(), e2);
+            } catch (...) {
+              return exception_wrapper(std::current_exception());
+            }
+          }();
+          if (ew) {
+            pm.setException(std::move(ew));
+          }
+        } else {
+          pm.setTry(std::move(t));
+        }
+      });
 
   return f;
 }
 
   return f;
 }
@@ -369,17 +389,14 @@ Future<T>::onError(F&& func) {
 
   Promise<T> p;
   auto f = p.getFuture();
 
   Promise<T> p;
   auto f = p.getFuture();
-  auto pm = folly::makeMoveWrapper(std::move(p));
-  auto funcm = folly::makeMoveWrapper(std::move(func));
-  setCallback_([pm, funcm](Try<T> t) mutable {
-    if (t.hasException()) {
-      pm->setWith([&]{
-        return (*funcm)(std::move(t.exception()));
+  setCallback_(
+      [ pm = std::move(p), func = std::forward<F>(func) ](Try<T> t) mutable {
+        if (t.hasException()) {
+          pm.setWith([&] { return std::move(func)(std::move(t.exception())); });
+        } else {
+          pm.setTry(std::move(t));
+        }
       });
       });
-    } else {
-      pm->setTry(std::move(t));
-    }
-  });
 
   return f;
 }
 
   return f;
 }
@@ -405,6 +422,11 @@ Try<T>& Future<T>::getTry() {
   return core_->getTry();
 }
 
   return core_->getTry();
 }
 
+template <class T>
+Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
+  return waitVia(e).getTry();
+}
+
 template <class T>
 Optional<Try<T>> Future<T>::poll() {
   Optional<Try<T>> o;
 template <class T>
 Optional<Try<T>> Future<T>::poll() {
   Optional<Try<T>> o;
@@ -427,19 +449,18 @@ template <class T>
 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
   throwIfInvalid();
 
 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
   throwIfInvalid();
 
-  MoveWrapper<Promise<T>> p;
-  auto f = p->getFuture();
-  then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
+  Promise<T> p;
+  auto f = p.getFuture();
+  then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
   return std::move(f).via(executor, priority);
 }
 
   return std::move(f).via(executor, priority);
 }
 
-
 template <class Func>
 template <class Func>
-auto via(Executor* x, Func func)
+auto via(Executor* x, Func&& func)
   -> Future<typename isFuture<decltype(func())>::Inner>
 {
   // TODO make this actually more performant. :-P #7260175
   -> Future<typename isFuture<decltype(func())>::Inner>
 {
   // TODO make this actually more performant. :-P #7260175
-  return via(x).then(func);
+  return via(x).then(std::forward<Func>(func));
 }
 
 template <class T>
 }
 
 template <class T>
@@ -568,7 +589,7 @@ collectAll(InputIterator first, InputIterator last) {
     typename std::iterator_traits<InputIterator>::value_type::value_type T;
 
   struct CollectAllContext {
     typename std::iterator_traits<InputIterator>::value_type::value_type T;
 
   struct CollectAllContext {
-    CollectAllContext(int n) : results(n) {}
+    CollectAllContext(size_t n) : results(n) {}
     ~CollectAllContext() {
       p.setValue(std::move(results));
     }
     ~CollectAllContext() {
       p.setValue(std::move(results));
     }
@@ -576,7 +597,8 @@ collectAll(InputIterator first, InputIterator last) {
     std::vector<Try<T>> results;
   };
 
     std::vector<Try<T>> results;
   };
 
-  auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
+  auto ctx =
+      std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
     ctx->results[i] = std::move(t);
   });
   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
     ctx->results[i] = std::move(t);
   });
@@ -589,7 +611,9 @@ namespace detail {
 
 template <typename T>
 struct CollectContext {
 
 template <typename T>
 struct CollectContext {
-  struct Nothing { explicit Nothing(int n) {} };
+  struct Nothing {
+    explicit Nothing(int /* n */) {}
+  };
 
   using Result = typename std::conditional<
     std::is_void<T>::value,
 
   using Result = typename std::conditional<
     std::is_void<T>::value,
@@ -601,7 +625,7 @@ struct CollectContext {
     Nothing,
     std::vector<Optional<T>>>::type;
 
     Nothing,
     std::vector<Optional<T>>>::type;
 
-  explicit CollectContext(int n) : result(n) {}
+  explicit CollectContext(size_t n) : result(n) {}
   ~CollectContext() {
     if (!threw.exchange(true)) {
       // map Optional<T> -> T
   ~CollectContext() {
     if (!threw.exchange(true)) {
       // map Optional<T> -> T
@@ -670,7 +694,7 @@ collectAny(InputIterator first, InputIterator last) {
     typename std::iterator_traits<InputIterator>::value_type::value_type T;
 
   struct CollectAnyContext {
     typename std::iterator_traits<InputIterator>::value_type::value_type T;
 
   struct CollectAnyContext {
-    CollectAnyContext() {};
+    CollectAnyContext() {}
     Promise<std::pair<size_t, Try<T>>> p;
     std::atomic<bool> done {false};
   };
     Promise<std::pair<size_t, Try<T>>> p;
     std::atomic<bool> done {false};
   };
@@ -684,6 +708,37 @@ collectAny(InputIterator first, InputIterator last) {
   return ctx->p.getFuture();
 }
 
   return ctx->p.getFuture();
 }
 
+// collectAnyWithoutException (iterator)
+
+template <class InputIterator>
+Future<std::pair<
+    size_t,
+    typename std::iterator_traits<InputIterator>::value_type::value_type>>
+collectAnyWithoutException(InputIterator first, InputIterator last) {
+  typedef
+      typename std::iterator_traits<InputIterator>::value_type::value_type T;
+
+  struct CollectAnyWithoutExceptionContext {
+    CollectAnyWithoutExceptionContext(){}
+    Promise<std::pair<size_t, T>> p;
+    std::atomic<bool> done{false};
+    std::atomic<size_t> nFulfilled{0};
+    size_t nTotal;
+  };
+
+  auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
+  ctx->nTotal = size_t(std::distance(first, last));
+
+  mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
+    if (!t.hasException() && !ctx->done.exchange(true)) {
+      ctx->p.setValue(std::make_pair(i, std::move(t.value())));
+    } else if (++ctx->nFulfilled == ctx->nTotal) {
+      ctx->p.setException(t.exception());
+    }
+  });
+  return ctx->p.getFuture();
+}
+
 // collectN (iterator)
 
 template <class InputIterator>
 // collectN (iterator)
 
 template <class InputIterator>
@@ -731,17 +786,19 @@ Future<T> reduce(It first, It last, T&& initial, F&& func) {
   }
 
   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
   }
 
   typedef typename std::iterator_traits<It>::value_type::value_type ItT;
-  typedef typename std::conditional<
-    detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type Arg;
+  typedef
+      typename std::conditional<detail::callableWith<F, T&&, Try<ItT>&&>::value,
+                                Try<ItT>,
+                                ItT>::type Arg;
   typedef isTry<Arg> IsTry;
 
   typedef isTry<Arg> IsTry;
 
-  folly::MoveWrapper<T> minitial(std::move(initial));
   auto sfunc = std::make_shared<F>(std::move(func));
 
   auto sfunc = std::make_shared<F>(std::move(func));
 
-  auto f = first->then([minitial, sfunc](Try<ItT>& head) mutable {
-    return (*sfunc)(std::move(*minitial),
-                head.template get<IsTry::value, Arg&&>());
-  });
+  auto f = first->then(
+      [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
+        return (*sfunc)(
+            std::move(minitial), head.template get<IsTry::value, Arg&&>());
+      });
 
   for (++first; first != last; ++first) {
     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
 
   for (++first; first != last; ++first) {
     f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
@@ -809,12 +866,13 @@ window(Collection input, F func, size_t n) {
 template <class T>
 template <class I, class F>
 Future<I> Future<T>::reduce(I&& initial, F&& func) {
 template <class T>
 template <class I, class F>
 Future<I> Future<T>::reduce(I&& initial, F&& func) {
-  folly::MoveWrapper<I> minitial(std::move(initial));
-  folly::MoveWrapper<F> mfunc(std::move(func));
-  return then([minitial, mfunc](T& vals) mutable {
-    auto ret = std::move(*minitial);
+  return then([
+    minitial = std::forward<I>(initial),
+    mfunc = std::forward<F>(func)
+  ](T& vals) mutable {
+    auto ret = std::move(minitial);
     for (auto& val : vals) {
     for (auto& val : vals) {
-      ret = (*mfunc)(std::move(ret), std::move(val));
+      ret = mfunc(std::move(ret), std::move(val));
     }
     return ret;
   });
     }
     return ret;
   });
@@ -834,7 +892,7 @@ Future<T> unorderedReduce(It first, It last, T initial, F func) {
     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
         : lock_(), memo_(makeFuture<T>(std::move(memo))),
           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
         : lock_(), memo_(makeFuture<T>(std::move(memo))),
           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
-      {};
+      {}
     folly::MicroSpinLock lock_; // protects memo_ and numThens_
     Future<T> memo_;
     F func_;
     folly::MicroSpinLock lock_; // protects memo_ and numThens_
     Future<T> memo_;
     F func_;
@@ -846,26 +904,29 @@ Future<T> unorderedReduce(It first, It last, T initial, F func) {
   auto ctx = std::make_shared<UnorderedReduceContext>(
     std::move(initial), std::move(func), std::distance(first, last));
 
   auto ctx = std::make_shared<UnorderedReduceContext>(
     std::move(initial), std::move(func), std::distance(first, last));
 
-  mapSetCallback<ItT>(first, last, [ctx](size_t i, Try<ItT>&& t) {
-    folly::MoveWrapper<Try<ItT>> mt(std::move(t));
-    // Futures can be completed in any order, simultaneously.
-    // To make this non-blocking, we create a new Future chain in
-    // the order of completion to reduce the values.
-    // The spinlock just protects chaining a new Future, not actually
-    // executing the reduce, which should be really fast.
-    folly::MSLGuard lock(ctx->lock_);
-    ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
-      // Either return a ItT&& or a Try<ItT>&& depending
-      // on the type of the argument of func.
-      return ctx->func_(std::move(v), mt->template get<IsTry::value, Arg&&>());
-    });
-    if (++ctx->numThens_ == ctx->numFutures_) {
-      // After reducing the value of the last Future, fulfill the Promise
-      ctx->memo_.setCallback_([ctx](Try<T>&& t2) {
-        ctx->promise_.setValue(std::move(t2));
+  mapSetCallback<ItT>(
+      first,
+      last,
+      [ctx](size_t /* i */, Try<ItT>&& t) {
+        // Futures can be completed in any order, simultaneously.
+        // To make this non-blocking, we create a new Future chain in
+        // the order of completion to reduce the values.
+        // The spinlock just protects chaining a new Future, not actually
+        // executing the reduce, which should be really fast.
+        folly::MSLGuard lock(ctx->lock_);
+        ctx->memo_ =
+            ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
+              // Either return a ItT&& or a Try<ItT>&& depending
+              // on the type of the argument of func.
+              return ctx->func_(std::move(v),
+                                mt.template get<IsTry::value, Arg&&>());
+            });
+        if (++ctx->numThens_ == ctx->numFutures_) {
+          // After reducing the value of the last Future, fulfill the Promise
+          ctx->memo_.setCallback_(
+              [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
+        }
       });
       });
-    }
-  });
 
   return ctx->promise_.getFuture();
 }
 
   return ctx->promise_.getFuture();
 }
@@ -937,8 +998,8 @@ void waitImpl(Future<T>& f) {
   // short-circuit if there's nothing to do
   if (f.isReady()) return;
 
   // short-circuit if there's nothing to do
   if (f.isReady()) return;
 
-  folly::fibers::Baton baton;
-  f.setCallback_([&](const Try<T>& t) { baton.post(); });
+  FutureBatonType baton;
+  f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
   baton.wait();
   assert(f.isReady());
 }
   baton.wait();
   assert(f.isReady());
 }
@@ -946,13 +1007,15 @@ void waitImpl(Future<T>& f) {
 template <class T>
 void waitImpl(Future<T>& f, Duration dur) {
   // short-circuit if there's nothing to do
 template <class T>
 void waitImpl(Future<T>& f, Duration dur) {
   // short-circuit if there's nothing to do
-  if (f.isReady()) return;
+  if (f.isReady()) {
+    return;
+  }
 
 
-  folly::MoveWrapper<Promise<T>> promise;
-  auto ret = promise->getFuture();
-  auto baton = std::make_shared<folly::fibers::Baton>();
-  f.setCallback_([baton, promise](Try<T>&& t) mutable {
-    promise->setTry(std::move(t));
+  Promise<T> promise;
+  auto ret = promise.getFuture();
+  auto baton = std::make_shared<FutureBatonType>();
+  f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
+    promise.setTry(std::move(t));
     baton->post();
   });
   f = std::move(ret);
     baton->post();
   });
   f = std::move(ret);
@@ -963,9 +1026,16 @@ void waitImpl(Future<T>& f, Duration dur) {
 
 template <class T>
 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
 
 template <class T>
 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
+  // Set callback so to ensure that the via executor has something on it
+  // so that once the preceding future triggers this callback, drive will
+  // always have a callback to satisfy it
+  if (f.isReady())
+    return;
+  f = f.via(e).then([](T&& t) { return std::move(t); });
   while (!f.isReady()) {
     e->drive();
   }
   while (!f.isReady()) {
     e->drive();
   }
+  assert(f.isReady());
 }
 
 } // detail
 }
 
 } // detail
@@ -1048,11 +1118,10 @@ Future<bool> Future<T>::willEqual(Future<T>& f) {
 
 template <class T>
 template <class F>
 
 template <class T>
 template <class F>
-Future<T> Future<T>::filter(F predicate) {
-  auto p = folly::makeMoveWrapper(std::move(predicate));
-  return this->then([p](T val) {
+Future<T> Future<T>::filter(F&& predicate) {
+  return this->then([p = std::forward<F>(predicate)](T val) {
     T const& valConstRef = val;
     T const& valConstRef = val;
-    if (!(*p)(valConstRef)) {
+    if (!p(valConstRef)) {
       throw PredicateDoesNotObtain();
     }
     return val;
       throw PredicateDoesNotObtain();
     }
     return val;
@@ -1100,28 +1169,31 @@ auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
 }
 
 template <class F>
 }
 
 template <class F>
-inline Future<Unit> when(bool p, F thunk) {
-  return p ? thunk().unit() : makeFuture();
+inline Future<Unit> when(bool p, F&& thunk) {
+  return p ? std::forward<F>(thunk)().unit() : makeFuture();
 }
 
 template <class P, class F>
 }
 
 template <class P, class F>
-Future<Unit> whileDo(P predicate, F thunk) {
+Future<Unit> whileDo(P&& predicate, F&& thunk) {
   if (predicate()) {
   if (predicate()) {
-    return thunk().then([=] {
-      return whileDo(predicate, thunk);
+    auto future = thunk();
+    return future.then([
+      predicate = std::forward<P>(predicate),
+      thunk = std::forward<F>(thunk)
+    ]() mutable {
+      return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
     });
   }
   return makeFuture();
 }
 
 template <class F>
     });
   }
   return makeFuture();
 }
 
 template <class F>
-Future<Unit> times(const int n, F thunk) {
-  auto count = folly::makeMoveWrapper(
-    std::unique_ptr<std::atomic<int>>(new std::atomic<int>(0))
-  );
-  return folly::whileDo([=]() mutable {
-      return (*count)->fetch_add(1) < n;
-    }, thunk);
+Future<Unit> times(const int n, F&& thunk) {
+  return folly::whileDo(
+      [ n, count = folly::make_unique<std::atomic<int>>(0) ]() mutable {
+        return count->fetch_add(1) < n;
+      },
+      std::forward<F>(thunk));
 }
 
 namespace futures {
 }
 
 namespace futures {
@@ -1157,31 +1229,56 @@ struct retrying_policy_traits {
         is_fut::value, retrying_policy_fut_tag, void>::type>::type;
 };
 
         is_fut::value, retrying_policy_fut_tag, void>::type>::type;
 };
 
+template <class Policy, class FF, class Prom>
+void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) {
+  using F = typename std::result_of<FF(size_t)>::type;
+  using T = typename F::value_type;
+  auto f = ff(k++);
+  f.then([
+    k,
+    prom = std::move(prom),
+    pm = std::forward<Policy>(p),
+    ffm = std::forward<FF>(ff)
+  ](Try<T> && t) mutable {
+    if (t.hasValue()) {
+      prom.setValue(std::move(t).value());
+      return;
+    }
+    auto& x = t.exception();
+    auto q = pm(k, x);
+    q.then([
+      k,
+      prom = std::move(prom),
+      xm = std::move(x),
+      pm = std::move(pm),
+      ffm = std::move(ffm)
+    ](bool shouldRetry) mutable {
+      if (shouldRetry) {
+        retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom));
+      } else {
+        prom.setException(std::move(xm));
+      };
+    });
+  });
+}
+
 template <class Policy, class FF>
 typename std::result_of<FF(size_t)>::type
 retrying(size_t k, Policy&& p, FF&& ff) {
   using F = typename std::result_of<FF(size_t)>::type;
   using T = typename F::value_type;
 template <class Policy, class FF>
 typename std::result_of<FF(size_t)>::type
 retrying(size_t k, Policy&& p, FF&& ff) {
   using F = typename std::result_of<FF(size_t)>::type;
   using T = typename F::value_type;
-  auto f = ff(k++);
-  auto pm = makeMoveWrapper(p);
-  auto ffm = makeMoveWrapper(ff);
-  return f.onError([=](exception_wrapper x) mutable {
-      auto q = (*pm)(k, x);
-      auto xm = makeMoveWrapper(std::move(x));
-      return q.then([=](bool r) mutable {
-          return r
-            ? retrying(k, pm.move(), ffm.move())
-            : makeFuture<T>(xm.move());
-      });
-  });
+  auto prom = Promise<T>();
+  auto f = prom.getFuture();
+  retryingImpl(
+      k, std::forward<Policy>(p), std::forward<FF>(ff), std::move(prom));
+  return f;
 }
 
 template <class Policy, class FF>
 typename std::result_of<FF(size_t)>::type
 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
 }
 
 template <class Policy, class FF>
 typename std::result_of<FF(size_t)>::type
 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
-  auto pm = makeMoveWrapper(std::move(p));
-  auto q = [=](size_t k, exception_wrapper x) {
-    return makeFuture<bool>((*pm)(k, x));
+  auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
+    return makeFuture<bool>(pm(k, x));
   };
   return retrying(0, std::move(q), std::forward<FF>(ff));
 }
   };
   return retrying(0, std::move(q), std::forward<FF>(ff));
 }
@@ -1214,18 +1311,29 @@ retryingPolicyCappedJitteredExponentialBackoff(
     Duration backoff_min,
     Duration backoff_max,
     double jitter_param,
     Duration backoff_min,
     Duration backoff_max,
     double jitter_param,
-    URNG rng,
+    URNG&& rng,
     Policy&& p) {
     Policy&& p) {
-  auto pm = makeMoveWrapper(std::move(p));
-  auto rngp = std::make_shared<URNG>(std::move(rng));
-  return [=](size_t n, const exception_wrapper& ex) mutable {
-    if (n == max_tries) { return makeFuture(false); }
-    return (*pm)(n, ex).then([=](bool v) {
-        if (!v) { return makeFuture(false); }
-        auto backoff = detail::retryingJitteredExponentialBackoffDur(
-            n, backoff_min, backoff_max, jitter_param, *rngp);
-        return futures::sleep(backoff).then([] { return true; });
-    });
+  return [
+    pm = std::forward<Policy>(p),
+    max_tries,
+    backoff_min,
+    backoff_max,
+    jitter_param,
+    rngp = std::forward<URNG>(rng)
+  ](size_t n, const exception_wrapper& ex) mutable {
+    if (n == max_tries) {
+      return makeFuture(false);
+    }
+    return pm(n, ex).then(
+        [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
+            bool v) mutable {
+          if (!v) {
+            return makeFuture(false);
+          }
+          auto backoff = detail::retryingJitteredExponentialBackoffDur(
+              n, backoff_min, backoff_max, jitter_param, rngp);
+          return futures::sleep(backoff).then([] { return true; });
+        });
   };
 }
 
   };
 }
 
@@ -1236,19 +1344,19 @@ retryingPolicyCappedJitteredExponentialBackoff(
     Duration backoff_min,
     Duration backoff_max,
     double jitter_param,
     Duration backoff_min,
     Duration backoff_max,
     double jitter_param,
-    URNG rng,
+    URNG&& rng,
     Policy&& p,
     retrying_policy_raw_tag) {
     Policy&& p,
     retrying_policy_raw_tag) {
-  auto pm = makeMoveWrapper(std::move(p));
-  auto q = [=](size_t n, const exception_wrapper& e) {
-    return makeFuture((*pm)(n, e));
+  auto q = [pm = std::forward<Policy>(p)](
+      size_t n, const exception_wrapper& e) {
+    return makeFuture(pm(n, e));
   };
   return retryingPolicyCappedJitteredExponentialBackoff(
       max_tries,
       backoff_min,
       backoff_max,
       jitter_param,
   };
   return retryingPolicyCappedJitteredExponentialBackoff(
       max_tries,
       backoff_min,
       backoff_max,
       jitter_param,
-      std::move(rng),
+      std::forward<URNG>(rng),
       std::move(q));
 }
 
       std::move(q));
 }
 
@@ -1259,7 +1367,7 @@ retryingPolicyCappedJitteredExponentialBackoff(
     Duration backoff_min,
     Duration backoff_max,
     double jitter_param,
     Duration backoff_min,
     Duration backoff_max,
     double jitter_param,
-    URNG rng,
+    URNG&& rng,
     Policy&& p,
     retrying_policy_fut_tag) {
   return retryingPolicyCappedJitteredExponentialBackoff(
     Policy&& p,
     retrying_policy_fut_tag) {
   return retryingPolicyCappedJitteredExponentialBackoff(
@@ -1267,10 +1375,9 @@ retryingPolicyCappedJitteredExponentialBackoff(
       backoff_min,
       backoff_max,
       jitter_param,
       backoff_min,
       backoff_max,
       jitter_param,
-      std::move(rng),
-      std::move(p));
+      std::forward<URNG>(rng),
+      std::forward<Policy>(p));
 }
 }
-
 }
 
 template <class Policy, class FF>
 }
 
 template <class Policy, class FF>
@@ -1294,7 +1401,7 @@ retryingPolicyCappedJitteredExponentialBackoff(
     Duration backoff_min,
     Duration backoff_max,
     double jitter_param,
     Duration backoff_min,
     Duration backoff_max,
     double jitter_param,
-    URNG rng,
+    URNG&& rng,
     Policy&& p) {
   using tag = typename detail::retrying_policy_traits<Policy>::tag;
   return detail::retryingPolicyCappedJitteredExponentialBackoff(
     Policy&& p) {
   using tag = typename detail::retrying_policy_traits<Policy>::tag;
   return detail::retryingPolicyCappedJitteredExponentialBackoff(
@@ -1302,8 +1409,8 @@ retryingPolicyCappedJitteredExponentialBackoff(
       backoff_min,
       backoff_max,
       jitter_param,
       backoff_min,
       backoff_max,
       jitter_param,
-      std::move(rng),
-      std::move(p),
+      std::forward<URNG>(rng),
+      std::forward<Policy>(p),
       tag());
 }
 
       tag());
 }