Split HandlerContext and Pipeline into inl headers
[folly.git] / folly / wangle / channel / Pipeline-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <glog/logging.h>
20
21 namespace folly { namespace wangle {
22
23 template <class R, class W>
24 Pipeline<R, W>::Pipeline() : isStatic_(false) {}
25
26 template <class R, class W>
27 Pipeline<R, W>::Pipeline(bool isStatic) : isStatic_(isStatic) {
28   CHECK(isStatic_);
29 }
30
31 template <class R, class W>
32 Pipeline<R, W>::~Pipeline() {
33   if (!isStatic_) {
34     detachHandlers();
35   }
36 }
37
38 template <class R, class W>
39 std::shared_ptr<AsyncTransport> Pipeline<R, W>::getTransport() {
40   return transport_;
41 }
42
43 template <class R, class W>
44 void Pipeline<R, W>::setWriteFlags(WriteFlags flags) {
45   writeFlags_ = flags;
46 }
47
48 template <class R, class W>
49 WriteFlags Pipeline<R, W>::getWriteFlags() {
50   return writeFlags_;
51 }
52
53 template <class R, class W>
54 void Pipeline<R, W>::setReadBufferSettings(
55     uint64_t minAvailable,
56     uint64_t allocationSize) {
57   readBufferSettings_ = std::make_pair(minAvailable, allocationSize);
58 }
59
60 template <class R, class W>
61 std::pair<uint64_t, uint64_t> Pipeline<R, W>::getReadBufferSettings() {
62   return readBufferSettings_;
63 }
64
65 template <class R, class W>
66 template <class T>
67 typename std::enable_if<!std::is_same<T, Nothing>::value>::type
68 Pipeline<R, W>::read(R msg) {
69   if (!front_) {
70     throw std::invalid_argument("read(): no inbound handler in Pipeline");
71   }
72   front_->read(std::forward<R>(msg));
73 }
74
75 template <class R, class W>
76 template <class T>
77 typename std::enable_if<!std::is_same<T, Nothing>::value>::type
78 Pipeline<R, W>::readEOF() {
79   if (!front_) {
80     throw std::invalid_argument("readEOF(): no inbound handler in Pipeline");
81   }
82   front_->readEOF();
83 }
84
85 template <class R, class W>
86 template <class T>
87 typename std::enable_if<!std::is_same<T, Nothing>::value>::type
88 Pipeline<R, W>::readException(exception_wrapper e) {
89   if (!front_) {
90     throw std::invalid_argument(
91         "readException(): no inbound handler in Pipeline");
92   }
93   front_->readException(std::move(e));
94 }
95
96 template <class R, class W>
97 template <class T>
98 typename std::enable_if<!std::is_same<T, Nothing>::value, Future<void>>::type
99 Pipeline<R, W>::write(W msg) {
100   if (!back_) {
101     throw std::invalid_argument("write(): no outbound handler in Pipeline");
102   }
103   return back_->write(std::forward<W>(msg));
104 }
105
106 template <class R, class W>
107 template <class T>
108 typename std::enable_if<!std::is_same<T, Nothing>::value, Future<void>>::type
109 Pipeline<R, W>::close() {
110   if (!back_) {
111     throw std::invalid_argument("close(): no outbound handler in Pipeline");
112   }
113   return back_->close();
114 }
115
116 template <class R, class W>
117 template <class H>
118 Pipeline<R, W>& Pipeline<R, W>::addBack(std::shared_ptr<H> handler) {
119   typedef typename ContextType<H, Pipeline<R, W>>::type Context;
120   return addHelper(std::make_shared<Context>(this, std::move(handler)), false);
121 }
122
123 template <class R, class W>
124 template <class H>
125 Pipeline<R, W>& Pipeline<R, W>::addBack(H&& handler) {
126   return addBack(std::make_shared<H>(std::forward<H>(handler)));
127 }
128
129 template <class R, class W>
130 template <class H>
131 Pipeline<R, W>& Pipeline<R, W>::addBack(H* handler) {
132   return addBack(std::shared_ptr<H>(handler, [](H*){}));
133 }
134
135 template <class R, class W>
136 template <class H>
137 Pipeline<R, W>& Pipeline<R, W>::addFront(std::shared_ptr<H> handler) {
138   typedef typename ContextType<H, Pipeline<R, W>>::type Context;
139   return addHelper(std::make_shared<Context>(this, std::move(handler)), true);
140 }
141
142 template <class R, class W>
143 template <class H>
144 Pipeline<R, W>& Pipeline<R, W>::addFront(H&& handler) {
145   return addFront(std::make_shared<H>(std::forward<H>(handler)));
146 }
147
148 template <class R, class W>
149 template <class H>
150 Pipeline<R, W>& Pipeline<R, W>::addFront(H* handler) {
151   return addFront(std::shared_ptr<H>(handler, [](H*){}));
152 }
153
154 template <class R, class W>
155 template <class H>
156 H* Pipeline<R, W>::getHandler(int i) {
157   typedef typename ContextType<H, Pipeline<R, W>>::type Context;
158   auto ctx = dynamic_cast<Context*>(ctxs_[i].get());
159   CHECK(ctx);
160   return ctx->getHandler();
161 }
162
163 namespace detail {
164
165 template <class T>
166 inline void logWarningIfNotNothing(const std::string& warning) {
167   LOG(WARNING) << warning;
168 }
169
170 template <>
171 inline void logWarningIfNotNothing<Nothing>(const std::string& warning) {
172   // do nothing
173 }
174
175 } // detail
176
177 // TODO Have read/write/etc check that pipeline has been finalized
178 template <class R, class W>
179 void Pipeline<R, W>::finalize() {
180   if (!inCtxs_.empty()) {
181     front_ = dynamic_cast<InboundLink<R>*>(inCtxs_.front());
182     for (size_t i = 0; i < inCtxs_.size() - 1; i++) {
183       inCtxs_[i]->setNextIn(inCtxs_[i+1]);
184     }
185   }
186
187   if (!outCtxs_.empty()) {
188     back_ = dynamic_cast<OutboundLink<W>*>(outCtxs_.back());
189     for (size_t i = outCtxs_.size() - 1; i > 0; i--) {
190       outCtxs_[i]->setNextOut(outCtxs_[i-1]);
191     }
192   }
193
194   if (!front_) {
195     detail::logWarningIfNotNothing<R>(
196         "No inbound handler in Pipeline, inbound operations will throw "
197         "std::invalid_argument");
198   }
199   if (!back_) {
200     detail::logWarningIfNotNothing<W>(
201         "No outbound handler in Pipeline, outbound operations will throw "
202         "std::invalid_argument");
203   }
204
205   for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) {
206     (*it)->attachPipeline();
207   }
208 }
209
210 template <class R, class W>
211 template <class H>
212 bool Pipeline<R, W>::setOwner(H* handler) {
213   typedef typename ContextType<H, Pipeline<R, W>>::type Context;
214   for (auto& ctx : ctxs_) {
215     auto ctxImpl = dynamic_cast<Context*>(ctx.get());
216     if (ctxImpl && ctxImpl->getHandler() == handler) {
217       owner_ = ctx;
218       return true;
219     }
220   }
221   return false;
222 }
223
224 template <class R, class W>
225 void Pipeline<R, W>::attachTransport(
226     std::shared_ptr<AsyncTransport> transport) {
227   transport_ = std::move(transport);
228   for (auto& ctx : ctxs_) {
229     ctx->attachTransport();
230   }
231 }
232
233 template <class R, class W>
234 void Pipeline<R, W>::detachTransport() {
235   transport_ = nullptr;
236   for (auto& ctx : ctxs_) {
237     ctx->detachTransport();
238   }
239 }
240
241 template <class R, class W>
242 template <class Context>
243 void Pipeline<R, W>::addContextFront(Context* ctx) {
244   addHelper(std::shared_ptr<Context>(ctx, [](Context*){}), true);
245 }
246
247 template <class R, class W>
248 void Pipeline<R, W>::detachHandlers() {
249   for (auto& ctx : ctxs_) {
250     if (ctx != owner_) {
251       ctx->detachPipeline();
252     }
253   }
254 }
255
256 template <class R, class W>
257 template <class Context>
258 Pipeline<R, W>& Pipeline<R, W>::addHelper(
259     std::shared_ptr<Context>&& ctx,
260     bool front) {
261   ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(), ctx);
262   if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::IN) {
263     inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(), ctx.get());
264   }
265   if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::OUT) {
266     outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(), ctx.get());
267   }
268   return *this;
269 }
270
271 }} // folly::wangle