cc9e8c123460ddc09109f0d2c2cd579eb2edfa39
[folly.git] / folly / test / MPMCQueueTest.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/MPMCQueue.h>
18 #include <folly/Format.h>
19 #include <folly/Memory.h>
20 #include <folly/portability/GTest.h>
21 #include <folly/portability/SysResource.h>
22 #include <folly/portability/SysTime.h>
23 #include <folly/portability/Unistd.h>
24 #include <folly/test/DeterministicSchedule.h>
25
26 #include <boost/intrusive_ptr.hpp>
27 #include <memory>
28 #include <functional>
29 #include <thread>
30 #include <utility>
31
32 FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
33
34 using namespace folly;
35 using namespace detail;
36 using namespace test;
37 using std::chrono::time_point;
38 using std::chrono::steady_clock;
39 using std::chrono::seconds;
40 using std::chrono::milliseconds;
41 using std::string;
42 using std::unique_ptr;
43 using std::vector;
44
45 typedef DeterministicSchedule DSched;
46
47 template <template<typename> class Atom>
48 void run_mt_sequencer_thread(
49     int numThreads,
50     int numOps,
51     uint32_t init,
52     TurnSequencer<Atom>& seq,
53     Atom<uint32_t>& spinThreshold,
54     int& prev,
55     int i) {
56   for (int op = i; op < numOps; op += numThreads) {
57     seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
58     EXPECT_EQ(prev, op - 1);
59     prev = op;
60     seq.completeTurn(init + op);
61   }
62 }
63
64 template <template<typename> class Atom>
65 void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
66   TurnSequencer<Atom> seq(init);
67   Atom<uint32_t> spinThreshold(0);
68
69   int prev = -1;
70   vector<std::thread> threads(numThreads);
71   for (int i = 0; i < numThreads; ++i) {
72     threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
73           numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold),
74           std::ref(prev), i));
75   }
76
77   for (auto& thr : threads) {
78     DSched::join(thr);
79   }
80
81   EXPECT_EQ(prev, numOps - 1);
82 }
83
84 TEST(MPMCQueue, sequencer) {
85   run_mt_sequencer_test<std::atomic>(1, 100, 0);
86   run_mt_sequencer_test<std::atomic>(2, 100000, -100);
87   run_mt_sequencer_test<std::atomic>(100, 10000, -100);
88 }
89
90 TEST(MPMCQueue, sequencer_emulated_futex) {
91   run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
92   run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
93   run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
94 }
95
96 TEST(MPMCQueue, sequencer_deterministic) {
97   DSched sched(DSched::uniform(0));
98   run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
99   run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
100   run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
101 }
102
103 template <bool Dynamic = false, typename T>
104 void runElementTypeTest(T&& src) {
105   MPMCQueue<T, std::atomic, Dynamic> cq(10);
106   cq.blockingWrite(std::forward<T>(src));
107   T dest;
108   cq.blockingRead(dest);
109   EXPECT_TRUE(cq.write(std::move(dest)));
110   EXPECT_TRUE(cq.read(dest));
111   auto soon1 = std::chrono::system_clock::now() + std::chrono::seconds(1);
112   EXPECT_TRUE(cq.tryWriteUntil(soon1, std::move(dest)));
113   EXPECT_TRUE(cq.read(dest));
114   auto soon2 = std::chrono::steady_clock::now() + std::chrono::seconds(1);
115   EXPECT_TRUE(cq.tryWriteUntil(soon2, std::move(dest)));
116   EXPECT_TRUE(cq.read(dest));
117 }
118
119 struct RefCounted {
120   static FOLLY_TLS int active_instances;
121
122   mutable std::atomic<int> rc;
123
124   RefCounted() : rc(0) {
125     ++active_instances;
126   }
127
128   ~RefCounted() {
129     --active_instances;
130   }
131 };
132 FOLLY_TLS int RefCounted::active_instances;
133
134 void intrusive_ptr_add_ref(RefCounted const* p) {
135   p->rc++;
136 }
137
138 void intrusive_ptr_release(RefCounted const* p) {
139   if (--(p->rc) == 0) {
140     delete p;
141   }
142 }
143
144 TEST(MPMCQueue, lots_of_element_types) {
145   runElementTypeTest(10);
146   runElementTypeTest(string("abc"));
147   runElementTypeTest(std::make_pair(10, string("def")));
148   runElementTypeTest(vector<string>{{"abc"}});
149   runElementTypeTest(std::make_shared<char>('a'));
150   runElementTypeTest(folly::make_unique<char>('a'));
151   runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
152   EXPECT_EQ(RefCounted::active_instances, 0);
153 }
154
155 TEST(MPMCQueue, lots_of_element_types_dynamic) {
156   runElementTypeTest<true>(10);
157   runElementTypeTest<true>(string("abc"));
158   runElementTypeTest<true>(std::make_pair(10, string("def")));
159   runElementTypeTest<true>(vector<string>{{"abc"}});
160   runElementTypeTest<true>(std::make_shared<char>('a'));
161   runElementTypeTest<true>(folly::make_unique<char>('a'));
162   runElementTypeTest<true>(boost::intrusive_ptr<RefCounted>(new RefCounted));
163   EXPECT_EQ(RefCounted::active_instances, 0);
164 }
165
166 TEST(MPMCQueue, single_thread_enqdeq) {
167   // Non-dynamic version only.
168   // False positive for dynamic version. Capacity can be temporarily
169   // higher than specified.
170   MPMCQueue<int> cq(10);
171
172   for (int pass = 0; pass < 10; ++pass) {
173     for (int i = 0; i < 10; ++i) {
174       EXPECT_TRUE(cq.write(i));
175     }
176     EXPECT_FALSE(cq.write(-1));
177     EXPECT_FALSE(cq.isEmpty());
178     EXPECT_EQ(cq.size(), 10);
179
180     for (int i = 0; i < 5; ++i) {
181       int dest = -1;
182       EXPECT_TRUE(cq.read(dest));
183       EXPECT_EQ(dest, i);
184     }
185     for (int i = 5; i < 10; ++i) {
186       int dest = -1;
187       cq.blockingRead(dest);
188       EXPECT_EQ(dest, i);
189     }
190     int dest = -1;
191     EXPECT_FALSE(cq.read(dest));
192     EXPECT_EQ(dest, -1);
193
194     EXPECT_TRUE(cq.isEmpty());
195     EXPECT_EQ(cq.size(), 0);
196   }
197 }
198
199 TEST(MPMCQueue, tryenq_capacity_test) {
200   // Non-dynamic version only.
201   // False positive for dynamic version. Capacity can be temporarily
202   // higher than specified.
203   for (size_t cap = 1; cap < 100; ++cap) {
204     MPMCQueue<int> cq(cap);
205     for (size_t i = 0; i < cap; ++i) {
206       EXPECT_TRUE(cq.write(i));
207     }
208     EXPECT_FALSE(cq.write(100));
209   }
210 }
211
212 TEST(MPMCQueue, enq_capacity_test) {
213   // Non-dynamic version only.
214   // False positive for dynamic version. Capacity can be temporarily
215   // higher than specified.
216   for (auto cap : { 1, 100, 10000 }) {
217     MPMCQueue<int> cq(cap);
218     for (int i = 0; i < cap; ++i) {
219       cq.blockingWrite(i);
220     }
221     int t = 0;
222     int when;
223     auto thr = std::thread([&]{
224       cq.blockingWrite(100);
225       when = t;
226     });
227     usleep(2000);
228     t = 1;
229     int dummy;
230     cq.blockingRead(dummy);
231     thr.join();
232     EXPECT_EQ(when, 1);
233   }
234 }
235
236 template <template<typename> class Atom, bool Dynamic = false>
237 void runTryEnqDeqThread(
238     int numThreads,
239     int n, /*numOps*/
240     MPMCQueue<int, Atom, Dynamic>& cq,
241     std::atomic<uint64_t>& sum,
242     int t) {
243   uint64_t threadSum = 0;
244   int src = t;
245   // received doesn't reflect any actual values, we just start with
246   // t and increment by numThreads to get the rounding of termination
247   // correct if numThreads doesn't evenly divide numOps
248   int received = t;
249   while (src < n || received < n) {
250     if (src < n && cq.write(src)) {
251       src += numThreads;
252     }
253
254     int dst;
255     if (received < n && cq.read(dst)) {
256       received += numThreads;
257       threadSum += dst;
258     }
259   }
260   sum += threadSum;
261 }
262
263 template <template<typename> class Atom, bool Dynamic = false>
264 void runTryEnqDeqTest(int numThreads, int numOps) {
265   // write and read aren't linearizable, so we don't have
266   // hard guarantees on their individual behavior.  We can still test
267   // correctness in aggregate
268   MPMCQueue<int,Atom, Dynamic> cq(numThreads);
269
270   uint64_t n = numOps;
271   vector<std::thread> threads(numThreads);
272   std::atomic<uint64_t> sum(0);
273   for (int t = 0; t < numThreads; ++t) {
274     threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom, Dynamic>,
275           numThreads, n, std::ref(cq), std::ref(sum), t));
276   }
277   for (auto& t : threads) {
278     DSched::join(t);
279   }
280   EXPECT_TRUE(cq.isEmpty());
281   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
282 }
283
284 TEST(MPMCQueue, mt_try_enq_deq) {
285   int nts[] = { 1, 3, 100 };
286
287   int n = 100000;
288   for (int nt : nts) {
289     runTryEnqDeqTest<std::atomic>(nt, n);
290   }
291 }
292
293 TEST(MPMCQueue, mt_try_enq_deq_dynamic) {
294   int nts[] = { 1, 3, 100 };
295
296   int n = 100000;
297   for (int nt : nts) {
298     runTryEnqDeqTest<std::atomic, /* Dynamic = */ true>(nt, n);
299   }
300 }
301
302 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
303   int nts[] = { 1, 3, 100 };
304
305   int n = 100000;
306   for (int nt : nts) {
307     runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
308   }
309 }
310
311 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex_dynamic) {
312   int nts[] = { 1, 3, 100 };
313
314   int n = 100000;
315   for (int nt : nts) {
316     runTryEnqDeqTest<EmulatedFutexAtomic, /* Dynamic = */ true>(nt, n);
317   }
318 }
319
320 TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
321   int nts[] = { 3, 10 };
322
323   long seed = 0;
324   LOG(INFO) << "using seed " << seed;
325
326   int n = 1000;
327   for (int nt : nts) {
328     {
329       DSched sched(DSched::uniform(seed));
330       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
331     }
332     {
333       DSched sched(DSched::uniformSubset(seed, 2));
334       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
335     }
336     {
337       DSched sched(DSched::uniform(seed));
338       runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
339     }
340     {
341       DSched sched(DSched::uniformSubset(seed, 2));
342       runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
343     }
344   }
345 }
346
347 uint64_t nowMicro() {
348   timeval tv;
349   gettimeofday(&tv, 0);
350   return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
351 }
352
353 template <typename Q>
354 struct WriteMethodCaller {
355   WriteMethodCaller() {}
356   virtual ~WriteMethodCaller() = default;
357   virtual bool callWrite(Q& q, int i) = 0;
358   virtual string methodName() = 0;
359 };
360
361 template <typename Q>
362 struct BlockingWriteCaller : public WriteMethodCaller<Q> {
363   bool callWrite(Q& q, int i) override {
364     q.blockingWrite(i);
365     return true;
366   }
367   string methodName() override { return "blockingWrite"; }
368 };
369
370 template <typename Q>
371 struct WriteIfNotFullCaller : public WriteMethodCaller<Q> {
372   bool callWrite(Q& q, int i) override { return q.writeIfNotFull(i); }
373   string methodName() override { return "writeIfNotFull"; }
374 };
375
376 template <typename Q>
377 struct WriteCaller : public WriteMethodCaller<Q> {
378   bool callWrite(Q& q, int i) override { return q.write(i); }
379   string methodName() override { return "write"; }
380 };
381
382 template <typename Q,
383           class Clock = steady_clock,
384           class Duration = typename Clock::duration>
385 struct TryWriteUntilCaller : public WriteMethodCaller<Q> {
386   const Duration duration_;
387   explicit TryWriteUntilCaller(Duration&& duration) : duration_(duration) {}
388   bool callWrite(Q& q, int i) override {
389     auto then = Clock::now() + duration_;
390     return q.tryWriteUntil(then, i);
391   }
392   string methodName() override {
393     return folly::sformat(
394         "tryWriteUntil({}ms)",
395         std::chrono::duration_cast<milliseconds>(duration_).count());
396   }
397 };
398
399 template <typename Q>
400 string producerConsumerBench(Q&& queue,
401                              string qName,
402                              int numProducers,
403                              int numConsumers,
404                              int numOps,
405                              WriteMethodCaller<Q>& writer,
406                              bool ignoreContents = false) {
407   Q& q = queue;
408
409   struct rusage beginUsage;
410   getrusage(RUSAGE_SELF, &beginUsage);
411
412   auto beginMicro = nowMicro();
413
414   uint64_t n = numOps;
415   std::atomic<uint64_t> sum(0);
416   std::atomic<uint64_t> failed(0);
417
418   vector<std::thread> producers(numProducers);
419   for (int t = 0; t < numProducers; ++t) {
420     producers[t] = DSched::thread([&,t]{
421       for (int i = t; i < numOps; i += numProducers) {
422         while (!writer.callWrite(q, i)) {
423           ++failed;
424         }
425       }
426     });
427   }
428
429   vector<std::thread> consumers(numConsumers);
430   for (int t = 0; t < numConsumers; ++t) {
431     consumers[t] = DSched::thread([&,t]{
432       uint64_t localSum = 0;
433       for (int i = t; i < numOps; i += numConsumers) {
434         int dest = -1;
435         q.blockingRead(dest);
436         EXPECT_FALSE(dest == -1);
437         localSum += dest;
438       }
439       sum += localSum;
440     });
441   }
442
443   for (auto& t : producers) {
444     DSched::join(t);
445   }
446   for (auto& t : consumers) {
447     DSched::join(t);
448   }
449   if (!ignoreContents) {
450     EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
451   }
452
453   auto endMicro = nowMicro();
454
455   struct rusage endUsage;
456   getrusage(RUSAGE_SELF, &endUsage);
457
458   uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
459   long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
460       (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
461   uint64_t failures = failed;
462   size_t allocated = q.allocatedCapacity();
463
464   return folly::sformat(
465       "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
466       "handoff, {} failures, {} allocated",
467       qName,
468       numProducers,
469       writer.methodName(),
470       numConsumers,
471       nanosPer,
472       csw,
473       n,
474       failures,
475       allocated);
476 }
477
478 template <bool Dynamic = false>
479 void runMtProdConsDeterministic(long seed) {
480   // we use the Bench method, but perf results are meaningless under DSched
481   DSched sched(DSched::uniform(seed));
482
483   vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic,
484                                                 Dynamic>>>> callers;
485   callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
486                        DeterministicAtomic, Dynamic>>>());
487   callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
488                        DeterministicAtomic, Dynamic>>>());
489   callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
490                        DeterministicAtomic, Dynamic>>>());
491   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
492                        DeterministicAtomic, Dynamic>>>(milliseconds(1)));
493   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
494                        DeterministicAtomic, Dynamic>>>(seconds(2)));
495   size_t cap;
496
497   for (const auto& caller : callers) {
498     cap = 10;
499     LOG(INFO) <<
500       producerConsumerBench(
501         MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
502         "MPMCQueue<int, DeterministicAtomic, Dynamic>("
503           + folly::to<std::string>(cap)+")",
504         1,
505         1,
506         1000,
507         *caller);
508     cap = 100;
509     LOG(INFO) <<
510       producerConsumerBench(
511         MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
512         "MPMCQueue<int, DeterministicAtomic, Dynamic>("
513           + folly::to<std::string>(cap)+")",
514         10,
515         10,
516         1000,
517         *caller);
518     cap = 10;
519     LOG(INFO) <<
520       producerConsumerBench(
521         MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
522         "MPMCQueue<int, DeterministicAtomic, Dynamic>("
523           + folly::to<std::string>(cap)+")",
524         1,
525         1,
526         1000,
527         *caller);
528     cap = 100;
529     LOG(INFO) <<
530       producerConsumerBench(
531         MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
532         "MPMCQueue<int, DeterministicAtomic, Dynamic>("
533           + folly::to<std::string>(cap)+")",
534         10,
535         10,
536         1000,
537         *caller);
538     cap = 1;
539     LOG(INFO) <<
540       producerConsumerBench(
541         MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
542         "MPMCQueue<int, DeterministicAtomic, Dynamic>("
543           + folly::to<std::string>(cap)+")",
544         10,
545         10,
546         1000,
547         *caller);
548   }
549 }
550
551 void runMtProdConsDeterministicDynamic(
552   long seed,
553   uint32_t prods,
554   uint32_t cons,
555   uint32_t numOps,
556   size_t cap,
557   size_t minCap,
558   size_t mult
559 ) {
560   // we use the Bench method, but perf results are meaningless under DSched
561   DSched sched(DSched::uniform(seed));
562
563   vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic,
564                                                 true>>>> callers;
565   callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
566                        DeterministicAtomic, true>>>());
567   callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
568                        DeterministicAtomic, true>>>());
569   callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
570                        DeterministicAtomic, true>>>());
571   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
572                        DeterministicAtomic, true>>>(milliseconds(1)));
573   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
574                        DeterministicAtomic, true>>>(seconds(2)));
575
576   for (const auto& caller : callers) {
577     LOG(INFO) <<
578       producerConsumerBench(
579         MPMCQueue<int, DeterministicAtomic, true>(cap, minCap, mult),
580         "MPMCQueue<int, DeterministicAtomic, true>("
581           + folly::to<std::string>(cap) + ", "
582           + folly::to<std::string>(minCap) + ", "
583           + folly::to<std::string>(mult)+")",
584         prods,
585         cons,
586         numOps,
587         *caller);
588   }
589 }
590
591 TEST(MPMCQueue, mt_prod_cons_deterministic) {
592   runMtProdConsDeterministic(0);
593 }
594
595 TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic) {
596   runMtProdConsDeterministic<true>(0);
597 }
598
599 template <typename T>
600 void setFromEnv(T& var, const char* envvar) {
601   char* str = std::getenv(envvar);
602   if (str) { var = atoi(str); }
603 }
604
605 TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic_with_arguments) {
606   long seed = 0;
607   uint32_t prods = 10;
608   uint32_t cons = 10;
609   uint32_t numOps = 1000;
610   size_t cap = 10000;
611   size_t minCap = 9;
612   size_t mult = 3;
613   setFromEnv(seed, "SEED");
614   setFromEnv(prods, "PRODS");
615   setFromEnv(cons, "CONS");
616   setFromEnv(numOps, "NUM_OPS");
617   setFromEnv(cap, "CAP");
618   setFromEnv(minCap, "MIN_CAP");
619   setFromEnv(mult, "MULT");
620   runMtProdConsDeterministicDynamic(
621     seed, prods, cons, numOps, cap, minCap, mult);
622 }
623
624 #define PC_BENCH(q, np, nc, ...) \
625     producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
626
627 template <bool Dynamic = false>
628 void runMtProdCons() {
629   int n = 100000;
630   setFromEnv(n, "NUM_OPS");
631   vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, std::atomic, Dynamic>>>>
632     callers;
633   callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
634                        std::atomic, Dynamic>>>());
635   callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
636                        std::atomic, Dynamic>>>());
637   callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int, std::atomic,
638                        Dynamic>>>());
639   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
640                        std::atomic, Dynamic>>>(milliseconds(1)));
641   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
642                        std::atomic, Dynamic>>>(seconds(2)));
643   for (const auto& caller : callers) {
644     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
645                           1, 1, n, *caller);
646     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
647                           10, 1, n, *caller);
648     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
649                           1, 10, n, *caller);
650     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
651                           10, 10, n, *caller);
652     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
653                           1, 1, n, *caller);
654     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
655                           10, 1, n, *caller);
656     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
657                           1, 10, n, *caller);
658     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
659                           10, 10, n, *caller);
660     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(100000)),
661                           32, 100, n, *caller);
662   }
663 }
664
665 TEST(MPMCQueue, mt_prod_cons) {
666   runMtProdCons();
667 }
668
669 TEST(MPMCQueue, mt_prod_cons_dynamic) {
670   runMtProdCons</* Dynamic = */ true>();
671 }
672
673 template <bool Dynamic = false>
674 void runMtProdConsEmulatedFutex() {
675   int n = 100000;
676   vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, EmulatedFutexAtomic,
677                                                 Dynamic>>>> callers;
678   callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
679                        EmulatedFutexAtomic, Dynamic>>>());
680   callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
681                        EmulatedFutexAtomic, Dynamic>>>());
682   callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
683                        EmulatedFutexAtomic, Dynamic>>>());
684   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
685                        EmulatedFutexAtomic, Dynamic>>>(milliseconds(1)));
686   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
687                        EmulatedFutexAtomic, Dynamic>>>(seconds(2)));
688   for (const auto& caller : callers) {
689     LOG(INFO) << PC_BENCH(
690       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 1, 1, n, *caller);
691     LOG(INFO) << PC_BENCH(
692       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 10, 1, n, *caller);
693     LOG(INFO) << PC_BENCH(
694       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 1, 10, n, *caller);
695     LOG(INFO) << PC_BENCH(
696       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 10, 10, n, *caller);
697     LOG(INFO) << PC_BENCH(
698       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 1, 1, n, *caller);
699     LOG(INFO) << PC_BENCH(
700       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 10, 1, n, *caller);
701     LOG(INFO) << PC_BENCH(
702       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 1, 10, n, *caller);
703     LOG(INFO) << PC_BENCH((MPMCQueue<int, EmulatedFutexAtomic, Dynamic>
704                            (10000)), 10, 10, n, *caller);
705     LOG(INFO) << PC_BENCH((MPMCQueue<int, EmulatedFutexAtomic, Dynamic>
706                            (100000)), 32, 100, n, *caller);
707   }
708 }
709
710 TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
711   runMtProdConsEmulatedFutex();
712 }
713
714 TEST(MPMCQueue, mt_prod_cons_emulated_futex_dynamic) {
715   runMtProdConsEmulatedFutex</* Dynamic = */ true>();
716 }
717
718 template <template <typename> class Atom, bool Dynamic = false>
719 void runNeverFailThread(int numThreads,
720                         int n, /*numOps*/
721                         MPMCQueue<int, Atom, Dynamic>& cq,
722                         std::atomic<uint64_t>& sum,
723                         int t) {
724   uint64_t threadSum = 0;
725   for (int i = t; i < n; i += numThreads) {
726     // enq + deq
727     EXPECT_TRUE(cq.writeIfNotFull(i));
728
729     int dest = -1;
730     EXPECT_TRUE(cq.readIfNotEmpty(dest));
731     EXPECT_TRUE(dest >= 0);
732     threadSum += dest;
733   }
734   sum += threadSum;
735 }
736
737 template <template <typename> class Atom, bool Dynamic = false>
738 uint64_t runNeverFailTest(int numThreads, int numOps) {
739   // always #enq >= #deq
740   MPMCQueue<int, Atom, Dynamic> cq(numThreads);
741
742   uint64_t n = numOps;
743   auto beginMicro = nowMicro();
744
745   vector<std::thread> threads(numThreads);
746   std::atomic<uint64_t> sum(0);
747   for (int t = 0; t < numThreads; ++t) {
748     threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom, Dynamic>,
749                                           numThreads,
750                                           n,
751                                           std::ref(cq),
752                                           std::ref(sum),
753                                           t));
754   }
755   for (auto& t : threads) {
756     DSched::join(t);
757   }
758   EXPECT_TRUE(cq.isEmpty());
759   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
760
761   return nowMicro() - beginMicro;
762 }
763
764 template <template<typename> class Atom, bool Dynamic = false>
765 void runMtNeverFail(std::vector<int>& nts, int n) {
766   for (int nt : nts) {
767     uint64_t elapsed = runNeverFailTest<Atom, Dynamic>(nt, n);
768     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
769               << " threads";
770   }
771 }
772
773 TEST(MPMCQueue, mt_never_fail) {
774   std::vector<int> nts {1, 3, 100};
775   int n = 100000;
776   runMtNeverFail<std::atomic>(nts, n);
777 }
778
779 TEST(MPMCQueue, mt_never_fail_dynamic) {
780   std::vector<int> nts {1, 3, 100};
781   int n = 100000;
782   runMtNeverFail<std::atomic, true>(nts, n);
783 }
784
785 TEST(MPMCQueue, mt_never_fail_emulated_futex) {
786   std::vector<int> nts {1, 3, 100};
787   int n = 100000;
788   runMtNeverFail<EmulatedFutexAtomic>(nts, n);
789 }
790
791 TEST(MPMCQueue, mt_never_fail_emulated_futex_dynamic) {
792   std::vector<int> nts {1, 3, 100};
793   int n = 100000;
794   runMtNeverFail<EmulatedFutexAtomic, true>(nts, n);
795 }
796
797 template<bool Dynamic = false>
798 void runMtNeverFailDeterministic(std::vector<int>& nts, int n, long seed) {
799   LOG(INFO) << "using seed " << seed;
800   for (int nt : nts) {
801     {
802       DSched sched(DSched::uniform(seed));
803       runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
804     }
805     {
806       DSched sched(DSched::uniformSubset(seed, 2));
807       runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
808     }
809   }
810 }
811
812 TEST(MPMCQueue, mt_never_fail_deterministic) {
813   std::vector<int> nts {3, 10};
814   long seed = 0; // nowMicro() % 10000;
815   int n = 1000;
816   runMtNeverFailDeterministic(nts, n, seed);
817 }
818
819 TEST(MPMCQueue, mt_never_fail_deterministic_dynamic) {
820   std::vector<int> nts {3, 10};
821   long seed = 0; // nowMicro() % 10000;
822   int n = 1000;
823   runMtNeverFailDeterministic<true>(nts, n, seed);
824 }
825
826 template <class Clock, template <typename> class Atom, bool Dynamic>
827 void runNeverFailUntilThread(int numThreads,
828                              int n, /*numOps*/
829                              MPMCQueue<int, Atom, Dynamic>& cq,
830                              std::atomic<uint64_t>& sum,
831                              int t) {
832   uint64_t threadSum = 0;
833   for (int i = t; i < n; i += numThreads) {
834     // enq + deq
835     auto soon = Clock::now() + std::chrono::seconds(1);
836     EXPECT_TRUE(cq.tryWriteUntil(soon, i));
837
838     int dest = -1;
839     EXPECT_TRUE(cq.readIfNotEmpty(dest));
840     EXPECT_TRUE(dest >= 0);
841     threadSum += dest;
842   }
843   sum += threadSum;
844 }
845
846 template <class Clock, template <typename> class Atom, bool Dynamic = false>
847 uint64_t runNeverFailTest(int numThreads, int numOps) {
848   // always #enq >= #deq
849   MPMCQueue<int, Atom, Dynamic> cq(numThreads);
850
851   uint64_t n = numOps;
852   auto beginMicro = nowMicro();
853
854   vector<std::thread> threads(numThreads);
855   std::atomic<uint64_t> sum(0);
856   for (int t = 0; t < numThreads; ++t) {
857     threads[t] = DSched::thread(std::bind(
858                                   runNeverFailUntilThread<Clock, Atom, Dynamic>,
859                                   numThreads,
860                                   n,
861                                   std::ref(cq),
862                                   std::ref(sum),
863                                   t));
864   }
865   for (auto& t : threads) {
866     DSched::join(t);
867   }
868   EXPECT_TRUE(cq.isEmpty());
869   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
870
871   return nowMicro() - beginMicro;
872 }
873
874 template <bool Dynamic = false>
875 void runMtNeverFailUntilSystem(std::vector<int>& nts, int n) {
876   for (int nt : nts) {
877     uint64_t elapsed =
878       runNeverFailTest<std::chrono::system_clock, std::atomic, Dynamic>(nt, n);
879     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
880               << " threads";
881   }
882 }
883
884 TEST(MPMCQueue, mt_never_fail_until_system) {
885   std::vector<int> nts {1, 3, 100};
886   int n = 100000;
887   runMtNeverFailUntilSystem(nts, n);
888 }
889
890 TEST(MPMCQueue, mt_never_fail_until_system_dynamic) {
891   std::vector<int> nts {1, 3, 100};
892   int n = 100000;
893   runMtNeverFailUntilSystem<true>(nts, n);
894 }
895
896 template <bool Dynamic = false>
897 void runMtNeverFailUntilSteady(std::vector<int>& nts, int n) {
898   for (int nt : nts) {
899     uint64_t elapsed =
900       runNeverFailTest<std::chrono::steady_clock, std::atomic, Dynamic>(nt, n);
901     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
902               << " threads";
903   }
904 }
905
906 TEST(MPMCQueue, mt_never_fail_until_steady) {
907   std::vector<int> nts {1, 3, 100};
908   int n = 100000;
909   runMtNeverFailUntilSteady(nts, n);
910 }
911
912 TEST(MPMCQueue, mt_never_fail_until_steady_dynamic) {
913   std::vector<int> nts {1, 3, 100};
914   int n = 100000;
915   runMtNeverFailUntilSteady<true>(nts, n);
916 }
917
918 enum LifecycleEvent {
919   NOTHING = -1,
920   DEFAULT_CONSTRUCTOR,
921   COPY_CONSTRUCTOR,
922   MOVE_CONSTRUCTOR,
923   TWO_ARG_CONSTRUCTOR,
924   COPY_OPERATOR,
925   MOVE_OPERATOR,
926   DESTRUCTOR,
927   MAX_LIFECYCLE_EVENT
928 };
929
930 static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT];
931 static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT];
932
933 static int lc_outstanding() {
934   return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[COPY_CONSTRUCTOR] +
935       lc_counts[MOVE_CONSTRUCTOR] + lc_counts[TWO_ARG_CONSTRUCTOR] -
936       lc_counts[DESTRUCTOR];
937 }
938
939 static void lc_snap() {
940   for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
941     lc_prev[i] = lc_counts[i];
942   }
943 }
944
945 #define LIFECYCLE_STEP(...) lc_step(__LINE__, __VA_ARGS__)
946
947 static void lc_step(int lineno, int what = NOTHING, int what2 = NOTHING) {
948   for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
949     int delta = i == what || i == what2 ? 1 : 0;
950     EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
951         << "lc_counts[" << i << "] - lc_prev[" << i << "] was "
952         << (lc_counts[i] - lc_prev[i]) << ", expected " << delta
953         << ", from line " << lineno;
954   }
955   lc_snap();
956 }
957
958 template <typename R>
959 struct Lifecycle {
960   typedef R IsRelocatable;
961
962   bool constructed;
963
964   Lifecycle() noexcept : constructed(true) {
965     ++lc_counts[DEFAULT_CONSTRUCTOR];
966   }
967
968   explicit Lifecycle(int /* n */, char const* /* s */) noexcept
969       : constructed(true) {
970     ++lc_counts[TWO_ARG_CONSTRUCTOR];
971   }
972
973   Lifecycle(const Lifecycle& /* rhs */) noexcept : constructed(true) {
974     ++lc_counts[COPY_CONSTRUCTOR];
975   }
976
977   Lifecycle(Lifecycle&& /* rhs */) noexcept : constructed(true) {
978     ++lc_counts[MOVE_CONSTRUCTOR];
979   }
980
981   Lifecycle& operator=(const Lifecycle& /* rhs */) noexcept {
982     ++lc_counts[COPY_OPERATOR];
983     return *this;
984   }
985
986   Lifecycle& operator=(Lifecycle&& /* rhs */) noexcept {
987     ++lc_counts[MOVE_OPERATOR];
988     return *this;
989   }
990
991   ~Lifecycle() noexcept {
992     ++lc_counts[DESTRUCTOR];
993     assert(lc_outstanding() >= 0);
994     assert(constructed);
995     constructed = false;
996   }
997 };
998
999 template <typename R>
1000 void runPerfectForwardingTest() {
1001   lc_snap();
1002   EXPECT_EQ(lc_outstanding(), 0);
1003
1004   {
1005     // Non-dynamic only. False positive for dynamic.
1006     MPMCQueue<Lifecycle<R>, std::atomic> queue(50);
1007     LIFECYCLE_STEP(NOTHING);
1008
1009     for (int pass = 0; pass < 10; ++pass) {
1010       for (int i = 0; i < 10; ++i) {
1011         queue.blockingWrite();
1012         LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1013
1014         queue.blockingWrite(1, "one");
1015         LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
1016
1017         {
1018           Lifecycle<R> src;
1019           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1020           queue.blockingWrite(std::move(src));
1021           LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
1022         }
1023         LIFECYCLE_STEP(DESTRUCTOR);
1024
1025         {
1026           Lifecycle<R> src;
1027           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1028           queue.blockingWrite(src);
1029           LIFECYCLE_STEP(COPY_CONSTRUCTOR);
1030         }
1031         LIFECYCLE_STEP(DESTRUCTOR);
1032
1033         EXPECT_TRUE(queue.write());
1034         LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1035       }
1036
1037       EXPECT_EQ(queue.size(), 50);
1038       EXPECT_FALSE(queue.write(2, "two"));
1039       LIFECYCLE_STEP(NOTHING);
1040
1041       for (int i = 0; i < 50; ++i) {
1042         {
1043           Lifecycle<R> node;
1044           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1045
1046           queue.blockingRead(node);
1047           if (R::value) {
1048             // relocatable, moved via memcpy
1049             LIFECYCLE_STEP(DESTRUCTOR);
1050           } else {
1051             LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
1052           }
1053         }
1054         LIFECYCLE_STEP(DESTRUCTOR);
1055       }
1056
1057       EXPECT_EQ(queue.size(), 0);
1058     }
1059
1060     // put one element back before destruction
1061     {
1062       Lifecycle<R> src(3, "three");
1063       LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
1064       queue.write(std::move(src));
1065       LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
1066     }
1067     LIFECYCLE_STEP(DESTRUCTOR); // destroy src
1068   }
1069   LIFECYCLE_STEP(DESTRUCTOR); // destroy queue
1070
1071   EXPECT_EQ(lc_outstanding(), 0);
1072 }
1073
1074 TEST(MPMCQueue, perfect_forwarding) {
1075   runPerfectForwardingTest<std::false_type>();
1076 }
1077
1078 TEST(MPMCQueue, perfect_forwarding_relocatable) {
1079   runPerfectForwardingTest<std::true_type>();
1080 }
1081
1082 template <bool Dynamic = false>
1083 void run_queue_moving() {
1084   lc_snap();
1085   EXPECT_EQ(lc_outstanding(), 0);
1086
1087   {
1088     MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> a(50);
1089     LIFECYCLE_STEP(NOTHING);
1090
1091     a.blockingWrite();
1092     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1093
1094     // move constructor
1095     MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> b
1096       = std::move(a);
1097     LIFECYCLE_STEP(NOTHING);
1098     EXPECT_EQ(a.capacity(), 0);
1099     EXPECT_EQ(a.size(), 0);
1100     EXPECT_EQ(b.capacity(), 50);
1101     EXPECT_EQ(b.size(), 1);
1102
1103     b.blockingWrite();
1104     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1105
1106     // move operator
1107     MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> c;
1108     LIFECYCLE_STEP(NOTHING);
1109     c = std::move(b);
1110     LIFECYCLE_STEP(NOTHING);
1111     EXPECT_EQ(c.capacity(), 50);
1112     EXPECT_EQ(c.size(), 2);
1113
1114     {
1115       Lifecycle<std::false_type> dst;
1116       LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1117       c.blockingRead(dst);
1118       LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
1119
1120       {
1121         // swap
1122         MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> d(10);
1123         LIFECYCLE_STEP(NOTHING);
1124         std::swap(c, d);
1125         LIFECYCLE_STEP(NOTHING);
1126         EXPECT_EQ(c.capacity(), 10);
1127         EXPECT_TRUE(c.isEmpty());
1128         EXPECT_EQ(d.capacity(), 50);
1129         EXPECT_EQ(d.size(), 1);
1130
1131         d.blockingRead(dst);
1132         LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
1133
1134         c.blockingWrite(dst);
1135         LIFECYCLE_STEP(COPY_CONSTRUCTOR);
1136
1137         d.blockingWrite(std::move(dst));
1138         LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
1139       } // d goes out of scope
1140       LIFECYCLE_STEP(DESTRUCTOR);
1141     } // dst goes out of scope
1142     LIFECYCLE_STEP(DESTRUCTOR);
1143   } // c goes out of scope
1144   LIFECYCLE_STEP(DESTRUCTOR);
1145 }
1146
1147 TEST(MPMCQueue, queue_moving) {
1148   run_queue_moving();
1149 }
1150
1151 TEST(MPMCQueue, queue_moving_dynamic) {
1152   run_queue_moving<true>();
1153 }
1154
1155 TEST(MPMCQueue, explicit_zero_capacity_fail) {
1156   ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);
1157
1158   using DynamicMPMCQueueInt = MPMCQueue<int, std::atomic, true>;
1159   ASSERT_THROW(DynamicMPMCQueueInt cq(0), std::invalid_argument);
1160 }