UnboundedQueue: Add LgAlign template parameter - Refactor code
authorMaged Michael <magedmichael@fb.com>
Fri, 8 Dec 2017 17:25:57 +0000 (09:25 -0800)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Fri, 8 Dec 2017 17:36:05 +0000 (09:36 -0800)
Summary:
- Add a template parameter, LgAlign, to control memory usage. The parameter is used in DynamicBoundedQueue.
- Refactor code.

Reviewed By: yfeldblum

Differential Revision: D6508015

fbshipit-source-id: 6e17b1d8fd900595147dc4217e04d379a13fbdf8

folly/concurrency/UnboundedQueue-inl.h [deleted file]
folly/concurrency/UnboundedQueue.h
folly/concurrency/test/UnboundedQueueTest.cpp

diff --git a/folly/concurrency/UnboundedQueue-inl.h b/folly/concurrency/UnboundedQueue-inl.h
deleted file mode 100644 (file)
index 101f668..0000000
+++ /dev/null
@@ -1,568 +0,0 @@
-/*
- * Copyright 2017 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <folly/experimental/hazptr/hazptr.h>
-#include <folly/lang/Launder.h>
-#include <folly/synchronization/SaturatingSemaphore.h>
-
-#include <glog/logging.h>
-
-namespace folly {
-
-/* constructor */
-
-template <
-    typename T,
-    bool SingleProducer,
-    bool SingleConsumer,
-    bool MayBlock,
-    size_t LgSegmentSize,
-    template <typename> class Atom>
-inline UnboundedQueue<
-    T,
-    SingleProducer,
-    SingleConsumer,
-    MayBlock,
-    LgSegmentSize,
-    Atom>::UnboundedQueue() {
-  setProducerTicket(0);
-  setConsumerTicket(0);
-  auto s = new Segment(0);
-  DEBUG_PRINT(s);
-  setTail(s);
-  setHead(s);
-}
-
-/* destructor */
-
-template <
-    typename T,
-    bool SingleProducer,
-    bool SingleConsumer,
-    bool MayBlock,
-    size_t LgSegmentSize,
-    template <typename> class Atom>
-inline UnboundedQueue<
-    T,
-    SingleProducer,
-    SingleConsumer,
-    MayBlock,
-    LgSegmentSize,
-    Atom>::~UnboundedQueue() {
-  Segment* next;
-  for (auto s = head(); s; s = next) {
-    next = s->nextSegment();
-    if (SPSC) {
-      delete s;
-    } else {
-      s->retire(); // hazptr
-    }
-  }
-}
-
-/* dequeue */
-
-template <
-    typename T,
-    bool SingleProducer,
-    bool SingleConsumer,
-    bool MayBlock,
-    size_t LgSegmentSize,
-    template <typename> class Atom>
-FOLLY_ALWAYS_INLINE void UnboundedQueue<
-    T,
-    SingleProducer,
-    SingleConsumer,
-    MayBlock,
-    LgSegmentSize,
-    Atom>::dequeue(T& item) noexcept {
-  if (SPSC) {
-    auto s = head();
-    dequeueCommon(s, item);
-  } else {
-    // Using hazptr_holder instead of hazptr_local because it is
-    // possible to call ~T() and it may happen to use hazard pointers.
-    folly::hazptr::hazptr_holder hptr;
-    auto s = hptr.get_protected(head_);
-    dequeueCommon(s, item);
-  }
-}
-
-/* try_dequeue_until */
-
-template <
-    typename T,
-    bool SingleProducer,
-    bool SingleConsumer,
-    bool MayBlock,
-    size_t LgSegmentSize,
-    template <typename> class Atom>
-template <typename Clock, typename Duration>
-FOLLY_ALWAYS_INLINE bool UnboundedQueue<
-    T,
-    SingleProducer,
-    SingleConsumer,
-    MayBlock,
-    LgSegmentSize,
-    Atom>::
-    try_dequeue_until(
-        T& item,
-        const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
-  if (SingleConsumer) {
-    auto s = head();
-    return singleConsumerTryDequeueUntil(s, item, deadline);
-  } else {
-    // Using hazptr_holder instead of hazptr_local because it is
-    // possible to call ~T() and it may happen to use hazard pointers.
-    folly::hazptr::hazptr_holder hptr;
-    auto s = hptr.get_protected(head_);
-    return multiConsumerTryDequeueUntil(s, item, deadline);
-  }
-}
-
-/* enqueueImpl */
-
-template <
-    typename T,
-    bool SingleProducer,
-    bool SingleConsumer,
-    bool MayBlock,
-    size_t LgSegmentSize,
-    template <typename> class Atom>
-template <typename Arg>
-FOLLY_ALWAYS_INLINE void UnboundedQueue<
-    T,
-    SingleProducer,
-    SingleConsumer,
-    MayBlock,
-    LgSegmentSize,
-    Atom>::enqueueImpl(Arg&& arg) {
-  if (SPSC) {
-    auto s = tail();
-    enqueueCommon(s, std::forward<Arg>(arg));
-  } else {
-    // Using hazptr_holder instead of hazptr_local because it is
-    // possible that the T construcctor happens to use hazard
-    // pointers.
-    folly::hazptr::hazptr_holder hptr;
-    auto s = hptr.get_protected(tail_);
-    enqueueCommon(s, std::forward<Arg>(arg));
-  }
-}
-
-/* enqueueCommon */
-
-template <
-    typename T,
-    bool SingleProducer,
-    bool SingleConsumer,
-    bool MayBlock,
-    size_t LgSegmentSize,
-    template <typename> class Atom>
-template <typename Arg>
-FOLLY_ALWAYS_INLINE void UnboundedQueue<
-    T,
-    SingleProducer,
-    SingleConsumer,
-    MayBlock,
-    LgSegmentSize,
-    Atom>::enqueueCommon(Segment* s, Arg&& arg) {
-  auto t = fetchIncrementProducerTicket();
-  if (!SingleProducer) {
-    s = findSegment(s, t);
-  }
-  DCHECK_GE(t, s->minTicket());
-  DCHECK_LT(t, (s->minTicket() + SegmentSize));
-  size_t idx = index(t);
-  Entry& e = s->entry(idx);
-  e.putItem(std::forward<Arg>(arg));
-  if (responsibleForAlloc(t)) {
-    allocNextSegment(s, t + SegmentSize);
-  }
-  if (responsibleForAdvance(t)) {
-    advanceTail(s);
-  }
-}
-
-/* dequeueCommon */
-
-template <
-    typename T,
-    bool SingleProducer,
-    bool SingleConsumer,
-    bool MayBlock,
-    size_t LgSegmentSize,
-    template <typename> class Atom>
-FOLLY_ALWAYS_INLINE void UnboundedQueue<
-    T,
-    SingleProducer,
-    SingleConsumer,
-    MayBlock,
-    LgSegmentSize,
-    Atom>::dequeueCommon(Segment* s, T& item) noexcept {
-  auto t = fetchIncrementConsumerTicket();
-  if (!SingleConsumer) {
-    s = findSegment(s, t);
-  }
-  size_t idx = index(t);
-  Entry& e = s->entry(idx);
-  e.takeItem(item);
-  if (responsibleForAdvance(t)) {
-    advanceHead(s);
-  }
-}
-
-/* singleConsumerTryDequeueUntil */
-
-template <
-    typename T,
-    bool SingleProducer,
-    bool SingleConsumer,
-    bool MayBlock,
-    size_t LgSegmentSize,
-    template <typename> class Atom>
-template <typename Clock, typename Duration>
-FOLLY_ALWAYS_INLINE bool UnboundedQueue<
-    T,
-    SingleProducer,
-    SingleConsumer,
-    MayBlock,
-    LgSegmentSize,
-    Atom>::
-    singleConsumerTryDequeueUntil(
-        Segment* s,
-        T& item,
-        const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
-  auto t = consumerTicket();
-  DCHECK_GE(t, s->minTicket());
-  DCHECK_LT(t, (s->minTicket() + SegmentSize));
-  size_t idx = index(t);
-  Entry& e = s->entry(idx);
-  if (!e.tryWaitUntil(deadline)) {
-    return false;
-  }
-  setConsumerTicket(t + 1);
-  e.takeItem(item);
-  if (responsibleForAdvance(t)) {
-    advanceHead(s);
-  }
-  return true;
-}
-
-/* multiConsumerTryDequeueUntil */
-
-template <
-    typename T,
-    bool SingleProducer,
-    bool SingleConsumer,
-    bool MayBlock,
-    size_t LgSegmentSize,
-    template <typename> class Atom>
-template <typename Clock, typename Duration>
-FOLLY_ALWAYS_INLINE bool UnboundedQueue<
-    T,
-    SingleProducer,
-    SingleConsumer,
-    MayBlock,
-    LgSegmentSize,
-    Atom>::
-    multiConsumerTryDequeueUntil(
-        Segment* s,
-        T& item,
-        const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
-  while (true) {
-    auto t = consumerTicket();
-    if (UNLIKELY(t >= (s->minTicket() + SegmentSize))) {
-      Segment* next;
-      // Note that the following loop will not spin indefinitely (as
-      // long as the number of concurrently waiting consumers is not
-      // greater than SegmentSize). The algorithm guarantees in such a
-      // case that the producer reponsible for setting the next
-      // pointer is already running.
-      while ((next = s->nextSegment()) == nullptr) {
-        if (Clock::now() > deadline) {
-          return false;
-        }
-        asm_volatile_pause();
-      }
-      s = next;
-      DCHECK(s != nullptr);
-      continue;
-    }
-    size_t idx = index(t);
-    Entry& e = s->entry(idx);
-    if (!e.tryWaitUntil(deadline)) {
-      return false;
-    }
-    if (!consumerTicket_.compare_exchange_weak(
-            t, t + 1, std::memory_order_acq_rel, std::memory_order_acquire)) {
-      continue;
-    }
-    e.takeItem(item);
-    if (responsibleForAdvance(t)) {
-      advanceHead(s);
-    }
-    return true;
-  }
-}
-
-/* findSegment */
-
-template <
-    typename T,
-    bool SingleProducer,
-    bool SingleConsumer,
-    bool MayBlock,
-    size_t LgSegmentSize,
-    template <typename> class Atom>
-inline typename UnboundedQueue<
-    T,
-    SingleProducer,
-    SingleConsumer,
-    MayBlock,
-    LgSegmentSize,
-    Atom>::Segment*
-UnboundedQueue<
-    T,
-    SingleProducer,
-    SingleConsumer,
-    MayBlock,
-    LgSegmentSize,
-    Atom>::findSegment(Segment* s, const Ticket t) const noexcept {
-  while (t >= (s->minTicket() + SegmentSize)) {
-    Segment* next = s->nextSegment();
-    // Note that the following loop will not spin indefinitely. The
-    // algorithm guarantees that the producer reponsible for setting
-    // the next pointer is already running.
-    while (next == nullptr) {
-      asm_volatile_pause();
-      next = s->nextSegment();
-    }
-    DCHECK(next != nullptr);
-    s = next;
-  }
-  return s;
-}
-
-/* allocNextSegment */
-
-template <
-    typename T,
-    bool SingleProducer,
-    bool SingleConsumer,
-    bool MayBlock,
-    size_t LgSegmentSize,
-    template <typename> class Atom>
-inline void UnboundedQueue<
-    T,
-    SingleProducer,
-    SingleConsumer,
-    MayBlock,
-    LgSegmentSize,
-    Atom>::allocNextSegment(Segment* s, const Ticket t) {
-  auto next = new Segment(t);
-  if (!SPSC) {
-    next->acquire_ref_safe(); // hazptr
-  }
-  DEBUG_PRINT(s << " " << next);
-  DCHECK(s->nextSegment() == nullptr);
-  s->setNextSegment(next);
-}
-
-/* advanceTail */
-
-template <
-    typename T,
-    bool SingleProducer,
-    bool SingleConsumer,
-    bool MayBlock,
-    size_t LgSegmentSize,
-    template <typename> class Atom>
-inline void UnboundedQueue<
-    T,
-    SingleProducer,
-    SingleConsumer,
-    MayBlock,
-    LgSegmentSize,
-    Atom>::advanceTail(Segment* s) noexcept {
-  Segment* next = s->nextSegment();
-  if (!SingleProducer) {
-    // Note that the following loop will not spin indefinitely. The
-    // algorithm guarantees that the producer reponsible for setting
-    // the next pointer is already running.
-    while (next == nullptr) {
-      asm_volatile_pause();
-      next = s->nextSegment();
-    }
-  }
-  DCHECK(next != nullptr);
-  DEBUG_PRINT(s << " " << next);
-  setTail(next);
-}
-
-/* advanceHead */
-
-template <
-    typename T,
-    bool SingleProducer,
-    bool SingleConsumer,
-    bool MayBlock,
-    size_t LgSegmentSize,
-    template <typename> class Atom>
-inline void UnboundedQueue<
-    T,
-    SingleProducer,
-    SingleConsumer,
-    MayBlock,
-    LgSegmentSize,
-    Atom>::advanceHead(Segment* s) noexcept {
-  // Note that the following loops will not spin indefinitely. The
-  // algorithm guarantees that the producers reponsible for advancing
-  // the tail pointer and setting the next pointer are already
-  // running.
-  while (tail() == s) {
-    asm_volatile_pause();
-  }
-  auto next = s->nextSegment();
-  while (next == nullptr) {
-    next = s->nextSegment();
-  }
-  DEBUG_PRINT(s << " " << next);
-  setHead(next);
-  if (SPSC) {
-    delete s;
-  } else {
-    s->retire(); // hazptr
-  }
-}
-
-/**
- *  Entry
- */
-
-template <
-    typename T,
-    bool SingleProducer,
-    bool SingleConsumer,
-    bool MayBlock,
-    size_t LgSegmentSize,
-    template <typename> class Atom>
-class UnboundedQueue<
-    T,
-    SingleProducer,
-    SingleConsumer,
-    MayBlock,
-    LgSegmentSize,
-    Atom>::Entry {
-  folly::SaturatingSemaphore<MayBlock, Atom> flag_;
-  typename std::aligned_storage<sizeof(T), alignof(T)>::type item_;
-
- public:
-  template <typename Arg>
-  FOLLY_ALWAYS_INLINE void putItem(Arg&& arg) {
-    new (&item_) T(std::forward<Arg>(arg));
-    flag_.post();
-  }
-
-  FOLLY_ALWAYS_INLINE void takeItem(T& item) noexcept {
-    flag_.wait();
-    getItem(item);
-  }
-
-  template <typename Clock, typename Duration>
-  FOLLY_ALWAYS_INLINE bool tryWaitUntil(
-      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
-    return flag_.try_wait_until(deadline);
-  }
-
- private:
-  FOLLY_ALWAYS_INLINE void getItem(T& item) noexcept {
-    item = std::move(*(folly::launder(itemPtr())));
-    destroyItem();
-  }
-
-  FOLLY_ALWAYS_INLINE T* itemPtr() noexcept {
-    return static_cast<T*>(static_cast<void*>(&item_));
-  }
-
-  FOLLY_ALWAYS_INLINE void destroyItem() noexcept {
-    itemPtr()->~T();
-  }
-}; // UnboundedQueue::Entry
-
-/**
- *  Segment
- */
-
-template <
-    typename T,
-    bool SingleProducer,
-    bool SingleConsumer,
-    bool MayBlock,
-    size_t LgSegmentSize,
-    template <typename> class Atom>
-class UnboundedQueue<
-    T,
-    SingleProducer,
-    SingleConsumer,
-    MayBlock,
-    LgSegmentSize,
-    Atom>::Segment : public folly::hazptr::hazptr_obj_base_refcounted<Segment> {
-  Atom<Segment*> next_;
-  const Ticket min_;
-  bool marked_; // used for iterative deletion
-  FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
-  Entry b_[SegmentSize];
-
- public:
-  explicit Segment(const Ticket t) : next_(nullptr), min_(t), marked_(false) {}
-
-  ~Segment() {
-    if (!SPSC && !marked_) {
-      auto next = nextSegment();
-      while (next) {
-        if (!next->release_ref()) { // hazptr
-          return;
-        }
-        auto s = next;
-        next = s->nextSegment();
-        s->marked_ = true;
-        delete s;
-      }
-    }
-  }
-
-  Segment* nextSegment() const noexcept {
-    return next_.load(std::memory_order_acquire);
-  }
-
-  void setNextSegment(Segment* s) noexcept {
-    next_.store(s, std::memory_order_release);
-  }
-
-  FOLLY_ALWAYS_INLINE Ticket minTicket() const noexcept {
-    DCHECK_EQ((min_ & (SegmentSize - 1)), 0);
-    return min_;
-  }
-
-  FOLLY_ALWAYS_INLINE Entry& entry(size_t index) noexcept {
-    return b_[index];
-  }
-}; // UnboundedQueue::Segment
-
-} // namespace folly
index 1ff19ed2ffed182a64f63c8c8db06a4420218f6e..7320b6cd594ee59d57a4b206f5894b2c4e0b6b1e 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2017 Facebook, Inc.
+ * Copyright 2017-present Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 #include <chrono>
 #include <memory>
 
