Fix the assumption in the propagate_const test about local layout
[folly.git] / folly / MPMCQueue.h
index 743de144fe9c51282c9cac4705fbff1bdccb88a2..932ae8fc2add1af1ad29c15aa0a1ebe4b3aa4039 100644 (file)
@@ -118,22 +118,8 @@ class MPMCQueue : public detail::MPMCQueueBase<MPMCQueue<T,Atom,Dynamic>> {
 
 /// The dynamic version of MPMCQueue allows dynamic expansion of queue
 /// capacity, such that a queue may start with a smaller capacity than
-/// specified and expand if needed up to the specified capacity.
-/// Shrinking is not supported at this point.
-///
-/// Users may optionally specify the initial capacity and the
-/// expansion multiplier. Otherwise default values are used.
-///
-/// Operation on the dynamic version have the same semantics as for
-/// the default fixed-size version, except that the total queue
-/// capacity can temporarily exceed the specified capacity when there
-/// are lagging consumers that haven't yet consumed all the elements
-/// in closed arrays. Users should not rely on the capacity of dynamic
-/// queues for synchronization, e.g., they should not expect that a
-/// thread will definitely block on a call to blockingWrite() when the
-/// queue size is known to be equal to its capacity.
-///
-/// Design Overview:
+/// specified and expand only if needed. Users may optionally specify
+/// the initial capacity and the expansion multiplier.
 ///
 /// The design uses a seqlock to enforce mutual exclusion among
 /// expansion attempts. Regular operations read up-to-date queue
@@ -168,6 +154,21 @@ class MPMCQueue : public detail::MPMCQueueBase<MPMCQueue<T,Atom,Dynamic>> {
 /// seqlock read-only section (and hence obtained information for
 /// older closed arrays).
 ///
+/// Note that the total queue capacity can temporarily exceed the
+/// specified capacity when there are lagging consumers that haven't
+/// yet consumed all the elements in closed arrays. Users should not
+/// rely on the capacity of dynamic queues for synchronization, e.g.,
+/// they should not expect that a thread will definitely block on a
+/// call to blockingWrite() when the queue size is known to be equal
+/// to its capacity.
+///
+/// Note that some writeIfNotFull() and tryWriteUntil() operations may
+/// fail even if the size of the queue is less than its maximum
+/// capacity and despite the success of expansion, if the operation
+/// happens to acquire a ticket that belongs to a closed array. This
+/// is a transient condition. Typically, one or two ticket values may
+/// be subject to such condition per expansion.
+///
 /// The dynamic version is a partial specialization of MPMCQueue with
 /// Dynamic == true
 template <typename T, template <typename> class Atom>
@@ -271,31 +272,36 @@ class MPMCQueue<T,Atom,true> :
     int stride;
     uint64_t state;
     uint64_t offset;
-    while (true) {
-      while (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) {
+    do {
+      if (!trySeqlockReadSection(state, slots, cap, stride)) {
         asm_volatile_pause();
+        continue;
       }
-      maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
-      if (LIKELY(slots[this->idx((ticket - offset), cap, stride)].mayEnqueue(
-              this->turn(ticket - offset, cap)))) {
-        // A slot is ready. Fast path. No need to expand.
-        break;
-      }
-      // Slow path
-      auto head = this->popTicket_.load(std::memory_order_relaxed);
-      auto avail = std::max(head, offset) + cap;
-      if (ticket < avail) {
-        // May block, but a pop is in progress. No need to expand.
+      if (maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride)) {
+        // There was an expansion after this ticket was issued.
         break;
       }
-      // Try to expand, otherwise this operation may block
-      // indefinitely awaiting a consumer to unblock it.
-      if (!tryExpand(state, cap)) {
-        // Can't expand. Block.
+      if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue(
+              this->turn(ticket - offset, cap))) {
+        // A slot is ready. No need to expand.
         break;
+      } else if (
+          this->popTicket_.load(std::memory_order_relaxed) + cap > ticket) {
+        // May block, but a pop is in progress. No need to expand.
+        // Get seqlock read section info again in case an expansion
+        // occurred with an equal or higher ticket.
+        continue;
+      } else {
+        // May block. See if we can expand.
+        if (tryExpand(state, cap)) {
+          // This or another thread started an expansion. Get updated info.
+          continue;
+        } else {
+          // Can't expand.
+          break;
+        }
       }
-      // Either this or another thread started an expansion Get up-to-date info.
-    }
+    } while (true);
     this->enqueueWithTicketBase(ticket-offset, slots, cap, stride,
                                 std::forward<Args>(args)...);
   }
@@ -307,7 +313,7 @@ class MPMCQueue<T,Atom,true> :
     int stride;
     uint64_t state;
     uint64_t offset;
-    while (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) {
+    while (!trySeqlockReadSection(state, slots, cap, stride)) {
       asm_volatile_pause();
     }
     // If there was an expansion after the corresponding push ticket
@@ -318,9 +324,9 @@ class MPMCQueue<T,Atom,true> :
 
  private:
   enum {
-    kSeqlockBits = 8,
-    kDefaultMinDynamicCapacity = 16,
-    kDefaultExpansionMultiplier = 8,
+    kSeqlockBits = 6,
+    kDefaultMinDynamicCapacity = 10,
+    kDefaultExpansionMultiplier = 10,
   };
 
   size_t dmult_;
@@ -347,86 +353,69 @@ class MPMCQueue<T,Atom,true> :
       uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
   ) noexcept {
     uint64_t state;
-    while (true) {
+    do {
       ticket = this->pushTicket_.load(std::memory_order_acquire); // A
-      if (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) {
+      if (!trySeqlockReadSection(state, slots, cap, stride)) {
         asm_volatile_pause();
         continue;
       }
+
       // If there was an expansion with offset greater than this ticket,
       // adjust accordingly
       uint64_t offset;
       maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
-      if (LIKELY(slots[this->idx((ticket - offset), cap, stride)].mayEnqueue(
-              this->turn(ticket - offset, cap)))) {
+
+      if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue(
+              this->turn(ticket - offset, cap))) {
         // A slot is ready.
-        if (UNLIKELY(!this->pushTicket_.compare_exchange_strong(
-                ticket, ticket + 1))) {
-          continue;
-        }
-        // Validate that state is the same
-        if (LIKELY(state == this->dstate_.load(std::memory_order_acquire))) {
+        if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
+          // Adjust ticket
           ticket -= offset;
           return true;
+        } else {
+          continue;
         }
-        // Slow path - state changed - get up-to-date info for obtained ticket
-        while (true) {
-          state = this->dstate_.load(std::memory_order_acquire);
-          if (trySeqlockReadSection(state, slots, cap, stride)) {
-            break;
-          }
-          asm_volatile_pause();
+      } else {
+        if (ticket != this->pushTicket_.load(std::memory_order_relaxed)) { // B
+          // Try again. Ticket changed.
+          continue;
         }
-        maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
-        ticket -= offset;
-        return true;
-      }
-      // slow path - no ready ticket
-      if (ticket != this->pushTicket_.load(std::memory_order_relaxed)) { // B
-        // Ticket changed. Start over.
-        continue;
-      }
-      auto head = this->popTicket_.load(std::memory_order_acquire);
-      auto avail = std::max(head, offset) + cap;
-      if (ticket < avail) {
-        // a consumer is in the process of making the slot available
-        // don't try to expand. Spin if capacity is not
-        // exhausted. Otherwise return false.
-        if (cap == this->capacity_) {
-          return false;
+        // Likely to block.
+        // Try to expand unless the ticket is for a closed array
+        if (offset == getOffset(state)) {
+          if (tryExpand(state, cap)) {
+            // This or another thread started an expansion. Get up-to-date info.
+            continue;
+          }
         }
-        asm_volatile_pause();
-        continue;
-      }
-      // Likely to block. Try to expand.
-      if (tryExpand(state, cap)) {
-        // This or another thread started an expansion. Get up-to-date info.
-        continue;
+        return false;
       }
-      // No ready ticket and cannot expand
-      return false;
-    }
+    } while (true);
   }
 
   bool tryObtainPromisedPushTicket(
     uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
   ) noexcept {
     uint64_t state;
-    while (true) {
+    do {
       ticket = this->pushTicket_.load(std::memory_order_acquire);
-      auto head = this->popTicket_.load(std::memory_order_acquire);
-      if (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) {
+      auto numPops = this->popTicket_.load(std::memory_order_acquire);
+      if (!trySeqlockReadSection(state, slots, cap, stride)) {
         asm_volatile_pause();
         continue;
       }
+
+      const auto curCap = cap;
       // If there was an expansion with offset greater than this ticket,
       // adjust accordingly
       uint64_t offset;
       maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
-      auto avail = std::max(offset, head) + cap;
-      if (UNLIKELY(ticket >= avail)) {
-        if (tryExpand(state, cap)) {
-          // Space may be available. Start over.
+
+      int64_t n = ticket - numPops;
+
+      if (n >= static_cast<ssize_t>(cap)) {
+        if ((cap == curCap) && tryExpand(state, cap)) {
+          // This or another thread started an expansion. Start over.
           continue;
         }
         // Can't expand.
@@ -434,110 +423,69 @@ class MPMCQueue<T,Atom,true> :
         return false;
       }
 
-      if (UNLIKELY((!this->pushTicket_.compare_exchange_strong(
-              ticket, ticket + 1)))) {
-        continue;
-      }
-      // Validate that state is the same
-      if (LIKELY(state == this->dstate_.load(std::memory_order_acquire))) {
+      if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
+        // Adjust ticket
         ticket -= offset;
         return true;
       }
-      // Obtained ticket but info is out-of-date - Update info
-      while (true) {
-        state = this->dstate_.load(std::memory_order_acquire);
-        if (trySeqlockReadSection(state, slots, cap, stride)) {
-          break;
-        }
-        asm_volatile_pause();
-      }
-      maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
-      ticket -= offset;
-      return true;
-    }
+    } while (true);
   }
 
   bool tryObtainReadyPopTicket(
     uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
   ) noexcept {
     uint64_t state;
-    while (true) {
+    do {
       ticket = this->popTicket_.load(std::memory_order_relaxed);
-      if (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) {
+      if (!trySeqlockReadSection(state, slots, cap, stride)) {
         asm_volatile_pause();
         continue;
       }
+
       // If there was an expansion after the corresponding push ticket
       // was issued, adjust accordingly
       uint64_t offset;
       maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
-      if (UNLIKELY(!slots[this->idx((ticket - offset), cap, stride)].mayDequeue(
-              this->turn(ticket - offset, cap)))) {
-        return false;
-      }
-      if (UNLIKELY(
-              !this->popTicket_.compare_exchange_strong(ticket, ticket + 1))) {
-        continue;
-      }
-      // Validate that state is the same
-      if (LIKELY(state == this->dstate_.load(std::memory_order_acquire))) {
-        ticket -= offset;
-        return true;
-      }
-      // Obtained ticket but info is out-of-date - Update info
-      while (true) {
-        state = this->dstate_.load(std::memory_order_acquire);
-        if (trySeqlockReadSection(state, slots, cap, stride)) {
-          break;
+
+      if (slots[this->idx((ticket - offset), cap, stride)].mayDequeue(
+              this->turn(ticket - offset, cap))) {
+        if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
+          // Adjust ticket
+          ticket -= offset;
+          return true;
         }
-        asm_volatile_pause();
+      } else {
+        return false;
       }
-      maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
-      ticket -= offset;
-      return true;
-    }
+    } while (true);
   }
 
   bool tryObtainPromisedPopTicket(
     uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
   ) noexcept {
     uint64_t state;
-    while (true) {
+    do {
       ticket = this->popTicket_.load(std::memory_order_acquire);
       auto numPushes = this->pushTicket_.load(std::memory_order_acquire);
-      if (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) {
+      if (!trySeqlockReadSection(state, slots, cap, stride)) {
         asm_volatile_pause();
         continue;
       }
+
       uint64_t offset;
       // If there was an expansion after the corresponding push
       // ticket was issued, adjust accordingly
       maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
-      if (UNLIKELY(ticket >= numPushes)) {
+
+      if (ticket >= numPushes) {
         ticket -= offset;
         return false;
       }
-      if (UNLIKELY(
-              !this->popTicket_.compare_exchange_strong(ticket, ticket + 1))) {
-        continue;
-      }
-      // Validate that state is the same
-      if (LIKELY(state == this->dstate_.load(std::memory_order_acquire))) {
+      if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
         ticket -= offset;
         return true;
       }
-      // Obtained ticket but info is out-of-date - Update info
-      while (true) {
-        state = this->dstate_.load(std::memory_order_acquire);
-        if (trySeqlockReadSection(state, slots, cap, stride)) {
-          break;
-        }
-        asm_volatile_pause();
-      }
-      maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
-      ticket -= offset;
-      return true;
-    }
+    } while (true);
   }
 
   /// Enqueues an element with a specific ticket number
@@ -549,8 +497,7 @@ class MPMCQueue<T,Atom,true> :
     uint64_t state;
     uint64_t offset;
 
-    while (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) {
-      asm_volatile_pause();
+    while (!trySeqlockReadSection(state, slots, cap, stride)) {
     }
 
     // If there was an expansion after this ticket was issued, adjust
@@ -570,56 +517,51 @@ class MPMCQueue<T,Atom,true> :
   }
 
   /// Try to expand the queue. Returns true if this expansion was
-  /// successful or a concurent expansion is in progresse. Returns
+  /// successful or a concurent expansion is in progress. Returns
   /// false if the queue has reached its maximum capacity or
   /// allocation has failed.
   bool tryExpand(const uint64_t state, const size_t cap) noexcept {
-    if (LIKELY(cap == this->capacity_)) {
+    if (cap == this->capacity_) {
       return false;
     }
-    return tryExpandWithSeqlock(state, cap);
-  }
-
-  bool tryExpandWithSeqlock(const uint64_t state, const size_t cap) noexcept {
     // Acquire seqlock
     uint64_t oldval = state;
     assert((state & 1) == 0);
-    if (!this->dstate_.compare_exchange_strong(oldval, state + 1)) {
-      // Failed to acquire seqlock. Another thread acaquired it.
-      // Go back to the caller and get up-to-date info.
+    if (this->dstate_.compare_exchange_strong(oldval, state + 1)) {
+      assert(cap == this->dcapacity_.load());
+      uint64_t ticket =
+          1 + std::max(this->pushTicket_.load(), this->popTicket_.load());
+      size_t newCapacity = std::min(dmult_ * cap, this->capacity_);
+      Slot* newSlots =
+          new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding];
+      if (newSlots == nullptr) {
+        // Expansion failed. Restore the seqlock
+        this->dstate_.store(state);
+        return false;
+      }
+      // Successful expansion
+      // calculate the current ticket offset
+      uint64_t offset = getOffset(state);
+      // calculate index in closed array
+      int index = getNumClosed(state);
+      assert((index << 1) < (1 << kSeqlockBits));
+      // fill the info for the closed slots array
+      closed_[index].offset_ = offset;
+      closed_[index].slots_ = this->dslots_.load();
+      closed_[index].capacity_ = cap;
+      closed_[index].stride_ = this->dstride_.load();
+      // update the new slots array info
+      this->dslots_.store(newSlots);
+      this->dcapacity_.store(newCapacity);
+      this->dstride_.store(this->computeStride(newCapacity));
+      // Release the seqlock and record the new ticket offset
+      this->dstate_.store((ticket << kSeqlockBits) + (2 * (index + 1)));
+      return true;
+    } else { // failed to acquire seqlock
+      // Someone acaquired the seqlock. Go back to the caller and get
+      // up-to-date info.
       return true;
     }
-    // Write critical section
-    assert(cap == this->dcapacity_.load());
-    auto head = this->popTicket_.load(std::memory_order_acquire);
-    auto avail = std::max(head, getOffset(state)) + cap;
-    uint64_t newOffset = avail;
-    size_t newCapacity = std::min(dmult_ * cap, this->capacity_);
-    Slot* newSlots =
-        new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding];
-    if (newSlots == nullptr) {
-      // Expansion failed. Restore the seqlock
-      this->dstate_.store(state);
-      return false;
-    }
-    // Successful expansion
-    // calculate the current ticket offset
-    uint64_t offset = getOffset(state);
-    // calculate index in list of closed slots arrays
-    int index = getNumClosed(state);
-    assert((index << 1) < (1 << kSeqlockBits));
-    // fill the info for the closed slots array
-    closed_[index].offset_ = offset;
-    closed_[index].slots_ = this->dslots_.load();
-    closed_[index].capacity_ = cap;
-    closed_[index].stride_ = this->dstride_.load();
-    // update the new slots array info
-    this->dslots_.store(newSlots);
-    this->dcapacity_.store(newCapacity);
-    this->dstride_.store(this->computeStride(newCapacity));
-    // Release the seqlock and record the new ticket offset
-    this->dstate_.store((newOffset << kSeqlockBits) + (2 * (index + 1)));
-    return true;
   }
 
   /// Seqlock read-only section
@@ -627,7 +569,7 @@ class MPMCQueue<T,Atom,true> :
     uint64_t& state, Slot*& slots, size_t& cap, int& stride
   ) noexcept {
     state = this->dstate_.load(std::memory_order_acquire);
-    if (UNLIKELY(state & 1)) {
+    if (state & 1) {
       // Locked.
       return false;
     }
@@ -637,7 +579,7 @@ class MPMCQueue<T,Atom,true> :
     stride = this->dstride_.load(std::memory_order_relaxed);
     // End of read-only section. Validate seqlock.
     std::atomic_thread_fence(std::memory_order_acquire);
-    return LIKELY(state == this->dstate_.load(std::memory_order_relaxed));
+    return (state == this->dstate_.load(std::memory_order_relaxed));
   }
 
   /// If there was an expansion after ticket was issued, update local variables
@@ -651,32 +593,21 @@ class MPMCQueue<T,Atom,true> :
       size_t& cap,
       int& stride) noexcept {
     offset = getOffset(state);
-    if (LIKELY(ticket >= offset)) {
+    if (ticket >= offset) {
       return false;
     }
-    updateFromClosed(state, ticket, offset, slots, cap, stride);
-    return true;
-  }
-
-  void updateFromClosed(
-      const uint64_t state,
-      const uint64_t ticket,
-      uint64_t& offset,
-      Slot*& slots,
-      size_t& cap,
-      int& stride) noexcept {
     for (int i = getNumClosed(state) - 1; i >= 0; --i) {
       offset = closed_[i].offset_;
       if (offset <= ticket) {
         slots = closed_[i].slots_;
         cap = closed_[i].capacity_;
         stride = closed_[i].stride_;
-        return;
+        return true;
       }
     }
     // A closed array with offset <= ticket should have been found
     assert(false);
-    return;
+    return false;
   }
 };
 
@@ -720,11 +651,12 @@ class MPMCQueueBase<Derived<T, Atom, Dynamic>> : boost::noncopyable {
     }
 
     // ideally this would be a static assert, but g++ doesn't allow it
-    assert(alignof(MPMCQueue<T, Atom>) >= CacheLocality::kFalseSharingRange);
+    assert(
+        alignof(MPMCQueue<T, Atom>) >= hardware_destructive_interference_size);
     assert(
         static_cast<uint8_t*>(static_cast<void*>(&popTicket_)) -
             static_cast<uint8_t*>(static_cast<void*>(&pushTicket_)) >=
-        CacheLocality::kFalseSharingRange);
+        static_cast<ptrdiff_t>(hardware_destructive_interference_size));
   }
 
   /// A default-constructed queue is useful because a usable (non-zero
@@ -1044,11 +976,12 @@ class MPMCQueueBase<Derived<T, Atom, Dynamic>> : boost::noncopyable {
     /// To avoid false sharing in slots_ with neighboring memory
     /// allocations, we pad it with this many SingleElementQueue-s at
     /// each end
-    kSlotPadding = (CacheLocality::kFalseSharingRange - 1) / sizeof(Slot) + 1
+    kSlotPadding =
+        (hardware_destructive_interference_size - 1) / sizeof(Slot) + 1
   };
 
   /// The maximum number of items in the queue at once
-  size_t FOLLY_ALIGN_TO_AVOID_FALSE_SHARING capacity_;
+  alignas(hardware_destructive_interference_size) size_t capacity_;
 
   /// Anonymous union for use when Dynamic = false and true, respectively
   union {
@@ -1081,22 +1014,23 @@ class MPMCQueueBase<Derived<T, Atom, Dynamic>> : boost::noncopyable {
   Atom<size_t> dcapacity_;
 
   /// Enqueuers get tickets from here
-  Atom<uint64_t> FOLLY_ALIGN_TO_AVOID_FALSE_SHARING pushTicket_;
+  alignas(hardware_destructive_interference_size) Atom<uint64_t> pushTicket_;
 
   /// Dequeuers get tickets from here
-  Atom<uint64_t> FOLLY_ALIGN_TO_AVOID_FALSE_SHARING popTicket_;
+  alignas(hardware_destructive_interference_size) Atom<uint64_t> popTicket_;
 
   /// This is how many times we will spin before using FUTEX_WAIT when
   /// the queue is full on enqueue, adaptively computed by occasionally
   /// spinning for longer and smoothing with an exponential moving average
-  Atom<uint32_t> FOLLY_ALIGN_TO_AVOID_FALSE_SHARING pushSpinCutoff_;
+  alignas(
+      hardware_destructive_interference_size) Atom<uint32_t> pushSpinCutoff_;
 
   /// The adaptive spin cutoff when the queue is empty on dequeue
-  Atom<uint32_t> FOLLY_ALIGN_TO_AVOID_FALSE_SHARING popSpinCutoff_;
+  alignas(hardware_destructive_interference_size) Atom<uint32_t> popSpinCutoff_;
 
   /// Alignment doesn't prevent false sharing at the end of the struct,
   /// so fill out the last cache line
-  char padding_[CacheLocality::kFalseSharingRange - sizeof(Atom<uint32_t>)];
+  char pad_[hardware_destructive_interference_size - sizeof(Atom<uint32_t>)];
 
   /// We assign tickets in increasing order, but we don't want to
   /// access neighboring elements of slots_ because that will lead to