4940e67b504509b571c7e73e1c3cf5af0b76910a
[folly.git] / folly / wangle / bootstrap / ServerBootstrap.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <folly/wangle/bootstrap/ServerBootstrap-inl.h>
19 #include <folly/Baton.h>
20 #include <folly/wangle/channel/Pipeline.h>
21
22 namespace folly {
23
24 typedef folly::wangle::Pipeline<
25   folly::IOBufQueue&, std::unique_ptr<folly::IOBuf>> DefaultPipeline;
26
27 /*
28  * ServerBootstrap is a parent class intended to set up a
29  * high-performance TCP accepting server.  It will manage a pool of
30  * accepting threads, any number of accepting sockets, a pool of
31  * IO-worker threads, and connection pool for each IO thread for you.
32  *
33  * The output is given as a Pipeline template: given a
34  * PipelineFactory, it will create a new pipeline for each connection,
35  * and your server can handle the incoming bytes.
36  *
37  * BACKWARDS COMPATIBLITY: for servers already taking a pool of
38  * Acceptor objects, an AcceptorFactory can be given directly instead
39  * of a pipeline factory.
40  */
41 template <typename Pipeline>
42 class ServerBootstrap {
43  public:
44
45   ServerBootstrap(const ServerBootstrap& that) = delete;
46   ServerBootstrap(ServerBootstrap&& that) = default;
47
48   ServerBootstrap() {}
49
50   ~ServerBootstrap() {
51     stop();
52     join();
53   }
54
55   typedef wangle::Pipeline<void*> AcceptPipeline;
56   /*
57    * Pipeline used to add connections to event bases.
58    * This is used for UDP or for load balancing
59    * TCP connections to IO threads explicitly
60    */
61   ServerBootstrap* pipeline(
62     std::shared_ptr<PipelineFactory<AcceptPipeline>> factory) {
63     pipeline_ = factory;
64     return this;
65   }
66
67   ServerBootstrap* channelFactory(
68     std::shared_ptr<ServerSocketFactory> factory) {
69     socketFactory_ = factory;
70     return this;
71   }
72
73   /*
74    * BACKWARDS COMPATIBILITY - an acceptor factory can be set.  Your
75    * Acceptor is responsible for managing the connection pool.
76    *
77    * @param childHandler - acceptor factory to call for each IO thread
78    */
79   ServerBootstrap* childHandler(std::shared_ptr<AcceptorFactory> h) {
80     acceptorFactory_ = h;
81     return this;
82   }
83
84   /*
85    * Set a pipeline factory that will be called for each new connection
86    *
87    * @param factory pipeline factory to use for each new connection
88    */
89   ServerBootstrap* childPipeline(
90       std::shared_ptr<PipelineFactory<Pipeline>> factory) {
91     childPipelineFactory_ = factory;
92     return this;
93   }
94
95   /*
96    * Set the IO executor.  If not set, a default one will be created
97    * with one thread per core.
98    *
99    * @param io_group - io executor to use for IO threads.
100    */
101   ServerBootstrap* group(
102       std::shared_ptr<folly::wangle::IOThreadPoolExecutor> io_group) {
103     return group(nullptr, io_group);
104   }
105
106   /*
107    * Set the acceptor executor, and IO executor.
108    *
109    * If no acceptor executor is set, a single thread will be created for accepts
110    * If no IO executor is set, a default of one thread per core will be created
111    *
112    * @param group - acceptor executor to use for acceptor threads.
113    * @param io_group - io executor to use for IO threads.
114    */
115   ServerBootstrap* group(
116       std::shared_ptr<folly::wangle::IOThreadPoolExecutor> accept_group,
117       std::shared_ptr<wangle::IOThreadPoolExecutor> io_group) {
118     if (!accept_group) {
119       accept_group = std::make_shared<folly::wangle::IOThreadPoolExecutor>(
120         1, std::make_shared<wangle::NamedThreadFactory>("Acceptor Thread"));
121     }
122     if (!io_group) {
123       io_group = std::make_shared<folly::wangle::IOThreadPoolExecutor>(
124         32, std::make_shared<wangle::NamedThreadFactory>("IO Thread"));
125     }
126
127     // TODO better config checking
128     // CHECK(acceptorFactory_ || childPipelineFactory_);
129     CHECK(!(acceptorFactory_ && childPipelineFactory_));
130
131     if (acceptorFactory_) {
132       workerFactory_ = std::make_shared<ServerWorkerPool>(
133         acceptorFactory_, io_group.get(), sockets_, socketFactory_);
134     } else {
135       workerFactory_ = std::make_shared<ServerWorkerPool>(
136         std::make_shared<ServerAcceptorFactory<Pipeline>>(
137           childPipelineFactory_,
138           pipeline_),
139         io_group.get(), sockets_, socketFactory_);
140     }
141
142     io_group->addObserver(workerFactory_);
143
144     acceptor_group_ = accept_group;
145     io_group_ = io_group;
146
147     return this;
148   }
149
150   /*
151    * Bind to an existing socket
152    *
153    * @param sock Existing socket to use for accepting
154    */
155   void bind(folly::AsyncServerSocket::UniquePtr s) {
156     if (!workerFactory_) {
157       group(nullptr);
158     }
159
160     // Since only a single socket is given,
161     // we can only accept on a single thread
162     CHECK(acceptor_group_->numThreads() == 1);
163
164     std::shared_ptr<folly::AsyncServerSocket> socket(
165       s.release(), DelayedDestruction::Destructor());
166
167     folly::Baton<> barrier;
168     acceptor_group_->add([&](){
169       socket->attachEventBase(EventBaseManager::get()->getEventBase());
170       socket->listen(socketConfig.acceptBacklog);
171       socket->startAccepting();
172       barrier.post();
173     });
174     barrier.wait();
175
176     // Startup all the threads
177     workerFactory_->forEachWorker([this, socket](Acceptor* worker){
178       socket->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait(
179         [this, worker, socket](){
180           socketFactory_->addAcceptCB(socket, worker, worker->getEventBase());
181       });
182     });
183
184     sockets_->push_back(socket);
185   }
186
187   void bind(folly::SocketAddress& address) {
188     bindImpl(-1, address);
189   }
190
191   /*
192    * Bind to a port and start listening.
193    * One of childPipeline or childHandler must be called before bind
194    *
195    * @param port Port to listen on
196    */
197   void bind(int port) {
198     CHECK(port >= 0);
199     folly::SocketAddress address;
200     bindImpl(port, address);
201   }
202
203   void bindImpl(int port, folly::SocketAddress& address) {
204     if (!workerFactory_) {
205       group(nullptr);
206     }
207
208     bool reusePort = false;
209     if (acceptor_group_->numThreads() > 1) {
210       reusePort = true;
211     }
212
213     std::mutex sock_lock;
214     std::vector<std::shared_ptr<folly::AsyncSocketBase>> new_sockets;
215
216
217     std::exception_ptr exn;
218
219     auto startupFunc = [&](std::shared_ptr<folly::Baton<>> barrier){
220
221       try {
222         auto socket = socketFactory_->newSocket(
223           port, address, socketConfig.acceptBacklog, reusePort, socketConfig);
224
225         sock_lock.lock();
226         new_sockets.push_back(socket);
227         sock_lock.unlock();
228
229         if (port <= 0) {
230           socket->getAddress(&address);
231           port = address.getPort();
232         }
233
234         barrier->post();
235       } catch (...) {
236         exn = std::current_exception();
237         barrier->post();
238
239         return;
240       }
241
242
243
244     };
245
246     auto wait0 = std::make_shared<folly::Baton<>>();
247     acceptor_group_->add(std::bind(startupFunc, wait0));
248     wait0->wait();
249
250     for (size_t i = 1; i < acceptor_group_->numThreads(); i++) {
251       auto barrier = std::make_shared<folly::Baton<>>();
252       acceptor_group_->add(std::bind(startupFunc, barrier));
253       barrier->wait();
254     }
255
256     if (exn) {
257       std::rethrow_exception(exn);
258     }
259
260     for (auto& socket : new_sockets) {
261       // Startup all the threads
262       workerFactory_->forEachWorker([this, socket](Acceptor* worker){
263         socket->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait(
264           [this, worker, socket](){
265             socketFactory_->addAcceptCB(socket, worker, worker->getEventBase());
266         });
267       });
268
269       sockets_->push_back(socket);
270     }
271   }
272
273   /*
274    * Stop listening on all sockets.
275    */
276   void stop() {
277     // sockets_ may be null if ServerBootstrap has been std::move'd
278     if (sockets_) {
279       for (auto socket : *sockets_) {
280         socket->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait(
281           [&]() mutable {
282             socketFactory_->stopSocket(socket);
283           });
284       }
285       sockets_->clear();
286     }
287     if (!stopped_) {
288       stopped_ = true;
289       // stopBaton_ may be null if ServerBootstrap has been std::move'd
290       if (stopBaton_) {
291         stopBaton_->post();
292       }
293     }
294   }
295
296   void join() {
297     if (acceptor_group_) {
298       acceptor_group_->join();
299     }
300     if (io_group_) {
301       io_group_->join();
302     }
303   }
304
305   void waitForStop() {
306     if (!stopped_) {
307       CHECK(stopBaton_);
308       stopBaton_->wait();
309     }
310   }
311
312   /*
313    * Get the list of listening sockets
314    */
315   const std::vector<std::shared_ptr<folly::AsyncSocketBase>>&
316   getSockets() const {
317     return *sockets_;
318   }
319
320   std::shared_ptr<wangle::IOThreadPoolExecutor> getIOGroup() const {
321     return io_group_;
322   }
323
324   template <typename F>
325   void forEachWorker(F&& f) const {
326     workerFactory_->forEachWorker(f);
327   }
328
329   ServerSocketConfig socketConfig;
330
331  private:
332   std::shared_ptr<wangle::IOThreadPoolExecutor> acceptor_group_;
333   std::shared_ptr<wangle::IOThreadPoolExecutor> io_group_;
334
335   std::shared_ptr<ServerWorkerPool> workerFactory_;
336   std::shared_ptr<std::vector<std::shared_ptr<folly::AsyncSocketBase>>> sockets_{
337     std::make_shared<std::vector<std::shared_ptr<folly::AsyncSocketBase>>>()};
338
339   std::shared_ptr<AcceptorFactory> acceptorFactory_;
340   std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory_;
341   std::shared_ptr<PipelineFactory<AcceptPipeline>> pipeline_{
342     std::make_shared<DefaultAcceptPipelineFactory>()};
343   std::shared_ptr<ServerSocketFactory> socketFactory_{
344     std::make_shared<AsyncServerSocketFactory>()};
345
346   std::unique_ptr<folly::Baton<>> stopBaton_{
347     folly::make_unique<folly::Baton<>>()};
348   bool stopped_{false};
349 };
350
351 } // namespace