stats for ThreadPoolExecutor
authorJames Sedgwick <jsedgwick@fb.com>
Tue, 23 Sep 2014 13:08:00 +0000 (06:08 -0700)
committerAnton Likhtarov <alikhtarov@fb.com>
Fri, 26 Sep 2014 22:21:47 +0000 (15:21 -0700)
Summary:
pool-wide stats via a call on the pool, and per-task stats (e.g. to be funneled into a histogram) via an rx subscription
rx needs a little work before this diff is safe - e.g. synchronization around the subscriber list, and perhaps exposing whether there are any subscribers so we can skip stat tracking if no one is listening
won't commit this without moving rx into folly/experimental of course

the idea is that timeout/expiration notifications can also go through the same subscription channel

haven't run the benchmarks yet and have to leave for the evening but tmrw i'll commit changes to the benchmark and get this stuff into windtunnel so i don't have to do so much manual output inspection on future diffs

Test Plan: added unit

Reviewed By: davejwatson@fb.com

Subscribers: fugalh, njormrod, bmatheny

FB internal diff: D1558424

Tasks: 50023925002425

folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.cpp
folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h
folly/experimental/wangle/concurrent/Executor.h
folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp
folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h
folly/experimental/wangle/concurrent/ThreadPoolExecutor.cpp
folly/experimental/wangle/concurrent/ThreadPoolExecutor.h
folly/experimental/wangle/concurrent/test/ThreadPoolExecutorTest.cpp
folly/experimental/wangle/rx/types.h

