// Source: folly.git, folly/stress-test/stress-sequential-folly-queue.cpp
// (git object 21f5311ca3de668151adcd4fa9e92ae5737d009d)
#include <folly/AtomicLinkedList.h>
#include <folly/concurrency/DynamicBoundedQueue.h>
#include <folly/concurrency/UnboundedQueue.h>

#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdlib>
#include <iostream>
#include <memory>
9
10 namespace {
11
12 const char* kTestName = "EnqueueDequeue";
13
14 // Unbounded queue
15 size_t kUnboundedQueueEnqueueStride = 10000;
16 size_t kUSPSCQueueEnqueueCount = 1200000000;
17 const char* kUSPSCQueueBenchmarkName = "FollyUnboundedQueue_SPSC";
18 size_t kUMPSCQueueEnqueueCount = 320000000;
19 const char* kUMPSCQueueBenchmarkName = "FollyUnboundedQueue_MPSC";
20 size_t kUSPMCQueueEnqueueCount = 320000000;
21 const char* kUSPMCQueueBenchmarkName = "FollyUnboundedQueue_SPMC";
22 size_t kUMPMCQueueEnqueueCount = 320000000;
23 const char* kUMPMCQueueBenchmarkName = "FollyUnboundedQueue_MPMC";
24
25 typedef folly::USPSCQueue<size_t, false> USPSCQueue;
26 typedef folly::UMPSCQueue<size_t, false> UMPSCQueue;
27 typedef folly::USPMCQueue<size_t, false> USPMCQueue;
28 typedef folly::UMPMCQueue<size_t, false> UMPMCQueue;
29
30 // Dynamic bounded queue
31 size_t kDynamicBoundedQueueEnqueueStride = 50000;
32 size_t kDynamicBoundedQueueCapacity = 200000;
33 size_t kDSPSCQueueEnqueueCount = 1200000000;
34 const char* kDSPSCQueueBenchmarkName = "FollyDynamicBoundedQueue_SPSC";
35 size_t kDMPSCQueueEnqueueCount = 320000000;
36 const char* kDMPSCQueueBenchmarkName = "FollyDynamicBoundedQueue_MPSC";
37 size_t kDSPMCQueueEnqueueCount = 320000000;
38 const char* kDSPMCQueueBenchmarkName = "FollyDynamicBoundedQueue_SPMC";
39 size_t kDMPMCQueueEnqueueCount = 320000000;
40 const char* kDMPMCQueueBenchmarkName = "FollyDynamicBoundedQueue_MPMC";
41
42 typedef folly::DSPSCQueue<size_t, false> DSPSCQueue;
43 typedef folly::DMPSCQueue<size_t, false> DMPSCQueue;
44 typedef folly::DSPMCQueue<size_t, false> DSPMCQueue;
45 typedef folly::DMPMCQueue<size_t, false> DMPMCQueue;
46
47 // AtomicLinkedList
48 size_t kAtomicLinkedListSize = 50000;
49 size_t kAtomicLinkedListPassCount = 10000;
50 const char* kAtomicLinkedListBenchmarkName = "FollyAtomicLinkedList";
51 typedef folly::AtomicLinkedList<size_t> AtomicLinkedList;
52
53 }
54
55 void run_atomic_linkedlist() {
56   std::cout << "[ RUN      ] " << kTestName << "."
57             << kAtomicLinkedListBenchmarkName << std::endl;
58   auto start_time = std::chrono::system_clock::now();
59   for (size_t pass = 0; pass < kAtomicLinkedListPassCount; pass++) {
60     std::unique_ptr<AtomicLinkedList> list(new AtomicLinkedList());
61     bool in_order = true;
62     for (size_t i = 0; i < kAtomicLinkedListSize; i++) {
63       list->insertHead(i);
64     }
65     size_t nSum = 0;
66     auto func = [&nSum] (size_t elem) { nSum += elem; };
67     if (in_order) {
68       list->sweep(func);
69     } else {
70       list->reverseSweep(func);
71     }
72     in_order = !in_order;
73
74     size_t supposed_sum = kAtomicLinkedListSize * (kAtomicLinkedListSize - 1) / 2;
75     if (nSum != supposed_sum) {
76       std::cout << "Sequential linked list pop sum: " << nSum
77                 << " != " << supposed_sum << "\n";
78       auto finish_time = std::chrono::system_clock::now();
79       auto dur = finish_time - start_time;
80       auto milisecs = std::chrono::duration_cast<std::chrono::milliseconds>(dur);
81       std::cout << "[       FAILED ] " << kTestName << "." << kAtomicLinkedListBenchmarkName
82                 << " (" << milisecs.count() << " ms)" << std::endl;
83       assert(false && "Folly AtomicLinkedList ERROR");
84     }
85   }
86   auto finish_time = std::chrono::system_clock::now();
87   auto dur = finish_time - start_time;
88   auto milisecs = std::chrono::duration_cast<std::chrono::milliseconds>(dur);
89   std::cout << "[       OK ] " << kTestName << "." << kAtomicLinkedListBenchmarkName
90             << " (" << milisecs.count() << " ms)" << std::endl;
91 }
92
93 template <typename Queue>
94 void run_queue(Queue* q, size_t enqueue_count, const char* bench_name,
95                size_t enqueue_stride) {
96     std::cout << "[ RUN      ] " << kTestName << "." << bench_name << std::endl;
97     auto start_time = std::chrono::system_clock::now();
98
99     size_t nNo = 0;
100     size_t push_failure = 0;
101     size_t pop_sum = 0;
102     while (nNo < enqueue_count) {
103       size_t curr_push_count =
104           std::min(enqueue_count - nNo, enqueue_stride);
105       for (size_t i = 0; i < curr_push_count; i++) {
106         q->enqueue(nNo++);
107       }
108       size_t res;
109       for (size_t i = 0; i < curr_push_count; i++) {
110         q->dequeue(res);
111         pop_sum += res;
112       }
113     }
114
115     auto finish_time = std::chrono::system_clock::now();
116     auto dur = finish_time - start_time;
117     auto milisecs = std::chrono::duration_cast<std::chrono::milliseconds>(dur);
118
119     size_t supposed_sum = enqueue_count * (enqueue_count - 1) / 2;
120     if (pop_sum != supposed_sum) {
121       std::cout << "Sequential queue pop sum: " << pop_sum
122                 << " != " << supposed_sum << "\n";
123       std::cout << "[       FAILED ] " << kTestName << "." << bench_name
124                 << " (" << milisecs.count() << " ms)" << std::endl;
125       assert(false && "Folly unbounded queue ERROR");
126     } else {
127         std::cout << "[       OK ] " << kTestName << "." << bench_name
128                   << " (" << milisecs.count() << " ms)" << std::endl;
129     }
130 }
131
// Runs the sequential enqueue/dequeue test (run_queue) against a
// default-constructed, heap-allocated Queue.
//
// Parameters:
//   enqueue_count  - total number of values to push through the queue.
//   bench_name     - name printed in the report lines.
//   enqueue_stride - number of values enqueued per batch before draining.
template <typename Queue>
void run_unbounded(size_t enqueue_count, const char* bench_name,
                   size_t enqueue_stride) {
  // make_unique instead of a naked `new`: same construction, no raw owning
  // pointer expression.
  const auto queue = std::make_unique<Queue>();
  run_queue(queue.get(), enqueue_count, bench_name, enqueue_stride);
}
138
// Runs the sequential enqueue/dequeue test (run_queue) against a
// heap-allocated Queue constructed with `queue_capacity`.
//
// Parameters:
//   queue_capacity - value passed to the Queue constructor.
//   enqueue_count  - total number of values to push through the queue.
//   bench_name     - name printed in the report lines.
//   enqueue_stride - number of values enqueued per batch before draining;
//                    limited to queue_capacity when that is non-zero.
template <typename Queue>
void run_dynamic_bounded(size_t queue_capacity, size_t enqueue_count,
                         const char* bench_name,
                         size_t enqueue_stride) {
  // make_unique instead of a naked `new`: same construction, no raw owning
  // pointer expression.
  const auto queue = std::make_unique<Queue>(queue_capacity);

  // run_queue() enqueues a whole batch from one thread before it dequeues
  // anything, so a batch larger than the bound could never complete.
  // NOTE(review): this relies on folly's DynamicBoundedQueue::enqueue waiting
  // for capacity (default weight 1 per element) -- confirm against the folly
  // header.
  const size_t stride = (queue_capacity > 0)
                            ? std::min(enqueue_stride, queue_capacity)
                            : enqueue_stride;

  run_queue(queue.get(), enqueue_count, bench_name, stride);
}
146
147 int main() {
148   run_atomic_linkedlist();
149   run_unbounded<USPSCQueue>(kUSPSCQueueEnqueueCount, kUSPSCQueueBenchmarkName,
150                         kUnboundedQueueEnqueueStride);
151   run_unbounded<UMPSCQueue>(kUMPSCQueueEnqueueCount, kUMPSCQueueBenchmarkName,
152                         kUnboundedQueueEnqueueStride);
153   run_unbounded<USPMCQueue>(kUSPMCQueueEnqueueCount, kUSPMCQueueBenchmarkName,
154                         kUnboundedQueueEnqueueStride);
155   run_unbounded<UMPMCQueue>(kUMPMCQueueEnqueueCount, kUMPMCQueueBenchmarkName,
156                         kUnboundedQueueEnqueueStride);
157
158   run_dynamic_bounded<DSPSCQueue>(kDynamicBoundedQueueCapacity ,
159                         kDSPSCQueueEnqueueCount, kDSPSCQueueBenchmarkName,
160                         kDynamicBoundedQueueEnqueueStride);
161   run_dynamic_bounded<DMPSCQueue>(kDynamicBoundedQueueCapacity,
162                         kDMPSCQueueEnqueueCount, kDMPSCQueueBenchmarkName,
163                         kDynamicBoundedQueueEnqueueStride);
164   run_dynamic_bounded<DSPMCQueue>(kDynamicBoundedQueueCapacity,
165                         kDSPMCQueueEnqueueCount, kDSPMCQueueBenchmarkName,
166                         kDynamicBoundedQueueEnqueueStride);
167   run_dynamic_bounded<DMPMCQueue>(kDynamicBoundedQueueCapacity,
168                         kDMPMCQueueEnqueueCount, kDMPMCQueueBenchmarkName,
169                         kDynamicBoundedQueueEnqueueStride);
170
171   return 0;
172 }