rearrange Pipeline to have more functionality in PipelineBase
authorJames Sedgwick <jsedgwick@fb.com>
Tue, 23 Jun 2015 18:21:34 +0000 (11:21 -0700)
committerSara Golemon <sgolemon@fb.com>
Wed, 24 Jun 2015 16:41:04 +0000 (09:41 -0700)
Summary: This way, handlers can carry out more complex manipulations of the pipeline via ctx->getPipeline() without knowing the R/W types

Reviewed By: @djwatson

Differential Revision: D2158736

folly/Makefile.am
folly/wangle/channel/HandlerContext-inl.h
folly/wangle/channel/Pipeline-inl.h
folly/wangle/channel/Pipeline.cpp [new file with mode: 0644]
folly/wangle/channel/Pipeline.h
folly/wangle/channel/StaticPipeline.h

index 9b1c9e0ae30ba20531ae36b5f2bccbe69a5e4772..e615a7336e04ab574539a84ef59cffff00c57bbe 100644 (file)
@@ -420,6 +420,7 @@ libfolly_la_SOURCES = \
        wangle/acceptor/SocketOptions.cpp \
        wangle/acceptor/TransportInfo.cpp \
        wangle/bootstrap/ServerBootstrap.cpp \
+       wangle/channel/Pipeline.cpp \
        wangle/concurrent/CPUThreadPoolExecutor.cpp \
        wangle/concurrent/Codel.cpp \
        wangle/concurrent/IOThreadPoolExecutor.cpp \
index 9f111c419069ee4a0913da6686ecc8e1aea1572b..6877f7defcfe9aa059dbe644a7e1ce3b341c766e 100644 (file)
@@ -59,7 +59,7 @@ class OutboundLink {
   virtual Future<void> close() = 0;
 };
 
