Include <folly/portability/SysTime.h> rather than <sys/time.h>
[folly.git] / folly / test / MPMCQueueTest.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/MPMCQueue.h>
18 #include <folly/Format.h>
19 #include <folly/Memory.h>
20 #include <folly/portability/SysTime.h>
21 #include <folly/test/DeterministicSchedule.h>
22
23 #include <boost/intrusive_ptr.hpp>
24 #include <memory>
25 #include <functional>
26 #include <thread>
27 #include <utility>
28 #include <unistd.h>
29 #include <sys/resource.h>
30
31 #include <gtest/gtest.h>
32
33 FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
34
35 using namespace folly;
36 using namespace detail;
37 using namespace test;
38 using std::chrono::time_point;
39 using std::chrono::steady_clock;
40 using std::chrono::seconds;
41 using std::chrono::milliseconds;
42 using std::string;
43 using std::unique_ptr;
44 using std::vector;
45
46 typedef DeterministicSchedule DSched;
47
48 template <template<typename> class Atom>
49 void run_mt_sequencer_thread(
50     int numThreads,
51     int numOps,
52     uint32_t init,
53     TurnSequencer<Atom>& seq,
54     Atom<uint32_t>& spinThreshold,
55     int& prev,
56     int i) {
57   for (int op = i; op < numOps; op += numThreads) {
58     seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
59     EXPECT_EQ(prev, op - 1);
60     prev = op;
61     seq.completeTurn(init + op);
62   }
63 }
64
65 template <template<typename> class Atom>
66 void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
67   TurnSequencer<Atom> seq(init);
68   Atom<uint32_t> spinThreshold(0);
69
70   int prev = -1;
71   vector<std::thread> threads(numThreads);
72   for (int i = 0; i < numThreads; ++i) {
73     threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
74           numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold),
75           std::ref(prev), i));
76   }
77
78   for (auto& thr : threads) {
79     DSched::join(thr);
80   }
81
82   EXPECT_EQ(prev, numOps - 1);
83 }
84
85 TEST(MPMCQueue, sequencer) {
86   run_mt_sequencer_test<std::atomic>(1, 100, 0);
87   run_mt_sequencer_test<std::atomic>(2, 100000, -100);
88   run_mt_sequencer_test<std::atomic>(100, 10000, -100);
89 }
90
91 TEST(MPMCQueue, sequencer_emulated_futex) {
92   run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
93   run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
94   run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
95 }
96
97 TEST(MPMCQueue, sequencer_deterministic) {
98   DSched sched(DSched::uniform(0));
99   run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
100   run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
101   run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
102 }
103
104 template <typename T>
105 void runElementTypeTest(T&& src) {
106   MPMCQueue<T> cq(10);
107   cq.blockingWrite(std::forward<T>(src));
108   T dest;
109   cq.blockingRead(dest);
110   EXPECT_TRUE(cq.write(std::move(dest)));
111   EXPECT_TRUE(cq.read(dest));
112   auto soon1 = std::chrono::system_clock::now() + std::chrono::seconds(1);
113   EXPECT_TRUE(cq.tryWriteUntil(soon1, std::move(dest)));
114   EXPECT_TRUE(cq.read(dest));
115   auto soon2 = std::chrono::steady_clock::now() + std::chrono::seconds(1);
116   EXPECT_TRUE(cq.tryWriteUntil(soon2, std::move(dest)));
117   EXPECT_TRUE(cq.read(dest));
118 }
119
120 struct RefCounted {
121   static __thread int active_instances;
122
123   mutable std::atomic<int> rc;
124
125   RefCounted() : rc(0) {
126     ++active_instances;
127   }
128
129   ~RefCounted() {
130     --active_instances;
131   }
132 };
133 __thread int RefCounted::active_instances;
134
135
136 void intrusive_ptr_add_ref(RefCounted const* p) {
137   p->rc++;
138 }
139
140 void intrusive_ptr_release(RefCounted const* p) {
141   if (--(p->rc) == 0) {
142     delete p;
143   }
144 }
145
146 TEST(MPMCQueue, lots_of_element_types) {
147   runElementTypeTest(10);
148   runElementTypeTest(string("abc"));
149   runElementTypeTest(std::make_pair(10, string("def")));
150   runElementTypeTest(vector<string>{{"abc"}});
151   runElementTypeTest(std::make_shared<char>('a'));
152   runElementTypeTest(folly::make_unique<char>('a'));
153   runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
154   EXPECT_EQ(RefCounted::active_instances, 0);
155 }
156
157 TEST(MPMCQueue, single_thread_enqdeq) {
158   MPMCQueue<int> cq(10);
159
160   for (int pass = 0; pass < 10; ++pass) {
161     for (int i = 0; i < 10; ++i) {
162       EXPECT_TRUE(cq.write(i));
163     }
164     EXPECT_FALSE(cq.write(-1));
165     EXPECT_FALSE(cq.isEmpty());
166     EXPECT_EQ(cq.size(), 10);
167
168     for (int i = 0; i < 5; ++i) {
169       int dest = -1;
170       EXPECT_TRUE(cq.read(dest));
171       EXPECT_EQ(dest, i);
172     }
173     for (int i = 5; i < 10; ++i) {
174       int dest = -1;
175       cq.blockingRead(dest);
176       EXPECT_EQ(dest, i);
177     }
178     int dest = -1;
179     EXPECT_FALSE(cq.read(dest));
180     EXPECT_EQ(dest, -1);
181
182     EXPECT_TRUE(cq.isEmpty());
183     EXPECT_EQ(cq.size(), 0);
184   }
185 }
186
187 TEST(MPMCQueue, tryenq_capacity_test) {
188   for (size_t cap = 1; cap < 100; ++cap) {
189     MPMCQueue<int> cq(cap);
190     for (size_t i = 0; i < cap; ++i) {
191       EXPECT_TRUE(cq.write(i));
192     }
193     EXPECT_FALSE(cq.write(100));
194   }
195 }
196
197 TEST(MPMCQueue, enq_capacity_test) {
198   for (auto cap : { 1, 100, 10000 }) {
199     MPMCQueue<int> cq(cap);
200     for (int i = 0; i < cap; ++i) {
201       cq.blockingWrite(i);
202     }
203     int t = 0;
204     int when;
205     auto thr = std::thread([&]{
206       cq.blockingWrite(100);
207       when = t;
208     });
209     usleep(2000);
210     t = 1;
211     int dummy;
212     cq.blockingRead(dummy);
213     thr.join();
214     EXPECT_EQ(when, 1);
215   }
216 }
217
218 template <template<typename> class Atom>
219 void runTryEnqDeqThread(
220     int numThreads,
221     int n, /*numOps*/
222     MPMCQueue<int, Atom>& cq,
223     std::atomic<uint64_t>& sum,
224     int t) {
225   uint64_t threadSum = 0;
226   int src = t;
227   // received doesn't reflect any actual values, we just start with
228   // t and increment by numThreads to get the rounding of termination
229   // correct if numThreads doesn't evenly divide numOps
230   int received = t;
231   while (src < n || received < n) {
232     if (src < n && cq.write(src)) {
233       src += numThreads;
234     }
235
236     int dst;
237     if (received < n && cq.read(dst)) {
238       received += numThreads;
239       threadSum += dst;
240     }
241   }
242   sum += threadSum;
243 }
244
245 template <template<typename> class Atom>
246 void runTryEnqDeqTest(int numThreads, int numOps) {
247   // write and read aren't linearizable, so we don't have
248   // hard guarantees on their individual behavior.  We can still test
249   // correctness in aggregate
250   MPMCQueue<int,Atom> cq(numThreads);
251
252   uint64_t n = numOps;
253   vector<std::thread> threads(numThreads);
254   std::atomic<uint64_t> sum(0);
255   for (int t = 0; t < numThreads; ++t) {
256     threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom>,
257           numThreads, n, std::ref(cq), std::ref(sum), t));
258   }
259   for (auto& t : threads) {
260     DSched::join(t);
261   }
262   EXPECT_TRUE(cq.isEmpty());
263   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
264 }
265
266 TEST(MPMCQueue, mt_try_enq_deq) {
267   int nts[] = { 1, 3, 100 };
268
269   int n = 100000;
270   for (int nt : nts) {
271     runTryEnqDeqTest<std::atomic>(nt, n);
272   }
273 }
274
275 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
276   int nts[] = { 1, 3, 100 };
277
278   int n = 100000;
279   for (int nt : nts) {
280     runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
281   }
282 }
283
284 TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
285   int nts[] = { 3, 10 };
286
287   long seed = 0;
288   LOG(INFO) << "using seed " << seed;
289
290   int n = 1000;
291   for (int nt : nts) {
292     {
293       DSched sched(DSched::uniform(seed));
294       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
295     }
296     {
297       DSched sched(DSched::uniformSubset(seed, 2));
298       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
299     }
300   }
301 }
302
303 uint64_t nowMicro() {
304   timeval tv;
305   gettimeofday(&tv, 0);
306   return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
307 }
308
309 template <typename Q>
310 struct WriteMethodCaller {
311   WriteMethodCaller() {}
312   virtual ~WriteMethodCaller() = default;
313   virtual bool callWrite(Q& q, int i) = 0;
314   virtual string methodName() = 0;
315 };
316
317 template <typename Q>
318 struct BlockingWriteCaller : public WriteMethodCaller<Q> {
319   bool callWrite(Q& q, int i) override {
320     q.blockingWrite(i);
321     return true;
322   }
323   string methodName() override { return "blockingWrite"; }
324 };
325
326 template <typename Q>
327 struct WriteIfNotFullCaller : public WriteMethodCaller<Q> {
328   bool callWrite(Q& q, int i) override { return q.writeIfNotFull(i); }
329   string methodName() override { return "writeIfNotFull"; }
330 };
331
332 template <typename Q>
333 struct WriteCaller : public WriteMethodCaller<Q> {
334   bool callWrite(Q& q, int i) override { return q.write(i); }
335   string methodName() override { return "write"; }
336 };
337
338 template <typename Q,
339           class Clock = steady_clock,
340           class Duration = typename Clock::duration>
341 struct TryWriteUntilCaller : public WriteMethodCaller<Q> {
342   const Duration duration_;
343   explicit TryWriteUntilCaller(Duration&& duration) : duration_(duration) {}
344   bool callWrite(Q& q, int i) override {
345     auto then = Clock::now() + duration_;
346     return q.tryWriteUntil(then, i);
347   }
348   string methodName() override {
349     return folly::sformat(
350         "tryWriteUntil({}ms)",
351         std::chrono::duration_cast<milliseconds>(duration_).count());
352   }
353 };
354
355 template <typename Q>
356 string producerConsumerBench(Q&& queue,
357                              string qName,
358                              int numProducers,
359                              int numConsumers,
360                              int numOps,
361                              WriteMethodCaller<Q>& writer,
362                              bool ignoreContents = false) {
363   Q& q = queue;
364
365   struct rusage beginUsage;
366   getrusage(RUSAGE_SELF, &beginUsage);
367
368   auto beginMicro = nowMicro();
369
370   uint64_t n = numOps;
371   std::atomic<uint64_t> sum(0);
372   std::atomic<uint64_t> failed(0);
373
374   vector<std::thread> producers(numProducers);
375   for (int t = 0; t < numProducers; ++t) {
376     producers[t] = DSched::thread([&,t]{
377       for (int i = t; i < numOps; i += numProducers) {
378         while (!writer.callWrite(q, i)) {
379           ++failed;
380         }
381       }
382     });
383   }
384
385   vector<std::thread> consumers(numConsumers);
386   for (int t = 0; t < numConsumers; ++t) {
387     consumers[t] = DSched::thread([&,t]{
388       uint64_t localSum = 0;
389       for (int i = t; i < numOps; i += numConsumers) {
390         int dest = -1;
391         q.blockingRead(dest);
392         EXPECT_FALSE(dest == -1);
393         localSum += dest;
394       }
395       sum += localSum;
396     });
397   }
398
399   for (auto& t : producers) {
400     DSched::join(t);
401   }
402   for (auto& t : consumers) {
403     DSched::join(t);
404   }
405   if (!ignoreContents) {
406     EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
407   }
408
409   auto endMicro = nowMicro();
410
411   struct rusage endUsage;
412   getrusage(RUSAGE_SELF, &endUsage);
413
414   uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
415   long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
416       (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
417   uint64_t failures = failed;
418
419   return folly::sformat(
420       "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
421       "handoff, {} failures",
422       qName,
423       numProducers,
424       writer.methodName(),
425       numConsumers,
426       nanosPer,
427       csw,
428       n,
429       failures);
430 }
431
432 TEST(MPMCQueue, mt_prod_cons_deterministic) {
433   // we use the Bench method, but perf results are meaningless under DSched
434   DSched sched(DSched::uniform(0));
435
436   vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic>>>>
437       callers;
438   callers.emplace_back(
439       make_unique<BlockingWriteCaller<MPMCQueue<int, DeterministicAtomic>>>());
440   callers.emplace_back(
441       make_unique<WriteIfNotFullCaller<MPMCQueue<int, DeterministicAtomic>>>());
442   callers.emplace_back(
443       make_unique<WriteCaller<MPMCQueue<int, DeterministicAtomic>>>());
444   callers.emplace_back(
445       make_unique<TryWriteUntilCaller<MPMCQueue<int, DeterministicAtomic>>>(
446           milliseconds(1)));
447   callers.emplace_back(
448       make_unique<TryWriteUntilCaller<MPMCQueue<int, DeterministicAtomic>>>(
449           seconds(2)));
450
451   for (const auto& caller : callers) {
452     LOG(INFO)
453         << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(10),
454                                  "MPMCQueue<int, DeterministicAtomic>(10)",
455                                  1,
456                                  1,
457                                  1000,
458                                  *caller);
459     LOG(INFO)
460         << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(100),
461                                  "MPMCQueue<int, DeterministicAtomic>(100)",
462                                  10,
463                                  10,
464                                  1000,
465                                  *caller);
466     LOG(INFO)
467         << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(10),
468                                  "MPMCQueue<int, DeterministicAtomic>(10)",
469                                  1,
470                                  1,
471                                  1000,
472                                  *caller);
473     LOG(INFO)
474         << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(100),
475                                  "MPMCQueue<int, DeterministicAtomic>(100)",
476                                  10,
477                                  10,
478                                  1000,
479                                  *caller);
480     LOG(INFO) << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(1),
481                                        "MPMCQueue<int, DeterministicAtomic>(1)",
482                                        10,
483                                        10,
484                                        1000,
485                                        *caller);
486   }
487 }
488
489 #define PC_BENCH(q, np, nc, ...) \
490     producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
491
492 TEST(MPMCQueue, mt_prod_cons) {
493   int n = 100000;
494   vector<unique_ptr<WriteMethodCaller<MPMCQueue<int>>>> callers;
495   callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int>>>());
496   callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int>>>());
497   callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int>>>());
498   callers.emplace_back(
499       make_unique<TryWriteUntilCaller<MPMCQueue<int>>>(milliseconds(1)));
500   callers.emplace_back(
501       make_unique<TryWriteUntilCaller<MPMCQueue<int>>>(seconds(2)));
502   for (const auto& caller : callers) {
503     LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 1, n, *caller);
504     LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 1, n, *caller);
505     LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 10, n, *caller);
506     LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 10, n, *caller);
507     LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 1, n, *caller);
508     LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 1, n, *caller);
509     LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 10, n, *caller);
510     LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 10, n, *caller);
511     LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n, *caller);
512   }
513 }
514
515 TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
516   int n = 100000;
517   vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, EmulatedFutexAtomic>>>>
518       callers;
519   callers.emplace_back(
520       make_unique<BlockingWriteCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
521   callers.emplace_back(
522       make_unique<WriteIfNotFullCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
523   callers.emplace_back(
524       make_unique<WriteCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
525   callers.emplace_back(
526       make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic>>>(
527           milliseconds(1)));
528   callers.emplace_back(
529       make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic>>>(
530           seconds(2)));
531   for (const auto& caller : callers) {
532     LOG(INFO) << PC_BENCH(
533         (MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 1, n, *caller);
534     LOG(INFO) << PC_BENCH(
535         (MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 1, n, *caller);
536     LOG(INFO) << PC_BENCH(
537         (MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 10, n, *caller);
538     LOG(INFO) << PC_BENCH(
539         (MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 10, n, *caller);
540     LOG(INFO) << PC_BENCH(
541         (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 1, n, *caller);
542     LOG(INFO) << PC_BENCH(
543         (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 1, n, *caller);
544     LOG(INFO) << PC_BENCH(
545         (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 10, n, *caller);
546     LOG(INFO) << PC_BENCH(
547         (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 10, n, *caller);
548     LOG(INFO) << PC_BENCH(
549         (MPMCQueue<int, EmulatedFutexAtomic>(100000)), 32, 100, n, *caller);
550   }
551 }
552
553 template <template <typename> class Atom>
554 void runNeverFailThread(int numThreads,
555                         int n, /*numOps*/
556                         MPMCQueue<int, Atom>& cq,
557                         std::atomic<uint64_t>& sum,
558                         int t) {
559   uint64_t threadSum = 0;
560   for (int i = t; i < n; i += numThreads) {
561     // enq + deq
562     EXPECT_TRUE(cq.writeIfNotFull(i));
563
564     int dest = -1;
565     EXPECT_TRUE(cq.readIfNotEmpty(dest));
566     EXPECT_TRUE(dest >= 0);
567     threadSum += dest;
568   }
569   sum += threadSum;
570 }
571
572 template <template <typename> class Atom>
573 uint64_t runNeverFailTest(int numThreads, int numOps) {
574   // always #enq >= #deq
575   MPMCQueue<int, Atom> cq(numThreads);
576
577   uint64_t n = numOps;
578   auto beginMicro = nowMicro();
579
580   vector<std::thread> threads(numThreads);
581   std::atomic<uint64_t> sum(0);
582   for (int t = 0; t < numThreads; ++t) {
583     threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom>,
584                                           numThreads,
585                                           n,
586                                           std::ref(cq),
587                                           std::ref(sum),
588                                           t));
589   }
590   for (auto& t : threads) {
591     DSched::join(t);
592   }
593   EXPECT_TRUE(cq.isEmpty());
594   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
595
596   return nowMicro() - beginMicro;
597 }
598
599 TEST(MPMCQueue, mt_never_fail) {
600   int nts[] = {1, 3, 100};
601
602   int n = 100000;
603   for (int nt : nts) {
604     uint64_t elapsed = runNeverFailTest<std::atomic>(nt, n);
605     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
606               << " threads";
607   }
608 }
609
610 TEST(MPMCQueue, mt_never_fail_emulated_futex) {
611   int nts[] = {1, 3, 100};
612
613   int n = 100000;
614   for (int nt : nts) {
615     uint64_t elapsed = runNeverFailTest<EmulatedFutexAtomic>(nt, n);
616     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
617               << " threads";
618   }
619 }
620
621 TEST(MPMCQueue, mt_never_fail_deterministic) {
622   int nts[] = {3, 10};
623
624   long seed = 0; // nowMicro() % 10000;
625   LOG(INFO) << "using seed " << seed;
626
627   int n = 1000;
628   for (int nt : nts) {
629     {
630       DSched sched(DSched::uniform(seed));
631       runNeverFailTest<DeterministicAtomic>(nt, n);
632     }
633     {
634       DSched sched(DSched::uniformSubset(seed, 2));
635       runNeverFailTest<DeterministicAtomic>(nt, n);
636     }
637   }
638 }
639
640 template <class Clock, template <typename> class Atom>
641 void runNeverFailUntilThread(int numThreads,
642                              int n, /*numOps*/
643                              MPMCQueue<int, Atom>& cq,
644                              std::atomic<uint64_t>& sum,
645                              int t) {
646   uint64_t threadSum = 0;
647   for (int i = t; i < n; i += numThreads) {
648     // enq + deq
649     auto soon = Clock::now() + std::chrono::seconds(1);
650     EXPECT_TRUE(cq.tryWriteUntil(soon, i));
651
652     int dest = -1;
653     EXPECT_TRUE(cq.readIfNotEmpty(dest));
654     EXPECT_TRUE(dest >= 0);
655     threadSum += dest;
656   }
657   sum += threadSum;
658 }
659
660 template <class Clock, template <typename> class Atom>
661 uint64_t runNeverFailTest(int numThreads, int numOps) {
662   // always #enq >= #deq
663   MPMCQueue<int, Atom> cq(numThreads);
664
665   uint64_t n = numOps;
666   auto beginMicro = nowMicro();
667
668   vector<std::thread> threads(numThreads);
669   std::atomic<uint64_t> sum(0);
670   for (int t = 0; t < numThreads; ++t) {
671     threads[t] = DSched::thread(std::bind(runNeverFailUntilThread<Clock, Atom>,
672                                           numThreads,
673                                           n,
674                                           std::ref(cq),
675                                           std::ref(sum),
676                                           t));
677   }
678   for (auto& t : threads) {
679     DSched::join(t);
680   }
681   EXPECT_TRUE(cq.isEmpty());
682   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
683
684   return nowMicro() - beginMicro;
685 }
686
687 TEST(MPMCQueue, mt_never_fail_until_system) {
688   int nts[] = {1, 3, 100};
689
690   int n = 100000;
691   for (int nt : nts) {
692     uint64_t elapsed =
693         runNeverFailTest<std::chrono::system_clock, std::atomic>(nt, n);
694     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
695               << " threads";
696   }
697 }
698
699 TEST(MPMCQueue, mt_never_fail_until_steady) {
700   int nts[] = {1, 3, 100};
701
702   int n = 100000;
703   for (int nt : nts) {
704     uint64_t elapsed =
705         runNeverFailTest<std::chrono::steady_clock, std::atomic>(nt, n);
706     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
707               << " threads";
708   }
709 }
710
711 enum LifecycleEvent {
712   NOTHING = -1,
713   DEFAULT_CONSTRUCTOR,
714   COPY_CONSTRUCTOR,
715   MOVE_CONSTRUCTOR,
716   TWO_ARG_CONSTRUCTOR,
717   COPY_OPERATOR,
718   MOVE_OPERATOR,
719   DESTRUCTOR,
720   MAX_LIFECYCLE_EVENT
721 };
722
723 static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT];
724 static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT];
725
726 static int lc_outstanding() {
727   return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[COPY_CONSTRUCTOR] +
728       lc_counts[MOVE_CONSTRUCTOR] + lc_counts[TWO_ARG_CONSTRUCTOR] -
729       lc_counts[DESTRUCTOR];
730 }
731
732 static void lc_snap() {
733   for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
734     lc_prev[i] = lc_counts[i];
735   }
736 }
737
738 #define LIFECYCLE_STEP(...) lc_step(__LINE__, __VA_ARGS__)
739
740 static void lc_step(int lineno, int what = NOTHING, int what2 = NOTHING) {
741   for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
742     int delta = i == what || i == what2 ? 1 : 0;
743     EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
744         << "lc_counts[" << i << "] - lc_prev[" << i << "] was "
745         << (lc_counts[i] - lc_prev[i]) << ", expected " << delta
746         << ", from line " << lineno;
747   }
748   lc_snap();
749 }
750
751 template <typename R>
752 struct Lifecycle {
753   typedef R IsRelocatable;
754
755   bool constructed;
756
757   Lifecycle() noexcept : constructed(true) {
758     ++lc_counts[DEFAULT_CONSTRUCTOR];
759   }
760
761   explicit Lifecycle(int /* n */, char const* /* s */) noexcept
762       : constructed(true) {
763     ++lc_counts[TWO_ARG_CONSTRUCTOR];
764   }
765
766   Lifecycle(const Lifecycle& /* rhs */) noexcept : constructed(true) {
767     ++lc_counts[COPY_CONSTRUCTOR];
768   }
769
770   Lifecycle(Lifecycle&& /* rhs */) noexcept : constructed(true) {
771     ++lc_counts[MOVE_CONSTRUCTOR];
772   }
773
774   Lifecycle& operator=(const Lifecycle& /* rhs */) noexcept {
775     ++lc_counts[COPY_OPERATOR];
776     return *this;
777   }
778
779   Lifecycle& operator=(Lifecycle&& /* rhs */) noexcept {
780     ++lc_counts[MOVE_OPERATOR];
781     return *this;
782   }
783
784   ~Lifecycle() noexcept {
785     ++lc_counts[DESTRUCTOR];
786     assert(lc_outstanding() >= 0);
787     assert(constructed);
788     constructed = false;
789   }
790 };
791
792 template <typename R>
793 void runPerfectForwardingTest() {
794   lc_snap();
795   EXPECT_EQ(lc_outstanding(), 0);
796
797   {
798     MPMCQueue<Lifecycle<R>> queue(50);
799     LIFECYCLE_STEP(NOTHING);
800
801     for (int pass = 0; pass < 10; ++pass) {
802       for (int i = 0; i < 10; ++i) {
803         queue.blockingWrite();
804         LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
805
806         queue.blockingWrite(1, "one");
807         LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
808
809         {
810           Lifecycle<R> src;
811           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
812           queue.blockingWrite(std::move(src));
813           LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
814         }
815         LIFECYCLE_STEP(DESTRUCTOR);
816
817         {
818           Lifecycle<R> src;
819           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
820           queue.blockingWrite(src);
821           LIFECYCLE_STEP(COPY_CONSTRUCTOR);
822         }
823         LIFECYCLE_STEP(DESTRUCTOR);
824
825         EXPECT_TRUE(queue.write());
826         LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
827       }
828
829       EXPECT_EQ(queue.size(), 50);
830       EXPECT_FALSE(queue.write(2, "two"));
831       LIFECYCLE_STEP(NOTHING);
832
833       for (int i = 0; i < 50; ++i) {
834         {
835           Lifecycle<R> node;
836           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
837
838           queue.blockingRead(node);
839           if (R::value) {
840             // relocatable, moved via memcpy
841             LIFECYCLE_STEP(DESTRUCTOR);
842           } else {
843             LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
844           }
845         }
846         LIFECYCLE_STEP(DESTRUCTOR);
847       }
848
849       EXPECT_EQ(queue.size(), 0);
850     }
851
852     // put one element back before destruction
853     {
854       Lifecycle<R> src(3, "three");
855       LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
856       queue.write(std::move(src));
857       LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
858     }
859     LIFECYCLE_STEP(DESTRUCTOR); // destroy src
860   }
861   LIFECYCLE_STEP(DESTRUCTOR); // destroy queue
862
863   EXPECT_EQ(lc_outstanding(), 0);
864 }
865
866 TEST(MPMCQueue, perfect_forwarding) {
867   runPerfectForwardingTest<std::false_type>();
868 }
869
870 TEST(MPMCQueue, perfect_forwarding_relocatable) {
871   runPerfectForwardingTest<std::true_type>();
872 }
873
874 TEST(MPMCQueue, queue_moving) {
875   lc_snap();
876   EXPECT_EQ(lc_outstanding(), 0);
877
878   {
879     MPMCQueue<Lifecycle<std::false_type>> a(50);
880     LIFECYCLE_STEP(NOTHING);
881
882     a.blockingWrite();
883     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
884
885     // move constructor
886     MPMCQueue<Lifecycle<std::false_type>> b = std::move(a);
887     LIFECYCLE_STEP(NOTHING);
888     EXPECT_EQ(a.capacity(), 0);
889     EXPECT_EQ(a.size(), 0);
890     EXPECT_EQ(b.capacity(), 50);
891     EXPECT_EQ(b.size(), 1);
892
893     b.blockingWrite();
894     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
895
896     // move operator
897     MPMCQueue<Lifecycle<std::false_type>> c;
898     LIFECYCLE_STEP(NOTHING);
899     c = std::move(b);
900     LIFECYCLE_STEP(NOTHING);
901     EXPECT_EQ(c.capacity(), 50);
902     EXPECT_EQ(c.size(), 2);
903
904     {
905       Lifecycle<std::false_type> dst;
906       LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
907       c.blockingRead(dst);
908       LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
909
910       {
911         // swap
912         MPMCQueue<Lifecycle<std::false_type>> d(10);
913         LIFECYCLE_STEP(NOTHING);
914         std::swap(c, d);
915         LIFECYCLE_STEP(NOTHING);
916         EXPECT_EQ(c.capacity(), 10);
917         EXPECT_TRUE(c.isEmpty());
918         EXPECT_EQ(d.capacity(), 50);
919         EXPECT_EQ(d.size(), 1);
920
921         d.blockingRead(dst);
922         LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
923
924         c.blockingWrite(dst);
925         LIFECYCLE_STEP(COPY_CONSTRUCTOR);
926
927         d.blockingWrite(std::move(dst));
928         LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
929       } // d goes out of scope
930       LIFECYCLE_STEP(DESTRUCTOR);
931     } // dst goes out of scope
932     LIFECYCLE_STEP(DESTRUCTOR);
933   } // c goes out of scope
934   LIFECYCLE_STEP(DESTRUCTOR);
935 }
936
937 TEST(MPMCQueue, explicit_zero_capacity_fail) {
938   ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);
939 }