Special-case /dev/null in open in the portability header
[folly.git] / folly / MPMCPipeline.h
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <utility>
20
21 #include <glog/logging.h>
22
23 #include <folly/detail/MPMCPipelineDetail.h>
24
25 namespace folly {
26
27 /**
28  * Helper tag template to use amplification > 1
29  */
30 template <class T, size_t Amp> class MPMCPipelineStage;
31
32 /**
33  * Multi-Producer, Multi-Consumer pipeline.
34  *
35  * A N-stage pipeline is a combination of N+1 MPMC queues (see MPMCQueue.h).
36  *
37  * At each stage, you may dequeue the results from the previous stage (possibly
38  * from multiple threads) and enqueue results to the next stage. Regardless of
39  * the order of completion, data is delivered to the next stage in the original
40  * order.  Each input is matched with a "ticket" which must be produced
41  * when enqueueing to the next stage.
42  *
43  * A given stage must produce exactly K ("amplification factor", default K=1)
44  * results for every input. This is enforced by requiring that each ticket
45  * is used exactly K times.
46  *
47  * Usage:
48  *
49  * // arguments are queue sizes
50  * MPMCPipeline<int, std::string, int> pipeline(10, 10, 10);
51  *
52  * pipeline.blockingWrite(42);
53  *
54  * {
55  *   int val;
56  *   auto ticket = pipeline.blockingReadStage<0>(val);
57  *   pipeline.blockingWriteStage<0>(ticket, folly::to<std::string>(val));
58  * }
59  *
60  * {
61  *   std::string val;
62  *   auto ticket = pipeline.blockingReadStage<1>(val);
63  *   int ival = 0;
64  *   try {
65  *     ival = folly::to<int>(val);
66  *   } catch (...) {
67  *     // We must produce exactly 1 output even on exception!
68  *   }
69  *   pipeline.blockingWriteStage<1>(ticket, ival);
70  * }
71  *
72  * int result;
73  * pipeline.blockingRead(result);
74  * // result == 42
75  *
76  * To specify amplification factors greater than 1, use
77  * MPMCPipelineStage<T, amplification> instead of T in the declaration:
78  *
79  * MPMCPipeline<int,
80  *              MPMCPipelineStage<std::string, 2>,
81  *              MPMCPipelineStage<int, 4>>
82  *
83  * declares a two-stage pipeline: the first stage produces 2 strings
84  * for each input int, the second stage produces 4 ints for each input string,
85  * so, overall, the pipeline produces 2*4 = 8 ints for each input int.
86  *
87  * Implementation details: we use N+1 MPMCQueue objects; each intermediate
88  * queue connects two adjacent stages.  The MPMCQueue implementation is abused;
89  * instead of using it as a queue, we insert in the output queue at the
90  * position determined by the input queue's popTicket_.  We guarantee that
91  * all slots are filled (and therefore the queue doesn't freeze) because
92  * we require that each step produces exactly K outputs for every input.
93  */
94 template <class In, class... Stages> class MPMCPipeline {
95   typedef std::tuple<detail::PipelineStageInfo<Stages>...> StageInfos;
96   typedef std::tuple<
97              detail::MPMCPipelineStageImpl<In>,
98              detail::MPMCPipelineStageImpl<
99                  typename detail::PipelineStageInfo<Stages>::value_type>...>
100     StageTuple;
101   static constexpr size_t kAmplification =
102     detail::AmplificationProduct<StageInfos>::value;
103
104  public:
105   /**
106    * Ticket, returned by blockingReadStage, must be given back to
107    * blockingWriteStage. Tickets are not thread-safe.
108    */
109   template <size_t Stage>
110   class Ticket {
111    public:
112     ~Ticket() noexcept {
113       CHECK_EQ(remainingUses_, 0) << "All tickets must be completely used!";
114     }
115
116 #ifndef NDEBUG
117     Ticket() noexcept
118       : owner_(nullptr),
119         remainingUses_(0),
120         value_(0xdeadbeeffaceb00c) {
121     }
122 #else
123     Ticket() noexcept : remainingUses_(0) { }
124 #endif
125
126     Ticket(Ticket&& other) noexcept
127       :
128 #ifndef NDEBUG
129         owner_(other.owner_),
130 #endif
131         remainingUses_(other.remainingUses_),
132         value_(other.value_) {
133       other.remainingUses_ = 0;
134 #ifndef NDEBUG
135       other.owner_ = nullptr;
136       other.value_ = 0xdeadbeeffaceb00c;
137 #endif
138     }
139
140     Ticket& operator=(Ticket&& other) noexcept {
141       if (this != &other) {
142         this->~Ticket();
143         new (this) Ticket(std::move(other));
144       }
145       return *this;
146     }
147
148    private:
149     friend class MPMCPipeline;
150 #ifndef NDEBUG
151     MPMCPipeline* owner_;
152 #endif
153     size_t remainingUses_;
154     uint64_t value_;
155
156
157     Ticket(MPMCPipeline* owner, size_t amplification, uint64_t value) noexcept
158       :
159 #ifndef NDEBUG
160         owner_(owner),
161 #endif
162         remainingUses_(amplification),
163         value_(value * amplification) {
164       (void)owner; // -Wunused-parameter
165     }
166
167     uint64_t use(MPMCPipeline* owner) {
168       CHECK_GT(remainingUses_--, 0);
169 #ifndef NDEBUG
170       CHECK(owner == owner_);
171 #else
172       (void)owner; // -Wunused-parameter
173 #endif
174       return value_++;
175     }
176   };
177
178   /**
179    * Default-construct pipeline. Useful to move-assign later,
180    * just like MPMCQueue, see MPMCQueue.h for more details.
181    */
182   MPMCPipeline() = default;
183
184   /**
185    * Construct a pipeline with N+1 queue sizes.
186    */
187   template <class... Sizes>
188   explicit MPMCPipeline(Sizes... sizes) : stages_(sizes...) { }
189
190   /**
191    * Push an element into (the first stage of) the pipeline. Blocking.
192    */
193   template <class... Args>
194   void blockingWrite(Args&&... args) {
195     std::get<0>(stages_).blockingWrite(std::forward<Args>(args)...);
196   }
197
198   /**
199    * Try to push an element into (the first stage of) the pipeline.
200    * Non-blocking.
201    */
202   template <class... Args>
203   bool write(Args&&... args) {
204     return std::get<0>(stages_).write(std::forward<Args>(args)...);
205   }
206
207   /**
208    * Read an element for stage Stage and obtain a ticket. Blocking.
209    */
210   template <size_t Stage>
211   Ticket<Stage> blockingReadStage(
212       typename std::tuple_element<Stage, StageTuple>::type::value_type& elem) {
213     return Ticket<Stage>(
214         this,
215         std::tuple_element<Stage, StageInfos>::type::kAmplification,
216         std::get<Stage>(stages_).blockingRead(elem));
217   }
218
219   /**
220    * Try to read an element for stage Stage and obtain a ticket.
221    * Non-blocking.
222    */
223   template <size_t Stage>
224   bool readStage(
225       Ticket<Stage>& ticket,
226       typename std::tuple_element<Stage, StageTuple>::type::value_type& elem) {
227     uint64_t tval;
228     if (!std::get<Stage>(stages_).readAndGetTicket(tval, elem)) {
229       return false;
230     }
231     ticket = Ticket<Stage>(
232         this,
233         std::tuple_element<Stage, StageInfos>::type::kAmplification,
234         tval);
235     return true;
236   }
237
238   /**
239    * Complete an element in stage Stage (pushing it for stage Stage+1).
240    * Blocking.
241    */
242   template <size_t Stage, class... Args>
243   void blockingWriteStage(Ticket<Stage>& ticket, Args&&... args) {
244     std::get<Stage+1>(stages_).blockingWriteWithTicket(
245         ticket.use(this),
246         std::forward<Args>(args)...);
247   }
248
249   /**
250    * Pop an element from (the final stage of) the pipeline. Blocking.
251    */
252   void blockingRead(
253       typename std::tuple_element<
254           sizeof...(Stages),
255           StageTuple>::type::value_type& elem) {
256     std::get<sizeof...(Stages)>(stages_).blockingRead(elem);
257   }
258
259   /**
260    * Try to pop an element from (the final stage of) the pipeline.
261    * Non-blocking.
262    */
263   bool read(
264       typename std::tuple_element<
265           sizeof...(Stages),
266           StageTuple>::type::value_type& elem) {
267     return std::get<sizeof...(Stages)>(stages_).read(elem);
268   }
269
270   /**
271    * Estimate queue size, measured as values from the last stage.
272    * (so if the pipeline has an amplification factor > 1, pushing an element
273    * into the first stage will cause sizeGuess() to be == amplification factor)
274    * Elements "in flight" (currently processed as part of a stage, so not
275    * in any queue) are also counted.
276    */
277   ssize_t sizeGuess() const noexcept {
278     return (std::get<0>(stages_).writeCount() * kAmplification -
279             std::get<sizeof...(Stages)>(stages_).readCount());
280   }
281
282  private:
283   StageTuple stages_;
284 };
285
286
287 }  // namespaces