Fix dynamic MPMCQueue tryObtainPromisedPushTicket() to prevent tryWriteUntil() and...
authorMaged Michael <magedmichael@fb.com>
Tue, 10 Jan 2017 02:31:20 +0000 (18:31 -0800)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Tue, 10 Jan 2017 02:33:00 +0000 (18:33 -0800)
Summary:
The bug was reported by Alexander Pronchenkov in https://fb.facebook.com/groups/560979627394613/permalink/837052843120622/

Under certain conditions a `tryWriteUntil()`--and also `writeIfNotFull()`--operation may block indefinitely awaiting a matching read. This could happen because in each dynamic MPMCQueue expansion, typically one or two tickets are associated with the closed array not the new one. In the incorrect code, a `tryWriteUntil()` operation that induced expansion but gets a ticket associated with the closed array, incorrectly assumes that because the expansion succeeded then there is space for it. However because the ticket is associated with the closed array, the operation needs to wait (possibly indefinitely) for space to open in the closed array.

The fix: Changed the code in tryObtainPromisedPushTicket() such that the operation tries to acquire a ticket only if there is promised space in the array associated with that ticket. If there is no space, an expansion is attempted if the ticket is not associated with a closed array. If not or if expansion fails because of reaching maximum capacity or for being out-of-memory, then the operation returns false without attempting to acquire the ticket.

Other changes:
- Added a note about this difference in semantic between the dynamic and non-dynamic version to the main comment about the dynamic version.
- Changed `oldCap` to `curCap` because the value is actually current not old.
- Added two tests for checking that tryWriteUntil() never blocks indefinitely for both dynamic and non-dynamic versions.
- Removed all the `never_fail` tests for the dynamic version, because such operations may fails as described above.
- Added `asm_volatile_pause` when spinning on the seqlock.

Reviewed By: djwatson

Differential Revision: D4389347

fbshipit-source-id: c46dbefc9fe08e146250d2ad8ba68b0887f97436

folly/MPMCQueue.h
folly/test/MPMCQueueTest.cpp

index aa4973eea5e9ac5934913c94b01abafe209e6f01..22798880d4e9c58e3ddbb34382f6711f1475f370 100644 (file)
@@ -159,6 +159,13 @@ class MPMCQueue : public detail::MPMCQueueBase<MPMCQueue<T,Atom,Dynamic>> {
 /// call to blockingWrite() when the queue size is known to be equal
 /// to its capacity.
 ///
+/// Note that some writeIfNotFull() and tryWriteUntil() operations may
+/// fail even if the size of the queue is less than its maximum
+/// capacity and despite the success of expansion, if the operation
+/// happens to acquire a ticket that belongs to a closed array. This
+/// is a transient condition. Typically, one or two ticket values may
+/// be subject to such condition per expansion.
+///
 /// The dynamic version is a partial specialization of MPMCQueue with
 /// Dynamic == true
 template <typename T, template<typename> class Atom>
@@ -264,6 +271,7 @@ class MPMCQueue<T,Atom,true> :
     uint64_t offset;
     do {
       if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        asm_volatile_pause();
         continue;
       }
       if (maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride)) {
@@ -302,7 +310,9 @@ class MPMCQueue<T,Atom,true> :
     int stride;
     uint64_t state;
     uint64_t offset;
-    while (!trySeqlockReadSection(state, slots, cap, stride));
+    while (!trySeqlockReadSection(state, slots, cap, stride)) {
+      asm_volatile_pause();
+    }
     // If there was an expansion after the corresponding push ticket
     // was issued, adjust accordingly
     maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
@@ -344,6 +354,7 @@ class MPMCQueue<T,Atom,true> :
     do {
       ticket = this->pushTicket_.load(std::memory_order_acquire); // A
       if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        asm_volatile_pause();
         continue;
       }
 
@@ -388,31 +399,26 @@ class MPMCQueue<T,Atom,true> :
       ticket = this->pushTicket_.load(std::memory_order_acquire);
       auto numPops = this->popTicket_.load(std::memory_order_acquire);
       if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        asm_volatile_pause();
         continue;
       }
 
-      const auto oldCap = cap;
+      const auto curCap = cap;
       // If there was an expansion with offset greater than this ticket,
       // adjust accordingly
       uint64_t offset;
       maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
 
       int64_t n = ticket - numPops;
-      if (n >= static_cast<ssize_t>(this->capacity_)) {
-        ticket -= offset;
-        return false;
-      }
 
