Copyright 2014->2015
[folly.git] / folly / wangle / bootstrap / ServerBootstrap.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <folly/wangle/bootstrap/ServerBootstrap-inl.h>
19 #include <folly/Baton.h>
20
21 namespace folly {
22
23 /*
24  * ServerBootstrap is a parent class intended to set up a
25  * high-performance TCP accepting server.  It will manage a pool of
26  * accepting threads, any number of accepting sockets, a pool of
27  * IO-worker threads, and connection pool for each IO thread for you.
28  *
29  * The output is given as a ChannelPipeline template: given a
30  * PipelineFactory, it will create a new pipeline for each connection,
31  * and your server can handle the incoming bytes.
32  *
33  * BACKWARDS COMPATIBLITY: for servers already taking a pool of
34  * Acceptor objects, an AcceptorFactory can be given directly instead
35  * of a pipeline factory.
36  */
37 template <typename Pipeline>
38 class ServerBootstrap {
39  public:
40
41   ~ServerBootstrap() {
42     stop();
43   }
44   /* TODO(davejwatson)
45    *
46    * If there is any work to be done BEFORE handing the work to IO
47    * threads, this handler is where the pipeline to do it would be
48    * set.
49    *
50    * This could be used for things like logging, load balancing, or
51    * advanced load balancing on IO threads.  Netty also provides this.
52    */
53   ServerBootstrap* handler() {
54     return this;
55   }
56
57   /*
58    * BACKWARDS COMPATIBILITY - an acceptor factory can be set.  Your
59    * Acceptor is responsible for managing the connection pool.
60    *
61    * @param childHandler - acceptor factory to call for each IO thread
62    */
63   ServerBootstrap* childHandler(std::shared_ptr<AcceptorFactory> childHandler) {
64     acceptorFactory_ = childHandler;
65     return this;
66   }
67
68   /*
69    * Set a pipeline factory that will be called for each new connection
70    *
71    * @param factory pipeline factory to use for each new connection
72    */
73   ServerBootstrap* childPipeline(
74       std::shared_ptr<PipelineFactory<Pipeline>> factory) {
75     pipelineFactory_ = factory;
76     return this;
77   }
78
79   /*
80    * Set the IO executor.  If not set, a default one will be created
81    * with one thread per core.
82    *
83    * @param io_group - io executor to use for IO threads.
84    */
85   ServerBootstrap* group(
86       std::shared_ptr<folly::wangle::IOThreadPoolExecutor> io_group) {
87     return group(nullptr, io_group);
88   }
89
90   /*
91    * Set the acceptor executor, and IO executor.
92    *
93    * If no acceptor executor is set, a single thread will be created for accepts
94    * If no IO executor is set, a default of one thread per core will be created
95    *
96    * @param group - acceptor executor to use for acceptor threads.
97    * @param io_group - io executor to use for IO threads.
98    */
99   ServerBootstrap* group(
100       std::shared_ptr<folly::wangle::IOThreadPoolExecutor> accept_group,
101       std::shared_ptr<wangle::IOThreadPoolExecutor> io_group) {
102     if (!accept_group) {
103       accept_group = std::make_shared<folly::wangle::IOThreadPoolExecutor>(
104         1, std::make_shared<wangle::NamedThreadFactory>("Acceptor Thread"));
105     }
106     if (!io_group) {
107       io_group = std::make_shared<folly::wangle::IOThreadPoolExecutor>(
108         32, std::make_shared<wangle::NamedThreadFactory>("IO Thread"));
109     }
110
111     CHECK(acceptorFactory_ || pipelineFactory_);
112
113     if (acceptorFactory_) {
114       workerFactory_ = std::make_shared<ServerWorkerPool>(
115         acceptorFactory_, io_group.get(), &sockets_);
116     } else {
117       workerFactory_ = std::make_shared<ServerWorkerPool>(
118         std::make_shared<ServerAcceptorFactory<Pipeline>>(pipelineFactory_),
119         io_group.get(), &sockets_);
120     }
121
122     io_group->addObserver(workerFactory_);
123
124     acceptor_group_ = accept_group;
125     io_group_ = io_group;
126
127     return this;
128   }
129
130   /*
131    * Bind to an existing socket
132    *
133    * @param sock Existing socket to use for accepting
134    */
135   void bind(folly::AsyncServerSocket::UniquePtr s) {
136     if (!workerFactory_) {
137       group(nullptr);
138     }
139
140     // Since only a single socket is given,
141     // we can only accept on a single thread
142     CHECK(acceptor_group_->numThreads() == 1);
143     std::shared_ptr<folly::AsyncServerSocket> socket(
144       s.release(), DelayedDestruction::Destructor());
145
146     folly::Baton<> barrier;
147     acceptor_group_->add([&](){
148       socket->attachEventBase(EventBaseManager::get()->getEventBase());
149       socket->listen(1024);
150       socket->startAccepting();
151       barrier.post();
152     });
153     barrier.wait();
154
155     // Startup all the threads
156     workerFactory_->forEachWorker([this, socket](Acceptor* worker){
157       socket->getEventBase()->runInEventBaseThread([this, worker, socket](){
158         socket->addAcceptCallback(worker, worker->getEventBase());
159       });
160     });
161
162     sockets_.push_back(socket);
163   }
164
165   void bind(folly::SocketAddress address) {
166     bindImpl(-1, address);
167   }
168
169   /*
170    * Bind to a port and start listening.
171    * One of childPipeline or childHandler must be called before bind
172    *
173    * @param port Port to listen on
174    */
175   void bind(int port) {
176     CHECK(port >= 0);
177     bindImpl(port, folly::SocketAddress());
178   }
179
180   void bindImpl(int port, folly::SocketAddress address) {
181     if (!workerFactory_) {
182       group(nullptr);
183     }
184
185     bool reusePort = false;
186     if (acceptor_group_->numThreads() > 1) {
187       reusePort = true;
188     }
189
190     std::mutex sock_lock;
191     std::vector<std::shared_ptr<folly::AsyncServerSocket>> new_sockets;
192
193     auto startupFunc = [&](std::shared_ptr<folly::Baton<>> barrier){
194         auto socket = folly::AsyncServerSocket::newSocket();
195         sock_lock.lock();
196         new_sockets.push_back(socket);
197         sock_lock.unlock();
198         socket->setReusePortEnabled(reusePort);
199         socket->attachEventBase(EventBaseManager::get()->getEventBase());
200         if (port >= 0) {
201           socket->bind(port);
202         } else {
203           socket->bind(address);
204           port = address.getPort();
205         }
206         socket->listen(socketConfig.acceptBacklog);
207         socket->startAccepting();
208
209         if (port == 0) {
210           SocketAddress address;
211           socket->getAddress(&address);
212           port = address.getPort();
213         }
214
215         barrier->post();
216     };
217
218     auto wait0 = std::make_shared<folly::Baton<>>();
219     acceptor_group_->add(std::bind(startupFunc, wait0));
220     wait0->wait();
221
222     for (size_t i = 1; i < acceptor_group_->numThreads(); i++) {
223       auto barrier = std::make_shared<folly::Baton<>>();
224       acceptor_group_->add(std::bind(startupFunc, barrier));
225       barrier->wait();
226     }
227
228     // Startup all the threads
229     for(auto socket : new_sockets) {
230       workerFactory_->forEachWorker([this, socket](Acceptor* worker){
231         socket->getEventBase()->runInEventBaseThread([this, worker, socket](){
232           socket->addAcceptCallback(worker, worker->getEventBase());
233         });
234       });
235     }
236
237     for (auto& socket : new_sockets) {
238       sockets_.push_back(socket);
239     }
240   }
241
242   /*
243    * Stop listening on all sockets.
244    */
245   void stop() {
246     for (auto socket : sockets_) {
247       folly::Baton<> barrier;
248       socket->getEventBase()->runInEventBaseThread([&barrier, socket]() {
249         socket->stopAccepting();
250         socket->detachEventBase();
251         barrier.post();
252       });
253       barrier.wait();
254     }
255     sockets_.clear();
256
257     if (acceptor_group_) {
258       acceptor_group_->join();
259     }
260     if (io_group_) {
261       io_group_->join();
262     }
263   }
264
265   /*
266    * Get the list of listening sockets
267    */
268   const std::vector<std::shared_ptr<folly::AsyncServerSocket>>&
269   getSockets() const {
270     return sockets_;
271   }
272
273   std::shared_ptr<wangle::IOThreadPoolExecutor> getIOGroup() const {
274     return io_group_;
275   }
276
277   template <typename F>
278   void forEachWorker(F&& f) const {
279     workerFactory_->forEachWorker(f);
280   }
281
282   ServerSocketConfig socketConfig;
283
284  private:
285   std::shared_ptr<wangle::IOThreadPoolExecutor> acceptor_group_;
286   std::shared_ptr<wangle::IOThreadPoolExecutor> io_group_;
287
288   std::shared_ptr<ServerWorkerPool> workerFactory_;
289   std::vector<std::shared_ptr<folly::AsyncServerSocket>> sockets_;
290
291   std::shared_ptr<AcceptorFactory> acceptorFactory_;
292   std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory_;
293 };
294
295 } // namespace