fix memory leak in case of large number of retries
[folly.git] / folly / futures / Future-inl.h
index 61ea4ba86e7b8a3f5d87684b9f2e8c2ed75e2790..b353ff81a411f932c28da12af08239ff7ba48e32 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -62,20 +62,6 @@ Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
   return *this;
 }
 
-template <class T>
-template <typename U, typename>
-Future<T>::Future(Future<U>&& other) noexcept
-    : core_(detail::Core<T>::convert(other.core_)) {
-  other.core_ = nullptr;
-}
-
-template <class T>
-template <typename U, typename>
-Future<T>& Future<T>::operator=(Future<U>&& other) noexcept {
-  std::swap(core_, detail::Core<T>::convert(other.core_));
-  return *this;
-}
-
 template <class T>
 template <class T2, typename>
 Future<T>::Future(T2&& val)
@@ -173,14 +159,16 @@ Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
      in some circumstances, but I think it should be explicit not implicit
      in the destruction of the Future used to create it.
      */
-  setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
-      Try<T> && t) mutable {
-    if (!isTry && t.hasException()) {
-      pm.setException(std::move(t.exception()));
-    } else {
-      pm.setWith([&]() { return funcm(t.template get<isTry, Args>()...); });
-    }
-  });
+  setCallback_(
+      [ func = std::forward<F>(func), pm = std::move(p) ](Try<T> && t) mutable {
+        if (!isTry && t.hasException()) {
+          pm.setException(std::move(t.exception()));
+        } else {
+          pm.setWith([&]() {
+            return std::move(func)(t.template get<isTry, Args>()...);
+          });
+        }
+      });
 
   return f;
 }
@@ -203,22 +191,28 @@ Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
   auto f = p.getFuture();
   f.core_->setExecutorNoLock(getExecutor());
 
-  setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
+  setCallback_([ func = std::forward<F>(func), pm = std::move(p) ](
       Try<T> && t) mutable {
-    if (!isTry && t.hasException()) {
-      pm.setException(std::move(t.exception()));
-    } else {
-      try {
-        auto f2 = funcm(t.template get<isTry, Args>()...);
-        // that didn't throw, now we can steal p
-        f2.setCallback_([p = std::move(pm)](Try<B> && b) mutable {
-          p.setTry(std::move(b));
-        });
-      } catch (const std::exception& e) {
-        pm.setException(exception_wrapper(std::current_exception(), e));
-      } catch (...) {
-        pm.setException(exception_wrapper(std::current_exception()));
+    auto ew = [&] {
+      if (!isTry && t.hasException()) {
+        return std::move(t.exception());
+      } else {
+        try {
+          auto f2 = std::move(func)(t.template get<isTry, Args>()...);
+          // that didn't throw, now we can steal p
+          f2.setCallback_([p = std::move(pm)](Try<B> && b) mutable {
+            p.setTry(std::move(b));
+          });
+          return exception_wrapper();
+        } catch (const std::exception& e) {
+          return exception_wrapper(std::current_exception(), e);
+        } catch (...) {
+          return exception_wrapper(std::current_exception());
+        }
       }
+    }();
+    if (ew) {
+      pm.setException(std::move(ew));
     }
   });
 
@@ -271,13 +265,14 @@ Future<T>::onError(F&& func) {
   p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
   auto f = p.getFuture();
 
-  setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
-      Try<T> && t) mutable {
-    if (!t.template withException<Exn>(
-            [&](Exn& e) { pm.setWith([&] { return funcm(e); }); })) {
-      pm.setTry(std::move(t));
-    }
-  });
+  setCallback_(
+      [ func = std::forward<F>(func), pm = std::move(p) ](Try<T> && t) mutable {
+        if (!t.template withException<Exn>([&](Exn& e) {
+              pm.setWith([&] { return std::move(func)(e); });
+            })) {
+          pm.setTry(std::move(t));
+        }
+      });
 
   return f;
 }
