Refactor the CMake file to work with CMake 3.8.2
[folly.git] / folly / MPMCQueue.h
index aa4973eea5e9ac5934913c94b01abafe209e6f01..b0cfc46f6dfc833c839544f055bd12f4282835fd 100644 (file)
@@ -159,6 +159,13 @@ class MPMCQueue : public detail::MPMCQueueBase<MPMCQueue<T,Atom,Dynamic>> {
 /// call to blockingWrite() when the queue size is known to be equal
 /// to its capacity.
 ///
+/// Note that some writeIfNotFull() and tryWriteUntil() operations may
+/// fail even if the size of the queue is less than its maximum
+/// capacity and despite the success of expansion, if the operation
+/// happens to acquire a ticket that belongs to a closed array. This
+/// is a transient condition. Typically, one or two ticket values may
+/// be subject to such condition per expansion.
+///
 /// The dynamic version is a partial specialization of MPMCQueue with
 /// Dynamic == true
 template <typename T, template<typename> class Atom>
@@ -264,6 +271,7 @@ class MPMCQueue<T,Atom,true> :
     uint64_t offset;
     do {
       if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        asm_volatile_pause();
         continue;
       }
       if (maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride)) {
@@ -302,7 +310,9 @@ class MPMCQueue<T,Atom,true> :
     int stride;
     uint64_t state;
     uint64_t offset;
-    while (!trySeqlockReadSection(state, slots, cap, stride));
+    while (!trySeqlockReadSection(state, slots, cap, stride)) {
+      asm_volatile_pause();
+    }
     // If there was an expansion after the corresponding push ticket
     // was issued, adjust accordingly
     maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
@@ -344,6 +354,7 @@ class MPMCQueue<T,Atom,true> :
     do {
       ticket = this->pushTicket_.load(std::memory_order_acquire); // A
       if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        asm_volatile_pause();
         continue;
       }
 
@@ -388,31 +399,26 @@ class MPMCQueue<T,Atom,true> :
       ticket = this->pushTicket_.load(std::memory_order_acquire);
       auto numPops = this->popTicket_.load(std::memory_order_acquire);
       if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        asm_volatile_pause();
         continue;
       }
 
-      const auto oldCap = cap;
+      const auto curCap = cap;
       // If there was an expansion with offset greater than this ticket,
       // adjust accordingly
       uint64_t offset;
       maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
 
       int64_t n = ticket - numPops;
-      if (n >= static_cast<ssize_t>(this->capacity_)) {
-        ticket -= offset;
-        return false;
-      }
 
-      if (n >= static_cast<ssize_t>(oldCap)) {
-        if (tryExpand(state, oldCap)) {
-          // This or another thread started an expansion. Start over
-          // with a new state.
+      if (n >= static_cast<ssize_t>(cap)) {
+        if ((cap == curCap) && tryExpand(state, cap)) {
+          // This or another thread started an expansion. Start over.
           continue;
-        } else {
-          // Can't expand.
-          ticket -= offset;
-          return false;
         }
+        // Can't expand.
+        ticket -= offset;
+        return false;
       }
 
       if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
@@ -430,6 +436,7 @@ class MPMCQueue<T,Atom,true> :
     do {
       ticket = this->popTicket_.load(std::memory_order_relaxed);
       if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        asm_volatile_pause();
         continue;
       }
 
@@ -459,6 +466,7 @@ class MPMCQueue<T,Atom,true> :
       ticket = this->popTicket_.load(std::memory_order_acquire);
       auto numPushes = this->pushTicket_.load(std::memory_order_acquire);
       if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        asm_volatile_pause();
         continue;
       }
 
@@ -1147,7 +1155,7 @@ class MPMCQueueBase<Derived<T, Atom, Dynamic>> : boost::noncopyable {
       ticket = numPushes;
       const auto numPops = popTicket_.load(std::memory_order_acquire); // B
       // n will be negative if pops are pending
-      const int64_t n = numPushes - numPops;
+      const int64_t n = int64_t(numPushes - numPops);
       if (n >= static_cast<ssize_t>(capacity_)) {
         // Full, linearize at B.  We don't need to recheck the read we
         // performed at A, because if numPushes was stale at B then the