folly copyright 2015 -> copyright 2016
[folly.git] / folly / test / MPMCQueueTest.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/MPMCQueue.h>
18 #include <folly/Format.h>
19 #include <folly/Memory.h>
20 #include <folly/test/DeterministicSchedule.h>
21
22 #include <boost/intrusive_ptr.hpp>
23 #include <memory>
24 #include <functional>
25 #include <thread>
26 #include <utility>
27 #include <unistd.h>
28 #include <sys/time.h>
29 #include <sys/resource.h>
30
31 #include <gflags/gflags.h>
32 #include <gtest/gtest.h>
33
34 FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
35
36 using namespace folly;
37 using namespace detail;
38 using namespace test;
39
40 typedef DeterministicSchedule DSched;
41
42 template <template<typename> class Atom>
43 void run_mt_sequencer_thread(
44     int numThreads,
45     int numOps,
46     uint32_t init,
47     TurnSequencer<Atom>& seq,
48     Atom<uint32_t>& spinThreshold,
49     int& prev,
50     int i) {
51   for (int op = i; op < numOps; op += numThreads) {
52     seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
53     EXPECT_EQ(prev, op - 1);
54     prev = op;
55     seq.completeTurn(init + op);
56   }
57 }
58
59 template <template<typename> class Atom>
60 void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
61   TurnSequencer<Atom> seq(init);
62   Atom<uint32_t> spinThreshold(0);
63
64   int prev = -1;
65   std::vector<std::thread> threads(numThreads);
66   for (int i = 0; i < numThreads; ++i) {
67     threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
68           numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold),
69           std::ref(prev), i));
70   }
71
72   for (auto& thr : threads) {
73     DSched::join(thr);
74   }
75
76   EXPECT_EQ(prev, numOps - 1);
77 }
78
79 TEST(MPMCQueue, sequencer) {
80   run_mt_sequencer_test<std::atomic>(1, 100, 0);
81   run_mt_sequencer_test<std::atomic>(2, 100000, -100);
82   run_mt_sequencer_test<std::atomic>(100, 10000, -100);
83 }
84
85 TEST(MPMCQueue, sequencer_emulated_futex) {
86   run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
87   run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
88   run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
89 }
90
91 TEST(MPMCQueue, sequencer_deterministic) {
92   DSched sched(DSched::uniform(0));
93   run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
94   run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
95   run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
96 }
97
98 template <typename T>
99 void runElementTypeTest(T&& src) {
100   MPMCQueue<T> cq(10);
101   cq.blockingWrite(std::move(src));
102   T dest;
103   cq.blockingRead(dest);
104   EXPECT_TRUE(cq.write(std::move(dest)));
105   EXPECT_TRUE(cq.read(dest));
106 }
107
108 struct RefCounted {
109   static __thread int active_instances;
110
111   mutable std::atomic<int> rc;
112
113   RefCounted() : rc(0) {
114     ++active_instances;
115   }
116
117   ~RefCounted() {
118     --active_instances;
119   }
120 };
121 __thread int RefCounted::active_instances;
122
123
124 void intrusive_ptr_add_ref(RefCounted const* p) {
125   p->rc++;
126 }
127
128 void intrusive_ptr_release(RefCounted const* p) {
129   if (--(p->rc) == 0) {
130     delete p;
131   }
132 }
133
134 TEST(MPMCQueue, lots_of_element_types) {
135   runElementTypeTest(10);
136   runElementTypeTest(std::string("abc"));
137   runElementTypeTest(std::make_pair(10, std::string("def")));
138   runElementTypeTest(std::vector<std::string>{ { "abc" } });
139   runElementTypeTest(std::make_shared<char>('a'));
140   runElementTypeTest(folly::make_unique<char>('a'));
141   runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
142   EXPECT_EQ(RefCounted::active_instances, 0);
143 }
144
145 TEST(MPMCQueue, single_thread_enqdeq) {
146   MPMCQueue<int> cq(10);
147
148   for (int pass = 0; pass < 10; ++pass) {
149     for (int i = 0; i < 10; ++i) {
150       EXPECT_TRUE(cq.write(i));
151     }
152     EXPECT_FALSE(cq.write(-1));
153     EXPECT_FALSE(cq.isEmpty());
154     EXPECT_EQ(cq.size(), 10);
155
156     for (int i = 0; i < 5; ++i) {
157       int dest = -1;
158       EXPECT_TRUE(cq.read(dest));
159       EXPECT_EQ(dest, i);
160     }
161     for (int i = 5; i < 10; ++i) {
162       int dest = -1;
163       cq.blockingRead(dest);
164       EXPECT_EQ(dest, i);
165     }
166     int dest = -1;
167     EXPECT_FALSE(cq.read(dest));
168     EXPECT_EQ(dest, -1);
169
170     EXPECT_TRUE(cq.isEmpty());
171     EXPECT_EQ(cq.size(), 0);
172   }
173 }
174
175 TEST(MPMCQueue, tryenq_capacity_test) {
176   for (size_t cap = 1; cap < 100; ++cap) {
177     MPMCQueue<int> cq(cap);
178     for (size_t i = 0; i < cap; ++i) {
179       EXPECT_TRUE(cq.write(i));
180     }
181     EXPECT_FALSE(cq.write(100));
182   }
183 }
184
185 TEST(MPMCQueue, enq_capacity_test) {
186   for (auto cap : { 1, 100, 10000 }) {
187     MPMCQueue<int> cq(cap);
188     for (int i = 0; i < cap; ++i) {
189       cq.blockingWrite(i);
190     }
191     int t = 0;
192     int when;
193     auto thr = std::thread([&]{
194       cq.blockingWrite(100);
195       when = t;
196     });
197     usleep(2000);
198     t = 1;
199     int dummy;
200     cq.blockingRead(dummy);
201     thr.join();
202     EXPECT_EQ(when, 1);
203   }
204 }
205
206 template <template<typename> class Atom>
207 void runTryEnqDeqThread(
208     int numThreads,
209     int n, /*numOps*/
210     MPMCQueue<int, Atom>& cq,
211     std::atomic<uint64_t>& sum,
212     int t) {
213   uint64_t threadSum = 0;
214   int src = t;
215   // received doesn't reflect any actual values, we just start with
216   // t and increment by numThreads to get the rounding of termination
217   // correct if numThreads doesn't evenly divide numOps
218   int received = t;
219   while (src < n || received < n) {
220     if (src < n && cq.write(src)) {
221       src += numThreads;
222     }
223
224     int dst;
225     if (received < n && cq.read(dst)) {
226       received += numThreads;
227       threadSum += dst;
228     }
229   }
230   sum += threadSum;
231 }
232
233 template <template<typename> class Atom>
234 void runTryEnqDeqTest(int numThreads, int numOps) {
235   // write and read aren't linearizable, so we don't have
236   // hard guarantees on their individual behavior.  We can still test
237   // correctness in aggregate
238   MPMCQueue<int,Atom> cq(numThreads);
239
240   uint64_t n = numOps;
241   std::vector<std::thread> threads(numThreads);
242   std::atomic<uint64_t> sum(0);
243   for (int t = 0; t < numThreads; ++t) {
244     threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom>,
245           numThreads, n, std::ref(cq), std::ref(sum), t));
246   }
247   for (auto& t : threads) {
248     DSched::join(t);
249   }
250   EXPECT_TRUE(cq.isEmpty());
251   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
252 }
253
254 TEST(MPMCQueue, mt_try_enq_deq) {
255   int nts[] = { 1, 3, 100 };
256
257   int n = 100000;
258   for (int nt : nts) {
259     runTryEnqDeqTest<std::atomic>(nt, n);
260   }
261 }
262
263 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
264   int nts[] = { 1, 3, 100 };
265
266   int n = 100000;
267   for (int nt : nts) {
268     runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
269   }
270 }
271
272 TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
273   int nts[] = { 3, 10 };
274
275   long seed = 0;
276   LOG(INFO) << "using seed " << seed;
277
278   int n = 1000;
279   for (int nt : nts) {
280     {
281       DSched sched(DSched::uniform(seed));
282       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
283     }
284     {
285       DSched sched(DSched::uniformSubset(seed, 2));
286       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
287     }
288   }
289 }
290
291 uint64_t nowMicro() {
292   timeval tv;
293   gettimeofday(&tv, 0);
294   return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
295 }
296
297 template <typename Q>
298 std::string producerConsumerBench(Q&& queue, std::string qName,
299                                   int numProducers, int numConsumers,
300                                   int numOps, bool ignoreContents = false) {
301   Q& q = queue;
302
303   struct rusage beginUsage;
304   getrusage(RUSAGE_SELF, &beginUsage);
305
306   auto beginMicro = nowMicro();
307
308   uint64_t n = numOps;
309   std::atomic<uint64_t> sum(0);
310
311   std::vector<std::thread> producers(numProducers);
312   for (int t = 0; t < numProducers; ++t) {
313     producers[t] = DSched::thread([&,t]{
314       for (int i = t; i < numOps; i += numProducers) {
315         q.blockingWrite(i);
316       }
317     });
318   }
319
320   std::vector<std::thread> consumers(numConsumers);
321   for (int t = 0; t < numConsumers; ++t) {
322     consumers[t] = DSched::thread([&,t]{
323       uint64_t localSum = 0;
324       for (int i = t; i < numOps; i += numConsumers) {
325         int dest = -1;
326         q.blockingRead(dest);
327         EXPECT_FALSE(dest == -1);
328         localSum += dest;
329       }
330       sum += localSum;
331     });
332   }
333
334   for (auto& t : producers) {
335     DSched::join(t);
336   }
337   for (auto& t : consumers) {
338     DSched::join(t);
339   }
340   if (!ignoreContents) {
341     EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
342   }
343
344   auto endMicro = nowMicro();
345
346   struct rusage endUsage;
347   getrusage(RUSAGE_SELF, &endUsage);
348
349   uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
350   long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
351       (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
352
353   return folly::format(
354       "{}, {} producers, {} consumers => {} nanos/handoff, {} csw / {} handoff",
355       qName, numProducers, numConsumers, nanosPer, csw, n).str();
356 }
357
358
359 TEST(MPMCQueue, mt_prod_cons_deterministic) {
360   // we use the Bench method, but perf results are meaningless under DSched
361   DSched sched(DSched::uniform(0));
362
363   producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(10),
364           "", 1, 1, 1000);
365   producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(100),
366           "", 10, 10, 1000);
367   producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(10),
368           "", 1, 1, 1000);
369   producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(100),
370           "", 10, 10, 1000);
371   producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(1),
372           "", 10, 10, 1000);
373 }
374
375 #define PC_BENCH(q, np, nc, ...) \
376     producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
377
378 TEST(MPMCQueue, mt_prod_cons) {
379   int n = 100000;
380   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 1, n);
381   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 1, n);
382   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 10, n);
383   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 10, n);
384   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 1, n);
385   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 1, n);
386   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 10, n);
387   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 10, n);
388   LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n);
389 }
390
391 TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
392   int n = 100000;
393   LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 1, 1, n);
394   LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 10, 1, n);
395   LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 1, 10, n);
396   LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 10, 10, n);
397   LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 1, 1, n);
398   LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 10, 1, n);
399   LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 1, 10, n);
400   LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 10, 10, n);
401   LOG(INFO)
402     << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(100000)), 32, 100, n);
403 }
404
405 template <template<typename> class Atom>
406 void runNeverFailThread(
407     int numThreads,
408     int n, /*numOps*/
409     MPMCQueue<int, Atom>& cq,
410     std::atomic<uint64_t>& sum,
411     int t) {
412   uint64_t threadSum = 0;
413   for (int i = t; i < n; i += numThreads) {
414     // enq + deq
415     EXPECT_TRUE(cq.writeIfNotFull(i));
416
417     int dest = -1;
418     EXPECT_TRUE(cq.readIfNotEmpty(dest));
419     EXPECT_TRUE(dest >= 0);
420     threadSum += dest;
421   }
422   sum += threadSum;
423 }
424
425 template <template<typename> class Atom>
426 uint64_t runNeverFailTest(int numThreads, int numOps) {
427   // always #enq >= #deq
428   MPMCQueue<int,Atom> cq(numThreads);
429
430   uint64_t n = numOps;
431   auto beginMicro = nowMicro();
432
433   std::vector<std::thread> threads(numThreads);
434   std::atomic<uint64_t> sum(0);
435   for (int t = 0; t < numThreads; ++t) {
436     threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom>,
437           numThreads, n, std::ref(cq), std::ref(sum), t));
438   }
439   for (auto& t : threads) {
440     DSched::join(t);
441   }
442   EXPECT_TRUE(cq.isEmpty());
443   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
444
445   return nowMicro() - beginMicro;
446 }
447
448 TEST(MPMCQueue, mt_never_fail) {
449   int nts[] = { 1, 3, 100 };
450
451   int n = 100000;
452   for (int nt : nts) {
453     uint64_t elapsed = runNeverFailTest<std::atomic>(nt, n);
454     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with "
455               << nt << " threads";
456   }
457 }
458
459 TEST(MPMCQueue, mt_never_fail_emulated_futex) {
460   int nts[] = { 1, 3, 100 };
461
462   int n = 100000;
463   for (int nt : nts) {
464     uint64_t elapsed = runNeverFailTest<EmulatedFutexAtomic>(nt, n);
465     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with "
466               << nt << " threads";
467   }
468 }
469
470 TEST(MPMCQueue, mt_never_fail_deterministic) {
471   int nts[] = { 3, 10 };
472
473   long seed = 0; // nowMicro() % 10000;
474   LOG(INFO) << "using seed " << seed;
475
476   int n = 1000;
477   for (int nt : nts) {
478     {
479       DSched sched(DSched::uniform(seed));
480       runNeverFailTest<DeterministicAtomic>(nt, n);
481     }
482     {
483       DSched sched(DSched::uniformSubset(seed, 2));
484       runNeverFailTest<DeterministicAtomic>(nt, n);
485     }
486   }
487 }
488
489 enum LifecycleEvent {
490   NOTHING = -1,
491   DEFAULT_CONSTRUCTOR,
492   COPY_CONSTRUCTOR,
493   MOVE_CONSTRUCTOR,
494   TWO_ARG_CONSTRUCTOR,
495   COPY_OPERATOR,
496   MOVE_OPERATOR,
497   DESTRUCTOR,
498   MAX_LIFECYCLE_EVENT
499 };
500
501 static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT];
502 static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT];
503
504 static int lc_outstanding() {
505   return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[COPY_CONSTRUCTOR] +
506       lc_counts[MOVE_CONSTRUCTOR] + lc_counts[TWO_ARG_CONSTRUCTOR] -
507       lc_counts[DESTRUCTOR];
508 }
509
510 static void lc_snap() {
511   for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
512     lc_prev[i] = lc_counts[i];
513   }
514 }
515
516 #define LIFECYCLE_STEP(...) lc_step(__LINE__, __VA_ARGS__)
517
518 static void lc_step(int lineno, int what = NOTHING, int what2 = NOTHING) {
519   for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
520     int delta = i == what || i == what2 ? 1 : 0;
521     EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
522         << "lc_counts[" << i << "] - lc_prev[" << i << "] was "
523         << (lc_counts[i] - lc_prev[i]) << ", expected " << delta
524         << ", from line " << lineno;
525   }
526   lc_snap();
527 }
528
529 template <typename R>
530 struct Lifecycle {
531   typedef R IsRelocatable;
532
533   bool constructed;
534
535   Lifecycle() noexcept : constructed(true) {
536     ++lc_counts[DEFAULT_CONSTRUCTOR];
537   }
538
539   explicit Lifecycle(int /* n */, char const* /* s */) noexcept
540       : constructed(true) {
541     ++lc_counts[TWO_ARG_CONSTRUCTOR];
542   }
543
544   Lifecycle(const Lifecycle& /* rhs */) noexcept : constructed(true) {
545     ++lc_counts[COPY_CONSTRUCTOR];
546   }
547
548   Lifecycle(Lifecycle&& /* rhs */) noexcept : constructed(true) {
549     ++lc_counts[MOVE_CONSTRUCTOR];
550   }
551
552   Lifecycle& operator=(const Lifecycle& /* rhs */) noexcept {
553     ++lc_counts[COPY_OPERATOR];
554     return *this;
555   }
556
557   Lifecycle& operator=(Lifecycle&& /* rhs */) noexcept {
558     ++lc_counts[MOVE_OPERATOR];
559     return *this;
560   }
561
562   ~Lifecycle() noexcept {
563     ++lc_counts[DESTRUCTOR];
564     assert(lc_outstanding() >= 0);
565     assert(constructed);
566     constructed = false;
567   }
568 };
569
570 template <typename R>
571 void runPerfectForwardingTest() {
572   lc_snap();
573   EXPECT_EQ(lc_outstanding(), 0);
574
575   {
576     MPMCQueue<Lifecycle<R>> queue(50);
577     LIFECYCLE_STEP(NOTHING);
578
579     for (int pass = 0; pass < 10; ++pass) {
580       for (int i = 0; i < 10; ++i) {
581         queue.blockingWrite();
582         LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
583
584         queue.blockingWrite(1, "one");
585         LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
586
587         {
588           Lifecycle<R> src;
589           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
590           queue.blockingWrite(std::move(src));
591           LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
592         }
593         LIFECYCLE_STEP(DESTRUCTOR);
594
595         {
596           Lifecycle<R> src;
597           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
598           queue.blockingWrite(src);
599           LIFECYCLE_STEP(COPY_CONSTRUCTOR);
600         }
601         LIFECYCLE_STEP(DESTRUCTOR);
602
603         EXPECT_TRUE(queue.write());
604         LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
605       }
606
607       EXPECT_EQ(queue.size(), 50);
608       EXPECT_FALSE(queue.write(2, "two"));
609       LIFECYCLE_STEP(NOTHING);
610
611       for (int i = 0; i < 50; ++i) {
612         {
613           Lifecycle<R> node;
614           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
615
616           queue.blockingRead(node);
617           if (R::value) {
618             // relocatable, moved via memcpy
619             LIFECYCLE_STEP(DESTRUCTOR);
620           } else {
621             LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
622           }
623         }
624         LIFECYCLE_STEP(DESTRUCTOR);
625       }
626
627       EXPECT_EQ(queue.size(), 0);
628     }
629
630     // put one element back before destruction
631     {
632       Lifecycle<R> src(3, "three");
633       LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
634       queue.write(std::move(src));
635       LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
636     }
637     LIFECYCLE_STEP(DESTRUCTOR); // destroy src
638   }
639   LIFECYCLE_STEP(DESTRUCTOR); // destroy queue
640
641   EXPECT_EQ(lc_outstanding(), 0);
642 }
643
644 TEST(MPMCQueue, perfect_forwarding) {
645   runPerfectForwardingTest<std::false_type>();
646 }
647
648 TEST(MPMCQueue, perfect_forwarding_relocatable) {
649   runPerfectForwardingTest<std::true_type>();
650 }
651
652 TEST(MPMCQueue, queue_moving) {
653   lc_snap();
654   EXPECT_EQ(lc_outstanding(), 0);
655
656   {
657     MPMCQueue<Lifecycle<std::false_type>> a(50);
658     LIFECYCLE_STEP(NOTHING);
659
660     a.blockingWrite();
661     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
662
663     // move constructor
664     MPMCQueue<Lifecycle<std::false_type>> b = std::move(a);
665     LIFECYCLE_STEP(NOTHING);
666     EXPECT_EQ(a.capacity(), 0);
667     EXPECT_EQ(a.size(), 0);
668     EXPECT_EQ(b.capacity(), 50);
669     EXPECT_EQ(b.size(), 1);
670
671     b.blockingWrite();
672     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
673
674     // move operator
675     MPMCQueue<Lifecycle<std::false_type>> c;
676     LIFECYCLE_STEP(NOTHING);
677     c = std::move(b);
678     LIFECYCLE_STEP(NOTHING);
679     EXPECT_EQ(c.capacity(), 50);
680     EXPECT_EQ(c.size(), 2);
681
682     {
683       Lifecycle<std::false_type> dst;
684       LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
685       c.blockingRead(dst);
686       LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
687
688       {
689         // swap
690         MPMCQueue<Lifecycle<std::false_type>> d(10);
691         LIFECYCLE_STEP(NOTHING);
692         std::swap(c, d);
693         LIFECYCLE_STEP(NOTHING);
694         EXPECT_EQ(c.capacity(), 10);
695         EXPECT_TRUE(c.isEmpty());
696         EXPECT_EQ(d.capacity(), 50);
697         EXPECT_EQ(d.size(), 1);
698
699         d.blockingRead(dst);
700         LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
701
702         c.blockingWrite(dst);
703         LIFECYCLE_STEP(COPY_CONSTRUCTOR);
704
705         d.blockingWrite(std::move(dst));
706         LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
707       } // d goes out of scope
708       LIFECYCLE_STEP(DESTRUCTOR);
709     } // dst goes out of scope
710     LIFECYCLE_STEP(DESTRUCTOR);
711   } // c goes out of scope
712   LIFECYCLE_STEP(DESTRUCTOR);
713 }
714
715 TEST(MPMCQueue, explicit_zero_capacity_fail) {
716   ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);
717 }
718
719
720 int main(int argc, char ** argv) {
721   testing::InitGoogleTest(&argc, argv);
722   gflags::ParseCommandLineFlags(&argc, &argv, true);
723   return RUN_ALL_TESTS();
724 }