2 * Copyright 2014 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
18 #include <folly/wangle/acceptor/Acceptor.h>
19 #include <folly/io/async/EventBaseManager.h>
20 #include <folly/wangle/concurrent/IOThreadPoolExecutor.h>
21 #include <folly/wangle/acceptor/ManagedConnection.h>
22 #include <folly/wangle/channel/ChannelPipeline.h>
26 template <typename Pipeline>
27 class ServerAcceptor : public Acceptor {
28 typedef std::unique_ptr<Pipeline,
29 folly::DelayedDestruction::Destructor> PipelinePtr;
31 class ServerConnection : public wangle::ManagedConnection {
33 explicit ServerConnection(PipelinePtr pipeline)
34 : pipeline_(std::move(pipeline)) {}
39 void timeoutExpired() noexcept {
42 void describe(std::ostream& os) const {}
46 void notifyPendingShutdown() {}
47 void closeWhenIdle() {}
48 void dropConnection() {}
49 void dumpConnectionState(uint8_t loglevel) {}
51 PipelinePtr pipeline_;
55 explicit ServerAcceptor(
56 std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory,
58 : Acceptor(ServerSocketConfig())
59 , pipelineFactory_(pipelineFactory) {
60 Acceptor::init(nullptr, base);
63 /* See Acceptor::onNewConnection for details */
65 AsyncSocket::UniquePtr transport, const SocketAddress* address,
66 const std::string& nextProtocolName, const TransportInfo& tinfo) {
68 std::unique_ptr<Pipeline,
69 folly::DelayedDestruction::Destructor>
70 pipeline(pipelineFactory_->newPipeline(
71 std::shared_ptr<AsyncSocket>(
73 folly::DelayedDestruction::Destructor())));
74 auto connection = new ServerConnection(std::move(pipeline));
75 Acceptor::addConnection(connection);
79 std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory_;
82 template <typename Pipeline>
83 class ServerAcceptorFactory : public AcceptorFactory {
85 explicit ServerAcceptorFactory(
86 std::shared_ptr<PipelineFactory<Pipeline>> factory)
87 : factory_(factory) {}
89 std::shared_ptr<Acceptor> newAcceptor(folly::EventBase* base) {
90 return std::make_shared<ServerAcceptor<Pipeline>>(factory_, base);
93 std::shared_ptr<PipelineFactory<Pipeline>> factory_;
96 class ServerWorkerPool : public folly::wangle::ThreadPoolExecutor::Observer {
98 explicit ServerWorkerPool(
99 std::shared_ptr<AcceptorFactory> acceptorFactory,
100 folly::wangle::IOThreadPoolExecutor* exec,
101 std::vector<std::shared_ptr<folly::AsyncServerSocket>>* sockets)
102 : acceptorFactory_(acceptorFactory)
104 , sockets_(sockets) {
108 template <typename F>
109 void forEachWorker(F&& f) const;
112 folly::wangle::ThreadPoolExecutor::ThreadHandle*);
114 folly::wangle::ThreadPoolExecutor::ThreadHandle*);
115 void threadPreviouslyStarted(
116 folly::wangle::ThreadPoolExecutor::ThreadHandle* thread) {
117 threadStarted(thread);
119 void threadNotYetStopped(
120 folly::wangle::ThreadPoolExecutor::ThreadHandle* thread) {
121 threadStopped(thread);
125 std::map<folly::wangle::ThreadPoolExecutor::ThreadHandle*,
126 std::shared_ptr<Acceptor>> workers_;
127 std::shared_ptr<AcceptorFactory> acceptorFactory_;
128 folly::wangle::IOThreadPoolExecutor* exec_{nullptr};
129 std::vector<std::shared_ptr<folly::AsyncServerSocket>>* sockets_;
132 template <typename F>
133 void ServerWorkerPool::forEachWorker(F&& f) const {
134 for (const auto& kv : workers_) {