From c2f542090dfff67b49506a72e69858a482578db2 Mon Sep 17 00:00:00 2001
From: Peizhao Ou
Date: Mon, 12 Feb 2018 22:15:15 -0800
Subject: [PATCH] Adds folly queue parallel test cases

---
 .../stress-parallel-folly-queue.cpp | 358 ++++++++++--------
 .../stress-parallel-folly-sync.cpp  |  29 +-
 2 files changed, 221 insertions(+), 166 deletions(-)

diff --git a/folly/stress-test/stress-parallel-folly-queue.cpp b/folly/stress-test/stress-parallel-folly-queue.cpp
index 115c5231..ba6ee797 100644
--- a/folly/stress-test/stress-parallel-folly-queue.cpp
+++ b/folly/stress-test/stress-parallel-folly-queue.cpp
@@ -1,189 +1,247 @@
-#include
-#include
-#include
-#include
-
-#include
-
-#include
-
-namespace {
-
-// Unbounded queue
-size_t kUnboundedQueueEnqueueStride = 10000;
-size_t kUSPSCQueueEnqueueCount = 1200000000;
-size_t kUMPSCQueueEnqueueCount = 320000000;
-size_t kUSPMCQueueEnqueueCount = 320000000;
-size_t kUMPMCQueueEnqueueCount = 320000000;
-
-typedef folly::USPSCQueue USPSCQueue;
-typedef folly::UMPSCQueue UMPSCQueue;
-typedef folly::USPMCQueue USPMCQueue;
-typedef folly::UMPMCQueue UMPMCQueue;
-
-// Dynamic bounded queue
-size_t kDynamicBoundedQueueEnqueueStride = 50000;
-size_t kDynamicBoundedQueueCapacity = 200000;
-size_t kDSPSCQueueEnqueueCount = 1200000000;
-size_t kDMPSCQueueEnqueueCount = 320000000;
-size_t kDSPMCQueueEnqueueCount = 320000000;
-size_t kDMPMCQueueEnqueueCount = 320000000;
-
-typedef folly::DSPSCQueue DSPSCQueue;
-typedef folly::DMPSCQueue DMPSCQueue;
-typedef folly::DSPMCQueue DSPMCQueue;
-typedef folly::DMPMCQueue DMPMCQueue;
+#include "queue_test.h"
+
+namespace folly_test {
+
+class FollyQueueEnqueueDequeueTest_Parallel : public cds_test::stress_fixture {
+protected:
+  static std::atomic_int producer_num;
+
+  // The milliseconds a consumer waits after a failed try_dequeue.
+  static unsigned s_nConsumerWaitTime;
+  // For MPMC, half of the threads are producers and the rest are consumers.
+  static unsigned s_nThreadCount;
+  // Unbounded queue
+  static size_t s_nUnboundedQueueEnqueueStride;
+  static size_t s_nUSPSCQueueEnqueueCount;
+  static size_t s_nUMPSCQueueEnqueueCount;
+  static size_t s_nUSPMCQueueEnqueueCount;
+  static size_t s_nUMPMCQueueEnqueueCount;
+
+  // Dynamic bounded queue
+  static size_t s_nDynamicBoundedQueueEnqueueStride;
+  static size_t s_nDynamicBoundedQueueCapacity;
+  static size_t s_nDSPSCQueueEnqueueCount;
+  static size_t s_nDMPSCQueueEnqueueCount;
+  static size_t s_nDSPMCQueueEnqueueCount;
+  static size_t s_nDMPMCQueueEnqueueCount;
+
+  // AtomicLinkedList
+  static size_t s_nAtomicLinkedListPassCount;
+
+  // MPMC Queue (linearizable)
+  static size_t s_nMPMCQueueEnqueueStride;
+  static size_t s_nMPMCQueueCapacity;
+  static size_t s_nMPMCQueueEnqueueCount;
+
+  static void SetUpTestCase() {
+    const cds_test::config &cfg = get_config("ParallelFollyQueue");
+    GetConfigNonZeroExpected(ConsumerWaitTime, 200);
+    GetConfigNonZeroExpected(ThreadCount, 4);
+    // Unbounded queue
+    GetConfigNonZeroExpected(UnboundedQueueEnqueueStride, 10000);
+    GetConfigNonZeroExpected(USPSCQueueEnqueueCount, 1200000000);
+    GetConfigNonZeroExpected(UMPSCQueueEnqueueCount, 320000000);
+    GetConfigNonZeroExpected(USPMCQueueEnqueueCount, 320000000);
+    GetConfigNonZeroExpected(UMPMCQueueEnqueueCount, 320000000);
+    // Dynamic bounded queue
+    GetConfigNonZeroExpected(DynamicBoundedQueueEnqueueStride, 50000);
+    GetConfigNonZeroExpected(DynamicBoundedQueueCapacity, 200000);
+    GetConfigNonZeroExpected(DSPSCQueueEnqueueCount, 1200000000);
+    GetConfigNonZeroExpected(DMPSCQueueEnqueueCount, 320000000);
+    GetConfigNonZeroExpected(DSPMCQueueEnqueueCount, 320000000);
+    GetConfigNonZeroExpected(DMPMCQueueEnqueueCount, 320000000);
+    // AtomicLinkedList
+    GetConfigNonZeroExpected(AtomicLinkedListPassCount, 10000);
+    // MPMC Queue (linearizable)
+    GetConfigNonZeroExpected(MPMCQueueEnqueueStride, 10000);
+    GetConfigNonZeroExpected(MPMCQueueCapacity, 50000);
+    GetConfigNonZeroExpected(MPMCQueueEnqueueCount, 500000000);
+  }

-// AtomicLinkedList
-size_t kAtomicLinkedListSize = 50000;
-size_t kAtomicLinkedListPassCount = 10000;
-typedef folly::AtomicLinkedList AtomicLinkedList;
+  template <typename Queue, typename Type>
+  static void general_enqueue(Queue *q, const Type &elem) {
+    q->enqueue(elem);
+  }

-// MPMC Queue (linearizable)
-size_t kMPMCQueueEnqueueStride = 10000;
-size_t kMPMCQueueCapacity = 50000;
-size_t kMPMCQueueEnqueueCount = 500000000;
-typedef folly::MPMCQueue MPMCQueue;
+  template <typename Queue, typename Type>
+  static bool general_try_dequeue(Queue *q, Type &result) {
+    return q->try_dequeue(result);
+  }

-}
+  // MPMC Specialization.
+  template <typename Type>
+  static void general_enqueue(MPMCQueue *q, const Type &elem) {
+    EXPECT_TRUE(q->write(elem));
+  }

-void run_atomic_linkedlist() {
-  for (size_t pass = 0; pass < kAtomicLinkedListPassCount; pass++) {
-    std::unique_ptr<AtomicLinkedList> list(new AtomicLinkedList());
-    bool in_order = true;
-    for (size_t i = 0; i < kAtomicLinkedListSize; i++) {
-      list->insertHead(i);
-    }
-    size_t nSum = 0;
-    auto func = [&nSum] (size_t elem) { nSum += elem; };
-    if (in_order) {
-      list->sweep(func);
-    } else {
-      list->reverseSweep(func);
-    }
-    in_order = !in_order;
+  template <typename Type>
+  static bool general_try_dequeue(MPMCQueue *q, Type &result) {
+    return q->read(result);
+  }

-    size_t supposed_sum = kAtomicLinkedListSize * (kAtomicLinkedListSize - 1) / 2;
-    EXPECT_EQ(nSum, supposed_sum);
+  // AtomicLinkedList Specialization.
+  template <typename Type>
+  static void general_enqueue(AtomicLinkedList *q, const Type &elem) {
+    q->insertHead(elem);
   }
-}

-template <typename Queue>
-void run_queue(Queue* q, size_t enqueue_count, size_t enqueue_stride) {
-  size_t nNo = 0;
-  size_t pop_sum = 0;
-  while (nNo < enqueue_count) {
-    size_t curr_push_count =
-        std::min(enqueue_count - nNo, enqueue_stride);
-    for (size_t i = 0; i < curr_push_count; i++) {
-      q->enqueue(nNo++);
-    }
-    size_t res;
-    for (size_t i = 0; i < curr_push_count; i++) {
-      q->dequeue(res);
-      pop_sum += res;
+  template <typename Queue>
+  static void run_producer(Queue *q, size_t enqueue_count) {
+    for (size_t i = 0; i < enqueue_count; i++) {
+      size_t elem_to_push = rand(enqueue_count);
+      if (!elem_to_push) {
+        elem_to_push++;
       }
+      general_enqueue(q, elem_to_push);
     }
-  size_t supposed_sum = enqueue_count * (enqueue_count - 1) / 2;
-  EXPECT_EQ (pop_sum, supposed_sum);
-}
+    producer_num.fetch_sub(1, std::memory_order_release);
+  }

-// MPMC Specialization.
-template <>
-void run_queue(MPMCQueue* q, size_t enqueue_count, size_t enqueue_stride) {
-  size_t nNo = 0;
-  size_t push_sum = 0;
-  size_t pop_sum = 0;
-  while (nNo < enqueue_count) {
-    size_t curr_push_count =
-        std::min(enqueue_count - nNo, enqueue_stride);
-    for (size_t i = 0; i < curr_push_count; i++) {
-      if (q->write(nNo)) {
-        push_sum += nNo;
-        nNo++;
+  template <typename Queue> static void run_consumer(Queue *q) {
+    size_t dequeue_sum = 0;
+    size_t result = 0;
+    while (true) {
+      if (!general_try_dequeue(q, result)) {
+        if (producer_num.load(std::memory_order_acquire) == 0) {
+          if (!general_try_dequeue(q, result)) {
+            // If all producers are done and we still dequeue nothing,
+            // the consumer quits.
+            break;
+          }
         }
       }
-    size_t res;
-    while (q->read(res)) {
-      pop_sum += res;
+      dequeue_sum += result;
+    }
+    EXPECT_GT(dequeue_sum, 0);
+  }
+
+  template <typename QueueType>
+  static void FollyMPMCThreading(QueueType *q, size_t producer_cnt,
+                                 size_t producer_pass_count,
+                                 size_t consumer_cnt) {
+    producer_num.store(producer_cnt, std::memory_order_relaxed);
+    size_t total_thread_cnt = producer_cnt + consumer_cnt;
+    std::unique_ptr<std::thread[]> threads(new std::thread[total_thread_cnt]);
+    for (size_t i = 0; i < total_thread_cnt; i++) {
+      if (i < producer_cnt) {
+        threads[i] =
+            std::thread(run_producer<QueueType>, q, producer_pass_count);
+      } else {
+        threads[i] = std::thread(run_consumer<QueueType>, q);
       }
     }
+    for (size_t i = 0; i < total_thread_cnt; i++) {
+      threads[i].join();
+    }
+  }
+};

-  size_t supposed_sum = enqueue_count * (enqueue_count - 1) / 2;
-  EXPECT_EQ(pop_sum, supposed_sum);
+// Specialization for AtomicLinkedList
+template <>
+void FollyQueueEnqueueDequeueTest_Parallel::run_consumer(AtomicLinkedList *q) {
+  size_t dequeue_sum = 0;
+  auto func = [&dequeue_sum](size_t elem) { dequeue_sum += elem; };
+  while (true) {
+    q->sweep(func);
+    if (producer_num.load(std::memory_order_acquire) == 0) {
+      q->sweep(func);
+      // If all producers are done and we still dequeue nothing,
+      // the consumer quits.
+      break;
+    }
+  }
+  EXPECT_GT(dequeue_sum, 0);
+}

-template <typename Queue>
-void run_without_initial_capacity(size_t enqueue_count, size_t enqueue_stride) {
+std::atomic_int FollyQueueEnqueueDequeueTest_Parallel::producer_num;
+
+unsigned FollyQueueEnqueueDequeueTest_Parallel::s_nConsumerWaitTime;
+unsigned FollyQueueEnqueueDequeueTest_Parallel::s_nThreadCount;
+// Unbounded queue
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nUnboundedQueueEnqueueStride;
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nUSPSCQueueEnqueueCount;
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nUMPSCQueueEnqueueCount;
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nUSPMCQueueEnqueueCount;
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nUMPMCQueueEnqueueCount;
+// Dynamic bounded queue
+size_t
+    FollyQueueEnqueueDequeueTest_Parallel::s_nDynamicBoundedQueueEnqueueStride;
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nDynamicBoundedQueueCapacity;
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nDSPSCQueueEnqueueCount;
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nDMPSCQueueEnqueueCount;
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nDSPMCQueueEnqueueCount;
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nDMPMCQueueEnqueueCount;
+// AtomicLinkedList
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nAtomicLinkedListPassCount;
+// MPMC Queue (linearizable)
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nMPMCQueueEnqueueStride;
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nMPMCQueueCapacity;
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nMPMCQueueEnqueueCount;
+
+// Used as an MPSC queue.
+TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyAtomicLinkedList) {
+  typedef AtomicLinkedList Queue;
   std::unique_ptr<Queue> q(new Queue());
-  run_queue(q.get(), enqueue_count, enqueue_stride);
+  FollyMPMCThreading(q.get(), s_nThreadCount - 1, s_nAtomicLinkedListPassCount,
+                     1);
 }

-template <typename Queue>
-void run_with_initial_capacity(size_t queue_capacity, size_t enqueue_count,
-                               size_t enqueue_stride) {
-  std::unique_ptr<Queue> q(new Queue(queue_capacity));
-  run_queue(q.get(), enqueue_count, enqueue_stride);
+TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyMPMCQueue) {
+  typedef MPMCQueue Queue;
+  std::unique_ptr<Queue> q(new Queue(s_nMPMCQueueCapacity));
+  FollyMPMCThreading(q.get(), s_nThreadCount / 2, s_nMPMCQueueEnqueueCount,
+                     s_nThreadCount - s_nThreadCount / 2);
 }

-class FollyQueueEnqueueDequeueTest : public ::testing::Test {
-
-};
-
-TEST_F(FollyQueueEnqueueDequeueTest, FollyMPMCQueue) {
-  run_with_initial_capacity(
-      kMPMCQueueCapacity, kMPMCQueueEnqueueCount, kMPMCQueueEnqueueStride);
+TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyDynamicBoundedQueue_SPSC) {
+  typedef DSPSCQueue Queue;
+  std::unique_ptr<Queue> q(new Queue(s_nDynamicBoundedQueueCapacity));
+  FollyMPMCThreading(q.get(), 1, s_nDSPSCQueueEnqueueCount, 1);
 }

-TEST_F(FollyQueueEnqueueDequeueTest, FollyAtomicLinkedList) {
-  run_atomic_linkedlist();
+TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyDynamicBoundedQueue_MPSC) {
+  typedef DMPSCQueue Queue;
+  std::unique_ptr<Queue> q(new Queue(s_nDynamicBoundedQueueCapacity));
+  FollyMPMCThreading(q.get(), s_nThreadCount - 1, s_nDMPSCQueueEnqueueCount, 1);
 }

-TEST_F(FollyQueueEnqueueDequeueTest, FollyUnboundedQueue_SPSC) {
-  run_without_initial_capacity(
-      kUSPSCQueueEnqueueCount, kUnboundedQueueEnqueueStride);
+TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyDynamicBoundedQueue_SPMC) {
+  typedef DSPMCQueue Queue;
+  std::unique_ptr<Queue> q(new Queue(s_nDynamicBoundedQueueCapacity));
+  FollyMPMCThreading(q.get(), 1, s_nDSPMCQueueEnqueueCount, s_nThreadCount - 1);
 }

-TEST_F(FollyQueueEnqueueDequeueTest, FollyUnboundedQueue_MPSC) {
-  run_without_initial_capacity(
-      kUMPSCQueueEnqueueCount, kUnboundedQueueEnqueueStride);
+TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyDynamicBoundedQueue_MPMC) {
+  typedef DMPMCQueue Queue;
+  std::unique_ptr<Queue> q(new Queue(s_nDynamicBoundedQueueCapacity));
+  FollyMPMCThreading(q.get(), s_nThreadCount / 2, s_nDMPMCQueueEnqueueCount,
+                     s_nThreadCount - s_nThreadCount / 2);
 }

-TEST_F(FollyQueueEnqueueDequeueTest, FollyUnboundedQueue_SPMC) {
-  run_without_initial_capacity(
-      kUSPMCQueueEnqueueCount, kUnboundedQueueEnqueueStride);
+TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyUnboundedQueue_SPSC) {
+  typedef USPSCQueue Queue;
+  std::unique_ptr<Queue> q(new Queue());
+  FollyMPMCThreading(q.get(), 1, s_nUSPSCQueueEnqueueCount, 1);
 }

-TEST_F(FollyQueueEnqueueDequeueTest, FollyUnboundedQueue_MPMC) {
-  run_without_initial_capacity(
-      kUMPMCQueueEnqueueCount,
-      kUnboundedQueueEnqueueStride);
+TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyUnboundedQueue_MPSC) {
+  typedef UMPSCQueue Queue;
+  std::unique_ptr<Queue> q(new Queue());
+  FollyMPMCThreading(q.get(), s_nThreadCount - 1, s_nUMPSCQueueEnqueueCount, 1);
 }

-TEST_F(FollyQueueEnqueueDequeueTest, FollyDynamicBoundedQueue_SPSC) {
-  // DynamicBoundedQueue
-  run_with_initial_capacity(
-      kDynamicBoundedQueueCapacity,
-      kDSPSCQueueEnqueueCount, kDynamicBoundedQueueEnqueueStride);
+TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyUnboundedQueue_SPMC) {
+  typedef USPMCQueue Queue;
+  std::unique_ptr<Queue> q(new Queue());
+  FollyMPMCThreading(q.get(), 1, s_nUSPMCQueueEnqueueCount, s_nThreadCount - 1);
 }

-TEST_F(FollyQueueEnqueueDequeueTest, FollyDynamicBoundedQueue_MPSC) {
-  run_with_initial_capacity(
-      kDynamicBoundedQueueCapacity,
-      kDMPSCQueueEnqueueCount,
-      kDynamicBoundedQueueEnqueueStride);
-}
+TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyUnboundedQueue_MPMC) {
+  typedef UMPMCQueue Queue;

-TEST_F(FollyQueueEnqueueDequeueTest, FollyDynamicBoundedQueue_SPMC) {
-  run_with_initial_capacity(
-      kDynamicBoundedQueueCapacity,
-      kDSPMCQueueEnqueueCount,
-      kDynamicBoundedQueueEnqueueStride);
+  std::unique_ptr<Queue> q(new Queue());
+  FollyMPMCThreading(q.get(), s_nThreadCount / 2, s_nUMPMCQueueEnqueueCount,
+                     s_nThreadCount - s_nThreadCount / 2);
 }

-TEST_F(FollyQueueEnqueueDequeueTest, FollyDynamicBoundedQueue_MPMC) {
-  run_with_initial_capacity(
-      kDynamicBoundedQueueCapacity,
-      kDMPMCQueueEnqueueCount,
-      kDynamicBoundedQueueEnqueueStride);
-}
+} // namespace folly_test
diff --git a/folly/stress-test/stress-parallel-folly-sync.cpp b/folly/stress-test/stress-parallel-folly-sync.cpp
index addc92ce..af820c7f 100644
--- a/folly/stress-test/stress-parallel-folly-sync.cpp
+++ b/folly/stress-test/stress-parallel-folly-sync.cpp
@@ -11,7 +11,7 @@ protected:
   // For RCU, we mostly want to benchmark the readers (cause it's designed for
   // very fast readers and occasional writers). We have a writer thread that
   // runs nonstop until all other reader threads are done.
-  static std::atomic_bool rcu_readers_done;
+  static std::atomic_uint rcu_readers_num;
   // MicroLock
   static size_t s_nMicroLockPassCount;
   // MicroSpinLock
@@ -55,7 +55,7 @@ protected:
   }

   static void run_rcu_writer_sync() {
-    while (!rcu_readers_done.load(std::memory_order_acquire)) {
+    while (rcu_readers_num.load(std::memory_order_acquire) > 0) {
       auto *old_data = rcu_data.load(std::memory_order_relaxed);
       auto *new_data = new RcuData(*old_data);
       new_data->d1++;
@@ -69,7 +69,7 @@ protected:
   }

   static void run_rcu_writer_no_sync() {
-    while (!rcu_readers_done.load(std::memory_order_acquire)) {
+    while (rcu_readers_num.load(std::memory_order_acquire) > 0) {
       auto *old_data = rcu_data.load(std::memory_order_relaxed);
       auto *new_data = new RcuData(*old_data);
       new_data->d1++;
@@ -88,7 +88,7 @@ protected:
       auto *data = rcu_data.load(std::memory_order_relaxed);
       sum += (data->d1 + data->d2);
     }
-    std::cout << "Reader done" << std::endl;
+    rcu_readers_num.fetch_sub(1, std::memory_order_release);
     // Just want to simulate the reading.
     EXPECT_GT(sum, 0);
   }
@@ -133,20 +133,17 @@ protected:
   template <typename WriterFunc>
   static void FollyRcuThreading(WriterFunc writer_func) {
+    rcu_readers_num.store(s_nThreadCount - 1, std::memory_order_release);
+
+    std::unique_ptr<std::thread[]> threads(new std::thread[s_nThreadCount]);
     // One of the threads is a writer.
-    size_t reader_thrd_cnt = s_nThreadCount - 1;
-    rcu_readers_done.store(false, std::memory_order_release);
-    std::unique_ptr<std::thread[]> reader_threads(
-        new std::thread[reader_thrd_cnt]);
-    std::thread writer_thread(writer_func);
-    for (size_t i = 0; i < reader_thrd_cnt; i++) {
-      reader_threads[i] = std::thread(run_rcu_reader, s_nRcuReaderPassCount);
+    threads[0] = std::thread(writer_func);
+    for (size_t i = 1; i < s_nThreadCount; i++) {
+      threads[i] = std::thread(run_rcu_reader, s_nRcuReaderPassCount);
     }
-    for (size_t i = 0; i < reader_thrd_cnt; i++) {
-      reader_threads[i].join();
+    for (size_t i = 0; i < s_nThreadCount; i++) {
+      threads[i].join();
     }
-    rcu_readers_done.store(true, std::memory_order_release);
-    writer_thread.join();
   }

   template
@@ -169,7 +166,7 @@ protected:
 size_t FollySyncTest_Parallel::locked_data;
 std::atomic<RcuData*> FollySyncTest_Parallel::rcu_data;
-std::atomic_bool FollySyncTest_Parallel::rcu_readers_done;
+std::atomic_uint FollySyncTest_Parallel::rcu_readers_num;
 size_t FollySyncTest_Parallel::s_nThreadCount;
 size_t FollySyncTest_Parallel::s_nMicroLockPassCount;
 size_t FollySyncTest_Parallel::s_nMicroSpinLockPassCount;
--
2.34.1