Harden failure signal handler in the face of memory corruptions
[folly.git] / folly / ProducerConsumerQueue.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 // @author Bo Hu (bhu@fb.com)
18 // @author Jordan DeLong (delong.j@fb.com)
19
20 #ifndef PRODUCER_CONSUMER_QUEUE_H_
21 #define PRODUCER_CONSUMER_QUEUE_H_
22
23 #include <atomic>
24 #include <cassert>
25 #include <cstdlib>
26 #include <stdexcept>
27 #include <type_traits>
28 #include <utility>
29 #include <boost/noncopyable.hpp>
30 #include <boost/type_traits.hpp>
31
32 namespace folly {
33
34 /*
35  * ProducerConsumerQueue is a one producer and one consumer queue
36  * without locks.
37  */
38 template<class T>
39 struct ProducerConsumerQueue : private boost::noncopyable {
40   typedef T value_type;
41
42   // size must be >= 2.
43   //
44   // Also, note that the number of usable slots in the queue at any
45   // given time is actually (size-1), so if you start with an empty queue,
46   // isFull() will return true after size-1 insertions.
47   explicit ProducerConsumerQueue(uint32_t size)
48     : size_(size)
49     , records_(static_cast<T*>(std::malloc(sizeof(T) * size)))
50     , readIndex_(0)
51     , writeIndex_(0)
52   {
53     assert(size >= 2);
54     if (!records_) {
55       throw std::bad_alloc();
56     }
57   }
58
59   ~ProducerConsumerQueue() {
60     // We need to destruct anything that may still exist in our queue.
61     // (No real synchronization needed at destructor time: only one
62     // thread can be doing this.)
63     if (!boost::has_trivial_destructor<T>::value) {
64       int read = readIndex_;
65       int end = writeIndex_;
66       while (read != end) {
67         records_[read].~T();
68         if (++read == size_) {
69           read = 0;
70         }
71       }
72     }
73
74     std::free(records_);
75   }
76
77   template<class ...Args>
78   bool write(Args&&... recordArgs) {
79     auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
80     auto nextRecord = currentWrite + 1;
81     if (nextRecord == size_) {
82       nextRecord = 0;
83     }
84     if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
85       new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...);
86       writeIndex_.store(nextRecord, std::memory_order_release);
87       return true;
88     }
89
90     // queue is full
91     return false;
92   }
93
94   // move (or copy) the value at the front of the queue to given variable
95   bool read(T& record) {
96     auto const currentRead = readIndex_.load(std::memory_order_relaxed);
97     if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
98       // queue is empty
99       return false;
100     }
101
102     auto nextRecord = currentRead + 1;
103     if (nextRecord == size_) {
104       nextRecord = 0;
105     }
106     record = std::move(records_[currentRead]);
107     records_[currentRead].~T();
108     readIndex_.store(nextRecord, std::memory_order_release);
109     return true;
110   }
111
112   // pointer to the value at the front of the queue (for use in-place) or
113   // nullptr if empty.
114   T* frontPtr() {
115     auto const currentRead = readIndex_.load(std::memory_order_relaxed);
116     if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
117       // queue is empty
118       return nullptr;
119     }
120     return &records_[currentRead];
121   }
122
123   // queue must not be empty
124   void popFront() {
125     auto const currentRead = readIndex_.load(std::memory_order_relaxed);
126     assert(currentRead != writeIndex_.load(std::memory_order_acquire));
127
128     auto nextRecord = currentRead + 1;
129     if (nextRecord == size_) {
130       nextRecord = 0;
131     }
132     records_[currentRead].~T();
133     readIndex_.store(nextRecord, std::memory_order_release);
134   }
135
136   bool isEmpty() const {
137    return readIndex_.load(std::memory_order_consume) ==
138          writeIndex_.load(std::memory_order_consume);
139   }
140
141   bool isFull() const {
142     auto nextRecord = writeIndex_.load(std::memory_order_consume) + 1;
143     if (nextRecord == size_) {
144       nextRecord = 0;
145     }
146     if (nextRecord != readIndex_.load(std::memory_order_consume)) {
147       return false;
148     }
149     // queue is full
150     return true;
151   }
152
153   // * If called by consumer, then true size may be more (because producer may
154   //   be adding items concurrently).
155   // * If called by producer, then true size may be less (because consumer may
156   //   be removing items concurrently).
157   // * It is undefined to call this from any other thread.
158   size_t sizeGuess() const {
159     int ret = writeIndex_.load(std::memory_order_consume) -
160               readIndex_.load(std::memory_order_consume);
161     if (ret < 0) {
162       ret += size_;
163     }
164     return ret;
165   }
166
167 private:
168   const uint32_t size_;
169   T* const records_;
170
171   std::atomic<int> readIndex_;
172   std::atomic<int> writeIndex_;
173 };
174
175 }
176
177 #endif