2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 namespace folly { namespace wangle {
21 class PipelineContext {
23 virtual ~PipelineContext() = default;
25 virtual void attachPipeline() = 0;
26 virtual void detachPipeline() = 0;
28 template <class H, class HandlerContext>
29 void attachContext(H* handler, HandlerContext* ctx) {
30 if (++handler->attachCount_ == 1) {
33 handler->ctx_ = nullptr;
37 virtual void setNextIn(PipelineContext* ctx) = 0;
38 virtual void setNextOut(PipelineContext* ctx) = 0;
40 virtual HandlerDir getDirection() = 0;
46 virtual ~InboundLink() = default;
47 virtual void read(In msg) = 0;
48 virtual void readEOF() = 0;
49 virtual void readException(exception_wrapper e) = 0;
50 virtual void transportActive() = 0;
51 virtual void transportInactive() = 0;
57 virtual ~OutboundLink() = default;
58 virtual Future<void> write(Out msg) = 0;
59 virtual Future<void> close() = 0;
62 template <class P, class H, class Context>
63 class ContextImplBase : public PipelineContext {
65 ~ContextImplBase() = default;
68 return handler_.get();
71 void initialize(P* pipeline, std::shared_ptr<H> handler) {
73 handler_ = std::move(handler);
76 // PipelineContext overrides
77 void attachPipeline() override {
79 this->attachContext(handler_.get(), impl_);
80 handler_->attachPipeline(impl_);
85 void detachPipeline() override {
86 handler_->detachPipeline(impl_);
90 void setNextIn(PipelineContext* ctx) override {
95 auto nextIn = dynamic_cast<InboundLink<typename H::rout>*>(ctx);
99 throw std::invalid_argument("inbound type mismatch");
103 void setNextOut(PipelineContext* ctx) override {
108 auto nextOut = dynamic_cast<OutboundLink<typename H::wout>*>(ctx);
112 throw std::invalid_argument("outbound type mismatch");
116 HandlerDir getDirection() override {
123 std::shared_ptr<H> handler_;
124 InboundLink<typename H::rout>* nextIn_{nullptr};
125 OutboundLink<typename H::wout>* nextOut_{nullptr};
128 bool attached_{false};
129 using DestructorGuard = typename P::DestructorGuard;
132 template <class P, class H>
134 : public HandlerContext<typename H::rout,
136 public InboundLink<typename H::rin>,
137 public OutboundLink<typename H::win>,
138 public ContextImplBase<P, H, HandlerContext<typename H::rout,
141 typedef typename H::rin Rin;
142 typedef typename H::rout Rout;
143 typedef typename H::win Win;
144 typedef typename H::wout Wout;
145 static const HandlerDir dir = HandlerDir::BOTH;
147 explicit ContextImpl(P* pipeline, std::shared_ptr<H> handler) {
149 this->initialize(pipeline, std::move(handler));
152 // For StaticPipeline
157 ~ContextImpl() = default;
159 // HandlerContext overrides
160 void fireRead(Rout msg) override {
161 DestructorGuard dg(this->pipeline_);
163 this->nextIn_->read(std::forward<Rout>(msg));
165 LOG(WARNING) << "read reached end of pipeline";
169 void fireReadEOF() override {
170 DestructorGuard dg(this->pipeline_);
172 this->nextIn_->readEOF();
174 LOG(WARNING) << "readEOF reached end of pipeline";
178 void fireReadException(exception_wrapper e) override {
179 DestructorGuard dg(this->pipeline_);
181 this->nextIn_->readException(std::move(e));
183 LOG(WARNING) << "readException reached end of pipeline";
187 void fireTransportActive() override {
188 DestructorGuard dg(this->pipeline_);
190 this->nextIn_->transportActive();
194 void fireTransportInactive() override {
195 DestructorGuard dg(this->pipeline_);
197 this->nextIn_->transportInactive();
201 Future<void> fireWrite(Wout msg) override {
202 DestructorGuard dg(this->pipeline_);
203 if (this->nextOut_) {
204 return this->nextOut_->write(std::forward<Wout>(msg));
206 LOG(WARNING) << "write reached end of pipeline";
211 Future<void> fireClose() override {
212 DestructorGuard dg(this->pipeline_);
213 if (this->nextOut_) {
214 return this->nextOut_->close();
216 LOG(WARNING) << "close reached end of pipeline";
221 PipelineBase* getPipeline() override {
222 return this->pipeline_;
225 void setWriteFlags(WriteFlags flags) override {
226 this->pipeline_->setWriteFlags(flags);
229 WriteFlags getWriteFlags() override {
230 return this->pipeline_->getWriteFlags();
233 void setReadBufferSettings(
234 uint64_t minAvailable,
235 uint64_t allocationSize) override {
236 this->pipeline_->setReadBufferSettings(minAvailable, allocationSize);
239 std::pair<uint64_t, uint64_t> getReadBufferSettings() override {
240 return this->pipeline_->getReadBufferSettings();
243 // InboundLink overrides
244 void read(Rin msg) override {
245 DestructorGuard dg(this->pipeline_);
246 this->handler_->read(this, std::forward<Rin>(msg));
249 void readEOF() override {
250 DestructorGuard dg(this->pipeline_);
251 this->handler_->readEOF(this);
254 void readException(exception_wrapper e) override {
255 DestructorGuard dg(this->pipeline_);
256 this->handler_->readException(this, std::move(e));
259 void transportActive() override {
260 DestructorGuard dg(this->pipeline_);
261 this->handler_->transportActive(this);
264 void transportInactive() override {
265 DestructorGuard dg(this->pipeline_);
266 this->handler_->transportInactive(this);
269 // OutboundLink overrides
270 Future<void> write(Win msg) override {
271 DestructorGuard dg(this->pipeline_);
272 return this->handler_->write(this, std::forward<Win>(msg));
275 Future<void> close() override {
276 DestructorGuard dg(this->pipeline_);
277 return this->handler_->close(this);
281 using DestructorGuard = typename P::DestructorGuard;
284 template <class P, class H>
285 class InboundContextImpl
286 : public InboundHandlerContext<typename H::rout>,
287 public InboundLink<typename H::rin>,
288 public ContextImplBase<P, H, InboundHandlerContext<typename H::rout>> {
290 typedef typename H::rin Rin;
291 typedef typename H::rout Rout;
292 typedef typename H::win Win;
293 typedef typename H::wout Wout;
294 static const HandlerDir dir = HandlerDir::IN;
296 explicit InboundContextImpl(P* pipeline, std::shared_ptr<H> handler) {
298 this->initialize(pipeline, std::move(handler));
301 // For StaticPipeline
302 InboundContextImpl() {
306 ~InboundContextImpl() = default;
308 // InboundHandlerContext overrides
309 void fireRead(Rout msg) override {
310 DestructorGuard dg(this->pipeline_);
312 this->nextIn_->read(std::forward<Rout>(msg));
314 LOG(WARNING) << "read reached end of pipeline";
318 void fireReadEOF() override {
319 DestructorGuard dg(this->pipeline_);
321 this->nextIn_->readEOF();
323 LOG(WARNING) << "readEOF reached end of pipeline";
327 void fireReadException(exception_wrapper e) override {
328 DestructorGuard dg(this->pipeline_);
330 this->nextIn_->readException(std::move(e));
332 LOG(WARNING) << "readException reached end of pipeline";
336 void fireTransportActive() override {
337 DestructorGuard dg(this->pipeline_);
339 this->nextIn_->transportActive();
343 void fireTransportInactive() override {
344 DestructorGuard dg(this->pipeline_);
346 this->nextIn_->transportInactive();
350 PipelineBase* getPipeline() override {
351 return this->pipeline_;
354 // InboundLink overrides
355 void read(Rin msg) override {
356 DestructorGuard dg(this->pipeline_);
357 this->handler_->read(this, std::forward<Rin>(msg));
360 void readEOF() override {
361 DestructorGuard dg(this->pipeline_);
362 this->handler_->readEOF(this);
365 void readException(exception_wrapper e) override {
366 DestructorGuard dg(this->pipeline_);
367 this->handler_->readException(this, std::move(e));
370 void transportActive() override {
371 DestructorGuard dg(this->pipeline_);
372 this->handler_->transportActive(this);
375 void transportInactive() override {
376 DestructorGuard dg(this->pipeline_);
377 this->handler_->transportInactive(this);
381 using DestructorGuard = typename P::DestructorGuard;
384 template <class P, class H>
385 class OutboundContextImpl
386 : public OutboundHandlerContext<typename H::wout>,
387 public OutboundLink<typename H::win>,
388 public ContextImplBase<P, H, OutboundHandlerContext<typename H::wout>> {
390 typedef typename H::rin Rin;
391 typedef typename H::rout Rout;
392 typedef typename H::win Win;
393 typedef typename H::wout Wout;
394 static const HandlerDir dir = HandlerDir::OUT;
396 explicit OutboundContextImpl(P* pipeline, std::shared_ptr<H> handler) {
398 this->initialize(pipeline, std::move(handler));
401 // For StaticPipeline
402 OutboundContextImpl() {
406 ~OutboundContextImpl() = default;
408 // OutboundHandlerContext overrides
409 Future<void> fireWrite(Wout msg) override {
410 DestructorGuard dg(this->pipeline_);
411 if (this->nextOut_) {
412 return this->nextOut_->write(std::forward<Wout>(msg));
414 LOG(WARNING) << "write reached end of pipeline";
419 Future<void> fireClose() override {
420 DestructorGuard dg(this->pipeline_);
421 if (this->nextOut_) {
422 return this->nextOut_->close();
424 LOG(WARNING) << "close reached end of pipeline";
429 PipelineBase* getPipeline() override {
430 return this->pipeline_;
433 // OutboundLink overrides
434 Future<void> write(Win msg) override {
435 DestructorGuard dg(this->pipeline_);
436 return this->handler_->write(this, std::forward<Win>(msg));
439 Future<void> close() override {
440 DestructorGuard dg(this->pipeline_);
441 return this->handler_->close(this);
445 using DestructorGuard = typename P::DestructorGuard;
448 template <class Handler, class Pipeline>
450 typedef typename std::conditional<
451 Handler::dir == HandlerDir::BOTH,
452 ContextImpl<Pipeline, Handler>,
453 typename std::conditional<
454 Handler::dir == HandlerDir::IN,
455 InboundContextImpl<Pipeline, Handler>,
456 OutboundContextImpl<Pipeline, Handler>