-template <class P, class H, class Context>
+template <class H, class Context>
 class ContextImplBase : public PipelineContext {
  public:
   ~ContextImplBase() = default;
@@ -68,7 +68,7 @@ class ContextImplBase : public PipelineContext {
     return handler_.get();
   }
 
-  void initialize(P* pipeline, std::shared_ptr<H> handler) {
+  void initialize(PipelineBase* pipeline, std::shared_ptr<H> handler) {
     pipeline_ = pipeline;
     handler_ = std::move(handler);
   }
@@ -119,24 +119,24 @@ class ContextImplBase : public PipelineContext {
 
  protected:
   Context* impl_;
-  P* pipeline_;
+  PipelineBase* pipeline_;
   std::shared_ptr<H> handler_;
   InboundLink<typename H::rout>* nextIn_{nullptr};
   OutboundLink<typename H::wout>* nextOut_{nullptr};
 
  private:
   bool attached_{false};
-  using DestructorGuard = typename P::DestructorGuard;
+  using DestructorGuard = typename DelayedDestruction::DestructorGuard;
 };
 
-template <class P, class H>
+template <class H>
 class ContextImpl
   : public HandlerContext<typename H::rout,
                           typename H::wout>,
     public InboundLink<typename H::rin>,
     public OutboundLink<typename H::win>,
-    public ContextImplBase<P, H, HandlerContext<typename H::rout,
-                                                typename H::wout>> {
+    public ContextImplBase<H, HandlerContext<typename H::rout,
+                                             typename H::wout>> {
  public:
   typedef typename H::rin Rin;
   typedef typename H::rout Rout;
@@ -144,7 +144,7 @@ class ContextImpl
   typedef typename H::wout Wout;
   static const HandlerDir dir = HandlerDir::BOTH;
 
-  explicit ContextImpl(P* pipeline, std::shared_ptr<H> handler) {
+  explicit ContextImpl(PipelineBase* pipeline, std::shared_ptr<H> handler) {
     this->impl_ = this;
     this->initialize(pipeline, std::move(handler));
   }
@@ -278,14 +278,14 @@ class ContextImpl
   }
 
  private:
-  using DestructorGuard = typename P::DestructorGuard;
+  using DestructorGuard = typename DelayedDestruction::DestructorGuard;
 };
 
-template <class P, class H>
+template <class H>
 class InboundContextImpl
   : public InboundHandlerContext<typename H::rout>,
     public InboundLink<typename H::rin>,
-    public ContextImplBase<P, H, InboundHandlerContext<typename H::rout>> {
+    public ContextImplBase<H, InboundHandlerContext<typename H::rout>> {
  public:
   typedef typename H::rin Rin;
   typedef typename H::rout Rout;
@@ -293,7 +293,9 @@ class InboundContextImpl
   typedef typename H::wout Wout;
   static const HandlerDir dir = HandlerDir::IN;
 
-  explicit InboundContextImpl(P* pipeline, std::shared_ptr<H> handler) {
+  explicit InboundContextImpl(
+      PipelineBase* pipeline,
+      std::shared_ptr<H> handler) {
     this->impl_ = this;
     this->initialize(pipeline, std::move(handler));
   }
@@ -378,14 +380,14 @@ class InboundContextImpl
   }
 
  private:
-  using DestructorGuard = typename P::DestructorGuard;
+  using DestructorGuard = typename DelayedDestruction::DestructorGuard;
 };
 
-template <class P, class H>
+template <class H>
 class OutboundContextImpl
   : public OutboundHandlerContext<typename H::wout>,
     public OutboundLink<typename H::win>,
-    public ContextImplBase<P, H, OutboundHandlerContext<typename H::wout>> {
+    public ContextImplBase<H, OutboundHandlerContext<typename H::wout>> {
  public:
   typedef typename H::rin Rin;
   typedef typename H::rout Rout;
@@ -393,7 +395,9 @@ class OutboundContextImpl
   typedef typename H::wout Wout;
   static const HandlerDir dir = HandlerDir::OUT;
 
-  explicit OutboundContextImpl(P* pipeline, std::shared_ptr<H> handler) {
+  explicit OutboundContextImpl(
+      PipelineBase* pipeline,
+      std::shared_ptr<H> handler) {
     this->impl_ = this;
     this->initialize(pipeline, std::move(handler));
   }
@@ -442,18 +446,18 @@ class OutboundContextImpl
   }
 
  private:
-  using DestructorGuard = typename P::DestructorGuard;
+  using DestructorGuard = typename DelayedDestruction::DestructorGuard;
 };
 
-template <class Handler, class Pipeline>
+template <class Handler>
 struct ContextType {
   typedef typename std::conditional<
     Handler::dir == HandlerDir::BOTH,
-    ContextImpl<Pipeline, Handler>,
+    ContextImpl<Handler>,
     typename std::conditional<
       Handler::dir == HandlerDir::IN,
-      InboundContextImpl<Pipeline, Handler>,
-      OutboundContextImpl<Pipeline, Handler>
+      InboundContextImpl<Handler>,
+      OutboundContextImpl<Handler>
     >::type>::type
   type;
 };
index f4f7e000b9ee5a1a40cb385f677d9e29808bc440..64096a145010177b7e63e7a20e5b5922d69a7835 100644 (file)
@@ -35,28 +35,124 @@ Pipeline<R, W>::~Pipeline() {
   }
 }
 
-template <class R, class W>
-void Pipeline<R, W>::setWriteFlags(WriteFlags flags) {
-  writeFlags_ = flags;
+template <class H>
+PipelineBase& PipelineBase::addBack(std::shared_ptr<H> handler) {
+  typedef typename ContextType<H>::type Context;
+  return addHelper(std::make_shared<Context>(this, std::move(handler)), false);
 }
 
-template <class R, class W>
-WriteFlags Pipeline<R, W>::getWriteFlags() {
-  return writeFlags_;
+template <class H>
+PipelineBase& PipelineBase::addBack(H&& handler) {
+  return addBack(std::make_shared<H>(std::forward<H>(handler)));
 }
 
-template <class R, class W>
-void Pipeline<R, W>::setReadBufferSettings(
-    uint64_t minAvailable,
-    uint64_t allocationSize) {
-  readBufferSettings_ = std::make_pair(minAvailable, allocationSize);
+template <class H>
+PipelineBase& PipelineBase::addBack(H* handler) {
+  return addBack(std::shared_ptr<H>(handler, [](H*){}));
 }
 
-template <class R, class W>
-std::pair<uint64_t, uint64_t> Pipeline<R, W>::getReadBufferSettings() {
-  return readBufferSettings_;
+template <class H>
+PipelineBase& PipelineBase::addFront(std::shared_ptr<H> handler) {
+  typedef typename ContextType<H>::type Context;
+  return addHelper(std::make_shared<Context>(this, std::move(handler)), true);
+}
+
+template <class H>
+PipelineBase& PipelineBase::addFront(H&& handler) {
+  return addFront(std::make_shared<H>(std::forward<H>(handler)));
+}
+
+template <class H>
+PipelineBase& PipelineBase::addFront(H* handler) {
+  return addFront(std::shared_ptr<H>(handler, [](H*){}));
+}
+
+template <class H>
+PipelineBase& PipelineBase::removeHelper(H* handler, bool checkEqual) {
+  typedef typename ContextType<H>::type Context;
+  bool removed = false;
+  for (auto it = ctxs_.begin(); it != ctxs_.end(); it++) {
+    auto ctx = std::dynamic_pointer_cast<Context>(*it);
+    if (ctx && (!checkEqual || ctx->getHandler() == handler)) {
+      it = removeAt(it);
+      removed = true;
+      if (it == ctxs_.end()) {
+        break;
+      }
+    }
+  }
+
+  if (!removed) {
+    throw std::invalid_argument("No such handler in pipeline");
+  }
+
+  return *this;
+}
+
+template <class H>
+PipelineBase& PipelineBase::remove() {
+  return removeHelper<H>(nullptr, false);
+}
+
+template <class H>
+PipelineBase& PipelineBase::remove(H* handler) {
+  return removeHelper<H>(handler, true);
+}
+
+template <class H>
+H* PipelineBase::getHandler(int i) {
+  typedef typename ContextType<H>::type Context;
+  auto ctx = dynamic_cast<Context*>(ctxs_[i].get());
+  CHECK(ctx);
+  return ctx->getHandler();
+}
+
+template <class H>
+bool PipelineBase::setOwner(H* handler) {
+  typedef typename ContextType<H>::type Context;
+  for (auto& ctx : ctxs_) {
+    auto ctxImpl = dynamic_cast<Context*>(ctx.get());
+    if (ctxImpl && ctxImpl->getHandler() == handler) {
+      owner_ = ctx;
+      return true;
+    }
+  }
+  return false;
+}
+
+template <class Context>
+void PipelineBase::addContextFront(Context* ctx) {
+  addHelper(std::shared_ptr<Context>(ctx, [](Context*){}), true);
+}
+
+template <class Context>
+PipelineBase& PipelineBase::addHelper(
+    std::shared_ptr<Context>&& ctx,
+    bool front) {
+  ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(), ctx);
+  if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::IN) {
+    inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(), ctx.get());
+  }
+  if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::OUT) {
+    outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(), ctx.get());
+  }
+  return *this;
+}
+
+namespace detail {
+
+template <class T>
+inline void logWarningIfNotUnit(const std::string& warning) {
+  LOG(WARNING) << warning;
+}
+
+template <>
+inline void logWarningIfNotUnit<Unit>(const std::string& warning) {
+  // do nothing
 }
 
+} // detail
+
 template <class R, class W>
 template <class T>
 typename std::enable_if<!std::is_same<T, Unit>::value>::type
@@ -126,141 +222,6 @@ Pipeline<R, W>::close() {
   return back_->close();
 }
 
-template <class R, class W>
-template <class H>
-Pipeline<R, W>& Pipeline<R, W>::addBack(std::shared_ptr<H> handler) {
-  typedef typename ContextType<H, Pipeline<R, W>>::type Context;
-  return addHelper(std::make_shared<Context>(this, std::move(handler)), false);
-}
-
-template <class R, class W>
-template <class H>
-Pipeline<R, W>& Pipeline<R, W>::addBack(H&& handler) {
-  return addBack(std::make_shared<H>(std::forward<H>(handler)));
-}
-
-template <class R, class W>
-template <class H>
-Pipeline<R, W>& Pipeline<R, W>::addBack(H* handler) {
-  return addBack(std::shared_ptr<H>(handler, [](H*){}));
-}
-
-template <class R, class W>
-template <class H>
-Pipeline<R, W>& Pipeline<R, W>::addFront(std::shared_ptr<H> handler) {
-  typedef typename ContextType<H, Pipeline<R, W>>::type Context;
-  return addHelper(std::make_shared<Context>(this, std::move(handler)), true);
-}
-
-template <class R, class W>
-template <class H>
-Pipeline<R, W>& Pipeline<R, W>::addFront(H&& handler) {
-  return addFront(std::make_shared<H>(std::forward<H>(handler)));
-}
-
-template <class R, class W>
-template <class H>
-Pipeline<R, W>& Pipeline<R, W>::addFront(H* handler) {
-  return addFront(std::shared_ptr<H>(handler, [](H*){}));
-}
-
-template <class R, class W>
-template <class H>
-Pipeline<R, W>& Pipeline<R, W>::removeHelper(H* handler, bool checkEqual) {
-  typedef typename ContextType<H, Pipeline<R, W>>::type Context;
-  bool removed = false;
-  for (auto it = ctxs_.begin(); it != ctxs_.end(); it++) {
-    auto ctx = std::dynamic_pointer_cast<Context>(*it);
-    if (ctx && (!checkEqual || ctx->getHandler() == handler)) {
-      it = removeAt(it);
-      removed = true;
-      if (it == ctxs_.end()) {
-        break;
-      }
-    }
-  }
-
-  if (!removed) {
-    throw std::invalid_argument("No such handler in pipeline");
-  }
-
-  return *this;
-}
-
-template <class R, class W>
-template <class H>
-Pipeline<R, W>& Pipeline<R, W>::remove() {
-  return removeHelper<H>(nullptr, false);
-}
-
-template <class R, class W>
-template <class H>
-Pipeline<R, W>& Pipeline<R, W>::remove(H* handler) {
-  return removeHelper<H>(handler, true);
-}
-
-template <class R, class W>
-typename Pipeline<R, W>::ContextIterator Pipeline<R, W>::removeAt(
-    const typename Pipeline<R, W>::ContextIterator& it) {
-  (*it)->detachPipeline();
-
-  const auto dir = (*it)->getDirection();
-  if (dir == HandlerDir::BOTH || dir == HandlerDir::IN) {
-    auto it2 = std::find(inCtxs_.begin(), inCtxs_.end(), it->get());
-    CHECK(it2 != inCtxs_.end());
-    inCtxs_.erase(it2);
-  }
-
-  if (dir == HandlerDir::BOTH || dir == HandlerDir::OUT) {
-    auto it2 = std::find(outCtxs_.begin(), outCtxs_.end(), it->get());
-    CHECK(it2 != outCtxs_.end());
-    outCtxs_.erase(it2);
-  }
-
-  return ctxs_.erase(it);
-}
-
-template <class R, class W>
-Pipeline<R, W>& Pipeline<R, W>::removeFront() {
-  if (ctxs_.empty()) {
-    throw std::invalid_argument("No handlers in pipeline");
-  }
-  removeAt(ctxs_.begin());
-  return *this;
-}
-
-template <class R, class W>
-Pipeline<R, W>& Pipeline<R, W>::removeBack() {
-  if (ctxs_.empty()) {
-    throw std::invalid_argument("No handlers in pipeline");
-  }
-  removeAt(--ctxs_.end());
-  return *this;
-}
-
-template <class R, class W>
-template <class H>
-H* Pipeline<R, W>::getHandler(int i) {
-  typedef typename ContextType<H, Pipeline<R, W>>::type Context;
-  auto ctx = dynamic_cast<Context*>(ctxs_[i].get());
-  CHECK(ctx);
-  return ctx->getHandler();
-}
-
-namespace detail {
-
-template <class T>
-inline void logWarningIfNotUnit(const std::string& warning) {
-  LOG(WARNING) << warning;
-}
-
-template <>
-inline void logWarningIfNotUnit<Unit>(const std::string& warning) {
-  // do nothing
-}
-
-} // detail
-
 // TODO Have read/write/etc check that pipeline has been finalized
 template <class R, class W>
 void Pipeline<R, W>::finalize() {
@@ -298,48 +259,4 @@ void Pipeline<R, W>::finalize() {
   }
 }
 
-template <class R, class W>
-template <class H>
-bool Pipeline<R, W>::setOwner(H* handler) {
-  typedef typename ContextType<H, Pipeline<R, W>>::type Context;
-  for (auto& ctx : ctxs_) {
-    auto ctxImpl = dynamic_cast<Context*>(ctx.get());
-    if (ctxImpl && ctxImpl->getHandler() == handler) {
-      owner_ = ctx;
-      return true;
-    }
-  }
-  return false;
-}
-
-template <class R, class W>
-template <class Context>
-void Pipeline<R, W>::addContextFront(Context* ctx) {
-  addHelper(std::shared_ptr<Context>(ctx, [](Context*){}), true);
-}
-
-template <class R, class W>
-void Pipeline<R, W>::detachHandlers() {
-  for (auto& ctx : ctxs_) {
-    if (ctx != owner_) {
-      ctx->detachPipeline();
-    }
-  }
-}
-
-template <class R, class W>
-template <class Context>
-Pipeline<R, W>& Pipeline<R, W>::addHelper(
-    std::shared_ptr<Context>&& ctx,
-    bool front) {
-  ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(), ctx);
-  if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::IN) {
-    inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(), ctx.get());
-  }
-  if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::OUT) {
-    outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(), ctx.get());
-  }
-  return *this;
-}
-
 }} // folly::wangle
