Fix error in ProducerQueue::isEmpty
[folly.git] / folly / ProducerConsumerQueue.h
1 /*
2  * Copyright 2012 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 // @author Bo Hu (bhu@fb.com)
18 // @author Jordan DeLong (delong.j@fb.com)
19
20 #ifndef PRODUCER_CONSUMER_QUEUE_H_
21 #define PRODUCER_CONSUMER_QUEUE_H_
22
23 #include <atomic>
24 #include <cassert>
25 #include <cstdlib>
26 #include <stdexcept>
27 #include <type_traits>
28 #include <utility>
29 #include <boost/noncopyable.hpp>
30
31 namespace folly {
32
33 /*
34  * ProducerConsumerQueue is a one producer and one consumer queue
35  * without locks.
36  */
37 template<class T>
38 struct ProducerConsumerQueue : private boost::noncopyable {
39   typedef T value_type;
40
41   // size must be >= 2.
42   //
43   // Also, note that the number of usable slots in the queue at any
44   // given time is actually (size-1), so if you start with an empty queue,
45   // isFull() will return true after size-1 insertions.
46   explicit ProducerConsumerQueue(uint32_t size)
47     : size_(size)
48     , records_(static_cast<T*>(std::malloc(sizeof(T) * size)))
49     , readIndex_(0)
50     , writeIndex_(0)
51   {
52     assert(size >= 2);
53     if (!records_) {
54       throw std::bad_alloc();
55     }
56   }
57
58   ~ProducerConsumerQueue() {
59     // We need to destruct anything that may still exist in our queue.
60     // (No real synchronization needed at destructor time: only one
61     // thread can be doing this.)
62     if (!std::has_trivial_destructor<T>::value) {
63       int read = readIndex_;
64       int end = writeIndex_;
65       while (read != end) {
66         records_[read].~T();
67         if (++read == size_) {
68           read = 0;
69         }
70       }
71     }
72
73     std::free(records_);
74   }
75
76   template<class ...Args>
77   bool write(Args&&... recordArgs) {
78     auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
79     auto nextRecord = currentWrite + 1;
80     if (nextRecord == size_) {
81       nextRecord = 0;
82     }
83     if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
84       new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...);
85       writeIndex_.store(nextRecord, std::memory_order_release);
86       return true;
87     }
88
89     // queue is full
90     return false;
91   }
92
93   // move (or copy) the value at the front of the queue to given variable
94   bool read(T& record) {
95     auto const currentRead = readIndex_.load(std::memory_order_relaxed);
96     if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
97       // queue is empty
98       return false;
99     }
100
101     auto nextRecord = currentRead + 1;
102     if (nextRecord == size_) {
103       nextRecord = 0;
104     }
105     record = std::move(records_[currentRead]);
106     records_[currentRead].~T();
107     readIndex_.store(nextRecord, std::memory_order_release);
108     return true;
109   }
110
111   // pointer to the value at the front of the queue (for use in-place) or
112   // nullptr if empty.
113   T* frontPtr() {
114     auto const currentRead = readIndex_.load(std::memory_order_relaxed);
115     if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
116       // queue is empty
117       return nullptr;
118     }
119     return &records_[currentRead];
120   }
121
122   // queue must not be empty
123   void popFront() {
124     auto const currentRead = readIndex_.load(std::memory_order_relaxed);
125     assert(currentRead != writeIndex_.load(std::memory_order_acquire));
126
127     auto nextRecord = currentRead + 1;
128     if (nextRecord == size_) {
129       nextRecord = 0;
130     }
131     records_[currentRead].~T();
132     readIndex_.store(nextRecord, std::memory_order_release);
133   }
134
135   bool isEmpty() const {
136    return readIndex_.load(std::memory_order_consume) ==
137          writeIndex_.load(std::memory_order_consume);
138   }
139
140   bool isFull() const {
141     auto nextRecord = writeIndex_.load(std::memory_order_consume) + 1;
142     if (nextRecord == size_) {
143       nextRecord = 0;
144     }
145     if (nextRecord != readIndex_.load(std::memory_order_consume)) {
146       return false;
147     }
148     // queue is full
149     return true;
150   }
151
152 private:
153   const uint32_t size_;
154   T* const records_;
155
156   std::atomic<int> readIndex_;
157   std::atomic<int> writeIndex_;
158 };
159
160 }
161
162 #endif