#include <algorithm>
#include <atomic>
-#include <assert.h>
-#include <boost/noncopyable.hpp>
+#include <cassert>
+#include <cstring>
#include <limits>
-#include <string.h>
#include <type_traits>
+#include <boost/noncopyable.hpp>
+
#include <folly/Traits.h>
-#include <folly/detail/CacheLocality.h>
+#include <folly/concurrency/CacheLocality.h>
#include <folly/detail/TurnSequencer.h>
#include <folly/portability/Unistd.h>
namespace detail {
-template<typename T, template<typename> class Atom>
+template <typename T, template <typename> class Atom>
struct SingleElementQueue;
template <typename T> class MPMCPipelineStageImpl;
/// are you can enqueue one sentinel and then have each consumer requeue
/// two sentinels after it receives it (by requeuing 2 the shutdown can
/// complete in O(log P) time instead of O(P)).
-template<typename T, template<typename> class Atom = std::atomic,
- bool Dynamic = false>
+template <
+ typename T,
+ template <typename> class Atom = std::atomic,
+ bool Dynamic = false>
class MPMCQueue : public detail::MPMCQueueBase<MPMCQueue<T,Atom,Dynamic>> {
friend class detail::MPMCPipelineStageImpl<T>;
using Slot = detail::SingleElementQueue<T,Atom>;
/// closed arrays instead of the current one. Information about closed
/// slots arrays (array address, capacity, stride, and offset) is
/// maintained in a logarithmic-sized structure. Each entry in that
-/// structure never need to be changed once set. The number of closed
+/// structure never needs to be changed once set. The number of closed
/// arrays is half the value of the seqlock (when unlocked).
///
/// The acquisition of the seqlock to perform an expansion does not
///
/// The dynamic version is a partial specialization of MPMCQueue with
/// Dynamic == true
-template <typename T, template<typename> class Atom>
+template <typename T, template <typename> class Atom>
class MPMCQueue<T,Atom,true> :
public detail::MPMCQueueBase<MPMCQueue<T,Atom,true>> {
friend class detail::MPMCQueueBase<MPMCQueue<T,Atom,true>>;
// There was an expansion after this ticket was issued.
break;
}
- if (slots[this->idx((ticket-offset), cap, stride)]
- .mayEnqueue(this->turn(ticket-offset, cap))) {
+ if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue(
+ this->turn(ticket - offset, cap))) {
// A slot is ready. No need to expand.
break;
- } else if (this->popTicket_.load(std::memory_order_relaxed) + cap
- > ticket) {
+ } else if (
+ this->popTicket_.load(std::memory_order_relaxed) + cap > ticket) {
// May block, but a pop is in progress. No need to expand.
// Get seqlock read section info again in case an expansion
// occurred with an equal or higher ticket.
}
private:
-
enum {
kSeqlockBits = 6,
kDefaultMinDynamicCapacity = 10,
uint64_t offset;
maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
- if (slots[this->idx((ticket-offset), cap, stride)]
- .mayEnqueue(this->turn(ticket-offset, cap))) {
+ if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue(
+ this->turn(ticket - offset, cap))) {
// A slot is ready.
if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
// Adjust ticket
uint64_t offset;
maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
- if (slots[this->idx((ticket-offset), cap, stride)]
- .mayDequeue(this->turn(ticket-offset, cap))) {
+ if (slots[this->idx((ticket - offset), cap, stride)].mayDequeue(
+ this->turn(ticket - offset, cap))) {
if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
// Adjust ticket
ticket -= offset;
uint64_t state;
uint64_t offset;
- while (!trySeqlockReadSection(state, slots, cap, stride)) {}
+ while (!trySeqlockReadSection(state, slots, cap, stride)) {
+ }
// If there was an expansion after this ticket was issued, adjust
// accordingly
assert((state & 1) == 0);
if (this->dstate_.compare_exchange_strong(oldval, state + 1)) {
assert(cap == this->dcapacity_.load());
- uint64_t ticket = 1 + std::max(this->pushTicket_.load(),
- this->popTicket_.load());
- size_t newCapacity =
- std::min(dmult_ * cap, this->capacity_);
+ uint64_t ticket =
+ 1 + std::max(this->pushTicket_.load(), this->popTicket_.load());
+ size_t newCapacity = std::min(dmult_ * cap, this->capacity_);
Slot* newSlots =
- new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding];
+ new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding];
if (newSlots == nullptr) {
// Expansion failed. Restore the seqlock
this->dstate_.store(state);
namespace detail {
/// CRTP specialization of MPMCQueueBase
-template<
- template<
- typename T, template<typename> class Atom, bool Dynamic> class Derived,
- typename T, template<typename> class Atom, bool Dynamic>
+template <
+ template <typename T, template <typename> class Atom, bool Dynamic>
+ class Derived,
+ typename T,
+ template <typename> class Atom,
+ bool Dynamic>
class MPMCQueueBase<Derived<T, Atom, Dynamic>> : boost::noncopyable {
// Note: Using CRTP static casts in several functions of this base
}
// ideally this would be a static assert, but g++ doesn't allow it
- assert(alignof(MPMCQueue<T,Atom>)
- >= detail::CacheLocality::kFalseSharingRange);
- assert(static_cast<uint8_t*>(static_cast<void*>(&popTicket_))
- - static_cast<uint8_t*>(static_cast<void*>(&pushTicket_))
- >= detail::CacheLocality::kFalseSharingRange);
+ assert(alignof(MPMCQueue<T, Atom>) >= CacheLocality::kFalseSharingRange);
+ assert(
+ static_cast<uint8_t*>(static_cast<void*>(&popTicket_)) -
+ static_cast<uint8_t*>(static_cast<void*>(&pushTicket_)) >=
+ CacheLocality::kFalseSharingRange);
}
/// A default-constructed queue is useful because a usable (non-zero
/// To avoid false sharing in slots_ with neighboring memory
/// allocations, we pad it with this many SingleElementQueue-s at
/// each end
- kSlotPadding = (detail::CacheLocality::kFalseSharingRange - 1)
- / sizeof(Slot) + 1
+ kSlotPadding = (CacheLocality::kFalseSharingRange - 1) / sizeof(Slot) + 1
};
/// The maximum number of items in the queue at once
/// Alignment doesn't prevent false sharing at the end of the struct,
/// so fill out the last cache line
- char padding_[detail::CacheLocality::kFalseSharingRange -
- sizeof(Atom<uint32_t>)];
+ char padding_[CacheLocality::kFalseSharingRange - sizeof(Atom<uint32_t>)];
/// We assign tickets in increasing order, but we don't want to
/// access neighboring elements of slots_ because that will lead to
ticket = numPushes;
const auto numPops = popTicket_.load(std::memory_order_acquire); // B
// n will be negative if pops are pending
- const int64_t n = numPushes - numPops;
+ const int64_t n = int64_t(numPushes - numPops);
if (n >= static_cast<ssize_t>(capacity_)) {
// Full, linearize at B. We don't need to recheck the read we
// performed at A, because if numPushes was stale at B then the
}
/// enqueue using in-place noexcept construction
- template <typename ...Args,
- typename = typename std::enable_if<
- std::is_nothrow_constructible<T,Args...>::value>::type>
+ template <
+ typename... Args,
+ typename = typename std::enable_if<
+ std::is_nothrow_constructible<T, Args...>::value>::type>
void enqueue(const uint32_t turn,
Atom<uint32_t>& spinCutoff,
const bool updateSpinCutoff,