2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/io/async/AsyncTransport.h>
20 #include <folly/futures/Future.h>
21 #include <folly/ExceptionWrapper.h>
23 namespace folly { namespace wangle {
25 template <class In, class Out>
26 class HandlerContext {
28 virtual ~HandlerContext() {}
30 virtual void fireRead(In msg) = 0;
31 virtual void fireReadEOF() = 0;
32 virtual void fireReadException(exception_wrapper e) = 0;
34 virtual Future<void> fireWrite(Out msg) = 0;
35 virtual Future<void> fireClose() = 0;
37 virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
39 virtual void setWriteFlags(WriteFlags flags) = 0;
40 virtual WriteFlags getWriteFlags() = 0;
42 virtual void setReadBufferSettings(
43 uint64_t minAvailable,
44 uint64_t allocationSize) = 0;
45 virtual std::pair<uint64_t, uint64_t> getReadBufferSettings() = 0;
49 virtual void addHandlerBefore(H&&) {}
51 virtual void addHandlerAfter(H&&) {}
53 virtual void replaceHandler(H&&) {}
54 virtual void removeHandler() {}
59 class InboundHandlerContext {
61 virtual ~InboundHandlerContext() {}
63 virtual void fireRead(In msg) = 0;
64 virtual void fireReadEOF() = 0;
65 virtual void fireReadException(exception_wrapper e) = 0;
67 virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
69 // TODO Need get/set writeFlags, readBufferSettings? Probably not.
70 // Do we even really need them stored in the pipeline at all?
71 // Could just always delegate to the socket impl
75 class OutboundHandlerContext {
77 virtual ~OutboundHandlerContext() {}
79 virtual Future<void> fireWrite(Out msg) = 0;
80 virtual Future<void> fireClose() = 0;
82 virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
85 enum class HandlerDir {
91 class PipelineContext {
93 virtual ~PipelineContext() {}
95 virtual void attachPipeline() = 0;
96 virtual void detachPipeline() = 0;
98 virtual void attachTransport() = 0;
99 virtual void detachTransport() = 0;
101 template <class H, class HandlerContext>
102 void attachContext(H* handler, HandlerContext* ctx) {
103 if (++handler->attachCount_ == 1) {
106 handler->ctx_ = nullptr;
110 virtual void setNextIn(PipelineContext* ctx) = 0;
111 virtual void setNextOut(PipelineContext* ctx) = 0;
117 virtual ~InboundLink() {}
118 virtual void read(In msg) = 0;
119 virtual void readEOF() = 0;
120 virtual void readException(exception_wrapper e) = 0;
126 virtual ~OutboundLink() {}
127 virtual Future<void> write(Out msg) = 0;
128 virtual Future<void> close() = 0;
131 template <class P, class H, class Context>
132 class ContextImplBase : public PipelineContext {
134 ~ContextImplBase() {}
137 return handler_.get();
140 void initialize(P* pipeline, std::shared_ptr<H> handler) {
141 pipeline_ = pipeline;
142 handler_ = std::move(handler);
145 // PipelineContext overrides
146 void attachPipeline() override {
148 this->attachContext(handler_.get(), impl_);
149 handler_->attachPipeline(impl_);
154 void detachPipeline() override {
155 handler_->detachPipeline(impl_);
159 void attachTransport() override {
160 DestructorGuard dg(pipeline_);
161 handler_->attachTransport(impl_);
164 void detachTransport() override {
165 DestructorGuard dg(pipeline_);
166 handler_->detachTransport(impl_);
169 void setNextIn(PipelineContext* ctx) override {
170 auto nextIn = dynamic_cast<InboundLink<typename H::rout>*>(ctx);
174 throw std::invalid_argument("inbound type mismatch");
178 void setNextOut(PipelineContext* ctx) override {
179 auto nextOut = dynamic_cast<OutboundLink<typename H::wout>*>(ctx);
183 throw std::invalid_argument("outbound type mismatch");
190 std::shared_ptr<H> handler_;
191 InboundLink<typename H::rout>* nextIn_{nullptr};
192 OutboundLink<typename H::wout>* nextOut_{nullptr};
195 bool attached_{false};
196 using DestructorGuard = typename P::DestructorGuard;
199 template <class P, class H>
201 : public HandlerContext<typename H::rout,
203 public InboundLink<typename H::rin>,
204 public OutboundLink<typename H::win>,
205 public ContextImplBase<P, H, HandlerContext<typename H::rout,
208 typedef typename H::rin Rin;
209 typedef typename H::rout Rout;
210 typedef typename H::win Win;
211 typedef typename H::wout Wout;
212 static const HandlerDir dir = HandlerDir::BOTH;
214 explicit ContextImpl(P* pipeline, std::shared_ptr<H> handler) {
216 this->initialize(pipeline, std::move(handler));
219 // For StaticPipeline
226 // HandlerContext overrides
227 void fireRead(Rout msg) override {
228 DestructorGuard dg(this->pipeline_);
230 this->nextIn_->read(std::forward<Rout>(msg));
232 LOG(WARNING) << "read reached end of pipeline";
236 void fireReadEOF() override {
237 DestructorGuard dg(this->pipeline_);
239 this->nextIn_->readEOF();
241 LOG(WARNING) << "readEOF reached end of pipeline";
245 void fireReadException(exception_wrapper e) override {
246 DestructorGuard dg(this->pipeline_);
248 this->nextIn_->readException(std::move(e));
250 LOG(WARNING) << "readException reached end of pipeline";
254 Future<void> fireWrite(Wout msg) override {
255 DestructorGuard dg(this->pipeline_);
256 if (this->nextOut_) {
257 return this->nextOut_->write(std::forward<Wout>(msg));
259 LOG(WARNING) << "write reached end of pipeline";
264 Future<void> fireClose() override {
265 DestructorGuard dg(this->pipeline_);
266 if (this->nextOut_) {
267 return this->nextOut_->close();
269 LOG(WARNING) << "close reached end of pipeline";
274 std::shared_ptr<AsyncTransport> getTransport() override {
275 return this->pipeline_->getTransport();
278 void setWriteFlags(WriteFlags flags) override {
279 this->pipeline_->setWriteFlags(flags);
282 WriteFlags getWriteFlags() override {
283 return this->pipeline_->getWriteFlags();
286 void setReadBufferSettings(
287 uint64_t minAvailable,
288 uint64_t allocationSize) override {
289 this->pipeline_->setReadBufferSettings(minAvailable, allocationSize);
292 std::pair<uint64_t, uint64_t> getReadBufferSettings() override {
293 return this->pipeline_->getReadBufferSettings();
296 // InboundLink overrides
297 void read(Rin msg) override {
298 DestructorGuard dg(this->pipeline_);
299 this->handler_->read(this, std::forward<Rin>(msg));
302 void readEOF() override {
303 DestructorGuard dg(this->pipeline_);
304 this->handler_->readEOF(this);
307 void readException(exception_wrapper e) override {
308 DestructorGuard dg(this->pipeline_);
309 this->handler_->readException(this, std::move(e));
312 // OutboundLink overrides
313 Future<void> write(Win msg) override {
314 DestructorGuard dg(this->pipeline_);
315 return this->handler_->write(this, std::forward<Win>(msg));
318 Future<void> close() override {
319 DestructorGuard dg(this->pipeline_);
320 return this->handler_->close(this);
324 using DestructorGuard = typename P::DestructorGuard;
327 template <class P, class H>
328 class InboundContextImpl
329 : public InboundHandlerContext<typename H::rout>,
330 public InboundLink<typename H::rin>,
331 public ContextImplBase<P, H, InboundHandlerContext<typename H::rout>> {
333 typedef typename H::rin Rin;
334 typedef typename H::rout Rout;
335 typedef typename H::win Win;
336 typedef typename H::wout Wout;
337 static const HandlerDir dir = HandlerDir::IN;
339 explicit InboundContextImpl(P* pipeline, std::shared_ptr<H> handler) {
341 this->initialize(pipeline, std::move(handler));
344 // For StaticPipeline
345 InboundContextImpl() {
349 ~InboundContextImpl() {}
351 // InboundHandlerContext overrides
352 void fireRead(Rout msg) override {
353 DestructorGuard dg(this->pipeline_);
355 this->nextIn_->read(std::forward<Rout>(msg));
357 LOG(WARNING) << "read reached end of pipeline";
361 void fireReadEOF() override {
362 DestructorGuard dg(this->pipeline_);
364 this->nextIn_->readEOF();
366 LOG(WARNING) << "readEOF reached end of pipeline";
370 void fireReadException(exception_wrapper e) override {
371 DestructorGuard dg(this->pipeline_);
373 this->nextIn_->readException(std::move(e));
375 LOG(WARNING) << "readException reached end of pipeline";
379 std::shared_ptr<AsyncTransport> getTransport() override {
380 return this->pipeline_->getTransport();
383 // InboundLink overrides
384 void read(Rin msg) override {
385 DestructorGuard dg(this->pipeline_);
386 this->handler_->read(this, std::forward<Rin>(msg));
389 void readEOF() override {
390 DestructorGuard dg(this->pipeline_);
391 this->handler_->readEOF(this);
394 void readException(exception_wrapper e) override {
395 DestructorGuard dg(this->pipeline_);
396 this->handler_->readException(this, std::move(e));
400 using DestructorGuard = typename P::DestructorGuard;
403 template <class P, class H>
404 class OutboundContextImpl
405 : public OutboundHandlerContext<typename H::wout>,
406 public OutboundLink<typename H::win>,
407 public ContextImplBase<P, H, OutboundHandlerContext<typename H::wout>> {
409 typedef typename H::rin Rin;
410 typedef typename H::rout Rout;
411 typedef typename H::win Win;
412 typedef typename H::wout Wout;
413 static const HandlerDir dir = HandlerDir::OUT;
415 explicit OutboundContextImpl(P* pipeline, std::shared_ptr<H> handler) {
417 this->initialize(pipeline, std::move(handler));
420 // For StaticPipeline
421 OutboundContextImpl() {
425 ~OutboundContextImpl() {}
427 // OutboundHandlerContext overrides
428 Future<void> fireWrite(Wout msg) override {
429 DestructorGuard dg(this->pipeline_);
430 if (this->nextOut_) {
431 return this->nextOut_->write(std::forward<Wout>(msg));
433 LOG(WARNING) << "write reached end of pipeline";
438 Future<void> fireClose() override {
439 DestructorGuard dg(this->pipeline_);
440 if (this->nextOut_) {
441 return this->nextOut_->close();
443 LOG(WARNING) << "close reached end of pipeline";
448 std::shared_ptr<AsyncTransport> getTransport() override {
449 return this->pipeline_->getTransport();
452 // OutboundLink overrides
453 Future<void> write(Win msg) override {
454 DestructorGuard dg(this->pipeline_);
455 return this->handler_->write(this, std::forward<Win>(msg));
458 Future<void> close() override {
459 DestructorGuard dg(this->pipeline_);
460 return this->handler_->close(this);
464 using DestructorGuard = typename P::DestructorGuard;
467 template <class Handler, class Pipeline>
469 typedef typename std::conditional<
470 Handler::dir == HandlerDir::BOTH,
471 ContextImpl<Pipeline, Handler>,
472 typename std::conditional<
473 Handler::dir == HandlerDir::IN,
474 InboundContextImpl<Pipeline, Handler>,
475 OutboundContextImpl<Pipeline, Handler>