Convert a polling loop to a futex wait
[folly.git] / folly / test / MPMCQueueTest.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/MPMCQueue.h>
18 #include <folly/Format.h>
19 #include <folly/Memory.h>
20 #include <folly/test/DeterministicSchedule.h>
21
22 #include <boost/intrusive_ptr.hpp>
23 #include <memory>
24 #include <functional>
25 #include <thread>
26 #include <utility>
27 #include <unistd.h>
28 #include <sys/time.h>
29 #include <sys/resource.h>
30
31 #include <gtest/gtest.h>
32
33 FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
34
35 using namespace folly;
36 using namespace detail;
37 using namespace test;
38 using std::chrono::time_point;
39 using std::chrono::steady_clock;
40 using std::chrono::seconds;
41 using std::chrono::milliseconds;
42 using std::string;
43 using std::make_unique;
44 using std::unique_ptr;
45 using std::vector;
46
47 typedef DeterministicSchedule DSched;
48
49 template <template<typename> class Atom>
50 void run_mt_sequencer_thread(
51     int numThreads,
52     int numOps,
53     uint32_t init,
54     TurnSequencer<Atom>& seq,
55     Atom<uint32_t>& spinThreshold,
56     int& prev,
57     int i) {
58   for (int op = i; op < numOps; op += numThreads) {
59     seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
60     EXPECT_EQ(prev, op - 1);
61     prev = op;
62     seq.completeTurn(init + op);
63   }
64 }
65
66 template <template<typename> class Atom>
67 void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
68   TurnSequencer<Atom> seq(init);
69   Atom<uint32_t> spinThreshold(0);
70
71   int prev = -1;
72   vector<std::thread> threads(numThreads);
73   for (int i = 0; i < numThreads; ++i) {
74     threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
75           numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold),
76           std::ref(prev), i));
77   }
78
79   for (auto& thr : threads) {
80     DSched::join(thr);
81   }
82
83   EXPECT_EQ(prev, numOps - 1);
84 }
85
86 TEST(MPMCQueue, sequencer) {
87   run_mt_sequencer_test<std::atomic>(1, 100, 0);
88   run_mt_sequencer_test<std::atomic>(2, 100000, -100);
89   run_mt_sequencer_test<std::atomic>(100, 10000, -100);
90 }
91
92 TEST(MPMCQueue, sequencer_emulated_futex) {
93   run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
94   run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
95   run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
96 }
97
98 TEST(MPMCQueue, sequencer_deterministic) {
99   DSched sched(DSched::uniform(0));
100   run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
101   run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
102   run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
103 }
104
105 template <typename T>
106 void runElementTypeTest(T&& src) {
107   MPMCQueue<T> cq(10);
108   cq.blockingWrite(std::move(src));
109   T dest;
110   cq.blockingRead(dest);
111   EXPECT_TRUE(cq.write(std::move(dest)));
112   EXPECT_TRUE(cq.read(dest));
113   auto soon1 = std::chrono::system_clock::now() + std::chrono::seconds(1);
114   EXPECT_TRUE(cq.tryWriteUntil(soon1, std::move(dest)));
115   EXPECT_TRUE(cq.read(dest));
116   auto soon2 = std::chrono::steady_clock::now() + std::chrono::seconds(1);
117   EXPECT_TRUE(cq.tryWriteUntil(soon2, std::move(dest)));
118   EXPECT_TRUE(cq.read(dest));
119 }
120
121 struct RefCounted {
122   static __thread int active_instances;
123
124   mutable std::atomic<int> rc;
125
126   RefCounted() : rc(0) {
127     ++active_instances;
128   }
129
130   ~RefCounted() {
131     --active_instances;
132   }
133 };
134 __thread int RefCounted::active_instances;
135
136
137 void intrusive_ptr_add_ref(RefCounted const* p) {
138   p->rc++;
139 }
140
141 void intrusive_ptr_release(RefCounted const* p) {
142   if (--(p->rc) == 0) {
143     delete p;
144   }
145 }
146
147 TEST(MPMCQueue, lots_of_element_types) {
148   runElementTypeTest(10);
149   runElementTypeTest(string("abc"));
150   runElementTypeTest(std::make_pair(10, string("def")));
151   runElementTypeTest(vector<string>{{"abc"}});
152   runElementTypeTest(std::make_shared<char>('a'));
153   runElementTypeTest(folly::make_unique<char>('a'));
154   runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
155   EXPECT_EQ(RefCounted::active_instances, 0);
156 }
157
158 TEST(MPMCQueue, single_thread_enqdeq) {
159   MPMCQueue<int> cq(10);
160
161   for (int pass = 0; pass < 10; ++pass) {
162     for (int i = 0; i < 10; ++i) {
163       EXPECT_TRUE(cq.write(i));
164     }
165     EXPECT_FALSE(cq.write(-1));
166     EXPECT_FALSE(cq.isEmpty());
167     EXPECT_EQ(cq.size(), 10);
168
169     for (int i = 0; i < 5; ++i) {
170       int dest = -1;
171       EXPECT_TRUE(cq.read(dest));
172       EXPECT_EQ(dest, i);
173     }
174     for (int i = 5; i < 10; ++i) {
175       int dest = -1;
176       cq.blockingRead(dest);
177       EXPECT_EQ(dest, i);
178     }
179     int dest = -1;
180     EXPECT_FALSE(cq.read(dest));
181     EXPECT_EQ(dest, -1);
182
183     EXPECT_TRUE(cq.isEmpty());
184     EXPECT_EQ(cq.size(), 0);
185   }
186 }
187
188 TEST(MPMCQueue, tryenq_capacity_test) {
189   for (size_t cap = 1; cap < 100; ++cap) {
190     MPMCQueue<int> cq(cap);
191     for (size_t i = 0; i < cap; ++i) {
192       EXPECT_TRUE(cq.write(i));
193     }
194     EXPECT_FALSE(cq.write(100));
195   }
196 }
197
198 TEST(MPMCQueue, enq_capacity_test) {
199   for (auto cap : { 1, 100, 10000 }) {
200     MPMCQueue<int> cq(cap);
201     for (int i = 0; i < cap; ++i) {
202       cq.blockingWrite(i);
203     }
204     int t = 0;
205     int when;
206     auto thr = std::thread([&]{
207       cq.blockingWrite(100);
208       when = t;
209     });
210     usleep(2000);
211     t = 1;
212     int dummy;
213     cq.blockingRead(dummy);
214     thr.join();
215     EXPECT_EQ(when, 1);
216   }
217 }
218
219 template <template<typename> class Atom>
220 void runTryEnqDeqThread(
221     int numThreads,
222     int n, /*numOps*/
223     MPMCQueue<int, Atom>& cq,
224     std::atomic<uint64_t>& sum,
225     int t) {
226   uint64_t threadSum = 0;
227   int src = t;
228   // received doesn't reflect any actual values, we just start with
229   // t and increment by numThreads to get the rounding of termination
230   // correct if numThreads doesn't evenly divide numOps
231   int received = t;
232   while (src < n || received < n) {
233     if (src < n && cq.write(src)) {
234       src += numThreads;
235     }
236
237     int dst;
238     if (received < n && cq.read(dst)) {
239       received += numThreads;
240       threadSum += dst;
241     }
242   }
243   sum += threadSum;
244 }
245
246 template <template<typename> class Atom>
247 void runTryEnqDeqTest(int numThreads, int numOps) {
248   // write and read aren't linearizable, so we don't have
249   // hard guarantees on their individual behavior.  We can still test
250   // correctness in aggregate
251   MPMCQueue<int,Atom> cq(numThreads);
252
253   uint64_t n = numOps;
254   vector<std::thread> threads(numThreads);
255   std::atomic<uint64_t> sum(0);
256   for (int t = 0; t < numThreads; ++t) {
257     threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom>,
258           numThreads, n, std::ref(cq), std::ref(sum), t));
259   }
260   for (auto& t : threads) {
261     DSched::join(t);
262   }
263   EXPECT_TRUE(cq.isEmpty());
264   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
265 }
266
267 TEST(MPMCQueue, mt_try_enq_deq) {
268   int nts[] = { 1, 3, 100 };
269
270   int n = 100000;
271   for (int nt : nts) {
272     runTryEnqDeqTest<std::atomic>(nt, n);
273   }
274 }
275
276 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
277   int nts[] = { 1, 3, 100 };
278
279   int n = 100000;
280   for (int nt : nts) {
281     runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
282   }
283 }
284
285 TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
286   int nts[] = { 3, 10 };
287
288   long seed = 0;
289   LOG(INFO) << "using seed " << seed;
290
291   int n = 1000;
292   for (int nt : nts) {
293     {
294       DSched sched(DSched::uniform(seed));
295       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
296     }
297     {
298       DSched sched(DSched::uniformSubset(seed, 2));
299       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
300     }
301   }
302 }
303
304 uint64_t nowMicro() {
305   timeval tv;
306   gettimeofday(&tv, 0);
307   return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
308 }
309
310 template <typename Q>
311 struct WriteMethodCaller {
312   WriteMethodCaller() {}
313   virtual ~WriteMethodCaller() = default;
314   virtual bool callWrite(Q& q, int i) = 0;
315   virtual string methodName() = 0;
316 };
317
318 template <typename Q>
319 struct BlockingWriteCaller : public WriteMethodCaller<Q> {
320   bool callWrite(Q& q, int i) override {
321     q.blockingWrite(i);
322     return true;
323   }
324   string methodName() override { return "blockingWrite"; }
325 };
326
327 template <typename Q>
328 struct WriteIfNotFullCaller : public WriteMethodCaller<Q> {
329   bool callWrite(Q& q, int i) override { return q.writeIfNotFull(i); }
330   string methodName() override { return "writeIfNotFull"; }
331 };
332
333 template <typename Q>
334 struct WriteCaller : public WriteMethodCaller<Q> {
335   bool callWrite(Q& q, int i) override { return q.write(i); }
336   string methodName() override { return "write"; }
337 };
338
339 template <typename Q,
340           class Clock = steady_clock,
341           class Duration = typename Clock::duration>
342 struct TryWriteUntilCaller : public WriteMethodCaller<Q> {
343   const Duration duration_;
344   explicit TryWriteUntilCaller(Duration&& duration) : duration_(duration) {}
345   bool callWrite(Q& q, int i) override {
346     auto then = Clock::now() + duration_;
347     return q.tryWriteUntil(then, i);
348   }
349   string methodName() override {
350     return folly::sformat(
351         "tryWriteUntil({}ms)",
352         std::chrono::duration_cast<milliseconds>(duration_).count());
353   }
354 };
355
356 template <typename Q>
357 string producerConsumerBench(Q&& queue,
358                              string qName,
359                              int numProducers,
360                              int numConsumers,
361                              int numOps,
362                              WriteMethodCaller<Q>& writer,
363                              bool ignoreContents = false) {
364   Q& q = queue;
365
366   struct rusage beginUsage;
367   getrusage(RUSAGE_SELF, &beginUsage);
368
369   auto beginMicro = nowMicro();
370
371   uint64_t n = numOps;
372   std::atomic<uint64_t> sum(0);
373   std::atomic<uint64_t> failed(0);
374
375   vector<std::thread> producers(numProducers);
376   for (int t = 0; t < numProducers; ++t) {
377     producers[t] = DSched::thread([&,t]{
378       for (int i = t; i < numOps; i += numProducers) {
379         while (!writer.callWrite(q, i)) {
380           ++failed;
381         }
382       }
383     });
384   }
385
386   vector<std::thread> consumers(numConsumers);
387   for (int t = 0; t < numConsumers; ++t) {
388     consumers[t] = DSched::thread([&,t]{
389       uint64_t localSum = 0;
390       for (int i = t; i < numOps; i += numConsumers) {
391         int dest = -1;
392         q.blockingRead(dest);
393         EXPECT_FALSE(dest == -1);
394         localSum += dest;
395       }
396       sum += localSum;
397     });
398   }
399
400   for (auto& t : producers) {
401     DSched::join(t);
402   }
403   for (auto& t : consumers) {
404     DSched::join(t);
405   }
406   if (!ignoreContents) {
407     EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
408   }
409
410   auto endMicro = nowMicro();
411
412   struct rusage endUsage;
413   getrusage(RUSAGE_SELF, &endUsage);
414
415   uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
416   long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
417       (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
418   uint64_t failures = failed;
419
420   return folly::sformat(
421       "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
422       "handoff, {} failures",
423       qName,
424       numProducers,
425       writer.methodName(),
426       numConsumers,
427       nanosPer,
428       csw,
429       n,
430       failures);
431 }
432
433 TEST(MPMCQueue, mt_prod_cons_deterministic) {
434   // we use the Bench method, but perf results are meaningless under DSched
435   DSched sched(DSched::uniform(0));
436
437   vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic>>>>
438       callers;
439   callers.emplace_back(
440       make_unique<BlockingWriteCaller<MPMCQueue<int, DeterministicAtomic>>>());
441   callers.emplace_back(
442       make_unique<WriteIfNotFullCaller<MPMCQueue<int, DeterministicAtomic>>>());
443   callers.emplace_back(
444       make_unique<WriteCaller<MPMCQueue<int, DeterministicAtomic>>>());
445   callers.emplace_back(
446       make_unique<TryWriteUntilCaller<MPMCQueue<int, DeterministicAtomic>>>(
447           milliseconds(1)));
448   callers.emplace_back(
449       make_unique<TryWriteUntilCaller<MPMCQueue<int, DeterministicAtomic>>>(
450           seconds(2)));
451
452   for (const auto& caller : callers) {
453     LOG(INFO)
454         << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(10),
455                                  "MPMCQueue<int, DeterministicAtomic>(10)",
456                                  1,
457                                  1,
458                                  1000,
459                                  *caller);
460     LOG(INFO)
461         << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(100),
462                                  "MPMCQueue<int, DeterministicAtomic>(100)",
463                                  10,
464                                  10,
465                                  1000,
466                                  *caller);
467     LOG(INFO)
468         << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(10),
469                                  "MPMCQueue<int, DeterministicAtomic>(10)",
470                                  1,
471                                  1,
472                                  1000,
473                                  *caller);
474     LOG(INFO)
475         << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(100),
476                                  "MPMCQueue<int, DeterministicAtomic>(100)",
477                                  10,
478                                  10,
479                                  1000,
480                                  *caller);
481     LOG(INFO) << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(1),
482                                        "MPMCQueue<int, DeterministicAtomic>(1)",
483                                        10,
484                                        10,
485                                        1000,
486                                        *caller);
487   }
488 }
489
490 #define PC_BENCH(q, np, nc, ...) \
491     producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
492
493 TEST(MPMCQueue, mt_prod_cons) {
494   int n = 100000;
495   vector<unique_ptr<WriteMethodCaller<MPMCQueue<int>>>> callers;
496   callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int>>>());
497   callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int>>>());
498   callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int>>>());
499   callers.emplace_back(
500       make_unique<TryWriteUntilCaller<MPMCQueue<int>>>(milliseconds(1)));
501   callers.emplace_back(
502       make_unique<TryWriteUntilCaller<MPMCQueue<int>>>(seconds(2)));
503   for (const auto& caller : callers) {
504     LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 1, n, *caller);
505     LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 1, n, *caller);
506     LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 10, n, *caller);
507     LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 10, n, *caller);
508     LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 1, n, *caller);
509     LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 1, n, *caller);
510     LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 10, n, *caller);
511     LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 10, n, *caller);
512     LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n, *caller);
513   }
514 }
515
516 TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
517   int n = 100000;
518   vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, EmulatedFutexAtomic>>>>
519       callers;
520   callers.emplace_back(
521       make_unique<BlockingWriteCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
522   callers.emplace_back(
523       make_unique<WriteIfNotFullCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
524   callers.emplace_back(
525       make_unique<WriteCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
526   callers.emplace_back(
527       make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic>>>(
528           milliseconds(1)));
529   callers.emplace_back(
530       make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic>>>(
531           seconds(2)));
532   for (const auto& caller : callers) {
533     LOG(INFO) << PC_BENCH(
534         (MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 1, n, *caller);
535     LOG(INFO) << PC_BENCH(
536         (MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 1, n, *caller);
537     LOG(INFO) << PC_BENCH(
538         (MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 10, n, *caller);
539     LOG(INFO) << PC_BENCH(
540         (MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 10, n, *caller);
541     LOG(INFO) << PC_BENCH(
542         (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 1, n, *caller);
543     LOG(INFO) << PC_BENCH(
544         (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 1, n, *caller);
545     LOG(INFO) << PC_BENCH(
546         (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 10, n, *caller);
547     LOG(INFO) << PC_BENCH(
548         (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 10, n, *caller);
549     LOG(INFO) << PC_BENCH(
550         (MPMCQueue<int, EmulatedFutexAtomic>(100000)), 32, 100, n, *caller);
551   }
552 }
553
554 template <template <typename> class Atom>
555 void runNeverFailThread(int numThreads,
556                         int n, /*numOps*/
557                         MPMCQueue<int, Atom>& cq,
558                         std::atomic<uint64_t>& sum,
559                         int t) {
560   uint64_t threadSum = 0;
561   for (int i = t; i < n; i += numThreads) {
562     // enq + deq
563     EXPECT_TRUE(cq.writeIfNotFull(i));
564
565     int dest = -1;
566     EXPECT_TRUE(cq.readIfNotEmpty(dest));
567     EXPECT_TRUE(dest >= 0);
568     threadSum += dest;
569   }
570   sum += threadSum;
571 }
572
573 template <template <typename> class Atom>
574 uint64_t runNeverFailTest(int numThreads, int numOps) {
575   // always #enq >= #deq
576   MPMCQueue<int, Atom> cq(numThreads);
577
578   uint64_t n = numOps;
579   auto beginMicro = nowMicro();
580
581   vector<std::thread> threads(numThreads);
582   std::atomic<uint64_t> sum(0);
583   for (int t = 0; t < numThreads; ++t) {
584     threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom>,
585                                           numThreads,
586                                           n,
587                                           std::ref(cq),
588                                           std::ref(sum),
589                                           t));
590   }
591   for (auto& t : threads) {
592     DSched::join(t);
593   }
594   EXPECT_TRUE(cq.isEmpty());
595   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
596
597   return nowMicro() - beginMicro;
598 }
599
600 TEST(MPMCQueue, mt_never_fail) {
601   int nts[] = {1, 3, 100};
602
603   int n = 100000;
604   for (int nt : nts) {
605     uint64_t elapsed = runNeverFailTest<std::atomic>(nt, n);
606     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
607               << " threads";
608   }
609 }
610
611 TEST(MPMCQueue, mt_never_fail_emulated_futex) {
612   int nts[] = {1, 3, 100};
613
614   int n = 100000;
615   for (int nt : nts) {
616     uint64_t elapsed = runNeverFailTest<EmulatedFutexAtomic>(nt, n);
617     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
618               << " threads";
619   }
620 }
621
622 TEST(MPMCQueue, mt_never_fail_deterministic) {
623   int nts[] = {3, 10};
624
625   long seed = 0; // nowMicro() % 10000;
626   LOG(INFO) << "using seed " << seed;
627
628   int n = 1000;
629   for (int nt : nts) {
630     {
631       DSched sched(DSched::uniform(seed));
632       runNeverFailTest<DeterministicAtomic>(nt, n);
633     }
634     {
635       DSched sched(DSched::uniformSubset(seed, 2));
636       runNeverFailTest<DeterministicAtomic>(nt, n);
637     }
638   }
639 }
640
641 template <class Clock, template <typename> class Atom>
642 void runNeverFailUntilThread(int numThreads,
643                              int n, /*numOps*/
644                              MPMCQueue<int, Atom>& cq,
645                              std::atomic<uint64_t>& sum,
646                              int t) {
647   uint64_t threadSum = 0;
648   for (int i = t; i < n; i += numThreads) {
649     // enq + deq
650     auto soon = Clock::now() + std::chrono::seconds(1);
651     EXPECT_TRUE(cq.tryWriteUntil(soon, i));
652
653     int dest = -1;
654     EXPECT_TRUE(cq.readIfNotEmpty(dest));
655     EXPECT_TRUE(dest >= 0);
656     threadSum += dest;
657   }
658   sum += threadSum;
659 }
660
661 template <class Clock, template <typename> class Atom>
662 uint64_t runNeverFailTest(int numThreads, int numOps) {
663   // always #enq >= #deq
664   MPMCQueue<int, Atom> cq(numThreads);
665
666   uint64_t n = numOps;
667   auto beginMicro = nowMicro();
668
669   vector<std::thread> threads(numThreads);
670   std::atomic<uint64_t> sum(0);
671   for (int t = 0; t < numThreads; ++t) {
672     threads[t] = DSched::thread(std::bind(runNeverFailUntilThread<Clock, Atom>,
673                                           numThreads,
674                                           n,
675                                           std::ref(cq),
676                                           std::ref(sum),
677                                           t));
678   }
679   for (auto& t : threads) {
680     DSched::join(t);
681   }
682   EXPECT_TRUE(cq.isEmpty());
683   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
684
685   return nowMicro() - beginMicro;
686 }
687
688 TEST(MPMCQueue, mt_never_fail_until_system) {
689   int nts[] = {1, 3, 100};
690
691   int n = 100000;
692   for (int nt : nts) {
693     uint64_t elapsed =
694         runNeverFailTest<std::chrono::system_clock, std::atomic>(nt, n);
695     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
696               << " threads";
697   }
698 }
699
700 TEST(MPMCQueue, mt_never_fail_until_steady) {
701   int nts[] = {1, 3, 100};
702
703   int n = 100000;
704   for (int nt : nts) {
705     uint64_t elapsed =
706         runNeverFailTest<std::chrono::steady_clock, std::atomic>(nt, n);
707     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
708               << " threads";
709   }
710 }
711
712 enum LifecycleEvent {
713   NOTHING = -1,
714   DEFAULT_CONSTRUCTOR,
715   COPY_CONSTRUCTOR,
716   MOVE_CONSTRUCTOR,
717   TWO_ARG_CONSTRUCTOR,
718   COPY_OPERATOR,
719   MOVE_OPERATOR,
720   DESTRUCTOR,
721   MAX_LIFECYCLE_EVENT
722 };
723
724 static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT];
725 static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT];
726
727 static int lc_outstanding() {
728   return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[COPY_CONSTRUCTOR] +
729       lc_counts[MOVE_CONSTRUCTOR] + lc_counts[TWO_ARG_CONSTRUCTOR] -
730       lc_counts[DESTRUCTOR];
731 }
732
733 static void lc_snap() {
734   for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
735     lc_prev[i] = lc_counts[i];
736   }
737 }
738
739 #define LIFECYCLE_STEP(...) lc_step(__LINE__, __VA_ARGS__)
740
741 static void lc_step(int lineno, int what = NOTHING, int what2 = NOTHING) {
742   for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
743     int delta = i == what || i == what2 ? 1 : 0;
744     EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
745         << "lc_counts[" << i << "] - lc_prev[" << i << "] was "
746         << (lc_counts[i] - lc_prev[i]) << ", expected " << delta
747         << ", from line " << lineno;
748   }
749   lc_snap();
750 }
751
752 template <typename R>
753 struct Lifecycle {
754   typedef R IsRelocatable;
755
756   bool constructed;
757
758   Lifecycle() noexcept : constructed(true) {
759     ++lc_counts[DEFAULT_CONSTRUCTOR];
760   }
761
762   explicit Lifecycle(int /* n */, char const* /* s */) noexcept
763       : constructed(true) {
764     ++lc_counts[TWO_ARG_CONSTRUCTOR];
765   }
766
767   Lifecycle(const Lifecycle& /* rhs */) noexcept : constructed(true) {
768     ++lc_counts[COPY_CONSTRUCTOR];
769   }
770
771   Lifecycle(Lifecycle&& /* rhs */) noexcept : constructed(true) {
772     ++lc_counts[MOVE_CONSTRUCTOR];
773   }
774
775   Lifecycle& operator=(const Lifecycle& /* rhs */) noexcept {
776     ++lc_counts[COPY_OPERATOR];
777     return *this;
778   }
779
780   Lifecycle& operator=(Lifecycle&& /* rhs */) noexcept {
781     ++lc_counts[MOVE_OPERATOR];
782     return *this;
783   }
784
785   ~Lifecycle() noexcept {
786     ++lc_counts[DESTRUCTOR];
787     assert(lc_outstanding() >= 0);
788     assert(constructed);
789     constructed = false;
790   }
791 };
792
793 template <typename R>
794 void runPerfectForwardingTest() {
795   lc_snap();
796   EXPECT_EQ(lc_outstanding(), 0);
797
798   {
799     MPMCQueue<Lifecycle<R>> queue(50);
800     LIFECYCLE_STEP(NOTHING);
801
802     for (int pass = 0; pass < 10; ++pass) {
803       for (int i = 0; i < 10; ++i) {
804         queue.blockingWrite();
805         LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
806
807         queue.blockingWrite(1, "one");
808         LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
809
810         {
811           Lifecycle<R> src;
812           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
813           queue.blockingWrite(std::move(src));
814           LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
815         }
816         LIFECYCLE_STEP(DESTRUCTOR);
817
818         {
819           Lifecycle<R> src;
820           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
821           queue.blockingWrite(src);
822           LIFECYCLE_STEP(COPY_CONSTRUCTOR);
823         }
824         LIFECYCLE_STEP(DESTRUCTOR);
825
826         EXPECT_TRUE(queue.write());
827         LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
828       }
829
830       EXPECT_EQ(queue.size(), 50);
831       EXPECT_FALSE(queue.write(2, "two"));
832       LIFECYCLE_STEP(NOTHING);
833
834       for (int i = 0; i < 50; ++i) {
835         {
836           Lifecycle<R> node;
837           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
838
839           queue.blockingRead(node);
840           if (R::value) {
841             // relocatable, moved via memcpy
842             LIFECYCLE_STEP(DESTRUCTOR);
843           } else {
844             LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
845           }
846         }
847         LIFECYCLE_STEP(DESTRUCTOR);
848       }
849
850       EXPECT_EQ(queue.size(), 0);
851     }
852
853     // put one element back before destruction
854     {
855       Lifecycle<R> src(3, "three");
856       LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
857       queue.write(std::move(src));
858       LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
859     }
860     LIFECYCLE_STEP(DESTRUCTOR); // destroy src
861   }
862   LIFECYCLE_STEP(DESTRUCTOR); // destroy queue
863
864   EXPECT_EQ(lc_outstanding(), 0);
865 }
866
867 TEST(MPMCQueue, perfect_forwarding) {
868   runPerfectForwardingTest<std::false_type>();
869 }
870
871 TEST(MPMCQueue, perfect_forwarding_relocatable) {
872   runPerfectForwardingTest<std::true_type>();
873 }
874
875 TEST(MPMCQueue, queue_moving) {
876   lc_snap();
877   EXPECT_EQ(lc_outstanding(), 0);
878
879   {
880     MPMCQueue<Lifecycle<std::false_type>> a(50);
881     LIFECYCLE_STEP(NOTHING);
882
883     a.blockingWrite();
884     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
885
886     // move constructor
887     MPMCQueue<Lifecycle<std::false_type>> b = std::move(a);
888     LIFECYCLE_STEP(NOTHING);
889     EXPECT_EQ(a.capacity(), 0);
890     EXPECT_EQ(a.size(), 0);
891     EXPECT_EQ(b.capacity(), 50);
892     EXPECT_EQ(b.size(), 1);
893
894     b.blockingWrite();
895     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
896
897     // move operator
898     MPMCQueue<Lifecycle<std::false_type>> c;
899     LIFECYCLE_STEP(NOTHING);
900     c = std::move(b);
901     LIFECYCLE_STEP(NOTHING);
902     EXPECT_EQ(c.capacity(), 50);
903     EXPECT_EQ(c.size(), 2);
904
905     {
906       Lifecycle<std::false_type> dst;
907       LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
908       c.blockingRead(dst);
909       LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
910
911       {
912         // swap
913         MPMCQueue<Lifecycle<std::false_type>> d(10);
914         LIFECYCLE_STEP(NOTHING);
915         std::swap(c, d);
916         LIFECYCLE_STEP(NOTHING);
917         EXPECT_EQ(c.capacity(), 10);
918         EXPECT_TRUE(c.isEmpty());
919         EXPECT_EQ(d.capacity(), 50);
920         EXPECT_EQ(d.size(), 1);
921
922         d.blockingRead(dst);
923         LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
924
925         c.blockingWrite(dst);
926         LIFECYCLE_STEP(COPY_CONSTRUCTOR);
927
928         d.blockingWrite(std::move(dst));
929         LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
930       } // d goes out of scope
931       LIFECYCLE_STEP(DESTRUCTOR);
932     } // dst goes out of scope
933     LIFECYCLE_STEP(DESTRUCTOR);
934   } // c goes out of scope
935   LIFECYCLE_STEP(DESTRUCTOR);
936 }
937
938 TEST(MPMCQueue, explicit_zero_capacity_fail) {
939   ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);
940 }