transportActive and transportInactive
authorJames Sedgwick <jsedgwick@fb.com>
Thu, 14 May 2015 00:43:44 +0000 (17:43 -0700)
committerViswanath Sivakumar <viswanath@fb.com>
Wed, 20 May 2015 17:57:05 +0000 (10:57 -0700)
Summary:
These are equivalents to Netty's channelActive and channelInactive, but we've been calling channels transports so I'm staying consistent.
I skipped integrating this into TAsyncTransportHandler because thrift still does manual CB attachment/detachment and it's unclear how that fits into this model
If my suspicions are correct, it *should* be possible to make attachReadCallback and detachReadCallback private in AsyncSocketHandler, right? And perhaps get rid of the event base modifier methods? What's our use case for those?

Test Plan: unit, employ in telnet server

Reviewed By: davejwatson@fb.com

Subscribers: fugalh, alandau, bmatheny, folly-diffs@, jsedgwick, yfeldblum, chalfant

FB internal diff: D2044520

Signature: t1:2044520:1431551998:af1de358b5dbefcca148814015d8e9f63f458d5d

folly/wangle/bootstrap/ClientBootstrap.h
folly/wangle/bootstrap/ServerBootstrap-inl.h
folly/wangle/channel/AsyncSocketHandler.h
folly/wangle/channel/Handler.h
folly/wangle/channel/HandlerContext-inl.h
folly/wangle/channel/HandlerContext.h
folly/wangle/channel/Pipeline-inl.h
folly/wangle/channel/Pipeline.h
folly/wangle/channel/test/OutputBufferingHandlerTest.cpp
folly/wangle/channel/test/PipelineTest.cpp

index 3ae2ce4f294770bd79179eec4d281f8b757584a9..ecd0c3b17bff4ef3b665f3782a5728abdb22eda5 100644 (file)
@@ -36,6 +36,9 @@ class ClientBootstrap {
         , bootstrap_(bootstrap) {}
 
     void connectSuccess() noexcept override {
+      if (bootstrap_->getPipeline()) {
+        bootstrap_->getPipeline()->transportActive();
+      }
       promise_.setValue(bootstrap_->getPipeline());
       delete this;
     }
@@ -77,9 +80,6 @@ class ClientBootstrap {
       socket->connect(
         new ConnectCallback(std::move(promise), this), address);
       pipeline_ = pipelineFactory_->newPipeline(socket);
-      if (pipeline_) {
-        pipeline_->attachTransport(socket);
-      }
     });
     return retval;
   }
index 85342646c3c30f77af678ce570818e0aef876a33..4b56de86628050ab775df870bc35c9018fa1bcbe 100644 (file)
@@ -89,6 +89,7 @@ class ServerAcceptor
         std::shared_ptr<AsyncSocket>(
           transport.release(),
           folly::DelayedDestruction::Destructor())));
+    pipeline->transportActive();
     auto connection = new ServerConnection(std::move(pipeline));
     Acceptor::addConnection(connection);
   }
index 04491bbd5c51992836ea2921eea4cc7998a114bb..267284940d358400d334972b7ae07a4f93ee3be8 100644 (file)
@@ -37,9 +37,7 @@ class AsyncSocketHandler
   AsyncSocketHandler(AsyncSocketHandler&&) = default;
 
   ~AsyncSocketHandler() {
-    if (socket_) {
-      detachReadCallback();
-    }
+    detachReadCallback();
   }
 
   void attachReadCallback() {
@@ -47,9 +45,14 @@ class AsyncSocketHandler
   }
 
   void detachReadCallback() {
-    if (socket_->getReadCallback() == this) {
+    if (socket_ && socket_->getReadCallback() == this) {
       socket_->setReadCB(nullptr);
     }
+    auto ctx = getContext();
+    if (ctx && !firedInactive_) {
+      firedInactive_ = true;
+      ctx->fireTransportInactive();
+    }
   }
 
   void attachEventBase(folly::EventBase* eventBase) {
@@ -65,8 +68,14 @@ class AsyncSocketHandler
     }
   }
 
-  void attachPipeline(Context* ctx) override {
+  void transportActive(Context* ctx) override {
+    ctx->getPipeline()->setTransport(socket_);
     attachReadCallback();
+    ctx->fireTransportActive();
+  }
+
+  void detachPipeline(Context* ctx) override {
+    detachReadCallback();
   }
 
   folly::Future<void> write(
@@ -149,6 +158,7 @@ class AsyncSocketHandler
 
   folly::IOBufQueue bufQueue_{folly::IOBufQueue::cacheChainLength()};
   std::shared_ptr<AsyncSocket> socket_{nullptr};
+  bool firedInactive_{false};
 };
 
 }}
