/*
 * Copyright 2012 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// @author Bo Hu (bhu@fb.com)
// @author Jordan DeLong (delong.j@fb.com)
#ifndef PRODUCER_CONSUMER_QUEUE_H_
#define PRODUCER_CONSUMER_QUEUE_H_

#include <atomic>
#include <cassert>
#include <cstdint>
#include <cstdlib>
#include <new>
#include <stdexcept>
#include <type_traits>
#include <utility>

#include <boost/noncopyable.hpp>
34 * ProducerConsumerQueue is a one producer and one consumer queue
38 struct ProducerConsumerQueue : private boost::noncopyable {
42 explicit ProducerConsumerQueue(uint32_t size)
44 , records_(static_cast<T*>(std::malloc(sizeof(T) * size)))
50 throw std::bad_alloc();
54 ~ProducerConsumerQueue() {
55 // We need to destruct anything that may still exist in our queue.
56 // (No real synchronization needed at destructor time: only one
57 // thread can be doing this.)
58 if (!std::has_trivial_destructor<T>::value) {
59 int read = readIndex_;
60 int end = writeIndex_;
63 if (++read == size_) {
72 template<class ...Args>
73 bool write(Args&&... recordArgs) {
74 auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
75 auto nextRecord = currentWrite + 1;
76 if (nextRecord == size_) {
79 if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
80 new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...);
81 writeIndex_.store(nextRecord, std::memory_order_release);
89 bool read(T& record) {
90 auto const currentRead = readIndex_.load(std::memory_order_relaxed);
91 if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
96 auto nextRecord = currentRead + 1;
97 if (nextRecord == size_) {
100 record = std::move(records_[currentRead]);
101 records_[currentRead].~T();
102 readIndex_.store(nextRecord, std::memory_order_release);
106 bool isFull() const {
107 auto nextRecord = writeIndex_.load(std::memory_order_consume) + 1;
108 if (nextRecord == size_) {
111 if (nextRecord != readIndex_.load(std::memory_order_consume)) {
119 const uint32_t size_;
122 std::atomic<int> readIndex_;
123 std::atomic<int> writeIndex_;