inbound/outbound handlers
authorJames Sedgwick <jsedgwick@fb.com>
Thu, 30 Apr 2015 01:04:41 +0000 (18:04 -0700)
committerPraveen Kumar Ramakrishnan <praveenr@fb.com>
Tue, 12 May 2015 00:01:10 +0000 (17:01 -0700)
Summary:
Much less copypasta this time around. I wonder if the getters and setters for write flags and read buffer settings are necessary in the new handler types, or even if they belong in the bidirectional handler

I'm all ears for more suggestions on reducing copypasta

I'm going to reorg the code (inl headers etc) in a subsequent diff once this is in - easier to review this way

Test Plan: existing unit, thinking about tests for these changes

Reviewed By: davejwatson@fb.com

Subscribers: fugalh, folly-diffs@, jsedgwick, yfeldblum, chalfant

FB internal diff: D2026522

Tasks: 6836580

Signature: t1:2026522:1430346145:bd7f7770eddce0470e2ac72440fc001cf128df08

folly/wangle/channel/Handler.h
folly/wangle/channel/HandlerContext.h
folly/wangle/channel/OutputBufferingHandler.h
folly/wangle/channel/Pipeline.h
folly/wangle/channel/StaticPipeline.h
folly/wangle/codec/ByteToMessageCodec.h
folly/wangle/codec/CodecTest.cpp
folly/wangle/codec/LengthFieldPrepender.h

