2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
18 #include <folly/wangle/bootstrap/ServerBootstrap-inl.h>
19 #include <folly/Baton.h>
20 #include <folly/wangle/channel/Pipeline.h>
// Default pipeline type: folly::wangle::Pipeline instantiated with
// folly::IOBufQueue& and std::unique_ptr<folly::IOBuf> as its two type
// arguments.
24 typedef folly::wangle::Pipeline<
25 folly::IOBufQueue&, std::unique_ptr<folly::IOBuf>> DefaultPipeline;
28 * ServerBootstrap is a parent class intended to set up a
29 * high-performance TCP accepting server. It will manage a pool of
30 * accepting threads, any number of accepting sockets, a pool of
31 * IO-worker threads, and a connection pool for each IO thread for you.
33 * The output is given as a Pipeline template: given a
34 * PipelineFactory, it will create a new pipeline for each connection,
35 * and your server can handle the incoming bytes.
37 * BACKWARDS COMPATIBILITY: for servers already taking a pool of
38 * Acceptor objects, an AcceptorFactory can be given directly instead
39 * of a pipeline factory.
41 template <typename Pipeline>
42 class ServerBootstrap {
// Not copyable: the copy constructor is deleted.
45 ServerBootstrap(const ServerBootstrap& that) = delete;
// Movable: the move constructor is compiler-generated.
46 ServerBootstrap(ServerBootstrap&& that) = default;
// Default-constructible; data members take their in-class initializers.
48 ServerBootstrap() = default;
// Pipeline type (first type argument void*) produced by the factory that is
// stored in pipeline_.
55 typedef wangle::Pipeline<void*> AcceptPipeline;
57 * Pipeline used to add connections to event bases.
58 * This is used for UDP or for load balancing
59 * TCP connections to IO threads explicitly
61 ServerBootstrap* pipeline(
62 std::shared_ptr<PipelineFactory<AcceptPipeline>> factory) {
// Replaces the socket factory. socketFactory_ is what the bind paths use to
// create sockets (newSocket) and register accept callbacks (addAcceptCB),
// and what the stop path uses to stop sockets (stopSocket).
67 ServerBootstrap* channelFactory(
68 std::shared_ptr<ServerSocketFactory> factory) {
69 socketFactory_ = factory;
74 * BACKWARDS COMPATIBILITY - an acceptor factory can be set. Your
75 * Acceptor is responsible for managing the connection pool.
77 * @param h - acceptor factory to call for each IO thread
79 ServerBootstrap* childHandler(std::shared_ptr<AcceptorFactory> h) {
85 * Set a pipeline factory that will be called for each new connection
87 * @param factory pipeline factory to use for each new connection
89 ServerBootstrap* childPipeline(
90 std::shared_ptr<PipelineFactory<Pipeline>> factory) {
// Stored only; the two-argument group() later wraps this factory in a
// ServerAcceptorFactory<Pipeline> when building the worker pool.
91 childPipelineFactory_ = factory;
96 * Set the IO executor. If not set, a default one will be created
97 * with one thread per core.
99 * @param io_group - io executor to use for IO threads.
101 ServerBootstrap* group(
102 std::shared_ptr<folly::wangle::IOThreadPoolExecutor> io_group) {
// Forward to the two-argument overload with no acceptor executor (nullptr).
103 return group(nullptr, io_group);
107 * Set the acceptor executor, and IO executor.
109 * If no acceptor executor is set, a single thread will be created for accepts.
110 * If no IO executor is set, a default of one thread per core will be created.
112 * @param accept_group - acceptor executor to use for acceptor threads.
113 * @param io_group - io executor to use for IO threads.
115 ServerBootstrap* group(
116 std::shared_ptr<folly::wangle::IOThreadPoolExecutor> accept_group,
117 std::shared_ptr<wangle::IOThreadPoolExecutor> io_group) {
// Default acceptor executor: a single thread, with a NamedThreadFactory
// constructed from the string "Acceptor Thread".
// NOTE(review): the guarding condition is not visible in this excerpt --
// presumably this runs only when accept_group is null; confirm.
119 accept_group = std::make_shared<folly::wangle::IOThreadPoolExecutor>(
120 1, std::make_shared<wangle::NamedThreadFactory>("Acceptor Thread"));
// Default IO executor sized from std::thread::hardware_concurrency(), which
// is permitted to return 0 when the value cannot be determined.
// NOTE(review): the fallback value and the guarding condition are not
// visible in this excerpt.
123 auto threads = std::thread::hardware_concurrency();
125 // Reasonable mid-point for concurrency when actual value unknown
128 io_group = std::make_shared<folly::wangle::IOThreadPoolExecutor>(
129 threads, std::make_shared<wangle::NamedThreadFactory>("IO Thread"));
132 // TODO better config checking
133 // CHECK(acceptorFactory_ || childPipelineFactory_);
// An acceptor factory and a child pipeline factory must not both be set.
134 CHECK(!(acceptorFactory_ && childPipelineFactory_));
// Build the worker pool directly from the acceptor factory when one is set...
136 if (acceptorFactory_) {
137 workerFactory_ = std::make_shared<ServerWorkerPool>(
138 acceptorFactory_, io_group.get(), sockets_, socketFactory_);
// ...otherwise from a ServerAcceptorFactory<Pipeline> wrapping
// childPipelineFactory_.
140 workerFactory_ = std::make_shared<ServerWorkerPool>(
141 std::make_shared<ServerAcceptorFactory<Pipeline>>(
142 childPipelineFactory_,
144 io_group.get(), sockets_, socketFactory_);
// Register the worker pool as an observer of the IO executor.
147 io_group->addObserver(workerFactory_);
// Retain both executors as members; the worker pool above was handed only
// the raw io_group pointer.
149 acceptor_group_ = accept_group;
150 io_group_ = io_group;
156 * Bind to an existing socket
158 * @param s Existing socket to use for accepting
160 void bind(folly::AsyncServerSocket::UniquePtr s) {
// NOTE(review): the body of this branch is not visible in this excerpt --
// presumably it sets up default executors when group() was never called;
// confirm.
161 if (!workerFactory_) {
165 // Since only a single socket is given,
166 // we can only accept on a single thread
167 CHECK(acceptor_group_->numThreads() == 1);
// Transfer ownership of the socket from the UniquePtr to a shared_ptr whose
// deleter is DelayedDestruction::Destructor.
169 std::shared_ptr<folly::AsyncServerSocket> socket(
170 s.release(), DelayedDestruction::Destructor());
// Task queued on the acceptor executor: attach the socket to the EventBase
// obtained from EventBaseManager::get(), listen with the configured accept
// backlog, then start accepting.
// NOTE(review): where `barrier` is posted and waited on is not visible here.
172 folly::Baton<> barrier;
173 acceptor_group_->add([&](){
174 socket->attachEventBase(EventBaseManager::get()->getEventBase());
175 socket->listen(socketConfig.acceptBacklog);
176 socket->startAccepting();
181 // Startup all the threads
// For every worker, register it as an accept callback for this socket via
// socketFactory_->addAcceptCB; the registration is dispatched through the
// socket's EventBase (runImmediatelyOrRunInEventBaseThreadAndWait).
182 workerFactory_->forEachWorker([this, socket](Acceptor* worker){
183 socket->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait(
184 [this, worker, socket](){
185 socketFactory_->addAcceptCB(socket, worker, worker->getEventBase());
// Record the socket in the shared socket list.
189 sockets_->push_back(socket);
192 void bind(folly::SocketAddress& address) {
// Forward to bindImpl with port -1 and the caller's address. bindImpl may
// write to `address` (it calls socket->getAddress(&address)).
// NOTE(review): -1 presumably means "take the endpoint from address" --
// confirm against ServerSocketFactory::newSocket.
193 bindImpl(-1, address);
197 * Bind to a port and start listening.
198 * One of childPipeline or childHandler must be called before bind
200 * @param port Port to listen on
202 void bind(int port) {
// Forward to bindImpl with the given port and a default-constructed address.
204 folly::SocketAddress address;
205 bindImpl(port, address);
// Shared implementation behind bind(int) and bind(SocketAddress&): creates
// sockets on the acceptor executor through socketFactory_, then registers
// every worker with each new socket and records the sockets in sockets_.
208 void bindImpl(int port, folly::SocketAddress& address) {
// NOTE(review): the body of this branch is not visible in this excerpt --
// presumably it sets up default executors when group() was never called;
// confirm.
209 if (!workerFactory_) {
// NOTE(review): the body of the numThreads() > 1 branch is not visible --
// presumably it sets reusePort so several acceptor threads can bind the same
// port; confirm.
213 bool reusePort = false;
214 if (acceptor_group_->numThreads() > 1) {
// State shared with the startup tasks below, which capture by reference:
// the sockets created so far and an exception captured from a task.
// NOTE(review): sock_lock presumably guards new_sockets; the locking site is
// not visible here.
218 std::mutex sock_lock;
219 std::vector<std::shared_ptr<folly::AsyncSocketBase>> new_sockets;
222 std::exception_ptr exn;
// Startup task: create one socket via the socket factory and collect it.
224 auto startupFunc = [&](std::shared_ptr<folly::Baton<>> barrier){
227 auto socket = socketFactory_->newSocket(
228 port, address, socketConfig.acceptBacklog, reusePort, socketConfig);
231 new_sockets.push_back(socket);
// Write the socket's address back into `address` and refresh `port` from it
// (both captured by reference) -- presumably so later tasks use the port of
// this socket; confirm.
235 socket->getAddress(&address);
236 port = address.getPort();
// Store the in-flight exception so bindImpl can rethrow it further down.
241 exn = std::current_exception();
// Queue the first startup task with its own baton.
// NOTE(review): the wait on wait0 is not visible in this excerpt.
251 auto wait0 = std::make_shared<folly::Baton<>>();
252 acceptor_group_->add(std::bind(startupFunc, wait0));
// Queue one further startup task for each remaining acceptor thread.
255 for (size_t i = 1; i < acceptor_group_->numThreads(); i++) {
256 auto barrier = std::make_shared<folly::Baton<>>();
257 acceptor_group_->add(std::bind(startupFunc, barrier));
// NOTE(review): the condition guarding this rethrow is not visible here.
262 std::rethrow_exception(exn);
265 for (auto& socket : new_sockets) {
266 // Startup all the threads
// Register every worker as an accept callback for this socket; dispatched
// through the socket's EventBase.
267 workerFactory_->forEachWorker([this, socket](Acceptor* worker){
268 socket->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait(
269 [this, worker, socket](){
270 socketFactory_->addAcceptCB(socket, worker, worker->getEventBase());
// Record the socket in the shared socket list.
274 sockets_->push_back(socket);
279 * Stop listening on all sockets.
282 // sockets_ may be null if ServerBootstrap has been std::move'd
284 for (auto socket : *sockets_) {
285 socket->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait(
287 socketFactory_->stopSocket(socket);
294 // stopBaton_ may be null if ServerBootstrap has been std::move'd
302 if (acceptor_group_) {
303 acceptor_group_->join();
318 * Get the list of listening sockets
320 const std::vector<std::shared_ptr<folly::AsyncSocketBase>>&
325 std::shared_ptr<wangle::IOThreadPoolExecutor> getIOGroup() const {
// Applies f through workerFactory_->forEachWorker. f is passed on as an
// lvalue (no std::forward). workerFactory_ is dereferenced unconditionally
// in the visible body.
329 template <typename F>
330 void forEachWorker(F&& f) const {
331 workerFactory_->forEachWorker(f);
// Socket configuration; acceptBacklog is read when listening, and the whole
// object is passed to socketFactory_->newSocket.
334 ServerSocketConfig socketConfig;
// Executors assigned by the two-argument group(): one for accepting, one for
// IO.
337 std::shared_ptr<wangle::IOThreadPoolExecutor> acceptor_group_;
338 std::shared_ptr<wangle::IOThreadPoolExecutor> io_group_;
// Worker pool created in group(); also registered as an observer of the IO
// executor.
340 std::shared_ptr<ServerWorkerPool> workerFactory_;
// Shared list of sockets, initialized to an empty vector; the bind paths
// append to it and it is also handed to the worker pool.
341 std::shared_ptr<std::vector<std::shared_ptr<folly::AsyncSocketBase>>> sockets_{
342 std::make_shared<std::vector<std::shared_ptr<folly::AsyncSocketBase>>>()};
// User-supplied factories. group() CHECKs that acceptorFactory_ and
// childPipelineFactory_ are not both set.
344 std::shared_ptr<AcceptorFactory> acceptorFactory_;
345 std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory_;
// Accept-pipeline factory; defaults to DefaultAcceptPipelineFactory.
346 std::shared_ptr<PipelineFactory<AcceptPipeline>> pipeline_{
347 std::make_shared<DefaultAcceptPipelineFactory>()};
// Socket factory; defaults to AsyncServerSocketFactory and can be replaced
// through channelFactory().
348 std::shared_ptr<ServerSocketFactory> socketFactory_{
349 std::make_shared<AsyncServerSocketFactory>()};
// Heap-allocated Baton; per the note in the stop path it may be null after
// this object has been moved from.
351 std::unique_ptr<folly::Baton<>> stopBaton_{
352 folly::make_unique<folly::Baton<>>()};
// Starts out false.
// NOTE(review): where this flag is set or read is not visible in this
// excerpt.
353 bool stopped_{false};