copy wangle back into folly
[folly.git] / folly / wangle / channel / Pipeline-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <glog/logging.h>
20
21 namespace folly { namespace wangle {
22
23 template <class R, class W>
24 Pipeline<R, W>::Pipeline() : isStatic_(false) {}
25
26 template <class R, class W>
27 Pipeline<R, W>::Pipeline(bool isStatic) : isStatic_(isStatic) {
28   CHECK(isStatic_);
29 }
30
31 template <class R, class W>
32 Pipeline<R, W>::~Pipeline() {
33   if (!isStatic_) {
34     detachHandlers();
35   }
36 }
37
38 template <class R, class W>
39 void Pipeline<R, W>::setWriteFlags(WriteFlags flags) {
40   writeFlags_ = flags;
41 }
42
43 template <class R, class W>
44 WriteFlags Pipeline<R, W>::getWriteFlags() {
45   return writeFlags_;
46 }
47
48 template <class R, class W>
49 void Pipeline<R, W>::setReadBufferSettings(
50     uint64_t minAvailable,
51     uint64_t allocationSize) {
52   readBufferSettings_ = std::make_pair(minAvailable, allocationSize);
53 }
54
55 template <class R, class W>
56 std::pair<uint64_t, uint64_t> Pipeline<R, W>::getReadBufferSettings() {
57   return readBufferSettings_;
58 }
59
60 template <class R, class W>
61 template <class T>
62 typename std::enable_if<!std::is_same<T, Nothing>::value>::type
63 Pipeline<R, W>::read(R msg) {
64   if (!front_) {
65     throw std::invalid_argument("read(): no inbound handler in Pipeline");
66   }
67   front_->read(std::forward<R>(msg));
68 }
69
70 template <class R, class W>
71 template <class T>
72 typename std::enable_if<!std::is_same<T, Nothing>::value>::type
73 Pipeline<R, W>::readEOF() {
74   if (!front_) {
75     throw std::invalid_argument("readEOF(): no inbound handler in Pipeline");
76   }
77   front_->readEOF();
78 }
79
80 template <class R, class W>
81 template <class T>
82 typename std::enable_if<!std::is_same<T, Nothing>::value>::type
83 Pipeline<R, W>::transportActive() {
84   if (front_) {
85     front_->transportActive();
86   }
87 }
88
89 template <class R, class W>
90 template <class T>
91 typename std::enable_if<!std::is_same<T, Nothing>::value>::type
92 Pipeline<R, W>::transportInactive() {
93   if (front_) {
94     front_->transportInactive();
95   }
96 }
97
98 template <class R, class W>
99 template <class T>
100 typename std::enable_if<!std::is_same<T, Nothing>::value>::type
101 Pipeline<R, W>::readException(exception_wrapper e) {
102   if (!front_) {
103     throw std::invalid_argument(
104         "readException(): no inbound handler in Pipeline");
105   }
106   front_->readException(std::move(e));
107 }
108
109 template <class R, class W>
110 template <class T>
111 typename std::enable_if<!std::is_same<T, Nothing>::value, Future<void>>::type
112 Pipeline<R, W>::write(W msg) {
113   if (!back_) {
114     throw std::invalid_argument("write(): no outbound handler in Pipeline");
115   }
116   return back_->write(std::forward<W>(msg));
117 }
118
119 template <class R, class W>
120 template <class T>
121 typename std::enable_if<!std::is_same<T, Nothing>::value, Future<void>>::type
122 Pipeline<R, W>::close() {
123   if (!back_) {
124     throw std::invalid_argument("close(): no outbound handler in Pipeline");
125   }
126   return back_->close();
127 }
128
129 template <class R, class W>
130 template <class H>
131 Pipeline<R, W>& Pipeline<R, W>::addBack(std::shared_ptr<H> handler) {
132   typedef typename ContextType<H, Pipeline<R, W>>::type Context;
133   return addHelper(std::make_shared<Context>(this, std::move(handler)), false);
134 }
135
136 template <class R, class W>
137 template <class H>
138 Pipeline<R, W>& Pipeline<R, W>::addBack(H&& handler) {
139   return addBack(std::make_shared<H>(std::forward<H>(handler)));
140 }
141
142 template <class R, class W>
143 template <class H>
144 Pipeline<R, W>& Pipeline<R, W>::addBack(H* handler) {
145   return addBack(std::shared_ptr<H>(handler, [](H*){}));
146 }
147
148 template <class R, class W>
149 template <class H>
150 Pipeline<R, W>& Pipeline<R, W>::addFront(std::shared_ptr<H> handler) {
151   typedef typename ContextType<H, Pipeline<R, W>>::type Context;
152   return addHelper(std::make_shared<Context>(this, std::move(handler)), true);
153 }
154
155 template <class R, class W>
156 template <class H>
157 Pipeline<R, W>& Pipeline<R, W>::addFront(H&& handler) {
158   return addFront(std::make_shared<H>(std::forward<H>(handler)));
159 }
160
161 template <class R, class W>
162 template <class H>
163 Pipeline<R, W>& Pipeline<R, W>::addFront(H* handler) {
164   return addFront(std::shared_ptr<H>(handler, [](H*){}));
165 }
166
167 template <class R, class W>
168 template <class H>
169 H* Pipeline<R, W>::getHandler(int i) {
170   typedef typename ContextType<H, Pipeline<R, W>>::type Context;
171   auto ctx = dynamic_cast<Context*>(ctxs_[i].get());
172   CHECK(ctx);
173   return ctx->getHandler();
174 }
175
176 namespace detail {
177
178 template <class T>
179 inline void logWarningIfNotNothing(const std::string& warning) {
180   LOG(WARNING) << warning;
181 }
182
183 template <>
184 inline void logWarningIfNotNothing<Nothing>(const std::string& warning) {
185   // do nothing
186 }
187
188 } // detail
189
190 // TODO Have read/write/etc check that pipeline has been finalized
191 template <class R, class W>
192 void Pipeline<R, W>::finalize() {
193   if (!inCtxs_.empty()) {
194     front_ = dynamic_cast<InboundLink<R>*>(inCtxs_.front());
195     for (size_t i = 0; i < inCtxs_.size() - 1; i++) {
196       inCtxs_[i]->setNextIn(inCtxs_[i+1]);
197     }
198   }
199
200   if (!outCtxs_.empty()) {
201     back_ = dynamic_cast<OutboundLink<W>*>(outCtxs_.back());
202     for (size_t i = outCtxs_.size() - 1; i > 0; i--) {
203       outCtxs_[i]->setNextOut(outCtxs_[i-1]);
204     }
205   }
206
207   if (!front_) {
208     detail::logWarningIfNotNothing<R>(
209         "No inbound handler in Pipeline, inbound operations will throw "
210         "std::invalid_argument");
211   }
212   if (!back_) {
213     detail::logWarningIfNotNothing<W>(
214         "No outbound handler in Pipeline, outbound operations will throw "
215         "std::invalid_argument");
216   }
217
218   for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) {
219     (*it)->attachPipeline();
220   }
221 }
222
223 template <class R, class W>
224 template <class H>
225 bool Pipeline<R, W>::setOwner(H* handler) {
226   typedef typename ContextType<H, Pipeline<R, W>>::type Context;
227   for (auto& ctx : ctxs_) {
228     auto ctxImpl = dynamic_cast<Context*>(ctx.get());
229     if (ctxImpl && ctxImpl->getHandler() == handler) {
230       owner_ = ctx;
231       return true;
232     }
233   }
234   return false;
235 }
236
237 template <class R, class W>
238 template <class Context>
239 void Pipeline<R, W>::addContextFront(Context* ctx) {
240   addHelper(std::shared_ptr<Context>(ctx, [](Context*){}), true);
241 }
242
243 template <class R, class W>
244 void Pipeline<R, W>::detachHandlers() {
245   for (auto& ctx : ctxs_) {
246     if (ctx != owner_) {
247       ctx->detachPipeline();
248     }
249   }
250 }
251
252 template <class R, class W>
253 template <class Context>
254 Pipeline<R, W>& Pipeline<R, W>::addHelper(
255     std::shared_ptr<Context>&& ctx,
256     bool front) {
257   ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(), ctx);
258   if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::IN) {
259     inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(), ctx.get());
260   }
261   if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::OUT) {
262     outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(), ctx.get());
263   }
264   return *this;
265 }
266
267 }} // folly::wangle