Use standard variadic macros instead of gcc-specific ones
[folly.git] / folly / test / MPMCQueueTest.cpp
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/MPMCQueue.h>
18 #include <folly/Format.h>
19 #include <folly/Memory.h>
20 #include <folly/test/DeterministicSchedule.h>
21
22 #include <boost/intrusive_ptr.hpp>
23 #include <memory>
24 #include <functional>
25 #include <thread>
26 #include <utility>
27 #include <unistd.h>
28 #include <sys/time.h>
29 #include <sys/resource.h>
30
31 #include <gflags/gflags.h>
32 #include <gtest/gtest.h>
33
34 FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
35
36 using namespace folly;
37 using namespace detail;
38 using namespace test;
39
40 typedef DeterministicSchedule DSched;
41
42
43 template <template<typename> class Atom>
44 void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
45   TurnSequencer<Atom> seq(init);
46   Atom<int> spinThreshold(0);
47
48   int prev = -1;
49   std::vector<std::thread> threads(numThreads);
50   for (int i = 0; i < numThreads; ++i) {
51     threads[i] = DSched::thread([&, i]{
52       for (int op = i; op < numOps; op += numThreads) {
53         seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
54         EXPECT_EQ(prev, op - 1);
55         prev = op;
56         seq.completeTurn(init + op);
57       }
58     });
59   }
60
61   for (auto& thr : threads) {
62     DSched::join(thr);
63   }
64
65   EXPECT_EQ(prev, numOps - 1);
66 }
67
68 TEST(MPMCQueue, sequencer) {
69   run_mt_sequencer_test<std::atomic>(1, 100, 0);
70   run_mt_sequencer_test<std::atomic>(2, 100000, -100);
71   run_mt_sequencer_test<std::atomic>(100, 10000, -100);
72 }
73
74 TEST(MPMCQueue, sequencer_deterministic) {
75   DSched sched(DSched::uniform(0));
76   run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
77   run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
78   run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
79 }
80
81 template <typename T>
82 void runElementTypeTest(T&& src) {
83   MPMCQueue<T> cq(10);
84   cq.blockingWrite(std::move(src));
85   T dest;
86   cq.blockingRead(dest);
87   EXPECT_TRUE(cq.write(std::move(dest)));
88   EXPECT_TRUE(cq.read(dest));
89 }
90
91 struct RefCounted {
92   mutable std::atomic<int> rc;
93
94   RefCounted() : rc(0) {}
95 };
96
97 void intrusive_ptr_add_ref(RefCounted const* p) {
98   p->rc++;
99 }
100
101 void intrusive_ptr_release(RefCounted const* p) {
102   if (--(p->rc)) {
103     delete p;
104   }
105 }
106
107 TEST(MPMCQueue, lots_of_element_types) {
108   runElementTypeTest(10);
109   runElementTypeTest(std::string("abc"));
110   runElementTypeTest(std::make_pair(10, std::string("def")));
111   runElementTypeTest(std::vector<std::string>{ { "abc" } });
112   runElementTypeTest(std::make_shared<char>('a'));
113   runElementTypeTest(folly::make_unique<char>('a'));
114   runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
115 }
116
117 TEST(MPMCQueue, single_thread_enqdeq) {
118   MPMCQueue<int> cq(10);
119
120   for (int pass = 0; pass < 10; ++pass) {
121     for (int i = 0; i < 10; ++i) {
122       EXPECT_TRUE(cq.write(i));
123     }
124     EXPECT_FALSE(cq.write(-1));
125     EXPECT_FALSE(cq.isEmpty());
126     EXPECT_EQ(cq.size(), 10);
127
128     for (int i = 0; i < 5; ++i) {
129       int dest = -1;
130       EXPECT_TRUE(cq.read(dest));
131       EXPECT_EQ(dest, i);
132     }
133     for (int i = 5; i < 10; ++i) {
134       int dest = -1;
135       cq.blockingRead(dest);
136       EXPECT_EQ(dest, i);
137     }
138     int dest = -1;
139     EXPECT_FALSE(cq.read(dest));
140     EXPECT_EQ(dest, -1);
141
142     EXPECT_TRUE(cq.isEmpty());
143     EXPECT_EQ(cq.size(), 0);
144   }
145 }
146
147 TEST(MPMCQueue, tryenq_capacity_test) {
148   for (size_t cap = 1; cap < 100; ++cap) {
149     MPMCQueue<int> cq(cap);
150     for (int i = 0; i < cap; ++i) {
151       EXPECT_TRUE(cq.write(i));
152     }
153     EXPECT_FALSE(cq.write(100));
154   }
155 }
156
157 TEST(MPMCQueue, enq_capacity_test) {
158   for (auto cap : { 1, 100, 10000 }) {
159     MPMCQueue<int> cq(cap);
160     for (int i = 0; i < cap; ++i) {
161       cq.blockingWrite(i);
162     }
163     int t = 0;
164     int when;
165     auto thr = std::thread([&]{
166       cq.blockingWrite(100);
167       when = t;
168     });
169     usleep(2000);
170     t = 1;
171     int dummy;
172     cq.blockingRead(dummy);
173     thr.join();
174     EXPECT_EQ(when, 1);
175   }
176 }
177
178 template <template<typename> class Atom>
179 void runTryEnqDeqTest(int numThreads, int numOps) {
180   // write and read aren't linearizable, so we don't have
181   // hard guarantees on their individual behavior.  We can still test
182   // correctness in aggregate
183   MPMCQueue<int,Atom> cq(numThreads);
184
185   uint64_t n = numOps;
186   std::vector<std::thread> threads(numThreads);
187   std::atomic<uint64_t> sum(0);
188   for (int t = 0; t < numThreads; ++t) {
189     threads[t] = DSched::thread([&,t]{
190       uint64_t threadSum = 0;
191       int src = t;
192       // received doesn't reflect any actual values, we just start with
193       // t and increment by numThreads to get the rounding of termination
194       // correct if numThreads doesn't evenly divide numOps
195       int received = t;
196       while (src < n || received < n) {
197         if (src < n && cq.write(src)) {
198           src += numThreads;
199         }
200
201         int dst;
202         if (received < n && cq.read(dst)) {
203           received += numThreads;
204           threadSum += dst;
205         }
206       }
207       sum += threadSum;
208     });
209   }
210   for (auto& t : threads) {
211     DSched::join(t);
212   }
213   EXPECT_TRUE(cq.isEmpty());
214   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
215 }
216
217 TEST(MPMCQueue, mt_try_enq_deq) {
218   int nts[] = { 1, 3, 100 };
219
220   int n = 100000;
221   for (int nt : nts) {
222     runTryEnqDeqTest<std::atomic>(nt, n);
223   }
224 }
225
226 TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
227   int nts[] = { 3, 10 };
228
229   long seed = 0;
230   LOG(INFO) << "using seed " << seed;
231
232   int n = 1000;
233   for (int nt : nts) {
234     {
235       DSched sched(DSched::uniform(seed));
236       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
237     }
238     {
239       DSched sched(DSched::uniformSubset(seed, 2));
240       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
241     }
242   }
243 }
244
245 uint64_t nowMicro() {
246   timeval tv;
247   gettimeofday(&tv, 0);
248   return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
249 }
250
251 template <typename Q>
252 std::string producerConsumerBench(Q&& queue, std::string qName,
253                                   int numProducers, int numConsumers,
254                                   int numOps, bool ignoreContents = false) {
255   Q& q = queue;
256
257   struct rusage beginUsage;
258   getrusage(RUSAGE_SELF, &beginUsage);
259
260   auto beginMicro = nowMicro();
261
262   uint64_t n = numOps;
263   std::atomic<uint64_t> sum(0);
264
265   std::vector<std::thread> producers(numProducers);
266   for (int t = 0; t < numProducers; ++t) {
267     producers[t] = DSched::thread([&,t]{
268       for (int i = t; i < numOps; i += numProducers) {
269         q.blockingWrite(i);
270       }
271     });
272   }
273
274   std::vector<std::thread> consumers(numConsumers);
275   for (int t = 0; t < numConsumers; ++t) {
276     consumers[t] = DSched::thread([&,t]{
277       uint64_t localSum = 0;
278       for (int i = t; i < numOps; i += numConsumers) {
279         int dest = -1;
280         q.blockingRead(dest);
281         EXPECT_FALSE(dest == -1);
282         localSum += dest;
283       }
284       sum += localSum;
285     });
286   }
287
288   for (auto& t : producers) {
289     DSched::join(t);
290   }
291   for (auto& t : consumers) {
292     DSched::join(t);
293   }
294   if (!ignoreContents) {
295     EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
296   }
297
298   auto endMicro = nowMicro();
299
300   struct rusage endUsage;
301   getrusage(RUSAGE_SELF, &endUsage);
302
303   uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
304   long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
305       (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
306
307   return folly::format(
308       "{}, {} producers, {} consumers => {} nanos/handoff, {} csw / {} handoff",
309       qName, numProducers, numConsumers, nanosPer, csw, n).str();
310 }
311
312
313 TEST(MPMCQueue, mt_prod_cons_deterministic) {
314   // we use the Bench method, but perf results are meaningless under DSched
315   DSched sched(DSched::uniform(0));
316
317   producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(10),
318           "", 1, 1, 1000);
319   producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(100),
320           "", 10, 10, 1000);
321   producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(10),
322           "", 1, 1, 1000);
323   producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(100),
324           "", 10, 10, 1000);
325   producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(1),
326           "", 10, 10, 1000);
327 }
328
329 #define PC_BENCH(q, np, nc, ...) \
330     producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
331
332 TEST(MPMCQueue, mt_prod_cons) {
333   int n = 100000;
334   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 1, n);
335   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 1, n);
336   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 10, n);
337   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 10, n);
338   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 1, n);
339   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 1, n);
340   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 10, n);
341   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 10, n);
342   LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n);
343 }
344
345 template <template<typename> class Atom>
346 uint64_t runNeverFailTest(int numThreads, int numOps) {
347   // always #enq >= #deq
348   MPMCQueue<int,Atom> cq(numThreads);
349
350   uint64_t n = numOps;
351   auto beginMicro = nowMicro();
352
353   std::vector<std::thread> threads(numThreads);
354   std::atomic<uint64_t> sum(0);
355   for (int t = 0; t < numThreads; ++t) {
356     threads[t] = DSched::thread([&,t]{
357       uint64_t threadSum = 0;
358       for (int i = t; i < n; i += numThreads) {
359         // enq + deq
360         EXPECT_TRUE(cq.writeIfNotFull(i));
361
362         int dest = -1;
363         EXPECT_TRUE(cq.readIfNotEmpty(dest));
364         EXPECT_TRUE(dest >= 0);
365         threadSum += dest;
366       }
367       sum += threadSum;
368     });
369   }
370   for (auto& t : threads) {
371     DSched::join(t);
372   }
373   EXPECT_TRUE(cq.isEmpty());
374   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
375
376   return nowMicro() - beginMicro;
377 }
378
379 TEST(MPMCQueue, mt_never_fail) {
380   int nts[] = { 1, 3, 100 };
381
382   int n = 100000;
383   for (int nt : nts) {
384     uint64_t elapsed = runNeverFailTest<std::atomic>(nt, n);
385     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with "
386               << nt << " threads";
387   }
388 }
389
390 TEST(MPMCQueue, mt_never_fail_deterministic) {
391   int nts[] = { 3, 10 };
392
393   long seed = 0; // nowMicro() % 10000;
394   LOG(INFO) << "using seed " << seed;
395
396   int n = 1000;
397   for (int nt : nts) {
398     {
399       DSched sched(DSched::uniform(seed));
400       runNeverFailTest<DeterministicAtomic>(nt, n);
401     }
402     {
403       DSched sched(DSched::uniformSubset(seed, 2));
404       runNeverFailTest<DeterministicAtomic>(nt, n);
405     }
406   }
407 }
408
409 enum LifecycleEvent {
410   NOTHING = -1,
411   DEFAULT_CONSTRUCTOR,
412   COPY_CONSTRUCTOR,
413   MOVE_CONSTRUCTOR,
414   TWO_ARG_CONSTRUCTOR,
415   COPY_OPERATOR,
416   MOVE_OPERATOR,
417   DESTRUCTOR,
418   MAX_LIFECYCLE_EVENT
419 };
420
421 static __thread int lc_counts[MAX_LIFECYCLE_EVENT];
422 static __thread int lc_prev[MAX_LIFECYCLE_EVENT];
423
424 static int lc_outstanding() {
425   return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[COPY_CONSTRUCTOR] +
426       lc_counts[MOVE_CONSTRUCTOR] + lc_counts[TWO_ARG_CONSTRUCTOR] -
427       lc_counts[DESTRUCTOR];
428 }
429
430 static void lc_snap() {
431   for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
432     lc_prev[i] = lc_counts[i];
433   }
434 }
435
436 #define LIFECYCLE_STEP(...) lc_step(__LINE__, __VA_ARGS__)
437
438 static void lc_step(int lineno, int what = NOTHING, int what2 = NOTHING) {
439   for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
440     int delta = i == what || i == what2 ? 1 : 0;
441     EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
442         << "lc_counts[" << i << "] - lc_prev[" << i << "] was "
443         << (lc_counts[i] - lc_prev[i]) << ", expected " << delta
444         << ", from line " << lineno;
445   }
446   lc_snap();
447 }
448
449 template <typename R>
450 struct Lifecycle {
451   typedef R IsRelocatable;
452
453   bool constructed;
454
455   Lifecycle() noexcept : constructed(true) {
456     ++lc_counts[DEFAULT_CONSTRUCTOR];
457   }
458
459   explicit Lifecycle(int n, char const* s) noexcept : constructed(true) {
460     ++lc_counts[TWO_ARG_CONSTRUCTOR];
461   }
462
463   Lifecycle(const Lifecycle& rhs) noexcept : constructed(true) {
464     ++lc_counts[COPY_CONSTRUCTOR];
465   }
466
467   Lifecycle(Lifecycle&& rhs) noexcept : constructed(true) {
468     ++lc_counts[MOVE_CONSTRUCTOR];
469   }
470
471   Lifecycle& operator= (const Lifecycle& rhs) noexcept {
472     ++lc_counts[COPY_OPERATOR];
473     return *this;
474   }
475
476   Lifecycle& operator= (Lifecycle&& rhs) noexcept {
477     ++lc_counts[MOVE_OPERATOR];
478     return *this;
479   }
480
481   ~Lifecycle() noexcept {
482     ++lc_counts[DESTRUCTOR];
483     assert(lc_outstanding() >= 0);
484     assert(constructed);
485     constructed = false;
486   }
487 };
488
489 template <typename R>
490 void runPerfectForwardingTest() {
491   lc_snap();
492   EXPECT_EQ(lc_outstanding(), 0);
493
494   {
495     MPMCQueue<Lifecycle<R>> queue(50);
496     LIFECYCLE_STEP(NOTHING);
497
498     for (int pass = 0; pass < 10; ++pass) {
499       for (int i = 0; i < 10; ++i) {
500         queue.blockingWrite();
501         LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
502
503         queue.blockingWrite(1, "one");
504         LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
505
506         {
507           Lifecycle<R> src;
508           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
509           queue.blockingWrite(std::move(src));
510           LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
511         }
512         LIFECYCLE_STEP(DESTRUCTOR);
513
514         {
515           Lifecycle<R> src;
516           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
517           queue.blockingWrite(src);
518           LIFECYCLE_STEP(COPY_CONSTRUCTOR);
519         }
520         LIFECYCLE_STEP(DESTRUCTOR);
521
522         EXPECT_TRUE(queue.write());
523         LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
524       }
525
526       EXPECT_EQ(queue.size(), 50);
527       EXPECT_FALSE(queue.write(2, "two"));
528       LIFECYCLE_STEP(NOTHING);
529
530       for (int i = 0; i < 50; ++i) {
531         {
532           Lifecycle<R> node;
533           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
534
535           queue.blockingRead(node);
536           if (R::value) {
537             // relocatable, moved via memcpy
538             LIFECYCLE_STEP(DESTRUCTOR);
539           } else {
540             LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
541           }
542         }
543         LIFECYCLE_STEP(DESTRUCTOR);
544       }
545
546       EXPECT_EQ(queue.size(), 0);
547     }
548
549     // put one element back before destruction
550     {
551       Lifecycle<R> src(3, "three");
552       LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
553       queue.write(std::move(src));
554       LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
555     }
556     LIFECYCLE_STEP(DESTRUCTOR); // destroy src
557   }
558   LIFECYCLE_STEP(DESTRUCTOR); // destroy queue
559
560   EXPECT_EQ(lc_outstanding(), 0);
561 }
562
563 TEST(MPMCQueue, perfect_forwarding) {
564   runPerfectForwardingTest<std::false_type>();
565 }
566
567 TEST(MPMCQueue, perfect_forwarding_relocatable) {
568   runPerfectForwardingTest<std::true_type>();
569 }
570
571 TEST(MPMCQueue, queue_moving) {
572   lc_snap();
573   EXPECT_EQ(lc_outstanding(), 0);
574
575   {
576     MPMCQueue<Lifecycle<std::false_type>> a(50);
577     LIFECYCLE_STEP(NOTHING);
578
579     a.blockingWrite();
580     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
581
582     // move constructor
583     MPMCQueue<Lifecycle<std::false_type>> b = std::move(a);
584     LIFECYCLE_STEP(NOTHING);
585     EXPECT_EQ(a.capacity(), 0);
586     EXPECT_EQ(a.size(), 0);
587     EXPECT_EQ(b.capacity(), 50);
588     EXPECT_EQ(b.size(), 1);
589
590     b.blockingWrite();
591     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
592
593     // move operator
594     MPMCQueue<Lifecycle<std::false_type>> c;
595     LIFECYCLE_STEP(NOTHING);
596     c = std::move(b);
597     LIFECYCLE_STEP(NOTHING);
598     EXPECT_EQ(c.capacity(), 50);
599     EXPECT_EQ(c.size(), 2);
600
601     {
602       Lifecycle<std::false_type> dst;
603       LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
604       c.blockingRead(dst);
605       LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
606
607       {
608         // swap
609         MPMCQueue<Lifecycle<std::false_type>> d(10);
610         LIFECYCLE_STEP(NOTHING);
611         std::swap(c, d);
612         LIFECYCLE_STEP(NOTHING);
613         EXPECT_EQ(c.capacity(), 10);
614         EXPECT_TRUE(c.isEmpty());
615         EXPECT_EQ(d.capacity(), 50);
616         EXPECT_EQ(d.size(), 1);
617
618         d.blockingRead(dst);
619         LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
620
621         c.blockingWrite(dst);
622         LIFECYCLE_STEP(COPY_CONSTRUCTOR);
623
624         d.blockingWrite(std::move(dst));
625         LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
626       } // d goes out of scope
627       LIFECYCLE_STEP(DESTRUCTOR);
628     } // dst goes out of scope
629     LIFECYCLE_STEP(DESTRUCTOR);
630   } // c goes out of scope
631   LIFECYCLE_STEP(DESTRUCTOR);
632 }
633
634 int main(int argc, char ** argv) {
635   testing::InitGoogleTest(&argc, argv);
636   google::ParseCommandLineFlags(&argc, &argv, true);
637   return RUN_ALL_TESTS();
638 }