2 * Copyright 2014 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/io/async/AsyncTransport.h>
20 #include <folly/wangle/futures/Future.h>
21 #include <folly/ExceptionWrapper.h>
23 namespace folly { namespace wangle {
25 template <class In, class Out>
26 class ChannelHandlerContext {
28 virtual ~ChannelHandlerContext() {}
30 virtual void fireRead(In msg) = 0;
31 virtual void fireReadEOF() = 0;
32 virtual void fireReadException(exception_wrapper e) = 0;
34 virtual Future<void> fireWrite(Out msg) = 0;
35 virtual Future<void> fireClose() = 0;
37 virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
39 virtual void setWriteFlags(WriteFlags flags) = 0;
40 virtual WriteFlags getWriteFlags() = 0;
42 virtual void setReadBufferSettings(
43 uint64_t minAvailable,
44 uint64_t allocationSize) = 0;
45 virtual std::pair<uint64_t, uint64_t> getReadBufferSettings() = 0;
49 virtual void addHandlerBefore(H&&) {}
51 virtual void addHandlerAfter(H&&) {}
53 virtual void replaceHandler(H&&) {}
54 virtual void removeHandler() {}
58 class PipelineContext {
60 virtual ~PipelineContext() {}
62 virtual void attachTransport() = 0;
63 virtual void detachTransport() = 0;
65 void link(PipelineContext* other) {
67 other->setNextOut(this);
71 virtual void setNextIn(PipelineContext* ctx) = 0;
72 virtual void setNextOut(PipelineContext* ctx) = 0;
76 class InboundChannelHandlerContext {
78 virtual ~InboundChannelHandlerContext() {}
79 virtual void read(In msg) = 0;
80 virtual void readEOF() = 0;
81 virtual void readException(exception_wrapper e) = 0;
85 class OutboundChannelHandlerContext {
87 virtual ~OutboundChannelHandlerContext() {}
88 virtual Future<void> write(Out msg) = 0;
89 virtual Future<void> close() = 0;
92 template <class P, class H>
93 class ContextImpl : public ChannelHandlerContext<typename H::rout,
95 public InboundChannelHandlerContext<typename H::rin>,
96 public OutboundChannelHandlerContext<typename H::win>,
97 public PipelineContext {
99 typedef typename H::rin Rin;
100 typedef typename H::rout Rout;
101 typedef typename H::win Win;
102 typedef typename H::wout Wout;
104 template <class HandlerArg>
105 explicit ContextImpl(P* pipeline, HandlerArg&& handlerArg)
106 : pipeline_(pipeline),
107 handler_(std::forward<HandlerArg>(handlerArg)) {
108 handler_.attachPipeline(this);
112 handler_.detachPipeline(this);
119 // PipelineContext overrides
120 void setNextIn(PipelineContext* ctx) override {
121 auto nextIn = dynamic_cast<InboundChannelHandlerContext<Rout>*>(ctx);
125 throw std::invalid_argument("wrong type in setNextIn");
129 void setNextOut(PipelineContext* ctx) override {
130 auto nextOut = dynamic_cast<OutboundChannelHandlerContext<Wout>*>(ctx);
134 throw std::invalid_argument("wrong type in setNextOut");
138 void attachTransport() override {
139 typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
140 handler_.attachTransport(this);
143 void detachTransport() override {
144 typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
145 handler_.detachTransport(this);
148 // ChannelHandlerContext overrides
149 void fireRead(Rout msg) override {
150 typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
152 nextIn_->read(std::forward<Rout>(msg));
154 LOG(WARNING) << "read reached end of pipeline";
158 void fireReadEOF() override {
159 typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
163 LOG(WARNING) << "readEOF reached end of pipeline";
167 void fireReadException(exception_wrapper e) override {
168 typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
170 nextIn_->readException(std::move(e));
172 LOG(WARNING) << "readException reached end of pipeline";
176 Future<void> fireWrite(Wout msg) override {
177 typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
179 return nextOut_->write(std::forward<Wout>(msg));
181 LOG(WARNING) << "write reached end of pipeline";
186 Future<void> fireClose() override {
187 typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
189 return nextOut_->close();
191 LOG(WARNING) << "close reached end of pipeline";
196 std::shared_ptr<AsyncTransport> getTransport() override {
197 return pipeline_->getTransport();
200 void setWriteFlags(WriteFlags flags) override {
201 pipeline_->setWriteFlags(flags);
204 WriteFlags getWriteFlags() override {
205 return pipeline_->getWriteFlags();
208 void setReadBufferSettings(
209 uint64_t minAvailable,
210 uint64_t allocationSize) override {
211 pipeline_->setReadBufferSettings(minAvailable, allocationSize);
214 std::pair<uint64_t, uint64_t> getReadBufferSettings() override {
215 return pipeline_->getReadBufferSettings();
218 // InboundChannelHandlerContext overrides
219 void read(Rin msg) override {
220 typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
221 handler_.read(this, std::forward<Rin>(msg));
224 void readEOF() override {
225 typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
226 handler_.readEOF(this);
229 void readException(exception_wrapper e) override {
230 typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
231 handler_.readException(this, std::move(e));
234 // OutboundChannelHandlerContext overrides
235 Future<void> write(Win msg) override {
236 typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
237 return handler_.write(this, std::forward<Win>(msg));
240 Future<void> close() override {
241 typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
242 return handler_.close(this);
248 InboundChannelHandlerContext<Rout>* nextIn_{nullptr};
249 OutboundChannelHandlerContext<Wout>* nextOut_{nullptr};