60fc2f5d86e6091962069a9e29bbd9f6a766fd2b
[libcds.git] / cds / misc / RigtorpSPSCQueue.h
1 /*
2 Copyright (c) 2017 Erik Rigtorp <erik@rigtorp.se>
3
4 Permission is hereby granted, free of charge, to any person obtaining a copy
5 of this software and associated documentation files (the "Software"), to deal
6 in the Software without restriction, including without limitation the rights
7 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 copies of the Software, and to permit persons to whom the Software is
9 furnished to do so, subject to the following conditions:
10
11 The above copyright notice and this permission notice shall be included in all
12 copies or substantial portions of the Software.
13
14 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20 SOFTWARE.
21  */
22
23 #pragma once
24
25 #include <atomic>
26 #include <cassert>
27 #include <stdexcept>
28 #include <type_traits>
29
30 namespace rigtorp {
31
32 template <typename T> class SPSCQueue {
33 public:
34   explicit SPSCQueue(const size_t capacity)
35       : capacity_(capacity),
36         slots_(capacity_ < 2 ? nullptr
37                              : static_cast<T *>(operator new[](
38                                    sizeof(T) * (capacity_ + 2 * kPadding)))),
39         head_(0), tail_(0) {
40     if (capacity_ < 2) {
41       throw std::invalid_argument("size < 2");
42     }
43     assert(alignof(SPSCQueue<T>) >= kCacheLineSize);
44     assert(reinterpret_cast<char *>(&tail_) -
45                reinterpret_cast<char *>(&head_) >=
46            kCacheLineSize);
47   }
48
49   ~SPSCQueue() {
50     while (front()) {
51       pop();
52     }
53     operator delete[](slots_);
54   }
55
56   // non-copyable and non-movable
57   SPSCQueue(const SPSCQueue &) = delete;
58   SPSCQueue &operator=(const SPSCQueue &) = delete;
59
60   template <typename... Args>
61   void emplace(Args &&... args) noexcept(
62       std::is_nothrow_constructible<T, Args &&...>::value) {
63     static_assert(std::is_constructible<T, Args &&...>::value,
64                   "T must be constructible with Args&&...");
65     auto const head = head_.load(std::memory_order_relaxed);
66     auto nextHead = head + 1;
67     if (nextHead == capacity_) {
68       nextHead = 0;
69     }
70     while (nextHead == tail_.load(std::memory_order_acquire))
71       ;
72     new (&slots_[head + kPadding]) T(std::forward<Args>(args)...);
73     head_.store(nextHead, std::memory_order_release);
74   }
75
76   template <typename... Args>
77   bool try_emplace(Args &&... args) noexcept(
78       std::is_nothrow_constructible<T, Args &&...>::value) {
79     static_assert(std::is_constructible<T, Args &&...>::value,
80                   "T must be constructible with Args&&...");
81     auto const head = head_.load(std::memory_order_relaxed);
82     auto nextHead = head + 1;
83     if (nextHead == capacity_) {
84       nextHead = 0;
85     }
86     if (nextHead == tail_.load(std::memory_order_acquire)) {
87       return false;
88     }
89     new (&slots_[head + kPadding]) T(std::forward<Args>(args)...);
90     head_.store(nextHead, std::memory_order_release);
91     return true;
92   }
93
94   void push(const T &v) noexcept(std::is_nothrow_copy_constructible<T>::value) {
95     static_assert(std::is_copy_constructible<T>::value,
96                   "T must be copy constructible");
97     emplace(v);
98   }
99
100   template <typename P, typename = typename std::enable_if<
101                             std::is_constructible<T, P &&>::value>::type>
102   void push(P &&v) noexcept(std::is_nothrow_constructible<T, P &&>::value) {
103     emplace(std::forward<P>(v));
104   }
105
106   bool
107   try_push(const T &v) noexcept(std::is_nothrow_copy_constructible<T>::value) {
108     static_assert(std::is_copy_constructible<T>::value,
109                   "T must be copy constructible");
110     return try_emplace(v);
111   }
112
113   template <typename P, typename = typename std::enable_if<
114                             std::is_constructible<T, P &&>::value>::type>
115   bool try_push(P &&v) noexcept(std::is_nothrow_constructible<T, P &&>::value) {
116     return try_emplace(std::forward<P>(v));
117   }
118
119   T *front() noexcept {
120     auto const tail = tail_.load(std::memory_order_relaxed);
121     if (head_.load(std::memory_order_acquire) == tail) {
122       return nullptr;
123     }
124     return &slots_[tail + kPadding];
125   }
126
127   void pop() noexcept {
128     static_assert(std::is_nothrow_destructible<T>::value,
129                   "T must be nothrow destructible");
130     auto const tail = tail_.load(std::memory_order_relaxed);
131     assert(head_.load(std::memory_order_acquire) != tail);
132     slots_[tail + kPadding].~T();
133     auto nextTail = tail + 1;
134     if (nextTail == capacity_) {
135       nextTail = 0;
136     }
137     tail_.store(nextTail, std::memory_order_release);
138   }
139
140   size_t size() const noexcept {
141     ssize_t diff = head_.load(std::memory_order_acquire) -
142                    tail_.load(std::memory_order_acquire);
143     if (diff < 0) {
144       diff += capacity_;
145     }
146     return diff;
147   }
148
149   bool empty() const noexcept { return size() == 0; }
150
151   size_t capacity() const noexcept { return capacity_; }
152
153 private:
154   static constexpr size_t kCacheLineSize = 128;
155
156   // Padding to avoid false sharing between slots_ and adjacent allocations
157   static constexpr size_t kPadding = (kCacheLineSize - 1) / sizeof(T) + 1;
158
159 private:
160   const size_t capacity_;
161   T *const slots_;
162
163   // Align to avoid false sharing between head_ and tail_
164   alignas(kCacheLineSize) std::atomic<size_t> head_;
165   alignas(kCacheLineSize) std::atomic<size_t> tail_;
166
167   // Padding to avoid adjacent allocations to share cache line with tail_
168   char padding_[kCacheLineSize - sizeof(tail_)];
169 };
170 }