diff --git a/folly/wangle/channel/Pipeline.cpp b/folly/wangle/channel/Pipeline.cpp
new file mode 100644 (file)
index 0000000..331dd0b
--- /dev/null
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/wangle/channel/Pipeline.h>
+
+namespace folly { namespace wangle {
+
+void PipelineBase::setWriteFlags(WriteFlags flags) {
+  writeFlags_ = flags;
+}
+
+WriteFlags PipelineBase::getWriteFlags() {
+  return writeFlags_;
+}
+
+void PipelineBase::setReadBufferSettings(
+    uint64_t minAvailable,
+    uint64_t allocationSize) {
+  readBufferSettings_ = std::make_pair(minAvailable, allocationSize);
+}
+
+std::pair<uint64_t, uint64_t> PipelineBase::getReadBufferSettings() {
+  return readBufferSettings_;
+}
+
+typename PipelineBase::ContextIterator PipelineBase::removeAt(
+    const typename PipelineBase::ContextIterator& it) {
+  (*it)->detachPipeline();
+
+  const auto dir = (*it)->getDirection();
+  if (dir == HandlerDir::BOTH || dir == HandlerDir::IN) {
+    auto it2 = std::find(inCtxs_.begin(), inCtxs_.end(), it->get());
+    CHECK(it2 != inCtxs_.end());
+    inCtxs_.erase(it2);
+  }
+
+  if (dir == HandlerDir::BOTH || dir == HandlerDir::OUT) {
+    auto it2 = std::find(outCtxs_.begin(), outCtxs_.end(), it->get());
+    CHECK(it2 != outCtxs_.end());
+    outCtxs_.erase(it2);
+  }
+
+  return ctxs_.erase(it);
+}
+
+PipelineBase& PipelineBase::removeFront() {
+  if (ctxs_.empty()) {
+    throw std::invalid_argument("No handlers in pipeline");
+  }
+  removeAt(ctxs_.begin());
+  return *this;
+}
+
+PipelineBase& PipelineBase::removeBack() {
+  if (ctxs_.empty()) {
+    throw std::invalid_argument("No handlers in pipeline");
+  }
+  removeAt(--ctxs_.end());
+  return *this;
+}
+
+void PipelineBase::detachHandlers() {
+  for (auto& ctx : ctxs_) {
+    if (ctx != owner_) {
+      ctx->detachPipeline();
+    }
+  }
+}
+
+}} // folly::wangle
index 85d89d894fa72a0e0ebae47af234489a8bbacf99..0a81be99cadf66b6c92dfb098a53aef385861885 100644 (file)
 
 #pragma once
 
