/*
 * Copyright 2012 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17 // @author Bo Hu (bhu@fb.com)
18 // @author Jordan DeLong (delong.j@fb.com)
20 #ifndef PRODUCER_CONSUMER_QUEUE_H_
21 #define PRODUCER_CONSUMER_QUEUE_H_
#include <atomic>
#include <cassert>
#include <cstdint>
#include <cstdlib>
#include <new>
#include <type_traits>
#include <utility>

#include <boost/noncopyable.hpp>
34 * ProducerConsumerQueue is a one producer and one consumer queue
38 struct ProducerConsumerQueue : private boost::noncopyable {
// Constructor: allocates raw, uninitialized storage for `size` slots.
// Items are constructed lazily with placement new in write() and
// destroyed in read()/popFront(), so plain std::malloc (not new[])
// is intentional here.
// NOTE(review): this excerpt appears elided — the init-list entries
// for the index members and the malloc-failure check around the
// throw are missing from view.
explicit ProducerConsumerQueue(uint32_t size)
  , records_(static_cast<T*>(std::malloc(sizeof(T) * size)))
    // malloc returned nullptr: cannot build the queue.
    throw std::bad_alloc();
54 ~ProducerConsumerQueue() {
55 // We need to destruct anything that may still exist in our queue.
56 // (No real synchronization needed at destructor time: only one
57 // thread can be doing this.)
58 if (!std::has_trivial_destructor<T>::value) {
59 int read = readIndex_;
60 int end = writeIndex_;
63 if (++read == size_) {
// Emplace one item at the tail of the queue, constructing it in place
// from recordArgs.  Returns true on success, false if the queue is
// full.  Must only be called from the single producer thread.
// NOTE(review): the wrap-around assignment, returns, and closing
// braces appear elided from this excerpt.
template<class ...Args>
bool write(Args&&... recordArgs) {
  // Relaxed load is fine: only the producer thread writes writeIndex_.
  auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
  auto nextRecord = currentWrite + 1;
  // Wrap the write cursor at the end of the ring.
  if (nextRecord == size_) {
  // Full when advancing would collide with the consumer's position;
  // the acquire load pairs with the consumer's release store so the
  // slot we are about to overwrite has really been vacated.
  if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
    // Construct in place, then publish with a release store so the
    // consumer observes a fully-constructed record.
    new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...);
    writeIndex_.store(nextRecord, std::memory_order_release);
// move (or copy) the value at the front of the queue to given variable
// Returns false if the queue was empty, true if a record was consumed.
// Must only be called from the single consumer thread.
// NOTE(review): the early return for the empty case, the wrap-around
// assignment, and the closing braces appear elided from this excerpt.
bool read(T& record) {
  // Relaxed load is fine: only the consumer thread writes readIndex_.
  auto const currentRead = readIndex_.load(std::memory_order_relaxed);
  // Empty when both cursors reference the same slot; the acquire load
  // pairs with the producer's release store.
  if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
  // Advance the read cursor, wrapping at the end of the ring.
  auto nextRecord = currentRead + 1;
  if (nextRecord == size_) {
  // Move the value out, destroy the slot, then publish the new read
  // position with a release store (pairs with the producer's acquire)
  // so the producer may reuse the slot.
  record = std::move(records_[currentRead]);
  records_[currentRead].~T();
  readIndex_.store(nextRecord, std::memory_order_release);
// pointer to the value at the front of the queue (for use in-place) or
// nullptr when empty — consumer thread only; does not consume the item.
// NOTE(review): the function signature line and the empty-case return
// appear elided from this excerpt.
  auto const currentRead = readIndex_.load(std::memory_order_relaxed);
  // Empty check: acquire pairs with the producer's release store.
  if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
  // Non-empty: expose the front slot for in-place use.
  return &records_[currentRead];
// queue must not be empty
// Destroys the front item and advances the read cursor; pairs with
// frontPtr() for in-place consumption.  Consumer thread only.
// NOTE(review): the function signature line and the wrap-around
// assignment appear elided from this excerpt.
  auto const currentRead = readIndex_.load(std::memory_order_relaxed);
  // Precondition check: calling on an empty queue is a programmer bug.
  assert(currentRead != writeIndex_.load(std::memory_order_acquire));
  // Advance the read cursor, wrapping at the end of the ring.
  auto nextRecord = currentRead + 1;
  if (nextRecord == size_) {
  // Destroy in place, then publish the new read position (release
  // pairs with the producer's acquire) so the slot can be reused.
  records_[currentRead].~T();
  readIndex_.store(nextRecord, std::memory_order_release);
131 bool isEmpty() const {
132 return readIndex_.load(std::memory_order_consume) !=
133 writeIndex_.load(std::memory_order_consume);
// Returns true when no further item can be written (the slot after the
// write cursor is the read cursor — one slot is always left empty to
// distinguish full from empty).  Callable from any thread, but the
// answer may be stale by the time the caller acts on it.
// NOTE(review): the wrap-around assignment, returns, and closing
// braces appear elided from this excerpt.
bool isFull() const {
  auto nextRecord = writeIndex_.load(std::memory_order_consume) + 1;
  // Wrap the prospective write position at the end of the ring.
  if (nextRecord == size_) {
  // Not full while advancing would not collide with the read cursor.
  if (nextRecord != readIndex_.load(std::memory_order_consume)) {
// Number of slots in the ring; usable capacity is size_ - 1, since one
// slot is always left empty to distinguish full from empty.
const uint32_t size_;
// NOTE(review): the records_ slot-array (T*) member declaration
// appears elided from this excerpt.
// Index of the next slot to read; written only by the consumer thread.
std::atomic<int> readIndex_;
// Index of the next slot to write; written only by the producer thread.
std::atomic<int> writeIndex_;