From 1292801d553f23f7aa71ae48a531b5e71a16d7a5 Mon Sep 17 00:00:00 2001 From: Maged Michael Date: Tue, 3 Oct 2017 12:19:29 -0700 Subject: [PATCH] Dynamic MPMCQueue: Backout of last change as it may deadlock Summary: The previous change can lead to deadlock. Backing out. Reviewed By: djwatson Differential Revision: D5957084 fbshipit-source-id: 72ea1cb6236367912b4b087da7e4d57f8a2daed0 --- folly/MPMCQueue.h | 355 ++++++++++++++--------------------- folly/test/MPMCQueueTest.cpp | 40 +--- 2 files changed, 150 insertions(+), 245 deletions(-) diff --git a/folly/MPMCQueue.h b/folly/MPMCQueue.h index 743de144..d4c0ccc7 100644 --- a/folly/MPMCQueue.h +++ b/folly/MPMCQueue.h @@ -118,22 +118,8 @@ class MPMCQueue : public detail::MPMCQueueBase> { /// The dynamic version of MPMCQueue allows dynamic expansion of queue /// capacity, such that a queue may start with a smaller capacity than -/// specified and expand if needed up to the specified capacity. -/// Shrinking is not supported at this point. -/// -/// Users may optionally specify the initial capacity and the -/// expansion multiplier. Otherwise default values are used. -/// -/// Operation on the dynamic version have the same semantics as for -/// the default fixed-size version, except that the total queue -/// capacity can temporarily exceed the specified capacity when there -/// are lagging consumers that haven't yet consumed all the elements -/// in closed arrays. Users should not rely on the capacity of dynamic -/// queues for synchronization, e.g., they should not expect that a -/// thread will definitely block on a call to blockingWrite() when the -/// queue size is known to be equal to its capacity. -/// -/// Design Overview: +/// specified and expand only if needed. Users may optionally specify +/// the initial capacity and the expansion multiplier. /// /// The design uses a seqlock to enforce mutual exclusion among /// expansion attempts. Regular operations read up-to-date queue @@ -168,6 +154,21 @@ class MPMCQueue : public detail::MPMCQueueBase> { /// seqlock read-only section (and hence obtained information for /// older closed arrays). /// +/// Note that the total queue capacity can temporarily exceed the +/// specified capacity when there are lagging consumers that haven't +/// yet consumed all the elements in closed arrays. Users should not +/// rely on the capacity of dynamic queues for synchronization, e.g., +/// they should not expect that a thread will definitely block on a +/// call to blockingWrite() when the queue size is known to be equal +/// to its capacity. +/// +/// Note that some writeIfNotFull() and tryWriteUntil() operations may +/// fail even if the size of the queue is less than its maximum +/// capacity and despite the success of expansion, if the operation +/// happens to acquire a ticket that belongs to a closed array. This +/// is a transient condition. Typically, one or two ticket values may +/// be subject to such condition per expansion. +/// /// The dynamic version is a partial specialization of MPMCQueue with /// Dynamic == true template class Atom> @@ -271,31 +272,36 @@ class MPMCQueue : int stride; uint64_t state; uint64_t offset; - while (true) { - while (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) { + do { + if (!trySeqlockReadSection(state, slots, cap, stride)) { asm_volatile_pause(); + continue; } - maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); - if (LIKELY(slots[this->idx((ticket - offset), cap, stride)].mayEnqueue( - this->turn(ticket - offset, cap)))) { - // A slot is ready. Fast path. No need to expand. - break; - } - // Slow path - auto head = this->popTicket_.load(std::memory_order_relaxed); - auto avail = std::max(head, offset) + cap; - if (ticket < avail) { - // May block, but a pop is in progress. No need to expand. + if (maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride)) { + // There was an expansion after this ticket was issued. break; } - // Try to expand, otherwise this operation may block - // indefinitely awaiting a consumer to unblock it. - if (!tryExpand(state, cap)) { - // Can't expand. Block. + if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue( + this->turn(ticket - offset, cap))) { + // A slot is ready. No need to expand. break; + } else if ( + this->popTicket_.load(std::memory_order_relaxed) + cap > ticket) { + // May block, but a pop is in progress. No need to expand. + // Get seqlock read section info again in case an expansion + // occurred with an equal or higher ticket. + continue; + } else { + // May block. See if we can expand. + if (tryExpand(state, cap)) { + // This or another thread started an expansion. Get updated info. + continue; + } else { + // Can't expand. + break; + } } - // Either this or another thread started an expansion Get up-to-date info. - } + } while (true); this->enqueueWithTicketBase(ticket-offset, slots, cap, stride, std::forward(args)...); } @@ -307,7 +313,7 @@ class MPMCQueue : int stride; uint64_t state; uint64_t offset; - while (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) { + while (!trySeqlockReadSection(state, slots, cap, stride)) { asm_volatile_pause(); } // If there was an expansion after the corresponding push ticket @@ -318,9 +324,9 @@ class MPMCQueue : private: enum { - kSeqlockBits = 8, - kDefaultMinDynamicCapacity = 16, - kDefaultExpansionMultiplier = 8, + kSeqlockBits = 6, + kDefaultMinDynamicCapacity = 10, + kDefaultExpansionMultiplier = 10, }; size_t dmult_; @@ -347,86 +353,69 @@ class MPMCQueue : uint64_t& ticket, Slot*& slots, size_t& cap, int& stride ) noexcept { uint64_t state; - while (true) { + do { ticket = this->pushTicket_.load(std::memory_order_acquire); // A - if (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) { + if (!trySeqlockReadSection(state, slots, cap, stride)) { asm_volatile_pause(); continue; } + // If there was an expansion with offset greater than this ticket, // adjust accordingly uint64_t offset; maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); - if (LIKELY(slots[this->idx((ticket - offset), cap, stride)].mayEnqueue( - this->turn(ticket - offset, cap)))) { + + if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue( + this->turn(ticket - offset, cap))) { // A slot is ready. - if (UNLIKELY(!this->pushTicket_.compare_exchange_strong( - ticket, ticket + 1))) { - continue; - } - // Validate that state is the same - if (LIKELY(state == this->dstate_.load(std::memory_order_acquire))) { + if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) { + // Adjust ticket ticket -= offset; return true; + } else { + continue; } - // Slow path - state changed - get up-to-date info for obtained ticket - while (true) { - state = this->dstate_.load(std::memory_order_acquire); - if (trySeqlockReadSection(state, slots, cap, stride)) { - break; - } - asm_volatile_pause(); + } else { + if (ticket != this->pushTicket_.load(std::memory_order_relaxed)) { // B + // Try again. Ticket changed. + continue; } - maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); - ticket -= offset; - return true; - } - // slow path - no ready ticket - if (ticket != this->pushTicket_.load(std::memory_order_relaxed)) { // B - // Ticket changed. Start over. - continue; - } - auto head = this->popTicket_.load(std::memory_order_acquire); - auto avail = std::max(head, offset) + cap; - if (ticket < avail) { - // a consumer is in the process of making the slot available - // don't try to expand. Spin if capacity is not - // exhausted. Otherwise return false. - if (cap == this->capacity_) { - return false; + // Likely to block. + // Try to expand unless the ticket is for a closed array + if (offset == getOffset(state)) { + if (tryExpand(state, cap)) { + // This or another thread started an expansion. Get up-to-date info. + continue; + } } - asm_volatile_pause(); - continue; - } - // Likely to block. Try to expand. - if (tryExpand(state, cap)) { - // This or another thread started an expansion. Get up-to-date info. - continue; + return false; } - // No ready ticket and cannot expand - return false; - } + } while (true); } bool tryObtainPromisedPushTicket( uint64_t& ticket, Slot*& slots, size_t& cap, int& stride ) noexcept { uint64_t state; - while (true) { + do { ticket = this->pushTicket_.load(std::memory_order_acquire); - auto head = this->popTicket_.load(std::memory_order_acquire); - if (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) { + auto numPops = this->popTicket_.load(std::memory_order_acquire); + if (!trySeqlockReadSection(state, slots, cap, stride)) { asm_volatile_pause(); continue; } + + const auto curCap = cap; // If there was an expansion with offset greater than this ticket, // adjust accordingly uint64_t offset; maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); - auto avail = std::max(offset, head) + cap; - if (UNLIKELY(ticket >= avail)) { - if (tryExpand(state, cap)) { - // Space may be available. Start over. + + int64_t n = ticket - numPops; + + if (n >= static_cast(cap)) { + if ((cap == curCap) && tryExpand(state, cap)) { + // This or another thread started an expansion. Start over. continue; } // Can't expand. @@ -434,110 +423,69 @@ class MPMCQueue : return false; } - if (UNLIKELY((!this->pushTicket_.compare_exchange_strong( - ticket, ticket + 1)))) { - continue; - } - // Validate that state is the same - if (LIKELY(state == this->dstate_.load(std::memory_order_acquire))) { + if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) { + // Adjust ticket ticket -= offset; return true; } - // Obtained ticket but info is out-of-date - Update info - while (true) { - state = this->dstate_.load(std::memory_order_acquire); - if (trySeqlockReadSection(state, slots, cap, stride)) { - break; - } - asm_volatile_pause(); - } - maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); - ticket -= offset; - return true; - } + } while (true); } bool tryObtainReadyPopTicket( uint64_t& ticket, Slot*& slots, size_t& cap, int& stride ) noexcept { uint64_t state; - while (true) { + do { ticket = this->popTicket_.load(std::memory_order_relaxed); - if (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) { + if (!trySeqlockReadSection(state, slots, cap, stride)) { asm_volatile_pause(); continue; } + // If there was an expansion after the corresponding push ticket // was issued, adjust accordingly uint64_t offset; maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); - if (UNLIKELY(!slots[this->idx((ticket - offset), cap, stride)].mayDequeue( - this->turn(ticket - offset, cap)))) { - return false; - } - if (UNLIKELY( - !this->popTicket_.compare_exchange_strong(ticket, ticket + 1))) { - continue; - } - // Validate that state is the same - if (LIKELY(state == this->dstate_.load(std::memory_order_acquire))) { - ticket -= offset; - return true; - } - // Obtained ticket but info is out-of-date - Update info - while (true) { - state = this->dstate_.load(std::memory_order_acquire); - if (trySeqlockReadSection(state, slots, cap, stride)) { - break; + + if (slots[this->idx((ticket - offset), cap, stride)].mayDequeue( + this->turn(ticket - offset, cap))) { + if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) { + // Adjust ticket + ticket -= offset; + return true; } - asm_volatile_pause(); + } else { + return false; } - maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); - ticket -= offset; - return true; - } + } while (true); } bool tryObtainPromisedPopTicket( uint64_t& ticket, Slot*& slots, size_t& cap, int& stride ) noexcept { uint64_t state; - while (true) { + do { ticket = this->popTicket_.load(std::memory_order_acquire); auto numPushes = this->pushTicket_.load(std::memory_order_acquire); - if (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) { + if (!trySeqlockReadSection(state, slots, cap, stride)) { asm_volatile_pause(); continue; } + uint64_t offset; // If there was an expansion after the corresponding push // ticket was issued, adjust accordingly maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); - if (UNLIKELY(ticket >= numPushes)) { + + if (ticket >= numPushes) { ticket -= offset; return false; } - if (UNLIKELY( - !this->popTicket_.compare_exchange_strong(ticket, ticket + 1))) { - continue; - } - // Validate that state is the same - if (LIKELY(state == this->dstate_.load(std::memory_order_acquire))) { + if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) { ticket -= offset; return true; } - // Obtained ticket but info is out-of-date - Update info - while (true) { - state = this->dstate_.load(std::memory_order_acquire); - if (trySeqlockReadSection(state, slots, cap, stride)) { - break; - } - asm_volatile_pause(); - } - maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); - ticket -= offset; - return true; - } + } while (true); } /// Enqueues an element with a specific ticket number @@ -549,8 +497,7 @@ class MPMCQueue : uint64_t state; uint64_t offset; - while (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) { - asm_volatile_pause(); + while (!trySeqlockReadSection(state, slots, cap, stride)) { } // If there was an expansion after this ticket was issued, adjust @@ -570,56 +517,51 @@ class MPMCQueue : } /// Try to expand the queue. Returns true if this expansion was - /// successful or a concurent expansion is in progresse. Returns + /// successful or a concurent expansion is in progress. Returns /// false if the queue has reached its maximum capacity or /// allocation has failed. bool tryExpand(const uint64_t state, const size_t cap) noexcept { - if (LIKELY(cap == this->capacity_)) { + if (cap == this->capacity_) { return false; } - return tryExpandWithSeqlock(state, cap); - } - - bool tryExpandWithSeqlock(const uint64_t state, const size_t cap) noexcept { // Acquire seqlock uint64_t oldval = state; assert((state & 1) == 0); - if (!this->dstate_.compare_exchange_strong(oldval, state + 1)) { - // Failed to acquire seqlock. Another thread acaquired it. - // Go back to the caller and get up-to-date info. + if (this->dstate_.compare_exchange_strong(oldval, state + 1)) { + assert(cap == this->dcapacity_.load()); + uint64_t ticket = + 1 + std::max(this->pushTicket_.load(), this->popTicket_.load()); + size_t newCapacity = std::min(dmult_ * cap, this->capacity_); + Slot* newSlots = + new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding]; + if (newSlots == nullptr) { + // Expansion failed. Restore the seqlock + this->dstate_.store(state); + return false; + } + // Successful expansion + // calculate the current ticket offset + uint64_t offset = getOffset(state); + // calculate index in closed array + int index = getNumClosed(state); + assert((index << 1) < (1 << kSeqlockBits)); + // fill the info for the closed slots array + closed_[index].offset_ = offset; + closed_[index].slots_ = this->dslots_.load(); + closed_[index].capacity_ = cap; + closed_[index].stride_ = this->dstride_.load(); + // update the new slots array info + this->dslots_.store(newSlots); + this->dcapacity_.store(newCapacity); + this->dstride_.store(this->computeStride(newCapacity)); + // Release the seqlock and record the new ticket offset + this->dstate_.store((ticket << kSeqlockBits) + (2 * (index + 1))); + return true; + } else { // failed to acquire seqlock + // Someone acaquired the seqlock. Go back to the caller and get + // up-to-date info. return true; } - // Write critical section - assert(cap == this->dcapacity_.load()); - auto head = this->popTicket_.load(std::memory_order_acquire); - auto avail = std::max(head, getOffset(state)) + cap; - uint64_t newOffset = avail; - size_t newCapacity = std::min(dmult_ * cap, this->capacity_); - Slot* newSlots = - new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding]; - if (newSlots == nullptr) { - // Expansion failed. Restore the seqlock - this->dstate_.store(state); - return false; - } - // Successful expansion - // calculate the current ticket offset - uint64_t offset = getOffset(state); - // calculate index in list of closed slots arrays - int index = getNumClosed(state); - assert((index << 1) < (1 << kSeqlockBits)); - // fill the info for the closed slots array - closed_[index].offset_ = offset; - closed_[index].slots_ = this->dslots_.load(); - closed_[index].capacity_ = cap; - closed_[index].stride_ = this->dstride_.load(); - // update the new slots array info - this->dslots_.store(newSlots); - this->dcapacity_.store(newCapacity); - this->dstride_.store(this->computeStride(newCapacity)); - // Release the seqlock and record the new ticket offset - this->dstate_.store((newOffset << kSeqlockBits) + (2 * (index + 1))); - return true; } /// Seqlock read-only section @@ -627,7 +569,7 @@ class MPMCQueue : uint64_t& state, Slot*& slots, size_t& cap, int& stride ) noexcept { state = this->dstate_.load(std::memory_order_acquire); - if (UNLIKELY(state & 1)) { + if (state & 1) { // Locked. return false; } @@ -637,7 +579,7 @@ class MPMCQueue : stride = this->dstride_.load(std::memory_order_relaxed); // End of read-only section. Validate seqlock. std::atomic_thread_fence(std::memory_order_acquire); - return LIKELY(state == this->dstate_.load(std::memory_order_relaxed)); + return (state == this->dstate_.load(std::memory_order_relaxed)); } /// If there was an expansion after ticket was issued, update local variables @@ -651,32 +593,21 @@ class MPMCQueue : size_t& cap, int& stride) noexcept { offset = getOffset(state); - if (LIKELY(ticket >= offset)) { + if (ticket >= offset) { return false; } - updateFromClosed(state, ticket, offset, slots, cap, stride); - return true; - } - - void updateFromClosed( - const uint64_t state, - const uint64_t ticket, - uint64_t& offset, - Slot*& slots, - size_t& cap, - int& stride) noexcept { for (int i = getNumClosed(state) - 1; i >= 0; --i) { offset = closed_[i].offset_; if (offset <= ticket) { slots = closed_[i].slots_; cap = closed_[i].capacity_; stride = closed_[i].stride_; - return; + return true; } } // A closed array with offset <= ticket should have been found assert(false); - return; + return false; } }; diff --git a/folly/test/MPMCQueueTest.cpp b/folly/test/MPMCQueueTest.cpp index 3b1dd2c9..0874a8f2 100644 --- a/folly/test/MPMCQueueTest.cpp +++ b/folly/test/MPMCQueueTest.cpp @@ -748,6 +748,11 @@ void runMtNeverFail(std::vector& nts, int n) { } } +// All the never_fail tests are for the non-dynamic version only. +// False positive for dynamic version. Some writeIfNotFull() and +// tryWriteUntil() operations may fail in transient conditions related +// to expansion. + TEST(MPMCQueue, mt_never_fail) { std::vector nts {1, 3, 100}; int n = 100000; @@ -760,18 +765,6 @@ TEST(MPMCQueue, mt_never_fail_emulated_futex) { runMtNeverFail(nts, n); } -TEST(MPMCQueue, mt_never_fail_dynamic) { - std::vector nts{1, 3, 100}; - int n = 100000; - runMtNeverFail(nts, n); -} - -TEST(MPMCQueue, mt_never_fail_emulated_futex_dynamic) { - std::vector nts{1, 3, 100}; - int n = 100000; - runMtNeverFail(nts, n); -} - template void runMtNeverFailDeterministic(std::vector& nts, int n, long seed) { LOG(INFO) << "using seed " << seed; @@ -794,13 +787,6 @@ TEST(MPMCQueue, mt_never_fail_deterministic) { runMtNeverFailDeterministic(nts, n, seed); } -TEST(MPMCQueue, mt_never_fail_deterministic_dynamic) { - std::vector nts{3, 10}; - long seed = 0; // nowMicro() % 10000; - int n = 1000; - runMtNeverFailDeterministic(nts, n, seed); -} - template class Atom, bool Dynamic> void runNeverFailUntilThread(int numThreads, int n, /*numOps*/ @@ -865,12 +851,6 @@ TEST(MPMCQueue, mt_never_fail_until_system) { runMtNeverFailUntilSystem(nts, n); } -TEST(MPMCQueue, mt_never_fail_until_system_dynamic) { - std::vector nts{1, 3, 100}; - int n = 100000; - runMtNeverFailUntilSystem(nts, n); -} - template void runMtNeverFailUntilSteady(std::vector& nts, int n) { for (int nt : nts) { @@ -887,12 +867,6 @@ TEST(MPMCQueue, mt_never_fail_until_steady) { runMtNeverFailUntilSteady(nts, n); } -TEST(MPMCQueue, mt_never_fail_until_steady_dynamic) { - std::vector nts{1, 3, 100}; - int n = 100000; - runMtNeverFailUntilSteady(nts, n); -} - enum LifecycleEvent { NOTHING = -1, DEFAULT_CONSTRUCTOR, @@ -1239,7 +1213,7 @@ TEST(MPMCQueue, try_write_until_timeout) { testTimeout(queue); } -TEST(MPMCQueue, try_write_until_timeout_dynamic) { - folly::MPMCQueue queue(1); +TEST(MPMCQueue, must_fail_try_write_until_dynamic) { + folly::MPMCQueue queue(200, 1, 2); testTimeout(queue); } -- 2.34.1