Get *=default*ed default constructors
[folly.git] / folly / wangle / bootstrap / ServerBootstrap-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <folly/wangle/acceptor/Acceptor.h>
19 #include <folly/wangle/bootstrap/ServerSocketFactory.h>
20 #include <folly/io/async/EventBaseManager.h>
21 #include <folly/wangle/concurrent/IOThreadPoolExecutor.h>
22 #include <folly/wangle/acceptor/ManagedConnection.h>
23 #include <folly/wangle/channel/Pipeline.h>
24 #include <folly/wangle/channel/Handler.h>
25
26 namespace folly {
27
28 template <typename Pipeline>
29 class ServerAcceptor
30     : public Acceptor
31     , public folly::wangle::InboundHandler<void*> {
32   typedef std::unique_ptr<Pipeline,
33                           folly::DelayedDestruction::Destructor> PipelinePtr;
34
35   class ServerConnection : public wangle::ManagedConnection,
36                            public wangle::PipelineManager {
37    public:
38     explicit ServerConnection(PipelinePtr pipeline)
39         : pipeline_(std::move(pipeline)) {
40       pipeline_->setPipelineManager(this);
41     }
42
43     ~ServerConnection() = default;
44
45     void timeoutExpired() noexcept override {
46     }
47
48     void describe(std::ostream& os) const override {}
49     bool isBusy() const override {
50       return false;
51     }
52     void notifyPendingShutdown() override {}
53     void closeWhenIdle() override {}
54     void dropConnection() override {
55       delete this;
56     }
57     void dumpConnectionState(uint8_t loglevel) override {}
58
59     void deletePipeline(wangle::PipelineBase* p) override {
60       CHECK(p == pipeline_.get());
61       delete this;
62     }
63
64    private:
65     PipelinePtr pipeline_;
66   };
67
68  public:
69   explicit ServerAcceptor(
70         std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory,
71         std::shared_ptr<folly::wangle::Pipeline<void*>> acceptorPipeline,
72         EventBase* base)
73       : Acceptor(ServerSocketConfig())
74       , base_(base)
75       , childPipelineFactory_(pipelineFactory)
76       , acceptorPipeline_(acceptorPipeline) {
77     Acceptor::init(nullptr, base_);
78     CHECK(acceptorPipeline_);
79
80     acceptorPipeline_->addBack(this);
81     acceptorPipeline_->finalize();
82   }
83
84   void read(Context* ctx, void* conn) {
85     AsyncSocket::UniquePtr transport((AsyncSocket*)conn);
86       std::unique_ptr<Pipeline,
87                        folly::DelayedDestruction::Destructor>
88       pipeline(childPipelineFactory_->newPipeline(
89         std::shared_ptr<AsyncSocket>(
90           transport.release(),
91           folly::DelayedDestruction::Destructor())));
92     pipeline->transportActive();
93     auto connection = new ServerConnection(std::move(pipeline));
94     Acceptor::addConnection(connection);
95   }
96
97   /* See Acceptor::onNewConnection for details */
98   void onNewConnection(
99     AsyncSocket::UniquePtr transport, const SocketAddress* address,
100     const std::string& nextProtocolName, const TransportInfo& tinfo) {
101     acceptorPipeline_->read(transport.release());
102   }
103
104   // UDP thunk
105   void onDataAvailable(std::shared_ptr<AsyncUDPSocket> socket,
106                        const folly::SocketAddress& addr,
107                        std::unique_ptr<folly::IOBuf> buf,
108                        bool truncated) noexcept {
109     acceptorPipeline_->read(buf.release());
110   }
111
112  private:
113   EventBase* base_;
114
115   std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory_;
116   std::shared_ptr<folly::wangle::Pipeline<void*>> acceptorPipeline_;
117 };
118
119 template <typename Pipeline>
120 class ServerAcceptorFactory : public AcceptorFactory {
121  public:
122   explicit ServerAcceptorFactory(
123     std::shared_ptr<PipelineFactory<Pipeline>> factory,
124     std::shared_ptr<PipelineFactory<folly::wangle::Pipeline<void*>>> pipeline)
125     : factory_(factory)
126     , pipeline_(pipeline) {}
127
128   std::shared_ptr<Acceptor> newAcceptor(EventBase* base) {
129     std::shared_ptr<folly::wangle::Pipeline<void*>> pipeline(
130         pipeline_->newPipeline(nullptr));
131     return std::make_shared<ServerAcceptor<Pipeline>>(factory_, pipeline, base);
132   }
133  private:
134   std::shared_ptr<PipelineFactory<Pipeline>> factory_;
135   std::shared_ptr<PipelineFactory<
136     folly::wangle::Pipeline<void*>>> pipeline_;
137 };
138
139 class ServerWorkerPool : public folly::wangle::ThreadPoolExecutor::Observer {
140  public:
141   explicit ServerWorkerPool(
142     std::shared_ptr<AcceptorFactory> acceptorFactory,
143     folly::wangle::IOThreadPoolExecutor* exec,
144     std::shared_ptr<std::vector<std::shared_ptr<folly::AsyncSocketBase>>> sockets,
145     std::shared_ptr<ServerSocketFactory> socketFactory)
146       : acceptorFactory_(acceptorFactory)
147       , exec_(exec)
148       , sockets_(sockets)
149       , socketFactory_(socketFactory) {
150     CHECK(exec);
151   }
152
153   template <typename F>
154   void forEachWorker(F&& f) const;
155
156   void threadStarted(
157     folly::wangle::ThreadPoolExecutor::ThreadHandle*);
158   void threadStopped(
159     folly::wangle::ThreadPoolExecutor::ThreadHandle*);
160   void threadPreviouslyStarted(
161       folly::wangle::ThreadPoolExecutor::ThreadHandle* thread) {
162     threadStarted(thread);
163   }
164   void threadNotYetStopped(
165       folly::wangle::ThreadPoolExecutor::ThreadHandle* thread) {
166     threadStopped(thread);
167   }
168
169  private:
170   std::map<folly::wangle::ThreadPoolExecutor::ThreadHandle*,
171            std::shared_ptr<Acceptor>> workers_;
172   std::shared_ptr<AcceptorFactory> acceptorFactory_;
173   folly::wangle::IOThreadPoolExecutor* exec_{nullptr};
174   std::shared_ptr<std::vector<std::shared_ptr<folly::AsyncSocketBase>>> sockets_;
175   std::shared_ptr<ServerSocketFactory> socketFactory_;
176 };
177
178 template <typename F>
179 void ServerWorkerPool::forEachWorker(F&& f) const {
180   for (const auto& kv : workers_) {
181     f(kv.second.get());
182   }
183 }
184
185 class DefaultAcceptPipelineFactory
186     : public PipelineFactory<wangle::Pipeline<void*>> {
187   typedef wangle::Pipeline<void*> AcceptPipeline;
188
189  public:
190   std::unique_ptr<AcceptPipeline, folly::DelayedDestruction::Destructor>
191     newPipeline(std::shared_ptr<AsyncSocket>) {
192
193     return std::unique_ptr<AcceptPipeline, folly::DelayedDestruction::Destructor>
194       (new AcceptPipeline);
195   }
196 };
197
198 } // namespace