Remove extra `int main`s from unit tests.
[folly.git] / folly / test / MPMCQueueTest.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/MPMCQueue.h>
18 #include <folly/Format.h>
19 #include <folly/Memory.h>
20 #include <folly/test/DeterministicSchedule.h>
21
22 #include <boost/intrusive_ptr.hpp>
23 #include <memory>
24 #include <functional>
25 #include <thread>
26 #include <utility>
27 #include <unistd.h>
28 #include <sys/time.h>
29 #include <sys/resource.h>
30
31 #include <gtest/gtest.h>
32
33 FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
34
35 using namespace folly;
36 using namespace detail;
37 using namespace test;
38
39 typedef DeterministicSchedule DSched;
40
41 template <template<typename> class Atom>
42 void run_mt_sequencer_thread(
43     int numThreads,
44     int numOps,
45     uint32_t init,
46     TurnSequencer<Atom>& seq,
47     Atom<uint32_t>& spinThreshold,
48     int& prev,
49     int i) {
50   for (int op = i; op < numOps; op += numThreads) {
51     seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
52     EXPECT_EQ(prev, op - 1);
53     prev = op;
54     seq.completeTurn(init + op);
55   }
56 }
57
58 template <template<typename> class Atom>
59 void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
60   TurnSequencer<Atom> seq(init);
61   Atom<uint32_t> spinThreshold(0);
62
63   int prev = -1;
64   std::vector<std::thread> threads(numThreads);
65   for (int i = 0; i < numThreads; ++i) {
66     threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
67           numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold),
68           std::ref(prev), i));
69   }
70
71   for (auto& thr : threads) {
72     DSched::join(thr);
73   }
74
75   EXPECT_EQ(prev, numOps - 1);
76 }
77
78 TEST(MPMCQueue, sequencer) {
79   run_mt_sequencer_test<std::atomic>(1, 100, 0);
80   run_mt_sequencer_test<std::atomic>(2, 100000, -100);
81   run_mt_sequencer_test<std::atomic>(100, 10000, -100);
82 }
83
84 TEST(MPMCQueue, sequencer_emulated_futex) {
85   run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
86   run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
87   run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
88 }
89
90 TEST(MPMCQueue, sequencer_deterministic) {
91   DSched sched(DSched::uniform(0));
92   run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
93   run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
94   run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
95 }
96
97 template <typename T>
98 void runElementTypeTest(T&& src) {
99   MPMCQueue<T> cq(10);
100   cq.blockingWrite(std::move(src));
101   T dest;
102   cq.blockingRead(dest);
103   EXPECT_TRUE(cq.write(std::move(dest)));
104   EXPECT_TRUE(cq.read(dest));
105 }
106
107 struct RefCounted {
108   static __thread int active_instances;
109
110   mutable std::atomic<int> rc;
111
112   RefCounted() : rc(0) {
113     ++active_instances;
114   }
115
116   ~RefCounted() {
117     --active_instances;
118   }
119 };
120 __thread int RefCounted::active_instances;
121
122
123 void intrusive_ptr_add_ref(RefCounted const* p) {
124   p->rc++;
125 }
126
127 void intrusive_ptr_release(RefCounted const* p) {
128   if (--(p->rc) == 0) {
129     delete p;
130   }
131 }
132
133 TEST(MPMCQueue, lots_of_element_types) {
134   runElementTypeTest(10);
135   runElementTypeTest(std::string("abc"));
136   runElementTypeTest(std::make_pair(10, std::string("def")));
137   runElementTypeTest(std::vector<std::string>{ { "abc" } });
138   runElementTypeTest(std::make_shared<char>('a'));
139   runElementTypeTest(folly::make_unique<char>('a'));
140   runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
141   EXPECT_EQ(RefCounted::active_instances, 0);
142 }
143
144 TEST(MPMCQueue, single_thread_enqdeq) {
145   MPMCQueue<int> cq(10);
146
147   for (int pass = 0; pass < 10; ++pass) {
148     for (int i = 0; i < 10; ++i) {
149       EXPECT_TRUE(cq.write(i));
150     }
151     EXPECT_FALSE(cq.write(-1));
152     EXPECT_FALSE(cq.isEmpty());
153     EXPECT_EQ(cq.size(), 10);
154
155     for (int i = 0; i < 5; ++i) {
156       int dest = -1;
157       EXPECT_TRUE(cq.read(dest));
158       EXPECT_EQ(dest, i);
159     }
160     for (int i = 5; i < 10; ++i) {
161       int dest = -1;
162       cq.blockingRead(dest);
163       EXPECT_EQ(dest, i);
164     }
165     int dest = -1;
166     EXPECT_FALSE(cq.read(dest));
167     EXPECT_EQ(dest, -1);
168
169     EXPECT_TRUE(cq.isEmpty());
170     EXPECT_EQ(cq.size(), 0);
171   }
172 }
173
174 TEST(MPMCQueue, tryenq_capacity_test) {
175   for (size_t cap = 1; cap < 100; ++cap) {
176     MPMCQueue<int> cq(cap);
177     for (size_t i = 0; i < cap; ++i) {
178       EXPECT_TRUE(cq.write(i));
179     }
180     EXPECT_FALSE(cq.write(100));
181   }
182 }
183
184 TEST(MPMCQueue, enq_capacity_test) {
185   for (auto cap : { 1, 100, 10000 }) {
186     MPMCQueue<int> cq(cap);
187     for (int i = 0; i < cap; ++i) {
188       cq.blockingWrite(i);
189     }
190     int t = 0;
191     int when;
192     auto thr = std::thread([&]{
193       cq.blockingWrite(100);
194       when = t;
195     });
196     usleep(2000);
197     t = 1;
198     int dummy;
199     cq.blockingRead(dummy);
200     thr.join();
201     EXPECT_EQ(when, 1);
202   }
203 }
204
205 template <template<typename> class Atom>
206 void runTryEnqDeqThread(
207     int numThreads,
208     int n, /*numOps*/
209     MPMCQueue<int, Atom>& cq,
210     std::atomic<uint64_t>& sum,
211     int t) {
212   uint64_t threadSum = 0;
213   int src = t;
214   // received doesn't reflect any actual values, we just start with
215   // t and increment by numThreads to get the rounding of termination
216   // correct if numThreads doesn't evenly divide numOps
217   int received = t;
218   while (src < n || received < n) {
219     if (src < n && cq.write(src)) {
220       src += numThreads;
221     }
222
223     int dst;
224     if (received < n && cq.read(dst)) {
225       received += numThreads;
226       threadSum += dst;
227     }
228   }
229   sum += threadSum;
230 }
231
232 template <template<typename> class Atom>
233 void runTryEnqDeqTest(int numThreads, int numOps) {
234   // write and read aren't linearizable, so we don't have
235   // hard guarantees on their individual behavior.  We can still test
236   // correctness in aggregate
237   MPMCQueue<int,Atom> cq(numThreads);
238
239   uint64_t n = numOps;
240   std::vector<std::thread> threads(numThreads);
241   std::atomic<uint64_t> sum(0);
242   for (int t = 0; t < numThreads; ++t) {
243     threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom>,
244           numThreads, n, std::ref(cq), std::ref(sum), t));
245   }
246   for (auto& t : threads) {
247     DSched::join(t);
248   }
249   EXPECT_TRUE(cq.isEmpty());
250   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
251 }
252
253 TEST(MPMCQueue, mt_try_enq_deq) {
254   int nts[] = { 1, 3, 100 };
255
256   int n = 100000;
257   for (int nt : nts) {
258     runTryEnqDeqTest<std::atomic>(nt, n);
259   }
260 }
261
262 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
263   int nts[] = { 1, 3, 100 };
264
265   int n = 100000;
266   for (int nt : nts) {
267     runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
268   }
269 }
270
271 TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
272   int nts[] = { 3, 10 };
273
274   long seed = 0;
275   LOG(INFO) << "using seed " << seed;
276
277   int n = 1000;
278   for (int nt : nts) {
279     {
280       DSched sched(DSched::uniform(seed));
281       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
282     }
283     {
284       DSched sched(DSched::uniformSubset(seed, 2));
285       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
286     }
287   }
288 }
289
290 uint64_t nowMicro() {
291   timeval tv;
292   gettimeofday(&tv, 0);
293   return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
294 }
295
296 template <typename Q>
297 std::string producerConsumerBench(Q&& queue, std::string qName,
298                                   int numProducers, int numConsumers,
299                                   int numOps, bool ignoreContents = false) {
300   Q& q = queue;
301
302   struct rusage beginUsage;
303   getrusage(RUSAGE_SELF, &beginUsage);
304
305   auto beginMicro = nowMicro();
306
307   uint64_t n = numOps;
308   std::atomic<uint64_t> sum(0);
309
310   std::vector<std::thread> producers(numProducers);
311   for (int t = 0; t < numProducers; ++t) {
312     producers[t] = DSched::thread([&,t]{
313       for (int i = t; i < numOps; i += numProducers) {
314         q.blockingWrite(i);
315       }
316     });
317   }
318
319   std::vector<std::thread> consumers(numConsumers);
320   for (int t = 0; t < numConsumers; ++t) {
321     consumers[t] = DSched::thread([&,t]{
322       uint64_t localSum = 0;
323       for (int i = t; i < numOps; i += numConsumers) {
324         int dest = -1;
325         q.blockingRead(dest);
326         EXPECT_FALSE(dest == -1);
327         localSum += dest;
328       }
329       sum += localSum;
330     });
331   }
332
333   for (auto& t : producers) {
334     DSched::join(t);
335   }
336   for (auto& t : consumers) {
337     DSched::join(t);
338   }
339   if (!ignoreContents) {
340     EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
341   }
342
343   auto endMicro = nowMicro();
344
345   struct rusage endUsage;
346   getrusage(RUSAGE_SELF, &endUsage);
347
348   uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
349   long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
350       (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
351
352   return folly::format(
353       "{}, {} producers, {} consumers => {} nanos/handoff, {} csw / {} handoff",
354       qName, numProducers, numConsumers, nanosPer, csw, n).str();
355 }
356
357
358 TEST(MPMCQueue, mt_prod_cons_deterministic) {
359   // we use the Bench method, but perf results are meaningless under DSched
360   DSched sched(DSched::uniform(0));
361
362   producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(10),
363           "", 1, 1, 1000);
364   producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(100),
365           "", 10, 10, 1000);
366   producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(10),
367           "", 1, 1, 1000);
368   producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(100),
369           "", 10, 10, 1000);
370   producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(1),
371           "", 10, 10, 1000);
372 }
373
374 #define PC_BENCH(q, np, nc, ...) \
375     producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
376
377 TEST(MPMCQueue, mt_prod_cons) {
378   int n = 100000;
379   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 1, n);
380   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 1, n);
381   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 10, n);
382   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 10, n);
383   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 1, n);
384   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 1, n);
385   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 10, n);
386   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 10, n);
387   LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n);
388 }
389
390 TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
391   int n = 100000;
392   LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 1, 1, n);
393   LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 10, 1, n);
394   LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 1, 10, n);
395   LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 10, 10, n);
396   LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 1, 1, n);
397   LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 10, 1, n);
398   LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 1, 10, n);
399   LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 10, 10, n);
400   LOG(INFO)
401     << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(100000)), 32, 100, n);
402 }
403
404 template <template<typename> class Atom>
405 void runNeverFailThread(
406     int numThreads,
407     int n, /*numOps*/
408     MPMCQueue<int, Atom>& cq,
409     std::atomic<uint64_t>& sum,
410     int t) {
411   uint64_t threadSum = 0;
412   for (int i = t; i < n; i += numThreads) {
413     // enq + deq
414     EXPECT_TRUE(cq.writeIfNotFull(i));
415
416     int dest = -1;
417     EXPECT_TRUE(cq.readIfNotEmpty(dest));
418     EXPECT_TRUE(dest >= 0);
419     threadSum += dest;
420   }
421   sum += threadSum;
422 }
423
424 template <template<typename> class Atom>
425 uint64_t runNeverFailTest(int numThreads, int numOps) {
426   // always #enq >= #deq
427   MPMCQueue<int,Atom> cq(numThreads);
428
429   uint64_t n = numOps;
430   auto beginMicro = nowMicro();
431
432   std::vector<std::thread> threads(numThreads);
433   std::atomic<uint64_t> sum(0);
434   for (int t = 0; t < numThreads; ++t) {
435     threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom>,
436           numThreads, n, std::ref(cq), std::ref(sum), t));
437   }
438   for (auto& t : threads) {
439     DSched::join(t);
440   }
441   EXPECT_TRUE(cq.isEmpty());
442   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
443
444   return nowMicro() - beginMicro;
445 }
446
447 TEST(MPMCQueue, mt_never_fail) {
448   int nts[] = { 1, 3, 100 };
449
450   int n = 100000;
451   for (int nt : nts) {
452     uint64_t elapsed = runNeverFailTest<std::atomic>(nt, n);
453     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with "
454               << nt << " threads";
455   }
456 }
457
458 TEST(MPMCQueue, mt_never_fail_emulated_futex) {
459   int nts[] = { 1, 3, 100 };
460
461   int n = 100000;
462   for (int nt : nts) {
463     uint64_t elapsed = runNeverFailTest<EmulatedFutexAtomic>(nt, n);
464     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with "
465               << nt << " threads";
466   }
467 }
468
469 TEST(MPMCQueue, mt_never_fail_deterministic) {
470   int nts[] = { 3, 10 };
471
472   long seed = 0; // nowMicro() % 10000;
473   LOG(INFO) << "using seed " << seed;
474
475   int n = 1000;
476   for (int nt : nts) {
477     {
478       DSched sched(DSched::uniform(seed));
479       runNeverFailTest<DeterministicAtomic>(nt, n);
480     }
481     {
482       DSched sched(DSched::uniformSubset(seed, 2));
483       runNeverFailTest<DeterministicAtomic>(nt, n);
484     }
485   }
486 }
487
488 enum LifecycleEvent {
489   NOTHING = -1,
490   DEFAULT_CONSTRUCTOR,
491   COPY_CONSTRUCTOR,
492   MOVE_CONSTRUCTOR,
493   TWO_ARG_CONSTRUCTOR,
494   COPY_OPERATOR,
495   MOVE_OPERATOR,
496   DESTRUCTOR,
497   MAX_LIFECYCLE_EVENT
498 };
499
500 static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT];
501 static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT];
502
503 static int lc_outstanding() {
504   return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[COPY_CONSTRUCTOR] +
505       lc_counts[MOVE_CONSTRUCTOR] + lc_counts[TWO_ARG_CONSTRUCTOR] -
506       lc_counts[DESTRUCTOR];
507 }
508
509 static void lc_snap() {
510   for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
511     lc_prev[i] = lc_counts[i];
512   }
513 }
514
515 #define LIFECYCLE_STEP(...) lc_step(__LINE__, __VA_ARGS__)
516
517 static void lc_step(int lineno, int what = NOTHING, int what2 = NOTHING) {
518   for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
519     int delta = i == what || i == what2 ? 1 : 0;
520     EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
521         << "lc_counts[" << i << "] - lc_prev[" << i << "] was "
522         << (lc_counts[i] - lc_prev[i]) << ", expected " << delta
523         << ", from line " << lineno;
524   }
525   lc_snap();
526 }
527
528 template <typename R>
529 struct Lifecycle {
530   typedef R IsRelocatable;
531
532   bool constructed;
533
534   Lifecycle() noexcept : constructed(true) {
535     ++lc_counts[DEFAULT_CONSTRUCTOR];
536   }
537
538   explicit Lifecycle(int /* n */, char const* /* s */) noexcept
539       : constructed(true) {
540     ++lc_counts[TWO_ARG_CONSTRUCTOR];
541   }
542
543   Lifecycle(const Lifecycle& /* rhs */) noexcept : constructed(true) {
544     ++lc_counts[COPY_CONSTRUCTOR];
545   }
546
547   Lifecycle(Lifecycle&& /* rhs */) noexcept : constructed(true) {
548     ++lc_counts[MOVE_CONSTRUCTOR];
549   }
550
551   Lifecycle& operator=(const Lifecycle& /* rhs */) noexcept {
552     ++lc_counts[COPY_OPERATOR];
553     return *this;
554   }
555
556   Lifecycle& operator=(Lifecycle&& /* rhs */) noexcept {
557     ++lc_counts[MOVE_OPERATOR];
558     return *this;
559   }
560
561   ~Lifecycle() noexcept {
562     ++lc_counts[DESTRUCTOR];
563     assert(lc_outstanding() >= 0);
564     assert(constructed);
565     constructed = false;
566   }
567 };
568
569 template <typename R>
570 void runPerfectForwardingTest() {
571   lc_snap();
572   EXPECT_EQ(lc_outstanding(), 0);
573
574   {
575     MPMCQueue<Lifecycle<R>> queue(50);
576     LIFECYCLE_STEP(NOTHING);
577
578     for (int pass = 0; pass < 10; ++pass) {
579       for (int i = 0; i < 10; ++i) {
580         queue.blockingWrite();
581         LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
582
583         queue.blockingWrite(1, "one");
584         LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
585
586         {
587           Lifecycle<R> src;
588           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
589           queue.blockingWrite(std::move(src));
590           LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
591         }
592         LIFECYCLE_STEP(DESTRUCTOR);
593
594         {
595           Lifecycle<R> src;
596           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
597           queue.blockingWrite(src);
598           LIFECYCLE_STEP(COPY_CONSTRUCTOR);
599         }
600         LIFECYCLE_STEP(DESTRUCTOR);
601
602         EXPECT_TRUE(queue.write());
603         LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
604       }
605
606       EXPECT_EQ(queue.size(), 50);
607       EXPECT_FALSE(queue.write(2, "two"));
608       LIFECYCLE_STEP(NOTHING);
609
610       for (int i = 0; i < 50; ++i) {
611         {
612           Lifecycle<R> node;
613           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
614
615           queue.blockingRead(node);
616           if (R::value) {
617             // relocatable, moved via memcpy
618             LIFECYCLE_STEP(DESTRUCTOR);
619           } else {
620             LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
621           }
622         }
623         LIFECYCLE_STEP(DESTRUCTOR);
624       }
625
626       EXPECT_EQ(queue.size(), 0);
627     }
628
629     // put one element back before destruction
630     {
631       Lifecycle<R> src(3, "three");
632       LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
633       queue.write(std::move(src));
634       LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
635     }
636     LIFECYCLE_STEP(DESTRUCTOR); // destroy src
637   }
638   LIFECYCLE_STEP(DESTRUCTOR); // destroy queue
639
640   EXPECT_EQ(lc_outstanding(), 0);
641 }
642
643 TEST(MPMCQueue, perfect_forwarding) {
644   runPerfectForwardingTest<std::false_type>();
645 }
646
647 TEST(MPMCQueue, perfect_forwarding_relocatable) {
648   runPerfectForwardingTest<std::true_type>();
649 }
650
651 TEST(MPMCQueue, queue_moving) {
652   lc_snap();
653   EXPECT_EQ(lc_outstanding(), 0);
654
655   {
656     MPMCQueue<Lifecycle<std::false_type>> a(50);
657     LIFECYCLE_STEP(NOTHING);
658
659     a.blockingWrite();
660     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
661
662     // move constructor
663     MPMCQueue<Lifecycle<std::false_type>> b = std::move(a);
664     LIFECYCLE_STEP(NOTHING);
665     EXPECT_EQ(a.capacity(), 0);
666     EXPECT_EQ(a.size(), 0);
667     EXPECT_EQ(b.capacity(), 50);
668     EXPECT_EQ(b.size(), 1);
669
670     b.blockingWrite();
671     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
672
673     // move operator
674     MPMCQueue<Lifecycle<std::false_type>> c;
675     LIFECYCLE_STEP(NOTHING);
676     c = std::move(b);
677     LIFECYCLE_STEP(NOTHING);
678     EXPECT_EQ(c.capacity(), 50);
679     EXPECT_EQ(c.size(), 2);
680
681     {
682       Lifecycle<std::false_type> dst;
683       LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
684       c.blockingRead(dst);
685       LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
686
687       {
688         // swap
689         MPMCQueue<Lifecycle<std::false_type>> d(10);
690         LIFECYCLE_STEP(NOTHING);
691         std::swap(c, d);
692         LIFECYCLE_STEP(NOTHING);
693         EXPECT_EQ(c.capacity(), 10);
694         EXPECT_TRUE(c.isEmpty());
695         EXPECT_EQ(d.capacity(), 50);
696         EXPECT_EQ(d.size(), 1);
697
698         d.blockingRead(dst);
699         LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
700
701         c.blockingWrite(dst);
702         LIFECYCLE_STEP(COPY_CONSTRUCTOR);
703
704         d.blockingWrite(std::move(dst));
705         LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
706       } // d goes out of scope
707       LIFECYCLE_STEP(DESTRUCTOR);
708     } // dst goes out of scope
709     LIFECYCLE_STEP(DESTRUCTOR);
710   } // c goes out of scope
711   LIFECYCLE_STEP(DESTRUCTOR);
712 }
713
714 TEST(MPMCQueue, explicit_zero_capacity_fail) {
715   ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);
716 }