1 #include <folly/concurrency/UnboundedQueue.h>
2 #include <folly/concurrency/DynamicBoundedQueue.h>
3 #include <folly/AtomicLinkedList.h>
4 #include <folly/MPMCQueue.h>
// Suite label: every status line prints it, followed by "." and the benchmark name.
13 const char* kTestName = "EnqueueDequeue";
// Settings for the folly UnboundedQueue benchmarks.
// Passed as enqueue_stride for all four unbounded variants; run_queue() caps
// each batch at this many elements.
16 size_t kUnboundedQueueEnqueueStride = 10000;
// Per-variant total element count (enqueue_count) and the name printed in the
// status lines. The SPSC variant is given a larger count than the other three.
17 size_t kUSPSCQueueEnqueueCount = 1200000000;
18 const char* kUSPSCQueueBenchmarkName = "FollyUnboundedQueue_SPSC";
19 size_t kUMPSCQueueEnqueueCount = 320000000;
20 const char* kUMPSCQueueBenchmarkName = "FollyUnboundedQueue_MPSC";
21 size_t kUSPMCQueueEnqueueCount = 320000000;
22 const char* kUSPMCQueueBenchmarkName = "FollyUnboundedQueue_SPMC";
23 size_t kUMPMCQueueEnqueueCount = 320000000;
24 const char* kUMPMCQueueBenchmarkName = "FollyUnboundedQueue_MPMC";
// Queue types under test; every one stores size_t elements.
// NOTE(review): the `false` argument is presumably folly's MayBlock flag —
// confirm against the folly headers in use.
26 typedef folly::USPSCQueue<size_t, false> USPSCQueue;
27 typedef folly::UMPSCQueue<size_t, false> UMPSCQueue;
28 typedef folly::USPMCQueue<size_t, false> USPMCQueue;
29 typedef folly::UMPMCQueue<size_t, false> UMPMCQueue;
31 // Settings for the folly DynamicBoundedQueue benchmarks.
// Batch cap passed as enqueue_stride for all four dynamic-bounded variants.
32 size_t kDynamicBoundedQueueEnqueueStride = 50000;
// Constructor argument for every dynamic-bounded queue (passed as
// queue_capacity to run_with_initial_capacity()). It is larger than the
// stride above.
33 size_t kDynamicBoundedQueueCapacity = 200000;
// Per-variant total element count and the name printed in the status lines.
34 size_t kDSPSCQueueEnqueueCount = 1200000000;
35 const char* kDSPSCQueueBenchmarkName = "FollyDynamicBoundedQueue_SPSC";
36 size_t kDMPSCQueueEnqueueCount = 320000000;
37 const char* kDMPSCQueueBenchmarkName = "FollyDynamicBoundedQueue_MPSC";
38 size_t kDSPMCQueueEnqueueCount = 320000000;
39 const char* kDSPMCQueueBenchmarkName = "FollyDynamicBoundedQueue_SPMC";
40 size_t kDMPMCQueueEnqueueCount = 320000000;
41 const char* kDMPMCQueueBenchmarkName = "FollyDynamicBoundedQueue_MPMC";
// Queue types under test; every one stores size_t elements.
// NOTE(review): the `false` argument is presumably folly's MayBlock flag —
// confirm against the folly headers in use.
43 typedef folly::DSPSCQueue<size_t, false> DSPSCQueue;
44 typedef folly::DMPSCQueue<size_t, false> DMPSCQueue;
45 typedef folly::DSPMCQueue<size_t, false> DSPMCQueue;
46 typedef folly::DMPMCQueue<size_t, false> DMPMCQueue;
// Settings for the folly::AtomicLinkedList benchmark.
// Bound of the per-pass fill loop; also the n in the expected sum n * (n - 1) / 2.
49 size_t kAtomicLinkedListSize = 50000;
// Number of passes; run_atomic_linkedlist() allocates a fresh list for each one.
50 size_t kAtomicLinkedListPassCount = 10000;
// Name printed in the status lines.
51 const char* kAtomicLinkedListBenchmarkName = "FollyAtomicLinkedList";
// List type under test, holding size_t elements.
52 typedef folly::AtomicLinkedList<size_t> AtomicLinkedList;
54 // Settings for the folly::MPMCQueue (linearizable) benchmark.
// Batch cap passed as enqueue_stride for the MPMCQueue run.
55 size_t kMPMCQueueEnqueueStride = 10000;
// NOTE(review): not referenced on any line visible in this listing; presumably
// it is the capacity argument of the MPMCQueue run — confirm against the full
// file. It is larger than the stride above.
56 size_t kMPMCQueueCapacity = 50000;
// Total element count and the name printed in the status lines.
57 size_t kMPMCQueueEnqueueCount = 500000000;
58 const char* kMPMCQueueBenchmarkName = "FollyMPMCQueue";
// Queue type under test, holding size_t elements.
59 typedef folly::MPMCQueue<size_t> MPMCQueue;
// Benchmark for folly::AtomicLinkedList. Runs kAtomicLinkedListPassCount
// passes, each on a freshly allocated list, and compares the value accumulated
// through reverseSweep() with the closed-form sum
// 0 + 1 + ... + (kAtomicLinkedListSize - 1).
// Prints "[ RUN ]", then "[ OK ]" or "[ FAILED ]" with the elapsed milliseconds.
// NOTE: the embedded line numbering has gaps. The fill-loop body, the
// declaration of nSum and the closing braces are not part of this listing.
63 void run_atomic_linkedlist() {
64 std::cout << "[ RUN ] " << kTestName << "."
65 << kAtomicLinkedListBenchmarkName << std::endl;
// NOTE(review): system_clock can be adjusted while the benchmark runs;
// std::chrono::steady_clock is the monotonic choice for elapsed time.
66 auto start_time = std::chrono::system_clock::now();
67 for (size_t pass = 0; pass < kAtomicLinkedListPassCount; pass++) {
// A new list per pass; the unique_ptr owns it.
68 std::unique_ptr<AtomicLinkedList> list(new AtomicLinkedList());
// Fill loop over kAtomicLinkedListSize indices (body not shown; presumably it
// inserts i into the list — confirm).
70 for (size_t i = 0; i < kAtomicLinkedListSize; i++) {
// Visitor that adds each element it receives to nSum (captured by reference).
74 auto func = [&nSum] (size_t elem) { nSum += elem; };
// Passes func to the list's reverseSweep(). The sum check below does not
// depend on the order in which elements are visited.
78 list->reverseSweep(func);
// Expected total if the list held each of 0 .. kAtomicLinkedListSize - 1 once.
82 size_t supposed_sum = kAtomicLinkedListSize * (kAtomicLinkedListSize - 1) / 2;
83 if (nSum != supposed_sum) {
// Failure path: report both sums and the elapsed time, then trip the assert.
84 std::cout << "Sequential linked list pop sum: " << nSum
85 << " != " << supposed_sum << "\n";
86 auto finish_time = std::chrono::system_clock::now();
87 auto dur = finish_time - start_time;
88 auto milisecs = std::chrono::duration_cast<std::chrono::milliseconds>(dur);
89 std::cout << "[ FAILED ] " << kTestName << "." << kAtomicLinkedListBenchmarkName
90 << " (" << milisecs.count() << " ms)" << std::endl;
// NOTE(review): assert is compiled out when NDEBUG is defined, so in such
// builds only the printed FAILED line signals the mismatch.
91 assert(false && "Folly AtomicLinkedList ERROR");
// Success path: report the elapsed time.
94 auto finish_time = std::chrono::system_clock::now();
95 auto dur = finish_time - start_time;
96 auto milisecs = std::chrono::duration_cast<std::chrono::milliseconds>(dur);
97 std::cout << "[ OK ] " << kTestName << "." << kAtomicLinkedListBenchmarkName
98 << " (" << milisecs.count() << " ms)" << std::endl;
// Generic enqueue/dequeue benchmark driver for a queue type.
//   q              - queue under test; callers in this file pass q.get() from a unique_ptr.
//   enqueue_count  - total number of elements to process; also fixes the expected sum.
//   bench_name     - label printed in the status lines.
//   enqueue_stride - upper bound on the size of each batch.
// Prints "[ RUN ]", then "[ OK ]" or "[ FAILED ]" with the elapsed milliseconds.
// NOTE: the embedded line numbering has gaps. The declarations of nNo and
// pop_sum, both per-batch loop bodies and the closing braces are not part of
// this listing.
101 template <typename Queue>
102 void run_queue(Queue* q, size_t enqueue_count, const char* bench_name,
103 size_t enqueue_stride) {
104 std::cout << "[ RUN ] " << kTestName << "." << bench_name << std::endl;
// NOTE(review): system_clock can be adjusted while the benchmark runs;
// std::chrono::steady_clock is the monotonic choice for elapsed time.
105 auto start_time = std::chrono::system_clock::now();
// Work proceeds in batches until nNo reaches enqueue_count.
109 while (nNo < enqueue_count) {
// Batch size: the stride, or whatever remains for the final batch.
110 size_t curr_push_count =
111 std::min(enqueue_count - nNo, enqueue_stride);
// Two loops of curr_push_count iterations per batch (bodies not shown;
// presumably enqueue first, then dequeue into pop_sum — confirm).
112 for (size_t i = 0; i < curr_push_count; i++) {
116 for (size_t i = 0; i < curr_push_count; i++) {
// The clock is stopped before verification, so the check itself is not timed.
122 auto finish_time = std::chrono::system_clock::now();
123 auto dur = finish_time - start_time;
124 auto milisecs = std::chrono::duration_cast<std::chrono::milliseconds>(dur);
// Closed form for 0 + 1 + ... + (enqueue_count - 1).
// NOTE(review): the product is computed in size_t. For the largest count in
// this file (1200000000) it is about 1.44e18, which fits in 64 bits but would
// wrap where size_t is 32 bits.
126 size_t supposed_sum = enqueue_count * (enqueue_count - 1) / 2;
127 if (pop_sum != supposed_sum) {
128 std::cout << "Sequential queue pop sum: " << pop_sum
129 << " != " << supposed_sum << "\n";
130 std::cout << "[ FAILED ] " << kTestName << "." << bench_name
131 << " (" << milisecs.count() << " ms)" << std::endl;
// NOTE(review): assert is compiled out when NDEBUG is defined, so in such
// builds only the printed FAILED line signals the mismatch.
132 assert(false && "Folly concurrent queue ERROR");
134 std::cout << "[ OK ] " << kTestName << "." << bench_name
135 << " (" << milisecs.count() << " ms)" << std::endl;
139 // MPMCQueue version of run_queue(): same parameters and status output as
// the generic driver, but elements are drained with q->read(res) in a while
// loop instead of a counted loop.
//   q              - MPMCQueue under test.
//   enqueue_count  - total number of elements to process; also fixes the expected sum.
//   bench_name     - label printed in the status lines.
//   enqueue_stride - upper bound on the size of each batch.
// NOTE: the embedded line numbering has gaps. The line directly before the
// signature, the declarations of nNo, pop_sum and res, the loop bodies and the
// closing braces are not part of this listing.
141 void run_queue(MPMCQueue* q, size_t enqueue_count, const char* bench_name,
142 size_t enqueue_stride) {
143 std::cout << "[ RUN ] " << kTestName << "." << bench_name << std::endl;
// NOTE(review): system_clock can be adjusted while the benchmark runs;
// std::chrono::steady_clock is the monotonic choice for elapsed time.
144 auto start_time = std::chrono::system_clock::now();
// Work proceeds in batches until nNo reaches enqueue_count.
149 while (nNo < enqueue_count) {
// Batch size: the stride, or whatever remains for the final batch.
150 size_t curr_push_count =
151 std::min(enqueue_count - nNo, enqueue_stride);
// Loop of curr_push_count iterations (body not shown; presumably it writes
// the batch into the queue — confirm).
152 for (size_t i = 0; i < curr_push_count; i++) {
// Keeps reading for as long as read() returns true (body not shown).
159 while (q->read(res)) {
// The clock is stopped before verification, so the check itself is not timed.
164 auto finish_time = std::chrono::system_clock::now();
165 auto dur = finish_time - start_time;
166 auto milisecs = std::chrono::duration_cast<std::chrono::milliseconds>(dur);
// Closed form for 0 + 1 + ... + (enqueue_count - 1), computed in size_t.
168 size_t supposed_sum = enqueue_count * (enqueue_count - 1) / 2;
169 if (pop_sum != supposed_sum) {
170 std::cout << "Sequential queue pop sum: " << pop_sum
171 << " != " << supposed_sum << "\n";
172 std::cout << "[ FAILED ] " << kTestName << "." << bench_name
173 << " (" << milisecs.count() << " ms)" << std::endl;
// NOTE(review): assert is compiled out when NDEBUG is defined, so in such
// builds only the printed FAILED line signals the mismatch.
174 assert(false && "Folly concurrent queue ERROR");
176 std::cout << "[ OK ] " << kTestName << "." << bench_name
177 << " (" << milisecs.count() << " ms)" << std::endl;
// Allocates a default-constructed Queue, held by a unique_ptr, and runs
// run_queue() on it.
//   enqueue_count, bench_name, enqueue_stride - forwarded unchanged to run_queue().
181 template <typename Queue>
182 void run_without_initial_capacity(size_t enqueue_count, const char* bench_name,
183 size_t enqueue_stride) {
184 std::unique_ptr<Queue> q(new Queue());
// run_queue() receives the raw pointer; the unique_ptr remains the owner.
185 run_queue(q.get(), enqueue_count, bench_name, enqueue_stride);
// Allocates a Queue constructed with queue_capacity as its only constructor
// argument, held by a unique_ptr, and runs run_queue() on it.
//   queue_capacity - value handed to the Queue constructor.
//   enqueue_count, bench_name, enqueue_stride - forwarded unchanged to run_queue().
188 template <typename Queue>
189 void run_with_initial_capacity(size_t queue_capacity, size_t enqueue_count,
190 const char* bench_name, size_t enqueue_stride) {
191 std::unique_ptr<Queue> q(new Queue(queue_capacity));
// run_queue() receives the raw pointer; the unique_ptr remains the owner.
192 run_queue(q.get(), enqueue_count, bench_name, enqueue_stride);
// Benchmark sequence. The header of the enclosing function is not part of
// this listing.
// MPMCQueue run.
// NOTE(review): run_with_initial_capacity() takes four arguments but only
// three are visible here (embedded line 198 is absent). Presumably
// kMPMCQueueCapacity comes first — confirm against the full file.
197 run_with_initial_capacity<MPMCQueue>(
199 kMPMCQueueEnqueueCount,
200 kMPMCQueueBenchmarkName,
201 kMPMCQueueEnqueueStride);
// AtomicLinkedList run.
204 run_atomic_linkedlist();
// UnboundedQueue runs (SPSC, MPSC, SPMC, MPMC): default-constructed queues,
// all using kUnboundedQueueEnqueueStride.
207 run_without_initial_capacity<USPSCQueue>(
208 kUSPSCQueueEnqueueCount,
209 kUSPSCQueueBenchmarkName,
210 kUnboundedQueueEnqueueStride);
211 run_without_initial_capacity<UMPSCQueue>(
212 kUMPSCQueueEnqueueCount,
213 kUMPSCQueueBenchmarkName,
214 kUnboundedQueueEnqueueStride);
215 run_without_initial_capacity<USPMCQueue>(
216 kUSPMCQueueEnqueueCount,
217 kUSPMCQueueBenchmarkName,
218 kUnboundedQueueEnqueueStride);
219 run_without_initial_capacity<UMPMCQueue>(
220 kUMPMCQueueEnqueueCount,
221 kUMPMCQueueBenchmarkName,
222 kUnboundedQueueEnqueueStride);
224 // DynamicBoundedQueue runs (SPSC, MPSC, SPMC, MPMC): each queue is
// constructed with kDynamicBoundedQueueCapacity and uses
// kDynamicBoundedQueueEnqueueStride.
225 run_with_initial_capacity<DSPSCQueue>(
226 kDynamicBoundedQueueCapacity ,
227 kDSPSCQueueEnqueueCount, kDSPSCQueueBenchmarkName,
228 kDynamicBoundedQueueEnqueueStride);
229 run_with_initial_capacity<DMPSCQueue>(
230 kDynamicBoundedQueueCapacity,
231 kDMPSCQueueEnqueueCount,
232 kDMPSCQueueBenchmarkName,
233 kDynamicBoundedQueueEnqueueStride);
234 run_with_initial_capacity<DSPMCQueue>(
235 kDynamicBoundedQueueCapacity,
236 kDSPMCQueueEnqueueCount,
237 kDSPMCQueueBenchmarkName,
238 kDynamicBoundedQueueEnqueueStride);
239 run_with_initial_capacity<DMPMCQueue>(
240 kDynamicBoundedQueueCapacity,
241 kDMPMCQueueEnqueueCount,
242 kDMPMCQueueBenchmarkName,
243 kDynamicBoundedQueueEnqueueStride);