-#include <folly/wangle/channel/HandlerContext.h>
 #include <folly/futures/Future.h>
 #include <folly/futures/Unit.h>
 #include <folly/io/async/AsyncTransport.h>
 #include <folly/io/async/DelayedDestruction.h>
+#include <folly/wangle/channel/HandlerContext.h>
 #include <folly/ExceptionWrapper.h>
 #include <folly/Memory.h>
 
 namespace folly { namespace wangle {
 
+class PipelineBase;
+
 class PipelineManager {
  public:
   virtual ~PipelineManager() = default;
@@ -54,92 +56,43 @@ class PipelineBase : public DelayedDestruction {
     return transport_;
   }
 
- private:
-  PipelineManager* manager_{nullptr};
-  std::shared_ptr<AsyncTransport> transport_;
-};
-
-/*
- * R is the inbound type, i.e. inbound calls start with pipeline.read(R)
- * W is the outbound type, i.e. outbound calls start with pipeline.write(W)
- *
- * Use Unit for one of the types if your pipeline is unidirectional.
- * If R is Unit, read(), readEOF(), and readException() will be disabled.
- * If W is Unit, write() and close() will be disabled.
- */
-template <class R, class W = Unit>
-class Pipeline : public PipelineBase {
- public:
-  Pipeline();
-  ~Pipeline();
-
   void setWriteFlags(WriteFlags flags);
   WriteFlags getWriteFlags();
 
   void setReadBufferSettings(uint64_t minAvailable, uint64_t allocationSize);
   std::pair<uint64_t, uint64_t> getReadBufferSettings();
 
-  template <class T = R>
-  typename std::enable_if<!std::is_same<T, Unit>::value>::type
-  read(R msg);
-
-  template <class T = R>
-  typename std::enable_if<!std::is_same<T, Unit>::value>::type
-  readEOF();
-
-  template <class T = R>
-  typename std::enable_if<!std::is_same<T, Unit>::value>::type
-  readException(exception_wrapper e);
-
-  template <class T = R>
-  typename std::enable_if<!std::is_same<T, Unit>::value>::type
-  transportActive();
-
-  template <class T = R>
-  typename std::enable_if<!std::is_same<T, Unit>::value>::type
-  transportInactive();
-
-  template <class T = W>
-  typename std::enable_if<!std::is_same<T, Unit>::value, Future<void>>::type
-  write(W msg);
-
-  template <class T = W>
-  typename std::enable_if<!std::is_same<T, Unit>::value, Future<void>>::type
-  close();
-
   template <class H>
-  Pipeline& addBack(std::shared_ptr<H> handler);
+  PipelineBase& addBack(std::shared_ptr<H> handler);
 
   template <class H>
-  Pipeline& addBack(H&& handler);
+  PipelineBase& addBack(H&& handler);
 
   template <class H>
-  Pipeline& addBack(H* handler);
+  PipelineBase& addBack(H* handler);
 
   template <class H>
-  Pipeline& addFront(std::shared_ptr<H> handler);
+  PipelineBase& addFront(std::shared_ptr<H> handler);
 
   template <class H>
-  Pipeline& addFront(H&& handler);
+  PipelineBase& addFront(H&& handler);
 
   template <class H>
-  Pipeline& addFront(H* handler);
+  PipelineBase& addFront(H* handler);
 
   template <class H>
-  Pipeline& remove(H* handler);
+  PipelineBase& remove(H* handler);
 
   template <class H>
-  Pipeline& remove();
+  PipelineBase& remove();
 
-  Pipeline& removeFront();
+  PipelineBase& removeFront();
 
-  Pipeline& removeBack();
+  PipelineBase& removeBack();
 
   template <class H>
   H* getHandler(int i);
 
-  void finalize();
-
   // If one of the handlers owns the pipeline itself, use setOwner to ensure
   // that the pipeline doesn't try to detach the handler during destruction,
   // lest destruction ordering issues occur.
@@ -147,20 +100,27 @@ class Pipeline : public PipelineBase {
   template <class H>
   bool setOwner(H* handler);
 
- protected:
-  explicit Pipeline(bool isStatic);
+  virtual void finalize() = 0;
 
+ protected:
   template <class Context>
   void addContextFront(Context* ctx);
 
   void detachHandlers();
 
+  std::vector<std::shared_ptr<PipelineContext>> ctxs_;
+  std::vector<PipelineContext*> inCtxs_;
+  std::vector<PipelineContext*> outCtxs_;
+
  private:
+  PipelineManager* manager_{nullptr};
+  std::shared_ptr<AsyncTransport> transport_;
+
   template <class Context>
-  Pipeline& addHelper(std::shared_ptr<Context>&& ctx, bool front);
+  PipelineBase& addHelper(std::shared_ptr<Context>&& ctx, bool front);
 
   template <class H>
-  Pipeline& removeHelper(H* handler, bool checkEqual);
+  PipelineBase& removeHelper(H* handler, bool checkEqual);
 
   typedef std::vector<std::shared_ptr<PipelineContext>>::iterator
     ContextIterator;
@@ -170,11 +130,59 @@ class Pipeline : public PipelineBase {
   WriteFlags writeFlags_{WriteFlags::NONE};
   std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
 
-  bool isStatic_{false};
   std::shared_ptr<PipelineContext> owner_;
-  std::vector<std::shared_ptr<PipelineContext>> ctxs_;
-  std::vector<PipelineContext*> inCtxs_;
-  std::vector<PipelineContext*> outCtxs_;
+};
+
+/*
+ * R is the inbound type, i.e. inbound calls start with pipeline.read(R)
+ * W is the outbound type, i.e. outbound calls start with pipeline.write(W)
+ *
+ * Use Unit for one of the types if your pipeline is unidirectional.
+ * If R is Unit, read(), readEOF(), and readException() will be disabled.
+ * If W is Unit, write() and close() will be disabled.
+ */
+template <class R, class W = Unit>
+class Pipeline : public PipelineBase {
+ public:
+  Pipeline();
+  ~Pipeline();
+
+  template <class T = R>
+  typename std::enable_if<!std::is_same<T, Unit>::value>::type
+  read(R msg);
+
+  template <class T = R>
+  typename std::enable_if<!std::is_same<T, Unit>::value>::type
+  readEOF();
+
+  template <class T = R>
+  typename std::enable_if<!std::is_same<T, Unit>::value>::type
+  readException(exception_wrapper e);
+
+  template <class T = R>
+  typename std::enable_if<!std::is_same<T, Unit>::value>::type
+  transportActive();
+
+  template <class T = R>
+  typename std::enable_if<!std::is_same<T, Unit>::value>::type
+  transportInactive();
+
+  template <class T = W>
+  typename std::enable_if<!std::is_same<T, Unit>::value, Future<void>>::type
+  write(W msg);
+
+  template <class T = W>
+  typename std::enable_if<!std::is_same<T, Unit>::value, Future<void>>::type
+  close();
+
+  void finalize() override;
+
+ protected:
+  explicit Pipeline(bool isStatic);
+
+ private:
+  bool isStatic_{false};
+
   InboundLink<R>* front_{nullptr};
   OutboundLink<W>* back_{nullptr};
 };
index a5d2e893eb5126d44a88fda999345390e195dbcb..ef4a804a8da80af7878be66d1fc9d66454f9a506 100644 (file)
@@ -131,7 +131,7 @@ class StaticPipeline<R, W, Handler, Handlers...>
 
   bool isFirst_;
   std::shared_ptr<Handler> handlerPtr_;
-  typename ContextType<Handler, Pipeline<R, W>>::type ctx_;
+  typename ContextType<Handler>::type ctx_;
 };
 
 }} // folly::wangle