0a81be99cadf66b6c92dfb098a53aef385861885
[folly.git] / folly / wangle / channel / Pipeline.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/futures/Future.h>
20 #include <folly/futures/Unit.h>
21 #include <folly/io/async/AsyncTransport.h>
22 #include <folly/io/async/DelayedDestruction.h>
23 #include <folly/wangle/channel/HandlerContext.h>
24 #include <folly/ExceptionWrapper.h>
25 #include <folly/Memory.h>
26
27 namespace folly { namespace wangle {
28
29 class PipelineBase;
30
31 class PipelineManager {
32  public:
33   virtual ~PipelineManager() = default;
34   virtual void deletePipeline(PipelineBase* pipeline) = 0;
35 };
36
37 class PipelineBase : public DelayedDestruction {
38  public:
39   virtual ~PipelineBase() = default;
40
41   void setPipelineManager(PipelineManager* manager) {
42     manager_ = manager;
43   }
44
45   void deletePipeline() {
46     if (manager_) {
47       manager_->deletePipeline(this);
48     }
49   }
50
51   void setTransport(std::shared_ptr<AsyncTransport> transport) {
52     transport_ = transport;
53   }
54
55   std::shared_ptr<AsyncTransport> getTransport() {
56     return transport_;
57   }
58
59   void setWriteFlags(WriteFlags flags);
60   WriteFlags getWriteFlags();
61
62   void setReadBufferSettings(uint64_t minAvailable, uint64_t allocationSize);
63   std::pair<uint64_t, uint64_t> getReadBufferSettings();
64
65   template <class H>
66   PipelineBase& addBack(std::shared_ptr<H> handler);
67
68   template <class H>
69   PipelineBase& addBack(H&& handler);
70
71   template <class H>
72   PipelineBase& addBack(H* handler);
73
74   template <class H>
75   PipelineBase& addFront(std::shared_ptr<H> handler);
76
77   template <class H>
78   PipelineBase& addFront(H&& handler);
79
80   template <class H>
81   PipelineBase& addFront(H* handler);
82
83   template <class H>
84   PipelineBase& remove(H* handler);
85
86   template <class H>
87   PipelineBase& remove();
88
89   PipelineBase& removeFront();
90
91   PipelineBase& removeBack();
92
93   template <class H>
94   H* getHandler(int i);
95
96   // If one of the handlers owns the pipeline itself, use setOwner to ensure
97   // that the pipeline doesn't try to detach the handler during destruction,
98   // lest destruction ordering issues occur.
99   // See thrift/lib/cpp2/async/Cpp2Channel.cpp for an example
100   template <class H>
101   bool setOwner(H* handler);
102
103   virtual void finalize() = 0;
104
105  protected:
106   template <class Context>
107   void addContextFront(Context* ctx);
108
109   void detachHandlers();
110
111   std::vector<std::shared_ptr<PipelineContext>> ctxs_;
112   std::vector<PipelineContext*> inCtxs_;
113   std::vector<PipelineContext*> outCtxs_;
114
115  private:
116   PipelineManager* manager_{nullptr};
117   std::shared_ptr<AsyncTransport> transport_;
118
119   template <class Context>
120   PipelineBase& addHelper(std::shared_ptr<Context>&& ctx, bool front);
121
122   template <class H>
123   PipelineBase& removeHelper(H* handler, bool checkEqual);
124
125   typedef std::vector<std::shared_ptr<PipelineContext>>::iterator
126     ContextIterator;
127
128   ContextIterator removeAt(const ContextIterator& it);
129
130   WriteFlags writeFlags_{WriteFlags::NONE};
131   std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
132
133   std::shared_ptr<PipelineContext> owner_;
134 };
135
136 /*
137  * R is the inbound type, i.e. inbound calls start with pipeline.read(R)
138  * W is the outbound type, i.e. outbound calls start with pipeline.write(W)
139  *
140  * Use Unit for one of the types if your pipeline is unidirectional.
141  * If R is Unit, read(), readEOF(), and readException() will be disabled.
142  * If W is Unit, write() and close() will be disabled.
143  */
144 template <class R, class W = Unit>
145 class Pipeline : public PipelineBase {
146  public:
147   Pipeline();
148   ~Pipeline();
149
150   template <class T = R>
151   typename std::enable_if<!std::is_same<T, Unit>::value>::type
152   read(R msg);
153
154   template <class T = R>
155   typename std::enable_if<!std::is_same<T, Unit>::value>::type
156   readEOF();
157
158   template <class T = R>
159   typename std::enable_if<!std::is_same<T, Unit>::value>::type
160   readException(exception_wrapper e);
161
162   template <class T = R>
163   typename std::enable_if<!std::is_same<T, Unit>::value>::type
164   transportActive();
165
166   template <class T = R>
167   typename std::enable_if<!std::is_same<T, Unit>::value>::type
168   transportInactive();
169
170   template <class T = W>
171   typename std::enable_if<!std::is_same<T, Unit>::value, Future<void>>::type
172   write(W msg);
173
174   template <class T = W>
175   typename std::enable_if<!std::is_same<T, Unit>::value, Future<void>>::type
176   close();
177
178   void finalize() override;
179
180  protected:
181   explicit Pipeline(bool isStatic);
182
183  private:
184   bool isStatic_{false};
185
186   InboundLink<R>* front_{nullptr};
187   OutboundLink<W>* back_{nullptr};
188 };
189
190 }}
191
192 namespace folly {
193
194 class AsyncSocket;
195
196 template <typename Pipeline>
197 class PipelineFactory {
198  public:
199   virtual std::unique_ptr<Pipeline, folly::DelayedDestruction::Destructor>
200   newPipeline(std::shared_ptr<AsyncSocket>) = 0;
201
202   virtual ~PipelineFactory() = default;
203 };
204
205 }
206
207 #include <folly/wangle/channel/Pipeline-inl.h>