public:
explicit ServerAcceptor(
- std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory)
+ std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory,
+ EventBase* base)
: Acceptor(ServerSocketConfig())
, pipelineFactory_(pipelineFactory) {
- Acceptor::init(nullptr, &base_);
+ Acceptor::init(nullptr, base);
}
/* See Acceptor::onNewConnection for details */
Acceptor::addConnection(connection);
}
- ~ServerAcceptor() {
- Acceptor::dropAllConnections();
- }
-
private:
- EventBase base_;
-
std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory_;
};
std::shared_ptr<PipelineFactory<Pipeline>> factory)
: factory_(factory) {}
- std::shared_ptr<Acceptor> newAcceptor() {
- return std::make_shared<ServerAcceptor<Pipeline>>(factory_);
+ std::shared_ptr<Acceptor> newAcceptor(folly::EventBase* base) {
+ return std::make_shared<ServerAcceptor<Pipeline>>(factory_, base);
}
private:
std::shared_ptr<PipelineFactory<Pipeline>> factory_;
};
-class ServerWorkerFactory : public folly::wangle::ThreadFactory {
+class ServerWorkerPool : public folly::wangle::ThreadPoolExecutor::Observer {
public:
- explicit ServerWorkerFactory(std::shared_ptr<AcceptorFactory> acceptorFactory)
- : internalFactory_(
- std::make_shared<folly::wangle::NamedThreadFactory>("BootstrapWorker"))
- , acceptorFactory_(acceptorFactory)
- {}
- virtual std::thread newThread(folly::Func&& func) override;
-
- void setInternalFactory(
- std::shared_ptr<folly::wangle::NamedThreadFactory> internalFactory);
- void setNamePrefix(folly::StringPiece prefix);
+ explicit ServerWorkerPool(
+ std::shared_ptr<AcceptorFactory> acceptorFactory,
+ folly::wangle::IOThreadPoolExecutor* exec,
+ std::vector<std::shared_ptr<folly::AsyncServerSocket>>* sockets)
+ : acceptorFactory_(acceptorFactory)
+ , exec_(exec)
+ , sockets_(sockets) {
+ CHECK(exec);
+ }
template <typename F>
- void forEachWorker(F&& f);
+ void forEachWorker(F&& f) const;
+
+ void threadStarted(
+ folly::wangle::ThreadPoolExecutor::ThreadHandle*);
+ void threadStopped(
+ folly::wangle::ThreadPoolExecutor::ThreadHandle*);
+ void threadPreviouslyStarted(
+ folly::wangle::ThreadPoolExecutor::ThreadHandle* thread) {
+ threadStarted(thread);
+ }
+ void threadNotYetStopped(
+ folly::wangle::ThreadPoolExecutor::ThreadHandle* thread) {
+ threadStopped(thread);
+ }
private:
- std::shared_ptr<folly::wangle::NamedThreadFactory> internalFactory_;
- folly::RWSpinLock workersLock_;
- std::map<int32_t, std::shared_ptr<Acceptor>> workers_;
- int32_t nextWorkerId_{0};
-
+ std::map<folly::wangle::ThreadPoolExecutor::ThreadHandle*,
+ std::shared_ptr<Acceptor>> workers_;
std::shared_ptr<AcceptorFactory> acceptorFactory_;
+ folly::wangle::IOThreadPoolExecutor* exec_{nullptr};
+ std::vector<std::shared_ptr<folly::AsyncServerSocket>>* sockets_;
};
template <typename F>
-void ServerWorkerFactory::forEachWorker(F&& f) {
- folly::RWSpinLock::ReadHolder guard(workersLock_);
+void ServerWorkerPool::forEachWorker(F&& f) const {
for (const auto& kv : workers_) {
f(kv.second.get());
}