fix memory leak in case of large number of retries
[folly.git] / folly / futures / Future-inl.h
index a8454155f518c25a82173ed9a49bf595ef9b452e..b353ff81a411f932c28da12af08239ff7ba48e32 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2014 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 #pragma once
 
+#include <algorithm>
+#include <cassert>
 #include <chrono>
+#include <random>
 #include <thread>
-
 #include <folly/Baton.h>
+#include <folly/Optional.h>
+#include <folly/Random.h>
+#include <folly/Traits.h>
 #include <folly/futures/detail/Core.h>
 #include <folly/futures/Timekeeper.h>
 
+#if FOLLY_MOBILE || defined(__APPLE__)
+#define FOLLY_FUTURE_USING_FIBER 0
+#else
+#define FOLLY_FUTURE_USING_FIBER 1
+#include <folly/fibers/Baton.h>
+#endif
+
 namespace folly {
 
 class Timekeeper;
 
 namespace detail {
-  Timekeeper* getTimekeeperSingleton();
+#if FOLLY_FUTURE_USING_FIBER
+typedef folly::fibers::Baton FutureBatonType;
+#else
+typedef folly::Baton<> FutureBatonType;
+#endif
+}
+
+namespace detail {
+  std::shared_ptr<Timekeeper> getTimekeeperSingleton();
 }
 
 template <class T>
-Future<T>::Future(Future<T>&& other) noexcept : core_(nullptr) {
-  *this = std::move(other);
+Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
+  other.core_ = nullptr;
 }
 
 template <class T>
-Future<T>& Future<T>::operator=(Future<T>&& other) {
+Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
   std::swap(core_, other.core_);
   return *this;
 }
 
 template <class T>
-template <class F>
-Future<T>::Future(
-  const typename std::enable_if<!std::is_void<F>::value, F>::type& val)
-    : core_(nullptr) {
-  Promise<F> p;
-  p.setValue(val);
-  *this = p.getFuture();
-}
+template <class T2, typename>
+Future<T>::Future(T2&& val)
+    : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
 
 template <class T>
-template <class F>
-Future<T>::Future(
-  typename std::enable_if<!std::is_void<F>::value, F>::type&& val)
-    : core_(nullptr) {
-  Promise<F> p;
-  p.setValue(std::forward<F>(val));
-  *this = p.getFuture();
-}
-
-template <>
-template <class F,
-          typename std::enable_if<std::is_void<F>::value, int>::type>
-Future<void>::Future() : core_(nullptr) {
-  Promise<void> p;
-  p.setValue();
-  *this = p.getFuture();
-}
-
+template <typename T2>
+Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
+    : core_(new detail::Core<T>(Try<T>(T()))) {}
 
 template <class T>
 Future<T>::~Future() {
@@ -95,26 +95,40 @@ template <class T>
 template <class F>
 void Future<T>::setCallback_(F&& func) {
   throwIfInvalid();
-  core_->setCallback(std::move(func));
+  core_->setCallback(std::forward<F>(func));
 }
 
+// unwrap
+
+template <class T>
+template <class F>
+typename std::enable_if<isFuture<F>::value,
+                        Future<typename isFuture<T>::Inner>>::type
+Future<T>::unwrap() {
+  return then([](Future<typename isFuture<T>::Inner> internal_future) {
+      return internal_future;
+  });
+}
+
+// then
+
 // Variant: returns a value
 // e.g. f.then([](Try<T>&& t){ return t.value(); });
 template <class T>
 template <typename F, typename R, bool isTry, typename... Args>
 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
-Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
+Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
   typedef typename R::ReturnsFuture::Inner B;
 
   throwIfInvalid();
 
-  // wrap these so we can move them into the lambda
-  folly::MoveWrapper<Promise<B>> p;
-  folly::MoveWrapper<F> funcm(std::forward<F>(func));
+  Promise<B> p;
+  p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
 
   // grab the Future now before we lose our handle on the Promise
-  auto f = p->getFuture();
+  auto f = p.getFuture();
+  f.core_->setExecutorNoLock(getExecutor());
 
   /* This is a bit tricky.
 
@@ -135,9 +149,6 @@ Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
      persist beyond the callback, if it gets moved), and so it is an
      optimization to just make it shared from the get-go.
 
-     We have to move in the Promise and func using the MoveWrapper
-     hack. (func could be copied but it's a big drag on perf).
-
      Two subtle but important points about this design. detail::Core has no
      back pointers to Future or Promise, so if Future or Promise get moved
      (and they will be moved in performant code) we don't have to do
@@ -149,15 +160,15 @@ Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
      in the destruction of the Future used to create it.
      */
   setCallback_(
-    [p, funcm](Try<T>&& t) mutable {
-      if (!isTry && t.hasException()) {
-        p->setException(std::move(t.exception()));
-      } else {
-        p->fulfil([&]() {
-          return (*funcm)(t.template get<isTry, Args>()...);
-        });
-      }
-    });
+      [ func = std::forward<F>(func), pm = std::move(p) ](Try<T> && t) mutable {
+        if (!isTry && t.hasException()) {
+          pm.setException(std::move(t.exception()));
+        } else {
+          pm.setWith([&]() {
+            return std::move(func)(t.template get<isTry, Args>()...);
+          });
+        }
+      });
 
   return f;
 }
@@ -167,37 +178,43 @@ Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
 template <class T>
 template <typename F, typename R, bool isTry, typename... Args>
 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
-Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
+Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
   typedef typename R::ReturnsFuture::Inner B;
 
   throwIfInvalid();
 
-  // wrap these so we can move them into the lambda
-  folly::MoveWrapper<Promise<B>> p;
-  folly::MoveWrapper<F> funcm(std::forward<F>(func));
+  Promise<B> p;
+  p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
 
   // grab the Future now before we lose our handle on the Promise
-  auto f = p->getFuture();
+  auto f = p.getFuture();
+  f.core_->setExecutorNoLock(getExecutor());
 
