/*
 * Copyright 2017 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 */

// @author Bo Hu (bhu@fb.com)
// @author Jordan DeLong (delong.j@fb.com)
-#ifndef PRODUCER_CONSUMER_QUEUE_H_
-#define PRODUCER_CONSUMER_QUEUE_H_
+#pragma once
#include <atomic>
#include <cassert>
#include <cstdlib>
+#include <memory>
#include <stdexcept>
#include <type_traits>
#include <utility>
-#include <boost/noncopyable.hpp>
+
+#include <folly/concurrency/CacheLocality.h>
namespace folly {
* ProducerConsumerQueue is a one producer and one consumer queue
* without locks.
*/
-template<class T>
-struct ProducerConsumerQueue : private boost::noncopyable {
+template <class T>
+struct ProducerConsumerQueue {
typedef T value_type;
- // size must be >= 2
+ ProducerConsumerQueue(const ProducerConsumerQueue&) = delete;
+ ProducerConsumerQueue& operator = (const ProducerConsumerQueue&) = delete;
+
+ // size must be >= 2.
+ //
+ // Also, note that the number of usable slots in the queue at any
+ // given time is actually (size-1), so if you start with an empty queue,
+ // isFull() will return true after size-1 insertions.
explicit ProducerConsumerQueue(uint32_t size)
: size_(size)
, records_(static_cast<T*>(std::malloc(sizeof(T) * size)))
// We need to destruct anything that may still exist in our queue.
// (No real synchronization needed at destructor time: only one
// thread can be doing this.)
- if (!std::has_trivial_destructor<T>::value) {
- int read = readIndex_;
- int end = writeIndex_;
- while (read != end) {
- records_[read].~T();
- if (++read == size_) {
- read = 0;
+ if (!std::is_trivially_destructible<T>::value) {
+ size_t readIndex = readIndex_;
+ size_t endIndex = writeIndex_;
+ while (readIndex != endIndex) {
+ records_[readIndex].~T();
+ if (++readIndex == size_) {
+ readIndex = 0;
}
}
}
std::free(records_);
}
- template<class ...Args>
+ template <class... Args>
bool write(Args&&... recordArgs) {
auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
auto nextRecord = currentWrite + 1;
return false;
}
+ // move (or copy) the value at the front of the queue to given variable
bool read(T& record) {
auto const currentRead = readIndex_.load(std::memory_order_relaxed);
if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
return true;
}
+ // pointer to the value at the front of the queue (for use in-place) or
+ // nullptr if empty.
+ T* frontPtr() {
+ auto const currentRead = readIndex_.load(std::memory_order_relaxed);
+ if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
+ // queue is empty
+ return nullptr;
+ }
+ return &records_[currentRead];
+ }
+
+ // queue must not be empty
+ void popFront() {
+ auto const currentRead = readIndex_.load(std::memory_order_relaxed);
+ assert(currentRead != writeIndex_.load(std::memory_order_acquire));
+
+ auto nextRecord = currentRead + 1;
+ if (nextRecord == size_) {
+ nextRecord = 0;
+ }
+ records_[currentRead].~T();
+ readIndex_.store(nextRecord, std::memory_order_release);
+ }
+
+ bool isEmpty() const {
+ return readIndex_.load(std::memory_order_acquire) ==
+ writeIndex_.load(std::memory_order_acquire);
+ }
+
bool isFull() const {
- auto nextRecord = writeIndex_.load(std::memory_order_consume) + 1;
+ auto nextRecord = writeIndex_.load(std::memory_order_acquire) + 1;
if (nextRecord == size_) {
nextRecord = 0;
}
- if (nextRecord != readIndex_.load(std::memory_order_consume)) {
+ if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
return false;
}
// queue is full
return true;
}
-private:
+ // * If called by consumer, then true size may be more (because producer may
+ // be adding items concurrently).
+ // * If called by producer, then true size may be less (because consumer may
+ // be removing items concurrently).
+ // * It is undefined to call this from any other thread.
+ size_t sizeGuess() const {
+ int ret = writeIndex_.load(std::memory_order_acquire) -
+ readIndex_.load(std::memory_order_acquire);
+ if (ret < 0) {
+ ret += size_;
+ }
+ return ret;
+ }
+
+ // maximum number of items in the queue.
+ size_t capacity() const {
+ return size_ - 1;
+ }
+
+ private:
+ char pad0_[CacheLocality::kFalseSharingRange];
const uint32_t size_;
T* const records_;
- std::atomic<int> readIndex_;
- std::atomic<int> writeIndex_;
-};
+ FOLLY_ALIGN_TO_AVOID_FALSE_SHARING std::atomic<unsigned int> readIndex_;
+ FOLLY_ALIGN_TO_AVOID_FALSE_SHARING std::atomic<unsigned int> writeIndex_;
-}
+ char pad1_[CacheLocality::kFalseSharingRange - sizeof(writeIndex_)];
+};
-#endif
+} // namespace folly