UnboundedQueue: Use hazptr_obj_batch
[folly.git] / folly / concurrency / UnboundedQueue.h
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <atomic>
20 #include <chrono>
21 #include <memory>
22
23 #include <glog/logging.h>
24
25 #include <folly/concurrency/CacheLocality.h>
26 #include <folly/experimental/hazptr/hazptr.h>
27 #include <folly/synchronization/SaturatingSemaphore.h>
28
29 namespace folly {
30
31 /// UnboundedQueue supports a variety of options for unbounded
32 /// dynamically expanding an shrinking queues, including variations of:
33 /// - Single vs. multiple producers
34 /// - Single vs. multiple consumers
35 /// - Blocking vs. spin-waiting
36 /// - Non-waiting, timed, and waiting consumer operations.
37 /// Producer operations never wait or fail (unless out-of-memory).
38 ///
39 /// Template parameters:
40 /// - T: element type
41 /// - SingleProducer: true if there can be only one producer at a
42 ///   time.
43 /// - SingleConsumer: true if there can be only one consumer at a
44 ///   time.
45 /// - MayBlock: true if consumers may block, false if they only
46 ///   spin. A performance tuning parameter.
47 /// - LgSegmentSize (default 8): Log base 2 of number of elements per
48 ///   segment. A performance tuning parameter. See below.
49 /// - LgAlign (default 7): Log base 2 of alignment directive; can be
50 ///   used to balance scalability (avoidance of false sharing) with
51 ///   memory efficiency.
52 ///
53 /// When to use UnboundedQueue:
54 /// - If a small bound may lead to deadlock or performance degradation
55 ///   under bursty patterns.
56 /// - If there is no risk of the queue growing too much.
57 ///
58 /// When not to use UnboundedQueue:
59 /// - If there is risk of the queue growing too much and a large bound
60 ///   is acceptable, then use DynamicBoundedQueue.
61 /// - If the queue must not allocate on enqueue or it must have a
62 ///   small bound, then use fixed-size MPMCQueue or (if non-blocking
63 ///   SPSC) ProducerConsumerQueue.
64 ///
65 /// Template Aliases:
66 ///   USPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
67 ///   UMPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
68 ///   USPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>
69 ///   UMPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>
70 ///
71 /// Functions:
72 ///   Producer operations never wait or fail (unless OOM)
73 ///     void enqueue(const T&);
74 ///     void enqueue(T&&);
75 ///         Adds an element to the end of the queue.
76 ///
77 ///   Consumer operations:
78 ///     void dequeue(T&);
79 ///         Extracts an element from the front of the queue. Waits
80 ///         until an element is available if needed.
81 ///     bool try_dequeue(T&);
82 ///         Tries to extract an element from the front of the queue
83 ///         if available. Returns true if successful, false otherwise.
84 ///     bool try_dequeue_until(T&, time_point& deadline);
85 ///         Tries to extract an element from the front of the queue
86 ///         if available until the specified deadline.  Returns true
87 ///         if successful, false otherwise.
88 ///     bool try_dequeue_for(T&, duration&);
89 ///         Tries to extract an element from the front of the queue if
90 ///         available for until the expiration of the specified
91 ///         duration.  Returns true if successful, false otherwise.
92 ///
93 ///   Secondary functions:
94 ///     size_t size();
95 ///         Returns an estimate of the size of the queue.
96 ///     bool empty();
97 ///         Returns true only if the queue was empty during the call.
98 ///     Note: size() and empty() are guaranteed to be accurate only if
99 ///     the queue is not changed concurrently.
100 ///
101 /// Usage examples:
102 /// @code
103 ///   /* UMPSC, doesn't block, 1024 int elements per segment */
104 ///   UMPSCQueue<int, false, 10> q;
105 ///   q.enqueue(1);
106 ///   q.enqueue(2);
107 ///   q.enqueue(3);
108 ///   ASSERT_FALSE(q.empty());
109 ///   ASSERT_EQ(q.size(), 3);
110 ///   int v;
111 ///   q.dequeue(v);
112 ///   ASSERT_EQ(v, 1);
113 ///   ASSERT_TRUE(try_dequeue(v));
114 ///   ASSERT_EQ(v, 2);
115 ///   ASSERT_TRUE(try_dequeue_until(v, now() + seconds(1)));
116 ///   ASSERT_EQ(v, 3);
117 ///   ASSERT_TRUE(q.empty());
118 ///   ASSERT_EQ(q.size(), 0);
119 ///   ASSERT_FALSE(try_dequeue(v));
120 ///   ASSERT_FALSE(try_dequeue_for(v, microseconds(100)));
121 /// @endcode
122 ///
123 /// Design:
124 /// - The queue is composed of one or more segments. Each segment has
125 ///   a fixed size of 2^LgSegmentSize entries. Each segment is used
126 ///   exactly once.
127 /// - Each entry is composed of a futex and a single element.
128 /// - The queue contains two 64-bit ticket variables. The producer
129 ///   ticket counts the number of producer tickets issued so far, and
130 ///   the same for the consumer ticket. Each ticket number corresponds
131 ///   to a specific entry in a specific segment.
132 /// - The queue maintains two pointers, head and tail. Head points to
133 ///   the segment that corresponds to the current consumer
134 ///   ticket. Similarly, tail pointer points to the segment that
135 ///   corresponds to the producer ticket.
136 /// - Segments are organized as a singly linked list.
137 /// - The producer with the first ticket in the current producer
138 ///   segment is solely responsible for allocating and linking the
139 ///   next segment.
140 /// - The producer with the last ticket in the current producer
141 ///   segment is solely responsible for advancing the tail pointer to
142 ///   the next segment.
143 /// - Similarly, the consumer with the last ticket in the current
144 ///   consumer segment is solely responsible for advancing the head
145 ///   pointer to the next segment. It must ensure that head never
146 ///   overtakes tail.
147 ///
148 /// Memory Usage:
149 /// - An empty queue contains one segment. A nonempty queue contains
150 ///   one or two more segment than fits its contents.
151 /// - Removed segments are not reclaimed until there are no threads,
152 ///   producers or consumers, have references to them or their
153 ///   predecessors. That is, a lagging thread may delay the reclamation
154 ///   of a chain of removed segments.
155 /// - The template parameter LgAlign can be used to reduce memory usage
156 ///   at the cost of increased chance of false sharing.
157 ///
158 /// Performance considerations:
159 /// - All operations take constant time, excluding the costs of
160 ///   allocation, reclamation, interference from other threads, and
161 ///   waiting for actions by other threads.
162 /// - In general, using the single producer and or single consumer
163 ///   variants yield better performance than the MP and MC
164 ///   alternatives.
165 /// - SPSC without blocking is the fastest configuration. It doesn't
166 ///   include any read-modify-write atomic operations, full fences, or
167 ///   system calls in the critical path.
168 /// - MP adds a fetch_add to the critical path of each producer operation.
169 /// - MC adds a fetch_add or compare_exchange to the critical path of
170 ///   each consumer operation.
171 /// - The possibility of consumers blocking, even if they never do,
172 ///   adds a compare_exchange to the critical path of each producer
173 ///   operation.
174 /// - MPMC, SPMC, MPSC require the use of a deferred reclamation
175 ///   mechanism to guarantee that segments removed from the linked
176 ///   list, i.e., unreachable from the head pointer, are reclaimed
177 ///   only after they are no longer needed by any lagging producers or
178 ///   consumers.
179 /// - The overheads of segment allocation and reclamation are intended
180 ///   to be mostly out of the critical path of the queue's throughput.
181 /// - If the template parameter LgSegmentSize is changed, it should be
182 ///   set adequately high to keep the amortized cost of allocation and
183 ///   reclamation low.
184 /// - Another consideration is that the queue is guaranteed to have
185 ///   enough space for a number of consumers equal to 2^LgSegmentSize
186 ///   for local blocking. Excess waiting consumers spin.
187 /// - It is recommended to measure performance with different variants
188 ///   when applicable, e.g., UMPMC vs UMPSC. Depending on the use
189 ///   case, sometimes the variant with the higher sequential overhead
190 ///   may yield better results due to, for example, more favorable
191 ///   producer-consumer balance or favorable timing for avoiding
192 ///   costly blocking.
193
194 template <
195     typename T,
196     bool SingleProducer,
197     bool SingleConsumer,
198     bool MayBlock,
199     size_t LgSegmentSize = 8,
200     size_t LgAlign = 7,
201     template <typename> class Atom = std::atomic>
202 class UnboundedQueue {
203   using Ticket = uint64_t;
204   class Entry;
205   class Segment;
206
207   static constexpr bool SPSC = SingleProducer && SingleConsumer;
208   static constexpr size_t Stride = SPSC || (LgSegmentSize <= 1) ? 1 : 27;
209   static constexpr size_t SegmentSize = 1u << LgSegmentSize;
210   static constexpr size_t Align = 1u << LgAlign;
211
212   static_assert(
213       std::is_nothrow_destructible<T>::value,
214       "T must be nothrow_destructible");
215   static_assert((Stride & 1) == 1, "Stride must be odd");
216   static_assert(LgSegmentSize < 32, "LgSegmentSize must be < 32");
217   static_assert(LgAlign < 16, "LgAlign must be < 16");
218
219   struct Consumer {
220     Atom<Segment*> head;
221     Atom<Ticket> ticket;
222     folly::hazptr::hazptr_obj_batch batch;
223   };
224   struct Producer {
225     Atom<Segment*> tail;
226     Atom<Ticket> ticket;
227   };
228
229   alignas(Align) Consumer c_;
230   alignas(Align) Producer p_;
231
232  public:
233   /** constructor */
234   UnboundedQueue() {
235     setProducerTicket(0);
236     setConsumerTicket(0);
237     Segment* s = new Segment(0);
238     setTail(s);
239     setHead(s);
240   }
241
242   /** destructor */
243   ~UnboundedQueue() {
244     Segment* next;
245     for (Segment* s = head(); s; s = next) {
246       next = s->nextSegment();
247       reclaimSegment(s);
248     }
249   }
250
251   /** enqueue */
252   FOLLY_ALWAYS_INLINE void enqueue(const T& arg) {
253     enqueueImpl(arg);
254   }
255
256   FOLLY_ALWAYS_INLINE void enqueue(T&& arg) {
257     enqueueImpl(std::move(arg));
258   }
259
260   /** dequeue */
261   FOLLY_ALWAYS_INLINE void dequeue(T& item) noexcept {
262     dequeueImpl(item);
263   }
264
265   /** try_dequeue */
266   FOLLY_ALWAYS_INLINE bool try_dequeue(T& item) noexcept {
267     return tryDequeueUntil(item, std::chrono::steady_clock::time_point::min());
268   }
269
270   /** try_dequeue_until */
271   template <typename Clock, typename Duration>
272   FOLLY_ALWAYS_INLINE bool try_dequeue_until(
273       T& item,
274       const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
275     return tryDequeueUntil(item, deadline);
276   }
277
278   /** try_dequeue_for */
279   template <typename Rep, typename Period>
280   FOLLY_ALWAYS_INLINE bool try_dequeue_for(
281       T& item,
282       const std::chrono::duration<Rep, Period>& duration) noexcept {
283     if (LIKELY(try_dequeue(item))) {
284       return true;
285     }
286     return tryDequeueUntil(item, std::chrono::steady_clock::now() + duration);
287   }
288
289   /** size */
290   size_t size() const noexcept {
291     auto p = producerTicket();
292     auto c = consumerTicket();
293     return p > c ? p - c : 0;
294   }
295
296   /** empty */
297   bool empty() const noexcept {
298     auto c = consumerTicket();
299     auto p = producerTicket();
300     return p <= c;
301   }
302
303  private:
304   /** enqueueImpl */
305   template <typename Arg>
306   FOLLY_ALWAYS_INLINE void enqueueImpl(Arg&& arg) {
307     if (SPSC) {
308       Segment* s = tail();
309       enqueueCommon(s, std::forward<Arg>(arg));
310     } else {
311       // Using hazptr_holder instead of hazptr_local because it is
312       // possible that the T ctor happens to use hazard pointers.
313       folly::hazptr::hazptr_holder hptr;
314       Segment* s = hptr.get_protected(p_.tail);
315       enqueueCommon(s, std::forward<Arg>(arg));
316     }
317   }
318
319   /** enqueueCommon */
320   template <typename Arg>
321   FOLLY_ALWAYS_INLINE void enqueueCommon(Segment* s, Arg&& arg) {
322     Ticket t = fetchIncrementProducerTicket();
323     if (!SingleProducer) {
324       s = findSegment(s, t);
325     }
326     DCHECK_GE(t, s->minTicket());
327     DCHECK_LT(t, s->minTicket() + SegmentSize);
328     size_t idx = index(t);
329     Entry& e = s->entry(idx);
330     e.putItem(std::forward<Arg>(arg));
331     if (responsibleForAlloc(t)) {
332       allocNextSegment(s, t + SegmentSize);
333     }
334     if (responsibleForAdvance(t)) {
335       advanceTail(s);
336     }
337   }
338
339   /** dequeueImpl */
340   FOLLY_ALWAYS_INLINE void dequeueImpl(T& item) noexcept {
341     if (SPSC) {
342       Segment* s = head();
343       dequeueCommon(s, item);
344     } else {
345       // Using hazptr_holder instead of hazptr_local because it is
346       // possible to call the T dtor and it may happen to use hazard
347       // pointers.
348       folly::hazptr::hazptr_holder hptr;
349       Segment* s = hptr.get_protected(c_.head);
350       dequeueCommon(s, item);
351     }
352   }
353
354   /** dequeueCommon */
355   FOLLY_ALWAYS_INLINE void dequeueCommon(Segment* s, T& item) noexcept {
356     Ticket t = fetchIncrementConsumerTicket();
357     if (!SingleConsumer) {
358       s = findSegment(s, t);
359     }
360     size_t idx = index(t);
361     Entry& e = s->entry(idx);
362     e.takeItem(item);
363     if (responsibleForAdvance(t)) {
364       advanceHead(s);
365     }
366   }
367
368   /** tryDequeueUntil */
369   template <typename Clock, typename Duration>
370   FOLLY_ALWAYS_INLINE bool tryDequeueUntil(
371       T& item,
372       const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
373     if (SingleConsumer) {
374       Segment* s = head();
375       return tryDequeueUntilSC(s, item, deadline);
376     } else {
377       // Using hazptr_holder instead of hazptr_local because it is
378       // possible to call ~T() and it may happen to use hazard pointers.
379       folly::hazptr::hazptr_holder hptr;
380       Segment* s = hptr.get_protected(c_.head);
381       return tryDequeueUntilMC(s, item, deadline);
382     }
383   }
384
385   /** tryDequeueUntilSC */
386   template <typename Clock, typename Duration>
387   FOLLY_ALWAYS_INLINE bool tryDequeueUntilSC(
388       Segment* s,
389       T& item,
390       const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
391     Ticket t = consumerTicket();
392     DCHECK_GE(t, s->minTicket());
393     DCHECK_LT(t, (s->minTicket() + SegmentSize));
394     size_t idx = index(t);
395     Entry& e = s->entry(idx);
396     if (UNLIKELY(!tryDequeueWaitElem(e, t, deadline))) {
397       return false;
398     }
399     setConsumerTicket(t + 1);
400     e.takeItem(item);
401     if (responsibleForAdvance(t)) {
402       advanceHead(s);
403     }
404     return true;
405   }
406
407   /** tryDequeueUntilMC */
408   template <typename Clock, typename Duration>
409   FOLLY_ALWAYS_INLINE bool tryDequeueUntilMC(
410       Segment* s,
411       T& item,
412       const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
413     while (true) {
414       Ticket t = consumerTicket();
415       if (UNLIKELY(t >= (s->minTicket() + SegmentSize))) {
416         s = tryGetNextSegmentUntil(s, deadline);
417         if (s == nullptr) {
418           return false; // timed out
419         }
420         continue;
421       }
422       size_t idx = index(t);
423       Entry& e = s->entry(idx);
424       if (UNLIKELY(!tryDequeueWaitElem(e, t, deadline))) {
425         return false;
426       }
427       if (!c_.ticket.compare_exchange_weak(
428               t, t + 1, std::memory_order_acq_rel, std::memory_order_acquire)) {
429         continue;
430       }
431       e.takeItem(item);
432       if (responsibleForAdvance(t)) {
433         advanceHead(s);
434       }
435       return true;
436     }
437   }
438
439   /** tryDequeueWaitElem */
440   template <typename Clock, typename Duration>
441   FOLLY_ALWAYS_INLINE bool tryDequeueWaitElem(
442       Entry& e,
443       Ticket t,
444       const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
445     while (true) {
446       if (LIKELY(e.tryWaitUntil(deadline))) {
447         return true;
448       }
449       if (t >= producerTicket()) {
450         return false;
451       }
452       asm_volatile_pause();
453     }
454   }
455
456   /** findSegment */
457   FOLLY_ALWAYS_INLINE
458   Segment* findSegment(Segment* s, const Ticket t) const noexcept {
459     while (UNLIKELY(t >= (s->minTicket() + SegmentSize))) {
460       auto deadline = std::chrono::steady_clock::time_point::max();
461       s = tryGetNextSegmentUntil(s, deadline);
462       DCHECK(s != nullptr);
463     }
464     return s;
465   }
466
467   /** tryGetNextSegmentUntil */
468   template <typename Clock, typename Duration>
469   Segment* tryGetNextSegmentUntil(
470       Segment* s,
471       const std::chrono::time_point<Clock, Duration>& deadline) const noexcept {
472     // The following loop will not spin indefinitely (as long as the
473     // number of concurrently waiting consumers does not exceeds
474     // SegmentSize and the OS scheduler does not pause ready threads
475     // indefinitely). Under such conditions, the algorithm guarantees
476     // that the producer reponsible for advancing the tail pointer to
477     // the next segment has already acquired its ticket.
478     while (tail() == s) {
479       if (deadline < Clock::time_point::max() && deadline > Clock::now()) {
480         return nullptr;
481       }
482       asm_volatile_pause();
483     }
484     Segment* next = s->nextSegment();
485     DCHECK(next != nullptr);
486     return next;
487   }
488
489   /** allocNextSegment */
490   void allocNextSegment(Segment* s, const Ticket t) {
491     Segment* next = new Segment(t);
492     if (!SPSC) {
493       next->acquire_ref_safe(); // hazptr
494     }
495     DCHECK(s->nextSegment() == nullptr);
496     s->setNextSegment(next);
497   }
498
499   /** advanceTail */
500   void advanceTail(Segment* s) noexcept {
501     Segment* next = s->nextSegment();
502     if (!SingleProducer) {
503       // The following loop will not spin indefinitely (as long as the
504       // OS scheduler does not pause ready threads indefinitely). The
505       // algorithm guarantees that the producer reponsible for setting
506       // the next pointer has already acquired its ticket.
507       while (next == nullptr) {
508         asm_volatile_pause();
509         next = s->nextSegment();
510       }
511     }
512     DCHECK(next != nullptr);
513     setTail(next);
514   }
515
516   /** advanceHead */
517   void advanceHead(Segment* s) noexcept {
518     auto deadline = std::chrono::steady_clock::time_point::max();
519     Segment* next = tryGetNextSegmentUntil(s, deadline);
520     DCHECK(next != nullptr);
521     while (head() != s) {
522       // Wait for head to advance to the current segment first before
523       // advancing head to the next segment. Otherwise, a lagging
524       // consumer responsible for advancing head from an earlier
525       // segment may incorrectly set head back.
526       asm_volatile_pause();
527     }
528     /* ***IMPORTANT*** prepReclaimSegment() must be called after
529      * confirming that head() is up-to-date and before calling
530      * setHead() to be thread-safe. */
531     /* ***IMPORTANT*** Segment s cannot be retired before the call to
532      * setHead(s). This is why prep_retire_refcounted(), which is
533      * called by prepReclaimSegment() does not retire objects, it
534      * merely adds the object to the batch and returns a private batch
535      * structure of a list of objects that can be retired later, if
536      * there are enough objects for amortizing the cost of updating
537      * the domain structure. */
538     auto res = prepReclaimSegment(s);
539     setHead(next);
540     /* Now it is safe to retire s. */
541     /* ***IMPORTANT*** The destructor of res automatically calls
542      * retire_all(), which retires to the domain any objects moved to
543      * res from batch in the call to prepReclaimSegment(). */
544   }
545
546   /** reclaimSegment */
547   void reclaimSegment(Segment* s) noexcept {
548     if (SPSC) {
549       delete s;
550     } else {
551       s->retire(); // hazptr
552     }
553   }
554
555   /** prepReclaimSegment */
556   folly::hazptr::hazptr_obj_batch prepReclaimSegment(Segment* s) noexcept {
557     if (SPSC) {
558       delete s;
559       /*Return an empty result; nothing more to do for this segment */
560       return folly::hazptr::hazptr_obj_batch();
561     } else {
562       return c_.batch.prep_retire_refcounted(s);
563     }
564   }
565
566   FOLLY_ALWAYS_INLINE size_t index(Ticket t) const noexcept {
567     return (t * Stride) & (SegmentSize - 1);
568   }
569
570   FOLLY_ALWAYS_INLINE bool responsibleForAlloc(Ticket t) const noexcept {
571     return (t & (SegmentSize - 1)) == 0;
572   }
573
574   FOLLY_ALWAYS_INLINE bool responsibleForAdvance(Ticket t) const noexcept {
575     return (t & (SegmentSize - 1)) == (SegmentSize - 1);
576   }
577
578   FOLLY_ALWAYS_INLINE Segment* head() const noexcept {
579     return c_.head.load(std::memory_order_acquire);
580   }
581
582   FOLLY_ALWAYS_INLINE Segment* tail() const noexcept {
583     return p_.tail.load(std::memory_order_acquire);
584   }
585
586   FOLLY_ALWAYS_INLINE Ticket producerTicket() const noexcept {
587     return p_.ticket.load(std::memory_order_acquire);
588   }
589
590   FOLLY_ALWAYS_INLINE Ticket consumerTicket() const noexcept {
591     return c_.ticket.load(std::memory_order_acquire);
592   }
593
594   void setHead(Segment* s) noexcept {
595     c_.head.store(s, std::memory_order_release);
596   }
597
598   void setTail(Segment* s) noexcept {
599     p_.tail.store(s, std::memory_order_release);
600   }
601
602   FOLLY_ALWAYS_INLINE void setProducerTicket(Ticket t) noexcept {
603     p_.ticket.store(t, std::memory_order_release);
604   }
605
606   FOLLY_ALWAYS_INLINE void setConsumerTicket(Ticket t) noexcept {
607     c_.ticket.store(t, std::memory_order_release);
608   }
609
610   FOLLY_ALWAYS_INLINE Ticket fetchIncrementConsumerTicket() noexcept {
611     if (SingleConsumer) {
612       Ticket oldval = consumerTicket();
613       setConsumerTicket(oldval + 1);
614       return oldval;
615     } else { // MC
616       return c_.ticket.fetch_add(1, std::memory_order_acq_rel);
617     }
618   }
619
620   FOLLY_ALWAYS_INLINE Ticket fetchIncrementProducerTicket() noexcept {
621     if (SingleProducer) {
622       Ticket oldval = producerTicket();
623       setProducerTicket(oldval + 1);
624       return oldval;
625     } else { // MP
626       return p_.ticket.fetch_add(1, std::memory_order_acq_rel);
627     }
628   }
629
630   /**
631    *  Entry
632    */
633   class Entry {
634     folly::SaturatingSemaphore<MayBlock, Atom> flag_;
635     typename std::aligned_storage<sizeof(T), alignof(T)>::type item_;
636
637    public:
638     template <typename Arg>
639     FOLLY_ALWAYS_INLINE void putItem(Arg&& arg) {
640       new (&item_) T(std::forward<Arg>(arg));
641       flag_.post();
642     }
643
644     FOLLY_ALWAYS_INLINE void takeItem(T& item) noexcept {
645       flag_.wait();
646       getItem(item);
647     }
648
649     template <typename Clock, typename Duration>
650     FOLLY_ALWAYS_INLINE bool tryWaitUntil(
651         const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
652       // wait-options from benchmarks on contended queues:
653       auto const opt =
654           flag_.wait_options().spin_max(std::chrono::microseconds(10));
655       return flag_.try_wait_until(deadline, opt);
656     }
657
658    private:
659     FOLLY_ALWAYS_INLINE void getItem(T& item) noexcept {
660       item = std::move(*(itemPtr()));
661       destroyItem();
662     }
663
664     FOLLY_ALWAYS_INLINE T* itemPtr() noexcept {
665       return static_cast<T*>(static_cast<void*>(&item_));
666     }
667
668     FOLLY_ALWAYS_INLINE void destroyItem() noexcept {
669       itemPtr()->~T();
670     }
671   }; // Entry
672
673   /**
674    *  Segment
675    */
676   class Segment : public folly::hazptr::hazptr_obj_base_refcounted<Segment> {
677     Atom<Segment*> next_;
678     const Ticket min_;
679     bool marked_; // used for iterative deletion
680     alignas(Align) Entry b_[SegmentSize];
681
682    public:
683     explicit Segment(const Ticket t)
684         : next_(nullptr), min_(t), marked_(false) {}
685
686     ~Segment() {
687       if (!SPSC && !marked_) {
688         Segment* next = nextSegment();
689         while (next) {
690           if (!next->release_ref()) { // hazptr
691             return;
692           }
693           Segment* s = next;
694           next = s->nextSegment();
695           s->marked_ = true;
696           delete s;
697         }
698       }
699     }
700
701     Segment* nextSegment() const noexcept {
702       return next_.load(std::memory_order_acquire);
703     }
704
705     void setNextSegment(Segment* s) noexcept {
706       next_.store(s, std::memory_order_release);
707     }
708
709     FOLLY_ALWAYS_INLINE Ticket minTicket() const noexcept {
710       DCHECK_EQ((min_ & (SegmentSize - 1)), 0);
711       return min_;
712     }
713
714     FOLLY_ALWAYS_INLINE Entry& entry(size_t index) noexcept {
715       return b_[index];
716     }
717   }; // Segment
718
719 }; // UnboundedQueue
720
721 /* Aliases */
722
723 template <
724     typename T,
725     bool MayBlock,
726     size_t LgSegmentSize = 8,
727     size_t LgAlign = 7,
728     template <typename> class Atom = std::atomic>
729 using USPSCQueue =
730     UnboundedQueue<T, true, true, MayBlock, LgSegmentSize, LgAlign, Atom>;
731
732 template <
733     typename T,
734     bool MayBlock,
735     size_t LgSegmentSize = 8,
736     size_t LgAlign = 7,
737     template <typename> class Atom = std::atomic>
738 using UMPSCQueue =
739     UnboundedQueue<T, false, true, MayBlock, LgSegmentSize, LgAlign, Atom>;
740
741 template <
742     typename T,
743     bool MayBlock,
744     size_t LgSegmentSize = 8,
745     size_t LgAlign = 7,
746     template <typename> class Atom = std::atomic>
747 using USPMCQueue =
748     UnboundedQueue<T, true, false, MayBlock, LgSegmentSize, LgAlign, Atom>;
749
750 template <
751     typename T,
752     bool MayBlock,
753     size_t LgSegmentSize = 8,
754     size_t LgAlign = 7,
755     template <typename> class Atom = std::atomic>
756 using UMPMCQueue =
757     UnboundedQueue<T, false, false, MayBlock, LgSegmentSize, LgAlign, Atom>;
758
759 } // namespace folly