From 8e16a2eb93a5a2764e370b63ca0f210b140cc308 Mon Sep 17 00:00:00 2001 From: Mainak Mandal Date: Fri, 31 Mar 2017 19:01:12 -0700 Subject: [PATCH] fix memory leak in case of large number of retries Summary: Infinite retries is something that is often needed for read-modify-write like workflows. The current implementation was creating a nested chain of implicit promises. This manifests as a memory leak after some time. Worse yet, even if it succeeds, it will take a long time to churn thru the chain of promises. Reviewed By: yfeldblum Differential Revision: D4770335 fbshipit-source-id: 44b8d6df1084de4514b66919a9838cf2322d6dce --- folly/Makefile.am | 2 + folly/futures/Future-inl.h | 50 ++++++++++++----- folly/futures/helpers.h | 3 ++ folly/futures/test/RetryingTest.cpp | 41 ++++++++++++++ folly/futures/test/TestExecutor.cpp | 72 +++++++++++++++++++++++++ folly/futures/test/TestExecutor.h | 50 +++++++++++++++++ folly/futures/test/TestExecutorTest.cpp | 39 ++++++++++++++ folly/test/Makefile.am | 1 + 8 files changed, 246 insertions(+), 12 deletions(-) create mode 100644 folly/futures/test/TestExecutor.cpp create mode 100644 folly/futures/test/TestExecutor.h create mode 100644 folly/futures/test/TestExecutorTest.cpp diff --git a/folly/Makefile.am b/folly/Makefile.am index 16ea751f..aedd5989 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -176,6 +176,7 @@ nobase_follyinclude_HEADERS = \ futures/detail/Core.h \ futures/detail/FSM.h \ futures/detail/Types.h \ + futures/test/TestExecutor.h \ gen/Base.h \ gen/Base-inl.h \ gen/Combine.h \ @@ -434,6 +435,7 @@ libfolly_la_SOURCES = \ futures/ManualExecutor.cpp \ futures/QueuedImmediateExecutor.cpp \ futures/ThreadWheelTimekeeper.cpp \ + futures/test/TestExecutor.cpp \ detail/Futex.cpp \ detail/StaticSingletonManager.cpp \ detail/ThreadLocalDetail.cpp \ diff --git a/folly/futures/Future-inl.h b/folly/futures/Future-inl.h index 28a5ffaa..b353ff81 100644 --- a/folly/futures/Future-inl.h +++ b/folly/futures/Future-inl.h @@ -1229,23 +1229,49 @@ struct retrying_policy_traits { is_fut::value, retrying_policy_fut_tag, void>::type>::type; }; +template +void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) { + using F = typename std::result_of::type; + using T = typename F::value_type; + auto f = ff(k++); + f.then([ + k, + prom = std::move(prom), + pm = std::forward(p), + ffm = std::forward(ff) + ](Try && t) mutable { + if (t.hasValue()) { + prom.setValue(std::move(t).value()); + return; + } + auto& x = t.exception(); + auto q = pm(k, x); + q.then([ + k, + prom = std::move(prom), + xm = std::move(x), + pm = std::move(pm), + ffm = std::move(ffm) + ](bool shouldRetry) mutable { + if (shouldRetry) { + retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom)); + } else { + prom.setException(std::move(xm)); + }; + }); + }); +} + template typename std::result_of::type retrying(size_t k, Policy&& p, FF&& ff) { using F = typename std::result_of::type; using T = typename F::value_type; - auto f = ff(k++); - return f.onError( - [ k, pm = std::forward(p), ffm = std::forward(ff) ]( - exception_wrapper x) mutable { - auto q = pm(k, x); - return q.then( - [ k, xm = std::move(x), pm = std::move(pm), ffm = std::move(ffm) ]( - bool r) mutable { - return r ? retrying(k, std::move(pm), std::move(ffm)) - : makeFuture(std::move(xm)); - }); - }); + auto prom = Promise(); + auto f = prom.getFuture(); + retryingImpl( + k, std::forward(p), std::forward(ff), std::move(prom)); + return f; } template diff --git a/folly/futures/helpers.h b/folly/futures/helpers.h index 379baf70..bc0b8b95 100644 --- a/folly/futures/helpers.h +++ b/folly/futures/helpers.h @@ -363,6 +363,9 @@ namespace futures { * indicating that the failure was transitory. * * Cancellation is not supported. + * + * If both FF and Policy inline executes, then it is possible to hit a stack + * overflow due to the recursive nature of the retry implementation */ template typename std::result_of::type diff --git a/folly/futures/test/RetryingTest.cpp b/folly/futures/test/RetryingTest.cpp index d1c429df..2de5c8a4 100644 --- a/folly/futures/test/RetryingTest.cpp +++ b/folly/futures/test/RetryingTest.cpp @@ -20,6 +20,8 @@ #include #include +#include +#include "TestExecutor.h" using namespace std; using namespace std::chrono; @@ -137,6 +139,45 @@ TEST(RetryingTest, policy_sleep_defaults) { }); } +TEST(RetryingTest, large_retries) { + rlimit oldMemLimit; + PCHECK(getrlimit(RLIMIT_AS, &oldMemLimit) == 0); + + rlimit newMemLimit; + newMemLimit.rlim_cur = std::min(1UL << 30, oldMemLimit.rlim_max); + newMemLimit.rlim_max = oldMemLimit.rlim_max; + PCHECK(setrlimit(RLIMIT_AS, &newMemLimit) == 0); + SCOPE_EXIT { + PCHECK(setrlimit(RLIMIT_AS, &oldMemLimit) == 0); + }; + + TestExecutor executor; + // size of implicit promise is at least the size of the return. + using LargeReturn = array; + auto func = [&executor](size_t retryNum) -> Future { + return via(&executor).then([retryNum] { + return retryNum < 10000 + ? makeFuture( + make_exception_wrapper("keep trying")) + : makeFuture(LargeReturn()); + }); + }; + + vector> futures; + for (auto idx = 0; idx < 40; ++idx) { + futures.emplace_back(futures::retrying( + [&executor](size_t, const exception_wrapper&) { + return via(&executor).then([] { return true; }); + }, + func)); + } + + for (auto& f : futures) { + f.wait(); + EXPECT_TRUE(f.hasValue()); + } +} + /* TEST(RetryingTest, policy_sleep_cancel) { multiAttemptExpectDurationWithin(5, milliseconds(0), milliseconds(10), []{ diff --git a/folly/futures/test/TestExecutor.cpp b/folly/futures/test/TestExecutor.cpp new file mode 100644 index 00000000..16f8d26d --- /dev/null +++ b/folly/futures/test/TestExecutor.cpp @@ -0,0 +1,72 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "TestExecutor.h" + +using namespace std; + +namespace folly { + +TestExecutor::TestExecutor() { + const auto kWorkers = std::max(1U, thread::hardware_concurrency()); + for (auto idx = 0U; idx < kWorkers; ++idx) { + workers_.emplace_back([this] { + while (true) { + Func work; + { + unique_lock lk(m_); + cv_.wait(lk, [this] { return !workItems_.empty(); }); + work = std::move(workItems_.front()); + workItems_.pop(); + } + if (!work) { + break; + } + work(); + } + }); + } +} + +TestExecutor::~TestExecutor() { + for (auto& worker : workers_) { + addImpl({}); + } + + for (auto& worker : workers_) { + worker.join(); + } +} + +void TestExecutor::add(Func f) { + if (f) { + addImpl(std::move(f)); + } +} + +uint32_t TestExecutor::numThreads() const { + return workers_.size(); +} + +void TestExecutor::addImpl(Func f) { + { + lock_guard g(m_); + workItems_.push(std::move(f)); + } + cv_.notify_one(); +} + +} // folly diff --git a/folly/futures/test/TestExecutor.h b/folly/futures/test/TestExecutor.h new file mode 100644 index 00000000..c09713be --- /dev/null +++ b/folly/futures/test/TestExecutor.h @@ -0,0 +1,50 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include + +namespace folly { + +/** + * A simple multithreaded executor for use in tests etc + */ +class TestExecutor : public Executor { + public: + TestExecutor(); + + ~TestExecutor() override; + + void add(Func f) override; + + uint32_t numThreads() const; + + private: + void addImpl(Func f); + + std::mutex m_; + std::queue workItems_; + std::condition_variable cv_; + + std::vector workers_; +}; + +} // folly diff --git a/folly/futures/test/TestExecutorTest.cpp b/folly/futures/test/TestExecutorTest.cpp new file mode 100644 index 00000000..750e904b --- /dev/null +++ b/folly/futures/test/TestExecutorTest.cpp @@ -0,0 +1,39 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include "TestExecutor.h" + +using namespace std; +using namespace std::chrono; +using namespace folly; + +TEST(TestExecutor, parallel_run) { + mutex m; + set ids; + auto executor = std::make_unique(); + const auto numThreads = executor->numThreads(); + for (auto idx = 0U; idx < numThreads * 10; ++idx) { + executor->add([&m, &ids]() mutable { + /* sleep override */ this_thread::sleep_for(milliseconds(100)); + lock_guard lg(m); + ids.insert(this_thread::get_id()); + }); + } + + executor = nullptr; + EXPECT_EQ(ids.size(), numThreads); +} diff --git a/folly/test/Makefile.am b/folly/test/Makefile.am index 9d983a3d..b22aa683 100644 --- a/folly/test/Makefile.am +++ b/folly/test/Makefile.am @@ -280,6 +280,7 @@ futures_test_SOURCES = \ ../futures/test/RetryingTest.cpp \ ../futures/test/SelfDestructTest.cpp \ ../futures/test/SharedPromiseTest.cpp \ + ../futures/test/TestExecutorTest.cpp \ ../futures/test/ThenCompileTest.cpp \ ../futures/test/ThenTest.cpp \ ../futures/test/TimekeeperTest.cpp \ -- 2.34.1