/*
- * Copyright 2016 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
/// use noexcept, you will have to wrap it in something that provides
/// the guarantee. We provide an alternate safe implementation for types
/// that don't use noexcept but that are marked folly::IsRelocatable
-/// and boost::has_nothrow_constructor, which is common for folly types.
+/// and std::is_nothrow_constructible, which is common for folly types.
/// In particular, if you can declare FOLLY_ASSUME_FBVECTOR_COMPATIBLE
/// then your type can be put in MPMCQueue.
///
/// call to blockingWrite() when the queue size is known to be equal
/// to its capacity.
///
+/// Note that some writeIfNotFull() and tryWriteUntil() operations may
+/// fail even if the size of the queue is less than its maximum
+/// capacity and despite the success of expansion, if the operation
+/// happens to acquire a ticket that belongs to a closed array. This
+/// is a transient condition. Typically, one or two ticket values may
+/// be subject to such a condition per expansion.
+///
/// The dynamic version is a partial specialization of MPMCQueue with
/// Dynamic == true
template <typename T, template<typename> class Atom>
uint64_t offset;
do {
if (!trySeqlockReadSection(state, slots, cap, stride)) {
+ asm_volatile_pause();
continue;
}
- offset = getOffset(state);
- if (ticket < offset) {
+ if (maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride)) {
// There was an expansion after this ticket was issued.
- updateFromClosed(state, ticket, offset, slots, cap, stride);
break;
}
if (slots[this->idx((ticket-offset), cap, stride)]
int stride;
uint64_t state;
uint64_t offset;
- while (!trySeqlockReadSection(state, slots, cap, stride));
- offset = getOffset(state);
- if (ticket < offset) {
- // There was an expansion after the corresponding push ticket
- // was issued.
- updateFromClosed(state, ticket, offset, slots, cap, stride);
+ while (!trySeqlockReadSection(state, slots, cap, stride)) {
+ asm_volatile_pause();
}
+ // If there was an expansion after the corresponding push ticket
+ // was issued, adjust accordingly
+ maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
this->dequeueWithTicketBase(ticket-offset, slots, cap, stride, elem);
}
do {
ticket = this->pushTicket_.load(std::memory_order_acquire); // A
if (!trySeqlockReadSection(state, slots, cap, stride)) {
+ asm_volatile_pause();
continue;
}
- uint64_t offset = getOffset(state);
- if (ticket < offset) {
- // There was an expansion with offset greater than this ticket
- updateFromClosed(state, ticket, offset, slots, cap, stride);
- }
+
+ // If there was an expansion with offset greater than this ticket,
+ // adjust accordingly
+ uint64_t offset;
+ maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
+
if (slots[this->idx((ticket-offset), cap, stride)]
.mayEnqueue(this->turn(ticket-offset, cap))) {
// A slot is ready.
ticket = this->pushTicket_.load(std::memory_order_acquire);
auto numPops = this->popTicket_.load(std::memory_order_acquire);
if (!trySeqlockReadSection(state, slots, cap, stride)) {
+ asm_volatile_pause();
continue;
}
+
+ const auto curCap = cap;
+ // If there was an expansion with offset greater than this ticket,
+ // adjust accordingly
+ uint64_t offset;
+ maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
+
int64_t n = ticket - numPops;
- if (n >= static_cast<ssize_t>(this->capacity_)) {
- return false;
- }
- if ((n >= static_cast<ssize_t>(cap))) {
- if (tryExpand(state, cap)) {
- // This or another thread started an expansion. Start over
- // with a new state.
+
+ if (n >= static_cast<ssize_t>(cap)) {
+ if ((cap == curCap) && tryExpand(state, cap)) {
+ // This or another thread started an expansion. Start over.
continue;
- } else {
- // Can't expand.
- return false;
}
+ // Can't expand.
+ ticket -= offset;
+ return false;
}
- uint64_t offset = getOffset(state);
- if (ticket < offset) {
- // There was an expansion with offset greater than this ticket
- updateFromClosed(state, ticket, offset, slots, cap, stride);
- }
+
if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
// Adjust ticket
ticket -= offset;
do {
ticket = this->popTicket_.load(std::memory_order_relaxed);
if (!trySeqlockReadSection(state, slots, cap, stride)) {
+ asm_volatile_pause();
continue;
}
- uint64_t offset = getOffset(state);
- if (ticket < offset) {
- // There was an expansion after the corresponding push ticket
- // was issued.
- updateFromClosed(state, ticket, offset, slots, cap, stride);
- }
+
+ // If there was an expansion after the corresponding push ticket
+ // was issued, adjust accordingly
+ uint64_t offset;
+ maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
+
if (slots[this->idx((ticket-offset), cap, stride)]
.mayDequeue(this->turn(ticket-offset, cap))) {
if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
ticket = this->popTicket_.load(std::memory_order_acquire);
auto numPushes = this->pushTicket_.load(std::memory_order_acquire);
if (!trySeqlockReadSection(state, slots, cap, stride)) {
+ asm_volatile_pause();
continue;
}
+
+ uint64_t offset;
+ // If there was an expansion after the corresponding push
+ // ticket was issued, adjust accordingly
+ maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
+
if (ticket >= numPushes) {
+ ticket -= offset;
return false;
}
if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
- // Adjust ticket
- uint64_t offset = getOffset(state);
- if (ticket < offset) {
- // There was an expansion after the corresponding push
- // ticket was issued.
- updateFromClosed(state, ticket, offset, slots, cap, stride);
- }
- // Adjust ticket
ticket -= offset;
return true;
}
int stride;
uint64_t state;
uint64_t offset;
+
while (!trySeqlockReadSection(state, slots, cap, stride)) {}
- offset = getOffset(state);
- if (ticket < offset) {
- // There was an expansion after this ticket was issued.
- updateFromClosed(state, ticket, offset, slots, cap, stride);
- }
+
+ // If there was an expansion after this ticket was issued, adjust
+ // accordingly
+ maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
+
this->enqueueWithTicketBase(ticket-offset, slots, cap, stride,
std::forward<Args>(args)...);
}
return (state == this->dstate_.load(std::memory_order_relaxed));
}
- /// Update local variables of a lagging operation using the
- /// most recent closed array with offset <= ticket
- void updateFromClosed(
- const uint64_t state, const uint64_t ticket,
- uint64_t& offset, Slot*& slots, size_t& cap, int& stride
- ) noexcept {
+ /// If there was an expansion after ticket was issued, update local variables
+ /// of the lagging operation using the most recent closed array with
+  /// offset <= ticket and return true. Otherwise, return false.
+ bool maybeUpdateFromClosed(
+ const uint64_t state,
+ const uint64_t ticket,
+ uint64_t& offset,
+ Slot*& slots,
+ size_t& cap,
+ int& stride) noexcept {
+ offset = getOffset(state);
+ if (ticket >= offset) {
+ return false;
+ }
for (int i = getNumClosed(state) - 1; i >= 0; --i) {
offset = closed_[i].offset_;
if (offset <= ticket) {
slots = closed_[i].slots_;
cap = closed_[i].capacity_;
stride = closed_[i].stride_;
- return;;
+ return true;
}
}
// A closed array with offset <= ticket should have been found
assert(false);
+ return false;
}
};
if (pushes == nextPushes) {
// pushTicket_ didn't change from A (or the previous C) to C,
// so we can linearize at B (or D)
- return pushes - pops;
+ return ssize_t(pushes - pops);
}
pushes = nextPushes;
uint64_t nextPops = popTicket_.load(std::memory_order_acquire); // D
if (pops == nextPops) {
      // popTicket_ didn't change from B (or the previous D), so we
// can linearize at C
- return pushes - pops;
+ return ssize_t(pushes - pops);
}
pops = nextPops;
}
  /// Same as blockingRead() but also records the ticket number
void blockingReadWithTicket(uint64_t& ticket, T& elem) noexcept {
+ assert(capacity_ != 0);
ticket = popTicket_++;
dequeueWithTicketBase(ticket, slots_, capacity_, stride_, elem);
}
}
}
+ template <class Clock, typename... Args>
+ bool tryReadUntil(
+ const std::chrono::time_point<Clock>& when,
+ T& elem) noexcept {
+ uint64_t ticket;
+ Slot* slots;
+ size_t cap;
+ int stride;
+ if (tryObtainPromisedPopTicketUntil(ticket, slots, cap, stride, when)) {
+ // we have pre-validated that the ticket won't block, or rather that
+ // it won't block longer than it takes another thread to enqueue an
+ // element on the slot it identifies.
+ dequeueWithTicketBase(ticket, slots, cap, stride, elem);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
/// If the queue is not empty, dequeues and returns true, otherwise
/// returns false. If the matching write is still in progress then this
/// method may block waiting for it. If you don't rely on being able
/// Maps an enqueue or dequeue ticket to the turn should be used at the
/// corresponding SingleElementQueue
uint32_t turn(uint64_t ticket, size_t cap) noexcept {
- return ticket / cap;
+ assert(cap != 0);
+ return uint32_t(ticket / cap);
}
/// Tries to obtain a push ticket for which SingleElementQueue::enqueue
cap = capacity_;
stride = stride_;
while (true) {
- auto numPops = popTicket_.load(std::memory_order_acquire); // B
- // n will be negative if pops are pending
- int64_t n = numPushes - numPops;
ticket = numPushes;
+ const auto numPops = popTicket_.load(std::memory_order_acquire); // B
+ // n will be negative if pops are pending
+ const int64_t n = int64_t(numPushes - numPops);
if (n >= static_cast<ssize_t>(capacity_)) {
// Full, linearize at B. We don't need to recheck the read we
// performed at A, because if numPushes was stale at B then the
}
}
+  /// Tries, until the deadline `when`, to obtain a pop ticket for which
+ /// SingleElementQueue::dequeue won't block. Returns true on success, false
+ /// on failure.
+ /// ticket is filled on success AND failure.
+ template <class Clock>
+ bool tryObtainPromisedPopTicketUntil(
+ uint64_t& ticket,
+ Slot*& slots,
+ size_t& cap,
+ int& stride,
+ const std::chrono::time_point<Clock>& when) noexcept {
+ bool deadlineReached = false;
+ while (!deadlineReached) {
+ if (static_cast<Derived<T, Atom, Dynamic>*>(this)
+ ->tryObtainPromisedPopTicket(ticket, slots, cap, stride)) {
+ return true;
+ }
+ // ticket is a blocking ticket until the preceding ticket has been
+ // processed: wait until this ticket's turn arrives. We have not reserved
+ // this ticket so we will have to re-attempt to get a non-blocking ticket
+ // if we wake up before we time-out.
+ deadlineReached =
+ !slots[idx(ticket, cap, stride)].tryWaitForDequeueTurnUntil(
+ turn(ticket, cap),
+ pushSpinCutoff_,
+ (ticket % kAdaptationFreq) == 0,
+ when);
+ }
+ return false;
+ }
+
/// Similar to tryObtainReadyPopTicket, but returns a pop ticket whose
/// corresponding push ticket has already been handed out, rather than
/// returning one whose corresponding push ticket has already been
uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
) noexcept {
auto numPops = popTicket_.load(std::memory_order_acquire); // A
+ slots = slots_;
+ cap = capacity_;
+ stride = stride_;
while (true) {
- auto numPushes = pushTicket_.load(std::memory_order_acquire); // B
+ ticket = numPops;
+ const auto numPushes = pushTicket_.load(std::memory_order_acquire); // B
if (numPops >= numPushes) {
// Empty, or empty with pending pops. Linearize at B. We don't
// need to recheck the read we performed at A, because if numPops
return false;
}
if (popTicket_.compare_exchange_strong(numPops, numPops + 1)) {
- ticket = numPops;
- slots = slots_;
- cap = capacity_;
- stride = stride_;
return true;
}
}
void dequeueWithTicketBase(
uint64_t ticket, Slot* slots, size_t cap, int stride, T& elem
) noexcept {
+ assert(cap != 0);
slots[idx(ticket, cap, stride)]
.dequeue(turn(ticket, cap),
popSpinCutoff_,
/// enqueue using move construction, either real (if
/// is_nothrow_move_constructible) or simulated using relocation and
- /// default construction (if IsRelocatable and has_nothrow_constructor)
- template <typename = typename std::enable_if<
- (folly::IsRelocatable<T>::value &&
- boost::has_nothrow_constructor<T>::value) ||
- std::is_nothrow_constructible<T, T&&>::value>::type>
- void enqueue(const uint32_t turn,
- Atom<uint32_t>& spinCutoff,
- const bool updateSpinCutoff,
- T&& goner) noexcept {
+ /// default construction (if IsRelocatable and is_nothrow_constructible)
+ template <
+ typename = typename std::enable_if<
+ (folly::IsRelocatable<T>::value &&
+ std::is_nothrow_constructible<T>::value) ||
+ std::is_nothrow_constructible<T, T&&>::value>::type>
+ void enqueue(
+ const uint32_t turn,
+ Atom<uint32_t>& spinCutoff,
+ const bool updateSpinCutoff,
+ T&& goner) noexcept {
enqueueImpl(
turn,
spinCutoff,
const bool updateSpinCutoff,
const std::chrono::time_point<Clock>& when) noexcept {
return sequencer_.tryWaitForTurn(
- turn * 2, spinCutoff, updateSpinCutoff, &when);
+ turn * 2, spinCutoff, updateSpinCutoff, &when) !=
+ TurnSequencer<Atom>::TryWaitResult::TIMEDOUT;
}
bool mayEnqueue(const uint32_t turn) const noexcept {
ImplByMove>::type());
}
+ /// Waits until either:
+ /// 1: the enqueue turn preceding the given dequeue turn has arrived
+ /// 2: the given deadline has arrived
+ /// Case 1 returns true, case 2 returns false.
+ template <class Clock>
+ bool tryWaitForDequeueTurnUntil(
+ const uint32_t turn,
+ Atom<uint32_t>& spinCutoff,
+ const bool updateSpinCutoff,
+ const std::chrono::time_point<Clock>& when) noexcept {
+ return sequencer_.tryWaitForTurn(
+ turn * 2 + 1, spinCutoff, updateSpinCutoff, &when) !=
+ TurnSequencer<Atom>::TryWaitResult::TIMEDOUT;
+ }
+
bool mayDequeue(const uint32_t turn) const noexcept {
return sequencer_.isTurn(turn * 2 + 1);
}