X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2FProducerConsumerQueue.h;h=053c0e264b649075b2d148af5dc199b83ad95b22;hb=46b709bc7780303b5fefd4bbbaf67b9f268d4865;hp=de0e3cc0a39892cde2903ad60f560de33b8809d5;hpb=c51f93205176b9bb3792a1c8bedcf5228ce669ac;p=folly.git diff --git a/folly/ProducerConsumerQueue.h b/folly/ProducerConsumerQueue.h index de0e3cc0..053c0e26 100644 --- a/folly/ProducerConsumerQueue.h +++ b/folly/ProducerConsumerQueue.h @@ -1,5 +1,5 @@ /* - * Copyright 2012 Facebook, Inc. + * Copyright 2017 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,16 +17,15 @@ // @author Bo Hu (bhu@fb.com) // @author Jordan DeLong (delong.j@fb.com) -#ifndef PRODUCER_CONSUMER_QUEUE_H_ -#define PRODUCER_CONSUMER_QUEUE_H_ +#pragma once #include #include #include +#include #include #include #include -#include namespace folly { @@ -35,10 +34,17 @@ namespace folly { * without locks. */ template -struct ProducerConsumerQueue : private boost::noncopyable { +struct ProducerConsumerQueue { typedef T value_type; - // size must be >= 2 + ProducerConsumerQueue(const ProducerConsumerQueue&) = delete; + ProducerConsumerQueue& operator = (const ProducerConsumerQueue&) = delete; + + // size must be >= 2. + // + // Also, note that the number of usable slots in the queue at any + // given time is actually (size-1), so if you start with an empty queue, + // isFull() will return true after size-1 insertions. explicit ProducerConsumerQueue(uint32_t size) : size_(size) , records_(static_cast(std::malloc(sizeof(T) * size))) @@ -55,9 +61,9 @@ struct ProducerConsumerQueue : private boost::noncopyable { // We need to destruct anything that may still exist in our queue. // (No real synchronization needed at destructor time: only one // thread can be doing this.) - if (!std::has_trivial_destructor::value) { - int read = readIndex_; - int end = writeIndex_; + if (!std::is_trivially_destructible::value) { + size_t read = readIndex_; + size_t end = writeIndex_; while (read != end) { records_[read].~T(); if (++read == size_) { @@ -129,30 +135,42 @@ struct ProducerConsumerQueue : private boost::noncopyable { } bool isEmpty() const { - return readIndex_.load(std::memory_order_consume) != - writeIndex_.load(std::memory_order_consume); + return readIndex_.load(std::memory_order_acquire) == + writeIndex_.load(std::memory_order_acquire); } bool isFull() const { - auto nextRecord = writeIndex_.load(std::memory_order_consume) + 1; + auto nextRecord = writeIndex_.load(std::memory_order_acquire) + 1; if (nextRecord == size_) { nextRecord = 0; } - if (nextRecord != readIndex_.load(std::memory_order_consume)) { + if (nextRecord != readIndex_.load(std::memory_order_acquire)) { return false; } // queue is full return true; } + // * If called by consumer, then true size may be more (because producer may + // be adding items concurrently). + // * If called by producer, then true size may be less (because consumer may + // be removing items concurrently). + // * It is undefined to call this from any other thread. + size_t sizeGuess() const { + int ret = writeIndex_.load(std::memory_order_acquire) - + readIndex_.load(std::memory_order_acquire); + if (ret < 0) { + ret += size_; + } + return ret; + } + private: const uint32_t size_; T* const records_; - std::atomic readIndex_; - std::atomic writeIndex_; + std::atomic readIndex_; + std::atomic writeIndex_; }; } - -#endif