/// The dynamic version of MPMCQueue allows dynamic expansion of queue
/// capacity, such that a queue may start with a smaller capacity than
-/// specified and expand if needed up to the specified capacity.
-/// Shrinking is not supported at this point.
-///
-/// Users may optionally specify the initial capacity and the
-/// expansion multiplier. Otherwise default values are used.
-///
-/// Operation on the dynamic version have the same semantics as for
-/// the default fixed-size version, except that the total queue
-/// capacity can temporarily exceed the specified capacity when there
-/// are lagging consumers that haven't yet consumed all the elements
-/// in closed arrays. Users should not rely on the capacity of dynamic
-/// queues for synchronization, e.g., they should not expect that a
-/// thread will definitely block on a call to blockingWrite() when the
-/// queue size is known to be equal to its capacity.
-///
-/// Design Overview:
+/// specified and expand only if needed. Users may optionally specify
+/// the initial capacity and the expansion multiplier.
///
/// The design uses a seqlock to enforce mutual exclusion among
/// expansion attempts. Regular operations read up-to-date queue
/// seqlock read-only section (and hence obtained information for
/// older closed arrays).
///
+/// Note that the total queue capacity can temporarily exceed the
+/// specified capacity when there are lagging consumers that haven't
+/// yet consumed all the elements in closed arrays. Users should not
+/// rely on the capacity of dynamic queues for synchronization, e.g.,
+/// they should not expect that a thread will definitely block on a
+/// call to blockingWrite() when the queue size is known to be equal
+/// to its capacity.
+///
+/// Note that some writeIfNotFull() and tryWriteUntil() operations may
+/// fail even if the size of the queue is less than its maximum
+/// capacity and despite the success of expansion, if the operation
+/// happens to acquire a ticket that belongs to a closed array. This
+/// is a transient condition. Typically, one or two ticket values may
+/// be subject to such condition per expansion.
+///
/// The dynamic version is a partial specialization of MPMCQueue with
/// Dynamic == true
template <typename T, template <typename> class Atom>
int stride;
uint64_t state;
uint64_t offset;
- while (true) {
- while (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) {
+ do {
+ if (!trySeqlockReadSection(state, slots, cap, stride)) {
asm_volatile_pause();
+ continue;
}
- maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
- if (LIKELY(slots[this->idx((ticket - offset), cap, stride)].mayEnqueue(
- this->turn(ticket - offset, cap)))) {
- // A slot is ready. Fast path. No need to expand.
- break;
- }
- // Slow path
- auto head = this->popTicket_.load(std::memory_order_relaxed);
- auto avail = std::max(head, offset) + cap;
- if (ticket < avail) {
- // May block, but a pop is in progress. No need to expand.
+ if (maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride)) {
+ // There was an expansion after this ticket was issued.
break;
}
- // Try to expand, otherwise this operation may block
- // indefinitely awaiting a consumer to unblock it.
- if (!tryExpand(state, cap)) {
- // Can't expand. Block.
+ if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue(
+ this->turn(ticket - offset, cap))) {
+ // A slot is ready. No need to expand.
break;
+ } else if (
+ this->popTicket_.load(std::memory_order_relaxed) + cap > ticket) {
+ // May block, but a pop is in progress. No need to expand.
+ // Get seqlock read section info again in case an expansion
+ // occurred with an equal or higher ticket.
+ continue;
+ } else {
+ // May block. See if we can expand.
+ if (tryExpand(state, cap)) {
+ // This or another thread started an expansion. Get updated info.
+ continue;
+ } else {
+ // Can't expand.
+ break;
+ }
}
- // Either this or another thread started an expansion Get up-to-date info.
- }
+ } while (true);
this->enqueueWithTicketBase(ticket-offset, slots, cap, stride,
std::forward<Args>(args)...);
}
int stride;
uint64_t state;
uint64_t offset;
- while (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) {
+ while (!trySeqlockReadSection(state, slots, cap, stride)) {
asm_volatile_pause();
}
// If there was an expansion after the corresponding push ticket
private:
enum {
- kSeqlockBits = 8,
- kDefaultMinDynamicCapacity = 16,
- kDefaultExpansionMultiplier = 8,
+ kSeqlockBits = 6,
+ kDefaultMinDynamicCapacity = 10,
+ kDefaultExpansionMultiplier = 10,
};
size_t dmult_;
uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
) noexcept {
uint64_t state;
- while (true) {
+ do {
ticket = this->pushTicket_.load(std::memory_order_acquire); // A
- if (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) {
+ if (!trySeqlockReadSection(state, slots, cap, stride)) {
asm_volatile_pause();
continue;
}
+
// If there was an expansion with offset greater than this ticket,
// adjust accordingly
uint64_t offset;
maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
- if (LIKELY(slots[this->idx((ticket - offset), cap, stride)].mayEnqueue(
- this->turn(ticket - offset, cap)))) {
+
+ if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue(
+ this->turn(ticket - offset, cap))) {
// A slot is ready.
- if (UNLIKELY(!this->pushTicket_.compare_exchange_strong(
- ticket, ticket + 1))) {
- continue;
- }
- // Validate that state is the same
- if (LIKELY(state == this->dstate_.load(std::memory_order_acquire))) {
+ if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
+ // Adjust ticket
ticket -= offset;
return true;
+ } else {
+ continue;
}
- // Slow path - state changed - get up-to-date info for obtained ticket
- while (true) {
- state = this->dstate_.load(std::memory_order_acquire);
- if (trySeqlockReadSection(state, slots, cap, stride)) {
- break;
- }
- asm_volatile_pause();
+ } else {
+ if (ticket != this->pushTicket_.load(std::memory_order_relaxed)) { // B
+ // Try again. Ticket changed.
+ continue;
}
- maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
- ticket -= offset;
- return true;
- }
- // slow path - no ready ticket
- if (ticket != this->pushTicket_.load(std::memory_order_relaxed)) { // B
- // Ticket changed. Start over.
- continue;
- }
- auto head = this->popTicket_.load(std::memory_order_acquire);
- auto avail = std::max(head, offset) + cap;
- if (ticket < avail) {
- // a consumer is in the process of making the slot available
- // don't try to expand. Spin if capacity is not
- // exhausted. Otherwise return false.
- if (cap == this->capacity_) {
- return false;
+ // Likely to block.
+ // Try to expand unless the ticket is for a closed array
+ if (offset == getOffset(state)) {
+ if (tryExpand(state, cap)) {
+ // This or another thread started an expansion. Get up-to-date info.
+ continue;
+ }
}
- asm_volatile_pause();
- continue;
- }
- // Likely to block. Try to expand.
- if (tryExpand(state, cap)) {
- // This or another thread started an expansion. Get up-to-date info.
- continue;
+ return false;
}
- // No ready ticket and cannot expand
- return false;
- }
+ } while (true);
}
bool tryObtainPromisedPushTicket(
uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
) noexcept {
uint64_t state;
- while (true) {
+ do {
ticket = this->pushTicket_.load(std::memory_order_acquire);
- auto head = this->popTicket_.load(std::memory_order_acquire);
- if (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) {
+ auto numPops = this->popTicket_.load(std::memory_order_acquire);
+ if (!trySeqlockReadSection(state, slots, cap, stride)) {
asm_volatile_pause();
continue;
}
+
+ const auto curCap = cap;
// If there was an expansion with offset greater than this ticket,
// adjust accordingly
uint64_t offset;
maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
- auto avail = std::max(offset, head) + cap;
- if (UNLIKELY(ticket >= avail)) {
- if (tryExpand(state, cap)) {
- // Space may be available. Start over.
+
+ int64_t n = ticket - numPops;
+
+ if (n >= static_cast<ssize_t>(cap)) {
+ if ((cap == curCap) && tryExpand(state, cap)) {
+ // This or another thread started an expansion. Start over.
continue;
}
// Can't expand.
return false;
}
- if (UNLIKELY((!this->pushTicket_.compare_exchange_strong(
- ticket, ticket + 1)))) {
- continue;
- }
- // Validate that state is the same
- if (LIKELY(state == this->dstate_.load(std::memory_order_acquire))) {
+ if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
+ // Adjust ticket
ticket -= offset;
return true;
}
- // Obtained ticket but info is out-of-date - Update info
- while (true) {
- state = this->dstate_.load(std::memory_order_acquire);
- if (trySeqlockReadSection(state, slots, cap, stride)) {
- break;
- }
- asm_volatile_pause();
- }
- maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
- ticket -= offset;
- return true;
- }
+ } while (true);
}
bool tryObtainReadyPopTicket(
uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
) noexcept {
uint64_t state;
- while (true) {
+ do {
ticket = this->popTicket_.load(std::memory_order_relaxed);
- if (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) {
+ if (!trySeqlockReadSection(state, slots, cap, stride)) {
asm_volatile_pause();
continue;
}
+
// If there was an expansion after the corresponding push ticket
// was issued, adjust accordingly
uint64_t offset;
maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
- if (UNLIKELY(!slots[this->idx((ticket - offset), cap, stride)].mayDequeue(
- this->turn(ticket - offset, cap)))) {
- return false;
- }
- if (UNLIKELY(
- !this->popTicket_.compare_exchange_strong(ticket, ticket + 1))) {
- continue;
- }
- // Validate that state is the same
- if (LIKELY(state == this->dstate_.load(std::memory_order_acquire))) {
- ticket -= offset;
- return true;
- }
- // Obtained ticket but info is out-of-date - Update info
- while (true) {
- state = this->dstate_.load(std::memory_order_acquire);
- if (trySeqlockReadSection(state, slots, cap, stride)) {
- break;
+
+ if (slots[this->idx((ticket - offset), cap, stride)].mayDequeue(
+ this->turn(ticket - offset, cap))) {
+ if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
+ // Adjust ticket
+ ticket -= offset;
+ return true;
}
- asm_volatile_pause();
+ } else {
+ return false;
}
- maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
- ticket -= offset;
- return true;
- }
+ } while (true);
}
bool tryObtainPromisedPopTicket(
uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
) noexcept {
uint64_t state;
- while (true) {
+ do {
ticket = this->popTicket_.load(std::memory_order_acquire);
auto numPushes = this->pushTicket_.load(std::memory_order_acquire);
- if (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) {
+ if (!trySeqlockReadSection(state, slots, cap, stride)) {
asm_volatile_pause();
continue;
}
+
uint64_t offset;
// If there was an expansion after the corresponding push
// ticket was issued, adjust accordingly
maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
- if (UNLIKELY(ticket >= numPushes)) {
+
+ if (ticket >= numPushes) {
ticket -= offset;
return false;
}
- if (UNLIKELY(
- !this->popTicket_.compare_exchange_strong(ticket, ticket + 1))) {
- continue;
- }
- // Validate that state is the same
- if (LIKELY(state == this->dstate_.load(std::memory_order_acquire))) {
+ if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
ticket -= offset;
return true;
}
- // Obtained ticket but info is out-of-date - Update info
- while (true) {
- state = this->dstate_.load(std::memory_order_acquire);
- if (trySeqlockReadSection(state, slots, cap, stride)) {
- break;
- }
- asm_volatile_pause();
- }
- maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
- ticket -= offset;
- return true;
- }
+ } while (true);
}
/// Enqueues an element with a specific ticket number
uint64_t state;
uint64_t offset;
- while (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) {
- asm_volatile_pause();
+ while (!trySeqlockReadSection(state, slots, cap, stride)) {
}
// If there was an expansion after this ticket was issued, adjust
}
/// Try to expand the queue. Returns true if this expansion was
- /// successful or a concurent expansion is in progresse. Returns
+ /// successful or a concurent expansion is in progress. Returns
/// false if the queue has reached its maximum capacity or
/// allocation has failed.
bool tryExpand(const uint64_t state, const size_t cap) noexcept {
- if (LIKELY(cap == this->capacity_)) {
+ if (cap == this->capacity_) {
return false;
}
- return tryExpandWithSeqlock(state, cap);
- }
-
- bool tryExpandWithSeqlock(const uint64_t state, const size_t cap) noexcept {
// Acquire seqlock
uint64_t oldval = state;
assert((state & 1) == 0);
- if (!this->dstate_.compare_exchange_strong(oldval, state + 1)) {
- // Failed to acquire seqlock. Another thread acaquired it.
- // Go back to the caller and get up-to-date info.
+ if (this->dstate_.compare_exchange_strong(oldval, state + 1)) {
+ assert(cap == this->dcapacity_.load());
+ uint64_t ticket =
+ 1 + std::max(this->pushTicket_.load(), this->popTicket_.load());
+ size_t newCapacity = std::min(dmult_ * cap, this->capacity_);
+ Slot* newSlots =
+ new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding];
+ if (newSlots == nullptr) {
+ // Expansion failed. Restore the seqlock
+ this->dstate_.store(state);
+ return false;
+ }
+ // Successful expansion
+ // calculate the current ticket offset
+ uint64_t offset = getOffset(state);
+ // calculate index in closed array
+ int index = getNumClosed(state);
+ assert((index << 1) < (1 << kSeqlockBits));
+ // fill the info for the closed slots array
+ closed_[index].offset_ = offset;
+ closed_[index].slots_ = this->dslots_.load();
+ closed_[index].capacity_ = cap;
+ closed_[index].stride_ = this->dstride_.load();
+ // update the new slots array info
+ this->dslots_.store(newSlots);
+ this->dcapacity_.store(newCapacity);
+ this->dstride_.store(this->computeStride(newCapacity));
+ // Release the seqlock and record the new ticket offset
+ this->dstate_.store((ticket << kSeqlockBits) + (2 * (index + 1)));
+ return true;
+ } else { // failed to acquire seqlock
+ // Someone acaquired the seqlock. Go back to the caller and get
+ // up-to-date info.
return true;
}
- // Write critical section
- assert(cap == this->dcapacity_.load());
- auto head = this->popTicket_.load(std::memory_order_acquire);
- auto avail = std::max(head, getOffset(state)) + cap;
- uint64_t newOffset = avail;
- size_t newCapacity = std::min(dmult_ * cap, this->capacity_);
- Slot* newSlots =
- new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding];
- if (newSlots == nullptr) {
- // Expansion failed. Restore the seqlock
- this->dstate_.store(state);
- return false;
- }
- // Successful expansion
- // calculate the current ticket offset
- uint64_t offset = getOffset(state);
- // calculate index in list of closed slots arrays
- int index = getNumClosed(state);
- assert((index << 1) < (1 << kSeqlockBits));
- // fill the info for the closed slots array
- closed_[index].offset_ = offset;
- closed_[index].slots_ = this->dslots_.load();
- closed_[index].capacity_ = cap;
- closed_[index].stride_ = this->dstride_.load();
- // update the new slots array info
- this->dslots_.store(newSlots);
- this->dcapacity_.store(newCapacity);
- this->dstride_.store(this->computeStride(newCapacity));
- // Release the seqlock and record the new ticket offset
- this->dstate_.store((newOffset << kSeqlockBits) + (2 * (index + 1)));
- return true;
}
/// Seqlock read-only section
uint64_t& state, Slot*& slots, size_t& cap, int& stride
) noexcept {
state = this->dstate_.load(std::memory_order_acquire);
- if (UNLIKELY(state & 1)) {
+ if (state & 1) {
// Locked.
return false;
}
stride = this->dstride_.load(std::memory_order_relaxed);
// End of read-only section. Validate seqlock.
std::atomic_thread_fence(std::memory_order_acquire);
- return LIKELY(state == this->dstate_.load(std::memory_order_relaxed));
+ return (state == this->dstate_.load(std::memory_order_relaxed));
}
/// If there was an expansion after ticket was issued, update local variables
size_t& cap,
int& stride) noexcept {
offset = getOffset(state);
- if (LIKELY(ticket >= offset)) {
+ if (ticket >= offset) {
return false;
}
- updateFromClosed(state, ticket, offset, slots, cap, stride);
- return true;
- }
-
- void updateFromClosed(
- const uint64_t state,
- const uint64_t ticket,
- uint64_t& offset,
- Slot*& slots,
- size_t& cap,
- int& stride) noexcept {
for (int i = getNumClosed(state) - 1; i >= 0; --i) {
offset = closed_[i].offset_;
if (offset <= ticket) {
slots = closed_[i].slots_;
cap = closed_[i].capacity_;
stride = closed_[i].stride_;
- return;
+ return true;
}
}
// A closed array with offset <= ticket should have been found
assert(false);
- return;
+ return false;
}
};