2 * Copyright 2017-present Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/concurrency/CacheLocality.h>
20 #include <folly/concurrency/UnboundedQueue.h>
22 #include <glog/logging.h>
29 /// DynamicBoundedQueue supports:
31 ///  - Dynamic memory usage that grows and shrinks in proportion to the
32 /// number of elements in the queue.
33 /// - Adjustable capacity that helps throttle pathological cases of
34 /// producer-consumer imbalance that may lead to excessive memory
36 /// - The adjustable capacity can also help prevent deadlock by
37 /// allowing users to temporarily increase capacity substantially to
38 /// guarantee accommodating producer requests that cannot wait.
39 /// - SPSC, SPMC, MPSC, MPMC variants.
40 /// - Blocking and spinning-only variants.
41 /// - Inter-operable non-waiting, timed until, timed for, and waiting
42 /// variants of producer and consumer operations.
43 /// - Optional variable element weights.
46 /// - Queue elements may have variable weights (calculated using a
47 /// template parameter) that are by default 1.
48 /// - Element weights count towards the queue's capacity.
49 ///  - Element weights are not priorities and do not affect element
50 /// order. Queues with variable element weights follow FIFO order,
51 /// the same as default queues.
53 /// When to use DynamicBoundedQueue:
54 /// - If a small maximum capacity may lead to deadlock or performance
55 /// degradation under bursty patterns and a larger capacity is
57 /// - If the typical queue size is expected to be much lower than the
59 /// - If an unbounded queue is susceptible to growing too much.
60 /// - If support for variable element weights is needed.
62 /// When not to use DynamicBoundedQueue?
63 /// - If dynamic memory allocation is unacceptable or if the maximum
64 /// capacity needs to be small, then use fixed-size MPMCQueue or (if
65 /// non-blocking SPSC) ProducerConsumerQueue.
66 /// - If there is no risk of the queue growing too much, then use
70 /// - The general rule is to set the capacity as high as acceptable.
71 /// The queue performs best when it is not near full capacity.
72 /// - The implementation may allow extra slack in capacity (~10%) for
73 /// amortizing some costly steps. Therefore, precise capacity is not
74 /// guaranteed and cannot be relied on for synchronization; i.e.,
75 /// this queue cannot be used as a semaphore.
77 /// Performance expectations:
78 /// - As long as the queue size is below capacity in the common case,
79 /// performance is comparable to MPMCQueue and better in cases of
80 /// higher producer demand.
81 /// - Performance degrades gracefully at full capacity.
82 /// - It is recommended to measure performance with different variants
83 /// when applicable, e.g., DMPMC vs DMPSC. Depending on the use
84 /// case, sometimes the variant with the higher sequential overhead
85 /// may yield better results due to, for example, more favorable
86 /// producer-consumer balance or favorable timing for avoiding
88 /// - See DynamicBoundedQueueTest.cpp for some benchmark results.
90 /// Template parameters:
92 /// - SingleProducer: true if there can be only one producer at a
94 /// - SingleConsumer: true if there can be only one consumer at a
96 /// - MayBlock: true if producers or consumers may block.
97 /// - LgSegmentSize (default 8): Log base 2 of number of elements per
98 /// UnboundedQueue segment.
99 /// - LgAlign (default 7): Log base 2 of alignment directive; can be
100 /// used to balance scalability (avoidance of false sharing) with
101 /// memory efficiency.
102 /// - WeightFn (DefaultWeightFn<T>): A customizable weight computing type
103 /// for computing the weights of elements. The default weight is 1.
105 /// Template Aliases:
106 /// DSPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
107 /// DMPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
108 /// DSPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>
109 /// DMPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>
113 /// Takes a capacity value as an argument.
115 /// Producer functions:
116 /// void enqueue(const T&);
117 /// void enqueue(T&&);
118 /// Adds an element to the end of the queue. Waits until
119 /// capacity is available if necessary.
120 /// bool try_enqueue(const T&);
121 /// bool try_enqueue(T&&);
122 /// Tries to add an element to the end of the queue if
123 /// capacity allows it. Returns true if successful. Otherwise
125 /// bool try_enqueue_until(const T&, time_point&);
126 /// bool try_enqueue_until(T&&, time_point&);
127 /// Tries to add an element to the end of the queue if
128 /// capacity allows it until the specified timepoint. Returns
129 /// true if successful, otherwise false.
130 /// bool try_enqueue_for(const T&, duration&);
131 /// bool try_enqueue_for(T&&, duration&);
132 /// Tries to add an element to the end of the queue if
133 ///       capacity allows it before the expiration of the specified
134 ///       duration. Returns true if successful, otherwise false.
136 /// Consumer functions:
137 /// void dequeue(T&);
138 /// Extracts an element from the front of the queue. Waits
139 /// until an element is available if necessary.
140 /// bool try_dequeue(T&);
141 ///       Tries to extract an element from the front of the queue
142 /// if available. Returns true if successful, otherwise false.
143 /// bool try_dequeue_until(T&, time_point&);
144 ///       Tries to extract an element from the front of the queue
145 /// if available until the specified time_point. Returns true
146 /// if successful. Otherwise Returns false.
147 /// bool try_dequeue_for(T&, duration&);
148 ///       Tries to extract an element from the front of the queue
149 /// if available until the expiration of the specified
150 /// duration. Returns true if successful. Otherwise Returns
153 /// Secondary functions:
154 /// void reset_capacity(size_t capacity);
155 /// Changes the capacity of the queue. Does not affect the
156 /// current contents of the queue. Guaranteed only to affect
157 /// subsequent enqueue operations. May or may not affect
158 /// concurrent operations. Capacity must be at least 1000.
160 /// Returns an estimate of the total weight of the elements in
163 /// Returns an estimate of the total number of elements.
165 /// Returns true only if the queue was empty during the call.
166 /// Note: weight(), size(), and empty() are guaranteed to be
167 /// accurate only if there are no concurrent changes to the queue.
169 /// Usage example with default weight:
171 /// /* DMPSC, doesn't block, 1024 int elements per segment */
172 /// DMPSCQueue<int, false, 10> q(100000);
173 /// ASSERT_TRUE(q.empty());
174 /// ASSERT_EQ(q.size(), 0);
176 /// ASSERT_TRUE(q.try_enqueue(2));
177 /// ASSERT_TRUE(q.try_enqueue_until(3, deadline));
178 /// ASSERT_TRUE(q.try_enqueue(4, duration));
179 /// // ... enqueue more elements until capacity is full
180 /// // See above comments about imprecise capacity guarantees
181 /// ASSERT_FALSE(q.try_enqueue(100001)); // can't enqueue but can't wait
182 /// size_t sz = q.size();
183 /// ASSERT_GE(sz, 100000);
184 /// q.reset_capacity(1000000000); // set huge capacity
185 /// ASSERT_TRUE(q.try_enqueue(100001)); // now enqueue succeeds
186 /// q.reset_capacity(100000); // set capacity back to 100,000
187 /// ASSERT_FALSE(q.try_enqueue(100002));
188 /// ASSERT_EQ(q.size(), sz + 1);
192 /// ASSERT_TRUE(q.try_dequeue(v));
194 /// ASSERT_TRUE(q.try_dequeue_until(v, deadline));
196 /// ASSERT_TRUE(q.try_dequeue_for(v, duration));
198 /// ASSERT_EQ(q.size(), sz - 3);
201 /// Usage example with custom weights:
203 /// struct CustomWeightFn {
204 /// uint64_t operator()(int val) { return val / 100; }
206 /// DMPMCQueue<int, false, 10, CustomWeightFn> q(20);
207 /// ASSERT_TRUE(q.empty());
209 /// ASSERT_TRUE(q.try_enqueue(200));
210 /// ASSERT_TRUE(q.try_enqueue_until(500, now() + seconds(1)));
211 /// ASSERT_EQ(q.size(), 3);
212 /// ASSERT_EQ(q.weight(), 8);
213 /// ASSERT_FALSE(q.try_enqueue_for(1700, microseconds(1)));
214 /// q.reset_capacity(1000000); // set capacity to 1000000 instead of 20
215 /// ASSERT_TRUE(q.try_enqueue_for(1700, microseconds(1)));
216 /// q.reset_capacity(20); // set capacity to 20 again
217 /// ASSERT_FALSE(q.try_enqueue(100));
218 /// ASSERT_EQ(q.size(), 4);
219 /// ASSERT_EQ(q.weight(), 25);
222 /// ASSERT_EQ(v, 100);
223 /// ASSERT_TRUE(q.try_dequeue(v));
224 /// ASSERT_EQ(v, 200);
225 /// ASSERT_TRUE(q.try_dequeue_until(v, now() + seconds(1)));
226 /// ASSERT_EQ(v, 500);
227 /// ASSERT_EQ(q.size(), 1);
228 /// ASSERT_EQ(q.weight(), 17);
232 /// - The implementation is on top of UnboundedQueue.
233 /// - The main FIFO functionality is in UnboundedQueue.
234 /// DynamicBoundedQueue manages keeping the total queue weight
235 /// within the specified capacity.
236 /// - For the sake of scalability, the data structures are designed to
237 /// minimize interference between producers on one side and
238 /// consumers on the other.
239 /// - Producers add to a debit variable the weight of the added
240 /// element and check capacity.
241 /// - Consumers add to a credit variable the weight of the removed
243 /// - Producers, for the sake of scalability, use fetch_add to add to
244 /// the debit variable and subtract if it exceeded capacity,
245 /// rather than using compare_exchange to avoid overshooting.
246 /// - Consumers, infrequently, transfer credit to a transfer variable
247 /// and unblock any blocked producers. The transfer variable can be
248 /// used by producers to decrease their debit when needed.
249 /// - Note that a low capacity will trigger frequent credit transfer
250 /// by consumers that may degrade performance. Capacity should not
252 /// - Transfer of credit by consumers is triggered when the amount of
253 /// credit reaches a threshold (1/10 of capacity).
254 /// - The waiting of consumers is handled in UnboundedQueue.
255 /// The waiting of producers is handled in this template.
256 /// - For a producer operation, if the difference between debit and
257 /// capacity (plus some slack to account for the transfer threshold)
258 /// does not accommodate the weight of the new element, it first
259 /// tries to transfer credit that may have already been made
260 /// available by consumers. If this is insufficient and MayBlock is
261 /// true, then the producer uses a futex to block until new credit
262 /// is transferred by a consumer.
265 /// - Aside from three cache lines for managing capacity, the memory
266 /// for queue elements is managed using UnboundedQueue and grows and
267 /// shrinks dynamically with the number of elements.
268 /// - The template parameter LgAlign can be used to reduce memory usage
269 /// at the cost of increased chance of false sharing.
271 template <typename T>
272 struct DefaultWeightFn {
273 template <typename Arg>
274 uint64_t operator()(Arg&&) const noexcept {
284 size_t LgSegmentSize = 8,
286 typename WeightFn = DefaultWeightFn<T>,
287 template <typename> class Atom = std::atomic>
288 class DynamicBoundedQueue {
289 using Weight = uint64_t;
291 enum WaitingState : uint32_t {
296 static constexpr bool SPSC = SingleProducer && SingleConsumer;
297 static constexpr size_t Align = 1u << LgAlign;
299 static_assert(LgAlign < 16, "LgAlign must be < 16");
303 // Read mostly by producers
304 alignas(Align) Atom<Weight> debit_; // written frequently only by producers
305 Atom<Weight> capacity_; // written rarely by capacity resets
307 // Read mostly by consumers
308 alignas(Align) Atom<Weight> credit_; // written frequently only by consumers
309 Atom<Weight> threshold_; // written rarely only by capacity resets
311 // Normally written and read rarely by producers and consumers
312 // May be read frequently by producers when capacity is full
313 alignas(Align) Atom<Weight> transfer_;
314 detail::Futex<Atom> waiting_;
316 // Underlying unbounded queue
329 explicit DynamicBoundedQueue(Weight capacity)
331 capacity_(capacity + threshold(capacity)), // capacity slack
333 threshold_(threshold(capacity)),
337 ~DynamicBoundedQueue() {}
339 /// Enqueue functions
342 FOLLY_ALWAYS_INLINE void enqueue(const T& v) {
346 FOLLY_ALWAYS_INLINE void enqueue(T&& v) {
347 enqueueImpl(std::move(v));
351 FOLLY_ALWAYS_INLINE bool try_enqueue(const T& v) {
352 return tryEnqueueImpl(v);
355 FOLLY_ALWAYS_INLINE bool try_enqueue(T&& v) {
356 return tryEnqueueImpl(std::move(v));
359 /** try_enqueue_until */
360 template <typename Clock, typename Duration>
361 FOLLY_ALWAYS_INLINE bool try_enqueue_until(
363 const std::chrono::time_point<Clock, Duration>& deadline) {
364 return tryEnqueueUntilImpl(v, deadline);
367 template <typename Clock, typename Duration>
368 FOLLY_ALWAYS_INLINE bool try_enqueue_until(
370 const std::chrono::time_point<Clock, Duration>& deadline) {
371 return tryEnqueueUntilImpl(std::move(v), deadline);
374 /** try_enqueue_for */
375 template <typename Rep, typename Period>
376 FOLLY_ALWAYS_INLINE bool try_enqueue_for(
378 const std::chrono::duration<Rep, Period>& duration) {
379 return tryEnqueueForImpl(v, duration);
382 template <typename Rep, typename Period>
383 FOLLY_ALWAYS_INLINE bool try_enqueue_for(
385 const std::chrono::duration<Rep, Period>& duration) {
386 return tryEnqueueForImpl(std::move(v), duration);
389 /// Dequeue functions
392 FOLLY_ALWAYS_INLINE void dequeue(T& elem) {
394 addCredit(WeightFn()(elem));
398 FOLLY_ALWAYS_INLINE bool try_dequeue(T& elem) {
399 if (q_.try_dequeue(elem)) {
400 addCredit(WeightFn()(elem));
406 /** try_dequeue_until */
407 template <typename Clock, typename Duration>
408 FOLLY_ALWAYS_INLINE bool try_dequeue_until(
410 const std::chrono::time_point<Clock, Duration>& deadline) {
411 if (q_.try_dequeue_until(elem, deadline)) {
412 addCredit(WeightFn()(elem));
418 /** try_dequeue_for */
419 template <typename Rep, typename Period>
420 FOLLY_ALWAYS_INLINE bool try_dequeue_for(
422 const std::chrono::duration<Rep, Period>& duration) {
423 if (q_.try_dequeue_for(elem, duration)) {
424 addCredit(WeightFn()(elem));
430 /// Secondary functions
432 /** reset_capacity */
433 void reset_capacity(Weight capacity) noexcept {
434 Weight thresh = threshold(capacity);
435 capacity_.store(capacity + thresh, std::memory_order_release);
436 threshold_.store(thresh, std::memory_order_release);
440 Weight weight() const noexcept {
442 auto c = getCredit();
443 auto t = getTransfer();
444 return d > (c + t) ? d - (c + t) : 0;
448 size_t size() const noexcept {
453 bool empty() const noexcept {
458 /// Private functions ///
460 // Calculation of threshold to move credits in bulk from consumers
462 constexpr Weight threshold(Weight capacity) const noexcept {
463 return capacity / 10;
466 // Functions called frequently by producers
468 template <typename Arg>
469 FOLLY_ALWAYS_INLINE void enqueueImpl(Arg&& v) {
471 std::forward<Arg>(v), std::chrono::steady_clock::time_point::max());
474 template <typename Arg>
475 FOLLY_ALWAYS_INLINE bool tryEnqueueImpl(Arg&& v) {
476 return tryEnqueueUntilImpl(
477 std::forward<Arg>(v), std::chrono::steady_clock::time_point::min());
480 template <typename Clock, typename Duration, typename Arg>
481 FOLLY_ALWAYS_INLINE bool tryEnqueueUntilImpl(
483 const std::chrono::time_point<Clock, Duration>& deadline) {
484 Weight weight = WeightFn()(std::forward<Arg>(v));
485 if (LIKELY(tryAddDebit(weight))) {
486 q_.enqueue(std::forward<Arg>(v));
489 return tryEnqueueUntilSlow(std::forward<Arg>(v), deadline);
492 template <typename Rep, typename Period, typename Arg>
493 FOLLY_ALWAYS_INLINE bool tryEnqueueForImpl(
495 const std::chrono::duration<Rep, Period>& duration) {
496 if (LIKELY(tryEnqueueImpl(std::forward<Arg>(v)))) {
499 auto deadline = std::chrono::steady_clock::now() + duration;
500 return tryEnqueueUntilSlow(std::forward<Arg>(v), deadline);
503 FOLLY_ALWAYS_INLINE bool tryAddDebit(Weight weight) noexcept {
504 Weight capacity = getCapacity();
505 Weight before = fetchAddDebit(weight);
506 if (LIKELY(before + weight <= capacity)) {
514 FOLLY_ALWAYS_INLINE Weight getCapacity() const noexcept {
515 return capacity_.load(std::memory_order_acquire);
518 FOLLY_ALWAYS_INLINE Weight fetchAddDebit(Weight weight) noexcept {
520 if (SingleProducer) {
522 debit_.store(before + weight, std::memory_order_relaxed);
524 before = debit_.fetch_add(weight, std::memory_order_acq_rel);
529 FOLLY_ALWAYS_INLINE Weight getDebit() const noexcept {
530 return debit_.load(std::memory_order_acquire);
533 // Functions called frequently by consumers
535 FOLLY_ALWAYS_INLINE void addCredit(Weight weight) noexcept {
536 Weight before = fetchAddCredit(weight);
537 Weight thresh = getThreshold();
538 if (before + weight >= thresh && before < thresh) {
543 FOLLY_ALWAYS_INLINE Weight fetchAddCredit(Weight weight) noexcept {
545 if (SingleConsumer) {
546 before = getCredit();
547 credit_.store(before + weight, std::memory_order_relaxed);
549 before = credit_.fetch_add(weight, std::memory_order_acq_rel);
554 FOLLY_ALWAYS_INLINE Weight getCredit() const noexcept {
555 return credit_.load(std::memory_order_acquire);
558 FOLLY_ALWAYS_INLINE Weight getThreshold() const noexcept {
559 return threshold_.load(std::memory_order_acquire);
562 /** Functions called infrequently by producers */
564 void subDebit(Weight weight) noexcept {
566 if (SingleProducer) {
568 debit_.store(before - weight, std::memory_order_relaxed);
570 before = debit_.fetch_sub(weight, std::memory_order_acq_rel);
572 DCHECK_GE(before, weight);
575 template <typename Clock, typename Duration, typename Arg>
576 bool tryEnqueueUntilSlow(
578 const std::chrono::time_point<Clock, Duration>& deadline) {
579 Weight weight = WeightFn()(std::forward<Arg>(v));
580 if (canEnqueue(deadline, weight)) {
581 q_.enqueue(std::forward<Arg>(v));
588 template <typename Clock, typename Duration>
590 const std::chrono::time_point<Clock, Duration>& deadline,
591 Weight weight) noexcept {
592 Weight capacity = getCapacity();
595 Weight debit = getDebit();
596 if ((debit + weight <= capacity) && tryAddDebit(weight)) {
599 if (Clock::now() >= deadline) {
603 if (canBlock(weight, capacity)) {
604 waiting_.futexWaitUntil(WAITING, deadline);
607 asm_volatile_pause();
612 bool canBlock(Weight weight, Weight capacity) noexcept {
613 waiting_.store(WAITING, std::memory_order_relaxed);
614 std::atomic_thread_fence(std::memory_order_seq_cst);
616 Weight debit = getDebit();
617 return debit + weight > capacity;
620 bool tryReduceDebit() noexcept {
621 Weight w = takeTransfer();
628 Weight takeTransfer() noexcept {
629 Weight w = getTransfer();
631 w = transfer_.exchange(0, std::memory_order_acq_rel);
636 Weight getTransfer() const noexcept {
637 return transfer_.load(std::memory_order_acquire);
640 /** Functions called infrequently by consumers */
642 void transferCredit() noexcept {
643 Weight credit = takeCredit();
644 transfer_.fetch_add(credit, std::memory_order_acq_rel);
646 std::atomic_thread_fence(std::memory_order_seq_cst);
647 waiting_.store(NOTWAITING, std::memory_order_relaxed);
648 waiting_.futexWake();
652 Weight takeCredit() noexcept {
654 if (SingleConsumer) {
655 credit = credit_.load(std::memory_order_relaxed);
656 credit_.store(0, std::memory_order_relaxed);
658 credit = credit_.exchange(0, std::memory_order_acq_rel);
663 }; // DynamicBoundedQueue
671 size_t LgSegmentSize = 8,
673 typename WeightFn = DefaultWeightFn<T>,
674 template <typename> class Atom = std::atomic>
675 using DSPSCQueue = DynamicBoundedQueue<
689 size_t LgSegmentSize = 8,
691 typename WeightFn = DefaultWeightFn<T>,
692 template <typename> class Atom = std::atomic>
693 using DMPSCQueue = DynamicBoundedQueue<
707 size_t LgSegmentSize = 8,
709 typename WeightFn = DefaultWeightFn<T>,
710 template <typename> class Atom = std::atomic>
711 using DSPMCQueue = DynamicBoundedQueue<
725 size_t LgSegmentSize = 8,
727 typename WeightFn = DefaultWeightFn<T>,
728 template <typename> class Atom = std::atomic>
729 using DMPMCQueue = DynamicBoundedQueue<