X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=blobdiff_plain;f=folly%2FProducerConsumerQueue.h;h=b020da8445ae6f80a28ef671354b40d62da65cea;hp=2a6ed376fc37a2eb5cd9ce1ab4799d067e840c95;hb=2a4ad2c8ddc1eb1be8b7ffb607de954ccc2b666e;hpb=cf0e592335227371fee3527e8a2c47fc7d3ca0ab diff --git a/folly/ProducerConsumerQueue.h b/folly/ProducerConsumerQueue.h index 2a6ed376..b020da84 100644 --- a/folly/ProducerConsumerQueue.h +++ b/folly/ProducerConsumerQueue.h @@ -1,5 +1,5 @@ /* - * Copyright 2013 Facebook, Inc. + * Copyright 2017 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,17 +17,17 @@ // @author Bo Hu (bhu@fb.com) // @author Jordan DeLong (delong.j@fb.com) -#ifndef PRODUCER_CONSUMER_QUEUE_H_ -#define PRODUCER_CONSUMER_QUEUE_H_ +#pragma once #include #include #include +#include #include #include #include -#include -#include + +#include namespace folly { @@ -35,10 +35,13 @@ namespace folly { * ProducerConsumerQueue is a one producer and one consumer queue * without locks. */ -template -struct ProducerConsumerQueue : private boost::noncopyable { +template +struct ProducerConsumerQueue { typedef T value_type; + ProducerConsumerQueue(const ProducerConsumerQueue&) = delete; + ProducerConsumerQueue& operator = (const ProducerConsumerQueue&) = delete; + // size must be >= 2. // // Also, note that the number of usable slots in the queue at any @@ -60,13 +63,13 @@ struct ProducerConsumerQueue : private boost::noncopyable { // We need to destruct anything that may still exist in our queue. // (No real synchronization needed at destructor time: only one // thread can be doing this.) - if (!boost::has_trivial_destructor::value) { - int read = readIndex_; - int end = writeIndex_; - while (read != end) { - records_[read].~T(); - if (++read == size_) { - read = 0; + if (!std::is_trivially_destructible::value) { + size_t readIndex = readIndex_; + size_t endIndex = writeIndex_; + while (readIndex != endIndex) { + records_[readIndex].~T(); + if (++readIndex == size_) { + readIndex = 0; } } } @@ -74,7 +77,7 @@ struct ProducerConsumerQueue : private boost::noncopyable { std::free(records_); } - template + template bool write(Args&&... recordArgs) { auto const currentWrite = writeIndex_.load(std::memory_order_relaxed); auto nextRecord = currentWrite + 1; @@ -134,16 +137,16 @@ struct ProducerConsumerQueue : private boost::noncopyable { } bool isEmpty() const { - return readIndex_.load(std::memory_order_consume) == - writeIndex_.load(std::memory_order_consume); + return readIndex_.load(std::memory_order_acquire) == + writeIndex_.load(std::memory_order_acquire); } bool isFull() const { - auto nextRecord = writeIndex_.load(std::memory_order_consume) + 1; + auto nextRecord = writeIndex_.load(std::memory_order_acquire) + 1; if (nextRecord == size_) { nextRecord = 0; } - if (nextRecord != readIndex_.load(std::memory_order_consume)) { + if (nextRecord != readIndex_.load(std::memory_order_acquire)) { return false; } // queue is full @@ -156,22 +159,30 @@ struct ProducerConsumerQueue : private boost::noncopyable { // be removing items concurrently). // * It is undefined to call this from any other thread. size_t sizeGuess() const { - int ret = writeIndex_.load(std::memory_order_consume) - - readIndex_.load(std::memory_order_consume); + int ret = writeIndex_.load(std::memory_order_acquire) - + readIndex_.load(std::memory_order_acquire); if (ret < 0) { ret += size_; } return ret; } -private: + // maximum number of items in the queue. + size_t capacity() const { + return size_ - 1; + } + + private: + char pad0_[hardware_destructive_interference_size]; const uint32_t size_; T* const records_; - std::atomic readIndex_; - std::atomic writeIndex_; -}; + alignas(hardware_destructive_interference_size) + std::atomic readIndex_; + alignas(hardware_destructive_interference_size) + std::atomic writeIndex_; -} + char pad1_[hardware_destructive_interference_size - sizeof(writeIndex_)]; +}; -#endif +} // namespace folly