namespace folly {
-std::thread ServerWorkerFactory::newThread(
- folly::Func&& func) {
- auto id = nextWorkerId_++;
- auto worker = acceptorFactory_->newAcceptor();
- {
- folly::RWSpinLock::WriteHolder guard(workersLock_);
- workers_.insert({id, worker});
+void ServerWorkerPool::threadStarted(
+ folly::wangle::ThreadPoolExecutor::ThreadHandle* h) {
+ auto worker = acceptorFactory_->newAcceptor(exec_->getEventBase(h));
+ workers_.insert({h, worker});
+
+ for(auto socket : *sockets_) {
+ socket->getEventBase()->runInEventBaseThread([this, worker, socket](){
+ socket->addAcceptCallback(worker.get(), worker->getEventBase());
+ });
}
- return internalFactory_->newThread([=](){
- EventBaseManager::get()->setEventBase(worker->getEventBase(), false);
- func();
- EventBaseManager::get()->clearEventBase();
-
- worker->drainAllConnections();
- {
- folly::RWSpinLock::WriteHolder guard(workersLock_);
- workers_.erase(id);
- }
- });
}
-void ServerWorkerFactory::setInternalFactory(
- std::shared_ptr<wangle::NamedThreadFactory> internalFactory) {
- CHECK(workers_.empty());
- internalFactory_ = internalFactory;
-}
+void ServerWorkerPool::threadStopped(
+ folly::wangle::ThreadPoolExecutor::ThreadHandle* h) {
+ auto worker = workers_.find(h);
+ CHECK(worker != workers_.end());
+
+ for (auto& socket : *sockets_) {
+ folly::Baton<> barrier;
+ socket->getEventBase()->runInEventBaseThread([&]() {
+ socket->removeAcceptCallback(worker->second.get(), nullptr);
+ barrier.post();
+ });
+ barrier.wait();
+ }
+
+ CHECK(worker->second->getEventBase() != nullptr);
+ CHECK(!worker->second->getEventBase()->isInEventBaseThread());
+ folly::Baton<> barrier;
+ worker->second->getEventBase()->runInEventBaseThread([&]() {
+ worker->second->dropAllConnections();
+ barrier.post();
+ });
-void ServerWorkerFactory::setNamePrefix(folly::StringPiece prefix) {
- CHECK(workers_.empty());
- internalFactory_->setNamePrefix(prefix);
+ barrier.wait();
+ workers_.erase(worker);
}
} // namespace