expose event base from IOThreadPoolExecutor
authorJames Sedgwick <jsedgwick@fb.com>
Wed, 1 Oct 2014 17:32:54 +0000 (10:32 -0700)
committerAndrii Grynenko <andrii@fb.com>
Wed, 15 Oct 2014 00:46:53 +0000 (17:46 -0700)
Summary:
I'm not 100% sure this is the best way to go about this but I don't hate it either.
I'm going to start seeing how it might fit into tserver - my guess is that some sort Cpp2WorkerFactory which also manages those objects would get plugged in as the thread factory
Haven't fleshed out how this would relate to TEventBaseManager

Test Plan: added unit, starting to play with this in Thrift2 server

Reviewed By: davejwatson@fb.com

Subscribers: alandau, bmatheny, trunkagent, fugalh, njormrod

FB internal diff: D1574660

folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.cpp
folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp
folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h
folly/experimental/wangle/concurrent/ThreadPoolExecutor.cpp
folly/experimental/wangle/concurrent/ThreadPoolExecutor.h

index daac2eb90684bae1b40bf35f653e1cebc0db46ae..6d826b55ed81a287fa149304816cbad71dc5c9ff 100644 (file)
@@ -49,6 +49,7 @@ void CPUThreadPoolExecutor::add(
 }
 
 void CPUThreadPoolExecutor::threadRun(std::shared_ptr<Thread> thread) {
+  thread->startupBaton.post();
   while (1) {
     auto task = taskQueue_->take();
     if (UNLIKELY(task.poison)) {
index 80d5ef73164f3aa41e343ba930dc0147bb6a2f38..8de3d5ab1501d1a1bdd62456991871dae798db85 100644 (file)
@@ -18,6 +18,7 @@
 
 #include <folly/MoveWrapper.h>
 #include <glog/logging.h>
+#include <thrift/lib/cpp/async/TEventBaseManager.h>
 
 namespace folly { namespace wangle {
 
@@ -57,7 +58,7 @@ void IOThreadPoolExecutor::add(
   };
 
   ioThread->pendingTasks++;
-  if (!ioThread->eventBase.runInEventBaseThread(std::move(wrappedFunc))) {
+  if (!ioThread->eventBase->runInEventBaseThread(std::move(wrappedFunc))) {
     ioThread->pendingTasks--;
     throw std::runtime_error("Unable to run func in event base thread");
   }
@@ -70,12 +71,15 @@ IOThreadPoolExecutor::makeThread() {
 
 void IOThreadPoolExecutor::threadRun(ThreadPtr thread) {
   const auto ioThread = std::static_pointer_cast<IOThread>(thread);
+  ioThread->eventBase =
+    apache::thrift::async::TEventBaseManager::get()->getEventBase();
+  thread->startupBaton.post();
   while (ioThread->shouldRun) {
-    ioThread->eventBase.loopForever();
+    ioThread->eventBase->loopForever();
   }
   if (isJoin_) {
     while (ioThread->pendingTasks > 0) {
-      ioThread->eventBase.loopOnce();
+      ioThread->eventBase->loopOnce();
     }
   }
   stoppedThreads_.add(ioThread);
@@ -87,7 +91,7 @@ void IOThreadPoolExecutor::stopThreads(size_t n) {
     const auto ioThread = std::static_pointer_cast<IOThread>(
         threadList_.get()[i]);
     ioThread->shouldRun = false;
-    ioThread->eventBase.terminateLoopSoon();
+    ioThread->eventBase->terminateLoopSoon();
   }
 }
 
index 60f9d9332b5860cc716b764dc1af310532833d63..a6bf5215ff0b140c2e4801c8098bf7723dfd1ecc 100644 (file)
@@ -45,7 +45,7 @@ class IOThreadPoolExecutor : public ThreadPoolExecutor {
     IOThread() : shouldRun(true), pendingTasks(0) {};
     std::atomic<bool> shouldRun;
     std::atomic<size_t> pendingTasks;
-    EventBase eventBase;
+    EventBase* eventBase;
   };
 
   size_t nextThread_;
index 8b0b158dd4a2b7a84f96e1ef1eff90b35cb2c0ed..d8ddfac15e4c419e9904fa62bd88f43ee4bb808e 100644 (file)
@@ -84,14 +84,20 @@ void ThreadPoolExecutor::setNumThreads(size_t n) {
 
 // threadListLock_ is writelocked
 void ThreadPoolExecutor::addThreads(size_t n) {
+  std::vector<ThreadPtr> newThreads;
   for (int i = 0; i < n; i++) {
-    auto thread = makeThread();
+    newThreads.push_back(makeThread());
+  }
+  for (auto& thread : newThreads) {
     // TODO need a notion of failing to create the thread
     // and then handling for that case
     thread->handle = threadFactory_->newThread(
         std::bind(&ThreadPoolExecutor::threadRun, this, thread));
     threadList_.add(thread);
   }
+  for (auto& thread : newThreads) {
+    thread->startupBaton.wait();
+  }
 }
 
 // threadListLock_ is writelocked
index bf0dfda89cbfff54f19cc67f4f7a79ec041bcd98..54819ad6080ff35ce58da6dd4add7d0dd9cfd656 100644 (file)
@@ -19,6 +19,7 @@
 #include <folly/experimental/wangle/concurrent/LifoSemMPMCQueue.h>
 #include <folly/experimental/wangle/concurrent/NamedThreadFactory.h>
 #include <folly/experimental/wangle/rx/Observable.h>
+#include <folly/Baton.h>
 #include <folly/Memory.h>
 #include <folly/RWSpinLock.h>
 
@@ -83,6 +84,7 @@ class ThreadPoolExecutor : public experimental::Executor {
     uint64_t id;
     std::thread handle;
     bool idle;
+    Baton<> startupBaton;
   };
 
   typedef std::shared_ptr<Thread> ThreadPtr;
@@ -101,7 +103,8 @@ class ThreadPoolExecutor : public experimental::Executor {
 
   void runTask(const ThreadPtr& thread, Task&& task);
 
-  // The function that will be bound to pool threads
+  // The function that will be bound to pool threads. It must call
+  // thread->startupBaton.post() when it's ready to consume work.
   virtual void threadRun(ThreadPtr thread) = 0;
 
   // Stop n threads and put their ThreadPtrs in the threadsStopped_ queue