Fix AsyncSocketTest.WriteErrorCallbackBytesWritten
[folly.git] / folly / MPMCQueue.h
index e706a1f49eafb89744f9490c65af9e11ab2fe567..d4c0ccc7c8307dbfec970f75fa8b6bd676da6339 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 #include <algorithm>
 #include <atomic>
-#include <assert.h>
-#include <boost/noncopyable.hpp>
+#include <cassert>
+#include <cstring>
 #include <limits>
-#include <string.h>
 #include <type_traits>
 
+#include <boost/noncopyable.hpp>
+
 #include <folly/Traits.h>
-#include <folly/detail/CacheLocality.h>
+#include <folly/concurrency/CacheLocality.h>
 #include <folly/detail/TurnSequencer.h>
 #include <folly/portability/Unistd.h>
 
@@ -33,7 +34,7 @@ namespace folly {
 
 namespace detail {
 
-template<typename T, template<typename> class Atom>
+template <typename T, template <typename> class Atom>
 struct SingleElementQueue;
 
 template <typename T> class MPMCPipelineStageImpl;
@@ -86,7 +87,7 @@ template <typename> class MPMCQueueBase;
 /// use noexcept, you will have to wrap it in something that provides
 /// the guarantee.  We provide an alternate safe implementation for types
 /// that don't use noexcept but that are marked folly::IsRelocatable
-/// and boost::has_nothrow_constructor, which is common for folly types.
+/// and std::is_nothrow_constructible, which is common for folly types.
 /// In particular, if you can declare FOLLY_ASSUME_FBVECTOR_COMPATIBLE
 /// then your type can be put in MPMCQueue.
 ///
@@ -96,8 +97,10 @@ template <typename> class MPMCQueueBase;
 /// are you can enqueue one sentinel and then have each consumer requeue
 /// two sentinels after it receives it (by requeuing 2 the shutdown can
 /// complete in O(log P) time instead of O(P)).
-template<typename T, template<typename> class Atom = std::atomic,
-         bool Dynamic = false>
+template <
+    typename T,
+    template <typename> class Atom = std::atomic,
+    bool Dynamic = false>
 class MPMCQueue : public detail::MPMCQueueBase<MPMCQueue<T,Atom,Dynamic>> {
   friend class detail::MPMCPipelineStageImpl<T>;
   using Slot = detail::SingleElementQueue<T,Atom>;
@@ -141,7 +144,7 @@ class MPMCQueue : public detail::MPMCQueueBase<MPMCQueue<T,Atom,Dynamic>> {
 /// closed arrays instead of the current one. Information about closed
 /// slots arrays (array address, capacity, stride, and offset) is
 /// maintained in a logarithmic-sized structure. Each entry in that
-/// structure never need to be changed once set. The number of closed
+/// structure never needs to be changed once set. The number of closed
 /// arrays is half the value of the seqlock (when unlocked).
 ///
 /// The acquisition of the seqlock to perform an expansion does not
@@ -159,9 +162,16 @@ class MPMCQueue : public detail::MPMCQueueBase<MPMCQueue<T,Atom,Dynamic>> {
 /// call to blockingWrite() when the queue size is known to be equal
 /// to its capacity.
 ///
+/// Note that some writeIfNotFull() and tryWriteUntil() operations may
+/// fail even if the size of the queue is less than its maximum
+/// capacity and despite the success of expansion, if the operation
+/// happens to acquire a ticket that belongs to a closed array. This
+/// is a transient condition. Typically, one or two ticket values may
+/// be subject to such condition per expansion.
+///
 /// The dynamic version is a partial specialization of MPMCQueue with
 /// Dynamic == true
-template <typename T, template<typename> class Atom>
+template <typename T, template <typename> class Atom>
 class MPMCQueue<T,Atom,true> :
       public detail::MPMCQueueBase<MPMCQueue<T,Atom,true>> {
   friend class detail::MPMCQueueBase<MPMCQueue<T,Atom,true>>;
@@ -264,18 +274,19 @@ class MPMCQueue<T,Atom,true> :
     uint64_t offset;
     do {
       if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        asm_volatile_pause();
         continue;
       }
       if (maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride)) {
         // There was an expansion after this ticket was issued.
         break;
       }
-      if (slots[this->idx((ticket-offset), cap, stride)]
-          .mayEnqueue(this->turn(ticket-offset, cap))) {
+      if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue(
+              this->turn(ticket - offset, cap))) {
         // A slot is ready. No need to expand.
         break;
-      } else if (this->popTicket_.load(std::memory_order_relaxed) + cap
-                 > ticket) {
+      } else if (
+          this->popTicket_.load(std::memory_order_relaxed) + cap > ticket) {
         // May block, but a pop is in progress. No need to expand.
         // Get seqlock read section info again in case an expansion
         // occurred with an equal or higher ticket.
@@ -302,7 +313,9 @@ class MPMCQueue<T,Atom,true> :
     int stride;
     uint64_t state;
     uint64_t offset;
-    while (!trySeqlockReadSection(state, slots, cap, stride));
+    while (!trySeqlockReadSection(state, slots, cap, stride)) {
+      asm_volatile_pause();
+    }
     // If there was an expansion after the corresponding push ticket
     // was issued, adjust accordingly
     maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
@@ -310,7 +323,6 @@ class MPMCQueue<T,Atom,true> :
   }
 
  private:
-
   enum {
     kSeqlockBits = 6,
     kDefaultMinDynamicCapacity = 10,
@@ -344,6 +356,7 @@ class MPMCQueue<T,Atom,true> :
     do {
       ticket = this->pushTicket_.load(std::memory_order_acquire); // A
       if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        asm_volatile_pause();
         continue;
       }
 
@@ -352,8 +365,8 @@ class MPMCQueue<T,Atom,true> :
       uint64_t offset;
       maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
 
-      if (slots[this->idx((ticket-offset), cap, stride)]
-          .mayEnqueue(this->turn(ticket-offset, cap))) {
+      if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue(
+              this->turn(ticket - offset, cap))) {
         // A slot is ready.
         if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
           // Adjust ticket
@@ -388,31 +401,26 @@ class MPMCQueue<T,Atom,true> :
       ticket = this->pushTicket_.load(std::memory_order_acquire);
       auto numPops = this->popTicket_.load(std::memory_order_acquire);
       if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        asm_volatile_pause();
         continue;
       }
 
-      const auto oldCap = cap;
+      const auto curCap = cap;
       // If there was an expansion with offset greater than this ticket,
       // adjust accordingly
       uint64_t offset;
       maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
 
       int64_t n = ticket - numPops;
-      if (n >= static_cast<ssize_t>(this->capacity_)) {
-        ticket -= offset;
-        return false;
-      }
 
-      if (n >= static_cast<ssize_t>(oldCap)) {
-        if (tryExpand(state, oldCap)) {
-          // This or another thread started an expansion. Start over
-          // with a new state.
+      if (n >= static_cast<ssize_t>(cap)) {
+        if ((cap == curCap) && tryExpand(state, cap)) {
+          // This or another thread started an expansion. Start over.
           continue;
-        } else {
-          // Can't expand.
-          ticket -= offset;
-          return false;
         }
+        // Can't expand.
+        ticket -= offset;
+        return false;
       }
 
       if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
@@ -430,6 +438,7 @@ class MPMCQueue<T,Atom,true> :
     do {
       ticket = this->popTicket_.load(std::memory_order_relaxed);
       if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        asm_volatile_pause();
         continue;
       }
 
@@ -438,8 +447,8 @@ class MPMCQueue<T,Atom,true> :
       uint64_t offset;
       maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
 
-      if (slots[this->idx((ticket-offset), cap, stride)]
-          .mayDequeue(this->turn(ticket-offset, cap))) {
+      if (slots[this->idx((ticket - offset), cap, stride)].mayDequeue(
+              this->turn(ticket - offset, cap))) {
         if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
           // Adjust ticket
           ticket -= offset;
@@ -459,6 +468,7 @@ class MPMCQueue<T,Atom,true> :
       ticket = this->popTicket_.load(std::memory_order_acquire);
       auto numPushes = this->pushTicket_.load(std::memory_order_acquire);
       if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        asm_volatile_pause();
         continue;
       }
 
@@ -487,7 +497,8 @@ class MPMCQueue<T,Atom,true> :
     uint64_t state;
     uint64_t offset;
 
-    while (!trySeqlockReadSection(state, slots, cap, stride)) {}
+    while (!trySeqlockReadSection(state, slots, cap, stride)) {
+    }
 
     // If there was an expansion after this ticket was issued, adjust
     // accordingly
@@ -518,12 +529,11 @@ class MPMCQueue<T,Atom,true> :
     assert((state & 1) == 0);
     if (this->dstate_.compare_exchange_strong(oldval, state + 1)) {
       assert(cap == this->dcapacity_.load());
-      uint64_t ticket = 1 + std::max(this->pushTicket_.load(),
-                                     this->popTicket_.load());
-      size_t newCapacity =
-        std::min(dmult_ * cap, this->capacity_);
+      uint64_t ticket =
+          1 + std::max(this->pushTicket_.load(), this->popTicket_.load());
+      size_t newCapacity = std::min(dmult_ * cap, this->capacity_);
       Slot* newSlots =
-        new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding];
+          new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding];
       if (newSlots == nullptr) {
         // Expansion failed. Restore the seqlock
         this->dstate_.store(state);
@@ -604,10 +614,12 @@ class MPMCQueue<T,Atom,true> :
 namespace detail {
 
 /// CRTP specialization of MPMCQueueBase
-template<
-  template<
-    typename T, template<typename> class Atom, bool Dynamic> class Derived,
-  typename T, template<typename> class Atom, bool Dynamic>
+template <
+    template <typename T, template <typename> class Atom, bool Dynamic>
+    class Derived,
+    typename T,
+    template <typename> class Atom,
+    bool Dynamic>
 class MPMCQueueBase<Derived<T, Atom, Dynamic>> : boost::noncopyable {
 
 // Note: Using CRTP static casts in several functions of this base
@@ -639,11 +651,11 @@ class MPMCQueueBase<Derived<T, Atom, Dynamic>> : boost::noncopyable {
     }
 
     // ideally this would be a static assert, but g++ doesn't allow it
-    assert(alignof(MPMCQueue<T,Atom>)
-           >= detail::CacheLocality::kFalseSharingRange);
-    assert(static_cast<uint8_t*>(static_cast<void*>(&popTicket_))
-           - static_cast<uint8_t*>(static_cast<void*>(&pushTicket_))
-           >= detail::CacheLocality::kFalseSharingRange);
+    assert(alignof(MPMCQueue<T, Atom>) >= CacheLocality::kFalseSharingRange);
+    assert(
+        static_cast<uint8_t*>(static_cast<void*>(&popTicket_)) -
+            static_cast<uint8_t*>(static_cast<void*>(&pushTicket_)) >=
+        CacheLocality::kFalseSharingRange);
   }
 
   /// A default-constructed queue is useful because a usable (non-zero
@@ -729,14 +741,14 @@ class MPMCQueueBase<Derived<T, Atom, Dynamic>> : boost::noncopyable {
       if (pushes == nextPushes) {
         // pushTicket_ didn't change from A (or the previous C) to C,
         // so we can linearize at B (or D)
-        return pushes - pops;
+        return ssize_t(pushes - pops);
       }
       pushes = nextPushes;
       uint64_t nextPops = popTicket_.load(std::memory_order_acquire); // D
       if (pops == nextPops) {
         // popTicket_ didn't chance from B (or the previous D), so we
         // can linearize at C
-        return pushes - pops;
+        return ssize_t(pushes - pops);
       }
       pops = nextPops;
     }
@@ -963,8 +975,7 @@ class MPMCQueueBase<Derived<T, Atom, Dynamic>> : boost::noncopyable {
     /// To avoid false sharing in slots_ with neighboring memory
     /// allocations, we pad it with this many SingleElementQueue-s at
     /// each end
-    kSlotPadding = (detail::CacheLocality::kFalseSharingRange - 1)
-        / sizeof(Slot) + 1
+    kSlotPadding = (CacheLocality::kFalseSharingRange - 1) / sizeof(Slot) + 1
   };
 
   /// The maximum number of items in the queue at once
@@ -1016,8 +1027,7 @@ class MPMCQueueBase<Derived<T, Atom, Dynamic>> : boost::noncopyable {
 
   /// Alignment doesn't prevent false sharing at the end of the struct,
   /// so fill out the last cache line
-  char padding_[detail::CacheLocality::kFalseSharingRange -
-                sizeof(Atom<uint32_t>)];
+  char padding_[CacheLocality::kFalseSharingRange - sizeof(Atom<uint32_t>)];
 
   /// We assign tickets in increasing order, but we don't want to
   /// access neighboring elements of slots_ because that will lead to
@@ -1067,7 +1077,7 @@ class MPMCQueueBase<Derived<T, Atom, Dynamic>> : boost::noncopyable {
   /// corresponding SingleElementQueue
   uint32_t turn(uint64_t ticket, size_t cap) noexcept {
     assert(cap != 0);
-    return ticket / cap;
+    return uint32_t(ticket / cap);
   }
 
   /// Tries to obtain a push ticket for which SingleElementQueue::enqueue
@@ -1147,7 +1157,7 @@ class MPMCQueueBase<Derived<T, Atom, Dynamic>> : boost::noncopyable {
       ticket = numPushes;
       const auto numPops = popTicket_.load(std::memory_order_acquire); // B
       // n will be negative if pops are pending
-      const int64_t n = numPushes - numPops;
+      const int64_t n = int64_t(numPushes - numPops);
       if (n >= static_cast<ssize_t>(capacity_)) {
         // Full, linearize at B.  We don't need to recheck the read we
         // performed at A, because if numPushes was stale at B then the
@@ -1296,9 +1306,10 @@ struct SingleElementQueue {
   }
 
   /// enqueue using in-place noexcept construction
-  template <typename ...Args,
-            typename = typename std::enable_if<
-              std::is_nothrow_constructible<T,Args...>::value>::type>
+  template <
+      typename... Args,
+      typename = typename std::enable_if<
+          std::is_nothrow_constructible<T, Args...>::value>::type>
   void enqueue(const uint32_t turn,
                Atom<uint32_t>& spinCutoff,
                const bool updateSpinCutoff,
@@ -1310,15 +1321,17 @@ struct SingleElementQueue {
 
   /// enqueue using move construction, either real (if
   /// is_nothrow_move_constructible) or simulated using relocation and
-  /// default construction (if IsRelocatable and has_nothrow_constructor)
-  template <typename = typename std::enable_if<
-                (folly::IsRelocatable<T>::value &&
-                 boost::has_nothrow_constructor<T>::value) ||
-                std::is_nothrow_constructible<T, T&&>::value>::type>
-  void enqueue(const uint32_t turn,
-               Atom<uint32_t>& spinCutoff,
-               const bool updateSpinCutoff,
-               T&& goner) noexcept {
+  /// default construction (if IsRelocatable and is_nothrow_constructible)
+  template <
+      typename = typename std::enable_if<
+          (folly::IsRelocatable<T>::value &&
+           std::is_nothrow_constructible<T>::value) ||
+          std::is_nothrow_constructible<T, T&&>::value>::type>
+  void enqueue(
+      const uint32_t turn,
+      Atom<uint32_t>& spinCutoff,
+      const bool updateSpinCutoff,
+      T&& goner) noexcept {
     enqueueImpl(
         turn,
         spinCutoff,