/*
 * Copyright 2014 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
19 #include <folly/experimental/wangle/channel/ChannelHandlerContext.h>
20 #include <folly/wangle/futures/Future.h>
21 #include <folly/io/async/AsyncTransport.h>
22 #include <folly/io/async/DelayedDestruction.h>
23 #include <folly/ExceptionWrapper.h>
24 #include <folly/Memory.h>
25 #include <glog/logging.h>
27 namespace folly { namespace wangle {
/*
 * R is the inbound type, i.e. inbound calls start with pipeline.read(R).
 * W is the outbound type, i.e. outbound calls start with pipeline.write(W).
 */
/*
 * Primary declaration of the pipeline template. It has no body here; the
 * partial specializations that follow in this file supply the definitions.
 */
template <typename R, typename W, typename... Handlers>
class ChannelPipeline;
// Partial specialization for an empty handler pack: its handler contexts are
// the ones added at runtime through addBack/addFront. Inherits from
// DelayedDestruction.
36 template <class R, class W>
37 class ChannelPipeline<R, W> : public DelayedDestruction {
// Accessor for the pipeline's transport handle.
// NOTE(review): the body is not visible in this excerpt - presumably returns
// transport_; confirm.
42 std::shared_ptr<AsyncTransport> getTransport() {
// Setter taking the WriteFlags value to use.
// NOTE(review): body not visible here - presumably assigns writeFlags_; confirm.
46 void setWriteFlags(WriteFlags flags) {
// Getter counterpart of setWriteFlags.
// NOTE(review): body not visible here - presumably returns writeFlags_; confirm.
50 WriteFlags getWriteFlags() {
54 void setReadBufferSettings(uint64_t minAvailable, uint64_t allocationSize) {
55 readBufferSettings_ = std::make_pair(minAvailable, allocationSize);
58 std::pair<uint64_t, uint64_t> getReadBufferSettings() {
59 return readBufferSettings_;
// Hands the inbound message to the front inbound context. msg goes through
// std::forward<R>, so it is passed on as an rvalue when R is not a reference
// type.
// NOTE(review): the enclosing function header is not visible in this excerpt.
63 front_->read(std::forward<R>(msg));
70 void readException(exception_wrapper e) {
71 front_->readException(std::move(e));
74 Future<void> write(W msg) {
75 return back_->write(std::forward<W>(msg));
78 Future<void> close() {
79 return back_->close();
83 ChannelPipeline& addBack(H&& handler) {
84 ctxs_.push_back(folly::make_unique<ContextImpl<ChannelPipeline, H>>(
85 this, std::forward<H>(handler)));
// Front-insertion counterpart of addBack: the visible lines build a
// ContextImpl<ChannelPipeline, H> from the forwarded handler.
// NOTE(review): the lines that place the new context into ctxs_ are not
// visible in this excerpt - presumably an insert at the beginning; confirm.
90 ChannelPipeline& addFront(H&& handler) {
93 folly::make_unique<ContextImpl<ChannelPipeline, H>>(
95 std::forward<H>(handler)));
100 H* getHandler(int i) {
101 auto ctx = dynamic_cast<ContextImpl<ChannelPipeline, H>*>(ctxs_[i].get());
103 return ctx->getHandler();
// Resolves front_ from the first context in ctxs_ via dynamic_cast; the throw
// reports a first context of the wrong type.
// NOTE(review): the enclosing function header and the condition guarding the
// throw are not visible in this excerpt - presumably thrown when the cast
// yields null; confirm.
// NOTE(review): the local `front` below is declared but never used in the
// visible lines - likely removable. ctxs_.front() also requires a non-empty
// ctxs_, which nothing visible here checks.
108 InboundChannelHandlerContext<R>* front;
109 front_ = dynamic_cast<InboundChannelHandlerContext<R>*>(
110 ctxs_.front().get());
112 throw std::invalid_argument("wrong type for first handler");
117 explicit ChannelPipeline(bool shouldFinalize) {
118 CHECK(!shouldFinalize);
121 void finalizeHelper() {
126 for (int i = 0; i < ctxs_.size() - 1; i++) {
127 ctxs_[i]->link(ctxs_[i+1].get());
130 back_ = dynamic_cast<OutboundChannelHandlerContext<W>*>(ctxs_.back().get());
132 throw std::invalid_argument("wrong type for last handler");
136 PipelineContext* getLocalFront() {
137 return ctxs_.empty() ? nullptr : ctxs_.front().get();
// True for this specialization; the variadic specialization declares its own
// is_end as false.
140 static const bool is_end{true};
// Transport handle; assigned by attachTransport and reset to null by
// detachTransport.
142 std::shared_ptr<AsyncTransport> transport_;
// Write flags, initially WriteFlags::NONE.
143 WriteFlags writeFlags_{WriteFlags::NONE};
// (minAvailable, allocationSize) pair as stored by setReadBufferSettings;
// both start at 2048.
144 std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
// Empty hook: does nothing at this level.
146 void attachPipeline() {}
148 void attachTransport(
149 std::shared_ptr<AsyncTransport> transport) {
150 transport_ = std::move(transport);
153 void detachTransport() {
154 transport_ = nullptr;
// Last outbound context; null until finalizeHelper assigns it.
157 OutboundChannelHandlerContext<W>* back_{nullptr};
// First inbound context; null until it is resolved from ctxs_.
160 InboundChannelHandlerContext<R>* front_{nullptr};
// Owning storage for the contexts added through addBack/addFront.
161 std::vector<std::unique_ptr<PipelineContext>> ctxs_;
// Recursive case: peels the first handler type off the pack and inherits from
// the specialization for the remaining Handlers... This level keeps the
// context for Handler in its ctx_ member.
164 template <class R, class W, class Handler, class... Handlers>
165 class ChannelPipeline<R, W, Handler, Handlers...>
166 : public ChannelPipeline<R, W, Handlers...> {
// Constructor taking one argument per handler: handlerArg is forwarded into
// this level's ctx_ (together with `this`), and the remaining handlersArgs are
// forwarded to the base-class constructor.
// NOTE(review): several lines of this constructor are not visible in this
// excerpt (its name and first parameter, the first base-constructor argument,
// and the body of the shouldFinalize branch) - confirm against the full file.
168 template <class HandlerArg, class... HandlersArgs>
171 HandlerArg&& handlerArg,
172 HandlersArgs&&... handlersArgs)
173 : ChannelPipeline<R, W, Handlers...>(
175 std::forward<HandlersArgs>(handlersArgs)...),
176 ctx_(this, std::forward<HandlerArg>(handlerArg)) {
177 if (shouldFinalize) {
183 template <class... HandlersArgs>
184 explicit ChannelPipeline(HandlersArgs&&... handlersArgs)
185 : ChannelPipeline(true, std::forward<HandlersArgs>(handlersArgs)...) {}
// Destructor with an empty body.
187 ~ChannelPipeline() {}
// Overrides destroy() with an empty body, so calling destroy() on this type
// performs no action.
189 void destroy() override { }
// Holds a DestructorGuard on this pipeline while the statement below runs,
// then hands the message to the front inbound context.
// NOTE(review): the enclosing function header is not visible in this excerpt.
192 typename ChannelPipeline<R, W>::DestructorGuard dg(
193 static_cast<DelayedDestruction*>(this));
194 front_->read(std::forward<R>(msg));
// The same DestructorGuard construction, belonging to a second entry point.
// NOTE(review): its header and the call it guards are not visible in this
// excerpt.
198 typename ChannelPipeline<R, W>::DestructorGuard dg(
199 static_cast<DelayedDestruction*>(this));
203 void readException(exception_wrapper e) {
204 typename ChannelPipeline<R, W>::DestructorGuard dg(
205 static_cast<DelayedDestruction*>(this));
206 front_->readException(std::move(e));
209 Future<void> write(W msg) {
210 typename ChannelPipeline<R, W>::DestructorGuard dg(
211 static_cast<DelayedDestruction*>(this));
212 return back_->write(std::forward<W>(msg));
215 Future<void> close() {
216 typename ChannelPipeline<R, W>::DestructorGuard dg(
217 static_cast<DelayedDestruction*>(this));
218 return back_->close();
221 void attachTransport(
222 std::shared_ptr<AsyncTransport> transport) {
223 typename ChannelPipeline<R, W>::DestructorGuard dg(
224 static_cast<DelayedDestruction*>(this));
225 CHECK((!ChannelPipeline<R, W>::transport_));
226 ChannelPipeline<R, W, Handlers...>::attachTransport(std::move(transport));
227 forEachCtx([&](PipelineContext* ctx){
228 ctx->attachTransport();
232 void detachTransport() {
233 typename ChannelPipeline<R, W>::DestructorGuard dg(
234 static_cast<DelayedDestruction*>(this));
235 ChannelPipeline<R, W, Handlers...>::detachTransport();
236 forEachCtx([&](PipelineContext* ctx){
237 ctx->detachTransport();
241 std::shared_ptr<AsyncTransport> getTransport() {
242 return ChannelPipeline<R, W>::transport_;
246 ChannelPipeline& addBack(H&& handler) {
247 ChannelPipeline<R, W>::addBack(std::move(handler));
// Front-insertion variant: the visible lines build a
// ContextImpl<ChannelPipeline, H> from the handler.
// NOTE(review): handler is a forwarding reference but is passed with
// std::move, so an lvalue argument is moved from as well.
// NOTE(review): the lines that place the new context into ctxs_ are not
// visible in this excerpt - presumably an insert at the beginning; confirm.
252 ChannelPipeline& addFront(H&& handler) {
255 folly::make_unique<ContextImpl<ChannelPipeline, H>>(
257 std::move(handler)));
262 H* getHandler(size_t i) {
263 if (i > ctxs_.size()) {
264 return ChannelPipeline<R, W, Handlers...>::template getHandler<H>(
265 i - (ctxs_.size() + 1));
267 auto pctx = (i == ctxs_.size()) ? &ctx_ : ctxs_[i].get();
268 auto ctx = dynamic_cast<ContextImpl<ChannelPipeline, H>*>(pctx);
269 return ctx->getHandler();
// Picks this level's first context (ctx_ when ctxs_ is empty, otherwise the
// first entry of ctxs_) and resolves front_ from it via dynamic_cast.
// NOTE(review): the enclosing function header and the condition guarding the
// throw are not visible in this excerpt - presumably thrown when the cast
// yields null; confirm.
275 auto ctx = ctxs_.empty() ? &ctx_ : ctxs_.front().get();
276 front_ = dynamic_cast<InboundChannelHandlerContext<R>*>(ctx);
278 throw std::invalid_argument("wrong type for first handler");
// Finalizes this level: first runs the base specialization's finalizeHelper
// and copies the base's back_ pointer into this level's back_.
283 void finalizeHelper() {
284 ChannelPipeline<R, W, Handlers...>::finalizeHelper();
285 back_ = ChannelPipeline<R, W, Handlers...>::back_;
// The next lines read the base's is_end flag, try ctx_ as the outbound end of
// the pipeline, and throw std::invalid_argument on the failure path.
// NOTE(review): the conditions around these lines are not visible in this
// excerpt - presumably they run only when the base supplied no back_; confirm.
287 auto is_end = ChannelPipeline<R, W, Handlers...>::is_end;
289 back_ = dynamic_cast<OutboundChannelHandlerContext<W>*>(&ctx_);
291 throw std::invalid_argument("wrong type for last handler");
// Link this level's runtime-added contexts in order, then link the last of
// them to ctx_.
// NOTE(review): `i` is an int compared against an unsigned size; the
// subtraction cannot wrap here because of the emptiness check.
295 if (!ctxs_.empty()) {
296 for (int i = 0; i < ctxs_.size() - 1; i++) {
297 ctxs_[i]->link(ctxs_[i+1].get());
299 ctxs_.back()->link(&ctx_);
// Link ctx_ to the first context of the base specialization.
// NOTE(review): a null check on nextFront may sit on a line not visible here
// (the empty-pack specialization's getLocalFront can return nullptr); confirm.
302 auto nextFront = ChannelPipeline<R, W, Handlers...>::getLocalFront();
304 ctx_.link(nextFront);
308 PipelineContext* getLocalFront() {
309 return ctxs_.empty() ? &ctx_ : ctxs_.front().get();
// False for this specialization; the empty-pack specialization declares its
// own is_end as true.
312 static const bool is_end{false};
// First inbound context of the whole pipeline; null until resolved.
313 InboundChannelHandlerContext<R>* front_{nullptr};
// Last outbound context; null until finalizeHelper assigns it.
314 OutboundChannelHandlerContext<W>* back_{nullptr};
// Iterates over the contexts in ctxs_ for the given callable.
// NOTE(review): the loop body and anything after the loop are not visible in
// this excerpt - presumably func is invoked per context (callers pass a lambda
// taking PipelineContext*); confirm, including whether ctx_ is visited.
318 void forEachCtx(const F& func) {
319 for (auto& ctx : ctxs_) {
// Context wrapping this level's statically-typed Handler; initialized in the
// constructor from `this` and the forwarded handler argument.
325 ContextImpl<ChannelPipeline, Handler> ctx_;
// Owning storage for contexts added to this level at runtime.
326 std::vector<std::unique_ptr<PipelineContext>> ctxs_;
335 template <typename Pipeline>
336 class PipelineFactory {
338 virtual Pipeline* newPipeline(std::shared_ptr<AsyncSocket>) = 0;
339 virtual ~PipelineFactory() {}