X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=blobdiff_plain;f=folly%2Fstress-test%2Fstress-parallel-folly-queue.cpp;fp=folly%2Fstress-test%2Fstress-parallel-folly-queue.cpp;h=70bcb15d5641715786635d5a7861c6af89ed35fc;hp=0000000000000000000000000000000000000000;hb=930c75ffee7931cb06e8c5fa99779c2d67bd2cb1;hpb=614a2213b3d18c09b4fba87795fe5c9923acb085 diff --git a/folly/stress-test/stress-parallel-folly-queue.cpp b/folly/stress-test/stress-parallel-folly-queue.cpp new file mode 100644 index 00000000..70bcb15d --- /dev/null +++ b/folly/stress-test/stress-parallel-folly-queue.cpp @@ -0,0 +1,196 @@ +#include +#include +#include +#include + +#include + +#include + +namespace { + +// Unbounded queue +size_t kUnboundedQueueEnqueueStride = 10000; +size_t kUSPSCQueueEnqueueCount = 1200000000; +size_t kUMPSCQueueEnqueueCount = 320000000; +size_t kUSPMCQueueEnqueueCount = 320000000; +size_t kUMPMCQueueEnqueueCount = 320000000; + +typedef folly::USPSCQueue USPSCQueue; +typedef folly::UMPSCQueue UMPSCQueue; +typedef folly::USPMCQueue USPMCQueue; +typedef folly::UMPMCQueue UMPMCQueue; + +// Dynamic bounded queue +size_t kDynamicBoundedQueueEnqueueStride = 50000; +size_t kDynamicBoundedQueueCapacity = 200000; +size_t kDSPSCQueueEnqueueCount = 1200000000; +size_t kDMPSCQueueEnqueueCount = 320000000; +size_t kDSPMCQueueEnqueueCount = 320000000; +size_t kDMPMCQueueEnqueueCount = 320000000; + +typedef folly::DSPSCQueue DSPSCQueue; +typedef folly::DMPSCQueue DMPSCQueue; +typedef folly::DSPMCQueue DSPMCQueue; +typedef folly::DMPMCQueue DMPMCQueue; + +// AtomicLinkedList +size_t kAtomicLinkedListSize = 50000; +size_t kAtomicLinkedListPassCount = 10000; +typedef folly::AtomicLinkedList AtomicLinkedList; + +// MPMC Queue (linearizable) +size_t kMPMCQueueEnqueueStride = 10000; +size_t kMPMCQueueCapacity = 50000; +size_t kMPMCQueueEnqueueCount = 500000000; +typedef folly::MPMCQueue MPMCQueue; + +} + +void run_atomic_linkedlist() { + for (size_t pass = 0; pass < kAtomicLinkedListPassCount; pass++) { + std::unique_ptr list(new AtomicLinkedList()); + bool in_order = true; + for (size_t i = 0; i < kAtomicLinkedListSize; i++) { + list->insertHead(i); + } + size_t nSum = 0; + auto func = [&nSum] (size_t elem) { nSum += elem; }; + if (in_order) { + list->sweep(func); + } else { + list->reverseSweep(func); + } + in_order = !in_order; + + size_t supposed_sum = kAtomicLinkedListSize * (kAtomicLinkedListSize - 1) / 2; + EXPECT_EQ(nSum, supposed_sum); + } +} + +template +void run_queue(Queue* q, size_t enqueue_count, size_t enqueue_stride) { + size_t nNo = 0; + size_t pop_sum = 0; + while (nNo < enqueue_count) { + size_t curr_push_count = + std::min(enqueue_count - nNo, enqueue_stride); + for (size_t i = 0; i < curr_push_count; i++) { + q->enqueue(nNo++); + } + size_t res; + for (size_t i = 0; i < curr_push_count; i++) { + q->dequeue(res); + pop_sum += res; + } + } + size_t supposed_sum = enqueue_count * (enqueue_count - 1) / 2; + EXPECT_EQ (pop_sum, supposed_sum); +} + +// MPMC Specialization. +template <> +void run_queue(MPMCQueue* q, size_t enqueue_count, size_t enqueue_stride) { + size_t nNo = 0; + size_t push_sum = 0; + size_t pop_sum = 0; + while (nNo < enqueue_count) { + size_t curr_push_count = + std::min(enqueue_count - nNo, enqueue_stride); + for (size_t i = 0; i < curr_push_count; i++) { + if (q->write(nNo)) { + push_sum += nNo; + nNo++; + } + } + size_t res; + while (q->read(res)) { + pop_sum += res; + } + } + + size_t supposed_sum = enqueue_count * (enqueue_count - 1) / 2; + EXPECT_EQ(pop_sum, supposed_sum); +} + +template +void run_without_initial_capacity(size_t enqueue_count, size_t enqueue_stride) { + std::unique_ptr q(new Queue()); + run_queue(q.get(), enqueue_count, enqueue_stride); +} + +template +void run_with_initial_capacity(size_t queue_capacity, size_t enqueue_count, + size_t enqueue_stride) { + std::unique_ptr q(new Queue(queue_capacity)); + run_queue(q.get(), enqueue_count, enqueue_stride); +} + +class FollyQueueEnqueueDequeueTest : public ::testing::Test { + +}; + +TEST_F(FollyQueueEnqueueDequeueTest, FollyMPMCQueue) { + run_with_initial_capacity( + kMPMCQueueCapacity, kMPMCQueueEnqueueCount, kMPMCQueueEnqueueStride); +} + +TEST_F(FollyQueueEnqueueDequeueTest, FollyAtomicLinkedList) { + run_atomic_linkedlist(); +} + +TEST_F(FollyQueueEnqueueDequeueTest, FollyUnboundedQueue_SPSC) { + run_without_initial_capacity( + kUSPSCQueueEnqueueCount, kUnboundedQueueEnqueueStride); +} + +TEST_F(FollyQueueEnqueueDequeueTest, FollyUnboundedQueue_MPSC) { + run_without_initial_capacity( + kUMPSCQueueEnqueueCount, kUnboundedQueueEnqueueStride); +} + +TEST_F(FollyQueueEnqueueDequeueTest, FollyUnboundedQueue_SPMC) { + run_without_initial_capacity( + kUSPMCQueueEnqueueCount, kUnboundedQueueEnqueueStride); +} + +TEST_F(FollyQueueEnqueueDequeueTest, FollyUnboundedQueue_MPMC) { + run_without_initial_capacity( + kUMPMCQueueEnqueueCount, + kUnboundedQueueEnqueueStride); +} + +TEST_F(FollyQueueEnqueueDequeueTest, FollyDynamicBoundedQueue_SPSC) { + // DynamicBoundedQueue + run_with_initial_capacity( + kDynamicBoundedQueueCapacity, + kDSPSCQueueEnqueueCount, kDynamicBoundedQueueEnqueueStride); +} + +TEST_F(FollyQueueEnqueueDequeueTest, FollyDynamicBoundedQueue_MPSC) { + run_with_initial_capacity( + kDynamicBoundedQueueCapacity, + kDMPSCQueueEnqueueCount, + kDynamicBoundedQueueEnqueueStride); +} + +TEST_F(FollyQueueEnqueueDequeueTest, FollyDynamicBoundedQueue_SPMC) { + run_with_initial_capacity( + kDynamicBoundedQueueCapacity, + kDSPMCQueueEnqueueCount, + kDynamicBoundedQueueEnqueueStride); +} + +TEST_F(FollyQueueEnqueueDequeueTest, FollyDynamicBoundedQueue_MPMC) { + run_with_initial_capacity( + kDynamicBoundedQueueCapacity, + kDMPMCQueueEnqueueCount, + kDynamicBoundedQueueEnqueueStride); +} + +int main(int argc, char** argv) { + // Init Google test + ::testing::InitGoogleTest(&argc, argv); + int result = RUN_ALL_TESTS(); + return result; +}