X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=blobdiff_plain;f=folly%2FMPMCQueue.h;h=d4c0ccc7c8307dbfec970f75fa8b6bd676da6339;hp=74899ae80d2b720f05eabb468750537c6c965a05;hb=82d8337f03e90168bba5b66f9383228863661e70;hpb=d7d91eb119b9f0758cd7c9e952e9a58209ca5a67

diff --git a/folly/MPMCQueue.h b/folly/MPMCQueue.h
index 74899ae8..d4c0ccc7 100644
--- a/folly/MPMCQueue.h
+++ b/folly/MPMCQueue.h
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -18,14 +18,15 @@
 #include
 #include
-#include
-#include
 #include
-#include
 #include
+#include
+
 #include
-#include
 #include
 #include
@@ -33,7 +34,7 @@ namespace folly {

 namespace detail {

-template class Atom>
+template class Atom>
 struct SingleElementQueue;

 template class MPMCPipelineStageImpl;
@@ -86,7 +87,7 @@ template class MPMCQueueBase;
 /// use noexcept, you will have to wrap it in something that provides
 /// the guarantee. We provide an alternate safe implementation for types
 /// that don't use noexcept but that are marked folly::IsRelocatable
-/// and boost::has_nothrow_constructor, which is common for folly types.
+/// and std::is_nothrow_constructible, which is common for folly types.
 /// In particular, if you can declare FOLLY_ASSUME_FBVECTOR_COMPATIBLE
 /// then your type can be put in MPMCQueue.
 ///
@@ -96,8 +97,10 @@ template class MPMCQueueBase;
 /// are you can enqueue one sentinel and then have each consumer requeue
 /// two sentinels after it receives it (by requeuing 2 the shutdown can
 /// complete in O(log P) time instead of O(P)).
-template class Atom = std::atomic,
-          bool Dynamic = false>
+template <
+    typename T,
+    template class Atom = std::atomic,
+    bool Dynamic = false>
 class MPMCQueue : public detail::MPMCQueueBase> {
   friend class detail::MPMCPipelineStageImpl;
   using Slot = detail::SingleElementQueue;
@@ -141,7 +144,7 @@ class MPMCQueue : public detail::MPMCQueueBase> {
 /// closed arrays instead of the current one. Information about closed
 /// slots arrays (array address, capacity, stride, and offset) is
 /// maintained in a logarithmic-sized structure. Each entry in that
-/// structure never need to be changed once set. The number of closed
+/// structure never needs to be changed once set. The number of closed
 /// arrays is half the value of the seqlock (when unlocked).
 ///
 /// The acquisition of the seqlock to perform an expansion does not
@@ -159,9 +162,16 @@ class MPMCQueue : public detail::MPMCQueueBase> {
 /// call to blockingWrite() when the queue size is known to be equal
 /// to its capacity.
 ///
+/// Note that some writeIfNotFull() and tryWriteUntil() operations may
+/// fail even if the size of the queue is less than its maximum
+/// capacity and despite the success of expansion, if the operation
+/// happens to acquire a ticket that belongs to a closed array. This
+/// is a transient condition. Typically, one or two ticket values may
+/// be subject to such condition per expansion.
+///
 /// The dynamic version is a partial specialization of MPMCQueue with
 /// Dynamic == true
-template class Atom>
+template class Atom>
 class MPMCQueue :
       public detail::MPMCQueueBase> {
   friend class detail::MPMCQueueBase>;
@@ -264,18 +274,19 @@ class MPMCQueue :
     uint64_t offset;
     do {
       if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        asm_volatile_pause();
        continue;
       }
       if (maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride)) {
         // There was an expansion after this ticket was issued.
         break;
       }
-      if (slots[this->idx((ticket-offset), cap, stride)]
-          .mayEnqueue(this->turn(ticket-offset, cap))) {
+      if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue(
+              this->turn(ticket - offset, cap))) {
         // A slot is ready. No need to expand.
         break;
-      } else if (this->popTicket_.load(std::memory_order_relaxed) + cap
-                 > ticket) {
+      } else if (
+          this->popTicket_.load(std::memory_order_relaxed) + cap > ticket) {
         // May block, but a pop is in progress. No need to expand.
         // Get seqlock read section info again in case an expansion
         // occurred with an equal or higher ticket.
@@ -302,7 +313,9 @@ class MPMCQueue :
     int stride;
     uint64_t state;
     uint64_t offset;
-    while (!trySeqlockReadSection(state, slots, cap, stride));
+    while (!trySeqlockReadSection(state, slots, cap, stride)) {
+      asm_volatile_pause();
+    }
     // If there was an expansion after the corresponding push ticket
     // was issued, adjust accordingly
     maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
@@ -310,7 +323,6 @@ class MPMCQueue :
   }

  private:
-
   enum {
     kSeqlockBits = 6,
     kDefaultMinDynamicCapacity = 10,
@@ -344,6 +356,7 @@ class MPMCQueue :
     do {
       ticket = this->pushTicket_.load(std::memory_order_acquire); // A
       if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        asm_volatile_pause();
         continue;
       }

@@ -352,8 +365,8 @@ class MPMCQueue :
       uint64_t offset;
       maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);

-      if (slots[this->idx((ticket-offset), cap, stride)]
-          .mayEnqueue(this->turn(ticket-offset, cap))) {
+      if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue(
+              this->turn(ticket - offset, cap))) {
         // A slot is ready.
         if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
           // Adjust ticket
@@ -388,31 +401,26 @@ class MPMCQueue :
       ticket = this->pushTicket_.load(std::memory_order_acquire);
       auto numPops = this->popTicket_.load(std::memory_order_acquire);
       if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        asm_volatile_pause();
         continue;
       }

-      const auto oldCap = cap;
+      const auto curCap = cap;
       // If there was an expansion with offset greater than this ticket,
       // adjust accordingly
       uint64_t offset;
       maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);

       int64_t n = ticket - numPops;

-      if (n >= static_cast(this->capacity_)) {
-        ticket -= offset;
-        return false;
-      }
-      if (n >= static_cast(oldCap)) {
-        if (tryExpand(state, oldCap)) {
-          // This or another thread started an expansion. Start over
-          // with a new state.
+      if (n >= static_cast(cap)) {
+        if ((cap == curCap) && tryExpand(state, cap)) {
+          // This or another thread started an expansion. Start over.
           continue;
-        } else {
-          // Can't expand.
-          ticket -= offset;
-          return false;
         }
+        // Can't expand.
+        ticket -= offset;
+        return false;
       }

       if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
@@ -430,6 +438,7 @@ class MPMCQueue :
     do {
       ticket = this->popTicket_.load(std::memory_order_relaxed);
       if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        asm_volatile_pause();
         continue;
       }

@@ -438,8 +447,8 @@ class MPMCQueue :
       uint64_t offset;
       maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);

-      if (slots[this->idx((ticket-offset), cap, stride)]
-          .mayDequeue(this->turn(ticket-offset, cap))) {
+      if (slots[this->idx((ticket - offset), cap, stride)].mayDequeue(
+              this->turn(ticket - offset, cap))) {
         if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
           // Adjust ticket
           ticket -= offset;
@@ -459,6 +468,7 @@ class MPMCQueue :
       ticket = this->popTicket_.load(std::memory_order_acquire);
       auto numPushes = this->pushTicket_.load(std::memory_order_acquire);
       if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        asm_volatile_pause();
         continue;
       }

@@ -487,7 +497,8 @@ class MPMCQueue :
     uint64_t state;
     uint64_t offset;

-    while (!trySeqlockReadSection(state, slots, cap, stride)) {}
+    while (!trySeqlockReadSection(state, slots, cap, stride)) {
+    }

     // If there was an expansion after this ticket was issued, adjust
     // accordingly
@@ -518,12 +529,11 @@ class MPMCQueue :
         assert((state & 1) == 0);
         if (this->dstate_.compare_exchange_strong(oldval, state + 1)) {
           assert(cap == this->dcapacity_.load());
-          uint64_t ticket = 1 + std::max(this->pushTicket_.load(),
-                                         this->popTicket_.load());
-          size_t newCapacity =
-            std::min(dmult_ * cap, this->capacity_);
+          uint64_t ticket =
+              1 + std::max(this->pushTicket_.load(), this->popTicket_.load());
+          size_t newCapacity = std::min(dmult_ * cap, this->capacity_);
           Slot* newSlots =
-            new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding];
+              new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding];
           if (newSlots == nullptr) {
             // Expansion failed. Restore the seqlock
             this->dstate_.store(state);
@@ -604,10 +614,12 @@ class MPMCQueue :
 namespace detail {

 /// CRTP specialization of MPMCQueueBase
-template<
-  template<
-    typename T, template class Atom, bool Dynamic> class Derived,
-  typename T, template class Atom, bool Dynamic>
+template <
+    template class Atom, bool Dynamic>
+    class Derived,
+    typename T,
+    template class Atom,
+    bool Dynamic>
 class MPMCQueueBase> : boost::noncopyable {

   // Note: Using CRTP static casts in several functions of this base
@@ -639,11 +651,11 @@ class MPMCQueueBase> : boost::noncopyable {
     }

     // ideally this would be a static assert, but g++ doesn't allow it
-    assert(alignof(MPMCQueue)
-           >= detail::CacheLocality::kFalseSharingRange);
-    assert(static_cast(static_cast(&popTicket_))
-           - static_cast(static_cast(&pushTicket_))
-           >= detail::CacheLocality::kFalseSharingRange);
+    assert(alignof(MPMCQueue) >= CacheLocality::kFalseSharingRange);
+    assert(
+        static_cast(static_cast(&popTicket_)) -
+        static_cast(static_cast(&pushTicket_)) >=
+        CacheLocality::kFalseSharingRange);
   }

   /// A default-constructed queue is useful because a usable (non-zero
@@ -963,8 +975,7 @@ class MPMCQueueBase> : boost::noncopyable {
     /// To avoid false sharing in slots_ with neighboring memory
     /// allocations, we pad it with this many SingleElementQueue-s at
     /// each end
-    kSlotPadding = (detail::CacheLocality::kFalseSharingRange - 1)
-        / sizeof(Slot) + 1
+    kSlotPadding = (CacheLocality::kFalseSharingRange - 1) / sizeof(Slot) + 1
   };

   /// The maximum number of items in the queue at once
@@ -1016,8 +1027,7 @@ class MPMCQueueBase> : boost::noncopyable {

   /// Alignment doesn't prevent false sharing at the end of the struct,
   /// so fill out the last cache line
-  char padding_[detail::CacheLocality::kFalseSharingRange -
-                sizeof(Atom)];
+  char padding_[CacheLocality::kFalseSharingRange - sizeof(Atom)];

   /// We assign tickets in increasing order, but we don't want to
   /// access neighboring elements of slots_ because that will lead to
@@ -1147,7 +1157,7 @@ class MPMCQueueBase> : boost::noncopyable {
       ticket = numPushes;
       const auto numPops = popTicket_.load(std::memory_order_acquire); // B
       // n will be negative if pops are pending
-      const int64_t n = numPushes - numPops;
+      const int64_t n = int64_t(numPushes - numPops);
       if (n >= static_cast(capacity_)) {
         // Full, linearize at B. We don't need to recheck the read we
         // performed at A, because if numPushes was stale at B then the
@@ -1296,9 +1306,10 @@ struct SingleElementQueue {
   }

   /// enqueue using in-place noexcept construction
-  template ::value>::type>
+  template <
+      typename... Args,
+      typename = typename std::enable_if<
+          std::is_nothrow_constructible::value>::type>
   void enqueue(const uint32_t turn,
                Atom& spinCutoff,
                const bool updateSpinCutoff,
@@ -1310,15 +1321,17 @@ struct SingleElementQueue {

   /// enqueue using move construction, either real (if
   /// is_nothrow_move_constructible) or simulated using relocation and
-  /// default construction (if IsRelocatable and has_nothrow_constructor)
-  template ::value &&
-                boost::has_nothrow_constructor::value) ||
-               std::is_nothrow_constructible::value>::type>
-  void enqueue(const uint32_t turn,
-               Atom& spinCutoff,
-               const bool updateSpinCutoff,
-               T&& goner) noexcept {
+  /// default construction (if IsRelocatable and is_nothrow_constructible)
+  template <
+      typename = typename std::enable_if<
+          (folly::IsRelocatable::value &&
+           std::is_nothrow_constructible::value) ||
+          std::is_nothrow_constructible::value>::type>
+  void enqueue(
+      const uint32_t turn,
+      Atom& spinCutoff,
+      const bool updateSpinCutoff,
+      T&& goner) noexcept {
     enqueueImpl(
         turn,
         spinCutoff,
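
Illustrative usage sketch (not part of the commit above). Assuming a working folly build, the following minimal program exercises only members referenced in this diff: the Dynamic == true specialization, writeIfNotFull(), blockingWrite(), and blockingRead(). It also shows one way to handle the transient writeIfNotFull() failure documented in the new comment block; the capacity and loop counts are arbitrary example values.

#include <folly/MPMCQueue.h>

#include <atomic>
#include <cstdio>
#include <thread>

int main() {
  // Dynamic == true selects the expandable specialization documented above;
  // 1024 is the maximum capacity it may grow to (an arbitrary example value).
  folly::MPMCQueue<int, std::atomic, true> queue(1024);

  std::thread producer([&queue] {
    for (int i = 0; i < 100; ++i) {
      // writeIfNotFull() may fail transiently right after an expansion (see
      // the comment block added by this diff), so a false return is treated
      // as "retry" rather than "the queue is full"; here we simply fall back
      // to the blocking variant.
      if (!queue.writeIfNotFull(i)) {
        queue.blockingWrite(i);
      }
    }
  });

  std::thread consumer([&queue] {
    for (int i = 0; i < 100; ++i) {
      int value;
      queue.blockingRead(value); // blocks until an element is available
      std::printf("got %d\n", value);
    }
  });

  producer.join();
  consumer.join();
  return 0;
}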