Dynamic Bounded Queue
[folly.git] / folly / concurrency / DynamicBoundedQueue.h
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/concurrency/CacheLocality.h>
20 #include <folly/concurrency/UnboundedQueue.h>
21
22 #include <glog/logging.h>
23
24 #include <atomic>
25 #include <chrono>
26
27 namespace folly {
28
29 /// DynamicBoundedQueue supports:
30
31 /// - Dynamic memory usage that grows and shrink in proportion to the
32 ///   number of elements in the queue.
33 /// - Adjustable capacity that helps throttle pathological cases of
34 ///   producer-consumer imbalance that may lead to excessive memory
35 ///   usage.
36 /// - The adjustable capacity can also help prevent deadlock by
37 ///   allowing users to temporarily increase capacity substantially to
38 ///   guarantee accommodating producer requests that cannot wait.
39 /// - SPSC, SPMC, MPSC, MPMC variants.
40 /// - Blocking and spinning-only variants.
41 /// - Inter-operable non-waiting, timed until, timed for, and waiting
42 ///   variants of producer and consumer operations.
43 /// - Optional variable element weights.
44 ///
45 /// Element Weights
46 /// - Queue elements may have variable weights (calculated using a
47 ///   template parameter) that are by default 1.
48 /// - Element weights count towards the queue's capacity.
49 /// - Elements weights are not priorities and do not affect element
50 ///   order. Queues with variable element weights follow FIFO order,
51 ///   the same as default queues.
52 ///
53 /// When to use DynamicBoundedQueue:
54 /// - If a small maximum capacity may lead to deadlock or performance
55 ///   degradation under bursty patterns and a larger capacity is
56 ///   sufficient.
57 /// - If the typical queue size is expected to be much lower than the
58 ///   maximum capacity
59 /// - If an unbounded queue is susceptible to growing too much.
60 /// - If support for variable element weights is needed.
61 ///
62 /// When not to use DynamicBoundedQueue?
63 /// - If dynamic memory allocation is unacceptable or if the maximum
64 ///   capacity needs to be small, then use fixed-size MPMCQueue or (if
65 ///   non-blocking SPSC) ProducerConsumerQueue.
66 /// - If there is no risk of the queue growing too much, then use
67 ///   UnboundedQueue.
68 ///
69 /// Setting capacity
70 /// - The general rule is to set the capacity as high as acceptable.
71 ///   The queue performs best when it is not near full capacity.
72 /// - The implementation may allow extra slack in capacity (~10%) for
73 ///   amortizing some costly steps. Therefore, precise capacity is not
74 ///   guaranteed and cannot be relied on for synchronization; i.e.,
75 ///   this queue cannot be used as a semaphore.
76 ///
77 /// Performance expectations:
78 /// - As long as the queue size is below capacity in the common case,
79 ///   performance is comparable to MPMCQueue and better in cases of
80 ///   higher producer demand.
81 /// - Performance degrades gracefully at full capacity.
82 /// - It is recommended to measure performance with different variants
83 ///   when applicable, e.g., DMPMC vs DMPSC. Depending on the use
84 ///   case, sometimes the variant with the higher sequential overhead
85 ///   may yield better results due to, for example, more favorable
86 ///   producer-consumer balance or favorable timing for avoiding
87 ///   costly blocking.
88 /// - See DynamicBoundedQueueTest.cpp for some benchmark results.
89 ///
90 /// Template prameters:
91 /// - T: element type
92 /// - SingleProducer: true if there can be only one producer at a
93 ///   time.
94 /// - SingleConsumer: true if there can be only one consumer at a
95 ///   time.
96 /// - MayBlock: true if producers or consumers may block.
97 /// - LgSegmentSize (default 8): Log base 2 of number of elements per
98 ///   UnboundedQueue segment.
99 /// - LgAlign (default 7): Log base 2 of alignment directive; can be
100 ///   used to balance scalability (avoidance of false sharing) with
101 ///   memory efficiency.
102 /// - WeightFn (DefaultWeightFn<T>): A customizable weight computing type
103 ///   for computing the weights of elements. The default weight is 1.
104 ///
105 /// Template Aliases:
106 ///   DSPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
107 ///   DMPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
108 ///   DSPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>
109 ///   DMPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>
110 ///
111 /// Functions:
112 ///   Constructor
113 ///     Takes a capacity value as an argument.
114 ///
115 ///   Producer functions:
116 ///     void enqueue(const T&);
117 ///     void enqueue(T&&);
118 ///         Adds an element to the end of the queue. Waits until
119 ///         capacity is available if necessary.
120 ///     bool try_enqueue(const T&);
121 ///     bool try_enqueue(T&&);
122 ///         Tries to add an element to the end of the queue if
123 ///         capacity allows it. Returns true if successful. Otherwise
124 ///         Returns false.
125 ///     bool try_enqueue_until(const T&, time_point&);
126 ///     bool try_enqueue_until(T&&, time_point&);
127 ///         Tries to add an element to the end of the queue if
128 ///         capacity allows it until the specified timepoint. Returns
129 ///         true if successful, otherwise false.
130 ///     bool try_enqueue_for(const T&, duration&);
131 ///     bool try_enqueue_for(T&&, duration&);
132 ///         Tries to add an element to the end of the queue if
133 ///         capacity allows it until the specified timepoint. Returns
134 ///         true if successful, otherwise false.
135 ///
136 ///   Consumer functions:
137 ///     void dequeue(T&);
138 ///         Extracts an element from the front of the queue. Waits
139 ///         until an element is available if necessary.
140 ///     bool try_dequeue(T&);
141 ///         Tries to extracts an element from the front of the queue
142 ///         if available. Returns true if successful, otherwise false.
143 ///     bool try_dequeue_until(T&, time_point&);
144 ///         Tries to extracts an element from the front of the queue
145 ///         if available until the specified time_point. Returns true
146 ///         if successful. Otherwise Returns false.
147 ///     bool try_dequeue_for(T&, duration&);
148 ///         Tries to extracts an element from the front of the queue
149 ///         if available until the expiration of the specified
150 ///         duration.  Returns true if successful. Otherwise Returns
151 ///         false.
152 ///
153 ///   Secondary functions:
154 ///     void reset_capacity(size_t capacity);
155 ///        Changes the capacity of the queue. Does not affect the
156 ///        current contents of the queue. Guaranteed only to affect
157 ///        subsequent enqueue operations. May or may not affect
158 ///        concurrent operations. Capacity must be at least 1000.
159 ///     Weight weight();
160 ///        Returns an estimate of the total weight of the elements in
161 ///        the queue.
162 ///     size_t size();
163 ///         Returns an estimate of the total number of elements.
164 ///     bool empty();
165 ///         Returns true only if the queue was empty during the call.
166 ///     Note: weight(), size(), and empty() are guaranteed to be
167 ///     accurate only if there are no concurrent changes to the queue.
168 ///
169 /// Usage example with default weight:
170 /// @code
171 ///   /* DMPSC, doesn't block, 1024 int elements per segment */
172 ///   DMPSCQueue<int, false, 10> q(100000);
173 ///   ASSERT_TRUE(q.empty());
174 ///   ASSERT_EQ(q.size(), 0);
175 ///   q.enqueue(1));
176 ///   ASSERT_TRUE(q.try_enqueue(2));
177 ///   ASSERT_TRUE(q.try_enqueue_until(3, deadline));
178 ///   ASSERT_TRUE(q.try_enqueue(4, duration));
179 ///   // ... enqueue more elements until capacity is full
180 ///   // See above comments about imprecise capacity guarantees
181 ///   ASSERT_FALSE(q.try_enqueue(100001)); // can't enqueue but can't wait
182 ///   size_t sz = q.size();
183 ///   ASSERT_GE(sz, 100000);
184 ///   q.reset_capacity(1000000000); // set huge capacity
185 ///   ASSERT_TRUE(q.try_enqueue(100001)); // now enqueue succeeds
186 ///   q.reset_capacity(100000); // set capacity back to 100,000
187 ///   ASSERT_FALSE(q.try_enqueue(100002));
188 ///   ASSERT_EQ(q.size(), sz + 1);
189 ///   int v;
190 ///   q.dequeue(v);
191 ///   ASSERT_EQ(v, 1);
192 ///   ASSERT_TRUE(q.try_dequeue(v));
193 ///   ASSERT_EQ(v, 2);
194 ///   ASSERT_TRUE(q.try_dequeue_until(v, deadline));
195 ///   ASSERT_EQ(v, 3);
196 ///   ASSERT_TRUE(q.try_dequeue_for(v, duration));
197 ///   ASSERT_EQ(v, 4);
198 ///   ASSERT_EQ(q.size(), sz - 3);
199 /// @endcode
200 ///
201 /// Usage example with custom weights:
202 /// @code
203 ///   struct CustomWeightFn {
204 ///     uint64_t operator()(int val) { return val / 100; }
205 ///   };
206 ///   DMPMCQueue<int, false, 10, CustomWeightFn> q(20);
207 ///   ASSERT_TRUE(q.empty());
208 ///   q.enqueue(100);
209 ///   ASSERT_TRUE(q.try_enqueue(200));
210 ///   ASSERT_TRUE(q.try_enqueue_until(500, now() + seconds(1)));
211 ///   ASSERT_EQ(q.size(), 3);
212 ///   ASSERT_EQ(q.weight(), 8);
213 ///   ASSERT_FALSE(q.try_enqueue_for(1700, microseconds(1)));
214 ///   q.reset_capacity(1000000); // set capacity to 1000000 instead of 20
215 ///   ASSERT_TRUE(q.try_enqueue_for(1700, microseconds(1)));
216 ///   q.reset_capacity(20); // set capacity to 20 again
217 ///   ASSERT_FALSE(q.try_enqueue(100));
218 ///   ASSERT_EQ(q.size(), 4);
219 ///   ASSERT_EQ(q.weight(), 25);
220 ///   int v;
221 ///   q.dequeue(v);
222 ///   ASSERT_EQ(v, 100);
223 ///   ASSERT_TRUE(q.try_dequeue(v));
224 ///   ASSERT_EQ(v, 200);
225 ///   ASSERT_TRUE(q.try_dequeue_until(v, now() + seconds(1)));
226 ///   ASSERT_EQ(v, 500);
227 ///   ASSERT_EQ(q.size(), 1);
228 ///   ASSERT_EQ(q.weight(), 17);
229 /// @endcode
230 ///
231 /// Design:
232 /// - The implementation is on top of UnboundedQueue.
233 /// - The main FIFO functionality is in UnboundedQueue.
234 ///   DynamicBoundedQueue manages keeping the total queue weight
235 ///   within the specified capacity.
236 /// - For the sake of scalability, the data structures are designed to
237 ///   minimize interference between producers on one side and
238 ///   consumers on the other.
239 /// - Producers add to a debit variable the weight of the added
240 ///   element and check capacity.
241 /// - Consumers add to a credit variable the weight of the removed
242 ///   element.
243 /// - Producers, for the sake of scalability, use fetch_add to add to
244 ///   the debit variable and subtract if it exceeded capacity,
245 ///   rather than using compare_exchange to avoid overshooting.
246 /// - Consumers, infrequently, transfer credit to a transfer variable
247 ///   and unblock any blocked producers. The transfer variable can be
248 ///   used by producers to decrease their debit when needed.
249 /// - Note that a low capacity will trigger frequent credit transfer
250 ///   by consumers that may degrade performance. Capacity should not
251 ///   be set too low.
252 /// - Transfer of credit by consumers is triggered when the amount of
253 ///   credit reaches a threshold (1/10 of capacity).
254 /// - The waiting of consumers is handled in UnboundedQueue.
255 ///   The waiting of producers is handled in this template.
256 /// - For a producer operation, if the difference between debit and
257 ///   capacity (plus some slack to account for the transfer threshold)
258 ///   does not accommodate the weight of the new element, it first
259 ///   tries to transfer credit that may have already been made
260 ///   available by consumers. If this is insufficient and MayBlock is
261 ///   true, then the producer uses a futex to block until new credit
262 ///   is transferred by a consumer.
263 ///
264 /// Memory Usage:
265 /// - Aside from three cache lines for managing capacity, the memory
266 ///   for queue elements is managed using UnboundedQueue and grows and
267 ///   shrinks dynamically with the number of elements.
268 /// - The template parameter LgAlign can be used to reduce memory usage
269 ///   at the cost of increased chance of false sharing.
270
271 template <typename T>
272 struct DefaultWeightFn {
273   template <typename Arg>
274   uint64_t operator()(Arg&&) const noexcept {
275     return 1;
276   }
277 };
278
279 template <
280     typename T,
281     bool SingleProducer,
282     bool SingleConsumer,
283     bool MayBlock,
284     size_t LgSegmentSize = 8,
285     size_t LgAlign = 7,
286     typename WeightFn = DefaultWeightFn<T>,
287     template <typename> class Atom = std::atomic>
288 class DynamicBoundedQueue {
289   using Weight = uint64_t;
290
291   enum WaitingState : uint32_t {
292     NOTWAITING = 0,
293     WAITING = 1,
294   };
295
296   static constexpr bool SPSC = SingleProducer && SingleConsumer;
297   static constexpr size_t Align = 1u << LgAlign;
298
299   static_assert(LgAlign < 16, "LgAlign must be < 16");
300
301   /// Data members
302
303   // Read mostly by producers
304   alignas(Align) Atom<Weight> debit_; // written frequently only by producers
305   Atom<Weight> capacity_; // written rarely by capacity resets
306
307   // Read mostly by consumers
308   alignas(Align) Atom<Weight> credit_; // written frequently only by consumers
309   Atom<Weight> threshold_; // written rarely only by capacity resets
310
311   // Normally written and read rarely by producers and consumers
312   // May be read frequently by producers when capacity is full
313   alignas(Align) Atom<Weight> transfer_;
314   detail::Futex<Atom> waiting_;
315
316   // Underlying unbounded queue
317   UnboundedQueue<
318       T,
319       SingleProducer,
320       SingleConsumer,
321       MayBlock,
322       LgSegmentSize,
323       LgAlign,
324       Atom>
325       q_;
326
327  public:
328   /** constructor */
329   explicit DynamicBoundedQueue(Weight capacity)
330       : debit_(0),
331         capacity_(capacity + threshold(capacity)), // capacity slack
332         credit_(0),
333         threshold_(threshold(capacity)),
334         transfer_(0) {}
335
336   /** destructor */
337   ~DynamicBoundedQueue() {}
338
339   /// Enqueue functions
340
341   /** enqueue */
342   FOLLY_ALWAYS_INLINE void enqueue(const T& v) {
343     enqueueImpl(v);
344   }
345
346   FOLLY_ALWAYS_INLINE void enqueue(T&& v) {
347     enqueueImpl(std::move(v));
348   }
349
350   /** try_enqueue */
351   FOLLY_ALWAYS_INLINE bool try_enqueue(const T& v) {
352     return tryEnqueueImpl(v);
353   }
354
355   FOLLY_ALWAYS_INLINE bool try_enqueue(T&& v) {
356     return tryEnqueueImpl(std::move(v));
357   }
358
359   /** try_enqueue_until */
360   template <typename Clock, typename Duration>
361   FOLLY_ALWAYS_INLINE bool try_enqueue_until(
362       const T& v,
363       const std::chrono::time_point<Clock, Duration>& deadline) {
364     return tryEnqueueUntilImpl(v, deadline);
365   }
366
367   template <typename Clock, typename Duration>
368   FOLLY_ALWAYS_INLINE bool try_enqueue_until(
369       T&& v,
370       const std::chrono::time_point<Clock, Duration>& deadline) {
371     return tryEnqueueUntilImpl(std::move(v), deadline);
372   }
373
374   /** try_enqueue_for */
375   template <typename Rep, typename Period>
376   FOLLY_ALWAYS_INLINE bool try_enqueue_for(
377       const T& v,
378       const std::chrono::duration<Rep, Period>& duration) {
379     return tryEnqueueForImpl(v, duration);
380   }
381
382   template <typename Rep, typename Period>
383   FOLLY_ALWAYS_INLINE bool try_enqueue_for(
384       T&& v,
385       const std::chrono::duration<Rep, Period>& duration) {
386     return tryEnqueueForImpl(std::move(v), duration);
387   }
388
389   /// Dequeue functions
390
391   /** dequeue */
392   FOLLY_ALWAYS_INLINE void dequeue(T& elem) {
393     q_.dequeue(elem);
394     addCredit(WeightFn()(elem));
395   }
396
397   /** try_dequeue */
398   FOLLY_ALWAYS_INLINE bool try_dequeue(T& elem) {
399     if (q_.try_dequeue(elem)) {
400       addCredit(WeightFn()(elem));
401       return true;
402     }
403     return false;
404   }
405
406   /** try_dequeue_until */
407   template <typename Clock, typename Duration>
408   FOLLY_ALWAYS_INLINE bool try_dequeue_until(
409       T& elem,
410       const std::chrono::time_point<Clock, Duration>& deadline) {
411     if (q_.try_dequeue_until(elem, deadline)) {
412       addCredit(WeightFn()(elem));
413       return true;
414     }
415     return false;
416   }
417
418   /** try_dequeue_for */
419   template <typename Rep, typename Period>
420   FOLLY_ALWAYS_INLINE bool try_dequeue_for(
421       T& elem,
422       const std::chrono::duration<Rep, Period>& duration) {
423     if (q_.try_dequeue_for(elem, duration)) {
424       addCredit(WeightFn()(elem));
425       return true;
426     }
427     return false;
428   }
429
430   /// Secondary functions
431
432   /** reset_capacity */
433   void reset_capacity(Weight capacity) noexcept {
434     Weight thresh = threshold(capacity);
435     capacity_.store(capacity + thresh, std::memory_order_release);
436     threshold_.store(thresh, std::memory_order_release);
437   }
438
439   /** weight */
440   Weight weight() const noexcept {
441     auto d = getDebit();
442     auto c = getCredit();
443     auto t = getTransfer();
444     return d > (c + t) ? d - (c + t) : 0;
445   }
446
447   /** size */
448   size_t size() const noexcept {
449     return q_.size();
450   }
451
452   /** empty */
453   bool empty() const noexcept {
454     return q_.empty();
455   }
456
457  private:
458   /// Private functions ///
459
460   // Calculation of threshold to move credits in bulk from consumers
461   // to producers
462   constexpr Weight threshold(Weight capacity) const noexcept {
463     return capacity / 10;
464   }
465
466   // Functions called frequently by producers
467
468   template <typename Arg>
469   FOLLY_ALWAYS_INLINE void enqueueImpl(Arg&& v) {
470     tryEnqueueUntilImpl(
471         std::forward<Arg>(v), std::chrono::steady_clock::time_point::max());
472   }
473
474   template <typename Arg>
475   FOLLY_ALWAYS_INLINE bool tryEnqueueImpl(Arg&& v) {
476     return tryEnqueueUntilImpl(
477         std::forward<Arg>(v), std::chrono::steady_clock::time_point::min());
478   }
479
480   template <typename Clock, typename Duration, typename Arg>
481   FOLLY_ALWAYS_INLINE bool tryEnqueueUntilImpl(
482       Arg&& v,
483       const std::chrono::time_point<Clock, Duration>& deadline) {
484     Weight weight = WeightFn()(std::forward<Arg>(v));
485     if (LIKELY(tryAddDebit(weight))) {
486       q_.enqueue(std::forward<Arg>(v));
487       return true;
488     }
489     return tryEnqueueUntilSlow(std::forward<Arg>(v), deadline);
490   }
491
492   template <typename Rep, typename Period, typename Arg>
493   FOLLY_ALWAYS_INLINE bool tryEnqueueForImpl(
494       Arg&& v,
495       const std::chrono::duration<Rep, Period>& duration) {
496     if (LIKELY(tryEnqueueImpl(std::forward<Arg>(v)))) {
497       return true;
498     }
499     auto deadline = std::chrono::steady_clock::now() + duration;
500     return tryEnqueueUntilSlow(std::forward<Arg>(v), deadline);
501   }
502
503   FOLLY_ALWAYS_INLINE bool tryAddDebit(Weight weight) noexcept {
504     Weight capacity = getCapacity();
505     Weight before = fetchAddDebit(weight);
506     if (LIKELY(before + weight <= capacity)) {
507       return true;
508     } else {
509       subDebit(weight);
510       return false;
511     }
512   }
513
514   FOLLY_ALWAYS_INLINE Weight getCapacity() const noexcept {
515     return capacity_.load(std::memory_order_acquire);
516   }
517
518   FOLLY_ALWAYS_INLINE Weight fetchAddDebit(Weight weight) noexcept {
519     Weight before;
520     if (SingleProducer) {
521       before = getDebit();
522       debit_.store(before + weight, std::memory_order_relaxed);
523     } else {
524       before = debit_.fetch_add(weight, std::memory_order_acq_rel);
525     }
526     return before;
527   }
528
529   FOLLY_ALWAYS_INLINE Weight getDebit() const noexcept {
530     return debit_.load(std::memory_order_acquire);
531   }
532
533   // Functions called frequently by consumers
534
535   FOLLY_ALWAYS_INLINE void addCredit(Weight weight) noexcept {
536     Weight before = fetchAddCredit(weight);
537     Weight thresh = getThreshold();
538     if (before + weight >= thresh && before < thresh) {
539       transferCredit();
540     }
541   }
542
543   FOLLY_ALWAYS_INLINE Weight fetchAddCredit(Weight weight) noexcept {
544     Weight before;
545     if (SingleConsumer) {
546       before = getCredit();
547       credit_.store(before + weight, std::memory_order_relaxed);
548     } else {
549       before = credit_.fetch_add(weight, std::memory_order_acq_rel);
550     }
551     return before;
552   }
553
554   FOLLY_ALWAYS_INLINE Weight getCredit() const noexcept {
555     return credit_.load(std::memory_order_acquire);
556   }
557
558   FOLLY_ALWAYS_INLINE Weight getThreshold() const noexcept {
559     return threshold_.load(std::memory_order_acquire);
560   }
561
562   /** Functions called infrequently by producers */
563
564   void subDebit(Weight weight) noexcept {
565     Weight before;
566     if (SingleProducer) {
567       before = getDebit();
568       debit_.store(before - weight, std::memory_order_relaxed);
569     } else {
570       before = debit_.fetch_sub(weight, std::memory_order_acq_rel);
571     }
572     DCHECK_GE(before, weight);
573   }
574
575   template <typename Clock, typename Duration, typename Arg>
576   bool tryEnqueueUntilSlow(
577       Arg&& v,
578       const std::chrono::time_point<Clock, Duration>& deadline) {
579     Weight weight = WeightFn()(std::forward<Arg>(v));
580     if (canEnqueue(deadline, weight)) {
581       q_.enqueue(std::forward<Arg>(v));
582       return true;
583     } else {
584       return false;
585     }
586   }
587
588   template <typename Clock, typename Duration>
589   bool canEnqueue(
590       const std::chrono::time_point<Clock, Duration>& deadline,
591       Weight weight) noexcept {
592     Weight capacity = getCapacity();
593     while (true) {
594       tryReduceDebit();
595       Weight debit = getDebit();
596       if ((debit + weight <= capacity) && tryAddDebit(weight)) {
597         return true;
598       }
599       if (Clock::now() >= deadline) {
600         return false;
601       }
602       if (MayBlock) {
603         if (canBlock(weight, capacity)) {
604           waiting_.futexWaitUntil(WAITING, deadline);
605         }
606       } else {
607         asm_volatile_pause();
608       }
609     }
610   }
611
612   bool canBlock(Weight weight, Weight capacity) noexcept {
613     waiting_.store(WAITING, std::memory_order_relaxed);
614     std::atomic_thread_fence(std::memory_order_seq_cst);
615     tryReduceDebit();
616     Weight debit = getDebit();
617     return debit + weight > capacity;
618   }
619
620   bool tryReduceDebit() noexcept {
621     Weight w = takeTransfer();
622     if (w > 0) {
623       subDebit(w);
624     }
625     return w > 0;
626   }
627
628   Weight takeTransfer() noexcept {
629     Weight w = getTransfer();
630     if (w > 0) {
631       w = transfer_.exchange(0, std::memory_order_acq_rel);
632     }
633     return w;
634   }
635
636   Weight getTransfer() const noexcept {
637     return transfer_.load(std::memory_order_acquire);
638   }
639
640   /** Functions called infrequently by consumers */
641
642   void transferCredit() noexcept {
643     Weight credit = takeCredit();
644     transfer_.fetch_add(credit, std::memory_order_acq_rel);
645     if (MayBlock) {
646       std::atomic_thread_fence(std::memory_order_seq_cst);
647       waiting_.store(NOTWAITING, std::memory_order_relaxed);
648       waiting_.futexWake();
649     }
650   }
651
652   Weight takeCredit() noexcept {
653     Weight credit;
654     if (SingleConsumer) {
655       credit = credit_.load(std::memory_order_relaxed);
656       credit_.store(0, std::memory_order_relaxed);
657     } else {
658       credit = credit_.exchange(0, std::memory_order_acq_rel);
659     }
660     return credit;
661   }
662
663 }; // DynamicBoundedQueue
664
665 /// Aliases
666
667 /** DSPSCQueue */
668 template <
669     typename T,
670     bool MayBlock,
671     size_t LgSegmentSize = 8,
672     size_t LgAlign = 7,
673     typename WeightFn = DefaultWeightFn<T>,
674     template <typename> class Atom = std::atomic>
675 using DSPSCQueue = DynamicBoundedQueue<
676     T,
677     true,
678     true,
679     MayBlock,
680     LgSegmentSize,
681     LgAlign,
682     WeightFn,
683     Atom>;
684
685 /** DMPSCQueue */
686 template <
687     typename T,
688     bool MayBlock,
689     size_t LgSegmentSize = 8,
690     size_t LgAlign = 7,
691     typename WeightFn = DefaultWeightFn<T>,
692     template <typename> class Atom = std::atomic>
693 using DMPSCQueue = DynamicBoundedQueue<
694     T,
695     false,
696     true,
697     MayBlock,
698     LgSegmentSize,
699     LgAlign,
700     WeightFn,
701     Atom>;
702
703 /** DSPMCQueue */
704 template <
705     typename T,
706     bool MayBlock,
707     size_t LgSegmentSize = 8,
708     size_t LgAlign = 7,
709     typename WeightFn = DefaultWeightFn<T>,
710     template <typename> class Atom = std::atomic>
711 using DSPMCQueue = DynamicBoundedQueue<
712     T,
713     true,
714     false,
715     MayBlock,
716     LgSegmentSize,
717     LgAlign,
718     WeightFn,
719     Atom>;
720
721 /** DMPMCQueue */
722 template <
723     typename T,
724     bool MayBlock,
725     size_t LgSegmentSize = 8,
726     size_t LgAlign = 7,
727     typename WeightFn = DefaultWeightFn<T>,
728     template <typename> class Atom = std::atomic>
729 using DMPMCQueue = DynamicBoundedQueue<
730     T,
731     false,
732     false,
733     MayBlock,
734     LgSegmentSize,
735     LgAlign,
736     WeightFn,
737     Atom>;
738
739 } // namespace folly