Adds Folly MPMCQueue test case
[folly.git] / folly / stress-test / stress-sequential-folly-queue.cpp
1 #include <folly/concurrency/UnboundedQueue.h>
2 #include <folly/concurrency/DynamicBoundedQueue.h>
3 #include <folly/AtomicLinkedList.h>
4 #include <folly/MPMCQueue.h>
5
6 #include <chrono>
7 #include <cassert>
8 #include <iostream>
9 #include <memory>
10
11 namespace {
12
13 const char* kTestName = "EnqueueDequeue";
14
15 // Unbounded queue
16 size_t kUnboundedQueueEnqueueStride = 10000;
17 size_t kUSPSCQueueEnqueueCount = 1200000000;
18 const char* kUSPSCQueueBenchmarkName = "FollyUnboundedQueue_SPSC";
19 size_t kUMPSCQueueEnqueueCount = 320000000;
20 const char* kUMPSCQueueBenchmarkName = "FollyUnboundedQueue_MPSC";
21 size_t kUSPMCQueueEnqueueCount = 320000000;
22 const char* kUSPMCQueueBenchmarkName = "FollyUnboundedQueue_SPMC";
23 size_t kUMPMCQueueEnqueueCount = 320000000;
24 const char* kUMPMCQueueBenchmarkName = "FollyUnboundedQueue_MPMC";
25
26 typedef folly::USPSCQueue<size_t, false> USPSCQueue;
27 typedef folly::UMPSCQueue<size_t, false> UMPSCQueue;
28 typedef folly::USPMCQueue<size_t, false> USPMCQueue;
29 typedef folly::UMPMCQueue<size_t, false> UMPMCQueue;
30
31 // Dynamic bounded queue
32 size_t kDynamicBoundedQueueEnqueueStride = 50000;
33 size_t kDynamicBoundedQueueCapacity = 200000;
34 size_t kDSPSCQueueEnqueueCount = 1200000000;
35 const char* kDSPSCQueueBenchmarkName = "FollyDynamicBoundedQueue_SPSC";
36 size_t kDMPSCQueueEnqueueCount = 320000000;
37 const char* kDMPSCQueueBenchmarkName = "FollyDynamicBoundedQueue_MPSC";
38 size_t kDSPMCQueueEnqueueCount = 320000000;
39 const char* kDSPMCQueueBenchmarkName = "FollyDynamicBoundedQueue_SPMC";
40 size_t kDMPMCQueueEnqueueCount = 320000000;
41 const char* kDMPMCQueueBenchmarkName = "FollyDynamicBoundedQueue_MPMC";
42
43 typedef folly::DSPSCQueue<size_t, false> DSPSCQueue;
44 typedef folly::DMPSCQueue<size_t, false> DMPSCQueue;
45 typedef folly::DSPMCQueue<size_t, false> DSPMCQueue;
46 typedef folly::DMPMCQueue<size_t, false> DMPMCQueue;
47
48 // AtomicLinkedList
49 size_t kAtomicLinkedListSize = 50000;
50 size_t kAtomicLinkedListPassCount = 10000;
51 const char* kAtomicLinkedListBenchmarkName = "FollyAtomicLinkedList";
52 typedef folly::AtomicLinkedList<size_t> AtomicLinkedList;
53
54 // MPMC Queue (linearizable)
55 size_t kMPMCQueueEnqueueStride = 10000;
56 size_t kMPMCQueueCapacity = 50000;
57 size_t kMPMCQueueEnqueueCount = 500000000;
58 const char* kMPMCQueueBenchmarkName = "FollyMPMCQueue";
59 typedef folly::MPMCQueue<size_t> MPMCQueue;
60
61 }
62
63 void run_atomic_linkedlist() {
64   std::cout << "[ RUN      ] " << kTestName << "."
65             << kAtomicLinkedListBenchmarkName << std::endl;
66   auto start_time = std::chrono::system_clock::now();
67   for (size_t pass = 0; pass < kAtomicLinkedListPassCount; pass++) {
68     std::unique_ptr<AtomicLinkedList> list(new AtomicLinkedList());
69     bool in_order = true;
70     for (size_t i = 0; i < kAtomicLinkedListSize; i++) {
71       list->insertHead(i);
72     }
73     size_t nSum = 0;
74     auto func = [&nSum] (size_t elem) { nSum += elem; };
75     if (in_order) {
76       list->sweep(func);
77     } else {
78       list->reverseSweep(func);
79     }
80     in_order = !in_order;
81
82     size_t supposed_sum = kAtomicLinkedListSize * (kAtomicLinkedListSize - 1) / 2;
83     if (nSum != supposed_sum) {
84       std::cout << "Sequential linked list pop sum: " << nSum
85                 << " != " << supposed_sum << "\n";
86       auto finish_time = std::chrono::system_clock::now();
87       auto dur = finish_time - start_time;
88       auto milisecs = std::chrono::duration_cast<std::chrono::milliseconds>(dur);
89       std::cout << "[       FAILED ] " << kTestName << "." << kAtomicLinkedListBenchmarkName
90                 << " (" << milisecs.count() << " ms)" << std::endl;
91       assert(false && "Folly AtomicLinkedList ERROR");
92     }
93   }
94   auto finish_time = std::chrono::system_clock::now();
95   auto dur = finish_time - start_time;
96   auto milisecs = std::chrono::duration_cast<std::chrono::milliseconds>(dur);
97   std::cout << "[       OK ] " << kTestName << "." << kAtomicLinkedListBenchmarkName
98             << " (" << milisecs.count() << " ms)" << std::endl;
99 }
100
101 template <typename Queue>
102 void run_queue(Queue* q, size_t enqueue_count, const char* bench_name,
103                size_t enqueue_stride) {
104     std::cout << "[ RUN      ] " << kTestName << "." << bench_name << std::endl;
105     auto start_time = std::chrono::system_clock::now();
106
107     size_t nNo = 0;
108     size_t pop_sum = 0;
109     while (nNo < enqueue_count) {
110       size_t curr_push_count =
111           std::min(enqueue_count - nNo, enqueue_stride);
112       for (size_t i = 0; i < curr_push_count; i++) {
113         q->enqueue(nNo++);
114       }
115       size_t res;
116       for (size_t i = 0; i < curr_push_count; i++) {
117         q->dequeue(res);
118         pop_sum += res;
119       }
120     }
121
122     auto finish_time = std::chrono::system_clock::now();
123     auto dur = finish_time - start_time;
124     auto milisecs = std::chrono::duration_cast<std::chrono::milliseconds>(dur);
125
126     size_t supposed_sum = enqueue_count * (enqueue_count - 1) / 2;
127     if (pop_sum != supposed_sum) {
128       std::cout << "Sequential queue pop sum: " << pop_sum
129                 << " != " << supposed_sum << "\n";
130       std::cout << "[       FAILED ] " << kTestName << "." << bench_name
131                 << " (" << milisecs.count() << " ms)" << std::endl;
132       assert(false && "Folly concurrent queue ERROR");
133     } else {
134         std::cout << "[       OK ] " << kTestName << "." << bench_name
135                   << " (" << milisecs.count() << " ms)" << std::endl;
136     }
137 }
138
139 // MPMC Specialization.
140 template <>
141 void run_queue(MPMCQueue* q, size_t enqueue_count, const char* bench_name,
142                size_t enqueue_stride) {
143     std::cout << "[ RUN      ] " << kTestName << "." << bench_name << std::endl;
144     auto start_time = std::chrono::system_clock::now();
145
146     size_t nNo = 0;
147     size_t push_sum = 0;
148     size_t pop_sum = 0;
149     while (nNo < enqueue_count) {
150       size_t curr_push_count =
151           std::min(enqueue_count - nNo, enqueue_stride);
152       for (size_t i = 0; i < curr_push_count; i++) {
153         if (q->write(nNo)) {
154           push_sum += nNo;
155           nNo++;
156         }
157       }
158       size_t res;
159       while (q->read(res)) {
160         pop_sum += res;
161       }
162     }
163
164     auto finish_time = std::chrono::system_clock::now();
165     auto dur = finish_time - start_time;
166     auto milisecs = std::chrono::duration_cast<std::chrono::milliseconds>(dur);
167
168     size_t supposed_sum = enqueue_count * (enqueue_count - 1) / 2;
169     if (pop_sum != supposed_sum) {
170       std::cout << "Sequential queue pop sum: " << pop_sum
171                 << " != " << supposed_sum << "\n";
172       std::cout << "[       FAILED ] " << kTestName << "." << bench_name
173                 << " (" << milisecs.count() << " ms)" << std::endl;
174       assert(false && "Folly concurrent queue ERROR");
175     } else {
176         std::cout << "[       OK ] " << kTestName << "." << bench_name
177                   << " (" << milisecs.count() << " ms)" << std::endl;
178     }
179 }
180
// Heap-allocates a default-constructed Queue, hands it to run_queue() with
// the given parameters, and releases it when the run has finished.
template <typename Queue>
void run_without_initial_capacity(size_t enqueue_count, const char* bench_name,
                                  size_t enqueue_stride) {
  auto queue = std::make_unique<Queue>();
  run_queue(queue.get(), enqueue_count, bench_name, enqueue_stride);
}
187
// Heap-allocates a Queue constructed from queue_capacity, hands it to
// run_queue() with the remaining parameters, and releases it when the run has
// finished.
template <typename Queue>
void run_with_initial_capacity(size_t queue_capacity, size_t enqueue_count,
                               const char* bench_name, size_t enqueue_stride) {
  auto queue = std::make_unique<Queue>(queue_capacity);
  run_queue(queue.get(), enqueue_count, bench_name, enqueue_stride);
}
194
195 int main() {
196   // MPMCQueue
197   run_with_initial_capacity<MPMCQueue>(
198       kMPMCQueueCapacity ,
199       kMPMCQueueEnqueueCount,
200       kMPMCQueueBenchmarkName,
201       kMPMCQueueEnqueueStride);
202
203   // AtomicLinkedList
204   run_atomic_linkedlist();
205
206   // UnboundedQueue
207   run_without_initial_capacity<USPSCQueue>(
208       kUSPSCQueueEnqueueCount,
209       kUSPSCQueueBenchmarkName,
210       kUnboundedQueueEnqueueStride);
211   run_without_initial_capacity<UMPSCQueue>(
212       kUMPSCQueueEnqueueCount,
213       kUMPSCQueueBenchmarkName,
214       kUnboundedQueueEnqueueStride);
215   run_without_initial_capacity<USPMCQueue>(
216       kUSPMCQueueEnqueueCount,
217       kUSPMCQueueBenchmarkName,
218       kUnboundedQueueEnqueueStride);
219   run_without_initial_capacity<UMPMCQueue>(
220       kUMPMCQueueEnqueueCount,
221       kUMPMCQueueBenchmarkName,
222       kUnboundedQueueEnqueueStride);
223
224   // DynamicBoundedQueue
225   run_with_initial_capacity<DSPSCQueue>(
226       kDynamicBoundedQueueCapacity ,
227       kDSPSCQueueEnqueueCount, kDSPSCQueueBenchmarkName,
228       kDynamicBoundedQueueEnqueueStride);
229   run_with_initial_capacity<DMPSCQueue>(
230       kDynamicBoundedQueueCapacity,
231       kDMPSCQueueEnqueueCount,
232       kDMPSCQueueBenchmarkName,
233       kDynamicBoundedQueueEnqueueStride);
234   run_with_initial_capacity<DSPMCQueue>(
235       kDynamicBoundedQueueCapacity,
236       kDSPMCQueueEnqueueCount,
237       kDSPMCQueueBenchmarkName,
238       kDynamicBoundedQueueEnqueueStride);
239   run_with_initial_capacity<DMPMCQueue>(
240       kDynamicBoundedQueueCapacity,
241       kDMPMCQueueEnqueueCount,
242       kDMPMCQueueBenchmarkName,
243       kDynamicBoundedQueueEnqueueStride);
244
245   return 0;
246 }