unidirectional pipelines
authorJames Sedgwick <jsedgwick@fb.com>
Thu, 30 Apr 2015 18:09:50 +0000 (11:09 -0700)
committerPraveen Kumar Ramakrishnan <praveenr@fb.com>
Tue, 12 May 2015 00:01:22 +0000 (17:01 -0700)
Summary:
Cleans up bootstrap a bit at the expense of a more complex Pipeline interface
This doesn't have to go in, lmk either way as I want to move on to reorganizing this code into inl headers etc

Test Plan: unit

Reviewed By: davejwatson@fb.com

Subscribers: fugalh, folly-diffs@, jsedgwick, yfeldblum, chalfant

FB internal diff: D2034634

Signature: t1:2034634:1430414670:c91712fb26353987cb471e35a54f55c869ae7cf1

folly/wangle/bootstrap/BootstrapTest.cpp
folly/wangle/bootstrap/ServerBootstrap-inl.h
folly/wangle/bootstrap/ServerBootstrap.h
folly/wangle/channel/Handler.h
folly/wangle/channel/Pipeline.h

index ddbd50ee672b96ce8c402f8c7e0e99a8296cb1ad..3e8908b87488f458ae6e80fbe09c209fc5b75635 100644 (file)
@@ -267,18 +267,12 @@ TEST(Bootstrap, ExistingSocket) {
 
 std::atomic<int> connections{0};
 
-class TestHandlerPipeline
-    : public HandlerAdapter<void*,
-                                   std::exception> {
+class TestHandlerPipeline : public InboundHandler<void*> {
  public:
   void read(Context* ctx, void* conn) {
     connections++;
     return ctx->fireRead(conn);
   }
-
-  Future<void> write(Context* ctx, std::exception e) {
-    return ctx->fireWrite(e);
-  }
 };
 
 template <typename HandlerPipeline>
@@ -316,17 +310,11 @@ TEST(Bootstrap, LoadBalanceHandler) {
   CHECK(connections == 1);
 }
 
-class TestUDPPipeline
-    : public HandlerAdapter<void*,
-                                   std::exception> {
+class TestUDPPipeline : public InboundHandler<void*> {
  public:
   void read(Context* ctx, void* conn) {
     connections++;
   }
-
-  Future<void> write(Context* ctx, std::exception e) {
-    return ctx->fireWrite(e);
-  }
 };
 
 TEST(Bootstrap, UDP) {
index 54fde1969f18fd7dc73c00f0cc87f3eea50c7985..543a655f46081fce30e9aec8bc168741ff2ad889 100644 (file)
@@ -60,8 +60,7 @@ class ServerAcceptor
  public:
   explicit ServerAcceptor(
         std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory,
-        std::shared_ptr<folly::wangle::Pipeline<
-                          void*, std::exception>> acceptorPipeline,
+        std::shared_ptr<folly::wangle::Pipeline<void*>> acceptorPipeline,
         EventBase* base)
       : Acceptor(ServerSocketConfig())
       , base_(base)
@@ -105,8 +104,7 @@ class ServerAcceptor
   EventBase* base_;
 
   std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory_;
-  std::shared_ptr<folly::wangle::Pipeline<
-    void*, std::exception>> acceptorPipeline_;
+  std::shared_ptr<folly::wangle::Pipeline<void*>> acceptorPipeline_;
 };
 
 template <typename Pipeline>
@@ -114,22 +112,19 @@ class ServerAcceptorFactory : public AcceptorFactory {
  public:
   explicit ServerAcceptorFactory(
     std::shared_ptr<PipelineFactory<Pipeline>> factory,
-    std::shared_ptr<PipelineFactory<folly::wangle::Pipeline<
-    void*, std::exception>>> pipeline)
+    std::shared_ptr<PipelineFactory<folly::wangle::Pipeline<void*>>> pipeline)
     : factory_(factory)
     , pipeline_(pipeline) {}
 
   std::shared_ptr<Acceptor> newAcceptor(EventBase* base) {
-    std::shared_ptr<folly::wangle::Pipeline<
-                      void*, std::exception>> pipeline(
-                        pipeline_->newPipeline(nullptr));
+    std::shared_ptr<folly::wangle::Pipeline<void*>> pipeline(
+        pipeline_->newPipeline(nullptr));
     return std::make_shared<ServerAcceptor<Pipeline>>(factory_, pipeline, base);
   }
  private:
   std::shared_ptr<PipelineFactory<Pipeline>> factory_;
   std::shared_ptr<PipelineFactory<
-    folly::wangle::Pipeline<
-      void*, std::exception>>> pipeline_;
+    folly::wangle::Pipeline<void*>>> pipeline_;
 };
 
 class ServerWorkerPool : public folly::wangle::ThreadPoolExecutor::Observer {
@@ -179,10 +174,8 @@ void ServerWorkerPool::forEachWorker(F&& f) const {
 }
 
 class DefaultAcceptPipelineFactory
-    : public PipelineFactory<wangle::Pipeline<void*, std::exception>> {
-  typedef wangle::Pipeline<
-      void*,
-      std::exception> AcceptPipeline;
+    : public PipelineFactory<wangle::Pipeline<void*>> {
+  typedef wangle::Pipeline<void*> AcceptPipeline;
 
  public:
   AcceptPipeline* newPipeline(std::shared_ptr<AsyncSocket>) {
index 28785a1b3f58ee02c77ac619969ae34aff2e9917..3de9c85c9f84425f3c809d9716ebaf35ad37be48 100644 (file)
@@ -52,9 +52,7 @@ class ServerBootstrap {
     join();
   }
 
-  typedef wangle::Pipeline<
-   void*,
-   std::exception> AcceptPipeline;
+  typedef wangle::Pipeline<void*> AcceptPipeline;
   /*
    * Pipeline used to add connections to event bases.
    * This is used for UDP or for load balancing
index 676c2fabf3783e4ee14e6d643cb3d7b8255ab672..cdad11e53f65124e8c91db43239b695dc04412cf 100644 (file)
@@ -101,8 +101,6 @@ class Handler : public HandlerBase<HandlerContext<Rout, Wout>> {
   */
 };
 
-struct Unit{};
-
 template <class Rin, class Rout = Rin>
 class InboundHandler : public HandlerBase<InboundHandlerContext<Rout>> {
  public:
@@ -110,8 +108,8 @@ class InboundHandler : public HandlerBase<InboundHandlerContext<Rout>> {
 
   typedef Rin rin;
   typedef Rout rout;
-  typedef Unit win;
-  typedef Unit wout;
+  typedef Nothing win;
+  typedef Nothing wout;
   typedef InboundHandlerContext<Rout> Context;
   virtual ~InboundHandler() {}
 
@@ -129,8 +127,8 @@ class OutboundHandler : public HandlerBase<OutboundHandlerContext<Wout>> {
  public:
   static const HandlerDir dir = HandlerDir::OUT;
 
-  typedef Unit rin;
-  typedef Unit rout;
+  typedef Nothing rin;
+  typedef Nothing rout;
   typedef Win win;
   typedef Wout wout;
   typedef OutboundHandlerContext<Wout> Context;
index db7a71ef3571d7f1d2da2be26ada7a9bc65c67e6..700fc804e2470c17febf3e3cd1342f8d9b166011 100644 (file)
 
 namespace folly { namespace wangle {
 
+// See Pipeline docblock for purpose
+struct Nothing{};
+
+namespace detail {
+
+template <class T>
+inline void logWarningIfNotNothing(const std::string& warning) {
+  LOG(WARNING) << warning;
+}
+
+template <>
+inline void logWarningIfNotNothing<Nothing>(const std::string& warning) {
+  // do nothing
+}
+
+} // detail
+
 /*
  * R is the inbound type, i.e. inbound calls start with pipeline.read(R)
  * W is the outbound type, i.e. outbound calls start with pipeline.write(W)
+ *
+ * Use Nothing for one of the types if your pipeline is unidirectional.
+ * If R is Nothing, read(), readEOF(), and readException() will be disabled.
+ * If W is Nothing, write() and close() will be disabled.
  */
-template <class R, class W>
+template <class R, class W = Nothing>
 class Pipeline : public DelayedDestruction {
  public:
   Pipeline() : isStatic_(false) {}
@@ -61,21 +82,27 @@ class Pipeline : public DelayedDestruction {
     return readBufferSettings_;
   }
 
-  void read(R msg) {
+  template <class T = R>
+  typename std::enable_if<!std::is_same<T, Nothing>::value>::type
+  read(R msg) {
     if (!front_) {
       throw std::invalid_argument("read(): no inbound handler in Pipeline");
     }
     front_->read(std::forward<R>(msg));
   }
 
-  void readEOF() {
+  template <class T = R>
+  typename std::enable_if<!std::is_same<T, Nothing>::value>::type
+  readEOF() {
     if (!front_) {
       throw std::invalid_argument("readEOF(): no inbound handler in Pipeline");
     }
     front_->readEOF();
   }
 
-  void readException(exception_wrapper e) {
+  template <class T = R>
+  typename std::enable_if<!std::is_same<T, Nothing>::value>::type
+  readException(exception_wrapper e) {
     if (!front_) {
       throw std::invalid_argument(
           "readException(): no inbound handler in Pipeline");
@@ -83,14 +110,18 @@ class Pipeline : public DelayedDestruction {
     front_->readException(std::move(e));
   }
 
-  Future<void> write(W msg) {
+  template <class T = W>
+  typename std::enable_if<!std::is_same<T, Nothing>::value, Future<void>>::type
+  write(W msg) {
     if (!back_) {
       throw std::invalid_argument("write(): no outbound handler in Pipeline");
     }
     return back_->write(std::forward<W>(msg));
   }
 
-  Future<void> close() {
+  template <class T = W>
+  typename std::enable_if<!std::is_same<T, Nothing>::value, Future<void>>::type
+  close() {
     if (!back_) {
       throw std::invalid_argument("close(): no outbound handler in Pipeline");
     }
@@ -154,12 +185,14 @@ class Pipeline : public DelayedDestruction {
     }
 
     if (!front_) {
-      LOG(WARNING) << "No inbound handler in Pipeline, "
-                      "inbound operations will throw std::invalid_argument";
+      detail::logWarningIfNotNothing<R>(
+          "No inbound handler in Pipeline, inbound operations will throw "
+          "std::invalid_argument");
     }
     if (!back_) {
-      LOG(WARNING) << "No outbound handler in Pipeline, "
-                      "outbound operations will throw std::invalid_argument";
+      detail::logWarningIfNotNothing<W>(
+          "No outbound handler in Pipeline, outbound operations will throw "
+          "std::invalid_argument");
     }
 
     for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) {