return false;
}
+ // move (or copy) the value at the front of the queue to given variable
bool read(T& record) {
auto const currentRead = readIndex_.load(std::memory_order_relaxed);
if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
return true;
}
+ // pointer to the value at the front of the queue (for use in-place) or
+ // nullptr if empty.
+ T* frontPtr() {
+ auto const currentRead = readIndex_.load(std::memory_order_relaxed);
+ if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
+ // queue is empty
+ return nullptr;
+ }
+ return &records_[currentRead];
+ }
+
+ // queue must not be empty
+ void popFront() {
+ auto const currentRead = readIndex_.load(std::memory_order_relaxed);
+ assert(currentRead != writeIndex_.load(std::memory_order_acquire));
+
+ auto nextRecord = currentRead + 1;
+ if (nextRecord == size_) {
+ nextRecord = 0;
+ }
+ records_[currentRead].~T();
+ readIndex_.store(nextRecord, std::memory_order_release);
+ }
+
+ bool isEmpty() const {
+ return readIndex_.load(std::memory_order_consume) !=
+ writeIndex_.load(std::memory_order_consume);
+ }
+
bool isFull() const {
auto nextRecord = writeIndex_.load(std::memory_order_consume) + 1;
if (nextRecord == size_) {
one-consumer queue with very low synchronization overhead.
The queue must be created with a fixed maximum size (and allocates
-that many cells of sizeof(T)), and it provides just three simple
-operations: read, write, and isFull. All of these operations are
-wait-free. The read and write operations must only be called by the
-reader and writer thread, respectively, but isFull is accessible to
-both.
+that many cells of sizeof(T)), and it provides just a few simple
+operations:
-Both read and write may fail if the queue is full, so in many
-situations it is important to choose the queue size such that the
-queue filling up for long is unlikely.
+ * `read`: Attempt to read the value at the front to the queue into a variable,
+ returns `false` iff queue was empty.
+ * `write`: Emplace a value at the end of the queue, returns `false` iff the
+ queue was full.
+ * `frontPtr`: Retrieve a pointer to the item at the front of the queue, or
+ `nullptr` if it is empty.
+ * `popFront`: Remove the item from the front of the queue (queue must not be
+ empty).
+ * `isEmpty`: Check if the queue is empty.
+ * `isFull`: Check if the queue is full.
+
+All of these operations are wait-free. The read operations (including
+`frontPtr` and `popFront`) and write operations must only be called by the
+reader and writer thread, respectively. `isFull` and `isEmpty` may be called by
+either thread, but the return values from `read`, `write`, or `frontPtr` are
+sufficient for most cases.
+
+`write` may fail if the queue is full, and `read` may fail if the queue is
+empty, so in many situations it is important to choose the queue size such that
+the queue filling or staying empty for long is unlikely.
### Example
***
std::thread reader([&queue] {
for (;;) {
folly::fbstring str;
- while (!queue.read(str)) continue;
+ while (!queue.read(str)) {
+ //spin until we get a value
+ continue;
+ }
sink(str);
}
// producer thread:
for (;;) {
folly::fbstring str = source();
- while (!queue.write(str)) continue;
+ while (!queue.write(str)) {
+ //spin until the queue has room
+ continue;
+ }
}
```
+
+Alternatively, the consumer may be written as follows to use the 'front' value
+in place, thus avoiding moves or copies:
+
+``` Cpp
+ std::thread reader([&queue] {
+ for (;;) {
+ folly::fbstring* pval;
+ do {
+ pval = queue.frontPtr();
+ } while (!pval); // spin until we get a value;
+
+ sink(*pval);
+ queue.popFront();
+ }
+ });
+```
};
template<> struct TestTraits<std::string> {
- int limit() const { return 1 << 21; }
+ int limit() const { return 1 << 22; }
std::string generate() const { return std::string(12, ' '); }
};
-template<class QueueType, size_t Size>
+template<class QueueType, size_t Size, bool Pop = false>
struct PerfTest {
typedef typename QueueType::value_type T;
}
void consumer() {
- while (!done_) {
- T data;
- queue_.read(data);
+ /*static*/ if (Pop) {
+ while (!done_) {
+ if (queue_.frontPtr()) {
+ queue_.popFront();
+ }
+ }
+ } else {
+ while (!done_) {
+ T data;
+ queue_.read(data);
+ }
}
}
(*t)();
}
-template<class T> void perfTestType(const char* type) {
+template<class T, bool Pop = false>
+void perfTestType(const char* type) {
const size_t size = 0xfffe;
LOG(INFO) << "Type: " << type;
- doTest<PerfTest<folly::ProducerConsumerQueue<T>,size> >(
+ doTest<PerfTest<folly::ProducerConsumerQueue<T>,size,Pop> >(
"ProducerConsumerQueue");
}
-template<class QueueType, size_t Size>
+template<class QueueType, size_t Size, bool Pop>
struct CorrectnessTest {
typedef typename QueueType::value_type T;
}
void consumer() {
- for (auto& expect : testData_) {
+ if (Pop) {
+ consumerPop();
+ } else {
+ consumerRead();
+ }
+ }
+
+ void consumerPop() {
+ for (auto expect : testData_) {
+ again:
+ T* data;
+ if (!(data = queue_.frontPtr())) {
+ if (done_) {
+ // Try one more read; unless there's a bug in the queue class
+ // there should still be more data sitting in the queue even
+ // though the producer thread exited.
+ if (!(data = queue_.frontPtr())) {
+ EXPECT_TRUE(0 && "Finished too early ...");
+ return;
+ }
+ } else {
+ goto again;
+ }
+ } else {
+ queue_.popFront();
+ }
+
+ EXPECT_EQ(*data, expect);
+ }
+ }
+
+ void consumerRead() {
+ for (auto expect : testData_) {
again:
T data;
if (!queue_.read(data)) {
std::atomic<bool> done_;
};
-template<class T> void correctnessTestType(const std::string& type) {
+template<class T, bool Pop = false>
+void correctnessTestType(const std::string& type) {
LOG(INFO) << "Type: " << type;
- doTest<CorrectnessTest<folly::ProducerConsumerQueue<T>,0xfffe> >(
+ doTest<CorrectnessTest<folly::ProducerConsumerQueue<T>,0xfffe,Pop> >(
"ProducerConsumerQueue");
}
//////////////////////////////////////////////////////////////////////
TEST(PCQ, QueueCorrectness) {
+ correctnessTestType<std::string,true>("string (front+pop)");
correctnessTestType<std::string>("string");
correctnessTestType<int>("int");
correctnessTestType<unsigned long long>("unsigned long long");
}
TEST(PCQ, PerfTest) {
+ perfTestType<std::string,true>("string (front+pop)");
perfTestType<std::string>("string");
perfTestType<int>("int");
perfTestType<unsigned long long>("unsigned long long");