Fix SimpleBarrier
[folly.git] / folly / test / MPMCQueueTest.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/Format.h>
18 #include <folly/MPMCQueue.h>
19 #include <folly/Memory.h>
20 #include <folly/portability/GTest.h>
21 #include <folly/portability/SysResource.h>
22 #include <folly/portability/SysTime.h>
23 #include <folly/portability/Unistd.h>
24 #include <folly/stop_watch.h>
25 #include <folly/test/DeterministicSchedule.h>
26
27 #include <boost/intrusive_ptr.hpp>
28 #include <boost/thread/barrier.hpp>
29 #include <functional>
30 #include <memory>
31 #include <thread>
32 #include <utility>
33
34 FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
35
36 using namespace folly;
37 using namespace detail;
38 using namespace test;
39 using std::chrono::time_point;
40 using std::chrono::steady_clock;
41 using std::chrono::seconds;
42 using std::chrono::milliseconds;
43 using std::string;
44 using std::unique_ptr;
45 using std::vector;
46
47 typedef DeterministicSchedule DSched;
48
49 template <template<typename> class Atom>
50 void run_mt_sequencer_thread(
51     int numThreads,
52     int numOps,
53     uint32_t init,
54     TurnSequencer<Atom>& seq,
55     Atom<uint32_t>& spinThreshold,
56     int& prev,
57     int i) {
58   for (int op = i; op < numOps; op += numThreads) {
59     seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
60     EXPECT_EQ(prev, op - 1);
61     prev = op;
62     seq.completeTurn(init + op);
63   }
64 }
65
66 template <template<typename> class Atom>
67 void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
68   TurnSequencer<Atom> seq(init);
69   Atom<uint32_t> spinThreshold(0);
70
71   int prev = -1;
72   vector<std::thread> threads(numThreads);
73   for (int i = 0; i < numThreads; ++i) {
74     threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
75           numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold),
76           std::ref(prev), i));
77   }
78
79   for (auto& thr : threads) {
80     DSched::join(thr);
81   }
82
83   EXPECT_EQ(prev, numOps - 1);
84 }
85
86 TEST(MPMCQueue, sequencer) {
87   run_mt_sequencer_test<std::atomic>(1, 100, 0);
88   run_mt_sequencer_test<std::atomic>(2, 100000, -100);
89   run_mt_sequencer_test<std::atomic>(100, 10000, -100);
90 }
91
92 TEST(MPMCQueue, sequencer_emulated_futex) {
93   run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
94   run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
95   run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
96 }
97
98 TEST(MPMCQueue, sequencer_deterministic) {
99   DSched sched(DSched::uniform(0));
100   run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
101   run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
102   run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
103 }
104
105 template <bool Dynamic = false, typename T>
106 void runElementTypeTest(T&& src) {
107   MPMCQueue<T, std::atomic, Dynamic> cq(10);
108   cq.blockingWrite(std::forward<T>(src));
109   T dest;
110   cq.blockingRead(dest);
111   EXPECT_TRUE(cq.write(std::move(dest)));
112   EXPECT_TRUE(cq.read(dest));
113   auto soon1 = std::chrono::system_clock::now() + std::chrono::seconds(1);
114   EXPECT_TRUE(cq.tryWriteUntil(soon1, std::move(dest)));
115   EXPECT_TRUE(cq.read(dest));
116   auto soon2 = std::chrono::steady_clock::now() + std::chrono::seconds(1);
117   EXPECT_TRUE(cq.tryWriteUntil(soon2, std::move(dest)));
118   EXPECT_TRUE(cq.read(dest));
119 }
120
121 struct RefCounted {
122   static FOLLY_TLS int active_instances;
123
124   mutable std::atomic<int> rc;
125
126   RefCounted() : rc(0) {
127     ++active_instances;
128   }
129
130   ~RefCounted() {
131     --active_instances;
132   }
133 };
134 FOLLY_TLS int RefCounted::active_instances;
135
136 void intrusive_ptr_add_ref(RefCounted const* p) {
137   p->rc++;
138 }
139
140 void intrusive_ptr_release(RefCounted const* p) {
141   if (--(p->rc) == 0) {
142     delete p;
143   }
144 }
145
146 TEST(MPMCQueue, lots_of_element_types) {
147   runElementTypeTest(10);
148   runElementTypeTest(string("abc"));
149   runElementTypeTest(std::make_pair(10, string("def")));
150   runElementTypeTest(vector<string>{{"abc"}});
151   runElementTypeTest(std::make_shared<char>('a'));
152   runElementTypeTest(folly::make_unique<char>('a'));
153   runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
154   EXPECT_EQ(RefCounted::active_instances, 0);
155 }
156
157 TEST(MPMCQueue, lots_of_element_types_dynamic) {
158   runElementTypeTest<true>(10);
159   runElementTypeTest<true>(string("abc"));
160   runElementTypeTest<true>(std::make_pair(10, string("def")));
161   runElementTypeTest<true>(vector<string>{{"abc"}});
162   runElementTypeTest<true>(std::make_shared<char>('a'));
163   runElementTypeTest<true>(folly::make_unique<char>('a'));
164   runElementTypeTest<true>(boost::intrusive_ptr<RefCounted>(new RefCounted));
165   EXPECT_EQ(RefCounted::active_instances, 0);
166 }
167
168 TEST(MPMCQueue, single_thread_enqdeq) {
169   // Non-dynamic version only.
170   // False positive for dynamic version. Capacity can be temporarily
171   // higher than specified.
172   MPMCQueue<int> cq(10);
173
174   for (int pass = 0; pass < 10; ++pass) {
175     for (int i = 0; i < 10; ++i) {
176       EXPECT_TRUE(cq.write(i));
177     }
178     EXPECT_FALSE(cq.write(-1));
179     EXPECT_FALSE(cq.isEmpty());
180     EXPECT_EQ(cq.size(), 10);
181
182     for (int i = 0; i < 5; ++i) {
183       int dest = -1;
184       EXPECT_TRUE(cq.read(dest));
185       EXPECT_EQ(dest, i);
186     }
187     for (int i = 5; i < 10; ++i) {
188       int dest = -1;
189       cq.blockingRead(dest);
190       EXPECT_EQ(dest, i);
191     }
192     int dest = -1;
193     EXPECT_FALSE(cq.read(dest));
194     EXPECT_EQ(dest, -1);
195
196     EXPECT_TRUE(cq.isEmpty());
197     EXPECT_EQ(cq.size(), 0);
198   }
199 }
200
201 TEST(MPMCQueue, tryenq_capacity_test) {
202   // Non-dynamic version only.
203   // False positive for dynamic version. Capacity can be temporarily
204   // higher than specified.
205   for (size_t cap = 1; cap < 100; ++cap) {
206     MPMCQueue<int> cq(cap);
207     for (size_t i = 0; i < cap; ++i) {
208       EXPECT_TRUE(cq.write(i));
209     }
210     EXPECT_FALSE(cq.write(100));
211   }
212 }
213
214 TEST(MPMCQueue, enq_capacity_test) {
215   // Non-dynamic version only.
216   // False positive for dynamic version. Capacity can be temporarily
217   // higher than specified.
218   for (auto cap : { 1, 100, 10000 }) {
219     MPMCQueue<int> cq(cap);
220     for (int i = 0; i < cap; ++i) {
221       cq.blockingWrite(i);
222     }
223     int t = 0;
224     int when;
225     auto thr = std::thread([&]{
226       cq.blockingWrite(100);
227       when = t;
228     });
229     usleep(2000);
230     t = 1;
231     int dummy;
232     cq.blockingRead(dummy);
233     thr.join();
234     EXPECT_EQ(when, 1);
235   }
236 }
237
238 template <template<typename> class Atom, bool Dynamic = false>
239 void runTryEnqDeqThread(
240     int numThreads,
241     int n, /*numOps*/
242     MPMCQueue<int, Atom, Dynamic>& cq,
243     std::atomic<uint64_t>& sum,
244     int t) {
245   uint64_t threadSum = 0;
246   int src = t;
247   // received doesn't reflect any actual values, we just start with
248   // t and increment by numThreads to get the rounding of termination
249   // correct if numThreads doesn't evenly divide numOps
250   int received = t;
251   while (src < n || received < n) {
252     if (src < n && cq.write(src)) {
253       src += numThreads;
254     }
255
256     int dst;
257     if (received < n && cq.read(dst)) {
258       received += numThreads;
259       threadSum += dst;
260     }
261   }
262   sum += threadSum;
263 }
264
265 template <template<typename> class Atom, bool Dynamic = false>
266 void runTryEnqDeqTest(int numThreads, int numOps) {
267   // write and read aren't linearizable, so we don't have
268   // hard guarantees on their individual behavior.  We can still test
269   // correctness in aggregate
270   MPMCQueue<int,Atom, Dynamic> cq(numThreads);
271
272   uint64_t n = numOps;
273   vector<std::thread> threads(numThreads);
274   std::atomic<uint64_t> sum(0);
275   for (int t = 0; t < numThreads; ++t) {
276     threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom, Dynamic>,
277           numThreads, n, std::ref(cq), std::ref(sum), t));
278   }
279   for (auto& t : threads) {
280     DSched::join(t);
281   }
282   EXPECT_TRUE(cq.isEmpty());
283   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
284 }
285
286 TEST(MPMCQueue, mt_try_enq_deq) {
287   int nts[] = { 1, 3, 100 };
288
289   int n = 100000;
290   for (int nt : nts) {
291     runTryEnqDeqTest<std::atomic>(nt, n);
292   }
293 }
294
295 TEST(MPMCQueue, mt_try_enq_deq_dynamic) {
296   int nts[] = { 1, 3, 100 };
297
298   int n = 100000;
299   for (int nt : nts) {
300     runTryEnqDeqTest<std::atomic, /* Dynamic = */ true>(nt, n);
301   }
302 }
303
304 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
305   int nts[] = { 1, 3, 100 };
306
307   int n = 100000;
308   for (int nt : nts) {
309     runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
310   }
311 }
312
313 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex_dynamic) {
314   int nts[] = { 1, 3, 100 };
315
316   int n = 100000;
317   for (int nt : nts) {
318     runTryEnqDeqTest<EmulatedFutexAtomic, /* Dynamic = */ true>(nt, n);
319   }
320 }
321
322 TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
323   int nts[] = { 3, 10 };
324
325   long seed = 0;
326   LOG(INFO) << "using seed " << seed;
327
328   int n = 1000;
329   for (int nt : nts) {
330     {
331       DSched sched(DSched::uniform(seed));
332       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
333     }
334     {
335       DSched sched(DSched::uniformSubset(seed, 2));
336       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
337     }
338     {
339       DSched sched(DSched::uniform(seed));
340       runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
341     }
342     {
343       DSched sched(DSched::uniformSubset(seed, 2));
344       runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
345     }
346   }
347 }
348
349 uint64_t nowMicro() {
350   timeval tv;
351   gettimeofday(&tv, 0);
352   return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
353 }
354
355 template <typename Q>
356 struct WriteMethodCaller {
357   WriteMethodCaller() {}
358   virtual ~WriteMethodCaller() = default;
359   virtual bool callWrite(Q& q, int i) = 0;
360   virtual string methodName() = 0;
361 };
362
363 template <typename Q>
364 struct BlockingWriteCaller : public WriteMethodCaller<Q> {
365   bool callWrite(Q& q, int i) override {
366     q.blockingWrite(i);
367     return true;
368   }
369   string methodName() override { return "blockingWrite"; }
370 };
371
372 template <typename Q>
373 struct WriteIfNotFullCaller : public WriteMethodCaller<Q> {
374   bool callWrite(Q& q, int i) override { return q.writeIfNotFull(i); }
375   string methodName() override { return "writeIfNotFull"; }
376 };
377
378 template <typename Q>
379 struct WriteCaller : public WriteMethodCaller<Q> {
380   bool callWrite(Q& q, int i) override { return q.write(i); }
381   string methodName() override { return "write"; }
382 };
383
384 template <typename Q,
385           class Clock = steady_clock,
386           class Duration = typename Clock::duration>
387 struct TryWriteUntilCaller : public WriteMethodCaller<Q> {
388   const Duration duration_;
389   explicit TryWriteUntilCaller(Duration&& duration) : duration_(duration) {}
390   bool callWrite(Q& q, int i) override {
391     auto then = Clock::now() + duration_;
392     return q.tryWriteUntil(then, i);
393   }
394   string methodName() override {
395     return folly::sformat(
396         "tryWriteUntil({}ms)",
397         std::chrono::duration_cast<milliseconds>(duration_).count());
398   }
399 };
400
401 template <typename Q>
402 string producerConsumerBench(Q&& queue,
403                              string qName,
404                              int numProducers,
405                              int numConsumers,
406                              int numOps,
407                              WriteMethodCaller<Q>& writer,
408                              bool ignoreContents = false) {
409   Q& q = queue;
410
411   struct rusage beginUsage;
412   getrusage(RUSAGE_SELF, &beginUsage);
413
414   auto beginMicro = nowMicro();
415
416   uint64_t n = numOps;
417   std::atomic<uint64_t> sum(0);
418   std::atomic<uint64_t> failed(0);
419
420   vector<std::thread> producers(numProducers);
421   for (int t = 0; t < numProducers; ++t) {
422     producers[t] = DSched::thread([&,t]{
423       for (int i = t; i < numOps; i += numProducers) {
424         while (!writer.callWrite(q, i)) {
425           ++failed;
426         }
427       }
428     });
429   }
430
431   vector<std::thread> consumers(numConsumers);
432   for (int t = 0; t < numConsumers; ++t) {
433     consumers[t] = DSched::thread([&,t]{
434       uint64_t localSum = 0;
435       for (int i = t; i < numOps; i += numConsumers) {
436         int dest = -1;
437         q.blockingRead(dest);
438         EXPECT_FALSE(dest == -1);
439         localSum += dest;
440       }
441       sum += localSum;
442     });
443   }
444
445   for (auto& t : producers) {
446     DSched::join(t);
447   }
448   for (auto& t : consumers) {
449     DSched::join(t);
450   }
451   if (!ignoreContents) {
452     EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
453   }
454
455   auto endMicro = nowMicro();
456
457   struct rusage endUsage;
458   getrusage(RUSAGE_SELF, &endUsage);
459
460   uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
461   long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
462       (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
463   uint64_t failures = failed;
464   size_t allocated = q.allocatedCapacity();
465
466   return folly::sformat(
467       "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
468       "handoff, {} failures, {} allocated",
469       qName,
470       numProducers,
471       writer.methodName(),
472       numConsumers,
473       nanosPer,
474       csw,
475       n,
476       failures,
477       allocated);
478 }
479
480 template <bool Dynamic = false>
481 void runMtProdConsDeterministic(long seed) {
482   // we use the Bench method, but perf results are meaningless under DSched
483   DSched sched(DSched::uniform(seed));
484
485   vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic,
486                                                 Dynamic>>>> callers;
487   callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
488                        DeterministicAtomic, Dynamic>>>());
489   callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
490                        DeterministicAtomic, Dynamic>>>());
491   callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
492                        DeterministicAtomic, Dynamic>>>());
493   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
494                        DeterministicAtomic, Dynamic>>>(milliseconds(1)));
495   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
496                        DeterministicAtomic, Dynamic>>>(seconds(2)));
497   size_t cap;
498
499   for (const auto& caller : callers) {
500     cap = 10;
501     LOG(INFO) <<
502       producerConsumerBench(
503         MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
504         "MPMCQueue<int, DeterministicAtomic, Dynamic>("
505           + folly::to<std::string>(cap)+")",
506         1,
507         1,
508         1000,
509         *caller);
510     cap = 100;
511     LOG(INFO) <<
512       producerConsumerBench(
513         MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
514         "MPMCQueue<int, DeterministicAtomic, Dynamic>("
515           + folly::to<std::string>(cap)+")",
516         10,
517         10,
518         1000,
519         *caller);
520     cap = 10;
521     LOG(INFO) <<
522       producerConsumerBench(
523         MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
524         "MPMCQueue<int, DeterministicAtomic, Dynamic>("
525           + folly::to<std::string>(cap)+")",
526         1,
527         1,
528         1000,
529         *caller);
530     cap = 100;
531     LOG(INFO) <<
532       producerConsumerBench(
533         MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
534         "MPMCQueue<int, DeterministicAtomic, Dynamic>("
535           + folly::to<std::string>(cap)+")",
536         10,
537         10,
538         1000,
539         *caller);
540     cap = 1;
541     LOG(INFO) <<
542       producerConsumerBench(
543         MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
544         "MPMCQueue<int, DeterministicAtomic, Dynamic>("
545           + folly::to<std::string>(cap)+")",
546         10,
547         10,
548         1000,
549         *caller);
550   }
551 }
552
553 void runMtProdConsDeterministicDynamic(
554   long seed,
555   uint32_t prods,
556   uint32_t cons,
557   uint32_t numOps,
558   size_t cap,
559   size_t minCap,
560   size_t mult
561 ) {
562   // we use the Bench method, but perf results are meaningless under DSched
563   DSched sched(DSched::uniform(seed));
564
565   vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic,
566                                                 true>>>> callers;
567   callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
568                        DeterministicAtomic, true>>>());
569   callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
570                        DeterministicAtomic, true>>>());
571   callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
572                        DeterministicAtomic, true>>>());
573   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
574                        DeterministicAtomic, true>>>(milliseconds(1)));
575   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
576                        DeterministicAtomic, true>>>(seconds(2)));
577
578   for (const auto& caller : callers) {
579     LOG(INFO) <<
580       producerConsumerBench(
581         MPMCQueue<int, DeterministicAtomic, true>(cap, minCap, mult),
582         "MPMCQueue<int, DeterministicAtomic, true>("
583           + folly::to<std::string>(cap) + ", "
584           + folly::to<std::string>(minCap) + ", "
585           + folly::to<std::string>(mult)+")",
586         prods,
587         cons,
588         numOps,
589         *caller);
590   }
591 }
592
593 TEST(MPMCQueue, mt_prod_cons_deterministic) {
594   runMtProdConsDeterministic(0);
595 }
596
597 TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic) {
598   runMtProdConsDeterministic<true>(0);
599 }
600
601 template <typename T>
602 void setFromEnv(T& var, const char* envvar) {
603   char* str = std::getenv(envvar);
604   if (str) { var = atoi(str); }
605 }
606
607 TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic_with_arguments) {
608   long seed = 0;
609   uint32_t prods = 10;
610   uint32_t cons = 10;
611   uint32_t numOps = 1000;
612   size_t cap = 10000;
613   size_t minCap = 9;
614   size_t mult = 3;
615   setFromEnv(seed, "SEED");
616   setFromEnv(prods, "PRODS");
617   setFromEnv(cons, "CONS");
618   setFromEnv(numOps, "NUM_OPS");
619   setFromEnv(cap, "CAP");
620   setFromEnv(minCap, "MIN_CAP");
621   setFromEnv(mult, "MULT");
622   runMtProdConsDeterministicDynamic(
623     seed, prods, cons, numOps, cap, minCap, mult);
624 }
625
626 #define PC_BENCH(q, np, nc, ...) \
627     producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
628
629 template <bool Dynamic = false>
630 void runMtProdCons() {
631   int n = 100000;
632   setFromEnv(n, "NUM_OPS");
633   vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, std::atomic, Dynamic>>>>
634     callers;
635   callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
636                        std::atomic, Dynamic>>>());
637   callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
638                        std::atomic, Dynamic>>>());
639   callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int, std::atomic,
640                        Dynamic>>>());
641   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
642                        std::atomic, Dynamic>>>(milliseconds(1)));
643   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
644                        std::atomic, Dynamic>>>(seconds(2)));
645   for (const auto& caller : callers) {
646     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
647                           1, 1, n, *caller);
648     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
649                           10, 1, n, *caller);
650     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
651                           1, 10, n, *caller);
652     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
653                           10, 10, n, *caller);
654     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
655                           1, 1, n, *caller);
656     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
657                           10, 1, n, *caller);
658     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
659                           1, 10, n, *caller);
660     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
661                           10, 10, n, *caller);
662     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(100000)),
663                           32, 100, n, *caller);
664   }
665 }
666
667 TEST(MPMCQueue, mt_prod_cons) {
668   runMtProdCons();
669 }
670
671 TEST(MPMCQueue, mt_prod_cons_dynamic) {
672   runMtProdCons</* Dynamic = */ true>();
673 }
674
675 template <bool Dynamic = false>
676 void runMtProdConsEmulatedFutex() {
677   int n = 100000;
678   vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, EmulatedFutexAtomic,
679                                                 Dynamic>>>> callers;
680   callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
681                        EmulatedFutexAtomic, Dynamic>>>());
682   callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
683                        EmulatedFutexAtomic, Dynamic>>>());
684   callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
685                        EmulatedFutexAtomic, Dynamic>>>());
686   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
687                        EmulatedFutexAtomic, Dynamic>>>(milliseconds(1)));
688   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
689                        EmulatedFutexAtomic, Dynamic>>>(seconds(2)));
690   for (const auto& caller : callers) {
691     LOG(INFO) << PC_BENCH(
692       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 1, 1, n, *caller);
693     LOG(INFO) << PC_BENCH(
694       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 10, 1, n, *caller);
695     LOG(INFO) << PC_BENCH(
696       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 1, 10, n, *caller);
697     LOG(INFO) << PC_BENCH(
698       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 10, 10, n, *caller);
699     LOG(INFO) << PC_BENCH(
700       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 1, 1, n, *caller);
701     LOG(INFO) << PC_BENCH(
702       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 10, 1, n, *caller);
703     LOG(INFO) << PC_BENCH(
704       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 1, 10, n, *caller);
705     LOG(INFO) << PC_BENCH((MPMCQueue<int, EmulatedFutexAtomic, Dynamic>
706                            (10000)), 10, 10, n, *caller);
707     LOG(INFO) << PC_BENCH((MPMCQueue<int, EmulatedFutexAtomic, Dynamic>
708                            (100000)), 32, 100, n, *caller);
709   }
710 }
711
712 TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
713   runMtProdConsEmulatedFutex();
714 }
715
716 TEST(MPMCQueue, mt_prod_cons_emulated_futex_dynamic) {
717   runMtProdConsEmulatedFutex</* Dynamic = */ true>();
718 }
719
720 template <template <typename> class Atom, bool Dynamic = false>
721 void runNeverFailThread(int numThreads,
722                         int n, /*numOps*/
723                         MPMCQueue<int, Atom, Dynamic>& cq,
724                         std::atomic<uint64_t>& sum,
725                         int t) {
726   uint64_t threadSum = 0;
727   for (int i = t; i < n; i += numThreads) {
728     // enq + deq
729     EXPECT_TRUE(cq.writeIfNotFull(i));
730
731     int dest = -1;
732     EXPECT_TRUE(cq.readIfNotEmpty(dest));
733     EXPECT_TRUE(dest >= 0);
734     threadSum += dest;
735   }
736   sum += threadSum;
737 }
738
739 template <template <typename> class Atom, bool Dynamic = false>
740 uint64_t runNeverFailTest(int numThreads, int numOps) {
741   // always #enq >= #deq
742   MPMCQueue<int, Atom, Dynamic> cq(numThreads);
743
744   uint64_t n = numOps;
745   auto beginMicro = nowMicro();
746
747   vector<std::thread> threads(numThreads);
748   std::atomic<uint64_t> sum(0);
749   for (int t = 0; t < numThreads; ++t) {
750     threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom, Dynamic>,
751                                           numThreads,
752                                           n,
753                                           std::ref(cq),
754                                           std::ref(sum),
755                                           t));
756   }
757   for (auto& t : threads) {
758     DSched::join(t);
759   }
760   EXPECT_TRUE(cq.isEmpty());
761   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
762
763   return nowMicro() - beginMicro;
764 }
765
766 template <template<typename> class Atom, bool Dynamic = false>
767 void runMtNeverFail(std::vector<int>& nts, int n) {
768   for (int nt : nts) {
769     uint64_t elapsed = runNeverFailTest<Atom, Dynamic>(nt, n);
770     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
771               << " threads";
772   }
773 }
774
775 TEST(MPMCQueue, mt_never_fail) {
776   std::vector<int> nts {1, 3, 100};
777   int n = 100000;
778   runMtNeverFail<std::atomic>(nts, n);
779 }
780
781 TEST(MPMCQueue, mt_never_fail_dynamic) {
782   std::vector<int> nts {1, 3, 100};
783   int n = 100000;
784   runMtNeverFail<std::atomic, true>(nts, n);
785 }
786
787 TEST(MPMCQueue, mt_never_fail_emulated_futex) {
788   std::vector<int> nts {1, 3, 100};
789   int n = 100000;
790   runMtNeverFail<EmulatedFutexAtomic>(nts, n);
791 }
792
793 TEST(MPMCQueue, mt_never_fail_emulated_futex_dynamic) {
794   std::vector<int> nts {1, 3, 100};
795   int n = 100000;
796   runMtNeverFail<EmulatedFutexAtomic, true>(nts, n);
797 }
798
799 template<bool Dynamic = false>
800 void runMtNeverFailDeterministic(std::vector<int>& nts, int n, long seed) {
801   LOG(INFO) << "using seed " << seed;
802   for (int nt : nts) {
803     {
804       DSched sched(DSched::uniform(seed));
805       runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
806     }
807     {
808       DSched sched(DSched::uniformSubset(seed, 2));
809       runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
810     }
811   }
812 }
813
814 TEST(MPMCQueue, mt_never_fail_deterministic) {
815   std::vector<int> nts {3, 10};
816   long seed = 0; // nowMicro() % 10000;
817   int n = 1000;
818   runMtNeverFailDeterministic(nts, n, seed);
819 }
820
821 TEST(MPMCQueue, mt_never_fail_deterministic_dynamic) {
822   std::vector<int> nts {3, 10};
823   long seed = 0; // nowMicro() % 10000;
824   int n = 1000;
825   runMtNeverFailDeterministic<true>(nts, n, seed);
826 }
827
828 template <class Clock, template <typename> class Atom, bool Dynamic>
829 void runNeverFailUntilThread(int numThreads,
830                              int n, /*numOps*/
831                              MPMCQueue<int, Atom, Dynamic>& cq,
832                              std::atomic<uint64_t>& sum,
833                              int t) {
834   uint64_t threadSum = 0;
835   for (int i = t; i < n; i += numThreads) {
836     // enq + deq
837     auto soon = Clock::now() + std::chrono::seconds(1);
838     EXPECT_TRUE(cq.tryWriteUntil(soon, i));
839
840     int dest = -1;
841     EXPECT_TRUE(cq.readIfNotEmpty(dest));
842     EXPECT_TRUE(dest >= 0);
843     threadSum += dest;
844   }
845   sum += threadSum;
846 }
847
848 template <class Clock, template <typename> class Atom, bool Dynamic = false>
849 uint64_t runNeverFailTest(int numThreads, int numOps) {
850   // always #enq >= #deq
851   MPMCQueue<int, Atom, Dynamic> cq(numThreads);
852
853   uint64_t n = numOps;
854   auto beginMicro = nowMicro();
855
856   vector<std::thread> threads(numThreads);
857   std::atomic<uint64_t> sum(0);
858   for (int t = 0; t < numThreads; ++t) {
859     threads[t] = DSched::thread(std::bind(
860                                   runNeverFailUntilThread<Clock, Atom, Dynamic>,
861                                   numThreads,
862                                   n,
863                                   std::ref(cq),
864                                   std::ref(sum),
865                                   t));
866   }
867   for (auto& t : threads) {
868     DSched::join(t);
869   }
870   EXPECT_TRUE(cq.isEmpty());
871   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
872
873   return nowMicro() - beginMicro;
874 }
875
876 template <bool Dynamic = false>
877 void runMtNeverFailUntilSystem(std::vector<int>& nts, int n) {
878   for (int nt : nts) {
879     uint64_t elapsed =
880       runNeverFailTest<std::chrono::system_clock, std::atomic, Dynamic>(nt, n);
881     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
882               << " threads";
883   }
884 }
885
886 TEST(MPMCQueue, mt_never_fail_until_system) {
887   std::vector<int> nts {1, 3, 100};
888   int n = 100000;
889   runMtNeverFailUntilSystem(nts, n);
890 }
891
892 TEST(MPMCQueue, mt_never_fail_until_system_dynamic) {
893   std::vector<int> nts {1, 3, 100};
894   int n = 100000;
895   runMtNeverFailUntilSystem<true>(nts, n);
896 }
897
898 template <bool Dynamic = false>
899 void runMtNeverFailUntilSteady(std::vector<int>& nts, int n) {
900   for (int nt : nts) {
901     uint64_t elapsed =
902       runNeverFailTest<std::chrono::steady_clock, std::atomic, Dynamic>(nt, n);
903     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
904               << " threads";
905   }
906 }
907
908 TEST(MPMCQueue, mt_never_fail_until_steady) {
909   std::vector<int> nts {1, 3, 100};
910   int n = 100000;
911   runMtNeverFailUntilSteady(nts, n);
912 }
913
914 TEST(MPMCQueue, mt_never_fail_until_steady_dynamic) {
915   std::vector<int> nts {1, 3, 100};
916   int n = 100000;
917   runMtNeverFailUntilSteady<true>(nts, n);
918 }
919
920 enum LifecycleEvent {
921   NOTHING = -1,
922   DEFAULT_CONSTRUCTOR,
923   COPY_CONSTRUCTOR,
924   MOVE_CONSTRUCTOR,
925   TWO_ARG_CONSTRUCTOR,
926   COPY_OPERATOR,
927   MOVE_OPERATOR,
928   DESTRUCTOR,
929   MAX_LIFECYCLE_EVENT
930 };
931
932 static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT];
933 static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT];
934
935 static int lc_outstanding() {
936   return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[COPY_CONSTRUCTOR] +
937       lc_counts[MOVE_CONSTRUCTOR] + lc_counts[TWO_ARG_CONSTRUCTOR] -
938       lc_counts[DESTRUCTOR];
939 }
940
941 static void lc_snap() {
942   for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
943     lc_prev[i] = lc_counts[i];
944   }
945 }
946
947 #define LIFECYCLE_STEP(...) lc_step(__LINE__, __VA_ARGS__)
948
949 static void lc_step(int lineno, int what = NOTHING, int what2 = NOTHING) {
950   for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
951     int delta = i == what || i == what2 ? 1 : 0;
952     EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
953         << "lc_counts[" << i << "] - lc_prev[" << i << "] was "
954         << (lc_counts[i] - lc_prev[i]) << ", expected " << delta
955         << ", from line " << lineno;
956   }
957   lc_snap();
958 }
959
960 template <typename R>
961 struct Lifecycle {
962   typedef R IsRelocatable;
963
964   bool constructed;
965
966   Lifecycle() noexcept : constructed(true) {
967     ++lc_counts[DEFAULT_CONSTRUCTOR];
968   }
969
970   explicit Lifecycle(int /* n */, char const* /* s */) noexcept
971       : constructed(true) {
972     ++lc_counts[TWO_ARG_CONSTRUCTOR];
973   }
974
975   Lifecycle(const Lifecycle& /* rhs */) noexcept : constructed(true) {
976     ++lc_counts[COPY_CONSTRUCTOR];
977   }
978
979   Lifecycle(Lifecycle&& /* rhs */) noexcept : constructed(true) {
980     ++lc_counts[MOVE_CONSTRUCTOR];
981   }
982
983   Lifecycle& operator=(const Lifecycle& /* rhs */) noexcept {
984     ++lc_counts[COPY_OPERATOR];
985     return *this;
986   }
987
988   Lifecycle& operator=(Lifecycle&& /* rhs */) noexcept {
989     ++lc_counts[MOVE_OPERATOR];
990     return *this;
991   }
992
993   ~Lifecycle() noexcept {
994     ++lc_counts[DESTRUCTOR];
995     assert(lc_outstanding() >= 0);
996     assert(constructed);
997     constructed = false;
998   }
999 };
1000
1001 template <typename R>
1002 void runPerfectForwardingTest() {
1003   lc_snap();
1004   EXPECT_EQ(lc_outstanding(), 0);
1005
1006   {
1007     // Non-dynamic only. False positive for dynamic.
1008     MPMCQueue<Lifecycle<R>, std::atomic> queue(50);
1009     LIFECYCLE_STEP(NOTHING);
1010
1011     for (int pass = 0; pass < 10; ++pass) {
1012       for (int i = 0; i < 10; ++i) {
1013         queue.blockingWrite();
1014         LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1015
1016         queue.blockingWrite(1, "one");
1017         LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
1018
1019         {
1020           Lifecycle<R> src;
1021           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1022           queue.blockingWrite(std::move(src));
1023           LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
1024         }
1025         LIFECYCLE_STEP(DESTRUCTOR);
1026
1027         {
1028           Lifecycle<R> src;
1029           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1030           queue.blockingWrite(src);
1031           LIFECYCLE_STEP(COPY_CONSTRUCTOR);
1032         }
1033         LIFECYCLE_STEP(DESTRUCTOR);
1034
1035         EXPECT_TRUE(queue.write());
1036         LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1037       }
1038
1039       EXPECT_EQ(queue.size(), 50);
1040       EXPECT_FALSE(queue.write(2, "two"));
1041       LIFECYCLE_STEP(NOTHING);
1042
1043       for (int i = 0; i < 50; ++i) {
1044         {
1045           Lifecycle<R> node;
1046           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1047
1048           queue.blockingRead(node);
1049           if (R::value) {
1050             // relocatable, moved via memcpy
1051             LIFECYCLE_STEP(DESTRUCTOR);
1052           } else {
1053             LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
1054           }
1055         }
1056         LIFECYCLE_STEP(DESTRUCTOR);
1057       }
1058
1059       EXPECT_EQ(queue.size(), 0);
1060     }
1061
1062     // put one element back before destruction
1063     {
1064       Lifecycle<R> src(3, "three");
1065       LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
1066       queue.write(std::move(src));
1067       LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
1068     }
1069     LIFECYCLE_STEP(DESTRUCTOR); // destroy src
1070   }
1071   LIFECYCLE_STEP(DESTRUCTOR); // destroy queue
1072
1073   EXPECT_EQ(lc_outstanding(), 0);
1074 }
1075
1076 TEST(MPMCQueue, perfect_forwarding) {
1077   runPerfectForwardingTest<std::false_type>();
1078 }
1079
1080 TEST(MPMCQueue, perfect_forwarding_relocatable) {
1081   runPerfectForwardingTest<std::true_type>();
1082 }
1083
1084 template <bool Dynamic = false>
1085 void run_queue_moving() {
1086   lc_snap();
1087   EXPECT_EQ(lc_outstanding(), 0);
1088
1089   {
1090     MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> a(50);
1091     LIFECYCLE_STEP(NOTHING);
1092
1093     a.blockingWrite();
1094     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1095
1096     // move constructor
1097     MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> b
1098       = std::move(a);
1099     LIFECYCLE_STEP(NOTHING);
1100     EXPECT_EQ(a.capacity(), 0);
1101     EXPECT_EQ(a.size(), 0);
1102     EXPECT_EQ(b.capacity(), 50);
1103     EXPECT_EQ(b.size(), 1);
1104
1105     b.blockingWrite();
1106     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1107
1108     // move operator
1109     MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> c;
1110     LIFECYCLE_STEP(NOTHING);
1111     c = std::move(b);
1112     LIFECYCLE_STEP(NOTHING);
1113     EXPECT_EQ(c.capacity(), 50);
1114     EXPECT_EQ(c.size(), 2);
1115
1116     {
1117       Lifecycle<std::false_type> dst;
1118       LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1119       c.blockingRead(dst);
1120       LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
1121
1122       {
1123         // swap
1124         MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> d(10);
1125         LIFECYCLE_STEP(NOTHING);
1126         std::swap(c, d);
1127         LIFECYCLE_STEP(NOTHING);
1128         EXPECT_EQ(c.capacity(), 10);
1129         EXPECT_TRUE(c.isEmpty());
1130         EXPECT_EQ(d.capacity(), 50);
1131         EXPECT_EQ(d.size(), 1);
1132
1133         d.blockingRead(dst);
1134         LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
1135
1136         c.blockingWrite(dst);
1137         LIFECYCLE_STEP(COPY_CONSTRUCTOR);
1138
1139         d.blockingWrite(std::move(dst));
1140         LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
1141       } // d goes out of scope
1142       LIFECYCLE_STEP(DESTRUCTOR);
1143     } // dst goes out of scope
1144     LIFECYCLE_STEP(DESTRUCTOR);
1145   } // c goes out of scope
1146   LIFECYCLE_STEP(DESTRUCTOR);
1147 }
1148
1149 TEST(MPMCQueue, queue_moving) {
1150   run_queue_moving();
1151 }
1152
1153 TEST(MPMCQueue, queue_moving_dynamic) {
1154   run_queue_moving<true>();
1155 }
1156
1157 TEST(MPMCQueue, explicit_zero_capacity_fail) {
1158   ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);
1159
1160   using DynamicMPMCQueueInt = MPMCQueue<int, std::atomic, true>;
1161   ASSERT_THROW(DynamicMPMCQueueInt cq(0), std::invalid_argument);
1162 }
1163
1164 template <bool Dynamic>
1165 void testTryReadUntil() {
1166   MPMCQueue<int, std::atomic, Dynamic> q{1};
1167
1168   const auto wait = std::chrono::milliseconds(100);
1169   stop_watch<> watch;
1170   bool rets[2];
1171   int vals[2];
1172   std::vector<std::thread> threads;
1173   boost::barrier b{3};
1174   for (int i = 0; i < 2; i++) {
1175     threads.emplace_back([&, i] {
1176       b.wait();
1177       rets[i] = q.tryReadUntil(watch.getCheckpoint() + wait, vals[i]);
1178     });
1179   }
1180
1181   b.wait();
1182   EXPECT_TRUE(q.write(42));
1183
1184   for (int i = 0; i < 2; i++) {
1185     threads[i].join();
1186   }
1187
1188   for (int i = 0; i < 2; i++) {
1189     int other = (i + 1) % 2;
1190     if (rets[i]) {
1191       EXPECT_EQ(42, vals[i]);
1192       EXPECT_FALSE(rets[other]);
1193     }
1194   }
1195
1196   EXPECT_TRUE(watch.elapsed(wait));
1197 }
1198
1199 template <bool Dynamic>
1200 void testTryWriteUntil() {
1201   MPMCQueue<int, std::atomic, Dynamic> q{1};
1202   EXPECT_TRUE(q.write(42));
1203
1204   const auto wait = std::chrono::milliseconds(100);
1205   stop_watch<> watch;
1206   bool rets[2];
1207   std::vector<std::thread> threads;
1208   boost::barrier b{3};
1209   for (int i = 0; i < 2; i++) {
1210     threads.emplace_back([&, i] {
1211       b.wait();
1212       rets[i] = q.tryWriteUntil(watch.getCheckpoint() + wait, i);
1213     });
1214   }
1215
1216   b.wait();
1217   int x;
1218   EXPECT_TRUE(q.read(x));
1219   EXPECT_EQ(42, x);
1220
1221   for (int i = 0; i < 2; i++) {
1222     threads[i].join();
1223   }
1224   EXPECT_TRUE(q.read(x));
1225
1226   for (int i = 0; i < 2; i++) {
1227     int other = (i + 1) % 2;
1228     if (rets[i]) {
1229       EXPECT_EQ(i, x);
1230       EXPECT_FALSE(rets[other]);
1231     }
1232   }
1233
1234   EXPECT_TRUE(watch.elapsed(wait));
1235 }
1236
1237 TEST(MPMCQueue, try_read_until) {
1238   testTryReadUntil<false>();
1239 }
1240
1241 TEST(MPMCQueue, try_read_until_dynamic) {
1242   testTryReadUntil<true>();
1243 }
1244
1245 TEST(MPMCQueue, try_write_until) {
1246   testTryWriteUntil<false>();
1247 }
1248
1249 TEST(MPMCQueue, try_write_until_dynamic) {
1250   testTryWriteUntil<true>();
1251 }