Adds writer test case for RCU
[folly.git] / folly / concurrency / DynamicBoundedQueue.h
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/concurrency/CacheLocality.h>
20 #include <folly/concurrency/UnboundedQueue.h>
21
22 #include <glog/logging.h>
23
24 #include <atomic>
25 #include <chrono>
26
27 namespace folly {
28
29 /// DynamicBoundedQueue supports:
30 /// - Dynamic memory usage that grows and shrink in proportion to the
31 ///   number of elements in the queue.
32 /// - Adjustable capacity that helps throttle pathological cases of
33 ///   producer-consumer imbalance that may lead to excessive memory
34 ///   usage.
35 /// - The adjustable capacity can also help prevent deadlock by
36 ///   allowing users to temporarily increase capacity substantially to
37 ///   guarantee accommodating producer requests that cannot wait.
38 /// - SPSC, SPMC, MPSC, MPMC variants.
39 /// - Blocking and spinning-only variants.
40 /// - Inter-operable non-waiting, timed until, timed for, and waiting
41 ///   variants of producer and consumer operations.
42 /// - Optional variable element weights.
43 ///
44 /// Element Weights
45 /// - Queue elements may have variable weights (calculated using a
46 ///   template parameter) that are by default 1.
47 /// - Element weights count towards the queue's capacity.
48 /// - Elements weights are not priorities and do not affect element
49 ///   order. Queues with variable element weights follow FIFO order,
50 ///   the same as default queues.
51 ///
52 /// When to use DynamicBoundedQueue:
53 /// - If a small maximum capacity may lead to deadlock or performance
54 ///   degradation under bursty patterns and a larger capacity is
55 ///   sufficient.
56 /// - If the typical queue size is expected to be much lower than the
57 ///   maximum capacity
58 /// - If an unbounded queue is susceptible to growing too much.
59 /// - If support for variable element weights is needed.
60 ///
61 /// When not to use DynamicBoundedQueue?
62 /// - If dynamic memory allocation is unacceptable or if the maximum
63 ///   capacity needs to be small, then use fixed-size MPMCQueue or (if
64 ///   non-blocking SPSC) ProducerConsumerQueue.
65 /// - If there is no risk of the queue growing too much, then use
66 ///   UnboundedQueue.
67 ///
68 /// Setting capacity
69 /// - The general rule is to set the capacity as high as acceptable.
70 ///   The queue performs best when it is not near full capacity.
71 /// - The implementation may allow extra slack in capacity (~10%) for
72 ///   amortizing some costly steps. Therefore, precise capacity is not
73 ///   guaranteed and cannot be relied on for synchronization; i.e.,
74 ///   this queue cannot be used as a semaphore.
75 ///
76 /// Performance expectations:
77 /// - As long as the queue size is below capacity in the common case,
78 ///   performance is comparable to MPMCQueue and better in cases of
79 ///   higher producer demand.
80 /// - Performance degrades gracefully at full capacity.
81 /// - It is recommended to measure performance with different variants
82 ///   when applicable, e.g., DMPMC vs DMPSC. Depending on the use
83 ///   case, sometimes the variant with the higher sequential overhead
84 ///   may yield better results due to, for example, more favorable
85 ///   producer-consumer balance or favorable timing for avoiding
86 ///   costly blocking.
87 /// - See DynamicBoundedQueueTest.cpp for some benchmark results.
88 ///
89 /// Template parameters:
90 /// - T: element type
91 /// - SingleProducer: true if there can be only one producer at a
92 ///   time.
93 /// - SingleConsumer: true if there can be only one consumer at a
94 ///   time.
95 /// - MayBlock: true if producers or consumers may block.
96 /// - LgSegmentSize (default 8): Log base 2 of number of elements per
97 ///   UnboundedQueue segment.
98 /// - LgAlign (default 7): Log base 2 of alignment directive; can be
99 ///   used to balance scalability (avoidance of false sharing) with
100 ///   memory efficiency.
101 /// - WeightFn (DefaultWeightFn<T>): A customizable weight computing type
102 ///   for computing the weights of elements. The default weight is 1.
103 ///
104 /// Template Aliases:
105 ///   DSPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
106 ///   DMPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
107 ///   DSPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>
108 ///   DMPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>
109 ///
110 /// Functions:
111 ///   Constructor
112 ///     Takes a capacity value as an argument.
113 ///
114 ///   Producer functions:
115 ///     void enqueue(const T&);
116 ///     void enqueue(T&&);
117 ///         Adds an element to the end of the queue. Waits until
118 ///         capacity is available if necessary.
119 ///     bool try_enqueue(const T&);
120 ///     bool try_enqueue(T&&);
121 ///         Tries to add an element to the end of the queue if
122 ///         capacity allows it. Returns true if successful. Otherwise
123 ///         Returns false.
124 ///     bool try_enqueue_until(const T&, time_point& deadline);
125 ///     bool try_enqueue_until(T&&, time_point& deadline);
126 ///         Tries to add an element to the end of the queue if
127 ///         capacity allows it until the specified deadline. Returns
128 ///         true if successful, otherwise false.
129 ///     bool try_enqueue_for(const T&, duration&);
130 ///     bool try_enqueue_for(T&&, duration&);
131 ///         Tries to add an element to the end of the queue if
132 ///         capacity allows until the expiration of the specified
133 ///         duration. Returns true if successful, otherwise false.
134 ///
135 ///   Consumer functions:
136 ///     void dequeue(T&);
137 ///         Extracts an element from the front of the queue. Waits
138 ///         until an element is available if necessary.
139 ///     bool try_dequeue(T&);
140 ///         Tries to extracts an element from the front of the queue
141 ///         if available. Returns true if successful, otherwise false.
142 ///     bool try_dequeue_until(T&, time_point& deadline);
143 ///         Tries to extracts an element from the front of the queue
144 ///         if available until the specified daedline. Returns true
145 ///         if successful. Otherwise Returns false.
146 ///     bool try_dequeue_for(T&, duration&);
147 ///         Tries to extracts an element from the front of the queue
148 ///         if available until the expiration of the specified
149 ///         duration.  Returns true if successful. Otherwise Returns
150 ///         false.
151 ///
152 ///   Secondary functions:
153 ///     void reset_capacity(size_t capacity);
154 ///        Changes the capacity of the queue. Does not affect the
155 ///        current contents of the queue. Guaranteed only to affect
156 ///        subsequent enqueue operations. May or may not affect
157 ///        concurrent operations. Capacity must be at least 1000.
158 ///     Weight weight();
159 ///        Returns an estimate of the total weight of the elements in
160 ///        the queue.
161 ///     size_t size();
162 ///         Returns an estimate of the total number of elements.
163 ///     bool empty();
164 ///         Returns true only if the queue was empty during the call.
165 ///     Note: weight(), size(), and empty() are guaranteed to be
166 ///     accurate only if there are no concurrent changes to the queue.
167 ///
168 /// Usage example with default weight:
169 /// @code
170 ///   /* DMPSC, doesn't block, 1024 int elements per segment */
171 ///   DMPSCQueue<int, false, 10> q(100000);
172 ///   ASSERT_TRUE(q.empty());
173 ///   ASSERT_EQ(q.size(), 0);
174 ///   q.enqueue(1));
175 ///   ASSERT_TRUE(q.try_enqueue(2));
176 ///   ASSERT_TRUE(q.try_enqueue_until(3, deadline));
177 ///   ASSERT_TRUE(q.try_enqueue(4, duration));
178 ///   // ... enqueue more elements until capacity is full
179 ///   // See above comments about imprecise capacity guarantees
180 ///   ASSERT_FALSE(q.try_enqueue(100001)); // can't enqueue but can't wait
181 ///   size_t sz = q.size();
182 ///   ASSERT_GE(sz, 100000);
183 ///   q.reset_capacity(1000000000); // set huge capacity
184 ///   ASSERT_TRUE(q.try_enqueue(100001)); // now enqueue succeeds
185 ///   q.reset_capacity(100000); // set capacity back to 100,000
186 ///   ASSERT_FALSE(q.try_enqueue(100002));
187 ///   ASSERT_EQ(q.size(), sz + 1);
188 ///   int v;
189 ///   q.dequeue(v);
190 ///   ASSERT_EQ(v, 1);
191 ///   ASSERT_TRUE(q.try_dequeue(v));
192 ///   ASSERT_EQ(v, 2);
193 ///   ASSERT_TRUE(q.try_dequeue_until(v, deadline));
194 ///   ASSERT_EQ(v, 3);
195 ///   ASSERT_TRUE(q.try_dequeue_for(v, duration));
196 ///   ASSERT_EQ(v, 4);
197 ///   ASSERT_EQ(q.size(), sz - 3);
198 /// @endcode
199 ///
200 /// Usage example with custom weights:
201 /// @code
202 ///   struct CustomWeightFn {
203 ///     uint64_t operator()(int val) { return val / 100; }
204 ///   };
205 ///   DMPMCQueue<int, false, 10, CustomWeightFn> q(20);
206 ///   ASSERT_TRUE(q.empty());
207 ///   q.enqueue(100);
208 ///   ASSERT_TRUE(q.try_enqueue(200));
209 ///   ASSERT_TRUE(q.try_enqueue_until(500, now() + seconds(1)));
210 ///   ASSERT_EQ(q.size(), 3);
211 ///   ASSERT_EQ(q.weight(), 8);
212 ///   ASSERT_FALSE(q.try_enqueue_for(1700, microseconds(1)));
213 ///   q.reset_capacity(1000000); // set capacity to 1000000 instead of 20
214 ///   ASSERT_TRUE(q.try_enqueue_for(1700, microseconds(1)));
215 ///   q.reset_capacity(20); // set capacity to 20 again
216 ///   ASSERT_FALSE(q.try_enqueue(100));
217 ///   ASSERT_EQ(q.size(), 4);
218 ///   ASSERT_EQ(q.weight(), 25);
219 ///   int v;
220 ///   q.dequeue(v);
221 ///   ASSERT_EQ(v, 100);
222 ///   ASSERT_TRUE(q.try_dequeue(v));
223 ///   ASSERT_EQ(v, 200);
224 ///   ASSERT_TRUE(q.try_dequeue_until(v, now() + seconds(1)));
225 ///   ASSERT_EQ(v, 500);
226 ///   ASSERT_EQ(q.size(), 1);
227 ///   ASSERT_EQ(q.weight(), 17);
228 /// @endcode
229 ///
230 /// Design:
231 /// - The implementation is on top of UnboundedQueue.
232 /// - The main FIFO functionality is in UnboundedQueue.
233 ///   DynamicBoundedQueue manages keeping the total queue weight
234 ///   within the specified capacity.
235 /// - For the sake of scalability, the data structures are designed to
236 ///   minimize interference between producers on one side and
237 ///   consumers on the other.
238 /// - Producers add to a debit variable the weight of the added
239 ///   element and check capacity.
240 /// - Consumers add to a credit variable the weight of the removed
241 ///   element.
242 /// - Producers, for the sake of scalability, use fetch_add to add to
243 ///   the debit variable and subtract if it exceeded capacity,
244 ///   rather than using compare_exchange to avoid overshooting.
245 /// - Consumers, infrequently, transfer credit to a transfer variable
246 ///   and unblock any blocked producers. The transfer variable can be
247 ///   used by producers to decrease their debit when needed.
248 /// - Note that a low capacity will trigger frequent credit transfer
249 ///   by consumers that may degrade performance. Capacity should not
250 ///   be set too low.
251 /// - Transfer of credit by consumers is triggered when the amount of
252 ///   credit reaches a threshold (1/10 of capacity).
253 /// - The waiting of consumers is handled in UnboundedQueue.
254 ///   The waiting of producers is handled in this template.
255 /// - For a producer operation, if the difference between debit and
256 ///   capacity (plus some slack to account for the transfer threshold)
257 ///   does not accommodate the weight of the new element, it first
258 ///   tries to transfer credit that may have already been made
259 ///   available by consumers. If this is insufficient and MayBlock is
260 ///   true, then the producer uses a futex to block until new credit
261 ///   is transferred by a consumer.
262 ///
263 /// Memory Usage:
264 /// - Aside from three cache lines for managing capacity, the memory
265 ///   for queue elements is managed using UnboundedQueue and grows and
266 ///   shrinks dynamically with the number of elements.
267 /// - The template parameter LgAlign can be used to reduce memory usage
268 ///   at the cost of increased chance of false sharing.
269
270 template <typename T>
271 struct DefaultWeightFn {
272   template <typename Arg>
273   uint64_t operator()(Arg&&) const noexcept {
274     return 1;
275   }
276 };
277
278 template <
279     typename T,
280     bool SingleProducer,
281     bool SingleConsumer,
282     bool MayBlock,
283     size_t LgSegmentSize = 8,
284     size_t LgAlign = 7,
285     typename WeightFn = DefaultWeightFn<T>,
286     template <typename> class Atom = std::atomic>
287 class DynamicBoundedQueue {
288   using Weight = uint64_t;
289
290   enum WaitingState : uint32_t {
291     NOTWAITING = 0,
292     WAITING = 1,
293   };
294
295   static constexpr bool SPSC = SingleProducer && SingleConsumer;
296   static constexpr size_t Align = 1u << LgAlign;
297
298   static_assert(LgAlign < 16, "LgAlign must be < 16");
299
300   /// Data members
301
302   // Read mostly by producers
303   alignas(Align) Atom<Weight> debit_; // written frequently only by producers
304   Atom<Weight> capacity_; // written rarely by capacity resets
305
306   // Read mostly by consumers
307   alignas(Align) Atom<Weight> credit_; // written frequently only by consumers
308   Atom<Weight> threshold_; // written rarely only by capacity resets
309
310   // Normally written and read rarely by producers and consumers
311   // May be read frequently by producers when capacity is full
312   alignas(Align) Atom<Weight> transfer_;
313   detail::Futex<Atom> waiting_;
314
315   // Underlying unbounded queue
316   UnboundedQueue<
317       T,
318       SingleProducer,
319       SingleConsumer,
320       MayBlock,
321       LgSegmentSize,
322       LgAlign,
323       Atom>
324       q_;
325
326  public:
327   /** constructor */
328   explicit DynamicBoundedQueue(Weight capacity)
329       : debit_(0),
330         capacity_(capacity + threshold(capacity)), // capacity slack
331         credit_(0),
332         threshold_(threshold(capacity)),
333         transfer_(0),
334         waiting_(0) {}
335
336   /** destructor */
337   ~DynamicBoundedQueue() {}
338
339   /// Enqueue functions
340
341   /** enqueue */
342   FOLLY_ALWAYS_INLINE void enqueue(const T& v) {
343     enqueueImpl(v);
344   }
345
346   FOLLY_ALWAYS_INLINE void enqueue(T&& v) {
347     enqueueImpl(std::move(v));
348   }
349
350   /** try_enqueue */
351   FOLLY_ALWAYS_INLINE bool try_enqueue(const T& v) {
352     return tryEnqueueImpl(v);
353   }
354
355   FOLLY_ALWAYS_INLINE bool try_enqueue(T&& v) {
356     return tryEnqueueImpl(std::move(v));
357   }
358
359   /** try_enqueue_until */
360   template <typename Clock, typename Duration>
361   FOLLY_ALWAYS_INLINE bool try_enqueue_until(
362       const T& v,
363       const std::chrono::time_point<Clock, Duration>& deadline) {
364     return tryEnqueueUntilImpl(v, deadline);
365   }
366
367   template <typename Clock, typename Duration>
368   FOLLY_ALWAYS_INLINE bool try_enqueue_until(
369       T&& v,
370       const std::chrono::time_point<Clock, Duration>& deadline) {
371     return tryEnqueueUntilImpl(std::move(v), deadline);
372   }
373
374   /** try_enqueue_for */
375   template <typename Rep, typename Period>
376   FOLLY_ALWAYS_INLINE bool try_enqueue_for(
377       const T& v,
378       const std::chrono::duration<Rep, Period>& duration) {
379     return tryEnqueueForImpl(v, duration);
380   }
381
382   template <typename Rep, typename Period>
383   FOLLY_ALWAYS_INLINE bool try_enqueue_for(
384       T&& v,
385       const std::chrono::duration<Rep, Period>& duration) {
386     return tryEnqueueForImpl(std::move(v), duration);
387   }
388
389   /// Dequeue functions
390
391   /** dequeue */
392   FOLLY_ALWAYS_INLINE void dequeue(T& elem) {
393     q_.dequeue(elem);
394     addCredit(WeightFn()(elem));
395   }
396
397   /** try_dequeue */
398   FOLLY_ALWAYS_INLINE bool try_dequeue(T& elem) {
399     if (q_.try_dequeue(elem)) {
400       addCredit(WeightFn()(elem));
401       return true;
402     }
403     return false;
404   }
405
406   /** try_dequeue_until */
407   template <typename Clock, typename Duration>
408   FOLLY_ALWAYS_INLINE bool try_dequeue_until(
409       T& elem,
410       const std::chrono::time_point<Clock, Duration>& deadline) {
411     if (q_.try_dequeue_until(elem, deadline)) {
412       addCredit(WeightFn()(elem));
413       return true;
414     }
415     return false;
416   }
417
418   /** try_dequeue_for */
419   template <typename Rep, typename Period>
420   FOLLY_ALWAYS_INLINE bool try_dequeue_for(
421       T& elem,
422       const std::chrono::duration<Rep, Period>& duration) {
423     if (q_.try_dequeue_for(elem, duration)) {
424       addCredit(WeightFn()(elem));
425       return true;
426     }
427     return false;
428   }
429
430   /// Secondary functions
431
432   /** reset_capacity */
433   void reset_capacity(Weight capacity) noexcept {
434     Weight thresh = threshold(capacity);
435     capacity_.store(capacity + thresh, std::memory_order_release);
436     threshold_.store(thresh, std::memory_order_release);
437   }
438
439   /** weight */
440   Weight weight() const noexcept {
441     auto d = getDebit();
442     auto c = getCredit();
443     auto t = getTransfer();
444     return d > (c + t) ? d - (c + t) : 0;
445   }
446
447   /** size */
448   size_t size() const noexcept {
449     return q_.size();
450   }
451
452   /** empty */
453   bool empty() const noexcept {
454     return q_.empty();
455   }
456
457  private:
458   /// Private functions ///
459
460   // Calculation of threshold to move credits in bulk from consumers
461   // to producers
462   constexpr Weight threshold(Weight capacity) const noexcept {
463     return capacity / 10;
464   }
465
466   // Functions called frequently by producers
467
468   template <typename Arg>
469   FOLLY_ALWAYS_INLINE void enqueueImpl(Arg&& v) {
470     tryEnqueueUntilImpl(
471         std::forward<Arg>(v), std::chrono::steady_clock::time_point::max());
472   }
473
474   template <typename Arg>
475   FOLLY_ALWAYS_INLINE bool tryEnqueueImpl(Arg&& v) {
476     return tryEnqueueUntilImpl(
477         std::forward<Arg>(v), std::chrono::steady_clock::time_point::min());
478   }
479
480   template <typename Clock, typename Duration, typename Arg>
481   FOLLY_ALWAYS_INLINE bool tryEnqueueUntilImpl(
482       Arg&& v,
483       const std::chrono::time_point<Clock, Duration>& deadline) {
484     Weight weight = WeightFn()(std::forward<Arg>(v));
485     if (LIKELY(tryAddDebit(weight))) {
486       q_.enqueue(std::forward<Arg>(v));
487       return true;
488     }
489     return tryEnqueueUntilSlow(std::forward<Arg>(v), deadline);
490   }
491
492   template <typename Rep, typename Period, typename Arg>
493   FOLLY_ALWAYS_INLINE bool tryEnqueueForImpl(
494       Arg&& v,
495       const std::chrono::duration<Rep, Period>& duration) {
496     if (LIKELY(tryEnqueueImpl(std::forward<Arg>(v)))) {
497       return true;
498     }
499     auto deadline = std::chrono::steady_clock::now() + duration;
500     return tryEnqueueUntilSlow(std::forward<Arg>(v), deadline);
501   }
502
503   FOLLY_ALWAYS_INLINE bool tryAddDebit(Weight weight) noexcept {
504     Weight capacity = getCapacity();
505     Weight before = fetchAddDebit(weight);
506     if (LIKELY(before + weight <= capacity)) {
507       return true;
508     } else {
509       subDebit(weight);
510       return false;
511     }
512   }
513
514   FOLLY_ALWAYS_INLINE Weight getCapacity() const noexcept {
515     return capacity_.load(std::memory_order_acquire);
516   }
517
518   FOLLY_ALWAYS_INLINE Weight fetchAddDebit(Weight weight) noexcept {
519     Weight before;
520     if (SingleProducer) {
521       before = getDebit();
522       debit_.store(before + weight, std::memory_order_relaxed);
523     } else {
524       before = debit_.fetch_add(weight, std::memory_order_acq_rel);
525     }
526     return before;
527   }
528
529   FOLLY_ALWAYS_INLINE Weight getDebit() const noexcept {
530     return debit_.load(std::memory_order_acquire);
531   }
532
533   // Functions called frequently by consumers
534
535   FOLLY_ALWAYS_INLINE void addCredit(Weight weight) noexcept {
536     Weight before = fetchAddCredit(weight);
537     Weight thresh = getThreshold();
538     if (before + weight >= thresh && before < thresh) {
539       transferCredit();
540     }
541   }
542
543   FOLLY_ALWAYS_INLINE Weight fetchAddCredit(Weight weight) noexcept {
544     Weight before;
545     if (SingleConsumer) {
546       before = getCredit();
547       credit_.store(before + weight, std::memory_order_relaxed);
548     } else {
549       before = credit_.fetch_add(weight, std::memory_order_acq_rel);
550     }
551     return before;
552   }
553
554   FOLLY_ALWAYS_INLINE Weight getCredit() const noexcept {
555     return credit_.load(std::memory_order_acquire);
556   }
557
558   FOLLY_ALWAYS_INLINE Weight getThreshold() const noexcept {
559     return threshold_.load(std::memory_order_acquire);
560   }
561
562   /** Functions called infrequently by producers */
563
564   void subDebit(Weight weight) noexcept {
565     Weight before;
566     if (SingleProducer) {
567       before = getDebit();
568       debit_.store(before - weight, std::memory_order_relaxed);
569     } else {
570       before = debit_.fetch_sub(weight, std::memory_order_acq_rel);
571     }
572     DCHECK_GE(before, weight);
573   }
574
575   template <typename Clock, typename Duration, typename Arg>
576   bool tryEnqueueUntilSlow(
577       Arg&& v,
578       const std::chrono::time_point<Clock, Duration>& deadline) {
579     Weight weight = WeightFn()(std::forward<Arg>(v));
580     if (canEnqueue(deadline, weight)) {
581       q_.enqueue(std::forward<Arg>(v));
582       return true;
583     } else {
584       return false;
585     }
586   }
587
588   template <typename Clock, typename Duration>
589   bool canEnqueue(
590       const std::chrono::time_point<Clock, Duration>& deadline,
591       Weight weight) noexcept {
592     Weight capacity = getCapacity();
593     while (true) {
594       tryReduceDebit();
595       Weight debit = getDebit();
596       if ((debit + weight <= capacity) && tryAddDebit(weight)) {
597         return true;
598       }
599       if (Clock::now() >= deadline) {
600         return false;
601       }
602       if (MayBlock) {
603         if (canBlock(weight, capacity)) {
604           waiting_.futexWaitUntil(WAITING, deadline);
605         }
606       } else {
607         asm_volatile_pause();
608       }
609     }
610   }
611
612   bool canBlock(Weight weight, Weight capacity) noexcept {
613     waiting_.store(WAITING, std::memory_order_relaxed);
614     std::atomic_thread_fence(std::memory_order_seq_cst);
615     tryReduceDebit();
616     Weight debit = getDebit();
617     return debit + weight > capacity;
618   }
619
620   bool tryReduceDebit() noexcept {
621     Weight w = takeTransfer();
622     if (w > 0) {
623       subDebit(w);
624     }
625     return w > 0;
626   }
627
628   Weight takeTransfer() noexcept {
629     Weight w = getTransfer();
630     if (w > 0) {
631       w = transfer_.exchange(0, std::memory_order_acq_rel);
632     }
633     return w;
634   }
635
636   Weight getTransfer() const noexcept {
637     return transfer_.load(std::memory_order_acquire);
638   }
639
640   /** Functions called infrequently by consumers */
641
642   void transferCredit() noexcept {
643     Weight credit = takeCredit();
644     transfer_.fetch_add(credit, std::memory_order_acq_rel);
645     if (MayBlock) {
646       std::atomic_thread_fence(std::memory_order_seq_cst);
647       waiting_.store(NOTWAITING, std::memory_order_relaxed);
648       waiting_.futexWake();
649     }
650   }
651
652   Weight takeCredit() noexcept {
653     Weight credit;
654     if (SingleConsumer) {
655       credit = credit_.load(std::memory_order_relaxed);
656       credit_.store(0, std::memory_order_relaxed);
657     } else {
658       credit = credit_.exchange(0, std::memory_order_acq_rel);
659     }
660     return credit;
661   }
662
663 }; // DynamicBoundedQueue
664
665 /// Aliases
666
667 /** DSPSCQueue */
668 template <
669     typename T,
670     bool MayBlock,
671     size_t LgSegmentSize = 8,
672     size_t LgAlign = 7,
673     typename WeightFn = DefaultWeightFn<T>,
674     template <typename> class Atom = std::atomic>
675 using DSPSCQueue = DynamicBoundedQueue<
676     T,
677     true,
678     true,
679     MayBlock,
680     LgSegmentSize,
681     LgAlign,
682     WeightFn,
683     Atom>;
684
685 /** DMPSCQueue */
686 template <
687     typename T,
688     bool MayBlock,
689     size_t LgSegmentSize = 8,
690     size_t LgAlign = 7,
691     typename WeightFn = DefaultWeightFn<T>,
692     template <typename> class Atom = std::atomic>
693 using DMPSCQueue = DynamicBoundedQueue<
694     T,
695     false,
696     true,
697     MayBlock,
698     LgSegmentSize,
699     LgAlign,
700     WeightFn,
701     Atom>;
702
703 /** DSPMCQueue */
704 template <
705     typename T,
706     bool MayBlock,
707     size_t LgSegmentSize = 8,
708     size_t LgAlign = 7,
709     typename WeightFn = DefaultWeightFn<T>,
710     template <typename> class Atom = std::atomic>
711 using DSPMCQueue = DynamicBoundedQueue<
712     T,
713     true,
714     false,
715     MayBlock,
716     LgSegmentSize,
717     LgAlign,
718     WeightFn,
719     Atom>;
720
721 /** DMPMCQueue */
722 template <
723     typename T,
724     bool MayBlock,
725     size_t LgSegmentSize = 8,
726     size_t LgAlign = 7,
727     typename WeightFn = DefaultWeightFn<T>,
728     template <typename> class Atom = std::atomic>
729 using DMPMCQueue = DynamicBoundedQueue<
730     T,
731     false,
732     false,
733     MayBlock,
734     LgSegmentSize,
735     LgAlign,
736     WeightFn,
737     Atom>;
738
739 } // namespace folly