fix memory leak in case of large number of retries
authorMainak Mandal <mmandal@fb.com>
Sat, 1 Apr 2017 02:01:12 +0000 (19:01 -0700)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Sat, 1 Apr 2017 02:08:27 +0000 (19:08 -0700)
Summary: Infinite retries are often needed for read-modify-write-like workflows. The current implementation created a nested chain of implicit promises, which manifests as a memory leak after some time. Worse yet, even if the operation eventually succeeds, it takes a long time to churn through the chain of promises.

Reviewed By: yfeldblum

Differential Revision: D4770335

fbshipit-source-id: 44b8d6df1084de4514b66919a9838cf2322d6dce

folly/Makefile.am
folly/futures/Future-inl.h
folly/futures/helpers.h
folly/futures/test/RetryingTest.cpp
folly/futures/test/TestExecutor.cpp [new file with mode: 0644]
folly/futures/test/TestExecutor.h [new file with mode: 0644]
folly/futures/test/TestExecutorTest.cpp [new file with mode: 0644]
folly/test/Makefile.am

index 16ea751f2f6f30e2c7d91bcd36b15c93e70ac12f..aedd5989896572bb76fbd0169a339ffe7420c0de 100644 (file)
@@ -176,6 +176,7 @@ nobase_follyinclude_HEADERS = \
        futures/detail/Core.h \
        futures/detail/FSM.h \
        futures/detail/Types.h \
+       futures/test/TestExecutor.h \
        gen/Base.h \
        gen/Base-inl.h \
        gen/Combine.h \
@@ -434,6 +435,7 @@ libfolly_la_SOURCES = \
        futures/ManualExecutor.cpp \
        futures/QueuedImmediateExecutor.cpp \
        futures/ThreadWheelTimekeeper.cpp \
+       futures/test/TestExecutor.cpp \
        detail/Futex.cpp \
        detail/StaticSingletonManager.cpp \
        detail/ThreadLocalDetail.cpp \
index 28a5ffaa8f3f611ccdd44dfadbf04446ae77e137..b353ff81a411f932c28da12af08239ff7ba48e32 100644 (file)
@@ -1229,23 +1229,49 @@ struct retrying_policy_traits {
         is_fut::value, retrying_policy_fut_tag, void>::type>::type;
 };
 
+template <class Policy, class FF, class Prom>
+void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) {
+  using F = typename std::result_of<FF(size_t)>::type;
+  using T = typename F::value_type;
+  auto f = ff(k++);
+  f.then([
+    k,
+    prom = std::move(prom),
+    pm = std::forward<Policy>(p),
+    ffm = std::forward<FF>(ff)
+  ](Try<T> && t) mutable {
+    if (t.hasValue()) {
+      prom.setValue(std::move(t).value());
+      return;
+    }
+    auto& x = t.exception();
+    auto q = pm(k, x);
+    q.then([
+      k,
+      prom = std::move(prom),
+      xm = std::move(x),
+      pm = std::move(pm),
+      ffm = std::move(ffm)
+    ](bool shouldRetry) mutable {
+      if (shouldRetry) {
+        retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom));
+      } else {
+        prom.setException(std::move(xm));
+      };
+    });
+  });
+}
+
 template <class Policy, class FF>
 typename std::result_of<FF(size_t)>::type
 retrying(size_t k, Policy&& p, FF&& ff) {
   using F = typename std::result_of<FF(size_t)>::type;
   using T = typename F::value_type;
-  auto f = ff(k++);
-  return f.onError(
-      [ k, pm = std::forward<Policy>(p), ffm = std::forward<FF>(ff) ](
-          exception_wrapper x) mutable {
-        auto q = pm(k, x);
-        return q.then(
-            [ k, xm = std::move(x), pm = std::move(pm), ffm = std::move(ffm) ](
-                bool r) mutable {
-              return r ? retrying(k, std::move(pm), std::move(ffm))
-                       : makeFuture<T>(std::move(xm));
-            });
-      });
+  auto prom = Promise<T>(); // one promise, handed on to retryingImpl
+  auto f = prom.getFuture(); // taken before prom is moved away below
+  retryingImpl(
+      k, std::forward<Policy>(p), std::forward<FF>(ff), std::move(prom));
+  return f; // completed by retryingImpl through prom
 }
 
 template <class Policy, class FF>
