From: Tom Jackson Date: Mon, 4 Jun 2012 23:57:49 +0000 (-0700) Subject: Adding support for in-place use of ProducerConsumerQueue. X-Git-Tag: v0.22.0~1290 X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=commitdiff_plain;h=c51f93205176b9bb3792a1c8bedcf5228ce669ac Adding support for in-place use of ProducerConsumerQueue. Summary: As it is, ProducerConsumerQueue requires that values are moved or copied on the way out of the queue. It would be nice if it was possible to get a reference to the front of the queue, use it in place, then destruct it. Test Plan: Unit tests FB internal diff: D484538 --- diff --git a/folly/ProducerConsumerQueue.h b/folly/ProducerConsumerQueue.h index 6a545dea..de0e3cc0 100644 --- a/folly/ProducerConsumerQueue.h +++ b/folly/ProducerConsumerQueue.h @@ -86,6 +86,7 @@ struct ProducerConsumerQueue : private boost::noncopyable { return false; } + // move (or copy) the value at the front of the queue to given variable bool read(T& record) { auto const currentRead = readIndex_.load(std::memory_order_relaxed); if (currentRead == writeIndex_.load(std::memory_order_acquire)) { @@ -103,6 +104,35 @@ struct ProducerConsumerQueue : private boost::noncopyable { return true; } + // pointer to the value at the front of the queue (for use in-place) or + // nullptr if empty. + T* frontPtr() { + auto const currentRead = readIndex_.load(std::memory_order_relaxed); + if (currentRead == writeIndex_.load(std::memory_order_acquire)) { + // queue is empty + return nullptr; + } + return &records_[currentRead]; + } + + // queue must not be empty + void popFront() { + auto const currentRead = readIndex_.load(std::memory_order_relaxed); + assert(currentRead != writeIndex_.load(std::memory_order_acquire)); + + auto nextRecord = currentRead + 1; + if (nextRecord == size_) { + nextRecord = 0; + } + records_[currentRead].~T(); + readIndex_.store(nextRecord, std::memory_order_release); + } + + bool isEmpty() const { + return readIndex_.load(std::memory_order_consume) != + writeIndex_.load(std::memory_order_consume); + } + bool isFull() const { auto nextRecord = writeIndex_.load(std::memory_order_consume) + 1; if (nextRecord == size_) { diff --git a/folly/docs/ProducerConsumerQueue.md b/folly/docs/ProducerConsumerQueue.md index 6dc0344b..40ae8cab 100644 --- a/folly/docs/ProducerConsumerQueue.md +++ b/folly/docs/ProducerConsumerQueue.md @@ -5,15 +5,29 @@ The `folly::ProducerConsumerQueue` class is a one-producer one-consumer queue with very low synchronization overhead. The queue must be created with a fixed maximum size (and allocates -that many cells of sizeof(T)), and it provides just three simple -operations: read, write, and isFull. All of these operations are -wait-free. The read and write operations must only be called by the -reader and writer thread, respectively, but isFull is accessible to -both. +that many cells of sizeof(T)), and it provides just a few simple +operations: -Both read and write may fail if the queue is full, so in many -situations it is important to choose the queue size such that the -queue filling up for long is unlikely. + * `read`: Attempt to read the value at the front to the queue into a variable, + returns `false` iff queue was empty. + * `write`: Emplace a value at the end of the queue, returns `false` iff the + queue was full. + * `frontPtr`: Retrieve a pointer to the item at the front of the queue, or + `nullptr` if it is empty. + * `popFront`: Remove the item from the front of the queue (queue must not be + empty). + * `isEmpty`: Check if the queue is empty. + * `isFull`: Check if the queue is full. + +All of these operations are wait-free. The read operations (including +`frontPtr` and `popFront`) and write operations must only be called by the +reader and writer thread, respectively. `isFull` and `isEmpty` may be called by +either thread, but the return values from `read`, `write`, or `frontPtr` are +sufficient for most cases. + +`write` may fail if the queue is full, and `read` may fail if the queue is +empty, so in many situations it is important to choose the queue size such that +the queue filling or staying empty for long is unlikely. ### Example *** @@ -26,7 +40,10 @@ A toy example that doesn't really do anything useful: std::thread reader([&queue] { for (;;) { folly::fbstring str; - while (!queue.read(str)) continue; + while (!queue.read(str)) { + //spin until we get a value + continue; + } sink(str); } @@ -35,6 +52,26 @@ A toy example that doesn't really do anything useful: // producer thread: for (;;) { folly::fbstring str = source(); - while (!queue.write(str)) continue; + while (!queue.write(str)) { + //spin until the queue has room + continue; + } } ``` + +Alternatively, the consumer may be written as follows to use the 'front' value +in place, thus avoiding moves or copies: + +``` Cpp + std::thread reader([&queue] { + for (;;) { + folly::fbstring* pval; + do { + pval = queue.frontPtr(); + } while (!pval); // spin until we get a value; + + sink(*pval); + queue.popFront(); + } + }); +``` diff --git a/folly/test/ProducerConsumerQueueTest.cpp b/folly/test/ProducerConsumerQueueTest.cpp index d322f032..bf28dcb4 100644 --- a/folly/test/ProducerConsumerQueueTest.cpp +++ b/folly/test/ProducerConsumerQueueTest.cpp @@ -34,11 +34,11 @@ template struct TestTraits { }; template<> struct TestTraits { - int limit() const { return 1 << 21; } + int limit() const { return 1 << 22; } std::string generate() const { return std::string(12, ' '); } }; -template +template struct PerfTest { typedef typename QueueType::value_type T; @@ -68,9 +68,17 @@ struct PerfTest { } void consumer() { - while (!done_) { - T data; - queue_.read(data); + /*static*/ if (Pop) { + while (!done_) { + if (queue_.frontPtr()) { + queue_.popFront(); + } + } + } else { + while (!done_) { + T data; + queue_.read(data); + } } } @@ -85,15 +93,16 @@ template void doTest(const char* name) { (*t)(); } -template void perfTestType(const char* type) { +template +void perfTestType(const char* type) { const size_t size = 0xfffe; LOG(INFO) << "Type: " << type; - doTest,size> >( + doTest,size,Pop> >( "ProducerConsumerQueue"); } -template +template struct CorrectnessTest { typedef typename QueueType::value_type T; @@ -125,7 +134,39 @@ struct CorrectnessTest { } void consumer() { - for (auto& expect : testData_) { + if (Pop) { + consumerPop(); + } else { + consumerRead(); + } + } + + void consumerPop() { + for (auto expect : testData_) { + again: + T* data; + if (!(data = queue_.frontPtr())) { + if (done_) { + // Try one more read; unless there's a bug in the queue class + // there should still be more data sitting in the queue even + // though the producer thread exited. + if (!(data = queue_.frontPtr())) { + EXPECT_TRUE(0 && "Finished too early ..."); + return; + } + } else { + goto again; + } + } else { + queue_.popFront(); + } + + EXPECT_EQ(*data, expect); + } + } + + void consumerRead() { + for (auto expect : testData_) { again: T data; if (!queue_.read(data)) { @@ -151,9 +192,10 @@ struct CorrectnessTest { std::atomic done_; }; -template void correctnessTestType(const std::string& type) { +template +void correctnessTestType(const std::string& type) { LOG(INFO) << "Type: " << type; - doTest,0xfffe> >( + doTest,0xfffe,Pop> >( "ProducerConsumerQueue"); } @@ -171,12 +213,14 @@ int DtorChecker::numInstances = 0; ////////////////////////////////////////////////////////////////////// TEST(PCQ, QueueCorrectness) { + correctnessTestType("string (front+pop)"); correctnessTestType("string"); correctnessTestType("int"); correctnessTestType("unsigned long long"); } TEST(PCQ, PerfTest) { + perfTestType("string (front+pop)"); perfTestType("string"); perfTestType("int"); perfTestType("unsigned long long");