From e4b8b76f31814b7e554c08653d032e02afb01884 Mon Sep 17 00:00:00 2001 From: James Sedgwick Date: Tue, 23 Jun 2015 11:21:34 -0700 Subject: [PATCH] rearrange Pipeline to have more functionality in PipelineBase Summary: This way, handlers can carry out more complex manipulations of the pipeline via ctx->getPipeline() without knowing the R/W types Reviewed By: @djwatson Differential Revision: D2158736 --- folly/Makefile.am | 1 + folly/wangle/channel/HandlerContext-inl.h | 46 ++-- folly/wangle/channel/Pipeline-inl.h | 303 ++++++++-------------- folly/wangle/channel/Pipeline.cpp | 83 ++++++ folly/wangle/channel/Pipeline.h | 144 +++++----- folly/wangle/channel/StaticPipeline.h | 2 +- 6 files changed, 296 insertions(+), 283 deletions(-) create mode 100644 folly/wangle/channel/Pipeline.cpp diff --git a/folly/Makefile.am b/folly/Makefile.am index 9b1c9e0a..e615a733 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -420,6 +420,7 @@ libfolly_la_SOURCES = \ wangle/acceptor/SocketOptions.cpp \ wangle/acceptor/TransportInfo.cpp \ wangle/bootstrap/ServerBootstrap.cpp \ + wangle/channel/Pipeline.cpp \ wangle/concurrent/CPUThreadPoolExecutor.cpp \ wangle/concurrent/Codel.cpp \ wangle/concurrent/IOThreadPoolExecutor.cpp \ diff --git a/folly/wangle/channel/HandlerContext-inl.h b/folly/wangle/channel/HandlerContext-inl.h index 9f111c41..6877f7de 100644 --- a/folly/wangle/channel/HandlerContext-inl.h +++ b/folly/wangle/channel/HandlerContext-inl.h @@ -59,7 +59,7 @@ class OutboundLink { virtual Future close() = 0; }; -template +template class ContextImplBase : public PipelineContext { public: ~ContextImplBase() = default; @@ -68,7 +68,7 @@ class ContextImplBase : public PipelineContext { return handler_.get(); } - void initialize(P* pipeline, std::shared_ptr handler) { + void initialize(PipelineBase* pipeline, std::shared_ptr handler) { pipeline_ = pipeline; handler_ = std::move(handler); } @@ -119,24 +119,24 @@ class ContextImplBase : public PipelineContext { protected: Context* impl_; - P* pipeline_; + PipelineBase* pipeline_; std::shared_ptr handler_; InboundLink* nextIn_{nullptr}; OutboundLink* nextOut_{nullptr}; private: bool attached_{false}; - using DestructorGuard = typename P::DestructorGuard; + using DestructorGuard = typename DelayedDestruction::DestructorGuard; }; -template +template class ContextImpl : public HandlerContext, public InboundLink, public OutboundLink, - public ContextImplBase> { + public ContextImplBase> { public: typedef typename H::rin Rin; typedef typename H::rout Rout; @@ -144,7 +144,7 @@ class ContextImpl typedef typename H::wout Wout; static const HandlerDir dir = HandlerDir::BOTH; - explicit ContextImpl(P* pipeline, std::shared_ptr handler) { + explicit ContextImpl(PipelineBase* pipeline, std::shared_ptr handler) { this->impl_ = this; this->initialize(pipeline, std::move(handler)); } @@ -278,14 +278,14 @@ class ContextImpl } private: - using DestructorGuard = typename P::DestructorGuard; + using DestructorGuard = typename DelayedDestruction::DestructorGuard; }; -template +template class InboundContextImpl : public InboundHandlerContext, public InboundLink, - public ContextImplBase> { + public ContextImplBase> { public: typedef typename H::rin Rin; typedef typename H::rout Rout; @@ -293,7 +293,9 @@ class InboundContextImpl typedef typename H::wout Wout; static const HandlerDir dir = HandlerDir::IN; - explicit InboundContextImpl(P* pipeline, std::shared_ptr handler) { + explicit InboundContextImpl( + PipelineBase* pipeline, + std::shared_ptr handler) { this->impl_ = this; this->initialize(pipeline, std::move(handler)); } @@ -378,14 +380,14 @@ class InboundContextImpl } private: - using DestructorGuard = typename P::DestructorGuard; + using DestructorGuard = typename DelayedDestruction::DestructorGuard; }; -template +template class OutboundContextImpl : public OutboundHandlerContext, public OutboundLink, - public ContextImplBase> { + public ContextImplBase> { public: typedef typename H::rin Rin; typedef typename H::rout Rout; @@ -393,7 +395,9 @@ class OutboundContextImpl typedef typename H::wout Wout; static const HandlerDir dir = HandlerDir::OUT; - explicit OutboundContextImpl(P* pipeline, std::shared_ptr handler) { + explicit OutboundContextImpl( + PipelineBase* pipeline, + std::shared_ptr handler) { this->impl_ = this; this->initialize(pipeline, std::move(handler)); } @@ -442,18 +446,18 @@ class OutboundContextImpl } private: - using DestructorGuard = typename P::DestructorGuard; + using DestructorGuard = typename DelayedDestruction::DestructorGuard; }; -template +template struct ContextType { typedef typename std::conditional< Handler::dir == HandlerDir::BOTH, - ContextImpl, + ContextImpl, typename std::conditional< Handler::dir == HandlerDir::IN, - InboundContextImpl, - OutboundContextImpl + InboundContextImpl, + OutboundContextImpl >::type>::type type; }; diff --git a/folly/wangle/channel/Pipeline-inl.h b/folly/wangle/channel/Pipeline-inl.h index f4f7e000..64096a14 100644 --- a/folly/wangle/channel/Pipeline-inl.h +++ b/folly/wangle/channel/Pipeline-inl.h @@ -35,28 +35,124 @@ Pipeline::~Pipeline() { } } -template -void Pipeline::setWriteFlags(WriteFlags flags) { - writeFlags_ = flags; +template +PipelineBase& PipelineBase::addBack(std::shared_ptr handler) { + typedef typename ContextType::type Context; + return addHelper(std::make_shared(this, std::move(handler)), false); } -template -WriteFlags Pipeline::getWriteFlags() { - return writeFlags_; +template +PipelineBase& PipelineBase::addBack(H&& handler) { + return addBack(std::make_shared(std::forward(handler))); } -template -void Pipeline::setReadBufferSettings( - uint64_t minAvailable, - uint64_t allocationSize) { - readBufferSettings_ = std::make_pair(minAvailable, allocationSize); +template +PipelineBase& PipelineBase::addBack(H* handler) { + return addBack(std::shared_ptr(handler, [](H*){})); } -template -std::pair Pipeline::getReadBufferSettings() { - return readBufferSettings_; +template +PipelineBase& PipelineBase::addFront(std::shared_ptr handler) { + typedef typename ContextType::type Context; + return addHelper(std::make_shared(this, std::move(handler)), true); +} + +template +PipelineBase& PipelineBase::addFront(H&& handler) { + return addFront(std::make_shared(std::forward(handler))); +} + +template +PipelineBase& PipelineBase::addFront(H* handler) { + return addFront(std::shared_ptr(handler, [](H*){})); +} + +template +PipelineBase& PipelineBase::removeHelper(H* handler, bool checkEqual) { + typedef typename ContextType::type Context; + bool removed = false; + for (auto it = ctxs_.begin(); it != ctxs_.end(); it++) { + auto ctx = std::dynamic_pointer_cast(*it); + if (ctx && (!checkEqual || ctx->getHandler() == handler)) { + it = removeAt(it); + removed = true; + if (it == ctxs_.end()) { + break; + } + } + } + + if (!removed) { + throw std::invalid_argument("No such handler in pipeline"); + } + + return *this; +} + +template +PipelineBase& PipelineBase::remove() { + return removeHelper(nullptr, false); +} + +template +PipelineBase& PipelineBase::remove(H* handler) { + return removeHelper(handler, true); +} + +template +H* PipelineBase::getHandler(int i) { + typedef typename ContextType::type Context; + auto ctx = dynamic_cast(ctxs_[i].get()); + CHECK(ctx); + return ctx->getHandler(); +} + +template +bool PipelineBase::setOwner(H* handler) { + typedef typename ContextType::type Context; + for (auto& ctx : ctxs_) { + auto ctxImpl = dynamic_cast(ctx.get()); + if (ctxImpl && ctxImpl->getHandler() == handler) { + owner_ = ctx; + return true; + } + } + return false; +} + +template +void PipelineBase::addContextFront(Context* ctx) { + addHelper(std::shared_ptr(ctx, [](Context*){}), true); +} + +template +PipelineBase& PipelineBase::addHelper( + std::shared_ptr&& ctx, + bool front) { + ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(), ctx); + if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::IN) { + inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(), ctx.get()); + } + if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::OUT) { + outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(), ctx.get()); + } + return *this; +} + +namespace detail { + +template +inline void logWarningIfNotUnit(const std::string& warning) { + LOG(WARNING) << warning; +} + +template <> +inline void logWarningIfNotUnit(const std::string& warning) { + // do nothing } +} // detail + template template typename std::enable_if::value>::type @@ -126,141 +222,6 @@ Pipeline::close() { return back_->close(); } -template -template -Pipeline& Pipeline::addBack(std::shared_ptr handler) { - typedef typename ContextType>::type Context; - return addHelper(std::make_shared(this, std::move(handler)), false); -} - -template -template -Pipeline& Pipeline::addBack(H&& handler) { - return addBack(std::make_shared(std::forward(handler))); -} - -template -template -Pipeline& Pipeline::addBack(H* handler) { - return addBack(std::shared_ptr(handler, [](H*){})); -} - -template -template -Pipeline& Pipeline::addFront(std::shared_ptr handler) { - typedef typename ContextType>::type Context; - return addHelper(std::make_shared(this, std::move(handler)), true); -} - -template -template -Pipeline& Pipeline::addFront(H&& handler) { - return addFront(std::make_shared(std::forward(handler))); -} - -template -template -Pipeline& Pipeline::addFront(H* handler) { - return addFront(std::shared_ptr(handler, [](H*){})); -} - -template -template -Pipeline& Pipeline::removeHelper(H* handler, bool checkEqual) { - typedef typename ContextType>::type Context; - bool removed = false; - for (auto it = ctxs_.begin(); it != ctxs_.end(); it++) { - auto ctx = std::dynamic_pointer_cast(*it); - if (ctx && (!checkEqual || ctx->getHandler() == handler)) { - it = removeAt(it); - removed = true; - if (it == ctxs_.end()) { - break; - } - } - } - - if (!removed) { - throw std::invalid_argument("No such handler in pipeline"); - } - - return *this; -} - -template -template -Pipeline& Pipeline::remove() { - return removeHelper(nullptr, false); -} - -template -template -Pipeline& Pipeline::remove(H* handler) { - return removeHelper(handler, true); -} - -template -typename Pipeline::ContextIterator Pipeline::removeAt( - const typename Pipeline::ContextIterator& it) { - (*it)->detachPipeline(); - - const auto dir = (*it)->getDirection(); - if (dir == HandlerDir::BOTH || dir == HandlerDir::IN) { - auto it2 = std::find(inCtxs_.begin(), inCtxs_.end(), it->get()); - CHECK(it2 != inCtxs_.end()); - inCtxs_.erase(it2); - } - - if (dir == HandlerDir::BOTH || dir == HandlerDir::OUT) { - auto it2 = std::find(outCtxs_.begin(), outCtxs_.end(), it->get()); - CHECK(it2 != outCtxs_.end()); - outCtxs_.erase(it2); - } - - return ctxs_.erase(it); -} - -template -Pipeline& Pipeline::removeFront() { - if (ctxs_.empty()) { - throw std::invalid_argument("No handlers in pipeline"); - } - removeAt(ctxs_.begin()); - return *this; -} - -template -Pipeline& Pipeline::removeBack() { - if (ctxs_.empty()) { - throw std::invalid_argument("No handlers in pipeline"); - } - removeAt(--ctxs_.end()); - return *this; -} - -template -template -H* Pipeline::getHandler(int i) { - typedef typename ContextType>::type Context; - auto ctx = dynamic_cast(ctxs_[i].get()); - CHECK(ctx); - return ctx->getHandler(); -} - -namespace detail { - -template -inline void logWarningIfNotUnit(const std::string& warning) { - LOG(WARNING) << warning; -} - -template <> -inline void logWarningIfNotUnit(const std::string& warning) { - // do nothing -} - -} // detail - // TODO Have read/write/etc check that pipeline has been finalized template void Pipeline::finalize() { @@ -298,48 +259,4 @@ void Pipeline::finalize() { } } -template -template -bool Pipeline::setOwner(H* handler) { - typedef typename ContextType>::type Context; - for (auto& ctx : ctxs_) { - auto ctxImpl = dynamic_cast(ctx.get()); - if (ctxImpl && ctxImpl->getHandler() == handler) { - owner_ = ctx; - return true; - } - } - return false; -} - -template -template -void Pipeline::addContextFront(Context* ctx) { - addHelper(std::shared_ptr(ctx, [](Context*){}), true); -} - -template -void Pipeline::detachHandlers() { - for (auto& ctx : ctxs_) { - if (ctx != owner_) { - ctx->detachPipeline(); - } - } -} - -template -template -Pipeline& Pipeline::addHelper( - std::shared_ptr&& ctx, - bool front) { - ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(), ctx); - if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::IN) { - inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(), ctx.get()); - } - if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::OUT) { - outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(), ctx.get()); - } - return *this; -} - }} // folly::wangle diff --git a/folly/wangle/channel/Pipeline.cpp b/folly/wangle/channel/Pipeline.cpp new file mode 100644 index 00000000..331dd0b0 --- /dev/null +++ b/folly/wangle/channel/Pipeline.cpp @@ -0,0 +1,83 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +namespace folly { namespace wangle { + +void PipelineBase::setWriteFlags(WriteFlags flags) { + writeFlags_ = flags; +} + +WriteFlags PipelineBase::getWriteFlags() { + return writeFlags_; +} + +void PipelineBase::setReadBufferSettings( + uint64_t minAvailable, + uint64_t allocationSize) { + readBufferSettings_ = std::make_pair(minAvailable, allocationSize); +} + +std::pair PipelineBase::getReadBufferSettings() { + return readBufferSettings_; +} + +typename PipelineBase::ContextIterator PipelineBase::removeAt( + const typename PipelineBase::ContextIterator& it) { + (*it)->detachPipeline(); + + const auto dir = (*it)->getDirection(); + if (dir == HandlerDir::BOTH || dir == HandlerDir::IN) { + auto it2 = std::find(inCtxs_.begin(), inCtxs_.end(), it->get()); + CHECK(it2 != inCtxs_.end()); + inCtxs_.erase(it2); + } + + if (dir == HandlerDir::BOTH || dir == HandlerDir::OUT) { + auto it2 = std::find(outCtxs_.begin(), outCtxs_.end(), it->get()); + CHECK(it2 != outCtxs_.end()); + outCtxs_.erase(it2); + } + + return ctxs_.erase(it); +} + +PipelineBase& PipelineBase::removeFront() { + if (ctxs_.empty()) { + throw std::invalid_argument("No handlers in pipeline"); + } + removeAt(ctxs_.begin()); + return *this; +} + +PipelineBase& PipelineBase::removeBack() { + if (ctxs_.empty()) { + throw std::invalid_argument("No handlers in pipeline"); + } + removeAt(--ctxs_.end()); + return *this; +} + +void PipelineBase::detachHandlers() { + for (auto& ctx : ctxs_) { + if (ctx != owner_) { + ctx->detachPipeline(); + } + } +} + +}} // folly::wangle diff --git a/folly/wangle/channel/Pipeline.h b/folly/wangle/channel/Pipeline.h index 85d89d89..0a81be99 100644 --- a/folly/wangle/channel/Pipeline.h +++ b/folly/wangle/channel/Pipeline.h @@ -16,16 +16,18 @@ #pragma once -#include #include #include #include #include +#include #include #include namespace folly { namespace wangle { +class PipelineBase; + class PipelineManager { public: virtual ~PipelineManager() = default; @@ -54,92 +56,43 @@ class PipelineBase : public DelayedDestruction { return transport_; } - private: - PipelineManager* manager_{nullptr}; - std::shared_ptr transport_; -}; - -/* - * R is the inbound type, i.e. inbound calls start with pipeline.read(R) - * W is the outbound type, i.e. outbound calls start with pipeline.write(W) - * - * Use Unit for one of the types if your pipeline is unidirectional. - * If R is Unit, read(), readEOF(), and readException() will be disabled. - * If W is Unit, write() and close() will be disabled. - */ -template -class Pipeline : public PipelineBase { - public: - Pipeline(); - ~Pipeline(); - void setWriteFlags(WriteFlags flags); WriteFlags getWriteFlags(); void setReadBufferSettings(uint64_t minAvailable, uint64_t allocationSize); std::pair getReadBufferSettings(); - template - typename std::enable_if::value>::type - read(R msg); - - template - typename std::enable_if::value>::type - readEOF(); - - template - typename std::enable_if::value>::type - readException(exception_wrapper e); - - template - typename std::enable_if::value>::type - transportActive(); - - template - typename std::enable_if::value>::type - transportInactive(); - - template - typename std::enable_if::value, Future>::type - write(W msg); - - template - typename std::enable_if::value, Future>::type - close(); - template - Pipeline& addBack(std::shared_ptr handler); + PipelineBase& addBack(std::shared_ptr handler); template - Pipeline& addBack(H&& handler); + PipelineBase& addBack(H&& handler); template - Pipeline& addBack(H* handler); + PipelineBase& addBack(H* handler); template - Pipeline& addFront(std::shared_ptr handler); + PipelineBase& addFront(std::shared_ptr handler); template - Pipeline& addFront(H&& handler); + PipelineBase& addFront(H&& handler); template - Pipeline& addFront(H* handler); + PipelineBase& addFront(H* handler); template - Pipeline& remove(H* handler); + PipelineBase& remove(H* handler); template - Pipeline& remove(); + PipelineBase& remove(); - Pipeline& removeFront(); + PipelineBase& removeFront(); - Pipeline& removeBack(); + PipelineBase& removeBack(); template H* getHandler(int i); - void finalize(); - // If one of the handlers owns the pipeline itself, use setOwner to ensure // that the pipeline doesn't try to detach the handler during destruction, // lest destruction ordering issues occur. @@ -147,20 +100,27 @@ class Pipeline : public PipelineBase { template bool setOwner(H* handler); - protected: - explicit Pipeline(bool isStatic); + virtual void finalize() = 0; + protected: template void addContextFront(Context* ctx); void detachHandlers(); + std::vector> ctxs_; + std::vector inCtxs_; + std::vector outCtxs_; + private: + PipelineManager* manager_{nullptr}; + std::shared_ptr transport_; + template - Pipeline& addHelper(std::shared_ptr&& ctx, bool front); + PipelineBase& addHelper(std::shared_ptr&& ctx, bool front); template - Pipeline& removeHelper(H* handler, bool checkEqual); + PipelineBase& removeHelper(H* handler, bool checkEqual); typedef std::vector>::iterator ContextIterator; @@ -170,11 +130,59 @@ class Pipeline : public PipelineBase { WriteFlags writeFlags_{WriteFlags::NONE}; std::pair readBufferSettings_{2048, 2048}; - bool isStatic_{false}; std::shared_ptr owner_; - std::vector> ctxs_; - std::vector inCtxs_; - std::vector outCtxs_; +}; + +/* + * R is the inbound type, i.e. inbound calls start with pipeline.read(R) + * W is the outbound type, i.e. outbound calls start with pipeline.write(W) + * + * Use Unit for one of the types if your pipeline is unidirectional. + * If R is Unit, read(), readEOF(), and readException() will be disabled. + * If W is Unit, write() and close() will be disabled. + */ +template +class Pipeline : public PipelineBase { + public: + Pipeline(); + ~Pipeline(); + + template + typename std::enable_if::value>::type + read(R msg); + + template + typename std::enable_if::value>::type + readEOF(); + + template + typename std::enable_if::value>::type + readException(exception_wrapper e); + + template + typename std::enable_if::value>::type + transportActive(); + + template + typename std::enable_if::value>::type + transportInactive(); + + template + typename std::enable_if::value, Future>::type + write(W msg); + + template + typename std::enable_if::value, Future>::type + close(); + + void finalize() override; + + protected: + explicit Pipeline(bool isStatic); + + private: + bool isStatic_{false}; + InboundLink* front_{nullptr}; OutboundLink* back_{nullptr}; }; diff --git a/folly/wangle/channel/StaticPipeline.h b/folly/wangle/channel/StaticPipeline.h index a5d2e893..ef4a804a 100644 --- a/folly/wangle/channel/StaticPipeline.h +++ b/folly/wangle/channel/StaticPipeline.h @@ -131,7 +131,7 @@ class StaticPipeline bool isFirst_; std::shared_ptr handlerPtr_; - typename ContextType>::type ctx_; + typename ContextType::type ctx_; }; }} // folly::wangle -- 2.34.1