/*
 * Copyright 2017 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
// @author Bo Hu (bhu@fb.com)
// @author Jordan DeLong (delong.j@fb.com)
#pragma once

#include <atomic>
#include <cassert>
#include <cstdint>
#include <cstdlib>
#include <new>
#include <type_traits>
#include <utility>

#include <folly/concurrency/CacheLocality.h>

namespace folly {

/*
 * ProducerConsumerQueue is a single-producer, single-consumer queue
 * without locks.
 *
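 * A minimal usage sketch (illustrative only; the element type, queue size,
 * and thread setup below are arbitrary):
 *
 *   folly::ProducerConsumerQueue<int> queue(16);
 *
 *   // producer thread
 *   if (!queue.write(42)) {
 *     // queue was full; retry later or drop the item
 *   }
 *
 *   // consumer thread
 *   int value;
 *   if (queue.read(value)) {
 *     // use value
 *   }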
 */
template <class T>
struct ProducerConsumerQueue {

  ProducerConsumerQueue(const ProducerConsumerQueue&) = delete;
  ProducerConsumerQueue& operator=(const ProducerConsumerQueue&) = delete;

  // size must be >= 2.
  //
  // Also, note that the number of usable slots in the queue at any
  // given time is actually (size-1), so if you start with an empty queue,
  // isFull() will return true after size-1 insertions.
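  //
  // For example (an illustrative sketch, not part of the original comment):
  //
  //   folly::ProducerConsumerQueue<int> q(4);
  //   q.write(1);  // ok
  //   q.write(2);  // ok
  //   q.write(3);  // ok; q.isFull() now returns true
  //   q.write(4);  // returns false: only size-1 == 3 slots are usable, since
  //                // one slot always stays empty to tell "full" from "empty"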
  explicit ProducerConsumerQueue(uint32_t size)
      : size_(size),
        records_(static_cast<T*>(std::malloc(sizeof(T) * size))),
        readIndex_(0),
        writeIndex_(0) {
    assert(size >= 2);
    if (!records_) {
      throw std::bad_alloc();
    }
  }
  ~ProducerConsumerQueue() {
    // We need to destruct anything that may still exist in our queue.
    // (No real synchronization needed at destructor time: only one
    // thread can be doing this.)
    if (!std::is_trivially_destructible<T>::value) {
      size_t readIndex = readIndex_;
      size_t endIndex = writeIndex_;
      while (readIndex != endIndex) {
        records_[readIndex].~T();
        if (++readIndex == size_) {
          readIndex = 0; // wrap around to the start of the ring buffer
        }
      }
    }

    std::free(records_);
  }
  template <class... Args>
  bool write(Args&&... recordArgs) {
    auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
    auto nextRecord = currentWrite + 1;
    if (nextRecord == size_) {
      nextRecord = 0; // wrap around to the start of the ring buffer
    }
    if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
      new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...);
      writeIndex_.store(nextRecord, std::memory_order_release);
      return true;
    }
    // queue is full
    return false;
  }
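
  // Illustrative note (not from the original header): write() forwards its
  // arguments to T's constructor, so the element is built in place. For
  // example, with a hypothetical ProducerConsumerQueue<std::string> q(8):
  //
  //   q.write("hello");  // constructs std::string("hello") in the slot
  //   q.write(5, 'x');   // constructs std::string(5, 'x'), i.e. "xxxxx"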
  // move (or copy) the value at the front of the queue to the given variable
  bool read(T& record) {
    auto const currentRead = readIndex_.load(std::memory_order_relaxed);
    if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
      // queue is empty
      return false;
    }

    auto nextRecord = currentRead + 1;
    if (nextRecord == size_) {
      nextRecord = 0; // wrap around to the start of the ring buffer
    }
    record = std::move(records_[currentRead]);
    records_[currentRead].~T();
    readIndex_.store(nextRecord, std::memory_order_release);
    return true;
  }
  // pointer to the value at the front of the queue (for use in-place), or
  // nullptr if the queue is empty
  T* frontPtr() {
    auto const currentRead = readIndex_.load(std::memory_order_relaxed);
    if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
      // queue is empty
      return nullptr;
    }
    return &records_[currentRead];
  }
  // queue must not be empty
  void popFront() {
    auto const currentRead = readIndex_.load(std::memory_order_relaxed);
    assert(currentRead != writeIndex_.load(std::memory_order_acquire));

    auto nextRecord = currentRead + 1;
    if (nextRecord == size_) {
      nextRecord = 0; // wrap around to the start of the ring buffer
    }
    records_[currentRead].~T();
    readIndex_.store(nextRecord, std::memory_order_release);
  }
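
  // Illustrative in-place consumer sketch (not part of the original header;
  // `queue`, `Widget`, `process`, and `running` are placeholders): frontPtr()
  // lets the consumer work on the element without copying it out, and
  // popFront() then destroys it and advances the read position.
  //
  //   while (running) {
  //     Widget* w = queue.frontPtr();
  //     if (w == nullptr) {
  //       continue;  // queue is empty; spin, yield, or sleep as appropriate
  //     }
  //     process(*w);
  //     queue.popFront();
  //   }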
  bool isEmpty() const {
    return readIndex_.load(std::memory_order_acquire) ==
        writeIndex_.load(std::memory_order_acquire);
  }
  bool isFull() const {
    auto nextRecord = writeIndex_.load(std::memory_order_acquire) + 1;
    if (nextRecord == size_) {
      nextRecord = 0; // wrap around to the start of the ring buffer
    }
    if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
      return false;
    }
    // queue is full
    return true;
  }
  // Returns an estimate of the number of entries in the queue. Note:
  // * If called by the consumer, the true size may be more (because the
  //   producer may be adding items concurrently).
  // * If called by the producer, the true size may be less (because the
  //   consumer may be removing items concurrently).
  // * It is undefined to call this from any other thread.
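  //
  // A worked example (illustrative values, not from the original comment):
  // with size_ == 8, writeIndex_ == 1 and readIndex_ == 5, the raw difference
  // is 1 - 5 == -4, so the guess is corrected to -4 + 8 == 4 entries.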
  size_t sizeGuess() const {
    int ret = writeIndex_.load(std::memory_order_acquire) -
        readIndex_.load(std::memory_order_acquire);
    if (ret < 0) {
      ret += size_; // the write index has wrapped around past the read index
    }
    return ret;
  }
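
  // The pad0_/pad1_ arrays and the FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
  // annotations below keep readIndex_ (written by the consumer) and
  // writeIndex_ (written by the producer) on separate cache lines, so the
  // two threads do not repeatedly invalidate each other's cached copies
  // (false sharing).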
 private:
  char pad0_[CacheLocality::kFalseSharingRange];
  const uint32_t size_;
  T* const records_;

  FOLLY_ALIGN_TO_AVOID_FALSE_SHARING std::atomic<unsigned int> readIndex_;
  FOLLY_ALIGN_TO_AVOID_FALSE_SHARING std::atomic<unsigned int> writeIndex_;

  char pad1_[CacheLocality::kFalseSharingRange - sizeof(writeIndex_)];
};

} // namespace folly