pipeline handler removal, fix service test
authorJames Sedgwick <jsedgwick@fb.com>
Mon, 15 Jun 2015 19:12:28 +0000 (12:12 -0700)
committerSara Golemon <sgolemon@fb.com>
Mon, 15 Jun 2015 20:36:36 +0000 (13:36 -0700)
Summary: add remove<T>, remove(Handler*), removeFront(), removeBack() to Pipeline
employ these to fix up reusing client pipelines with client dispatchers, which in turn fixes the broken ServiceTest

Reviewed By: @djwatson

Differential Revision: D2152636

folly/wangle/channel/HandlerContext-inl.h
folly/wangle/channel/Pipeline-inl.h
folly/wangle/channel/Pipeline.h
folly/wangle/channel/test/PipelineTest.cpp
folly/wangle/service/ClientDispatcher.h

index 5f12f99e8f2979af4109dd83a337767ad0759b86..9f111c419069ee4a0913da6686ecc8e1aea1572b 100644 (file)
@@ -36,6 +36,8 @@ class PipelineContext {
 
   virtual void setNextIn(PipelineContext* ctx) = 0;
   virtual void setNextOut(PipelineContext* ctx) = 0;
+
+  virtual HandlerDir getDirection() = 0;
 };
 
 template <class In>
@@ -86,6 +88,10 @@ class ContextImplBase : public PipelineContext {
   }
 
   void setNextIn(PipelineContext* ctx) override {
+    if (!ctx) {
+      nextIn_ = nullptr;
+      return;
+    }
     auto nextIn = dynamic_cast<InboundLink<typename H::rout>*>(ctx);
     if (nextIn) {
       nextIn_ = nextIn;
@@ -95,6 +101,10 @@ class ContextImplBase : public PipelineContext {
   }
 
   void setNextOut(PipelineContext* ctx) override {
+    if (!ctx) {
+      nextOut_ = nullptr;
+      return;
+    }
     auto nextOut = dynamic_cast<OutboundLink<typename H::wout>*>(ctx);
     if (nextOut) {
       nextOut_ = nextOut;
@@ -103,6 +113,10 @@ class ContextImplBase : public PipelineContext {
     }
   }
 
+  HandlerDir getDirection() override {
+    return H::dir;
+  }
+
  protected:
   Context* impl_;
   P* pipeline_;
index 7c1d46bc2e3c53386c091be7c6839df036a91df4..dde1f39eda93360eaa42460c0c9709389aa9ee46 100644 (file)
@@ -164,6 +164,80 @@ Pipeline<R, W>& Pipeline<R, W>::addFront(H* handler) {
   return addFront(std::shared_ptr<H>(handler, [](H*){}));
 }
 
+template <class R, class W>
+template <class H>
+Pipeline<R, W>& Pipeline<R, W>::removeHelper(H* handler, bool checkEqual) {
+  typedef typename ContextType<H, Pipeline<R, W>>::type Context;
+  bool removed = false;
+  for (auto it = ctxs_.begin(); it != ctxs_.end(); it++) {
+    auto ctx = std::dynamic_pointer_cast<Context>(*it);
+    if (ctx && (!checkEqual || ctx->getHandler() == handler)) {
+      it = removeAt(it);
+      removed = true;
+      if (it == ctxs_.end()) {
+        break;
+      }
+    }
+  }
+
+  if (!removed) {
+    throw std::invalid_argument("No such handler in pipeline");
+  }
+
+  return *this;
+}
+
+template <class R, class W>
+template <class H>
+Pipeline<R, W>& Pipeline<R, W>::remove() {
+  return removeHelper<H>(nullptr, false);
+}
+
+template <class R, class W>
+template <class H>
+Pipeline<R, W>& Pipeline<R, W>::remove(H* handler) {
+  return removeHelper<H>(handler, true);
+}
+
+template <class R, class W>
+typename Pipeline<R, W>::ContextIterator Pipeline<R, W>::removeAt(
+    const typename Pipeline<R, W>::ContextIterator& it) {
+  (*it)->detachPipeline();
+
+  const auto dir = (*it)->getDirection();
+  if (dir == HandlerDir::BOTH || dir == HandlerDir::IN) {
+    auto it2 = std::find(inCtxs_.begin(), inCtxs_.end(), it->get());
+    CHECK(it2 != inCtxs_.end());
+    inCtxs_.erase(it2);
+  }
+
+  if (dir == HandlerDir::BOTH || dir == HandlerDir::OUT) {
+    auto it2 = std::find(outCtxs_.begin(), outCtxs_.end(), it->get());
+    CHECK(it2 != outCtxs_.end());
+    outCtxs_.erase(it2);
+  }
+
+  return ctxs_.erase(it);
+}
+
+template <class R, class W>
+Pipeline<R, W>& Pipeline<R, W>::removeFront() {
+  if (ctxs_.empty()) {
+    throw std::invalid_argument("No handlers in pipeline");
+  }
+  removeAt(ctxs_.begin());
+  return *this;
+}
+
+template <class R, class W>
+Pipeline<R, W>& Pipeline<R, W>::removeBack() {
+  if (ctxs_.empty()) {
+    throw std::invalid_argument("No handlers in pipeline");
+  }
+  removeAt(--ctxs_.end());
+  return *this;
+}
+
 template <class R, class W>
 template <class H>
 H* Pipeline<R, W>::getHandler(int i) {
@@ -190,18 +264,22 @@ inline void logWarningIfNotNothing<Nothing>(const std::string& warning) {
 // TODO Have read/write/etc check that pipeline has been finalized
 template <class R, class W>
 void Pipeline<R, W>::finalize() {
+  front_ = nullptr;
   if (!inCtxs_.empty()) {
     front_ = dynamic_cast<InboundLink<R>*>(inCtxs_.front());
     for (size_t i = 0; i < inCtxs_.size() - 1; i++) {
       inCtxs_[i]->setNextIn(inCtxs_[i+1]);
     }
+    inCtxs_.back()->setNextIn(nullptr);
   }
 
+  back_ = nullptr;
   if (!outCtxs_.empty()) {
     back_ = dynamic_cast<OutboundLink<W>*>(outCtxs_.back());
     for (size_t i = outCtxs_.size() - 1; i > 0; i--) {
       outCtxs_[i]->setNextOut(outCtxs_[i-1]);
     }
+    outCtxs_.front()->setNextOut(nullptr);
   }
 
   if (!front_) {
index 174c5d9f1b7512bfd3d77a2b0d801de54410da12..7138ab1c10728759ce80d6a9f70960fbe201cea9 100644 (file)
@@ -126,6 +126,16 @@ class Pipeline : public PipelineBase {
   template <class H>
   Pipeline& addFront(H* handler);
 
+  template <class H>
+  Pipeline& remove(H* handler);
+
+  template <class H>
+  Pipeline& remove();
+
+  Pipeline& removeFront();
+
+  Pipeline& removeBack();
+
   template <class H>
   H* getHandler(int i);
 
@@ -150,6 +160,14 @@ class Pipeline : public PipelineBase {
   template <class Context>
   Pipeline& addHelper(std::shared_ptr<Context>&& ctx, bool front);
 
+  template <class H>
+  Pipeline& removeHelper(H* handler, bool checkEqual);
+
+  typedef std::vector<std::shared_ptr<PipelineContext>>::iterator
+    ContextIterator;
+
+  ContextIterator removeAt(const ContextIterator& it);
+
   WriteFlags writeFlags_{WriteFlags::NONE};
   std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
 
index cdc4e980d7acac7e9cbfbeac577e4d7c99805ff6..36c4cee671b1aec9cf30e12f8ed3086fe41c187f 100644 (file)
@@ -304,3 +304,88 @@ TEST(Pipeline, DynamicConstruction) {
         .finalize());
   }
 }
+
+TEST(Pipeline, RemovePointer) {
+  IntHandler handler1, handler2;
+  EXPECT_CALL(handler1, attachPipeline(_));
+  EXPECT_CALL(handler2, attachPipeline(_));
+  Pipeline<int, int> pipeline;
+  pipeline
+    .addBack(&handler1)
+    .addBack(&handler2)
+    .finalize();
+
+  EXPECT_CALL(handler1, detachPipeline(_));
+  pipeline
+    .remove(&handler1)
+    .finalize();
+
+  EXPECT_CALL(handler2, read_(_, _));
+  pipeline.read(1);
+
+  EXPECT_CALL(handler2, detachPipeline(_));
+}
+
+TEST(Pipeline, RemoveFront) {
+  IntHandler handler1, handler2;
+  EXPECT_CALL(handler1, attachPipeline(_));
+  EXPECT_CALL(handler2, attachPipeline(_));
+  Pipeline<int, int> pipeline;
+  pipeline
+    .addBack(&handler1)
+    .addBack(&handler2)
+    .finalize();
+
+  EXPECT_CALL(handler1, detachPipeline(_));
+  pipeline
+    .removeFront()
+    .finalize();
+
+  EXPECT_CALL(handler2, read_(_, _));
+  pipeline.read(1);
+
+  EXPECT_CALL(handler2, detachPipeline(_));
+}
+
+TEST(Pipeline, RemoveBack) {
+  IntHandler handler1, handler2;
+  EXPECT_CALL(handler1, attachPipeline(_));
+  EXPECT_CALL(handler2, attachPipeline(_));
+  Pipeline<int, int> pipeline;
+  pipeline
+    .addBack(&handler1)
+    .addBack(&handler2)
+    .finalize();
+
+  EXPECT_CALL(handler2, detachPipeline(_));
+  pipeline
+    .removeBack()
+    .finalize();
+
+  EXPECT_CALL(handler1, read_(_, _));
+  pipeline.read(1);
+
+  EXPECT_CALL(handler1, detachPipeline(_));
+}
+
+TEST(Pipeline, RemoveType) {
+  IntHandler handler1;
+  IntHandler2 handler2;
+  EXPECT_CALL(handler1, attachPipeline(_));
+  EXPECT_CALL(handler2, attachPipeline(_));
+  Pipeline<int, int> pipeline;
+  pipeline
+    .addBack(&handler1)
+    .addBack(&handler2)
+    .finalize();
+
+  EXPECT_CALL(handler1, detachPipeline(_));
+  pipeline
+    .remove<IntHandler>()
+    .finalize();
+
+  EXPECT_CALL(handler2, read_(_, _));
+  pipeline.read(1);
+
+  EXPECT_CALL(handler2, detachPipeline(_));
+}
index d6354f04f98f717ff394749bc90d087250430775..66f24cc99c7553d40ec34d16c5a663375b70150e 100644 (file)
@@ -29,13 +29,27 @@ template <typename Pipeline, typename Req, typename Resp = Req>
 class SerialClientDispatcher : public HandlerAdapter<Req, Resp>
                              , public Service<Req, Resp> {
  public:
-
   typedef typename HandlerAdapter<Req, Resp>::Context Context;
 
+  ~SerialClientDispatcher() {
+    if (pipeline_) {
+      try {
+        pipeline_->remove(this).finalize();
+      } catch (const std::invalid_argument& e) {
+        // not in pipeline; this is fine
+      }
+    }
+  }
+
   void setPipeline(Pipeline* pipeline) {
+    try {
+      pipeline->template remove<SerialClientDispatcher>();
+    } catch (const std::invalid_argument& e) {
+      // no existing dispatcher; this is fine
+    }
     pipeline_ = pipeline;
-    pipeline->addBack(this);
-    pipeline->finalize();
+    pipeline_->addBack(this);
+    pipeline_->finalize();
   }
 
   void read(Context* ctx, Req in) override {
@@ -61,6 +75,11 @@ class SerialClientDispatcher : public HandlerAdapter<Req, Resp>
   virtual Future<void> close(Context* ctx) override {
     return HandlerAdapter<Req, Resp>::close(ctx);
   }
+
+  void detachPipeline(Context* ctx) override {
+    pipeline_ = nullptr;
+  }
+
  private:
   Pipeline* pipeline_{nullptr};
   folly::Optional<Promise<Resp>> p_;