Adds Rigtorp queues parallel test cases
[libcds.git] / test / stress / misc / rigtorp_spsc_driver.cpp
1 #include <cds/misc/RigtorpSPSCQueue.h>
2 #include <cds_test/stress_test.h>
3 #include <cds_test/stress_test_util.h>
4 #include <ctime>
5 #include <iostream>
6
7 using namespace std;
8
9 namespace {
10
11 class RigtorpSPSCQueueTest_Parallel : public cds_test::stress_fixture {
12 protected:
13   static size_t s_nRigtorpSPSCQueuePassCount;
14   static size_t s_nRigtorpSPSCQueueCapacity;
15   static atomic_int producer_num;
16
17   static void SetUpTestCase() {
18     cds_test::config const &cfg = get_config("Misc");
19     GetConfigExpected(RigtorpSPSCQueuePassCount, 1000000);
20     GetConfigExpected(RigtorpSPSCQueueCapacity, 1000000);
21   }
22
23   template <typename Queue>
24   static void run_producer(Queue *q, size_t enqueue_count) {
25     for (size_t i = 0; i < enqueue_count; i++) {
26       size_t elem_to_push = rand(enqueue_count);
27       if (!elem_to_push) {
28         elem_to_push++;
29       }
30       q->push(elem_to_push);
31     }
32     producer_num.fetch_sub(1, std::memory_order_release);
33   }
34
35         template <typename Queue>
36         static void run_consumer(Queue *q) {
37     size_t dequeue_sum = 0;
38     while (true) {
39       size_t *res = nullptr;
40       while ((res = q->front())) {
41         dequeue_sum += *res;
42         q->pop();
43       }
44       if (producer_num.load(std::memory_order_acquire) == 0) {
45         while ((res = q->front())) {
46           dequeue_sum += *res;
47           q->pop();
48         }
49         break;
50       }
51     }
52     EXPECT_GT(dequeue_sum, 0);
53   }
54
55   template <typename Queue>
56   static void RigtorpSPSCThreading(Queue *q, size_t producer_pass_count) {
57     producer_num.store(1, std::memory_order_relaxed);
58     size_t total_thread_cnt = 2;
59     std::unique_ptr<std::thread[]> threads(new std::thread[total_thread_cnt]);
60     threads[0] = std::thread(run_producer<Queue>, q, producer_pass_count);
61     threads[1] = std::thread(run_consumer<Queue>, q);
62     for (size_t i = 0; i < total_thread_cnt; i++) {
63       threads[i].join();
64     }
65   }
66 };
67
68 atomic_int RigtorpSPSCQueueTest_Parallel::producer_num;
69 size_t RigtorpSPSCQueueTest_Parallel::s_nRigtorpSPSCQueuePassCount;
70 size_t RigtorpSPSCQueueTest_Parallel::s_nRigtorpSPSCQueueCapacity;
71
72 TEST_F(RigtorpSPSCQueueTest_Parallel, PushPop) {
73   std::unique_ptr<rigtorp::SPSCQueue<size_t>> q(
74       new rigtorp::SPSCQueue<size_t>(s_nRigtorpSPSCQueueCapacity));
75   RigtorpSPSCThreading(q.get(), s_nRigtorpSPSCQueuePassCount);
76 }
77
78 } // namespace