2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
23 #include <folly/concurrency/CacheLocality.h>
27 /// UnboundedQueue supports a variety of options for unbounded
28 /// dynamically expanding and shrinking queues, including variations of:
29 /// - Single vs. multiple producers
30 /// - Single vs. multiple consumers
31 /// - Blocking vs. spin-waiting
32 /// - Non-waiting, timed, and waiting consumer operations.
33 /// Producer operations never wait or fail (unless out-of-memory).
35 /// Template parameters:
37 /// - SingleProducer: true if there can be only one producer at a
39 /// - SingleConsumer: true if there can be only one consumer at a
41 /// - MayBlock: true if consumers may block, false if they only
42 /// spin. A performance tuning parameter.
43 /// - LgSegmentSize (default 8): Log base 2 of number of elements per
44 /// segment. A performance tuning parameter. See below.
46 /// When to use UnboundedQueue:
47 /// - If a small bound may lead to deadlock or performance degradation
48 /// under bursty patterns.
49 /// - If there is no risk of the queue growing too much.
51 /// When not to use UnboundedQueue:
52 /// - If there is risk of the queue growing too much and a large bound
53 /// is acceptable, then use DynamicBoundedQueue.
54 /// - If the queue must not allocate on enqueue or it must have a
55 /// small bound, then use fixed-size MPMCQueue or (if non-blocking
56 /// SPSC) ProducerConsumerQueue.
59 /// USPSCQueue<T, MayBlock, LgSegmentSize>
60 /// UMPSCQueue<T, MayBlock, LgSegmentSize>
61 /// USPMCQueue<T, MayBlock, LgSegmentSize>
62 /// UMPMCQueue<T, MayBlock, LgSegmentSize>
65 /// Producer operations never wait or fail (unless OOM)
66 /// void enqueue(const T&);
67 /// void enqueue(T&&);
68 /// Adds an element to the end of the queue.
70 /// Consumer operations:
72 /// Extracts an element from the front of the queue. Waits
73 /// until an element is available if needed.
74 /// bool try_dequeue(T&);
75 /// Tries to extract an element from the front of the queue
76 /// if available. Returns true if successful, false otherwise.
77 /// bool try_dequeue_until(T&, const time_point& deadline);
78 /// Tries to extract an element from the front of the queue
79 /// if available until the specified deadline. Returns true
80 /// if successful, false otherwise.
81 /// bool try_dequeue_for(T&, const duration&);
82 /// Tries to extract an element from the front of the queue
83 /// if available for the specified duration. Returns true
84 /// if successful, false otherwise.
86 /// Secondary functions:
88 /// Returns an estimate of the size of the queue.
90 /// Returns true only if the queue was empty during the call.
91 /// Note: size() and empty() are guaranteed to be accurate only if
92 /// the queue is not changed concurrently.
96 /// /* UMPSC, doesn't block, 1024 int elements per segment */
97 /// UMPSCQueue<int, false, 10> q;
101 /// ASSERT_FALSE(q.empty());
102 /// ASSERT_EQ(q.size(), 3);
106 /// ASSERT_TRUE(q.try_dequeue(v));
108 /// ASSERT_TRUE(q.try_dequeue_until(v, now() + seconds(1)));
110 /// ASSERT_TRUE(q.empty());
111 /// ASSERT_EQ(q.size(), 0);
112 /// ASSERT_FALSE(q.try_dequeue(v));
113 /// ASSERT_FALSE(q.try_dequeue_for(v, microseconds(100)));
117 /// - The queue is composed of one or more segments. Each segment has
118 /// a fixed size of 2^LgSegmentSize entries. Each segment is used
120 /// - Each entry is composed of a futex and a single element.
121 /// - The queue contains two 64-bit ticket variables. The producer
122 /// ticket counts the number of producer tickets issued so far, and
123 /// the same for the consumer ticket. Each ticket number corresponds
124 /// to a specific entry in a specific segment.
125 /// - The queue maintains two pointers, head and tail. Head points to
126 /// the segment that corresponds to the current consumer
127 /// ticket. Similarly, tail pointer points to the segment that
128 /// corresponds to the producer ticket.
129 /// - Segments are organized as a singly linked list.
130 /// - The producer with the first ticket in the current producer
131 /// segment is solely responsible for allocating and linking the
133 /// - The producer with the last ticket in the current producer
134 /// segment is solely responsible for advancing the tail pointer to
135 /// the next segment.
136 /// - Similarly, the consumer with the last ticket in the current
137 /// consumer segment is solely responsible for advancing the head
138 /// pointer to the next segment. It must ensure that head never
142 /// - An empty queue contains one segment. A nonempty queue contains
143 /// one or two more segments than needed to hold its contents.
144 /// - Removed segments are not reclaimed until there are no threads,
145 /// producers or consumers, that have references to them or their
146 /// predecessors. That is, a lagging thread may delay the reclamation
147 /// of a chain of removed segments.
149 /// Performance considerations:
150 /// - All operations take constant time, excluding the costs of
151 /// allocation, reclamation, interference from other threads, and
152 /// waiting for actions by other threads.
153 /// - In general, using the single producer and/or single consumer
154 /// variants yields better performance than the MP and MC
156 /// - SPSC without blocking is the fastest configuration. It doesn't
157 /// include any read-modify-write atomic operations, full fences, or
158 /// system calls in the critical path.
159 /// - MP adds a fetch_add to the critical path of each producer operation.
160 /// - MC adds a fetch_add or compare_exchange to the critical path of
161 /// each consumer operation.
162 /// - The possibility of consumers blocking, even if they never do,
163 /// adds a compare_exchange to the critical path of each producer
165 /// - MPMC, SPMC, MPSC require the use of a deferred reclamation
166 /// mechanism to guarantee that segments removed from the linked
167 /// list, i.e., unreachable from the head pointer, are reclaimed
168 /// only after they are no longer needed by any lagging producers or
170 /// - The overheads of segment allocation and reclamation are intended
171 /// to be mostly out of the critical path of the queue's throughput.
172 /// - If the template parameter LgSegmentSize is changed, it should be
173 /// set adequately high to keep the amortized cost of allocation and
175 /// - Another consideration is that the queue is guaranteed to have
176 /// enough space for a number of consumers equal to 2^LgSegmentSize
177 /// for local blocking. Excess waiting consumers spin.
178 /// - It is recommended to measure performance with different variants
179 /// when applicable, e.g., UMPMC vs UMPSC. Depending on the use
180 /// case, sometimes the variant with the higher sequential overhead
181 /// may yield better results due to, for example, more favorable
182 /// producer-consumer balance or favorable timing for avoiding
190 size_t LgSegmentSize = 8,
191 template <typename> class Atom = std::atomic>
192 class UnboundedQueue {
193 using Ticket = uint64_t;
197 static constexpr bool SPSC = SingleProducer && SingleConsumer;
198 static constexpr size_t SegmentSize = 1 << LgSegmentSize;
199 static constexpr size_t Stride = SPSC || (LgSegmentSize <= 1) ? 1 : 27;
202 std::is_nothrow_destructible<T>::value,
203 "T must be nothrow_destructible");
204 static_assert((Stride & 1) == 1, "Stride must be odd");
205 static_assert(LgSegmentSize < 32, "LgSegmentSize must be < 32");
207 FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
208 Atom<Segment*> head_;
209 Atom<Ticket> consumerTicket_;
210 FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
211 Atom<Segment*> tail_;
212 Atom<Ticket> producerTicket_;
219 FOLLY_ALWAYS_INLINE void enqueue(const T& arg) {
223 FOLLY_ALWAYS_INLINE void enqueue(T&& arg) {
224 enqueueImpl(std::move(arg));
228 void dequeue(T& item) noexcept;
231 bool try_dequeue(T& item) noexcept {
232 return try_dequeue_until(
233 item, std::chrono::steady_clock::time_point::min());
236 /** try_dequeue_until */
237 template <typename Clock, typename Duration>
238 bool try_dequeue_until(
240 const std::chrono::time_point<Clock, Duration>& deadline) noexcept;
242 /** try_dequeue_for */
243 template <typename Rep, typename Period>
244 bool try_dequeue_for(
246 const std::chrono::duration<Rep, Period>& duration) noexcept {
247 return try_dequeue_until(item, std::chrono::steady_clock::now() + duration);
251 size_t size() const noexcept {
252 auto p = producerTicket();
253 auto c = consumerTicket();
254 return p > c ? p - c : 0;
258 bool empty() const noexcept {
259 auto c = consumerTicket();
260 auto p = producerTicket();
265 template <typename Arg>
266 void enqueueImpl(Arg&& arg);
268 template <typename Arg>
269 void enqueueCommon(Segment* s, Arg&& arg);
271 void dequeueCommon(Segment* s, T& item) noexcept;
273 template <typename Clock, typename Duration>
274 bool singleConsumerTryDequeueUntil(
277 const std::chrono::time_point<Clock, Duration>& deadline) noexcept;
279 template <typename Clock, typename Duration>
280 bool multiConsumerTryDequeueUntil(
283 const std::chrono::time_point<Clock, Duration>& deadline) noexcept;
285 Segment* findSegment(Segment* s, const Ticket t) const noexcept;
287 void allocNextSegment(Segment* s, const Ticket t);
289 void advanceTail(Segment* s) noexcept;
291 void advanceHead(Segment* s) noexcept;
293 FOLLY_ALWAYS_INLINE size_t index(Ticket t) const noexcept {
294 return (t * Stride) & (SegmentSize - 1);
297 FOLLY_ALWAYS_INLINE bool responsibleForAlloc(Ticket t) const noexcept {
298 return (t & (SegmentSize - 1)) == 0;
301 FOLLY_ALWAYS_INLINE bool responsibleForAdvance(Ticket t) const noexcept {
302 return (t & (SegmentSize - 1)) == (SegmentSize - 1);
305 FOLLY_ALWAYS_INLINE Segment* head() const noexcept {
306 return head_.load(std::memory_order_acquire);
309 FOLLY_ALWAYS_INLINE Segment* tail() const noexcept {
310 return tail_.load(std::memory_order_acquire);
313 FOLLY_ALWAYS_INLINE Ticket producerTicket() const noexcept {
314 return producerTicket_.load(std::memory_order_acquire);
317 FOLLY_ALWAYS_INLINE Ticket consumerTicket() const noexcept {
318 return consumerTicket_.load(std::memory_order_acquire);
321 void setHead(Segment* s) noexcept {
322 head_.store(s, std::memory_order_release);
325 void setTail(Segment* s) noexcept {
326 tail_.store(s, std::memory_order_release);
329 FOLLY_ALWAYS_INLINE void setProducerTicket(Ticket t) noexcept {
330 producerTicket_.store(t, std::memory_order_release);
333 FOLLY_ALWAYS_INLINE void setConsumerTicket(Ticket t) noexcept {
334 consumerTicket_.store(t, std::memory_order_release);
337 FOLLY_ALWAYS_INLINE Ticket fetchIncrementConsumerTicket() noexcept {
338 if (SingleConsumer) {
339 auto oldval = consumerTicket();
340 setConsumerTicket(oldval + 1);
343 return consumerTicket_.fetch_add(1, std::memory_order_acq_rel);
347 FOLLY_ALWAYS_INLINE Ticket fetchIncrementProducerTicket() noexcept {
348 if (SingleProducer) {
349 auto oldval = producerTicket();
350 setProducerTicket(oldval + 1);
353 return producerTicket_.fetch_add(1, std::memory_order_acq_rel);
363 size_t LgSegmentSize = 8,
364 template <typename> class Atom = std::atomic>
365 using USPSCQueue = UnboundedQueue<T, true, true, MayBlock, LgSegmentSize, Atom>;
370 size_t LgSegmentSize = 8,
371 template <typename> class Atom = std::atomic>
373 UnboundedQueue<T, false, true, MayBlock, LgSegmentSize, Atom>;
378 size_t LgSegmentSize = 8,
379 template <typename> class Atom = std::atomic>
381 UnboundedQueue<T, true, false, MayBlock, LgSegmentSize, Atom>;
386 size_t LgSegmentSize = 8,
387 template <typename> class Atom = std::atomic>
389 UnboundedQueue<T, false, false, MayBlock, LgSegmentSize, Atom>;
393 #include <folly/concurrency/UnboundedQueue-inl.h>