/*
- * Copyright 2017 Facebook, Inc.
+ * Copyright 2017-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#include <chrono>
#include <memory>
+#include <glog/logging.h>
+
#include <folly/concurrency/CacheLocality.h>
+#include <folly/experimental/hazptr/hazptr.h>
+#include <folly/synchronization/SaturatingSemaphore.h>
namespace folly {
/// spins. A performance tuning parameter.
/// - LgSegmentSize (default 8): Log base 2 of number of elements per
/// segment. A performance tuning parameter. See below.
+/// - LgAlign (default 7): Log base 2 of alignment directive; can be
+/// used to balance scalability (avoidance of false sharing) with
+/// memory efficiency.
///
/// When to use UnboundedQueue:
/// - If a small bound may lead to deadlock or performance degradation
/// SPSC) ProducerConsumerQueue.
///
/// Template Aliases:
-/// USPSCQueue<T, MayBlock, LgSegmentSize>
-/// UMPSCQueue<T, MayBlock, LgSegmentSize>
-/// USPMCQueue<T, MayBlock, LgSegmentSize>
-/// UMPMCQueue<T, MayBlock, LgSegmentSize>
+/// USPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
+/// UMPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
+/// USPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>
+/// UMPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>
///
/// Functions:
/// Producer operations never wait or fail (unless OOM)
/// producers or consumers, have references to them or their
/// predecessors. That is, a lagging thread may delay the reclamation
/// of a chain of removed segments.
+/// - The template parameter LgAlign can be used to reduce memory usage
+/// at the cost of increased chance of false sharing.
///
/// Performance considerations:
/// - All operations take constant time, excluding the costs of
bool SingleConsumer,
bool MayBlock,
size_t LgSegmentSize = 8,
+ size_t LgAlign = 7,
template <typename> class Atom = std::atomic>
class UnboundedQueue {
using Ticket = uint64_t;
class Segment;
static constexpr bool SPSC = SingleProducer && SingleConsumer;
- static constexpr size_t SegmentSize = 1 << LgSegmentSize;
static constexpr size_t Stride = SPSC || (LgSegmentSize <= 1) ? 1 : 27;
+ static constexpr size_t SegmentSize = 1u << LgSegmentSize;
+ static constexpr size_t Align = 1u << LgAlign;
static_assert(
std::is_nothrow_destructible<T>::value,
"T must be nothrow_destructible");
static_assert((Stride & 1) == 1, "Stride must be odd");
static_assert(LgSegmentSize < 32, "LgSegmentSize must be < 32");
+ static_assert(LgAlign < 16, "LgAlign must be < 16");
- FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
+ FOLLY_ALIGNED(Align)
Atom<Segment*> head_;
Atom<Ticket> consumerTicket_;
- FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
+ FOLLY_ALIGNED(Align)
Atom<Segment*> tail_;
Atom<Ticket> producerTicket_;
public:
- UnboundedQueue();
- ~UnboundedQueue();
+ /** Constructor. Head and tail both start at one empty segment for ticket 0. */
+ UnboundedQueue() {
+ setProducerTicket(0);
+ setConsumerTicket(0);
+ Segment* s = new Segment(0);
+ setTail(s);
+ setHead(s);
+ }
+
+ /** Destructor. Reclaims every remaining segment in the head-to-tail chain.
+ NOTE(review): walks the list unsynchronized — assumes no concurrent
+ producers/consumers at destruction time. */
+ ~UnboundedQueue() {
+ Segment* next;
+ for (Segment* s = head(); s; s = next) {
+ next = s->nextSegment();
+ reclaimSegment(s);
+ }
+ }
/** enqueue */
FOLLY_ALWAYS_INLINE void enqueue(const T& arg) {
}
/** dequeue */
- void dequeue(T& item) noexcept;
+ FOLLY_ALWAYS_INLINE void dequeue(T& item) noexcept {
+ dequeueImpl(item);
+ }
/** try_dequeue */
- bool try_dequeue(T& item) noexcept {
- return try_dequeue_until(
- item, std::chrono::steady_clock::time_point::min());
+ FOLLY_ALWAYS_INLINE bool try_dequeue(T& item) noexcept {
+ return tryDequeueUntil(item, std::chrono::steady_clock::time_point::min());
}
/** try_dequeue_until */
template <typename Clock, typename Duration>
- bool try_dequeue_until(
+ FOLLY_ALWAYS_INLINE bool try_dequeue_until(
T& item,
- const std::chrono::time_point<Clock, Duration>& deadline) noexcept;
+ const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+ return tryDequeueUntil(item, deadline);
+ }
/** try_dequeue_for */
template <typename Rep, typename Period>
- bool try_dequeue_for(
+ FOLLY_ALWAYS_INLINE bool try_dequeue_for(
T& item,
const std::chrono::duration<Rep, Period>& duration) noexcept {
- return try_dequeue_until(item, std::chrono::steady_clock::now() + duration);
+ if (LIKELY(try_dequeue(item))) {
+ return true;
+ }
+ return tryDequeueUntil(item, std::chrono::steady_clock::now() + duration);
}
/** size */
}
private:
+ /** enqueueImpl */
template <typename Arg>
- void enqueueImpl(Arg&& arg);
+ FOLLY_ALWAYS_INLINE void enqueueImpl(Arg&& arg) {
+ if (SPSC) {
+ Segment* s = tail();
+ enqueueCommon(s, std::forward<Arg>(arg));
+ } else {
+ // Using hazptr_holder instead of hazptr_local because it is
+ // possible that the T ctor happens to use hazard pointers.
+ folly::hazptr::hazptr_holder hptr;
+ Segment* s = hptr.get_protected(tail_);
+ enqueueCommon(s, std::forward<Arg>(arg));
+ }
+ }
+ /** enqueueCommon */
template <typename Arg>
- void enqueueCommon(Segment* s, Arg&& arg);
+ FOLLY_ALWAYS_INLINE void enqueueCommon(Segment* s, Arg&& arg) {
+ Ticket t = fetchIncrementProducerTicket();
+ if (!SingleProducer) {
+ s = findSegment(s, t);
+ }
+ DCHECK_GE(t, s->minTicket());
+ DCHECK_LT(t, s->minTicket() + SegmentSize);
+ size_t idx = index(t);
+ Entry& e = s->entry(idx);
+ e.putItem(std::forward<Arg>(arg));
+ if (responsibleForAlloc(t)) {
+ allocNextSegment(s, t + SegmentSize);
+ }
+ if (responsibleForAdvance(t)) {
+ advanceTail(s);
+ }
+ }
- void dequeueCommon(Segment* s, T& item) noexcept;
+ /** dequeueImpl */
+ FOLLY_ALWAYS_INLINE void dequeueImpl(T& item) noexcept {
+ if (SPSC) {
+ Segment* s = head();
+ dequeueCommon(s, item);
+ } else {
+ // Using hazptr_holder instead of hazptr_local because it is
+ // possible to call the T dtor and it may happen to use hazard
+ // pointers.
+ folly::hazptr::hazptr_holder hptr;
+ Segment* s = hptr.get_protected(head_);
+ dequeueCommon(s, item);
+ }
+ }
+ /** dequeueCommon */
+ FOLLY_ALWAYS_INLINE void dequeueCommon(Segment* s, T& item) noexcept {
+ Ticket t = fetchIncrementConsumerTicket();
+ if (!SingleConsumer) {
+ s = findSegment(s, t);
+ }
+ size_t idx = index(t);
+ Entry& e = s->entry(idx);
+ e.takeItem(item);
+ if (responsibleForAdvance(t)) {
+ advanceHead(s);
+ }
+ }
+
+ /** tryDequeueUntil: dispatches to the single- or multi-consumer
+ variant; returns false on timeout. */
template <typename Clock, typename Duration>
- bool singleConsumerTryDequeueUntil(
+ FOLLY_ALWAYS_INLINE bool tryDequeueUntil(
+ T& item,
+ const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+ if (SingleConsumer) {
+ Segment* s = head();
+ return tryDequeueUntilSC(s, item, deadline);
+ } else {
+ // Using hazptr_holder instead of hazptr_local because it is
+ // possible to call ~T() and it may happen to use hazard pointers.
+ folly::hazptr::hazptr_holder hptr;
+ Segment* s = hptr.get_protected(head_);
+ return tryDequeueUntilMC(s, item, deadline);
+ }
+ }
+
+ /** tryDequeueUntilSC */
+ template <typename Clock, typename Duration>
+ FOLLY_ALWAYS_INLINE bool tryDequeueUntilSC(
Segment* s,
T& item,
- const std::chrono::time_point<Clock, Duration>& deadline) noexcept;
+ const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+ Ticket t = consumerTicket();
+ DCHECK_GE(t, s->minTicket());
+ DCHECK_LT(t, (s->minTicket() + SegmentSize));
+ size_t idx = index(t);
+ Entry& e = s->entry(idx);
+ if (!e.tryWaitUntil(deadline)) {
+ return false;
+ }
+ setConsumerTicket(t + 1);
+ e.takeItem(item);
+ if (responsibleForAdvance(t)) {
+ advanceHead(s);
+ }
+ return true;
+ }
+ /** tryDequeueUntilMC */
template <typename Clock, typename Duration>
- bool multiConsumerTryDequeueUntil(
+ FOLLY_ALWAYS_INLINE bool tryDequeueUntilMC(
Segment* s,
T& item,
- const std::chrono::time_point<Clock, Duration>& deadline) noexcept;
+ const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+ while (true) {
+ Ticket t = consumerTicket();
+ if (UNLIKELY(t >= (s->minTicket() + SegmentSize))) {
+ s = tryGetNextSegmentUntil(s, deadline);
+ if (s == nullptr) {
+ return false; // timed out
+ }
+ continue;
+ }
+ size_t idx = index(t);
+ Entry& e = s->entry(idx);
+ if (!e.tryWaitUntil(deadline)) {
+ return false;
+ }
+ if (!consumerTicket_.compare_exchange_weak(
+ t, t + 1, std::memory_order_acq_rel, std::memory_order_acquire)) {
+ continue;
+ }
+ e.takeItem(item);
+ if (responsibleForAdvance(t)) {
+ advanceHead(s);
+ }
+ return true;
+ }
+ }
- Segment* findSegment(Segment* s, const Ticket t) const noexcept;
+ /** findSegment */
+ FOLLY_ALWAYS_INLINE
+ Segment* findSegment(Segment* s, const Ticket t) const noexcept {
+ while (UNLIKELY(t >= (s->minTicket() + SegmentSize))) {
+ auto deadline = std::chrono::steady_clock::time_point::max();
+ s = tryGetNextSegmentUntil(s, deadline);
+ DCHECK(s != nullptr);
+ }
+ return s;
+ }
- void allocNextSegment(Segment* s, const Ticket t);
+ /** tryGetNextSegmentUntil: waits for the next segment to be linked
+ and the tail advanced past s; returns nullptr on timeout. */
+ template <typename Clock, typename Duration>
+ Segment* tryGetNextSegmentUntil(
+ Segment* s,
+ const std::chrono::time_point<Clock, Duration>& deadline) const noexcept {
+ // The following loop will not spin indefinitely (as long as the
+ // number of concurrently waiting consumers does not exceed
+ // SegmentSize and the OS scheduler does not pause ready threads
+ // indefinitely). Under such conditions, the algorithm guarantees
+ // that the producer responsible for advancing the tail pointer to
+ // the next segment has already acquired its ticket.
+ while (tail() == s) {
+ // Time out only once a finite deadline has actually passed.
+ if (deadline < Clock::time_point::max() && deadline < Clock::now()) {
+ return nullptr;
+ }
+ asm_volatile_pause();
+ }
+ Segment* next = s->nextSegment();
+ DCHECK(next != nullptr);
+ return next;
+ }
- void advanceTail(Segment* s) noexcept;
+ /** allocNextSegment */
+ void allocNextSegment(Segment* s, const Ticket t) {
+ Segment* next = new Segment(t);
+ if (!SPSC) {
+ next->acquire_ref_safe(); // hazptr
+ }
+ DCHECK(s->nextSegment() == nullptr);
+ s->setNextSegment(next);
+ }
- void advanceHead(Segment* s) noexcept;
+ /** advanceTail */
+ void advanceTail(Segment* s) noexcept {
+ Segment* next = s->nextSegment();
+ if (!SingleProducer) {
+ // The following loop will not spin indefinitely (as long as the
+ // OS scheduler does not pause ready threads indefinitely). The
+ // algorithm guarantees that the producer responsible for setting
+ // the next pointer has already acquired its ticket.
+ while (next == nullptr) {
+ asm_volatile_pause();
+ next = s->nextSegment();
+ }
+ }
+ DCHECK(next != nullptr);
+ setTail(next);
+ }
+
+ /** advanceHead */
+ void advanceHead(Segment* s) noexcept {
+ auto deadline = std::chrono::steady_clock::time_point::max();
+ Segment* next = tryGetNextSegmentUntil(s, deadline);
+ DCHECK(next != nullptr);
+ setHead(next);
+ reclaimSegment(s);
+ }
+
+ /** reclaimSegment */
+ void reclaimSegment(Segment* s) noexcept {
+ if (SPSC) {
+ delete s;
+ } else {
+ s->retire(); // hazptr
+ }
+ }
FOLLY_ALWAYS_INLINE size_t index(Ticket t) const noexcept {
return (t * Stride) & (SegmentSize - 1);
FOLLY_ALWAYS_INLINE Ticket fetchIncrementConsumerTicket() noexcept {
if (SingleConsumer) {
- auto oldval = consumerTicket();
+ Ticket oldval = consumerTicket();
setConsumerTicket(oldval + 1);
return oldval;
} else { // MC
FOLLY_ALWAYS_INLINE Ticket fetchIncrementProducerTicket() noexcept {
if (SingleProducer) {
- auto oldval = producerTicket();
+ Ticket oldval = producerTicket();
setProducerTicket(oldval + 1);
return oldval;
} else { // MP
return producerTicket_.fetch_add(1, std::memory_order_acq_rel);
}
}
+
+ /**
+ * Entry - one queue slot: an availability flag plus raw storage
+ * for a single T, constructed and destroyed in place.
+ */
+ class Entry {
+ // Posted by putItem() once a T has been constructed in item_;
+ // consumers wait on it before reading the slot.
+ folly::SaturatingSemaphore<MayBlock, Atom> flag_;
+ // Uninitialized, suitably aligned storage for one T.
+ typename std::aligned_storage<sizeof(T), alignof(T)>::type item_;
+
+ public:
+ /** Constructs the item in place, then signals availability. */
+ template <typename Arg>
+ FOLLY_ALWAYS_INLINE void putItem(Arg&& arg) {
+ new (&item_) T(std::forward<Arg>(arg));
+ flag_.post();
+ }
+
+ /** Waits until the item is available, then moves it out. */
+ FOLLY_ALWAYS_INLINE void takeItem(T& item) noexcept {
+ flag_.wait();
+ getItem(item);
+ }
+
+ /** Waits for availability up to deadline; false on timeout. */
+ template <typename Clock, typename Duration>
+ FOLLY_ALWAYS_INLINE bool tryWaitUntil(
+ const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+ return flag_.try_wait_until(deadline);
+ }
+
+ private:
+ /** Moves the stored item out, then destroys the source in place. */
+ FOLLY_ALWAYS_INLINE void getItem(T& item) noexcept {
+ item = std::move(*(itemPtr()));
+ destroyItem();
+ }
+
+ FOLLY_ALWAYS_INLINE T* itemPtr() noexcept {
+ return static_cast<T*>(static_cast<void*>(&item_));
+ }
+
+ FOLLY_ALWAYS_INLINE void destroyItem() noexcept {
+ itemPtr()->~T();
+ }
+ }; // Entry
+
+ /**
+ * Segment - a fixed array of SegmentSize entries covering the
+ * consecutive tickets starting at min_. Segments are chained via
+ * next_ from head to tail.
+ */
+ class Segment : public folly::hazptr::hazptr_obj_base_refcounted<Segment> {
+ Atom<Segment*> next_;
+ const Ticket min_;
+ bool marked_; // used for iterative deletion
+ FOLLY_ALIGNED(Align)
+ Entry b_[SegmentSize];
+
+ public:
+ explicit Segment(const Ticket t)
+ : next_(nullptr), min_(t), marked_(false) {}
+
+ // Deletes unreclaimed successors iteratively: each successor is
+ // marked_ before delete so its own dtor skips this walk,
+ // presumably to avoid deeply nested destruction of long chains.
+ // A successor is deleted only when release_ref() permits it
+ // (hazptr refcounting) — stops at the first one that refuses.
+ ~Segment() {
+ if (!SPSC && !marked_) {
+ Segment* next = nextSegment();
+ while (next) {
+ if (!next->release_ref()) { // hazptr
+ return;
+ }
+ Segment* s = next;
+ next = s->nextSegment();
+ s->marked_ = true;
+ delete s;
+ }
+ }
+ }
+
+ Segment* nextSegment() const noexcept {
+ return next_.load(std::memory_order_acquire);
+ }
+
+ void setNextSegment(Segment* s) noexcept {
+ next_.store(s, std::memory_order_release);
+ }
+
+ /** First ticket served by this segment; always a multiple of
+ SegmentSize (checked by the DCHECK below). */
+ FOLLY_ALWAYS_INLINE Ticket minTicket() const noexcept {
+ DCHECK_EQ((min_ & (SegmentSize - 1)), 0);
+ return min_;
+ }
+
+ FOLLY_ALWAYS_INLINE Entry& entry(size_t index) noexcept {
+ return b_[index];
+ }
+ }; // Segment
+
}; // UnboundedQueue
/* Aliases */
typename T,
bool MayBlock,
size_t LgSegmentSize = 8,
+ size_t LgAlign = 7,
template <typename> class Atom = std::atomic>
-using USPSCQueue = UnboundedQueue<T, true, true, MayBlock, LgSegmentSize, Atom>;
+using USPSCQueue =
+ UnboundedQueue<T, true, true, MayBlock, LgSegmentSize, LgAlign, Atom>;
template <
typename T,
bool MayBlock,
size_t LgSegmentSize = 8,
+ size_t LgAlign = 7,
template <typename> class Atom = std::atomic>
using UMPSCQueue =
- UnboundedQueue<T, false, true, MayBlock, LgSegmentSize, Atom>;
+ UnboundedQueue<T, false, true, MayBlock, LgSegmentSize, LgAlign, Atom>;
template <
typename T,
bool MayBlock,
size_t LgSegmentSize = 8,
+ size_t LgAlign = 7,
template <typename> class Atom = std::atomic>
using USPMCQueue =
- UnboundedQueue<T, true, false, MayBlock, LgSegmentSize, Atom>;
+ UnboundedQueue<T, true, false, MayBlock, LgSegmentSize, LgAlign, Atom>;
template <
typename T,
bool MayBlock,
size_t LgSegmentSize = 8,
+ size_t LgAlign = 7,
template <typename> class Atom = std::atomic>
using UMPMCQueue =
- UnboundedQueue<T, false, false, MayBlock, LgSegmentSize, Atom>;
+ UnboundedQueue<T, false, false, MayBlock, LgSegmentSize, LgAlign, Atom>;
} // namespace folly
-
-#include <folly/concurrency/UnboundedQueue-inl.h>