Add MSVC support for FOLLY_FINAL and FOLLY_OVERRIDE
[folly.git] / folly / ProducerConsumerQueue.h
index 6a545dea9269bf0853ad1e6ca3504ea499c51c83..7d2c05bf43c8beb930e7285680c61c76aabef517 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2012 Facebook, Inc.
+ * Copyright 2015 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@
 #include <type_traits>
 #include <utility>
 #include <boost/noncopyable.hpp>
+#include <boost/type_traits.hpp>
 
 namespace folly {
 
@@ -38,7 +39,11 @@ template<class T>
 struct ProducerConsumerQueue : private boost::noncopyable {
   typedef T value_type;
 
-  // size must be >= 2
+  // size must be >= 2.
+  //
+  // Also, note that the number of usable slots in the queue at any
+  // given time is actually (size-1), so if you start with an empty queue,
+  // isFull() will return true after size-1 insertions.
   explicit ProducerConsumerQueue(uint32_t size)
     : size_(size)
     , records_(static_cast<T*>(std::malloc(sizeof(T) * size)))
@@ -55,9 +60,9 @@ struct ProducerConsumerQueue : private boost::noncopyable {
     // We need to destruct anything that may still exist in our queue.
     // (No real synchronization needed at destructor time: only one
     // thread can be doing this.)
-    if (!std::has_trivial_destructor<T>::value) {
-      int read = readIndex_;
-      int end = writeIndex_;
+    if (!boost::has_trivial_destructor<T>::value) {
+      size_t read = readIndex_;
+      size_t end = writeIndex_;
       while (read != end) {
         records_[read].~T();
         if (++read == size_) {
@@ -86,6 +91,7 @@ struct ProducerConsumerQueue : private boost::noncopyable {
     return false;
   }
 
+  // move (or copy) the value at the front of the queue to given variable
   bool read(T& record) {
     auto const currentRead = readIndex_.load(std::memory_order_relaxed);
     if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
@@ -103,6 +109,35 @@ struct ProducerConsumerQueue : private boost::noncopyable {
     return true;
   }
 
+  // pointer to the value at the front of the queue (for use in-place) or
+  // nullptr if empty.
+  T* frontPtr() {
+    auto const currentRead = readIndex_.load(std::memory_order_relaxed);
+    if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
+      // queue is empty
+      return nullptr;
+    }
+    return &records_[currentRead];
+  }
+
+  // queue must not be empty
+  void popFront() {
+    auto const currentRead = readIndex_.load(std::memory_order_relaxed);
+    assert(currentRead != writeIndex_.load(std::memory_order_acquire));
+
+    auto nextRecord = currentRead + 1;
+    if (nextRecord == size_) {
+      nextRecord = 0;
+    }
+    records_[currentRead].~T();
+    readIndex_.store(nextRecord, std::memory_order_release);
+  }
+
+  bool isEmpty() const {
+   return readIndex_.load(std::memory_order_consume) ==
+         writeIndex_.load(std::memory_order_consume);
+  }
+
   bool isFull() const {
     auto nextRecord = writeIndex_.load(std::memory_order_consume) + 1;
     if (nextRecord == size_) {
@@ -115,12 +150,26 @@ struct ProducerConsumerQueue : private boost::noncopyable {
     return true;
   }
 
+  // * If called by consumer, then true size may be more (because producer may
+  //   be adding items concurrently).
+  // * If called by producer, then true size may be less (because consumer may
+  //   be removing items concurrently).
+  // * It is undefined to call this from any other thread.
+  size_t sizeGuess() const {
+    int ret = writeIndex_.load(std::memory_order_consume) -
+              readIndex_.load(std::memory_order_consume);
+    if (ret < 0) {
+      ret += size_;
+    }
+    return ret;
+  }
+
 private:
   const uint32_t size_;
   T* const records_;
 
-  std::atomic<int> readIndex_;
-  std::atomic<int> writeIndex_;
+  std::atomic<unsigned int> readIndex_;
+  std::atomic<unsigned int> writeIndex_;
 };
 
 }