2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <glog/logging.h>
21 namespace folly { namespace wangle {
// Default constructor: creates a Pipeline whose isStatic_ flag is false.
23 template <class R, class W>
24 Pipeline<R, W>::Pipeline() : isStatic_(false) {}
// Constructs a Pipeline and stores the caller-supplied isStatic value in
// isStatic_.
// NOTE(review): the constructor body is not visible in this listing, so any
// validation it performs on isStatic cannot be documented here.
26 template <class R, class W>
27 Pipeline<R, W>::Pipeline(bool isStatic) : isStatic_(isStatic) {
// Destructor.
// NOTE(review): the destructor body is not visible in this listing; whatever
// teardown it performs (e.g. on the handler contexts) must be confirmed
// against the full file.
31 template <class R, class W>
32 Pipeline<R, W>::~Pipeline() {
// Returns a std::shared_ptr<AsyncTransport> for this pipeline.
// NOTE(review): the body is not visible in this listing; presumably it returns
// transport_, the member assigned by attachTransport() -- confirm.
38 template <class R, class W>
39 std::shared_ptr<AsyncTransport> Pipeline<R, W>::getTransport() {
// Setter taking the WriteFlags value to use for this pipeline.
// NOTE(review): the body is not visible in this listing; which member the
// flags are stored in must be confirmed against the full file.
43 template <class R, class W>
44 void Pipeline<R, W>::setWriteFlags(WriteFlags flags) {
// Returns a WriteFlags value by value.
// NOTE(review): the body is not visible in this listing; presumably it returns
// the value last given to setWriteFlags() -- confirm.
48 template <class R, class W>
49 WriteFlags Pipeline<R, W>::getWriteFlags() {
// Records the read-buffer settings for this pipeline.
//
// The two arguments are stored together in readBufferSettings_ as a pair:
// minAvailable becomes the pair's first element and allocationSize its second.
// No validation of either value is visible here.
53 template <class R, class W>
54 void Pipeline<R, W>::setReadBufferSettings(
55 uint64_t minAvailable,
56 uint64_t allocationSize) {
57 readBufferSettings_ = std::make_pair(minAvailable, allocationSize);
// Returns a copy of readBufferSettings_, i.e. the (minAvailable,
// allocationSize) pair written by setReadBufferSettings().
60 template <class R, class W>
61 std::pair<uint64_t, uint64_t> Pipeline<R, W>::getReadBufferSettings() {
62 return readBufferSettings_;
// Feeds an inbound message into the pipeline by forwarding it to
// front_->read().
//
// The enable_if return type only resolves (to void) when T is not Nothing.
// Throws std::invalid_argument when there is no inbound handler.
// NOTE(review): the declaration of T and the condition guarding the throw are
// not visible in this listing; presumably the guard tests front_ for null --
// confirm.
65 template <class R, class W>
67 typename std::enable_if<!std::is_same<T, Nothing>::value>::type
68 Pipeline<R, W>::read(R msg) {
70 throw std::invalid_argument("read(): no inbound handler in Pipeline");
72 front_->read(std::forward<R>(msg));
// Inbound end-of-file entry point.
//
// The enable_if return type only resolves (to void) when T is not Nothing.
// Throws std::invalid_argument when there is no inbound handler.
// NOTE(review): the guard condition and the statement that follows the throw
// are not visible in this listing; presumably the call is forwarded to
// front_ as in read() and readException() -- confirm.
75 template <class R, class W>
77 typename std::enable_if<!std::is_same<T, Nothing>::value>::type
78 Pipeline<R, W>::readEOF() {
80 throw std::invalid_argument("readEOF(): no inbound handler in Pipeline");
// Delivers an inbound exception to the pipeline by moving it into
// front_->readException().
//
// The enable_if return type only resolves (to void) when T is not Nothing.
// Throws std::invalid_argument when there is no inbound handler.
// NOTE(review): the condition guarding the throw is not visible in this
// listing; presumably it tests front_ for null -- confirm.
85 template <class R, class W>
87 typename std::enable_if<!std::is_same<T, Nothing>::value>::type
88 Pipeline<R, W>::readException(exception_wrapper e) {
90 throw std::invalid_argument(
91 "readException(): no inbound handler in Pipeline");
93 front_->readException(std::move(e));
// Sends an outbound message through the pipeline by forwarding it to
// back_->write() and returning that call's Future<void>.
//
// The enable_if return type only resolves (to Future<void>) when T is not
// Nothing. Throws std::invalid_argument when there is no outbound handler.
// NOTE(review): the condition guarding the throw is not visible in this
// listing; presumably it tests back_ for null -- confirm.
96 template <class R, class W>
98 typename std::enable_if<!std::is_same<T, Nothing>::value, Future<void>>::type
99 Pipeline<R, W>::write(W msg) {
101 throw std::invalid_argument("write(): no outbound handler in Pipeline");
103 return back_->write(std::forward<W>(msg));
// Requests an outbound close by calling back_->close() and returning that
// call's Future<void>.
//
// The enable_if return type only resolves (to Future<void>) when T is not
// Nothing. Throws std::invalid_argument when there is no outbound handler.
// NOTE(review): the condition guarding the throw is not visible in this
// listing; presumably it tests back_ for null -- confirm.
106 template <class R, class W>
108 typename std::enable_if<!std::is_same<T, Nothing>::value, Future<void>>::type
109 Pipeline<R, W>::close() {
111 throw std::invalid_argument("close(): no outbound handler in Pipeline");
113 return back_->close();
// Appends a shared handler to the back of the pipeline.
//
// The context type for H is looked up through ContextType<H, Pipeline<R, W>>.
// A new context is built from this pipeline and the (moved) handler, then
// handed to addHelper() with its second argument false, which selects
// insertion at the end of the context lists.
// Returns whatever addHelper() returns (a Pipeline reference), so calls can be
// chained.
116 template <class R, class W>
118 Pipeline<R, W>& Pipeline<R, W>::addBack(std::shared_ptr<H> handler) {
119 typedef typename ContextType<H, Pipeline<R, W>>::type Context;
120 return addHelper(std::make_shared<Context>(this, std::move(handler)), false);
// Appends a handler given by value/forwarding reference: the handler is
// forwarded into a new std::make_shared<H> object, which is then passed to the
// std::shared_ptr overload of addBack().
123 template <class R, class W>
125 Pipeline<R, W>& Pipeline<R, W>::addBack(H&& handler) {
126 return addBack(std::make_shared<H>(std::forward<H>(handler)));
// Appends a handler given as a raw pointer.
//
// The pointer is wrapped in a std::shared_ptr whose deleter is an empty lambda,
// so this shared_ptr never deletes the handler; the object's lifetime stays
// under the caller's control. Delegates to the std::shared_ptr overload of
// addBack().
129 template <class R, class W>
131 Pipeline<R, W>& Pipeline<R, W>::addBack(H* handler) {
132 return addBack(std::shared_ptr<H>(handler, [](H*){}));
// Prepends a shared handler to the front of the pipeline.
//
// The context type for H is looked up through ContextType<H, Pipeline<R, W>>.
// A new context is built from this pipeline and the (moved) handler, then
// handed to addHelper() with its second argument true, which selects insertion
// at the beginning of the context lists.
// Returns whatever addHelper() returns (a Pipeline reference), so calls can be
// chained.
135 template <class R, class W>
137 Pipeline<R, W>& Pipeline<R, W>::addFront(std::shared_ptr<H> handler) {
138 typedef typename ContextType<H, Pipeline<R, W>>::type Context;
139 return addHelper(std::make_shared<Context>(this, std::move(handler)), true);
// Prepends a handler given by value/forwarding reference: the handler is
// forwarded into a new std::make_shared<H> object, which is then passed to the
// std::shared_ptr overload of addFront().
142 template <class R, class W>
144 Pipeline<R, W>& Pipeline<R, W>::addFront(H&& handler) {
145 return addFront(std::make_shared<H>(std::forward<H>(handler)));
// Prepends a handler given as a raw pointer.
//
// The pointer is wrapped in a std::shared_ptr whose deleter is an empty lambda,
// so this shared_ptr never deletes the handler; the object's lifetime stays
// under the caller's control. Delegates to the std::shared_ptr overload of
// addFront().
148 template <class R, class W>
150 Pipeline<R, W>& Pipeline<R, W>::addFront(H* handler) {
151 return addFront(std::shared_ptr<H>(handler, [](H*){}));
// Returns the handler of type H held by the i-th context in ctxs_.
//
// The context at index i is dynamic_cast to the context type associated with
// H; the cast yields nullptr if that context does not wrap an H.
// No bounds check on i is visible here.
// NOTE(review): a line between the cast and the dereference is not visible in
// this listing; presumably it validates ctx before use -- confirm.
154 template <class R, class W>
156 H* Pipeline<R, W>::getHandler(int i) {
157 typedef typename ContextType<H, Pipeline<R, W>>::type Context;
158 auto ctx = dynamic_cast<Context*>(ctxs_[i].get());
160 return ctx->getHandler();
// Writes `warning` to the log at WARNING severity via LOG(WARNING).
// Called as detail::logWarningIfNotNothing<R>/<W> from Pipeline::finalize().
// NOTE(review): the template header for this function is not visible in this
// listing.
166 inline void logWarningIfNotNothing(const std::string& warning) {
167 LOG(WARNING) << warning;
// Explicit specialization of logWarningIfNotNothing for the Nothing type.
// NOTE(review): the body is not visible in this listing; going by the function
// name it presumably logs nothing -- confirm.
171 inline void logWarningIfNotNothing<Nothing>(const std::string& warning) {
// Wires the handler contexts together after handlers have been added.
//
// Steps visible here:
//   1. If there are inbound contexts, front_ is set to the first one and each
//      inbound context is linked to its successor with setNextIn().
//   2. If there are outbound contexts, back_ is set to the last one and each
//      outbound context is linked to its predecessor with setNextOut().
//   3. A warning is logged for a missing inbound and/or outbound handler
//      (suppressed by the Nothing specialization of the logging helper).
//   4. attachPipeline() is called on every context in ctxs_, last to first.
// NOTE(review): the conditions guarding the two warnings are not visible in
// this listing; presumably they test front_ and back_ for null -- confirm.
177 // TODO Have read/write/etc check that pipeline has been finalized
178 template <class R, class W>
179 void Pipeline<R, W>::finalize() {
180 if (!inCtxs_.empty()) {
181 front_ = dynamic_cast<InboundLink<R>*>(inCtxs_.front());
// size() - 1 cannot wrap around here: the enclosing check guarantees the
// vector is non-empty.
182 for (size_t i = 0; i < inCtxs_.size() - 1; i++) {
183 inCtxs_[i]->setNextIn(inCtxs_[i+1]);
187 if (!outCtxs_.empty()) {
188 back_ = dynamic_cast<OutboundLink<W>*>(outCtxs_.back());
// Outbound links run in the opposite direction: from the last context toward
// the first.
189 for (size_t i = outCtxs_.size() - 1; i > 0; i--) {
190 outCtxs_[i]->setNextOut(outCtxs_[i-1]);
195 detail::logWarningIfNotNothing<R>(
196 "No inbound handler in Pipeline, inbound operations will throw "
197 "std::invalid_argument");
200 detail::logWarningIfNotNothing<W>(
201 "No outbound handler in Pipeline, outbound operations will throw "
202 "std::invalid_argument");
// Contexts are notified in reverse order of their position in ctxs_.
205 for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) {
206 (*it)->attachPipeline();
// Searches ctxs_ for the context that wraps `handler`.
//
// Each context is dynamic_cast to the context type associated with H; a
// context matches when the cast succeeds and its getHandler() equals the
// given pointer.
// NOTE(review): the action taken on a match and the return statements are not
// visible in this listing; presumably the matching context is recorded as the
// owner and the bool result reports whether a match was found -- confirm.
210 template <class R, class W>
212 bool Pipeline<R, W>::setOwner(H* handler) {
213 typedef typename ContextType<H, Pipeline<R, W>>::type Context;
214 for (auto& ctx : ctxs_) {
215 auto ctxImpl = dynamic_cast<Context*>(ctx.get());
216 if (ctxImpl && ctxImpl->getHandler() == handler) {
// Attaches a transport to the pipeline.
//
// The shared_ptr is moved into transport_ first, then attachTransport() is
// called on every context in ctxs_ in order, so the member is already set by
// the time the contexts are notified.
224 template <class R, class W>
225 void Pipeline<R, W>::attachTransport(
226 std::shared_ptr<AsyncTransport> transport) {
227 transport_ = std::move(transport);
228 for (auto& ctx : ctxs_) {
229 ctx->attachTransport();
// Detaches the current transport from the pipeline.
//
// transport_ is reset to nullptr first, then detachTransport() is called on
// every context in ctxs_ in order, so the member is already cleared by the
// time the contexts are notified.
233 template <class R, class W>
234 void Pipeline<R, W>::detachTransport() {
235 transport_ = nullptr;
236 for (auto& ctx : ctxs_) {
237 ctx->detachTransport();
// Inserts an existing context object at the front of the pipeline.
//
// The raw pointer is wrapped in a std::shared_ptr whose deleter is an empty
// lambda, so this shared_ptr never deletes ctx. The wrapper is passed to
// addHelper() with its second argument true (front insertion); addHelper()'s
// return value is discarded.
241 template <class R, class W>
242 template <class Context>
243 void Pipeline<R, W>::addContextFront(Context* ctx) {
244 addHelper(std::shared_ptr<Context>(ctx, [](Context*){}), true);
// Walks ctxs_ in order and calls detachPipeline() on the contexts.
// NOTE(review): a line between the loop header and the detachPipeline() call
// is not visible in this listing; it may be a condition that exempts some
// context from being detached -- confirm.
247 template <class R, class W>
248 void Pipeline<R, W>::detachHandlers() {
249 for (auto& ctx : ctxs_) {
251 ctx->detachPipeline();
// Shared implementation behind addFront()/addBack()/addContextFront().
//
// Registers ctx in up to three containers, at the beginning when `front` is
// true and at the end otherwise:
//   - ctxs_    always (stores the shared_ptr).
//   - inCtxs_  when Context::dir is HandlerDir::BOTH or HandlerDir::IN
//              (stores the raw pointer).
//   - outCtxs_ when Context::dir is HandlerDir::BOTH or HandlerDir::OUT
//              (stores the raw pointer).
// NOTE(review): the declaration of the `front` parameter and the return
// statement are not visible in this listing; given the Pipeline& return type
// it presumably returns *this -- confirm.
256 template <class R, class W>
257 template <class Context>
258 Pipeline<R, W>& Pipeline<R, W>::addHelper(
259 std::shared_ptr<Context>&& ctx,
// ctx is named here, so it is copied (not moved) into ctxs_ and remains valid
// for the ctx.get() calls below.
261 ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(), ctx);
262 if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::IN) {
263 inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(), ctx.get());
265 if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::OUT) {
266 outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(), ctx.get());