-      if (n >= static_cast<ssize_t>(oldCap)) {
-        if (tryExpand(state, oldCap)) {
-          // This or another thread started an expansion. Start over
-          // with a new state.
+      if (n >= static_cast<ssize_t>(cap)) {
+        if ((cap == curCap) && tryExpand(state, cap)) {
+          // This or another thread started an expansion. Start over.
           continue;
-        } else {
-          // Can't expand.
-          ticket -= offset;
-          return false;
         }
+        // Can't expand.
+        ticket -= offset;
+        return false;
       }
 
       if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
@@ -430,6 +436,7 @@ class MPMCQueue<T,Atom,true> :
     do {
       ticket = this->popTicket_.load(std::memory_order_relaxed);
       if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        asm_volatile_pause();
         continue;
       }
 
@@ -459,6 +466,7 @@ class MPMCQueue<T,Atom,true> :
       ticket = this->popTicket_.load(std::memory_order_acquire);
       auto numPushes = this->pushTicket_.load(std::memory_order_acquire);
       if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        asm_volatile_pause();
         continue;
       }
 
index d072ea4a0a86fdb3d42f209793955932b477e75f..9269b890dff2a70f9ce0c699215fb2bddfe799ab 100644 (file)
@@ -772,30 +772,23 @@ void runMtNeverFail(std::vector<int>& nts, int n) {
   }
 }
 
+// All the never_fail tests are for the non-dynamic version only.
+// False positive for dynamic version. Some writeIfNotFull() and
+// tryWriteUntil() operations may fail in transient conditions related
+// to expansion.
+
 TEST(MPMCQueue, mt_never_fail) {
   std::vector<int> nts {1, 3, 100};
   int n = 100000;
   runMtNeverFail<std::atomic>(nts, n);
 }
 
-TEST(MPMCQueue, mt_never_fail_dynamic) {
-  std::vector<int> nts {1, 3, 100};
-  int n = 100000;
-  runMtNeverFail<std::atomic, true>(nts, n);
-}
-
 TEST(MPMCQueue, mt_never_fail_emulated_futex) {
   std::vector<int> nts {1, 3, 100};
   int n = 100000;
   runMtNeverFail<EmulatedFutexAtomic>(nts, n);
 }
 
-TEST(MPMCQueue, mt_never_fail_emulated_futex_dynamic) {
-  std::vector<int> nts {1, 3, 100};
-  int n = 100000;
-  runMtNeverFail<EmulatedFutexAtomic, true>(nts, n);
-}
-
 template<bool Dynamic = false>
 void runMtNeverFailDeterministic(std::vector<int>& nts, int n, long seed) {
   LOG(INFO) << "using seed " << seed;
@@ -818,13 +811,6 @@ TEST(MPMCQueue, mt_never_fail_deterministic) {
   runMtNeverFailDeterministic(nts, n, seed);
 }
 
-TEST(MPMCQueue, mt_never_fail_deterministic_dynamic) {
-  std::vector<int> nts {3, 10};
-  long seed = 0; // nowMicro() % 10000;
-  int n = 1000;
-  runMtNeverFailDeterministic<true>(nts, n, seed);
-}
-
 template <class Clock, template <typename> class Atom, bool Dynamic>
 void runNeverFailUntilThread(int numThreads,
                              int n, /*numOps*/
@@ -889,12 +875,6 @@ TEST(MPMCQueue, mt_never_fail_until_system) {
   runMtNeverFailUntilSystem(nts, n);
 }
 
-TEST(MPMCQueue, mt_never_fail_until_system_dynamic) {
-  std::vector<int> nts {1, 3, 100};
-  int n = 100000;
-  runMtNeverFailUntilSystem<true>(nts, n);
-}
-
 template <bool Dynamic = false>
 void runMtNeverFailUntilSteady(std::vector<int>& nts, int n) {
   for (int nt : nts) {
@@ -911,12 +891,6 @@ TEST(MPMCQueue, mt_never_fail_until_steady) {
   runMtNeverFailUntilSteady(nts, n);
 }
 
-TEST(MPMCQueue, mt_never_fail_until_steady_dynamic) {
-  std::vector<int> nts {1, 3, 100};
-  int n = 100000;
-  runMtNeverFailUntilSteady<true>(nts, n);
-}
-
 enum LifecycleEvent {
   NOTHING = -1,
   DEFAULT_CONSTRUCTOR,
@@ -1249,3 +1223,21 @@ TEST(MPMCQueue, try_write_until) {
 TEST(MPMCQueue, try_write_until_dynamic) {
   testTryWriteUntil<true>();
 }
+
+template <bool Dynamic>
+void testTimeout(MPMCQueue<int, std::atomic, Dynamic>& q) {
+  CHECK(q.write(1));
+  /* The following must not block forever */
+  q.tryWriteUntil(
+      std::chrono::system_clock::now() + std::chrono::microseconds(10000), 2);
+}
+
+TEST(MPMCQueue, try_write_until_timeout) {
+  folly::MPMCQueue<int, std::atomic, false> queue(1);
+  testTimeout<false>(queue);
+}
+
+TEST(MPMCQueue, must_fail_try_write_until_dynamic) {
+  folly::MPMCQueue<int, std::atomic, true> queue(200, 1, 2);
+  testTimeout<true>(queue);
+}