index 73d59a641e2942bcc021df9d8957b92bac0c314d..4332878fcd6347d59f38550b03b5c6903a925c21 100644 (file)
@@ -51,6 +51,8 @@ class HandlerBase {
 template <class Rin, class Rout = Rin, class Win = Rout, class Wout = Rin>
 class Handler : public HandlerBase<HandlerContext<Rout, Wout>> {
  public:
+  static const HandlerDir dir = HandlerDir::BOTH;
+
   typedef Rin rin;
   typedef Rout rout;
   typedef Win win;
@@ -99,6 +101,47 @@ class Handler : public HandlerBase<HandlerContext<Rout, Wout>> {
   */
 };
 
+struct Unit{};
+
+template <class Rin, class Rout = Rin>
+class InboundHandler : public HandlerBase<InboundHandlerContext<Rout>> {
+ public:
+  static const HandlerDir dir = HandlerDir::IN;
+
+  typedef Rin rin;
+  typedef Rout rout;
+  typedef Unit win;
+  typedef Unit wout;
+  typedef InboundHandlerContext<Rout> Context;
+  virtual ~InboundHandler() {}
+
+  virtual void read(Context* ctx, Rin msg) = 0;
+  virtual void readEOF(Context* ctx) {
+    ctx->fireReadEOF();
+  }
+  virtual void readException(Context* ctx, exception_wrapper e) {
+    ctx->fireReadException(std::move(e));
+  }
+};
+
+template <class Win, class Wout = Win>
+class OutboundHandler : public HandlerBase<OutboundHandlerContext<Wout>> {
+ public:
+  static const HandlerDir dir = HandlerDir::OUT;
+
+  typedef Unit rin;
+  typedef Unit rout;
+  typedef Win win;
+  typedef Wout wout;
+  typedef OutboundHandlerContext<Wout> Context;
+  virtual ~OutboundHandler() {}
+
+  virtual Future<void> write(Context* ctx, Win msg) = 0;
+  virtual Future<void> close(Context* ctx) {
+    return ctx->fireClose();
+  }
+};
+
 template <class R, class W = R>
 class HandlerAdapter : public Handler<R, R, W, W> {
  public:
@@ -116,4 +159,10 @@ class HandlerAdapter : public Handler<R, R, W, W> {
 typedef HandlerAdapter<IOBufQueue&, std::unique_ptr<IOBuf>>
 BytesToBytesHandler;
 
+typedef InboundHandler<IOBufQueue&>
+InboundBytesToBytesHandler;
+
+typedef OutboundHandler<std::unique_ptr<IOBuf>>
+OutboundBytesToBytesHandler;
+
 }}
index 4bd79f5f05ed8811871ced06015c5ae5dc7a1b6c..2f899796f699ea7bb9d41e7726fa931a606046ad 100644 (file)
@@ -55,6 +55,39 @@ class HandlerContext {
   */
 };
 
+template <class In>
+class InboundHandlerContext {
+ public:
+  virtual ~InboundHandlerContext() {}
+
+  virtual void fireRead(In msg) = 0;
+  virtual void fireReadEOF() = 0;
+  virtual void fireReadException(exception_wrapper e) = 0;
+
+  virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
+
+  // TODO Need get/set writeFlags, readBufferSettings? Probably not.
+  // Do we even really need them stored in the pipeline at all?
+  // Could just always delegate to the socket impl
+};
+
+template <class Out>
+class OutboundHandlerContext {
+ public:
+  virtual ~OutboundHandlerContext() {}
+
+  virtual Future<void> fireWrite(Out msg) = 0;
+  virtual Future<void> fireClose() = 0;
+
+  virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
+};
+
+enum class HandlerDir {
+  IN,
+  OUT,
+  BOTH
+};
+
 class PipelineContext {
  public:
   virtual ~PipelineContext() {}
@@ -74,12 +107,6 @@ class PipelineContext {
     }
   }
 
-  void link(PipelineContext* other) {
-    setNextIn(other);
-    other->setNextOut(this);
-  }
-
- protected:
   virtual void setNextIn(PipelineContext* ctx) = 0;
   virtual void setNextOut(PipelineContext* ctx) = 0;
 };
@@ -144,7 +171,7 @@ class ContextImplBase : public PipelineContext {
     if (nextIn) {
       nextIn_ = nextIn;
     } else {
-      throw std::invalid_argument("wrong type in setNextIn");
+      throw std::invalid_argument("inbound type mismatch");
     }
   }
 
@@ -153,7 +180,7 @@ class ContextImplBase : public PipelineContext {
     if (nextOut) {
       nextOut_ = nextOut;
     } else {
-      throw std::invalid_argument("wrong type in setNextOut");
+      throw std::invalid_argument("outbound type mismatch");
     }
   }
 
@@ -170,16 +197,19 @@ class ContextImplBase : public PipelineContext {
 };
 
 template <class P, class H>
-class ContextImpl : public HandlerContext<typename H::rout,
-                                                 typename H::wout>,
-                    public InboundLink<typename H::rin>,
-                    public OutboundLink<typename H::win>,
-                    public ContextImplBase<P, H, HandlerContext<typename H::rout, typename H::wout>> {
+class ContextImpl
+  : public HandlerContext<typename H::rout,
+                          typename H::wout>,
+    public InboundLink<typename H::rin>,
+    public OutboundLink<typename H::win>,
+    public ContextImplBase<P, H, HandlerContext<typename H::rout,
+                                                typename H::wout>> {
  public:
   typedef typename H::rin Rin;
   typedef typename H::rout Rout;
   typedef typename H::win Win;
   typedef typename H::wout Wout;
+  static const HandlerDir dir = HandlerDir::BOTH;
 
   explicit ContextImpl(P* pipeline, std::shared_ptr<H> handler) {
     this->impl_ = this;
@@ -294,4 +324,157 @@ class ContextImpl : public HandlerContext<typename H::rout,
   using DestructorGuard = typename P::DestructorGuard;
 };
 
+template <class P, class H>
+class InboundContextImpl
+  : public InboundHandlerContext<typename H::rout>,
+    public InboundLink<typename H::rin>,
+    public ContextImplBase<P, H, InboundHandlerContext<typename H::rout>> {
+ public:
+  typedef typename H::rin Rin;
+  typedef typename H::rout Rout;
+  typedef typename H::win Win;
+  typedef typename H::wout Wout;
+  static const HandlerDir dir = HandlerDir::IN;
+
+  explicit InboundContextImpl(P* pipeline, std::shared_ptr<H> handler) {
+    this->impl_ = this;
+    this->initialize(pipeline, std::move(handler));
+  }
+
+  // For StaticPipeline
+  InboundContextImpl() {
+    this->impl_ = this;
+  }
+
+  ~InboundContextImpl() {}
+
+  // InboundHandlerContext overrides
+  void fireRead(Rout msg) override {
+    DestructorGuard dg(this->pipeline_);
+    if (this->nextIn_) {
+      this->nextIn_->read(std::forward<Rout>(msg));
+    } else {
+      LOG(WARNING) << "read reached end of pipeline";
+    }
+  }
+
+  void fireReadEOF() override {
+    DestructorGuard dg(this->pipeline_);
+    if (this->nextIn_) {
+      this->nextIn_->readEOF();
+    } else {
+      LOG(WARNING) << "readEOF reached end of pipeline";
+    }
+  }
+
+  void fireReadException(exception_wrapper e) override {
+    DestructorGuard dg(this->pipeline_);
+    if (this->nextIn_) {
+      this->nextIn_->readException(std::move(e));
+    } else {
+      LOG(WARNING) << "readException reached end of pipeline";
+    }
+  }
+
+  std::shared_ptr<AsyncTransport> getTransport() override {
+    return this->pipeline_->getTransport();
+  }
+
+  // InboundLink overrides
+  void read(Rin msg) override {
+    DestructorGuard dg(this->pipeline_);
+    this->handler_->read(this, std::forward<Rin>(msg));
+  }
+
+  void readEOF() override {
+    DestructorGuard dg(this->pipeline_);
+    this->handler_->readEOF(this);
+  }
+
+  void readException(exception_wrapper e) override {
+    DestructorGuard dg(this->pipeline_);
+    this->handler_->readException(this, std::move(e));
+  }
+
+ private:
+  using DestructorGuard = typename P::DestructorGuard;
+};
+
+template <class P, class H>
+class OutboundContextImpl
+  : public OutboundHandlerContext<typename H::wout>,
+    public OutboundLink<typename H::win>,
+    public ContextImplBase<P, H, OutboundHandlerContext<typename H::wout>> {
+ public:
+  typedef typename H::rin Rin;
+  typedef typename H::rout Rout;
+  typedef typename H::win Win;
+  typedef typename H::wout Wout;
+  static const HandlerDir dir = HandlerDir::OUT;
+
+  explicit OutboundContextImpl(P* pipeline, std::shared_ptr<H> handler) {
+    this->impl_ = this;
+    this->initialize(pipeline, std::move(handler));
+  }
+
+  // For StaticPipeline
+  OutboundContextImpl() {
+    this->impl_ = this;
+  }
+
+  ~OutboundContextImpl() {}
+
+  // OutboundHandlerContext overrides
+  Future<void> fireWrite(Wout msg) override {
+    DestructorGuard dg(this->pipeline_);
+    if (this->nextOut_) {
+      return this->nextOut_->write(std::forward<Wout>(msg));
+    } else {
+      LOG(WARNING) << "write reached end of pipeline";
+      return makeFuture();
+    }
+  }
+
+  Future<void> fireClose() override {
+    DestructorGuard dg(this->pipeline_);
+    if (this->nextOut_) {
+      return this->nextOut_->close();
+    } else {
+      LOG(WARNING) << "close reached end of pipeline";
+      return makeFuture();
+    }
+  }
+
+  std::shared_ptr<AsyncTransport> getTransport() override {
+    return this->pipeline_->getTransport();
+  }
+
+  // OutboundLink overrides
+  Future<void> write(Win msg) override {
+    DestructorGuard dg(this->pipeline_);
+    return this->handler_->write(this, std::forward<Win>(msg));
+  }
+
+  Future<void> close() override {
+    DestructorGuard dg(this->pipeline_);
+    return this->handler_->close(this);
+  }
+
+ private:
+  using DestructorGuard = typename P::DestructorGuard;
+};
+
+template <class Handler, class Pipeline>
+struct ContextType {
+  typedef typename std::conditional<
+    Handler::dir == HandlerDir::BOTH,
+    ContextImpl<Pipeline, Handler>,
+    typename std::conditional<
+      Handler::dir == HandlerDir::IN,
+      InboundContextImpl<Pipeline, Handler>,
+      OutboundContextImpl<Pipeline, Handler>
+    >::type>::type
+  type;
+};
+
 }}
index eb7c4248a9e197f49f4401ba04e3f4f4d7a12c9f..48d8aa51b1794ae6780573bc36298c6ee1d4be5a 100644 (file)
@@ -30,7 +30,7 @@ namespace folly { namespace wangle {
  *
  * This handler may only be used in a single Pipeline.
  */
-class OutputBufferingHandler : public BytesToBytesHandler,
+class OutputBufferingHandler : public OutboundBytesToBytesHandler,
                                protected EventBase::LoopCallback {
  public:
   Future<void> write(Context* ctx, std::unique_ptr<IOBuf> buf) override {
index 738a83f9d8b9414e3495b89f2c99e8f0f9260177..48b3db4d3213ec8d3b22ae41f2e08405fda80bbe 100644 (file)
@@ -83,15 +83,8 @@ class Pipeline : public DelayedDestruction {
 
   template <class H>
   Pipeline& addBack(std::shared_ptr<H> handler) {
-    ctxs_.push_back(std::make_shared<ContextImpl<Pipeline, H>>(
-        this,
-        std::move(handler)));
-    return *this;
-  }
-
-  template <class H>
-  Pipeline& addBack(H* handler) {
-    return addBack(std::shared_ptr<H>(handler, [](H*){}));
+    typedef typename ContextType<H, Pipeline>::type Context;
+    return addHelper(std::make_shared<Context>(this, std::move(handler)), false);
   }
 
   template <class H>
@@ -100,16 +93,14 @@ class Pipeline : public DelayedDestruction {
   }
 
   template <class H>
-  Pipeline& addFront(std::shared_ptr<H> handler) {
-    ctxs_.insert(
-        ctxs_.begin(),
-        std::make_shared<ContextImpl<Pipeline, H>>(this, std::move(handler)));
-    return *this;
+  Pipeline& addBack(H* handler) {
+    return addBack(std::shared_ptr<H>(handler, [](H*){}));
   }
 
   template <class H>
-  Pipeline& addFront(H* handler) {
-    return addFront(std::shared_ptr<H>(handler, [](H*){}));
+  Pipeline& addFront(std::shared_ptr<H> handler) {
+    typedef typename ContextType<H, Pipeline>::type Context;
+    return addHelper(std::make_shared<Context>(this, std::move(handler)), true);
   }
 
   template <class H>
@@ -117,30 +108,40 @@ class Pipeline : public DelayedDestruction {
     return addFront(std::make_shared<H>(std::forward<H>(handler)));
   }
 
+  template <class H>
+  Pipeline& addFront(H* handler) {
+    return addFront(std::shared_ptr<H>(handler, [](H*){}));
+  }
+
   template <class H>
   H* getHandler(int i) {
-    auto ctx = dynamic_cast<ContextImpl<Pipeline, H>*>(ctxs_[i].get());
+    typedef typename ContextType<H, Pipeline>::type Context;
+    auto ctx = dynamic_cast<Context*>(ctxs_[i].get());
     CHECK(ctx);
     return ctx->getHandler();
   }
 
+  // TODO Have read/write/etc check that pipeline has been finalized
   void finalize() {
-    if (ctxs_.empty()) {
-      return;
-    }
-
-    for (size_t i = 0; i < ctxs_.size() - 1; i++) {
-      ctxs_[i]->link(ctxs_[i+1].get());
+    if (!inCtxs_.empty()) {
+      front_ = dynamic_cast<InboundLink<R>*>(inCtxs_.front());
+      for (size_t i = 0; i < inCtxs_.size() - 1; i++) {
+        inCtxs_[i]->setNextIn(inCtxs_[i+1]);
+      }
     }
 
-    back_ = dynamic_cast<OutboundLink<W>*>(ctxs_.back().get());
-    if (!back_) {
-      throw std::invalid_argument("wrong type for last handler");
+    if (!outCtxs_.empty()) {
+      back_ = dynamic_cast<OutboundLink<W>*>(outCtxs_.back());
+      for (size_t i = outCtxs_.size() - 1; i > 0; i--) {
+        outCtxs_[i]->setNextOut(outCtxs_[i-1]);
+      }
     }
 
-    front_ = dynamic_cast<InboundLink<R>*>(ctxs_.front().get());
     if (!front_) {
-      throw std::invalid_argument("wrong type for first handler");
+      throw std::invalid_argument("no inbound handler in Pipeline");
+    }
+    if (!back_) {
+      throw std::invalid_argument("no outbound handler in Pipeline");
     }
 
     for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) {
@@ -154,8 +155,9 @@ class Pipeline : public DelayedDestruction {
   // See thrift/lib/cpp2/async/Cpp2Channel.cpp for an example
   template <class H>
   bool setOwner(H* handler) {
+    typedef typename ContextType<H, Pipeline>::type Context;
     for (auto& ctx : ctxs_) {
-      auto ctxImpl = dynamic_cast<ContextImpl<Pipeline, H>*>(ctx.get());
+      auto ctxImpl = dynamic_cast<Context*>(ctx.get());
       if (ctxImpl && ctxImpl->getHandler() == handler) {
         owner_ = ctx;
         return true;
@@ -185,10 +187,8 @@ class Pipeline : public DelayedDestruction {
   }
 
   template <class Context>
-  void addContextFront(Context* context) {
-    ctxs_.insert(
-        ctxs_.begin(),
-        std::shared_ptr<Context>(context, [](Context*){}));
+  void addContextFront(Context* ctx) {
+    addHelper(std::shared_ptr<Context>(ctx, [](Context*){}), true);
   }
 
   void detachHandlers() {
@@ -200,15 +200,29 @@ class Pipeline : public DelayedDestruction {
   }
 
  private:
+  template <class Context>
+  Pipeline& addHelper(std::shared_ptr<Context>&& ctx, bool front) {
+    ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(), ctx);
+    if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::IN) {
+      inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(), ctx.get());
+    }
+    if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::OUT) {
+      outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(), ctx.get());
+    }
+    return *this;
+  }
+
   std::shared_ptr<AsyncTransport> transport_;
   WriteFlags writeFlags_{WriteFlags::NONE};
   std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
 
   bool isStatic_{false};
+  std::shared_ptr<PipelineContext> owner_;
+  std::vector<std::shared_ptr<PipelineContext>> ctxs_;
+  std::vector<PipelineContext*> inCtxs_;
+  std::vector<PipelineContext*> outCtxs_;
   InboundLink<R>* front_{nullptr};
   OutboundLink<W>* back_{nullptr};
-  std::vector<std::shared_ptr<PipelineContext>> ctxs_;
-  std::shared_ptr<PipelineContext> owner_;
 };
 
 }}
index 6c6c5e0868062269825052730f3513adb3b01c05..68e0d906365cd051c5bb7968ba6493adb954361c 100644 (file)
@@ -67,8 +67,6 @@ class StaticPipeline<R, W, Handler, Handlers...>
   }
 
  protected:
-  typedef ContextImpl<Pipeline<R, W>, Handler> Context;
-
   template <class HandlerArg, class... HandlerArgs>
   StaticPipeline(
       bool isFirst,
@@ -119,7 +117,7 @@ class StaticPipeline<R, W, Handler, Handlers...>
   bool isFirst_;
   folly::Optional<Handler> handler_;
   std::shared_ptr<Handler> handlerPtr_;
-  ContextImpl<Pipeline<R, W>, Handler> ctx_;
+  typename ContextType<Handler, Pipeline<R, W>>::type ctx_;
 };
 
 }} // folly::wangle
index 20d6e7fe676dc0c5b24d4b556306cdd8d87f769c..53ec3d8e4201beca16dc04a43829fa052c2db081 100644 (file)
@@ -40,7 +40,7 @@ namespace folly { namespace wangle {
  * IOBufQueue.front(), without split() or pop_front().
  */
 class ByteToMessageCodec
-    : public BytesToBytesHandler {
+    : public InboundBytesToBytesHandler {
  public:
 
   virtual std::unique_ptr<IOBuf> decode(
index 80bb83d3acdb38ff0466975c5d8b4575e9e434b8..c44577888f4cd7794485540c368c83bbdb3ad75f 100644 (file)
@@ -95,13 +95,13 @@ TEST(LengthFieldFramePipeline, SimpleTest) {
 
   pipeline
     .addBack(BytesReflector())
+    .addBack(LengthFieldPrepender())
     .addBack(LengthFieldBasedFrameDecoder())
     .addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
         auto sz = buf->computeChainDataLength();
         called++;
         EXPECT_EQ(sz, 2);
       }))
-    .addBack(LengthFieldPrepender())
     .finalize();
 
   auto buf = IOBuf::create(2);
index 72c30d816a24fe77b9ad35a788e105a104d9a5fd..d2e1d37bf5224ecef22fce845a9194fb17aaf92f 100644 (file)
@@ -47,7 +47,7 @@ namespace folly { namespace wangle {
  *
  */
 class LengthFieldPrepender
-: public BytesToBytesHandler {
+: public OutboundBytesToBytesHandler {
  public:
   LengthFieldPrepender(
     int lengthFieldLength = 4,