82465988e7660b763706f1ce7eb02046d79cd266
[folly.git] / folly / wangle / bootstrap / ServerBootstrap.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <folly/wangle/bootstrap/ServerBootstrap-inl.h>
19 #include <folly/Baton.h>
20 #include <folly/wangle/channel/ChannelPipeline.h>
21
22 namespace folly {
23
24 typedef folly::wangle::ChannelPipeline<
25   folly::IOBufQueue&, std::unique_ptr<folly::IOBuf>> DefaultPipeline;
26
27 /*
28  * ServerBootstrap is a parent class intended to set up a
29  * high-performance TCP accepting server.  It will manage a pool of
30  * accepting threads, any number of accepting sockets, a pool of
31  * IO-worker threads, and connection pool for each IO thread for you.
32  *
33  * The output is given as a ChannelPipeline template: given a
34  * PipelineFactory, it will create a new pipeline for each connection,
35  * and your server can handle the incoming bytes.
36  *
37  * BACKWARDS COMPATIBLITY: for servers already taking a pool of
38  * Acceptor objects, an AcceptorFactory can be given directly instead
39  * of a pipeline factory.
40  */
41 template <typename Pipeline>
42 class ServerBootstrap {
43  public:
44
45   ~ServerBootstrap() {
46     stop();
47   }
48
49   typedef wangle::ChannelPipeline<
50    void*,
51    std::exception> AcceptPipeline;
52   /*
53    * Pipeline used to add connections to event bases.
54    * This is used for UDP or for load balancing
55    * TCP connections to IO threads explicitly
56    */
57   ServerBootstrap* pipeline(
58     std::shared_ptr<PipelineFactory<AcceptPipeline>> factory) {
59     pipeline_ = factory;
60     return this;
61   }
62
63   ServerBootstrap* channelFactory(
64     std::shared_ptr<ServerSocketFactory> factory) {
65     socketFactory_ = factory;
66     return this;
67   }
68
69   /*
70    * BACKWARDS COMPATIBILITY - an acceptor factory can be set.  Your
71    * Acceptor is responsible for managing the connection pool.
72    *
73    * @param childHandler - acceptor factory to call for each IO thread
74    */
75   ServerBootstrap* childHandler(std::shared_ptr<AcceptorFactory> h) {
76     acceptorFactory_ = h;
77     return this;
78   }
79
80   /*
81    * Set a pipeline factory that will be called for each new connection
82    *
83    * @param factory pipeline factory to use for each new connection
84    */
85   ServerBootstrap* childPipeline(
86       std::shared_ptr<PipelineFactory<Pipeline>> factory) {
87     childPipelineFactory_ = factory;
88     return this;
89   }
90
91   /*
92    * Set the IO executor.  If not set, a default one will be created
93    * with one thread per core.
94    *
95    * @param io_group - io executor to use for IO threads.
96    */
97   ServerBootstrap* group(
98       std::shared_ptr<folly::wangle::IOThreadPoolExecutor> io_group) {
99     return group(nullptr, io_group);
100   }
101
102   /*
103    * Set the acceptor executor, and IO executor.
104    *
105    * If no acceptor executor is set, a single thread will be created for accepts
106    * If no IO executor is set, a default of one thread per core will be created
107    *
108    * @param group - acceptor executor to use for acceptor threads.
109    * @param io_group - io executor to use for IO threads.
110    */
111   ServerBootstrap* group(
112       std::shared_ptr<folly::wangle::IOThreadPoolExecutor> accept_group,
113       std::shared_ptr<wangle::IOThreadPoolExecutor> io_group) {
114     if (!accept_group) {
115       accept_group = std::make_shared<folly::wangle::IOThreadPoolExecutor>(
116         1, std::make_shared<wangle::NamedThreadFactory>("Acceptor Thread"));
117     }
118     if (!io_group) {
119       io_group = std::make_shared<folly::wangle::IOThreadPoolExecutor>(
120         32, std::make_shared<wangle::NamedThreadFactory>("IO Thread"));
121     }
122
123     // TODO better config checking
124     // CHECK(acceptorFactory_ || childPipelineFactory_);
125     CHECK(!(acceptorFactory_ && childPipelineFactory_));
126
127     if (acceptorFactory_) {
128       workerFactory_ = std::make_shared<ServerWorkerPool>(
129         acceptorFactory_, io_group.get(), &sockets_, socketFactory_);
130     } else {
131       workerFactory_ = std::make_shared<ServerWorkerPool>(
132         std::make_shared<ServerAcceptorFactory<Pipeline>>(
133           childPipelineFactory_,
134           pipeline_),
135         io_group.get(), &sockets_, socketFactory_);
136     }
137
138     io_group->addObserver(workerFactory_);
139
140     acceptor_group_ = accept_group;
141     io_group_ = io_group;
142
143     return this;
144   }
145
146   /*
147    * Bind to an existing socket
148    *
149    * @param sock Existing socket to use for accepting
150    */
151   void bind(folly::AsyncServerSocket::UniquePtr s) {
152     if (!workerFactory_) {
153       group(nullptr);
154     }
155
156     // Since only a single socket is given,
157     // we can only accept on a single thread
158     CHECK(acceptor_group_->numThreads() == 1);
159
160     std::shared_ptr<folly::AsyncServerSocket> socket(
161       s.release(), DelayedDestruction::Destructor());
162
163     folly::Baton<> barrier;
164     acceptor_group_->add([&](){
165       socket->attachEventBase(EventBaseManager::get()->getEventBase());
166       socket->listen(socketConfig.acceptBacklog);
167       socket->startAccepting();
168       barrier.post();
169     });
170     barrier.wait();
171
172     // Startup all the threads
173     workerFactory_->forEachWorker([this, socket](Acceptor* worker){
174       socket->getEventBase()->runInEventBaseThreadAndWait(
175         [this, worker, socket](){
176           socketFactory_->addAcceptCB(socket, worker, worker->getEventBase());
177       });
178     });
179
180     sockets_.push_back(socket);
181   }
182
183   void bind(folly::SocketAddress& address) {
184     bindImpl(-1, address);
185   }
186
187   /*
188    * Bind to a port and start listening.
189    * One of childPipeline or childHandler must be called before bind
190    *
191    * @param port Port to listen on
192    */
193   void bind(int port) {
194     CHECK(port >= 0);
195     folly::SocketAddress address;
196     bindImpl(port, address);
197   }
198
199   void bindImpl(int port, folly::SocketAddress& address) {
200     if (!workerFactory_) {
201       group(nullptr);
202     }
203
204     bool reusePort = false;
205     if (acceptor_group_->numThreads() > 1) {
206       reusePort = true;
207     }
208
209     std::mutex sock_lock;
210     std::vector<std::shared_ptr<folly::AsyncSocketBase>> new_sockets;
211
212
213     std::exception_ptr exn;
214
215     auto startupFunc = [&](std::shared_ptr<folly::Baton<>> barrier){
216
217       try {
218         auto socket = socketFactory_->newSocket(
219           port, address, socketConfig.acceptBacklog, reusePort, socketConfig);
220
221         sock_lock.lock();
222         new_sockets.push_back(socket);
223         sock_lock.unlock();
224
225         if (port == 0) {
226           socket->getAddress(&address);
227           port = address.getPort();
228         }
229
230         barrier->post();
231       } catch (...) {
232         exn = std::current_exception();
233         barrier->post();
234
235         return;
236       }
237
238
239
240     };
241
242     auto wait0 = std::make_shared<folly::Baton<>>();
243     acceptor_group_->add(std::bind(startupFunc, wait0));
244     wait0->wait();
245
246     for (size_t i = 1; i < acceptor_group_->numThreads(); i++) {
247       auto barrier = std::make_shared<folly::Baton<>>();
248       acceptor_group_->add(std::bind(startupFunc, barrier));
249       barrier->wait();
250     }
251
252     if (exn) {
253       std::rethrow_exception(exn);
254     }
255
256     for (auto& socket : new_sockets) {
257       // Startup all the threads
258       workerFactory_->forEachWorker([this, socket](Acceptor* worker){
259         socket->getEventBase()->runInEventBaseThreadAndWait([this, worker, socket](){
260           socketFactory_->addAcceptCB(socket, worker, worker->getEventBase());
261         });
262       });
263
264       sockets_.push_back(socket);
265     }
266   }
267
268   /*
269    * Stop listening on all sockets.
270    */
271   void stop() {
272     for (auto socket : sockets_) {
273       folly::Baton<> barrier;
274       socket->getEventBase()->runInEventBaseThread([&]() mutable {
275         socketFactory_->stopSocket(socket);
276         barrier.post();
277       });
278       barrier.wait();
279     }
280     sockets_.clear();
281
282     if (acceptor_group_) {
283       acceptor_group_->join();
284     }
285     if (io_group_) {
286       io_group_->join();
287     }
288   }
289
290   /*
291    * Get the list of listening sockets
292    */
293   const std::vector<std::shared_ptr<folly::AsyncSocketBase>>&
294   getSockets() const {
295     return sockets_;
296   }
297
298   std::shared_ptr<wangle::IOThreadPoolExecutor> getIOGroup() const {
299     return io_group_;
300   }
301
302   template <typename F>
303   void forEachWorker(F&& f) const {
304     workerFactory_->forEachWorker(f);
305   }
306
307   ServerSocketConfig socketConfig;
308
309  private:
310   std::shared_ptr<wangle::IOThreadPoolExecutor> acceptor_group_;
311   std::shared_ptr<wangle::IOThreadPoolExecutor> io_group_;
312
313   std::shared_ptr<ServerWorkerPool> workerFactory_;
314   std::vector<std::shared_ptr<folly::AsyncSocketBase>> sockets_;
315
316   std::shared_ptr<AcceptorFactory> acceptorFactory_;
317   std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory_;
318   std::shared_ptr<PipelineFactory<AcceptPipeline>> pipeline_{
319     std::make_shared<DefaultAcceptPipelineFactory>()};
320   std::shared_ptr<ServerSocketFactory> socketFactory_{
321     std::make_shared<AsyncServerSocketFactory>()};
322 };
323
324 } // namespace