/*
- * Copyright 2016 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
/// use noexcept, you will have to wrap it in something that provides
/// the guarantee. We provide an alternate safe implementation for types
/// that don't use noexcept but that are marked folly::IsRelocatable
-/// and boost::has_nothrow_constructor, which is common for folly types.
+/// and std::is_nothrow_constructible, which is common for folly types.
/// In particular, if you can declare FOLLY_ASSUME_FBVECTOR_COMPATIBLE
/// then your type can be put in MPMCQueue.
///
/// call to blockingWrite() when the queue size is known to be equal
/// to its capacity.
///
+/// Note that some writeIfNotFull() and tryWriteUntil() operations may
+/// fail even if the size of the queue is less than its maximum
+/// capacity and despite the success of expansion, if the operation
+/// happens to acquire a ticket that belongs to a closed array. This
+/// is a transient condition. Typically, one or two ticket values may
+/// be subject to such condition per expansion.
+///
/// The dynamic version is a partial specialization of MPMCQueue with
/// Dynamic == true
template <typename T, template<typename> class Atom>
uint64_t offset;
do {
if (!trySeqlockReadSection(state, slots, cap, stride)) {
+ asm_volatile_pause();
continue;
}
if (maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride)) {
int stride;
uint64_t state;
uint64_t offset;
- while (!trySeqlockReadSection(state, slots, cap, stride));
+ while (!trySeqlockReadSection(state, slots, cap, stride)) {
+ asm_volatile_pause();
+ }
// If there was an expansion after the corresponding push ticket
// was issued, adjust accordingly
maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
do {
ticket = this->pushTicket_.load(std::memory_order_acquire); // A
if (!trySeqlockReadSection(state, slots, cap, stride)) {
+ asm_volatile_pause();
continue;
}
ticket = this->pushTicket_.load(std::memory_order_acquire);
auto numPops = this->popTicket_.load(std::memory_order_acquire);
if (!trySeqlockReadSection(state, slots, cap, stride)) {
+ asm_volatile_pause();
continue;
}
- const auto oldCap = cap;
+ const auto curCap = cap;
// If there was an expansion with offset greater than this ticket,
// adjust accordingly
uint64_t offset;
maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
int64_t n = ticket - numPops;
- if (n >= static_cast<ssize_t>(this->capacity_)) {
- ticket -= offset;
- return false;
- }
- if (n >= static_cast<ssize_t>(oldCap)) {
- if (tryExpand(state, oldCap)) {
- // This or another thread started an expansion. Start over
- // with a new state.
+ if (n >= static_cast<ssize_t>(cap)) {
+ if ((cap == curCap) && tryExpand(state, cap)) {
+ // This or another thread started an expansion. Start over.
continue;
- } else {
- // Can't expand.
- ticket -= offset;
- return false;
}
+ // Can't expand.
+ ticket -= offset;
+ return false;
}
if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
do {
ticket = this->popTicket_.load(std::memory_order_relaxed);
if (!trySeqlockReadSection(state, slots, cap, stride)) {
+ asm_volatile_pause();
continue;
}
ticket = this->popTicket_.load(std::memory_order_acquire);
auto numPushes = this->pushTicket_.load(std::memory_order_acquire);
if (!trySeqlockReadSection(state, slots, cap, stride)) {
+ asm_volatile_pause();
continue;
}
ticket = numPushes;
const auto numPops = popTicket_.load(std::memory_order_acquire); // B
// n will be negative if pops are pending
- const int64_t n = numPushes - numPops;
+ const int64_t n = int64_t(numPushes - numPops);
if (n >= static_cast<ssize_t>(capacity_)) {
// Full, linearize at B. We don't need to recheck the read we
// performed at A, because if numPushes was stale at B then the
/// enqueue using move construction, either real (if
/// is_nothrow_move_constructible) or simulated using relocation and
- /// default construction (if IsRelocatable and has_nothrow_constructor)
- template <typename = typename std::enable_if<
- (folly::IsRelocatable<T>::value &&
- boost::has_nothrow_constructor<T>::value) ||
- std::is_nothrow_constructible<T, T&&>::value>::type>
- void enqueue(const uint32_t turn,
- Atom<uint32_t>& spinCutoff,
- const bool updateSpinCutoff,
- T&& goner) noexcept {
+ /// default construction (if IsRelocatable and is_nothrow_constructible)
+ template <
+ typename = typename std::enable_if<
+ (folly::IsRelocatable<T>::value &&
+ std::is_nothrow_constructible<T>::value) ||
+ std::is_nothrow_constructible<T, T&&>::value>::type>
+ void enqueue(
+ const uint32_t turn,
+ Atom<uint32_t>& spinCutoff,
+ const bool updateSpinCutoff,
+ T&& goner) noexcept {
enqueueImpl(
turn,
spinCutoff,