@@ -298,18 +293,24 @@ Future<T>::onError(F&& func) {
   Promise<T> p;
   auto f = p.getFuture();
 
-  setCallback_([ pm = std::move(p), funcm = std::forward<F>(func) ](
+  setCallback_([ pm = std::move(p), func = std::forward<F>(func) ](
       Try<T> && t) mutable {
     if (!t.template withException<Exn>([&](Exn& e) {
-          try {
-            auto f2 = funcm(e);
-            f2.setCallback_([pm = std::move(pm)](Try<T> && t2) mutable {
-              pm.setTry(std::move(t2));
-            });
-          } catch (const std::exception& e2) {
-            pm.setException(exception_wrapper(std::current_exception(), e2));
-          } catch (...) {
-            pm.setException(exception_wrapper(std::current_exception()));
+          auto ew = [&] {
+            try {
+              auto f2 = std::move(func)(e);
+              f2.setCallback_([pm = std::move(pm)](Try<T> && t2) mutable {
+                pm.setTry(std::move(t2));
+              });
+              return exception_wrapper();
+            } catch (const std::exception& e2) {
+              return exception_wrapper(std::current_exception(), e2);
+            } catch (...) {
+              return exception_wrapper(std::current_exception());
+            }
+          }();
+          if (ew) {
+            pm.setException(std::move(ew));
           }
         })) {
       pm.setTry(std::move(t));
@@ -348,17 +349,23 @@ Future<T>::onError(F&& func) {
   Promise<T> p;
   auto f = p.getFuture();
   setCallback_(
-      [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
+      [ pm = std::move(p), func = std::forward<F>(func) ](Try<T> t) mutable {
         if (t.hasException()) {
-          try {
-            auto f2 = funcm(std::move(t.exception()));
-            f2.setCallback_([pm = std::move(pm)](Try<T> t2) mutable {
-              pm.setTry(std::move(t2));
-            });
-          } catch (const std::exception& e2) {
-            pm.setException(exception_wrapper(std::current_exception(), e2));
-          } catch (...) {
-            pm.setException(exception_wrapper(std::current_exception()));
+          auto ew = [&] {
+            try {
+              auto f2 = std::move(func)(std::move(t.exception()));
+              f2.setCallback_([pm = std::move(pm)](Try<T> t2) mutable {
+                pm.setTry(std::move(t2));
+              });
+              return exception_wrapper();
+            } catch (const std::exception& e2) {
+              return exception_wrapper(std::current_exception(), e2);
+            } catch (...) {
+              return exception_wrapper(std::current_exception());
+            }
+          }();
+          if (ew) {
+            pm.setException(std::move(ew));
           }
         } else {
           pm.setTry(std::move(t));
@@ -383,9 +390,9 @@ Future<T>::onError(F&& func) {
   Promise<T> p;
   auto f = p.getFuture();
   setCallback_(
-      [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
+      [ pm = std::move(p), func = std::forward<F>(func) ](Try<T> t) mutable {
         if (t.hasException()) {
-          pm.setWith([&] { return funcm(std::move(t.exception())); });
+          pm.setWith([&] { return std::move(func)(std::move(t.exception())); });
         } else {
           pm.setTry(std::move(t));
         }
@@ -582,7 +589,7 @@ collectAll(InputIterator first, InputIterator last) {
     typename std::iterator_traits<InputIterator>::value_type::value_type T;
 
   struct CollectAllContext {
-    CollectAllContext(int n) : results(n) {}
+    CollectAllContext(size_t n) : results(n) {}
     ~CollectAllContext() {
       p.setValue(std::move(results));
     }
@@ -590,7 +597,8 @@ collectAll(InputIterator first, InputIterator last) {
     std::vector<Try<T>> results;
   };
 
-  auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
+  auto ctx =
+      std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
     ctx->results[i] = std::move(t);
   });
@@ -617,7 +625,7 @@ struct CollectContext {
     Nothing,
     std::vector<Optional<T>>>::type;
 
-  explicit CollectContext(int n) : result(n) {}
+  explicit CollectContext(size_t n) : result(n) {}
   ~CollectContext() {
     if (!threw.exchange(true)) {
       // map Optional<T> -> T
@@ -686,7 +694,7 @@ collectAny(InputIterator first, InputIterator last) {
     typename std::iterator_traits<InputIterator>::value_type::value_type T;
 
   struct CollectAnyContext {
-    CollectAnyContext() {};
+    CollectAnyContext() {}
     Promise<std::pair<size_t, Try<T>>> p;
     std::atomic<bool> done {false};
   };
@@ -711,7 +719,7 @@ collectAnyWithoutException(InputIterator first, InputIterator last) {
       typename std::iterator_traits<InputIterator>::value_type::value_type T;
 
   struct CollectAnyWithoutExceptionContext {
-    CollectAnyWithoutExceptionContext(){};
+    CollectAnyWithoutExceptionContext(){}
     Promise<std::pair<size_t, T>> p;
     std::atomic<bool> done{false};
     std::atomic<size_t> nFulfilled{0};
@@ -719,7 +727,7 @@ collectAnyWithoutException(InputIterator first, InputIterator last) {
   };
 
   auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
-  ctx->nTotal = std::distance(first, last);
+  ctx->nTotal = size_t(std::distance(first, last));
 
   mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
     if (!t.hasException() && !ctx->done.exchange(true)) {
@@ -884,7 +892,7 @@ Future<T> unorderedReduce(It first, It last, T initial, F func) {
     UnorderedReduceContext(T&& memo, F&& fn, size_t n)
         : lock_(), memo_(makeFuture<T>(std::move(memo))),
           func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
-      {};
+      {}
     folly::MicroSpinLock lock_; // protects memo_ and numThens_
     Future<T> memo_;
     F func_;
@@ -1221,23 +1229,49 @@ struct retrying_policy_traits {
         is_fut::value, retrying_policy_fut_tag, void>::type>::type;
 };
 
+template <class Policy, class FF, class Prom>
+void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) {
+  using F = typename std::result_of<FF(size_t)>::type;
+  using T = typename F::value_type;
+  auto f = ff(k++);
+  f.then([
+    k,
+    prom = std::move(prom),
+    pm = std::forward<Policy>(p),
+    ffm = std::forward<FF>(ff)
+  ](Try<T> && t) mutable {
+    if (t.hasValue()) {
+      prom.setValue(std::move(t).value());
+      return;
+    }
+    auto& x = t.exception();
+    auto q = pm(k, x);
+    q.then([
+      k,
+      prom = std::move(prom),
+      xm = std::move(x),
+      pm = std::move(pm),
+      ffm = std::move(ffm)
+    ](bool shouldRetry) mutable {
+      if (shouldRetry) {
+        retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom));
+      } else {
+        prom.setException(std::move(xm));
+      };
+    });
+  });
+}
+
 template <class Policy, class FF>
 typename std::result_of<FF(size_t)>::type
 retrying(size_t k, Policy&& p, FF&& ff) {
   using F = typename std::result_of<FF(size_t)>::type;
   using T = typename F::value_type;
-  auto f = ff(k++);
-  return f.onError(
-      [ k, pm = std::forward<Policy>(p), ffm = std::forward<FF>(ff) ](
-          exception_wrapper x) mutable {
-        auto q = pm(k, x);
-        return q.then(
-            [ k, xm = std::move(x), pm = std::move(pm), ffm = std::move(ffm) ](
-                bool r) mutable {
-              return r ? retrying(k, std::move(pm), std::move(ffm))
-                       : makeFuture<T>(std::move(xm));
-            });
-      });
+  auto prom = Promise<T>();
+  auto f = prom.getFuture();
+  retryingImpl(
+      k, std::forward<Policy>(p), std::forward<FF>(ff), std::move(prom));
+  return f;
 }
 
 template <class Policy, class FF>