2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
21 #include <glog/logging.h>
23 #include <folly/detail/MPMCPipelineDetail.h>
28 * Helper tag template to use amplification > 1
30 template <class T, size_t Amp> class MPMCPipelineStage;
33 * Multi-Producer, Multi-Consumer pipeline.
35 * A N-stage pipeline is a combination of N+1 MPMC queues (see MPMCQueue.h).
37 * At each stage, you may dequeue the results from the previous stage (possibly
38 * from multiple threads) and enqueue results to the next stage. Regardless of
39 * the order of completion, data is delivered to the next stage in the original
40 * order. Each input is matched with a "ticket" which must be produced
41 * when enqueueing to the next stage.
43 * A given stage must produce exactly K ("amplification factor", default K=1)
44 * results for every input. This is enforced by requiring that each ticket
45 * is used exactly K times.
49 * // arguments are queue sizes
50 * MPMCPipeline<int, std::string, int> pipeline(10, 10, 10);
52 * pipeline.blockingWrite(42);
56 * auto ticket = pipeline.blockingReadStage<0>(val);
57 * pipeline.blockingWriteStage<0>(ticket, folly::to<std::string>(val));
62 * auto ticket = pipeline.blockingReadStage<1>(val);
65 * ival = folly::to<int>(val);
67 * // We must produce exactly 1 output even on exception!
69 * pipeline.blockingWriteStage<1>(ticket, ival);
73 * pipeline.blockingRead(result);
76 * To specify amplification factors greater than 1, use
77 * MPMCPipelineStage<T, amplification> instead of T in the declaration:
80 * MPMCPipelineStage<std::string, 2>,
81 * MPMCPipelineStage<int, 4>>
83 * declares a two-stage pipeline: the first stage produces 2 strings
84 * for each input int, the second stage produces 4 ints for each input string,
85 * so, overall, the pipeline produces 2*4 = 8 ints for each input int.
87 * Implementation details: we use N+1 MPMCQueue objects; each intermediate
88 * queue connects two adjacent stages. The MPMCQueue implementation is abused;
89 * instead of using it as a queue, we insert in the output queue at the
90 * position determined by the input queue's popTicket_. We guarantee that
91 * all slots are filled (and therefore the queue doesn't freeze) because
92 * we require that each step produces exactly K outputs for every input.
94 template <class In, class... Stages> class MPMCPipeline {
95 typedef std::tuple<detail::PipelineStageInfo<Stages>...> StageInfos;
97 detail::MPMCPipelineStageImpl<In>,
98 detail::MPMCPipelineStageImpl<
99 typename detail::PipelineStageInfo<Stages>::value_type>...>
101 static constexpr size_t kAmplification =
102 detail::AmplificationProduct<StageInfos>::value;
106 * Ticket, returned by blockingReadStage, must be given back to
107 * blockingWriteStage. Tickets are not thread-safe.
109 template <size_t Stage>
113 CHECK_EQ(remainingUses_, 0) << "All tickets must be completely used!";
120 value_(0xdeadbeeffaceb00c) {
123 Ticket() noexcept : remainingUses_(0) { }
126 Ticket(Ticket&& other) noexcept
129 owner_(other.owner_),
131 remainingUses_(other.remainingUses_),
132 value_(other.value_) {
133 other.remainingUses_ = 0;
135 other.owner_ = nullptr;
136 other.value_ = 0xdeadbeeffaceb00c;
140 Ticket& operator=(Ticket&& other) noexcept {
141 if (this != &other) {
143 new (this) Ticket(std::move(other));
149 friend class MPMCPipeline;
151 MPMCPipeline* owner_;
153 size_t remainingUses_;
157 Ticket(MPMCPipeline* owner, size_t amplification, uint64_t value) noexcept
162 remainingUses_(amplification),
163 value_(value * amplification) {
164 (void)owner; // -Wunused-parameter
167 uint64_t use(MPMCPipeline* owner) {
168 CHECK_GT(remainingUses_--, 0);
170 CHECK(owner == owner_);
172 (void)owner; // -Wunused-parameter
179 * Default-construct pipeline. Useful to move-assign later,
180 * just like MPMCQueue, see MPMCQueue.h for more details.
182 MPMCPipeline() = default;
185 * Construct a pipeline with N+1 queue sizes.
187 template <class... Sizes>
188 explicit MPMCPipeline(Sizes... sizes) : stages_(sizes...) { }
191 * Push an element into (the first stage of) the pipeline. Blocking.
193 template <class... Args>
194 void blockingWrite(Args&&... args) {
195 std::get<0>(stages_).blockingWrite(std::forward<Args>(args)...);
199 * Try to push an element into (the first stage of) the pipeline.
202 template <class... Args>
203 bool write(Args&&... args) {
204 return std::get<0>(stages_).write(std::forward<Args>(args)...);
208 * Read an element for stage Stage and obtain a ticket. Blocking.
210 template <size_t Stage>
211 Ticket<Stage> blockingReadStage(
212 typename std::tuple_element<Stage, StageTuple>::type::value_type& elem) {
213 return Ticket<Stage>(
215 std::tuple_element<Stage, StageInfos>::type::kAmplification,
216 std::get<Stage>(stages_).blockingRead(elem));
220 * Try to read an element for stage Stage and obtain a ticket.
223 template <size_t Stage>
225 Ticket<Stage>& ticket,
226 typename std::tuple_element<Stage, StageTuple>::type::value_type& elem) {
228 if (!std::get<Stage>(stages_).readAndGetTicket(tval, elem)) {
231 ticket = Ticket<Stage>(
233 std::tuple_element<Stage, StageInfos>::type::kAmplification,
239 * Complete an element in stage Stage (pushing it for stage Stage+1).
242 template <size_t Stage, class... Args>
243 void blockingWriteStage(Ticket<Stage>& ticket, Args&&... args) {
244 std::get<Stage+1>(stages_).blockingWriteWithTicket(
246 std::forward<Args>(args)...);
250 * Pop an element from (the final stage of) the pipeline. Blocking.
253 typename std::tuple_element<
255 StageTuple>::type::value_type& elem) {
256 std::get<sizeof...(Stages)>(stages_).blockingRead(elem);
260 * Try to pop an element from (the final stage of) the pipeline.
264 typename std::tuple_element<
266 StageTuple>::type::value_type& elem) {
267 return std::get<sizeof...(Stages)>(stages_).read(elem);
271 * Estimate queue size, measured as values from the last stage.
272 * (so if the pipeline has an amplification factor > 1, pushing an element
273 * into the first stage will cause sizeGuess() to be == amplification factor)
274 * Elements "in flight" (currently processed as part of a stage, so not
275 * in any queue) are also counted.
277 ssize_t sizeGuess() const noexcept {
278 return (std::get<0>(stages_).writeCount() * kAmplification -
279 std::get<sizeof...(Stages)>(stages_).readCount());