f4a195a492b998ce9d88f854e5d5a62bf50a191d
[folly.git] / folly / test / MPMCQueueTest.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/MPMCQueue.h>
18 #include <folly/Format.h>
19 #include <folly/Memory.h>
20 #include <folly/portability/SysResource.h>
21 #include <folly/portability/SysTime.h>
22 #include <folly/portability/Unistd.h>
23 #include <folly/test/DeterministicSchedule.h>
24
25 #include <boost/intrusive_ptr.hpp>
26 #include <memory>
27 #include <functional>
28 #include <thread>
29 #include <utility>
30
31 #include <gtest/gtest.h>
32
33 FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
34
35 using namespace folly;
36 using namespace detail;
37 using namespace test;
38 using std::chrono::time_point;
39 using std::chrono::steady_clock;
40 using std::chrono::seconds;
41 using std::chrono::milliseconds;
42 using std::string;
43 using std::unique_ptr;
44 using std::vector;
45
46 typedef DeterministicSchedule DSched;
47
48 template <template<typename> class Atom>
49 void run_mt_sequencer_thread(
50     int numThreads,
51     int numOps,
52     uint32_t init,
53     TurnSequencer<Atom>& seq,
54     Atom<uint32_t>& spinThreshold,
55     int& prev,
56     int i) {
57   for (int op = i; op < numOps; op += numThreads) {
58     seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
59     EXPECT_EQ(prev, op - 1);
60     prev = op;
61     seq.completeTurn(init + op);
62   }
63 }
64
65 template <template<typename> class Atom>
66 void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
67   TurnSequencer<Atom> seq(init);
68   Atom<uint32_t> spinThreshold(0);
69
70   int prev = -1;
71   vector<std::thread> threads(numThreads);
72   for (int i = 0; i < numThreads; ++i) {
73     threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
74           numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold),
75           std::ref(prev), i));
76   }
77
78   for (auto& thr : threads) {
79     DSched::join(thr);
80   }
81
82   EXPECT_EQ(prev, numOps - 1);
83 }
84
85 TEST(MPMCQueue, sequencer) {
86   run_mt_sequencer_test<std::atomic>(1, 100, 0);
87   run_mt_sequencer_test<std::atomic>(2, 100000, -100);
88   run_mt_sequencer_test<std::atomic>(100, 10000, -100);
89 }
90
91 TEST(MPMCQueue, sequencer_emulated_futex) {
92   run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
93   run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
94   run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
95 }
96
97 TEST(MPMCQueue, sequencer_deterministic) {
98   DSched sched(DSched::uniform(0));
99   run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
100   run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
101   run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
102 }
103
104 template <bool Dynamic = false, typename T>
105 void runElementTypeTest(T&& src) {
106   MPMCQueue<T, std::atomic, Dynamic> cq(10);
107   cq.blockingWrite(std::forward<T>(src));
108   T dest;
109   cq.blockingRead(dest);
110   EXPECT_TRUE(cq.write(std::move(dest)));
111   EXPECT_TRUE(cq.read(dest));
112   auto soon1 = std::chrono::system_clock::now() + std::chrono::seconds(1);
113   EXPECT_TRUE(cq.tryWriteUntil(soon1, std::move(dest)));
114   EXPECT_TRUE(cq.read(dest));
115   auto soon2 = std::chrono::steady_clock::now() + std::chrono::seconds(1);
116   EXPECT_TRUE(cq.tryWriteUntil(soon2, std::move(dest)));
117   EXPECT_TRUE(cq.read(dest));
118 }
119
120 struct RefCounted {
121   static FOLLY_TLS int active_instances;
122
123   mutable std::atomic<int> rc;
124
125   RefCounted() : rc(0) {
126     ++active_instances;
127   }
128
129   ~RefCounted() {
130     --active_instances;
131   }
132 };
133 FOLLY_TLS int RefCounted::active_instances;
134
135 void intrusive_ptr_add_ref(RefCounted const* p) {
136   p->rc++;
137 }
138
139 void intrusive_ptr_release(RefCounted const* p) {
140   if (--(p->rc) == 0) {
141     delete p;
142   }
143 }
144
145 TEST(MPMCQueue, lots_of_element_types) {
146   runElementTypeTest(10);
147   runElementTypeTest(string("abc"));
148   runElementTypeTest(std::make_pair(10, string("def")));
149   runElementTypeTest(vector<string>{{"abc"}});
150   runElementTypeTest(std::make_shared<char>('a'));
151   runElementTypeTest(folly::make_unique<char>('a'));
152   runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
153   EXPECT_EQ(RefCounted::active_instances, 0);
154 }
155
156 TEST(MPMCQueue, lots_of_element_types_dynamic) {
157   runElementTypeTest<true>(10);
158   runElementTypeTest<true>(string("abc"));
159   runElementTypeTest<true>(std::make_pair(10, string("def")));
160   runElementTypeTest<true>(vector<string>{{"abc"}});
161   runElementTypeTest<true>(std::make_shared<char>('a'));
162   runElementTypeTest<true>(folly::make_unique<char>('a'));
163   runElementTypeTest<true>(boost::intrusive_ptr<RefCounted>(new RefCounted));
164   EXPECT_EQ(RefCounted::active_instances, 0);
165 }
166
167 TEST(MPMCQueue, single_thread_enqdeq) {
168   // Non-dynamic version only.
169   // False positive for dynamic version. Capacity can be temporarily
170   // higher than specified.
171   MPMCQueue<int> cq(10);
172
173   for (int pass = 0; pass < 10; ++pass) {
174     for (int i = 0; i < 10; ++i) {
175       EXPECT_TRUE(cq.write(i));
176     }
177     EXPECT_FALSE(cq.write(-1));
178     EXPECT_FALSE(cq.isEmpty());
179     EXPECT_EQ(cq.size(), 10);
180
181     for (int i = 0; i < 5; ++i) {
182       int dest = -1;
183       EXPECT_TRUE(cq.read(dest));
184       EXPECT_EQ(dest, i);
185     }
186     for (int i = 5; i < 10; ++i) {
187       int dest = -1;
188       cq.blockingRead(dest);
189       EXPECT_EQ(dest, i);
190     }
191     int dest = -1;
192     EXPECT_FALSE(cq.read(dest));
193     EXPECT_EQ(dest, -1);
194
195     EXPECT_TRUE(cq.isEmpty());
196     EXPECT_EQ(cq.size(), 0);
197   }
198 }
199
200 TEST(MPMCQueue, tryenq_capacity_test) {
201   // Non-dynamic version only.
202   // False positive for dynamic version. Capacity can be temporarily
203   // higher than specified.
204   for (size_t cap = 1; cap < 100; ++cap) {
205     MPMCQueue<int> cq(cap);
206     for (size_t i = 0; i < cap; ++i) {
207       EXPECT_TRUE(cq.write(i));
208     }
209     EXPECT_FALSE(cq.write(100));
210   }
211 }
212
213 TEST(MPMCQueue, enq_capacity_test) {
214   // Non-dynamic version only.
215   // False positive for dynamic version. Capacity can be temporarily
216   // higher than specified.
217   for (auto cap : { 1, 100, 10000 }) {
218     MPMCQueue<int> cq(cap);
219     for (int i = 0; i < cap; ++i) {
220       cq.blockingWrite(i);
221     }
222     int t = 0;
223     int when;
224     auto thr = std::thread([&]{
225       cq.blockingWrite(100);
226       when = t;
227     });
228     usleep(2000);
229     t = 1;
230     int dummy;
231     cq.blockingRead(dummy);
232     thr.join();
233     EXPECT_EQ(when, 1);
234   }
235 }
236
237 template <template<typename> class Atom, bool Dynamic = false>
238 void runTryEnqDeqThread(
239     int numThreads,
240     int n, /*numOps*/
241     MPMCQueue<int, Atom, Dynamic>& cq,
242     std::atomic<uint64_t>& sum,
243     int t) {
244   uint64_t threadSum = 0;
245   int src = t;
246   // received doesn't reflect any actual values, we just start with
247   // t and increment by numThreads to get the rounding of termination
248   // correct if numThreads doesn't evenly divide numOps
249   int received = t;
250   while (src < n || received < n) {
251     if (src < n && cq.write(src)) {
252       src += numThreads;
253     }
254
255     int dst;
256     if (received < n && cq.read(dst)) {
257       received += numThreads;
258       threadSum += dst;
259     }
260   }
261   sum += threadSum;
262 }
263
264 template <template<typename> class Atom, bool Dynamic = false>
265 void runTryEnqDeqTest(int numThreads, int numOps) {
266   // write and read aren't linearizable, so we don't have
267   // hard guarantees on their individual behavior.  We can still test
268   // correctness in aggregate
269   MPMCQueue<int,Atom, Dynamic> cq(numThreads);
270
271   uint64_t n = numOps;
272   vector<std::thread> threads(numThreads);
273   std::atomic<uint64_t> sum(0);
274   for (int t = 0; t < numThreads; ++t) {
275     threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom, Dynamic>,
276           numThreads, n, std::ref(cq), std::ref(sum), t));
277   }
278   for (auto& t : threads) {
279     DSched::join(t);
280   }
281   EXPECT_TRUE(cq.isEmpty());
282   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
283 }
284
285 TEST(MPMCQueue, mt_try_enq_deq) {
286   int nts[] = { 1, 3, 100 };
287
288   int n = 100000;
289   for (int nt : nts) {
290     runTryEnqDeqTest<std::atomic>(nt, n);
291   }
292 }
293
294 TEST(MPMCQueue, mt_try_enq_deq_dynamic) {
295   int nts[] = { 1, 3, 100 };
296
297   int n = 100000;
298   for (int nt : nts) {
299     runTryEnqDeqTest<std::atomic, /* Dynamic = */ true>(nt, n);
300   }
301 }
302
303 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
304   int nts[] = { 1, 3, 100 };
305
306   int n = 100000;
307   for (int nt : nts) {
308     runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
309   }
310 }
311
312 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex_dynamic) {
313   int nts[] = { 1, 3, 100 };
314
315   int n = 100000;
316   for (int nt : nts) {
317     runTryEnqDeqTest<EmulatedFutexAtomic, /* Dynamic = */ true>(nt, n);
318   }
319 }
320
321 TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
322   int nts[] = { 3, 10 };
323
324   long seed = 0;
325   LOG(INFO) << "using seed " << seed;
326
327   int n = 1000;
328   for (int nt : nts) {
329     {
330       DSched sched(DSched::uniform(seed));
331       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
332     }
333     {
334       DSched sched(DSched::uniformSubset(seed, 2));
335       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
336     }
337     {
338       DSched sched(DSched::uniform(seed));
339       runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
340     }
341     {
342       DSched sched(DSched::uniformSubset(seed, 2));
343       runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
344     }
345   }
346 }
347
348 uint64_t nowMicro() {
349   timeval tv;
350   gettimeofday(&tv, 0);
351   return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
352 }
353
354 template <typename Q>
355 struct WriteMethodCaller {
356   WriteMethodCaller() {}
357   virtual ~WriteMethodCaller() = default;
358   virtual bool callWrite(Q& q, int i) = 0;
359   virtual string methodName() = 0;
360 };
361
362 template <typename Q>
363 struct BlockingWriteCaller : public WriteMethodCaller<Q> {
364   bool callWrite(Q& q, int i) override {
365     q.blockingWrite(i);
366     return true;
367   }
368   string methodName() override { return "blockingWrite"; }
369 };
370
371 template <typename Q>
372 struct WriteIfNotFullCaller : public WriteMethodCaller<Q> {
373   bool callWrite(Q& q, int i) override { return q.writeIfNotFull(i); }
374   string methodName() override { return "writeIfNotFull"; }
375 };
376
377 template <typename Q>
378 struct WriteCaller : public WriteMethodCaller<Q> {
379   bool callWrite(Q& q, int i) override { return q.write(i); }
380   string methodName() override { return "write"; }
381 };
382
383 template <typename Q,
384           class Clock = steady_clock,
385           class Duration = typename Clock::duration>
386 struct TryWriteUntilCaller : public WriteMethodCaller<Q> {
387   const Duration duration_;
388   explicit TryWriteUntilCaller(Duration&& duration) : duration_(duration) {}
389   bool callWrite(Q& q, int i) override {
390     auto then = Clock::now() + duration_;
391     return q.tryWriteUntil(then, i);
392   }
393   string methodName() override {
394     return folly::sformat(
395         "tryWriteUntil({}ms)",
396         std::chrono::duration_cast<milliseconds>(duration_).count());
397   }
398 };
399
400 template <typename Q>
401 string producerConsumerBench(Q&& queue,
402                              string qName,
403                              int numProducers,
404                              int numConsumers,
405                              int numOps,
406                              WriteMethodCaller<Q>& writer,
407                              bool ignoreContents = false) {
408   Q& q = queue;
409
410   struct rusage beginUsage;
411   getrusage(RUSAGE_SELF, &beginUsage);
412
413   auto beginMicro = nowMicro();
414
415   uint64_t n = numOps;
416   std::atomic<uint64_t> sum(0);
417   std::atomic<uint64_t> failed(0);
418
419   vector<std::thread> producers(numProducers);
420   for (int t = 0; t < numProducers; ++t) {
421     producers[t] = DSched::thread([&,t]{
422       for (int i = t; i < numOps; i += numProducers) {
423         while (!writer.callWrite(q, i)) {
424           ++failed;
425         }
426       }
427     });
428   }
429
430   vector<std::thread> consumers(numConsumers);
431   for (int t = 0; t < numConsumers; ++t) {
432     consumers[t] = DSched::thread([&,t]{
433       uint64_t localSum = 0;
434       for (int i = t; i < numOps; i += numConsumers) {
435         int dest = -1;
436         q.blockingRead(dest);
437         EXPECT_FALSE(dest == -1);
438         localSum += dest;
439       }
440       sum += localSum;
441     });
442   }
443
444   for (auto& t : producers) {
445     DSched::join(t);
446   }
447   for (auto& t : consumers) {
448     DSched::join(t);
449   }
450   if (!ignoreContents) {
451     EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
452   }
453
454   auto endMicro = nowMicro();
455
456   struct rusage endUsage;
457   getrusage(RUSAGE_SELF, &endUsage);
458
459   uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
460   long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
461       (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
462   uint64_t failures = failed;
463   size_t allocated = q.allocatedCapacity();
464
465   return folly::sformat(
466       "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
467       "handoff, {} failures, {} allocated",
468       qName,
469       numProducers,
470       writer.methodName(),
471       numConsumers,
472       nanosPer,
473       csw,
474       n,
475       failures,
476       allocated);
477 }
478
479 template <bool Dynamic = false>
480 void runMtProdConsDeterministic(long seed) {
481   // we use the Bench method, but perf results are meaningless under DSched
482   DSched sched(DSched::uniform(seed));
483
484   vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic,
485                                                 Dynamic>>>> callers;
486   callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
487                        DeterministicAtomic, Dynamic>>>());
488   callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
489                        DeterministicAtomic, Dynamic>>>());
490   callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
491                        DeterministicAtomic, Dynamic>>>());
492   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
493                        DeterministicAtomic, Dynamic>>>(milliseconds(1)));
494   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
495                        DeterministicAtomic, Dynamic>>>(seconds(2)));
496   size_t cap;
497
498   for (const auto& caller : callers) {
499     cap = 10;
500     LOG(INFO) <<
501       producerConsumerBench(
502         MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
503         "MPMCQueue<int, DeterministicAtomic, Dynamic>("
504           + folly::to<std::string>(cap)+")",
505         1,
506         1,
507         1000,
508         *caller);
509     cap = 100;
510     LOG(INFO) <<
511       producerConsumerBench(
512         MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
513         "MPMCQueue<int, DeterministicAtomic, Dynamic>("
514           + folly::to<std::string>(cap)+")",
515         10,
516         10,
517         1000,
518         *caller);
519     cap = 10;
520     LOG(INFO) <<
521       producerConsumerBench(
522         MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
523         "MPMCQueue<int, DeterministicAtomic, Dynamic>("
524           + folly::to<std::string>(cap)+")",
525         1,
526         1,
527         1000,
528         *caller);
529     cap = 100;
530     LOG(INFO) <<
531       producerConsumerBench(
532         MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
533         "MPMCQueue<int, DeterministicAtomic, Dynamic>("
534           + folly::to<std::string>(cap)+")",
535         10,
536         10,
537         1000,
538         *caller);
539     cap = 1;
540     LOG(INFO) <<
541       producerConsumerBench(
542         MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
543         "MPMCQueue<int, DeterministicAtomic, Dynamic>("
544           + folly::to<std::string>(cap)+")",
545         10,
546         10,
547         1000,
548         *caller);
549   }
550 }
551
552 void runMtProdConsDeterministicDynamic(
553   long seed,
554   uint32_t prods,
555   uint32_t cons,
556   uint32_t numOps,
557   size_t cap,
558   size_t minCap,
559   size_t mult
560 ) {
561   // we use the Bench method, but perf results are meaningless under DSched
562   DSched sched(DSched::uniform(seed));
563
564   vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic,
565                                                 true>>>> callers;
566   callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
567                        DeterministicAtomic, true>>>());
568   callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
569                        DeterministicAtomic, true>>>());
570   callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
571                        DeterministicAtomic, true>>>());
572   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
573                        DeterministicAtomic, true>>>(milliseconds(1)));
574   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
575                        DeterministicAtomic, true>>>(seconds(2)));
576
577   for (const auto& caller : callers) {
578     LOG(INFO) <<
579       producerConsumerBench(
580         MPMCQueue<int, DeterministicAtomic, true>(cap, minCap, mult),
581         "MPMCQueue<int, DeterministicAtomic, true>("
582           + folly::to<std::string>(cap) + ", "
583           + folly::to<std::string>(minCap) + ", "
584           + folly::to<std::string>(mult)+")",
585         prods,
586         cons,
587         numOps,
588         *caller);
589   }
590 }
591
592 TEST(MPMCQueue, mt_prod_cons_deterministic) {
593   runMtProdConsDeterministic(0);
594 }
595
596 TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic) {
597   runMtProdConsDeterministic<true>(0);
598 }
599
600 template <typename T>
601 void setFromEnv(T& var, const char* envvar) {
602   char* str = std::getenv(envvar);
603   if (str) { var = atoi(str); }
604 }
605
606 TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic_with_arguments) {
607   long seed = 0;
608   uint32_t prods = 10;
609   uint32_t cons = 10;
610   uint32_t numOps = 1000;
611   size_t cap = 10000;
612   size_t minCap = 9;
613   size_t mult = 3;
614   setFromEnv(seed, "SEED");
615   setFromEnv(prods, "PRODS");
616   setFromEnv(cons, "CONS");
617   setFromEnv(numOps, "NUM_OPS");
618   setFromEnv(cap, "CAP");
619   setFromEnv(minCap, "MIN_CAP");
620   setFromEnv(mult, "MULT");
621   runMtProdConsDeterministicDynamic(
622     seed, prods, cons, numOps, cap, minCap, mult);
623 }
624
625 #define PC_BENCH(q, np, nc, ...) \
626     producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
627
628 template <bool Dynamic = false>
629 void runMtProdCons() {
630   int n = 100000;
631   setFromEnv(n, "NUM_OPS");
632   vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, std::atomic, Dynamic>>>>
633     callers;
634   callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
635                        std::atomic, Dynamic>>>());
636   callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
637                        std::atomic, Dynamic>>>());
638   callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int, std::atomic,
639                        Dynamic>>>());
640   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
641                        std::atomic, Dynamic>>>(milliseconds(1)));
642   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
643                        std::atomic, Dynamic>>>(seconds(2)));
644   for (const auto& caller : callers) {
645     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
646                           1, 1, n, *caller);
647     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
648                           10, 1, n, *caller);
649     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
650                           1, 10, n, *caller);
651     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
652                           10, 10, n, *caller);
653     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
654                           1, 1, n, *caller);
655     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
656                           10, 1, n, *caller);
657     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
658                           1, 10, n, *caller);
659     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
660                           10, 10, n, *caller);
661     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(100000)),
662                           32, 100, n, *caller);
663   }
664 }
665
666 TEST(MPMCQueue, mt_prod_cons) {
667   runMtProdCons();
668 }
669
670 TEST(MPMCQueue, mt_prod_cons_dynamic) {
671   runMtProdCons</* Dynamic = */ true>();
672 }
673
674 template <bool Dynamic = false>
675 void runMtProdConsEmulatedFutex() {
676   int n = 100000;
677   vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, EmulatedFutexAtomic,
678                                                 Dynamic>>>> callers;
679   callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
680                        EmulatedFutexAtomic, Dynamic>>>());
681   callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
682                        EmulatedFutexAtomic, Dynamic>>>());
683   callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
684                        EmulatedFutexAtomic, Dynamic>>>());
685   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
686                        EmulatedFutexAtomic, Dynamic>>>(milliseconds(1)));
687   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
688                        EmulatedFutexAtomic, Dynamic>>>(seconds(2)));
689   for (const auto& caller : callers) {
690     LOG(INFO) << PC_BENCH(
691       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 1, 1, n, *caller);
692     LOG(INFO) << PC_BENCH(
693       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 10, 1, n, *caller);
694     LOG(INFO) << PC_BENCH(
695       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 1, 10, n, *caller);
696     LOG(INFO) << PC_BENCH(
697       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 10, 10, n, *caller);
698     LOG(INFO) << PC_BENCH(
699       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 1, 1, n, *caller);
700     LOG(INFO) << PC_BENCH(
701       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 10, 1, n, *caller);
702     LOG(INFO) << PC_BENCH(
703       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 1, 10, n, *caller);
704     LOG(INFO) << PC_BENCH((MPMCQueue<int, EmulatedFutexAtomic, Dynamic>
705                            (10000)), 10, 10, n, *caller);
706     LOG(INFO) << PC_BENCH((MPMCQueue<int, EmulatedFutexAtomic, Dynamic>
707                            (100000)), 32, 100, n, *caller);
708   }
709 }
710
711 TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
712   runMtProdConsEmulatedFutex();
713 }
714
715 TEST(MPMCQueue, mt_prod_cons_emulated_futex_dynamic) {
716   runMtProdConsEmulatedFutex</* Dynamic = */ true>();
717 }
718
719 template <template <typename> class Atom, bool Dynamic = false>
720 void runNeverFailThread(int numThreads,
721                         int n, /*numOps*/
722                         MPMCQueue<int, Atom, Dynamic>& cq,
723                         std::atomic<uint64_t>& sum,
724                         int t) {
725   uint64_t threadSum = 0;
726   for (int i = t; i < n; i += numThreads) {
727     // enq + deq
728     EXPECT_TRUE(cq.writeIfNotFull(i));
729
730     int dest = -1;
731     EXPECT_TRUE(cq.readIfNotEmpty(dest));
732     EXPECT_TRUE(dest >= 0);
733     threadSum += dest;
734   }
735   sum += threadSum;
736 }
737
738 template <template <typename> class Atom, bool Dynamic = false>
739 uint64_t runNeverFailTest(int numThreads, int numOps) {
740   // always #enq >= #deq
741   MPMCQueue<int, Atom, Dynamic> cq(numThreads);
742
743   uint64_t n = numOps;
744   auto beginMicro = nowMicro();
745
746   vector<std::thread> threads(numThreads);
747   std::atomic<uint64_t> sum(0);
748   for (int t = 0; t < numThreads; ++t) {
749     threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom, Dynamic>,
750                                           numThreads,
751                                           n,
752                                           std::ref(cq),
753                                           std::ref(sum),
754                                           t));
755   }
756   for (auto& t : threads) {
757     DSched::join(t);
758   }
759   EXPECT_TRUE(cq.isEmpty());
760   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
761
762   return nowMicro() - beginMicro;
763 }
764
765 template <template<typename> class Atom, bool Dynamic = false>
766 void runMtNeverFail(std::vector<int>& nts, int n) {
767   for (int nt : nts) {
768     uint64_t elapsed = runNeverFailTest<Atom, Dynamic>(nt, n);
769     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
770               << " threads";
771   }
772 }
773
774 TEST(MPMCQueue, mt_never_fail) {
775   std::vector<int> nts {1, 3, 100};
776   int n = 100000;
777   runMtNeverFail<std::atomic>(nts, n);
778 }
779
780 TEST(MPMCQueue, mt_never_fail_dynamic) {
781   std::vector<int> nts {1, 3, 100};
782   int n = 100000;
783   runMtNeverFail<std::atomic, true>(nts, n);
784 }
785
786 TEST(MPMCQueue, mt_never_fail_emulated_futex) {
787   std::vector<int> nts {1, 3, 100};
788   int n = 100000;
789   runMtNeverFail<EmulatedFutexAtomic>(nts, n);
790 }
791
792 TEST(MPMCQueue, mt_never_fail_emulated_futex_dynamic) {
793   std::vector<int> nts {1, 3, 100};
794   int n = 100000;
795   runMtNeverFail<EmulatedFutexAtomic, true>(nts, n);
796 }
797
798 template<bool Dynamic = false>
799 void runMtNeverFailDeterministic(std::vector<int>& nts, int n, long seed) {
800   LOG(INFO) << "using seed " << seed;
801   for (int nt : nts) {
802     {
803       DSched sched(DSched::uniform(seed));
804       runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
805     }
806     {
807       DSched sched(DSched::uniformSubset(seed, 2));
808       runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
809     }
810   }
811 }
812
813 TEST(MPMCQueue, mt_never_fail_deterministic) {
814   std::vector<int> nts {3, 10};
815   long seed = 0; // nowMicro() % 10000;
816   int n = 1000;
817   runMtNeverFailDeterministic(nts, n, seed);
818 }
819
820 TEST(MPMCQueue, mt_never_fail_deterministic_dynamic) {
821   std::vector<int> nts {3, 10};
822   long seed = 0; // nowMicro() % 10000;
823   int n = 1000;
824   runMtNeverFailDeterministic<true>(nts, n, seed);
825 }
826
827 template <class Clock, template <typename> class Atom, bool Dynamic>
828 void runNeverFailUntilThread(int numThreads,
829                              int n, /*numOps*/
830                              MPMCQueue<int, Atom, Dynamic>& cq,
831                              std::atomic<uint64_t>& sum,
832                              int t) {
833   uint64_t threadSum = 0;
834   for (int i = t; i < n; i += numThreads) {
835     // enq + deq
836     auto soon = Clock::now() + std::chrono::seconds(1);
837     EXPECT_TRUE(cq.tryWriteUntil(soon, i));
838
839     int dest = -1;
840     EXPECT_TRUE(cq.readIfNotEmpty(dest));
841     EXPECT_TRUE(dest >= 0);
842     threadSum += dest;
843   }
844   sum += threadSum;
845 }
846
847 template <class Clock, template <typename> class Atom, bool Dynamic = false>
848 uint64_t runNeverFailTest(int numThreads, int numOps) {
849   // always #enq >= #deq
850   MPMCQueue<int, Atom, Dynamic> cq(numThreads);
851
852   uint64_t n = numOps;
853   auto beginMicro = nowMicro();
854
855   vector<std::thread> threads(numThreads);
856   std::atomic<uint64_t> sum(0);
857   for (int t = 0; t < numThreads; ++t) {
858     threads[t] = DSched::thread(std::bind(
859                                   runNeverFailUntilThread<Clock, Atom, Dynamic>,
860                                   numThreads,
861                                   n,
862                                   std::ref(cq),
863                                   std::ref(sum),
864                                   t));
865   }
866   for (auto& t : threads) {
867     DSched::join(t);
868   }
869   EXPECT_TRUE(cq.isEmpty());
870   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
871
872   return nowMicro() - beginMicro;
873 }
874
875 template <bool Dynamic = false>
876 void runMtNeverFailUntilSystem(std::vector<int>& nts, int n) {
877   for (int nt : nts) {
878     uint64_t elapsed =
879       runNeverFailTest<std::chrono::system_clock, std::atomic, Dynamic>(nt, n);
880     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
881               << " threads";
882   }
883 }
884
885 TEST(MPMCQueue, mt_never_fail_until_system) {
886   std::vector<int> nts {1, 3, 100};
887   int n = 100000;
888   runMtNeverFailUntilSystem(nts, n);
889 }
890
891 TEST(MPMCQueue, mt_never_fail_until_system_dynamic) {
892   std::vector<int> nts {1, 3, 100};
893   int n = 100000;
894   runMtNeverFailUntilSystem<true>(nts, n);
895 }
896
897 template <bool Dynamic = false>
898 void runMtNeverFailUntilSteady(std::vector<int>& nts, int n) {
899   for (int nt : nts) {
900     uint64_t elapsed =
901       runNeverFailTest<std::chrono::steady_clock, std::atomic, Dynamic>(nt, n);
902     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
903               << " threads";
904   }
905 }
906
907 TEST(MPMCQueue, mt_never_fail_until_steady) {
908   std::vector<int> nts {1, 3, 100};
909   int n = 100000;
910   runMtNeverFailUntilSteady(nts, n);
911 }
912
913 TEST(MPMCQueue, mt_never_fail_until_steady_dynamic) {
914   std::vector<int> nts {1, 3, 100};
915   int n = 100000;
916   runMtNeverFailUntilSteady<true>(nts, n);
917 }
918
919 enum LifecycleEvent {
920   NOTHING = -1,
921   DEFAULT_CONSTRUCTOR,
922   COPY_CONSTRUCTOR,
923   MOVE_CONSTRUCTOR,
924   TWO_ARG_CONSTRUCTOR,
925   COPY_OPERATOR,
926   MOVE_OPERATOR,
927   DESTRUCTOR,
928   MAX_LIFECYCLE_EVENT
929 };
930
931 static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT];
932 static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT];
933
934 static int lc_outstanding() {
935   return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[COPY_CONSTRUCTOR] +
936       lc_counts[MOVE_CONSTRUCTOR] + lc_counts[TWO_ARG_CONSTRUCTOR] -
937       lc_counts[DESTRUCTOR];
938 }
939
940 static void lc_snap() {
941   for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
942     lc_prev[i] = lc_counts[i];
943   }
944 }
945
946 #define LIFECYCLE_STEP(...) lc_step(__LINE__, __VA_ARGS__)
947
948 static void lc_step(int lineno, int what = NOTHING, int what2 = NOTHING) {
949   for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
950     int delta = i == what || i == what2 ? 1 : 0;
951     EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
952         << "lc_counts[" << i << "] - lc_prev[" << i << "] was "
953         << (lc_counts[i] - lc_prev[i]) << ", expected " << delta
954         << ", from line " << lineno;
955   }
956   lc_snap();
957 }
958
959 template <typename R>
960 struct Lifecycle {
961   typedef R IsRelocatable;
962
963   bool constructed;
964
965   Lifecycle() noexcept : constructed(true) {
966     ++lc_counts[DEFAULT_CONSTRUCTOR];
967   }
968
969   explicit Lifecycle(int /* n */, char const* /* s */) noexcept
970       : constructed(true) {
971     ++lc_counts[TWO_ARG_CONSTRUCTOR];
972   }
973
974   Lifecycle(const Lifecycle& /* rhs */) noexcept : constructed(true) {
975     ++lc_counts[COPY_CONSTRUCTOR];
976   }
977
978   Lifecycle(Lifecycle&& /* rhs */) noexcept : constructed(true) {
979     ++lc_counts[MOVE_CONSTRUCTOR];
980   }
981
982   Lifecycle& operator=(const Lifecycle& /* rhs */) noexcept {
983     ++lc_counts[COPY_OPERATOR];
984     return *this;
985   }
986
987   Lifecycle& operator=(Lifecycle&& /* rhs */) noexcept {
988     ++lc_counts[MOVE_OPERATOR];
989     return *this;
990   }
991
992   ~Lifecycle() noexcept {
993     ++lc_counts[DESTRUCTOR];
994     assert(lc_outstanding() >= 0);
995     assert(constructed);
996     constructed = false;
997   }
998 };
999
1000 template <typename R>
1001 void runPerfectForwardingTest() {
1002   lc_snap();
1003   EXPECT_EQ(lc_outstanding(), 0);
1004
1005   {
1006     // Non-dynamic only. False positive for dynamic.
1007     MPMCQueue<Lifecycle<R>, std::atomic> queue(50);
1008     LIFECYCLE_STEP(NOTHING);
1009
1010     for (int pass = 0; pass < 10; ++pass) {
1011       for (int i = 0; i < 10; ++i) {
1012         queue.blockingWrite();
1013         LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1014
1015         queue.blockingWrite(1, "one");
1016         LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
1017
1018         {
1019           Lifecycle<R> src;
1020           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1021           queue.blockingWrite(std::move(src));
1022           LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
1023         }
1024         LIFECYCLE_STEP(DESTRUCTOR);
1025
1026         {
1027           Lifecycle<R> src;
1028           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1029           queue.blockingWrite(src);
1030           LIFECYCLE_STEP(COPY_CONSTRUCTOR);
1031         }
1032         LIFECYCLE_STEP(DESTRUCTOR);
1033
1034         EXPECT_TRUE(queue.write());
1035         LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1036       }
1037
1038       EXPECT_EQ(queue.size(), 50);
1039       EXPECT_FALSE(queue.write(2, "two"));
1040       LIFECYCLE_STEP(NOTHING);
1041
1042       for (int i = 0; i < 50; ++i) {
1043         {
1044           Lifecycle<R> node;
1045           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1046
1047           queue.blockingRead(node);
1048           if (R::value) {
1049             // relocatable, moved via memcpy
1050             LIFECYCLE_STEP(DESTRUCTOR);
1051           } else {
1052             LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
1053           }
1054         }
1055         LIFECYCLE_STEP(DESTRUCTOR);
1056       }
1057
1058       EXPECT_EQ(queue.size(), 0);
1059     }
1060
1061     // put one element back before destruction
1062     {
1063       Lifecycle<R> src(3, "three");
1064       LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
1065       queue.write(std::move(src));
1066       LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
1067     }
1068     LIFECYCLE_STEP(DESTRUCTOR); // destroy src
1069   }
1070   LIFECYCLE_STEP(DESTRUCTOR); // destroy queue
1071
1072   EXPECT_EQ(lc_outstanding(), 0);
1073 }
1074
1075 TEST(MPMCQueue, perfect_forwarding) {
1076   runPerfectForwardingTest<std::false_type>();
1077 }
1078
1079 TEST(MPMCQueue, perfect_forwarding_relocatable) {
1080   runPerfectForwardingTest<std::true_type>();
1081 }
1082
1083 template <bool Dynamic = false>
1084 void run_queue_moving() {
1085   lc_snap();
1086   EXPECT_EQ(lc_outstanding(), 0);
1087
1088   {
1089     MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> a(50);
1090     LIFECYCLE_STEP(NOTHING);
1091
1092     a.blockingWrite();
1093     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1094
1095     // move constructor
1096     MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> b
1097       = std::move(a);
1098     LIFECYCLE_STEP(NOTHING);
1099     EXPECT_EQ(a.capacity(), 0);
1100     EXPECT_EQ(a.size(), 0);
1101     EXPECT_EQ(b.capacity(), 50);
1102     EXPECT_EQ(b.size(), 1);
1103
1104     b.blockingWrite();
1105     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1106
1107     // move operator
1108     MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> c;
1109     LIFECYCLE_STEP(NOTHING);
1110     c = std::move(b);
1111     LIFECYCLE_STEP(NOTHING);
1112     EXPECT_EQ(c.capacity(), 50);
1113     EXPECT_EQ(c.size(), 2);
1114
1115     {
1116       Lifecycle<std::false_type> dst;
1117       LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1118       c.blockingRead(dst);
1119       LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
1120
1121       {
1122         // swap
1123         MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> d(10);
1124         LIFECYCLE_STEP(NOTHING);
1125         std::swap(c, d);
1126         LIFECYCLE_STEP(NOTHING);
1127         EXPECT_EQ(c.capacity(), 10);
1128         EXPECT_TRUE(c.isEmpty());
1129         EXPECT_EQ(d.capacity(), 50);
1130         EXPECT_EQ(d.size(), 1);
1131
1132         d.blockingRead(dst);
1133         LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
1134
1135         c.blockingWrite(dst);
1136         LIFECYCLE_STEP(COPY_CONSTRUCTOR);
1137
1138         d.blockingWrite(std::move(dst));
1139         LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
1140       } // d goes out of scope
1141       LIFECYCLE_STEP(DESTRUCTOR);
1142     } // dst goes out of scope
1143     LIFECYCLE_STEP(DESTRUCTOR);
1144   } // c goes out of scope
1145   LIFECYCLE_STEP(DESTRUCTOR);
1146 }
1147
1148 TEST(MPMCQueue, queue_moving) {
1149   run_queue_moving();
1150 }
1151
1152 TEST(MPMCQueue, queue_moving_dynamic) {
1153   run_queue_moving<true>();
1154 }
1155
1156 TEST(MPMCQueue, explicit_zero_capacity_fail) {
1157   ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);
1158
1159   using DynamicMPMCQueueInt = MPMCQueue<int, std::atomic, true>;
1160   ASSERT_THROW(DynamicMPMCQueueInt cq(0), std::invalid_argument);
1161 }