2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/futures/Future.h>
20 #include <folly/futures/Unit.h>
21 #include <folly/io/async/AsyncTransport.h>
22 #include <folly/io/async/DelayedDestruction.h>
23 #include <folly/wangle/channel/HandlerContext.h>
24 #include <folly/ExceptionWrapper.h>
25 #include <folly/Memory.h>
27 namespace folly { namespace wangle {
31 class PipelineManager {
33 virtual ~PipelineManager() = default;
34 virtual void deletePipeline(PipelineBase* pipeline) = 0;
37 class PipelineBase : public DelayedDestruction {
39 virtual ~PipelineBase() = default;
41 void setPipelineManager(PipelineManager* manager) {
45 void deletePipeline() {
47 manager_->deletePipeline(this);
51 void setTransport(std::shared_ptr<AsyncTransport> transport) {
52 transport_ = transport;
55 std::shared_ptr<AsyncTransport> getTransport() {
59 void setWriteFlags(WriteFlags flags);
60 WriteFlags getWriteFlags();
62 void setReadBufferSettings(uint64_t minAvailable, uint64_t allocationSize);
63 std::pair<uint64_t, uint64_t> getReadBufferSettings();
66 PipelineBase& addBack(std::shared_ptr<H> handler);
69 PipelineBase& addBack(H&& handler);
72 PipelineBase& addBack(H* handler);
75 PipelineBase& addFront(std::shared_ptr<H> handler);
78 PipelineBase& addFront(H&& handler);
81 PipelineBase& addFront(H* handler);
84 PipelineBase& remove(H* handler);
87 PipelineBase& remove();
89 PipelineBase& removeFront();
91 PipelineBase& removeBack();
96 // If one of the handlers owns the pipeline itself, use setOwner to ensure
97 // that the pipeline doesn't try to detach the handler during destruction,
98 // lest destruction ordering issues occur.
99 // See thrift/lib/cpp2/async/Cpp2Channel.cpp for an example
101 bool setOwner(H* handler);
103 virtual void finalize() = 0;
106 template <class Context>
107 void addContextFront(Context* ctx);
109 void detachHandlers();
111 std::vector<std::shared_ptr<PipelineContext>> ctxs_;
112 std::vector<PipelineContext*> inCtxs_;
113 std::vector<PipelineContext*> outCtxs_;
116 PipelineManager* manager_{nullptr};
117 std::shared_ptr<AsyncTransport> transport_;
119 template <class Context>
120 PipelineBase& addHelper(std::shared_ptr<Context>&& ctx, bool front);
123 PipelineBase& removeHelper(H* handler, bool checkEqual);
125 typedef std::vector<std::shared_ptr<PipelineContext>>::iterator
128 ContextIterator removeAt(const ContextIterator& it);
130 WriteFlags writeFlags_{WriteFlags::NONE};
131 std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
133 std::shared_ptr<PipelineContext> owner_;
137 * R is the inbound type, i.e. inbound calls start with pipeline.read(R)
138 * W is the outbound type, i.e. outbound calls start with pipeline.write(W)
140 * Use Unit for one of the types if your pipeline is unidirectional.
141 * If R is Unit, read(), readEOF(), and readException() will be disabled.
142 * If W is Unit, write() and close() will be disabled.
144 template <class R, class W = Unit>
145 class Pipeline : public PipelineBase {
150 template <class T = R>
151 typename std::enable_if<!std::is_same<T, Unit>::value>::type
154 template <class T = R>
155 typename std::enable_if<!std::is_same<T, Unit>::value>::type
158 template <class T = R>
159 typename std::enable_if<!std::is_same<T, Unit>::value>::type
160 readException(exception_wrapper e);
162 template <class T = R>
163 typename std::enable_if<!std::is_same<T, Unit>::value>::type
166 template <class T = R>
167 typename std::enable_if<!std::is_same<T, Unit>::value>::type
170 template <class T = W>
171 typename std::enable_if<!std::is_same<T, Unit>::value, Future<void>>::type
174 template <class T = W>
175 typename std::enable_if<!std::is_same<T, Unit>::value, Future<void>>::type
178 void finalize() override;
181 explicit Pipeline(bool isStatic);
184 bool isStatic_{false};
186 InboundLink<R>* front_{nullptr};
187 OutboundLink<W>* back_{nullptr};
196 template <typename Pipeline>
197 class PipelineFactory {
199 virtual std::unique_ptr<Pipeline, folly::DelayedDestruction::Destructor>
200 newPipeline(std::shared_ptr<AsyncSocket>) = 0;
202 virtual ~PipelineFactory() = default;
207 #include <folly/wangle/channel/Pipeline-inl.h>