index ba21ed6a3670f23595dcff63568b07e23446e468..ca88e5804ad6130c437b8c9cae71c887ca71b9ba 100644 (file)
@@ -22,7 +22,7 @@ const size_t CPUThreadPoolExecutor::kDefaultMaxQueueSize = 1 << 18;
 
 CPUThreadPoolExecutor::CPUThreadPoolExecutor(
     size_t numThreads,
-    std::unique_ptr<BlockingQueue<Task>> taskQueue,
+    std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
     std::unique_ptr<ThreadFactory> threadFactory)
     : ThreadPoolExecutor(numThreads, std::move(threadFactory)),
       taskQueue_(std::move(taskQueue)) {
@@ -37,29 +37,19 @@ CPUThreadPoolExecutor::~CPUThreadPoolExecutor() {
 
 void CPUThreadPoolExecutor::add(Func func) {
   // TODO handle enqueue failure, here and in other add() callsites
-  taskQueue_->add(Task(std::move(func)));
+  taskQueue_->add(CPUTask(std::move(func)));
 }
 
 void CPUThreadPoolExecutor::threadRun(std::shared_ptr<Thread> thread) {
   while (1) {
     // TODO expiration / codel
-    auto t = taskQueue_->take();
-    if (UNLIKELY(t.poison)) {
+    auto task = taskQueue_->take();
+    if (UNLIKELY(task.poison)) {
       CHECK(threadsToStop_-- > 0);
       stoppedThreads_.add(thread);
       return;
     } else {
-      thread->idle = false;
-      try {
-        t.func();
-      } catch (const std::exception& e) {
-        LOG(ERROR) << "CPUThreadPoolExecutor: func threw unhandled " <<
-                      typeid(e).name() << " exception: " << e.what();
-      } catch (...) {
-        LOG(ERROR) << "CPUThreadPoolExecutor: func threw unhandled "
-                      "non-exception object";
-      }
-      thread->idle = true;
+      runTask(thread, std::move(task));
     }
 
     if (UNLIKELY(threadsToStop_ > 0 && !isJoin_)) {
@@ -77,8 +67,12 @@ void CPUThreadPoolExecutor::stopThreads(size_t n) {
   CHECK(stoppedThreads_.size() == 0);
   threadsToStop_ = n;
   for (int i = 0; i < n; i++) {
-    taskQueue_->add(Task());
+    taskQueue_->add(CPUTask());
   }
 }
 
+uint64_t CPUThreadPoolExecutor::getPendingTaskCount() {
+  return taskQueue_->size();
+}
+
 }} // folly::wangle
index 575e23c60d847cdf7e8dd14c450ac9825cdf2f18..7811c6783f81c2b1a660c8cdf018f6a6a4ca9618 100644 (file)
@@ -21,13 +21,12 @@ namespace folly { namespace wangle {
 
 class CPUThreadPoolExecutor : public ThreadPoolExecutor {
  public:
-  struct Task;
+  struct CPUTask;
 
-  // TODO thread naming, perhaps a required input to ThreadFactories
   explicit CPUThreadPoolExecutor(
       size_t numThreads,
-      std::unique_ptr<BlockingQueue<Task>> taskQueue =
-          folly::make_unique<LifoSemMPMCQueue<Task>>(
+      std::unique_ptr<BlockingQueue<CPUTask>> taskQueue =
+          folly::make_unique<LifoSemMPMCQueue<CPUTask>>(
               CPUThreadPoolExecutor::kDefaultMaxQueueSize),
       std::unique_ptr<ThreadFactory> threadFactory =
           folly::make_unique<NamedThreadFactory>("CPUThreadPool"));
@@ -36,15 +35,14 @@ class CPUThreadPoolExecutor : public ThreadPoolExecutor {
 
   void add(Func func) override;
 
-  struct Task {
-    explicit Task(Func&& taskArg) : func(std::move(taskArg)), poison(false) {}
-    Task() : func(nullptr), poison(true) {}
-    Task(Task&& o) noexcept : func(std::move(o.func)), poison(o.poison) {}
-    Task(const Task&) = default;
-    Task& operator=(const Task&) = default;
-    Func func;
+  struct CPUTask : public ThreadPoolExecutor::Task {
+    // Must be noexcept move constructible so it can be used in MPMCQueue
+    explicit CPUTask(Func&& f) : Task(std::move(f)), poison(false) {}
+    CPUTask() : Task(nullptr), poison(true) {}
+    CPUTask(CPUTask&& o) noexcept : Task(std::move(o)), poison(o.poison) {}
+    CPUTask(const CPUTask&) = default;
+    CPUTask& operator=(const CPUTask&) = default;
     bool poison;
-    // TODO per-task stats, timeouts, expirations
   };
 
   static const size_t kDefaultMaxQueueSize;
@@ -52,8 +50,9 @@ class CPUThreadPoolExecutor : public ThreadPoolExecutor {
  private:
   void threadRun(ThreadPtr thread) override;
   void stopThreads(size_t n) override;
+  uint64_t getPendingTaskCount() override;
 
-  std::unique_ptr<BlockingQueue<Task>> taskQueue_;
+  std::unique_ptr<BlockingQueue<CPUTask>> taskQueue_;
   std::atomic<ssize_t> threadsToStop_{0};
 };
 
index 49db177e5753b49b5cab4516e1c2bf8e2fce00c2..2687ee6bc86cec91fdd0ed7b4dbaeaa00ff44b64 100644 (file)
@@ -22,10 +22,14 @@ namespace folly { namespace wangle {
 
 typedef std::function<void()> Func;
 
+namespace experimental { // TODO(jsedgwick) merge with folly/wangle/Executor.h
+
 class Executor {
  public:
   virtual ~Executor() {};
   virtual void add(Func func) = 0;
 };
 
+}
+
 }} // folly::wangle
index cdc36ef43c89934ce7928b5f08f089458efa206e..6e106f92836071c0755a534984bc70f83631b627 100644 (file)
@@ -42,15 +42,15 @@ void IOThreadPoolExecutor::add(Func func) {
   auto thread = threadList_.get()[nextThread_++ % threadList_.get().size()];
   auto ioThread = std::static_pointer_cast<IOThread>(thread);
 
-  auto moveFunc = folly::makeMoveWrapper(std::move(func));
-  auto wrappedFunc = [moveFunc, ioThread] () {
-    (*moveFunc)();
-    ioThread->outstandingTasks--;
+  auto moveTask = folly::makeMoveWrapper(Task(std::move(func)));
+  auto wrappedFunc = [this, ioThread, moveTask] () mutable {
+    runTask(ioThread, std::move(*moveTask));
+    ioThread->pendingTasks--;
   };
 
-  ioThread->outstandingTasks++;
+  ioThread->pendingTasks++;
   if (!ioThread->eventBase.runInEventBaseThread(std::move(wrappedFunc))) {
-    ioThread->outstandingTasks--;
+    ioThread->pendingTasks--;
     throw std::runtime_error("Unable to run func in event base thread");
   }
 }
@@ -66,13 +66,14 @@ void IOThreadPoolExecutor::threadRun(ThreadPtr thread) {
     ioThread->eventBase.loopForever();
   }
   if (isJoin_) {
-    while (ioThread->outstandingTasks > 0) {
+    while (ioThread->pendingTasks > 0) {
       ioThread->eventBase.loopOnce();
     }
   }
   stoppedThreads_.add(ioThread);
 }
 
+// threadListLock_ is writelocked
 void IOThreadPoolExecutor::stopThreads(size_t n) {
   for (int i = 0; i < n; i++) {
     const auto ioThread = std::static_pointer_cast<IOThread>(
@@ -82,4 +83,18 @@ void IOThreadPoolExecutor::stopThreads(size_t n) {
   }
 }
 
+// threadListLock_ is readlocked
+uint64_t IOThreadPoolExecutor::getPendingTaskCount() {
+  uint64_t count = 0;
+  for (const auto& thread : threadList_.get()) {
+    auto ioThread = std::static_pointer_cast<IOThread>(thread);
+    size_t pendingTasks = ioThread->pendingTasks;
+    if (pendingTasks > 0 && !ioThread->idle) {
+      pendingTasks--;
+    }
+    count += pendingTasks;
+  }
+  return count;
+}
+
 }} // folly::wangle
index 2b498bdc5d13ea77908f4c9b6ab8fccec7735110..c42da7198f56a5761690fdc4096ba936957dfd0a 100644 (file)
@@ -35,11 +35,12 @@ class IOThreadPoolExecutor : public ThreadPoolExecutor {
   ThreadPtr makeThread() override;
   void threadRun(ThreadPtr thread) override;
   void stopThreads(size_t n) override;
+  uint64_t getPendingTaskCount() override;
 
   struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING IOThread : public Thread {
-    IOThread() : shouldRun(true), outstandingTasks(0) {};
+    IOThread() : shouldRun(true), pendingTasks(0) {};
     std::atomic<bool> shouldRun;
-    std::atomic<size_t> outstandingTasks;
+    std::atomic<size_t> pendingTasks;
     EventBase eventBase;
   };
 
index 4d249b04e1b28b7d9e7d48e9e7618393e3cc7341..30e46f5c2c6326cdd14c2b036481611881cb59d4 100644 (file)
@@ -27,6 +27,25 @@ ThreadPoolExecutor::~ThreadPoolExecutor() {
   CHECK(threadList_.get().size() == 0);
 }
 
+void ThreadPoolExecutor::runTask(
+    const ThreadPtr& thread,
+    Task&& task) {
+  thread->idle = false;
+  task.started();
+  try {
+    task.func();
+  } catch (const std::exception& e) {
+    LOG(ERROR) << "ThreadPoolExecutor: func threw unhandled " <<
+                  typeid(e).name() << " exception: " << e.what();
+  } catch (...) {
+    LOG(ERROR) << "ThreadPoolExecutor: func threw unhandled non-exception "
+                  "object";
+  }
+  task.completed();
+  taskStatsSubject_.onNext(std::move(task.stats));
+  thread->idle = true;
+}
+
 size_t ThreadPoolExecutor::numThreads() {
   RWSpinLock::ReadHolder{&threadListLock_};
   return threadList_.get().size();
@@ -43,6 +62,7 @@ void ThreadPoolExecutor::setNumThreads(size_t n) {
   CHECK(threadList_.get().size() == n);
 }
 
+// threadListLock_ is writelocked
 void ThreadPoolExecutor::addThreads(size_t n) {
   for (int i = 0; i < n; i++) {
     auto thread = makeThread();
@@ -54,6 +74,7 @@ void ThreadPoolExecutor::addThreads(size_t n) {
   }
 }
 
+// threadListLock_ is writelocked
 void ThreadPoolExecutor::removeThreads(size_t n, bool isJoin) {
   CHECK(n <= threadList_.get().size());
   CHECK(stoppedThreads_.size() == 0);
@@ -79,6 +100,22 @@ void ThreadPoolExecutor::join() {
   CHECK(threadList_.get().size() == 0);
 }
 
+ThreadPoolExecutor::PoolStats ThreadPoolExecutor::getPoolStats() {
+  RWSpinLock::ReadHolder{&threadListLock_};
+  ThreadPoolExecutor::PoolStats stats;
+  stats.threadCount = threadList_.get().size();
+  for (auto thread : threadList_.get()) {
+    if (thread->idle) {
+      stats.idleThreadCount++;
+    } else {
+      stats.activeThreadCount++;
+    }
+  }
+  stats.pendingTaskCount = getPendingTaskCount();
+  stats.totalTaskCount = stats.pendingTaskCount + stats.activeThreadCount;
+  return stats;
+}
+
 std::atomic<uint64_t> ThreadPoolExecutor::Thread::nextId(0);
 
 void ThreadPoolExecutor::StoppedThreadQueue::add(
index f802af14905df2af84cbc967c0c29ab5085e2908..4eda2d36e3039fd703aeb254295c82b6629df015 100644 (file)
@@ -18,6 +18,7 @@
 #include <folly/experimental/wangle/concurrent/Executor.h>
 #include <folly/experimental/wangle/concurrent/LifoSemMPMCQueue.h>
 #include <folly/experimental/wangle/concurrent/NamedThreadFactory.h>
+#include <folly/experimental/wangle/rx/Observable.h>
 #include <folly/Memory.h>
 #include <folly/RWSpinLock.h>
 
@@ -29,7 +30,7 @@
 
 namespace folly { namespace wangle {
 
-class ThreadPoolExecutor : public Executor {
+class ThreadPoolExecutor : public experimental::Executor {
  public:
   explicit ThreadPoolExecutor(
       size_t numThreads,
@@ -41,10 +42,32 @@ class ThreadPoolExecutor : public Executor {
   void setNumThreads(size_t numThreads);
   void stop();
   void join();
-  // TODO expose stats
+
+  struct PoolStats {
+    PoolStats() : threadCount(0), idleThreadCount(0), activeThreadCount(0),
+                  pendingTaskCount(0), totalTaskCount(0) {}
+    size_t threadCount, idleThreadCount, activeThreadCount;
+    uint64_t pendingTaskCount, totalTaskCount;
+  };
+
+  PoolStats getPoolStats();
+
+  struct TaskStats {
+    TaskStats() : expired(false), waitTime(0), runTime(0) {}
+    bool expired;
+    std::chrono::microseconds waitTime;
+    std::chrono::microseconds runTime;
+  };
+
+  Subscription subscribeToTaskStats(
+      const ObserverPtr<TaskStats>& observer) {
+    return taskStatsSubject_.subscribe(observer);
+  }
 
  protected:
+  // Prerequisite: threadListLock_ writelocked
   void addThreads(size_t n);
+  // Prerequisite: threadListLock_ writelocked
   void removeThreads(size_t n, bool isJoin);
 
   struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING Thread {
@@ -54,20 +77,50 @@ class ThreadPoolExecutor : public Executor {
     uint64_t id;
     std::thread handle;
     bool idle;
-    // TODO per-thread stats go here
   };
 
   typedef std::shared_ptr<Thread> ThreadPtr;
 
+  struct Task {
+    explicit Task(Func&& f) : func(std::move(f)) {
+      // Assume that the task in enqueued on creation
+      intervalBegin = std::chrono::steady_clock::now();
+    }
+
+    Func func;
+    TaskStats stats;
+    // TODO per-task timeouts, expirations
+
+    void started() {
+      auto now = std::chrono::steady_clock::now();
+      stats.waitTime = std::chrono::duration_cast<std::chrono::microseconds>(
+          now - intervalBegin);
+      intervalBegin = now;
+    }
+    void completed() {
+      stats.runTime = std::chrono::duration_cast<std::chrono::microseconds>(
+         std::chrono::steady_clock::now() - intervalBegin);
+    }
+
+    std::chrono::steady_clock::time_point intervalBegin;
+  };
+
+  void runTask(const ThreadPtr& thread, Task&& task);
+
   // The function that will be bound to pool threads
   virtual void threadRun(ThreadPtr thread) = 0;
-  // Stop n threads and put their Thread structs in the threadsStopped_ queue
+
+  // Stop n threads and put their ThreadPtrs in the threadsStopped_ queue
+  // Prerequisite: threadListLock_ writelocked
   virtual void stopThreads(size_t n) = 0;
+
   // Create a suitable Thread struct
   virtual ThreadPtr makeThread() {
     return std::make_shared<Thread>();
   }
-  // need a stopThread(id) for keepalive feature
+
+  // Prerequisite: threadListLock_ readlocked
+  virtual uint64_t getPendingTaskCount() = 0;
 
   class ThreadList {
    public:
@@ -112,6 +165,8 @@ class ThreadPoolExecutor : public Executor {
   RWSpinLock threadListLock_;
   StoppedThreadQueue stoppedThreads_;
   std::atomic<bool> isJoin_; // whether the current downsizing is a join
+
+  Subject<TaskStats> taskStatsSubject_;
 };
 
 }} // folly::wangle
index fe7b8dcdbec692ae4a611e9ba4687080a0878fec..eb8527ca0ab5637758b88cb1493fac88830e3374 100644 (file)
 
 using namespace folly::wangle;
 
+static Func burnMs(uint64_t ms) {
+  return [ms]() { std::this_thread::sleep_for(std::chrono::milliseconds(ms)); };
+}
+
 template <class TPE>
 static void basic() {
   // Create and destroy
@@ -59,7 +63,7 @@ static void stop() {
   TPE tpe(10);
   std::atomic<int> completed(0);
   auto f = [&](){
-    std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    burnMs(1)();
     completed++;
   };
   for (int i = 0; i < 1000; i++) {
@@ -82,7 +86,7 @@ static void join() {
   TPE tpe(10);
   std::atomic<int> completed(0);
   auto f = [&](){
-    std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    burnMs(1)();
     completed++;
   };
   for (int i = 0; i < 1000; i++) {
@@ -105,7 +109,7 @@ static void resizeUnderLoad() {
   TPE tpe(10);
   std::atomic<int> completed(0);
   auto f = [&](){
-    std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    burnMs(1)();
     completed++;
   };
   for (int i = 0; i < 1000; i++) {
@@ -124,3 +128,75 @@ TEST(ThreadPoolExecutorTest, CPUResizeUnderLoad) {
 TEST(ThreadPoolExecutorTest, IOResizeUnderLoad) {
   resizeUnderLoad<IOThreadPoolExecutor>();
 }
+
+template <class TPE>
+static void poolStats() {
+  {
+    TPE tpe(10);
+    for (int i = 0; i < 20; i++) {
+      tpe.add(burnMs(20));
+    }
+    burnMs(10)();
+    auto stats = tpe.getPoolStats();
+    EXPECT_EQ(10, stats.threadCount);
+    EXPECT_EQ(0, stats.idleThreadCount);
+    EXPECT_EQ(10, stats.activeThreadCount);
+    EXPECT_EQ(10, stats.pendingTaskCount);
+    EXPECT_EQ(20, stats.totalTaskCount);
+  }
+
+  {
+    TPE tpe(10);
+    for (int i = 0; i < 5; i++) {
+      tpe.add(burnMs(20));
+    }
+    burnMs(10)();
+    auto stats = tpe.getPoolStats();
+    EXPECT_EQ(10, stats.threadCount);
+    EXPECT_EQ(5, stats.idleThreadCount);
+    EXPECT_EQ(5, stats.activeThreadCount);
+    EXPECT_EQ(0, stats.pendingTaskCount);
+    EXPECT_EQ(5, stats.totalTaskCount);
+  }
+}
+
+TEST(ThreadPoolExecutorTest, CPUPoolStats) {
+  poolStats<CPUThreadPoolExecutor>();
+}
+
+TEST(ThreadPoolExecutorTest, IOPoolStats) {
+  poolStats<IOThreadPoolExecutor>();
+}
+
+template <class TPE>
+static void taskStats() {
+  TPE tpe(10);
+  std::atomic<int> c(0);
+  tpe.subscribeToTaskStats(Observer<ThreadPoolExecutor::TaskStats>::create(
+      [&] (ThreadPoolExecutor::TaskStats stats) {
+        int i = c++;
+        if (i < 10) {
+          EXPECT_GE(10000, stats.waitTime.count());
+          EXPECT_LE(20000, stats.runTime.count());
+        } else {
+          EXPECT_LE(10000, stats.waitTime.count());
+          EXPECT_LE(10000, stats.runTime.count());
+        }
+      }));
+  for (int i = 0; i < 10; i++) {
+    tpe.add(burnMs(20));
+  }
+  for (int i = 0; i < 10; i++) {
+    tpe.add(burnMs(10));
+  }
+  tpe.join();
+  EXPECT_EQ(20, c);
+}
+
+TEST(ThreadPoolExecutorTest, CPUTaskStats) {
+  taskStats<CPUThreadPoolExecutor>();
+}
+
+TEST(ThreadPoolExecutorTest, IOTaskStats) {
+  taskStats<IOThreadPoolExecutor>();
+}
index 54dd00993f4d45c0bd67ee7b295dbf1e48acf830..317fac147f709bcd19cb7285a5a345a4f93b50e0 100644 (file)
@@ -17,6 +17,7 @@
 #pragma once
 
 #include <folly/ExceptionWrapper.h>
+#include <folly/wangle/Executor.h>
 
 namespace folly { namespace wangle {
   typedef folly::exception_wrapper Error;