1 #include <cds/misc/RigtorpSPSCQueue.h>
2 #include <cds_test/stress_test.h>
3 #include <cds_test/stress_test_util.h>
11 class RigtorpSPSCQueueTest_Parallel : public cds_test::stress_fixture {
13 static size_t s_nRigtorpSPSCQueuePassCount;
14 static size_t s_nRigtorpSPSCQueueCapacity;
15 static atomic_int producer_num;
17 static void SetUpTestCase() {
18 cds_test::config const &cfg = get_config("Misc");
19 GetConfigExpected(RigtorpSPSCQueuePassCount, 1000000);
20 GetConfigExpected(RigtorpSPSCQueueCapacity, 1000000);
23 template <typename Queue>
24 static void run_producer(Queue *q, size_t enqueue_count) {
25 for (size_t i = 0; i < enqueue_count; i++) {
26 size_t elem_to_push = rand(enqueue_count);
30 q->push(elem_to_push);
32 producer_num.fetch_sub(1, std::memory_order_release);
35 template <typename Queue>
36 static void run_consumer(Queue *q) {
37 size_t dequeue_sum = 0;
39 size_t *res = nullptr;
40 while ((res = q->front())) {
44 if (producer_num.load(std::memory_order_acquire) == 0) {
45 while ((res = q->front())) {
52 EXPECT_GT(dequeue_sum, 0);
55 template <typename Queue>
56 static void RigtorpSPSCThreading(Queue *q, size_t producer_pass_count) {
57 producer_num.store(1, std::memory_order_relaxed);
58 size_t total_thread_cnt = 2;
59 std::unique_ptr<std::thread[]> threads(new std::thread[total_thread_cnt]);
60 threads[0] = std::thread(run_producer<Queue>, q, producer_pass_count);
61 threads[1] = std::thread(run_consumer<Queue>, q);
62 for (size_t i = 0; i < total_thread_cnt; i++) {
68 atomic_int RigtorpSPSCQueueTest_Parallel::producer_num;
69 size_t RigtorpSPSCQueueTest_Parallel::s_nRigtorpSPSCQueuePassCount;
70 size_t RigtorpSPSCQueueTest_Parallel::s_nRigtorpSPSCQueueCapacity;
72 TEST_F(RigtorpSPSCQueueTest_Parallel, PushPop) {
73 std::unique_ptr<rigtorp::SPSCQueue<size_t>> q(
74 new rigtorp::SPSCQueue<size_t>(s_nRigtorpSPSCQueueCapacity));
75 RigtorpSPSCThreading(q.get(), s_nRigtorpSPSCQueuePassCount);