cb21b809dc38dd9a7db4f78ffa9507a41fc93c88
[folly.git] / folly / experimental / test / FlatCombiningPriorityQueueTest.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/Benchmark.h>
18 #include <folly/experimental/FlatCombiningPriorityQueue.h>
19 #include <folly/portability/GTest.h>
20 #include <glog/logging.h>
21
22 #include <condition_variable>
23 #include <mutex>
24 #include <queue>
25
26 DEFINE_bool(bench, false, "run benchmark");
27 DEFINE_int32(reps, 10, "number of reps");
28 DEFINE_int32(ops, 100000, "number of operations per rep");
29 DEFINE_int32(size, 64, "initial size of the priority queue");
30 DEFINE_int32(work, 1000, "amount of unrelated work per operation");
31
32 void doWork(int work) {
33   uint64_t a = 0;
34   for (int i = work; i > 0; --i) {
35     a += i;
36   }
37   folly::doNotOptimizeAway(a);
38 }
39
40 /// Baseline implementation represents a conventional single-lock
41 /// implementation that supports cond var blocking.
42 template <
43     typename T,
44     typename PriorityQueue = std::priority_queue<T>,
45     typename Mutex = std::mutex>
46 class BaselinePQ {
47  public:
48   template <
49       typename... PQArgs,
50       typename = decltype(PriorityQueue(std::declval<PQArgs>()...))>
51   explicit BaselinePQ(size_t maxSize = 0, PQArgs... args)
52       : maxSize_(maxSize), pq_(std::forward<PQArgs>(args)...) {}
53
54   bool empty() const {
55     std::lock_guard<Mutex> g(m_);
56     return pq_.empty();
57   }
58
59   size_t size() const {
60     std::lock_guard<Mutex> g(m_);
61     return pq_.size();
62   }
63
64   bool tryPush(const T& val) {
65     std::lock_guard<Mutex> g(m_);
66     if (maxSize_ > 0 && pq_.size() == maxSize_) {
67       return false;
68     }
69     DCHECK(maxSize_ == 0 || pq_.size() < maxSize_);
70     try {
71       pq_.push(val);
72       notempty_.notify_one();
73       return true;
74     } catch (const std::bad_alloc&) {
75       return false;
76     }
77   }
78
79   bool tryPop(T& val) {
80     std::lock_guard<Mutex> g(m_);
81     if (!pq_.empty()) {
82       val = pq_.top();
83       pq_.pop();
84       notfull_.notify_one();
85       return true;
86     }
87     return false;
88   }
89
90   bool tryPeek(T& val) {
91     std::lock_guard<Mutex> g(m_);
92     if (!pq_.empty()) {
93       val = pq_.top();
94       return true;
95     }
96     return false;
97   }
98
99  private:
100   Mutex m_;
101   size_t maxSize_;
102   PriorityQueue pq_;
103   std::condition_variable notempty_;
104   std::condition_variable notfull_;
105 };
106
107 using FCPQ = folly::FlatCombiningPriorityQueue<int>;
108 using Baseline = BaselinePQ<int>;
109
110 #if FOLLY_SANITIZE_THREAD
111 static std::vector<int> nthr = {1, 2, 3, 4, 6, 8, 12, 16};
112 #else
113 static std::vector<int> nthr = {1, 2, 3, 4, 6, 8, 12, 16, 24, 32, 48, 64};
114 #endif
115 static uint32_t nthreads;
116
117 template <typename PriorityQueue, typename Func>
118 static uint64_t run_once(PriorityQueue& pq, const Func& fn) {
119   int ops = FLAGS_ops;
120   int size = FLAGS_size;
121   std::atomic<bool> start{false};
122   std::atomic<uint32_t> started{0};
123
124   for (int i = 0; i < size; ++i) {
125     CHECK(pq.tryPush(i * (ops / size)));
126   }
127
128   std::vector<std::thread> threads(nthreads);
129   for (uint32_t tid = 0; tid < nthreads; ++tid) {
130     threads[tid] = std::thread([&, tid] {
131       started.fetch_add(1);
132       while (!start.load())
133         /* nothing */;
134       fn(tid);
135     });
136   }
137
138   while (started.load() < nthreads)
139     /* nothing */;
140   auto tbegin = std::chrono::steady_clock::now();
141
142   // begin time measurement
143   start.store(true);
144
145   for (auto& t : threads) {
146     t.join();
147   }
148
149   // end time measurement
150   uint64_t duration = 0;
151   auto tend = std::chrono::steady_clock::now();
152   duration = std::chrono::duration_cast<std::chrono::nanoseconds>(tend - tbegin)
153                  .count();
154   return duration;
155 }
156
157 TEST(FCPriQueue, basic) {
158   FCPQ pq;
159   CHECK(pq.empty());
160   CHECK_EQ(pq.size(), 0);
161   int v;
162   CHECK(!pq.tryPop(v));
163
164   CHECK(pq.tryPush(1));
165   CHECK(pq.tryPush(2));
166   CHECK(!pq.empty());
167   CHECK_EQ(pq.size(), 2);
168
169   pq.peek(v);
170   CHECK_EQ(v, 2); // higher value has higher priority
171   CHECK(pq.tryPeek(v));
172   CHECK_EQ(v, 2);
173   CHECK(!pq.empty());
174   CHECK_EQ(pq.size(), 2);
175
176   CHECK(pq.tryPop(v));
177   CHECK_EQ(v, 2);
178   CHECK(!pq.empty());
179   CHECK_EQ(pq.size(), 1);
180
181   CHECK(pq.tryPop(v));
182   CHECK_EQ(v, 1);
183   CHECK(pq.empty());
184   CHECK_EQ(pq.size(), 0);
185 }
186
187 TEST(FCPriQueue, bounded) {
188   FCPQ pq(1);
189   CHECK(pq.tryPush(1));
190   CHECK(!pq.tryPush(1));
191   CHECK_EQ(pq.size(), 1);
192   CHECK(!pq.empty());
193   int v;
194   CHECK(pq.tryPop(v));
195   CHECK_EQ(v, 1);
196   CHECK_EQ(pq.size(), 0);
197   CHECK(pq.empty());
198 }
199
200 TEST(FCPriQueue, timeout) {
201   FCPQ pq(1);
202   int v;
203   CHECK(!pq.tryPeek(
204       v,
205       std::chrono::steady_clock::now() + std::chrono::microseconds(1000)));
206   CHECK(!pq.tryPop(
207       v,
208       std::chrono::steady_clock::now() + std::chrono::microseconds(1000)));
209   pq.push(10);
210   CHECK(!pq.tryPush(
211       20,
212       std::chrono::steady_clock::now() + std::chrono::microseconds(1000)));
213 }
214
215 TEST(FCPriQueue, push_pop) {
216   int ops = 1000;
217   int work = 0;
218   std::chrono::steady_clock::time_point when =
219       std::chrono::steady_clock::now() + std::chrono::hours(24);
220   for (auto n : nthr) {
221     nthreads = n;
222     FCPQ pq(10000);
223     auto fn = [&](uint32_t tid) {
224       for (int i = tid; i < ops; i += nthreads) {
225         CHECK(pq.tryPush(i));
226         CHECK(pq.tryPush(i, when));
227         pq.push(i);
228         doWork(work);
229         int v;
230         CHECK(pq.tryPop(v));
231         CHECK(pq.tryPop(v, when));
232         pq.pop(v);
233         doWork(work);
234       }
235     };
236     run_once(pq, fn);
237   }
238 }
239
240 enum Exp {
241   NoFC,
242   FCNonBlock,
243   FCBlock,
244   FCTimed
245 };
246
247 static uint64_t test(std::string name, Exp exp, uint64_t base) {
248   int ops = FLAGS_ops;
249   int work = FLAGS_work;
250
251   uint64_t min = UINTMAX_MAX;
252   uint64_t max = 0;
253   uint64_t sum = 0;
254
255   for (int r = 0; r < FLAGS_reps; ++r) {
256     uint64_t dur;
257     switch(exp) {
258       case NoFC: {
259         Baseline pq;
260         auto fn = [&](uint32_t tid) {
261           for (int i = tid; i < ops; i += nthreads) {
262             CHECK(pq.tryPush(i));
263             doWork(work);
264             int v;
265             CHECK(pq.tryPop(v));
266             doWork(work);
267           }
268         };
269         dur = run_once(pq, fn);
270       } break;
271       case FCNonBlock: {
272         FCPQ pq;
273         auto fn = [&](uint32_t tid) {
274           for (int i = tid; i < ops; i += nthreads) {
275             CHECK(pq.tryPush(i));
276             doWork(work);
277             int v;
278             CHECK(pq.tryPop(v));
279             doWork(work);
280           }
281         };
282         dur = run_once(pq, fn);
283       } break;
284       case FCBlock: {
285         FCPQ pq;
286         auto fn = [&](uint32_t tid) {
287           for (int i = tid; i < ops; i += nthreads) {
288             pq.push(i);
289             doWork(work);
290             int v;
291             pq.pop(v);
292             doWork(work);
293           }
294         };
295         dur = run_once(pq, fn);
296       } break;
297       case FCTimed: {
298         FCPQ pq;
299         auto fn = [&](uint32_t tid) {
300           std::chrono::steady_clock::time_point when =
301               std::chrono::steady_clock::now() + std::chrono::hours(24);
302           for (int i = tid; i < ops; i += nthreads) {
303             CHECK(pq.tryPush(i, when));
304             doWork(work);
305             int v;
306             CHECK(pq.tryPop(v, when));
307             doWork(work);
308           }
309         };
310         dur = run_once(pq, fn);
311       } break;
312       default:
313         CHECK(false);
314     }
315
316     sum += dur;
317     min = std::min(min, dur);
318     max = std::max(max, dur);
319   }
320
321   uint64_t avg = sum / FLAGS_reps;
322   uint64_t res = min;
323   std::cout << name;
324   std::cout << "   " << std::setw(4) << max / FLAGS_ops << " ns";
325   std::cout << "   " << std::setw(4) << avg / FLAGS_ops << " ns";
326   std::cout << "   " << std::setw(4) << res / FLAGS_ops << " ns";
327   if (base) {
328     std::cout << " " << std::setw(3) << 100 * base / res << "%";
329   }
330   std::cout << std::endl;
331   return res;
332 }
333
334 TEST(FCPriQueue, bench) {
335   if (!FLAGS_bench) {
336     return;
337   }
338
339   std::cout << "Test_name, Max time, Avg time, Min time, % base min / min"
340             << std::endl;
341   for (int i : nthr) {
342     nthreads = i;
343     std::cout << "\n------------------------------------ Number of threads = "
344               << i
345               << std::endl;
346     uint64_t base =
347     test("baseline                    ", NoFC, 0);
348     test("baseline - dup              ", NoFC, base);
349     std::cout << "---- fc -------------------------------" << std::endl;
350     test("fc non-blocking             ", FCNonBlock, base);
351     test("fc non-blocking - dup       ", FCNonBlock, base);
352     test("fc timed                    ", FCTimed, base);
353     test("fc timed - dup              ", FCTimed, base);
354     test("fc blocking                 ", FCBlock, base);
355     test("fc blocking - dup           ", FCBlock, base);
356   }
357 }
358
359 /*
360 $ numactl -N 1 folly/experimental/test/fc_pri_queue_test --bench
361
362 [ RUN      ] FCPriQueue.bench
363 Test_name, Max time, Avg time, Min time, % base min / min
364
365 ------------------------------------ Number of threads = 1
366 baseline                        815 ns    793 ns    789 ns
367 baseline - dup                  886 ns    827 ns    789 ns  99%
368 ---- fc -------------------------------
369 fc non-blocking                 881 ns    819 ns    789 ns  99%
370 fc non-blocking - dup           833 ns    801 ns    786 ns 100%
371 fc timed                        863 ns    801 ns    781 ns 100%
372 fc timed - dup                  830 ns    793 ns    782 ns 100%
373 fc blocking                    1043 ns    820 ns    789 ns  99%
374 fc blocking - dup               801 ns    793 ns    789 ns 100%
375
376 ------------------------------------ Number of threads = 2
377 baseline                        579 ns    557 ns    540 ns
378 baseline - dup                  905 ns    621 ns    538 ns 100%
379 ---- fc -------------------------------
380 fc non-blocking                 824 ns    642 ns    568 ns  95%
381 fc non-blocking - dup           737 ns    645 ns    591 ns  91%
382 fc timed                        654 ns    590 ns    542 ns  99%
383 fc timed - dup                  666 ns    586 ns    534 ns 101%
384 fc blocking                     622 ns    599 ns    575 ns  93%
385 fc blocking - dup               677 ns    618 ns    570 ns  94%
386
387 ------------------------------------ Number of threads = 3
388 baseline                        740 ns    717 ns    699 ns
389 baseline - dup                  742 ns    716 ns    697 ns 100%
390 ---- fc -------------------------------
391 fc non-blocking                 730 ns    689 ns    645 ns 108%
392 fc non-blocking - dup           719 ns    695 ns    639 ns 109%
393 fc timed                        695 ns    650 ns    597 ns 117%
394 fc timed - dup                  694 ns    654 ns    624 ns 112%
395 fc blocking                     711 ns    687 ns    669 ns 104%
396 fc blocking - dup               716 ns    695 ns    624 ns 112%
397
398 ------------------------------------ Number of threads = 4
399 baseline                        777 ns    766 ns    750 ns
400 baseline - dup                  778 ns    752 ns    731 ns 102%
401 ---- fc -------------------------------
402 fc non-blocking                 653 ns    615 ns    589 ns 127%
403 fc non-blocking - dup           611 ns    593 ns    563 ns 133%
404 fc timed                        597 ns    577 ns    569 ns 131%
405 fc timed - dup                  618 ns    575 ns    546 ns 137%
406 fc blocking                     603 ns    590 ns    552 ns 135%
407 fc blocking - dup               614 ns    590 ns    556 ns 134%
408
409 ------------------------------------ Number of threads = 6
410 baseline                        925 ns    900 ns    869 ns
411 baseline - dup                  930 ns    895 ns    866 ns 100%
412 ---- fc -------------------------------
413 fc non-blocking                 568 ns    530 ns    481 ns 180%
414 fc non-blocking - dup           557 ns    521 ns    488 ns 177%
415 fc timed                        516 ns    496 ns    463 ns 187%
416 fc timed - dup                  517 ns    500 ns    474 ns 183%
417 fc blocking                     559 ns    513 ns    450 ns 193%
418 fc blocking - dup               564 ns    528 ns    466 ns 186%
419
420 ------------------------------------ Number of threads = 8
421 baseline                        999 ns    981 ns    962 ns
422 baseline - dup                  998 ns    984 ns    965 ns  99%
423 ---- fc -------------------------------
424 fc non-blocking                 491 ns    386 ns    317 ns 303%
425 fc non-blocking - dup           433 ns    344 ns    298 ns 322%
426 fc timed                        445 ns    348 ns    294 ns 327%
427 fc timed - dup                  446 ns    357 ns    292 ns 328%
428 fc blocking                     505 ns    389 ns    318 ns 302%
429 fc blocking - dup               416 ns    333 ns    293 ns 328%
430
431 ------------------------------------ Number of threads = 12
432 baseline                       1092 ns   1080 ns   1072 ns
433 baseline - dup                 1085 ns   1074 ns   1065 ns 100%
434 ---- fc -------------------------------
435 fc non-blocking                 360 ns    283 ns    258 ns 415%
436 fc non-blocking - dup           340 ns    278 ns    250 ns 427%
437 fc timed                        271 ns    260 ns    249 ns 429%
438 fc timed - dup                  397 ns    283 ns    253 ns 423%
439 fc blocking                     331 ns    279 ns    258 ns 415%
440 fc blocking - dup               358 ns    280 ns    259 ns 412%
441
442 ------------------------------------ Number of threads = 16
443 baseline                       1120 ns   1115 ns   1103 ns
444 baseline - dup                 1122 ns   1118 ns   1114 ns  99%
445 ---- fc -------------------------------
446 fc non-blocking                 339 ns    297 ns    246 ns 448%
447 fc non-blocking - dup           353 ns    301 ns    264 ns 417%
448 fc timed                        326 ns    287 ns    247 ns 445%
449 fc timed - dup                  338 ns    294 ns    259 ns 425%
450 fc blocking                     329 ns    288 ns    247 ns 445%
451 fc blocking - dup               375 ns    308 ns    265 ns 415%
452
453 ------------------------------------ Number of threads = 24
454 baseline                       1073 ns   1068 ns   1064 ns
455 baseline - dup                 1075 ns   1071 ns   1069 ns  99%
456 ---- fc -------------------------------
457 fc non-blocking                 439 ns    342 ns    278 ns 382%
458 fc non-blocking - dup           389 ns    318 ns    291 ns 364%
459 fc timed                        368 ns    324 ns    266 ns 398%
460 fc timed - dup                  412 ns    328 ns    302 ns 352%
461 fc blocking                     425 ns    345 ns    275 ns 386%
462 fc blocking - dup               429 ns    340 ns    269 ns 395%
463
464 ------------------------------------ Number of threads = 32
465 baseline                       1001 ns    990 ns    981 ns
466 baseline - dup                 1002 ns    992 ns    983 ns  99%
467 ---- fc -------------------------------
468 fc non-blocking                 404 ns    342 ns    273 ns 359%
469 fc non-blocking - dup           395 ns    316 ns    259 ns 378%
470 fc timed                        379 ns    330 ns    258 ns 380%
471 fc timed - dup                  392 ns    335 ns    274 ns 357%
472 fc blocking                     423 ns    340 ns    277 ns 353%
473 fc blocking - dup               445 ns    359 ns    275 ns 356%
474
475 ------------------------------------ Number of threads = 48
476 baseline                        978 ns    975 ns    971 ns
477 baseline - dup                  977 ns    974 ns    972 ns  99%
478 ---- fc -------------------------------
479 fc non-blocking                 424 ns    327 ns    258 ns 375%
480 fc non-blocking - dup           378 ns    317 ns    256 ns 379%
481 fc timed                        368 ns    311 ns    277 ns 350%
482 fc timed - dup                  385 ns    310 ns    251 ns 385%
483 fc blocking                     422 ns    313 ns    255 ns 380%
484 fc blocking - dup               406 ns    314 ns    258 ns 376%
485
486 ------------------------------------ Number of threads = 64
487 baseline                        993 ns    981 ns    974 ns
488 baseline - dup                  984 ns    979 ns    975 ns  99%
489 ---- fc -------------------------------
490 fc non-blocking                 353 ns    301 ns    266 ns 365%
491 fc non-blocking - dup           339 ns    301 ns    271 ns 358%
492 fc timed                        399 ns    321 ns    259 ns 375%
493 fc timed - dup                  381 ns    300 ns    263 ns 369%
494 fc blocking                     390 ns    301 ns    251 ns 387%
495 fc blocking - dup               345 ns    289 ns    259 ns 374%
496 [       OK ] FCPriQueue.bench (112424 ms)
497
498 $ lscpu
499 Architecture:          x86_64
500 CPU op-mode(s):        32-bit, 64-bit
501 Byte Order:            Little Endian
502 CPU(s):                32
503 On-line CPU(s) list:   0-31
504 Thread(s) per core:    2
505 Core(s) per socket:    8
506 Socket(s):             2
507 NUMA node(s):          2
508 Vendor ID:             GenuineIntel
509 CPU family:            6
510 Model:                 45
511 Model name:            Intel(R) Xeon(R) CPU E5-2660 0 @ 2.20GHz
512 Stepping:              6
513 CPU MHz:               2200.000
514 CPU max MHz:           2200.0000
515 CPU min MHz:           1200.0000
516 BogoMIPS:              4399.87
517 Virtualization:        VT-x
518 L1d cache:             32K
519 L1i cache:             32K
520 L2 cache:              256K
521 L3 cache:              20480K
522 NUMA node0 CPU(s):     0-7,16-23
523 NUMA node1 CPU(s):     8-15,24-31
524 Flags:                 fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc aperfmperf eagerfpu pni pclmulqdq dtes64 monitor ds_cpl vmx smx est tm2 ssse3 cx16 xtpr pdcm pcid dca sse4_1 sse4_2 x2apic popcnt tsc_deadline_timer aes xsave avx lahf_lm epb tpr_shadow vnmi flexpriority ept vpid xsaveopt dtherm arat pln pts
525
526  */