From c7138e7ca18e6cfcc833e26ac3910fab44c80694 Mon Sep 17 00:00:00 2001 From: James Sedgwick Date: Thu, 30 Apr 2015 11:09:50 -0700 Subject: [PATCH] unidirectional pipelines Summary: Cleans up bootstrap a bit at the expense of a more complex Pipeline interface This doesn't have to go in, lmk either way as I want to move on to reorganizing this code into inl headers etc Test Plan: unit Reviewed By: davejwatson@fb.com Subscribers: fugalh, folly-diffs@, jsedgwick, yfeldblum, chalfant FB internal diff: D2034634 Signature: t1:2034634:1430414670:c91712fb26353987cb471e35a54f55c869ae7cf1 --- folly/wangle/bootstrap/BootstrapTest.cpp | 16 +----- folly/wangle/bootstrap/ServerBootstrap-inl.h | 23 +++------ folly/wangle/bootstrap/ServerBootstrap.h | 4 +- folly/wangle/channel/Handler.h | 10 ++-- folly/wangle/channel/Pipeline.h | 53 ++++++++++++++++---- 5 files changed, 58 insertions(+), 48 deletions(-) diff --git a/folly/wangle/bootstrap/BootstrapTest.cpp b/folly/wangle/bootstrap/BootstrapTest.cpp index ddbd50ee..3e8908b8 100644 --- a/folly/wangle/bootstrap/BootstrapTest.cpp +++ b/folly/wangle/bootstrap/BootstrapTest.cpp @@ -267,18 +267,12 @@ TEST(Bootstrap, ExistingSocket) { std::atomic connections{0}; -class TestHandlerPipeline - : public HandlerAdapter { +class TestHandlerPipeline : public InboundHandler { public: void read(Context* ctx, void* conn) { connections++; return ctx->fireRead(conn); } - - Future write(Context* ctx, std::exception e) { - return ctx->fireWrite(e); - } }; template @@ -316,17 +310,11 @@ TEST(Bootstrap, LoadBalanceHandler) { CHECK(connections == 1); } -class TestUDPPipeline - : public HandlerAdapter { +class TestUDPPipeline : public InboundHandler { public: void read(Context* ctx, void* conn) { connections++; } - - Future write(Context* ctx, std::exception e) { - return ctx->fireWrite(e); - } }; TEST(Bootstrap, UDP) { diff --git a/folly/wangle/bootstrap/ServerBootstrap-inl.h b/folly/wangle/bootstrap/ServerBootstrap-inl.h index 54fde196..543a655f 100644 --- a/folly/wangle/bootstrap/ServerBootstrap-inl.h +++ b/folly/wangle/bootstrap/ServerBootstrap-inl.h @@ -60,8 +60,7 @@ class ServerAcceptor public: explicit ServerAcceptor( std::shared_ptr> pipelineFactory, - std::shared_ptr> acceptorPipeline, + std::shared_ptr> acceptorPipeline, EventBase* base) : Acceptor(ServerSocketConfig()) , base_(base) @@ -105,8 +104,7 @@ class ServerAcceptor EventBase* base_; std::shared_ptr> childPipelineFactory_; - std::shared_ptr> acceptorPipeline_; + std::shared_ptr> acceptorPipeline_; }; template @@ -114,22 +112,19 @@ class ServerAcceptorFactory : public AcceptorFactory { public: explicit ServerAcceptorFactory( std::shared_ptr> factory, - std::shared_ptr>> pipeline) + std::shared_ptr>> pipeline) : factory_(factory) , pipeline_(pipeline) {} std::shared_ptr newAcceptor(EventBase* base) { - std::shared_ptr> pipeline( - pipeline_->newPipeline(nullptr)); + std::shared_ptr> pipeline( + pipeline_->newPipeline(nullptr)); return std::make_shared>(factory_, pipeline, base); } private: std::shared_ptr> factory_; std::shared_ptr>> pipeline_; + folly::wangle::Pipeline>> pipeline_; }; class ServerWorkerPool : public folly::wangle::ThreadPoolExecutor::Observer { @@ -179,10 +174,8 @@ void ServerWorkerPool::forEachWorker(F&& f) const { } class DefaultAcceptPipelineFactory - : public PipelineFactory> { - typedef wangle::Pipeline< - void*, - std::exception> AcceptPipeline; + : public PipelineFactory> { + typedef wangle::Pipeline AcceptPipeline; public: AcceptPipeline* newPipeline(std::shared_ptr) { diff --git a/folly/wangle/bootstrap/ServerBootstrap.h b/folly/wangle/bootstrap/ServerBootstrap.h index 28785a1b..3de9c85c 100644 --- a/folly/wangle/bootstrap/ServerBootstrap.h +++ b/folly/wangle/bootstrap/ServerBootstrap.h @@ -52,9 +52,7 @@ class ServerBootstrap { join(); } - typedef wangle::Pipeline< - void*, - std::exception> AcceptPipeline; + typedef wangle::Pipeline AcceptPipeline; /* * Pipeline used to add connections to event bases. * This is used for UDP or for load balancing diff --git a/folly/wangle/channel/Handler.h b/folly/wangle/channel/Handler.h index 676c2fab..cdad11e5 100644 --- a/folly/wangle/channel/Handler.h +++ b/folly/wangle/channel/Handler.h @@ -101,8 +101,6 @@ class Handler : public HandlerBase> { */ }; -struct Unit{}; - template class InboundHandler : public HandlerBase> { public: @@ -110,8 +108,8 @@ class InboundHandler : public HandlerBase> { typedef Rin rin; typedef Rout rout; - typedef Unit win; - typedef Unit wout; + typedef Nothing win; + typedef Nothing wout; typedef InboundHandlerContext Context; virtual ~InboundHandler() {} @@ -129,8 +127,8 @@ class OutboundHandler : public HandlerBase> { public: static const HandlerDir dir = HandlerDir::OUT; - typedef Unit rin; - typedef Unit rout; + typedef Nothing rin; + typedef Nothing rout; typedef Win win; typedef Wout wout; typedef OutboundHandlerContext Context; diff --git a/folly/wangle/channel/Pipeline.h b/folly/wangle/channel/Pipeline.h index db7a71ef..700fc804 100644 --- a/folly/wangle/channel/Pipeline.h +++ b/folly/wangle/channel/Pipeline.h @@ -26,11 +26,32 @@ namespace folly { namespace wangle { +// See Pipeline docblock for purpose +struct Nothing{}; + +namespace detail { + +template +inline void logWarningIfNotNothing(const std::string& warning) { + LOG(WARNING) << warning; +} + +template <> +inline void logWarningIfNotNothing(const std::string& warning) { + // do nothing +} + +} // detail + /* * R is the inbound type, i.e. inbound calls start with pipeline.read(R) * W is the outbound type, i.e. outbound calls start with pipeline.write(W) + * + * Use Nothing for one of the types if your pipeline is unidirectional. + * If R is Nothing, read(), readEOF(), and readException() will be disabled. + * If W is Nothing, write() and close() will be disabled. */ -template +template class Pipeline : public DelayedDestruction { public: Pipeline() : isStatic_(false) {} @@ -61,21 +82,27 @@ class Pipeline : public DelayedDestruction { return readBufferSettings_; } - void read(R msg) { + template + typename std::enable_if::value>::type + read(R msg) { if (!front_) { throw std::invalid_argument("read(): no inbound handler in Pipeline"); } front_->read(std::forward(msg)); } - void readEOF() { + template + typename std::enable_if::value>::type + readEOF() { if (!front_) { throw std::invalid_argument("readEOF(): no inbound handler in Pipeline"); } front_->readEOF(); } - void readException(exception_wrapper e) { + template + typename std::enable_if::value>::type + readException(exception_wrapper e) { if (!front_) { throw std::invalid_argument( "readException(): no inbound handler in Pipeline"); @@ -83,14 +110,18 @@ class Pipeline : public DelayedDestruction { front_->readException(std::move(e)); } - Future write(W msg) { + template + typename std::enable_if::value, Future>::type + write(W msg) { if (!back_) { throw std::invalid_argument("write(): no outbound handler in Pipeline"); } return back_->write(std::forward(msg)); } - Future close() { + template + typename std::enable_if::value, Future>::type + close() { if (!back_) { throw std::invalid_argument("close(): no outbound handler in Pipeline"); } @@ -154,12 +185,14 @@ class Pipeline : public DelayedDestruction { } if (!front_) { - LOG(WARNING) << "No inbound handler in Pipeline, " - "inbound operations will throw std::invalid_argument"; + detail::logWarningIfNotNothing( + "No inbound handler in Pipeline, inbound operations will throw " + "std::invalid_argument"); } if (!back_) { - LOG(WARNING) << "No outbound handler in Pipeline, " - "outbound operations will throw std::invalid_argument"; + detail::logWarningIfNotNothing( + "No outbound handler in Pipeline, outbound operations will throw " + "std::invalid_argument"); } for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) { -- 2.34.1