Manage handlers with shared_ptrs, introduce StaticPipeline
authorJames Sedgwick <jsedgwick@fb.com>
Mon, 27 Apr 2015 18:44:31 +0000 (11:44 -0700)
committerAlecs King <int@fb.com>
Mon, 27 Apr 2015 23:55:07 +0000 (16:55 -0700)
Summary:
A few things:
- Eliminate HandlerPtr by managing all handlers with shared_ptrs instead of keeping them inline in the pipeline
- Kill recursively templated ChannelPipeline accordingly
- Introduce StaticPipeline to retain the flexibility of zero-alloc pipelines
- Introduce notion of an "owning handler" to avoid destruction order issues

Test Plan: unit (will add more), thrift unit

Reviewed By: davejwatson@fb.com

Subscribers: fugalh, alandau, bmatheny, folly-diffs@, jsedgwick, yfeldblum, chalfant

FB internal diff: D2023976

Tasks: 6836580

Signature: t1:2023976:1430159578:e50e8a149e549a40670d093fb65987a4843cdd8d

folly/Makefile.am
folly/wangle/bootstrap/BootstrapTest.cpp
folly/wangle/bootstrap/ServerBootstrap-inl.h
folly/wangle/channel/Handler.h
folly/wangle/channel/HandlerContext.h
folly/wangle/channel/Pipeline.h
folly/wangle/channel/StaticPipeline.h [new file with mode: 0644]
folly/wangle/channel/test/OutputBufferingHandlerTest.cpp
folly/wangle/channel/test/PipelineTest.cpp
folly/wangle/service/ClientDispatcher.h

index 1c4030cfe62b1c0de103c3b991fea5067d80811b..45296006996a3763e7aef27f230bc0bf23f73aec 100644 (file)
@@ -279,6 +279,7 @@ nobase_follyinclude_HEADERS = \
        wangle/channel/HandlerContext.h \
        wangle/channel/OutputBufferingHandler.h \
        wangle/channel/Pipeline.h \
+       wangle/channel/StaticPipeline.h \
        wangle/concurrent/BlockingQueue.h \
        wangle/concurrent/Codel.h \
        wangle/concurrent/CPUThreadPoolExecutor.h \
index 9087be57977900e1d839732fc01157145e447771..ddbd50ee672b96ce8c402f8c7e0e99a8296cb1ad 100644 (file)
@@ -287,8 +287,7 @@ class TestHandlerPipelineFactory
  public:
   ServerBootstrap<BytesPipeline>::AcceptPipeline* newPipeline(std::shared_ptr<AsyncSocket>) {
     auto pipeline = new ServerBootstrap<BytesPipeline>::AcceptPipeline;
-    auto handler = std::make_shared<HandlerPipeline>();
-      pipeline->addBack(HandlerPtr<HandlerPipeline>(handler));
+    pipeline->addBack(HandlerPipeline());
     return pipeline;
   }
 };
index 1e0ebc75d3d827dd051186b41128f26c48d5c838..ee3f9b22799a6771bef6270dae93517580e1279d 100644 (file)
@@ -70,7 +70,7 @@ class ServerAcceptor
     Acceptor::init(nullptr, base_);
     CHECK(acceptorPipeline_);
 
-    acceptorPipeline_->addBack(folly::wangle::HandlerPtr<ServerAcceptor, false>(this));
+    acceptorPipeline_->addBack(this);
     acceptorPipeline_->finalize();
   }
 
