From 1185881df4c4a6195f5cd99dae703a0ff0a1ec58 Mon Sep 17 00:00:00 2001 From: James Sedgwick Date: Wed, 13 May 2015 17:43:44 -0700 Subject: [PATCH] transportActive and transportInactive Summary: These are equivalents to Netty's channelActive and channelInactive, but we've been calling channels transports so I'm staying consistent. I skipped integrating this into TAsyncTransportHandler because thrift still does manual CB attachment/detachment and it's unclear how that fits into this model If my suspicions are correct, it *should* be possible to make attachReadCallback and detachReadCallback private in AsyncSocketHandler, right? And perhaps get rid of the event base modifier methods? What's our use case for those? Test Plan: unit, employ in telnet server Reviewed By: davejwatson@fb.com Subscribers: fugalh, alandau, bmatheny, folly-diffs@, jsedgwick, yfeldblum, chalfant FB internal diff: D2044520 Signature: t1:2044520:1431551998:af1de358b5dbefcca148814015d8e9f63f458d5d --- folly/wangle/bootstrap/ClientBootstrap.h | 6 +- folly/wangle/bootstrap/ServerBootstrap-inl.h | 1 + folly/wangle/channel/AsyncSocketHandler.h | 20 +++-- folly/wangle/channel/Handler.h | 17 +++-- folly/wangle/channel/HandlerContext-inl.h | 75 ++++++++++++------- folly/wangle/channel/HandlerContext.h | 19 +++-- folly/wangle/channel/Pipeline-inl.h | 40 +++++----- folly/wangle/channel/Pipeline.h | 24 ++++-- .../test/OutputBufferingHandlerTest.cpp | 3 +- folly/wangle/channel/test/PipelineTest.cpp | 18 ----- 10 files changed, 130 insertions(+), 93 deletions(-) diff --git a/folly/wangle/bootstrap/ClientBootstrap.h b/folly/wangle/bootstrap/ClientBootstrap.h index 3ae2ce4f..ecd0c3b1 100644 --- a/folly/wangle/bootstrap/ClientBootstrap.h +++ b/folly/wangle/bootstrap/ClientBootstrap.h @@ -36,6 +36,9 @@ class ClientBootstrap { , bootstrap_(bootstrap) {} void connectSuccess() noexcept override { + if (bootstrap_->getPipeline()) { + bootstrap_->getPipeline()->transportActive(); + } promise_.setValue(bootstrap_->getPipeline()); delete this; } @@ -77,9 +80,6 @@ class ClientBootstrap { socket->connect( new ConnectCallback(std::move(promise), this), address); pipeline_ = pipelineFactory_->newPipeline(socket); - if (pipeline_) { - pipeline_->attachTransport(socket); - } }); return retval; } diff --git a/folly/wangle/bootstrap/ServerBootstrap-inl.h b/folly/wangle/bootstrap/ServerBootstrap-inl.h index 85342646..4b56de86 100644 --- a/folly/wangle/bootstrap/ServerBootstrap-inl.h +++ b/folly/wangle/bootstrap/ServerBootstrap-inl.h @@ -89,6 +89,7 @@ class ServerAcceptor std::shared_ptr( transport.release(), folly::DelayedDestruction::Destructor()))); + pipeline->transportActive(); auto connection = new ServerConnection(std::move(pipeline)); Acceptor::addConnection(connection); } diff --git a/folly/wangle/channel/AsyncSocketHandler.h b/folly/wangle/channel/AsyncSocketHandler.h index 04491bbd..26728494 100644 --- a/folly/wangle/channel/AsyncSocketHandler.h +++ b/folly/wangle/channel/AsyncSocketHandler.h @@ -37,9 +37,7 @@ class AsyncSocketHandler AsyncSocketHandler(AsyncSocketHandler&&) = default; ~AsyncSocketHandler() { - if (socket_) { - detachReadCallback(); - } + detachReadCallback(); } void attachReadCallback() { @@ -47,9 +45,14 @@ class AsyncSocketHandler } void detachReadCallback() { - if (socket_->getReadCallback() == this) { + if (socket_ && socket_->getReadCallback() == this) { socket_->setReadCB(nullptr); } + auto ctx = getContext(); + if (ctx && !firedInactive_) { + firedInactive_ = true; + ctx->fireTransportInactive(); + } } void attachEventBase(folly::EventBase* eventBase) { @@ -65,8 +68,14 @@ class AsyncSocketHandler } } - void attachPipeline(Context* ctx) override { + void transportActive(Context* ctx) override { + ctx->getPipeline()->setTransport(socket_); attachReadCallback(); + ctx->fireTransportActive(); + } + + void detachPipeline(Context* ctx) override { + detachReadCallback(); } folly::Future write( @@ -149,6 +158,7 @@ class AsyncSocketHandler folly::IOBufQueue bufQueue_{folly::IOBufQueue::cacheChainLength()}; std::shared_ptr socket_{nullptr}; + bool firedInactive_{false}; }; }} diff --git a/folly/wangle/channel/Handler.h b/folly/wangle/channel/Handler.h index cdad11e5..2080c2c2 100644 --- a/folly/wangle/channel/Handler.h +++ b/folly/wangle/channel/Handler.h @@ -29,10 +29,7 @@ class HandlerBase { virtual ~HandlerBase() {} virtual void attachPipeline(Context* ctx) {} - virtual void attachTransport(Context* ctx) {} - virtual void detachPipeline(Context* ctx) {} - virtual void detachTransport(Context* ctx) {} Context* getContext() { if (attachCount_ != 1) { @@ -67,6 +64,12 @@ class Handler : public HandlerBase> { virtual void readException(Context* ctx, exception_wrapper e) { ctx->fireReadException(std::move(e)); } + virtual void transportActive(Context* ctx) { + ctx->fireTransportActive(); + } + virtual void transportInactive(Context* ctx) { + ctx->fireTransportInactive(); + } virtual Future write(Context* ctx, Win msg) = 0; virtual Future close(Context* ctx) { @@ -81,8 +84,6 @@ class Handler : public HandlerBase> { exception_wrapper e) {} virtual void channelRegistered(HandlerContext* ctx) {} virtual void channelUnregistered(HandlerContext* ctx) {} - virtual void channelActive(HandlerContext* ctx) {} - virtual void channelInactive(HandlerContext* ctx) {} virtual void channelReadComplete(HandlerContext* ctx) {} virtual void userEventTriggered(HandlerContext* ctx, void* evt) {} virtual void channelWritabilityChanged(HandlerContext* ctx) {} @@ -120,6 +121,12 @@ class InboundHandler : public HandlerBase> { virtual void readException(Context* ctx, exception_wrapper e) { ctx->fireReadException(std::move(e)); } + virtual void transportActive(Context* ctx) { + ctx->fireTransportActive(); + } + virtual void transportInactive(Context* ctx) { + ctx->fireTransportInactive(); + } }; template diff --git a/folly/wangle/channel/HandlerContext-inl.h b/folly/wangle/channel/HandlerContext-inl.h index 49f2517f..9a220bcd 100644 --- a/folly/wangle/channel/HandlerContext-inl.h +++ b/folly/wangle/channel/HandlerContext-inl.h @@ -25,9 +25,6 @@ class PipelineContext { virtual void attachPipeline() = 0; virtual void detachPipeline() = 0; - virtual void attachTransport() = 0; - virtual void detachTransport() = 0; - template void attachContext(H* handler, HandlerContext* ctx) { if (++handler->attachCount_ == 1) { @@ -48,6 +45,8 @@ class InboundLink { virtual void read(In msg) = 0; virtual void readEOF() = 0; virtual void readException(exception_wrapper e) = 0; + virtual void transportActive() = 0; + virtual void transportInactive() = 0; }; template @@ -86,16 +85,6 @@ class ContextImplBase : public PipelineContext { attached_ = false; } - void attachTransport() override { - DestructorGuard dg(pipeline_); - handler_->attachTransport(impl_); - } - - void detachTransport() override { - DestructorGuard dg(pipeline_); - handler_->detachTransport(impl_); - } - void setNextIn(PipelineContext* ctx) override { auto nextIn = dynamic_cast*>(ctx); if (nextIn) { @@ -181,6 +170,20 @@ class ContextImpl } } + void fireTransportActive() override { + DestructorGuard dg(this->pipeline_); + if (this->nextIn_) { + this->nextIn_->transportActive(); + } + } + + void fireTransportInactive() override { + DestructorGuard dg(this->pipeline_); + if (this->nextIn_) { + this->nextIn_->transportInactive(); + } + } + Future fireWrite(Wout msg) override { DestructorGuard dg(this->pipeline_); if (this->nextOut_) { @@ -205,10 +208,6 @@ class ContextImpl return this->pipeline_; } - std::shared_ptr getTransport() override { - return this->pipeline_->getTransport(); - } - void setWriteFlags(WriteFlags flags) override { this->pipeline_->setWriteFlags(flags); } @@ -243,6 +242,16 @@ class ContextImpl this->handler_->readException(this, std::move(e)); } + void transportActive() override { + DestructorGuard dg(this->pipeline_); + this->handler_->transportActive(this); + } + + void transportInactive() override { + DestructorGuard dg(this->pipeline_); + this->handler_->transportInactive(this); + } + // OutboundLink overrides Future write(Win msg) override { DestructorGuard dg(this->pipeline_); @@ -310,12 +319,22 @@ class InboundContextImpl } } - PipelineBase* getPipeline() override { - return this->pipeline_; + void fireTransportActive() override { + DestructorGuard dg(this->pipeline_); + if (this->nextIn_) { + this->nextIn_->transportActive(); + } } - std::shared_ptr getTransport() override { - return this->pipeline_->getTransport(); + void fireTransportInactive() override { + DestructorGuard dg(this->pipeline_); + if (this->nextIn_) { + this->nextIn_->transportInactive(); + } + } + + PipelineBase* getPipeline() override { + return this->pipeline_; } // InboundLink overrides @@ -334,6 +353,16 @@ class InboundContextImpl this->handler_->readException(this, std::move(e)); } + void transportActive() override { + DestructorGuard dg(this->pipeline_); + this->handler_->transportActive(this); + } + + void transportInactive() override { + DestructorGuard dg(this->pipeline_); + this->handler_->transportInactive(this); + } + private: using DestructorGuard = typename P::DestructorGuard; }; @@ -387,10 +416,6 @@ class OutboundContextImpl return this->pipeline_; } - std::shared_ptr getTransport() override { - return this->pipeline_->getTransport(); - } - // OutboundLink overrides Future write(Win msg) override { DestructorGuard dg(this->pipeline_); diff --git a/folly/wangle/channel/HandlerContext.h b/folly/wangle/channel/HandlerContext.h index 2173ab35..ddd9a576 100644 --- a/folly/wangle/channel/HandlerContext.h +++ b/folly/wangle/channel/HandlerContext.h @@ -32,13 +32,16 @@ class HandlerContext { virtual void fireRead(In msg) = 0; virtual void fireReadEOF() = 0; virtual void fireReadException(exception_wrapper e) = 0; + virtual void fireTransportActive() = 0; + virtual void fireTransportInactive() = 0; virtual Future fireWrite(Out msg) = 0; virtual Future fireClose() = 0; virtual PipelineBase* getPipeline() = 0; - - virtual std::shared_ptr getTransport() = 0; + std::shared_ptr getTransport() { + return getPipeline()->getTransport(); + } virtual void setWriteFlags(WriteFlags flags) = 0; virtual WriteFlags getWriteFlags() = 0; @@ -67,10 +70,13 @@ class InboundHandlerContext { virtual void fireRead(In msg) = 0; virtual void fireReadEOF() = 0; virtual void fireReadException(exception_wrapper e) = 0; + virtual void fireTransportActive() = 0; + virtual void fireTransportInactive() = 0; virtual PipelineBase* getPipeline() = 0; - - virtual std::shared_ptr getTransport() = 0; + std::shared_ptr getTransport() { + return getPipeline()->getTransport(); + } // TODO Need get/set writeFlags, readBufferSettings? Probably not. // Do we even really need them stored in the pipeline at all? @@ -86,8 +92,9 @@ class OutboundHandlerContext { virtual Future fireClose() = 0; virtual PipelineBase* getPipeline() = 0; - - virtual std::shared_ptr getTransport() = 0; + std::shared_ptr getTransport() { + return getPipeline()->getTransport(); + } }; enum class HandlerDir { diff --git a/folly/wangle/channel/Pipeline-inl.h b/folly/wangle/channel/Pipeline-inl.h index 458f502c..7c1d46bc 100644 --- a/folly/wangle/channel/Pipeline-inl.h +++ b/folly/wangle/channel/Pipeline-inl.h @@ -35,11 +35,6 @@ Pipeline::~Pipeline() { } } -template -std::shared_ptr Pipeline::getTransport() { - return transport_; -} - template void Pipeline::setWriteFlags(WriteFlags flags) { writeFlags_ = flags; @@ -82,6 +77,24 @@ Pipeline::readEOF() { front_->readEOF(); } +template +template +typename std::enable_if::value>::type +Pipeline::transportActive() { + if (front_) { + front_->transportActive(); + } +} + +template +template +typename std::enable_if::value>::type +Pipeline::transportInactive() { + if (front_) { + front_->transportInactive(); + } +} + template template typename std::enable_if::value>::type @@ -221,23 +234,6 @@ bool Pipeline::setOwner(H* handler) { return false; } -template -void Pipeline::attachTransport( - std::shared_ptr transport) { - transport_ = std::move(transport); - for (auto& ctx : ctxs_) { - ctx->attachTransport(); - } -} - -template -void Pipeline::detachTransport() { - transport_ = nullptr; - for (auto& ctx : ctxs_) { - ctx->detachTransport(); - } -} - template template void Pipeline::addContextFront(Context* ctx) { diff --git a/folly/wangle/channel/Pipeline.h b/folly/wangle/channel/Pipeline.h index 14a586ff..8fa134d8 100644 --- a/folly/wangle/channel/Pipeline.h +++ b/folly/wangle/channel/Pipeline.h @@ -43,8 +43,17 @@ class PipelineBase { } } + void setTransport(std::shared_ptr transport) { + transport_ = transport; + } + + std::shared_ptr getTransport() { + return transport_; + } + private: PipelineManager* manager_{nullptr}; + std::shared_ptr transport_; }; struct Nothing{}; @@ -63,8 +72,6 @@ class Pipeline : public PipelineBase, public DelayedDestruction { Pipeline(); ~Pipeline(); - std::shared_ptr getTransport(); - void setWriteFlags(WriteFlags flags); WriteFlags getWriteFlags(); @@ -83,6 +90,14 @@ class Pipeline : public PipelineBase, public DelayedDestruction { typename std::enable_if::value>::type readException(exception_wrapper e); + template + typename std::enable_if::value>::type + transportActive(); + + template + typename std::enable_if::value>::type + transportInactive(); + template typename std::enable_if::value, Future>::type write(W msg); @@ -121,10 +136,6 @@ class Pipeline : public PipelineBase, public DelayedDestruction { template bool setOwner(H* handler); - void attachTransport(std::shared_ptr transport); - - void detachTransport(); - protected: explicit Pipeline(bool isStatic); @@ -137,7 +148,6 @@ class Pipeline : public PipelineBase, public DelayedDestruction { template Pipeline& addHelper(std::shared_ptr&& ctx, bool front); - std::shared_ptr transport_; WriteFlags writeFlags_{WriteFlags::NONE}; std::pair readBufferSettings_{2048, 2048}; diff --git a/folly/wangle/channel/test/OutputBufferingHandlerTest.cpp b/folly/wangle/channel/test/OutputBufferingHandlerTest.cpp index a0279666..0fce7911 100644 --- a/folly/wangle/channel/test/OutputBufferingHandlerTest.cpp +++ b/folly/wangle/channel/test/OutputBufferingHandlerTest.cpp @@ -42,8 +42,7 @@ TEST(OutputBufferingHandlerTest, Basic) { EventBase eb; auto socket = AsyncSocket::newSocket(&eb); - EXPECT_CALL(mockHandler, attachTransport(_)); - pipeline.attachTransport(socket); + pipeline.setTransport(socket); // Buffering should prevent writes until the EB loops, and the writes should // be batched into one write call. diff --git a/folly/wangle/channel/test/PipelineTest.cpp b/folly/wangle/channel/test/PipelineTest.cpp index 4c940583..9c26bd8f 100644 --- a/folly/wangle/channel/test/PipelineTest.cpp +++ b/folly/wangle/channel/test/PipelineTest.cpp @@ -299,21 +299,3 @@ TEST(Pipeline, DynamicConstruction) { .finalize()); } } - -TEST(Pipeline, AttachTransport) { - IntHandler handler; - EXPECT_CALL(handler, attachPipeline(_)); - StaticPipeline - pipeline(&handler); - - EventBase eb; - auto socket = AsyncSocket::newSocket(&eb); - - EXPECT_CALL(handler, attachTransport(_)); - pipeline.attachTransport(socket); - - EXPECT_CALL(handler, detachTransport(_)); - pipeline.detachTransport(); - - EXPECT_CALL(handler, detachPipeline(_)); -} -- 2.34.1