Adds Rigtorp queues parallel test cases
authorPeizhao Ou <peizhaoo@uci.edu>
Sat, 17 Feb 2018 09:17:58 +0000 (01:17 -0800)
committerPeizhao Ou <peizhaoo@uci.edu>
Sat, 17 Feb 2018 09:17:58 +0000 (01:17 -0800)
test/stress/misc/CMakeLists.txt
test/stress/misc/rigtorp_mpmc_driver.cpp [new file with mode: 0644]
test/stress/misc/rigtorp_spsc_driver.cpp [new file with mode: 0644]

index d12a6e8..3a7aedc 100644 (file)
@@ -2,6 +2,8 @@ set(PACKAGE_NAME stress-misc)
 
 set(CDSSTRESS_STACK_SOURCES
     ../main.cpp
+    rigtorp_mpmc_driver.cpp
+    rigtorp_spsc_driver.cpp
     deque_driver.cpp
     spinlock_driver.cpp
     barrier_driver.cpp
diff --git a/test/stress/misc/rigtorp_mpmc_driver.cpp b/test/stress/misc/rigtorp_mpmc_driver.cpp
new file mode 100644 (file)
index 0000000..62075d1
--- /dev/null
@@ -0,0 +1,85 @@
+#include <cds/misc/RigtorpMPMCQueue.h>
+#include <cds_test/stress_test.h>
+#include <cds_test/stress_test_util.h>
+#include <ctime>
+#include <iostream>
+
+using namespace std;
+
+namespace {
+
+class RigtorpMPMCQueueTest_Parallel : public cds_test::stress_fixture {
+protected:
+  static size_t s_nRigtorpMPMCQueueThreadCount;
+  static size_t s_nRigtorpMPMCQueuePassCount;
+  static size_t s_nRigtorpMPMCQueueCapacity;
+  static atomic_int producer_num;
+
+  static void SetUpTestCase() {
+    cds_test::config const &cfg = get_config("Misc");
+    GetConfigExpected(RigtorpMPMCQueueThreadCount, 4);
+    GetConfigExpected(RigtorpMPMCQueuePassCount, 10000);
+    GetConfigExpected(RigtorpMPMCQueueCapacity, 2048);
+  }
+
+  template <typename Queue>
+  static void run_producer(Queue *q, size_t enqueue_count) {
+    for (size_t i = 0; i < enqueue_count; i++) {
+      size_t elem_to_push = rand(enqueue_count);
+      if (!elem_to_push) {
+        elem_to_push++;
+      }
+      q->push(elem_to_push);
+    }
+    producer_num.fetch_sub(1, std::memory_order_release);
+  }
+
+       template <typename Queue>
+       static void run_consumer(Queue *q) {
+    size_t dequeue_sum = 0;
+    while (true) {
+      size_t res = 0;
+      if (q->try_pop(res)) {
+        dequeue_sum += res;
+      } else if (producer_num.load(std::memory_order_acquire) == 0) {
+        while (q->try_pop(res)) {
+          dequeue_sum += res;
+        }
+        break;
+      }
+    }
+    EXPECT_GT(dequeue_sum, 0);
+  }
+
+  template <typename Queue>
+  static void RigtorpMPMCThreading(Queue *q, size_t producer_pass_count) {
+    size_t producer_cnt = s_nRigtorpMPMCQueueThreadCount / 2;
+    size_t consumer_cnt = s_nRigtorpMPMCQueueThreadCount - producer_cnt;
+    producer_num.store(producer_cnt, std::memory_order_relaxed);
+    std::unique_ptr<std::thread[]> threads(
+        new std::thread[s_nRigtorpMPMCQueueThreadCount]);
+    for (size_t i = 0; i < s_nRigtorpMPMCQueueThreadCount; i++) {
+      if (i < producer_cnt) {
+        threads[i] = std::thread(run_producer<Queue>, q, producer_pass_count);
+      } else {
+        threads[i] = std::thread(run_consumer<Queue>, q);
+      }
+    }
+    for (size_t i = 0; i < s_nRigtorpMPMCQueueThreadCount; i++) {
+      threads[i].join();
+    }
+  }
+};
+
+atomic_int RigtorpMPMCQueueTest_Parallel::producer_num;
+size_t RigtorpMPMCQueueTest_Parallel::s_nRigtorpMPMCQueueThreadCount;
+size_t RigtorpMPMCQueueTest_Parallel::s_nRigtorpMPMCQueuePassCount;
+size_t RigtorpMPMCQueueTest_Parallel::s_nRigtorpMPMCQueueCapacity;
+
+TEST_F(RigtorpMPMCQueueTest_Parallel, PushPop) {
+  std::unique_ptr<rigtorp::MPMCQueue<size_t>> q(
+      new rigtorp::MPMCQueue<size_t>(s_nRigtorpMPMCQueueCapacity));
+  RigtorpMPMCThreading(q.get(), s_nRigtorpMPMCQueuePassCount);
+}
+
+} // namespace
diff --git a/test/stress/misc/rigtorp_spsc_driver.cpp b/test/stress/misc/rigtorp_spsc_driver.cpp
new file mode 100644 (file)
index 0000000..ade9994
--- /dev/null
@@ -0,0 +1,78 @@
+#include <cds/misc/RigtorpSPSCQueue.h>
+#include <cds_test/stress_test.h>
+#include <cds_test/stress_test_util.h>
+#include <ctime>
+#include <iostream>
+
+using namespace std;
+
+namespace {
+
+class RigtorpSPSCQueueTest_Parallel : public cds_test::stress_fixture {
+protected:
+  static size_t s_nRigtorpSPSCQueuePassCount;
+  static size_t s_nRigtorpSPSCQueueCapacity;
+  static atomic_int producer_num;
+
+  static void SetUpTestCase() {
+    cds_test::config const &cfg = get_config("Misc");
+    GetConfigExpected(RigtorpSPSCQueuePassCount, 1000000);
+    GetConfigExpected(RigtorpSPSCQueueCapacity, 1000000);
+  }
+
+  template <typename Queue>
+  static void run_producer(Queue *q, size_t enqueue_count) {
+    for (size_t i = 0; i < enqueue_count; i++) {
+      size_t elem_to_push = rand(enqueue_count);
+      if (!elem_to_push) {
+        elem_to_push++;
+      }
+      q->push(elem_to_push);
+    }
+    producer_num.fetch_sub(1, std::memory_order_release);
+  }
+
+       template <typename Queue>
+       static void run_consumer(Queue *q) {
+    size_t dequeue_sum = 0;
+    while (true) {
+      size_t *res = nullptr;
+      while ((res = q->front())) {
+        dequeue_sum += *res;
+        q->pop();
+      }
+      if (producer_num.load(std::memory_order_acquire) == 0) {
+        while ((res = q->front())) {
+          dequeue_sum += *res;
+          q->pop();
+        }
+        break;
+      }
+    }
+    EXPECT_GT(dequeue_sum, 0);
+  }
+
+  template <typename Queue>
+  static void RigtorpSPSCThreading(Queue *q, size_t producer_pass_count) {
+    producer_num.store(1, std::memory_order_relaxed);
+    size_t total_thread_cnt = 2;
+    std::unique_ptr<std::thread[]> threads(new std::thread[total_thread_cnt]);
+    threads[0] = std::thread(run_producer<Queue>, q, producer_pass_count);
+    threads[1] = std::thread(run_consumer<Queue>, q);
+    for (size_t i = 0; i < total_thread_cnt; i++) {
+      threads[i].join();
+    }
+  }
+};
+
+atomic_int RigtorpSPSCQueueTest_Parallel::producer_num;
+size_t RigtorpSPSCQueueTest_Parallel::s_nRigtorpSPSCQueuePassCount;
+size_t RigtorpSPSCQueueTest_Parallel::s_nRigtorpSPSCQueueCapacity;
+
+TEST_F(RigtorpSPSCQueueTest_Parallel, PushPop) {
+  std::unique_ptr<rigtorp::SPSCQueue<size_t>> q(
+      new rigtorp::SPSCQueue<size_t>(s_nRigtorpSPSCQueueCapacity));
+  RigtorpSPSCThreading(q.get(), s_nRigtorpSPSCQueuePassCount);
+}
+
+} // namespace