From 45e225673d52fbd46547a82c8f7e620b0a76e6e5 Mon Sep 17 00:00:00 2001 From: James Sedgwick Date: Mon, 4 May 2015 07:27:04 -0700 Subject: [PATCH] Split HandlerContext and Pipeline into inl headers Summary: Leave the important headers visible. Opens the door for decent inline docs. Test Plan: unit Reviewed By: hans@fb.com Subscribers: fugalh, folly-diffs@, jsedgwick, yfeldblum, chalfant FB internal diff: D2036799 Signature: t1:2036799:1430749004:db5e58655c27b96826549849722fe962b9ae3985 --- folly/Makefile.am | 2 + folly/wangle/channel/HandlerContext-inl.h | 410 ++++++++++++++++++++++ folly/wangle/channel/HandlerContext.h | 391 +-------------------- folly/wangle/channel/Pipeline-inl.h | 271 ++++++++++++++ folly/wangle/channel/Pipeline.h | 212 ++--------- 5 files changed, 718 insertions(+), 568 deletions(-) create mode 100644 folly/wangle/channel/HandlerContext-inl.h create mode 100644 folly/wangle/channel/Pipeline-inl.h diff --git a/folly/Makefile.am b/folly/Makefile.am index 442e070b..b11be450 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -278,8 +278,10 @@ nobase_follyinclude_HEADERS = \ wangle/channel/AsyncSocketHandler.h \ wangle/channel/Handler.h \ wangle/channel/HandlerContext.h \ + wangle/channel/HandlerContext-inl.h \ wangle/channel/OutputBufferingHandler.h \ wangle/channel/Pipeline.h \ + wangle/channel/Pipeline-inl.h \ wangle/channel/StaticPipeline.h \ wangle/concurrent/BlockingQueue.h \ wangle/concurrent/Codel.h \ diff --git a/folly/wangle/channel/HandlerContext-inl.h b/folly/wangle/channel/HandlerContext-inl.h new file mode 100644 index 00000000..3d472467 --- /dev/null +++ b/folly/wangle/channel/HandlerContext-inl.h @@ -0,0 +1,410 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace folly { namespace wangle { + +class PipelineContext { + public: + virtual ~PipelineContext() {} + + virtual void attachPipeline() = 0; + virtual void detachPipeline() = 0; + + virtual void attachTransport() = 0; + virtual void detachTransport() = 0; + + template + void attachContext(H* handler, HandlerContext* ctx) { + if (++handler->attachCount_ == 1) { + handler->ctx_ = ctx; + } else { + handler->ctx_ = nullptr; + } + } + + virtual void setNextIn(PipelineContext* ctx) = 0; + virtual void setNextOut(PipelineContext* ctx) = 0; +}; + +template +class InboundLink { + public: + virtual ~InboundLink() {} + virtual void read(In msg) = 0; + virtual void readEOF() = 0; + virtual void readException(exception_wrapper e) = 0; +}; + +template +class OutboundLink { + public: + virtual ~OutboundLink() {} + virtual Future write(Out msg) = 0; + virtual Future close() = 0; +}; + +template +class ContextImplBase : public PipelineContext { + public: + ~ContextImplBase() {} + + H* getHandler() { + return handler_.get(); + } + + void initialize(P* pipeline, std::shared_ptr handler) { + pipeline_ = pipeline; + handler_ = std::move(handler); + } + + // PipelineContext overrides + void attachPipeline() override { + if (!attached_) { + this->attachContext(handler_.get(), impl_); + handler_->attachPipeline(impl_); + attached_ = true; + } + } + + void detachPipeline() override { + handler_->detachPipeline(impl_); + attached_ = false; + } + + void attachTransport() override { + DestructorGuard dg(pipeline_); + handler_->attachTransport(impl_); + } + + void detachTransport() override { + DestructorGuard dg(pipeline_); + handler_->detachTransport(impl_); + } + + void setNextIn(PipelineContext* ctx) override { + auto nextIn = dynamic_cast*>(ctx); + if (nextIn) { + nextIn_ = nextIn; + } else { + throw std::invalid_argument("inbound type mismatch"); + } + } + + void setNextOut(PipelineContext* ctx) override { + auto nextOut = dynamic_cast*>(ctx); + if (nextOut) { + nextOut_ = nextOut; + } else { + throw std::invalid_argument("outbound type mismatch"); + } + } + + protected: + Context* impl_; + P* pipeline_; + std::shared_ptr handler_; + InboundLink* nextIn_{nullptr}; + OutboundLink* nextOut_{nullptr}; + + private: + bool attached_{false}; + using DestructorGuard = typename P::DestructorGuard; +}; + +template +class ContextImpl + : public HandlerContext, + public InboundLink, + public OutboundLink, + public ContextImplBase> { + public: + typedef typename H::rin Rin; + typedef typename H::rout Rout; + typedef typename H::win Win; + typedef typename H::wout Wout; + static const HandlerDir dir = HandlerDir::BOTH; + + explicit ContextImpl(P* pipeline, std::shared_ptr handler) { + this->impl_ = this; + this->initialize(pipeline, std::move(handler)); + } + + // For StaticPipeline + ContextImpl() { + this->impl_ = this; + } + + ~ContextImpl() {} + + // HandlerContext overrides + void fireRead(Rout msg) override { + DestructorGuard dg(this->pipeline_); + if (this->nextIn_) { + this->nextIn_->read(std::forward(msg)); + } else { + LOG(WARNING) << "read reached end of pipeline"; + } + } + + void fireReadEOF() override { + DestructorGuard dg(this->pipeline_); + if (this->nextIn_) { + this->nextIn_->readEOF(); + } else { + LOG(WARNING) << "readEOF reached end of pipeline"; + } + } + + void fireReadException(exception_wrapper e) override { + DestructorGuard dg(this->pipeline_); + if (this->nextIn_) { + this->nextIn_->readException(std::move(e)); + } else { + LOG(WARNING) << "readException reached end of pipeline"; + } + } + + Future fireWrite(Wout msg) override { + DestructorGuard dg(this->pipeline_); + if (this->nextOut_) { + return this->nextOut_->write(std::forward(msg)); + } else { + LOG(WARNING) << "write reached end of pipeline"; + return makeFuture(); + } + } + + Future fireClose() override { + DestructorGuard dg(this->pipeline_); + if (this->nextOut_) { + return this->nextOut_->close(); + } else { + LOG(WARNING) << "close reached end of pipeline"; + return makeFuture(); + } + } + + std::shared_ptr getTransport() override { + return this->pipeline_->getTransport(); + } + + void setWriteFlags(WriteFlags flags) override { + this->pipeline_->setWriteFlags(flags); + } + + WriteFlags getWriteFlags() override { + return this->pipeline_->getWriteFlags(); + } + + void setReadBufferSettings( + uint64_t minAvailable, + uint64_t allocationSize) override { + this->pipeline_->setReadBufferSettings(minAvailable, allocationSize); + } + + std::pair getReadBufferSettings() override { + return this->pipeline_->getReadBufferSettings(); + } + + // InboundLink overrides + void read(Rin msg) override { + DestructorGuard dg(this->pipeline_); + this->handler_->read(this, std::forward(msg)); + } + + void readEOF() override { + DestructorGuard dg(this->pipeline_); + this->handler_->readEOF(this); + } + + void readException(exception_wrapper e) override { + DestructorGuard dg(this->pipeline_); + this->handler_->readException(this, std::move(e)); + } + + // OutboundLink overrides + Future write(Win msg) override { + DestructorGuard dg(this->pipeline_); + return this->handler_->write(this, std::forward(msg)); + } + + Future close() override { + DestructorGuard dg(this->pipeline_); + return this->handler_->close(this); + } + + private: + using DestructorGuard = typename P::DestructorGuard; +}; + +template +class InboundContextImpl + : public InboundHandlerContext, + public InboundLink, + public ContextImplBase> { + public: + typedef typename H::rin Rin; + typedef typename H::rout Rout; + typedef typename H::win Win; + typedef typename H::wout Wout; + static const HandlerDir dir = HandlerDir::IN; + + explicit InboundContextImpl(P* pipeline, std::shared_ptr handler) { + this->impl_ = this; + this->initialize(pipeline, std::move(handler)); + } + + // For StaticPipeline + InboundContextImpl() { + this->impl_ = this; + } + + ~InboundContextImpl() {} + + // InboundHandlerContext overrides + void fireRead(Rout msg) override { + DestructorGuard dg(this->pipeline_); + if (this->nextIn_) { + this->nextIn_->read(std::forward(msg)); + } else { + LOG(WARNING) << "read reached end of pipeline"; + } + } + + void fireReadEOF() override { + DestructorGuard dg(this->pipeline_); + if (this->nextIn_) { + this->nextIn_->readEOF(); + } else { + LOG(WARNING) << "readEOF reached end of pipeline"; + } + } + + void fireReadException(exception_wrapper e) override { + DestructorGuard dg(this->pipeline_); + if (this->nextIn_) { + this->nextIn_->readException(std::move(e)); + } else { + LOG(WARNING) << "readException reached end of pipeline"; + } + } + + std::shared_ptr getTransport() override { + return this->pipeline_->getTransport(); + } + + // InboundLink overrides + void read(Rin msg) override { + DestructorGuard dg(this->pipeline_); + this->handler_->read(this, std::forward(msg)); + } + + void readEOF() override { + DestructorGuard dg(this->pipeline_); + this->handler_->readEOF(this); + } + + void readException(exception_wrapper e) override { + DestructorGuard dg(this->pipeline_); + this->handler_->readException(this, std::move(e)); + } + + private: + using DestructorGuard = typename P::DestructorGuard; +}; + +template +class OutboundContextImpl + : public OutboundHandlerContext, + public OutboundLink, + public ContextImplBase> { + public: + typedef typename H::rin Rin; + typedef typename H::rout Rout; + typedef typename H::win Win; + typedef typename H::wout Wout; + static const HandlerDir dir = HandlerDir::OUT; + + explicit OutboundContextImpl(P* pipeline, std::shared_ptr handler) { + this->impl_ = this; + this->initialize(pipeline, std::move(handler)); + } + + // For StaticPipeline + OutboundContextImpl() { + this->impl_ = this; + } + + ~OutboundContextImpl() {} + + // OutboundHandlerContext overrides + Future fireWrite(Wout msg) override { + DestructorGuard dg(this->pipeline_); + if (this->nextOut_) { + return this->nextOut_->write(std::forward(msg)); + } else { + LOG(WARNING) << "write reached end of pipeline"; + return makeFuture(); + } + } + + Future fireClose() override { + DestructorGuard dg(this->pipeline_); + if (this->nextOut_) { + return this->nextOut_->close(); + } else { + LOG(WARNING) << "close reached end of pipeline"; + return makeFuture(); + } + } + + std::shared_ptr getTransport() override { + return this->pipeline_->getTransport(); + } + + // OutboundLink overrides + Future write(Win msg) override { + DestructorGuard dg(this->pipeline_); + return this->handler_->write(this, std::forward(msg)); + } + + Future close() override { + DestructorGuard dg(this->pipeline_); + return this->handler_->close(this); + } + + private: + using DestructorGuard = typename P::DestructorGuard; +}; + +template +struct ContextType { + typedef typename std::conditional< + Handler::dir == HandlerDir::BOTH, + ContextImpl, + typename std::conditional< + Handler::dir == HandlerDir::IN, + InboundContextImpl, + OutboundContextImpl + >::type>::type + type; +}; + +}} // folly::wangle diff --git a/folly/wangle/channel/HandlerContext.h b/folly/wangle/channel/HandlerContext.h index 2f899796..f95a80f0 100644 --- a/folly/wangle/channel/HandlerContext.h +++ b/folly/wangle/channel/HandlerContext.h @@ -88,393 +88,6 @@ enum class HandlerDir { BOTH }; -class PipelineContext { - public: - virtual ~PipelineContext() {} - - virtual void attachPipeline() = 0; - virtual void detachPipeline() = 0; - - virtual void attachTransport() = 0; - virtual void detachTransport() = 0; - - template - void attachContext(H* handler, HandlerContext* ctx) { - if (++handler->attachCount_ == 1) { - handler->ctx_ = ctx; - } else { - handler->ctx_ = nullptr; - } - } - - virtual void setNextIn(PipelineContext* ctx) = 0; - virtual void setNextOut(PipelineContext* ctx) = 0; -}; - -template -class InboundLink { - public: - virtual ~InboundLink() {} - virtual void read(In msg) = 0; - virtual void readEOF() = 0; - virtual void readException(exception_wrapper e) = 0; -}; - -template -class OutboundLink { - public: - virtual ~OutboundLink() {} - virtual Future write(Out msg) = 0; - virtual Future close() = 0; -}; - -template -class ContextImplBase : public PipelineContext { - public: - ~ContextImplBase() {} - - H* getHandler() { - return handler_.get(); - } - - void initialize(P* pipeline, std::shared_ptr handler) { - pipeline_ = pipeline; - handler_ = std::move(handler); - } - - // PipelineContext overrides - void attachPipeline() override { - if (!attached_) { - this->attachContext(handler_.get(), impl_); - handler_->attachPipeline(impl_); - attached_ = true; - } - } - - void detachPipeline() override { - handler_->detachPipeline(impl_); - attached_ = false; - } - - void attachTransport() override { - DestructorGuard dg(pipeline_); - handler_->attachTransport(impl_); - } - - void detachTransport() override { - DestructorGuard dg(pipeline_); - handler_->detachTransport(impl_); - } - - void setNextIn(PipelineContext* ctx) override { - auto nextIn = dynamic_cast*>(ctx); - if (nextIn) { - nextIn_ = nextIn; - } else { - throw std::invalid_argument("inbound type mismatch"); - } - } - - void setNextOut(PipelineContext* ctx) override { - auto nextOut = dynamic_cast*>(ctx); - if (nextOut) { - nextOut_ = nextOut; - } else { - throw std::invalid_argument("outbound type mismatch"); - } - } - - protected: - Context* impl_; - P* pipeline_; - std::shared_ptr handler_; - InboundLink* nextIn_{nullptr}; - OutboundLink* nextOut_{nullptr}; - - private: - bool attached_{false}; - using DestructorGuard = typename P::DestructorGuard; -}; - -template -class ContextImpl - : public HandlerContext, - public InboundLink, - public OutboundLink, - public ContextImplBase> { - public: - typedef typename H::rin Rin; - typedef typename H::rout Rout; - typedef typename H::win Win; - typedef typename H::wout Wout; - static const HandlerDir dir = HandlerDir::BOTH; - - explicit ContextImpl(P* pipeline, std::shared_ptr handler) { - this->impl_ = this; - this->initialize(pipeline, std::move(handler)); - } - - // For StaticPipeline - ContextImpl() { - this->impl_ = this; - } - - ~ContextImpl() {} - - // HandlerContext overrides - void fireRead(Rout msg) override { - DestructorGuard dg(this->pipeline_); - if (this->nextIn_) { - this->nextIn_->read(std::forward(msg)); - } else { - LOG(WARNING) << "read reached end of pipeline"; - } - } - - void fireReadEOF() override { - DestructorGuard dg(this->pipeline_); - if (this->nextIn_) { - this->nextIn_->readEOF(); - } else { - LOG(WARNING) << "readEOF reached end of pipeline"; - } - } - - void fireReadException(exception_wrapper e) override { - DestructorGuard dg(this->pipeline_); - if (this->nextIn_) { - this->nextIn_->readException(std::move(e)); - } else { - LOG(WARNING) << "readException reached end of pipeline"; - } - } - - Future fireWrite(Wout msg) override { - DestructorGuard dg(this->pipeline_); - if (this->nextOut_) { - return this->nextOut_->write(std::forward(msg)); - } else { - LOG(WARNING) << "write reached end of pipeline"; - return makeFuture(); - } - } - - Future fireClose() override { - DestructorGuard dg(this->pipeline_); - if (this->nextOut_) { - return this->nextOut_->close(); - } else { - LOG(WARNING) << "close reached end of pipeline"; - return makeFuture(); - } - } - - std::shared_ptr getTransport() override { - return this->pipeline_->getTransport(); - } - - void setWriteFlags(WriteFlags flags) override { - this->pipeline_->setWriteFlags(flags); - } - - WriteFlags getWriteFlags() override { - return this->pipeline_->getWriteFlags(); - } - - void setReadBufferSettings( - uint64_t minAvailable, - uint64_t allocationSize) override { - this->pipeline_->setReadBufferSettings(minAvailable, allocationSize); - } - - std::pair getReadBufferSettings() override { - return this->pipeline_->getReadBufferSettings(); - } - - // InboundLink overrides - void read(Rin msg) override { - DestructorGuard dg(this->pipeline_); - this->handler_->read(this, std::forward(msg)); - } - - void readEOF() override { - DestructorGuard dg(this->pipeline_); - this->handler_->readEOF(this); - } - - void readException(exception_wrapper e) override { - DestructorGuard dg(this->pipeline_); - this->handler_->readException(this, std::move(e)); - } - - // OutboundLink overrides - Future write(Win msg) override { - DestructorGuard dg(this->pipeline_); - return this->handler_->write(this, std::forward(msg)); - } - - Future close() override { - DestructorGuard dg(this->pipeline_); - return this->handler_->close(this); - } - - private: - using DestructorGuard = typename P::DestructorGuard; -}; - -template -class InboundContextImpl - : public InboundHandlerContext, - public InboundLink, - public ContextImplBase> { - public: - typedef typename H::rin Rin; - typedef typename H::rout Rout; - typedef typename H::win Win; - typedef typename H::wout Wout; - static const HandlerDir dir = HandlerDir::IN; - - explicit InboundContextImpl(P* pipeline, std::shared_ptr handler) { - this->impl_ = this; - this->initialize(pipeline, std::move(handler)); - } - - // For StaticPipeline - InboundContextImpl() { - this->impl_ = this; - } - - ~InboundContextImpl() {} - - // InboundHandlerContext overrides - void fireRead(Rout msg) override { - DestructorGuard dg(this->pipeline_); - if (this->nextIn_) { - this->nextIn_->read(std::forward(msg)); - } else { - LOG(WARNING) << "read reached end of pipeline"; - } - } - - void fireReadEOF() override { - DestructorGuard dg(this->pipeline_); - if (this->nextIn_) { - this->nextIn_->readEOF(); - } else { - LOG(WARNING) << "readEOF reached end of pipeline"; - } - } - - void fireReadException(exception_wrapper e) override { - DestructorGuard dg(this->pipeline_); - if (this->nextIn_) { - this->nextIn_->readException(std::move(e)); - } else { - LOG(WARNING) << "readException reached end of pipeline"; - } - } - - std::shared_ptr getTransport() override { - return this->pipeline_->getTransport(); - } - - // InboundLink overrides - void read(Rin msg) override { - DestructorGuard dg(this->pipeline_); - this->handler_->read(this, std::forward(msg)); - } - - void readEOF() override { - DestructorGuard dg(this->pipeline_); - this->handler_->readEOF(this); - } - - void readException(exception_wrapper e) override { - DestructorGuard dg(this->pipeline_); - this->handler_->readException(this, std::move(e)); - } - - private: - using DestructorGuard = typename P::DestructorGuard; -}; - -template -class OutboundContextImpl - : public OutboundHandlerContext, - public OutboundLink, - public ContextImplBase> { - public: - typedef typename H::rin Rin; - typedef typename H::rout Rout; - typedef typename H::win Win; - typedef typename H::wout Wout; - static const HandlerDir dir = HandlerDir::OUT; - - explicit OutboundContextImpl(P* pipeline, std::shared_ptr handler) { - this->impl_ = this; - this->initialize(pipeline, std::move(handler)); - } - - // For StaticPipeline - OutboundContextImpl() { - this->impl_ = this; - } - - ~OutboundContextImpl() {} - - // OutboundHandlerContext overrides - Future fireWrite(Wout msg) override { - DestructorGuard dg(this->pipeline_); - if (this->nextOut_) { - return this->nextOut_->write(std::forward(msg)); - } else { - LOG(WARNING) << "write reached end of pipeline"; - return makeFuture(); - } - } - - Future fireClose() override { - DestructorGuard dg(this->pipeline_); - if (this->nextOut_) { - return this->nextOut_->close(); - } else { - LOG(WARNING) << "close reached end of pipeline"; - return makeFuture(); - } - } - - std::shared_ptr getTransport() override { - return this->pipeline_->getTransport(); - } - - // OutboundLink overrides - Future write(Win msg) override { - DestructorGuard dg(this->pipeline_); - return this->handler_->write(this, std::forward(msg)); - } - - Future close() override { - DestructorGuard dg(this->pipeline_); - return this->handler_->close(this); - } - - private: - using DestructorGuard = typename P::DestructorGuard; -}; - -template -struct ContextType { - typedef typename std::conditional< - Handler::dir == HandlerDir::BOTH, - ContextImpl, - typename std::conditional< - Handler::dir == HandlerDir::IN, - InboundContextImpl, - OutboundContextImpl - >::type>::type - type; -}; +}} // folly::wangle -}} +#include diff --git a/folly/wangle/channel/Pipeline-inl.h b/folly/wangle/channel/Pipeline-inl.h new file mode 100644 index 00000000..458f502c --- /dev/null +++ b/folly/wangle/channel/Pipeline-inl.h @@ -0,0 +1,271 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace folly { namespace wangle { + +template +Pipeline::Pipeline() : isStatic_(false) {} + +template +Pipeline::Pipeline(bool isStatic) : isStatic_(isStatic) { + CHECK(isStatic_); +} + +template +Pipeline::~Pipeline() { + if (!isStatic_) { + detachHandlers(); + } +} + +template +std::shared_ptr Pipeline::getTransport() { + return transport_; +} + +template +void Pipeline::setWriteFlags(WriteFlags flags) { + writeFlags_ = flags; +} + +template +WriteFlags Pipeline::getWriteFlags() { + return writeFlags_; +} + +template +void Pipeline::setReadBufferSettings( + uint64_t minAvailable, + uint64_t allocationSize) { + readBufferSettings_ = std::make_pair(minAvailable, allocationSize); +} + +template +std::pair Pipeline::getReadBufferSettings() { + return readBufferSettings_; +} + +template +template +typename std::enable_if::value>::type +Pipeline::read(R msg) { + if (!front_) { + throw std::invalid_argument("read(): no inbound handler in Pipeline"); + } + front_->read(std::forward(msg)); +} + +template +template +typename std::enable_if::value>::type +Pipeline::readEOF() { + if (!front_) { + throw std::invalid_argument("readEOF(): no inbound handler in Pipeline"); + } + front_->readEOF(); +} + +template +template +typename std::enable_if::value>::type +Pipeline::readException(exception_wrapper e) { + if (!front_) { + throw std::invalid_argument( + "readException(): no inbound handler in Pipeline"); + } + front_->readException(std::move(e)); +} + +template +template +typename std::enable_if::value, Future>::type +Pipeline::write(W msg) { + if (!back_) { + throw std::invalid_argument("write(): no outbound handler in Pipeline"); + } + return back_->write(std::forward(msg)); +} + +template +template +typename std::enable_if::value, Future>::type +Pipeline::close() { + if (!back_) { + throw std::invalid_argument("close(): no outbound handler in Pipeline"); + } + return back_->close(); +} + +template +template +Pipeline& Pipeline::addBack(std::shared_ptr handler) { + typedef typename ContextType>::type Context; + return addHelper(std::make_shared(this, std::move(handler)), false); +} + +template +template +Pipeline& Pipeline::addBack(H&& handler) { + return addBack(std::make_shared(std::forward(handler))); +} + +template +template +Pipeline& Pipeline::addBack(H* handler) { + return addBack(std::shared_ptr(handler, [](H*){})); +} + +template +template +Pipeline& Pipeline::addFront(std::shared_ptr handler) { + typedef typename ContextType>::type Context; + return addHelper(std::make_shared(this, std::move(handler)), true); +} + +template +template +Pipeline& Pipeline::addFront(H&& handler) { + return addFront(std::make_shared(std::forward(handler))); +} + +template +template +Pipeline& Pipeline::addFront(H* handler) { + return addFront(std::shared_ptr(handler, [](H*){})); +} + +template +template +H* Pipeline::getHandler(int i) { + typedef typename ContextType>::type Context; + auto ctx = dynamic_cast(ctxs_[i].get()); + CHECK(ctx); + return ctx->getHandler(); +} + +namespace detail { + +template +inline void logWarningIfNotNothing(const std::string& warning) { + LOG(WARNING) << warning; +} + +template <> +inline void logWarningIfNotNothing(const std::string& warning) { + // do nothing +} + +} // detail + +// TODO Have read/write/etc check that pipeline has been finalized +template +void Pipeline::finalize() { + if (!inCtxs_.empty()) { + front_ = dynamic_cast*>(inCtxs_.front()); + for (size_t i = 0; i < inCtxs_.size() - 1; i++) { + inCtxs_[i]->setNextIn(inCtxs_[i+1]); + } + } + + if (!outCtxs_.empty()) { + back_ = dynamic_cast*>(outCtxs_.back()); + for (size_t i = outCtxs_.size() - 1; i > 0; i--) { + outCtxs_[i]->setNextOut(outCtxs_[i-1]); + } + } + + if (!front_) { + detail::logWarningIfNotNothing( + "No inbound handler in Pipeline, inbound operations will throw " + "std::invalid_argument"); + } + if (!back_) { + detail::logWarningIfNotNothing( + "No outbound handler in Pipeline, outbound operations will throw " + "std::invalid_argument"); + } + + for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) { + (*it)->attachPipeline(); + } +} + +template +template +bool Pipeline::setOwner(H* handler) { + typedef typename ContextType>::type Context; + for (auto& ctx : ctxs_) { + auto ctxImpl = dynamic_cast(ctx.get()); + if (ctxImpl && ctxImpl->getHandler() == handler) { + owner_ = ctx; + return true; + } + } + return false; +} + +template +void Pipeline::attachTransport( + std::shared_ptr transport) { + transport_ = std::move(transport); + for (auto& ctx : ctxs_) { + ctx->attachTransport(); + } +} + +template +void Pipeline::detachTransport() { + transport_ = nullptr; + for (auto& ctx : ctxs_) { + ctx->detachTransport(); + } +} + +template +template +void Pipeline::addContextFront(Context* ctx) { + addHelper(std::shared_ptr(ctx, [](Context*){}), true); +} + +template +void Pipeline::detachHandlers() { + for (auto& ctx : ctxs_) { + if (ctx != owner_) { + ctx->detachPipeline(); + } + } +} + +template +template +Pipeline& Pipeline::addHelper( + std::shared_ptr&& ctx, + bool front) { + ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(), ctx); + if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::IN) { + inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(), ctx.get()); + } + if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::OUT) { + outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(), ctx.get()); + } + return *this; +} + +}} // folly::wangle diff --git a/folly/wangle/channel/Pipeline.h b/folly/wangle/channel/Pipeline.h index 700fc804..fa039fbe 100644 --- a/folly/wangle/channel/Pipeline.h +++ b/folly/wangle/channel/Pipeline.h @@ -22,27 +22,11 @@ #include #include #include -#include namespace folly { namespace wangle { -// See Pipeline docblock for purpose struct Nothing{}; -namespace detail { - -template -inline void logWarningIfNotNothing(const std::string& warning) { - LOG(WARNING) << warning; -} - -template <> -inline void logWarningIfNotNothing(const std::string& warning) { - // do nothing -} - -} // detail - /* * R is the inbound type, i.e. inbound calls start with pipeline.read(R) * W is the outbound type, i.e. outbound calls start with pipeline.write(W) @@ -54,214 +38,82 @@ inline void logWarningIfNotNothing(const std::string& warning) { template class Pipeline : public DelayedDestruction { public: - Pipeline() : isStatic_(false) {} - - ~Pipeline() { - if (!isStatic_) { - detachHandlers(); - } - } + Pipeline(); + ~Pipeline(); - std::shared_ptr getTransport() { - return transport_; - } + std::shared_ptr getTransport(); - void setWriteFlags(WriteFlags flags) { - writeFlags_ = flags; - } + void setWriteFlags(WriteFlags flags); + WriteFlags getWriteFlags(); - WriteFlags getWriteFlags() { - return writeFlags_; - } - - void setReadBufferSettings(uint64_t minAvailable, uint64_t allocationSize) { - readBufferSettings_ = std::make_pair(minAvailable, allocationSize); - } - - std::pair getReadBufferSettings() { - return readBufferSettings_; - } + void setReadBufferSettings(uint64_t minAvailable, uint64_t allocationSize); + std::pair getReadBufferSettings(); template typename std::enable_if::value>::type - read(R msg) { - if (!front_) { - throw std::invalid_argument("read(): no inbound handler in Pipeline"); - } - front_->read(std::forward(msg)); - } + read(R msg); template typename std::enable_if::value>::type - readEOF() { - if (!front_) { - throw std::invalid_argument("readEOF(): no inbound handler in Pipeline"); - } - front_->readEOF(); - } + readEOF(); template typename std::enable_if::value>::type - readException(exception_wrapper e) { - if (!front_) { - throw std::invalid_argument( - "readException(): no inbound handler in Pipeline"); - } - front_->readException(std::move(e)); - } + readException(exception_wrapper e); template typename std::enable_if::value, Future>::type - write(W msg) { - if (!back_) { - throw std::invalid_argument("write(): no outbound handler in Pipeline"); - } - return back_->write(std::forward(msg)); - } + write(W msg); template typename std::enable_if::value, Future>::type - close() { - if (!back_) { - throw std::invalid_argument("close(): no outbound handler in Pipeline"); - } - return back_->close(); - } + close(); template - Pipeline& addBack(std::shared_ptr handler) { - typedef typename ContextType::type Context; - return addHelper(std::make_shared(this, std::move(handler)), false); - } + Pipeline& addBack(std::shared_ptr handler); template - Pipeline& addBack(H&& handler) { - return addBack(std::make_shared(std::forward(handler))); - } + Pipeline& addBack(H&& handler); template - Pipeline& addBack(H* handler) { - return addBack(std::shared_ptr(handler, [](H*){})); - } + Pipeline& addBack(H* handler); template - Pipeline& addFront(std::shared_ptr handler) { - typedef typename ContextType::type Context; - return addHelper(std::make_shared(this, std::move(handler)), true); - } + Pipeline& addFront(std::shared_ptr handler); template - Pipeline& addFront(H&& handler) { - return addFront(std::make_shared(std::forward(handler))); - } + Pipeline& addFront(H&& handler); template - Pipeline& addFront(H* handler) { - return addFront(std::shared_ptr(handler, [](H*){})); - } + Pipeline& addFront(H* handler); template - H* getHandler(int i) { - typedef typename ContextType::type Context; - auto ctx = dynamic_cast(ctxs_[i].get()); - CHECK(ctx); - return ctx->getHandler(); - } - - // TODO Have read/write/etc check that pipeline has been finalized - void finalize() { - if (!inCtxs_.empty()) { - front_ = dynamic_cast*>(inCtxs_.front()); - for (size_t i = 0; i < inCtxs_.size() - 1; i++) { - inCtxs_[i]->setNextIn(inCtxs_[i+1]); - } - } - - if (!outCtxs_.empty()) { - back_ = dynamic_cast*>(outCtxs_.back()); - for (size_t i = outCtxs_.size() - 1; i > 0; i--) { - outCtxs_[i]->setNextOut(outCtxs_[i-1]); - } - } - - if (!front_) { - detail::logWarningIfNotNothing( - "No inbound handler in Pipeline, inbound operations will throw " - "std::invalid_argument"); - } - if (!back_) { - detail::logWarningIfNotNothing( - "No outbound handler in Pipeline, outbound operations will throw " - "std::invalid_argument"); - } - - for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) { - (*it)->attachPipeline(); - } - } + H* getHandler(int i); + + void finalize(); // If one of the handlers owns the pipeline itself, use setOwner to ensure // that the pipeline doesn't try to detach the handler during destruction, // lest destruction ordering issues occur. // See thrift/lib/cpp2/async/Cpp2Channel.cpp for an example template - bool setOwner(H* handler) { - typedef typename ContextType::type Context; - for (auto& ctx : ctxs_) { - auto ctxImpl = dynamic_cast(ctx.get()); - if (ctxImpl && ctxImpl->getHandler() == handler) { - owner_ = ctx; - return true; - } - } - return false; - } - - void attachTransport( - std::shared_ptr transport) { - transport_ = std::move(transport); - for (auto& ctx : ctxs_) { - ctx->attachTransport(); - } - } - - void detachTransport() { - transport_ = nullptr; - for (auto& ctx : ctxs_) { - ctx->detachTransport(); - } - } + bool setOwner(H* handler); + + void attachTransport(std::shared_ptr transport); + + void detachTransport(); protected: - explicit Pipeline(bool isStatic) : isStatic_(isStatic) { - CHECK(isStatic_); - } + explicit Pipeline(bool isStatic); template - void addContextFront(Context* ctx) { - addHelper(std::shared_ptr(ctx, [](Context*){}), true); - } - - void detachHandlers() { - for (auto& ctx : ctxs_) { - if (ctx != owner_) { - ctx->detachPipeline(); - } - } - } + void addContextFront(Context* ctx); + + void detachHandlers(); private: template - Pipeline& addHelper(std::shared_ptr&& ctx, bool front) { - ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(), ctx); - if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::IN) { - inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(), ctx.get()); - } - if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::OUT) { - outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(), ctx.get()); - } - return *this; - } + Pipeline& addHelper(std::shared_ptr&& ctx, bool front); std::shared_ptr transport_; WriteFlags writeFlags_{WriteFlags::NONE}; @@ -290,3 +142,5 @@ class PipelineFactory { }; } + +#include -- 2.34.1