fix service memory leak
[folly.git] / folly / wangle / bootstrap / ServerBootstrap-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <folly/wangle/acceptor/Acceptor.h>
19 #include <folly/io/async/EventBaseManager.h>
20 #include <folly/wangle/concurrent/IOThreadPoolExecutor.h>
21 #include <folly/wangle/acceptor/ManagedConnection.h>
22 #include <folly/wangle/channel/ChannelPipeline.h>
23
24 namespace folly {
25
26 template <typename Pipeline>
27 class ServerAcceptor : public Acceptor {
28   typedef std::unique_ptr<Pipeline,
29                           folly::DelayedDestruction::Destructor> PipelinePtr;
30
31   class ServerConnection : public wangle::ManagedConnection {
32    public:
33     explicit ServerConnection(PipelinePtr pipeline)
34         : pipeline_(std::move(pipeline)) {}
35
36     ~ServerConnection() {
37     }
38
39     void timeoutExpired() noexcept {
40     }
41
42     void describe(std::ostream& os) const {}
43     bool isBusy() const {
44       return false;
45     }
46     void notifyPendingShutdown() {}
47     void closeWhenIdle() {}
48     void dropConnection() {
49       delete this;
50     }
51     void dumpConnectionState(uint8_t loglevel) {}
52    private:
53     PipelinePtr pipeline_;
54   };
55
56  public:
57   explicit ServerAcceptor(
58     std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory,
59     EventBase* base)
60       : Acceptor(ServerSocketConfig())
61       , pipelineFactory_(pipelineFactory) {
62     Acceptor::init(nullptr, base);
63   }
64
65   /* See Acceptor::onNewConnection for details */
66   void onNewConnection(
67     AsyncSocket::UniquePtr transport, const SocketAddress* address,
68     const std::string& nextProtocolName, const TransportInfo& tinfo) {
69
70       std::unique_ptr<Pipeline,
71                        folly::DelayedDestruction::Destructor>
72       pipeline(pipelineFactory_->newPipeline(
73         std::shared_ptr<AsyncSocket>(
74           transport.release(),
75           folly::DelayedDestruction::Destructor())));
76     auto connection = new ServerConnection(std::move(pipeline));
77     Acceptor::addConnection(connection);
78   }
79
80  private:
81   std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory_;
82 };
83
84 template <typename Pipeline>
85 class ServerAcceptorFactory : public AcceptorFactory {
86  public:
87   explicit ServerAcceptorFactory(
88       std::shared_ptr<PipelineFactory<Pipeline>> factory)
89     : factory_(factory) {}
90
91   std::shared_ptr<Acceptor> newAcceptor(folly::EventBase* base) {
92     return std::make_shared<ServerAcceptor<Pipeline>>(factory_, base);
93   }
94  private:
95   std::shared_ptr<PipelineFactory<Pipeline>> factory_;
96 };
97
98 class ServerWorkerPool : public folly::wangle::ThreadPoolExecutor::Observer {
99  public:
100   explicit ServerWorkerPool(
101     std::shared_ptr<AcceptorFactory> acceptorFactory,
102     folly::wangle::IOThreadPoolExecutor* exec,
103     std::vector<std::shared_ptr<folly::AsyncServerSocket>>* sockets)
104       : acceptorFactory_(acceptorFactory)
105       , exec_(exec)
106       , sockets_(sockets) {
107     CHECK(exec);
108   }
109
110   template <typename F>
111   void forEachWorker(F&& f) const;
112
113   void threadStarted(
114     folly::wangle::ThreadPoolExecutor::ThreadHandle*);
115   void threadStopped(
116     folly::wangle::ThreadPoolExecutor::ThreadHandle*);
117   void threadPreviouslyStarted(
118       folly::wangle::ThreadPoolExecutor::ThreadHandle* thread) {
119     threadStarted(thread);
120   }
121   void threadNotYetStopped(
122       folly::wangle::ThreadPoolExecutor::ThreadHandle* thread) {
123     threadStopped(thread);
124   }
125
126  private:
127   std::map<folly::wangle::ThreadPoolExecutor::ThreadHandle*,
128            std::shared_ptr<Acceptor>> workers_;
129   std::shared_ptr<AcceptorFactory> acceptorFactory_;
130   folly::wangle::IOThreadPoolExecutor* exec_{nullptr};
131   std::vector<std::shared_ptr<folly::AsyncServerSocket>>* sockets_;
132 };
133
134 template <typename F>
135 void ServerWorkerPool::forEachWorker(F&& f) const {
136   for (const auto& kv : workers_) {
137     f(kv.second.get());
138   }
139 }
140
141 } // namespace