-  setCallback_(
-    [p, funcm](Try<T>&& t) mutable {
+  setCallback_([ func = std::forward<F>(func), pm = std::move(p) ](
+      Try<T> && t) mutable {
+    auto ew = [&] {
       if (!isTry && t.hasException()) {
-        p->setException(std::move(t.exception()));
+        return std::move(t.exception());
       } else {
         try {
-          auto f2 = (*funcm)(t.template get<isTry, Args>()...);
+          auto f2 = std::move(func)(t.template get<isTry, Args>()...);
           // that didn't throw, now we can steal p
-          f2.setCallback_([p](Try<B>&& b) mutable {
-            p->fulfilTry(std::move(b));
+          f2.setCallback_([p = std::move(pm)](Try<B> && b) mutable {
+            p.setTry(std::move(b));
           });
+          return exception_wrapper();
         } catch (const std::exception& e) {
-          p->setException(exception_wrapper(std::current_exception(), e));
+          return exception_wrapper(std::current_exception(), e);
         } catch (...) {
-          p->setException(exception_wrapper(std::current_exception()));
+          return exception_wrapper(std::current_exception());
         }
       }
-    });
+    }();
+    if (ew) {
+      pm.setException(std::move(ew));
+    }
+  });
 
   return f;
 }
@@ -215,14 +232,27 @@ Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
 }
 
 template <class T>
