1 #include <cds/misc/RigtorpMPMCQueue.h>
2 #include <cds_test/stress_test.h>
3 #include <cds_test/stress_test_util.h>
// Stress-test fixture that drives a queue from concurrent producer and
// consumer threads; built on cds_test::stress_fixture.
11 class RigtorpMPMCQueueTest_Parallel : public cds_test::stress_fixture {
// Total number of worker threads; RigtorpMPMCThreading() splits it between
// producers and consumers.
13 static size_t s_nRigtorpMPMCQueueThreadCount;
// Handed to RigtorpMPMCThreading() by the PushPop test as the per-producer
// push count.
14 static size_t s_nRigtorpMPMCQueuePassCount;
// Handed to the rigtorp::MPMCQueue constructor by the PushPop test.
15 static size_t s_nRigtorpMPMCQueueCapacity;
// Set to the producer count before the threads start; each producer
// decrements it and consumers poll it for zero.
16 static atomic_int producer_num;
// Test-case setup: fetches the "Misc" configuration section and initializes
// the three queue test parameters from it.
// NOTE(review): presumably GetConfigExpected fills s_n<Name> from cfg and uses
// the second argument as the fallback value - confirm against its definition.
18 static void SetUpTestCase() {
19 cds_test::config const &cfg = get_config("Misc");
20 GetConfigExpected(RigtorpMPMCQueueThreadCount, 2);
21 GetConfigExpected(RigtorpMPMCQueuePassCount, 10000);
22 GetConfigExpected(RigtorpMPMCQueueCapacity, 2048);
// Producer thread body.
// Pushes one value into *q on each of enqueue_count iterations and decrements
// producer_num by one (presumably once, after the loop - confirm).
//   q             - queue under test; must provide push().
//   enqueue_count - number of loop iterations, also passed to rand().
25 template <typename Queue>
26 static void run_producer(Queue *q, size_t enqueue_count) {
27 for (size_t i = 0; i < enqueue_count; i++) {
// NOTE(review): rand() takes an argument, so it is not std::rand; presumably a
// fixture helper returning a value below enqueue_count - confirm.
28 size_t elem_to_push = rand(enqueue_count);
// NOTE(review): push() presumably waits while the queue is full - confirm.
32 q->push(elem_to_push);
// Release ordering pairs with the acquire load of producer_num in
// run_consumer().
34 producer_num.fetch_sub(1, std::memory_order_release);
// Consumer thread body.
// Pops elements from *q with try_pop() and finally expects dequeue_sum to be
// greater than zero (dequeue_sum presumably accumulates the popped values -
// confirm).
//   q - queue under test; must provide try_pop().
37 template <typename Queue>
38 static void run_consumer(Queue *q) {
39 size_t dequeue_sum = 0;
// try_pop()'s result is used as "an element was obtained".
42 if (q->try_pop(res)) {
// Nothing popped: check whether every producer has already decremented
// producer_num. The acquire load pairs with the release fetch_sub in
// run_producer().
44 } else if (producer_num.load(std::memory_order_acquire) == 0) {
// Producers are done: drain whatever is still queued.
45 while (q->try_pop(res)) {
// NOTE(review): this expectation fails for a consumer that pops nothing (other
// consumers may take every element) or only zero values - confirm this cannot
// make the test flaky.
51 EXPECT_GT(dequeue_sum, 0);
// Starts s_nRigtorpMPMCQueueThreadCount threads against *q: the first half
// (rounded down) run run_producer(), the rest run run_consumer().
//   q                   - queue shared by all threads.
//   producer_pass_count - enqueue_count handed to every producer.
54 template <typename Queue>
55 static void RigtorpMPMCThreading(Queue *q, size_t producer_pass_count) {
// Integer division: an odd thread count gives the extra thread to the
// consumers, and a thread count of 1 yields no producers at all.
56 size_t producer_cnt = s_nRigtorpMPMCQueueThreadCount / 2;
57 size_t consumer_cnt = s_nRigtorpMPMCQueueThreadCount - producer_cnt;
// Set the producer counter before any thread is started.
58 producer_num.store(producer_cnt, std::memory_order_relaxed);
// Array of default-constructed (not yet running) threads, owned by unique_ptr.
59 std::unique_ptr<std::thread[]> threads(
60 new std::thread[s_nRigtorpMPMCQueueThreadCount]);
61 for (size_t i = 0; i < s_nRigtorpMPMCQueueThreadCount; i++) {
// Indices below producer_cnt become producers.
62 if (i < producer_cnt) {
63 threads[i] = std::thread(run_producer<Queue>, q, producer_pass_count);
65 threads[i] = std::thread(run_consumer<Queue>, q);
// NOTE(review): second pass over all threads - presumably joins them; confirm.
68 for (size_t i = 0; i < s_nRigtorpMPMCQueueThreadCount; i++) {
// Out-of-class definitions of the fixture's static data members.
74 atomic_int RigtorpMPMCQueueTest_Parallel::producer_num;
75 size_t RigtorpMPMCQueueTest_Parallel::s_nRigtorpMPMCQueueThreadCount;
76 size_t RigtorpMPMCQueueTest_Parallel::s_nRigtorpMPMCQueuePassCount;
77 size_t RigtorpMPMCQueueTest_Parallel::s_nRigtorpMPMCQueueCapacity;
// Runs the producer/consumer scenario on a heap-allocated
// rigtorp::MPMCQueue<size_t> constructed with s_nRigtorpMPMCQueueCapacity,
// passing s_nRigtorpMPMCQueuePassCount as the per-producer push count.
79 TEST_F(RigtorpMPMCQueueTest_Parallel, PushPop) {
80 std::unique_ptr<rigtorp::MPMCQueue<size_t>> q(
81 new rigtorp::MPMCQueue<size_t>(s_nRigtorpMPMCQueueCapacity));
82 RigtorpMPMCThreading(q.get(), s_nRigtorpMPMCQueuePassCount);