pipeline handler removal, fix service test
[folly.git] / folly / wangle / channel / Pipeline.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/wangle/channel/HandlerContext.h>
20 #include <folly/futures/Future.h>
21 #include <folly/io/async/AsyncTransport.h>
22 #include <folly/io/async/DelayedDestruction.h>
23 #include <folly/ExceptionWrapper.h>
24 #include <folly/Memory.h>
25
26 namespace folly { namespace wangle {
27
28 class PipelineManager {
29  public:
30   virtual ~PipelineManager() = default;
31   virtual void deletePipeline(PipelineBase* pipeline) = 0;
32 };
33
34 class PipelineBase : public DelayedDestruction {
35  public:
36   virtual ~PipelineBase() = default;
37
38   void setPipelineManager(PipelineManager* manager) {
39     manager_ = manager;
40   }
41
42   void deletePipeline() {
43     if (manager_) {
44       manager_->deletePipeline(this);
45     }
46   }
47
48   void setTransport(std::shared_ptr<AsyncTransport> transport) {
49     transport_ = transport;
50   }
51
52   std::shared_ptr<AsyncTransport> getTransport() {
53     return transport_;
54   }
55
56  private:
57   PipelineManager* manager_{nullptr};
58   std::shared_ptr<AsyncTransport> transport_;
59 };
60
61 struct Nothing{};
62
63 /*
64  * R is the inbound type, i.e. inbound calls start with pipeline.read(R)
65  * W is the outbound type, i.e. outbound calls start with pipeline.write(W)
66  *
67  * Use Nothing for one of the types if your pipeline is unidirectional.
68  * If R is Nothing, read(), readEOF(), and readException() will be disabled.
69  * If W is Nothing, write() and close() will be disabled.
70  */
71 template <class R, class W = Nothing>
72 class Pipeline : public PipelineBase {
73  public:
74   Pipeline();
75   ~Pipeline();
76
77   void setWriteFlags(WriteFlags flags);
78   WriteFlags getWriteFlags();
79
80   void setReadBufferSettings(uint64_t minAvailable, uint64_t allocationSize);
81   std::pair<uint64_t, uint64_t> getReadBufferSettings();
82
83   template <class T = R>
84   typename std::enable_if<!std::is_same<T, Nothing>::value>::type
85   read(R msg);
86
87   template <class T = R>
88   typename std::enable_if<!std::is_same<T, Nothing>::value>::type
89   readEOF();
90
91   template <class T = R>
92   typename std::enable_if<!std::is_same<T, Nothing>::value>::type
93   readException(exception_wrapper e);
94
95   template <class T = R>
96   typename std::enable_if<!std::is_same<T, Nothing>::value>::type
97   transportActive();
98
99   template <class T = R>
100   typename std::enable_if<!std::is_same<T, Nothing>::value>::type
101   transportInactive();
102
103   template <class T = W>
104   typename std::enable_if<!std::is_same<T, Nothing>::value, Future<void>>::type
105   write(W msg);
106
107   template <class T = W>
108   typename std::enable_if<!std::is_same<T, Nothing>::value, Future<void>>::type
109   close();
110
111   template <class H>
112   Pipeline& addBack(std::shared_ptr<H> handler);
113
114   template <class H>
115   Pipeline& addBack(H&& handler);
116
117   template <class H>
118   Pipeline& addBack(H* handler);
119
120   template <class H>
121   Pipeline& addFront(std::shared_ptr<H> handler);
122
123   template <class H>
124   Pipeline& addFront(H&& handler);
125
126   template <class H>
127   Pipeline& addFront(H* handler);
128
129   template <class H>
130   Pipeline& remove(H* handler);
131
132   template <class H>
133   Pipeline& remove();
134
135   Pipeline& removeFront();
136
137   Pipeline& removeBack();
138
139   template <class H>
140   H* getHandler(int i);
141
142   void finalize();
143
144   // If one of the handlers owns the pipeline itself, use setOwner to ensure
145   // that the pipeline doesn't try to detach the handler during destruction,
146   // lest destruction ordering issues occur.
147   // See thrift/lib/cpp2/async/Cpp2Channel.cpp for an example
148   template <class H>
149   bool setOwner(H* handler);
150
151  protected:
152   explicit Pipeline(bool isStatic);
153
154   template <class Context>
155   void addContextFront(Context* ctx);
156
157   void detachHandlers();
158
159  private:
160   template <class Context>
161   Pipeline& addHelper(std::shared_ptr<Context>&& ctx, bool front);
162
163   template <class H>
164   Pipeline& removeHelper(H* handler, bool checkEqual);
165
166   typedef std::vector<std::shared_ptr<PipelineContext>>::iterator
167     ContextIterator;
168
169   ContextIterator removeAt(const ContextIterator& it);
170
171   WriteFlags writeFlags_{WriteFlags::NONE};
172   std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
173
174   bool isStatic_{false};
175   std::shared_ptr<PipelineContext> owner_;
176   std::vector<std::shared_ptr<PipelineContext>> ctxs_;
177   std::vector<PipelineContext*> inCtxs_;
178   std::vector<PipelineContext*> outCtxs_;
179   InboundLink<R>* front_{nullptr};
180   OutboundLink<W>* back_{nullptr};
181 };
182
183 }}
184
185 namespace folly {
186
187 class AsyncSocket;
188
189 template <typename Pipeline>
190 class PipelineFactory {
191  public:
192   virtual std::unique_ptr<Pipeline, folly::DelayedDestruction::Destructor>
193   newPipeline(std::shared_ptr<AsyncSocket>) = 0;
194
195   virtual ~PipelineFactory() = default;
196 };
197
198 }
199
200 #include <folly/wangle/channel/Pipeline-inl.h>