X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=blobdiff_plain;f=folly%2FMPMCQueue.h;h=d4c0ccc7c8307dbfec970f75fa8b6bd676da6339;hp=34ed779ac079dcf28cf41cb99736557e97e79347;hb=614eb71734a284e1a9fefabcc48743a3c8efd653;hpb=22afce906d7e98d95f8c45c3301072d9fd891d41 diff --git a/folly/MPMCQueue.h b/folly/MPMCQueue.h index 34ed779a..d4c0ccc7 100644 --- a/folly/MPMCQueue.h +++ b/folly/MPMCQueue.h @@ -1,5 +1,5 @@ /* - * Copyright 2014 Facebook, Inc. + * Copyright 2017 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,29 +18,30 @@ #include #include -#include -#include -#include +#include +#include #include -#include -#include -#include #include -#include + +#include #include -#include -#include +#include +#include +#include namespace folly { namespace detail { -template class Atom> -class SingleElementQueue; +template class Atom> +struct SingleElementQueue; template class MPMCPipelineStageImpl; +/// MPMCQueue base CRTP template +template class MPMCQueueBase; + } // namespace detail /// MPMCQueue is a high-performance bounded concurrent queue that @@ -49,6 +50,14 @@ template class MPMCPipelineStageImpl; /// up front. The bulk of the work of enqueuing and dequeuing can be /// performed in parallel. /// +/// MPMCQueue is linearizable. That means that if a call to write(A) +/// returns before a call to write(B) begins, then A will definitely end up +/// in the queue before B, and if a call to read(X) returns before a call +/// to read(Y) is started, that X will be something from earlier in the +/// queue than Y. This also means that if a read call returns a value, you +/// can be sure that all previous elements of the queue have been assigned +/// a reader (that reader might not yet have returned, but it exists). +/// /// The underlying implementation uses a ticket dispenser for the head and /// the tail, spreading accesses across N single-element queues to produce /// a queue with capacity N. The ticket dispensers use atomic increment, @@ -60,8 +69,7 @@ template class MPMCPipelineStageImpl; /// when the MPMCQueue's capacity is smaller than the number of enqueuers /// or dequeuers). /// -/// NOEXCEPT INTERACTION: Ticket-based queues separate the assignment -/// of In benchmarks (contained in tao/queues/ConcurrentQueueTests) +/// In benchmarks (contained in tao/queues/ConcurrentQueueTests) /// it handles 1 to 1, 1 to N, N to 1, and N to M thread counts better /// than any of the alternatives present in fbcode, for both small (~10) /// and large capacities. In these benchmarks it is also faster than @@ -70,53 +78,594 @@ template class MPMCPipelineStageImpl; /// queue because it uses futex() to block and unblock waiting threads, /// rather than spinning with sched_yield. /// -/// queue positions from the actual construction of the in-queue elements, -/// which means that the T constructor used during enqueue must not throw -/// an exception. This is enforced at compile time using type traits, -/// which requires that T be adorned with accurate noexcept information. -/// If your type does not use noexcept, you will have to wrap it in -/// something that provides the guarantee. We provide an alternate -/// safe implementation for types that don't use noexcept but that are -/// marked folly::IsRelocatable and boost::has_nothrow_constructor, -/// which is common for folly types. In particular, if you can declare -/// FOLLY_ASSUME_FBVECTOR_COMPATIBLE then your type can be put in -/// MPMCQueue. 
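A minimal usage sketch (not part of the diff) of the blocking and non-blocking API documented above; the FIFO ordering in the assert follows from the linearizability guarantee. Only folly::MPMCQueue and the member functions declared in this header are assumed.

#include <folly/MPMCQueue.h>
#include <cassert>

int main() {
  folly::MPMCQueue<int> q(4);   // bounded queue, capacity fixed at 4
  q.blockingWrite(1);           // blocks if the queue is full
  q.blockingWrite(2);
  int a = 0, b = 0;
  q.blockingRead(a);            // blocks if the queue is empty
  bool ok = q.read(b);          // non-blocking; returns false if it would block
  assert(a == 1 && ok && b == 2);
  return 0;
}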
-template class Atom = std::atomic> -class MPMCQueue : boost::noncopyable { +/// NOEXCEPT INTERACTION: tl;dr; If it compiles you're fine. Ticket-based +/// queues separate the assignment of queue positions from the actual +/// construction of the in-queue elements, which means that the T +/// constructor used during enqueue must not throw an exception. This is +/// enforced at compile time using type traits, which requires that T be +/// adorned with accurate noexcept information. If your type does not +/// use noexcept, you will have to wrap it in something that provides +/// the guarantee. We provide an alternate safe implementation for types +/// that don't use noexcept but that are marked folly::IsRelocatable +/// and std::is_nothrow_constructible, which is common for folly types. +/// In particular, if you can declare FOLLY_ASSUME_FBVECTOR_COMPATIBLE +/// then your type can be put in MPMCQueue. +/// +/// If you have a pool of N queue consumers that you want to shut down +/// after the queue has drained, one way is to enqueue N sentinel values +/// to the queue. If the producer doesn't know how many consumers there +/// are you can enqueue one sentinel and then have each consumer requeue +/// two sentinels after it receives it (by requeuing 2 the shutdown can +/// complete in O(log P) time instead of O(P)). +template < + typename T, + template class Atom = std::atomic, + bool Dynamic = false> +class MPMCQueue : public detail::MPMCQueueBase> { + friend class detail::MPMCPipelineStageImpl; + using Slot = detail::SingleElementQueue; + public: + + explicit MPMCQueue(size_t queueCapacity) + : detail::MPMCQueueBase>(queueCapacity) + { + this->stride_ = this->computeStride(queueCapacity); + this->slots_ = new Slot[queueCapacity + 2 * this->kSlotPadding]; + } + + MPMCQueue() noexcept { } +}; + +/// The dynamic version of MPMCQueue allows dynamic expansion of queue +/// capacity, such that a queue may start with a smaller capacity than +/// specified and expand only if needed. Users may optionally specify +/// the initial capacity and the expansion multiplier. +/// +/// The design uses a seqlock to enforce mutual exclusion among +/// expansion attempts. Regular operations read up-to-date queue +/// information (slots array, capacity, stride) inside read-only +/// seqlock sections, which are unimpeded when no expansion is in +/// progress. +/// +/// An expansion computes a new capacity, allocates a new slots array, +/// and updates stride. No information needs to be copied from the +/// current slots array to the new one. When this happens, new slots +/// will not have sequence numbers that match ticket numbers. The +/// expansion needs to compute a ticket offset such that operations +/// that use new arrays can adjust the calculations of slot indexes +/// and sequence numbers that take into account that the new slots +/// start with sequence numbers of zero. The current ticket offset is +/// packed with the seqlock in an atomic 64-bit integer. The initial +/// offset is zero. +/// +/// Lagging write and read operations with tickets lower than the +/// ticket offset of the current slots array (i.e., the minimum ticket +/// number that can be served by the current array) must use earlier +/// closed arrays instead of the current one. Information about closed +/// slots arrays (array address, capacity, stride, and offset) is +/// maintained in a logarithmic-sized structure. Each entry in that +/// structure never needs to be changed once set. 
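To make the dynamic variant concrete, a hypothetical construction sketch is shown here; the constructor arguments and allocatedCapacity() come from this header, while the concrete numbers are illustrative only. The design notes continue below.

#include <folly/MPMCQueue.h>
#include <atomic>

int main() {
  // Maximum capacity 1000, initial capacity 10, expansion multiplier 10.
  folly::MPMCQueue<int, std::atomic, /*Dynamic=*/true> q(1000, 10, 10);
  q.blockingWrite(42);
  int v = 0;
  q.blockingRead(v);
  // capacity() reports the maximum; allocatedCapacity() reports the slots
  // currently allocated, which grows only when writers would otherwise block.
  return 0;
}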
The number of closed +/// arrays is half the value of the seqlock (when unlocked). +/// +/// The acquisition of the seqlock to perform an expansion does not +/// prevent the issuing of new push and pop tickets concurrently. The +/// expansion must set the new ticket offset to a value that couldn't +/// have been issued to an operation that has already gone through a +/// seqlock read-only section (and hence obtained information for +/// older closed arrays). +/// +/// Note that the total queue capacity can temporarily exceed the +/// specified capacity when there are lagging consumers that haven't +/// yet consumed all the elements in closed arrays. Users should not +/// rely on the capacity of dynamic queues for synchronization, e.g., +/// they should not expect that a thread will definitely block on a +/// call to blockingWrite() when the queue size is known to be equal +/// to its capacity. +/// +/// Note that some writeIfNotFull() and tryWriteUntil() operations may +/// fail even if the size of the queue is less than its maximum +/// capacity and despite the success of expansion, if the operation +/// happens to acquire a ticket that belongs to a closed array. This +/// is a transient condition. Typically, one or two ticket values may +/// be subject to such condition per expansion. +/// +/// The dynamic version is a partial specialization of MPMCQueue with +/// Dynamic == true +template class Atom> +class MPMCQueue : + public detail::MPMCQueueBase> { + friend class detail::MPMCQueueBase>; + using Slot = detail::SingleElementQueue; + + struct ClosedArray { + uint64_t offset_ {0}; + Slot* slots_ {nullptr}; + size_t capacity_ {0}; + int stride_ {0}; + }; + + public: + + explicit MPMCQueue(size_t queueCapacity) + : detail::MPMCQueueBase>(queueCapacity) + { + size_t cap = std::min(kDefaultMinDynamicCapacity, queueCapacity); + initQueue(cap, kDefaultExpansionMultiplier); + } + + explicit MPMCQueue(size_t queueCapacity, + size_t minCapacity, + size_t expansionMultiplier) + : detail::MPMCQueueBase>(queueCapacity) + { + minCapacity = std::max(1, minCapacity); + size_t cap = std::min(minCapacity, queueCapacity); + expansionMultiplier = std::max(2, expansionMultiplier); + initQueue(cap, expansionMultiplier); + } + + MPMCQueue() noexcept { + dmult_ = 0; + closed_ = nullptr; + } + + MPMCQueue(MPMCQueue&& rhs) noexcept { + this->capacity_ = rhs.capacity_; + this->slots_ = rhs.slots_; + this->stride_ = rhs.stride_; + this->dstate_.store(rhs.dstate_.load(std::memory_order_relaxed), + std::memory_order_relaxed); + this->dcapacity_.store(rhs.dcapacity_.load(std::memory_order_relaxed), + std::memory_order_relaxed); + this->pushTicket_.store(rhs.pushTicket_.load(std::memory_order_relaxed), + std::memory_order_relaxed); + this->popTicket_.store(rhs.popTicket_.load(std::memory_order_relaxed), + std::memory_order_relaxed); + this->pushSpinCutoff_.store( + rhs.pushSpinCutoff_.load(std::memory_order_relaxed), + std::memory_order_relaxed); + this->popSpinCutoff_.store( + rhs.popSpinCutoff_.load(std::memory_order_relaxed), + std::memory_order_relaxed); + dmult_ = rhs.dmult_; + closed_ = rhs.closed_; + + rhs.capacity_ = 0; + rhs.slots_ = nullptr; + rhs.stride_ = 0; + rhs.dstate_.store(0, std::memory_order_relaxed); + rhs.dcapacity_.store(0, std::memory_order_relaxed); + rhs.pushTicket_.store(0, std::memory_order_relaxed); + rhs.popTicket_.store(0, std::memory_order_relaxed); + rhs.pushSpinCutoff_.store(0, std::memory_order_relaxed); + rhs.popSpinCutoff_.store(0, std::memory_order_relaxed); + rhs.dmult_ = 0; + 
rhs.closed_ = nullptr; + } + + MPMCQueue const& operator= (MPMCQueue&& rhs) { + if (this != &rhs) { + this->~MPMCQueue(); + new (this) MPMCQueue(std::move(rhs)); + } + return *this; + } + + ~MPMCQueue() { + if (closed_ != nullptr) { + for (int i = getNumClosed(this->dstate_.load()) - 1; i >= 0; --i) { + delete[] closed_[i].slots_; + } + delete[] closed_; + } + } + + size_t allocatedCapacity() const noexcept { + return this->dcapacity_.load(std::memory_order_relaxed); + } + + template + void blockingWrite(Args&&... args) noexcept { + uint64_t ticket = this->pushTicket_++; + Slot* slots; + size_t cap; + int stride; + uint64_t state; + uint64_t offset; + do { + if (!trySeqlockReadSection(state, slots, cap, stride)) { + asm_volatile_pause(); + continue; + } + if (maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride)) { + // There was an expansion after this ticket was issued. + break; + } + if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue( + this->turn(ticket - offset, cap))) { + // A slot is ready. No need to expand. + break; + } else if ( + this->popTicket_.load(std::memory_order_relaxed) + cap > ticket) { + // May block, but a pop is in progress. No need to expand. + // Get seqlock read section info again in case an expansion + // occurred with an equal or higher ticket. + continue; + } else { + // May block. See if we can expand. + if (tryExpand(state, cap)) { + // This or another thread started an expansion. Get updated info. + continue; + } else { + // Can't expand. + break; + } + } + } while (true); + this->enqueueWithTicketBase(ticket-offset, slots, cap, stride, + std::forward(args)...); + } + + void blockingReadWithTicket(uint64_t& ticket, T& elem) noexcept { + ticket = this->popTicket_++; + Slot* slots; + size_t cap; + int stride; + uint64_t state; + uint64_t offset; + while (!trySeqlockReadSection(state, slots, cap, stride)) { + asm_volatile_pause(); + } + // If there was an expansion after the corresponding push ticket + // was issued, adjust accordingly + maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); + this->dequeueWithTicketBase(ticket-offset, slots, cap, stride, elem); + } + + private: + enum { + kSeqlockBits = 6, + kDefaultMinDynamicCapacity = 10, + kDefaultExpansionMultiplier = 10, + }; + + size_t dmult_; + + // Info about closed slots arrays for use by lagging operations + ClosedArray* closed_; + + void initQueue(const size_t cap, const size_t mult) { + this->stride_ = this->computeStride(cap); + this->slots_ = new Slot[cap + 2 * this->kSlotPadding]; + this->dstate_.store(0); + this->dcapacity_.store(cap); + dmult_ = mult; + size_t maxClosed = 0; + for (size_t expanded = cap; + expanded < this->capacity_; + expanded *= mult) { + ++maxClosed; + } + closed_ = (maxClosed > 0) ? new ClosedArray[maxClosed] : nullptr; + } + + bool tryObtainReadyPushTicket( + uint64_t& ticket, Slot*& slots, size_t& cap, int& stride + ) noexcept { + uint64_t state; + do { + ticket = this->pushTicket_.load(std::memory_order_acquire); // A + if (!trySeqlockReadSection(state, slots, cap, stride)) { + asm_volatile_pause(); + continue; + } + + // If there was an expansion with offset greater than this ticket, + // adjust accordingly + uint64_t offset; + maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); + + if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue( + this->turn(ticket - offset, cap))) { + // A slot is ready. 
+ if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) { + // Adjust ticket + ticket -= offset; + return true; + } else { + continue; + } + } else { + if (ticket != this->pushTicket_.load(std::memory_order_relaxed)) { // B + // Try again. Ticket changed. + continue; + } + // Likely to block. + // Try to expand unless the ticket is for a closed array + if (offset == getOffset(state)) { + if (tryExpand(state, cap)) { + // This or another thread started an expansion. Get up-to-date info. + continue; + } + } + return false; + } + } while (true); + } + + bool tryObtainPromisedPushTicket( + uint64_t& ticket, Slot*& slots, size_t& cap, int& stride + ) noexcept { + uint64_t state; + do { + ticket = this->pushTicket_.load(std::memory_order_acquire); + auto numPops = this->popTicket_.load(std::memory_order_acquire); + if (!trySeqlockReadSection(state, slots, cap, stride)) { + asm_volatile_pause(); + continue; + } + + const auto curCap = cap; + // If there was an expansion with offset greater than this ticket, + // adjust accordingly + uint64_t offset; + maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); + + int64_t n = ticket - numPops; + + if (n >= static_cast(cap)) { + if ((cap == curCap) && tryExpand(state, cap)) { + // This or another thread started an expansion. Start over. + continue; + } + // Can't expand. + ticket -= offset; + return false; + } + + if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) { + // Adjust ticket + ticket -= offset; + return true; + } + } while (true); + } + + bool tryObtainReadyPopTicket( + uint64_t& ticket, Slot*& slots, size_t& cap, int& stride + ) noexcept { + uint64_t state; + do { + ticket = this->popTicket_.load(std::memory_order_relaxed); + if (!trySeqlockReadSection(state, slots, cap, stride)) { + asm_volatile_pause(); + continue; + } + + // If there was an expansion after the corresponding push ticket + // was issued, adjust accordingly + uint64_t offset; + maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); + + if (slots[this->idx((ticket - offset), cap, stride)].mayDequeue( + this->turn(ticket - offset, cap))) { + if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) { + // Adjust ticket + ticket -= offset; + return true; + } + } else { + return false; + } + } while (true); + } + + bool tryObtainPromisedPopTicket( + uint64_t& ticket, Slot*& slots, size_t& cap, int& stride + ) noexcept { + uint64_t state; + do { + ticket = this->popTicket_.load(std::memory_order_acquire); + auto numPushes = this->pushTicket_.load(std::memory_order_acquire); + if (!trySeqlockReadSection(state, slots, cap, stride)) { + asm_volatile_pause(); + continue; + } + + uint64_t offset; + // If there was an expansion after the corresponding push + // ticket was issued, adjust accordingly + maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); + + if (ticket >= numPushes) { + ticket -= offset; + return false; + } + if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) { + ticket -= offset; + return true; + } + } while (true); + } + + /// Enqueues an element with a specific ticket number + template + void enqueueWithTicket(const uint64_t ticket, Args&&... 
args) noexcept { + Slot* slots; + size_t cap; + int stride; + uint64_t state; + uint64_t offset; + + while (!trySeqlockReadSection(state, slots, cap, stride)) { + } + + // If there was an expansion after this ticket was issued, adjust + // accordingly + maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); + + this->enqueueWithTicketBase(ticket-offset, slots, cap, stride, + std::forward(args)...); + } + + uint64_t getOffset(const uint64_t state) const noexcept { + return state >> kSeqlockBits; + } + + int getNumClosed(const uint64_t state) const noexcept { + return (state & ((1 << kSeqlockBits) - 1)) >> 1; + } + + /// Try to expand the queue. Returns true if this expansion was + /// successful or a concurent expansion is in progress. Returns + /// false if the queue has reached its maximum capacity or + /// allocation has failed. + bool tryExpand(const uint64_t state, const size_t cap) noexcept { + if (cap == this->capacity_) { + return false; + } + // Acquire seqlock + uint64_t oldval = state; + assert((state & 1) == 0); + if (this->dstate_.compare_exchange_strong(oldval, state + 1)) { + assert(cap == this->dcapacity_.load()); + uint64_t ticket = + 1 + std::max(this->pushTicket_.load(), this->popTicket_.load()); + size_t newCapacity = std::min(dmult_ * cap, this->capacity_); + Slot* newSlots = + new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding]; + if (newSlots == nullptr) { + // Expansion failed. Restore the seqlock + this->dstate_.store(state); + return false; + } + // Successful expansion + // calculate the current ticket offset + uint64_t offset = getOffset(state); + // calculate index in closed array + int index = getNumClosed(state); + assert((index << 1) < (1 << kSeqlockBits)); + // fill the info for the closed slots array + closed_[index].offset_ = offset; + closed_[index].slots_ = this->dslots_.load(); + closed_[index].capacity_ = cap; + closed_[index].stride_ = this->dstride_.load(); + // update the new slots array info + this->dslots_.store(newSlots); + this->dcapacity_.store(newCapacity); + this->dstride_.store(this->computeStride(newCapacity)); + // Release the seqlock and record the new ticket offset + this->dstate_.store((ticket << kSeqlockBits) + (2 * (index + 1))); + return true; + } else { // failed to acquire seqlock + // Someone acaquired the seqlock. Go back to the caller and get + // up-to-date info. + return true; + } + } + + /// Seqlock read-only section + bool trySeqlockReadSection( + uint64_t& state, Slot*& slots, size_t& cap, int& stride + ) noexcept { + state = this->dstate_.load(std::memory_order_acquire); + if (state & 1) { + // Locked. + return false; + } + // Start read-only section. + slots = this->dslots_.load(std::memory_order_relaxed); + cap = this->dcapacity_.load(std::memory_order_relaxed); + stride = this->dstride_.load(std::memory_order_relaxed); + // End of read-only section. Validate seqlock. + std::atomic_thread_fence(std::memory_order_acquire); + return (state == this->dstate_.load(std::memory_order_relaxed)); + } + + /// If there was an expansion after ticket was issued, update local variables + /// of the lagging operation using the most recent closed array with + /// offset <= ticket and return true. 
Otherwise, return false; + bool maybeUpdateFromClosed( + const uint64_t state, + const uint64_t ticket, + uint64_t& offset, + Slot*& slots, + size_t& cap, + int& stride) noexcept { + offset = getOffset(state); + if (ticket >= offset) { + return false; + } + for (int i = getNumClosed(state) - 1; i >= 0; --i) { + offset = closed_[i].offset_; + if (offset <= ticket) { + slots = closed_[i].slots_; + cap = closed_[i].capacity_; + stride = closed_[i].stride_; + return true; + } + } + // A closed array with offset <= ticket should have been found + assert(false); + return false; + } +}; + +namespace detail { + +/// CRTP specialization of MPMCQueueBase +template < + template class Atom, bool Dynamic> + class Derived, + typename T, + template class Atom, + bool Dynamic> +class MPMCQueueBase> : boost::noncopyable { + +// Note: Using CRTP static casts in several functions of this base +// template instead of making called functions virtual or duplicating +// the code of calling functions in the derived partially specialized +// template static_assert(std::is_nothrow_constructible::value || folly::IsRelocatable::value, "T must be relocatable or have a noexcept move constructor"); - friend class detail::MPMCPipelineStageImpl; public: typedef T value_type; - explicit MPMCQueue(size_t queueCapacity) + using Slot = detail::SingleElementQueue; + + explicit MPMCQueueBase(size_t queueCapacity) : capacity_(queueCapacity) - , slots_(new detail::SingleElementQueue[queueCapacity + - 2 * kSlotPadding]) - , stride_(computeStride(queueCapacity)) , pushTicket_(0) , popTicket_(0) , pushSpinCutoff_(0) , popSpinCutoff_(0) { + if (queueCapacity == 0) { + throw std::invalid_argument( + "MPMCQueue with explicit capacity 0 is impossible" + // Stride computation in derived classes would sigfpe if capacity is 0 + ); + } + // ideally this would be a static assert, but g++ doesn't allow it - assert(alignof(MPMCQueue) - >= detail::CacheLocality::kFalseSharingRange); - assert(static_cast(static_cast(&popTicket_)) - - static_cast(static_cast(&pushTicket_)) - >= detail::CacheLocality::kFalseSharingRange); + assert(alignof(MPMCQueue) >= CacheLocality::kFalseSharingRange); + assert( + static_cast(static_cast(&popTicket_)) - + static_cast(static_cast(&pushTicket_)) >= + CacheLocality::kFalseSharingRange); } /// A default-constructed queue is useful because a usable (non-zero /// capacity) queue can be moved onto it or swapped with it - MPMCQueue() noexcept + MPMCQueueBase() noexcept : capacity_(0) , slots_(nullptr) , stride_(0) + , dstate_(0) + , dcapacity_(0) , pushTicket_(0) , popTicket_(0) , pushSpinCutoff_(0) @@ -126,10 +675,12 @@ class MPMCQueue : boost::noncopyable { /// IMPORTANT: The move constructor is here to make it easier to perform /// the initialization phase, it is not safe to use when there are any /// concurrent accesses (this is not checked). 
- MPMCQueue(MPMCQueue&& rhs) noexcept + MPMCQueueBase(MPMCQueueBase>&& rhs) noexcept : capacity_(rhs.capacity_) , slots_(rhs.slots_) , stride_(rhs.stride_) + , dstate_(rhs.dstate_.load(std::memory_order_relaxed)) + , dcapacity_(rhs.dcapacity_.load(std::memory_order_relaxed)) , pushTicket_(rhs.pushTicket_.load(std::memory_order_relaxed)) , popTicket_(rhs.popTicket_.load(std::memory_order_relaxed)) , pushSpinCutoff_(rhs.pushSpinCutoff_.load(std::memory_order_relaxed)) @@ -142,6 +693,8 @@ class MPMCQueue : boost::noncopyable { rhs.capacity_ = 0; rhs.slots_ = nullptr; rhs.stride_ = 0; + rhs.dstate_.store(0, std::memory_order_relaxed); + rhs.dcapacity_.store(0, std::memory_order_relaxed); rhs.pushTicket_.store(0, std::memory_order_relaxed); rhs.popTicket_.store(0, std::memory_order_relaxed); rhs.pushSpinCutoff_.store(0, std::memory_order_relaxed); @@ -151,23 +704,30 @@ class MPMCQueue : boost::noncopyable { /// IMPORTANT: The move operator is here to make it easier to perform /// the initialization phase, it is not safe to use when there are any /// concurrent accesses (this is not checked). - MPMCQueue const& operator= (MPMCQueue&& rhs) { + MPMCQueueBase> const& operator= + (MPMCQueueBase>&& rhs) { if (this != &rhs) { - this->~MPMCQueue(); - new (this) MPMCQueue(std::move(rhs)); + this->~MPMCQueueBase(); + new (this) MPMCQueueBase(std::move(rhs)); } return *this; } /// MPMCQueue can only be safely destroyed when there are no /// pending enqueuers or dequeuers (this is not checked). - ~MPMCQueue() { + ~MPMCQueueBase() { delete[] slots_; } - /// Returns the number of successful reads minus the number of successful - /// writes. Waiting blockingRead and blockingWrite calls are included, - /// so this value can be negative. + /// Returns the number of writes (including threads that are blocked waiting + /// to write) minus the number of reads (including threads that are blocked + /// waiting to read). So effectively, it becomes: + /// elements in queue + pending(calls to write) - pending(calls to read). + /// If nothing is pending, then the method returns the actual number of + /// elements in the queue. + /// The returned value can be negative if there are no writers and the queue + /// is empty, but there is one reader that is blocked waiting to read (in + /// which case, the returned size will be -1). ssize_t size() const noexcept { // since both pushes and pops increase monotonically, we can get a // consistent snapshot either by bracketing a read of popTicket_ with @@ -181,14 +741,14 @@ class MPMCQueue : boost::noncopyable { if (pushes == nextPushes) { // pushTicket_ didn't change from A (or the previous C) to C, // so we can linearize at B (or D) - return pushes - pops; + return ssize_t(pushes - pops); } pushes = nextPushes; uint64_t nextPops = popTicket_.load(std::memory_order_acquire); // D if (pops == nextPops) { // popTicket_ didn't chance from B (or the previous D), so we // can linearize at C - return pushes - pops; + return ssize_t(pushes - pops); } pops = nextPops; } @@ -206,7 +766,11 @@ class MPMCQueue : boost::noncopyable { } /// Returns is a guess at size() for contexts that don't need a precise - /// value, such as stats. + /// value, such as stats. More specifically, it returns the number of writes + /// minus the number of reads, but after reading the number of writes, more + /// writers could have came before the number of reads was sampled, + /// and this method doesn't protect against such case. + /// The returned value can be negative. 
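A small single-threaded sketch of the size() semantics documented above; the negative case only arises with a blocked reader, which is described in the comment but not exercised here.

#include <folly/MPMCQueue.h>
#include <cassert>

int main() {
  folly::MPMCQueue<int> q(2);
  assert(q.size() == 0);
  q.blockingWrite(7);        // one completed write, no reads yet
  assert(q.size() == 1);     // writes (1) minus reads (0)
  int v = 0;
  q.blockingRead(v);
  assert(q.size() == 0);     // a reader blocked on an empty queue would make this -1
  return 0;
}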
ssize_t sizeGuess() const noexcept { return writeCount() - readCount(); } @@ -216,6 +780,11 @@ class MPMCQueue : boost::noncopyable { return capacity_; } + /// Doesn't change for non-dynamic + size_t allocatedCapacity() const noexcept { + return capacity_; + } + /// Returns the total number of calls to blockingWrite or successful /// calls to write, including those blockingWrite calls that are /// currently blocking @@ -237,7 +806,8 @@ class MPMCQueue : boost::noncopyable { /// to a T constructor. template void blockingWrite(Args&&... args) noexcept { - enqueueWithTicket(pushTicket_++, std::forward(args)...); + enqueueWithTicketBase(pushTicket_++, slots_, capacity_, stride_, + std::forward(args)...); } /// If an item can be enqueued with no blocking, does so and returns @@ -256,9 +826,33 @@ class MPMCQueue : boost::noncopyable { template bool write(Args&&... args) noexcept { uint64_t ticket; - if (tryObtainReadyPushTicket(ticket)) { + Slot* slots; + size_t cap; + int stride; + if (static_cast*>(this)-> + tryObtainReadyPushTicket(ticket, slots, cap, stride)) { // we have pre-validated that the ticket won't block - enqueueWithTicket(ticket, std::forward(args)...); + enqueueWithTicketBase(ticket, slots, cap, stride, + std::forward(args)...); + return true; + } else { + return false; + } + } + + template + bool tryWriteUntil(const std::chrono::time_point& when, + Args&&... args) noexcept { + uint64_t ticket; + Slot* slots; + size_t cap; + int stride; + if (tryObtainPromisedPushTicketUntil(ticket, slots, cap, stride, when)) { + // we have pre-validated that the ticket won't block, or rather that + // it won't block longer than it takes another thread to dequeue an + // element from the slot it identifies. + enqueueWithTicketBase(ticket, slots, cap, stride, + std::forward(args)...); return true; } else { return false; @@ -277,14 +871,19 @@ class MPMCQueue : boost::noncopyable { /// return false, but writeIfNotFull will wait for the dequeue to finish. /// This method is required if you are composing queues and managing /// your own wakeup, because it guarantees that after every successful - /// write a readIfNotFull will succeed. + /// write a readIfNotEmpty will succeed. template bool writeIfNotFull(Args&&... args) noexcept { uint64_t ticket; - if (tryObtainPromisedPushTicket(ticket)) { + Slot* slots; + size_t cap; + int stride; + if (static_cast*>(this)-> + tryObtainPromisedPushTicket(ticket, slots, cap, stride)) { // some other thread is already dequeuing the slot into which we // are going to enqueue, but we might have to wait for them to finish - enqueueWithTicket(ticket, std::forward(args)...); + enqueueWithTicketBase(ticket, slots, cap, stride, + std::forward(args)...); return true; } else { return false; @@ -294,16 +893,53 @@ class MPMCQueue : boost::noncopyable { /// Moves a dequeued element onto elem, blocking until an element /// is available void blockingRead(T& elem) noexcept { - dequeueWithTicket(popTicket_++, elem); + uint64_t ticket; + static_cast*>(this)-> + blockingReadWithTicket(ticket, elem); + } + + /// Same as blockingRead() but also records the ticket nunmer + void blockingReadWithTicket(uint64_t& ticket, T& elem) noexcept { + assert(capacity_ != 0); + ticket = popTicket_++; + dequeueWithTicketBase(ticket, slots_, capacity_, stride_, elem); } /// If an item can be dequeued with no blocking, does so and returns /// true, otherwise returns false. 
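A sketch of the three producer paths defined above: the fully non-blocking write(), writeIfNotFull() which may wait for an in-progress dequeue, and the deadline-bounded tryWriteUntil(). The 5 ms deadline is an arbitrary illustration.

#include <folly/MPMCQueue.h>
#include <chrono>

bool produce(folly::MPMCQueue<int>& q, int v) {
  if (q.write(v)) {                 // succeeds only if it cannot block at all
    return true;
  }
  if (q.writeIfNotFull(v)) {        // may wait for an in-progress dequeue
    return true;
  }
  auto deadline =
      std::chrono::steady_clock::now() + std::chrono::milliseconds(5);
  return q.tryWriteUntil(deadline, v);  // gives up once the deadline passes
}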
bool read(T& elem) noexcept { uint64_t ticket; - if (tryObtainReadyPopTicket(ticket)) { + return readAndGetTicket(ticket, elem); + } + + /// Same as read() but also records the ticket nunmer + bool readAndGetTicket(uint64_t& ticket, T& elem) noexcept { + Slot* slots; + size_t cap; + int stride; + if (static_cast*>(this)-> + tryObtainReadyPopTicket(ticket, slots, cap, stride)) { // the ticket has been pre-validated to not block - dequeueWithTicket(ticket, elem); + dequeueWithTicketBase(ticket, slots, cap, stride, elem); + return true; + } else { + return false; + } + } + + template + bool tryReadUntil( + const std::chrono::time_point& when, + T& elem) noexcept { + uint64_t ticket; + Slot* slots; + size_t cap; + int stride; + if (tryObtainPromisedPopTicketUntil(ticket, slots, cap, stride, when)) { + // we have pre-validated that the ticket won't block, or rather that + // it won't block longer than it takes another thread to enqueue an + // element on the slot it identifies. + dequeueWithTicketBase(ticket, slots, cap, stride, elem); return true; } else { return false; @@ -317,16 +953,20 @@ class MPMCQueue : boost::noncopyable { /// prefer read. bool readIfNotEmpty(T& elem) noexcept { uint64_t ticket; - if (tryObtainPromisedPopTicket(ticket)) { + Slot* slots; + size_t cap; + int stride; + if (static_cast*>(this)-> + tryObtainPromisedPopTicket(ticket, slots, cap, stride)) { // the matching enqueue already has a ticket, but might not be done - dequeueWithTicket(ticket, elem); + dequeueWithTicketBase(ticket, slots, cap, stride, elem); return true; } else { return false; } } - private: + protected: enum { /// Once every kAdaptationFreq we will spin longer, to try to estimate /// the proper spin backoff @@ -335,22 +975,41 @@ class MPMCQueue : boost::noncopyable { /// To avoid false sharing in slots_ with neighboring memory /// allocations, we pad it with this many SingleElementQueue-s at /// each end - kSlotPadding = (detail::CacheLocality::kFalseSharingRange - 1) - / sizeof(detail::SingleElementQueue) + 1 + kSlotPadding = (CacheLocality::kFalseSharingRange - 1) / sizeof(Slot) + 1 }; /// The maximum number of items in the queue at once size_t FOLLY_ALIGN_TO_AVOID_FALSE_SHARING capacity_; - /// An array of capacity_ SingleElementQueue-s, each of which holds - /// either 0 or 1 item. We over-allocate by 2 * kSlotPadding and don't - /// touch the slots at either end, to avoid false sharing - detail::SingleElementQueue* slots_; + /// Anonymous union for use when Dynamic = false and true, respectively + union { + /// An array of capacity_ SingleElementQueue-s, each of which holds + /// either 0 or 1 item. We over-allocate by 2 * kSlotPadding and don't + /// touch the slots at either end, to avoid false sharing + Slot* slots_; + /// Current dynamic slots array of dcapacity_ SingleElementQueue-s + Atom dslots_; + }; - /// The number of slots_ indices that we advance for each ticket, to - /// avoid false sharing. Ideally slots_[i] and slots_[i + stride_] - /// aren't on the same cache line - int stride_; + /// Anonymous union for use when Dynamic = false and true, respectively + union { + /// The number of slots_ indices that we advance for each ticket, to + /// avoid false sharing. Ideally slots_[i] and slots_[i + stride_] + /// aren't on the same cache line + int stride_; + /// Current stride + Atom dstride_; + }; + + /// The following two memebers are used by dynamic MPMCQueue. 
+ /// Ideally they should be in MPMCQueue, but we get + /// better cache locality if they are in the same cache line as + /// dslots_ and dstride_. + /// + /// Dynamic state. A packed seqlock and ticket offset + Atom dstate_; + /// Dynamic capacity + Atom dcapacity_; /// Enqueuers get tickets from here Atom FOLLY_ALIGN_TO_AVOID_FALSE_SHARING pushTicket_; @@ -361,15 +1020,14 @@ class MPMCQueue : boost::noncopyable { /// This is how many times we will spin before using FUTEX_WAIT when /// the queue is full on enqueue, adaptively computed by occasionally /// spinning for longer and smoothing with an exponential moving average - Atom FOLLY_ALIGN_TO_AVOID_FALSE_SHARING pushSpinCutoff_; + Atom FOLLY_ALIGN_TO_AVOID_FALSE_SHARING pushSpinCutoff_; /// The adaptive spin cutoff when the queue is empty on dequeue - Atom FOLLY_ALIGN_TO_AVOID_FALSE_SHARING popSpinCutoff_; + Atom FOLLY_ALIGN_TO_AVOID_FALSE_SHARING popSpinCutoff_; /// Alignment doesn't prevent false sharing at the end of the struct, /// so fill out the last cache line - char padding_[detail::CacheLocality::kFalseSharingRange - sizeof(Atom)]; - + char padding_[CacheLocality::kFalseSharingRange - sizeof(Atom)]; /// We assign tickets in increasing order, but we don't want to /// access neighboring elements of slots_ because that will lead to @@ -411,23 +1069,30 @@ class MPMCQueue : boost::noncopyable { /// Returns the index into slots_ that should be used when enqueuing or /// dequeuing with the specified ticket - size_t idx(uint64_t ticket) noexcept { - return ((ticket * stride_) % capacity_) + kSlotPadding; + size_t idx(uint64_t ticket, size_t cap, int stride) noexcept { + return ((ticket * stride) % cap) + kSlotPadding; } /// Maps an enqueue or dequeue ticket to the turn should be used at the /// corresponding SingleElementQueue - uint32_t turn(uint64_t ticket) noexcept { - return ticket / capacity_; + uint32_t turn(uint64_t ticket, size_t cap) noexcept { + assert(cap != 0); + return uint32_t(ticket / cap); } /// Tries to obtain a push ticket for which SingleElementQueue::enqueue /// won't block. Returns true on immediate success, false on immediate /// failure. - bool tryObtainReadyPushTicket(uint64_t& rv) noexcept { - auto ticket = pushTicket_.load(std::memory_order_acquire); // A + bool tryObtainReadyPushTicket( + uint64_t& ticket, Slot*& slots, size_t& cap, int& stride + ) noexcept { + ticket = pushTicket_.load(std::memory_order_acquire); // A + slots = slots_; + cap = capacity_; + stride = stride_; while (true) { - if (!slots_[idx(ticket)].mayEnqueue(turn(ticket))) { + if (!slots[idx(ticket, cap, stride)] + .mayEnqueue(turn(ticket, cap))) { // if we call enqueue(ticket, ...) on the SingleElementQueue // right now it would block, but this might no longer be the next // ticket. We can increase the chance of tryEnqueue success under @@ -444,24 +1109,55 @@ class MPMCQueue : boost::noncopyable { // or prev failing CAS) and the following CAS. If the CAS fails // it will effect a load of pushTicket_ if (pushTicket_.compare_exchange_strong(ticket, ticket + 1)) { - rv = ticket; return true; } } } } + /// Tries until when to obtain a push ticket for which + /// SingleElementQueue::enqueue won't block. Returns true on success, false + /// on failure. + /// ticket is filled on success AND failure. 
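A standalone arithmetic sketch of the ticket-to-slot mapping implemented by idx() and turn() above; the stride of 7 and padding of 1 are illustrative values, not what computeStride() or kSlotPadding would actually produce.

#include <cstdint>
#include <cstdio>

int main() {
  const uint64_t cap = 10, stride = 7, kSlotPadding = 1;  // illustrative only
  for (uint64_t ticket = 0; ticket < 12; ++ticket) {
    uint64_t slot = (ticket * stride) % cap + kSlotPadding; // idx()
    uint64_t turn = ticket / cap;                           // turn()
    std::printf("ticket %2llu -> slot %2llu, turn %llu\n",
                (unsigned long long)ticket,
                (unsigned long long)slot,
                (unsigned long long)turn);
  }
  return 0;
}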
+ template + bool tryObtainPromisedPushTicketUntil( + uint64_t& ticket, Slot*& slots, size_t& cap, int& stride, + const std::chrono::time_point& when + ) noexcept { + bool deadlineReached = false; + while (!deadlineReached) { + if (static_cast*>(this)-> + tryObtainPromisedPushTicket(ticket, slots, cap, stride)) { + return true; + } + // ticket is a blocking ticket until the preceding ticket has been + // processed: wait until this ticket's turn arrives. We have not reserved + // this ticket so we will have to re-attempt to get a non-blocking ticket + // if we wake up before we time-out. + deadlineReached = !slots[idx(ticket, cap, stride)] + .tryWaitForEnqueueTurnUntil(turn(ticket, cap), pushSpinCutoff_, + (ticket % kAdaptationFreq) == 0, when); + } + return false; + } + /// Tries to obtain a push ticket which can be satisfied if all /// in-progress pops complete. This function does not block, but /// blocking may be required when using the returned ticket if some /// other thread's pop is still in progress (ticket has been granted but /// pop has not yet completed). - bool tryObtainPromisedPushTicket(uint64_t& rv) noexcept { + bool tryObtainPromisedPushTicket( + uint64_t& ticket, Slot*& slots, size_t& cap, int& stride + ) noexcept { auto numPushes = pushTicket_.load(std::memory_order_acquire); // A + slots = slots_; + cap = capacity_; + stride = stride_; while (true) { - auto numPops = popTicket_.load(std::memory_order_acquire); // B + ticket = numPushes; + const auto numPops = popTicket_.load(std::memory_order_acquire); // B // n will be negative if pops are pending - int64_t n = numPushes - numPops; + const int64_t n = int64_t(numPushes - numPops); if (n >= static_cast(capacity_)) { // Full, linearize at B. We don't need to recheck the read we // performed at A, because if numPushes was stale at B then the @@ -469,7 +1165,6 @@ class MPMCQueue : boost::noncopyable { return false; } if (pushTicket_.compare_exchange_strong(numPushes, numPushes + 1)) { - rv = numPushes; return true; } } @@ -478,10 +1173,16 @@ class MPMCQueue : boost::noncopyable { /// Tries to obtain a pop ticket for which SingleElementQueue::dequeue /// won't block. Returns true on immediate success, false on immediate /// failure. - bool tryObtainReadyPopTicket(uint64_t& rv) noexcept { - auto ticket = popTicket_.load(std::memory_order_acquire); + bool tryObtainReadyPopTicket( + uint64_t& ticket, Slot*& slots, size_t& cap, int& stride + ) noexcept { + ticket = popTicket_.load(std::memory_order_acquire); + slots = slots_; + cap = capacity_; + stride = stride_; while (true) { - if (!slots_[idx(ticket)].mayDequeue(turn(ticket))) { + if (!slots[idx(ticket, cap, stride)] + .mayDequeue(turn(ticket, cap))) { auto prev = ticket; ticket = popTicket_.load(std::memory_order_acquire); if (prev == ticket) { @@ -489,13 +1190,43 @@ class MPMCQueue : boost::noncopyable { } } else { if (popTicket_.compare_exchange_strong(ticket, ticket + 1)) { - rv = ticket; return true; } } } } + /// Tries until when to obtain a pop ticket for which + /// SingleElementQueue::dequeue won't block. Returns true on success, false + /// on failure. + /// ticket is filled on success AND failure. 
+ template + bool tryObtainPromisedPopTicketUntil( + uint64_t& ticket, + Slot*& slots, + size_t& cap, + int& stride, + const std::chrono::time_point& when) noexcept { + bool deadlineReached = false; + while (!deadlineReached) { + if (static_cast*>(this) + ->tryObtainPromisedPopTicket(ticket, slots, cap, stride)) { + return true; + } + // ticket is a blocking ticket until the preceding ticket has been + // processed: wait until this ticket's turn arrives. We have not reserved + // this ticket so we will have to re-attempt to get a non-blocking ticket + // if we wake up before we time-out. + deadlineReached = + !slots[idx(ticket, cap, stride)].tryWaitForDequeueTurnUntil( + turn(ticket, cap), + pushSpinCutoff_, + (ticket % kAdaptationFreq) == 0, + when); + } + return false; + } + /// Similar to tryObtainReadyPopTicket, but returns a pop ticket whose /// corresponding push ticket has already been handed out, rather than /// returning one whose corresponding push ticket has already been @@ -506,10 +1237,16 @@ class MPMCQueue : boost::noncopyable { /// to block waiting for someone to call enqueue, although we might /// have to block waiting for them to finish executing code inside the /// MPMCQueue itself. - bool tryObtainPromisedPopTicket(uint64_t& rv) noexcept { + bool tryObtainPromisedPopTicket( + uint64_t& ticket, Slot*& slots, size_t& cap, int& stride + ) noexcept { auto numPops = popTicket_.load(std::memory_order_acquire); // A + slots = slots_; + cap = capacity_; + stride = stride_; while (true) { - auto numPushes = pushTicket_.load(std::memory_order_acquire); // B + ticket = numPops; + const auto numPushes = pushTicket_.load(std::memory_order_acquire); // B if (numPops >= numPushes) { // Empty, or empty with pending pops. Linearize at B. We don't // need to recheck the read we performed at A, because if numPops @@ -517,7 +1254,6 @@ class MPMCQueue : boost::noncopyable { return false; } if (popTicket_.compare_exchange_strong(numPops, numPops + 1)) { - rv = numPops; return true; } } @@ -525,224 +1261,36 @@ class MPMCQueue : boost::noncopyable { // Given a ticket, constructs an enqueued item using args template - void enqueueWithTicket(uint64_t ticket, Args&&... args) noexcept { - slots_[idx(ticket)].enqueue(turn(ticket), - pushSpinCutoff_, - (ticket % kAdaptationFreq) == 0, - std::forward(args)...); - } - - // Given a ticket, dequeues the corresponding element - void dequeueWithTicket(uint64_t ticket, T& elem) noexcept { - slots_[idx(ticket)].dequeue(turn(ticket), - popSpinCutoff_, - (ticket % kAdaptationFreq) == 0, - elem); - } -}; - - -namespace detail { - -/// A TurnSequencer allows threads to order their execution according to -/// a monotonically increasing (with wraparound) "turn" value. The two -/// operations provided are to wait for turn T, and to move to the next -/// turn. Every thread that is waiting for T must have arrived before -/// that turn is marked completed (for MPMCQueue only one thread waits -/// for any particular turn, so this is trivially true). -/// -/// TurnSequencer's state_ holds 26 bits of the current turn (shifted -/// left by 6), along with a 6 bit saturating value that records the -/// maximum waiter minus the current turn. Wraparound of the turn space -/// is expected and handled. This allows us to atomically adjust the -/// number of outstanding waiters when we perform a FUTEX_WAKE operation. 
-/// Compare this strategy to sem_t's separate num_waiters field, which -/// isn't decremented until after the waiting thread gets scheduled, -/// during which time more enqueues might have occurred and made pointless -/// FUTEX_WAKE calls. -/// -/// TurnSequencer uses futex() directly. It is optimized for the -/// case that the highest awaited turn is 32 or less higher than the -/// current turn. We use the FUTEX_WAIT_BITSET variant, which lets -/// us embed 32 separate wakeup channels in a single futex. See -/// http://locklessinc.com/articles/futex_cheat_sheet for a description. -/// -/// We only need to keep exact track of the delta between the current -/// turn and the maximum waiter for the 32 turns that follow the current -/// one, because waiters at turn t+32 will be awoken at turn t. At that -/// point they can then adjust the delta using the higher base. Since we -/// need to encode waiter deltas of 0 to 32 inclusive, we use 6 bits. -/// We actually store waiter deltas up to 63, since that might reduce -/// the number of CAS operations a tiny bit. -/// -/// To avoid some futex() calls entirely, TurnSequencer uses an adaptive -/// spin cutoff before waiting. The overheads (and convergence rate) -/// of separately tracking the spin cutoff for each TurnSequencer would -/// be prohibitive, so the actual storage is passed in as a parameter and -/// updated atomically. This also lets the caller use different adaptive -/// cutoffs for different operations (read versus write, for example). -/// To avoid contention, the spin cutoff is only updated when requested -/// by the caller. -template class Atom> -struct TurnSequencer { - explicit TurnSequencer(const uint32_t firstTurn = 0) noexcept - : state_(encode(firstTurn << kTurnShift, 0)) - {} - - /// Returns true iff a call to waitForTurn(turn, ...) won't block - bool isTurn(const uint32_t turn) const noexcept { - auto state = state_.load(std::memory_order_acquire); - return decodeCurrentSturn(state) == (turn << kTurnShift); - } - - // Internally we always work with shifted turn values, which makes the - // truncation and wraparound work correctly. This leaves us bits at - // the bottom to store the number of waiters. We call shifted turns - // "sturns" inside this class. - - /// Blocks the current thread until turn has arrived. If - /// updateSpinCutoff is true then this will spin for up to kMaxSpins tries - /// before blocking and will adjust spinCutoff based on the results, - /// otherwise it will spin for at most spinCutoff spins. - void waitForTurn(const uint32_t turn, - Atom& spinCutoff, - const bool updateSpinCutoff) noexcept { - int prevThresh = spinCutoff.load(std::memory_order_relaxed); - const int effectiveSpinCutoff = - updateSpinCutoff || prevThresh == 0 ? 
kMaxSpins : prevThresh; - int tries; - - const uint32_t sturn = turn << kTurnShift; - for (tries = 0; ; ++tries) { - uint32_t state = state_.load(std::memory_order_acquire); - uint32_t current_sturn = decodeCurrentSturn(state); - if (current_sturn == sturn) { - break; - } - - // wrap-safe version of assert(current_sturn < sturn) - assert(sturn - current_sturn < std::numeric_limits::max() / 2); - - // the first effectSpinCutoff tries are spins, after that we will - // record ourself as a waiter and block with futexWait - if (tries < effectiveSpinCutoff) { - asm volatile ("pause"); - continue; - } - - uint32_t current_max_waiter_delta = decodeMaxWaitersDelta(state); - uint32_t our_waiter_delta = (sturn - current_sturn) >> kTurnShift; - uint32_t new_state; - if (our_waiter_delta <= current_max_waiter_delta) { - // state already records us as waiters, probably because this - // isn't our first time around this loop - new_state = state; - } else { - new_state = encode(current_sturn, our_waiter_delta); - if (state != new_state && - !state_.compare_exchange_strong(state, new_state)) { - continue; - } - } - state_.futexWait(new_state, futexChannel(turn)); - } - - if (updateSpinCutoff || prevThresh == 0) { - // if we hit kMaxSpins then spinning was pointless, so the right - // spinCutoff is kMinSpins - int target; - if (tries >= kMaxSpins) { - target = kMinSpins; - } else { - // to account for variations, we allow ourself to spin 2*N when - // we think that N is actually required in order to succeed - target = std::min(int{kMaxSpins}, std::max(int{kMinSpins}, tries * 2)); - } - - if (prevThresh == 0) { - // bootstrap - spinCutoff = target; - } else { - // try once, keep moving if CAS fails. Exponential moving average - // with alpha of 7/8 - spinCutoff.compare_exchange_weak( - prevThresh, prevThresh + (target - prevThresh) / 8); - } - } + void enqueueWithTicketBase( + uint64_t ticket, Slot* slots, size_t cap, int stride, Args&&... args + ) noexcept { + slots[idx(ticket, cap, stride)] + .enqueue(turn(ticket, cap), + pushSpinCutoff_, + (ticket % kAdaptationFreq) == 0, + std::forward(args)...); } - /// Unblocks a thread running waitForTurn(turn + 1) - void completeTurn(const uint32_t turn) noexcept { - uint32_t state = state_.load(std::memory_order_acquire); - while (true) { - assert(state == encode(turn << kTurnShift, decodeMaxWaitersDelta(state))); - uint32_t max_waiter_delta = decodeMaxWaitersDelta(state); - uint32_t new_state = encode( - (turn + 1) << kTurnShift, - max_waiter_delta == 0 ? 0 : max_waiter_delta - 1); - if (state_.compare_exchange_strong(state, new_state)) { - if (max_waiter_delta != 0) { - state_.futexWake(std::numeric_limits::max(), - futexChannel(turn + 1)); - } - break; - } - // failing compare_exchange_strong updates first arg to the value - // that caused the failure, so no need to reread state_ - } - } - - /// Returns the least-most significant byte of the current uncompleted - /// turn. The full 32 bit turn cannot be recovered. - uint8_t uncompletedTurnLSB() const noexcept { - return state_.load(std::memory_order_acquire) >> kTurnShift; - } - - private: - enum : uint32_t { - /// kTurnShift counts the bits that are stolen to record the delta - /// between the current turn and the maximum waiter. It needs to be big - /// enough to record wait deltas of 0 to 32 inclusive. Waiters more - /// than 32 in the future will be woken up 32*n turns early (since - /// their BITSET will hit) and will adjust the waiter count again. 
- /// We go a bit beyond and let the waiter count go up to 63, which - /// is free and might save us a few CAS - kTurnShift = 6, - kWaitersMask = (1 << kTurnShift) - 1, - - /// The minimum spin count that we will adaptively select - kMinSpins = 20, - - /// The maximum spin count that we will adaptively select, and the - /// spin count that will be used when probing to get a new data point - /// for the adaptation - kMaxSpins = 2000, - }; - - /// This holds both the current turn, and the highest waiting turn, - /// stored as (current_turn << 6) | min(63, max(waited_turn - current_turn)) - Futex state_; - - /// Returns the bitmask to pass futexWait or futexWake when communicating - /// about the specified turn - int futexChannel(uint32_t turn) const noexcept { - return 1 << (turn & 31); - } - - uint32_t decodeCurrentSturn(uint32_t state) const noexcept { - return state & ~kWaitersMask; - } - - uint32_t decodeMaxWaitersDelta(uint32_t state) const noexcept { - return state & kWaitersMask; + // To support tracking ticket numbers in MPMCPipelineStageImpl + template + void enqueueWithTicket(uint64_t ticket, Args&&... args) noexcept { + enqueueWithTicketBase(ticket, slots_, capacity_, stride_, + std::forward(args)...); } - uint32_t encode(uint32_t currentSturn, uint32_t maxWaiterD) const noexcept { - return currentSturn | std::min(uint32_t{ kWaitersMask }, maxWaiterD); + // Given a ticket, dequeues the corresponding element + void dequeueWithTicketBase( + uint64_t ticket, Slot* slots, size_t cap, int stride, T& elem + ) noexcept { + assert(cap != 0); + slots[idx(ticket, cap, stride)] + .dequeue(turn(ticket, cap), + popSpinCutoff_, + (ticket % kAdaptationFreq) == 0, + elem); } }; - /// SingleElementQueue implements a blocking queue that holds at most one /// item, and that requires its users to assign incrementing identifiers /// (turns) to each enqueue and dequeue operation. Note that the turns @@ -758,11 +1306,12 @@ struct SingleElementQueue { } /// enqueue using in-place noexcept construction - template ::value>::type> + template < + typename... Args, + typename = typename std::enable_if< + std::is_nothrow_constructible::value>::type> void enqueue(const uint32_t turn, - Atom& spinCutoff, + Atom& spinCutoff, const bool updateSpinCutoff, Args&&... 
args) noexcept { sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff); @@ -772,28 +1321,39 @@ struct SingleElementQueue { /// enqueue using move construction, either real (if /// is_nothrow_move_constructible) or simulated using relocation and - /// default construction (if IsRelocatable and has_nothrow_constructor) - template ::value && - boost::has_nothrow_constructor::value) || - std::is_nothrow_constructible::value>::type> - void enqueue(const uint32_t turn, - Atom& spinCutoff, - const bool updateSpinCutoff, - T&& goner) noexcept { - if (std::is_nothrow_constructible::value) { - // this is preferred - sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff); - new (&contents_) T(std::move(goner)); - sequencer_.completeTurn(turn * 2); - } else { - // simulate nothrow move with relocation, followed by default - // construction to fill the gap we created - sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff); - memcpy(&contents_, &goner, sizeof(T)); - sequencer_.completeTurn(turn * 2); - new (&goner) T(); - } + /// default construction (if IsRelocatable and is_nothrow_constructible) + template < + typename = typename std::enable_if< + (folly::IsRelocatable::value && + std::is_nothrow_constructible::value) || + std::is_nothrow_constructible::value>::type> + void enqueue( + const uint32_t turn, + Atom& spinCutoff, + const bool updateSpinCutoff, + T&& goner) noexcept { + enqueueImpl( + turn, + spinCutoff, + updateSpinCutoff, + std::move(goner), + typename std::conditional::value, + ImplByMove, ImplByRelocation>::type()); + } + + /// Waits until either: + /// 1: the dequeue turn preceding the given enqueue turn has arrived + /// 2: the given deadline has arrived + /// Case 1 returns true, case 2 returns false. + template + bool tryWaitForEnqueueTurnUntil( + const uint32_t turn, + Atom& spinCutoff, + const bool updateSpinCutoff, + const std::chrono::time_point& when) noexcept { + return sequencer_.tryWaitForTurn( + turn * 2, spinCutoff, updateSpinCutoff, &when) != + TurnSequencer::TryWaitResult::TIMEDOUT; } bool mayEnqueue(const uint32_t turn) const noexcept { @@ -801,27 +1361,31 @@ struct SingleElementQueue { } void dequeue(uint32_t turn, - Atom& spinCutoff, + Atom& spinCutoff, const bool updateSpinCutoff, T& elem) noexcept { - if (folly::IsRelocatable::value) { - // this version is preferred, because we do as much work as possible - // before waiting - try { - elem.~T(); - } catch (...) { - // unlikely, but if we don't complete our turn the queue will die - } - sequencer_.waitForTurn(turn * 2 + 1, spinCutoff, updateSpinCutoff); - memcpy(&elem, &contents_, sizeof(T)); - sequencer_.completeTurn(turn * 2 + 1); - } else { - // use nothrow move assignment - sequencer_.waitForTurn(turn * 2 + 1, spinCutoff, updateSpinCutoff); - elem = std::move(*ptr()); - destroyContents(); - sequencer_.completeTurn(turn * 2 + 1); - } + dequeueImpl(turn, + spinCutoff, + updateSpinCutoff, + elem, + typename std::conditional::value, + ImplByRelocation, + ImplByMove>::type()); + } + + /// Waits until either: + /// 1: the enqueue turn preceding the given dequeue turn has arrived + /// 2: the given deadline has arrived + /// Case 1 returns true, case 2 returns false. 
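A standalone sketch of the trait-based dispatch used by enqueue()/dequeue() above: a nothrow-movable type takes the ImplByMove path, while a type whose move constructor may throw falls back to ImplByRelocation (which additionally requires folly::IsRelocatable and a noexcept default constructor, as noted earlier). The tag structs below are local stand-ins, not the ones in this header.

#include <type_traits>

struct ImplByMoveTag {};
struct ImplByRelocationTag {};

struct Movable   { Movable(Movable&&) noexcept; };
struct RelocOnly { RelocOnly(RelocOnly&&); };     // move ctor not noexcept

template <typename T>
using ChosenImpl = typename std::conditional<
    std::is_nothrow_constructible<T, T&&>::value,
    ImplByMoveTag,
    ImplByRelocationTag>::type;

static_assert(std::is_same<ChosenImpl<Movable>, ImplByMoveTag>::value,
              "nothrow-movable types are move-constructed in place");
static_assert(std::is_same<ChosenImpl<RelocOnly>, ImplByRelocationTag>::value,
              "other types are relocated with memcpy");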
+ template + bool tryWaitForDequeueTurnUntil( + const uint32_t turn, + Atom& spinCutoff, + const bool updateSpinCutoff, + const std::chrono::time_point& when) noexcept { + return sequencer_.tryWaitForTurn( + turn * 2 + 1, spinCutoff, updateSpinCutoff, &when) != + TurnSequencer::TryWaitResult::TIMEDOUT; } bool mayDequeue(const uint32_t turn) const noexcept { @@ -849,6 +1413,63 @@ struct SingleElementQueue { memset(&contents_, 'Q', sizeof(T)); #endif } + + /// Tag classes for dispatching to enqueue/dequeue implementation. + struct ImplByRelocation {}; + struct ImplByMove {}; + + /// enqueue using nothrow move construction. + void enqueueImpl(const uint32_t turn, + Atom& spinCutoff, + const bool updateSpinCutoff, + T&& goner, + ImplByMove) noexcept { + sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff); + new (&contents_) T(std::move(goner)); + sequencer_.completeTurn(turn * 2); + } + + /// enqueue by simulating nothrow move with relocation, followed by + /// default construction to a noexcept relocation. + void enqueueImpl(const uint32_t turn, + Atom& spinCutoff, + const bool updateSpinCutoff, + T&& goner, + ImplByRelocation) noexcept { + sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff); + memcpy(&contents_, &goner, sizeof(T)); + sequencer_.completeTurn(turn * 2); + new (&goner) T(); + } + + /// dequeue by destructing followed by relocation. This version is preferred, + /// because as much work as possible can be done before waiting. + void dequeueImpl(uint32_t turn, + Atom& spinCutoff, + const bool updateSpinCutoff, + T& elem, + ImplByRelocation) noexcept { + try { + elem.~T(); + } catch (...) { + // unlikely, but if we don't complete our turn the queue will die + } + sequencer_.waitForTurn(turn * 2 + 1, spinCutoff, updateSpinCutoff); + memcpy(&elem, &contents_, sizeof(T)); + sequencer_.completeTurn(turn * 2 + 1); + } + + /// dequeue by nothrow move assignment. + void dequeueImpl(uint32_t turn, + Atom& spinCutoff, + const bool updateSpinCutoff, + T& elem, + ImplByMove) noexcept { + sequencer_.waitForTurn(turn * 2 + 1, spinCutoff, updateSpinCutoff); + elem = std::move(*ptr()); + destroyContents(); + sequencer_.completeTurn(turn * 2 + 1); + } }; } // namespace detail
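To make the dstate_ encoding used by the dynamic queue concrete, here is a small standalone check of the packing implied by getOffset(), getNumClosed(), and tryExpand() above; the numeric values are arbitrary.

#include <cassert>
#include <cstdint>

int main() {
  const uint64_t kSeqlockBits = 6;   // as in the header
  const uint64_t offset = 1000;      // ticket offset recorded by the last expansion
  const uint64_t numClosed = 3;      // closed slots arrays so far
  // tryExpand() releases the lock by storing (offset << kSeqlockBits) + 2 * numClosed.
  uint64_t state = (offset << kSeqlockBits) + 2 * numClosed;
  assert((state & 1) == 0);                                        // seqlock not held
  assert((state >> kSeqlockBits) == offset);                       // getOffset()
  assert(((state & ((1 << kSeqlockBits) - 1)) >> 1) == numClosed); // getNumClosed()
  return 0;
}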
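And a hypothetical consumer loop for the shutdown-by-sentinel scheme mentioned in the MPMCQueue class comment: the producer enqueues a single sentinel, and each consumer that receives it requeues two before exiting, so P consumers stop after O(log P) rounds. kSentinel and process() are assumptions of this sketch.

#include <folly/MPMCQueue.h>

constexpr int kSentinel = -1;      // assumed never to be a real payload
void process(int value);           // application-defined work, assumed

void consumer(folly::MPMCQueue<int>& q) {
  int v = 0;
  for (;;) {
    q.blockingRead(v);
    if (v == kSentinel) {
      q.blockingWrite(kSentinel);  // requeue two sentinels so the remaining
      q.blockingWrite(kSentinel);  // consumers terminate in O(log P) rounds
      return;
    }
    process(v);
  }
}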