Adding support for in-place use of ProducerConsumerQueue.
authorTom Jackson <tjackson@fb.com>
Mon, 4 Jun 2012 23:57:49 +0000 (16:57 -0700)
committerTudor Bosman <tudorb@fb.com>
Tue, 5 Jun 2012 02:21:47 +0000 (19:21 -0700)
Summary: As it is, ProducerConsumerQueue requires that values are moved or copied on the way out of the queue. It would be nice if it was possible to get a reference to the front of the queue, use it in place, then destruct it.

Test Plan: Unit tests

FB internal diff: D484538

folly/ProducerConsumerQueue.h
folly/docs/ProducerConsumerQueue.md
folly/test/ProducerConsumerQueueTest.cpp

index 6a545dea9269bf0853ad1e6ca3504ea499c51c83..de0e3cc0a39892cde2903ad60f560de33b8809d5 100644 (file)
@@ -86,6 +86,7 @@ struct ProducerConsumerQueue : private boost::noncopyable {
     return false;
   }
 
     return false;
   }
 
+  // move (or copy) the value at the front of the queue to given variable
   bool read(T& record) {
     auto const currentRead = readIndex_.load(std::memory_order_relaxed);
     if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
   bool read(T& record) {
     auto const currentRead = readIndex_.load(std::memory_order_relaxed);
     if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
@@ -103,6 +104,35 @@ struct ProducerConsumerQueue : private boost::noncopyable {
     return true;
   }
 
     return true;
   }
 
+  // pointer to the value at the front of the queue (for use in-place) or
+  // nullptr if empty.
+  T* frontPtr() {
+    auto const currentRead = readIndex_.load(std::memory_order_relaxed);
+    if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
+      // queue is empty
+      return nullptr;
+    }
+    return &records_[currentRead];
+  }
+
+  // queue must not be empty
+  void popFront() {
+    auto const currentRead = readIndex_.load(std::memory_order_relaxed);
+    assert(currentRead != writeIndex_.load(std::memory_order_acquire));
+
+    auto nextRecord = currentRead + 1;
+    if (nextRecord == size_) {
+      nextRecord = 0;
+    }
+    records_[currentRead].~T();
+    readIndex_.store(nextRecord, std::memory_order_release);
+  }
+
+  bool isEmpty() const {
+   return readIndex_.load(std::memory_order_consume) !=
+         writeIndex_.load(std::memory_order_consume);
+  }
+
   bool isFull() const {
     auto nextRecord = writeIndex_.load(std::memory_order_consume) + 1;
     if (nextRecord == size_) {
   bool isFull() const {
     auto nextRecord = writeIndex_.load(std::memory_order_consume) + 1;
     if (nextRecord == size_) {
index 6dc0344b204ff10941136c61b1cdd7df8cad255d..40ae8cab8996527bb8fe26ef56a8ec551e2c1b79 100644 (file)
@@ -5,15 +5,29 @@ The `folly::ProducerConsumerQueue` class is a one-producer
 one-consumer queue with very low synchronization overhead.
 
 The queue must be created with a fixed maximum size (and allocates
 one-consumer queue with very low synchronization overhead.
 
 The queue must be created with a fixed maximum size (and allocates
-that many cells of sizeof(T)), and it provides just three simple
-operations: read, write, and isFull.  All of these operations are
-wait-free.  The read and write operations must only be called by the
-reader and writer thread, respectively, but isFull is accessible to
-both.
+that many cells of sizeof(T)), and it provides just a few simple
+operations:
 
 
-Both read and write may fail if the queue is full, so in many
-situations it is important to choose the queue size such that the
-queue filling up for long is unlikely.
+ * `read`: Attempt to read the value at the front to the queue into a variable,
+           returns `false` iff queue was empty.
+ * `write`: Emplace a value at the end of the queue, returns `false` iff the
+            queue was full.
+ * `frontPtr`: Retrieve a pointer to the item at the front of the queue, or
+               `nullptr` if it is empty.
+ * `popFront`: Remove the item from the front of the queue (queue must not be
+               empty).
+ * `isEmpty`: Check if the queue is empty.
+ * `isFull`: Check if the queue is full.
+
+All of these operations are wait-free.  The read operations (including
+`frontPtr` and `popFront`) and write operations must only be called by the
+reader and writer thread, respectively. `isFull` and `isEmpty` may be called by
+either thread, but the return values from `read`, `write`, or `frontPtr` are
+sufficient for most cases.
+
+`write` may fail if the queue is full, and `read` may fail if the queue is
+empty, so in many situations it is important to choose the queue size such that
+the queue filling  or staying empty for long is unlikely.
 
 ### Example
 ***
 
 ### Example
 ***
@@ -26,7 +40,10 @@ A toy example that doesn't really do anything useful:
     std::thread reader([&queue] {
       for (;;) {
         folly::fbstring str;
     std::thread reader([&queue] {
       for (;;) {
         folly::fbstring str;
-        while (!queue.read(str)) continue;
+        while (!queue.read(str)) {
+          //spin until we get a value
+          continue;
+        }
 
         sink(str);
       }
 
         sink(str);
       }
@@ -35,6 +52,26 @@ A toy example that doesn't really do anything useful:
     // producer thread:
     for (;;) {
       folly::fbstring str = source();
     // producer thread:
     for (;;) {
       folly::fbstring str = source();
-      while (!queue.write(str)) continue;
+      while (!queue.write(str)) {
+        //spin until the queue has room
+        continue;
+      }
     }
 ```
     }
 ```
+
+Alternatively, the consumer may be written as follows to use the 'front' value
+in place, thus avoiding moves or copies:
+
+``` Cpp
+    std::thread reader([&queue] {
+      for (;;) {
+        folly::fbstring* pval;
+        do {
+          pval = queue.frontPtr();
+        } while (!pval); // spin until we get a value;
+
+        sink(*pval);
+        queue.popFront();
+      }
+    });
+```
index d322f0327f4902a15f490b32a09eb915d803c519..bf28dcb41c56115386d041f13e370581aa8a4d3c 100644 (file)
@@ -34,11 +34,11 @@ template<class T> struct TestTraits {
 };
 
 template<> struct TestTraits<std::string> {
 };
 
 template<> struct TestTraits<std::string> {
-  int limit() const { return 1 << 21; }
+  int limit() const { return 1 << 22; }
   std::string generate() const { return std::string(12, ' '); }
 };
 
   std::string generate() const { return std::string(12, ' '); }
 };
 
-template<class QueueType, size_t Size>
+template<class QueueType, size_t Size, bool Pop = false>
 struct PerfTest {
   typedef typename QueueType::value_type T;
 
 struct PerfTest {
   typedef typename QueueType::value_type T;
 
@@ -68,9 +68,17 @@ struct PerfTest {
   }
 
   void consumer() {
   }
 
   void consumer() {
-    while (!done_) {
-      T data;
-      queue_.read(data);
+    /*static*/ if (Pop) {
+      while (!done_) {
+        if (queue_.frontPtr()) {
+          queue_.popFront();
+        }
+      }
+    } else {
+      while (!done_) {
+        T data;
+        queue_.read(data);
+      }
     }
   }
 
     }
   }
 
@@ -85,15 +93,16 @@ template<class TestType> void doTest(const char* name) {
   (*t)();
 }
 
   (*t)();
 }
 
-template<class T> void perfTestType(const char* type) {
+template<class T, bool Pop = false>
+void perfTestType(const char* type) {
   const size_t size = 0xfffe;
 
   LOG(INFO) << "Type: " << type;
   const size_t size = 0xfffe;
 
   LOG(INFO) << "Type: " << type;
-  doTest<PerfTest<folly::ProducerConsumerQueue<T>,size> >(
+  doTest<PerfTest<folly::ProducerConsumerQueue<T>,size,Pop> >(
     "ProducerConsumerQueue");
 }
 
     "ProducerConsumerQueue");
 }
 
-template<class QueueType, size_t Size>
+template<class QueueType, size_t Size, bool Pop>
 struct CorrectnessTest {
   typedef typename QueueType::value_type T;
 
 struct CorrectnessTest {
   typedef typename QueueType::value_type T;
 
@@ -125,7 +134,39 @@ struct CorrectnessTest {
   }
 
   void consumer() {
   }
 
   void consumer() {
-    for (auto& expect : testData_) {
+    if (Pop) {
+      consumerPop();
+    } else {
+      consumerRead();
+    }
+  }
+
+  void consumerPop() {
+    for (auto expect : testData_) {
+    again:
+      T* data;
+      if (!(data = queue_.frontPtr())) {
+        if (done_) {
+          // Try one more read; unless there's a bug in the queue class
+          // there should still be more data sitting in the queue even
+          // though the producer thread exited.
+          if (!(data = queue_.frontPtr())) {
+            EXPECT_TRUE(0 && "Finished too early ...");
+            return;
+          }
+        } else {
+          goto again;
+        }
+      } else {
+        queue_.popFront();
+      }
+
+      EXPECT_EQ(*data, expect);
+    }
+  }
+
+  void consumerRead() {
+    for (auto expect : testData_) {
     again:
       T data;
       if (!queue_.read(data)) {
     again:
       T data;
       if (!queue_.read(data)) {
@@ -151,9 +192,10 @@ struct CorrectnessTest {
   std::atomic<bool> done_;
 };
 
   std::atomic<bool> done_;
 };
 
-template<class T> void correctnessTestType(const std::string& type) {
+template<class T, bool Pop = false>
+void correctnessTestType(const std::string& type) {
   LOG(INFO) << "Type: " << type;
   LOG(INFO) << "Type: " << type;
-  doTest<CorrectnessTest<folly::ProducerConsumerQueue<T>,0xfffe> >(
+  doTest<CorrectnessTest<folly::ProducerConsumerQueue<T>,0xfffe,Pop> >(
     "ProducerConsumerQueue");
 }
 
     "ProducerConsumerQueue");
 }
 
@@ -171,12 +213,14 @@ int DtorChecker::numInstances = 0;
 //////////////////////////////////////////////////////////////////////
 
 TEST(PCQ, QueueCorrectness) {
 //////////////////////////////////////////////////////////////////////
 
 TEST(PCQ, QueueCorrectness) {
+  correctnessTestType<std::string,true>("string (front+pop)");
   correctnessTestType<std::string>("string");
   correctnessTestType<int>("int");
   correctnessTestType<unsigned long long>("unsigned long long");
 }
 
 TEST(PCQ, PerfTest) {
   correctnessTestType<std::string>("string");
   correctnessTestType<int>("int");
   correctnessTestType<unsigned long long>("unsigned long long");
 }
 
 TEST(PCQ, PerfTest) {
+  perfTestType<std::string,true>("string (front+pop)");
   perfTestType<std::string>("string");
   perfTestType<int>("int");
   perfTestType<unsigned long long>("unsigned long long");
   perfTestType<std::string>("string");
   perfTestType<int>("int");
   perfTestType<unsigned long long>("unsigned long long");