inbound/outbound handlers
[folly.git] / folly / wangle / channel / HandlerContext.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/io/async/AsyncTransport.h>
20 #include <folly/futures/Future.h>
21 #include <folly/ExceptionWrapper.h>
22
23 namespace folly { namespace wangle {
24
25 template <class In, class Out>
26 class HandlerContext {
27  public:
28   virtual ~HandlerContext() {}
29
30   virtual void fireRead(In msg) = 0;
31   virtual void fireReadEOF() = 0;
32   virtual void fireReadException(exception_wrapper e) = 0;
33
34   virtual Future<void> fireWrite(Out msg) = 0;
35   virtual Future<void> fireClose() = 0;
36
37   virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
38
39   virtual void setWriteFlags(WriteFlags flags) = 0;
40   virtual WriteFlags getWriteFlags() = 0;
41
42   virtual void setReadBufferSettings(
43       uint64_t minAvailable,
44       uint64_t allocationSize) = 0;
45   virtual std::pair<uint64_t, uint64_t> getReadBufferSettings() = 0;
46
47   /* TODO
48   template <class H>
49   virtual void addHandlerBefore(H&&) {}
50   template <class H>
51   virtual void addHandlerAfter(H&&) {}
52   template <class H>
53   virtual void replaceHandler(H&&) {}
54   virtual void removeHandler() {}
55   */
56 };
57
58 template <class In>
59 class InboundHandlerContext {
60  public:
61   virtual ~InboundHandlerContext() {}
62
63   virtual void fireRead(In msg) = 0;
64   virtual void fireReadEOF() = 0;
65   virtual void fireReadException(exception_wrapper e) = 0;
66
67   virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
68
69   // TODO Need get/set writeFlags, readBufferSettings? Probably not.
70   // Do we even really need them stored in the pipeline at all?
71   // Could just always delegate to the socket impl
72 };
73
74 template <class Out>
75 class OutboundHandlerContext {
76  public:
77   virtual ~OutboundHandlerContext() {}
78
79   virtual Future<void> fireWrite(Out msg) = 0;
80   virtual Future<void> fireClose() = 0;
81
82   virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
83 };
84
85 enum class HandlerDir {
86   IN,
87   OUT,
88   BOTH
89 };
90
91 class PipelineContext {
92  public:
93   virtual ~PipelineContext() {}
94
95   virtual void attachPipeline() = 0;
96   virtual void detachPipeline() = 0;
97
98   virtual void attachTransport() = 0;
99   virtual void detachTransport() = 0;
100
101   template <class H, class HandlerContext>
102   void attachContext(H* handler, HandlerContext* ctx) {
103     if (++handler->attachCount_ == 1) {
104       handler->ctx_ = ctx;
105     } else {
106       handler->ctx_ = nullptr;
107     }
108   }
109
110   virtual void setNextIn(PipelineContext* ctx) = 0;
111   virtual void setNextOut(PipelineContext* ctx) = 0;
112 };
113
114 template <class In>
115 class InboundLink {
116  public:
117   virtual ~InboundLink() {}
118   virtual void read(In msg) = 0;
119   virtual void readEOF() = 0;
120   virtual void readException(exception_wrapper e) = 0;
121 };
122
123 template <class Out>
124 class OutboundLink {
125  public:
126   virtual ~OutboundLink() {}
127   virtual Future<void> write(Out msg) = 0;
128   virtual Future<void> close() = 0;
129 };
130
131 template <class P, class H, class Context>
132 class ContextImplBase : public PipelineContext {
133  public:
134   ~ContextImplBase() {}
135
136   H* getHandler() {
137     return handler_.get();
138   }
139
140   void initialize(P* pipeline, std::shared_ptr<H> handler) {
141     pipeline_ = pipeline;
142     handler_ = std::move(handler);
143   }
144
145   // PipelineContext overrides
146   void attachPipeline() override {
147     if (!attached_) {
148       this->attachContext(handler_.get(), impl_);
149       handler_->attachPipeline(impl_);
150       attached_ = true;
151     }
152   }
153
154   void detachPipeline() override {
155     handler_->detachPipeline(impl_);
156     attached_ = false;
157   }
158
159   void attachTransport() override {
160     DestructorGuard dg(pipeline_);
161     handler_->attachTransport(impl_);
162   }
163
164   void detachTransport() override {
165     DestructorGuard dg(pipeline_);
166     handler_->detachTransport(impl_);
167   }
168
169   void setNextIn(PipelineContext* ctx) override {
170     auto nextIn = dynamic_cast<InboundLink<typename H::rout>*>(ctx);
171     if (nextIn) {
172       nextIn_ = nextIn;
173     } else {
174       throw std::invalid_argument("inbound type mismatch");
175     }
176   }
177
178   void setNextOut(PipelineContext* ctx) override {
179     auto nextOut = dynamic_cast<OutboundLink<typename H::wout>*>(ctx);
180     if (nextOut) {
181       nextOut_ = nextOut;
182     } else {
183       throw std::invalid_argument("outbound type mismatch");
184     }
185   }
186
187  protected:
188   Context* impl_;
189   P* pipeline_;
190   std::shared_ptr<H> handler_;
191   InboundLink<typename H::rout>* nextIn_{nullptr};
192   OutboundLink<typename H::wout>* nextOut_{nullptr};
193
194  private:
195   bool attached_{false};
196   using DestructorGuard = typename P::DestructorGuard;
197 };
198
199 template <class P, class H>
200 class ContextImpl
201   : public HandlerContext<typename H::rout,
202                           typename H::wout>,
203     public InboundLink<typename H::rin>,
204     public OutboundLink<typename H::win>,
205     public ContextImplBase<P, H, HandlerContext<typename H::rout,
206                                                 typename H::wout>> {
207  public:
208   typedef typename H::rin Rin;
209   typedef typename H::rout Rout;
210   typedef typename H::win Win;
211   typedef typename H::wout Wout;
212   static const HandlerDir dir = HandlerDir::BOTH;
213
214   explicit ContextImpl(P* pipeline, std::shared_ptr<H> handler) {
215     this->impl_ = this;
216     this->initialize(pipeline, std::move(handler));
217   }
218
219   // For StaticPipeline
220   ContextImpl() {
221     this->impl_ = this;
222   }
223
224   ~ContextImpl() {}
225
226   // HandlerContext overrides
227   void fireRead(Rout msg) override {
228     DestructorGuard dg(this->pipeline_);
229     if (this->nextIn_) {
230       this->nextIn_->read(std::forward<Rout>(msg));
231     } else {
232       LOG(WARNING) << "read reached end of pipeline";
233     }
234   }
235
236   void fireReadEOF() override {
237     DestructorGuard dg(this->pipeline_);
238     if (this->nextIn_) {
239       this->nextIn_->readEOF();
240     } else {
241       LOG(WARNING) << "readEOF reached end of pipeline";
242     }
243   }
244
245   void fireReadException(exception_wrapper e) override {
246     DestructorGuard dg(this->pipeline_);
247     if (this->nextIn_) {
248       this->nextIn_->readException(std::move(e));
249     } else {
250       LOG(WARNING) << "readException reached end of pipeline";
251     }
252   }
253
254   Future<void> fireWrite(Wout msg) override {
255     DestructorGuard dg(this->pipeline_);
256     if (this->nextOut_) {
257       return this->nextOut_->write(std::forward<Wout>(msg));
258     } else {
259       LOG(WARNING) << "write reached end of pipeline";
260       return makeFuture();
261     }
262   }
263
264   Future<void> fireClose() override {
265     DestructorGuard dg(this->pipeline_);
266     if (this->nextOut_) {
267       return this->nextOut_->close();
268     } else {
269       LOG(WARNING) << "close reached end of pipeline";
270       return makeFuture();
271     }
272   }
273
274   std::shared_ptr<AsyncTransport> getTransport() override {
275     return this->pipeline_->getTransport();
276   }
277
278   void setWriteFlags(WriteFlags flags) override {
279     this->pipeline_->setWriteFlags(flags);
280   }
281
282   WriteFlags getWriteFlags() override {
283     return this->pipeline_->getWriteFlags();
284   }
285
286   void setReadBufferSettings(
287       uint64_t minAvailable,
288       uint64_t allocationSize) override {
289     this->pipeline_->setReadBufferSettings(minAvailable, allocationSize);
290   }
291
292   std::pair<uint64_t, uint64_t> getReadBufferSettings() override {
293     return this->pipeline_->getReadBufferSettings();
294   }
295
296   // InboundLink overrides
297   void read(Rin msg) override {
298     DestructorGuard dg(this->pipeline_);
299     this->handler_->read(this, std::forward<Rin>(msg));
300   }
301
302   void readEOF() override {
303     DestructorGuard dg(this->pipeline_);
304     this->handler_->readEOF(this);
305   }
306
307   void readException(exception_wrapper e) override {
308     DestructorGuard dg(this->pipeline_);
309     this->handler_->readException(this, std::move(e));
310   }
311
312   // OutboundLink overrides
313   Future<void> write(Win msg) override {
314     DestructorGuard dg(this->pipeline_);
315     return this->handler_->write(this, std::forward<Win>(msg));
316   }
317
318   Future<void> close() override {
319     DestructorGuard dg(this->pipeline_);
320     return this->handler_->close(this);
321   }
322
323  private:
324   using DestructorGuard = typename P::DestructorGuard;
325 };
326
327 template <class P, class H>
328 class InboundContextImpl
329   : public InboundHandlerContext<typename H::rout>,
330     public InboundLink<typename H::rin>,
331     public ContextImplBase<P, H, InboundHandlerContext<typename H::rout>> {
332  public:
333   typedef typename H::rin Rin;
334   typedef typename H::rout Rout;
335   typedef typename H::win Win;
336   typedef typename H::wout Wout;
337   static const HandlerDir dir = HandlerDir::IN;
338
339   explicit InboundContextImpl(P* pipeline, std::shared_ptr<H> handler) {
340     this->impl_ = this;
341     this->initialize(pipeline, std::move(handler));
342   }
343
344   // For StaticPipeline
345   InboundContextImpl() {
346     this->impl_ = this;
347   }
348
349   ~InboundContextImpl() {}
350
351   // InboundHandlerContext overrides
352   void fireRead(Rout msg) override {
353     DestructorGuard dg(this->pipeline_);
354     if (this->nextIn_) {
355       this->nextIn_->read(std::forward<Rout>(msg));
356     } else {
357       LOG(WARNING) << "read reached end of pipeline";
358     }
359   }
360
361   void fireReadEOF() override {
362     DestructorGuard dg(this->pipeline_);
363     if (this->nextIn_) {
364       this->nextIn_->readEOF();
365     } else {
366       LOG(WARNING) << "readEOF reached end of pipeline";
367     }
368   }
369
370   void fireReadException(exception_wrapper e) override {
371     DestructorGuard dg(this->pipeline_);
372     if (this->nextIn_) {
373       this->nextIn_->readException(std::move(e));
374     } else {
375       LOG(WARNING) << "readException reached end of pipeline";
376     }
377   }
378
379   std::shared_ptr<AsyncTransport> getTransport() override {
380     return this->pipeline_->getTransport();
381   }
382
383   // InboundLink overrides
384   void read(Rin msg) override {
385     DestructorGuard dg(this->pipeline_);
386     this->handler_->read(this, std::forward<Rin>(msg));
387   }
388
389   void readEOF() override {
390     DestructorGuard dg(this->pipeline_);
391     this->handler_->readEOF(this);
392   }
393
394   void readException(exception_wrapper e) override {
395     DestructorGuard dg(this->pipeline_);
396     this->handler_->readException(this, std::move(e));
397   }
398
399  private:
400   using DestructorGuard = typename P::DestructorGuard;
401 };
402
403 template <class P, class H>
404 class OutboundContextImpl
405   : public OutboundHandlerContext<typename H::wout>,
406     public OutboundLink<typename H::win>,
407     public ContextImplBase<P, H, OutboundHandlerContext<typename H::wout>> {
408  public:
409   typedef typename H::rin Rin;
410   typedef typename H::rout Rout;
411   typedef typename H::win Win;
412   typedef typename H::wout Wout;
413   static const HandlerDir dir = HandlerDir::OUT;
414
415   explicit OutboundContextImpl(P* pipeline, std::shared_ptr<H> handler) {
416     this->impl_ = this;
417     this->initialize(pipeline, std::move(handler));
418   }
419
420   // For StaticPipeline
421   OutboundContextImpl() {
422     this->impl_ = this;
423   }
424
425   ~OutboundContextImpl() {}
426
427   // OutboundHandlerContext overrides
428   Future<void> fireWrite(Wout msg) override {
429     DestructorGuard dg(this->pipeline_);
430     if (this->nextOut_) {
431       return this->nextOut_->write(std::forward<Wout>(msg));
432     } else {
433       LOG(WARNING) << "write reached end of pipeline";
434       return makeFuture();
435     }
436   }
437
438   Future<void> fireClose() override {
439     DestructorGuard dg(this->pipeline_);
440     if (this->nextOut_) {
441       return this->nextOut_->close();
442     } else {
443       LOG(WARNING) << "close reached end of pipeline";
444       return makeFuture();
445     }
446   }
447
448   std::shared_ptr<AsyncTransport> getTransport() override {
449     return this->pipeline_->getTransport();
450   }
451
452   // OutboundLink overrides
453   Future<void> write(Win msg) override {
454     DestructorGuard dg(this->pipeline_);
455     return this->handler_->write(this, std::forward<Win>(msg));
456   }
457
458   Future<void> close() override {
459     DestructorGuard dg(this->pipeline_);
460     return this->handler_->close(this);
461   }
462
463  private:
464   using DestructorGuard = typename P::DestructorGuard;
465 };
466
467 template <class Handler, class Pipeline>
468 struct ContextType {
469   typedef typename std::conditional<
470     Handler::dir == HandlerDir::BOTH,
471     ContextImpl<Pipeline, Handler>,
472     typename std::conditional<
473       Handler::dir == HandlerDir::IN,
474       InboundContextImpl<Pipeline, Handler>,
475       OutboundContextImpl<Pipeline, Handler>
476     >::type>::type
477   type;
478 };
479
480 }}