Copyright 2014->2015
[folly.git] / folly / wangle / bootstrap / ServerBootstrap-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <folly/wangle/acceptor/Acceptor.h>
19 #include <folly/io/async/EventBaseManager.h>
20 #include <folly/wangle/concurrent/IOThreadPoolExecutor.h>
21 #include <folly/wangle/acceptor/ManagedConnection.h>
22 #include <folly/wangle/channel/ChannelPipeline.h>
23
24 namespace folly {
25
26 template <typename Pipeline>
27 class ServerAcceptor : public Acceptor {
28   typedef std::unique_ptr<Pipeline,
29                           folly::DelayedDestruction::Destructor> PipelinePtr;
30
31   class ServerConnection : public wangle::ManagedConnection {
32    public:
33     explicit ServerConnection(PipelinePtr pipeline)
34         : pipeline_(std::move(pipeline)) {}
35
36     ~ServerConnection() {
37     }
38
39     void timeoutExpired() noexcept {
40     }
41
42     void describe(std::ostream& os) const {}
43     bool isBusy() const {
44       return false;
45     }
46     void notifyPendingShutdown() {}
47     void closeWhenIdle() {}
48     void dropConnection() {}
49     void dumpConnectionState(uint8_t loglevel) {}
50    private:
51     PipelinePtr pipeline_;
52   };
53
54  public:
55   explicit ServerAcceptor(
56     std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory,
57     EventBase* base)
58       : Acceptor(ServerSocketConfig())
59       , pipelineFactory_(pipelineFactory) {
60     Acceptor::init(nullptr, base);
61   }
62
63   /* See Acceptor::onNewConnection for details */
64   void onNewConnection(
65     AsyncSocket::UniquePtr transport, const SocketAddress* address,
66     const std::string& nextProtocolName, const TransportInfo& tinfo) {
67
68       std::unique_ptr<Pipeline,
69                        folly::DelayedDestruction::Destructor>
70       pipeline(pipelineFactory_->newPipeline(
71         std::shared_ptr<AsyncSocket>(
72           transport.release(),
73           folly::DelayedDestruction::Destructor())));
74     auto connection = new ServerConnection(std::move(pipeline));
75     Acceptor::addConnection(connection);
76   }
77
78  private:
79   std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory_;
80 };
81
82 template <typename Pipeline>
83 class ServerAcceptorFactory : public AcceptorFactory {
84  public:
85   explicit ServerAcceptorFactory(
86       std::shared_ptr<PipelineFactory<Pipeline>> factory)
87     : factory_(factory) {}
88
89   std::shared_ptr<Acceptor> newAcceptor(folly::EventBase* base) {
90     return std::make_shared<ServerAcceptor<Pipeline>>(factory_, base);
91   }
92  private:
93   std::shared_ptr<PipelineFactory<Pipeline>> factory_;
94 };
95
96 class ServerWorkerPool : public folly::wangle::ThreadPoolExecutor::Observer {
97  public:
98   explicit ServerWorkerPool(
99     std::shared_ptr<AcceptorFactory> acceptorFactory,
100     folly::wangle::IOThreadPoolExecutor* exec,
101     std::vector<std::shared_ptr<folly::AsyncServerSocket>>* sockets)
102       : acceptorFactory_(acceptorFactory)
103       , exec_(exec)
104       , sockets_(sockets) {
105     CHECK(exec);
106   }
107
108   template <typename F>
109   void forEachWorker(F&& f) const;
110
111   void threadStarted(
112     folly::wangle::ThreadPoolExecutor::ThreadHandle*);
113   void threadStopped(
114     folly::wangle::ThreadPoolExecutor::ThreadHandle*);
115   void threadPreviouslyStarted(
116       folly::wangle::ThreadPoolExecutor::ThreadHandle* thread) {
117     threadStarted(thread);
118   }
119   void threadNotYetStopped(
120       folly::wangle::ThreadPoolExecutor::ThreadHandle* thread) {
121     threadStopped(thread);
122   }
123
124  private:
125   std::map<folly::wangle::ThreadPoolExecutor::ThreadHandle*,
126            std::shared_ptr<Acceptor>> workers_;
127   std::shared_ptr<AcceptorFactory> acceptorFactory_;
128   folly::wangle::IOThreadPoolExecutor* exec_{nullptr};
129   std::vector<std::shared_ptr<folly::AsyncServerSocket>>* sockets_;
130 };
131
132 template <typename F>
133 void ServerWorkerPool::forEachWorker(F&& f) const {
134   for (const auto& kv : workers_) {
135     f(kv.second.get());
136   }
137 }
138
139 } // namespace