attempt at putting thrift2 on ChannelPipeline
[folly.git] / folly / wangle / channel / ChannelHandler.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/futures/Future.h>
20 #include <folly/wangle/channel/ChannelPipeline.h>
21 #include <folly/io/IOBuf.h>
22 #include <folly/io/IOBufQueue.h>
23
24 namespace folly { namespace wangle {
25
26 template <class Rin, class Rout = Rin, class Win = Rout, class Wout = Rin>
27 class ChannelHandler {
28  public:
29   typedef Rin rin;
30   typedef Rout rout;
31   typedef Win win;
32   typedef Wout wout;
33   typedef ChannelHandlerContext<Rout, Wout> Context;
34   virtual ~ChannelHandler() {}
35
36   virtual void read(Context* ctx, Rin msg) = 0;
37   virtual void readEOF(Context* ctx) {
38     ctx->fireReadEOF();
39   }
40   virtual void readException(Context* ctx, exception_wrapper e) {
41     ctx->fireReadException(std::move(e));
42   }
43
44   virtual Future<void> write(Context* ctx, Win msg) = 0;
45   virtual Future<void> close(Context* ctx) {
46     return ctx->fireClose();
47   }
48
49   virtual void attachPipeline(Context* ctx) {}
50   virtual void attachTransport(Context* ctx) {}
51
52   virtual void detachPipeline(Context* ctx) {}
53   virtual void detachTransport(Context* ctx) {}
54
55   /*
56   // Other sorts of things we might want, all shamelessly stolen from Netty
57   // inbound
58   virtual void exceptionCaught(
59       ChannelHandlerContext* ctx,
60       exception_wrapper e) {}
61   virtual void channelRegistered(ChannelHandlerContext* ctx) {}
62   virtual void channelUnregistered(ChannelHandlerContext* ctx) {}
63   virtual void channelActive(ChannelHandlerContext* ctx) {}
64   virtual void channelInactive(ChannelHandlerContext* ctx) {}
65   virtual void channelReadComplete(ChannelHandlerContext* ctx) {}
66   virtual void userEventTriggered(ChannelHandlerContext* ctx, void* evt) {}
67   virtual void channelWritabilityChanged(ChannelHandlerContext* ctx) {}
68
69   // outbound
70   virtual Future<void> bind(
71       ChannelHandlerContext* ctx,
72       SocketAddress localAddress) {}
73   virtual Future<void> connect(
74           ChannelHandlerContext* ctx,
75           SocketAddress remoteAddress, SocketAddress localAddress) {}
76   virtual Future<void> disconnect(ChannelHandlerContext* ctx) {}
77   virtual Future<void> deregister(ChannelHandlerContext* ctx) {}
78   virtual Future<void> read(ChannelHandlerContext* ctx) {}
79   virtual void flush(ChannelHandlerContext* ctx) {}
80   */
81 };
82
83 template <class R, class W = R>
84 class ChannelHandlerAdapter : public ChannelHandler<R, R, W, W> {
85  public:
86   typedef typename ChannelHandler<R, R, W, W>::Context Context;
87
88   void read(Context* ctx, R msg) override {
89     ctx->fireRead(std::forward<R>(msg));
90   }
91
92   Future<void> write(Context* ctx, W msg) override {
93     return ctx->fireWrite(std::forward<W>(msg));
94   }
95 };
96
97 typedef ChannelHandlerAdapter<IOBufQueue&, std::unique_ptr<IOBuf>>
98 BytesToBytesHandler;
99
100 template <class Handler, bool Shared = true>
101 class ChannelHandlerPtr : public ChannelHandler<
102                                    typename Handler::rin,
103                                    typename Handler::rout,
104                                    typename Handler::win,
105                                    typename Handler::wout> {
106  public:
107   typedef typename std::conditional<
108     Shared,
109     std::shared_ptr<Handler>,
110     Handler*>::type
111   HandlerPtr;
112
113   typedef typename Handler::Context Context;
114
115   explicit ChannelHandlerPtr(HandlerPtr handler)
116     : handler_(std::move(handler)) {}
117
118   HandlerPtr getHandler() {
119     return handler_;
120   }
121
122   void setHandler(HandlerPtr handler) {
123     if (handler == handler_) {
124       return;
125     }
126     if (handler_ && ctx_) {
127       handler_->detachPipeline(ctx_);
128     }
129     handler_ = std::move(handler);
130     if (handler_ && ctx_) {
131       handler_->attachPipeline(ctx_);
132       if (ctx_->getTransport()) {
133         handler_->attachTransport(ctx_);
134       }
135     }
136   }
137
138   void attachPipeline(Context* ctx) override {
139     ctx_ = ctx;
140     if (handler_) {
141       handler_->attachPipeline(ctx_);
142     }
143   }
144
145   void attachTransport(Context* ctx) override {
146     ctx_ = ctx;
147     if (handler_) {
148       handler_->attachTransport(ctx_);
149     }
150   }
151
152   void detachPipeline(Context* ctx) override {
153     ctx_ = ctx;
154     if (handler_) {
155       handler_->detachPipeline(ctx_);
156     }
157   }
158
159   void detachTransport(Context* ctx) override {
160     ctx_ = ctx;
161     if (handler_) {
162       handler_->detachTransport(ctx_);
163     }
164   }
165
166   void read(Context* ctx, typename Handler::rin msg) override {
167     DCHECK(handler_);
168     handler_->read(ctx, std::forward<typename Handler::rin>(msg));
169   }
170
171   void readEOF(Context* ctx) override {
172     DCHECK(handler_);
173     handler_->readEOF(ctx);
174   }
175
176   void readException(Context* ctx, exception_wrapper e) override {
177     DCHECK(handler_);
178     handler_->readException(ctx, std::move(e));
179   }
180
181   Future<void> write(Context* ctx, typename Handler::win msg) override {
182     DCHECK(handler_);
183     return handler_->write(ctx, std::forward<typename Handler::win>(msg));
184   }
185
186   Future<void> close(Context* ctx) override {
187     DCHECK(handler_);
188     return handler_->close(ctx);
189   }
190
191  private:
192   Context* ctx_;
193   HandlerPtr handler_;
194 };
195
196 }}