1 #include <folly/concurrency/UnboundedQueue.h>
2 #include <folly/concurrency/DynamicBoundedQueue.h>
3 #include <folly/AtomicLinkedList.h>
4 #include <folly/MPMCQueue.h>
6 #include <gtest/gtest.h>
13 size_t kUnboundedQueueEnqueueStride = 10000;
14 size_t kUSPSCQueueEnqueueCount = 1200000000;
15 size_t kUMPSCQueueEnqueueCount = 320000000;
16 size_t kUSPMCQueueEnqueueCount = 320000000;
17 size_t kUMPMCQueueEnqueueCount = 320000000;
19 typedef folly::USPSCQueue<size_t, false> USPSCQueue;
20 typedef folly::UMPSCQueue<size_t, false> UMPSCQueue;
21 typedef folly::USPMCQueue<size_t, false> USPMCQueue;
22 typedef folly::UMPMCQueue<size_t, false> UMPMCQueue;
24 // Dynamic bounded queue
25 size_t kDynamicBoundedQueueEnqueueStride = 50000;
26 size_t kDynamicBoundedQueueCapacity = 200000;
27 size_t kDSPSCQueueEnqueueCount = 1200000000;
28 size_t kDMPSCQueueEnqueueCount = 320000000;
29 size_t kDSPMCQueueEnqueueCount = 320000000;
30 size_t kDMPMCQueueEnqueueCount = 320000000;
32 typedef folly::DSPSCQueue<size_t, false> DSPSCQueue;
33 typedef folly::DMPSCQueue<size_t, false> DMPSCQueue;
34 typedef folly::DSPMCQueue<size_t, false> DSPMCQueue;
35 typedef folly::DMPMCQueue<size_t, false> DMPMCQueue;
38 size_t kAtomicLinkedListSize = 50000;
39 size_t kAtomicLinkedListPassCount = 10000;
40 typedef folly::AtomicLinkedList<size_t> AtomicLinkedList;
42 // MPMC Queue (linearizable)
43 size_t kMPMCQueueEnqueueStride = 10000;
44 size_t kMPMCQueueCapacity = 50000;
45 size_t kMPMCQueueEnqueueCount = 500000000;
46 typedef folly::MPMCQueue<size_t> MPMCQueue;
50 void run_atomic_linkedlist() {
51 for (size_t pass = 0; pass < kAtomicLinkedListPassCount; pass++) {
52 std::unique_ptr<AtomicLinkedList> list(new AtomicLinkedList());
54 for (size_t i = 0; i < kAtomicLinkedListSize; i++) {
58 auto func = [&nSum] (size_t elem) { nSum += elem; };
62 list->reverseSweep(func);
66 size_t supposed_sum = kAtomicLinkedListSize * (kAtomicLinkedListSize - 1) / 2;
67 EXPECT_EQ(nSum, supposed_sum);
71 template <typename Queue>
72 void run_queue(Queue* q, size_t enqueue_count, size_t enqueue_stride) {
75 while (nNo < enqueue_count) {
76 size_t curr_push_count =
77 std::min(enqueue_count - nNo, enqueue_stride);
78 for (size_t i = 0; i < curr_push_count; i++) {
82 for (size_t i = 0; i < curr_push_count; i++) {
87 size_t supposed_sum = enqueue_count * (enqueue_count - 1) / 2;
88 EXPECT_EQ (pop_sum, supposed_sum);
91 // MPMC Specialization.
93 void run_queue(MPMCQueue* q, size_t enqueue_count, size_t enqueue_stride) {
97 while (nNo < enqueue_count) {
98 size_t curr_push_count =
99 std::min(enqueue_count - nNo, enqueue_stride);
100 for (size_t i = 0; i < curr_push_count; i++) {
107 while (q->read(res)) {
112 size_t supposed_sum = enqueue_count * (enqueue_count - 1) / 2;
113 EXPECT_EQ(pop_sum, supposed_sum);
116 template <typename Queue>
117 void run_without_initial_capacity(size_t enqueue_count, size_t enqueue_stride) {
118 std::unique_ptr<Queue> q(new Queue());
119 run_queue(q.get(), enqueue_count, enqueue_stride);
122 template <typename Queue>
123 void run_with_initial_capacity(size_t queue_capacity, size_t enqueue_count,
124 size_t enqueue_stride) {
125 std::unique_ptr<Queue> q(new Queue(queue_capacity));
126 run_queue(q.get(), enqueue_count, enqueue_stride);
129 class FollyQueueEnqueueDequeueTest : public ::testing::Test {
133 TEST_F(FollyQueueEnqueueDequeueTest, FollyMPMCQueue) {
134 run_with_initial_capacity<MPMCQueue>(
135 kMPMCQueueCapacity, kMPMCQueueEnqueueCount, kMPMCQueueEnqueueStride);
138 TEST_F(FollyQueueEnqueueDequeueTest, FollyAtomicLinkedList) {
139 run_atomic_linkedlist();
142 TEST_F(FollyQueueEnqueueDequeueTest, FollyUnboundedQueue_SPSC) {
143 run_without_initial_capacity<USPSCQueue>(
144 kUSPSCQueueEnqueueCount, kUnboundedQueueEnqueueStride);
147 TEST_F(FollyQueueEnqueueDequeueTest, FollyUnboundedQueue_MPSC) {
148 run_without_initial_capacity<UMPSCQueue>(
149 kUMPSCQueueEnqueueCount, kUnboundedQueueEnqueueStride);
152 TEST_F(FollyQueueEnqueueDequeueTest, FollyUnboundedQueue_SPMC) {
153 run_without_initial_capacity<USPMCQueue>(
154 kUSPMCQueueEnqueueCount, kUnboundedQueueEnqueueStride);
157 TEST_F(FollyQueueEnqueueDequeueTest, FollyUnboundedQueue_MPMC) {
158 run_without_initial_capacity<UMPMCQueue>(
159 kUMPMCQueueEnqueueCount,
160 kUnboundedQueueEnqueueStride);
163 TEST_F(FollyQueueEnqueueDequeueTest, FollyDynamicBoundedQueue_SPSC) {
164 // DynamicBoundedQueue
165 run_with_initial_capacity<DSPSCQueue>(
166 kDynamicBoundedQueueCapacity,
167 kDSPSCQueueEnqueueCount, kDynamicBoundedQueueEnqueueStride);
170 TEST_F(FollyQueueEnqueueDequeueTest, FollyDynamicBoundedQueue_MPSC) {
171 run_with_initial_capacity<DMPSCQueue>(
172 kDynamicBoundedQueueCapacity,
173 kDMPSCQueueEnqueueCount,
174 kDynamicBoundedQueueEnqueueStride);
177 TEST_F(FollyQueueEnqueueDequeueTest, FollyDynamicBoundedQueue_SPMC) {
178 run_with_initial_capacity<DSPMCQueue>(
179 kDynamicBoundedQueueCapacity,
180 kDSPMCQueueEnqueueCount,
181 kDynamicBoundedQueueEnqueueStride);
184 TEST_F(FollyQueueEnqueueDequeueTest, FollyDynamicBoundedQueue_MPMC) {
185 run_with_initial_capacity<DMPMCQueue>(
186 kDynamicBoundedQueueCapacity,
187 kDMPMCQueueEnqueueCount,
188 kDynamicBoundedQueueEnqueueStride);
191 int main(int argc, char** argv) {
193 ::testing::InitGoogleTest(&argc, argv);
194 int result = RUN_ALL_TESTS();