Uses cmake to build folly test drivers
[folly.git] / folly / stress-test / stress-parallel-folly-queue.cpp
1 #include <folly/concurrency/UnboundedQueue.h>
2 #include <folly/concurrency/DynamicBoundedQueue.h>
3 #include <folly/AtomicLinkedList.h>
4 #include <folly/MPMCQueue.h>
5
6 #include <gtest/gtest.h>
7
8 #include <memory>
9
10 namespace {
11
12 // Unbounded queue
13 size_t kUnboundedQueueEnqueueStride = 10000;
14 size_t kUSPSCQueueEnqueueCount = 1200000000;
15 size_t kUMPSCQueueEnqueueCount = 320000000;
16 size_t kUSPMCQueueEnqueueCount = 320000000;
17 size_t kUMPMCQueueEnqueueCount = 320000000;
18
19 typedef folly::USPSCQueue<size_t, false> USPSCQueue;
20 typedef folly::UMPSCQueue<size_t, false> UMPSCQueue;
21 typedef folly::USPMCQueue<size_t, false> USPMCQueue;
22 typedef folly::UMPMCQueue<size_t, false> UMPMCQueue;
23
24 // Dynamic bounded queue
25 size_t kDynamicBoundedQueueEnqueueStride = 50000;
26 size_t kDynamicBoundedQueueCapacity = 200000;
27 size_t kDSPSCQueueEnqueueCount = 1200000000;
28 size_t kDMPSCQueueEnqueueCount = 320000000;
29 size_t kDSPMCQueueEnqueueCount = 320000000;
30 size_t kDMPMCQueueEnqueueCount = 320000000;
31
32 typedef folly::DSPSCQueue<size_t, false> DSPSCQueue;
33 typedef folly::DMPSCQueue<size_t, false> DMPSCQueue;
34 typedef folly::DSPMCQueue<size_t, false> DSPMCQueue;
35 typedef folly::DMPMCQueue<size_t, false> DMPMCQueue;
36
37 // AtomicLinkedList
38 size_t kAtomicLinkedListSize = 50000;
39 size_t kAtomicLinkedListPassCount = 10000;
40 typedef folly::AtomicLinkedList<size_t> AtomicLinkedList;
41
42 // MPMC Queue (linearizable)
43 size_t kMPMCQueueEnqueueStride = 10000;
44 size_t kMPMCQueueCapacity = 50000;
45 size_t kMPMCQueueEnqueueCount = 500000000;
46 typedef folly::MPMCQueue<size_t> MPMCQueue;
47
48 }
49
50 void run_atomic_linkedlist() {
51   for (size_t pass = 0; pass < kAtomicLinkedListPassCount; pass++) {
52     std::unique_ptr<AtomicLinkedList> list(new AtomicLinkedList());
53     bool in_order = true;
54     for (size_t i = 0; i < kAtomicLinkedListSize; i++) {
55       list->insertHead(i);
56     }
57     size_t nSum = 0;
58     auto func = [&nSum] (size_t elem) { nSum += elem; };
59     if (in_order) {
60       list->sweep(func);
61     } else {
62       list->reverseSweep(func);
63     }
64     in_order = !in_order;
65
66     size_t supposed_sum = kAtomicLinkedListSize * (kAtomicLinkedListSize - 1) / 2;
67     EXPECT_EQ(nSum, supposed_sum);
68   }
69 }
70
71 template <typename Queue>
72 void run_queue(Queue* q, size_t enqueue_count, size_t enqueue_stride) {
73     size_t nNo = 0;
74     size_t pop_sum = 0;
75     while (nNo < enqueue_count) {
76       size_t curr_push_count =
77           std::min(enqueue_count - nNo, enqueue_stride);
78       for (size_t i = 0; i < curr_push_count; i++) {
79         q->enqueue(nNo++);
80       }
81       size_t res;
82       for (size_t i = 0; i < curr_push_count; i++) {
83         q->dequeue(res);
84         pop_sum += res;
85       }
86     }
87     size_t supposed_sum = enqueue_count * (enqueue_count - 1) / 2;
88     EXPECT_EQ (pop_sum, supposed_sum);
89 }
90
91 // MPMC Specialization.
92 template <>
93 void run_queue(MPMCQueue* q, size_t enqueue_count, size_t enqueue_stride) {
94     size_t nNo = 0;
95     size_t push_sum = 0;
96     size_t pop_sum = 0;
97     while (nNo < enqueue_count) {
98       size_t curr_push_count =
99           std::min(enqueue_count - nNo, enqueue_stride);
100       for (size_t i = 0; i < curr_push_count; i++) {
101         if (q->write(nNo)) {
102           push_sum += nNo;
103           nNo++;
104         }
105       }
106       size_t res;
107       while (q->read(res)) {
108         pop_sum += res;
109       }
110     }
111
112     size_t supposed_sum = enqueue_count * (enqueue_count - 1) / 2;
113     EXPECT_EQ(pop_sum, supposed_sum);
114 }
115
// Creates a default-constructed, heap-allocated Queue and feeds it to
// run_queue() with the given item count and burst size. The queue is
// destroyed when the function returns.
template <typename Queue>
void run_without_initial_capacity(size_t enqueue_count, size_t enqueue_stride) {
  auto queue = std::make_unique<Queue>();
  Queue* const raw_queue = queue.get();
  run_queue(raw_queue, enqueue_count, enqueue_stride);
}
121
// Creates a heap-allocated Queue constructed from queue_capacity and feeds
// it to run_queue() with the given item count and burst size. The queue is
// destroyed when the function returns.
template <typename Queue>
void run_with_initial_capacity(
    size_t queue_capacity,
    size_t enqueue_count,
    size_t enqueue_stride) {
  auto queue = std::make_unique<Queue>(queue_capacity);
  Queue* const raw_queue = queue.get();
  run_queue(raw_queue, enqueue_count, enqueue_stride);
}
128
129 class FollyQueueEnqueueDequeueTest : public ::testing::Test {
130
131 };
132
133 TEST_F(FollyQueueEnqueueDequeueTest, FollyMPMCQueue) {
134   run_with_initial_capacity<MPMCQueue>(
135       kMPMCQueueCapacity, kMPMCQueueEnqueueCount, kMPMCQueueEnqueueStride);
136 }
137
138 TEST_F(FollyQueueEnqueueDequeueTest, FollyAtomicLinkedList) {
139   run_atomic_linkedlist();
140 }
141
142 TEST_F(FollyQueueEnqueueDequeueTest, FollyUnboundedQueue_SPSC) {
143   run_without_initial_capacity<USPSCQueue>(
144       kUSPSCQueueEnqueueCount, kUnboundedQueueEnqueueStride);
145 }
146
147 TEST_F(FollyQueueEnqueueDequeueTest, FollyUnboundedQueue_MPSC) {
148   run_without_initial_capacity<UMPSCQueue>(
149       kUMPSCQueueEnqueueCount, kUnboundedQueueEnqueueStride);
150 }
151
152 TEST_F(FollyQueueEnqueueDequeueTest, FollyUnboundedQueue_SPMC) {
153   run_without_initial_capacity<USPMCQueue>(
154       kUSPMCQueueEnqueueCount, kUnboundedQueueEnqueueStride);
155 }
156
157 TEST_F(FollyQueueEnqueueDequeueTest, FollyUnboundedQueue_MPMC) {
158   run_without_initial_capacity<UMPMCQueue>(
159       kUMPMCQueueEnqueueCount,
160       kUnboundedQueueEnqueueStride);
161 }
162
163 TEST_F(FollyQueueEnqueueDequeueTest, FollyDynamicBoundedQueue_SPSC) {
164   // DynamicBoundedQueue
165   run_with_initial_capacity<DSPSCQueue>(
166       kDynamicBoundedQueueCapacity,
167       kDSPSCQueueEnqueueCount, kDynamicBoundedQueueEnqueueStride);
168 }
169
170 TEST_F(FollyQueueEnqueueDequeueTest, FollyDynamicBoundedQueue_MPSC) {
171   run_with_initial_capacity<DMPSCQueue>(
172       kDynamicBoundedQueueCapacity,
173       kDMPSCQueueEnqueueCount,
174       kDynamicBoundedQueueEnqueueStride);
175 }
176
177 TEST_F(FollyQueueEnqueueDequeueTest, FollyDynamicBoundedQueue_SPMC) {
178   run_with_initial_capacity<DSPMCQueue>(
179       kDynamicBoundedQueueCapacity,
180       kDSPMCQueueEnqueueCount,
181       kDynamicBoundedQueueEnqueueStride);
182 }
183
184 TEST_F(FollyQueueEnqueueDequeueTest, FollyDynamicBoundedQueue_MPMC) {
185   run_with_initial_capacity<DMPMCQueue>(
186       kDynamicBoundedQueueCapacity,
187       kDMPMCQueueEnqueueCount,
188       kDynamicBoundedQueueEnqueueStride);
189 }
190
191 int main(int argc, char** argv) {
192   // Init Google test
193   ::testing::InitGoogleTest(&argc, argv);
194   int result = RUN_ALL_TESTS();
195   return result;
196 }