X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2FMPMCQueue.h;h=932ae8fc2add1af1ad29c15aa0a1ebe4b3aa4039;hb=7bf1486094cccb266e789a378d8e5f91e3cb7780;hp=b0cfc46f6dfc833c839544f055bd12f4282835fd;hpb=fa172175980b13569ba42008202a857af6e959dd;p=folly.git diff --git a/folly/MPMCQueue.h b/folly/MPMCQueue.h index b0cfc46f..932ae8fc 100644 --- a/folly/MPMCQueue.h +++ b/folly/MPMCQueue.h @@ -18,14 +18,15 @@ #include #include -#include -#include +#include +#include #include -#include #include +#include + #include -#include +#include #include #include @@ -33,7 +34,7 @@ namespace folly { namespace detail { -template class Atom> +template class Atom> struct SingleElementQueue; template class MPMCPipelineStageImpl; @@ -96,8 +97,10 @@ template class MPMCQueueBase; /// are you can enqueue one sentinel and then have each consumer requeue /// two sentinels after it receives it (by requeuing 2 the shutdown can /// complete in O(log P) time instead of O(P)). -template class Atom = std::atomic, - bool Dynamic = false> +template < + typename T, + template class Atom = std::atomic, + bool Dynamic = false> class MPMCQueue : public detail::MPMCQueueBase> { friend class detail::MPMCPipelineStageImpl; using Slot = detail::SingleElementQueue; @@ -141,7 +144,7 @@ class MPMCQueue : public detail::MPMCQueueBase> { /// closed arrays instead of the current one. Information about closed /// slots arrays (array address, capacity, stride, and offset) is /// maintained in a logarithmic-sized structure. Each entry in that -/// structure never need to be changed once set. The number of closed +/// structure never needs to be changed once set. The number of closed /// arrays is half the value of the seqlock (when unlocked). /// /// The acquisition of the seqlock to perform an expansion does not @@ -168,7 +171,7 @@ class MPMCQueue : public detail::MPMCQueueBase> { /// /// The dynamic version is a partial specialization of MPMCQueue with /// Dynamic == true -template class Atom> +template class Atom> class MPMCQueue : public detail::MPMCQueueBase> { friend class detail::MPMCQueueBase>; @@ -278,12 +281,12 @@ class MPMCQueue : // There was an expansion after this ticket was issued. break; } - if (slots[this->idx((ticket-offset), cap, stride)] - .mayEnqueue(this->turn(ticket-offset, cap))) { + if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue( + this->turn(ticket - offset, cap))) { // A slot is ready. No need to expand. break; - } else if (this->popTicket_.load(std::memory_order_relaxed) + cap - > ticket) { + } else if ( + this->popTicket_.load(std::memory_order_relaxed) + cap > ticket) { // May block, but a pop is in progress. No need to expand. // Get seqlock read section info again in case an expansion // occurred with an equal or higher ticket. @@ -320,7 +323,6 @@ class MPMCQueue : } private: - enum { kSeqlockBits = 6, kDefaultMinDynamicCapacity = 10, @@ -363,8 +365,8 @@ class MPMCQueue : uint64_t offset; maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); - if (slots[this->idx((ticket-offset), cap, stride)] - .mayEnqueue(this->turn(ticket-offset, cap))) { + if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue( + this->turn(ticket - offset, cap))) { // A slot is ready. if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) { // Adjust ticket @@ -445,8 +447,8 @@ class MPMCQueue : uint64_t offset; maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); - if (slots[this->idx((ticket-offset), cap, stride)] - .mayDequeue(this->turn(ticket-offset, cap))) { + if (slots[this->idx((ticket - offset), cap, stride)].mayDequeue( + this->turn(ticket - offset, cap))) { if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) { // Adjust ticket ticket -= offset; @@ -495,7 +497,8 @@ class MPMCQueue : uint64_t state; uint64_t offset; - while (!trySeqlockReadSection(state, slots, cap, stride)) {} + while (!trySeqlockReadSection(state, slots, cap, stride)) { + } // If there was an expansion after this ticket was issued, adjust // accordingly @@ -526,12 +529,11 @@ class MPMCQueue : assert((state & 1) == 0); if (this->dstate_.compare_exchange_strong(oldval, state + 1)) { assert(cap == this->dcapacity_.load()); - uint64_t ticket = 1 + std::max(this->pushTicket_.load(), - this->popTicket_.load()); - size_t newCapacity = - std::min(dmult_ * cap, this->capacity_); + uint64_t ticket = + 1 + std::max(this->pushTicket_.load(), this->popTicket_.load()); + size_t newCapacity = std::min(dmult_ * cap, this->capacity_); Slot* newSlots = - new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding]; + new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding]; if (newSlots == nullptr) { // Expansion failed. Restore the seqlock this->dstate_.store(state); @@ -612,10 +614,12 @@ class MPMCQueue : namespace detail { /// CRTP specialization of MPMCQueueBase -template< - template< - typename T, template class Atom, bool Dynamic> class Derived, - typename T, template class Atom, bool Dynamic> +template < + template class Atom, bool Dynamic> + class Derived, + typename T, + template class Atom, + bool Dynamic> class MPMCQueueBase> : boost::noncopyable { // Note: Using CRTP static casts in several functions of this base @@ -647,11 +651,12 @@ class MPMCQueueBase> : boost::noncopyable { } // ideally this would be a static assert, but g++ doesn't allow it - assert(alignof(MPMCQueue) - >= detail::CacheLocality::kFalseSharingRange); - assert(static_cast(static_cast(&popTicket_)) - - static_cast(static_cast(&pushTicket_)) - >= detail::CacheLocality::kFalseSharingRange); + assert( + alignof(MPMCQueue) >= hardware_destructive_interference_size); + assert( + static_cast(static_cast(&popTicket_)) - + static_cast(static_cast(&pushTicket_)) >= + static_cast(hardware_destructive_interference_size)); } /// A default-constructed queue is useful because a usable (non-zero @@ -971,12 +976,12 @@ class MPMCQueueBase> : boost::noncopyable { /// To avoid false sharing in slots_ with neighboring memory /// allocations, we pad it with this many SingleElementQueue-s at /// each end - kSlotPadding = (detail::CacheLocality::kFalseSharingRange - 1) - / sizeof(Slot) + 1 + kSlotPadding = + (hardware_destructive_interference_size - 1) / sizeof(Slot) + 1 }; /// The maximum number of items in the queue at once - size_t FOLLY_ALIGN_TO_AVOID_FALSE_SHARING capacity_; + alignas(hardware_destructive_interference_size) size_t capacity_; /// Anonymous union for use when Dynamic = false and true, respectively union { @@ -1009,23 +1014,23 @@ class MPMCQueueBase> : boost::noncopyable { Atom dcapacity_; /// Enqueuers get tickets from here - Atom FOLLY_ALIGN_TO_AVOID_FALSE_SHARING pushTicket_; + alignas(hardware_destructive_interference_size) Atom pushTicket_; /// Dequeuers get tickets from here - Atom FOLLY_ALIGN_TO_AVOID_FALSE_SHARING popTicket_; + alignas(hardware_destructive_interference_size) Atom popTicket_; /// This is how many times we will spin before using FUTEX_WAIT when /// the queue is full on enqueue, adaptively computed by occasionally /// spinning for longer and smoothing with an exponential moving average - Atom FOLLY_ALIGN_TO_AVOID_FALSE_SHARING pushSpinCutoff_; + alignas( + hardware_destructive_interference_size) Atom pushSpinCutoff_; /// The adaptive spin cutoff when the queue is empty on dequeue - Atom FOLLY_ALIGN_TO_AVOID_FALSE_SHARING popSpinCutoff_; + alignas(hardware_destructive_interference_size) Atom popSpinCutoff_; /// Alignment doesn't prevent false sharing at the end of the struct, /// so fill out the last cache line - char padding_[detail::CacheLocality::kFalseSharingRange - - sizeof(Atom)]; + char pad_[hardware_destructive_interference_size - sizeof(Atom)]; /// We assign tickets in increasing order, but we don't want to /// access neighboring elements of slots_ because that will lead to @@ -1304,9 +1309,10 @@ struct SingleElementQueue { } /// enqueue using in-place noexcept construction - template ::value>::type> + template < + typename... Args, + typename = typename std::enable_if< + std::is_nothrow_constructible::value>::type> void enqueue(const uint32_t turn, Atom& spinCutoff, const bool updateSpinCutoff,