6a545dea9269bf0853ad1e6ca3504ea499c51c83
[folly.git] / folly / ProducerConsumerQueue.h
1 /*
2  * Copyright 2012 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 // @author Bo Hu (bhu@fb.com)
18 // @author Jordan DeLong (delong.j@fb.com)
19
20 #ifndef PRODUCER_CONSUMER_QUEUE_H_
21 #define PRODUCER_CONSUMER_QUEUE_H_
22
23 #include <atomic>
24 #include <cassert>
25 #include <cstdlib>
26 #include <stdexcept>
27 #include <type_traits>
28 #include <utility>
29 #include <boost/noncopyable.hpp>
30
31 namespace folly {
32
33 /*
34  * ProducerConsumerQueue is a one producer and one consumer queue
35  * without locks.
36  */
37 template<class T>
38 struct ProducerConsumerQueue : private boost::noncopyable {
39   typedef T value_type;
40
41   // size must be >= 2
42   explicit ProducerConsumerQueue(uint32_t size)
43     : size_(size)
44     , records_(static_cast<T*>(std::malloc(sizeof(T) * size)))
45     , readIndex_(0)
46     , writeIndex_(0)
47   {
48     assert(size >= 2);
49     if (!records_) {
50       throw std::bad_alloc();
51     }
52   }
53
54   ~ProducerConsumerQueue() {
55     // We need to destruct anything that may still exist in our queue.
56     // (No real synchronization needed at destructor time: only one
57     // thread can be doing this.)
58     if (!std::has_trivial_destructor<T>::value) {
59       int read = readIndex_;
60       int end = writeIndex_;
61       while (read != end) {
62         records_[read].~T();
63         if (++read == size_) {
64           read = 0;
65         }
66       }
67     }
68
69     std::free(records_);
70   }
71
72   template<class ...Args>
73   bool write(Args&&... recordArgs) {
74     auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
75     auto nextRecord = currentWrite + 1;
76     if (nextRecord == size_) {
77       nextRecord = 0;
78     }
79     if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
80       new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...);
81       writeIndex_.store(nextRecord, std::memory_order_release);
82       return true;
83     }
84
85     // queue is full
86     return false;
87   }
88
89   bool read(T& record) {
90     auto const currentRead = readIndex_.load(std::memory_order_relaxed);
91     if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
92       // queue is empty
93       return false;
94     }
95
96     auto nextRecord = currentRead + 1;
97     if (nextRecord == size_) {
98       nextRecord = 0;
99     }
100     record = std::move(records_[currentRead]);
101     records_[currentRead].~T();
102     readIndex_.store(nextRecord, std::memory_order_release);
103     return true;
104   }
105
106   bool isFull() const {
107     auto nextRecord = writeIndex_.load(std::memory_order_consume) + 1;
108     if (nextRecord == size_) {
109       nextRecord = 0;
110     }
111     if (nextRecord != readIndex_.load(std::memory_order_consume)) {
112       return false;
113     }
114     // queue is full
115     return true;
116   }
117
118 private:
119   const uint32_t size_;
120   T* const records_;
121
122   std::atomic<int> readIndex_;
123   std::atomic<int> writeIndex_;
124 };
125
126 }
127
128 #endif