Statically allocate futex array
[folly.git] / folly / ProducerConsumerQueue.h
index de0e3cc0a39892cde2903ad60f560de33b8809d5..16de57d35486a8207fcb1def59c48d59a1360fd3 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2012 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 // @author Bo Hu (bhu@fb.com)
 // @author Jordan DeLong (delong.j@fb.com)
 
-#ifndef PRODUCER_CONSUMER_QUEUE_H_
-#define PRODUCER_CONSUMER_QUEUE_H_
+#pragma once
 
 #include <atomic>
 #include <cassert>
 #include <cstdlib>
+#include <memory>
 #include <stdexcept>
 #include <type_traits>
 #include <utility>
-#include <boost/noncopyable.hpp>
+
+#include <folly/concurrency/CacheLocality.h>
 
 namespace folly {
 
@@ -34,11 +35,18 @@ namespace folly {
  * ProducerConsumerQueue is a one producer and one consumer queue
  * without locks.
  */
-template<class T>
-struct ProducerConsumerQueue : private boost::noncopyable {
+template <class T>
+struct ProducerConsumerQueue {
   typedef T value_type;
 
-  // size must be >= 2
+  ProducerConsumerQueue(const ProducerConsumerQueue&) = delete;
+  ProducerConsumerQueue& operator = (const ProducerConsumerQueue&) = delete;
+
+  // size must be >= 2.
+  //
+  // Also, note that the number of usable slots in the queue at any
+  // given time is actually (size-1), so if you start with an empty queue,
+  // isFull() will return true after size-1 insertions.
   explicit ProducerConsumerQueue(uint32_t size)
     : size_(size)
     , records_(static_cast<T*>(std::malloc(sizeof(T) * size)))
@@ -55,13 +63,13 @@ struct ProducerConsumerQueue : private boost::noncopyable {
     // We need to destruct anything that may still exist in our queue.
     // (No real synchronization needed at destructor time: only one
     // thread can be doing this.)
-    if (!std::has_trivial_destructor<T>::value) {
-      int read = readIndex_;
-      int end = writeIndex_;
-      while (read != end) {
-        records_[read].~T();
-        if (++read == size_) {
-          read = 0;
+    if (!std::is_trivially_destructible<T>::value) {
+      size_t readIndex = readIndex_;
+      size_t endIndex = writeIndex_;
+      while (readIndex != endIndex) {
+        records_[readIndex].~T();
+        if (++readIndex == size_) {
+          readIndex = 0;
         }
       }
     }
@@ -69,7 +77,7 @@ struct ProducerConsumerQueue : private boost::noncopyable {
     std::free(records_);
   }
 
-  template<class ...Args>
+  template <class... Args>
   bool write(Args&&... recordArgs) {
     auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
     auto nextRecord = currentWrite + 1;
@@ -129,30 +137,50 @@ struct ProducerConsumerQueue : private boost::noncopyable {
   }
 
   bool isEmpty() const {
-   return readIndex_.load(std::memory_order_consume) !=
-         writeIndex_.load(std::memory_order_consume);
+    return readIndex_.load(std::memory_order_acquire) ==
+        writeIndex_.load(std::memory_order_acquire);
   }
 
   bool isFull() const {
-    auto nextRecord = writeIndex_.load(std::memory_order_consume) + 1;
+    auto nextRecord = writeIndex_.load(std::memory_order_acquire) + 1;
     if (nextRecord == size_) {
       nextRecord = 0;
     }
-    if (nextRecord != readIndex_.load(std::memory_order_consume)) {
+    if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
       return false;
     }
     // queue is full
     return true;
   }
 
-private:
+  // * If called by consumer, then true size may be more (because producer may
+  //   be adding items concurrently).
+  // * If called by producer, then true size may be less (because consumer may
+  //   be removing items concurrently).
+  // * It is undefined to call this from any other thread.
+  size_t sizeGuess() const {
+    int ret = writeIndex_.load(std::memory_order_acquire) -
+        readIndex_.load(std::memory_order_acquire);
+    if (ret < 0) {
+      ret += size_;
+    }
+    return ret;
+  }
+
+  // maximum number of items in the queue.
+  size_t capacity() const {
+    return size_ - 1;
+  }
+
+ private:
+  char pad0_[CacheLocality::kFalseSharingRange];
   const uint32_t size_;
   T* const records_;
 
-  std::atomic<int> readIndex_;
-  std::atomic<int> writeIndex_;
-};
+  FOLLY_ALIGN_TO_AVOID_FALSE_SHARING std::atomic<unsigned int> readIndex_;
+  FOLLY_ALIGN_TO_AVOID_FALSE_SHARING std::atomic<unsigned int> writeIndex_;
 
-}
+  char pad1_[CacheLocality::kFalseSharingRange - sizeof(writeIndex_)];
+};
 
-#endif
+} // namespace folly