invoke and member-invoke tweaks
[folly.git] / folly / concurrency / DynamicBoundedQueue.h
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/concurrency/CacheLocality.h>
20 #include <folly/concurrency/UnboundedQueue.h>
21
22 #include <glog/logging.h>
23
24 #include <atomic>
25 #include <chrono>
26
27 namespace folly {
28
29 /// DynamicBoundedQueue supports:
30 /// - Dynamic memory usage that grows and shrink in proportion to the
31 ///   number of elements in the queue.
32 /// - Adjustable capacity that helps throttle pathological cases of
33 ///   producer-consumer imbalance that may lead to excessive memory
34 ///   usage.
35 /// - The adjustable capacity can also help prevent deadlock by
36 ///   allowing users to temporarily increase capacity substantially to
37 ///   guarantee accommodating producer requests that cannot wait.
38 /// - SPSC, SPMC, MPSC, MPMC variants.
39 /// - Blocking and spinning-only variants.
40 /// - Inter-operable non-waiting, timed until, timed for, and waiting
41 ///   variants of producer and consumer operations.
42 /// - Optional variable element weights.
43 ///
44 /// Element Weights
45 /// - Queue elements may have variable weights (calculated using a
46 ///   template parameter) that are by default 1.
47 /// - Element weights count towards the queue's capacity.
48 /// - Elements weights are not priorities and do not affect element
49 ///   order. Queues with variable element weights follow FIFO order,
50 ///   the same as default queues.
51 ///
52 /// When to use DynamicBoundedQueue:
53 /// - If a small maximum capacity may lead to deadlock or performance
54 ///   degradation under bursty patterns and a larger capacity is
55 ///   sufficient.
56 /// - If the typical queue size is expected to be much lower than the
57 ///   maximum capacity
58 /// - If an unbounded queue is susceptible to growing too much.
59 /// - If support for variable element weights is needed.
60 ///
61 /// When not to use DynamicBoundedQueue?
62 /// - If dynamic memory allocation is unacceptable or if the maximum
63 ///   capacity needs to be small, then use fixed-size MPMCQueue or (if
64 ///   non-blocking SPSC) ProducerConsumerQueue.
65 /// - If there is no risk of the queue growing too much, then use
66 ///   UnboundedQueue.
67 ///
68 /// Setting capacity
69 /// - The general rule is to set the capacity as high as acceptable.
70 ///   The queue performs best when it is not near full capacity.
71 /// - The implementation may allow extra slack in capacity (~10%) for
72 ///   amortizing some costly steps. Therefore, precise capacity is not
73 ///   guaranteed and cannot be relied on for synchronization; i.e.,
74 ///   this queue cannot be used as a semaphore.
75 ///
76 /// Performance expectations:
77 /// - As long as the queue size is below capacity in the common case,
78 ///   performance is comparable to MPMCQueue and better in cases of
79 ///   higher producer demand.
80 /// - Performance degrades gracefully at full capacity.
81 /// - It is recommended to measure performance with different variants
82 ///   when applicable, e.g., DMPMC vs DMPSC. Depending on the use
83 ///   case, sometimes the variant with the higher sequential overhead
84 ///   may yield better results due to, for example, more favorable
85 ///   producer-consumer balance or favorable timing for avoiding
86 ///   costly blocking.
87 /// - See DynamicBoundedQueueTest.cpp for some benchmark results.
88 ///
89 /// Template parameters:
90 /// - T: element type
91 /// - SingleProducer: true if there can be only one producer at a
92 ///   time.
93 /// - SingleConsumer: true if there can be only one consumer at a
94 ///   time.
95 /// - MayBlock: true if producers or consumers may block.
96 /// - LgSegmentSize (default 8): Log base 2 of number of elements per
97 ///   UnboundedQueue segment.
98 /// - LgAlign (default 7): Log base 2 of alignment directive; can be
99 ///   used to balance scalability (avoidance of false sharing) with
100 ///   memory efficiency.
101 /// - WeightFn (DefaultWeightFn<T>): A customizable weight computing type
102 ///   for computing the weights of elements. The default weight is 1.
103 ///
104 /// Template Aliases:
105 ///   DSPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
106 ///   DMPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
107 ///   DSPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>
108 ///   DMPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>
109 ///
110 /// Functions:
111 ///   Constructor
112 ///     Takes a capacity value as an argument.
113 ///
114 ///   Producer functions:
115 ///     void enqueue(const T&);
116 ///     void enqueue(T&&);
117 ///         Adds an element to the end of the queue. Waits until
118 ///         capacity is available if necessary.
119 ///     bool try_enqueue(const T&);
120 ///     bool try_enqueue(T&&);
121 ///         Tries to add an element to the end of the queue if
122 ///         capacity allows it. Returns true if successful. Otherwise
123 ///         Returns false.
124 ///     bool try_enqueue_until(const T&, time_point& deadline);
125 ///     bool try_enqueue_until(T&&, time_point& deadline);
126 ///         Tries to add an element to the end of the queue if
127 ///         capacity allows it until the specified deadline. Returns
128 ///         true if successful, otherwise false.
129 ///     bool try_enqueue_for(const T&, duration&);
130 ///     bool try_enqueue_for(T&&, duration&);
131 ///         Tries to add an element to the end of the queue if
132 ///         capacity allows until the expiration of the specified
133 ///         duration. Returns true if successful, otherwise false.
134 ///
135 ///   Consumer functions:
136 ///     void dequeue(T&);
137 ///         Extracts an element from the front of the queue. Waits
138 ///         until an element is available if necessary.
139 ///     bool try_dequeue(T&);
140 ///         Tries to extracts an element from the front of the queue
141 ///         if available. Returns true if successful, otherwise false.
142 ///     bool try_dequeue_until(T&, time_point& deadline);
143 ///         Tries to extracts an element from the front of the queue
144 ///         if available until the specified daedline. Returns true
145 ///         if successful. Otherwise Returns false.
146 ///     bool try_dequeue_for(T&, duration&);
147 ///         Tries to extracts an element from the front of the queue
148 ///         if available until the expiration of the specified
149 ///         duration.  Returns true if successful. Otherwise Returns
150 ///         false.
151 ///
152 ///   Secondary functions:
153 ///     void reset_capacity(size_t capacity);
154 ///        Changes the capacity of the queue. Does not affect the
155 ///        current contents of the queue. Guaranteed only to affect
156 ///        subsequent enqueue operations. May or may not affect
157 ///        concurrent operations. Capacity must be at least 1000.
158 ///     Weight weight();
159 ///        Returns an estimate of the total weight of the elements in
160 ///        the queue.
161 ///     size_t size();
162 ///         Returns an estimate of the total number of elements.
163 ///     bool empty();
164 ///         Returns true only if the queue was empty during the call.
165 ///     Note: weight(), size(), and empty() are guaranteed to be
166 ///     accurate only if there are no concurrent changes to the queue.
167 ///
168 /// Usage example with default weight:
169 /// @code
170 ///   /* DMPSC, doesn't block, 1024 int elements per segment */
171 ///   DMPSCQueue<int, false, 10> q(100000);
172 ///   ASSERT_TRUE(q.empty());
173 ///   ASSERT_EQ(q.size(), 0);
174 ///   q.enqueue(1));
175 ///   ASSERT_TRUE(q.try_enqueue(2));
176 ///   ASSERT_TRUE(q.try_enqueue_until(3, deadline));
177 ///   ASSERT_TRUE(q.try_enqueue(4, duration));
178 ///   // ... enqueue more elements until capacity is full
179 ///   // See above comments about imprecise capacity guarantees
180 ///   ASSERT_FALSE(q.try_enqueue(100001)); // can't enqueue but can't wait
181 ///   size_t sz = q.size();
182 ///   ASSERT_GE(sz, 100000);
183 ///   q.reset_capacity(1000000000); // set huge capacity
184 ///   ASSERT_TRUE(q.try_enqueue(100001)); // now enqueue succeeds
185 ///   q.reset_capacity(100000); // set capacity back to 100,000
186 ///   ASSERT_FALSE(q.try_enqueue(100002));
187 ///   ASSERT_EQ(q.size(), sz + 1);
188 ///   int v;
189 ///   q.dequeue(v);
190 ///   ASSERT_EQ(v, 1);
191 ///   ASSERT_TRUE(q.try_dequeue(v));
192 ///   ASSERT_EQ(v, 2);
193 ///   ASSERT_TRUE(q.try_dequeue_until(v, deadline));
194 ///   ASSERT_EQ(v, 3);
195 ///   ASSERT_TRUE(q.try_dequeue_for(v, duration));
196 ///   ASSERT_EQ(v, 4);
197 ///   ASSERT_EQ(q.size(), sz - 3);
198 /// @endcode
199 ///
200 /// Usage example with custom weights:
201 /// @code
202 ///   struct CustomWeightFn {
203 ///     uint64_t operator()(int val) { return val / 100; }
204 ///   };
205 ///   DMPMCQueue<int, false, 10, CustomWeightFn> q(20);
206 ///   ASSERT_TRUE(q.empty());
207 ///   q.enqueue(100);
208 ///   ASSERT_TRUE(q.try_enqueue(200));
209 ///   ASSERT_TRUE(q.try_enqueue_until(500, now() + seconds(1)));
210 ///   ASSERT_EQ(q.size(), 3);
211 ///   ASSERT_EQ(q.weight(), 8);
212 ///   ASSERT_FALSE(q.try_enqueue_for(1700, microseconds(1)));
213 ///   q.reset_capacity(1000000); // set capacity to 1000000 instead of 20
214 ///   ASSERT_TRUE(q.try_enqueue_for(1700, microseconds(1)));
215 ///   q.reset_capacity(20); // set capacity to 20 again
216 ///   ASSERT_FALSE(q.try_enqueue(100));
217 ///   ASSERT_EQ(q.size(), 4);
218 ///   ASSERT_EQ(q.weight(), 25);
219 ///   int v;
220 ///   q.dequeue(v);
221 ///   ASSERT_EQ(v, 100);
222 ///   ASSERT_TRUE(q.try_dequeue(v));
223 ///   ASSERT_EQ(v, 200);
224 ///   ASSERT_TRUE(q.try_dequeue_until(v, now() + seconds(1)));
225 ///   ASSERT_EQ(v, 500);
226 ///   ASSERT_EQ(q.size(), 1);
227 ///   ASSERT_EQ(q.weight(), 17);
228 /// @endcode
229 ///
230 /// Design:
231 /// - The implementation is on top of UnboundedQueue.
232 /// - The main FIFO functionality is in UnboundedQueue.
233 ///   DynamicBoundedQueue manages keeping the total queue weight
234 ///   within the specified capacity.
235 /// - For the sake of scalability, the data structures are designed to
236 ///   minimize interference between producers on one side and
237 ///   consumers on the other.
238 /// - Producers add to a debit variable the weight of the added
239 ///   element and check capacity.
240 /// - Consumers add to a credit variable the weight of the removed
241 ///   element.
242 /// - Producers, for the sake of scalability, use fetch_add to add to
243 ///   the debit variable and subtract if it exceeded capacity,
244 ///   rather than using compare_exchange to avoid overshooting.
245 /// - Consumers, infrequently, transfer credit to a transfer variable
246 ///   and unblock any blocked producers. The transfer variable can be
247 ///   used by producers to decrease their debit when needed.
248 /// - Note that a low capacity will trigger frequent credit transfer
249 ///   by consumers that may degrade performance. Capacity should not
250 ///   be set too low.
251 /// - Transfer of credit by consumers is triggered when the amount of
252 ///   credit reaches a threshold (1/10 of capacity).
253 /// - The waiting of consumers is handled in UnboundedQueue.
254 ///   The waiting of producers is handled in this template.
255 /// - For a producer operation, if the difference between debit and
256 ///   capacity (plus some slack to account for the transfer threshold)
257 ///   does not accommodate the weight of the new element, it first
258 ///   tries to transfer credit that may have already been made
259 ///   available by consumers. If this is insufficient and MayBlock is
260 ///   true, then the producer uses a futex to block until new credit
261 ///   is transferred by a consumer.
262 ///
263 /// Memory Usage:
264 /// - Aside from three cache lines for managing capacity, the memory
265 ///   for queue elements is managed using UnboundedQueue and grows and
266 ///   shrinks dynamically with the number of elements.
267 /// - The template parameter LgAlign can be used to reduce memory usage
268 ///   at the cost of increased chance of false sharing.
269
270 template <typename T>
271 struct DefaultWeightFn {
272   template <typename Arg>
273   uint64_t operator()(Arg&&) const noexcept {
274     return 1;
275   }
276 };
277
278 template <
279     typename T,
280     bool SingleProducer,
281     bool SingleConsumer,
282     bool MayBlock,
283     size_t LgSegmentSize = 8,
284     size_t LgAlign = 7,
285     typename WeightFn = DefaultWeightFn<T>,
286     template <typename> class Atom = std::atomic>
287 class DynamicBoundedQueue {
288   using Weight = uint64_t;
289
290   enum WaitingState : uint32_t {
291     NOTWAITING = 0,
292     WAITING = 1,
293   };
294
295   static constexpr bool SPSC = SingleProducer && SingleConsumer;
296   static constexpr size_t Align = 1u << LgAlign;
297
298   static_assert(LgAlign < 16, "LgAlign must be < 16");
299
300   /// Data members
301
302   // Read mostly by producers
303   alignas(Align) Atom<Weight> debit_; // written frequently only by producers
304   Atom<Weight> capacity_; // written rarely by capacity resets
305
306   // Read mostly by consumers
307   alignas(Align) Atom<Weight> credit_; // written frequently only by consumers
308   Atom<Weight> threshold_; // written rarely only by capacity resets
309
310   // Normally written and read rarely by producers and consumers
311   // May be read frequently by producers when capacity is full
312   alignas(Align) Atom<Weight> transfer_;
313   detail::Futex<Atom> waiting_;
314
315   // Underlying unbounded queue
316   UnboundedQueue<
317       T,
318       SingleProducer,
319       SingleConsumer,
320       MayBlock,
321       LgSegmentSize,
322       LgAlign,
323       Atom>
324       q_;
325
326  public:
327   /** constructor */
328   explicit DynamicBoundedQueue(Weight capacity)
329       : debit_(0),
330         capacity_(capacity + threshold(capacity)), // capacity slack
331         credit_(0),
332         threshold_(threshold(capacity)),
333         transfer_(0) {}
334
335   /** destructor */
336   ~DynamicBoundedQueue() {}
337
338   /// Enqueue functions
339
340   /** enqueue */
341   FOLLY_ALWAYS_INLINE void enqueue(const T& v) {
342     enqueueImpl(v);
343   }
344
345   FOLLY_ALWAYS_INLINE void enqueue(T&& v) {
346     enqueueImpl(std::move(v));
347   }
348
349   /** try_enqueue */
350   FOLLY_ALWAYS_INLINE bool try_enqueue(const T& v) {
351     return tryEnqueueImpl(v);
352   }
353
354   FOLLY_ALWAYS_INLINE bool try_enqueue(T&& v) {
355     return tryEnqueueImpl(std::move(v));
356   }
357
358   /** try_enqueue_until */
359   template <typename Clock, typename Duration>
360   FOLLY_ALWAYS_INLINE bool try_enqueue_until(
361       const T& v,
362       const std::chrono::time_point<Clock, Duration>& deadline) {
363     return tryEnqueueUntilImpl(v, deadline);
364   }
365
366   template <typename Clock, typename Duration>
367   FOLLY_ALWAYS_INLINE bool try_enqueue_until(
368       T&& v,
369       const std::chrono::time_point<Clock, Duration>& deadline) {
370     return tryEnqueueUntilImpl(std::move(v), deadline);
371   }
372
373   /** try_enqueue_for */
374   template <typename Rep, typename Period>
375   FOLLY_ALWAYS_INLINE bool try_enqueue_for(
376       const T& v,
377       const std::chrono::duration<Rep, Period>& duration) {
378     return tryEnqueueForImpl(v, duration);
379   }
380
381   template <typename Rep, typename Period>
382   FOLLY_ALWAYS_INLINE bool try_enqueue_for(
383       T&& v,
384       const std::chrono::duration<Rep, Period>& duration) {
385     return tryEnqueueForImpl(std::move(v), duration);
386   }
387
388   /// Dequeue functions
389
390   /** dequeue */
391   FOLLY_ALWAYS_INLINE void dequeue(T& elem) {
392     q_.dequeue(elem);
393     addCredit(WeightFn()(elem));
394   }
395
396   /** try_dequeue */
397   FOLLY_ALWAYS_INLINE bool try_dequeue(T& elem) {
398     if (q_.try_dequeue(elem)) {
399       addCredit(WeightFn()(elem));
400       return true;
401     }
402     return false;
403   }
404
405   /** try_dequeue_until */
406   template <typename Clock, typename Duration>
407   FOLLY_ALWAYS_INLINE bool try_dequeue_until(
408       T& elem,
409       const std::chrono::time_point<Clock, Duration>& deadline) {
410     if (q_.try_dequeue_until(elem, deadline)) {
411       addCredit(WeightFn()(elem));
412       return true;
413     }
414     return false;
415   }
416
417   /** try_dequeue_for */
418   template <typename Rep, typename Period>
419   FOLLY_ALWAYS_INLINE bool try_dequeue_for(
420       T& elem,
421       const std::chrono::duration<Rep, Period>& duration) {
422     if (q_.try_dequeue_for(elem, duration)) {
423       addCredit(WeightFn()(elem));
424       return true;
425     }
426     return false;
427   }
428
429   /// Secondary functions
430
431   /** reset_capacity */
432   void reset_capacity(Weight capacity) noexcept {
433     Weight thresh = threshold(capacity);
434     capacity_.store(capacity + thresh, std::memory_order_release);
435     threshold_.store(thresh, std::memory_order_release);
436   }
437
438   /** weight */
439   Weight weight() const noexcept {
440     auto d = getDebit();
441     auto c = getCredit();
442     auto t = getTransfer();
443     return d > (c + t) ? d - (c + t) : 0;
444   }
445
446   /** size */
447   size_t size() const noexcept {
448     return q_.size();
449   }
450
451   /** empty */
452   bool empty() const noexcept {
453     return q_.empty();
454   }
455
456  private:
457   /// Private functions ///
458
459   // Calculation of threshold to move credits in bulk from consumers
460   // to producers
461   constexpr Weight threshold(Weight capacity) const noexcept {
462     return capacity / 10;
463   }
464
465   // Functions called frequently by producers
466
467   template <typename Arg>
468   FOLLY_ALWAYS_INLINE void enqueueImpl(Arg&& v) {
469     tryEnqueueUntilImpl(
470         std::forward<Arg>(v), std::chrono::steady_clock::time_point::max());
471   }
472
473   template <typename Arg>
474   FOLLY_ALWAYS_INLINE bool tryEnqueueImpl(Arg&& v) {
475     return tryEnqueueUntilImpl(
476         std::forward<Arg>(v), std::chrono::steady_clock::time_point::min());
477   }
478
479   template <typename Clock, typename Duration, typename Arg>
480   FOLLY_ALWAYS_INLINE bool tryEnqueueUntilImpl(
481       Arg&& v,
482       const std::chrono::time_point<Clock, Duration>& deadline) {
483     Weight weight = WeightFn()(std::forward<Arg>(v));
484     if (LIKELY(tryAddDebit(weight))) {
485       q_.enqueue(std::forward<Arg>(v));
486       return true;
487     }
488     return tryEnqueueUntilSlow(std::forward<Arg>(v), deadline);
489   }
490
491   template <typename Rep, typename Period, typename Arg>
492   FOLLY_ALWAYS_INLINE bool tryEnqueueForImpl(
493       Arg&& v,
494       const std::chrono::duration<Rep, Period>& duration) {
495     if (LIKELY(tryEnqueueImpl(std::forward<Arg>(v)))) {
496       return true;
497     }
498     auto deadline = std::chrono::steady_clock::now() + duration;
499     return tryEnqueueUntilSlow(std::forward<Arg>(v), deadline);
500   }
501
502   FOLLY_ALWAYS_INLINE bool tryAddDebit(Weight weight) noexcept {
503     Weight capacity = getCapacity();
504     Weight before = fetchAddDebit(weight);
505     if (LIKELY(before + weight <= capacity)) {
506       return true;
507     } else {
508       subDebit(weight);
509       return false;
510     }
511   }
512
513   FOLLY_ALWAYS_INLINE Weight getCapacity() const noexcept {
514     return capacity_.load(std::memory_order_acquire);
515   }
516
517   FOLLY_ALWAYS_INLINE Weight fetchAddDebit(Weight weight) noexcept {
518     Weight before;
519     if (SingleProducer) {
520       before = getDebit();
521       debit_.store(before + weight, std::memory_order_relaxed);
522     } else {
523       before = debit_.fetch_add(weight, std::memory_order_acq_rel);
524     }
525     return before;
526   }
527
528   FOLLY_ALWAYS_INLINE Weight getDebit() const noexcept {
529     return debit_.load(std::memory_order_acquire);
530   }
531
532   // Functions called frequently by consumers
533
534   FOLLY_ALWAYS_INLINE void addCredit(Weight weight) noexcept {
535     Weight before = fetchAddCredit(weight);
536     Weight thresh = getThreshold();
537     if (before + weight >= thresh && before < thresh) {
538       transferCredit();
539     }
540   }
541
542   FOLLY_ALWAYS_INLINE Weight fetchAddCredit(Weight weight) noexcept {
543     Weight before;
544     if (SingleConsumer) {
545       before = getCredit();
546       credit_.store(before + weight, std::memory_order_relaxed);
547     } else {
548       before = credit_.fetch_add(weight, std::memory_order_acq_rel);
549     }
550     return before;
551   }
552
553   FOLLY_ALWAYS_INLINE Weight getCredit() const noexcept {
554     return credit_.load(std::memory_order_acquire);
555   }
556
557   FOLLY_ALWAYS_INLINE Weight getThreshold() const noexcept {
558     return threshold_.load(std::memory_order_acquire);
559   }
560
561   /** Functions called infrequently by producers */
562
563   void subDebit(Weight weight) noexcept {
564     Weight before;
565     if (SingleProducer) {
566       before = getDebit();
567       debit_.store(before - weight, std::memory_order_relaxed);
568     } else {
569       before = debit_.fetch_sub(weight, std::memory_order_acq_rel);
570     }
571     DCHECK_GE(before, weight);
572   }
573
574   template <typename Clock, typename Duration, typename Arg>
575   bool tryEnqueueUntilSlow(
576       Arg&& v,
577       const std::chrono::time_point<Clock, Duration>& deadline) {
578     Weight weight = WeightFn()(std::forward<Arg>(v));
579     if (canEnqueue(deadline, weight)) {
580       q_.enqueue(std::forward<Arg>(v));
581       return true;
582     } else {
583       return false;
584     }
585   }
586
587   template <typename Clock, typename Duration>
588   bool canEnqueue(
589       const std::chrono::time_point<Clock, Duration>& deadline,
590       Weight weight) noexcept {
591     Weight capacity = getCapacity();
592     while (true) {
593       tryReduceDebit();
594       Weight debit = getDebit();
595       if ((debit + weight <= capacity) && tryAddDebit(weight)) {
596         return true;
597       }
598       if (Clock::now() >= deadline) {
599         return false;
600       }
601       if (MayBlock) {
602         if (canBlock(weight, capacity)) {
603           waiting_.futexWaitUntil(WAITING, deadline);
604         }
605       } else {
606         asm_volatile_pause();
607       }
608     }
609   }
610
611   bool canBlock(Weight weight, Weight capacity) noexcept {
612     waiting_.store(WAITING, std::memory_order_relaxed);
613     std::atomic_thread_fence(std::memory_order_seq_cst);
614     tryReduceDebit();
615     Weight debit = getDebit();
616     return debit + weight > capacity;
617   }
618
619   bool tryReduceDebit() noexcept {
620     Weight w = takeTransfer();
621     if (w > 0) {
622       subDebit(w);
623     }
624     return w > 0;
625   }
626
627   Weight takeTransfer() noexcept {
628     Weight w = getTransfer();
629     if (w > 0) {
630       w = transfer_.exchange(0, std::memory_order_acq_rel);
631     }
632     return w;
633   }
634
635   Weight getTransfer() const noexcept {
636     return transfer_.load(std::memory_order_acquire);
637   }
638
639   /** Functions called infrequently by consumers */
640
641   void transferCredit() noexcept {
642     Weight credit = takeCredit();
643     transfer_.fetch_add(credit, std::memory_order_acq_rel);
644     if (MayBlock) {
645       std::atomic_thread_fence(std::memory_order_seq_cst);
646       waiting_.store(NOTWAITING, std::memory_order_relaxed);
647       waiting_.futexWake();
648     }
649   }
650
651   Weight takeCredit() noexcept {
652     Weight credit;
653     if (SingleConsumer) {
654       credit = credit_.load(std::memory_order_relaxed);
655       credit_.store(0, std::memory_order_relaxed);
656     } else {
657       credit = credit_.exchange(0, std::memory_order_acq_rel);
658     }
659     return credit;
660   }
661
662 }; // DynamicBoundedQueue
663
664 /// Aliases
665
666 /** DSPSCQueue */
667 template <
668     typename T,
669     bool MayBlock,
670     size_t LgSegmentSize = 8,
671     size_t LgAlign = 7,
672     typename WeightFn = DefaultWeightFn<T>,
673     template <typename> class Atom = std::atomic>
674 using DSPSCQueue = DynamicBoundedQueue<
675     T,
676     true,
677     true,
678     MayBlock,
679     LgSegmentSize,
680     LgAlign,
681     WeightFn,
682     Atom>;
683
684 /** DMPSCQueue */
685 template <
686     typename T,
687     bool MayBlock,
688     size_t LgSegmentSize = 8,
689     size_t LgAlign = 7,
690     typename WeightFn = DefaultWeightFn<T>,
691     template <typename> class Atom = std::atomic>
692 using DMPSCQueue = DynamicBoundedQueue<
693     T,
694     false,
695     true,
696     MayBlock,
697     LgSegmentSize,
698     LgAlign,
699     WeightFn,
700     Atom>;
701
702 /** DSPMCQueue */
703 template <
704     typename T,
705     bool MayBlock,
706     size_t LgSegmentSize = 8,
707     size_t LgAlign = 7,
708     typename WeightFn = DefaultWeightFn<T>,
709     template <typename> class Atom = std::atomic>
710 using DSPMCQueue = DynamicBoundedQueue<
711     T,
712     true,
713     false,
714     MayBlock,
715     LgSegmentSize,
716     LgAlign,
717     WeightFn,
718     Atom>;
719
720 /** DMPMCQueue */
721 template <
722     typename T,
723     bool MayBlock,
724     size_t LgSegmentSize = 8,
725     size_t LgAlign = 7,
726     typename WeightFn = DefaultWeightFn<T>,
727     template <typename> class Atom = std::atomic>
728 using DMPMCQueue = DynamicBoundedQueue<
729     T,
730     false,
731     false,
732     MayBlock,
733     LgSegmentSize,
734     LgAlign,
735     WeightFn,
736     Atom>;
737
738 } // namespace folly