From 4ebfdff303924da5636e512ef7268612cba7c803 Mon Sep 17 00:00:00 2001 From: Dave Watson Date: Mon, 27 Oct 2014 10:11:03 -0700 Subject: [PATCH] Add MemoryIdler support to IOThreadPoolExecutor Summary: Idle memory in IO threads. If loop is unused for a period of time, free associated memory, and call epoll again. Had to add a new list of callbacks that don't make the loop nonblocking (i.e. using runInLoop() instead would use the nonblocking version of epoll). Could bake this into EventBase directly, but that seems like the wrong abstraction, since EventBase doesn't actually control the thread - for example, that approach would also free up memory for stack-allocated EventBases where they are used synchronously by clients. This diff doesn't change IO scheduling at all - current IO work is round robin, so this probably only helps if the whole server is idle (at least until we add smarter scheduling) Test Plan: Based on top of D1585087. fbconfig thrift/perf/cpp; fbmake dbg _bin/thrift/perf/cpp/ThriftServer _bin/thrift/perf/cpp/loadgen -num_threads=100 -weight_sendrecv=1 -cpp2 -async Ran loadgen for a while, watched res memory in top. Stopped loadgen. After ~5 sec, res memory was much reduced. Reviewed By: jsedgwick@fb.com Subscribers: trunkagent, doug, fugalh, njormrod, folly-diffs@ FB internal diff: D1641057 Tasks: 5002425 --- folly/detail/MemoryIdler.h | 46 +++++++++++-------- .../concurrent/IOThreadPoolExecutor.cpp | 45 ++++++++++++++++++ folly/io/async/EventBase.cpp | 22 ++++++++- folly/io/async/EventBase.h | 3 ++ 4 files changed, 97 insertions(+), 19 deletions(-) diff --git a/folly/detail/MemoryIdler.h b/folly/detail/MemoryIdler.h index 28d3c196..cbb84612 100644 --- a/folly/detail/MemoryIdler.h +++ b/folly/detail/MemoryIdler.h @@ -74,6 +74,31 @@ struct MemoryIdler { /// avoid synchronizing their flushes. 
static AtomicStruct defaultIdleTimeout; + /// Selects a timeout pseudo-randomly chosen to be between + /// idleTimeout and idleTimeout * (1 + timeoutVariationFraction), to + /// smooth out the behavior in a bursty system + template + static typename Clock::duration getVariationTimeout( + typename Clock::duration idleTimeout + = defaultIdleTimeout.load(std::memory_order_acquire), + float timeoutVariationFrac = 0.5) { + if (idleTimeout.count() > 0 && timeoutVariationFrac > 0) { + // hash the pthread_t and the time to get the adjustment. + // Standard hash func isn't very good, so bit mix the result + auto pr = std::make_pair(pthread_self(), + Clock::now().time_since_epoch().count()); + std::hash hash_fn; + uint64_t h = folly::hash::twang_mix64(hash_fn(pr)); + + // multiplying the duration by a floating point doesn't work, grr.. + auto extraFrac = + timeoutVariationFrac / std::numeric_limits::max() * h; + uint64_t tics = idleTimeout.count() * (1 + extraFrac); + idleTimeout = typename Clock::duration(tics); + } + + return idleTimeout; + } /// Equivalent to fut.futexWait(expected, waitMask), but calls /// flushLocalMallocCaches() and unmapUnusedStack(stackToRetain) @@ -100,26 +125,11 @@ struct MemoryIdler { return fut.futexWait(expected, waitMask); } + idleTimeout = getVariationTimeout(idleTimeout, timeoutVariationFrac); if (idleTimeout.count() > 0) { - auto begin = Clock::now(); - - if (timeoutVariationFrac > 0) { - // hash the pthread_t and the time to get the adjustment. - // Standard hash func isn't very good, so bit mix the result - auto pr = std::make_pair(pthread_self(), - begin.time_since_epoch().count()); - std::hash hash_fn; - uint64_t h = folly::hash::twang_mix64(hash_fn(pr)); - - // multiplying the duration by a floating point doesn't work, grr.. 
- auto extraFrac = - timeoutVariationFrac / std::numeric_limits::max() * h; - uint64_t tics = idleTimeout.count() * (1 + extraFrac); - idleTimeout = typename Clock::duration(tics); - } - while (true) { - auto rv = fut.futexWaitUntil(expected, begin + idleTimeout, waitMask); + auto rv = fut.futexWaitUntil( + expected, Clock::now() + idleTimeout, waitMask); if (rv == FutexResult::TIMEDOUT) { // timeout is over break; diff --git a/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp b/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp index f0a1b492..db8ffdca 100644 --- a/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp +++ b/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp @@ -20,8 +20,49 @@ #include #include +#include + namespace folly { namespace wangle { +using folly::detail::MemoryIdler; + +/* Class that will free jemalloc caches and madvise the stack away + * if the event loop is unused for some period of time + */ +class MemoryIdlerTimeout + : public AsyncTimeout , public EventBase::LoopCallback { + public: + explicit MemoryIdlerTimeout(EventBase* b) : AsyncTimeout(b), base_(b) {} + + virtual void timeoutExpired() noexcept { + idled = true; + } + + virtual void runLoopCallback() noexcept { + if (idled) { + MemoryIdler::flushLocalMallocCaches(); + MemoryIdler::unmapUnusedStack(MemoryIdler::kDefaultStackToRetain); + + idled = false; + } else { + std::chrono::steady_clock::duration idleTimeout = + MemoryIdler::defaultIdleTimeout.load( + std::memory_order_acquire); + + idleTimeout = MemoryIdler::getVariationTimeout(idleTimeout); + + scheduleTimeout(std::chrono::duration_cast( + idleTimeout).count()); + } + + // reschedule this callback for the next event loop. 
+ base_->runBeforeLoop(this); + } + private: + EventBase* base_; + bool idled{false}; +} ; + IOThreadPoolExecutor::IOThreadPoolExecutor( size_t numThreads, std::shared_ptr threadFactory) @@ -73,6 +114,10 @@ void IOThreadPoolExecutor::threadRun(ThreadPtr thread) { const auto ioThread = std::static_pointer_cast(thread); ioThread->eventBase = folly::EventBaseManager::get()->getEventBase(); + + auto idler = new MemoryIdlerTimeout(ioThread->eventBase); + ioThread->eventBase->runBeforeLoop(idler); + thread->startupBaton.post(); while (ioThread->shouldRun) { ioThread->eventBase->loopForever(); diff --git a/folly/io/async/EventBase.cpp b/folly/io/async/EventBase.cpp index 17cdd5c1..aa79aba3 100644 --- a/folly/io/async/EventBase.cpp +++ b/folly/io/async/EventBase.cpp @@ -185,7 +185,7 @@ EventBase::~EventBase() { callback->runLoopCallback(); } - // Delete any unfired CobTimeout objects, so that we don't leak memory + // Delete any unfired callback objects, so that we don't leak memory // (Note that we don't fire them. The caller is responsible for cleaning up // its own data structures if it destroys the EventBase with unfired events // remaining.) @@ -194,6 +194,10 @@ EventBase::~EventBase() { delete timeout; } + while (!noWaitLoopCallbacks_.empty()) { + delete &noWaitLoopCallbacks_.front(); + } + (void) runLoopCallbacks(false); // Stop consumer before deleting NotificationQueue @@ -274,10 +278,20 @@ bool EventBase::loopBody(int flags) { // nobody can add loop callbacks from within this thread if // we don't have to handle anything to start with... 
if (blocking && loopCallbacks_.empty()) { + LoopCallbackList callbacks; + callbacks.swap(noWaitLoopCallbacks_); + + while(!callbacks.empty()) { + auto* item = &callbacks.front(); + callbacks.pop_front(); + item->runLoopCallback(); + } + res = event_base_loop(evb_, EVLOOP_ONCE); } else { res = event_base_loop(evb_, EVLOOP_ONCE | EVLOOP_NONBLOCK); } + ranLoopCallbacks = runLoopCallbacks(); int64_t busy = std::chrono::duration_cast( @@ -458,6 +472,12 @@ void EventBase::runOnDestruction(LoopCallback* callback) { onDestructionCallbacks_.push_back(*callback); } +void EventBase::runBeforeLoop(LoopCallback* callback) { + DCHECK(isInEventBaseThread()); + callback->cancelLoopCallback(); + noWaitLoopCallbacks_.push_back(*callback); +} + bool EventBase::runInEventBaseThread(void (*fn)(void*), void* arg) { // Send the message. // It will be received by the FunctionRunner in the EventBase's thread. diff --git a/folly/io/async/EventBase.h b/folly/io/async/EventBase.h index 4930478f..98d5769c 100644 --- a/folly/io/async/EventBase.h +++ b/folly/io/async/EventBase.h @@ -259,6 +259,8 @@ class EventBase : private boost::noncopyable, public TimeoutManager { */ void runOnDestruction(LoopCallback* callback); + void runBeforeLoop(LoopCallback* callback); + /** * Run the specified function in the EventBase's thread. * @@ -519,6 +521,7 @@ class EventBase : private boost::noncopyable, public TimeoutManager { CobTimeout::List pendingCobTimeouts_; LoopCallbackList loopCallbacks_; + LoopCallbackList noWaitLoopCallbacks_; LoopCallbackList onDestructionCallbacks_; // This will be null most of the time, but point to currentCallbacks -- 2.34.1