From: Peizhao Ou Date: Sat, 17 Feb 2018 09:17:58 +0000 (-0800) Subject: Adds Rigtorp queues parallel test cases X-Git-Url: http://plrg.eecs.uci.edu/git/?p=libcds.git;a=commitdiff_plain;h=512b79262de6a6e7c35619e09a4d5283b0a32da1 Adds Rigtorp queues parallel test cases --- diff --git a/test/stress/misc/CMakeLists.txt b/test/stress/misc/CMakeLists.txt index d12a6e84..3a7aedcf 100644 --- a/test/stress/misc/CMakeLists.txt +++ b/test/stress/misc/CMakeLists.txt @@ -2,6 +2,8 @@ set(PACKAGE_NAME stress-misc) set(CDSSTRESS_STACK_SOURCES ../main.cpp + rigtorp_mpmc_driver.cpp + rigtorp_spsc_driver.cpp deque_driver.cpp spinlock_driver.cpp barrier_driver.cpp diff --git a/test/stress/misc/rigtorp_mpmc_driver.cpp b/test/stress/misc/rigtorp_mpmc_driver.cpp new file mode 100644 index 00000000..62075d10 --- /dev/null +++ b/test/stress/misc/rigtorp_mpmc_driver.cpp @@ -0,0 +1,85 @@ +#include +#include +#include +#include +#include + +using namespace std; + +namespace { + +class RigtorpMPMCQueueTest_Parallel : public cds_test::stress_fixture { +protected: + static size_t s_nRigtorpMPMCQueueThreadCount; + static size_t s_nRigtorpMPMCQueuePassCount; + static size_t s_nRigtorpMPMCQueueCapacity; + static atomic_int producer_num; + + static void SetUpTestCase() { + cds_test::config const &cfg = get_config("Misc"); + GetConfigExpected(RigtorpMPMCQueueThreadCount, 4); + GetConfigExpected(RigtorpMPMCQueuePassCount, 10000); + GetConfigExpected(RigtorpMPMCQueueCapacity, 2048); + } + + template + static void run_producer(Queue *q, size_t enqueue_count) { + for (size_t i = 0; i < enqueue_count; i++) { + size_t elem_to_push = rand(enqueue_count); + if (!elem_to_push) { + elem_to_push++; + } + q->push(elem_to_push); + } + producer_num.fetch_sub(1, std::memory_order_release); + } + + template + static void run_consumer(Queue *q) { + size_t dequeue_sum = 0; + while (true) { + size_t res = 0; + if (q->try_pop(res)) { + dequeue_sum += res; + } else if (producer_num.load(std::memory_order_acquire) == 0) { + while (q->try_pop(res)) { + dequeue_sum += res; + } + break; + } + } + EXPECT_GT(dequeue_sum, 0); + } + + template + static void RigtorpMPMCThreading(Queue *q, size_t producer_pass_count) { + size_t producer_cnt = s_nRigtorpMPMCQueueThreadCount / 2; + size_t consumer_cnt = s_nRigtorpMPMCQueueThreadCount - producer_cnt; + producer_num.store(producer_cnt, std::memory_order_relaxed); + std::unique_ptr threads( + new std::thread[s_nRigtorpMPMCQueueThreadCount]); + for (size_t i = 0; i < s_nRigtorpMPMCQueueThreadCount; i++) { + if (i < producer_cnt) { + threads[i] = std::thread(run_producer, q, producer_pass_count); + } else { + threads[i] = std::thread(run_consumer, q); + } + } + for (size_t i = 0; i < s_nRigtorpMPMCQueueThreadCount; i++) { + threads[i].join(); + } + } +}; + +atomic_int RigtorpMPMCQueueTest_Parallel::producer_num; +size_t RigtorpMPMCQueueTest_Parallel::s_nRigtorpMPMCQueueThreadCount; +size_t RigtorpMPMCQueueTest_Parallel::s_nRigtorpMPMCQueuePassCount; +size_t RigtorpMPMCQueueTest_Parallel::s_nRigtorpMPMCQueueCapacity; + +TEST_F(RigtorpMPMCQueueTest_Parallel, PushPop) { + std::unique_ptr> q( + new rigtorp::MPMCQueue(s_nRigtorpMPMCQueueCapacity)); + RigtorpMPMCThreading(q.get(), s_nRigtorpMPMCQueuePassCount); +} + +} // namespace diff --git a/test/stress/misc/rigtorp_spsc_driver.cpp b/test/stress/misc/rigtorp_spsc_driver.cpp new file mode 100644 index 00000000..ade99941 --- /dev/null +++ b/test/stress/misc/rigtorp_spsc_driver.cpp @@ -0,0 +1,78 @@ +#include +#include +#include +#include +#include + +using namespace std; + +namespace { + +class RigtorpSPSCQueueTest_Parallel : public cds_test::stress_fixture { +protected: + static size_t s_nRigtorpSPSCQueuePassCount; + static size_t s_nRigtorpSPSCQueueCapacity; + static atomic_int producer_num; + + static void SetUpTestCase() { + cds_test::config const &cfg = get_config("Misc"); + GetConfigExpected(RigtorpSPSCQueuePassCount, 1000000); + GetConfigExpected(RigtorpSPSCQueueCapacity, 1000000); + } + + template + static void run_producer(Queue *q, size_t enqueue_count) { + for (size_t i = 0; i < enqueue_count; i++) { + size_t elem_to_push = rand(enqueue_count); + if (!elem_to_push) { + elem_to_push++; + } + q->push(elem_to_push); + } + producer_num.fetch_sub(1, std::memory_order_release); + } + + template + static void run_consumer(Queue *q) { + size_t dequeue_sum = 0; + while (true) { + size_t *res = nullptr; + while ((res = q->front())) { + dequeue_sum += *res; + q->pop(); + } + if (producer_num.load(std::memory_order_acquire) == 0) { + while ((res = q->front())) { + dequeue_sum += *res; + q->pop(); + } + break; + } + } + EXPECT_GT(dequeue_sum, 0); + } + + template + static void RigtorpSPSCThreading(Queue *q, size_t producer_pass_count) { + producer_num.store(1, std::memory_order_relaxed); + size_t total_thread_cnt = 2; + std::unique_ptr threads(new std::thread[total_thread_cnt]); + threads[0] = std::thread(run_producer, q, producer_pass_count); + threads[1] = std::thread(run_consumer, q); + for (size_t i = 0; i < total_thread_cnt; i++) { + threads[i].join(); + } + } +}; + +atomic_int RigtorpSPSCQueueTest_Parallel::producer_num; +size_t RigtorpSPSCQueueTest_Parallel::s_nRigtorpSPSCQueuePassCount; +size_t RigtorpSPSCQueueTest_Parallel::s_nRigtorpSPSCQueueCapacity; + +TEST_F(RigtorpSPSCQueueTest_Parallel, PushPop) { + std::unique_ptr> q( + new rigtorp::SPSCQueue(s_nRigtorpSPSCQueueCapacity)); + RigtorpSPSCThreading(q.get(), s_nRigtorpSPSCQueuePassCount); +} + +} // namespace