pipeline handler removal, fix service test
[folly.git] / folly / wangle / channel / Pipeline-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <glog/logging.h>
20
21 namespace folly { namespace wangle {
22
23 template <class R, class W>
24 Pipeline<R, W>::Pipeline() : isStatic_(false) {}
25
26 template <class R, class W>
27 Pipeline<R, W>::Pipeline(bool isStatic) : isStatic_(isStatic) {
28   CHECK(isStatic_);
29 }
30
31 template <class R, class W>
32 Pipeline<R, W>::~Pipeline() {
33   if (!isStatic_) {
34     detachHandlers();
35   }
36 }
37
38 template <class R, class W>
39 void Pipeline<R, W>::setWriteFlags(WriteFlags flags) {
40   writeFlags_ = flags;
41 }
42
43 template <class R, class W>
44 WriteFlags Pipeline<R, W>::getWriteFlags() {
45   return writeFlags_;
46 }
47
48 template <class R, class W>
49 void Pipeline<R, W>::setReadBufferSettings(
50     uint64_t minAvailable,
51     uint64_t allocationSize) {
52   readBufferSettings_ = std::make_pair(minAvailable, allocationSize);
53 }
54
55 template <class R, class W>
56 std::pair<uint64_t, uint64_t> Pipeline<R, W>::getReadBufferSettings() {
57   return readBufferSettings_;
58 }
59
60 template <class R, class W>
61 template <class T>
62 typename std::enable_if<!std::is_same<T, Nothing>::value>::type
63 Pipeline<R, W>::read(R msg) {
64   if (!front_) {
65     throw std::invalid_argument("read(): no inbound handler in Pipeline");
66   }
67   front_->read(std::forward<R>(msg));
68 }
69
70 template <class R, class W>
71 template <class T>
72 typename std::enable_if<!std::is_same<T, Nothing>::value>::type
73 Pipeline<R, W>::readEOF() {
74   if (!front_) {
75     throw std::invalid_argument("readEOF(): no inbound handler in Pipeline");
76   }
77   front_->readEOF();
78 }
79
80 template <class R, class W>
81 template <class T>
82 typename std::enable_if<!std::is_same<T, Nothing>::value>::type
83 Pipeline<R, W>::transportActive() {
84   if (front_) {
85     front_->transportActive();
86   }
87 }
88
89 template <class R, class W>
90 template <class T>
91 typename std::enable_if<!std::is_same<T, Nothing>::value>::type
92 Pipeline<R, W>::transportInactive() {
93   if (front_) {
94     front_->transportInactive();
95   }
96 }
97
98 template <class R, class W>
99 template <class T>
100 typename std::enable_if<!std::is_same<T, Nothing>::value>::type
101 Pipeline<R, W>::readException(exception_wrapper e) {
102   if (!front_) {
103     throw std::invalid_argument(
104         "readException(): no inbound handler in Pipeline");
105   }
106   front_->readException(std::move(e));
107 }
108
109 template <class R, class W>
110 template <class T>
111 typename std::enable_if<!std::is_same<T, Nothing>::value, Future<void>>::type
112 Pipeline<R, W>::write(W msg) {
113   if (!back_) {
114     throw std::invalid_argument("write(): no outbound handler in Pipeline");
115   }
116   return back_->write(std::forward<W>(msg));
117 }
118
119 template <class R, class W>
120 template <class T>
121 typename std::enable_if<!std::is_same<T, Nothing>::value, Future<void>>::type
122 Pipeline<R, W>::close() {
123   if (!back_) {
124     throw std::invalid_argument("close(): no outbound handler in Pipeline");
125   }
126   return back_->close();
127 }
128
129 template <class R, class W>
130 template <class H>
131 Pipeline<R, W>& Pipeline<R, W>::addBack(std::shared_ptr<H> handler) {
132   typedef typename ContextType<H, Pipeline<R, W>>::type Context;
133   return addHelper(std::make_shared<Context>(this, std::move(handler)), false);
134 }
135
136 template <class R, class W>
137 template <class H>
138 Pipeline<R, W>& Pipeline<R, W>::addBack(H&& handler) {
139   return addBack(std::make_shared<H>(std::forward<H>(handler)));
140 }
141
142 template <class R, class W>
143 template <class H>
144 Pipeline<R, W>& Pipeline<R, W>::addBack(H* handler) {
145   return addBack(std::shared_ptr<H>(handler, [](H*){}));
146 }
147
148 template <class R, class W>
149 template <class H>
150 Pipeline<R, W>& Pipeline<R, W>::addFront(std::shared_ptr<H> handler) {
151   typedef typename ContextType<H, Pipeline<R, W>>::type Context;
152   return addHelper(std::make_shared<Context>(this, std::move(handler)), true);
153 }
154
155 template <class R, class W>
156 template <class H>
157 Pipeline<R, W>& Pipeline<R, W>::addFront(H&& handler) {
158   return addFront(std::make_shared<H>(std::forward<H>(handler)));
159 }
160
161 template <class R, class W>
162 template <class H>
163 Pipeline<R, W>& Pipeline<R, W>::addFront(H* handler) {
164   return addFront(std::shared_ptr<H>(handler, [](H*){}));
165 }
166
167 template <class R, class W>
168 template <class H>
169 Pipeline<R, W>& Pipeline<R, W>::removeHelper(H* handler, bool checkEqual) {
170   typedef typename ContextType<H, Pipeline<R, W>>::type Context;
171   bool removed = false;
172   for (auto it = ctxs_.begin(); it != ctxs_.end(); it++) {
173     auto ctx = std::dynamic_pointer_cast<Context>(*it);
174     if (ctx && (!checkEqual || ctx->getHandler() == handler)) {
175       it = removeAt(it);
176       removed = true;
177       if (it == ctxs_.end()) {
178         break;
179       }
180     }
181   }
182
183   if (!removed) {
184     throw std::invalid_argument("No such handler in pipeline");
185   }
186
187   return *this;
188 }
189
190 template <class R, class W>
191 template <class H>
192 Pipeline<R, W>& Pipeline<R, W>::remove() {
193   return removeHelper<H>(nullptr, false);
194 }
195
196 template <class R, class W>
197 template <class H>
198 Pipeline<R, W>& Pipeline<R, W>::remove(H* handler) {
199   return removeHelper<H>(handler, true);
200 }
201
202 template <class R, class W>
203 typename Pipeline<R, W>::ContextIterator Pipeline<R, W>::removeAt(
204     const typename Pipeline<R, W>::ContextIterator& it) {
205   (*it)->detachPipeline();
206
207   const auto dir = (*it)->getDirection();
208   if (dir == HandlerDir::BOTH || dir == HandlerDir::IN) {
209     auto it2 = std::find(inCtxs_.begin(), inCtxs_.end(), it->get());
210     CHECK(it2 != inCtxs_.end());
211     inCtxs_.erase(it2);
212   }
213
214   if (dir == HandlerDir::BOTH || dir == HandlerDir::OUT) {
215     auto it2 = std::find(outCtxs_.begin(), outCtxs_.end(), it->get());
216     CHECK(it2 != outCtxs_.end());
217     outCtxs_.erase(it2);
218   }
219
220   return ctxs_.erase(it);
221 }
222
223 template <class R, class W>
224 Pipeline<R, W>& Pipeline<R, W>::removeFront() {
225   if (ctxs_.empty()) {
226     throw std::invalid_argument("No handlers in pipeline");
227   }
228   removeAt(ctxs_.begin());
229   return *this;
230 }
231
232 template <class R, class W>
233 Pipeline<R, W>& Pipeline<R, W>::removeBack() {
234   if (ctxs_.empty()) {
235     throw std::invalid_argument("No handlers in pipeline");
236   }
237   removeAt(--ctxs_.end());
238   return *this;
239 }
240
241 template <class R, class W>
242 template <class H>
243 H* Pipeline<R, W>::getHandler(int i) {
244   typedef typename ContextType<H, Pipeline<R, W>>::type Context;
245   auto ctx = dynamic_cast<Context*>(ctxs_[i].get());
246   CHECK(ctx);
247   return ctx->getHandler();
248 }
249
250 namespace detail {
251
252 template <class T>
253 inline void logWarningIfNotNothing(const std::string& warning) {
254   LOG(WARNING) << warning;
255 }
256
257 template <>
258 inline void logWarningIfNotNothing<Nothing>(const std::string& warning) {
259   // do nothing
260 }
261
262 } // detail
263
264 // TODO Have read/write/etc check that pipeline has been finalized
265 template <class R, class W>
266 void Pipeline<R, W>::finalize() {
267   front_ = nullptr;
268   if (!inCtxs_.empty()) {
269     front_ = dynamic_cast<InboundLink<R>*>(inCtxs_.front());
270     for (size_t i = 0; i < inCtxs_.size() - 1; i++) {
271       inCtxs_[i]->setNextIn(inCtxs_[i+1]);
272     }
273     inCtxs_.back()->setNextIn(nullptr);
274   }
275
276   back_ = nullptr;
277   if (!outCtxs_.empty()) {
278     back_ = dynamic_cast<OutboundLink<W>*>(outCtxs_.back());
279     for (size_t i = outCtxs_.size() - 1; i > 0; i--) {
280       outCtxs_[i]->setNextOut(outCtxs_[i-1]);
281     }
282     outCtxs_.front()->setNextOut(nullptr);
283   }
284
285   if (!front_) {
286     detail::logWarningIfNotNothing<R>(
287         "No inbound handler in Pipeline, inbound operations will throw "
288         "std::invalid_argument");
289   }
290   if (!back_) {
291     detail::logWarningIfNotNothing<W>(
292         "No outbound handler in Pipeline, outbound operations will throw "
293         "std::invalid_argument");
294   }
295
296   for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) {
297     (*it)->attachPipeline();
298   }
299 }
300
301 template <class R, class W>
302 template <class H>
303 bool Pipeline<R, W>::setOwner(H* handler) {
304   typedef typename ContextType<H, Pipeline<R, W>>::type Context;
305   for (auto& ctx : ctxs_) {
306     auto ctxImpl = dynamic_cast<Context*>(ctx.get());
307     if (ctxImpl && ctxImpl->getHandler() == handler) {
308       owner_ = ctx;
309       return true;
310     }
311   }
312   return false;
313 }
314
315 template <class R, class W>
316 template <class Context>
317 void Pipeline<R, W>::addContextFront(Context* ctx) {
318   addHelper(std::shared_ptr<Context>(ctx, [](Context*){}), true);
319 }
320
321 template <class R, class W>
322 void Pipeline<R, W>::detachHandlers() {
323   for (auto& ctx : ctxs_) {
324     if (ctx != owner_) {
325       ctx->detachPipeline();
326     }
327   }
328 }
329
330 template <class R, class W>
331 template <class Context>
332 Pipeline<R, W>& Pipeline<R, W>::addHelper(
333     std::shared_ptr<Context>&& ctx,
334     bool front) {
335   ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(), ctx);
336   if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::IN) {
337     inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(), ctx.get());
338   }
339   if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::OUT) {
340     outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(), ctx.get());
341   }
342   return *this;
343 }
344
345 }} // folly::wangle