index 379baf703a4a25b899e295c51f37325ad683f147..bc0b8b950e6aa07739d4dc24d188062dd1820e78 100644 (file)
@@ -363,6 +363,9 @@ namespace futures {
  *  indicating that the failure was transitory.
  *
  *  Cancellation is not supported.
+ *
+ *  If both FF and Policy execute inline, it is possible to hit a stack
+ *  overflow due to the recursive nature of the retry implementation.
  */
 template <class Policy, class FF>
 typename std::result_of<FF(size_t)>::type
index d1c429dfd323b40e504ec30df6b587b5af8c72b6..2de5c8a441a4a6884181eb4699ca60717381b24a 100644 (file)
@@ -20,6 +20,8 @@
 
 #include <folly/futures/Future.h>
 #include <folly/portability/GTest.h>
+#include <folly/portability/SysResource.h>
+#include "TestExecutor.h"
 
 using namespace std;
 using namespace std::chrono;
@@ -137,6 +139,45 @@ TEST(RetryingTest, policy_sleep_defaults) {
   });
 }
 
+TEST(RetryingTest, large_retries) {
+  rlimit oldMemLimit;
+  PCHECK(getrlimit(RLIMIT_AS, &oldMemLimit) == 0);
+
+  rlimit newMemLimit;
+  newMemLimit.rlim_cur = std::min(1UL << 30, oldMemLimit.rlim_max);
+  newMemLimit.rlim_max = oldMemLimit.rlim_max;
+  PCHECK(setrlimit(RLIMIT_AS, &newMemLimit) == 0);
+  SCOPE_EXIT {
+    PCHECK(setrlimit(RLIMIT_AS, &oldMemLimit) == 0);
+  };
+
+  TestExecutor executor;
+  // size of implicit promise is at least the size of the return.
+  using LargeReturn = array<uint64_t, 16000>;
+  auto func = [&executor](size_t retryNum) -> Future<LargeReturn> {
+    return via(&executor).then([retryNum] {
+      return retryNum < 10000
+          ? makeFuture<LargeReturn>(
+                make_exception_wrapper<std::runtime_error>("keep trying"))
+          : makeFuture<LargeReturn>(LargeReturn());
+    });
+  };
+
+  vector<Future<LargeReturn>> futures;
+  for (auto idx = 0; idx < 40; ++idx) {
+    futures.emplace_back(futures::retrying(
+        [&executor](size_t, const exception_wrapper&) {
+          return via(&executor).then([] { return true; });
+        },
+        func));
+  }
+
+  for (auto& f : futures) {
+    f.wait();
+    EXPECT_TRUE(f.hasValue());
+  }
+}
+
 /*
 TEST(RetryingTest, policy_sleep_cancel) {
   multiAttemptExpectDurationWithin(5, milliseconds(0), milliseconds(10), []{
diff --git a/folly/futures/test/TestExecutor.cpp b/folly/futures/test/TestExecutor.cpp
new file mode 100644 (file)
index 0000000..16f8d26
--- /dev/null
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TestExecutor.h"
+
+using namespace std;
+
+namespace folly {
+
+TestExecutor::TestExecutor() {
+  const auto kWorkers = std::max(1U, thread::hardware_concurrency());
+  for (auto idx = 0U; idx < kWorkers; ++idx) {
+    workers_.emplace_back([this] {
+      while (true) {
+        Func work;
+        {
+          unique_lock<mutex> lk(m_);
+          cv_.wait(lk, [this] { return !workItems_.empty(); });
+          work = std::move(workItems_.front());
+          workItems_.pop();
+        }
+        if (!work) {
+          break;
+        }
+        work();
+      }
+    });
+  }
+}
+
+// Shuts the pool down: queues one empty Func per worker, then joins every
+// worker. A worker exits when it dequeues an empty Func, and the queue is
+// FIFO, so work queued earlier is dequeued before any worker stops.
+TestExecutor::~TestExecutor() {
+  // Counted loop: only the number of workers matters here, so there is no
+  // unused loop variable.
+  for (auto remaining = workers_.size(); remaining > 0; --remaining) {
+    addImpl({});
+  }
+
+  for (auto& worker : workers_) {
+    worker.join();
+  }
+}
+
+// Queues f for execution on a worker thread. An empty f is dropped: workers
+// treat an empty Func as the signal to exit, and only the destructor queues
+// one.
+void TestExecutor::add(Func f) {
+  if (!f) {
+    return;
+  }
+  addImpl(std::move(f));
+}
+
+// Returns the number of worker threads owned by this executor.
+uint32_t TestExecutor::numThreads() const {
+  return static_cast<uint32_t>(workers_.size());
+}
+
+// Pushes f onto the work queue, empty or not, and wakes one waiting worker.
+// The mutex is released before notifying.
+void TestExecutor::addImpl(Func f) {
+  unique_lock<mutex> guard(m_);
+  workItems_.push(std::move(f));
+  guard.unlock();
+  cv_.notify_one();
+}
+
+} // folly
diff --git a/folly/futures/test/TestExecutor.h b/folly/futures/test/TestExecutor.h
new file mode 100644 (file)
index 0000000..c09713b
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <condition_variable>
+#include <cstdint>
+#include <mutex>
+#include <queue>
+#include <thread>
+#include <vector>
+
+#include <folly/Executor.h>
+
+namespace folly {
+
+/**
+ * A simple multithreaded executor for use in tests.
+ *
+ * Work is kept in a single FIFO queue guarded by a mutex and served by a
+ * fixed set of worker threads started in the constructor. The destructor
+ * queues one empty Func per worker as a stop signal and joins the workers,
+ * so work added before destruction is dequeued before any worker exits.
+ */
+class TestExecutor : public Executor {
+ public:
+  /// Starts the worker threads.
+  TestExecutor();
+
+  /// Signals every worker to stop and joins them.
+  ~TestExecutor() override;
+
+  /// Queues f to run on a worker thread; an empty f is ignored.
+  void add(Func f) override;
+
+  /// Number of worker threads.
+  uint32_t numThreads() const;
+
+ private:
+  /// Queues f unconditionally; an empty f tells one worker to exit.
+  void addImpl(Func f);
+
+  std::mutex m_; // guards workItems_
+  std::queue<Func> workItems_;
+  std::condition_variable cv_; // notified once per queued item
+
+  std::vector<std::thread> workers_;
+};
+
+} // folly
diff --git a/folly/futures/test/TestExecutorTest.cpp b/folly/futures/test/TestExecutorTest.cpp
new file mode 100644 (file)
index 0000000..750e904
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/portability/GTest.h>
+#include "TestExecutor.h"
+
+using namespace std;
+using namespace std::chrono;
+using namespace folly;
+
+TEST(TestExecutor, parallel_run) {
+  mutex m;
+  set<thread::id> ids;
+  auto executor = std::make_unique<TestExecutor>();
+  const auto numThreads = executor->numThreads();
+  for (auto idx = 0U; idx < numThreads * 10; ++idx) {
+    executor->add([&m, &ids]() mutable {
+      /* sleep override */ this_thread::sleep_for(milliseconds(100));
+      lock_guard<mutex> lg(m);
+      ids.insert(this_thread::get_id());
+    });
+  }
+
+  executor = nullptr;
+  EXPECT_EQ(ids.size(), numThreads);
+}
index 9d983a3d692893fb37dae73834e9577c5d8d4215..b22aa68348b4adeeb0ec4371959a46d97ad377a0 100644 (file)
@@ -280,6 +280,7 @@ futures_test_SOURCES = \
     ../futures/test/RetryingTest.cpp \
     ../futures/test/SelfDestructTest.cpp \
     ../futures/test/SharedPromiseTest.cpp \
+    ../futures/test/TestExecutorTest.cpp \
     ../futures/test/ThenCompileTest.cpp \
     ../futures/test/ThenTest.cpp \
     ../futures/test/TimekeeperTest.cpp \