virtual void setNextIn(PipelineContext* ctx) = 0;
virtual void setNextOut(PipelineContext* ctx) = 0;
+
+ virtual HandlerDir getDirection() = 0;
};
template <class In>
}
void setNextIn(PipelineContext* ctx) override {
+ if (!ctx) {
+ nextIn_ = nullptr;
+ return;
+ }
auto nextIn = dynamic_cast<InboundLink<typename H::rout>*>(ctx);
if (nextIn) {
nextIn_ = nextIn;
}
void setNextOut(PipelineContext* ctx) override {
+ if (!ctx) {
+ nextOut_ = nullptr;
+ return;
+ }
auto nextOut = dynamic_cast<OutboundLink<typename H::wout>*>(ctx);
if (nextOut) {
nextOut_ = nextOut;
}
}
+ HandlerDir getDirection() override {
+ return H::dir;
+ }
+
protected:
Context* impl_;
P* pipeline_;
return addFront(std::shared_ptr<H>(handler, [](H*){}));
}
+template <class R, class W>
+template <class H>
+Pipeline<R, W>& Pipeline<R, W>::removeHelper(H* handler, bool checkEqual) {
+ typedef typename ContextType<H, Pipeline<R, W>>::type Context;
+ bool removed = false;
+ for (auto it = ctxs_.begin(); it != ctxs_.end(); it++) {
+ auto ctx = std::dynamic_pointer_cast<Context>(*it);
+ if (ctx && (!checkEqual || ctx->getHandler() == handler)) {
+ it = removeAt(it);
+ removed = true;
+ if (it == ctxs_.end()) {
+ break;
+ }
+ }
+ }
+
+ if (!removed) {
+ throw std::invalid_argument("No such handler in pipeline");
+ }
+
+ return *this;
+}
+
+template <class R, class W>
+template <class H>
+Pipeline<R, W>& Pipeline<R, W>::remove() {
+ return removeHelper<H>(nullptr, false);
+}
+
+template <class R, class W>
+template <class H>
+Pipeline<R, W>& Pipeline<R, W>::remove(H* handler) {
+ return removeHelper<H>(handler, true);
+}
+
+template <class R, class W>
+typename Pipeline<R, W>::ContextIterator Pipeline<R, W>::removeAt(
+ const typename Pipeline<R, W>::ContextIterator& it) {
+ (*it)->detachPipeline();
+
+ const auto dir = (*it)->getDirection();
+ if (dir == HandlerDir::BOTH || dir == HandlerDir::IN) {
+ auto it2 = std::find(inCtxs_.begin(), inCtxs_.end(), it->get());
+ CHECK(it2 != inCtxs_.end());
+ inCtxs_.erase(it2);
+ }
+
+ if (dir == HandlerDir::BOTH || dir == HandlerDir::OUT) {
+ auto it2 = std::find(outCtxs_.begin(), outCtxs_.end(), it->get());
+ CHECK(it2 != outCtxs_.end());
+ outCtxs_.erase(it2);
+ }
+
+ return ctxs_.erase(it);
+}
+
+template <class R, class W>
+Pipeline<R, W>& Pipeline<R, W>::removeFront() {
+ if (ctxs_.empty()) {
+ throw std::invalid_argument("No handlers in pipeline");
+ }
+ removeAt(ctxs_.begin());
+ return *this;
+}
+
+template <class R, class W>
+Pipeline<R, W>& Pipeline<R, W>::removeBack() {
+ if (ctxs_.empty()) {
+ throw std::invalid_argument("No handlers in pipeline");
+ }
+ removeAt(--ctxs_.end());
+ return *this;
+}
+
template <class R, class W>
template <class H>
H* Pipeline<R, W>::getHandler(int i) {
// TODO Have read/write/etc check that pipeline has been finalized
template <class R, class W>
void Pipeline<R, W>::finalize() {
+ front_ = nullptr;
if (!inCtxs_.empty()) {
front_ = dynamic_cast<InboundLink<R>*>(inCtxs_.front());
for (size_t i = 0; i < inCtxs_.size() - 1; i++) {
inCtxs_[i]->setNextIn(inCtxs_[i+1]);
}
+ inCtxs_.back()->setNextIn(nullptr);
}
+ back_ = nullptr;
if (!outCtxs_.empty()) {
back_ = dynamic_cast<OutboundLink<W>*>(outCtxs_.back());
for (size_t i = outCtxs_.size() - 1; i > 0; i--) {
outCtxs_[i]->setNextOut(outCtxs_[i-1]);
}
+ outCtxs_.front()->setNextOut(nullptr);
}
if (!front_) {
template <class H>
Pipeline& addFront(H* handler);
+ template <class H>
+ Pipeline& remove(H* handler);
+
+ template <class H>
+ Pipeline& remove();
+
+ Pipeline& removeFront();
+
+ Pipeline& removeBack();
+
template <class H>
H* getHandler(int i);
template <class Context>
Pipeline& addHelper(std::shared_ptr<Context>&& ctx, bool front);
+ template <class H>
+ Pipeline& removeHelper(H* handler, bool checkEqual);
+
+ typedef std::vector<std::shared_ptr<PipelineContext>>::iterator
+ ContextIterator;
+
+ ContextIterator removeAt(const ContextIterator& it);
+
WriteFlags writeFlags_{WriteFlags::NONE};
std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
.finalize());
}
}
+
+TEST(Pipeline, RemovePointer) {
+ IntHandler handler1, handler2;
+ EXPECT_CALL(handler1, attachPipeline(_));
+ EXPECT_CALL(handler2, attachPipeline(_));
+ Pipeline<int, int> pipeline;
+ pipeline
+ .addBack(&handler1)
+ .addBack(&handler2)
+ .finalize();
+
+ EXPECT_CALL(handler1, detachPipeline(_));
+ pipeline
+ .remove(&handler1)
+ .finalize();
+
+ EXPECT_CALL(handler2, read_(_, _));
+ pipeline.read(1);
+
+ EXPECT_CALL(handler2, detachPipeline(_));
+}
+
+TEST(Pipeline, RemoveFront) {
+ IntHandler handler1, handler2;
+ EXPECT_CALL(handler1, attachPipeline(_));
+ EXPECT_CALL(handler2, attachPipeline(_));
+ Pipeline<int, int> pipeline;
+ pipeline
+ .addBack(&handler1)
+ .addBack(&handler2)
+ .finalize();
+
+ EXPECT_CALL(handler1, detachPipeline(_));
+ pipeline
+ .removeFront()
+ .finalize();
+
+ EXPECT_CALL(handler2, read_(_, _));
+ pipeline.read(1);
+
+ EXPECT_CALL(handler2, detachPipeline(_));
+}
+
+TEST(Pipeline, RemoveBack) {
+ IntHandler handler1, handler2;
+ EXPECT_CALL(handler1, attachPipeline(_));
+ EXPECT_CALL(handler2, attachPipeline(_));
+ Pipeline<int, int> pipeline;
+ pipeline
+ .addBack(&handler1)
+ .addBack(&handler2)
+ .finalize();
+
+ EXPECT_CALL(handler2, detachPipeline(_));
+ pipeline
+ .removeBack()
+ .finalize();
+
+ EXPECT_CALL(handler1, read_(_, _));
+ pipeline.read(1);
+
+ EXPECT_CALL(handler1, detachPipeline(_));
+}
+
+TEST(Pipeline, RemoveType) {
+ IntHandler handler1;
+ IntHandler2 handler2;
+ EXPECT_CALL(handler1, attachPipeline(_));
+ EXPECT_CALL(handler2, attachPipeline(_));
+ Pipeline<int, int> pipeline;
+ pipeline
+ .addBack(&handler1)
+ .addBack(&handler2)
+ .finalize();
+
+ EXPECT_CALL(handler1, detachPipeline(_));
+ pipeline
+ .remove<IntHandler>()
+ .finalize();
+
+ EXPECT_CALL(handler2, read_(_, _));
+ pipeline.read(1);
+
+ EXPECT_CALL(handler2, detachPipeline(_));
+}
class SerialClientDispatcher : public HandlerAdapter<Req, Resp>
, public Service<Req, Resp> {
public:
-
typedef typename HandlerAdapter<Req, Resp>::Context Context;
+ ~SerialClientDispatcher() {
+ if (pipeline_) {
+ try {
+ pipeline_->remove(this).finalize();
+ } catch (const std::invalid_argument& e) {
+ // not in pipeline; this is fine
+ }
+ }
+ }
+
void setPipeline(Pipeline* pipeline) {
+ try {
+ pipeline->template remove<SerialClientDispatcher>();
+ } catch (const std::invalid_argument& e) {
+ // no existing dispatcher; this is fine
+ }
pipeline_ = pipeline;
- pipeline->addBack(this);
- pipeline->finalize();
+ pipeline_->addBack(this);
+ pipeline_->finalize();
}
void read(Context* ctx, Req in) override {
virtual Future<void> close(Context* ctx) override {
return HandlerAdapter<Req, Resp>::close(ctx);
}
+
+ void detachPipeline(Context* ctx) override {
+ pipeline_ = nullptr;
+ }
+
private:
Pipeline* pipeline_{nullptr};
folly::Optional<Promise<Resp>> p_;