Bump version to 52:0
[folly.git] / folly / wangle / bootstrap / ServerBootstrap.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <folly/wangle/bootstrap/ServerBootstrap-inl.h>
19 #include <folly/Baton.h>
20 #include <folly/wangle/channel/Pipeline.h>
21
22 namespace folly {
23
24 typedef folly::wangle::Pipeline<
25   folly::IOBufQueue&, std::unique_ptr<folly::IOBuf>> DefaultPipeline;
26
27 /*
28  * ServerBootstrap is a parent class intended to set up a
29  * high-performance TCP accepting server.  It will manage a pool of
30  * accepting threads, any number of accepting sockets, a pool of
31  * IO-worker threads, and connection pool for each IO thread for you.
32  *
33  * The output is given as a Pipeline template: given a
34  * PipelineFactory, it will create a new pipeline for each connection,
35  * and your server can handle the incoming bytes.
36  *
37  * BACKWARDS COMPATIBLITY: for servers already taking a pool of
38  * Acceptor objects, an AcceptorFactory can be given directly instead
39  * of a pipeline factory.
40  */
41 template <typename Pipeline>
42 class ServerBootstrap {
43  public:
44
45   ServerBootstrap(const ServerBootstrap& that) = delete;
46   ServerBootstrap(ServerBootstrap&& that) = default;
47
48   ServerBootstrap() = default;
49
50   ~ServerBootstrap() {
51     stop();
52     join();
53   }
54
55   typedef wangle::Pipeline<void*> AcceptPipeline;
56   /*
57    * Pipeline used to add connections to event bases.
58    * This is used for UDP or for load balancing
59    * TCP connections to IO threads explicitly
60    */
61   ServerBootstrap* pipeline(
62     std::shared_ptr<PipelineFactory<AcceptPipeline>> factory) {
63     pipeline_ = factory;
64     return this;
65   }
66
67   ServerBootstrap* channelFactory(
68     std::shared_ptr<ServerSocketFactory> factory) {
69     socketFactory_ = factory;
70     return this;
71   }
72
73   /*
74    * BACKWARDS COMPATIBILITY - an acceptor factory can be set.  Your
75    * Acceptor is responsible for managing the connection pool.
76    *
77    * @param childHandler - acceptor factory to call for each IO thread
78    */
79   ServerBootstrap* childHandler(std::shared_ptr<AcceptorFactory> h) {
80     acceptorFactory_ = h;
81     return this;
82   }
83
84   /*
85    * Set a pipeline factory that will be called for each new connection
86    *
87    * @param factory pipeline factory to use for each new connection
88    */
89   ServerBootstrap* childPipeline(
90       std::shared_ptr<PipelineFactory<Pipeline>> factory) {
91     childPipelineFactory_ = factory;
92     return this;
93   }
94
95   /*
96    * Set the IO executor.  If not set, a default one will be created
97    * with one thread per core.
98    *
99    * @param io_group - io executor to use for IO threads.
100    */
101   ServerBootstrap* group(
102       std::shared_ptr<folly::wangle::IOThreadPoolExecutor> io_group) {
103     return group(nullptr, io_group);
104   }
105
106   /*
107    * Set the acceptor executor, and IO executor.
108    *
109    * If no acceptor executor is set, a single thread will be created for accepts
110    * If no IO executor is set, a default of one thread per core will be created
111    *
112    * @param group - acceptor executor to use for acceptor threads.
113    * @param io_group - io executor to use for IO threads.
114    */
115   ServerBootstrap* group(
116       std::shared_ptr<folly::wangle::IOThreadPoolExecutor> accept_group,
117       std::shared_ptr<wangle::IOThreadPoolExecutor> io_group) {
118     if (!accept_group) {
119       accept_group = std::make_shared<folly::wangle::IOThreadPoolExecutor>(
120         1, std::make_shared<wangle::NamedThreadFactory>("Acceptor Thread"));
121     }
122     if (!io_group) {
123       auto threads = std::thread::hardware_concurrency();
124       if (threads <= 0) {
125         // Reasonable mid-point for concurrency when actual value unknown
126         threads = 8;
127       }
128       io_group = std::make_shared<folly::wangle::IOThreadPoolExecutor>(
129         threads, std::make_shared<wangle::NamedThreadFactory>("IO Thread"));
130     }
131
132     // TODO better config checking
133     // CHECK(acceptorFactory_ || childPipelineFactory_);
134     CHECK(!(acceptorFactory_ && childPipelineFactory_));
135
136     if (acceptorFactory_) {
137       workerFactory_ = std::make_shared<ServerWorkerPool>(
138         acceptorFactory_, io_group.get(), sockets_, socketFactory_);
139     } else {
140       workerFactory_ = std::make_shared<ServerWorkerPool>(
141         std::make_shared<ServerAcceptorFactory<Pipeline>>(
142           childPipelineFactory_,
143           pipeline_),
144         io_group.get(), sockets_, socketFactory_);
145     }
146
147     io_group->addObserver(workerFactory_);
148
149     acceptor_group_ = accept_group;
150     io_group_ = io_group;
151
152     return this;
153   }
154
155   /*
156    * Bind to an existing socket
157    *
158    * @param sock Existing socket to use for accepting
159    */
160   void bind(folly::AsyncServerSocket::UniquePtr s) {
161     if (!workerFactory_) {
162       group(nullptr);
163     }
164
165     // Since only a single socket is given,
166     // we can only accept on a single thread
167     CHECK(acceptor_group_->numThreads() == 1);
168
169     std::shared_ptr<folly::AsyncServerSocket> socket(
170       s.release(), DelayedDestruction::Destructor());
171
172     folly::Baton<> barrier;
173     acceptor_group_->add([&](){
174       socket->attachEventBase(EventBaseManager::get()->getEventBase());
175       socket->listen(socketConfig.acceptBacklog);
176       socket->startAccepting();
177       barrier.post();
178     });
179     barrier.wait();
180
181     // Startup all the threads
182     workerFactory_->forEachWorker([this, socket](Acceptor* worker){
183       socket->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait(
184         [this, worker, socket](){
185           socketFactory_->addAcceptCB(socket, worker, worker->getEventBase());
186       });
187     });
188
189     sockets_->push_back(socket);
190   }
191
192   void bind(folly::SocketAddress& address) {
193     bindImpl(-1, address);
194   }
195
196   /*
197    * Bind to a port and start listening.
198    * One of childPipeline or childHandler must be called before bind
199    *
200    * @param port Port to listen on
201    */
202   void bind(int port) {
203     CHECK(port >= 0);
204     folly::SocketAddress address;
205     bindImpl(port, address);
206   }
207
208   void bindImpl(int port, folly::SocketAddress& address) {
209     if (!workerFactory_) {
210       group(nullptr);
211     }
212
213     bool reusePort = false;
214     if (acceptor_group_->numThreads() > 1) {
215       reusePort = true;
216     }
217
218     std::mutex sock_lock;
219     std::vector<std::shared_ptr<folly::AsyncSocketBase>> new_sockets;
220
221
222     std::exception_ptr exn;
223
224     auto startupFunc = [&](std::shared_ptr<folly::Baton<>> barrier){
225
226       try {
227         auto socket = socketFactory_->newSocket(
228           port, address, socketConfig.acceptBacklog, reusePort, socketConfig);
229
230         sock_lock.lock();
231         new_sockets.push_back(socket);
232         sock_lock.unlock();
233
234         if (port <= 0) {
235           socket->getAddress(&address);
236           port = address.getPort();
237         }
238
239         barrier->post();
240       } catch (...) {
241         exn = std::current_exception();
242         barrier->post();
243
244         return;
245       }
246
247
248
249     };
250
251     auto wait0 = std::make_shared<folly::Baton<>>();
252     acceptor_group_->add(std::bind(startupFunc, wait0));
253     wait0->wait();
254
255     for (size_t i = 1; i < acceptor_group_->numThreads(); i++) {
256       auto barrier = std::make_shared<folly::Baton<>>();
257       acceptor_group_->add(std::bind(startupFunc, barrier));
258       barrier->wait();
259     }
260
261     if (exn) {
262       std::rethrow_exception(exn);
263     }
264
265     for (auto& socket : new_sockets) {
266       // Startup all the threads
267       workerFactory_->forEachWorker([this, socket](Acceptor* worker){
268         socket->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait(
269           [this, worker, socket](){
270             socketFactory_->addAcceptCB(socket, worker, worker->getEventBase());
271         });
272       });
273
274       sockets_->push_back(socket);
275     }
276   }
277
278   /*
279    * Stop listening on all sockets.
280    */
281   void stop() {
282     // sockets_ may be null if ServerBootstrap has been std::move'd
283     if (sockets_) {
284       for (auto socket : *sockets_) {
285         socket->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait(
286           [&]() mutable {
287             socketFactory_->stopSocket(socket);
288           });
289       }
290       sockets_->clear();
291     }
292     if (!stopped_) {
293       stopped_ = true;
294       // stopBaton_ may be null if ServerBootstrap has been std::move'd
295       if (stopBaton_) {
296         stopBaton_->post();
297       }
298     }
299   }
300
301   void join() {
302     if (acceptor_group_) {
303       acceptor_group_->join();
304     }
305     if (io_group_) {
306       io_group_->join();
307     }
308   }
309
310   void waitForStop() {
311     if (!stopped_) {
312       CHECK(stopBaton_);
313       stopBaton_->wait();
314     }
315   }
316
317   /*
318    * Get the list of listening sockets
319    */
320   const std::vector<std::shared_ptr<folly::AsyncSocketBase>>&
321   getSockets() const {
322     return *sockets_;
323   }
324
325   std::shared_ptr<wangle::IOThreadPoolExecutor> getIOGroup() const {
326     return io_group_;
327   }
328
329   template <typename F>
330   void forEachWorker(F&& f) const {
331     workerFactory_->forEachWorker(f);
332   }
333
334   ServerSocketConfig socketConfig;
335
336  private:
337   std::shared_ptr<wangle::IOThreadPoolExecutor> acceptor_group_;
338   std::shared_ptr<wangle::IOThreadPoolExecutor> io_group_;
339
340   std::shared_ptr<ServerWorkerPool> workerFactory_;
341   std::shared_ptr<std::vector<std::shared_ptr<folly::AsyncSocketBase>>> sockets_{
342     std::make_shared<std::vector<std::shared_ptr<folly::AsyncSocketBase>>>()};
343
344   std::shared_ptr<AcceptorFactory> acceptorFactory_;
345   std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory_;
346   std::shared_ptr<PipelineFactory<AcceptPipeline>> pipeline_{
347     std::make_shared<DefaultAcceptPipelineFactory>()};
348   std::shared_ptr<ServerSocketFactory> socketFactory_{
349     std::make_shared<AsyncServerSocketFactory>()};
350
351   std::unique_ptr<folly::Baton<>> stopBaton_{
352     folly::make_unique<folly::Baton<>>()};
353   bool stopped_{false};
354 };
355
356 } // namespace