From 2ffdfa60c10505851c105ef69a05251e0b73b8a5 Mon Sep 17 00:00:00 2001 From: Peizhao Ou Date: Fri, 26 Jan 2018 11:47:20 -0800 Subject: [PATCH] Adds RigtorpSPSC benchmark --- cds/misc/RigtorpSPSCQueue.h | 170 ++++++++++++++++++++++++++++++++++++ 1 file changed, 170 insertions(+) create mode 100644 cds/misc/RigtorpSPSCQueue.h diff --git a/cds/misc/RigtorpSPSCQueue.h b/cds/misc/RigtorpSPSCQueue.h new file mode 100644 index 00000000..60fc2f5d --- /dev/null +++ b/cds/misc/RigtorpSPSCQueue.h @@ -0,0 +1,170 @@ +/* +Copyright (c) 2017 Erik Rigtorp + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + */ + +#pragma once + +#include +#include +#include +#include + +namespace rigtorp { + +template class SPSCQueue { +public: + explicit SPSCQueue(const size_t capacity) + : capacity_(capacity), + slots_(capacity_ < 2 ? nullptr + : static_cast(operator new[]( + sizeof(T) * (capacity_ + 2 * kPadding)))), + head_(0), tail_(0) { + if (capacity_ < 2) { + throw std::invalid_argument("size < 2"); + } + assert(alignof(SPSCQueue) >= kCacheLineSize); + assert(reinterpret_cast(&tail_) - + reinterpret_cast(&head_) >= + kCacheLineSize); + } + + ~SPSCQueue() { + while (front()) { + pop(); + } + operator delete[](slots_); + } + + // non-copyable and non-movable + SPSCQueue(const SPSCQueue &) = delete; + SPSCQueue &operator=(const SPSCQueue &) = delete; + + template + void emplace(Args &&... args) noexcept( + std::is_nothrow_constructible::value) { + static_assert(std::is_constructible::value, + "T must be constructible with Args&&..."); + auto const head = head_.load(std::memory_order_relaxed); + auto nextHead = head + 1; + if (nextHead == capacity_) { + nextHead = 0; + } + while (nextHead == tail_.load(std::memory_order_acquire)) + ; + new (&slots_[head + kPadding]) T(std::forward(args)...); + head_.store(nextHead, std::memory_order_release); + } + + template + bool try_emplace(Args &&... args) noexcept( + std::is_nothrow_constructible::value) { + static_assert(std::is_constructible::value, + "T must be constructible with Args&&..."); + auto const head = head_.load(std::memory_order_relaxed); + auto nextHead = head + 1; + if (nextHead == capacity_) { + nextHead = 0; + } + if (nextHead == tail_.load(std::memory_order_acquire)) { + return false; + } + new (&slots_[head + kPadding]) T(std::forward(args)...); + head_.store(nextHead, std::memory_order_release); + return true; + } + + void push(const T &v) noexcept(std::is_nothrow_copy_constructible::value) { + static_assert(std::is_copy_constructible::value, + "T must be copy constructible"); + emplace(v); + } + + template ::value>::type> + void push(P &&v) noexcept(std::is_nothrow_constructible::value) { + emplace(std::forward

(v)); + } + + bool + try_push(const T &v) noexcept(std::is_nothrow_copy_constructible::value) { + static_assert(std::is_copy_constructible::value, + "T must be copy constructible"); + return try_emplace(v); + } + + template ::value>::type> + bool try_push(P &&v) noexcept(std::is_nothrow_constructible::value) { + return try_emplace(std::forward

(v)); + } + + T *front() noexcept { + auto const tail = tail_.load(std::memory_order_relaxed); + if (head_.load(std::memory_order_acquire) == tail) { + return nullptr; + } + return &slots_[tail + kPadding]; + } + + void pop() noexcept { + static_assert(std::is_nothrow_destructible::value, + "T must be nothrow destructible"); + auto const tail = tail_.load(std::memory_order_relaxed); + assert(head_.load(std::memory_order_acquire) != tail); + slots_[tail + kPadding].~T(); + auto nextTail = tail + 1; + if (nextTail == capacity_) { + nextTail = 0; + } + tail_.store(nextTail, std::memory_order_release); + } + + size_t size() const noexcept { + ssize_t diff = head_.load(std::memory_order_acquire) - + tail_.load(std::memory_order_acquire); + if (diff < 0) { + diff += capacity_; + } + return diff; + } + + bool empty() const noexcept { return size() == 0; } + + size_t capacity() const noexcept { return capacity_; } + +private: + static constexpr size_t kCacheLineSize = 128; + + // Padding to avoid false sharing between slots_ and adjacent allocations + static constexpr size_t kPadding = (kCacheLineSize - 1) / sizeof(T) + 1; + +private: + const size_t capacity_; + T *const slots_; + + // Align to avoid false sharing between head_ and tail_ + alignas(kCacheLineSize) std::atomic head_; + alignas(kCacheLineSize) std::atomic tail_; + + // Padding to avoid adjacent allocations to share cache line with tail_ + char padding_[kCacheLineSize - sizeof(tail_)]; +}; +} -- 2.34.1