Summary:
I'm not 100% sure this is the best way to go about this but I don't hate it either.
I'm going to start seeing how it might fit into tserver - my guess is that some sort Cpp2WorkerFactory which also manages those objects would get plugged in as the thread factory
Haven't fleshed out how this would relate to TEventBaseManager
Test Plan: added unit, starting to play with this in Thrift2 server
Reviewed By: davejwatson@fb.com
Subscribers: alandau, bmatheny, trunkagent, fugalh, njormrod
FB internal diff:
D1574660
}
void CPUThreadPoolExecutor::threadRun(std::shared_ptr<Thread> thread) {
+ thread->startupBaton.post();
while (1) {
auto task = taskQueue_->take();
if (UNLIKELY(task.poison)) {
#include <folly/MoveWrapper.h>
#include <glog/logging.h>
+#include <thrift/lib/cpp/async/TEventBaseManager.h>
namespace folly { namespace wangle {
};
ioThread->pendingTasks++;
- if (!ioThread->eventBase.runInEventBaseThread(std::move(wrappedFunc))) {
+ if (!ioThread->eventBase->runInEventBaseThread(std::move(wrappedFunc))) {
ioThread->pendingTasks--;
throw std::runtime_error("Unable to run func in event base thread");
}
void IOThreadPoolExecutor::threadRun(ThreadPtr thread) {
const auto ioThread = std::static_pointer_cast<IOThread>(thread);
+ ioThread->eventBase =
+ apache::thrift::async::TEventBaseManager::get()->getEventBase();
+ thread->startupBaton.post();
while (ioThread->shouldRun) {
- ioThread->eventBase.loopForever();
+ ioThread->eventBase->loopForever();
}
if (isJoin_) {
while (ioThread->pendingTasks > 0) {
- ioThread->eventBase.loopOnce();
+ ioThread->eventBase->loopOnce();
}
}
stoppedThreads_.add(ioThread);
const auto ioThread = std::static_pointer_cast<IOThread>(
threadList_.get()[i]);
ioThread->shouldRun = false;
- ioThread->eventBase.terminateLoopSoon();
+ ioThread->eventBase->terminateLoopSoon();
}
}
IOThread() : shouldRun(true), pendingTasks(0) {};
std::atomic<bool> shouldRun;
std::atomic<size_t> pendingTasks;
- EventBase eventBase;
+ EventBase* eventBase;
};
size_t nextThread_;
// threadListLock_ is writelocked
void ThreadPoolExecutor::addThreads(size_t n) {
+ std::vector<ThreadPtr> newThreads;
for (int i = 0; i < n; i++) {
- auto thread = makeThread();
+ newThreads.push_back(makeThread());
+ }
+ for (auto& thread : newThreads) {
// TODO need a notion of failing to create the thread
// and then handling for that case
thread->handle = threadFactory_->newThread(
std::bind(&ThreadPoolExecutor::threadRun, this, thread));
threadList_.add(thread);
}
+ for (auto& thread : newThreads) {
+ thread->startupBaton.wait();
+ }
}
// threadListLock_ is writelocked
#include <folly/experimental/wangle/concurrent/LifoSemMPMCQueue.h>
#include <folly/experimental/wangle/concurrent/NamedThreadFactory.h>
#include <folly/experimental/wangle/rx/Observable.h>
+#include <folly/Baton.h>
#include <folly/Memory.h>
#include <folly/RWSpinLock.h>
uint64_t id;
std::thread handle;
bool idle;
+ Baton<> startupBaton;
};
typedef std::shared_ptr<Thread> ThreadPtr;
void runTask(const ThreadPtr& thread, Task&& task);
- // The function that will be bound to pool threads
+ // The function that will be bound to pool threads. It must call
+ // thread->startupBaton.post() when it's ready to consume work.
virtual void threadRun(ThreadPtr thread) = 0;
// Stop n threads and put their ThreadPtrs in the threadsStopped_ queue