2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/wangle/channel/HandlerContext.h>
20 #include <folly/futures/Future.h>
21 #include <folly/io/async/AsyncTransport.h>
22 #include <folly/io/async/DelayedDestruction.h>
23 #include <folly/ExceptionWrapper.h>
24 #include <folly/Memory.h>
26 namespace folly { namespace wangle {
// Abstract interface for whatever object is responsible for deleting
// pipelines. PipelineBase::deletePipeline() forwards to the manager it was
// given through this interface.
28 class PipelineManager {
30 virtual ~PipelineManager() = default;
// Receives the pipeline that should be deleted. Pure virtual: what
// "delete" means is left entirely to the implementation.
31 virtual void deletePipeline(PipelineBase* pipeline) = 0;
// Type-independent base of all pipelines. Derives from DelayedDestruction
// and carries two pieces of state: a PipelineManager pointer and a shared
// AsyncTransport.
34 class PipelineBase : public DelayedDestruction {
36 virtual ~PipelineBase() = default;
// Sets the manager that deletePipeline() delegates to.
// NOTE(review): the body is not visible in this excerpt; presumably it
// stores `manager` in manager_ -- confirm.
38 void setPipelineManager(PipelineManager* manager) {
// Asks the manager to delete this pipeline by passing `this` to
// PipelineManager::deletePipeline().
// NOTE(review): manager_ starts out as nullptr; whether this call is guarded
// by a null check is not visible in this excerpt -- confirm.
42 void deletePipeline() {
44 manager_->deletePipeline(this);
// Replaces the stored transport with `transport` (shared ownership).
48 void setTransport(std::shared_ptr<AsyncTransport> transport) {
49 transport_ = transport;
// Accessor for the transport.
// NOTE(review): body not visible here; presumably returns transport_.
52 std::shared_ptr<AsyncTransport> getTransport() {
// Manager used by deletePipeline(); nullptr until one is set.
57 PipelineManager* manager_{nullptr};
// Transport assigned through setTransport().
58 std::shared_ptr<AsyncTransport> transport_;
64 * R is the inbound type, i.e. inbound calls start with pipeline.read(R)
65 * W is the outbound type, i.e. outbound calls start with pipeline.write(W)
67 * Use Nothing for one of the types if your pipeline is unidirectional.
68 * If R is Nothing, read(), readEOF(), and readException() will be disabled.
69 * If W is Nothing, write() and close() will be disabled.
// Typed pipeline of handlers: R is the inbound message type and W the
// outbound message type (W defaults to Nothing). Only declarations appear
// here; the definitions are expected in Pipeline-inl.h, which this header
// includes at the bottom.
71 template <class R, class W = Nothing>
72 class Pipeline : public PipelineBase {
// Write flags accessors. The backing member writeFlags_ starts out as
// WriteFlags::NONE.
77 void setWriteFlags(WriteFlags flags);
78 WriteFlags getWriteFlags();
// Read buffer settings accessors. The backing member readBufferSettings_
// starts out as {2048, 2048}.
// NOTE(review): presumably the pair is ordered (minAvailable,
// allocationSize) -- confirm against the definitions.
80 void setReadBufferSettings(uint64_t minAvailable, uint64_t allocationSize);
81 std::pair<uint64_t, uint64_t> getReadBufferSettings();
// Inbound entry points. Each one is a member template whose return type is
// std::enable_if<...>::type (i.e. void), so it only participates in overload
// resolution when R is not Nothing.
// NOTE(review): most of the declarator lines (function names/parameters) are
// missing from this excerpt; only readException() is visible.
83 template <class T = R>
84 typename std::enable_if<!std::is_same<T, Nothing>::value>::type
87 template <class T = R>
88 typename std::enable_if<!std::is_same<T, Nothing>::value>::type
91 template <class T = R>
92 typename std::enable_if<!std::is_same<T, Nothing>::value>::type
93 readException(exception_wrapper e);
95 template <class T = R>
96 typename std::enable_if<!std::is_same<T, Nothing>::value>::type
99 template <class T = R>
100 typename std::enable_if<!std::is_same<T, Nothing>::value>::type
// Outbound entry points. These return Future<void> and are only available
// when W is not Nothing.
// NOTE(review): the declarator lines are missing from this excerpt.
103 template <class T = W>
104 typename std::enable_if<!std::is_same<T, Nothing>::value, Future<void>>::type
107 template <class T = W>
108 typename std::enable_if<!std::is_same<T, Nothing>::value, Future<void>>::type
// Add a handler at the back of the pipeline. Overloads accept a shared_ptr,
// an H&&, or a raw pointer.
// NOTE(review): returns Pipeline&, presumably *this so calls can be chained;
// ownership semantics of the raw-pointer overload are not visible here.
112 Pipeline& addBack(std::shared_ptr<H> handler);
115 Pipeline& addBack(H&& handler);
118 Pipeline& addBack(H* handler);
// Add a handler at the front of the pipeline; same overload set as addBack().
121 Pipeline& addFront(std::shared_ptr<H> handler);
124 Pipeline& addFront(H&& handler);
127 Pipeline& addFront(H* handler);
// Handler removal: by handler pointer, or from either end of the pipeline.
// NOTE(review): behavior when the handler is absent or the pipeline is empty
// is not visible in this excerpt.
130 Pipeline& remove(H* handler);
135 Pipeline& removeFront();
137 Pipeline& removeBack();
// Returns a handler pointer selected by index `i`.
// NOTE(review): index base and out-of-range behavior are defined elsewhere.
140 H* getHandler(int i);
144 // If one of the handlers owns the pipeline itself, use setOwner to ensure
145 // that the pipeline doesn't try to detach the handler during destruction,
146 // lest destruction ordering issues occur.
147 // See thrift/lib/cpp2/async/Cpp2Channel.cpp for an example
149 bool setOwner(H* handler);
// Constructor taking the static-ness flag; explicit, so a bool never
// converts to a Pipeline implicitly.
// NOTE(review): presumably initializes isStatic_ -- confirm.
152 explicit Pipeline(bool isStatic);
// Internal helpers used by the add/remove operations above; their
// definitions are not part of this excerpt.
154 template <class Context>
155 void addContextFront(Context* ctx);
157 void detachHandlers();
160 template <class Context>
161 Pipeline& addHelper(std::shared_ptr<Context>&& ctx, bool front);
164 Pipeline& removeHelper(H* handler, bool checkEqual);
// Iterator type over the shared_ptr<PipelineContext> vector (the alias name
// is on a line missing from this excerpt; removeAt() uses ContextIterator).
166 typedef std::vector<std::shared_ptr<PipelineContext>>::iterator
169 ContextIterator removeAt(const ContextIterator& it);
// Pipeline state and its in-class defaults.
171 WriteFlags writeFlags_{WriteFlags::NONE};
172 std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
174 bool isStatic_{false};
// Context recorded via setOwner() (presumably -- confirm in the definitions).
175 std::shared_ptr<PipelineContext> owner_;
// ctxs_ holds the contexts by shared_ptr; inCtxs_ and outCtxs_ hold raw
// PipelineContext pointers.
// NOTE(review): presumably the raw pointers refer into ctxs_, split by
// inbound/outbound direction -- confirm.
176 std::vector<std::shared_ptr<PipelineContext>> ctxs_;
177 std::vector<PipelineContext*> inCtxs_;
178 std::vector<PipelineContext*> outCtxs_;
// Link pointers, both nullptr by default.
// NOTE(review): presumably the first inbound link and the last outbound
// link of the chain -- confirm.
179 InboundLink<R>* front_{nullptr};
180 OutboundLink<W>* back_{nullptr};
// Abstract factory for pipelines of type `Pipeline`. Implementations
// override newPipeline() to build a pipeline for a given socket.
189 template <typename Pipeline>
190 class PipelineFactory {
// Creates a pipeline for the given AsyncSocket. The result is returned in a
// std::unique_ptr whose deleter is folly::DelayedDestruction::Destructor
// rather than the default deleter.
192 virtual std::unique_ptr<Pipeline, folly::DelayedDestruction::Destructor>
193 newPipeline(std::shared_ptr<AsyncSocket>) = 0;
195 virtual ~PipelineFactory() = default;
200 #include <folly/wangle/channel/Pipeline-inl.h>