UnboundedQueue: Add LgAlign template parameter - Refactor code
[folly.git] / folly / concurrency / UnboundedQueue.h
index 1ff19ed2ffed182a64f63c8c8db06a4420218f6e..7320b6cd594ee59d57a4b206f5894b2c4e0b6b1e 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2017 Facebook, Inc.
+ * Copyright 2017-present Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 #include <chrono>
 #include <memory>
 
+#include <glog/logging.h>
+
 #include <folly/concurrency/CacheLocality.h>
+#include <folly/experimental/hazptr/hazptr.h>
+#include <folly/synchronization/SaturatingSemaphore.h>
 
 namespace folly {
 
@@ -42,6 +46,9 @@ namespace folly {
 ///   spins. A performance tuning parameter.
 /// - LgSegmentSize (default 8): Log base 2 of number of elements per
 ///   segment. A performance tuning parameter. See below.
+/// - LgAlign (default 7): Log base 2 of alignment directive; can be
+///   used to balance scalability (avoidance of false sharing) with
+///   memory efficiency.
 ///
 /// When to use UnboundedQueue:
 /// - If a small bound may lead to deadlock or performance degradation
@@ -56,10 +63,10 @@ namespace folly {
 ///   SPSC) ProducerConsumerQueue.
 ///
 /// Template Aliases:
-///   USPSCQueue<T, MayBlock, LgSegmentSize>
-///   UMPSCQueue<T, MayBlock, LgSegmentSize>
-///   USPMCQueue<T, MayBlock, LgSegmentSize>
-///   UMPMCQueue<T, MayBlock, LgSegmentSize>
+///   USPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
+///   UMPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
+///   USPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>
+///   UMPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>
 ///
 /// Functions:
 ///   Producer operations never wait or fail (unless OOM)
@@ -145,6 +152,8 @@ namespace folly {
 ///   producers or consumers, have references to them or their
 ///   predessors. That is, a lagging thread may delay the reclamation
 ///   of a chain of removed segments.
+/// - The template parameter LgAlign can be used to reduce memory usage
+///   at the cost of increased chance of false sharing.
 ///
 /// Performance considerations:
 /// - All operations take constant time, excluding the costs of
@@ -188,6 +197,7 @@ template <
     bool SingleConsumer,
     bool MayBlock,
     size_t LgSegmentSize = 8,
+    size_t LgAlign = 7,
     template <typename> class Atom = std::atomic>
 class UnboundedQueue {
   using Ticket = uint64_t;
@@ -195,25 +205,42 @@ class UnboundedQueue {
   class Segment;
 
   static constexpr bool SPSC = SingleProducer && SingleConsumer;
-  static constexpr size_t SegmentSize = 1 << LgSegmentSize;
   static constexpr size_t Stride = SPSC || (LgSegmentSize <= 1) ? 1 : 27;
+  static constexpr size_t SegmentSize = 1u << LgSegmentSize;
+  static constexpr size_t Align = 1u << LgAlign;
 
   static_assert(
       std::is_nothrow_destructible<T>::value,
       "T must be nothrow_destructible");
   static_assert((Stride & 1) == 1, "Stride must be odd");
   static_assert(LgSegmentSize < 32, "LgSegmentSize must be < 32");
+  static_assert(LgAlign < 16, "LgAlign must be < 16");
 
-  FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
+  FOLLY_ALIGNED(Align)
   Atom<Segment*> head_;
   Atom<Ticket> consumerTicket_;
-  FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
+  FOLLY_ALIGNED(Align)
   Atom<Segment*> tail_;
   Atom<Ticket> producerTicket_;
 
  public:
-  UnboundedQueue();
-  ~UnboundedQueue();
+  /** constructor */
+  UnboundedQueue() {
+    setProducerTicket(0);
+    setConsumerTicket(0);
+    Segment* s = new Segment(0);
+    setTail(s);
+    setHead(s);
+  }
+
+  /** destructor */
+  ~UnboundedQueue() {
+    Segment* next;
+    for (Segment* s = head(); s; s = next) {
+      next = s->nextSegment();
+      reclaimSegment(s);
+    }
+  }
 
   /** enqueue */
   FOLLY_ALWAYS_INLINE void enqueue(const T& arg) {
@@ -225,26 +252,32 @@ class UnboundedQueue {
   }
 
   /** dequeue */
-  void dequeue(T& item) noexcept;
+  FOLLY_ALWAYS_INLINE void dequeue(T& item) noexcept {
+    dequeueImpl(item);
+  }
 
   /** try_dequeue */
-  bool try_dequeue(T& item) noexcept {
-    return try_dequeue_until(
-        item, std::chrono::steady_clock::time_point::min());
+  FOLLY_ALWAYS_INLINE bool try_dequeue(T& item) noexcept {
+    return tryDequeueUntil(item, std::chrono::steady_clock::time_point::min());
   }
 
   /** try_dequeue_until */
   template <typename Clock, typename Duration>
-  bool try_dequeue_until(
+  FOLLY_ALWAYS_INLINE bool try_dequeue_until(
       T& item,
-      const std::chrono::time_point<Clock, Duration>& deadline) noexcept;
+      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+    return tryDequeueUntil(item, deadline);
+  }
 
   /** try_dequeue_for */
   template <typename Rep, typename Period>
-  bool try_dequeue_for(
+  FOLLY_ALWAYS_INLINE bool try_dequeue_for(
       T& item,
       const std::chrono::duration<Rep, Period>& duration) noexcept {
-    return try_dequeue_until(item, std::chrono::steady_clock::now() + duration);
+    if (LIKELY(try_dequeue(item))) {
+      return true;
+    }
+    return tryDequeueUntil(item, std::chrono::steady_clock::now() + duration);
   }
 
   /** size */
@@ -262,33 +295,218 @@ class UnboundedQueue {
   }
 
  private:
+  /** enqueueImpl */
   template <typename Arg>
-  void enqueueImpl(Arg&& arg);
+  FOLLY_ALWAYS_INLINE void enqueueImpl(Arg&& arg) {
+    if (SPSC) {
+      Segment* s = tail();
+      enqueueCommon(s, std::forward<Arg>(arg));
+    } else {
+      // Using hazptr_holder instead of hazptr_local because it is
+      // possible that the T ctor happens to use hazard pointers.
+      folly::hazptr::hazptr_holder hptr;
+      Segment* s = hptr.get_protected(tail_);
+      enqueueCommon(s, std::forward<Arg>(arg));
+    }
+  }
 
+  /** enqueueCommon */
   template <typename Arg>
-  void enqueueCommon(Segment* s, Arg&& arg);
+  FOLLY_ALWAYS_INLINE void enqueueCommon(Segment* s, Arg&& arg) {
+    Ticket t = fetchIncrementProducerTicket();
+    if (!SingleProducer) {
+      s = findSegment(s, t);
+    }
+    DCHECK_GE(t, s->minTicket());
+    DCHECK_LT(t, s->minTicket() + SegmentSize);
+    size_t idx = index(t);
+    Entry& e = s->entry(idx);
+    e.putItem(std::forward<Arg>(arg));
+    if (responsibleForAlloc(t)) {
+      allocNextSegment(s, t + SegmentSize);
+    }
+    if (responsibleForAdvance(t)) {
+      advanceTail(s);
+    }
+  }
 
-  void dequeueCommon(Segment* s, T& item) noexcept;
+  /** dequeueImpl */
+  FOLLY_ALWAYS_INLINE void dequeueImpl(T& item) noexcept {
+    if (SPSC) {
+      Segment* s = head();
+      dequeueCommon(s, item);
+    } else {
+      // Using hazptr_holder instead of hazptr_local because it is
+      // possible to call the T dtor and it may happen to use hazard
+      // pointers.
+      folly::hazptr::hazptr_holder hptr;
+      Segment* s = hptr.get_protected(head_);
+      dequeueCommon(s, item);
+    }
+  }
 
+  /** dequeueCommon */
+  FOLLY_ALWAYS_INLINE void dequeueCommon(Segment* s, T& item) noexcept {
+    Ticket t = fetchIncrementConsumerTicket();
+    if (!SingleConsumer) {
+      s = findSegment(s, t);
+    }
+    size_t idx = index(t);
+    Entry& e = s->entry(idx);
+    e.takeItem(item);
+    if (responsibleForAdvance(t)) {
+      advanceHead(s);
+    }
+  }
+
+  /** tryDequeueUntil */
   template <typename Clock, typename Duration>
-  bool singleConsumerTryDequeueUntil(
+  FOLLY_ALWAYS_INLINE bool tryDequeueUntil(
+      T& item,
+      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+    if (SingleConsumer) {
+      Segment* s = head();
+      return tryDequeueUntilSC(s, item, deadline);
+    } else {
+      // Using hazptr_holder instead of hazptr_local because it is
+      // possible to call ~T() and it may happen to use hazard pointers.
+      folly::hazptr::hazptr_holder hptr;
+      Segment* s = hptr.get_protected(head_);
+      return ryDequeueUntilMC(s, item, deadline);
+    }
+  }
+
+  /** ryDequeueUntilSC */
+  template <typename Clock, typename Duration>
+  FOLLY_ALWAYS_INLINE bool tryDequeueUntilSC(
       Segment* s,
       T& item,
-      const std::chrono::time_point<Clock, Duration>& deadline) noexcept;
+      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+    Ticket t = consumerTicket();
+    DCHECK_GE(t, s->minTicket());
+    DCHECK_LT(t, (s->minTicket() + SegmentSize));
+    size_t idx = index(t);
+    Entry& e = s->entry(idx);
+    if (!e.tryWaitUntil(deadline)) {
+      return false;
+    }
+    setConsumerTicket(t + 1);
+    e.takeItem(item);
+    if (responsibleForAdvance(t)) {
+      advanceHead(s);
+    }
+    return true;
+  }
 
+  /** tryDequeueUntilMC */
   template <typename Clock, typename Duration>
-  bool multiConsumerTryDequeueUntil(
+  FOLLY_ALWAYS_INLINE bool ryDequeueUntilMC(
       Segment* s,
       T& item,
-      const std::chrono::time_point<Clock, Duration>& deadline) noexcept;
+      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+    while (true) {
+      Ticket t = consumerTicket();
+      if (UNLIKELY(t >= (s->minTicket() + SegmentSize))) {
+        s = tryGetNextSegmentUntil(s, deadline);
+        if (s == nullptr) {
+          return false; // timed out
+        }
+        continue;
+      }
+      size_t idx = index(t);
+      Entry& e = s->entry(idx);
+      if (!e.tryWaitUntil(deadline)) {
+        return false;
+      }
+      if (!consumerTicket_.compare_exchange_weak(
+              t, t + 1, std::memory_order_acq_rel, std::memory_order_acquire)) {
+        continue;
+      }
+      e.takeItem(item);
+      if (responsibleForAdvance(t)) {
+        advanceHead(s);
+      }
+      return true;
+    }
+  }
 
-  Segment* findSegment(Segment* s, const Ticket t) const noexcept;
+  /** findSegment */
+  FOLLY_ALWAYS_INLINE
+  Segment* findSegment(Segment* s, const Ticket t) const noexcept {
+    while (UNLIKELY(t >= (s->minTicket() + SegmentSize))) {
+      auto deadline = std::chrono::steady_clock::time_point::max();
+      s = tryGetNextSegmentUntil(s, deadline);
+      DCHECK(s != nullptr);
+    }
+    return s;
+  }
 
-  void allocNextSegment(Segment* s, const Ticket t);
+  /** tryGetNextSegmentUntil */
+  template <typename Clock, typename Duration>
+  Segment* tryGetNextSegmentUntil(
+      Segment* s,
+      const std::chrono::time_point<Clock, Duration>& deadline) const noexcept {
+    // The following loop will not spin indefinitely (as long as the
+    // number of concurrently waiting consumers does not exceeds
+    // SegmentSize and the OS scheduler does not pause ready threads
+    // indefinitely). Under such conditions, the algorithm guarantees
+    // that the producer reponsible for advancing the tail pointer to
+    // the next segment has already acquired its ticket.
+    while (tail() == s) {
+      if (deadline < Clock::time_point::max() && deadline > Clock::now()) {
+        return nullptr;
+      }
+      asm_volatile_pause();
+    }
+    Segment* next = s->nextSegment();
+    DCHECK(next != nullptr);
+    return next;
+  }
 
-  void advanceTail(Segment* s) noexcept;
+  /** allocNextSegment */
+  void allocNextSegment(Segment* s, const Ticket t) {
+    Segment* next = new Segment(t);
+    if (!SPSC) {
+      next->acquire_ref_safe(); // hazptr
+    }
+    DCHECK(s->nextSegment() == nullptr);
+    s->setNextSegment(next);
+  }
 
-  void advanceHead(Segment* s) noexcept;
+  /** advanceTail */
+  void advanceTail(Segment* s) noexcept {
+    Segment* next = s->nextSegment();
+    if (!SingleProducer) {
+      // The following loop will not spin indefinitely (as long as the
+      // OS scheduler does not pause ready threads indefinitely). The
+      // algorithm guarantees that the producer reponsible for setting
+      // the next pointer has already acquired its ticket.
+      while (next == nullptr) {
+        asm_volatile_pause();
+        next = s->nextSegment();
+      }
+    }
+    DCHECK(next != nullptr);
+    setTail(next);
+  }
+
+  /** advanceHead */
+  void advanceHead(Segment* s) noexcept {
+    auto deadline = std::chrono::steady_clock::time_point::max();
+    Segment* next = tryGetNextSegmentUntil(s, deadline);
+    DCHECK(next != nullptr);
+    setHead(next);
+    reclaimSegment(s);
+  }
+
+  /** reclaimSegment */
+  void reclaimSegment(Segment* s) noexcept {
+    if (SPSC) {
+      delete s;
+    } else {
+      s->retire(); // hazptr
+    }
+  }
 
   FOLLY_ALWAYS_INLINE size_t index(Ticket t) const noexcept {
     return (t * Stride) & (SegmentSize - 1);
@@ -336,7 +554,7 @@ class UnboundedQueue {
 
   FOLLY_ALWAYS_INLINE Ticket fetchIncrementConsumerTicket() noexcept {
     if (SingleConsumer) {
-      auto oldval = consumerTicket();
+      Ticket oldval = consumerTicket();
       setConsumerTicket(oldval + 1);
       return oldval;
     } else { // MC
@@ -346,13 +564,101 @@ class UnboundedQueue {
 
   FOLLY_ALWAYS_INLINE Ticket fetchIncrementProducerTicket() noexcept {
     if (SingleProducer) {
-      auto oldval = producerTicket();
+      Ticket oldval = producerTicket();
       setProducerTicket(oldval + 1);
       return oldval;
     } else { // MP
       return producerTicket_.fetch_add(1, std::memory_order_acq_rel);
     }
   }
+
+  /**
+   *  Entry
+   */
+  class Entry {
+    folly::SaturatingSemaphore<MayBlock, Atom> flag_;
+    typename std::aligned_storage<sizeof(T), alignof(T)>::type item_;
+
+   public:
+    template <typename Arg>
+    FOLLY_ALWAYS_INLINE void putItem(Arg&& arg) {
+      new (&item_) T(std::forward<Arg>(arg));
+      flag_.post();
+    }
+
+    FOLLY_ALWAYS_INLINE void takeItem(T& item) noexcept {
+      flag_.wait();
+      getItem(item);
+    }
+
+    template <typename Clock, typename Duration>
+    FOLLY_ALWAYS_INLINE bool tryWaitUntil(
+        const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+      return flag_.try_wait_until(deadline);
+    }
+
+   private:
+    FOLLY_ALWAYS_INLINE void getItem(T& item) noexcept {
+      item = std::move(*(itemPtr()));
+      destroyItem();
+    }
+
+    FOLLY_ALWAYS_INLINE T* itemPtr() noexcept {
+      return static_cast<T*>(static_cast<void*>(&item_));
+    }
+
+    FOLLY_ALWAYS_INLINE void destroyItem() noexcept {
+      itemPtr()->~T();
+    }
+  }; // Entry
+
+  /**
+   *  Segment
+   */
+  class Segment : public folly::hazptr::hazptr_obj_base_refcounted<Segment> {
+    Atom<Segment*> next_;
+    const Ticket min_;
+    bool marked_; // used for iterative deletion
+    FOLLY_ALIGNED(Align)
+    Entry b_[SegmentSize];
+
+   public:
+    explicit Segment(const Ticket t)
+        : next_(nullptr), min_(t), marked_(false) {}
+
+    ~Segment() {
+      if (!SPSC && !marked_) {
+        Segment* next = nextSegment();
+        while (next) {
+          if (!next->release_ref()) { // hazptr
+            return;
+          }
+          Segment* s = next;
+          next = s->nextSegment();
+          s->marked_ = true;
+          delete s;
+        }
+      }
+    }
+
+    Segment* nextSegment() const noexcept {
+      return next_.load(std::memory_order_acquire);
+    }
+
+    void setNextSegment(Segment* s) noexcept {
+      next_.store(s, std::memory_order_release);
+    }
+
+    FOLLY_ALWAYS_INLINE Ticket minTicket() const noexcept {
+      DCHECK_EQ((min_ & (SegmentSize - 1)), 0);
+      return min_;
+    }
+
+    FOLLY_ALWAYS_INLINE Entry& entry(size_t index) noexcept {
+      return b_[index];
+    }
+  }; // Segment
+
 }; // UnboundedQueue
 
 /* Aliases */
@@ -361,33 +667,36 @@ template <
     typename T,
     bool MayBlock,
     size_t LgSegmentSize = 8,
+    size_t LgAlign = 7,
     template <typename> class Atom = std::atomic>
-using USPSCQueue = UnboundedQueue<T, true, true, MayBlock, LgSegmentSize, Atom>;
+using USPSCQueue =
+    UnboundedQueue<T, true, true, MayBlock, LgSegmentSize, LgAlign, Atom>;
 
 template <
     typename T,
     bool MayBlock,
     size_t LgSegmentSize = 8,
+    size_t LgAlign = 7,
     template <typename> class Atom = std::atomic>
 using UMPSCQueue =
-    UnboundedQueue<T, false, true, MayBlock, LgSegmentSize, Atom>;
+    UnboundedQueue<T, false, true, MayBlock, LgSegmentSize, LgAlign, Atom>;
 
 template <
     typename T,
     bool MayBlock,
     size_t LgSegmentSize = 8,
+    size_t LgAlign = 7,
     template <typename> class Atom = std::atomic>
 using USPMCQueue =
-    UnboundedQueue<T, true, false, MayBlock, LgSegmentSize, Atom>;
+    UnboundedQueue<T, true, false, MayBlock, LgSegmentSize, LgAlign, Atom>;
 
 template <
     typename T,
     bool MayBlock,
     size_t LgSegmentSize = 8,
+    size_t LgAlign = 7,
     template <typename> class Atom = std::atomic>
 using UMPMCQueue =
-    UnboundedQueue<T, false, false, MayBlock, LgSegmentSize, Atom>;
+    UnboundedQueue<T, false, false, MayBlock, LgSegmentSize, LgAlign, Atom>;
 
 } // namespace folly
-
-#include <folly/concurrency/UnboundedQueue-inl.h>