Allow 2 threads
[libcds.git] / test / stress / misc / rigtorp_mpmc_driver.cpp
1 #include <cds/misc/RigtorpMPMCQueue.h>
2 #include <cds_test/stress_test.h>
3 #include <cds_test/stress_test_util.h>
4 #include <ctime>
5 #include <iostream>
6
7 using namespace std;
8
9 namespace {
10
11 class RigtorpMPMCQueueTest_Parallel : public cds_test::stress_fixture {
12 protected:
13   static size_t s_nRigtorpMPMCQueueThreadCount;
14   static size_t s_nRigtorpMPMCQueuePassCount;
15   static size_t s_nRigtorpMPMCQueueCapacity;
16   static atomic_int producer_num;
17
18   static void SetUpTestCase() {
19     cds_test::config const &cfg = get_config("Misc");
20     GetConfigExpected(RigtorpMPMCQueueThreadCount, 2);
21     GetConfigExpected(RigtorpMPMCQueuePassCount, 10000);
22     GetConfigExpected(RigtorpMPMCQueueCapacity, 2048);
23   }
24
25   template <typename Queue>
26   static void run_producer(Queue *q, size_t enqueue_count) {
27     for (size_t i = 0; i < enqueue_count; i++) {
28       size_t elem_to_push = rand(enqueue_count);
29       if (!elem_to_push) {
30         elem_to_push++;
31       }
32       q->push(elem_to_push);
33     }
34     producer_num.fetch_sub(1, std::memory_order_release);
35   }
36
37         template <typename Queue>
38         static void run_consumer(Queue *q) {
39     size_t dequeue_sum = 0;
40     while (true) {
41       size_t res = 0;
42       if (q->try_pop(res)) {
43         dequeue_sum += res;
44       } else if (producer_num.load(std::memory_order_acquire) == 0) {
45         while (q->try_pop(res)) {
46           dequeue_sum += res;
47         }
48         break;
49       }
50     }
51     EXPECT_GT(dequeue_sum, 0);
52   }
53
54   template <typename Queue>
55   static void RigtorpMPMCThreading(Queue *q, size_t producer_pass_count) {
56     size_t producer_cnt = s_nRigtorpMPMCQueueThreadCount / 2;
57     size_t consumer_cnt = s_nRigtorpMPMCQueueThreadCount - producer_cnt;
58     producer_num.store(producer_cnt, std::memory_order_relaxed);
59     std::unique_ptr<std::thread[]> threads(
60         new std::thread[s_nRigtorpMPMCQueueThreadCount]);
61     for (size_t i = 0; i < s_nRigtorpMPMCQueueThreadCount; i++) {
62       if (i < producer_cnt) {
63         threads[i] = std::thread(run_producer<Queue>, q, producer_pass_count);
64       } else {
65         threads[i] = std::thread(run_consumer<Queue>, q);
66       }
67     }
68     for (size_t i = 0; i < s_nRigtorpMPMCQueueThreadCount; i++) {
69       threads[i].join();
70     }
71   }
72 };
73
74 atomic_int RigtorpMPMCQueueTest_Parallel::producer_num;
75 size_t RigtorpMPMCQueueTest_Parallel::s_nRigtorpMPMCQueueThreadCount;
76 size_t RigtorpMPMCQueueTest_Parallel::s_nRigtorpMPMCQueuePassCount;
77 size_t RigtorpMPMCQueueTest_Parallel::s_nRigtorpMPMCQueueCapacity;
78
79 TEST_F(RigtorpMPMCQueueTest_Parallel, PushPop) {
80   std::unique_ptr<rigtorp::MPMCQueue<size_t>> q(
81       new rigtorp::MPMCQueue<size_t>(s_nRigtorpMPMCQueueCapacity));
82   RigtorpMPMCThreading(q.get(), s_nRigtorpMPMCQueuePassCount);
83 }
84
85 } // namespace