httpserver on serverbootstrap (2)
[folly.git] / folly / wangle / bootstrap / ServerBootstrap.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <folly/wangle/bootstrap/ServerBootstrap-inl.h>
19 #include <folly/Baton.h>
20 #include <folly/wangle/channel/ChannelPipeline.h>
21
22 namespace folly {
23
24 typedef folly::wangle::ChannelPipeline<
25   folly::IOBufQueue&, std::unique_ptr<folly::IOBuf>> DefaultPipeline;
26
27 /*
28  * ServerBootstrap is a parent class intended to set up a
29  * high-performance TCP accepting server.  It will manage a pool of
30  * accepting threads, any number of accepting sockets, a pool of
31  * IO-worker threads, and connection pool for each IO thread for you.
32  *
33  * The output is given as a ChannelPipeline template: given a
34  * PipelineFactory, it will create a new pipeline for each connection,
35  * and your server can handle the incoming bytes.
36  *
37  * BACKWARDS COMPATIBLITY: for servers already taking a pool of
38  * Acceptor objects, an AcceptorFactory can be given directly instead
39  * of a pipeline factory.
40  */
41 template <typename Pipeline>
42 class ServerBootstrap {
43  public:
44
45   ServerBootstrap(const ServerBootstrap& that) = delete;
46   ServerBootstrap(ServerBootstrap&& that) = default;
47
48   ServerBootstrap() {}
49
50   ~ServerBootstrap() {
51     stop();
52     join();
53   }
54
55   typedef wangle::ChannelPipeline<
56    void*,
57    std::exception> AcceptPipeline;
58   /*
59    * Pipeline used to add connections to event bases.
60    * This is used for UDP or for load balancing
61    * TCP connections to IO threads explicitly
62    */
63   ServerBootstrap* pipeline(
64     std::shared_ptr<PipelineFactory<AcceptPipeline>> factory) {
65     pipeline_ = factory;
66     return this;
67   }
68
69   ServerBootstrap* channelFactory(
70     std::shared_ptr<ServerSocketFactory> factory) {
71     socketFactory_ = factory;
72     return this;
73   }
74
75   /*
76    * BACKWARDS COMPATIBILITY - an acceptor factory can be set.  Your
77    * Acceptor is responsible for managing the connection pool.
78    *
79    * @param childHandler - acceptor factory to call for each IO thread
80    */
81   ServerBootstrap* childHandler(std::shared_ptr<AcceptorFactory> h) {
82     acceptorFactory_ = h;
83     return this;
84   }
85
86   /*
87    * Set a pipeline factory that will be called for each new connection
88    *
89    * @param factory pipeline factory to use for each new connection
90    */
91   ServerBootstrap* childPipeline(
92       std::shared_ptr<PipelineFactory<Pipeline>> factory) {
93     childPipelineFactory_ = factory;
94     return this;
95   }
96
97   /*
98    * Set the IO executor.  If not set, a default one will be created
99    * with one thread per core.
100    *
101    * @param io_group - io executor to use for IO threads.
102    */
103   ServerBootstrap* group(
104       std::shared_ptr<folly::wangle::IOThreadPoolExecutor> io_group) {
105     return group(nullptr, io_group);
106   }
107
108   /*
109    * Set the acceptor executor, and IO executor.
110    *
111    * If no acceptor executor is set, a single thread will be created for accepts
112    * If no IO executor is set, a default of one thread per core will be created
113    *
114    * @param group - acceptor executor to use for acceptor threads.
115    * @param io_group - io executor to use for IO threads.
116    */
117   ServerBootstrap* group(
118       std::shared_ptr<folly::wangle::IOThreadPoolExecutor> accept_group,
119       std::shared_ptr<wangle::IOThreadPoolExecutor> io_group) {
120     if (!accept_group) {
121       accept_group = std::make_shared<folly::wangle::IOThreadPoolExecutor>(
122         1, std::make_shared<wangle::NamedThreadFactory>("Acceptor Thread"));
123     }
124     if (!io_group) {
125       io_group = std::make_shared<folly::wangle::IOThreadPoolExecutor>(
126         32, std::make_shared<wangle::NamedThreadFactory>("IO Thread"));
127     }
128
129     // TODO better config checking
130     // CHECK(acceptorFactory_ || childPipelineFactory_);
131     CHECK(!(acceptorFactory_ && childPipelineFactory_));
132
133     if (acceptorFactory_) {
134       workerFactory_ = std::make_shared<ServerWorkerPool>(
135         acceptorFactory_, io_group.get(), &sockets_, socketFactory_);
136     } else {
137       workerFactory_ = std::make_shared<ServerWorkerPool>(
138         std::make_shared<ServerAcceptorFactory<Pipeline>>(
139           childPipelineFactory_,
140           pipeline_),
141         io_group.get(), &sockets_, socketFactory_);
142     }
143
144     io_group->addObserver(workerFactory_);
145
146     acceptor_group_ = accept_group;
147     io_group_ = io_group;
148
149     return this;
150   }
151
152   /*
153    * Bind to an existing socket
154    *
155    * @param sock Existing socket to use for accepting
156    */
157   void bind(folly::AsyncServerSocket::UniquePtr s) {
158     if (!workerFactory_) {
159       group(nullptr);
160     }
161
162     // Since only a single socket is given,
163     // we can only accept on a single thread
164     CHECK(acceptor_group_->numThreads() == 1);
165
166     std::shared_ptr<folly::AsyncServerSocket> socket(
167       s.release(), DelayedDestruction::Destructor());
168
169     folly::Baton<> barrier;
170     acceptor_group_->add([&](){
171       socket->attachEventBase(EventBaseManager::get()->getEventBase());
172       socket->listen(socketConfig.acceptBacklog);
173       socket->startAccepting();
174       barrier.post();
175     });
176     barrier.wait();
177
178     // Startup all the threads
179     workerFactory_->forEachWorker([this, socket](Acceptor* worker){
180       socket->getEventBase()->runInEventBaseThreadAndWait(
181         [this, worker, socket](){
182           socketFactory_->addAcceptCB(socket, worker, worker->getEventBase());
183       });
184     });
185
186     sockets_.push_back(socket);
187   }
188
189   void bind(folly::SocketAddress& address) {
190     bindImpl(-1, address);
191   }
192
193   /*
194    * Bind to a port and start listening.
195    * One of childPipeline or childHandler must be called before bind
196    *
197    * @param port Port to listen on
198    */
199   void bind(int port) {
200     CHECK(port >= 0);
201     folly::SocketAddress address;
202     bindImpl(port, address);
203   }
204
205   void bindImpl(int port, folly::SocketAddress& address) {
206     if (!workerFactory_) {
207       group(nullptr);
208     }
209
210     bool reusePort = false;
211     if (acceptor_group_->numThreads() > 1) {
212       reusePort = true;
213     }
214
215     std::mutex sock_lock;
216     std::vector<std::shared_ptr<folly::AsyncSocketBase>> new_sockets;
217
218
219     std::exception_ptr exn;
220
221     auto startupFunc = [&](std::shared_ptr<folly::Baton<>> barrier){
222
223       try {
224         auto socket = socketFactory_->newSocket(
225           port, address, socketConfig.acceptBacklog, reusePort, socketConfig);
226
227         sock_lock.lock();
228         new_sockets.push_back(socket);
229         sock_lock.unlock();
230
231         if (port <= 0) {
232           socket->getAddress(&address);
233           port = address.getPort();
234         }
235
236         barrier->post();
237       } catch (...) {
238         exn = std::current_exception();
239         barrier->post();
240
241         return;
242       }
243
244
245
246     };
247
248     auto wait0 = std::make_shared<folly::Baton<>>();
249     acceptor_group_->add(std::bind(startupFunc, wait0));
250     wait0->wait();
251
252     for (size_t i = 1; i < acceptor_group_->numThreads(); i++) {
253       auto barrier = std::make_shared<folly::Baton<>>();
254       acceptor_group_->add(std::bind(startupFunc, barrier));
255       barrier->wait();
256     }
257
258     if (exn) {
259       std::rethrow_exception(exn);
260     }
261
262     for (auto& socket : new_sockets) {
263       // Startup all the threads
264       workerFactory_->forEachWorker([this, socket](Acceptor* worker){
265         socket->getEventBase()->runInEventBaseThreadAndWait([this, worker, socket](){
266           socketFactory_->addAcceptCB(socket, worker, worker->getEventBase());
267         });
268       });
269
270       sockets_.push_back(socket);
271     }
272   }
273
274   /*
275    * Stop listening on all sockets.
276    */
277   void stop() {
278     for (auto socket : sockets_) {
279       folly::Baton<> barrier;
280       socket->getEventBase()->runInEventBaseThread([&]() mutable {
281         socketFactory_->stopSocket(socket);
282         barrier.post();
283       });
284       barrier.wait();
285     }
286     sockets_.clear();
287   }
288
289   void join() {
290     if (acceptor_group_) {
291       acceptor_group_->join();
292     }
293     if (io_group_) {
294       io_group_->join();
295     }
296   }
297
298   /*
299    * Get the list of listening sockets
300    */
301   const std::vector<std::shared_ptr<folly::AsyncSocketBase>>&
302   getSockets() const {
303     return sockets_;
304   }
305
306   std::shared_ptr<wangle::IOThreadPoolExecutor> getIOGroup() const {
307     return io_group_;
308   }
309
310   template <typename F>
311   void forEachWorker(F&& f) const {
312     workerFactory_->forEachWorker(f);
313   }
314
315   ServerSocketConfig socketConfig;
316
317  private:
318   std::shared_ptr<wangle::IOThreadPoolExecutor> acceptor_group_;
319   std::shared_ptr<wangle::IOThreadPoolExecutor> io_group_;
320
321   std::shared_ptr<ServerWorkerPool> workerFactory_;
322   std::vector<std::shared_ptr<folly::AsyncSocketBase>> sockets_;
323
324   std::shared_ptr<AcceptorFactory> acceptorFactory_;
325   std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory_;
326   std::shared_ptr<PipelineFactory<AcceptPipeline>> pipeline_{
327     std::make_shared<DefaultAcceptPipelineFactory>()};
328   std::shared_ptr<ServerSocketFactory> socketFactory_{
329     std::make_shared<AsyncServerSocketFactory>()};
330 };
331
332 } // namespace