index 67219f17f5d84bb9d68cdde312a1f9572b5aa6d4..062d23a2fd5576d243f64cd352cbb51bd4864a52 100644 (file)
@@ -97,100 +97,4 @@ class HandlerAdapter : public Handler<R, R, W, W> {
 typedef HandlerAdapter<IOBufQueue&, std::unique_ptr<IOBuf>>
 BytesToBytesHandler;
 
-template <class HandlerT, bool Shared = true>
-class HandlerPtr : public Handler<
-                            typename HandlerT::rin,
-                            typename HandlerT::rout,
-                            typename HandlerT::win,
-                            typename HandlerT::wout> {
- public:
-  typedef typename std::conditional<
-    Shared,
-    std::shared_ptr<HandlerT>,
-    HandlerT*>::type
-  Ptr;
-
-  typedef typename HandlerT::Context Context;
-
-  explicit HandlerPtr(Ptr handler)
-    : handler_(std::move(handler)) {}
-
-  Ptr getHandler() {
-    return handler_;
-  }
-
-  void setHandler(Ptr handler) {
-    if (handler == handler_) {
-      return;
-    }
-    if (handler_ && ctx_) {
-      handler_->detachPipeline(ctx_);
-    }
-    handler_ = std::move(handler);
-    if (handler_ && ctx_) {
-      handler_->attachPipeline(ctx_);
-      if (ctx_->getTransport()) {
-        handler_->attachTransport(ctx_);
-      }
-    }
-  }
-
-  void attachPipeline(Context* ctx) override {
-    ctx_ = ctx;
-    if (handler_) {
-      handler_->attachPipeline(ctx_);
-    }
-  }
-
-  void attachTransport(Context* ctx) override {
-    ctx_ = ctx;
-    if (handler_) {
-      handler_->attachTransport(ctx_);
-    }
-  }
-
-  void detachPipeline(Context* ctx) override {
-    ctx_ = ctx;
-    if (handler_) {
-      handler_->detachPipeline(ctx_);
-    }
-  }
-
-  void detachTransport(Context* ctx) override {
-    ctx_ = ctx;
-    if (handler_) {
-      handler_->detachTransport(ctx_);
-    }
-  }
-
-  void read(Context* ctx, typename HandlerT::rin msg) override {
-    DCHECK(handler_);
-    handler_->read(ctx, std::forward<typename HandlerT::rin>(msg));
-  }
-
-  void readEOF(Context* ctx) override {
-    DCHECK(handler_);
-    handler_->readEOF(ctx);
-  }
-
-  void readException(Context* ctx, exception_wrapper e) override {
-    DCHECK(handler_);
-    handler_->readException(ctx, std::move(e));
-  }
-
-  Future<void> write(Context* ctx, typename HandlerT::win msg) override {
-    DCHECK(handler_);
-    return handler_->write(ctx, std::forward<typename HandlerT::win>(msg));
-  }
-
-  Future<void> close(Context* ctx) override {
-    DCHECK(handler_);
-    return handler_->close(ctx);
-  }
-
- private:
-  Context* ctx_;
-  Ptr handler_;
-};
-
 }}
