Add MemoryIdler suppot to IOThreadPoolExecutor
authorDave Watson <davejwatson@fb.com>
Mon, 27 Oct 2014 17:11:03 +0000 (10:11 -0700)
committerdcsommer <dcsommer@fb.com>
Wed, 29 Oct 2014 23:06:51 +0000 (16:06 -0700)
Summary:
Idle memory in IO threads.   If loop is unused for a period of time, free associated memory, and call epoll again.

Had to add a new list of callbacks that don't make the loop nonblocking (i.e. using runInLoop() instead would use the nonblocking version of epoll).

Could bake this in to EventBase directly, but that seems like the wrong abstraction, since EventBase doesn't actually control the thread - for example, that approach would also free up memory for stack-allocated EventBases where they are used synchronously by clients.

This diff doesn't change IO scheduling at all - current IO work is round robin, so this probably only helps if the whole server is idle (at least until we add smarter scheduling)

Test Plan:
Based on top of D1585087.

fbconfig thrift/perf/cpp; fbmake dbg
_bin/thrift/perf/cpp/ThriftServer
_bin/thrift/perf/cpp/loadgen -num_threads=100 -weight_sendrecv=1 -cpp2 -async

Ran loadgen for a while, watched res memory in top.  Stopped loadgen.  After ~5 sec, res memory was much reduced.

Reviewed By: jsedgwick@fb.com

Subscribers: trunkagent, doug, fugalh, njormrod, folly-diffs@

FB internal diff: D1641057

Tasks: 5002425

folly/detail/MemoryIdler.h
folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp
folly/io/async/EventBase.cpp
folly/io/async/EventBase.h

