unidirectional pipelines
[folly.git] / folly / wangle / bootstrap / ServerBootstrap-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <folly/wangle/acceptor/Acceptor.h>
19 #include <folly/wangle/bootstrap/ServerSocketFactory.h>
20 #include <folly/io/async/EventBaseManager.h>
21 #include <folly/wangle/concurrent/IOThreadPoolExecutor.h>
22 #include <folly/wangle/acceptor/ManagedConnection.h>
23 #include <folly/wangle/channel/Pipeline.h>
24 #include <folly/wangle/channel/Handler.h>
25
26 namespace folly {
27
28 template <typename Pipeline>
29 class ServerAcceptor
30     : public Acceptor
31     , public folly::wangle::InboundHandler<void*> {
32   typedef std::unique_ptr<Pipeline,
33                           folly::DelayedDestruction::Destructor> PipelinePtr;
34
35   class ServerConnection : public wangle::ManagedConnection {
36    public:
37     explicit ServerConnection(PipelinePtr pipeline)
38         : pipeline_(std::move(pipeline)) {}
39
40     ~ServerConnection() {
41     }
42
43     void timeoutExpired() noexcept {
44     }
45
46     void describe(std::ostream& os) const {}
47     bool isBusy() const {
48       return false;
49     }
50     void notifyPendingShutdown() {}
51     void closeWhenIdle() {}
52     void dropConnection() {
53       delete this;
54     }
55     void dumpConnectionState(uint8_t loglevel) {}
56    private:
57     PipelinePtr pipeline_;
58   };
59
60  public:
61   explicit ServerAcceptor(
62         std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory,
63         std::shared_ptr<folly::wangle::Pipeline<void*>> acceptorPipeline,
64         EventBase* base)
65       : Acceptor(ServerSocketConfig())
66       , base_(base)
67       , childPipelineFactory_(pipelineFactory)
68       , acceptorPipeline_(acceptorPipeline) {
69     Acceptor::init(nullptr, base_);
70     CHECK(acceptorPipeline_);
71
72     acceptorPipeline_->addBack(this);
73     acceptorPipeline_->finalize();
74   }
75
76   void read(Context* ctx, void* conn) {
77     AsyncSocket::UniquePtr transport((AsyncSocket*)conn);
78       std::unique_ptr<Pipeline,
79                        folly::DelayedDestruction::Destructor>
80       pipeline(childPipelineFactory_->newPipeline(
81         std::shared_ptr<AsyncSocket>(
82           transport.release(),
83           folly::DelayedDestruction::Destructor())));
84     auto connection = new ServerConnection(std::move(pipeline));
85     Acceptor::addConnection(connection);
86   }
87
88   /* See Acceptor::onNewConnection for details */
89   void onNewConnection(
90     AsyncSocket::UniquePtr transport, const SocketAddress* address,
91     const std::string& nextProtocolName, const TransportInfo& tinfo) {
92     acceptorPipeline_->read(transport.release());
93   }
94
95   // UDP thunk
96   void onDataAvailable(std::shared_ptr<AsyncUDPSocket> socket,
97                        const folly::SocketAddress& addr,
98                        std::unique_ptr<folly::IOBuf> buf,
99                        bool truncated) noexcept {
100     acceptorPipeline_->read(buf.release());
101   }
102
103  private:
104   EventBase* base_;
105
106   std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory_;
107   std::shared_ptr<folly::wangle::Pipeline<void*>> acceptorPipeline_;
108 };
109
110 template <typename Pipeline>
111 class ServerAcceptorFactory : public AcceptorFactory {
112  public:
113   explicit ServerAcceptorFactory(
114     std::shared_ptr<PipelineFactory<Pipeline>> factory,
115     std::shared_ptr<PipelineFactory<folly::wangle::Pipeline<void*>>> pipeline)
116     : factory_(factory)
117     , pipeline_(pipeline) {}
118
119   std::shared_ptr<Acceptor> newAcceptor(EventBase* base) {
120     std::shared_ptr<folly::wangle::Pipeline<void*>> pipeline(
121         pipeline_->newPipeline(nullptr));
122     return std::make_shared<ServerAcceptor<Pipeline>>(factory_, pipeline, base);
123   }
124  private:
125   std::shared_ptr<PipelineFactory<Pipeline>> factory_;
126   std::shared_ptr<PipelineFactory<
127     folly::wangle::Pipeline<void*>>> pipeline_;
128 };
129
130 class ServerWorkerPool : public folly::wangle::ThreadPoolExecutor::Observer {
131  public:
132   explicit ServerWorkerPool(
133     std::shared_ptr<AcceptorFactory> acceptorFactory,
134     folly::wangle::IOThreadPoolExecutor* exec,
135     std::shared_ptr<std::vector<std::shared_ptr<folly::AsyncSocketBase>>> sockets,
136     std::shared_ptr<ServerSocketFactory> socketFactory)
137       : acceptorFactory_(acceptorFactory)
138       , exec_(exec)
139       , sockets_(sockets)
140       , socketFactory_(socketFactory) {
141     CHECK(exec);
142   }
143
144   template <typename F>
145   void forEachWorker(F&& f) const;
146
147   void threadStarted(
148     folly::wangle::ThreadPoolExecutor::ThreadHandle*);
149   void threadStopped(
150     folly::wangle::ThreadPoolExecutor::ThreadHandle*);
151   void threadPreviouslyStarted(
152       folly::wangle::ThreadPoolExecutor::ThreadHandle* thread) {
153     threadStarted(thread);
154   }
155   void threadNotYetStopped(
156       folly::wangle::ThreadPoolExecutor::ThreadHandle* thread) {
157     threadStopped(thread);
158   }
159
160  private:
161   std::map<folly::wangle::ThreadPoolExecutor::ThreadHandle*,
162            std::shared_ptr<Acceptor>> workers_;
163   std::shared_ptr<AcceptorFactory> acceptorFactory_;
164   folly::wangle::IOThreadPoolExecutor* exec_{nullptr};
165   std::shared_ptr<std::vector<std::shared_ptr<folly::AsyncSocketBase>>> sockets_;
166   std::shared_ptr<ServerSocketFactory> socketFactory_;
167 };
168
169 template <typename F>
170 void ServerWorkerPool::forEachWorker(F&& f) const {
171   for (const auto& kv : workers_) {
172     f(kv.second.get());
173   }
174 }
175
176 class DefaultAcceptPipelineFactory
177     : public PipelineFactory<wangle::Pipeline<void*>> {
178   typedef wangle::Pipeline<void*> AcceptPipeline;
179
180  public:
181   AcceptPipeline* newPipeline(std::shared_ptr<AsyncSocket>) {
182     return new AcceptPipeline;
183   }
184 };
185
186 } // namespace