+#include <glog/logging.h>
+
 #include <folly/concurrency/CacheLocality.h>
+#include <folly/experimental/hazptr/hazptr.h>
+#include <folly/synchronization/SaturatingSemaphore.h>
 
 namespace folly {
 
@@ -42,6 +46,9 @@ namespace folly {
 ///   spins. A performance tuning parameter.
 /// - LgSegmentSize (default 8): Log base 2 of number of elements per
 ///   segment. A performance tuning parameter. See below.
+/// - LgAlign (default 7): Log base 2 of alignment directive; can be
+///   used to balance scalability (avoidance of false sharing) with
+///   memory efficiency.
 ///
 /// When to use UnboundedQueue:
 /// - If a small bound may lead to deadlock or performance degradation
@@ -56,10 +63,10 @@ namespace folly {
 ///   SPSC) ProducerConsumerQueue.
 ///
 /// Template Aliases:
-///   USPSCQueue<T, MayBlock, LgSegmentSize>
-///   UMPSCQueue<T, MayBlock, LgSegmentSize>
-///   USPMCQueue<T, MayBlock, LgSegmentSize>
-///   UMPMCQueue<T, MayBlock, LgSegmentSize>
+///   USPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
+///   UMPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
+///   USPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>
+///   UMPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>
 ///
 /// Functions:
 ///   Producer operations never wait or fail (unless OOM)
@@ -145,6 +152,8 @@ namespace folly {
 ///   producers or consumers, have references to them or their
 ///   predessors. That is, a lagging thread may delay the reclamation
 ///   of a chain of removed segments.
+/// - The template parameter LgAlign can be used to reduce memory usage
+///   at the cost of increased chance of false sharing.
 ///
 /// Performance considerations:
 /// - All operations take constant time, excluding the costs of
@@ -188,6 +197,7 @@ template <
     bool SingleConsumer,
     bool MayBlock,
     size_t LgSegmentSize = 8,
+    size_t LgAlign = 7,
     template <typename> class Atom = std::atomic>
 class UnboundedQueue {
   using Ticket = uint64_t;
@@ -195,25 +205,42 @@ class UnboundedQueue {
   class Segment;
 
   static constexpr bool SPSC = SingleProducer && SingleConsumer;
-  static constexpr size_t SegmentSize = 1 << LgSegmentSize;
   static constexpr size_t Stride = SPSC || (LgSegmentSize <= 1) ? 1 : 27;
+  static constexpr size_t SegmentSize = 1u << LgSegmentSize;
+  static constexpr size_t Align = 1u << LgAlign;
 
   static_assert(
       std::is_nothrow_destructible<T>::value,
       "T must be nothrow_destructible");
   static_assert((Stride & 1) == 1, "Stride must be odd");
   static_assert(LgSegmentSize < 32, "LgSegmentSize must be < 32");
+  static_assert(LgAlign < 16, "LgAlign must be < 16");
 
-  FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
+  FOLLY_ALIGNED(Align)
   Atom<Segment*> head_;
   Atom<Ticket> consumerTicket_;
-  FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
+  FOLLY_ALIGNED(Align)
   Atom<Segment*> tail_;
   Atom<Ticket> producerTicket_;
 
  public:
-  UnboundedQueue();
-  ~UnboundedQueue();
+  /** constructor */
+  UnboundedQueue() {
+    setProducerTicket(0);
+    setConsumerTicket(0);
+    Segment* s = new Segment(0);
+    setTail(s);
+    setHead(s);
+  }
+
+  /** destructor */
+  ~UnboundedQueue() {
+    Segment* next;
+    for (Segment* s = head(); s; s = next) {
+      next = s->nextSegment();
+      reclaimSegment(s);
+    }
+  }
 
   /** enqueue */
   FOLLY_ALWAYS_INLINE void enqueue(const T& arg) {
@@ -225,26 +252,32 @@ class UnboundedQueue {
   }
 
   /** dequeue */
-  void dequeue(T& item) noexcept;
+  FOLLY_ALWAYS_INLINE void dequeue(T& item) noexcept {
+    dequeueImpl(item);
+  }
 
   /** try_dequeue */
-  bool try_dequeue(T& item) noexcept {
-    return try_dequeue_until(
-        item, std::chrono::steady_clock::time_point::min());
+  FOLLY_ALWAYS_INLINE bool try_dequeue(T& item) noexcept {
+    return tryDequeueUntil(item, std::chrono::steady_clock::time_point::min());
   }
 
   /** try_dequeue_until */
   template <typename Clock, typename Duration>
-  bool try_dequeue_until(
+  FOLLY_ALWAYS_INLINE bool try_dequeue_until(
       T& item,
-      const std::chrono::time_point<Clock, Duration>& deadline) noexcept;
+      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+    return tryDequeueUntil(item, deadline);
+  }
 
   /** try_dequeue_for */
   template <typename Rep, typename Period>
-  bool try_dequeue_for(
+  FOLLY_ALWAYS_INLINE bool try_dequeue_for(
       T& item,
       const std::chrono::duration<Rep, Period>& duration) noexcept {
-    return try_dequeue_until(item, std::chrono::steady_clock::now() + duration);
+    if (LIKELY(try_dequeue(item))) {
+      return true;
+    }
+    return tryDequeueUntil(item, std::chrono::steady_clock::now() + duration);
   }
 
   /** size */
@@ -262,33 +295,218 @@ class UnboundedQueue {
   }
 
  private:
+  /** enqueueImpl */
   template <typename Arg>
-  void enqueueImpl(Arg&& arg);
+  FOLLY_ALWAYS_INLINE void enqueueImpl(Arg&& arg) {
+    if (SPSC) {
+      Segment* s = tail();
+      enqueueCommon(s, std::forward<Arg>(arg));
+    } else {
+      // Using hazptr_holder instead of hazptr_local because it is
+      // possible that the T ctor happens to use hazard pointers.
+      folly::hazptr::hazptr_holder hptr;
+      Segment* s = hptr.get_protected(tail_);
+      enqueueCommon(s, std::forward<Arg>(arg));
+    }
+  }
 
+  /** enqueueCommon */
   template <typename Arg>
-  void enqueueCommon(Segment* s, Arg&& arg);
+  FOLLY_ALWAYS_INLINE void enqueueCommon(Segment* s, Arg&& arg) {
+    Ticket t = fetchIncrementProducerTicket();
+    if (!SingleProducer) {
+      s = findSegment(s, t);
+    }
+    DCHECK_GE(t, s->minTicket());
+    DCHECK_LT(t, s->minTicket() + SegmentSize);
+    size_t idx = index(t);
+    Entry& e = s->entry(idx);
+    e.putItem(std::forward<Arg>(arg));
+    if (responsibleForAlloc(t)) {
+      allocNextSegment(s, t + SegmentSize);
+    }
+    if (responsibleForAdvance(t)) {
+      advanceTail(s);
+    }
+  }
 
-  void dequeueCommon(Segment* s, T& item) noexcept;
+  /** dequeueImpl */
+  FOLLY_ALWAYS_INLINE void dequeueImpl(T& item) noexcept {
+    if (SPSC) {
+      Segment* s = head();
+      dequeueCommon(s, item);
+    } else {
+      // Using hazptr_holder instead of hazptr_local because it is
+      // possible to call the T dtor and it may happen to use hazard
+      // pointers.
+      folly::hazptr::hazptr_holder hptr;
+      Segment* s = hptr.get_protected(head_);
+      dequeueCommon(s, item);
+    }
+  }
 
+  /** dequeueCommon */
+  FOLLY_ALWAYS_INLINE void dequeueCommon(Segment* s, T& item) noexcept {
+    Ticket t = fetchIncrementConsumerTicket();
+    if (!SingleConsumer) {
+      s = findSegment(s, t);
+    }
+    size_t idx = index(t);
+    Entry& e = s->entry(idx);
+    e.takeItem(item);
+    if (responsibleForAdvance(t)) {
+      advanceHead(s);
+    }
+  }
+
+  /** tryDequeueUntil */
   template <typename Clock, typename Duration>
-  bool singleConsumerTryDequeueUntil(
+  FOLLY_ALWAYS_INLINE bool tryDequeueUntil(
+      T& item,
+      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+    if (SingleConsumer) {
+      Segment* s = head();
+      return tryDequeueUntilSC(s, item, deadline);
+    } else {
+      // Using hazptr_holder instead of hazptr_local because it is
+      // possible to call ~T() and it may happen to use hazard pointers.
+      folly::hazptr::hazptr_holder hptr;
+      Segment* s = hptr.get_protected(head_);
+      return ryDequeueUntilMC(s, item, deadline);
+    }
+  }
+
+  /** ryDequeueUntilSC */
+  template <typename Clock, typename Duration>
+  FOLLY_ALWAYS_INLINE bool tryDequeueUntilSC(
       Segment* s,
       T& item,
-      const std::chrono::time_point<Clock, Duration>& deadline) noexcept;
+      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+    Ticket t = consumerTicket();
+    DCHECK_GE(t, s->minTicket());
+    DCHECK_LT(t, (s->minTicket() + SegmentSize));
+    size_t idx = index(t);
+    Entry& e = s->entry(idx);
+    if (!e.tryWaitUntil(deadline)) {
+      return false;
+    }
+    setConsumerTicket(t + 1);
+    e.takeItem(item);
+    if (responsibleForAdvance(t)) {
+      advanceHead(s);
+    }
+    return true;
+  }
 
+  /** tryDequeueUntilMC */
   template <typename Clock, typename Duration>
-  bool multiConsumerTryDequeueUntil(
+  FOLLY_ALWAYS_INLINE bool ryDequeueUntilMC(
       Segment* s,
       T& item,
-      const std::chrono::time_point<Clock, Duration>& deadline) noexcept;
+      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+    while (true) {
+      Ticket t = consumerTicket();
+      if (UNLIKELY(t >= (s->minTicket() + SegmentSize))) {
+        s = tryGetNextSegmentUntil(s, deadline);
+        if (s == nullptr) {
+          return false; // timed out
+        }
+        continue;
+      }
+      size_t idx = index(t);
+      Entry& e = s->entry(idx);
+      if (!e.tryWaitUntil(deadline)) {
+        return false;
+      }
+      if (!consumerTicket_.compare_exchange_weak(
+              t, t + 1, std::memory_order_acq_rel, std::memory_order_acquire)) {
+        continue;
+      }
+      e.takeItem(item);
+      if (responsibleForAdvance(t)) {
+        advanceHead(s);
+      }
+      return true;
+    }
+  }
 
-  Segment* findSegment(Segment* s, const Ticket t) const noexcept;
+  /** findSegment */
+  FOLLY_ALWAYS_INLINE
+  Segment* findSegment(Segment* s, const Ticket t) const noexcept {
+    while (UNLIKELY(t >= (s->minTicket() + SegmentSize))) {
+      auto deadline = std::chrono::steady_clock::time_point::max();
+      s = tryGetNextSegmentUntil(s, deadline);
+      DCHECK(s != nullptr);
+    }
+    return s;
+  }
 
-  void allocNextSegment(Segment* s, const Ticket t);
+  /** tryGetNextSegmentUntil */
+  template <typename Clock, typename Duration>
+  Segment* tryGetNextSegmentUntil(
+      Segment* s,
+      const std::chrono::time_point<Clock, Duration>& deadline) const noexcept {
+    // The following loop will not spin indefinitely (as long as the
+    // number of concurrently waiting consumers does not exceeds
+    // SegmentSize and the OS scheduler does not pause ready threads
+    // indefinitely). Under such conditions, the algorithm guarantees
+    // that the producer reponsible for advancing the tail pointer to
+    // the next segment has already acquired its ticket.
+    while (tail() == s) {
+      if (deadline < Clock::time_point::max() && deadline > Clock::now()) {
+        return nullptr;
+      }
+      asm_volatile_pause();
+    }
+    Segment* next = s->nextSegment();
+    DCHECK(next != nullptr);
+    return next;
+  }
 
-  void advanceTail(Segment* s) noexcept;
+  /** allocNextSegment */
+  void allocNextSegment(Segment* s, const Ticket t) {
+    Segment* next = new Segment(t);
+    if (!SPSC) {
+      next->acquire_ref_safe(); // hazptr
+    }
+    DCHECK(s->nextSegment() == nullptr);
+    s->setNextSegment(next);
+  }
 
-  void advanceHead(Segment* s) noexcept;
+  /** advanceTail */
+  void advanceTail(Segment* s) noexcept {
+    Segment* next = s->nextSegment();
+    if (!SingleProducer) {
+      // The following loop will not spin indefinitely (as long as the
+      // OS scheduler does not pause ready threads indefinitely). The
+      // algorithm guarantees that the producer reponsible for setting
+      // the next pointer has already acquired its ticket.
+      while (next == nullptr) {
+        asm_volatile_pause();
+        next = s->nextSegment();
+      }
+    }
+    DCHECK(next != nullptr);
+    setTail(next);
+  }
+
+  /** advanceHead */
+  void advanceHead(Segment* s) noexcept {
+    auto deadline = std::chrono::steady_clock::time_point::max();
+    Segment* next = tryGetNextSegmentUntil(s, deadline);
+    DCHECK(next != nullptr);
+    setHead(next);
+    reclaimSegment(s);
+  }
+
+  /** reclaimSegment */
+  void reclaimSegment(Segment* s) noexcept {
+    if (SPSC) {
+      delete s;
+    } else {
+      s->retire(); // hazptr
+    }
+  }
 
   FOLLY_ALWAYS_INLINE size_t index(Ticket t) const noexcept {
     return (t * Stride) & (SegmentSize - 1);
@@ -336,7 +554,7 @@ class UnboundedQueue {
 
   FOLLY_ALWAYS_INLINE Ticket fetchIncrementConsumerTicket() noexcept {
     if (SingleConsumer) {
-      auto oldval = consumerTicket();
+      Ticket oldval = consumerTicket();
       setConsumerTicket(oldval + 1);
       return oldval;
     } else { // MC
@@ -346,13 +564,101 @@ class UnboundedQueue {
 
   FOLLY_ALWAYS_INLINE Ticket fetchIncrementProducerTicket() noexcept {
     if (SingleProducer) {
-      auto oldval = producerTicket();
+      Ticket oldval = producerTicket();
       setProducerTicket(oldval + 1);
       return oldval;
     } else { // MP
       return producerTicket_.fetch_add(1, std::memory_order_acq_rel);
     }
   }
+
+  /**
+   *  Entry
+   */
+  class Entry {
+    folly::SaturatingSemaphore<MayBlock, Atom> flag_;
+    typename std::aligned_storage<sizeof(T), alignof(T)>::type item_;
+
+   public:
+    template <typename Arg>
+    FOLLY_ALWAYS_INLINE void putItem(Arg&& arg) {
+      new (&item_) T(std::forward<Arg>(arg));
+      flag_.post();
+    }
+
+    FOLLY_ALWAYS_INLINE void takeItem(T& item) noexcept {
+      flag_.wait();
+      getItem(item);
+    }
+
+    template <typename Clock, typename Duration>
+    FOLLY_ALWAYS_INLINE bool tryWaitUntil(
+        const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+      return flag_.try_wait_until(deadline);
+    }
+
+   private:
+    FOLLY_ALWAYS_INLINE void getItem(T& item) noexcept {
+      item = std::move(*(itemPtr()));
+      destroyItem();
+    }
+
+    FOLLY_ALWAYS_INLINE T* itemPtr() noexcept {
+      return static_cast<T*>(static_cast<void*>(&item_));
+    }
+
+    FOLLY_ALWAYS_INLINE void destroyItem() noexcept {
+      itemPtr()->~T();
+    }
+  }; // Entry
+
+  /**
+   *  Segment
+   */
+  class Segment : public folly::hazptr::hazptr_obj_base_refcounted<Segment> {
+    Atom<Segment*> next_;
+    const Ticket min_;
+    bool marked_; // used for iterative deletion
+    FOLLY_ALIGNED(Align)
+    Entry b_[SegmentSize];
+
+   public:
+    explicit Segment(const Ticket t)
+        : next_(nullptr), min_(t), marked_(false) {}
+
+    ~Segment() {
+      if (!SPSC && !marked_) {
+        Segment* next = nextSegment();
+        while (next) {
+          if (!next->release_ref()) { // hazptr
+            return;
+          }
+          Segment* s = next;
+          next = s->nextSegment();
+          s->marked_ = true;
+          delete s;
+        }
+      }
+    }
+
+    Segment* nextSegment() const noexcept {
+      return next_.load(std::memory_order_acquire);
+    }
+
+    void setNextSegment(Segment* s) noexcept {
+      next_.store(s, std::memory_order_release);
+    }
+
+    FOLLY_ALWAYS_INLINE Ticket minTicket() const noexcept {
+      DCHECK_EQ((min_ & (SegmentSize - 1)), 0);
+      return min_;
+    }
+
+    FOLLY_ALWAYS_INLINE Entry& entry(size_t index) noexcept {
+      return b_[index];
+    }
+  }; // Segment
+
 }; // UnboundedQueue
 
 /* Aliases */
@@ -361,33 +667,36 @@ template <
     typename T,
     bool MayBlock,
     size_t LgSegmentSize = 8,
+    size_t LgAlign = 7,
     template <typename> class Atom = std::atomic>
-using USPSCQueue = UnboundedQueue<T, true, true, MayBlock, LgSegmentSize, Atom>;
+using USPSCQueue =
+    UnboundedQueue<T, true, true, MayBlock, LgSegmentSize, LgAlign, Atom>;
 
 template <
     typename T,
     bool MayBlock,
     size_t LgSegmentSize = 8,
+    size_t LgAlign = 7,
     template <typename> class Atom = std::atomic>
 using UMPSCQueue =
-    UnboundedQueue<T, false, true, MayBlock, LgSegmentSize, Atom>;
+    UnboundedQueue<T, false, true, MayBlock, LgSegmentSize, LgAlign, Atom>;
 
 template <
     typename T,
     bool MayBlock,
     size_t LgSegmentSize = 8,
+    size_t LgAlign = 7,
     template <typename> class Atom = std::atomic>
 using USPMCQueue =
-    UnboundedQueue<T, true, false, MayBlock, LgSegmentSize, Atom>;
+    UnboundedQueue<T, true, false, MayBlock, LgSegmentSize, LgAlign, Atom>;
 
 template <
     typename T,
     bool MayBlock,
     size_t LgSegmentSize = 8,
+    size_t LgAlign = 7,
     template <typename> class Atom = std::atomic>
 using UMPMCQueue =
-    UnboundedQueue<T, false, false, MayBlock, LgSegmentSize, Atom>;
+    UnboundedQueue<T, false, false, MayBlock, LgSegmentSize, LgAlign, Atom>;
 
 } // namespace folly
-
-#include <folly/concurrency/UnboundedQueue-inl.h>
index 4eccea114d9d1aeeac44a8109fec029be6894a01..0139b156f41e8c4f729f9fafa024c03826227ee5 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2017 Facebook, Inc.
+ * Copyright 2017-present Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@
 DEFINE_bool(bench, false, "run benchmark");
 DEFINE_int32(reps, 10, "number of reps");
 DEFINE_int32(ops, 1000000, "number of operations per rep");
+DEFINE_int64(capacity, 256 * 1024, "capacity");
 
 template <typename T, bool MayBlock>
 using USPSC = folly::USPSCQueue<T, MayBlock>;
@@ -81,7 +82,7 @@ TEST(UnboundedQueue, basic) {
 template <template <typename, bool> class Q, bool MayBlock>
 void timeout_test() {
   Q<int, MayBlock> q;
-  int v = -1;
+  int v;
   ASSERT_FALSE(q.try_dequeue_until(
       v, std::chrono::steady_clock::now() + std::chrono::microseconds(1)));
   ASSERT_FALSE(q.try_dequeue_for(v, std::chrono::microseconds(1)));
@@ -189,9 +190,8 @@ void enq_deq_test(const int nprod, const int ncons) {
           /* keep trying */;
         }
       } else if ((i % 3) == 1) {
-        std::chrono::steady_clock::time_point deadline =
-            std::chrono::steady_clock::now() + std::chrono::milliseconds(1);
-        while (!q.try_dequeue_until(v, deadline)) {
+        auto duration = std::chrono::milliseconds(1);
+        while (!q.try_dequeue_for(v, duration)) {
           /* keep trying */;
         }
       } else {
@@ -293,8 +293,6 @@ uint64_t bench(const int nprod, const int ncons, const std::string& name) {
       }
     };
     auto cons = [&](int tid) {
-      std::chrono::steady_clock::time_point deadline =
-          std::chrono::steady_clock::now() + std::chrono::hours(24);
       uint64_t mysum = 0;
       for (int i = tid; i < ops; i += ncons) {
         T v;
@@ -303,7 +301,8 @@ uint64_t bench(const int nprod, const int ncons, const std::string& name) {
             /* keep trying */;
           }
         } else if (Op == 1 || Op == 4) {
-          while (UNLIKELY(!q.try_dequeue_until(v, deadline))) {
+          auto duration = std::chrono::microseconds(1000);
+          while (UNLIKELY(!q.try_dequeue_for(v, duration))) {
             /* keep trying */;
           }
         } else {
@@ -329,12 +328,12 @@ uint64_t bench(const int nprod, const int ncons, const std::string& name) {
 }
 
 /* For performance comparison */
-template <typename T, size_t capacity>
+template <typename T>
 class MPMC {
   folly::MPMCQueue<T> q_;
 
  public:
-  MPMC() : q_(capacity) {}
+  MPMC() : q_(FLAGS_capacity) {}
 
   template <typename... Args>
   void enqueue(Args&&... args) {
@@ -349,23 +348,24 @@ class MPMC {
     return q_.read(item);
   }
 
-  template <typename Clock, typename Duration>
-  bool try_dequeue_until(
+  template <typename Rep, typename Period>
+  bool try_dequeue_for(
       T& item,
-      const std::chrono::time_point<Clock, Duration>& deadline) {
+      const std::chrono::duration<Rep, Period>& duration) noexcept {
+    auto deadline = std::chrono::steady_clock::now() + duration;
     return q_.tryReadUntil(deadline, item);
   }
 };
 
 template <typename T, bool ignore>
-using FMPMC = MPMC<T, 256 * 1024>;
+using FMPMC = MPMC<T>;
 
-template <typename T, size_t capacity>
+template <typename T>
 class PCQ {
   folly::ProducerConsumerQueue<T> q_;
 
  public:
-  PCQ() : q_(capacity) {}
+  PCQ() : q_(FLAGS_capacity) {}
 
   template <typename... Args>
   void enqueue(Args&&... args) {
@@ -382,14 +382,14 @@ class PCQ {
     return q_.read(item);
   }
 
-  template <typename Clock, typename Duration>
-  bool try_dequeue_until(T&, const std::chrono::time_point<Clock, Duration>&) {
+  template <typename Rep, typename Period>
+  bool try_dequeue_for(T&, const std::chrono::duration<Rep, Period>&) noexcept {
     return false;
   }
 };
 
 template <typename T, bool ignore>
-using FPCQ = PCQ<T, 256 * 1024>;
+using FPCQ = PCQ<T>;
 
 template <size_t M>
 struct IntArray {
@@ -479,8 +479,9 @@ TEST(UnboundedQueue, bench) {
   dottedLine();
   std::cout << "$ numactl -N 1 $dir/unbounded_queue_test --bench\n";
   dottedLine();
-  std::cout << "Using a capacity of 256K for folly::ProducerConsumerQueue\n"
-            << "and folly::MPMCQueue\n";
+  std::cout << "Using capacity " << FLAGS_capacity
+            << " for folly::ProducerConsumerQueue and\n"
+            << "folly::MPMCQueue\n";
   std::cout << "=============================================================="
             << std::endl;
   std::cout << "Test name                         Max time  Avg time  Min time"
@@ -509,8 +510,8 @@ TEST(UnboundedQueue, bench) {
 ..............................................................
 $ numactl -N 1 $dir/unbounded_queue_test --bench
 ..............................................................
-Using a capacity of 256K for folly::ProducerConsumerQueue
-and folly::MPMCQueue
+Using capacity 262144 for folly::ProducerConsumerQueue and
+folly::MPMCQueue
 ==============================================================
 Test name                         Max time  Avg time  Min time
 ======================  1 prod   1 cons ======================
@@ -616,14 +617,14 @@ Unbounded SPMC timed spin only      202 ns    198 ns    193 ns
 Unbounded SPMC wait  spin only       36 ns     36 ns     35 ns
 Unbounded SPMC try   may block      202 ns    195 ns    190 ns
 Unbounded SPMC timed may block      208 ns    197 ns    190 ns
-Unbounded SPMC wait  may block     1645 ns   1427 ns     36 ns
+Unbounded SPMC wait  may block       96 ns     77 ns     64 ns
 ..............................................................
 Unbounded MPMC try   spin only      204 ns    198 ns    194 ns
 Unbounded MPMC timed spin only      202 ns    195 ns    190 ns
 Unbounded MPMC wait  spin only       61 ns     59 ns     57 ns
 Unbounded MPMC try   may block      206 ns    196 ns    191 ns
 Unbounded MPMC timed may block      204 ns    198 ns    192 ns
-Unbounded MPMC wait  may block     1658 ns   1293 ns     70 ns
+Unbounded MPMC wait  may block      100 ns     88 ns     84 ns
 ..............................................................
 folly::MPMC  read                   210 ns    191 ns    182 ns
 folly::MPMC  tryReadUntil           574 ns    248 ns    192 ns
@@ -636,14 +637,14 @@ Unbounded SPMC timed spin only      208 ns    205 ns    200 ns
 Unbounded SPMC wait  spin only      175 ns     51 ns     33 ns
 Unbounded SPMC try   may block      215 ns    203 ns    186 ns
 Unbounded SPMC timed may block      453 ns    334 ns    204 ns
-Unbounded SPMC wait  may block     1601 ns   1514 ns   1373 ns
+Unbounded SPMC wait  may block      110 ns     87 ns     55 ns
 ..............................................................
 Unbounded MPMC try   spin only      328 ns    218 ns    197 ns
 Unbounded MPMC timed spin only      217 ns    206 ns    200 ns
 Unbounded MPMC wait  spin only      147 ns     85 ns     58 ns
 Unbounded MPMC try   may block      310 ns    223 ns    199 ns
 Unbounded MPMC timed may block      461 ns    275 ns    196 ns
-Unbounded MPMC wait  may block     1623 ns   1526 ns    888 ns
+Unbounded MPMC wait  may block      148 ns    111 ns     78 ns
 ..............................................................
 folly::MPMC  read                   280 ns    215 ns    194 ns
 folly::MPMC  tryReadUntil          28740 ns   13508 ns    212 ns