Uses different pass count for different parallel queue test cases
[libcds.git] / cds / misc / RigtorpMPMCQueue.h
1 /*
2 Copyright (c) 2017 Erik Rigtorp <erik@rigtorp.se>
3
4 Permission is hereby granted, free of charge, to any person obtaining a copy
5 of this software and associated documentation files (the "Software"), to deal
6 in the Software without restriction, including without limitation the rights
7 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 copies of the Software, and to permit persons to whom the Software is
9 furnished to do so, subject to the following conditions:
10
11 The above copyright notice and this permission notice shall be included in all
12 copies or substantial portions of the Software.
13
14 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20 SOFTWARE.
21  */
22
23 #pragma once
24
25 #include <atomic>
26 #include <cds/details/defs.h>
27 #include <limits>
28 #include <memory>
29 #include <stdexcept>
30
31 namespace rigtorp {
32
33 template <typename T> class MPMCQueue {
34 private:
35   static_assert(std::is_nothrow_copy_assignable<T>::value ||
36                     std::is_nothrow_move_assignable<T>::value,
37                 "T must be nothrow copy or move assignable");
38
39   static_assert(std::is_nothrow_destructible<T>::value,
40                 "T must be nothrow destructible");
41
42 public:
43   explicit MPMCQueue(const size_t capacity)
44       : capacity_(capacity), head_(0), tail_(0) {
45     if (capacity_ < 1) {
46       throw std::invalid_argument("capacity < 1");
47     }
48     size_t space = capacity * sizeof(Slot) + kCacheLineSize - 1;
49     buf_ = malloc(space);
50     if (buf_ == nullptr) {
51       throw std::bad_alloc();
52     }
53     void *buf = buf_;
54     slots_ = reinterpret_cast<Slot *>(
55         std::align(kCacheLineSize, capacity * sizeof(Slot), buf, space));
56     if (slots_ == nullptr) {
57       free(buf_);
58       throw std::bad_alloc();
59     }
60     for (size_t i = 0; i < capacity_; ++i) {
61       new (&slots_[i]) Slot();
62     }
63     static_assert(sizeof(MPMCQueue<T>) % kCacheLineSize == 0,
64                   "MPMCQueue<T> size must be a multiple of cache line size to "
65                   "prevent false sharing between adjacent queues");
66     static_assert(sizeof(Slot) % kCacheLineSize == 0,
67                   "Slot size must be a multiple of cache line size to prevent "
68                   "false sharing between adjacent slots");
69     assert(reinterpret_cast<size_t>(slots_) % kCacheLineSize == 0 &&
70            "slots_ array must be aligned to cache line size to prevent false "
71            "sharing between adjacent slots");
72     assert(reinterpret_cast<char *>(&tail_) -
73                    reinterpret_cast<char *>(&head_) >=
74                kCacheLineSize &&
75            "head and tail must be a cache line apart to prevent false sharing");
76   }
77
78   ~MPMCQueue() noexcept {
79     for (size_t i = 0; i < capacity_; ++i) {
80       slots_[i].~Slot();
81     }
82     free(buf_);
83   }
84
85   // non-copyable and non-movable
86   MPMCQueue(const MPMCQueue &) = delete;
87   MPMCQueue &operator=(const MPMCQueue &) = delete;
88
89   template <typename... Args> void emplace(Args &&... args) noexcept {
90     static_assert(std::is_nothrow_constructible<T, Args &&...>::value,
91                   "T must be nothrow constructible with Args&&...");
92     auto const head = head_.fetch_add(1);
93     auto &slot = slots_[idx(head)];
94     while (turn(head) * 2 != slot.turn.load(std::memory_order_acquire))
95       ;
96     slot.construct(std::forward<Args>(args)...);
97     slot.turn.store(turn(head) * 2 + 1, std::memory_order_release);
98   }
99
100   template <typename... Args> bool try_emplace(Args &&... args) noexcept {
101     static_assert(std::is_nothrow_constructible<T, Args &&...>::value,
102                   "T must be nothrow constructible with Args&&...");
103     auto head = head_.load(std::memory_order_acquire);
104     for (;;) {
105       auto &slot = slots_[idx(head)];
106       if (turn(head) * 2 == slot.turn.load(std::memory_order_acquire)) {
107         if (head_.compare_exchange_strong(head, head + 1)) {
108           slot.construct(std::forward<Args>(args)...);
109           slot.turn.store(turn(head) * 2 + 1, std::memory_order_release);
110           return true;
111         }
112       } else {
113         auto const prevHead = head;
114         head = head_.load(std::memory_order_acquire);
115         if (head == prevHead) {
116           return false;
117         }
118       }
119     }
120   }
121
122   void push(const T &v) noexcept {
123     static_assert(std::is_nothrow_copy_constructible<T>::value,
124                   "T must be nothrow copy constructible");
125     emplace(v);
126   }
127
128   template <typename P,
129             typename = typename std::enable_if<
130                 std::is_nothrow_constructible<T, P &&>::value>::type>
131   void push(P &&v) noexcept {
132     emplace(std::forward<P>(v));
133   }
134
135   bool try_push(const T &v) noexcept {
136     static_assert(std::is_nothrow_copy_constructible<T>::value,
137                   "T must be nothrow copy constructible");
138     return try_emplace(v);
139   }
140
141   template <typename P,
142             typename = typename std::enable_if<
143                 std::is_nothrow_constructible<T, P &&>::value>::type>
144   bool try_push(P &&v) noexcept {
145     return try_emplace(std::forward<P>(v));
146   }
147
148   void pop(T &v) noexcept {
149     auto const tail = tail_.fetch_add(1);
150     auto &slot = slots_[idx(tail)];
151     while (turn(tail) * 2 + 1 != slot.turn.load(std::memory_order_acquire))
152       ;
153     v = slot.move();
154     slot.destroy();
155     slot.turn.store(turn(tail) * 2 + 2, std::memory_order_release);
156   }
157
158   bool try_pop(T &v) noexcept {
159     auto tail = tail_.load(std::memory_order_acquire);
160     for (;;) {
161       auto &slot = slots_[idx(tail)];
162       if (turn(tail) * 2 + 1 == slot.turn.load(std::memory_order_acquire)) {
163         if (tail_.compare_exchange_strong(tail, tail + 1)) {
164           v = slot.move();
165           slot.destroy();
166           slot.turn.store(turn(tail) * 2 + 2, std::memory_order_release);
167           return true;
168         }
169       } else {
170         auto const prevTail = tail;
171         tail = tail_.load(std::memory_order_acquire);
172         if (tail == prevTail) {
173           return false;
174         }
175       }
176     }
177   }
178
179 private:
180   constexpr size_t idx(size_t i) const noexcept { return i % capacity_; }
181
182   constexpr size_t turn(size_t i) const noexcept { return i / capacity_; }
183
184   static constexpr size_t kCacheLineSize = 128;
185
186   struct Slot {
187     ~Slot() noexcept {
188       if (turn & 1) {
189         destroy();
190       }
191     }
192
193     template <typename... Args> void construct(Args &&... args) noexcept {
194       static_assert(std::is_nothrow_constructible<T, Args &&...>::value,
195                     "T must be nothrow constructible with Args&&...");
196       new (&storage) T(std::forward<Args>(args)...);
197     }
198
199     void destroy() noexcept {
200       static_assert(std::is_nothrow_destructible<T>::value,
201                     "T must be nothrow destructible");
202       reinterpret_cast<T *>(&storage)->~T();
203     }
204
205     T &&move() noexcept { return reinterpret_cast<T &&>(storage); }
206
207     // Align to avoid false sharing between adjacent slots
208     alignas(kCacheLineSize) std::atomic<size_t> turn = {0};
209     typename std::aligned_storage<sizeof(T), alignof(T)>::type storage;
210   };
211
212 private:
213   const size_t capacity_;
214   Slot *slots_;
215   void *buf_;
216
217   // Align to avoid false sharing between head_ and tail_
218   alignas(kCacheLineSize) std::atomic<size_t> head_;
219   alignas(kCacheLineSize) std::atomic<size_t> tail_;
220 };
221 }