-Future<void> Future<T>::then() {
-  return then([] (Try<T>&& t) {});
+template <class Executor, class Arg, class... Args>
+auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
+  -> decltype(this->then(std::forward<Arg>(arg),
+                         std::forward<Args>(args)...))
+{
+  auto oldX = getExecutor();
+  setExecutor(x);
+  return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
+               via(oldX);
+}
+
+template <class T>
+Future<Unit> Future<T>::then() {
+  return then([] () {});
 }
 
 // onError where the callback returns T
 template <class T>
 template <class F>
 typename std::enable_if<
+  !detail::callableWith<F, exception_wrapper>::value &&
   !detail::Extract<F>::ReturnsFuture::value,
   Future<T>>::type
 Future<T>::onError(F&& func) {
@@ -232,18 +262,17 @@ Future<T>::onError(F&& func) {
       "Return type of onError callback must be T or Future<T>");
 
   Promise<T> p;
+  p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
   auto f = p.getFuture();
-  auto pm = folly::makeMoveWrapper(std::move(p));
-  auto funcm = folly::makeMoveWrapper(std::move(func));
-  setCallback_([pm, funcm](Try<T>&& t) mutable {
-    if (!t.template withException<Exn>([&] (Exn& e) {
-          pm->fulfil([&]{
-            return (*funcm)(e);
-          });
-        })) {
-      pm->fulfilTry(std::move(t));
-    }
-  });
+
+  setCallback_(
+      [ func = std::forward<F>(func), pm = std::move(p) ](Try<T> && t) mutable {
+        if (!t.template withException<Exn>([&](Exn& e) {
+              pm.setWith([&] { return std::move(func)(e); });
+            })) {
+          pm.setTry(std::move(t));
+        }
+      });
 
   return f;
 }
@@ -252,6 +281,7 @@ Future<T>::onError(F&& func) {
 template <class T>
 template <class F>
 typename std::enable_if<
+  !detail::callableWith<F, exception_wrapper>::value &&
   detail::Extract<F>::ReturnsFuture::value,
   Future<T>>::type
 Future<T>::onError(F&& func) {
@@ -262,34 +292,113 @@ Future<T>::onError(F&& func) {
 
   Promise<T> p;
   auto f = p.getFuture();
-  auto pm = folly::makeMoveWrapper(std::move(p));
-  auto funcm = folly::makeMoveWrapper(std::move(func));
-  setCallback_([pm, funcm](Try<T>&& t) mutable {
-    if (!t.template withException<Exn>([&] (Exn& e) {
-          try {
-            auto f2 = (*funcm)(e);
-            f2.setCallback_([pm](Try<T>&& t2) mutable {
-              pm->fulfilTry(std::move(t2));
-            });
-          } catch (const std::exception& e2) {
-            pm->setException(exception_wrapper(std::current_exception(), e2));
-          } catch (...) {
-            pm->setException(exception_wrapper(std::current_exception()));
+
+  setCallback_([ pm = std::move(p), func = std::forward<F>(func) ](
+      Try<T> && t) mutable {
+    if (!t.template withException<Exn>([&](Exn& e) {
+          auto ew = [&] {
+            try {
+              auto f2 = std::move(func)(e);
+              f2.setCallback_([pm = std::move(pm)](Try<T> && t2) mutable {
+                pm.setTry(std::move(t2));
+              });
+              return exception_wrapper();
+            } catch (const std::exception& e2) {
+              return exception_wrapper(std::current_exception(), e2);
+            } catch (...) {
+              return exception_wrapper(std::current_exception());
+            }
+          }();
+          if (ew) {
+            pm.setException(std::move(ew));
           }
         })) {
-      pm->fulfilTry(std::move(t));
+      pm.setTry(std::move(t));
     }
   });
 
   return f;
 }
 
+template <class T>
+template <class F>
+Future<T> Future<T>::ensure(F&& func) {
+  return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
+    funcw();
+    return makeFuture(std::move(t));
+  });
+}
+
 template <class T>
 template <class F>
 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
-  auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
-  return within(dur, tk)
-    .onError([funcw](TimedOut const&) { return (*funcw)(); });
+  return within(dur, tk).onError([funcw = std::forward<F>(func)](
+      TimedOut const&) { return funcw(); });
+}
+
+template <class T>
+template <class F>
+typename std::enable_if<detail::callableWith<F, exception_wrapper>::value &&
+                            detail::Extract<F>::ReturnsFuture::value,
+                        Future<T>>::type
+Future<T>::onError(F&& func) {
+  static_assert(
+      std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
+      "Return type of onError callback must be T or Future<T>");
+
+  Promise<T> p;
+  auto f = p.getFuture();
+  setCallback_(
+      [ pm = std::move(p), func = std::forward<F>(func) ](Try<T> t) mutable {
+        if (t.hasException()) {
+          auto ew = [&] {
+            try {
+              auto f2 = std::move(func)(std::move(t.exception()));
+              f2.setCallback_([pm = std::move(pm)](Try<T> t2) mutable {
+                pm.setTry(std::move(t2));
+              });
+              return exception_wrapper();
+            } catch (const std::exception& e2) {
+              return exception_wrapper(std::current_exception(), e2);
+            } catch (...) {
+              return exception_wrapper(std::current_exception());
+            }
+          }();
+          if (ew) {
+            pm.setException(std::move(ew));
+          }
+        } else {
+          pm.setTry(std::move(t));
+        }
+      });
+
+  return f;
+}
+
+// onError(exception_wrapper) that returns T
+template <class T>
+template <class F>
+typename std::enable_if<
+  detail::callableWith<F, exception_wrapper>::value &&
+  !detail::Extract<F>::ReturnsFuture::value,
+  Future<T>>::type
+Future<T>::onError(F&& func) {
+  static_assert(
+      std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
+      "Return type of onError callback must be T or Future<T>");
+
+  Promise<T> p;
+  auto f = p.getFuture();
+  setCallback_(
+      [ pm = std::move(p), func = std::forward<F>(func) ](Try<T> t) mutable {
+        if (t.hasException()) {
+          pm.setWith([&] { return std::move(func)(std::move(t.exception())); });
+        } else {
+          pm.setTry(std::move(t));
+        }
+      });
+
+  return f;
 }
 
 template <class T>
@@ -314,25 +423,44 @@ Try<T>& Future<T>::getTry() {
 }
 
 template <class T>
-template <typename Executor>
-inline Future<T> Future<T>::via(Executor* executor) && {
+Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
+  return waitVia(e).getTry();
+}
+
+template <class T>
+Optional<Try<T>> Future<T>::poll() {
+  Optional<Try<T>> o;
+  if (core_->ready()) {
+    o = std::move(core_->getTry());
+  }
+  return o;
+}
+
+template <class T>
+inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
   throwIfInvalid();
 
-  this->deactivate();
-  core_->setExecutor(executor);
+  setExecutor(executor, priority);
 
   return std::move(*this);
 }
 
 template <class T>
-template <typename Executor>
-inline Future<T> Future<T>::via(Executor* executor) & {
+inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
   throwIfInvalid();
 
-  MoveWrapper<Promise<T>> p;
-  auto f = p->getFuture();
-  then([p](Try<T>&& t) mutable { p->fulfilTry(std::move(t)); });
-  return std::move(f).via(executor);
+  Promise<T> p;
+  auto f = p.getFuture();
+  then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
+  return std::move(f).via(executor, priority);
+}
+
+template <class Func>
+auto via(Executor* x, Func&& func)
+  -> Future<typename isFuture<decltype(func())>::Inner>
+{
+  // TODO make this actually more performant. :-P #7260175
+  return via(x).then(std::forward<Func>(func));
 }
 
 template <class T>
@@ -341,6 +469,16 @@ bool Future<T>::isReady() const {
   return core_->ready();
 }
 
+template <class T>
+bool Future<T>::hasValue() {
+  return getTry().hasValue();
+}
+
+template <class T>
+bool Future<T>::hasException() {
+  return getTry().hasException();
+}
+
 template <class T>
 void Future<T>::raise(exception_wrapper exception) {
   core_->raise(std::move(exception));
@@ -350,296 +488,451 @@ void Future<T>::raise(exception_wrapper exception) {
 
 template <class T>
 Future<typename std::decay<T>::type> makeFuture(T&& t) {
-  Promise<typename std::decay<T>::type> p;
-  p.setValue(std::forward<T>(t));
-  return p.getFuture();
+  return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
 }
 
 inline // for multiple translation units
-Future<void> makeFuture() {
-  Promise<void> p;
-  p.setValue();
-  return p.getFuture();
+Future<Unit> makeFuture() {
+  return makeFuture(Unit{});
 }
 
+// makeFutureWith(Future<T>()) -> Future<T>
 template <class F>
-auto makeFutureTry(
-    F&& func,
-    typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
-    -> Future<decltype(func())> {
-  Promise<decltype(func())> p;
-  p.fulfil(
-    [&func]() {
-      return (func)();
-    });
-  return p.getFuture();
+typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
+                        typename std::result_of<F()>::type>::type
+makeFutureWith(F&& func) {
+  using InnerType =
+      typename isFuture<typename std::result_of<F()>::type>::Inner;
+  try {
+    return func();
+  } catch (std::exception& e) {
+    return makeFuture<InnerType>(
+        exception_wrapper(std::current_exception(), e));
+  } catch (...) {
+    return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
+  }
 }
 
+// makeFutureWith(T()) -> Future<T>
+// makeFutureWith(void()) -> Future<Unit>
 template <class F>
-auto makeFutureTry(F const& func) -> Future<decltype(func())> {
-  F copy = func;
-  return makeFutureTry(std::move(copy));
+typename std::enable_if<
+    !(isFuture<typename std::result_of<F()>::type>::value),
+    Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
+makeFutureWith(F&& func) {
+  using LiftedResult =
+      typename Unit::Lift<typename std::result_of<F()>::type>::type;
+  return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
+    return func();
+  }));
 }
 
 template <class T>
 Future<T> makeFuture(std::exception_ptr const& e) {
-  Promise<T> p;
-  p.setException(e);
-  return p.getFuture();
+  return makeFuture(Try<T>(e));
 }
 
 template <class T>
 Future<T> makeFuture(exception_wrapper ew) {
-  Promise<T> p;
-  p.setException(std::move(ew));
-  return p.getFuture();
+  return makeFuture(Try<T>(std::move(ew)));
 }
 
 template <class T, class E>
 typename std::enable_if<std::is_base_of<std::exception, E>::value,
                         Future<T>>::type
 makeFuture(E const& e) {
-  Promise<T> p;
-  p.setException(make_exception_wrapper<E>(e));
-  return p.getFuture();
+  return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
 }
 
 template <class T>
 Future<T> makeFuture(Try<T>&& t) {
-  Promise<typename std::decay<T>::type> p;
-  p.fulfilTry(std::move(t));
-  return p.getFuture();
+  return Future<T>(new detail::Core<T>(std::move(t)));
 }
 
-template <>
-inline Future<void> makeFuture(Try<void>&& t) {
-  if (t.hasException()) {
-    return makeFuture<void>(std::move(t.exception()));
-  } else {
-    return makeFuture();
-  }
+// via
+Future<Unit> via(Executor* executor, int8_t priority) {
+  return makeFuture().via(executor, priority);
 }
 
-// via
-template <typename Executor>
-Future<void> via(Executor* executor) {
-  return makeFuture().via(executor);
+// mapSetCallback calls func(i, Try<T>) when every future completes
+
+template <class T, class InputIterator, class F>
+void mapSetCallback(InputIterator first, InputIterator last, F func) {
+  for (size_t i = 0; first != last; ++first, ++i) {
+    first->setCallback_([func, i](Try<T>&& t) {
+      func(i, std::move(t));
+    });
+  }
 }
 
-// when (variadic)
+// collectAll (variadic)
 
 template <typename... Fs>
-typename detail::VariadicContext<
+typename detail::CollectAllVariadicContext<
   typename std::decay<Fs>::type::value_type...>::type
-whenAll(Fs&&... fs)
-{
-  auto ctx =
-    new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
-  ctx->total = sizeof...(fs);
-  auto f_saved = ctx->p.getFuture();
-  detail::whenAllVariadicHelper(ctx,
-    std::forward<typename std::decay<Fs>::type>(fs)...);
-  return f_saved;
+collectAll(Fs&&... fs) {
+  auto ctx = std::make_shared<detail::CollectAllVariadicContext<
+    typename std::decay<Fs>::type::value_type...>>();
+  detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
+    ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
+  return ctx->p.getFuture();
 }
 
-// when (iterator)
+// collectAll (iterator)
 
 template <class InputIterator>
 Future<
   std::vector<
   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
-whenAll(InputIterator first, InputIterator last)
-{
+collectAll(InputIterator first, InputIterator last) {
   typedef
     typename std::iterator_traits<InputIterator>::value_type::value_type T;
 
-  if (first >= last) {
-    return makeFuture(std::vector<Try<T>>());
-  }
-  size_t n = std::distance(first, last);
+  struct CollectAllContext {
+    CollectAllContext(size_t n) : results(n) {}
+    ~CollectAllContext() {
+      p.setValue(std::move(results));
+    }
+    Promise<std::vector<Try<T>>> p;
+    std::vector<Try<T>> results;
+  };
 
-  auto ctx = new detail::WhenAllContext<T>();
+  auto ctx =
+      std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
+  mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
+    ctx->results[i] = std::move(t);
+  });
+  return ctx->p.getFuture();
+}
 
-  ctx->results.resize(n);
+// collect (iterator)
 
-  auto f_saved = ctx->p.getFuture();
+namespace detail {
 
-  for (size_t i = 0; first != last; ++first, ++i) {
-     assert(i < n);
-     auto& f = *first;
-     f.setCallback_([ctx, i, n](Try<T>&& t) {
-         ctx->results[i] = std::move(t);
-         if (++ctx->count == n) {
-           ctx->p.setValue(std::move(ctx->results));
-           delete ctx;
-         }
-       });
+template <typename T>
+struct CollectContext {
+  struct Nothing {
+    explicit Nothing(int /* n */) {}
+  };
+
+  using Result = typename std::conditional<
+    std::is_void<T>::value,
+    void,
+    std::vector<T>>::type;
+
+  using InternalResult = typename std::conditional<
+    std::is_void<T>::value,
+    Nothing,
+    std::vector<Optional<T>>>::type;
+
+  explicit CollectContext(size_t n) : result(n) {}
+  ~CollectContext() {
+    if (!threw.exchange(true)) {
+      // map Optional<T> -> T
+      std::vector<T> finalResult;
+      finalResult.reserve(result.size());
+      std::transform(result.begin(), result.end(),
+                     std::back_inserter(finalResult),
+                     [](Optional<T>& o) { return std::move(o.value()); });
+      p.setValue(std::move(finalResult));
+    }
   }
+  inline void setPartialResult(size_t i, Try<T>& t) {
+    result[i] = std::move(t.value());
+  }
+  Promise<Result> p;
+  InternalResult result;
+  std::atomic<bool> threw {false};
+};
 
-  return f_saved;
 }
 
+template <class InputIterator>
+Future<typename detail::CollectContext<
+  typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
+collect(InputIterator first, InputIterator last) {
+  typedef
+    typename std::iterator_traits<InputIterator>::value_type::value_type T;
+
+  auto ctx = std::make_shared<detail::CollectContext<T>>(
+    std::distance(first, last));
+  mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
+    if (t.hasException()) {
+       if (!ctx->threw.exchange(true)) {
+         ctx->p.setException(std::move(t.exception()));
+       }
+     } else if (!ctx->threw) {
+       ctx->setPartialResult(i, t);
+     }
+  });
+  return ctx->p.getFuture();
+}
+
+// collect (variadic)
+
+template <typename... Fs>
+typename detail::CollectVariadicContext<
+  typename std::decay<Fs>::type::value_type...>::type
+collect(Fs&&... fs) {
+  auto ctx = std::make_shared<detail::CollectVariadicContext<
+    typename std::decay<Fs>::type::value_type...>>();
+  detail::collectVariadicHelper<detail::CollectVariadicContext>(
+    ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
+  return ctx->p.getFuture();
+}
+
+// collectAny (iterator)
+
 template <class InputIterator>
 Future<
   std::pair<size_t,
             Try<
               typename
-              std::iterator_traits<InputIterator>::value_type::value_type> > >
-whenAny(InputIterator first, InputIterator last) {
+              std::iterator_traits<InputIterator>::value_type::value_type>>>
+collectAny(InputIterator first, InputIterator last) {
   typedef
     typename std::iterator_traits<InputIterator>::value_type::value_type T;
 
-  auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
-  auto f_saved = ctx->p.getFuture();
+  struct CollectAnyContext {
+    CollectAnyContext() {}
+    Promise<std::pair<size_t, Try<T>>> p;
+    std::atomic<bool> done {false};
+  };
 
-  for (size_t i = 0; first != last; first++, i++) {
-    auto& f = *first;
-    f.setCallback_([i, ctx](Try<T>&& t) {
-      if (!ctx->done.exchange(true)) {
-        ctx->p.setValue(std::make_pair(i, std::move(t)));
-      }
-      ctx->decref();
-    });
-  }
+  auto ctx = std::make_shared<CollectAnyContext>();
+  mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
+    if (!ctx->done.exchange(true)) {
+      ctx->p.setValue(std::make_pair(i, std::move(t)));
+    }
+  });
+  return ctx->p.getFuture();
+}
+
+// collectAnyWithoutException (iterator)
+
+template <class InputIterator>
+Future<std::pair<
+    size_t,
+    typename std::iterator_traits<InputIterator>::value_type::value_type>>
+collectAnyWithoutException(InputIterator first, InputIterator last) {
+  typedef
+      typename std::iterator_traits<InputIterator>::value_type::value_type T;
+
+  struct CollectAnyWithoutExceptionContext {
+    CollectAnyWithoutExceptionContext(){}
+    Promise<std::pair<size_t, T>> p;
+    std::atomic<bool> done{false};
+    std::atomic<size_t> nFulfilled{0};
+    size_t nTotal;
+  };
 
-  return f_saved;
+  auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
+  ctx->nTotal = size_t(std::distance(first, last));
+
+  mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
+    if (!t.hasException() && !ctx->done.exchange(true)) {
+      ctx->p.setValue(std::make_pair(i, std::move(t.value())));
+    } else if (++ctx->nFulfilled == ctx->nTotal) {
+      ctx->p.setException(t.exception());
+    }
+  });
+  return ctx->p.getFuture();
 }
 
+// collectN (iterator)
+
 template <class InputIterator>
 Future<std::vector<std::pair<size_t, Try<typename
   std::iterator_traits<InputIterator>::value_type::value_type>>>>
-whenN(InputIterator first, InputIterator last, size_t n) {
+collectN(InputIterator first, InputIterator last, size_t n) {
   typedef typename
     std::iterator_traits<InputIterator>::value_type::value_type T;
   typedef std::vector<std::pair<size_t, Try<T>>> V;
 
-  struct ctx_t {
+  struct CollectNContext {
     V v;
-    size_t completed;
+    std::atomic<size_t> completed = {0};
     Promise<V> p;
   };
-  auto ctx = std::make_shared<ctx_t>();
-  ctx->completed = 0;
-
-  // for each completed Future, increase count and add to vector, until we
-  // have n completed futures at which point we fulfil our Promise with the
-  // vector
-  auto it = first;
-  size_t i = 0;
-  while (it != last) {
-    it->then([ctx, n, i](Try<T>&& t) {
-      auto& v = ctx->v;
+  auto ctx = std::make_shared<CollectNContext>();
+
+  if (size_t(std::distance(first, last)) < n) {
+    ctx->p.setException(std::runtime_error("Not enough futures"));
+  } else {
+    // for each completed Future, increase count and add to vector, until we
+    // have n completed futures at which point we fulfil our Promise with the
+    // vector
+    mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
       auto c = ++ctx->completed;
       if (c <= n) {
         assert(ctx->v.size() < n);
-        v.push_back(std::make_pair(i, std::move(t)));
+        ctx->v.emplace_back(i, std::move(t));
         if (c == n) {
-          ctx->p.fulfilTry(Try<V>(std::move(v)));
+          ctx->p.setTry(Try<V>(std::move(ctx->v)));
         }
       }
     });
-
-    it++;
-    i++;
-  }
-
-  if (i < n) {
-    ctx->p.setException(std::runtime_error("Not enough futures"));
   }
 
   return ctx->p.getFuture();
 }
 
-namespace {
-  template <class T>
-  void getWaitHelper(Future<T>* f) {
-    // If we already have a value do the cheap thing
-    if (f->isReady()) {
-      return;
-    }
+// reduce (iterator)
 
-    folly::Baton<> baton;
-    f->then([&](Try<T> const&) {
-      baton.post();
-    });
-    baton.wait();
+template <class It, class T, class F>
+Future<T> reduce(It first, It last, T&& initial, F&& func) {
+  if (first == last) {
+    return makeFuture(std::move(initial));
   }
 
-  template <class T>
-  Future<T> getWaitTimeoutHelper(Future<T>* f, Duration dur) {
-    // TODO make and use variadic whenAny #5877971
-    Promise<T> p;
-    auto token = std::make_shared<std::atomic<bool>>();
-    folly::Baton<> baton;
-
-    folly::detail::getTimekeeperSingleton()->after(dur)
-      .then([&,token](Try<void> const& t) {
-        if (token->exchange(true) == false) {
-          if (t.hasException()) {
-            p.setException(std::move(t.exception()));
-          } else {
-            p.setException(TimedOut());
-          }
-          baton.post();
-        }
+  typedef typename std::iterator_traits<It>::value_type::value_type ItT;
+  typedef
+      typename std::conditional<detail::callableWith<F, T&&, Try<ItT>&&>::value,
+                                Try<ItT>,
+                                ItT>::type Arg;
+  typedef isTry<Arg> IsTry;
+
+  auto sfunc = std::make_shared<F>(std::move(func));
+
+  auto f = first->then(
+      [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
+        return (*sfunc)(
+            std::move(minitial), head.template get<IsTry::value, Arg&&>());
       });
 
-    f->then([&, token](Try<T>&& t) {
-      if (token->exchange(true) == false) {
-        p.fulfilTry(std::move(t));
-        baton.post();
-      }
+  for (++first; first != last; ++first) {
+    f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
+      return (*sfunc)(std::move(std::get<0>(t).value()),
+                  // Either return a ItT&& or a Try<ItT>&& depending
+                  // on the type of the argument of func.
+                  std::get<1>(t).template get<IsTry::value, Arg&&>());
     });
-
-    baton.wait();
-    return p.getFuture();
   }
+
+  return f;
 }
 
-template <class T>
-T Future<T>::get() {
-  getWaitHelper(this);
+// window (collection)
+
+template <class Collection, class F, class ItT, class Result>
+std::vector<Future<Result>>
+window(Collection input, F func, size_t n) {
+  struct WindowContext {
+    WindowContext(Collection&& i, F&& fn)
+        : input_(std::move(i)), promises_(input_.size()),
+          func_(std::move(fn))
+      {}
+    std::atomic<size_t> i_ {0};
+    Collection input_;
+    std::vector<Promise<Result>> promises_;
+    F func_;
+
+    static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
+      size_t i = ctx->i_++;
+      if (i < ctx->input_.size()) {
+        // Using setCallback_ directly since we don't need the Future
+        ctx->func_(std::move(ctx->input_[i])).setCallback_(
+          // ctx is captured by value
+          [ctx, i](Try<Result>&& t) {
+            ctx->promises_[i].setTry(std::move(t));
+            // Chain another future onto this one
+            spawn(std::move(ctx));
+          });
+      }
+    }
+  };
 
-  // Big assumption here: the then() call above, since it doesn't move out
-  // the value, leaves us with a value to return here. This would be a big
-  // no-no in user code, but I'm invoking internal developer privilege. This
-  // is slightly more efficient (save a move()) especially if there's an
-  // exception (save a throw).
-  return std::move(value());
-}
+  auto max = std::min(n, input.size());
 
-template <>
-inline void Future<void>::get() {
-  getWaitHelper(this);
-  value();
-}
+  auto ctx = std::make_shared<WindowContext>(
+    std::move(input), std::move(func));
 
-template <class T>
-T Future<T>::get(Duration dur) {
-  return std::move(getWaitTimeoutHelper(this, dur).value());
-}
+  for (size_t i = 0; i < max; ++i) {
+    // Start the first n Futures
+    WindowContext::spawn(ctx);
+  }
 
-template <>
-inline void Future<void>::get(Duration dur) {
-  getWaitTimeoutHelper(this, dur).value();
+  std::vector<Future<Result>> futures;
+  futures.reserve(ctx->promises_.size());
+  for (auto& promise : ctx->promises_) {
+    futures.emplace_back(promise.getFuture());
+  }
+
+  return futures;
 }
 
+// reduce
+
 template <class T>
-T Future<T>::getVia(DrivableExecutor* e) {
-  while (!isReady()) {
-    e->drive();
-  }
-  return std::move(value());
+template <class I, class F>
+Future<I> Future<T>::reduce(I&& initial, F&& func) {
+  return then([
+    minitial = std::forward<I>(initial),
+    mfunc = std::forward<F>(func)
+  ](T& vals) mutable {
+    auto ret = std::move(minitial);
+    for (auto& val : vals) {
+      ret = mfunc(std::move(ret), std::move(val));
+    }
+    return ret;
+  });
 }
 
-template <>
-inline void Future<void>::getVia(DrivableExecutor* e) {
-  while (!isReady()) {
-    e->drive();
+// unorderedReduce (iterator)
+
+template <class It, class T, class F, class ItT, class Arg>
+Future<T> unorderedReduce(It first, It last, T initial, F func) {
+  if (first == last) {
+    return makeFuture(std::move(initial));
   }
-  value();
+
+  typedef isTry<Arg> IsTry;
+
+  struct UnorderedReduceContext {
+    UnorderedReduceContext(T&& memo, F&& fn, size_t n)
+        : lock_(), memo_(makeFuture<T>(std::move(memo))),
+          func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
+      {}
+    folly::MicroSpinLock lock_; // protects memo_ and numThens_
+    Future<T> memo_;
+    F func_;
+    size_t numThens_; // how many Futures completed and called .then()
+    size_t numFutures_; // how many Futures in total
+    Promise<T> promise_;
+  };
+
+  auto ctx = std::make_shared<UnorderedReduceContext>(
+    std::move(initial), std::move(func), std::distance(first, last));
+
+  mapSetCallback<ItT>(
+      first,
+      last,
+      [ctx](size_t /* i */, Try<ItT>&& t) {
+        // Futures can be completed in any order, simultaneously.
+        // To make this non-blocking, we create a new Future chain in
+        // the order of completion to reduce the values.
+        // The spinlock just protects chaining a new Future, not actually
+        // executing the reduce, which should be really fast.
+        folly::MSLGuard lock(ctx->lock_);
+        ctx->memo_ =
+            ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
+              // Either return a ItT&& or a Try<ItT>&& depending
+              // on the type of the argument of func.
+              return ctx->func_(std::move(v),
+                                mt.template get<IsTry::value, Arg&&>());
+            });
+        if (++ctx->numThens_ == ctx->numFutures_) {
+          // After reducing the value of the last Future, fulfill the Promise
+          ctx->memo_.setCallback_(
+              [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
+        }
+      });
+
+  return ctx->promise_.getFuture();
 }
 
+// within
+
 template <class T>
 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
   return within(dur, TimedOut(), tk);
@@ -650,41 +943,49 @@ template <class E>
 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
 
   struct Context {
-    Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
+    Context(E ex) : exception(std::move(ex)), promise() {}
     E exception;
+    Future<Unit> thisFuture;
     Promise<T> promise;
-    std::atomic<bool> token;
+    std::atomic<bool> token {false};
   };
-  auto ctx = std::make_shared<Context>(std::move(e));
 
+  std::shared_ptr<Timekeeper> tks;
   if (!tk) {
-    tk = folly::detail::getTimekeeperSingleton();
+    tks = folly::detail::getTimekeeperSingleton();
+    tk = DCHECK_NOTNULL(tks.get());
   }
 
-  tk->after(dur)
-    .then([ctx](Try<void> const& t) {
-      if (ctx->token.exchange(true) == false) {
-        if (t.hasException()) {
-          ctx->promise.setException(std::move(t.exception()));
-        } else {
-          ctx->promise.setException(std::move(ctx->exception));
-        }
-      }
-    });
+  auto ctx = std::make_shared<Context>(std::move(e));
 
-  this->then([ctx](Try<T>&& t) {
+  ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
+    // TODO: "this" completed first, cancel "after"
     if (ctx->token.exchange(true) == false) {
-      ctx->promise.fulfilTry(std::move(t));
+      ctx->promise.setTry(std::move(t));
     }
   });
 
-  return ctx->promise.getFuture();
+  tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
+    // "after" completed first, cancel "this"
+    ctx->thisFuture.raise(TimedOut());
+    if (ctx->token.exchange(true) == false) {
+      if (t.hasException()) {
+        ctx->promise.setException(std::move(t.exception()));
+      } else {
+        ctx->promise.setException(std::move(ctx->exception));
+      }
+    }
+  });
+
+  return ctx->promise.getFuture().via(getExecutor());
 }
 
+// delayed
+
 template <class T>
 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
-  return whenAll(*this, futures::sleep(dur, tk))
-    .then([](std::tuple<Try<T>, Try<void>> tup) {
+  return collectAll(*this, futures::sleep(dur, tk))
+    .then([](std::tuple<Try<T>, Try<Unit>> tup) {
       Try<T>& t = std::get<0>(tup);
       return makeFuture<T>(std::move(t));
     });
@@ -694,42 +995,47 @@ namespace detail {
 
 template <class T>
 void waitImpl(Future<T>& f) {
-  Baton<> baton;
-  f = f.then([&](Try<T> t) {
-    baton.post();
-    return makeFuture(std::move(t));
-  });
+  // short-circuit if there's nothing to do
+  if (f.isReady()) return;
+
+  FutureBatonType baton;
+  f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
   baton.wait();
-  // There's a race here between the return here and the actual finishing of
-  // the future. f is completed, but the setup may not have finished on done
-  // after the baton has posted.
-  while (!f.isReady()) {
-    std::this_thread::yield();
-  }
+  assert(f.isReady());
 }
 
 template <class T>
 void waitImpl(Future<T>& f, Duration dur) {
-  auto baton = std::make_shared<Baton<>>();
-  f = f.then([baton](Try<T> t) {
+  // short-circuit if there's nothing to do
+  if (f.isReady()) {
+    return;
+  }
+
+  Promise<T> promise;
+  auto ret = promise.getFuture();
+  auto baton = std::make_shared<FutureBatonType>();
+  f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
+    promise.setTry(std::move(t));
     baton->post();
-    return makeFuture(std::move(t));
   });
-  // Let's preserve the invariant that if we did not timeout (timed_wait returns
-  // true), then the returned Future is complete when it is returned to the
-  // caller. We need to wait out the race for that Future to complete.
-  if (baton->timed_wait(std::chrono::system_clock::now() + dur)) {
-    while (!f.isReady()) {
-      std::this_thread::yield();
-    }
+  f = std::move(ret);
+  if (baton->timed_wait(dur)) {
+    assert(f.isReady());
   }
 }
 
 template <class T>
 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
+  // Set callback so to ensure that the via executor has something on it
+  // so that once the preceding future triggers this callback, drive will
+  // always have a callback to satisfy it
+  if (f.isReady())
+    return;
+  f = f.via(e).then([](T&& t) { return std::move(t); });
   while (!f.isReady()) {
     e->drive();
   }
+  assert(f.isReady());
 }
 
 } // detail
@@ -770,36 +1076,369 @@ Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
   return std::move(*this);
 }
 
-namespace futures {
+template <class T>
+T Future<T>::get() {
+  return std::move(wait().value());
+}
 
-  namespace {
-    template <class Z>
-    Future<Z> chainHelper(Future<Z> f) {
-      return f;
+template <class T>
+T Future<T>::get(Duration dur) {
+  wait(dur);
+  if (isReady()) {
+    return std::move(value());
+  } else {
+    throw TimedOut();
+  }
+}
+
+template <class T>
+T Future<T>::getVia(DrivableExecutor* e) {
+  return std::move(waitVia(e).value());
+}
+
+namespace detail {
+  template <class T>
+  struct TryEquals {
+    static bool equals(const Try<T>& t1, const Try<T>& t2) {
+      return t1.value() == t2.value();
     }
+  };
+}
+
+template <class T>
+Future<bool> Future<T>::willEqual(Future<T>& f) {
+  return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
+    if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
+      return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
+    } else {
+      return false;
+      }
+  });
+}
 
-    template <class Z, class F, class Fn, class... Callbacks>
-    Future<Z> chainHelper(F f, Fn fn, Callbacks... fns) {
-      return chainHelper<Z>(f.then(fn), fns...);
+template <class T>
+template <class F>
+Future<T> Future<T>::filter(F&& predicate) {
+  return this->then([p = std::forward<F>(predicate)](T val) {
+    T const& valConstRef = val;
+    if (!p(valConstRef)) {
+      throw PredicateDoesNotObtain();
     }
+    return val;
+  });
+}
+
+template <class T>
+template <class Callback>
+auto Future<T>::thenMulti(Callback&& fn)
+    -> decltype(this->then(std::forward<Callback>(fn))) {
+  // thenMulti with one callback is just a then
+  return then(std::forward<Callback>(fn));
+}
+
+template <class T>
+template <class Callback, class... Callbacks>
+auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
+    -> decltype(this->then(std::forward<Callback>(fn)).
+                      thenMulti(std::forward<Callbacks>(fns)...)) {
+  // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
+  return then(std::forward<Callback>(fn)).
+         thenMulti(std::forward<Callbacks>(fns)...);
+}
+
+template <class T>
+template <class Callback, class... Callbacks>
+auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
+                                      Callbacks&&... fns)
+    -> decltype(this->then(std::forward<Callback>(fn)).
+                      thenMulti(std::forward<Callbacks>(fns)...)) {
+  // thenMultiExecutor with two callbacks is
+  // via(x).then(a).thenMulti(b, ...).via(oldX)
+  auto oldX = getExecutor();
+  setExecutor(x);
+  return then(std::forward<Callback>(fn)).
+         thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
+}
+
+template <class T>
+template <class Callback>
+auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
+    -> decltype(this->then(std::forward<Callback>(fn))) {
+  // thenMulti with one callback is just a then with an executor
+  return then(x, std::forward<Callback>(fn));
+}
+
+template <class F>
+inline Future<Unit> when(bool p, F&& thunk) {
+  return p ? std::forward<F>(thunk)().unit() : makeFuture();
+}
+
+template <class P, class F>
+Future<Unit> whileDo(P&& predicate, F&& thunk) {
+  if (predicate()) {
+    auto future = thunk();
+    return future.then([
+      predicate = std::forward<P>(predicate),
+      thunk = std::forward<F>(thunk)
+    ]() mutable {
+      return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
+    });
   }
+  return makeFuture();
+}
+
+template <class F>
+Future<Unit> times(const int n, F&& thunk) {
+  return folly::whileDo(
+      [ n, count = folly::make_unique<std::atomic<int>>(0) ]() mutable {
+        return count->fetch_add(1) < n;
+      },
+      std::forward<F>(thunk));
+}
 
-  template <class A, class Z, class... Callbacks>
-  std::function<Future<Z>(Try<A>)>
-  chain(Callbacks... fns) {
-    MoveWrapper<Promise<A>> pw;
-    MoveWrapper<Future<Z>> fw(chainHelper<Z>(pw->getFuture(), fns...));
-    return [=](Try<A> t) mutable {
-      pw->fulfilTry(std::move(t));
-      return std::move(*fw);
-    };
+namespace futures {
+  template <class It, class F, class ItT, class Result>
+  std::vector<Future<Result>> map(It first, It last, F func) {
+    std::vector<Future<Result>> results;
+    for (auto it = first; it != last; it++) {
+      results.push_back(it->then(func));
+    }
+    return results;
   }
+}
 
+namespace futures {
+
+namespace detail {
+
+struct retrying_policy_raw_tag {};
+struct retrying_policy_fut_tag {};
+
+template <class Policy>
+struct retrying_policy_traits {
+  using ew = exception_wrapper;
+  FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
+  template <class Ret>
+  using has_op = typename std::integral_constant<bool,
+        has_op_call<Policy, Ret(size_t, const ew&)>::value ||
+        has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
+  using is_raw = has_op<bool>;
+  using is_fut = has_op<Future<bool>>;
+  using tag = typename std::conditional<
+        is_raw::value, retrying_policy_raw_tag, typename std::conditional<
+        is_fut::value, retrying_policy_fut_tag, void>::type>::type;
+};
+
+template <class Policy, class FF, class Prom>
+void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) {
+  using F = typename std::result_of<FF(size_t)>::type;
+  using T = typename F::value_type;
+  auto f = ff(k++);
+  f.then([
+    k,
+    prom = std::move(prom),
+    pm = std::forward<Policy>(p),
+    ffm = std::forward<FF>(ff)
+  ](Try<T> && t) mutable {
+    if (t.hasValue()) {
+      prom.setValue(std::move(t).value());
+      return;
+    }
+    auto& x = t.exception();
+    auto q = pm(k, x);
+    q.then([
+      k,
+      prom = std::move(prom),
+      xm = std::move(x),
+      pm = std::move(pm),
+      ffm = std::move(ffm)
+    ](bool shouldRetry) mutable {
+      if (shouldRetry) {
+        retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom));
+      } else {
+        prom.setException(std::move(xm));
+      };
+    });
+  });
 }
 
-} // namespace folly
+template <class Policy, class FF>
+typename std::result_of<FF(size_t)>::type
+retrying(size_t k, Policy&& p, FF&& ff) {
+  using F = typename std::result_of<FF(size_t)>::type;
+  using T = typename F::value_type;
+  auto prom = Promise<T>();
+  auto f = prom.getFuture();
+  retryingImpl(
+      k, std::forward<Policy>(p), std::forward<FF>(ff), std::move(prom));
+  return f;
+}
+
+template <class Policy, class FF>
+typename std::result_of<FF(size_t)>::type
+retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
+  auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
+    return makeFuture<bool>(pm(k, x));
+  };
+  return retrying(0, std::move(q), std::forward<FF>(ff));
+}
+
+template <class Policy, class FF>
+typename std::result_of<FF(size_t)>::type
+retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
+  return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
+}
 
-// I haven't included a Future<T&> specialization because I don't forsee us
-// using it, however it is not difficult to add when needed. Refer to
-// Future<void> for guidance. std::future and boost::future code would also be
-// instructive.
+//  jittered exponential backoff, clamped to [backoff_min, backoff_max]
+template <class URNG>
+Duration retryingJitteredExponentialBackoffDur(
+    size_t n,
+    Duration backoff_min,
+    Duration backoff_max,
+    double jitter_param,
+    URNG& rng) {
+  using d = Duration;
+  auto dist = std::normal_distribution<double>(0.0, jitter_param);
+  auto jitter = std::exp(dist(rng));
+  auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
+  return std::max(backoff_min, std::min(backoff_max, backoff));
+}
+
+template <class Policy, class URNG>
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+    size_t max_tries,
+    Duration backoff_min,
+    Duration backoff_max,
+    double jitter_param,
+    URNG&& rng,
+    Policy&& p) {
+  return [
+    pm = std::forward<Policy>(p),
+    max_tries,
+    backoff_min,
+    backoff_max,
+    jitter_param,
+    rngp = std::forward<URNG>(rng)
+  ](size_t n, const exception_wrapper& ex) mutable {
+    if (n == max_tries) {
+      return makeFuture(false);
+    }
+    return pm(n, ex).then(
+        [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
+            bool v) mutable {
+          if (!v) {
+            return makeFuture(false);
+          }
+          auto backoff = detail::retryingJitteredExponentialBackoffDur(
+              n, backoff_min, backoff_max, jitter_param, rngp);
+          return futures::sleep(backoff).then([] { return true; });
+        });
+  };
+}
+
+template <class Policy, class URNG>
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+    size_t max_tries,
+    Duration backoff_min,
+    Duration backoff_max,
+    double jitter_param,
+    URNG&& rng,
+    Policy&& p,
+    retrying_policy_raw_tag) {
+  auto q = [pm = std::forward<Policy>(p)](
+      size_t n, const exception_wrapper& e) {
+    return makeFuture(pm(n, e));
+  };
+  return retryingPolicyCappedJitteredExponentialBackoff(
+      max_tries,
+      backoff_min,
+      backoff_max,
+      jitter_param,
+      std::forward<URNG>(rng),
+      std::move(q));
+}
+
+template <class Policy, class URNG>
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+    size_t max_tries,
+    Duration backoff_min,
+    Duration backoff_max,
+    double jitter_param,
+    URNG&& rng,
+    Policy&& p,
+    retrying_policy_fut_tag) {
+  return retryingPolicyCappedJitteredExponentialBackoff(
+      max_tries,
+      backoff_min,
+      backoff_max,
+      jitter_param,
+      std::forward<URNG>(rng),
+      std::forward<Policy>(p));
+}
+}
+
+template <class Policy, class FF>
+typename std::result_of<FF(size_t)>::type
+retrying(Policy&& p, FF&& ff) {
+  using tag = typename detail::retrying_policy_traits<Policy>::tag;
+  return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
+}
+
+inline
+std::function<bool(size_t, const exception_wrapper&)>
+retryingPolicyBasic(
+    size_t max_tries) {
+  return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
+}
+
+template <class Policy, class URNG>
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+    size_t max_tries,
+    Duration backoff_min,
+    Duration backoff_max,
+    double jitter_param,
+    URNG&& rng,
+    Policy&& p) {
+  using tag = typename detail::retrying_policy_traits<Policy>::tag;
+  return detail::retryingPolicyCappedJitteredExponentialBackoff(
+      max_tries,
+      backoff_min,
+      backoff_max,
+      jitter_param,
+      std::forward<URNG>(rng),
+      std::forward<Policy>(p),
+      tag());
+}
+
+inline
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+    size_t max_tries,
+    Duration backoff_min,
+    Duration backoff_max,
+    double jitter_param) {
+  auto p = [](size_t, const exception_wrapper&) { return true; };
+  return retryingPolicyCappedJitteredExponentialBackoff(
+      max_tries,
+      backoff_min,
+      backoff_max,
+      jitter_param,
+      ThreadLocalPRNG(),
+      std::move(p));
+}
+
+}
+
+// Instantiate the most common Future types to save compile time
+extern template class Future<Unit>;
+extern template class Future<bool>;
+extern template class Future<int>;
+extern template class Future<int64_t>;
+extern template class Future<std::string>;
+extern template class Future<double>;
+
+} // namespace folly