index 28d3c196420d41b023de8bf833ff92c85523be6a..cbb846128e866d549627d34a62610e95784f194f 100644 (file)
@@ -74,6 +74,31 @@ struct MemoryIdler {
   /// avoid synchronizing their flushes.
   static AtomicStruct<std::chrono::steady_clock::duration> defaultIdleTimeout;
 
+  /// Selects a timeout pseudo-randomly chosen to be between
+  /// idleTimeout and idleTimeout * (1 + timeoutVariationFraction), to
+  /// smooth out the behavior in a bursty system
+  template <typename Clock = std::chrono::steady_clock>
+  static typename Clock::duration getVariationTimeout(
+      typename Clock::duration idleTimeout
+          = defaultIdleTimeout.load(std::memory_order_acquire),
+      float timeoutVariationFrac = 0.5) {
+    if (idleTimeout.count() > 0 && timeoutVariationFrac > 0) {
+      // hash the pthread_t and the time to get the adjustment.
+      // Standard hash func isn't very good, so bit mix the result
+      auto pr = std::make_pair(pthread_self(),
+                               Clock::now().time_since_epoch().count());
+      std::hash<decltype(pr)> hash_fn;
+      uint64_t h = folly::hash::twang_mix64(hash_fn(pr));
+
+      // multiplying the duration by a floating point doesn't work, grr..
+      auto extraFrac =
+        timeoutVariationFrac / std::numeric_limits<uint64_t>::max() * h;
+      uint64_t tics = idleTimeout.count() * (1 + extraFrac);
+      idleTimeout = typename Clock::duration(tics);
+    }
+
+    return idleTimeout;
+  }
 
   /// Equivalent to fut.futexWait(expected, waitMask), but calls
   /// flushLocalMallocCaches() and unmapUnusedStack(stackToRetain)
@@ -100,26 +125,11 @@ struct MemoryIdler {
       return fut.futexWait(expected, waitMask);
     }
 
+    idleTimeout = getVariationTimeout(idleTimeout, timeoutVariationFrac);
     if (idleTimeout.count() > 0) {
-      auto begin = Clock::now();
-
-      if (timeoutVariationFrac > 0) {
-        // hash the pthread_t and the time to get the adjustment.
-        // Standard hash func isn't very good, so bit mix the result
-        auto pr = std::make_pair(pthread_self(),
-                                 begin.time_since_epoch().count());
-        std::hash<decltype(pr)> hash_fn;
-        uint64_t h = folly::hash::twang_mix64(hash_fn(pr));
-
-        // multiplying the duration by a floating point doesn't work, grr..
-        auto extraFrac =
-            timeoutVariationFrac / std::numeric_limits<uint64_t>::max() * h;
-        uint64_t tics = idleTimeout.count() * (1 + extraFrac);
-        idleTimeout = typename Clock::duration(tics);
-      }
-
       while (true) {
-        auto rv = fut.futexWaitUntil(expected, begin + idleTimeout, waitMask);
+        auto rv = fut.futexWaitUntil(
+          expected, Clock::now() + idleTimeout, waitMask);
         if (rv == FutexResult::TIMEDOUT) {
           // timeout is over
           break;
index f0a1b4924f0bb01ceb27421fdd74eae26c852658..db8ffdca8402ac37da9e5d07eb30723f7f589dc8 100644 (file)
 #include <glog/logging.h>
 #include <folly/io/async/EventBaseManager.h>
 
+#include <folly/detail/MemoryIdler.h>
+
 namespace folly { namespace wangle {
 
+using folly::detail::MemoryIdler;
+
+/* Class that will free jemalloc caches and madvise the stack away
+ * if the event loop is unused for some period of time
+ */
+class MemoryIdlerTimeout
+    : public AsyncTimeout , public EventBase::LoopCallback {
+ public:
+  explicit MemoryIdlerTimeout(EventBase* b) : AsyncTimeout(b), base_(b) {}
+
+  virtual void timeoutExpired() noexcept {
+    idled = true;
+  }
+
+  virtual void runLoopCallback() noexcept {
+    if (idled) {
+      MemoryIdler::flushLocalMallocCaches();
+      MemoryIdler::unmapUnusedStack(MemoryIdler::kDefaultStackToRetain);
+
+      idled = false;
+    } else {
+      std::chrono::steady_clock::duration idleTimeout =
+        MemoryIdler::defaultIdleTimeout.load(
+          std::memory_order_acquire);
+
+      idleTimeout = MemoryIdler::getVariationTimeout(idleTimeout);
+
+      scheduleTimeout(std::chrono::duration_cast<std::chrono::milliseconds>(
+                        idleTimeout).count());
+    }
+
+    // reschedule this callback for the next event loop.
+    base_->runBeforeLoop(this);
+  }
+ private:
+  EventBase* base_;
+  bool idled{false};
+} ;
+
 IOThreadPoolExecutor::IOThreadPoolExecutor(
     size_t numThreads,
     std::shared_ptr<ThreadFactory> threadFactory)
@@ -73,6 +114,10 @@ void IOThreadPoolExecutor::threadRun(ThreadPtr thread) {
   const auto ioThread = std::static_pointer_cast<IOThread>(thread);
   ioThread->eventBase =
     folly::EventBaseManager::get()->getEventBase();
+
+  auto idler = new MemoryIdlerTimeout(ioThread->eventBase);
+  ioThread->eventBase->runBeforeLoop(idler);
+
   thread->startupBaton.post();
   while (ioThread->shouldRun) {
     ioThread->eventBase->loopForever();
index 17cdd5c181b3862829556e2b987bbdaabcc95b6f..aa79aba3d9c57924f0e1eaf8a7097603106e8213 100644 (file)
@@ -185,7 +185,7 @@ EventBase::~EventBase() {
     callback->runLoopCallback();
   }
 
-  // Delete any unfired CobTimeout objects, so that we don't leak memory
+  // Delete any unfired callback objects, so that we don't leak memory
   // (Note that we don't fire them.  The caller is responsible for cleaning up
   // its own data structures if it destroys the EventBase with unfired events
   // remaining.)
@@ -194,6 +194,10 @@ EventBase::~EventBase() {
     delete timeout;
   }
 
+  while (!noWaitLoopCallbacks_.empty()) {
+    delete &noWaitLoopCallbacks_.front();
+  }
+
   (void) runLoopCallbacks(false);
 
   // Stop consumer before deleting NotificationQueue
@@ -274,10 +278,20 @@ bool EventBase::loopBody(int flags) {
     // nobody can add loop callbacks from within this thread if
     // we don't have to handle anything to start with...
     if (blocking && loopCallbacks_.empty()) {
+      LoopCallbackList callbacks;
+      callbacks.swap(noWaitLoopCallbacks_);
+
+      while(!callbacks.empty()) {
+        auto* item = &callbacks.front();
+        callbacks.pop_front();
+        item->runLoopCallback();
+      }
+
       res = event_base_loop(evb_, EVLOOP_ONCE);
     } else {
       res = event_base_loop(evb_, EVLOOP_ONCE | EVLOOP_NONBLOCK);
     }
+
     ranLoopCallbacks = runLoopCallbacks();
 
     int64_t busy = std::chrono::duration_cast<std::chrono::microseconds>(
@@ -458,6 +472,12 @@ void EventBase::runOnDestruction(LoopCallback* callback) {
   onDestructionCallbacks_.push_back(*callback);
 }
 
+void EventBase::runBeforeLoop(LoopCallback* callback) {
+  DCHECK(isInEventBaseThread());
+  callback->cancelLoopCallback();
+  noWaitLoopCallbacks_.push_back(*callback);
+}
+
 bool EventBase::runInEventBaseThread(void (*fn)(void*), void* arg) {
   // Send the message.
   // It will be received by the FunctionRunner in the EventBase's thread.
index 4930478f0aec113ee7aa09a601a11229715e78c0..98d5769c7a7979836f2474af36000dab50c3d461 100644 (file)
@@ -259,6 +259,8 @@ class EventBase : private boost::noncopyable, public TimeoutManager {
    */
   void runOnDestruction(LoopCallback* callback);
 
+  void runBeforeLoop(LoopCallback* callback);
+
   /**
    * Run the specified function in the EventBase's thread.
    *
@@ -519,6 +521,7 @@ class EventBase : private boost::noncopyable, public TimeoutManager {
   CobTimeout::List pendingCobTimeouts_;
 
   LoopCallbackList loopCallbacks_;
+  LoopCallbackList noWaitLoopCallbacks_;
   LoopCallbackList onDestructionCallbacks_;
 
   // This will be null most of the time, but point to currentCallbacks