index cdad11e53f65124e8c91db43239b695dc04412cf..2080c2c224ddd00a4da33adbc753dfe7ace36fd0 100644 (file)
@@ -29,10 +29,7 @@ class HandlerBase {
   virtual ~HandlerBase() {}
 
   virtual void attachPipeline(Context* ctx) {}
-  virtual void attachTransport(Context* ctx) {}
-
   virtual void detachPipeline(Context* ctx) {}
-  virtual void detachTransport(Context* ctx) {}
 
   Context* getContext() {
     if (attachCount_ != 1) {
@@ -67,6 +64,12 @@ class Handler : public HandlerBase<HandlerContext<Rout, Wout>> {
   virtual void readException(Context* ctx, exception_wrapper e) {
     ctx->fireReadException(std::move(e));
   }
+  virtual void transportActive(Context* ctx) {
+    ctx->fireTransportActive();
+  }
+  virtual void transportInactive(Context* ctx) {
+    ctx->fireTransportInactive();
+  }
 
   virtual Future<void> write(Context* ctx, Win msg) = 0;
   virtual Future<void> close(Context* ctx) {
@@ -81,8 +84,6 @@ class Handler : public HandlerBase<HandlerContext<Rout, Wout>> {
       exception_wrapper e) {}
   virtual void channelRegistered(HandlerContext* ctx) {}
   virtual void channelUnregistered(HandlerContext* ctx) {}
-  virtual void channelActive(HandlerContext* ctx) {}
-  virtual void channelInactive(HandlerContext* ctx) {}
   virtual void channelReadComplete(HandlerContext* ctx) {}
   virtual void userEventTriggered(HandlerContext* ctx, void* evt) {}
   virtual void channelWritabilityChanged(HandlerContext* ctx) {}
@@ -120,6 +121,12 @@ class InboundHandler : public HandlerBase<InboundHandlerContext<Rout>> {
   virtual void readException(Context* ctx, exception_wrapper e) {
     ctx->fireReadException(std::move(e));
   }
+  virtual void transportActive(Context* ctx) {
+    ctx->fireTransportActive();
+  }
+  virtual void transportInactive(Context* ctx) {
+    ctx->fireTransportInactive();
+  }
 };
 
 template <class Win, class Wout = Win>
index 49f2517f1eeaedea94fdd9ffb54adfa1f3f744f5..9a220bcd3e8b8b847df4c9cf1f970418bd5311ec 100644 (file)
@@ -25,9 +25,6 @@ class PipelineContext {
   virtual void attachPipeline() = 0;
   virtual void detachPipeline() = 0;
 
-  virtual void attachTransport() = 0;
-  virtual void detachTransport() = 0;
-
   template <class H, class HandlerContext>
   void attachContext(H* handler, HandlerContext* ctx) {
     if (++handler->attachCount_ == 1) {
@@ -48,6 +45,8 @@ class InboundLink {
   virtual void read(In msg) = 0;
   virtual void readEOF() = 0;
   virtual void readException(exception_wrapper e) = 0;
+  virtual void transportActive() = 0;
+  virtual void transportInactive() = 0;
 };
 
 template <class Out>
@@ -86,16 +85,6 @@ class ContextImplBase : public PipelineContext {
     attached_ = false;
   }
 
-  void attachTransport() override {
-    DestructorGuard dg(pipeline_);
-    handler_->attachTransport(impl_);
-  }
-
-  void detachTransport() override {
-    DestructorGuard dg(pipeline_);
-    handler_->detachTransport(impl_);
-  }
-
   void setNextIn(PipelineContext* ctx) override {
     auto nextIn = dynamic_cast<InboundLink<typename H::rout>*>(ctx);
     if (nextIn) {
@@ -181,6 +170,20 @@ class ContextImpl
     }
   }
 
+  void fireTransportActive() override {
+    DestructorGuard dg(this->pipeline_);
+    if (this->nextIn_) {
+      this->nextIn_->transportActive();
+    }
+  }
+
+  void fireTransportInactive() override {
+    DestructorGuard dg(this->pipeline_);
+    if (this->nextIn_) {
+      this->nextIn_->transportInactive();
+    }
+  }
+
   Future<void> fireWrite(Wout msg) override {
     DestructorGuard dg(this->pipeline_);
     if (this->nextOut_) {
@@ -205,10 +208,6 @@ class ContextImpl
     return this->pipeline_;
   }
 
-  std::shared_ptr<AsyncTransport> getTransport() override {
-    return this->pipeline_->getTransport();
-  }
-
   void setWriteFlags(WriteFlags flags) override {
     this->pipeline_->setWriteFlags(flags);
   }
@@ -243,6 +242,16 @@ class ContextImpl
     this->handler_->readException(this, std::move(e));
   }
 
+  void transportActive() override {
+    DestructorGuard dg(this->pipeline_);
+    this->handler_->transportActive(this);
+  }
+
+  void transportInactive() override {
+    DestructorGuard dg(this->pipeline_);
+    this->handler_->transportInactive(this);
+  }
+
   // OutboundLink overrides
   Future<void> write(Win msg) override {
     DestructorGuard dg(this->pipeline_);
@@ -310,12 +319,22 @@ class InboundContextImpl
     }
   }
 
-  PipelineBase* getPipeline() override {
-    return this->pipeline_;
+  void fireTransportActive() override {
+    DestructorGuard dg(this->pipeline_);
+    if (this->nextIn_) {
+      this->nextIn_->transportActive();
+    }
   }
 
-  std::shared_ptr<AsyncTransport> getTransport() override {
-    return this->pipeline_->getTransport();
+  void fireTransportInactive() override {
+    DestructorGuard dg(this->pipeline_);
+    if (this->nextIn_) {
+      this->nextIn_->transportInactive();
+    }
+  }
+
+  PipelineBase* getPipeline() override {
+    return this->pipeline_;
   }
 
   // InboundLink overrides
@@ -334,6 +353,16 @@ class InboundContextImpl
     this->handler_->readException(this, std::move(e));
   }
 
+  void transportActive() override {
+    DestructorGuard dg(this->pipeline_);
+    this->handler_->transportActive(this);
+  }
+
+  void transportInactive() override {
+    DestructorGuard dg(this->pipeline_);
+    this->handler_->transportInactive(this);
+  }
+
  private:
   using DestructorGuard = typename P::DestructorGuard;
 };
@@ -387,10 +416,6 @@ class OutboundContextImpl
     return this->pipeline_;
   }
 
-  std::shared_ptr<AsyncTransport> getTransport() override {
-    return this->pipeline_->getTransport();
-  }
-
   // OutboundLink overrides
   Future<void> write(Win msg) override {
     DestructorGuard dg(this->pipeline_);
index 2173ab351eb47af4a294b64dfd710888b97fc8ee..ddd9a57654794f4efbe059baac665b6deeb5f499 100644 (file)
@@ -32,13 +32,16 @@ class HandlerContext {
   virtual void fireRead(In msg) = 0;
   virtual void fireReadEOF() = 0;
   virtual void fireReadException(exception_wrapper e) = 0;
+  virtual void fireTransportActive() = 0;
+  virtual void fireTransportInactive() = 0;
 
   virtual Future<void> fireWrite(Out msg) = 0;
   virtual Future<void> fireClose() = 0;
 
   virtual PipelineBase* getPipeline() = 0;
-
-  virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
+  std::shared_ptr<AsyncTransport> getTransport() {
+    return getPipeline()->getTransport();
+  }
 
   virtual void setWriteFlags(WriteFlags flags) = 0;
   virtual WriteFlags getWriteFlags() = 0;
@@ -67,10 +70,13 @@ class InboundHandlerContext {
   virtual void fireRead(In msg) = 0;
   virtual void fireReadEOF() = 0;
   virtual void fireReadException(exception_wrapper e) = 0;
+  virtual void fireTransportActive() = 0;
+  virtual void fireTransportInactive() = 0;
 
   virtual PipelineBase* getPipeline() = 0;
-
-  virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
+  std::shared_ptr<AsyncTransport> getTransport() {
+    return getPipeline()->getTransport();
+  }
 
   // TODO Need get/set writeFlags, readBufferSettings? Probably not.
   // Do we even really need them stored in the pipeline at all?
@@ -86,8 +92,9 @@ class OutboundHandlerContext {
   virtual Future<void> fireClose() = 0;
 
   virtual PipelineBase* getPipeline() = 0;
-
-  virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
+  std::shared_ptr<AsyncTransport> getTransport() {
+    return getPipeline()->getTransport();
+  }
 };
 
 enum class HandlerDir {
index 458f502ca24e77c890a56be2a16475357f7fbdc1..7c1d46bc2e3c53386c091be7c6839df036a91df4 100644 (file)
@@ -35,11 +35,6 @@ Pipeline<R, W>::~Pipeline() {
   }
 }
 
-template <class R, class W>
-std::shared_ptr<AsyncTransport> Pipeline<R, W>::getTransport() {
-  return transport_;
-}
-
 template <class R, class W>
 void Pipeline<R, W>::setWriteFlags(WriteFlags flags) {
   writeFlags_ = flags;
@@ -82,6 +77,24 @@ Pipeline<R, W>::readEOF() {
   front_->readEOF();
 }
 
+template <class R, class W>
+template <class T>
+typename std::enable_if<!std::is_same<T, Nothing>::value>::type
+Pipeline<R, W>::transportActive() {
+  if (front_) {
+    front_->transportActive();
+  }
+}
+
+template <class R, class W>
+template <class T>
+typename std::enable_if<!std::is_same<T, Nothing>::value>::type
+Pipeline<R, W>::transportInactive() {
+  if (front_) {
+    front_->transportInactive();
+  }
+}
+
 template <class R, class W>
 template <class T>
 typename std::enable_if<!std::is_same<T, Nothing>::value>::type
@@ -221,23 +234,6 @@ bool Pipeline<R, W>::setOwner(H* handler) {
   return false;
 }
 
-template <class R, class W>
-void Pipeline<R, W>::attachTransport(
-    std::shared_ptr<AsyncTransport> transport) {
-  transport_ = std::move(transport);
-  for (auto& ctx : ctxs_) {
-    ctx->attachTransport();
-  }
-}
-
-template <class R, class W>
-void Pipeline<R, W>::detachTransport() {
-  transport_ = nullptr;
-  for (auto& ctx : ctxs_) {
-    ctx->detachTransport();
-  }
-}
-
 template <class R, class W>
 template <class Context>
 void Pipeline<R, W>::addContextFront(Context* ctx) {
index 14a586ffa813f76b7d3aa94cc9577ee6c09e2d96..8fa134d80f18c996f483779f854a620762c5d9c9 100644 (file)
@@ -43,8 +43,17 @@ class PipelineBase {
     }
   }
 
+  void setTransport(std::shared_ptr<AsyncTransport> transport) {
+    transport_ = transport;
+  }
+
+  std::shared_ptr<AsyncTransport> getTransport() {
+    return transport_;
+  }
+
  private:
   PipelineManager* manager_{nullptr};
+  std::shared_ptr<AsyncTransport> transport_;
 };
 
 struct Nothing{};
@@ -63,8 +72,6 @@ class Pipeline : public PipelineBase, public DelayedDestruction {
   Pipeline();
   ~Pipeline();
 
-  std::shared_ptr<AsyncTransport> getTransport();
-
   void setWriteFlags(WriteFlags flags);
   WriteFlags getWriteFlags();
 
@@ -83,6 +90,14 @@ class Pipeline : public PipelineBase, public DelayedDestruction {
   typename std::enable_if<!std::is_same<T, Nothing>::value>::type
   readException(exception_wrapper e);
 
+  template <class T = R>
+  typename std::enable_if<!std::is_same<T, Nothing>::value>::type
+  transportActive();
+
+  template <class T = R>
+  typename std::enable_if<!std::is_same<T, Nothing>::value>::type
+  transportInactive();
+
   template <class T = W>
   typename std::enable_if<!std::is_same<T, Nothing>::value, Future<void>>::type
   write(W msg);
@@ -121,10 +136,6 @@ class Pipeline : public PipelineBase, public DelayedDestruction {
   template <class H>
   bool setOwner(H* handler);
 
-  void attachTransport(std::shared_ptr<AsyncTransport> transport);
-
-  void detachTransport();
-
  protected:
   explicit Pipeline(bool isStatic);
 
@@ -137,7 +148,6 @@ class Pipeline : public PipelineBase, public DelayedDestruction {
   template <class Context>
   Pipeline& addHelper(std::shared_ptr<Context>&& ctx, bool front);
 
-  std::shared_ptr<AsyncTransport> transport_;
   WriteFlags writeFlags_{WriteFlags::NONE};
   std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
 
index a0279666d543c52089ccd31356fd63af08cf705b..0fce791118eae344f35d5f9b17595b0eba616e7c 100644 (file)
@@ -42,8 +42,7 @@ TEST(OutputBufferingHandlerTest, Basic) {
 
   EventBase eb;
   auto socket = AsyncSocket::newSocket(&eb);
-  EXPECT_CALL(mockHandler, attachTransport(_));
-  pipeline.attachTransport(socket);
+  pipeline.setTransport(socket);
 
   // Buffering should prevent writes until the EB loops, and the writes should
   // be batched into one write call.
index 4c9405837fb29295934b88d5b5ad0a37dc2ded31..9c26bd8ffeb305292cf1a8925fee60674b9a7a17 100644 (file)
@@ -299,21 +299,3 @@ TEST(Pipeline, DynamicConstruction) {
         .finalize());
   }
 }
-
-TEST(Pipeline, AttachTransport) {
-  IntHandler handler;
-  EXPECT_CALL(handler, attachPipeline(_));
-  StaticPipeline<int, int, IntHandler>
-  pipeline(&handler);
-
-  EventBase eb;
-  auto socket = AsyncSocket::newSocket(&eb);
-
-  EXPECT_CALL(handler, attachTransport(_));
-  pipeline.attachTransport(socket);
-
-  EXPECT_CALL(handler, detachTransport(_));
-  pipeline.detachTransport();
-
-  EXPECT_CALL(handler, detachPipeline(_));
-}