Ensure curly-braces around control-flow
[folly.git] / folly / experimental / test / FlatCombiningPriorityQueueTest.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/Benchmark.h>
18 #include <folly/experimental/FlatCombiningPriorityQueue.h>
19 #include <folly/portability/GTest.h>
20 #include <glog/logging.h>
21
22 #include <condition_variable>
23 #include <mutex>
24 #include <queue>
25
26 DEFINE_bool(bench, false, "run benchmark");
27 DEFINE_int32(reps, 10, "number of reps");
28 DEFINE_int32(ops, 100000, "number of operations per rep");
29 DEFINE_int32(size, 64, "initial size of the priority queue");
30 DEFINE_int32(work, 1000, "amount of unrelated work per operation");
31
32 void doWork(int work) {
33   uint64_t a = 0;
34   for (int i = work; i > 0; --i) {
35     a += i;
36   }
37   folly::doNotOptimizeAway(a);
38 }
39
40 /// Baseline implementation represents a conventional single-lock
41 /// implementation that supports cond var blocking.
42 template <
43     typename T,
44     typename PriorityQueue = std::priority_queue<T>,
45     typename Mutex = std::mutex>
46 class BaselinePQ {
47  public:
48   template <
49       typename... PQArgs,
50       typename = decltype(PriorityQueue(std::declval<PQArgs>()...))>
51   explicit BaselinePQ(size_t maxSize = 0, PQArgs... args)
52       : maxSize_(maxSize), pq_(std::forward<PQArgs>(args)...) {}
53
54   bool empty() const {
55     std::lock_guard<Mutex> g(m_);
56     return pq_.empty();
57   }
58
59   size_t size() const {
60     std::lock_guard<Mutex> g(m_);
61     return pq_.size();
62   }
63
64   bool tryPush(const T& val) {
65     std::lock_guard<Mutex> g(m_);
66     if (maxSize_ > 0 && pq_.size() == maxSize_) {
67       return false;
68     }
69     DCHECK(maxSize_ == 0 || pq_.size() < maxSize_);
70     try {
71       pq_.push(val);
72       notempty_.notify_one();
73       return true;
74     } catch (const std::bad_alloc&) {
75       return false;
76     }
77   }
78
79   bool tryPop(T& val) {
80     std::lock_guard<Mutex> g(m_);
81     if (!pq_.empty()) {
82       val = pq_.top();
83       pq_.pop();
84       notfull_.notify_one();
85       return true;
86     }
87     return false;
88   }
89
90   bool tryPeek(T& val) {
91     std::lock_guard<Mutex> g(m_);
92     if (!pq_.empty()) {
93       val = pq_.top();
94       return true;
95     }
96     return false;
97   }
98
99  private:
100   Mutex m_;
101   size_t maxSize_;
102   PriorityQueue pq_;
103   std::condition_variable notempty_;
104   std::condition_variable notfull_;
105 };
106
107 using FCPQ = folly::FlatCombiningPriorityQueue<int>;
108 using Baseline = BaselinePQ<int>;
109
110 #if FOLLY_SANITIZE_THREAD
111 static std::vector<int> nthr = {1, 2, 3, 4, 6, 8, 12, 16};
112 #else
113 static std::vector<int> nthr = {1, 2, 3, 4, 6, 8, 12, 16, 24, 32, 48, 64};
114 #endif
115 static uint32_t nthreads;
116
117 template <typename PriorityQueue, typename Func>
118 static uint64_t run_once(PriorityQueue& pq, const Func& fn) {
119   int ops = FLAGS_ops;
120   int size = FLAGS_size;
121   std::atomic<bool> start{false};
122   std::atomic<uint32_t> started{0};
123
124   for (int i = 0; i < size; ++i) {
125     CHECK(pq.tryPush(i * (ops / size)));
126   }
127
128   std::vector<std::thread> threads(nthreads);
129   for (uint32_t tid = 0; tid < nthreads; ++tid) {
130     threads[tid] = std::thread([&, tid] {
131       started.fetch_add(1);
132       while (!start.load()) {
133         /* nothing */;
134       }
135       fn(tid);
136     });
137   }
138
139   while (started.load() < nthreads) {
140     /* nothing */;
141   }
142   auto tbegin = std::chrono::steady_clock::now();
143
144   // begin time measurement
145   start.store(true);
146
147   for (auto& t : threads) {
148     t.join();
149   }
150
151   // end time measurement
152   uint64_t duration = 0;
153   auto tend = std::chrono::steady_clock::now();
154   duration = std::chrono::duration_cast<std::chrono::nanoseconds>(tend - tbegin)
155                  .count();
156   return duration;
157 }
158
159 TEST(FCPriQueue, basic) {
160   FCPQ pq;
161   CHECK(pq.empty());
162   CHECK_EQ(pq.size(), 0);
163   int v;
164   CHECK(!pq.tryPop(v));
165
166   CHECK(pq.tryPush(1));
167   CHECK(pq.tryPush(2));
168   CHECK(!pq.empty());
169   CHECK_EQ(pq.size(), 2);
170
171   pq.peek(v);
172   CHECK_EQ(v, 2); // higher value has higher priority
173   CHECK(pq.tryPeek(v));
174   CHECK_EQ(v, 2);
175   CHECK(!pq.empty());
176   CHECK_EQ(pq.size(), 2);
177
178   CHECK(pq.tryPop(v));
179   CHECK_EQ(v, 2);
180   CHECK(!pq.empty());
181   CHECK_EQ(pq.size(), 1);
182
183   CHECK(pq.tryPop(v));
184   CHECK_EQ(v, 1);
185   CHECK(pq.empty());
186   CHECK_EQ(pq.size(), 0);
187 }
188
189 TEST(FCPriQueue, bounded) {
190   FCPQ pq(1);
191   CHECK(pq.tryPush(1));
192   CHECK(!pq.tryPush(1));
193   CHECK_EQ(pq.size(), 1);
194   CHECK(!pq.empty());
195   int v;
196   CHECK(pq.tryPop(v));
197   CHECK_EQ(v, 1);
198   CHECK_EQ(pq.size(), 0);
199   CHECK(pq.empty());
200 }
201
202 TEST(FCPriQueue, timeout) {
203   FCPQ pq(1);
204   int v;
205   CHECK(!pq.tryPeek(
206       v,
207       std::chrono::steady_clock::now() + std::chrono::microseconds(1000)));
208   CHECK(!pq.tryPop(
209       v,
210       std::chrono::steady_clock::now() + std::chrono::microseconds(1000)));
211   pq.push(10);
212   CHECK(!pq.tryPush(
213       20,
214       std::chrono::steady_clock::now() + std::chrono::microseconds(1000)));
215 }
216
217 TEST(FCPriQueue, push_pop) {
218   int ops = 1000;
219   int work = 0;
220   std::chrono::steady_clock::time_point when =
221       std::chrono::steady_clock::now() + std::chrono::hours(24);
222   for (auto n : nthr) {
223     nthreads = n;
224     FCPQ pq(10000);
225     auto fn = [&](uint32_t tid) {
226       for (int i = tid; i < ops; i += nthreads) {
227         CHECK(pq.tryPush(i));
228         CHECK(pq.tryPush(i, when));
229         pq.push(i);
230         doWork(work);
231         int v;
232         CHECK(pq.tryPop(v));
233         CHECK(pq.tryPop(v, when));
234         pq.pop(v);
235         doWork(work);
236       }
237     };
238     run_once(pq, fn);
239   }
240 }
241
242 enum Exp {
243   NoFC,
244   FCNonBlock,
245   FCBlock,
246   FCTimed
247 };
248
249 static uint64_t test(std::string name, Exp exp, uint64_t base) {
250   int ops = FLAGS_ops;
251   int work = FLAGS_work;
252
253   uint64_t min = UINTMAX_MAX;
254   uint64_t max = 0;
255   uint64_t sum = 0;
256
257   for (int r = 0; r < FLAGS_reps; ++r) {
258     uint64_t dur;
259     switch(exp) {
260       case NoFC: {
261         Baseline pq;
262         auto fn = [&](uint32_t tid) {
263           for (int i = tid; i < ops; i += nthreads) {
264             CHECK(pq.tryPush(i));
265             doWork(work);
266             int v;
267             CHECK(pq.tryPop(v));
268             doWork(work);
269           }
270         };
271         dur = run_once(pq, fn);
272       } break;
273       case FCNonBlock: {
274         FCPQ pq;
275         auto fn = [&](uint32_t tid) {
276           for (int i = tid; i < ops; i += nthreads) {
277             CHECK(pq.tryPush(i));
278             doWork(work);
279             int v;
280             CHECK(pq.tryPop(v));
281             doWork(work);
282           }
283         };
284         dur = run_once(pq, fn);
285       } break;
286       case FCBlock: {
287         FCPQ pq;
288         auto fn = [&](uint32_t tid) {
289           for (int i = tid; i < ops; i += nthreads) {
290             pq.push(i);
291             doWork(work);
292             int v;
293             pq.pop(v);
294             doWork(work);
295           }
296         };
297         dur = run_once(pq, fn);
298       } break;
299       case FCTimed: {
300         FCPQ pq;
301         auto fn = [&](uint32_t tid) {
302           std::chrono::steady_clock::time_point when =
303               std::chrono::steady_clock::now() + std::chrono::hours(24);
304           for (int i = tid; i < ops; i += nthreads) {
305             CHECK(pq.tryPush(i, when));
306             doWork(work);
307             int v;
308             CHECK(pq.tryPop(v, when));
309             doWork(work);
310           }
311         };
312         dur = run_once(pq, fn);
313       } break;
314       default:
315         CHECK(false);
316     }
317
318     sum += dur;
319     min = std::min(min, dur);
320     max = std::max(max, dur);
321   }
322
323   uint64_t avg = sum / FLAGS_reps;
324   uint64_t res = min;
325   std::cout << name;
326   std::cout << "   " << std::setw(4) << max / FLAGS_ops << " ns";
327   std::cout << "   " << std::setw(4) << avg / FLAGS_ops << " ns";
328   std::cout << "   " << std::setw(4) << res / FLAGS_ops << " ns";
329   if (base) {
330     std::cout << " " << std::setw(3) << 100 * base / res << "%";
331   }
332   std::cout << std::endl;
333   return res;
334 }
335
336 TEST(FCPriQueue, bench) {
337   if (!FLAGS_bench) {
338     return;
339   }
340
341   std::cout << "Test_name, Max time, Avg time, Min time, % base min / min"
342             << std::endl;
343   for (int i : nthr) {
344     nthreads = i;
345     std::cout << "\n------------------------------------ Number of threads = "
346               << i
347               << std::endl;
348     uint64_t base =
349     test("baseline                    ", NoFC, 0);
350     test("baseline - dup              ", NoFC, base);
351     std::cout << "---- fc -------------------------------" << std::endl;
352     test("fc non-blocking             ", FCNonBlock, base);
353     test("fc non-blocking - dup       ", FCNonBlock, base);
354     test("fc timed                    ", FCTimed, base);
355     test("fc timed - dup              ", FCTimed, base);
356     test("fc blocking                 ", FCBlock, base);
357     test("fc blocking - dup           ", FCBlock, base);
358   }
359 }
360
361 /*
362 $ numactl -N 1 folly/experimental/test/fc_pri_queue_test --bench
363
364 [ RUN      ] FCPriQueue.bench
365 Test_name, Max time, Avg time, Min time, % base min / min
366
367 ------------------------------------ Number of threads = 1
368 baseline                        815 ns    793 ns    789 ns
369 baseline - dup                  886 ns    827 ns    789 ns  99%
370 ---- fc -------------------------------
371 fc non-blocking                 881 ns    819 ns    789 ns  99%
372 fc non-blocking - dup           833 ns    801 ns    786 ns 100%
373 fc timed                        863 ns    801 ns    781 ns 100%
374 fc timed - dup                  830 ns    793 ns    782 ns 100%
375 fc blocking                    1043 ns    820 ns    789 ns  99%
376 fc blocking - dup               801 ns    793 ns    789 ns 100%
377
378 ------------------------------------ Number of threads = 2
379 baseline                        579 ns    557 ns    540 ns
380 baseline - dup                  905 ns    621 ns    538 ns 100%
381 ---- fc -------------------------------
382 fc non-blocking                 824 ns    642 ns    568 ns  95%
383 fc non-blocking - dup           737 ns    645 ns    591 ns  91%
384 fc timed                        654 ns    590 ns    542 ns  99%
385 fc timed - dup                  666 ns    586 ns    534 ns 101%
386 fc blocking                     622 ns    599 ns    575 ns  93%
387 fc blocking - dup               677 ns    618 ns    570 ns  94%
388
389 ------------------------------------ Number of threads = 3
390 baseline                        740 ns    717 ns    699 ns
391 baseline - dup                  742 ns    716 ns    697 ns 100%
392 ---- fc -------------------------------
393 fc non-blocking                 730 ns    689 ns    645 ns 108%
394 fc non-blocking - dup           719 ns    695 ns    639 ns 109%
395 fc timed                        695 ns    650 ns    597 ns 117%
396 fc timed - dup                  694 ns    654 ns    624 ns 112%
397 fc blocking                     711 ns    687 ns    669 ns 104%
398 fc blocking - dup               716 ns    695 ns    624 ns 112%
399
400 ------------------------------------ Number of threads = 4
401 baseline                        777 ns    766 ns    750 ns
402 baseline - dup                  778 ns    752 ns    731 ns 102%
403 ---- fc -------------------------------
404 fc non-blocking                 653 ns    615 ns    589 ns 127%
405 fc non-blocking - dup           611 ns    593 ns    563 ns 133%
406 fc timed                        597 ns    577 ns    569 ns 131%
407 fc timed - dup                  618 ns    575 ns    546 ns 137%
408 fc blocking                     603 ns    590 ns    552 ns 135%
409 fc blocking - dup               614 ns    590 ns    556 ns 134%
410
411 ------------------------------------ Number of threads = 6
412 baseline                        925 ns    900 ns    869 ns
413 baseline - dup                  930 ns    895 ns    866 ns 100%
414 ---- fc -------------------------------
415 fc non-blocking                 568 ns    530 ns    481 ns 180%
416 fc non-blocking - dup           557 ns    521 ns    488 ns 177%
417 fc timed                        516 ns    496 ns    463 ns 187%
418 fc timed - dup                  517 ns    500 ns    474 ns 183%
419 fc blocking                     559 ns    513 ns    450 ns 193%
420 fc blocking - dup               564 ns    528 ns    466 ns 186%
421
422 ------------------------------------ Number of threads = 8
423 baseline                        999 ns    981 ns    962 ns
424 baseline - dup                  998 ns    984 ns    965 ns  99%
425 ---- fc -------------------------------
426 fc non-blocking                 491 ns    386 ns    317 ns 303%
427 fc non-blocking - dup           433 ns    344 ns    298 ns 322%
428 fc timed                        445 ns    348 ns    294 ns 327%
429 fc timed - dup                  446 ns    357 ns    292 ns 328%
430 fc blocking                     505 ns    389 ns    318 ns 302%
431 fc blocking - dup               416 ns    333 ns    293 ns 328%
432
433 ------------------------------------ Number of threads = 12
434 baseline                       1092 ns   1080 ns   1072 ns
435 baseline - dup                 1085 ns   1074 ns   1065 ns 100%
436 ---- fc -------------------------------
437 fc non-blocking                 360 ns    283 ns    258 ns 415%
438 fc non-blocking - dup           340 ns    278 ns    250 ns 427%
439 fc timed                        271 ns    260 ns    249 ns 429%
440 fc timed - dup                  397 ns    283 ns    253 ns 423%
441 fc blocking                     331 ns    279 ns    258 ns 415%
442 fc blocking - dup               358 ns    280 ns    259 ns 412%
443
444 ------------------------------------ Number of threads = 16
445 baseline                       1120 ns   1115 ns   1103 ns
446 baseline - dup                 1122 ns   1118 ns   1114 ns  99%
447 ---- fc -------------------------------
448 fc non-blocking                 339 ns    297 ns    246 ns 448%
449 fc non-blocking - dup           353 ns    301 ns    264 ns 417%
450 fc timed                        326 ns    287 ns    247 ns 445%
451 fc timed - dup                  338 ns    294 ns    259 ns 425%
452 fc blocking                     329 ns    288 ns    247 ns 445%
453 fc blocking - dup               375 ns    308 ns    265 ns 415%
454
455 ------------------------------------ Number of threads = 24
456 baseline                       1073 ns   1068 ns   1064 ns
457 baseline - dup                 1075 ns   1071 ns   1069 ns  99%
458 ---- fc -------------------------------
459 fc non-blocking                 439 ns    342 ns    278 ns 382%
460 fc non-blocking - dup           389 ns    318 ns    291 ns 364%
461 fc timed                        368 ns    324 ns    266 ns 398%
462 fc timed - dup                  412 ns    328 ns    302 ns 352%
463 fc blocking                     425 ns    345 ns    275 ns 386%
464 fc blocking - dup               429 ns    340 ns    269 ns 395%
465
466 ------------------------------------ Number of threads = 32
467 baseline                       1001 ns    990 ns    981 ns
468 baseline - dup                 1002 ns    992 ns    983 ns  99%
469 ---- fc -------------------------------
470 fc non-blocking                 404 ns    342 ns    273 ns 359%
471 fc non-blocking - dup           395 ns    316 ns    259 ns 378%
472 fc timed                        379 ns    330 ns    258 ns 380%
473 fc timed - dup                  392 ns    335 ns    274 ns 357%
474 fc blocking                     423 ns    340 ns    277 ns 353%
475 fc blocking - dup               445 ns    359 ns    275 ns 356%
476
477 ------------------------------------ Number of threads = 48
478 baseline                        978 ns    975 ns    971 ns
479 baseline - dup                  977 ns    974 ns    972 ns  99%
480 ---- fc -------------------------------
481 fc non-blocking                 424 ns    327 ns    258 ns 375%
482 fc non-blocking - dup           378 ns    317 ns    256 ns 379%
483 fc timed                        368 ns    311 ns    277 ns 350%
484 fc timed - dup                  385 ns    310 ns    251 ns 385%
485 fc blocking                     422 ns    313 ns    255 ns 380%
486 fc blocking - dup               406 ns    314 ns    258 ns 376%
487
488 ------------------------------------ Number of threads = 64
489 baseline                        993 ns    981 ns    974 ns
490 baseline - dup                  984 ns    979 ns    975 ns  99%
491 ---- fc -------------------------------
492 fc non-blocking                 353 ns    301 ns    266 ns 365%
493 fc non-blocking - dup           339 ns    301 ns    271 ns 358%
494 fc timed                        399 ns    321 ns    259 ns 375%
495 fc timed - dup                  381 ns    300 ns    263 ns 369%
496 fc blocking                     390 ns    301 ns    251 ns 387%
497 fc blocking - dup               345 ns    289 ns    259 ns 374%
498 [       OK ] FCPriQueue.bench (112424 ms)
499
500 $ lscpu
501 Architecture:          x86_64
502 CPU op-mode(s):        32-bit, 64-bit
503 Byte Order:            Little Endian
504 CPU(s):                32
505 On-line CPU(s) list:   0-31
506 Thread(s) per core:    2
507 Core(s) per socket:    8
508 Socket(s):             2
509 NUMA node(s):          2
510 Vendor ID:             GenuineIntel
511 CPU family:            6
512 Model:                 45
513 Model name:            Intel(R) Xeon(R) CPU E5-2660 0 @ 2.20GHz
514 Stepping:              6
515 CPU MHz:               2200.000
516 CPU max MHz:           2200.0000
517 CPU min MHz:           1200.0000
518 BogoMIPS:              4399.87
519 Virtualization:        VT-x
520 L1d cache:             32K
521 L1i cache:             32K
522 L2 cache:              256K
523 L3 cache:              20480K
524 NUMA node0 CPU(s):     0-7,16-23
525 NUMA node1 CPU(s):     8-15,24-31
526 Flags:                 fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc aperfmperf eagerfpu pni pclmulqdq dtes64 monitor ds_cpl vmx smx est tm2 ssse3 cx16 xtpr pdcm pcid dca sse4_1 sse4_2 x2apic popcnt tsc_deadline_timer aes xsave avx lahf_lm epb tpr_shadow vnmi flexpriority ept vpid xsaveopt dtherm arat pln pts
527
528  */