All template params for PriorityMPMCQueue
[folly.git] / folly / ProducerConsumerQueue.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 // @author Bo Hu (bhu@fb.com)
18 // @author Jordan DeLong (delong.j@fb.com)
19
20 #pragma once
21
22 #include <atomic>
23 #include <cassert>
24 #include <cstdlib>
25 #include <memory>
26 #include <stdexcept>
27 #include <type_traits>
28 #include <utility>
29
30 namespace folly {
31
32 /*
33  * ProducerConsumerQueue is a one producer and one consumer queue
34  * without locks.
35  */
36 template<class T>
37 struct ProducerConsumerQueue {
38   typedef T value_type;
39
40   ProducerConsumerQueue(const ProducerConsumerQueue&) = delete;
41   ProducerConsumerQueue& operator = (const ProducerConsumerQueue&) = delete;
42
43   // size must be >= 2.
44   //
45   // Also, note that the number of usable slots in the queue at any
46   // given time is actually (size-1), so if you start with an empty queue,
47   // isFull() will return true after size-1 insertions.
48   explicit ProducerConsumerQueue(uint32_t size)
49     : size_(size)
50     , records_(static_cast<T*>(std::malloc(sizeof(T) * size)))
51     , readIndex_(0)
52     , writeIndex_(0)
53   {
54     assert(size >= 2);
55     if (!records_) {
56       throw std::bad_alloc();
57     }
58   }
59
60   ~ProducerConsumerQueue() {
61     // We need to destruct anything that may still exist in our queue.
62     // (No real synchronization needed at destructor time: only one
63     // thread can be doing this.)
64     if (!std::is_trivially_destructible<T>::value) {
65       size_t read = readIndex_;
66       size_t end = writeIndex_;
67       while (read != end) {
68         records_[read].~T();
69         if (++read == size_) {
70           read = 0;
71         }
72       }
73     }
74
75     std::free(records_);
76   }
77
78   template<class ...Args>
79   bool write(Args&&... recordArgs) {
80     auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
81     auto nextRecord = currentWrite + 1;
82     if (nextRecord == size_) {
83       nextRecord = 0;
84     }
85     if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
86       new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...);
87       writeIndex_.store(nextRecord, std::memory_order_release);
88       return true;
89     }
90
91     // queue is full
92     return false;
93   }
94
95   // move (or copy) the value at the front of the queue to given variable
96   bool read(T& record) {
97     auto const currentRead = readIndex_.load(std::memory_order_relaxed);
98     if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
99       // queue is empty
100       return false;
101     }
102
103     auto nextRecord = currentRead + 1;
104     if (nextRecord == size_) {
105       nextRecord = 0;
106     }
107     record = std::move(records_[currentRead]);
108     records_[currentRead].~T();
109     readIndex_.store(nextRecord, std::memory_order_release);
110     return true;
111   }
112
113   // pointer to the value at the front of the queue (for use in-place) or
114   // nullptr if empty.
115   T* frontPtr() {
116     auto const currentRead = readIndex_.load(std::memory_order_relaxed);
117     if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
118       // queue is empty
119       return nullptr;
120     }
121     return &records_[currentRead];
122   }
123
124   // queue must not be empty
125   void popFront() {
126     auto const currentRead = readIndex_.load(std::memory_order_relaxed);
127     assert(currentRead != writeIndex_.load(std::memory_order_acquire));
128
129     auto nextRecord = currentRead + 1;
130     if (nextRecord == size_) {
131       nextRecord = 0;
132     }
133     records_[currentRead].~T();
134     readIndex_.store(nextRecord, std::memory_order_release);
135   }
136
137   bool isEmpty() const {
138     return readIndex_.load(std::memory_order_acquire) ==
139         writeIndex_.load(std::memory_order_acquire);
140   }
141
142   bool isFull() const {
143     auto nextRecord = writeIndex_.load(std::memory_order_acquire) + 1;
144     if (nextRecord == size_) {
145       nextRecord = 0;
146     }
147     if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
148       return false;
149     }
150     // queue is full
151     return true;
152   }
153
154   // * If called by consumer, then true size may be more (because producer may
155   //   be adding items concurrently).
156   // * If called by producer, then true size may be less (because consumer may
157   //   be removing items concurrently).
158   // * It is undefined to call this from any other thread.
159   size_t sizeGuess() const {
160     int ret = writeIndex_.load(std::memory_order_acquire) -
161         readIndex_.load(std::memory_order_acquire);
162     if (ret < 0) {
163       ret += size_;
164     }
165     return ret;
166   }
167
168 private:
169   const uint32_t size_;
170   T* const records_;
171
172   std::atomic<unsigned int> readIndex_;
173   std::atomic<unsigned int> writeIndex_;
174 };
175
176 }