Unbounded queue
[folly.git] / folly / concurrency / UnboundedQueue.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <atomic>
20 #include <chrono>
21 #include <memory>
22
23 #include <folly/concurrency/CacheLocality.h>
24
25 namespace folly {
26
27 /// UnboundedQueue supports a variety of options for unbounded
28 /// dynamically expanding an shrinking queues, including variations of:
29 /// - Single vs. multiple producers
30 /// - Single vs. multiple consumers
31 /// - Blocking vs. spin-waiting
32 /// - Non-waiting, timed, and waiting consumer operations.
33 /// Producer operations never wait or fail (unless out-of-memory).
34 ///
35 /// Template parameters:
36 /// - T: element type
37 /// - SingleProducer: true if there can be only one producer at a
38 ///   time.
39 /// - SingleConsumer: true if there can be only one consumer at a
40 ///   time.
41 /// - MayBlock: true if consumers may block, false if they only
42 ///   spins. A performance tuning parameter.
43 /// - LgSegmentSize (default 8): Log base 2 of number of elements per
44 ///   segment. A performance tuning parameter. See below.
45 ///
46 /// When to use UnboundedQueue:
47 /// - If a small bound may lead to deadlock or performance degradation
48 ///   under bursty patterns.
49 /// - If there is no risk of the queue growing too much.
50 ///
51 /// When not to use UnboundedQueue:
52 /// - If there is risk of the queue growing too much and a large bound
53 ///   is acceptable, then use DynamicBoundedQueue.
54 /// - If the queue must not allocate on enqueue or it must have a
55 ///   small bound, then use fixed-size MPMCQueue or (if non-blocking
56 ///   SPSC) ProducerConsumerQueue.
57 ///
58 /// Template Aliases:
59 ///   USPSCQueue<T, MayBlock, LgSegmentSize>
60 ///   UMPSCQueue<T, MayBlock, LgSegmentSize>
61 ///   USPMCQueue<T, MayBlock, LgSegmentSize>
62 ///   UMPMCQueue<T, MayBlock, LgSegmentSize>
63 ///
64 /// Functions:
65 ///   Producer operations never wait or fail (unless OOM)
66 ///     void enqueue(const T&);
67 ///     void enqueue(T&&);
68 ///         Adds an element to the end of the queue.
69 ///
70 ///   Consumer operations:
71 ///     void dequeue(T&);
72 ///         Extracts an element from the front of the queue. Waits
73 ///         until an element is available if needed.
74 ///     bool try_dequeue(T&);
75 ///         Tries to extracts an element from the front of the queue
76 ///         if available. Returns true if successful, false otherwise.
77 ///     bool try_dequeue_until(T&, time_point& deadline);
78 ///         Tries to extracts an element from the front of the queue
79 ///         if available until the specified deadline.  Returns true
80 ///         if successful, false otherwise.
81 ///     bool try_dequeue_for(T&, duration&);
82 ///         Tries to extracts an element from the front of the queue
83 ///         if available for for the specified duration.  Returns true
84 ///         if successful, false otherwise.
85 ///
86 ///   Secondary functions:
87 ///     size_t size();
88 ///         Returns an estimate of the size of the queue.
89 ///     bool empty();
90 ///         Returns true only if the queue was empty during the call.
91 ///     Note: size() and empty() are guaranteed to be accurate only if
92 ///     the queue is not changed concurrently.
93 ///
94 /// Usage examples:
95 /// @code
96 ///   /* UMPSC, doesn't block, 1024 int elements per segment */
97 ///   UMPSCQueue<int, false, 10> q;
98 ///   q.enqueue(1);
99 ///   q.enqueue(2);
100 ///   q.enqueue(3);
101 ///   ASSERT_FALSE(q.empty());
102 ///   ASSERT_EQ(q.size(), 3);
103 ///   int v;
104 ///   q.dequeue(v);
105 ///   ASSERT_EQ(v, 1);
106 ///   ASSERT_TRUE(try_dequeue(v));
107 ///   ASSERT_EQ(v, 2);
108 ///   ASSERT_TRUE(try_dequeue_until(v, now() + seconds(1)));
109 ///   ASSERT_EQ(v, 3);
110 ///   ASSERT_TRUE(q.empty());
111 ///   ASSERT_EQ(q.size(), 0);
112 ///   ASSERT_FALSE(try_dequeue(v));
113 ///   ASSERT_FALSE(try_dequeue_for(v, microseconds(100)));
114 /// @endcode
115 ///
116 /// Design:
117 /// - The queue is composed of one or more segments. Each segment has
118 ///   a fixed size of 2^LgSegmentSize entries. Each segment is used
119 ///   exactly once.
120 /// - Each entry is composed of a futex and a single element.
121 /// - The queue contains two 64-bit ticket variables. The producer
122 ///   ticket counts the number of producer tickets isued so far, and
123 ///   the same for the consumer ticket. Each ticket number corresponds
124 ///   to a specific entry in a specific segment.
125 /// - The queue maintains two pointers, head and tail. Head points to
126 ///   the segment that corresponds to the current consumer
127 ///   ticket. Similarly, tail pointer points to the segment that
128 ///   corresponds to the producer ticket.
129 /// - Segments are organized as a singly linked list.
130 /// - The producer with the first ticket in the current producer
131 ///   segment is solely responsible for allocating and linking the
132 ///   next segment.
133 /// - The producer with the last ticket in the current producer
134 ///   segment is solely responsible for advancing the tail pointer to
135 ///   the next segment.
136 /// - Similarly, the consumer with the last ticket in the current
137 ///   consumer segment is solely responsible for advancing the head
138 ///   pointer to the next segment. It must ensure that head never
139 ///   overtakes tail.
140 ///
141 /// Memory Usage:
142 /// - An empty queue contains one segment. A nonempty queue contains
143 ///   one or two more segment than fits its contents.
144 /// - Removed segments are not reclaimed until there are no threads,
145 ///   producers or consumers, have references to them or their
146 ///   predessors. That is, a lagging thread may delay the reclamation
147 ///   of a chain of removed segments.
148 ///
149 /// Performance considerations:
150 /// - All operations take constant time, excluding the costs of
151 ///   allocation, reclamation, interence from other threads, and
152 ///   waiting for actions by other threads.
153 /// - In general, using the single producer and or single consumer
154 ///   variants yields better performance than the MP and MC
155 ///   alternatives.
156 /// - SPSC without blocking is the fastest configuration. It doesn't
157 ///   include any read-modify-write atomic operations, full fences, or
158 ///   system calls in the critical path.
159 /// - MP adds a fetch_add to the critical path of each producer operation.
160 /// - MC adds a fetch_add or compare_exchange to the critical path of
161 ///   each consumer operation.
162 /// - The possibility of consumers blocking, even if they never do,
163 ///   adds a compare_exchange to the crtical path of each producer
164 ///   operation.
165 /// - MPMC, SPMC, MPSC require the use of a deferred reclamation
166 ///   mechanism to guarantee that segments removed from the linked
167 ///   list, i.e., unreachable from the head pointer, are reclaimed
168 ///   only after they are no longer needed by any lagging producers or
169 ///   consumers.
170 /// - The overheads of segment allocation and reclamation are intended
171 ///   to be mostly out of the critical path of the queue's throughput.
172 /// - If the template parameter LgSegmentSize is changed, it should be
173 ///   set adequately high to keep the amortized cost of allocation and
174 ///   reclamation low.
175 /// - Another consideration is that the queue is guaranteed to have
176 ///   enough space for a number of consumers equal to 2^LgSegmentSize
177 ///   for local blocking. Excess waiting consumers spin.
178 /// - It is recommended to measure perforamnce with different variants
179 ///   when applicable, e.g., UMPMC vs UMPSC. Depending on the use
180 ///   case, sometimes the variant with the higher sequential overhead
181 ///   may yield better results due to, for example, more favorable
182 ///   producer-consumer balance or favorable timining for avoiding
183 ///   costly blocking.
184
185 template <
186     typename T,
187     bool SingleProducer,
188     bool SingleConsumer,
189     bool MayBlock,
190     size_t LgSegmentSize = 8,
191     template <typename> class Atom = std::atomic>
192 class UnboundedQueue {
193   using Ticket = uint64_t;
194   class Entry;
195   class Segment;
196
197   static constexpr bool SPSC = SingleProducer && SingleConsumer;
198   static constexpr size_t SegmentSize = 1 << LgSegmentSize;
199   static constexpr size_t Stride = SPSC || (LgSegmentSize <= 1) ? 1 : 27;
200
201   static_assert(
202       std::is_nothrow_destructible<T>::value,
203       "T must be nothrow_destructible");
204   static_assert((Stride & 1) == 1, "Stride must be odd");
205   static_assert(LgSegmentSize < 32, "LgSegmentSize must be < 32");
206
207   FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
208   Atom<Segment*> head_;
209   Atom<Ticket> consumerTicket_;
210   FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
211   Atom<Segment*> tail_;
212   Atom<Ticket> producerTicket_;
213
214  public:
215   UnboundedQueue();
216   ~UnboundedQueue();
217
218   /** enqueue */
219   FOLLY_ALWAYS_INLINE void enqueue(const T& arg) {
220     enqueueImpl(arg);
221   }
222
223   FOLLY_ALWAYS_INLINE void enqueue(T&& arg) {
224     enqueueImpl(std::move(arg));
225   }
226
227   /** dequeue */
228   void dequeue(T& item) noexcept;
229
230   /** try_dequeue */
231   bool try_dequeue(T& item) noexcept {
232     return try_dequeue_until(
233         item, std::chrono::steady_clock::time_point::min());
234   }
235
236   /** try_dequeue_until */
237   template <typename Clock, typename Duration>
238   bool try_dequeue_until(
239       T& item,
240       const std::chrono::time_point<Clock, Duration>& deadline) noexcept;
241
242   /** try_dequeue_for */
243   template <typename Rep, typename Period>
244   bool try_dequeue_for(
245       T& item,
246       const std::chrono::duration<Rep, Period>& duration) noexcept {
247     return try_dequeue_until(item, std::chrono::steady_clock::now() + duration);
248   }
249
250   /** size */
251   size_t size() const noexcept {
252     auto p = producerTicket();
253     auto c = consumerTicket();
254     return p > c ? p - c : 0;
255   }
256
257   /** empty */
258   bool empty() const noexcept {
259     auto c = consumerTicket();
260     auto p = producerTicket();
261     return p <= c;
262   }
263
264  private:
265   template <typename Arg>
266   void enqueueImpl(Arg&& arg);
267
268   template <typename Arg>
269   void enqueueCommon(Segment* s, Arg&& arg);
270
271   void dequeueCommon(Segment* s, T& item) noexcept;
272
273   template <typename Clock, typename Duration>
274   bool singleConsumerTryDequeueUntil(
275       Segment* s,
276       T& item,
277       const std::chrono::time_point<Clock, Duration>& deadline) noexcept;
278
279   template <typename Clock, typename Duration>
280   bool multiConsumerTryDequeueUntil(
281       Segment* s,
282       T& item,
283       const std::chrono::time_point<Clock, Duration>& deadline) noexcept;
284
285   Segment* findSegment(Segment* s, const Ticket t) const noexcept;
286
287   void allocNextSegment(Segment* s, const Ticket t);
288
289   void advanceTail(Segment* s) noexcept;
290
291   void advanceHead(Segment* s) noexcept;
292
293   FOLLY_ALWAYS_INLINE size_t index(Ticket t) const noexcept {
294     return (t * Stride) & (SegmentSize - 1);
295   }
296
297   FOLLY_ALWAYS_INLINE bool responsibleForAlloc(Ticket t) const noexcept {
298     return (t & (SegmentSize - 1)) == 0;
299   }
300
301   FOLLY_ALWAYS_INLINE bool responsibleForAdvance(Ticket t) const noexcept {
302     return (t & (SegmentSize - 1)) == (SegmentSize - 1);
303   }
304
305   FOLLY_ALWAYS_INLINE Segment* head() const noexcept {
306     return head_.load(std::memory_order_acquire);
307   }
308
309   FOLLY_ALWAYS_INLINE Segment* tail() const noexcept {
310     return tail_.load(std::memory_order_acquire);
311   }
312
313   FOLLY_ALWAYS_INLINE Ticket producerTicket() const noexcept {
314     return producerTicket_.load(std::memory_order_acquire);
315   }
316
317   FOLLY_ALWAYS_INLINE Ticket consumerTicket() const noexcept {
318     return consumerTicket_.load(std::memory_order_acquire);
319   }
320
321   void setHead(Segment* s) noexcept {
322     head_.store(s, std::memory_order_release);
323   }
324
325   void setTail(Segment* s) noexcept {
326     tail_.store(s, std::memory_order_release);
327   }
328
329   FOLLY_ALWAYS_INLINE void setProducerTicket(Ticket t) noexcept {
330     producerTicket_.store(t, std::memory_order_release);
331   }
332
333   FOLLY_ALWAYS_INLINE void setConsumerTicket(Ticket t) noexcept {
334     consumerTicket_.store(t, std::memory_order_release);
335   }
336
337   FOLLY_ALWAYS_INLINE Ticket fetchIncrementConsumerTicket() noexcept {
338     if (SingleConsumer) {
339       auto oldval = consumerTicket();
340       setConsumerTicket(oldval + 1);
341       return oldval;
342     } else { // MC
343       return consumerTicket_.fetch_add(1, std::memory_order_acq_rel);
344     }
345   }
346
347   FOLLY_ALWAYS_INLINE Ticket fetchIncrementProducerTicket() noexcept {
348     if (SingleProducer) {
349       auto oldval = producerTicket();
350       setProducerTicket(oldval + 1);
351       return oldval;
352     } else { // MP
353       return producerTicket_.fetch_add(1, std::memory_order_acq_rel);
354     }
355   }
356 }; // UnboundedQueue
357
358 /* Aliases */
359
360 template <
361     typename T,
362     bool MayBlock,
363     size_t LgSegmentSize = 8,
364     template <typename> class Atom = std::atomic>
365 using USPSCQueue = UnboundedQueue<T, true, true, MayBlock, LgSegmentSize, Atom>;
366
367 template <
368     typename T,
369     bool MayBlock,
370     size_t LgSegmentSize = 8,
371     template <typename> class Atom = std::atomic>
372 using UMPSCQueue =
373     UnboundedQueue<T, false, true, MayBlock, LgSegmentSize, Atom>;
374
375 template <
376     typename T,
377     bool MayBlock,
378     size_t LgSegmentSize = 8,
379     template <typename> class Atom = std::atomic>
380 using USPMCQueue =
381     UnboundedQueue<T, true, false, MayBlock, LgSegmentSize, Atom>;
382
383 template <
384     typename T,
385     bool MayBlock,
386     size_t LgSegmentSize = 8,
387     template <typename> class Atom = std::atomic>
388 using UMPMCQueue =
389     UnboundedQueue<T, false, false, MayBlock, LgSegmentSize, Atom>;
390
391 } // namespace folly
392
393 #include <folly/concurrency/UnboundedQueue-inl.h>