/// - SingleConsumer: true if there can be only one consumer at a
/// time.
/// - MayBlock: true if consumers may block, false if they only
-/// spins. A performance tuning parameter.
+/// spin. A performance tuning parameter.
/// - LgSegmentSize (default 8): Log base 2 of number of elements per
/// segment. A performance tuning parameter. See below.
/// - LgAlign (default 7): Log base 2 of alignment directive; can be
/// Extracts an element from the front of the queue. Waits
/// until an element is available if needed.
/// bool try_dequeue(T&);
-/// Tries to extracts an element from the front of the queue
+/// Tries to extract an element from the front of the queue
/// if available. Returns true if successful, false otherwise.
/// bool try_dequeue_until(T&, time_point& deadline);
-/// Tries to extracts an element from the front of the queue
+/// Tries to extract an element from the front of the queue
/// if available until the specified deadline. Returns true
/// if successful, false otherwise.
/// bool try_dequeue_for(T&, duration&);
-/// Tries to extracts an element from the front of the queue
-/// if available for for the specified duration. Returns true
-/// if successful, false otherwise.
+/// Tries to extract an element from the front of the queue if
+/// available for until the expiration of the specified
+/// duration. Returns true if successful, false otherwise.
///
/// Secondary functions:
/// size_t size();
/// exactly once.
/// - Each entry is composed of a futex and a single element.
/// - The queue contains two 64-bit ticket variables. The producer
-/// ticket counts the number of producer tickets isued so far, and
+/// ticket counts the number of producer tickets issued so far, and
/// the same for the consumer ticket. Each ticket number corresponds
/// to a specific entry in a specific segment.
/// - The queue maintains two pointers, head and tail. Head points to
/// one or two more segment than fits its contents.
/// - Removed segments are not reclaimed until there are no threads,
/// producers or consumers, have references to them or their
-/// predessors. That is, a lagging thread may delay the reclamation
+/// predecessors. That is, a lagging thread may delay the reclamation
/// of a chain of removed segments.
/// - The template parameter LgAlign can be used to reduce memory usage
/// at the cost of increased chance of false sharing.
///
/// Performance considerations:
/// - All operations take constant time, excluding the costs of
-/// allocation, reclamation, interence from other threads, and
+/// allocation, reclamation, interference from other threads, and
/// waiting for actions by other threads.
/// - In general, using the single producer and or single consumer
-/// variants yields better performance than the MP and MC
+/// variants yield better performance than the MP and MC
/// alternatives.
/// - SPSC without blocking is the fastest configuration. It doesn't
/// include any read-modify-write atomic operations, full fences, or
/// - MC adds a fetch_add or compare_exchange to the critical path of
/// each consumer operation.
/// - The possibility of consumers blocking, even if they never do,
-/// adds a compare_exchange to the crtical path of each producer
+/// adds a compare_exchange to the critical path of each producer
/// operation.
/// - MPMC, SPMC, MPSC require the use of a deferred reclamation
/// mechanism to guarantee that segments removed from the linked
/// - Another consideration is that the queue is guaranteed to have
/// enough space for a number of consumers equal to 2^LgSegmentSize
/// for local blocking. Excess waiting consumers spin.
-/// - It is recommended to measure perforamnce with different variants
+/// - It is recommended to measure performance with different variants
/// when applicable, e.g., UMPMC vs UMPSC. Depending on the use
/// case, sometimes the variant with the higher sequential overhead
/// may yield better results due to, for example, more favorable
-/// producer-consumer balance or favorable timining for avoiding
+/// producer-consumer balance or favorable timing for avoiding
/// costly blocking.
template <
static_assert(LgSegmentSize < 32, "LgSegmentSize must be < 32");
static_assert(LgAlign < 16, "LgAlign must be < 16");
- FOLLY_ALIGNED(Align)
- Atom<Segment*> head_;
- Atom<Ticket> consumerTicket_;
- FOLLY_ALIGNED(Align)
- Atom<Segment*> tail_;
- Atom<Ticket> producerTicket_;
+ struct Consumer {
+ Atom<Segment*> head;
+ Atom<Ticket> ticket;
+ };
+ struct Producer {
+ Atom<Segment*> tail;
+ Atom<Ticket> ticket;
+ };
+
+ alignas(Align) Consumer c_;
+ alignas(Align) Producer p_;
public:
/** constructor */
// Using hazptr_holder instead of hazptr_local because it is
// possible that the T ctor happens to use hazard pointers.
folly::hazptr::hazptr_holder hptr;
- Segment* s = hptr.get_protected(tail_);
+ Segment* s = hptr.get_protected(p_.tail);
enqueueCommon(s, std::forward<Arg>(arg));
}
}
// possible to call the T dtor and it may happen to use hazard
// pointers.
folly::hazptr::hazptr_holder hptr;
- Segment* s = hptr.get_protected(head_);
+ Segment* s = hptr.get_protected(c_.head);
dequeueCommon(s, item);
}
}
// Using hazptr_holder instead of hazptr_local because it is
// possible to call ~T() and it may happen to use hazard pointers.
folly::hazptr::hazptr_holder hptr;
- Segment* s = hptr.get_protected(head_);
- return ryDequeueUntilMC(s, item, deadline);
+ Segment* s = hptr.get_protected(c_.head);
+ return tryDequeueUntilMC(s, item, deadline);
}
}
- /** ryDequeueUntilSC */
+ /** tryDequeueUntilSC */
template <typename Clock, typename Duration>
FOLLY_ALWAYS_INLINE bool tryDequeueUntilSC(
Segment* s,
DCHECK_LT(t, (s->minTicket() + SegmentSize));
size_t idx = index(t);
Entry& e = s->entry(idx);
- if (!e.tryWaitUntil(deadline)) {
+ if (UNLIKELY(!tryDequeueWaitElem(e, t, deadline))) {
return false;
}
setConsumerTicket(t + 1);
/** tryDequeueUntilMC */
template <typename Clock, typename Duration>
- FOLLY_ALWAYS_INLINE bool ryDequeueUntilMC(
+ FOLLY_ALWAYS_INLINE bool tryDequeueUntilMC(
Segment* s,
T& item,
const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
}
size_t idx = index(t);
Entry& e = s->entry(idx);
- if (!e.tryWaitUntil(deadline)) {
+ if (UNLIKELY(!tryDequeueWaitElem(e, t, deadline))) {
return false;
}
- if (!consumerTicket_.compare_exchange_weak(
+ if (!c_.ticket.compare_exchange_weak(
t, t + 1, std::memory_order_acq_rel, std::memory_order_acquire)) {
continue;
}
}
}
+ /** tryDequeueWaitElem */
+ template <typename Clock, typename Duration>
+ FOLLY_ALWAYS_INLINE bool tryDequeueWaitElem(
+ Entry& e,
+ Ticket t,
+ const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+ while (true) {
+ if (LIKELY(e.tryWaitUntil(deadline))) {
+ return true;
+ }
+ if (t >= producerTicket()) {
+ return false;
+ }
+ asm_volatile_pause();
+ }
+ }
+
/** findSegment */
FOLLY_ALWAYS_INLINE
Segment* findSegment(Segment* s, const Ticket t) const noexcept {
auto deadline = std::chrono::steady_clock::time_point::max();
Segment* next = tryGetNextSegmentUntil(s, deadline);
DCHECK(next != nullptr);
+ while (head() != s) {
+ // Wait for head to advance to the current segment first before
+ // advancing head to the next segment. Otherwise, a lagging
+ // consumer responsible for advancing head from an earlier
+ // segment may incorrectly set head back.
+ asm_volatile_pause();
+ }
setHead(next);
reclaimSegment(s);
}
}
FOLLY_ALWAYS_INLINE Segment* head() const noexcept {
- return head_.load(std::memory_order_acquire);
+ return c_.head.load(std::memory_order_acquire);
}
FOLLY_ALWAYS_INLINE Segment* tail() const noexcept {
- return tail_.load(std::memory_order_acquire);
+ return p_.tail.load(std::memory_order_acquire);
}
FOLLY_ALWAYS_INLINE Ticket producerTicket() const noexcept {
- return producerTicket_.load(std::memory_order_acquire);
+ return p_.ticket.load(std::memory_order_acquire);
}
FOLLY_ALWAYS_INLINE Ticket consumerTicket() const noexcept {
- return consumerTicket_.load(std::memory_order_acquire);
+ return c_.ticket.load(std::memory_order_acquire);
}
void setHead(Segment* s) noexcept {
- head_.store(s, std::memory_order_release);
+ c_.head.store(s, std::memory_order_release);
}
void setTail(Segment* s) noexcept {
- tail_.store(s, std::memory_order_release);
+ p_.tail.store(s, std::memory_order_release);
}
FOLLY_ALWAYS_INLINE void setProducerTicket(Ticket t) noexcept {
- producerTicket_.store(t, std::memory_order_release);
+ p_.ticket.store(t, std::memory_order_release);
}
FOLLY_ALWAYS_INLINE void setConsumerTicket(Ticket t) noexcept {
- consumerTicket_.store(t, std::memory_order_release);
+ c_.ticket.store(t, std::memory_order_release);
}
FOLLY_ALWAYS_INLINE Ticket fetchIncrementConsumerTicket() noexcept {
setConsumerTicket(oldval + 1);
return oldval;
} else { // MC
- return consumerTicket_.fetch_add(1, std::memory_order_acq_rel);
+ return c_.ticket.fetch_add(1, std::memory_order_acq_rel);
}
}
setProducerTicket(oldval + 1);
return oldval;
} else { // MP
- return producerTicket_.fetch_add(1, std::memory_order_acq_rel);
+ return p_.ticket.fetch_add(1, std::memory_order_acq_rel);
}
}
Atom<Segment*> next_;
const Ticket min_;
bool marked_; // used for iterative deletion
- FOLLY_ALIGNED(Align)
- Entry b_[SegmentSize];
+ alignas(Align) Entry b_[SegmentSize];
public:
explicit Segment(const Ticket t)