Adds RigtorpSPSC benchmark
authorPeizhao Ou <peizhaoo@uci.edu>
Fri, 26 Jan 2018 19:47:20 +0000 (11:47 -0800)
committerPeizhao Ou <peizhaoo@uci.edu>
Fri, 26 Jan 2018 19:47:20 +0000 (11:47 -0800)
cds/misc/RigtorpSPSCQueue.h [new file with mode: 0644]

diff --git a/cds/misc/RigtorpSPSCQueue.h b/cds/misc/RigtorpSPSCQueue.h
new file mode 100644 (file)
index 0000000..60fc2f5
--- /dev/null
@@ -0,0 +1,170 @@
+/*
+Copyright (c) 2017 Erik Rigtorp <erik@rigtorp.se>
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+ */
+
+#pragma once
+
+#include <atomic>
+#include <cassert>
+#include <stdexcept>
+#include <type_traits>
+
+namespace rigtorp {
+
+template <typename T> class SPSCQueue {
+public:
+  explicit SPSCQueue(const size_t capacity)
+      : capacity_(capacity),
+        slots_(capacity_ < 2 ? nullptr
+                             : static_cast<T *>(operator new[](
+                                   sizeof(T) * (capacity_ + 2 * kPadding)))),
+        head_(0), tail_(0) {
+    if (capacity_ < 2) {
+      throw std::invalid_argument("size < 2");
+    }
+    assert(alignof(SPSCQueue<T>) >= kCacheLineSize);
+    assert(reinterpret_cast<char *>(&tail_) -
+               reinterpret_cast<char *>(&head_) >=
+           kCacheLineSize);
+  }
+
+  ~SPSCQueue() {
+    while (front()) {
+      pop();
+    }
+    operator delete[](slots_);
+  }
+
+  // non-copyable and non-movable
+  SPSCQueue(const SPSCQueue &) = delete;
+  SPSCQueue &operator=(const SPSCQueue &) = delete;
+
+  template <typename... Args>
+  void emplace(Args &&... args) noexcept(
+      std::is_nothrow_constructible<T, Args &&...>::value) {
+    static_assert(std::is_constructible<T, Args &&...>::value,
+                  "T must be constructible with Args&&...");
+    auto const head = head_.load(std::memory_order_relaxed);
+    auto nextHead = head + 1;
+    if (nextHead == capacity_) {
+      nextHead = 0;
+    }
+    while (nextHead == tail_.load(std::memory_order_acquire))
+      ;
+    new (&slots_[head + kPadding]) T(std::forward<Args>(args)...);
+    head_.store(nextHead, std::memory_order_release);
+  }
+
+  template <typename... Args>
+  bool try_emplace(Args &&... args) noexcept(
+      std::is_nothrow_constructible<T, Args &&...>::value) {
+    static_assert(std::is_constructible<T, Args &&...>::value,
+                  "T must be constructible with Args&&...");
+    auto const head = head_.load(std::memory_order_relaxed);
+    auto nextHead = head + 1;
+    if (nextHead == capacity_) {
+      nextHead = 0;
+    }
+    if (nextHead == tail_.load(std::memory_order_acquire)) {
+      return false;
+    }
+    new (&slots_[head + kPadding]) T(std::forward<Args>(args)...);
+    head_.store(nextHead, std::memory_order_release);
+    return true;
+  }
+
+  void push(const T &v) noexcept(std::is_nothrow_copy_constructible<T>::value) {
+    static_assert(std::is_copy_constructible<T>::value,
+                  "T must be copy constructible");
+    emplace(v);
+  }
+
+  template <typename P, typename = typename std::enable_if<
+                            std::is_constructible<T, P &&>::value>::type>
+  void push(P &&v) noexcept(std::is_nothrow_constructible<T, P &&>::value) {
+    emplace(std::forward<P>(v));
+  }
+
+  bool
+  try_push(const T &v) noexcept(std::is_nothrow_copy_constructible<T>::value) {
+    static_assert(std::is_copy_constructible<T>::value,
+                  "T must be copy constructible");
+    return try_emplace(v);
+  }
+
+  template <typename P, typename = typename std::enable_if<
+                            std::is_constructible<T, P &&>::value>::type>
+  bool try_push(P &&v) noexcept(std::is_nothrow_constructible<T, P &&>::value) {
+    return try_emplace(std::forward<P>(v));
+  }
+
+  T *front() noexcept {
+    auto const tail = tail_.load(std::memory_order_relaxed);
+    if (head_.load(std::memory_order_acquire) == tail) {
+      return nullptr;
+    }
+    return &slots_[tail + kPadding];
+  }
+
+  void pop() noexcept {
+    static_assert(std::is_nothrow_destructible<T>::value,
+                  "T must be nothrow destructible");
+    auto const tail = tail_.load(std::memory_order_relaxed);
+    assert(head_.load(std::memory_order_acquire) != tail);
+    slots_[tail + kPadding].~T();
+    auto nextTail = tail + 1;
+    if (nextTail == capacity_) {
+      nextTail = 0;
+    }
+    tail_.store(nextTail, std::memory_order_release);
+  }
+
+  size_t size() const noexcept {
+    ssize_t diff = head_.load(std::memory_order_acquire) -
+                   tail_.load(std::memory_order_acquire);
+    if (diff < 0) {
+      diff += capacity_;
+    }
+    return diff;
+  }
+
+  bool empty() const noexcept { return size() == 0; }
+
+  size_t capacity() const noexcept { return capacity_; }
+
+private:
+  static constexpr size_t kCacheLineSize = 128;
+
+  // Padding to avoid false sharing between slots_ and adjacent allocations
+  static constexpr size_t kPadding = (kCacheLineSize - 1) / sizeof(T) + 1;
+
+private:
+  const size_t capacity_;
+  T *const slots_;
+
+  // Align to avoid false sharing between head_ and tail_
+  alignas(kCacheLineSize) std::atomic<size_t> head_;
+  alignas(kCacheLineSize) std::atomic<size_t> tail_;
+
+  // Padding to avoid adjacent allocations to share cache line with tail_
+  char padding_[kCacheLineSize - sizeof(tail_)];
+};
+}