Use FOLLY_TLS rather than __thread
[folly.git] / folly / test / MPMCQueueTest.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/MPMCQueue.h>
18 #include <folly/Format.h>
19 #include <folly/Memory.h>
20 #include <folly/portability/SysResource.h>
21 #include <folly/portability/SysTime.h>
22 #include <folly/portability/Unistd.h>
23 #include <folly/test/DeterministicSchedule.h>
24
25 #include <boost/intrusive_ptr.hpp>
26 #include <memory>
27 #include <functional>
28 #include <thread>
29 #include <utility>
30
31 #include <gtest/gtest.h>
32
33 FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
34
35 using namespace folly;
36 using namespace detail;
37 using namespace test;
38 using std::chrono::time_point;
39 using std::chrono::steady_clock;
40 using std::chrono::seconds;
41 using std::chrono::milliseconds;
42 using std::string;
43 using std::unique_ptr;
44 using std::vector;
45
46 typedef DeterministicSchedule DSched;
47
48 template <template<typename> class Atom>
49 void run_mt_sequencer_thread(
50     int numThreads,
51     int numOps,
52     uint32_t init,
53     TurnSequencer<Atom>& seq,
54     Atom<uint32_t>& spinThreshold,
55     int& prev,
56     int i) {
57   for (int op = i; op < numOps; op += numThreads) {
58     seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
59     EXPECT_EQ(prev, op - 1);
60     prev = op;
61     seq.completeTurn(init + op);
62   }
63 }
64
65 template <template<typename> class Atom>
66 void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
67   TurnSequencer<Atom> seq(init);
68   Atom<uint32_t> spinThreshold(0);
69
70   int prev = -1;
71   vector<std::thread> threads(numThreads);
72   for (int i = 0; i < numThreads; ++i) {
73     threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
74           numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold),
75           std::ref(prev), i));
76   }
77
78   for (auto& thr : threads) {
79     DSched::join(thr);
80   }
81
82   EXPECT_EQ(prev, numOps - 1);
83 }
84
85 TEST(MPMCQueue, sequencer) {
86   run_mt_sequencer_test<std::atomic>(1, 100, 0);
87   run_mt_sequencer_test<std::atomic>(2, 100000, -100);
88   run_mt_sequencer_test<std::atomic>(100, 10000, -100);
89 }
90
91 TEST(MPMCQueue, sequencer_emulated_futex) {
92   run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
93   run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
94   run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
95 }
96
97 TEST(MPMCQueue, sequencer_deterministic) {
98   DSched sched(DSched::uniform(0));
99   run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
100   run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
101   run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
102 }
103
104 template <typename T>
105 void runElementTypeTest(T&& src) {
106   MPMCQueue<T> cq(10);
107   cq.blockingWrite(std::forward<T>(src));
108   T dest;
109   cq.blockingRead(dest);
110   EXPECT_TRUE(cq.write(std::move(dest)));
111   EXPECT_TRUE(cq.read(dest));
112   auto soon1 = std::chrono::system_clock::now() + std::chrono::seconds(1);
113   EXPECT_TRUE(cq.tryWriteUntil(soon1, std::move(dest)));
114   EXPECT_TRUE(cq.read(dest));
115   auto soon2 = std::chrono::steady_clock::now() + std::chrono::seconds(1);
116   EXPECT_TRUE(cq.tryWriteUntil(soon2, std::move(dest)));
117   EXPECT_TRUE(cq.read(dest));
118 }
119
120 struct RefCounted {
121   static FOLLY_TLS int active_instances;
122
123   mutable std::atomic<int> rc;
124
125   RefCounted() : rc(0) {
126     ++active_instances;
127   }
128
129   ~RefCounted() {
130     --active_instances;
131   }
132 };
133 FOLLY_TLS int RefCounted::active_instances;
134
135 void intrusive_ptr_add_ref(RefCounted const* p) {
136   p->rc++;
137 }
138
139 void intrusive_ptr_release(RefCounted const* p) {
140   if (--(p->rc) == 0) {
141     delete p;
142   }
143 }
144
145 TEST(MPMCQueue, lots_of_element_types) {
146   runElementTypeTest(10);
147   runElementTypeTest(string("abc"));
148   runElementTypeTest(std::make_pair(10, string("def")));
149   runElementTypeTest(vector<string>{{"abc"}});
150   runElementTypeTest(std::make_shared<char>('a'));
151   runElementTypeTest(folly::make_unique<char>('a'));
152   runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
153   EXPECT_EQ(RefCounted::active_instances, 0);
154 }
155
156 TEST(MPMCQueue, single_thread_enqdeq) {
157   MPMCQueue<int> cq(10);
158
159   for (int pass = 0; pass < 10; ++pass) {
160     for (int i = 0; i < 10; ++i) {
161       EXPECT_TRUE(cq.write(i));
162     }
163     EXPECT_FALSE(cq.write(-1));
164     EXPECT_FALSE(cq.isEmpty());
165     EXPECT_EQ(cq.size(), 10);
166
167     for (int i = 0; i < 5; ++i) {
168       int dest = -1;
169       EXPECT_TRUE(cq.read(dest));
170       EXPECT_EQ(dest, i);
171     }
172     for (int i = 5; i < 10; ++i) {
173       int dest = -1;
174       cq.blockingRead(dest);
175       EXPECT_EQ(dest, i);
176     }
177     int dest = -1;
178     EXPECT_FALSE(cq.read(dest));
179     EXPECT_EQ(dest, -1);
180
181     EXPECT_TRUE(cq.isEmpty());
182     EXPECT_EQ(cq.size(), 0);
183   }
184 }
185
186 TEST(MPMCQueue, tryenq_capacity_test) {
187   for (size_t cap = 1; cap < 100; ++cap) {
188     MPMCQueue<int> cq(cap);
189     for (size_t i = 0; i < cap; ++i) {
190       EXPECT_TRUE(cq.write(i));
191     }
192     EXPECT_FALSE(cq.write(100));
193   }
194 }
195
196 TEST(MPMCQueue, enq_capacity_test) {
197   for (auto cap : { 1, 100, 10000 }) {
198     MPMCQueue<int> cq(cap);
199     for (int i = 0; i < cap; ++i) {
200       cq.blockingWrite(i);
201     }
202     int t = 0;
203     int when;
204     auto thr = std::thread([&]{
205       cq.blockingWrite(100);
206       when = t;
207     });
208     usleep(2000);
209     t = 1;
210     int dummy;
211     cq.blockingRead(dummy);
212     thr.join();
213     EXPECT_EQ(when, 1);
214   }
215 }
216
217 template <template<typename> class Atom>
218 void runTryEnqDeqThread(
219     int numThreads,
220     int n, /*numOps*/
221     MPMCQueue<int, Atom>& cq,
222     std::atomic<uint64_t>& sum,
223     int t) {
224   uint64_t threadSum = 0;
225   int src = t;
226   // received doesn't reflect any actual values, we just start with
227   // t and increment by numThreads to get the rounding of termination
228   // correct if numThreads doesn't evenly divide numOps
229   int received = t;
230   while (src < n || received < n) {
231     if (src < n && cq.write(src)) {
232       src += numThreads;
233     }
234
235     int dst;
236     if (received < n && cq.read(dst)) {
237       received += numThreads;
238       threadSum += dst;
239     }
240   }
241   sum += threadSum;
242 }
243
244 template <template<typename> class Atom>
245 void runTryEnqDeqTest(int numThreads, int numOps) {
246   // write and read aren't linearizable, so we don't have
247   // hard guarantees on their individual behavior.  We can still test
248   // correctness in aggregate
249   MPMCQueue<int,Atom> cq(numThreads);
250
251   uint64_t n = numOps;
252   vector<std::thread> threads(numThreads);
253   std::atomic<uint64_t> sum(0);
254   for (int t = 0; t < numThreads; ++t) {
255     threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom>,
256           numThreads, n, std::ref(cq), std::ref(sum), t));
257   }
258   for (auto& t : threads) {
259     DSched::join(t);
260   }
261   EXPECT_TRUE(cq.isEmpty());
262   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
263 }
264
265 TEST(MPMCQueue, mt_try_enq_deq) {
266   int nts[] = { 1, 3, 100 };
267
268   int n = 100000;
269   for (int nt : nts) {
270     runTryEnqDeqTest<std::atomic>(nt, n);
271   }
272 }
273
274 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
275   int nts[] = { 1, 3, 100 };
276
277   int n = 100000;
278   for (int nt : nts) {
279     runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
280   }
281 }
282
283 TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
284   int nts[] = { 3, 10 };
285
286   long seed = 0;
287   LOG(INFO) << "using seed " << seed;
288
289   int n = 1000;
290   for (int nt : nts) {
291     {
292       DSched sched(DSched::uniform(seed));
293       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
294     }
295     {
296       DSched sched(DSched::uniformSubset(seed, 2));
297       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
298     }
299   }
300 }
301
302 uint64_t nowMicro() {
303   timeval tv;
304   gettimeofday(&tv, 0);
305   return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
306 }
307
308 template <typename Q>
309 struct WriteMethodCaller {
310   WriteMethodCaller() {}
311   virtual ~WriteMethodCaller() = default;
312   virtual bool callWrite(Q& q, int i) = 0;
313   virtual string methodName() = 0;
314 };
315
316 template <typename Q>
317 struct BlockingWriteCaller : public WriteMethodCaller<Q> {
318   bool callWrite(Q& q, int i) override {
319     q.blockingWrite(i);
320     return true;
321   }
322   string methodName() override { return "blockingWrite"; }
323 };
324
325 template <typename Q>
326 struct WriteIfNotFullCaller : public WriteMethodCaller<Q> {
327   bool callWrite(Q& q, int i) override { return q.writeIfNotFull(i); }
328   string methodName() override { return "writeIfNotFull"; }
329 };
330
331 template <typename Q>
332 struct WriteCaller : public WriteMethodCaller<Q> {
333   bool callWrite(Q& q, int i) override { return q.write(i); }
334   string methodName() override { return "write"; }
335 };
336
337 template <typename Q,
338           class Clock = steady_clock,
339           class Duration = typename Clock::duration>
340 struct TryWriteUntilCaller : public WriteMethodCaller<Q> {
341   const Duration duration_;
342   explicit TryWriteUntilCaller(Duration&& duration) : duration_(duration) {}
343   bool callWrite(Q& q, int i) override {
344     auto then = Clock::now() + duration_;
345     return q.tryWriteUntil(then, i);
346   }
347   string methodName() override {
348     return folly::sformat(
349         "tryWriteUntil({}ms)",
350         std::chrono::duration_cast<milliseconds>(duration_).count());
351   }
352 };
353
354 template <typename Q>
355 string producerConsumerBench(Q&& queue,
356                              string qName,
357                              int numProducers,
358                              int numConsumers,
359                              int numOps,
360                              WriteMethodCaller<Q>& writer,
361                              bool ignoreContents = false) {
362   Q& q = queue;
363
364   struct rusage beginUsage;
365   getrusage(RUSAGE_SELF, &beginUsage);
366
367   auto beginMicro = nowMicro();
368
369   uint64_t n = numOps;
370   std::atomic<uint64_t> sum(0);
371   std::atomic<uint64_t> failed(0);
372
373   vector<std::thread> producers(numProducers);
374   for (int t = 0; t < numProducers; ++t) {
375     producers[t] = DSched::thread([&,t]{
376       for (int i = t; i < numOps; i += numProducers) {
377         while (!writer.callWrite(q, i)) {
378           ++failed;
379         }
380       }
381     });
382   }
383
384   vector<std::thread> consumers(numConsumers);
385   for (int t = 0; t < numConsumers; ++t) {
386     consumers[t] = DSched::thread([&,t]{
387       uint64_t localSum = 0;
388       for (int i = t; i < numOps; i += numConsumers) {
389         int dest = -1;
390         q.blockingRead(dest);
391         EXPECT_FALSE(dest == -1);
392         localSum += dest;
393       }
394       sum += localSum;
395     });
396   }
397
398   for (auto& t : producers) {
399     DSched::join(t);
400   }
401   for (auto& t : consumers) {
402     DSched::join(t);
403   }
404   if (!ignoreContents) {
405     EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
406   }
407
408   auto endMicro = nowMicro();
409
410   struct rusage endUsage;
411   getrusage(RUSAGE_SELF, &endUsage);
412
413   uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
414   long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
415       (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
416   uint64_t failures = failed;
417
418   return folly::sformat(
419       "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
420       "handoff, {} failures",
421       qName,
422       numProducers,
423       writer.methodName(),
424       numConsumers,
425       nanosPer,
426       csw,
427       n,
428       failures);
429 }
430
431 TEST(MPMCQueue, mt_prod_cons_deterministic) {
432   // we use the Bench method, but perf results are meaningless under DSched
433   DSched sched(DSched::uniform(0));
434
435   vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic>>>>
436       callers;
437   callers.emplace_back(
438       make_unique<BlockingWriteCaller<MPMCQueue<int, DeterministicAtomic>>>());
439   callers.emplace_back(
440       make_unique<WriteIfNotFullCaller<MPMCQueue<int, DeterministicAtomic>>>());
441   callers.emplace_back(
442       make_unique<WriteCaller<MPMCQueue<int, DeterministicAtomic>>>());
443   callers.emplace_back(
444       make_unique<TryWriteUntilCaller<MPMCQueue<int, DeterministicAtomic>>>(
445           milliseconds(1)));
446   callers.emplace_back(
447       make_unique<TryWriteUntilCaller<MPMCQueue<int, DeterministicAtomic>>>(
448           seconds(2)));
449
450   for (const auto& caller : callers) {
451     LOG(INFO)
452         << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(10),
453                                  "MPMCQueue<int, DeterministicAtomic>(10)",
454                                  1,
455                                  1,
456                                  1000,
457                                  *caller);
458     LOG(INFO)
459         << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(100),
460                                  "MPMCQueue<int, DeterministicAtomic>(100)",
461                                  10,
462                                  10,
463                                  1000,
464                                  *caller);
465     LOG(INFO)
466         << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(10),
467                                  "MPMCQueue<int, DeterministicAtomic>(10)",
468                                  1,
469                                  1,
470                                  1000,
471                                  *caller);
472     LOG(INFO)
473         << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(100),
474                                  "MPMCQueue<int, DeterministicAtomic>(100)",
475                                  10,
476                                  10,
477                                  1000,
478                                  *caller);
479     LOG(INFO) << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(1),
480                                        "MPMCQueue<int, DeterministicAtomic>(1)",
481                                        10,
482                                        10,
483                                        1000,
484                                        *caller);
485   }
486 }
487
488 #define PC_BENCH(q, np, nc, ...) \
489     producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
490
491 TEST(MPMCQueue, mt_prod_cons) {
492   int n = 100000;
493   vector<unique_ptr<WriteMethodCaller<MPMCQueue<int>>>> callers;
494   callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int>>>());
495   callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int>>>());
496   callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int>>>());
497   callers.emplace_back(
498       make_unique<TryWriteUntilCaller<MPMCQueue<int>>>(milliseconds(1)));
499   callers.emplace_back(
500       make_unique<TryWriteUntilCaller<MPMCQueue<int>>>(seconds(2)));
501   for (const auto& caller : callers) {
502     LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 1, n, *caller);
503     LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 1, n, *caller);
504     LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 10, n, *caller);
505     LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 10, n, *caller);
506     LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 1, n, *caller);
507     LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 1, n, *caller);
508     LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 10, n, *caller);
509     LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 10, n, *caller);
510     LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n, *caller);
511   }
512 }
513
514 TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
515   int n = 100000;
516   vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, EmulatedFutexAtomic>>>>
517       callers;
518   callers.emplace_back(
519       make_unique<BlockingWriteCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
520   callers.emplace_back(
521       make_unique<WriteIfNotFullCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
522   callers.emplace_back(
523       make_unique<WriteCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
524   callers.emplace_back(
525       make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic>>>(
526           milliseconds(1)));
527   callers.emplace_back(
528       make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic>>>(
529           seconds(2)));
530   for (const auto& caller : callers) {
531     LOG(INFO) << PC_BENCH(
532         (MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 1, n, *caller);
533     LOG(INFO) << PC_BENCH(
534         (MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 1, n, *caller);
535     LOG(INFO) << PC_BENCH(
536         (MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 10, n, *caller);
537     LOG(INFO) << PC_BENCH(
538         (MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 10, n, *caller);
539     LOG(INFO) << PC_BENCH(
540         (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 1, n, *caller);
541     LOG(INFO) << PC_BENCH(
542         (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 1, n, *caller);
543     LOG(INFO) << PC_BENCH(
544         (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 10, n, *caller);
545     LOG(INFO) << PC_BENCH(
546         (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 10, n, *caller);
547     LOG(INFO) << PC_BENCH(
548         (MPMCQueue<int, EmulatedFutexAtomic>(100000)), 32, 100, n, *caller);
549   }
550 }
551
552 template <template <typename> class Atom>
553 void runNeverFailThread(int numThreads,
554                         int n, /*numOps*/
555                         MPMCQueue<int, Atom>& cq,
556                         std::atomic<uint64_t>& sum,
557                         int t) {
558   uint64_t threadSum = 0;
559   for (int i = t; i < n; i += numThreads) {
560     // enq + deq
561     EXPECT_TRUE(cq.writeIfNotFull(i));
562
563     int dest = -1;
564     EXPECT_TRUE(cq.readIfNotEmpty(dest));
565     EXPECT_TRUE(dest >= 0);
566     threadSum += dest;
567   }
568   sum += threadSum;
569 }
570
571 template <template <typename> class Atom>
572 uint64_t runNeverFailTest(int numThreads, int numOps) {
573   // always #enq >= #deq
574   MPMCQueue<int, Atom> cq(numThreads);
575
576   uint64_t n = numOps;
577   auto beginMicro = nowMicro();
578
579   vector<std::thread> threads(numThreads);
580   std::atomic<uint64_t> sum(0);
581   for (int t = 0; t < numThreads; ++t) {
582     threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom>,
583                                           numThreads,
584                                           n,
585                                           std::ref(cq),
586                                           std::ref(sum),
587                                           t));
588   }
589   for (auto& t : threads) {
590     DSched::join(t);
591   }
592   EXPECT_TRUE(cq.isEmpty());
593   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
594
595   return nowMicro() - beginMicro;
596 }
597
598 TEST(MPMCQueue, mt_never_fail) {
599   int nts[] = {1, 3, 100};
600
601   int n = 100000;
602   for (int nt : nts) {
603     uint64_t elapsed = runNeverFailTest<std::atomic>(nt, n);
604     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
605               << " threads";
606   }
607 }
608
609 TEST(MPMCQueue, mt_never_fail_emulated_futex) {
610   int nts[] = {1, 3, 100};
611
612   int n = 100000;
613   for (int nt : nts) {
614     uint64_t elapsed = runNeverFailTest<EmulatedFutexAtomic>(nt, n);
615     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
616               << " threads";
617   }
618 }
619
620 TEST(MPMCQueue, mt_never_fail_deterministic) {
621   int nts[] = {3, 10};
622
623   long seed = 0; // nowMicro() % 10000;
624   LOG(INFO) << "using seed " << seed;
625
626   int n = 1000;
627   for (int nt : nts) {
628     {
629       DSched sched(DSched::uniform(seed));
630       runNeverFailTest<DeterministicAtomic>(nt, n);
631     }
632     {
633       DSched sched(DSched::uniformSubset(seed, 2));
634       runNeverFailTest<DeterministicAtomic>(nt, n);
635     }
636   }
637 }
638
639 template <class Clock, template <typename> class Atom>
640 void runNeverFailUntilThread(int numThreads,
641                              int n, /*numOps*/
642                              MPMCQueue<int, Atom>& cq,
643                              std::atomic<uint64_t>& sum,
644                              int t) {
645   uint64_t threadSum = 0;
646   for (int i = t; i < n; i += numThreads) {
647     // enq + deq
648     auto soon = Clock::now() + std::chrono::seconds(1);
649     EXPECT_TRUE(cq.tryWriteUntil(soon, i));
650
651     int dest = -1;
652     EXPECT_TRUE(cq.readIfNotEmpty(dest));
653     EXPECT_TRUE(dest >= 0);
654     threadSum += dest;
655   }
656   sum += threadSum;
657 }
658
659 template <class Clock, template <typename> class Atom>
660 uint64_t runNeverFailTest(int numThreads, int numOps) {
661   // always #enq >= #deq
662   MPMCQueue<int, Atom> cq(numThreads);
663
664   uint64_t n = numOps;
665   auto beginMicro = nowMicro();
666
667   vector<std::thread> threads(numThreads);
668   std::atomic<uint64_t> sum(0);
669   for (int t = 0; t < numThreads; ++t) {
670     threads[t] = DSched::thread(std::bind(runNeverFailUntilThread<Clock, Atom>,
671                                           numThreads,
672                                           n,
673                                           std::ref(cq),
674                                           std::ref(sum),
675                                           t));
676   }
677   for (auto& t : threads) {
678     DSched::join(t);
679   }
680   EXPECT_TRUE(cq.isEmpty());
681   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
682
683   return nowMicro() - beginMicro;
684 }
685
686 TEST(MPMCQueue, mt_never_fail_until_system) {
687   int nts[] = {1, 3, 100};
688
689   int n = 100000;
690   for (int nt : nts) {
691     uint64_t elapsed =
692         runNeverFailTest<std::chrono::system_clock, std::atomic>(nt, n);
693     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
694               << " threads";
695   }
696 }
697
698 TEST(MPMCQueue, mt_never_fail_until_steady) {
699   int nts[] = {1, 3, 100};
700
701   int n = 100000;
702   for (int nt : nts) {
703     uint64_t elapsed =
704         runNeverFailTest<std::chrono::steady_clock, std::atomic>(nt, n);
705     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
706               << " threads";
707   }
708 }
709
710 enum LifecycleEvent {
711   NOTHING = -1,
712   DEFAULT_CONSTRUCTOR,
713   COPY_CONSTRUCTOR,
714   MOVE_CONSTRUCTOR,
715   TWO_ARG_CONSTRUCTOR,
716   COPY_OPERATOR,
717   MOVE_OPERATOR,
718   DESTRUCTOR,
719   MAX_LIFECYCLE_EVENT
720 };
721
722 static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT];
723 static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT];
724
725 static int lc_outstanding() {
726   return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[COPY_CONSTRUCTOR] +
727       lc_counts[MOVE_CONSTRUCTOR] + lc_counts[TWO_ARG_CONSTRUCTOR] -
728       lc_counts[DESTRUCTOR];
729 }
730
731 static void lc_snap() {
732   for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
733     lc_prev[i] = lc_counts[i];
734   }
735 }
736
737 #define LIFECYCLE_STEP(...) lc_step(__LINE__, __VA_ARGS__)
738
739 static void lc_step(int lineno, int what = NOTHING, int what2 = NOTHING) {
740   for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
741     int delta = i == what || i == what2 ? 1 : 0;
742     EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
743         << "lc_counts[" << i << "] - lc_prev[" << i << "] was "
744         << (lc_counts[i] - lc_prev[i]) << ", expected " << delta
745         << ", from line " << lineno;
746   }
747   lc_snap();
748 }
749
750 template <typename R>
751 struct Lifecycle {
752   typedef R IsRelocatable;
753
754   bool constructed;
755
756   Lifecycle() noexcept : constructed(true) {
757     ++lc_counts[DEFAULT_CONSTRUCTOR];
758   }
759
760   explicit Lifecycle(int /* n */, char const* /* s */) noexcept
761       : constructed(true) {
762     ++lc_counts[TWO_ARG_CONSTRUCTOR];
763   }
764
765   Lifecycle(const Lifecycle& /* rhs */) noexcept : constructed(true) {
766     ++lc_counts[COPY_CONSTRUCTOR];
767   }
768
769   Lifecycle(Lifecycle&& /* rhs */) noexcept : constructed(true) {
770     ++lc_counts[MOVE_CONSTRUCTOR];
771   }
772
773   Lifecycle& operator=(const Lifecycle& /* rhs */) noexcept {
774     ++lc_counts[COPY_OPERATOR];
775     return *this;
776   }
777
778   Lifecycle& operator=(Lifecycle&& /* rhs */) noexcept {
779     ++lc_counts[MOVE_OPERATOR];
780     return *this;
781   }
782
783   ~Lifecycle() noexcept {
784     ++lc_counts[DESTRUCTOR];
785     assert(lc_outstanding() >= 0);
786     assert(constructed);
787     constructed = false;
788   }
789 };
790
791 template <typename R>
792 void runPerfectForwardingTest() {
793   lc_snap();
794   EXPECT_EQ(lc_outstanding(), 0);
795
796   {
797     MPMCQueue<Lifecycle<R>> queue(50);
798     LIFECYCLE_STEP(NOTHING);
799
800     for (int pass = 0; pass < 10; ++pass) {
801       for (int i = 0; i < 10; ++i) {
802         queue.blockingWrite();
803         LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
804
805         queue.blockingWrite(1, "one");
806         LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
807
808         {
809           Lifecycle<R> src;
810           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
811           queue.blockingWrite(std::move(src));
812           LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
813         }
814         LIFECYCLE_STEP(DESTRUCTOR);
815
816         {
817           Lifecycle<R> src;
818           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
819           queue.blockingWrite(src);
820           LIFECYCLE_STEP(COPY_CONSTRUCTOR);
821         }
822         LIFECYCLE_STEP(DESTRUCTOR);
823
824         EXPECT_TRUE(queue.write());
825         LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
826       }
827
828       EXPECT_EQ(queue.size(), 50);
829       EXPECT_FALSE(queue.write(2, "two"));
830       LIFECYCLE_STEP(NOTHING);
831
832       for (int i = 0; i < 50; ++i) {
833         {
834           Lifecycle<R> node;
835           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
836
837           queue.blockingRead(node);
838           if (R::value) {
839             // relocatable, moved via memcpy
840             LIFECYCLE_STEP(DESTRUCTOR);
841           } else {
842             LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
843           }
844         }
845         LIFECYCLE_STEP(DESTRUCTOR);
846       }
847
848       EXPECT_EQ(queue.size(), 0);
849     }
850
851     // put one element back before destruction
852     {
853       Lifecycle<R> src(3, "three");
854       LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
855       queue.write(std::move(src));
856       LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
857     }
858     LIFECYCLE_STEP(DESTRUCTOR); // destroy src
859   }
860   LIFECYCLE_STEP(DESTRUCTOR); // destroy queue
861
862   EXPECT_EQ(lc_outstanding(), 0);
863 }
864
865 TEST(MPMCQueue, perfect_forwarding) {
866   runPerfectForwardingTest<std::false_type>();
867 }
868
869 TEST(MPMCQueue, perfect_forwarding_relocatable) {
870   runPerfectForwardingTest<std::true_type>();
871 }
872
873 TEST(MPMCQueue, queue_moving) {
874   lc_snap();
875   EXPECT_EQ(lc_outstanding(), 0);
876
877   {
878     MPMCQueue<Lifecycle<std::false_type>> a(50);
879     LIFECYCLE_STEP(NOTHING);
880
881     a.blockingWrite();
882     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
883
884     // move constructor
885     MPMCQueue<Lifecycle<std::false_type>> b = std::move(a);
886     LIFECYCLE_STEP(NOTHING);
887     EXPECT_EQ(a.capacity(), 0);
888     EXPECT_EQ(a.size(), 0);
889     EXPECT_EQ(b.capacity(), 50);
890     EXPECT_EQ(b.size(), 1);
891
892     b.blockingWrite();
893     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
894
895     // move operator
896     MPMCQueue<Lifecycle<std::false_type>> c;
897     LIFECYCLE_STEP(NOTHING);
898     c = std::move(b);
899     LIFECYCLE_STEP(NOTHING);
900     EXPECT_EQ(c.capacity(), 50);
901     EXPECT_EQ(c.size(), 2);
902
903     {
904       Lifecycle<std::false_type> dst;
905       LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
906       c.blockingRead(dst);
907       LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
908
909       {
910         // swap
911         MPMCQueue<Lifecycle<std::false_type>> d(10);
912         LIFECYCLE_STEP(NOTHING);
913         std::swap(c, d);
914         LIFECYCLE_STEP(NOTHING);
915         EXPECT_EQ(c.capacity(), 10);
916         EXPECT_TRUE(c.isEmpty());
917         EXPECT_EQ(d.capacity(), 50);
918         EXPECT_EQ(d.size(), 1);
919
920         d.blockingRead(dst);
921         LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
922
923         c.blockingWrite(dst);
924         LIFECYCLE_STEP(COPY_CONSTRUCTOR);
925
926         d.blockingWrite(std::move(dst));
927         LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
928       } // d goes out of scope
929       LIFECYCLE_STEP(DESTRUCTOR);
930     } // dst goes out of scope
931     LIFECYCLE_STEP(DESTRUCTOR);
932   } // c goes out of scope
933   LIFECYCLE_STEP(DESTRUCTOR);
934 }
935
936 TEST(MPMCQueue, explicit_zero_capacity_fail) {
937   ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);
938 }