From 232f650cce09c371810d8b1d9fd94143896eceac Mon Sep 17 00:00:00 2001 From: Maged Michael Date: Mon, 9 Jan 2017 18:31:20 -0800 Subject: [PATCH] Fix dynamic MPMCQueue tryObtainPromisedPushTicket() to prevent tryWriteUntil() and writeIfNotFull() from blocking indefinitely for a matching read. Summary: The bug was reported by Alexander Pronchenkov in https://fb.facebook.com/groups/560979627394613/permalink/837052843120622/ Under certain conditions a `tryWriteUntil()`--and also `writeIfNotFull()`--operation may block indefinitely awaiting a matching read. This could happen because in each dynamic MPMCQueue expansion, typically one or two tickets are associated with the closed array not the new one. In the incorrect code, a `tryWriteUntil()` operation that induced expansion but gets a ticket associated with the closed array, incorrectly assumes that because the expansion succeeded then there is space for it. However because the ticket is associated with the closed array, the operation needs to wait (possibly indefinitely) for space to open in the closed array. The fix: Changed the code in tryObtainPromisedPushTicket() such that the operation tries to acquire a ticket only if there is promised space in the array associated with that ticket. If there is no space, an expansion is attempted if the ticket is not associated with a closed array. If not or if expansion fails because of reaching maximum capacity or for being out-of-memory, then the operation returns false without attempting to acquire the ticket. Other changes: - Added a note about this difference in semantic between the dynamic and non-dynamic version to the main comment about the dynamic version. - Changed `oldCap` to `curCap` because the value is actually current not old. - Added two tests for checking that tryWriteUntil() never blocks indefinitely for both dynamic and non-dynamic versions. - Removed all the `never_fail` tests for the dynamic version, because such operations may fails as described above. - Added `asm_volatile_pause` when spinning on the seqlock. Reviewed By: djwatson Differential Revision: D4389347 fbshipit-source-id: c46dbefc9fe08e146250d2ad8ba68b0887f97436 --- folly/MPMCQueue.h | 36 ++++++++++++++---------- folly/test/MPMCQueueTest.cpp | 54 +++++++++++++++--------------------- 2 files changed, 45 insertions(+), 45 deletions(-) diff --git a/folly/MPMCQueue.h b/folly/MPMCQueue.h index aa4973ee..22798880 100644 --- a/folly/MPMCQueue.h +++ b/folly/MPMCQueue.h @@ -159,6 +159,13 @@ class MPMCQueue : public detail::MPMCQueueBase> { /// call to blockingWrite() when the queue size is known to be equal /// to its capacity. /// +/// Note that some writeIfNotFull() and tryWriteUntil() operations may +/// fail even if the size of the queue is less than its maximum +/// capacity and despite the success of expansion, if the operation +/// happens to acquire a ticket that belongs to a closed array. This +/// is a transient condition. Typically, one or two ticket values may +/// be subject to such condition per expansion. +/// /// The dynamic version is a partial specialization of MPMCQueue with /// Dynamic == true template class Atom> @@ -264,6 +271,7 @@ class MPMCQueue : uint64_t offset; do { if (!trySeqlockReadSection(state, slots, cap, stride)) { + asm_volatile_pause(); continue; } if (maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride)) { @@ -302,7 +310,9 @@ class MPMCQueue : int stride; uint64_t state; uint64_t offset; - while (!trySeqlockReadSection(state, slots, cap, stride)); + while (!trySeqlockReadSection(state, slots, cap, stride)) { + asm_volatile_pause(); + } // If there was an expansion after the corresponding push ticket // was issued, adjust accordingly maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); @@ -344,6 +354,7 @@ class MPMCQueue : do { ticket = this->pushTicket_.load(std::memory_order_acquire); // A if (!trySeqlockReadSection(state, slots, cap, stride)) { + asm_volatile_pause(); continue; } @@ -388,31 +399,26 @@ class MPMCQueue : ticket = this->pushTicket_.load(std::memory_order_acquire); auto numPops = this->popTicket_.load(std::memory_order_acquire); if (!trySeqlockReadSection(state, slots, cap, stride)) { + asm_volatile_pause(); continue; } - const auto oldCap = cap; + const auto curCap = cap; // If there was an expansion with offset greater than this ticket, // adjust accordingly uint64_t offset; maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); int64_t n = ticket - numPops; - if (n >= static_cast(this->capacity_)) { - ticket -= offset; - return false; - } - if (n >= static_cast(oldCap)) { - if (tryExpand(state, oldCap)) { - // This or another thread started an expansion. Start over - // with a new state. + if (n >= static_cast(cap)) { + if ((cap == curCap) && tryExpand(state, cap)) { + // This or another thread started an expansion. Start over. continue; - } else { - // Can't expand. - ticket -= offset; - return false; } + // Can't expand. + ticket -= offset; + return false; } if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) { @@ -430,6 +436,7 @@ class MPMCQueue : do { ticket = this->popTicket_.load(std::memory_order_relaxed); if (!trySeqlockReadSection(state, slots, cap, stride)) { + asm_volatile_pause(); continue; } @@ -459,6 +466,7 @@ class MPMCQueue : ticket = this->popTicket_.load(std::memory_order_acquire); auto numPushes = this->pushTicket_.load(std::memory_order_acquire); if (!trySeqlockReadSection(state, slots, cap, stride)) { + asm_volatile_pause(); continue; } diff --git a/folly/test/MPMCQueueTest.cpp b/folly/test/MPMCQueueTest.cpp index d072ea4a..9269b890 100644 --- a/folly/test/MPMCQueueTest.cpp +++ b/folly/test/MPMCQueueTest.cpp @@ -772,30 +772,23 @@ void runMtNeverFail(std::vector& nts, int n) { } } +// All the never_fail tests are for the non-dynamic version only. +// False positive for dynamic version. Some writeIfNotFull() and +// tryWriteUntil() operations may fail in transient conditions related +// to expansion. + TEST(MPMCQueue, mt_never_fail) { std::vector nts {1, 3, 100}; int n = 100000; runMtNeverFail(nts, n); } -TEST(MPMCQueue, mt_never_fail_dynamic) { - std::vector nts {1, 3, 100}; - int n = 100000; - runMtNeverFail(nts, n); -} - TEST(MPMCQueue, mt_never_fail_emulated_futex) { std::vector nts {1, 3, 100}; int n = 100000; runMtNeverFail(nts, n); } -TEST(MPMCQueue, mt_never_fail_emulated_futex_dynamic) { - std::vector nts {1, 3, 100}; - int n = 100000; - runMtNeverFail(nts, n); -} - template void runMtNeverFailDeterministic(std::vector& nts, int n, long seed) { LOG(INFO) << "using seed " << seed; @@ -818,13 +811,6 @@ TEST(MPMCQueue, mt_never_fail_deterministic) { runMtNeverFailDeterministic(nts, n, seed); } -TEST(MPMCQueue, mt_never_fail_deterministic_dynamic) { - std::vector nts {3, 10}; - long seed = 0; // nowMicro() % 10000; - int n = 1000; - runMtNeverFailDeterministic(nts, n, seed); -} - template class Atom, bool Dynamic> void runNeverFailUntilThread(int numThreads, int n, /*numOps*/ @@ -889,12 +875,6 @@ TEST(MPMCQueue, mt_never_fail_until_system) { runMtNeverFailUntilSystem(nts, n); } -TEST(MPMCQueue, mt_never_fail_until_system_dynamic) { - std::vector nts {1, 3, 100}; - int n = 100000; - runMtNeverFailUntilSystem(nts, n); -} - template void runMtNeverFailUntilSteady(std::vector& nts, int n) { for (int nt : nts) { @@ -911,12 +891,6 @@ TEST(MPMCQueue, mt_never_fail_until_steady) { runMtNeverFailUntilSteady(nts, n); } -TEST(MPMCQueue, mt_never_fail_until_steady_dynamic) { - std::vector nts {1, 3, 100}; - int n = 100000; - runMtNeverFailUntilSteady(nts, n); -} - enum LifecycleEvent { NOTHING = -1, DEFAULT_CONSTRUCTOR, @@ -1249,3 +1223,21 @@ TEST(MPMCQueue, try_write_until) { TEST(MPMCQueue, try_write_until_dynamic) { testTryWriteUntil(); } + +template +void testTimeout(MPMCQueue& q) { + CHECK(q.write(1)); + /* The following must not block forever */ + q.tryWriteUntil( + std::chrono::system_clock::now() + std::chrono::microseconds(10000), 2); +} + +TEST(MPMCQueue, try_write_until_timeout) { + folly::MPMCQueue queue(1); + testTimeout(queue); +} + +TEST(MPMCQueue, must_fail_try_write_until_dynamic) { + folly::MPMCQueue queue(200, 1, 2); + testTimeout(queue); +} -- 2.34.1