Allow forcing value for FOLLY_FUTURE_USING_FIBER in folly-config.h
[folly.git] / folly / ProducerConsumerQueue.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 // @author Bo Hu (bhu@fb.com)
18 // @author Jordan DeLong (delong.j@fb.com)
19
20 #pragma once
21
22 #include <atomic>
23 #include <cassert>
24 #include <cstdlib>
25 #include <memory>
26 #include <stdexcept>
27 #include <type_traits>
28 #include <utility>
29
30 #include <folly/concurrency/CacheLocality.h>
31
32 namespace folly {
33
34 /*
35  * ProducerConsumerQueue is a one producer and one consumer queue
36  * without locks.
37  */
38 template <class T>
39 struct ProducerConsumerQueue {
40   typedef T value_type;
41
42   ProducerConsumerQueue(const ProducerConsumerQueue&) = delete;
43   ProducerConsumerQueue& operator = (const ProducerConsumerQueue&) = delete;
44
45   // size must be >= 2.
46   //
47   // Also, note that the number of usable slots in the queue at any
48   // given time is actually (size-1), so if you start with an empty queue,
49   // isFull() will return true after size-1 insertions.
50   explicit ProducerConsumerQueue(uint32_t size)
51     : size_(size)
52     , records_(static_cast<T*>(std::malloc(sizeof(T) * size)))
53     , readIndex_(0)
54     , writeIndex_(0)
55   {
56     assert(size >= 2);
57     if (!records_) {
58       throw std::bad_alloc();
59     }
60   }
61
62   ~ProducerConsumerQueue() {
63     // We need to destruct anything that may still exist in our queue.
64     // (No real synchronization needed at destructor time: only one
65     // thread can be doing this.)
66     if (!std::is_trivially_destructible<T>::value) {
67       size_t readIndex = readIndex_;
68       size_t endIndex = writeIndex_;
69       while (readIndex != endIndex) {
70         records_[readIndex].~T();
71         if (++readIndex == size_) {
72           readIndex = 0;
73         }
74       }
75     }
76
77     std::free(records_);
78   }
79
80   template <class... Args>
81   bool write(Args&&... recordArgs) {
82     auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
83     auto nextRecord = currentWrite + 1;
84     if (nextRecord == size_) {
85       nextRecord = 0;
86     }
87     if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
88       new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...);
89       writeIndex_.store(nextRecord, std::memory_order_release);
90       return true;
91     }
92
93     // queue is full
94     return false;
95   }
96
97   // move (or copy) the value at the front of the queue to given variable
98   bool read(T& record) {
99     auto const currentRead = readIndex_.load(std::memory_order_relaxed);
100     if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
101       // queue is empty
102       return false;
103     }
104
105     auto nextRecord = currentRead + 1;
106     if (nextRecord == size_) {
107       nextRecord = 0;
108     }
109     record = std::move(records_[currentRead]);
110     records_[currentRead].~T();
111     readIndex_.store(nextRecord, std::memory_order_release);
112     return true;
113   }
114
115   // pointer to the value at the front of the queue (for use in-place) or
116   // nullptr if empty.
117   T* frontPtr() {
118     auto const currentRead = readIndex_.load(std::memory_order_relaxed);
119     if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
120       // queue is empty
121       return nullptr;
122     }
123     return &records_[currentRead];
124   }
125
126   // queue must not be empty
127   void popFront() {
128     auto const currentRead = readIndex_.load(std::memory_order_relaxed);
129     assert(currentRead != writeIndex_.load(std::memory_order_acquire));
130
131     auto nextRecord = currentRead + 1;
132     if (nextRecord == size_) {
133       nextRecord = 0;
134     }
135     records_[currentRead].~T();
136     readIndex_.store(nextRecord, std::memory_order_release);
137   }
138
139   bool isEmpty() const {
140     return readIndex_.load(std::memory_order_acquire) ==
141         writeIndex_.load(std::memory_order_acquire);
142   }
143
144   bool isFull() const {
145     auto nextRecord = writeIndex_.load(std::memory_order_acquire) + 1;
146     if (nextRecord == size_) {
147       nextRecord = 0;
148     }
149     if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
150       return false;
151     }
152     // queue is full
153     return true;
154   }
155
156   // * If called by consumer, then true size may be more (because producer may
157   //   be adding items concurrently).
158   // * If called by producer, then true size may be less (because consumer may
159   //   be removing items concurrently).
160   // * It is undefined to call this from any other thread.
161   size_t sizeGuess() const {
162     int ret = writeIndex_.load(std::memory_order_acquire) -
163         readIndex_.load(std::memory_order_acquire);
164     if (ret < 0) {
165       ret += size_;
166     }
167     return ret;
168   }
169
170  private:
171   char pad0_[CacheLocality::kFalseSharingRange];
172   const uint32_t size_;
173   T* const records_;
174
175   FOLLY_ALIGN_TO_AVOID_FALSE_SHARING std::atomic<unsigned int> readIndex_;
176   FOLLY_ALIGN_TO_AVOID_FALSE_SHARING std::atomic<unsigned int> writeIndex_;
177
178   char pad1_[CacheLocality::kFalseSharingRange - sizeof(writeIndex_)];
179 };
180
181 }