move wangle/futures to futures
[folly.git] / folly / wangle / channel / ChannelHandlerContext.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/io/async/AsyncTransport.h>
20 #include <folly/futures/Future.h>
21 #include <folly/ExceptionWrapper.h>
22
23 namespace folly { namespace wangle {
24
25 template <class In, class Out>
26 class ChannelHandlerContext {
27  public:
28   virtual ~ChannelHandlerContext() {}
29
30   virtual void fireRead(In msg) = 0;
31   virtual void fireReadEOF() = 0;
32   virtual void fireReadException(exception_wrapper e) = 0;
33
34   virtual Future<void> fireWrite(Out msg) = 0;
35   virtual Future<void> fireClose() = 0;
36
37   virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
38
39   virtual void setWriteFlags(WriteFlags flags) = 0;
40   virtual WriteFlags getWriteFlags() = 0;
41
42   virtual void setReadBufferSettings(
43       uint64_t minAvailable,
44       uint64_t allocationSize) = 0;
45   virtual std::pair<uint64_t, uint64_t> getReadBufferSettings() = 0;
46
47   /* TODO
48   template <class H>
49   virtual void addHandlerBefore(H&&) {}
50   template <class H>
51   virtual void addHandlerAfter(H&&) {}
52   template <class H>
53   virtual void replaceHandler(H&&) {}
54   virtual void removeHandler() {}
55   */
56 };
57
58 class PipelineContext {
59  public:
60   virtual ~PipelineContext() {}
61
62   virtual void attachTransport() = 0;
63   virtual void detachTransport() = 0;
64
65   void link(PipelineContext* other) {
66     setNextIn(other);
67     other->setNextOut(this);
68   }
69
70  protected:
71   virtual void setNextIn(PipelineContext* ctx) = 0;
72   virtual void setNextOut(PipelineContext* ctx) = 0;
73 };
74
75 template <class In>
76 class InboundChannelHandlerContext {
77  public:
78   virtual ~InboundChannelHandlerContext() {}
79   virtual void read(In msg) = 0;
80   virtual void readEOF() = 0;
81   virtual void readException(exception_wrapper e) = 0;
82 };
83
84 template <class Out>
85 class OutboundChannelHandlerContext {
86  public:
87   virtual ~OutboundChannelHandlerContext() {}
88   virtual Future<void> write(Out msg) = 0;
89   virtual Future<void> close() = 0;
90 };
91
92 template <class P, class H>
93 class ContextImpl : public ChannelHandlerContext<typename H::rout,
94                                                  typename H::wout>,
95                     public InboundChannelHandlerContext<typename H::rin>,
96                     public OutboundChannelHandlerContext<typename H::win>,
97                     public PipelineContext {
98  public:
99   typedef typename H::rin Rin;
100   typedef typename H::rout Rout;
101   typedef typename H::win Win;
102   typedef typename H::wout Wout;
103
104   template <class HandlerArg>
105   explicit ContextImpl(P* pipeline, HandlerArg&& handlerArg)
106     : pipeline_(pipeline),
107       handler_(std::forward<HandlerArg>(handlerArg)) {
108     handler_.attachPipeline(this);
109   }
110
111   ~ContextImpl() {
112     handler_.detachPipeline(this);
113   }
114
115   H* getHandler() {
116     return &handler_;
117   }
118
119   // PipelineContext overrides
120   void setNextIn(PipelineContext* ctx) override {
121     auto nextIn = dynamic_cast<InboundChannelHandlerContext<Rout>*>(ctx);
122     if (nextIn) {
123       nextIn_ = nextIn;
124     } else {
125       throw std::invalid_argument("wrong type in setNextIn");
126     }
127   }
128
129   void setNextOut(PipelineContext* ctx) override {
130     auto nextOut = dynamic_cast<OutboundChannelHandlerContext<Wout>*>(ctx);
131     if (nextOut) {
132       nextOut_ = nextOut;
133     } else {
134       throw std::invalid_argument("wrong type in setNextOut");
135     }
136   }
137
138   void attachTransport() override {
139     typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
140     handler_.attachTransport(this);
141   }
142
143   void detachTransport() override {
144     typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
145     handler_.detachTransport(this);
146   }
147
148   // ChannelHandlerContext overrides
149   void fireRead(Rout msg) override {
150     typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
151     if (nextIn_) {
152       nextIn_->read(std::forward<Rout>(msg));
153     } else {
154       LOG(WARNING) << "read reached end of pipeline";
155     }
156   }
157
158   void fireReadEOF() override {
159     typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
160     if (nextIn_) {
161       nextIn_->readEOF();
162     } else {
163       LOG(WARNING) << "readEOF reached end of pipeline";
164     }
165   }
166
167   void fireReadException(exception_wrapper e) override {
168     typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
169     if (nextIn_) {
170       nextIn_->readException(std::move(e));
171     } else {
172       LOG(WARNING) << "readException reached end of pipeline";
173     }
174   }
175
176   Future<void> fireWrite(Wout msg) override {
177     typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
178     if (nextOut_) {
179       return nextOut_->write(std::forward<Wout>(msg));
180     } else {
181       LOG(WARNING) << "write reached end of pipeline";
182       return makeFuture();
183     }
184   }
185
186   Future<void> fireClose() override {
187     typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
188     if (nextOut_) {
189       return nextOut_->close();
190     } else {
191       LOG(WARNING) << "close reached end of pipeline";
192       return makeFuture();
193     }
194   }
195
196   std::shared_ptr<AsyncTransport> getTransport() override {
197     return pipeline_->getTransport();
198   }
199
200   void setWriteFlags(WriteFlags flags) override {
201     pipeline_->setWriteFlags(flags);
202   }
203
204   WriteFlags getWriteFlags() override {
205     return pipeline_->getWriteFlags();
206   }
207
208   void setReadBufferSettings(
209       uint64_t minAvailable,
210       uint64_t allocationSize) override {
211     pipeline_->setReadBufferSettings(minAvailable, allocationSize);
212   }
213
214   std::pair<uint64_t, uint64_t> getReadBufferSettings() override {
215     return pipeline_->getReadBufferSettings();
216   }
217
218   // InboundChannelHandlerContext overrides
219   void read(Rin msg) override {
220     typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
221     handler_.read(this, std::forward<Rin>(msg));
222   }
223
224   void readEOF() override {
225     typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
226     handler_.readEOF(this);
227   }
228
229   void readException(exception_wrapper e) override {
230     typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
231     handler_.readException(this, std::move(e));
232   }
233
234   // OutboundChannelHandlerContext overrides
235   Future<void> write(Win msg) override {
236     typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
237     return handler_.write(this, std::forward<Win>(msg));
238   }
239
240   Future<void> close() override {
241     typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
242     return handler_.close(this);
243   }
244
245  private:
246   P* pipeline_;
247   H handler_;
248   InboundChannelHandlerContext<Rout>* nextIn_{nullptr};
249   OutboundChannelHandlerContext<Wout>* nextOut_{nullptr};
250 };
251
252 }}