-#include <folly/concurrency/UnboundedQueue.h>
-#include <folly/concurrency/DynamicBoundedQueue.h>
-#include <folly/AtomicLinkedList.h>
-#include <folly/MPMCQueue.h>
-
-#include <gtest/gtest.h>
-
-#include <memory>
-
-namespace {
-
-// Unbounded queue
-size_t kUnboundedQueueEnqueueStride = 10000;
-size_t kUSPSCQueueEnqueueCount = 1200000000;
-size_t kUMPSCQueueEnqueueCount = 320000000;
-size_t kUSPMCQueueEnqueueCount = 320000000;
-size_t kUMPMCQueueEnqueueCount = 320000000;
-
-typedef folly::USPSCQueue<size_t, false> USPSCQueue;
-typedef folly::UMPSCQueue<size_t, false> UMPSCQueue;
-typedef folly::USPMCQueue<size_t, false> USPMCQueue;
-typedef folly::UMPMCQueue<size_t, false> UMPMCQueue;
-
-// Dynamic bounded queue
-size_t kDynamicBoundedQueueEnqueueStride = 50000;
-size_t kDynamicBoundedQueueCapacity = 200000;
-size_t kDSPSCQueueEnqueueCount = 1200000000;
-size_t kDMPSCQueueEnqueueCount = 320000000;
-size_t kDSPMCQueueEnqueueCount = 320000000;
-size_t kDMPMCQueueEnqueueCount = 320000000;
-
-typedef folly::DSPSCQueue<size_t, false> DSPSCQueue;
-typedef folly::DMPSCQueue<size_t, false> DMPSCQueue;
-typedef folly::DSPMCQueue<size_t, false> DSPMCQueue;
-typedef folly::DMPMCQueue<size_t, false> DMPMCQueue;
+#include "queue_test.h"
+
+namespace folly_test {
+
+class FollyQueueEnqueueDequeueTest_Parallel : public cds_test::stress_fixture {
+protected:
+ static std::atomic_int producer_num;
+
+ // The milliseconds for consumers to wait when a failed try_dequeue happens.
+ static unsigned s_nConsumerWaitTime;
+ // For MPMC, half of the threads are producers, and the rest are consumers.
+ static unsigned s_nThreadCount;
+ // Unbounded queue
+ static size_t s_nUnboundedQueueEnqueueStride;
+ static size_t s_nUSPSCQueueEnqueueCount;
+ static size_t s_nUMPSCQueueEnqueueCount;
+ static size_t s_nUSPMCQueueEnqueueCount;
+ static size_t s_nUMPMCQueueEnqueueCount;
+
+ // Dynamic bounded queue
+ static size_t s_nDynamicBoundedQueueEnqueueStride;
+ static size_t s_nDynamicBoundedQueueCapacity;
+ static size_t s_nDSPSCQueueEnqueueCount;
+ static size_t s_nDMPSCQueueEnqueueCount;
+ static size_t s_nDSPMCQueueEnqueueCount;
+ static size_t s_nDMPMCQueueEnqueueCount;
+
+ // AtomicLinkedList
+ static size_t s_nAtomicLinkedListPassCount;
+
+ // MPMC Queue (linearizable)
+ static size_t s_nMPMCQueueEnqueueStride;
+ static size_t s_nMPMCQueueCapacity;
+ static size_t s_nMPMCQueueEnqueueCount;
+
+ static void SetUpTestCase() {
+ const cds_test::config &cfg = get_config("ParallelFollyQueue");
+ // Unbounded queue
+ GetConfigNonZeroExpected(ConsumerWaitTime, 200);
+ GetConfigNonZeroExpected(ThreadCount, 4);
+ GetConfigNonZeroExpected(UnboundedQueueEnqueueStride, 10000);
+ GetConfigNonZeroExpected(USPSCQueueEnqueueCount, 1200000000);
+ GetConfigNonZeroExpected(UMPSCQueueEnqueueCount, 320000000);
+ GetConfigNonZeroExpected(USPMCQueueEnqueueCount, 320000000);
+ GetConfigNonZeroExpected(UMPMCQueueEnqueueCount, 320000000);
+ // Dynamic bounded queue
+ GetConfigNonZeroExpected(DynamicBoundedQueueEnqueueStride, 50000);
+ GetConfigNonZeroExpected(DynamicBoundedQueueCapacity, 200000);
+ GetConfigNonZeroExpected(DSPSCQueueEnqueueCount, 1200000000);
+ GetConfigNonZeroExpected(DMPSCQueueEnqueueCount, 320000000);
+ GetConfigNonZeroExpected(DSPMCQueueEnqueueCount, 320000000);
+ GetConfigNonZeroExpected(DMPMCQueueEnqueueCount, 320000000);
+ // AtomicLinkedList
+ GetConfigNonZeroExpected(AtomicLinkedListPassCount, 10000);
+ // MPMC Queue (linearizable)
+ GetConfigNonZeroExpected(MPMCQueueEnqueueStride, 10000);
+ GetConfigNonZeroExpected(MPMCQueueCapacity, 50000);
+ GetConfigNonZeroExpected(MPMCQueueEnqueueCount, 500000000);
+ }
-// AtomicLinkedList
-size_t kAtomicLinkedListSize = 50000;
-size_t kAtomicLinkedListPassCount = 10000;
-typedef folly::AtomicLinkedList<size_t> AtomicLinkedList;
+ template <typename Queue, typename Type>
+ static void general_enqueue(Queue *q, const Type &elem) {
+ q->enqueue(elem);
+ }
-// MPMC Queue (linearizable)
-size_t kMPMCQueueEnqueueStride = 10000;
-size_t kMPMCQueueCapacity = 50000;
-size_t kMPMCQueueEnqueueCount = 500000000;
-typedef folly::MPMCQueue<size_t> MPMCQueue;
+ template <typename Queue, typename Type>
+ static bool general_try_dequeue(Queue *q, Type &result) {
+ return q->try_dequeue(result);
+ }
-}
+ // MPMC Specialization.
+ template <typename Type>
+ static void general_enqueue(MPMCQueue *q, const Type &elem) {
+ EXPECT_TRUE(q->write(elem));
+ }
-void run_atomic_linkedlist() {
- for (size_t pass = 0; pass < kAtomicLinkedListPassCount; pass++) {
- std::unique_ptr<AtomicLinkedList> list(new AtomicLinkedList());
- bool in_order = true;
- for (size_t i = 0; i < kAtomicLinkedListSize; i++) {
- list->insertHead(i);
- }
- size_t nSum = 0;
- auto func = [&nSum] (size_t elem) { nSum += elem; };
- if (in_order) {
- list->sweep(func);
- } else {
- list->reverseSweep(func);
- }
- in_order = !in_order;
+ template <typename Type>
+ static bool general_try_dequeue(MPMCQueue *q, Type &result) {
+ return q->read(result);
+ }
- size_t supposed_sum = kAtomicLinkedListSize * (kAtomicLinkedListSize - 1) / 2;
- EXPECT_EQ(nSum, supposed_sum);
+ // AtomicLinkedList Specialization.
+ template <typename Type>
+ static void general_enqueue(AtomicLinkedList *q, const Type &elem) {
+ q->insertHead(elem);
}
-}
-template <typename Queue>
-void run_queue(Queue* q, size_t enqueue_count, size_t enqueue_stride) {
- size_t nNo = 0;
- size_t pop_sum = 0;
- while (nNo < enqueue_count) {
- size_t curr_push_count =
- std::min(enqueue_count - nNo, enqueue_stride);
- for (size_t i = 0; i < curr_push_count; i++) {
- q->enqueue(nNo++);
- }
- size_t res;
- for (size_t i = 0; i < curr_push_count; i++) {
- q->dequeue(res);
- pop_sum += res;
+ template <typename Queue>
+ static void run_producer(Queue *q, size_t enqueue_count) {
+ for (size_t i = 0; i < enqueue_count; i++) {
+ size_t elem_to_push = rand(enqueue_count);
+ if (!elem_to_push) {
+ elem_to_push++;
}
+ general_enqueue(q, elem_to_push);
}
- size_t supposed_sum = enqueue_count * (enqueue_count - 1) / 2;
- EXPECT_EQ (pop_sum, supposed_sum);
-}
+ producer_num.fetch_sub(1, std::memory_order_release);
+ }
-// MPMC Specialization.
-template <>
-void run_queue(MPMCQueue* q, size_t enqueue_count, size_t enqueue_stride) {
- size_t nNo = 0;
- size_t push_sum = 0;
- size_t pop_sum = 0;
- while (nNo < enqueue_count) {
- size_t curr_push_count =
- std::min(enqueue_count - nNo, enqueue_stride);
- for (size_t i = 0; i < curr_push_count; i++) {
- if (q->write(nNo)) {
- push_sum += nNo;
- nNo++;
+ template <typename Queue> static void run_consumer(Queue *q) {
+ size_t dequeue_sum = 0;
+ size_t result = 0;
+ while (true) {
+ if (!general_try_dequeue(q, result)) {
+ if (producer_num.load(std::memory_order_acquire) == 0) {
+ if (!general_try_dequeue(q, result)) {
+ // If all producers are done and we still dequeue to nothing,
+ // the consumer quits.
+ break;
+ }
}
}
- size_t res;
- while (q->read(res)) {
- pop_sum += res;
+ dequeue_sum += result;
+ }
+ EXPECT_GT(dequeue_sum, 0);
+ }
+
+ template <typename QueueType>
+ static void FollyMPMCThreading(QueueType *q, size_t producer_cnt,
+ size_t producer_pass_count,
+ size_t consumer_cnt) {
+ producer_num.store(producer_cnt, std::memory_order_relaxed);
+ size_t total_thread_cnt = producer_cnt + consumer_cnt;
+ std::unique_ptr<std::thread[]> threads(new std::thread[total_thread_cnt]);
+ for (size_t i = 0; i < total_thread_cnt; i++) {
+ if (i < producer_cnt) {
+ threads[i] =
+ std::thread(run_producer<QueueType>, q, producer_pass_count);
+ } else {
+ threads[i] = std::thread(run_consumer<QueueType>, q);
}
}
+ for (size_t i = 0; i < total_thread_cnt; i++) {
+ threads[i].join();
+ }
+ }
+};
- size_t supposed_sum = enqueue_count * (enqueue_count - 1) / 2;
- EXPECT_EQ(pop_sum, supposed_sum);
+// Specialization for AtomicLinkedList
+template <>
+void FollyQueueEnqueueDequeueTest_Parallel::run_consumer(AtomicLinkedList *q) {
+ size_t dequeue_sum = 0;
+ auto func = [&dequeue_sum](size_t elem) { dequeue_sum += elem; };
+ while (true) {
+ q->sweep(func);
+ if (producer_num.load(std::memory_order_acquire) == 0) {
+ q->sweep(func);
+ // If all producers are done and we still dequeue to nothing,
+ // the consumer quits.
+ break;
+ }
+ }
+ EXPECT_GT(dequeue_sum, 0);
}
-template <typename Queue>
-void run_without_initial_capacity(size_t enqueue_count, size_t enqueue_stride) {
+std::atomic_int FollyQueueEnqueueDequeueTest_Parallel::producer_num;
+
+unsigned FollyQueueEnqueueDequeueTest_Parallel::s_nConsumerWaitTime;
+unsigned FollyQueueEnqueueDequeueTest_Parallel::s_nThreadCount;
+// Unbounded queue
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nUnboundedQueueEnqueueStride;
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nUSPSCQueueEnqueueCount;
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nUMPSCQueueEnqueueCount;
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nUSPMCQueueEnqueueCount;
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nUMPMCQueueEnqueueCount;
+// Dynamic bounded queue
+size_t
+ FollyQueueEnqueueDequeueTest_Parallel::s_nDynamicBoundedQueueEnqueueStride;
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nDynamicBoundedQueueCapacity;
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nDSPSCQueueEnqueueCount;
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nDMPSCQueueEnqueueCount;
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nDSPMCQueueEnqueueCount;
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nDMPMCQueueEnqueueCount;
+// AtomicLinkedList
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nAtomicLinkedListPassCount;
+// MPMC Queue (linearizable)
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nMPMCQueueEnqueueStride;
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nMPMCQueueCapacity;
+size_t FollyQueueEnqueueDequeueTest_Parallel::s_nMPMCQueueEnqueueCount;
+
+// Used as a MPSC queue.
+TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyAtomicLinkedList) {
+ typedef AtomicLinkedList Queue;
std::unique_ptr<Queue> q(new Queue());
- run_queue(q.get(), enqueue_count, enqueue_stride);
+ FollyMPMCThreading(q.get(), s_nThreadCount - 1, s_nAtomicLinkedListPassCount,
+ 1);
}
-template <typename Queue>
-void run_with_initial_capacity(size_t queue_capacity, size_t enqueue_count,
- size_t enqueue_stride) {
- std::unique_ptr<Queue> q(new Queue(queue_capacity));
- run_queue(q.get(), enqueue_count, enqueue_stride);
+TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyMPMCQueue) {
+ typedef MPMCQueue Queue;
+ std::unique_ptr<Queue> q(new Queue(s_nMPMCQueueCapacity));
+ FollyMPMCThreading(q.get(), s_nThreadCount / 2, s_nMPMCQueueEnqueueCount,
+ s_nThreadCount - s_nThreadCount / 2);
}
-class FollyQueueEnqueueDequeueTest : public ::testing::Test {
-
-};
-
-TEST_F(FollyQueueEnqueueDequeueTest, FollyMPMCQueue) {
- run_with_initial_capacity<MPMCQueue>(
- kMPMCQueueCapacity, kMPMCQueueEnqueueCount, kMPMCQueueEnqueueStride);
+TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyDynamicBoundedQueue_SPSC) {
+ typedef DSPSCQueue Queue;
+ std::unique_ptr<Queue> q(new Queue(s_nDynamicBoundedQueueCapacity));
+ FollyMPMCThreading(q.get(), 1, s_nDSPSCQueueEnqueueCount, 1);
}
-TEST_F(FollyQueueEnqueueDequeueTest, FollyAtomicLinkedList) {
- run_atomic_linkedlist();
+TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyDynamicBoundedQueue_MPSC) {
+ typedef DMPSCQueue Queue;
+ std::unique_ptr<Queue> q(new Queue(s_nDynamicBoundedQueueCapacity));
+ FollyMPMCThreading(q.get(), s_nThreadCount - 1, s_nDMPSCQueueEnqueueCount, 1);
}
-TEST_F(FollyQueueEnqueueDequeueTest, FollyUnboundedQueue_SPSC) {
- run_without_initial_capacity<USPSCQueue>(
- kUSPSCQueueEnqueueCount, kUnboundedQueueEnqueueStride);
+TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyDynamicBoundedQueue_SPMC) {
+ typedef DSPMCQueue Queue;
+ std::unique_ptr<Queue> q(new Queue(s_nDynamicBoundedQueueCapacity));
+ FollyMPMCThreading(q.get(), 1, s_nDSPMCQueueEnqueueCount, s_nThreadCount - 1);
}
-TEST_F(FollyQueueEnqueueDequeueTest, FollyUnboundedQueue_MPSC) {
- run_without_initial_capacity<UMPSCQueue>(
- kUMPSCQueueEnqueueCount, kUnboundedQueueEnqueueStride);
+TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyDynamicBoundedQueue_MPMC) {
+ typedef DMPMCQueue Queue;
+ std::unique_ptr<Queue> q(new Queue(s_nDynamicBoundedQueueCapacity));
+ FollyMPMCThreading(q.get(), s_nThreadCount / 2, s_nDMPMCQueueEnqueueCount,
+ s_nThreadCount - s_nThreadCount / 2);
}
-TEST_F(FollyQueueEnqueueDequeueTest, FollyUnboundedQueue_SPMC) {
- run_without_initial_capacity<USPMCQueue>(
- kUSPMCQueueEnqueueCount, kUnboundedQueueEnqueueStride);
+TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyUnboundedQueue_SPSC) {
+ typedef USPSCQueue Queue;
+ std::unique_ptr<Queue> q(new Queue());
+ FollyMPMCThreading(q.get(), 1, s_nUSPSCQueueEnqueueCount, 1);
}
-TEST_F(FollyQueueEnqueueDequeueTest, FollyUnboundedQueue_MPMC) {
- run_without_initial_capacity<UMPMCQueue>(
- kUMPMCQueueEnqueueCount,
- kUnboundedQueueEnqueueStride);
+TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyUnboundedQueue_MPSC) {
+ typedef UMPSCQueue Queue;
+ std::unique_ptr<Queue> q(new Queue());
+ FollyMPMCThreading(q.get(), s_nThreadCount - 1, s_nUMPSCQueueEnqueueCount, 1);
}
-TEST_F(FollyQueueEnqueueDequeueTest, FollyDynamicBoundedQueue_SPSC) {
- // DynamicBoundedQueue
- run_with_initial_capacity<DSPSCQueue>(
- kDynamicBoundedQueueCapacity,
- kDSPSCQueueEnqueueCount, kDynamicBoundedQueueEnqueueStride);
+TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyUnboundedQueue_SPMC) {
+ typedef USPMCQueue Queue;
+ std::unique_ptr<Queue> q(new Queue());
+ FollyMPMCThreading(q.get(), 1, s_nUSPMCQueueEnqueueCount, s_nThreadCount - 1);
}
-TEST_F(FollyQueueEnqueueDequeueTest, FollyDynamicBoundedQueue_MPSC) {
- run_with_initial_capacity<DMPSCQueue>(
- kDynamicBoundedQueueCapacity,
- kDMPSCQueueEnqueueCount,
- kDynamicBoundedQueueEnqueueStride);
-}
+TEST_F(FollyQueueEnqueueDequeueTest_Parallel, FollyUnboundedQueue_MPMC) {
+ typedef UMPMCQueue Queue;
-TEST_F(FollyQueueEnqueueDequeueTest, FollyDynamicBoundedQueue_SPMC) {
- run_with_initial_capacity<DSPMCQueue>(
- kDynamicBoundedQueueCapacity,
- kDSPMCQueueEnqueueCount,
- kDynamicBoundedQueueEnqueueStride);
+ std::unique_ptr<Queue> q(new Queue());
+ FollyMPMCThreading(q.get(), s_nThreadCount / 2, s_nUMPMCQueueEnqueueCount,
+ s_nThreadCount - s_nThreadCount / 2);
}
-TEST_F(FollyQueueEnqueueDequeueTest, FollyDynamicBoundedQueue_MPMC) {
- run_with_initial_capacity<DMPMCQueue>(
- kDynamicBoundedQueueCapacity,
- kDMPMCQueueEnqueueCount,
- kDynamicBoundedQueueEnqueueStride);
-}
+} // namespace folly_test