index 809f2b1a37c0c1a9663d1d7da4d48ec37140ad1a..deddb2de012e27ea2ae40fff27c427ad75053b4a 100644 (file)
@@ -59,6 +59,8 @@ class PipelineContext {
  public:
   virtual ~PipelineContext() {}
 
+  virtual void detachPipeline() = 0;
+
   virtual void attachTransport() = 0;
   virtual void detachTransport() = 0;
 
@@ -101,22 +103,30 @@ class ContextImpl : public HandlerContext<typename H::rout,
   typedef typename H::win Win;
   typedef typename H::wout Wout;
 
-  template <class HandlerArg>
-  explicit ContextImpl(P* pipeline, HandlerArg&& handlerArg)
-    : pipeline_(pipeline),
-      handler_(std::forward<HandlerArg>(handlerArg)) {
-    handler_.attachPipeline(this);
+  explicit ContextImpl(P* pipeline, std::shared_ptr<H> handler) {
+    initialize(pipeline, std::move(handler));
   }
 
-  ~ContextImpl() {
-    handler_.detachPipeline(this);
+  void initialize(P* pipeline, std::shared_ptr<H> handler) {
+    pipeline_ = pipeline;
+    handler_ = std::move(handler);
+    handler_->attachPipeline(this);
   }
 
+  // For StaticPipeline
+  ContextImpl() {}
+
+  ~ContextImpl() {}
+
   H* getHandler() {
-    return &handler_;
+    return handler_.get();
   }
 
   // PipelineContext overrides
+  void detachPipeline() override {
+    handler_->detachPipeline(this);
+  }
+
   void setNextIn(PipelineContext* ctx) override {
     auto nextIn = dynamic_cast<InboundHandlerContext<Rout>*>(ctx);
     if (nextIn) {
@@ -137,12 +147,12 @@ class ContextImpl : public HandlerContext<typename H::rout,
 
   void attachTransport() override {
     typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
-    handler_.attachTransport(this);
+    handler_->attachTransport(this);
   }
 
   void detachTransport() override {
     typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
-    handler_.detachTransport(this);
+    handler_->detachTransport(this);
   }
 
   // HandlerContext overrides
@@ -218,33 +228,33 @@ class ContextImpl : public HandlerContext<typename H::rout,
   // InboundHandlerContext overrides
   void read(Rin msg) override {
     typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
-    handler_.read(this, std::forward<Rin>(msg));
+    handler_->read(this, std::forward<Rin>(msg));
   }
 
   void readEOF() override {
     typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
-    handler_.readEOF(this);
+    handler_->readEOF(this);
   }
 
   void readException(exception_wrapper e) override {
     typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
-    handler_.readException(this, std::move(e));
+    handler_->readException(this, std::move(e));
   }
 
   // OutboundHandlerContext overrides
   Future<void> write(Win msg) override {
     typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
-    return handler_.write(this, std::forward<Win>(msg));
+    return handler_->write(this, std::forward<Win>(msg));
   }
 
   Future<void> close() override {
     typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
-    return handler_.close(this);
+    return handler_->close(this);
   }
 
  private:
   P* pipeline_;
-  H handler_;
+  std::shared_ptr<H> handler_;
   InboundHandlerContext<Rout>* nextIn_{nullptr};
   OutboundHandlerContext<Wout>* nextOut_{nullptr};
 };
index 7d4fd2a6dee5b5df4b8daa1f14e6d23a67f3f7c1..0cabfb2c6c4146f5aa13cde3e7045cf23dd64d0a 100644 (file)
@@ -30,14 +30,10 @@ namespace folly { namespace wangle {
  * R is the inbound type, i.e. inbound calls start with pipeline.read(R)
  * W is the outbound type, i.e. outbound calls start with pipeline.write(W)
  */
-template <class R, class W, class... Handlers>
-class Pipeline;
-
 template <class R, class W>
-class Pipeline<R, W> : public DelayedDestruction {
+class Pipeline : public DelayedDestruction {
  public:
   Pipeline() {}
-  ~Pipeline() {}
 
   std::shared_ptr<AsyncTransport> getTransport() {
     return transport_;
@@ -80,22 +76,41 @@ class Pipeline<R, W> : public DelayedDestruction {
   }
 
   template <class H>
-  Pipeline& addBack(H&& handler) {
-    ctxs_.push_back(folly::make_unique<ContextImpl<Pipeline, H>>(
-        this, std::forward<H>(handler)));
+  Pipeline& addBack(std::shared_ptr<H> handler) {
+    ctxs_.push_back(std::make_shared<ContextImpl<Pipeline, H>>(
+        this,
+        std::move(handler)));
     return *this;
   }
 
   template <class H>
-  Pipeline& addFront(H&& handler) {
+  Pipeline& addBack(H* handler) {
+    return addBack(std::shared_ptr<H>(handler, [](H*){}));
+  }
+
+  template <class H>
+  Pipeline& addBack(H&& handler) {
+    return addBack(std::make_shared<H>(std::forward<H>(handler)));
+  }
+
+  template <class H>
+  Pipeline& addFront(std::shared_ptr<H> handler) {
     ctxs_.insert(
         ctxs_.begin(),
-        folly::make_unique<ContextImpl<Pipeline, H>>(
-            this,
-            std::forward<H>(handler)));
+        std::make_shared<ContextImpl<Pipeline, H>>(this, std::move(handler)));
     return *this;
   }
 
+  template <class H>
+  Pipeline& addFront(H* handler) {
+    return addFront(std::shared_ptr<H>(handler, [](H*){}));
+  }
+
+  template <class H>
+  Pipeline& addFront(H&& handler) {
+    return addFront(std::make_shared<H>(std::forward<H>(handler)));
+  }
+
   template <class H>
   H* getHandler(int i) {
     auto ctx = dynamic_cast<ContextImpl<Pipeline, H>*>(ctxs_[i].get());
@@ -104,21 +119,6 @@ class Pipeline<R, W> : public DelayedDestruction {
   }
 
   void finalize() {
-    finalizeHelper();
-    InboundHandlerContext<R>* front;
-    front_ = dynamic_cast<InboundHandlerContext<R>*>(
-        ctxs_.front().get());
-    if (!front_) {
-      throw std::invalid_argument("wrong type for first handler");
-    }
-  }
-
- protected:
-  explicit Pipeline(bool shouldFinalize) {
-    CHECK(!shouldFinalize);
-  }
-
-  void finalizeHelper() {
     if (ctxs_.empty()) {
       return;
     }
@@ -131,197 +131,69 @@ class Pipeline<R, W> : public DelayedDestruction {
     if (!back_) {
       throw std::invalid_argument("wrong type for last handler");
     }
-  }
-
-  PipelineContext* getLocalFront() {
-    return ctxs_.empty() ? nullptr : ctxs_.front().get();
-  }
-
-  static const bool is_end{true};
-
-  std::shared_ptr<AsyncTransport> transport_;
-  WriteFlags writeFlags_{WriteFlags::NONE};
-  std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
-
-  void attachPipeline() {}
-
-  void attachTransport(
-      std::shared_ptr<AsyncTransport> transport) {
-    transport_ = std::move(transport);
-  }
 
-  void detachTransport() {
-    transport_ = nullptr;
-  }
-
-  OutboundHandlerContext<W>* back_{nullptr};
-
- private:
-  InboundHandlerContext<R>* front_{nullptr};
-  std::vector<std::unique_ptr<PipelineContext>> ctxs_;
-};
-
-template <class R, class W, class Handler, class... Handlers>
-class Pipeline<R, W, Handler, Handlers...>
-  : public Pipeline<R, W, Handlers...> {
- protected:
-  template <class HandlerArg, class... HandlersArgs>
-  Pipeline(
-      bool shouldFinalize,
-      HandlerArg&& handlerArg,
-      HandlersArgs&&... handlersArgs)
-    : Pipeline<R, W, Handlers...>(
-          false,
-          std::forward<HandlersArgs>(handlersArgs)...),
-          ctx_(this, std::forward<HandlerArg>(handlerArg)) {
-    if (shouldFinalize) {
-      finalize();
+    front_ = dynamic_cast<InboundHandlerContext<R>*>(ctxs_.front().get());
+    if (!front_) {
+      throw std::invalid_argument("wrong type for first handler");
     }
   }
 
- public:
-  template <class... HandlersArgs>
-  explicit Pipeline(HandlersArgs&&... handlersArgs)
-    : Pipeline(true, std::forward<HandlersArgs>(handlersArgs)...) {}
-
-  ~Pipeline() {}
-
-  void read(R msg) {
-    typename Pipeline<R, W>::DestructorGuard dg(
-        static_cast<DelayedDestruction*>(this));
-    front_->read(std::forward<R>(msg));
-  }
-
-  void readEOF() {
-    typename Pipeline<R, W>::DestructorGuard dg(
-        static_cast<DelayedDestruction*>(this));
-    front_->readEOF();
-  }
-
-  void readException(exception_wrapper e) {
-    typename Pipeline<R, W>::DestructorGuard dg(
-        static_cast<DelayedDestruction*>(this));
-    front_->readException(std::move(e));
-  }
-
-  Future<void> write(W msg) {
-    typename Pipeline<R, W>::DestructorGuard dg(
-        static_cast<DelayedDestruction*>(this));
-    return back_->write(std::forward<W>(msg));
-  }
-
-  Future<void> close() {
-    typename Pipeline<R, W>::DestructorGuard dg(
-        static_cast<DelayedDestruction*>(this));
-    return back_->close();
+  // If one of the handlers owns the pipeline itself, use setOwner to ensure
+  // that the pipeline doesn't try to detach the handler during destruction,
+  // lest destruction ordering issues occur.
+  // See thrift/lib/cpp2/async/Cpp2Channel.cpp for an example
+  template <class H>
+  bool setOwner(H* handler) {
+    for (auto& ctx : ctxs_) {
+      auto ctxImpl = dynamic_cast<ContextImpl<Pipeline, H>*>(ctx.get());
+      if (ctxImpl && ctxImpl->getHandler() == handler) {
+        owner_ = ctx;
+        return true;
+      }
+    }
+    return false;
   }
 
   void attachTransport(
       std::shared_ptr<AsyncTransport> transport) {
-    typename Pipeline<R, W>::DestructorGuard dg(
-        static_cast<DelayedDestruction*>(this));
-    CHECK((!Pipeline<R, W>::transport_));
-    Pipeline<R, W, Handlers...>::attachTransport(std::move(transport));
-    forEachCtx([&](PipelineContext* ctx){
+    transport_ = std::move(transport);
+    for (auto& ctx : ctxs_) {
       ctx->attachTransport();
-    });
+    }
   }
 
   void detachTransport() {
-    typename Pipeline<R, W>::DestructorGuard dg(
-        static_cast<DelayedDestruction*>(this));
-    Pipeline<R, W, Handlers...>::detachTransport();
-    forEachCtx([&](PipelineContext* ctx){
+    transport_ = nullptr;
+    for (auto& ctx : ctxs_) {
       ctx->detachTransport();
-    });
-  }
-
-  std::shared_ptr<AsyncTransport> getTransport() {
-    return Pipeline<R, W>::transport_;
-  }
-
-  template <class H>
-  Pipeline& addBack(H&& handler) {
-    Pipeline<R, W>::addBack(std::move(handler));
-    return *this;
+    }
   }
 
-  template <class H>
-  Pipeline& addFront(H&& handler) {
+ protected:
+  template <class Context>
+  void addContextFront(Context* context) {
     ctxs_.insert(
         ctxs_.begin(),
-        folly::make_unique<ContextImpl<Pipeline, H>>(
-            this,
-            std::move(handler)));
-    return *this;
-  }
-
-  template <class H>
-  H* getHandler(size_t i) {
-    if (i > ctxs_.size()) {
-      return Pipeline<R, W, Handlers...>::template getHandler<H>(
-          i - (ctxs_.size() + 1));
-    } else {
-      auto pctx = (i == ctxs_.size()) ? &ctx_ : ctxs_[i].get();
-      auto ctx = dynamic_cast<ContextImpl<Pipeline, H>*>(pctx);
-      return ctx->getHandler();
-    }
-  }
-
-  void finalize() {
-    finalizeHelper();
-    auto ctx = ctxs_.empty() ? &ctx_ : ctxs_.front().get();
-    front_ = dynamic_cast<InboundHandlerContext<R>*>(ctx);
-    if (!front_) {
-      throw std::invalid_argument("wrong type for first handler");
-    }
+        std::shared_ptr<Context>(context, [](Context*){}));
   }
 
- protected:
-  void finalizeHelper() {
-    Pipeline<R, W, Handlers...>::finalizeHelper();
-    back_ = Pipeline<R, W, Handlers...>::back_;
-    if (!back_) {
-      auto is_at_end = Pipeline<R, W, Handlers...>::is_end;
-      CHECK(is_at_end);
-      back_ = dynamic_cast<OutboundHandlerContext<W>*>(&ctx_);
-      if (!back_) {
-        throw std::invalid_argument("wrong type for last handler");
-      }
-    }
-
-    if (!ctxs_.empty()) {
-      for (size_t i = 0; i < ctxs_.size() - 1; i++) {
-        ctxs_[i]->link(ctxs_[i+1].get());
+  void detachHandlers() {
+    for (auto& ctx : ctxs_) {
+      if (ctx != owner_) {
+        ctx->detachPipeline();
       }
-      ctxs_.back()->link(&ctx_);
-    }
-
-    auto nextFront = Pipeline<R, W, Handlers...>::getLocalFront();
-    if (nextFront) {
-      ctx_.link(nextFront);
     }
   }
 
-  PipelineContext* getLocalFront() {
-    return ctxs_.empty() ? &ctx_ : ctxs_.front().get();
-  }
+ private:
+  std::shared_ptr<AsyncTransport> transport_;
+  WriteFlags writeFlags_{WriteFlags::NONE};
+  std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
 
-  static const bool is_end{false};
   InboundHandlerContext<R>* front_{nullptr};
   OutboundHandlerContext<W>* back_{nullptr};
-
- private:
-  template <class F>
-  void forEachCtx(const F& func) {
-    for (auto& ctx : ctxs_) {
-      func(ctx.get());
-    }
-    func(&ctx_);
-  }
-
-  ContextImpl<Pipeline, Handler> ctx_;
-  std::vector<std::unique_ptr<PipelineContext>> ctxs_;
+  std::vector<std::shared_ptr<PipelineContext>> ctxs_;
+  std::shared_ptr<PipelineContext> owner_;
 };
 
 }}
diff --git a/folly/wangle/channel/StaticPipeline.h b/folly/wangle/channel/StaticPipeline.h
new file mode 100644 (file)
index 0000000..3c644eb
--- /dev/null
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/wangle/channel/Pipeline.h>
+
+namespace folly { namespace wangle {
+
+/*
+ * StaticPipeline allows you to create a Pipeline with minimal allocations.
+ * Specify your handlers after the input/output types of your Pipeline in order
+ * from front to back, and construct with either H&&, H*, or std::shared_ptr<H>
+ * for each handler. The pipeline will be finalized for you at the end of
+ * construction. For example:
+ *
+ * StringToStringHandler stringHandler1;
+ * auto stringHandler2 = std::make_shared<StringToStringHandler>();
+ *
+ * StaticPipeline<int, std::string,
+ *   IntToStringHandler,
+ *   StringToStringHandler,
+ *   StringToStringHandler>(
+ *     IntToStringHandler(),  // H&&
+ *     &stringHandler1,       // H*
+ *     stringHandler2)        // std::shared_ptr<H>
+ * pipeline;
+ *
+ * You can then use pipeline just like any Pipeline. See Pipeline.h.
+ */
+template <class R, class W, class... Handlers>
+class StaticPipeline;
+
+template <class R, class W>
+class StaticPipeline<R, W> : public Pipeline<R, W> {
+ protected:
+  explicit StaticPipeline(bool) : Pipeline<R, W>() {}
+};
+
+template <class R, class W, class Handler, class... Handlers>
+class StaticPipeline<R, W, Handler, Handlers...>
+    : public StaticPipeline<R, W, Handlers...> {
+ public:
+  template <class... HandlerArgs>
+  explicit StaticPipeline(HandlerArgs&&... handlers)
+    : StaticPipeline(true, std::forward<HandlerArgs>(handlers)...) {
+    isFirst_ = true;
+  }
+
+  ~StaticPipeline() {
+    if (isFirst_) {
+      Pipeline<R, W>::detachHandlers();
+    }
+  }
+
+ protected:
+  typedef ContextImpl<Pipeline<R, W>, Handler> Context;
+
+  template <class HandlerArg, class... HandlerArgs>
+  StaticPipeline(
+      bool isFirst,
+      HandlerArg&& handler,
+      HandlerArgs&&... handlers)
+    : StaticPipeline<R, W, Handlers...>(
+          false,
+          std::forward<HandlerArgs>(handlers)...) {
+    isFirst_ = isFirst;
+    setHandler(std::forward<HandlerArg>(handler));
+    CHECK(handlerPtr_);
+    ctx_.initialize(this, handlerPtr_);
+    Pipeline<R, W>::addContextFront(&ctx_);
+    if (isFirst_) {
+      Pipeline<R, W>::finalize();
+    }
+  }
+
+ private:
+  template <class HandlerArg>
+  typename std::enable_if<std::is_same<
+    typename std::remove_reference<HandlerArg>::type,
+    Handler
+  >::value>::type
+  setHandler(HandlerArg&& arg) {
+    handler_.emplace(std::forward<HandlerArg>(arg));
+    handlerPtr_ = std::shared_ptr<Handler>(&(*handler_), [](Handler*){});
+  }
+
+  template <class HandlerArg>
+  typename std::enable_if<std::is_same<
+    typename std::decay<HandlerArg>::type,
+    std::shared_ptr<Handler>
+  >::value>::type
+  setHandler(HandlerArg&& arg) {
+    handlerPtr_ = std::forward<HandlerArg>(arg);
+  }
+
+  template <class HandlerArg>
+  typename std::enable_if<std::is_same<
+    typename std::decay<HandlerArg>::type,
+    Handler*
+  >::value>::type
+  setHandler(HandlerArg&& arg) {
+    handlerPtr_ = std::shared_ptr<Handler>(arg, [](Handler*){});
+  }
+
+  bool isFirst_;
+  folly::Optional<Handler> handler_;
+  std::shared_ptr<Handler> handlerPtr_;
+  ContextImpl<Pipeline<R, W>, Handler> ctx_;
+};
+
+}} // folly::wangle
index a08509b653461d6430a3cd522bfc2212ea180fe4..51f67275eef215c6b5d2199cfddfc20011ad9372 100644 (file)
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-#include <folly/wangle/channel/Pipeline.h>
+#include <folly/wangle/channel/StaticPipeline.h>
 #include <folly/wangle/channel/OutputBufferingHandler.h>
 #include <folly/wangle/channel/test/MockHandler.h>
 #include <folly/io/async/AsyncSocket.h>
@@ -35,8 +35,8 @@ MATCHER_P(IOBufContains, str, "") { return arg->moveToFbString() == str; }
 TEST(OutputBufferingHandlerTest, Basic) {
   MockBytesHandler mockHandler;
   EXPECT_CALL(mockHandler, attachPipeline(_));
-  Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>,
-    HandlerPtr<MockBytesHandler, false>,
+  StaticPipeline<IOBufQueue&, std::unique_ptr<IOBuf>,
+    MockBytesHandler,
     OutputBufferingHandler>
   pipeline(&mockHandler, OutputBufferingHandler{});
 
index 5fa97a6483d7294857703cfef8605dcd43e79511..d4d2ce7ce67f06f0943dd35dad2b6287252cb37d 100644 (file)
@@ -16,6 +16,7 @@
 
 #include <folly/wangle/channel/Handler.h>
 #include <folly/wangle/channel/Pipeline.h>
+#include <folly/wangle/channel/StaticPipeline.h>
 #include <folly/wangle/channel/AsyncSocketHandler.h>
 #include <folly/wangle/channel/OutputBufferingHandler.h>
 #include <folly/wangle/channel/test/MockHandler.h>
@@ -27,7 +28,6 @@ using namespace folly::wangle;
 using namespace testing;
 
 typedef StrictMock<MockHandlerAdapter<int, int>> IntHandler;
-typedef HandlerPtr<IntHandler, false> IntHandlerPtr;
 
 ACTION(FireRead) {
   arg0->fireRead(arg1);
@@ -55,7 +55,7 @@ TEST(PipelineTest, RealHandlersCompile) {
   auto socket = AsyncSocket::newSocket(&eb);
   // static
   {
-    Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>,
+    StaticPipeline<IOBufQueue&, std::unique_ptr<IOBuf>,
       AsyncSocketHandler,
       OutputBufferingHandler>
     pipeline{AsyncSocketHandler(socket), OutputBufferingHandler()};
@@ -82,7 +82,7 @@ TEST(PipelineTest, FireActions) {
   EXPECT_CALL(handler1, attachPipeline(_));
   EXPECT_CALL(handler2, attachPipeline(_));
 
-  Pipeline<int, int, IntHandlerPtr, IntHandlerPtr>
+  StaticPipeline<int, int, IntHandler, IntHandler>
   pipeline(&handler1, &handler2);
 
   EXPECT_CALL(handler1, read_(_, _)).WillOnce(FireRead());
@@ -114,7 +114,7 @@ TEST(PipelineTest, FireActions) {
 TEST(PipelineTest, ReachEndOfPipeline) {
   IntHandler handler;
   EXPECT_CALL(handler, attachPipeline(_));
-  Pipeline<int, int, IntHandlerPtr>
+  StaticPipeline<int, int, IntHandler>
   pipeline(&handler);
 
   EXPECT_CALL(handler, read_(_, _)).WillOnce(FireRead());
@@ -143,7 +143,7 @@ TEST(PipelineTest, TurnAround) {
   EXPECT_CALL(handler1, attachPipeline(_));
   EXPECT_CALL(handler2, attachPipeline(_));
 
-  Pipeline<int, int, IntHandlerPtr, IntHandlerPtr>
+  StaticPipeline<int, int, IntHandler, IntHandler>
   pipeline(&handler1, &handler2);
 
   EXPECT_CALL(handler1, read_(_, _)).WillOnce(FireRead());
@@ -158,20 +158,20 @@ TEST(PipelineTest, TurnAround) {
 TEST(PipelineTest, DynamicFireActions) {
   IntHandler handler1, handler2, handler3;
   EXPECT_CALL(handler2, attachPipeline(_));
-  Pipeline<int, int, IntHandlerPtr>
+  StaticPipeline<int, int, IntHandler>
   pipeline(&handler2);
 
   EXPECT_CALL(handler1, attachPipeline(_));
   EXPECT_CALL(handler3, attachPipeline(_));
 
   pipeline
-    .addFront(IntHandlerPtr(&handler1))
-    .addBack(IntHandlerPtr(&handler3))
+    .addFront(&handler1)
+    .addBack(&handler3)
     .finalize();
 
-  EXPECT_TRUE(pipeline.getHandler<IntHandlerPtr>(0));
-  EXPECT_TRUE(pipeline.getHandler<IntHandlerPtr>(1));
-  EXPECT_TRUE(pipeline.getHandler<IntHandlerPtr>(2));
+  EXPECT_TRUE(pipeline.getHandler<IntHandler>(0));
+  EXPECT_TRUE(pipeline.getHandler<IntHandler>(1));
+  EXPECT_TRUE(pipeline.getHandler<IntHandler>(2));
 
   EXPECT_CALL(handler1, read_(_, _)).WillOnce(FireRead());
   EXPECT_CALL(handler2, read_(_, _)).WillOnce(FireRead());
@@ -217,7 +217,7 @@ TEST(Pipeline, DynamicConstruction) {
       std::invalid_argument);
   }
   {
-    Pipeline<std::string, std::string, StringHandler, StringHandler>
+    StaticPipeline<std::string, std::string, StringHandler, StringHandler>
     pipeline{StringHandler(), StringHandler()};
 
     // Exercise both addFront and addBack. Final pipeline is
@@ -235,7 +235,7 @@ TEST(Pipeline, DynamicConstruction) {
 TEST(Pipeline, AttachTransport) {
   IntHandler handler;
   EXPECT_CALL(handler, attachPipeline(_));
-  Pipeline<int, int, IntHandlerPtr>
+  StaticPipeline<int, int, IntHandler>
   pipeline(&handler);
 
   EventBase eb;
index ac8ccc927e5f46f0f7acafa40034f68d92650063..b05aa9a26373831f6a77a54b5e99b06920e8e1a8 100644 (file)
@@ -34,9 +34,7 @@ class SerialClientDispatcher : public HandlerAdapter<Req, Resp>
 
   void setPipeline(Pipeline* pipeline) {
     pipeline_ = pipeline;
-    pipeline->addBack(
-      HandlerPtr<SerialClientDispatcher<Pipeline, Req, Resp>, false>(
-        this));
+    pipeline->addBack(this);
     pipeline->finalize();
   }