+template<typename T, template<typename> class Atom = std::atomic,
+ bool Dynamic = false>
+class MPMCQueue : public detail::MPMCQueueBase<MPMCQueue<T,Atom,Dynamic>> {
+ friend class detail::MPMCPipelineStageImpl<T>;
+ using Slot = detail::SingleElementQueue<T,Atom>;
+ public:
+
+ explicit MPMCQueue(size_t queueCapacity)
+ : detail::MPMCQueueBase<MPMCQueue<T,Atom,Dynamic>>(queueCapacity)
+ {
+ this->stride_ = this->computeStride(queueCapacity);
+ this->slots_ = new Slot[queueCapacity + 2 * this->kSlotPadding];
+ }
+
+ MPMCQueue() noexcept { }
+};
+
+/// The dynamic version of MPMCQueue allows dynamic expansion of queue
+/// capacity, such that a queue may start with a smaller capacity than
+/// specified and expand only if needed. Users may optionally specify
+/// the initial capacity and the expansion multiplier.
+///
+/// The design uses a seqlock to enforce mutual exclusion among
+/// expansion attempts. Regular operations read up-to-date queue
+/// information (slots array, capacity, stride) inside read-only
+/// seqlock sections, which are unimpeded when no expansion is in
+/// progress.
+///
+/// An expansion computes a new capacity, allocates a new slots array,
+/// and updates stride. No information needs to be copied from the
+/// current slots array to the new one. When this happens, new slots
+/// will not have sequence numbers that match ticket numbers. The
+/// expansion needs to compute a ticket offset such that operations
+/// that use new arrays can adjust the calculations of slot indexes
+/// and sequence numbers that take into account that the new slots
+/// start with sequence numbers of zero. The current ticket offset is
+/// packed with the seqlock in an atomic 64-bit integer. The initial
+/// offset is zero.
+///
+/// Lagging write and read operations with tickets lower than the
+/// ticket offset of the current slots array (i.e., the minimum ticket
+/// number that can be served by the current array) must use earlier
+/// closed arrays instead of the current one. Information about closed
+/// slots arrays (array address, capacity, stride, and offset) is
+/// maintained in a logarithmic-sized structure. Each entry in that
+/// structure never needs to be changed once set. The number of closed
+/// arrays is half the value of the seqlock (when unlocked).
+///
+/// The acquisition of the seqlock to perform an expansion does not
+/// prevent the issuing of new push and pop tickets concurrently. The
+/// expansion must set the new ticket offset to a value that couldn't
+/// have been issued to an operation that has already gone through a
+/// seqlock read-only section (and hence obtained information for
+/// older closed arrays).
+///
+/// Note that the total queue capacity can temporarily exceed the
+/// specified capacity when there are lagging consumers that haven't
+/// yet consumed all the elements in closed arrays. Users should not
+/// rely on the capacity of dynamic queues for synchronization, e.g.,
+/// they should not expect that a thread will definitely block on a
+/// call to blockingWrite() when the queue size is known to be equal
+/// to its capacity.
+///
+/// Note that some writeIfNotFull() and tryWriteUntil() operations may
+/// fail even if the size of the queue is less than its maximum
+/// capacity and despite the success of expansion, if the operation
+/// happens to acquire a ticket that belongs to a closed array. This
+/// is a transient condition. Typically, one or two ticket values may
+/// be subject to such condition per expansion.
+///
+/// The dynamic version is a partial specialization of MPMCQueue with
+/// Dynamic == true
template <typename T, template<typename> class Atom>
class MPMCQueue<T,Atom,true> :
      public detail::MPMCQueueBase<MPMCQueue<T,Atom,true>> {
  friend class detail::MPMCQueueBase<MPMCQueue<T,Atom,true>>;
  using Slot = detail::SingleElementQueue<T,Atom>;

  /// Immutable record describing a slots array retired by an
  /// expansion.  Lagging operations whose tickets are lower than the
  /// ticket offset of the current array locate their slot through one
  /// of these entries.  Each entry is written once (under the
  /// seqlock, in tryExpand) and never modified afterwards.
  struct ClosedArray {
    uint64_t offset_ {0};    // minimum ticket number served by this array
    Slot* slots_ {nullptr};  // the retired slots array
    size_t capacity_ {0};    // capacity of the retired array
    int stride_ {0};         // stride used for slot index calculation
  };

 public:

  /// Constructs a queue that may grow up to queueCapacity elements,
  /// starting at the smaller of kDefaultMinDynamicCapacity and
  /// queueCapacity.
  explicit MPMCQueue(size_t queueCapacity)
    : detail::MPMCQueueBase<MPMCQueue<T,Atom,true>>(queueCapacity)
  {
    size_t cap = std::min<size_t>(kDefaultMinDynamicCapacity, queueCapacity);
    initQueue(cap, kDefaultExpansionMultiplier);
  }

  /// Constructs a queue that may grow up to queueCapacity elements,
  /// starting at min(minCapacity, queueCapacity) and multiplying its
  /// capacity by expansionMultiplier on each expansion.  minCapacity
  /// is clamped to at least 1 and expansionMultiplier to at least 2.
  explicit MPMCQueue(size_t queueCapacity,
                     size_t minCapacity,
                     size_t expansionMultiplier)
    : detail::MPMCQueueBase<MPMCQueue<T,Atom,true>>(queueCapacity)
  {
    minCapacity = std::max<size_t>(1, minCapacity);
    size_t cap = std::min<size_t>(minCapacity, queueCapacity);
    expansionMultiplier = std::max<size_t>(2, expansionMultiplier);
    initQueue(cap, expansionMultiplier);
  }

  /// Constructs an unusable queue with no storage; assign a real
  /// queue to it before use.
  MPMCQueue() noexcept {
    dmult_ = 0;
    closed_ = nullptr;
  }

  /// Move construction.  Not thread safe: the relaxed loads and
  /// stores below are sufficient only if no thread is operating
  /// concurrently on either queue.
  MPMCQueue(MPMCQueue<T,Atom,true>&& rhs) noexcept {
    this->capacity_ = rhs.capacity_;
    this->slots_ = rhs.slots_;
    this->stride_ = rhs.stride_;
    this->dstate_.store(rhs.dstate_.load(std::memory_order_relaxed),
                        std::memory_order_relaxed);
    this->dcapacity_.store(rhs.dcapacity_.load(std::memory_order_relaxed),
                           std::memory_order_relaxed);
    this->pushTicket_.store(rhs.pushTicket_.load(std::memory_order_relaxed),
                            std::memory_order_relaxed);
    this->popTicket_.store(rhs.popTicket_.load(std::memory_order_relaxed),
                           std::memory_order_relaxed);
    this->pushSpinCutoff_.store(
      rhs.pushSpinCutoff_.load(std::memory_order_relaxed),
      std::memory_order_relaxed);
    this->popSpinCutoff_.store(
      rhs.popSpinCutoff_.load(std::memory_order_relaxed),
      std::memory_order_relaxed);
    dmult_ = rhs.dmult_;
    closed_ = rhs.closed_;

    // Leave rhs in a valid but empty (unusable) state.
    rhs.capacity_ = 0;
    rhs.slots_ = nullptr;
    rhs.stride_ = 0;
    rhs.dstate_.store(0, std::memory_order_relaxed);
    rhs.dcapacity_.store(0, std::memory_order_relaxed);
    rhs.pushTicket_.store(0, std::memory_order_relaxed);
    rhs.popTicket_.store(0, std::memory_order_relaxed);
    rhs.pushSpinCutoff_.store(0, std::memory_order_relaxed);
    rhs.popSpinCutoff_.store(0, std::memory_order_relaxed);
    rhs.dmult_ = 0;
    rhs.closed_ = nullptr;
  }

  /// Move assignment, implemented as destroy-and-placement-new.
  /// Not thread safe.
  MPMCQueue<T,Atom, true> const& operator= (MPMCQueue<T,Atom, true>&& rhs) {
    if (this != &rhs) {
      this->~MPMCQueue();
      new (this) MPMCQueue(std::move(rhs));
    }
    return *this;
  }

  /// Frees all closed slots arrays and the closed_ bookkeeping array.
  /// The current slots array is not freed here -- presumably the base
  /// class destructor releases it (TODO confirm against
  /// MPMCQueueBase).
  ~MPMCQueue() {
    if (closed_ != nullptr) {
      for (int i = getNumClosed(this->dstate_.load()) - 1; i >= 0; --i) {
        delete[] closed_[i].slots_;
      }
      delete[] closed_;
    }
  }

  /// Returns the capacity of the current (most recently allocated)
  /// slots array, which may be smaller than the maximum capacity
  /// passed to the constructor if the queue has not fully expanded.
  size_t allocatedCapacity() const noexcept {
    return this->dcapacity_.load(std::memory_order_relaxed);
  }

  /// Enqueues an element constructed from args, blocking if the queue
  /// is full.  Prefers expanding the queue over blocking when no slot
  /// is ready and no pop is in progress.
  template <typename ...Args>
  void blockingWrite(Args&&... args) noexcept {
    uint64_t ticket = this->pushTicket_++;
    Slot* slots;
    size_t cap;
    int stride;
    uint64_t state;
    uint64_t offset;
    do {
      if (!trySeqlockReadSection(state, slots, cap, stride)) {
        asm_volatile_pause();
        continue;
      }
      if (maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride)) {
        // There was an expansion after this ticket was issued.
        break;
      }
      if (slots[this->idx((ticket-offset), cap, stride)]
          .mayEnqueue(this->turn(ticket-offset, cap))) {
        // A slot is ready. No need to expand.
        break;
      } else if (this->popTicket_.load(std::memory_order_relaxed) + cap
                 > ticket) {
        // May block, but a pop is in progress. No need to expand.
        // Get seqlock read section info again in case an expansion
        // occurred with an equal or higher ticket.
        continue;
      } else {
        // May block. See if we can expand.
        if (tryExpand(state, cap)) {
          // This or another thread started an expansion. Get updated info.
          continue;
        } else {
          // Can't expand.
          break;
        }
      }
    } while (true);
    this->enqueueWithTicketBase(ticket-offset, slots, cap, stride,
                                std::forward<Args>(args)...);
  }

  /// Dequeues an element into elem, blocking if the queue is empty,
  /// and reports the pop ticket that was used.
  void blockingReadWithTicket(uint64_t& ticket, T& elem) noexcept {
    ticket = this->popTicket_++;
    Slot* slots;
    size_t cap;
    int stride;
    uint64_t state;
    uint64_t offset;
    while (!trySeqlockReadSection(state, slots, cap, stride)) {
      asm_volatile_pause();
    }
    // If there was an expansion after the corresponding push ticket
    // was issued, adjust accordingly
    maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
    this->dequeueWithTicketBase(ticket-offset, slots, cap, stride, elem);
  }

 private:

  enum {
    // dstate_ layout: bit 0 is the seqlock lock bit, bits
    // 1..kSeqlockBits-1 hold twice the number of closed arrays, and
    // the remaining high bits hold the current ticket offset (see
    // getOffset / getNumClosed).
    kSeqlockBits = 6,
    kDefaultMinDynamicCapacity = 10,
    kDefaultExpansionMultiplier = 10,
  };

  // Expansion multiplier; 0 for a default-constructed (unusable) queue.
  size_t dmult_;

  // Info about closed slots arrays for use by lagging operations
  ClosedArray* closed_;

  /// Shared constructor logic: allocates the initial slots array of
  /// capacity cap and sizes closed_ to hold one entry per possible
  /// expansion (logarithmic in capacity_ / cap with base mult).
  void initQueue(const size_t cap, const size_t mult) {
    this->stride_ = this->computeStride(cap);
    this->slots_ = new Slot[cap + 2 * this->kSlotPadding];
    this->dstate_.store(0);
    this->dcapacity_.store(cap);
    dmult_ = mult;
    // Count how many expansions can occur before reaching the maximum
    // capacity; each one retires exactly one slots array.
    size_t maxClosed = 0;
    for (size_t expanded = cap;
         expanded < this->capacity_;
         expanded *= mult) {
      ++maxClosed;
    }
    closed_ = (maxClosed > 0) ? new ClosedArray[maxClosed] : nullptr;
  }

  /// Tries to obtain a push ticket whose slot is immediately ready,
  /// together with the slots array, capacity, and stride to use with
  /// it.  On success the returned ticket is already adjusted by the
  /// array's offset.  Returns false, without claiming a ticket, if
  /// the operation would block and the queue cannot be expanded.
  bool tryObtainReadyPushTicket(
    uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
  ) noexcept {
    uint64_t state;
    do {
      ticket = this->pushTicket_.load(std::memory_order_acquire); // A
      if (!trySeqlockReadSection(state, slots, cap, stride)) {
        asm_volatile_pause();
        continue;
      }

      // If there was an expansion with offset greater than this ticket,
      // adjust accordingly
      uint64_t offset;
      maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);

      if (slots[this->idx((ticket-offset), cap, stride)]
          .mayEnqueue(this->turn(ticket-offset, cap))) {
        // A slot is ready.
        if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
          // Adjust ticket
          ticket -= offset;
          return true;
        } else {
          continue;
        }
      } else {
        if (ticket != this->pushTicket_.load(std::memory_order_relaxed)) { // B
          // Try again. Ticket changed.
          continue;
        }
        // Likely to block.
        // Try to expand unless the ticket is for a closed array
        if (offset == getOffset(state)) {
          if (tryExpand(state, cap)) {
            // This or another thread started an expansion. Get up-to-date info.
            continue;
          }
        }
        return false;
      }
    } while (true);
  }

  /// Tries to obtain a push ticket that is promised to be usable once
  /// the pops below it complete (i.e., the queue is not over
  /// capacity), expanding the queue if necessary.  Returns false only
  /// if the queue is full and cannot expand further.
  bool tryObtainPromisedPushTicket(
    uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
  ) noexcept {
    uint64_t state;
    do {
      ticket = this->pushTicket_.load(std::memory_order_acquire);
      auto numPops = this->popTicket_.load(std::memory_order_acquire);
      if (!trySeqlockReadSection(state, slots, cap, stride)) {
        asm_volatile_pause();
        continue;
      }

      const auto curCap = cap;
      // If there was an expansion with offset greater than this ticket,
      // adjust accordingly
      uint64_t offset;
      maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);

      int64_t n = ticket - numPops;

      if (n >= static_cast<ssize_t>(cap)) {
        // Only attempt expansion if the ticket maps to the current
        // array (cap == curCap); tickets for closed arrays can't help.
        if ((cap == curCap) && tryExpand(state, cap)) {
          // This or another thread started an expansion. Start over.
          continue;
        }
        // Can't expand.
        ticket -= offset;
        return false;
      }

      if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
        // Adjust ticket
        ticket -= offset;
        return true;
      }
    } while (true);
  }

  /// Tries to obtain a pop ticket whose slot holds an element that is
  /// ready to be dequeued.  On success the returned ticket is already
  /// adjusted by the array's offset.  Returns false if no element is
  /// ready.
  bool tryObtainReadyPopTicket(
    uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
  ) noexcept {
    uint64_t state;
    do {
      ticket = this->popTicket_.load(std::memory_order_relaxed);
      if (!trySeqlockReadSection(state, slots, cap, stride)) {
        asm_volatile_pause();
        continue;
      }

      // If there was an expansion after the corresponding push ticket
      // was issued, adjust accordingly
      uint64_t offset;
      maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);

      if (slots[this->idx((ticket-offset), cap, stride)]
          .mayDequeue(this->turn(ticket-offset, cap))) {
        if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
          // Adjust ticket
          ticket -= offset;
          return true;
        }
      } else {
        return false;
      }
    } while (true);
  }

  /// Tries to obtain a pop ticket that is promised to be satisfied by
  /// a push that has already been issued.  Returns false if the queue
  /// has no outstanding pushes.
  bool tryObtainPromisedPopTicket(
    uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
  ) noexcept {
    uint64_t state;
    do {
      ticket = this->popTicket_.load(std::memory_order_acquire);
      auto numPushes = this->pushTicket_.load(std::memory_order_acquire);
      if (!trySeqlockReadSection(state, slots, cap, stride)) {
        asm_volatile_pause();
        continue;
      }

      uint64_t offset;
      // If there was an expansion after the corresponding push
      // ticket was issued, adjust accordingly
      maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);

      if (ticket >= numPushes) {
        ticket -= offset;
        return false;
      }
      if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
        ticket -= offset;
        return true;
      }
    } while (true);
  }

  /// Enqueues an element with a specific ticket number
  template <typename ...Args>
  void enqueueWithTicket(const uint64_t ticket, Args&&... args) noexcept {
    Slot* slots;
    size_t cap;
    int stride;
    uint64_t state;
    uint64_t offset;

    while (!trySeqlockReadSection(state, slots, cap, stride)) {}

    // If there was an expansion after this ticket was issued, adjust
    // accordingly
    maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);

    this->enqueueWithTicketBase(ticket-offset, slots, cap, stride,
                                std::forward<Args>(args)...);
  }

  /// Extracts the ticket offset (the minimum ticket number served by
  /// the current slots array) from the packed seqlock word.
  uint64_t getOffset(const uint64_t state) const noexcept {
    return state >> kSeqlockBits;
  }

  /// Extracts the number of closed arrays from the packed seqlock
  /// word.  The count is stored doubled, with the lock bit in bit 0.
  int getNumClosed(const uint64_t state) const noexcept {
    return (state & ((1 << kSeqlockBits) - 1)) >> 1;
  }

  /// Try to expand the queue. Returns true if this expansion was
  /// successful or a concurrent expansion is in progress. Returns
  /// false if the queue has reached its maximum capacity or
  /// allocation has failed.
  bool tryExpand(const uint64_t state, const size_t cap) noexcept {
    if (cap == this->capacity_) {
      return false;
    }
    // Acquire seqlock
    uint64_t oldval = state;
    assert((state & 1) == 0);
    if (this->dstate_.compare_exchange_strong(oldval, state + 1)) {
      assert(cap == this->dcapacity_.load());
      // The new ticket offset must exceed every ticket that may have
      // been issued against the arrays known in state.
      uint64_t ticket = 1 + std::max(this->pushTicket_.load(),
                                     this->popTicket_.load());
      size_t newCapacity =
        std::min(dmult_ * cap, this->capacity_);
      Slot* newSlots =
        new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding];
      if (newSlots == nullptr) {
        // Expansion failed. Restore the seqlock
        this->dstate_.store(state);
        return false;
      }
      // Successful expansion
      // calculate the current ticket offset
      uint64_t offset = getOffset(state);
      // calculate index in closed array
      int index = getNumClosed(state);
      assert((index << 1) < (1 << kSeqlockBits));
      // fill the info for the closed slots array
      closed_[index].offset_ = offset;
      closed_[index].slots_ = this->dslots_.load();
      closed_[index].capacity_ = cap;
      closed_[index].stride_ = this->dstride_.load();
      // update the new slots array info
      this->dslots_.store(newSlots);
      this->dcapacity_.store(newCapacity);
      this->dstride_.store(this->computeStride(newCapacity));
      // Release the seqlock and record the new ticket offset
      this->dstate_.store((ticket << kSeqlockBits) + (2 * (index + 1)));
      return true;
    } else { // failed to acquire seqlock
      // Someone acquired the seqlock. Go back to the caller and get
      // up-to-date info.
      return true;
    }
  }

  /// Seqlock read-only section: snapshots the current slots array,
  /// capacity, and stride.  Returns false if the seqlock is held or
  /// was acquired during the read, in which case the outputs must be
  /// discarded and the read retried.
  bool trySeqlockReadSection(
    uint64_t& state, Slot*& slots, size_t& cap, int& stride
  ) noexcept {
    state = this->dstate_.load(std::memory_order_acquire);
    if (state & 1) {
      // Locked.
      return false;
    }
    // Start read-only section.
    slots = this->dslots_.load(std::memory_order_relaxed);
    cap = this->dcapacity_.load(std::memory_order_relaxed);
    stride = this->dstride_.load(std::memory_order_relaxed);
    // End of read-only section. Validate seqlock.
    std::atomic_thread_fence(std::memory_order_acquire);
    return (state == this->dstate_.load(std::memory_order_relaxed));
  }

  /// If there was an expansion after ticket was issued, update local variables
  /// of the lagging operation using the most recent closed array with
  /// offset <= ticket and return true. Otherwise, return false.
  bool maybeUpdateFromClosed(
    const uint64_t state,
    const uint64_t ticket,
    uint64_t& offset,
    Slot*& slots,
    size_t& cap,
    int& stride) noexcept {
    offset = getOffset(state);
    if (ticket >= offset) {
      // Ticket belongs to the current array; no adjustment needed.
      return false;
    }
    // Scan closed arrays from newest to oldest for the one whose
    // offset range covers this ticket.
    for (int i = getNumClosed(state) - 1; i >= 0; --i) {
      offset = closed_[i].offset_;
      if (offset <= ticket) {
        slots = closed_[i].slots_;
        cap = closed_[i].capacity_;
        stride = closed_[i].stride_;
        return true;
      }
    }
    // A closed array with offset <= ticket should have been found
    assert(false);
    return false;
  }
};
+
+namespace detail {
+
+/// CRTP specialization of MPMCQueueBase
+template<
+ template<
+ typename T, template<typename> class Atom, bool Dynamic> class Derived,
+ typename T, template<typename> class Atom, bool Dynamic>
+class MPMCQueueBase<Derived<T, Atom, Dynamic>> : boost::noncopyable {
+
+// Note: Using CRTP static casts in several functions of this base
+// template instead of making called functions virtual or duplicating
+// the code of calling functions in the derived partially specialized
+// template