Fix a folly build failure under clang: MPMCQueueTest.cpp.
[folly.git] / folly / test / MPMCQueueTest.cpp
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/MPMCQueue.h>
18 #include <folly/Format.h>
19 #include <folly/Memory.h>
20 #include <folly/test/DeterministicSchedule.h>
21
22 #include <boost/intrusive_ptr.hpp>
23 #include <memory>
24 #include <functional>
25 #include <thread>
26 #include <utility>
27 #include <unistd.h>
28 #include <sys/time.h>
29 #include <sys/resource.h>
30
31 #include <gflags/gflags.h>
32 #include <gtest/gtest.h>
33
34 FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
35
36 using namespace folly;
37 using namespace detail;
38 using namespace test;
39
40 typedef DeterministicSchedule DSched;
41
42 template <template<typename> class Atom>
43 void run_mt_sequencer_thread(
44     int numThreads,
45     int numOps,
46     uint32_t init,
47     TurnSequencer<Atom>& seq,
48     Atom<int>& spinThreshold,
49     int& prev,
50     int i) {
51   for (int op = i; op < numOps; op += numThreads) {
52     seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
53     EXPECT_EQ(prev, op - 1);
54     prev = op;
55     seq.completeTurn(init + op);
56   }
57 }
58
59 template <template<typename> class Atom>
60 void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
61   TurnSequencer<Atom> seq(init);
62   Atom<int> spinThreshold(0);
63
64   int prev = -1;
65   std::vector<std::thread> threads(numThreads);
66   for (int i = 0; i < numThreads; ++i) {
67     threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
68           numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold),
69           std::ref(prev), i));
70   }
71
72   for (auto& thr : threads) {
73     DSched::join(thr);
74   }
75
76   EXPECT_EQ(prev, numOps - 1);
77 }
78
79 TEST(MPMCQueue, sequencer) {
80   run_mt_sequencer_test<std::atomic>(1, 100, 0);
81   run_mt_sequencer_test<std::atomic>(2, 100000, -100);
82   run_mt_sequencer_test<std::atomic>(100, 10000, -100);
83 }
84
85 TEST(MPMCQueue, sequencer_deterministic) {
86   DSched sched(DSched::uniform(0));
87   run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
88   run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
89   run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
90 }
91
92 template <typename T>
93 void runElementTypeTest(T&& src) {
94   MPMCQueue<T> cq(10);
95   cq.blockingWrite(std::move(src));
96   T dest;
97   cq.blockingRead(dest);
98   EXPECT_TRUE(cq.write(std::move(dest)));
99   EXPECT_TRUE(cq.read(dest));
100 }
101
102 struct RefCounted {
103   mutable std::atomic<int> rc;
104
105   RefCounted() : rc(0) {}
106 };
107
108 void intrusive_ptr_add_ref(RefCounted const* p) {
109   p->rc++;
110 }
111
112 void intrusive_ptr_release(RefCounted const* p) {
113   if (--(p->rc)) {
114     delete p;
115   }
116 }
117
118 TEST(MPMCQueue, lots_of_element_types) {
119   runElementTypeTest(10);
120   runElementTypeTest(std::string("abc"));
121   runElementTypeTest(std::make_pair(10, std::string("def")));
122   runElementTypeTest(std::vector<std::string>{ { "abc" } });
123   runElementTypeTest(std::make_shared<char>('a'));
124   runElementTypeTest(folly::make_unique<char>('a'));
125   runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
126 }
127
128 TEST(MPMCQueue, single_thread_enqdeq) {
129   MPMCQueue<int> cq(10);
130
131   for (int pass = 0; pass < 10; ++pass) {
132     for (int i = 0; i < 10; ++i) {
133       EXPECT_TRUE(cq.write(i));
134     }
135     EXPECT_FALSE(cq.write(-1));
136     EXPECT_FALSE(cq.isEmpty());
137     EXPECT_EQ(cq.size(), 10);
138
139     for (int i = 0; i < 5; ++i) {
140       int dest = -1;
141       EXPECT_TRUE(cq.read(dest));
142       EXPECT_EQ(dest, i);
143     }
144     for (int i = 5; i < 10; ++i) {
145       int dest = -1;
146       cq.blockingRead(dest);
147       EXPECT_EQ(dest, i);
148     }
149     int dest = -1;
150     EXPECT_FALSE(cq.read(dest));
151     EXPECT_EQ(dest, -1);
152
153     EXPECT_TRUE(cq.isEmpty());
154     EXPECT_EQ(cq.size(), 0);
155   }
156 }
157
158 TEST(MPMCQueue, tryenq_capacity_test) {
159   for (size_t cap = 1; cap < 100; ++cap) {
160     MPMCQueue<int> cq(cap);
161     for (int i = 0; i < cap; ++i) {
162       EXPECT_TRUE(cq.write(i));
163     }
164     EXPECT_FALSE(cq.write(100));
165   }
166 }
167
168 TEST(MPMCQueue, enq_capacity_test) {
169   for (auto cap : { 1, 100, 10000 }) {
170     MPMCQueue<int> cq(cap);
171     for (int i = 0; i < cap; ++i) {
172       cq.blockingWrite(i);
173     }
174     int t = 0;
175     int when;
176     auto thr = std::thread([&]{
177       cq.blockingWrite(100);
178       when = t;
179     });
180     usleep(2000);
181     t = 1;
182     int dummy;
183     cq.blockingRead(dummy);
184     thr.join();
185     EXPECT_EQ(when, 1);
186   }
187 }
188
189 template <template<typename> class Atom>
190 void runTryEnqDeqThread(
191     int numThreads,
192     int n, /*numOps*/
193     MPMCQueue<int, Atom>& cq,
194     std::atomic<uint64_t>& sum,
195     int t) {
196   uint64_t threadSum = 0;
197   int src = t;
198   // received doesn't reflect any actual values, we just start with
199   // t and increment by numThreads to get the rounding of termination
200   // correct if numThreads doesn't evenly divide numOps
201   int received = t;
202   while (src < n || received < n) {
203     if (src < n && cq.write(src)) {
204       src += numThreads;
205     }
206
207     int dst;
208     if (received < n && cq.read(dst)) {
209       received += numThreads;
210       threadSum += dst;
211     }
212   }
213   sum += threadSum;
214 }
215
216 template <template<typename> class Atom>
217 void runTryEnqDeqTest(int numThreads, int numOps) {
218   // write and read aren't linearizable, so we don't have
219   // hard guarantees on their individual behavior.  We can still test
220   // correctness in aggregate
221   MPMCQueue<int,Atom> cq(numThreads);
222
223   uint64_t n = numOps;
224   std::vector<std::thread> threads(numThreads);
225   std::atomic<uint64_t> sum(0);
226   for (int t = 0; t < numThreads; ++t) {
227     threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom>,
228           numThreads, n, std::ref(cq), std::ref(sum), t));
229   }
230   for (auto& t : threads) {
231     DSched::join(t);
232   }
233   EXPECT_TRUE(cq.isEmpty());
234   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
235 }
236
237 TEST(MPMCQueue, mt_try_enq_deq) {
238   int nts[] = { 1, 3, 100 };
239
240   int n = 100000;
241   for (int nt : nts) {
242     runTryEnqDeqTest<std::atomic>(nt, n);
243   }
244 }
245
246 TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
247   int nts[] = { 3, 10 };
248
249   long seed = 0;
250   LOG(INFO) << "using seed " << seed;
251
252   int n = 1000;
253   for (int nt : nts) {
254     {
255       DSched sched(DSched::uniform(seed));
256       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
257     }
258     {
259       DSched sched(DSched::uniformSubset(seed, 2));
260       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
261     }
262   }
263 }
264
265 uint64_t nowMicro() {
266   timeval tv;
267   gettimeofday(&tv, 0);
268   return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
269 }
270
271 template <typename Q>
272 std::string producerConsumerBench(Q&& queue, std::string qName,
273                                   int numProducers, int numConsumers,
274                                   int numOps, bool ignoreContents = false) {
275   Q& q = queue;
276
277   struct rusage beginUsage;
278   getrusage(RUSAGE_SELF, &beginUsage);
279
280   auto beginMicro = nowMicro();
281
282   uint64_t n = numOps;
283   std::atomic<uint64_t> sum(0);
284
285   std::vector<std::thread> producers(numProducers);
286   for (int t = 0; t < numProducers; ++t) {
287     producers[t] = DSched::thread([&,t]{
288       for (int i = t; i < numOps; i += numProducers) {
289         q.blockingWrite(i);
290       }
291     });
292   }
293
294   std::vector<std::thread> consumers(numConsumers);
295   for (int t = 0; t < numConsumers; ++t) {
296     consumers[t] = DSched::thread([&,t]{
297       uint64_t localSum = 0;
298       for (int i = t; i < numOps; i += numConsumers) {
299         int dest = -1;
300         q.blockingRead(dest);
301         EXPECT_FALSE(dest == -1);
302         localSum += dest;
303       }
304       sum += localSum;
305     });
306   }
307
308   for (auto& t : producers) {
309     DSched::join(t);
310   }
311   for (auto& t : consumers) {
312     DSched::join(t);
313   }
314   if (!ignoreContents) {
315     EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
316   }
317
318   auto endMicro = nowMicro();
319
320   struct rusage endUsage;
321   getrusage(RUSAGE_SELF, &endUsage);
322
323   uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
324   long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
325       (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
326
327   return folly::format(
328       "{}, {} producers, {} consumers => {} nanos/handoff, {} csw / {} handoff",
329       qName, numProducers, numConsumers, nanosPer, csw, n).str();
330 }
331
332
333 TEST(MPMCQueue, mt_prod_cons_deterministic) {
334   // we use the Bench method, but perf results are meaningless under DSched
335   DSched sched(DSched::uniform(0));
336
337   producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(10),
338           "", 1, 1, 1000);
339   producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(100),
340           "", 10, 10, 1000);
341   producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(10),
342           "", 1, 1, 1000);
343   producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(100),
344           "", 10, 10, 1000);
345   producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(1),
346           "", 10, 10, 1000);
347 }
348
349 #define PC_BENCH(q, np, nc, ...) \
350     producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
351
352 TEST(MPMCQueue, mt_prod_cons) {
353   int n = 100000;
354   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 1, n);
355   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 1, n);
356   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 10, n);
357   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 10, n);
358   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 1, n);
359   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 1, n);
360   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 10, n);
361   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 10, n);
362   LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n);
363 }
364
365 template <template<typename> class Atom>
366 void runNeverFailThread(
367     int numThreads,
368     int n, /*numOps*/
369     MPMCQueue<int, Atom>& cq,
370     std::atomic<uint64_t>& sum,
371     int t) {
372   uint64_t threadSum = 0;
373   for (int i = t; i < n; i += numThreads) {
374     // enq + deq
375     EXPECT_TRUE(cq.writeIfNotFull(i));
376
377     int dest = -1;
378     EXPECT_TRUE(cq.readIfNotEmpty(dest));
379     EXPECT_TRUE(dest >= 0);
380     threadSum += dest;
381   }
382   sum += threadSum;
383 }
384
385 template <template<typename> class Atom>
386 uint64_t runNeverFailTest(int numThreads, int numOps) {
387   // always #enq >= #deq
388   MPMCQueue<int,Atom> cq(numThreads);
389
390   uint64_t n = numOps;
391   auto beginMicro = nowMicro();
392
393   std::vector<std::thread> threads(numThreads);
394   std::atomic<uint64_t> sum(0);
395   for (int t = 0; t < numThreads; ++t) {
396     threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom>,
397           numThreads, n, std::ref(cq), std::ref(sum), t));
398   }
399   for (auto& t : threads) {
400     DSched::join(t);
401   }
402   EXPECT_TRUE(cq.isEmpty());
403   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
404
405   return nowMicro() - beginMicro;
406 }
407
408 TEST(MPMCQueue, mt_never_fail) {
409   int nts[] = { 1, 3, 100 };
410
411   int n = 100000;
412   for (int nt : nts) {
413     uint64_t elapsed = runNeverFailTest<std::atomic>(nt, n);
414     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with "
415               << nt << " threads";
416   }
417 }
418
419 TEST(MPMCQueue, mt_never_fail_deterministic) {
420   int nts[] = { 3, 10 };
421
422   long seed = 0; // nowMicro() % 10000;
423   LOG(INFO) << "using seed " << seed;
424
425   int n = 1000;
426   for (int nt : nts) {
427     {
428       DSched sched(DSched::uniform(seed));
429       runNeverFailTest<DeterministicAtomic>(nt, n);
430     }
431     {
432       DSched sched(DSched::uniformSubset(seed, 2));
433       runNeverFailTest<DeterministicAtomic>(nt, n);
434     }
435   }
436 }
437
438 enum LifecycleEvent {
439   NOTHING = -1,
440   DEFAULT_CONSTRUCTOR,
441   COPY_CONSTRUCTOR,
442   MOVE_CONSTRUCTOR,
443   TWO_ARG_CONSTRUCTOR,
444   COPY_OPERATOR,
445   MOVE_OPERATOR,
446   DESTRUCTOR,
447   MAX_LIFECYCLE_EVENT
448 };
449
450 static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT];
451 static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT];
452
453 static int lc_outstanding() {
454   return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[COPY_CONSTRUCTOR] +
455       lc_counts[MOVE_CONSTRUCTOR] + lc_counts[TWO_ARG_CONSTRUCTOR] -
456       lc_counts[DESTRUCTOR];
457 }
458
459 static void lc_snap() {
460   for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
461     lc_prev[i] = lc_counts[i];
462   }
463 }
464
465 #define LIFECYCLE_STEP(...) lc_step(__LINE__, __VA_ARGS__)
466
467 static void lc_step(int lineno, int what = NOTHING, int what2 = NOTHING) {
468   for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
469     int delta = i == what || i == what2 ? 1 : 0;
470     EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
471         << "lc_counts[" << i << "] - lc_prev[" << i << "] was "
472         << (lc_counts[i] - lc_prev[i]) << ", expected " << delta
473         << ", from line " << lineno;
474   }
475   lc_snap();
476 }
477
478 template <typename R>
479 struct Lifecycle {
480   typedef R IsRelocatable;
481
482   bool constructed;
483
484   Lifecycle() noexcept : constructed(true) {
485     ++lc_counts[DEFAULT_CONSTRUCTOR];
486   }
487
488   explicit Lifecycle(int n, char const* s) noexcept : constructed(true) {
489     ++lc_counts[TWO_ARG_CONSTRUCTOR];
490   }
491
492   Lifecycle(const Lifecycle& rhs) noexcept : constructed(true) {
493     ++lc_counts[COPY_CONSTRUCTOR];
494   }
495
496   Lifecycle(Lifecycle&& rhs) noexcept : constructed(true) {
497     ++lc_counts[MOVE_CONSTRUCTOR];
498   }
499
500   Lifecycle& operator= (const Lifecycle& rhs) noexcept {
501     ++lc_counts[COPY_OPERATOR];
502     return *this;
503   }
504
505   Lifecycle& operator= (Lifecycle&& rhs) noexcept {
506     ++lc_counts[MOVE_OPERATOR];
507     return *this;
508   }
509
510   ~Lifecycle() noexcept {
511     ++lc_counts[DESTRUCTOR];
512     assert(lc_outstanding() >= 0);
513     assert(constructed);
514     constructed = false;
515   }
516 };
517
518 template <typename R>
519 void runPerfectForwardingTest() {
520   lc_snap();
521   EXPECT_EQ(lc_outstanding(), 0);
522
523   {
524     MPMCQueue<Lifecycle<R>> queue(50);
525     LIFECYCLE_STEP(NOTHING);
526
527     for (int pass = 0; pass < 10; ++pass) {
528       for (int i = 0; i < 10; ++i) {
529         queue.blockingWrite();
530         LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
531
532         queue.blockingWrite(1, "one");
533         LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
534
535         {
536           Lifecycle<R> src;
537           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
538           queue.blockingWrite(std::move(src));
539           LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
540         }
541         LIFECYCLE_STEP(DESTRUCTOR);
542
543         {
544           Lifecycle<R> src;
545           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
546           queue.blockingWrite(src);
547           LIFECYCLE_STEP(COPY_CONSTRUCTOR);
548         }
549         LIFECYCLE_STEP(DESTRUCTOR);
550
551         EXPECT_TRUE(queue.write());
552         LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
553       }
554
555       EXPECT_EQ(queue.size(), 50);
556       EXPECT_FALSE(queue.write(2, "two"));
557       LIFECYCLE_STEP(NOTHING);
558
559       for (int i = 0; i < 50; ++i) {
560         {
561           Lifecycle<R> node;
562           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
563
564           queue.blockingRead(node);
565           if (R::value) {
566             // relocatable, moved via memcpy
567             LIFECYCLE_STEP(DESTRUCTOR);
568           } else {
569             LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
570           }
571         }
572         LIFECYCLE_STEP(DESTRUCTOR);
573       }
574
575       EXPECT_EQ(queue.size(), 0);
576     }
577
578     // put one element back before destruction
579     {
580       Lifecycle<R> src(3, "three");
581       LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
582       queue.write(std::move(src));
583       LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
584     }
585     LIFECYCLE_STEP(DESTRUCTOR); // destroy src
586   }
587   LIFECYCLE_STEP(DESTRUCTOR); // destroy queue
588
589   EXPECT_EQ(lc_outstanding(), 0);
590 }
591
592 TEST(MPMCQueue, perfect_forwarding) {
593   runPerfectForwardingTest<std::false_type>();
594 }
595
596 TEST(MPMCQueue, perfect_forwarding_relocatable) {
597   runPerfectForwardingTest<std::true_type>();
598 }
599
600 TEST(MPMCQueue, queue_moving) {
601   lc_snap();
602   EXPECT_EQ(lc_outstanding(), 0);
603
604   {
605     MPMCQueue<Lifecycle<std::false_type>> a(50);
606     LIFECYCLE_STEP(NOTHING);
607
608     a.blockingWrite();
609     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
610
611     // move constructor
612     MPMCQueue<Lifecycle<std::false_type>> b = std::move(a);
613     LIFECYCLE_STEP(NOTHING);
614     EXPECT_EQ(a.capacity(), 0);
615     EXPECT_EQ(a.size(), 0);
616     EXPECT_EQ(b.capacity(), 50);
617     EXPECT_EQ(b.size(), 1);
618
619     b.blockingWrite();
620     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
621
622     // move operator
623     MPMCQueue<Lifecycle<std::false_type>> c;
624     LIFECYCLE_STEP(NOTHING);
625     c = std::move(b);
626     LIFECYCLE_STEP(NOTHING);
627     EXPECT_EQ(c.capacity(), 50);
628     EXPECT_EQ(c.size(), 2);
629
630     {
631       Lifecycle<std::false_type> dst;
632       LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
633       c.blockingRead(dst);
634       LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
635
636       {
637         // swap
638         MPMCQueue<Lifecycle<std::false_type>> d(10);
639         LIFECYCLE_STEP(NOTHING);
640         std::swap(c, d);
641         LIFECYCLE_STEP(NOTHING);
642         EXPECT_EQ(c.capacity(), 10);
643         EXPECT_TRUE(c.isEmpty());
644         EXPECT_EQ(d.capacity(), 50);
645         EXPECT_EQ(d.size(), 1);
646
647         d.blockingRead(dst);
648         LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
649
650         c.blockingWrite(dst);
651         LIFECYCLE_STEP(COPY_CONSTRUCTOR);
652
653         d.blockingWrite(std::move(dst));
654         LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
655       } // d goes out of scope
656       LIFECYCLE_STEP(DESTRUCTOR);
657     } // dst goes out of scope
658     LIFECYCLE_STEP(DESTRUCTOR);
659   } // c goes out of scope
660   LIFECYCLE_STEP(DESTRUCTOR);
661 }
662
663 int main(int argc, char ** argv) {
664   testing::InitGoogleTest(&argc, argv);
665   gflags::ParseCommandLineFlags(&argc, &argv, true);
666   return RUN_ALL_TESTS();
667 }