738a83f9d8b9414e3495b89f2c99e8f0f9260177
[folly.git] / folly / wangle / channel / Pipeline.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/wangle/channel/HandlerContext.h>
20 #include <folly/futures/Future.h>
21 #include <folly/io/async/AsyncTransport.h>
22 #include <folly/io/async/DelayedDestruction.h>
23 #include <folly/ExceptionWrapper.h>
24 #include <folly/Memory.h>
25 #include <glog/logging.h>
26
27 namespace folly { namespace wangle {
28
29 /*
30  * R is the inbound type, i.e. inbound calls start with pipeline.read(R)
31  * W is the outbound type, i.e. outbound calls start with pipeline.write(W)
32  */
33 template <class R, class W>
34 class Pipeline : public DelayedDestruction {
35  public:
36   Pipeline() : isStatic_(false) {}
37
38   ~Pipeline() {
39     if (!isStatic_) {
40       detachHandlers();
41     }
42   }
43
44   std::shared_ptr<AsyncTransport> getTransport() {
45     return transport_;
46   }
47
48   void setWriteFlags(WriteFlags flags) {
49     writeFlags_ = flags;
50   }
51
52   WriteFlags getWriteFlags() {
53     return writeFlags_;
54   }
55
56   void setReadBufferSettings(uint64_t minAvailable, uint64_t allocationSize) {
57     readBufferSettings_ = std::make_pair(minAvailable, allocationSize);
58   }
59
60   std::pair<uint64_t, uint64_t> getReadBufferSettings() {
61     return readBufferSettings_;
62   }
63
64   void read(R msg) {
65     front_->read(std::forward<R>(msg));
66   }
67
68   void readEOF() {
69     front_->readEOF();
70   }
71
72   void readException(exception_wrapper e) {
73     front_->readException(std::move(e));
74   }
75
76   Future<void> write(W msg) {
77     return back_->write(std::forward<W>(msg));
78   }
79
80   Future<void> close() {
81     return back_->close();
82   }
83
84   template <class H>
85   Pipeline& addBack(std::shared_ptr<H> handler) {
86     ctxs_.push_back(std::make_shared<ContextImpl<Pipeline, H>>(
87         this,
88         std::move(handler)));
89     return *this;
90   }
91
92   template <class H>
93   Pipeline& addBack(H* handler) {
94     return addBack(std::shared_ptr<H>(handler, [](H*){}));
95   }
96
97   template <class H>
98   Pipeline& addBack(H&& handler) {
99     return addBack(std::make_shared<H>(std::forward<H>(handler)));
100   }
101
102   template <class H>
103   Pipeline& addFront(std::shared_ptr<H> handler) {
104     ctxs_.insert(
105         ctxs_.begin(),
106         std::make_shared<ContextImpl<Pipeline, H>>(this, std::move(handler)));
107     return *this;
108   }
109
110   template <class H>
111   Pipeline& addFront(H* handler) {
112     return addFront(std::shared_ptr<H>(handler, [](H*){}));
113   }
114
115   template <class H>
116   Pipeline& addFront(H&& handler) {
117     return addFront(std::make_shared<H>(std::forward<H>(handler)));
118   }
119
120   template <class H>
121   H* getHandler(int i) {
122     auto ctx = dynamic_cast<ContextImpl<Pipeline, H>*>(ctxs_[i].get());
123     CHECK(ctx);
124     return ctx->getHandler();
125   }
126
127   void finalize() {
128     if (ctxs_.empty()) {
129       return;
130     }
131
132     for (size_t i = 0; i < ctxs_.size() - 1; i++) {
133       ctxs_[i]->link(ctxs_[i+1].get());
134     }
135
136     back_ = dynamic_cast<OutboundLink<W>*>(ctxs_.back().get());
137     if (!back_) {
138       throw std::invalid_argument("wrong type for last handler");
139     }
140
141     front_ = dynamic_cast<InboundLink<R>*>(ctxs_.front().get());
142     if (!front_) {
143       throw std::invalid_argument("wrong type for first handler");
144     }
145
146     for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) {
147       (*it)->attachPipeline();
148     }
149   }
150
151   // If one of the handlers owns the pipeline itself, use setOwner to ensure
152   // that the pipeline doesn't try to detach the handler during destruction,
153   // lest destruction ordering issues occur.
154   // See thrift/lib/cpp2/async/Cpp2Channel.cpp for an example
155   template <class H>
156   bool setOwner(H* handler) {
157     for (auto& ctx : ctxs_) {
158       auto ctxImpl = dynamic_cast<ContextImpl<Pipeline, H>*>(ctx.get());
159       if (ctxImpl && ctxImpl->getHandler() == handler) {
160         owner_ = ctx;
161         return true;
162       }
163     }
164     return false;
165   }
166
167   void attachTransport(
168       std::shared_ptr<AsyncTransport> transport) {
169     transport_ = std::move(transport);
170     for (auto& ctx : ctxs_) {
171       ctx->attachTransport();
172     }
173   }
174
175   void detachTransport() {
176     transport_ = nullptr;
177     for (auto& ctx : ctxs_) {
178       ctx->detachTransport();
179     }
180   }
181
182  protected:
183   explicit Pipeline(bool isStatic) : isStatic_(isStatic) {
184     CHECK(isStatic_);
185   }
186
187   template <class Context>
188   void addContextFront(Context* context) {
189     ctxs_.insert(
190         ctxs_.begin(),
191         std::shared_ptr<Context>(context, [](Context*){}));
192   }
193
194   void detachHandlers() {
195     for (auto& ctx : ctxs_) {
196       if (ctx != owner_) {
197         ctx->detachPipeline();
198       }
199     }
200   }
201
202  private:
203   std::shared_ptr<AsyncTransport> transport_;
204   WriteFlags writeFlags_{WriteFlags::NONE};
205   std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
206
207   bool isStatic_{false};
208   InboundLink<R>* front_{nullptr};
209   OutboundLink<W>* back_{nullptr};
210   std::vector<std::shared_ptr<PipelineContext>> ctxs_;
211   std::shared_ptr<PipelineContext> owner_;
212 };
213
214 }}
215
216 namespace folly {
217
218 class AsyncSocket;
219
220 template <typename Pipeline>
221 class PipelineFactory {
222  public:
223   virtual Pipeline* newPipeline(std::shared_ptr<AsyncSocket>) = 0;
224   virtual ~PipelineFactory() {}
225 };
226
227 }