Fix -Wsign-compare
[folly.git] / folly / test / MPMCQueueTest.cpp
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/MPMCQueue.h>
18 #include <folly/Format.h>
19 #include <folly/Memory.h>
20 #include <folly/test/DeterministicSchedule.h>
21
22 #include <boost/intrusive_ptr.hpp>
23 #include <memory>
24 #include <functional>
25 #include <thread>
26 #include <utility>
27 #include <unistd.h>
28 #include <sys/time.h>
29 #include <sys/resource.h>
30
31 #include <gflags/gflags.h>
32 #include <gtest/gtest.h>
33
34 FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
35
36 using namespace folly;
37 using namespace detail;
38 using namespace test;
39
40 typedef DeterministicSchedule DSched;
41
42 template <template<typename> class Atom>
43 void run_mt_sequencer_thread(
44     int numThreads,
45     int numOps,
46     uint32_t init,
47     TurnSequencer<Atom>& seq,
48     Atom<int>& spinThreshold,
49     int& prev,
50     int i) {
51   for (int op = i; op < numOps; op += numThreads) {
52     seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
53     EXPECT_EQ(prev, op - 1);
54     prev = op;
55     seq.completeTurn(init + op);
56   }
57 }
58
59 template <template<typename> class Atom>
60 void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
61   TurnSequencer<Atom> seq(init);
62   Atom<int> spinThreshold(0);
63
64   int prev = -1;
65   std::vector<std::thread> threads(numThreads);
66   for (int i = 0; i < numThreads; ++i) {
67     threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
68           numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold),
69           std::ref(prev), i));
70   }
71
72   for (auto& thr : threads) {
73     DSched::join(thr);
74   }
75
76   EXPECT_EQ(prev, numOps - 1);
77 }
78
79 TEST(MPMCQueue, sequencer) {
80   run_mt_sequencer_test<std::atomic>(1, 100, 0);
81   run_mt_sequencer_test<std::atomic>(2, 100000, -100);
82   run_mt_sequencer_test<std::atomic>(100, 10000, -100);
83 }
84
85 TEST(MPMCQueue, sequencer_emulated_futex) {
86   run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
87   run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
88   run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
89 }
90
91 TEST(MPMCQueue, sequencer_deterministic) {
92   DSched sched(DSched::uniform(0));
93   run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
94   run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
95   run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
96 }
97
98 template <typename T>
99 void runElementTypeTest(T&& src) {
100   MPMCQueue<T> cq(10);
101   cq.blockingWrite(std::move(src));
102   T dest;
103   cq.blockingRead(dest);
104   EXPECT_TRUE(cq.write(std::move(dest)));
105   EXPECT_TRUE(cq.read(dest));
106 }
107
108 struct RefCounted {
109   static __thread int active_instances;
110
111   mutable std::atomic<int> rc;
112
113   RefCounted() : rc(0) {
114     ++active_instances;
115   }
116
117   ~RefCounted() {
118     --active_instances;
119   }
120 };
121 __thread int RefCounted::active_instances;
122
123
124 void intrusive_ptr_add_ref(RefCounted const* p) {
125   p->rc++;
126 }
127
128 void intrusive_ptr_release(RefCounted const* p) {
129   if (--(p->rc) == 0) {
130     delete p;
131   }
132 }
133
134 TEST(MPMCQueue, lots_of_element_types) {
135   runElementTypeTest(10);
136   runElementTypeTest(std::string("abc"));
137   runElementTypeTest(std::make_pair(10, std::string("def")));
138   runElementTypeTest(std::vector<std::string>{ { "abc" } });
139   runElementTypeTest(std::make_shared<char>('a'));
140   runElementTypeTest(folly::make_unique<char>('a'));
141   runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
142   EXPECT_EQ(RefCounted::active_instances, 0);
143 }
144
145 TEST(MPMCQueue, single_thread_enqdeq) {
146   MPMCQueue<int> cq(10);
147
148   for (int pass = 0; pass < 10; ++pass) {
149     for (int i = 0; i < 10; ++i) {
150       EXPECT_TRUE(cq.write(i));
151     }
152     EXPECT_FALSE(cq.write(-1));
153     EXPECT_FALSE(cq.isEmpty());
154     EXPECT_EQ(cq.size(), 10);
155
156     for (int i = 0; i < 5; ++i) {
157       int dest = -1;
158       EXPECT_TRUE(cq.read(dest));
159       EXPECT_EQ(dest, i);
160     }
161     for (int i = 5; i < 10; ++i) {
162       int dest = -1;
163       cq.blockingRead(dest);
164       EXPECT_EQ(dest, i);
165     }
166     int dest = -1;
167     EXPECT_FALSE(cq.read(dest));
168     EXPECT_EQ(dest, -1);
169
170     EXPECT_TRUE(cq.isEmpty());
171     EXPECT_EQ(cq.size(), 0);
172   }
173 }
174
175 TEST(MPMCQueue, tryenq_capacity_test) {
176   for (size_t cap = 1; cap < 100; ++cap) {
177     MPMCQueue<int> cq(cap);
178     for (size_t i = 0; i < cap; ++i) {
179       EXPECT_TRUE(cq.write(i));
180     }
181     EXPECT_FALSE(cq.write(100));
182   }
183 }
184
185 TEST(MPMCQueue, enq_capacity_test) {
186   for (auto cap : { 1, 100, 10000 }) {
187     MPMCQueue<int> cq(cap);
188     for (int i = 0; i < cap; ++i) {
189       cq.blockingWrite(i);
190     }
191     int t = 0;
192     int when;
193     auto thr = std::thread([&]{
194       cq.blockingWrite(100);
195       when = t;
196     });
197     usleep(2000);
198     t = 1;
199     int dummy;
200     cq.blockingRead(dummy);
201     thr.join();
202     EXPECT_EQ(when, 1);
203   }
204 }
205
206 template <template<typename> class Atom>
207 void runTryEnqDeqThread(
208     int numThreads,
209     int n, /*numOps*/
210     MPMCQueue<int, Atom>& cq,
211     std::atomic<uint64_t>& sum,
212     int t) {
213   uint64_t threadSum = 0;
214   int src = t;
215   // received doesn't reflect any actual values, we just start with
216   // t and increment by numThreads to get the rounding of termination
217   // correct if numThreads doesn't evenly divide numOps
218   int received = t;
219   while (src < n || received < n) {
220     if (src < n && cq.write(src)) {
221       src += numThreads;
222     }
223
224     int dst;
225     if (received < n && cq.read(dst)) {
226       received += numThreads;
227       threadSum += dst;
228     }
229   }
230   sum += threadSum;
231 }
232
233 template <template<typename> class Atom>
234 void runTryEnqDeqTest(int numThreads, int numOps) {
235   // write and read aren't linearizable, so we don't have
236   // hard guarantees on their individual behavior.  We can still test
237   // correctness in aggregate
238   MPMCQueue<int,Atom> cq(numThreads);
239
240   uint64_t n = numOps;
241   std::vector<std::thread> threads(numThreads);
242   std::atomic<uint64_t> sum(0);
243   for (int t = 0; t < numThreads; ++t) {
244     threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom>,
245           numThreads, n, std::ref(cq), std::ref(sum), t));
246   }
247   for (auto& t : threads) {
248     DSched::join(t);
249   }
250   EXPECT_TRUE(cq.isEmpty());
251   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
252 }
253
254 TEST(MPMCQueue, mt_try_enq_deq) {
255   int nts[] = { 1, 3, 100 };
256
257   int n = 100000;
258   for (int nt : nts) {
259     runTryEnqDeqTest<std::atomic>(nt, n);
260   }
261 }
262
263 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
264   int nts[] = { 1, 3, 100 };
265
266   int n = 100000;
267   for (int nt : nts) {
268     runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
269   }
270 }
271
272 TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
273   int nts[] = { 3, 10 };
274
275   long seed = 0;
276   LOG(INFO) << "using seed " << seed;
277
278   int n = 1000;
279   for (int nt : nts) {
280     {
281       DSched sched(DSched::uniform(seed));
282       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
283     }
284     {
285       DSched sched(DSched::uniformSubset(seed, 2));
286       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
287     }
288   }
289 }
290
291 uint64_t nowMicro() {
292   timeval tv;
293   gettimeofday(&tv, 0);
294   return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
295 }
296
297 template <typename Q>
298 std::string producerConsumerBench(Q&& queue, std::string qName,
299                                   int numProducers, int numConsumers,
300                                   int numOps, bool ignoreContents = false) {
301   Q& q = queue;
302
303   struct rusage beginUsage;
304   getrusage(RUSAGE_SELF, &beginUsage);
305
306   auto beginMicro = nowMicro();
307
308   uint64_t n = numOps;
309   std::atomic<uint64_t> sum(0);
310
311   std::vector<std::thread> producers(numProducers);
312   for (int t = 0; t < numProducers; ++t) {
313     producers[t] = DSched::thread([&,t]{
314       for (int i = t; i < numOps; i += numProducers) {
315         q.blockingWrite(i);
316       }
317     });
318   }
319
320   std::vector<std::thread> consumers(numConsumers);
321   for (int t = 0; t < numConsumers; ++t) {
322     consumers[t] = DSched::thread([&,t]{
323       uint64_t localSum = 0;
324       for (int i = t; i < numOps; i += numConsumers) {
325         int dest = -1;
326         q.blockingRead(dest);
327         EXPECT_FALSE(dest == -1);
328         localSum += dest;
329       }
330       sum += localSum;
331     });
332   }
333
334   for (auto& t : producers) {
335     DSched::join(t);
336   }
337   for (auto& t : consumers) {
338     DSched::join(t);
339   }
340   if (!ignoreContents) {
341     EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
342   }
343
344   auto endMicro = nowMicro();
345
346   struct rusage endUsage;
347   getrusage(RUSAGE_SELF, &endUsage);
348
349   uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
350   long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
351       (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
352
353   return folly::format(
354       "{}, {} producers, {} consumers => {} nanos/handoff, {} csw / {} handoff",
355       qName, numProducers, numConsumers, nanosPer, csw, n).str();
356 }
357
358
359 TEST(MPMCQueue, mt_prod_cons_deterministic) {
360   // we use the Bench method, but perf results are meaningless under DSched
361   DSched sched(DSched::uniform(0));
362
363   producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(10),
364           "", 1, 1, 1000);
365   producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(100),
366           "", 10, 10, 1000);
367   producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(10),
368           "", 1, 1, 1000);
369   producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(100),
370           "", 10, 10, 1000);
371   producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(1),
372           "", 10, 10, 1000);
373 }
374
375 #define PC_BENCH(q, np, nc, ...) \
376     producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
377
378 TEST(MPMCQueue, mt_prod_cons) {
379   int n = 100000;
380   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 1, n);
381   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 1, n);
382   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 10, n);
383   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 10, n);
384   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 1, n);
385   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 1, n);
386   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 10, n);
387   LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 10, n);
388   LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n);
389 }
390
391 TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
392   int n = 100000;
393   LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 1, 1, n);
394   LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 10, 1, n);
395   LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 1, 10, n);
396   LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 10, 10, n);
397   LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 1, 1, n);
398   LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 10, 1, n);
399   LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 1, 10, n);
400   LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 10, 10, n);
401   LOG(INFO)
402     << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(100000)), 32, 100, n);
403 }
404
405 template <template<typename> class Atom>
406 void runNeverFailThread(
407     int numThreads,
408     int n, /*numOps*/
409     MPMCQueue<int, Atom>& cq,
410     std::atomic<uint64_t>& sum,
411     int t) {
412   uint64_t threadSum = 0;
413   for (int i = t; i < n; i += numThreads) {
414     // enq + deq
415     EXPECT_TRUE(cq.writeIfNotFull(i));
416
417     int dest = -1;
418     EXPECT_TRUE(cq.readIfNotEmpty(dest));
419     EXPECT_TRUE(dest >= 0);
420     threadSum += dest;
421   }
422   sum += threadSum;
423 }
424
425 template <template<typename> class Atom>
426 uint64_t runNeverFailTest(int numThreads, int numOps) {
427   // always #enq >= #deq
428   MPMCQueue<int,Atom> cq(numThreads);
429
430   uint64_t n = numOps;
431   auto beginMicro = nowMicro();
432
433   std::vector<std::thread> threads(numThreads);
434   std::atomic<uint64_t> sum(0);
435   for (int t = 0; t < numThreads; ++t) {
436     threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom>,
437           numThreads, n, std::ref(cq), std::ref(sum), t));
438   }
439   for (auto& t : threads) {
440     DSched::join(t);
441   }
442   EXPECT_TRUE(cq.isEmpty());
443   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
444
445   return nowMicro() - beginMicro;
446 }
447
448 TEST(MPMCQueue, mt_never_fail) {
449   int nts[] = { 1, 3, 100 };
450
451   int n = 100000;
452   for (int nt : nts) {
453     uint64_t elapsed = runNeverFailTest<std::atomic>(nt, n);
454     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with "
455               << nt << " threads";
456   }
457 }
458
459 TEST(MPMCQueue, mt_never_fail_emulated_futex) {
460   int nts[] = { 1, 3, 100 };
461
462   int n = 100000;
463   for (int nt : nts) {
464     uint64_t elapsed = runNeverFailTest<EmulatedFutexAtomic>(nt, n);
465     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with "
466               << nt << " threads";
467   }
468 }
469
470 TEST(MPMCQueue, mt_never_fail_deterministic) {
471   int nts[] = { 3, 10 };
472
473   long seed = 0; // nowMicro() % 10000;
474   LOG(INFO) << "using seed " << seed;
475
476   int n = 1000;
477   for (int nt : nts) {
478     {
479       DSched sched(DSched::uniform(seed));
480       runNeverFailTest<DeterministicAtomic>(nt, n);
481     }
482     {
483       DSched sched(DSched::uniformSubset(seed, 2));
484       runNeverFailTest<DeterministicAtomic>(nt, n);
485     }
486   }
487 }
488
489 enum LifecycleEvent {
490   NOTHING = -1,
491   DEFAULT_CONSTRUCTOR,
492   COPY_CONSTRUCTOR,
493   MOVE_CONSTRUCTOR,
494   TWO_ARG_CONSTRUCTOR,
495   COPY_OPERATOR,
496   MOVE_OPERATOR,
497   DESTRUCTOR,
498   MAX_LIFECYCLE_EVENT
499 };
500
501 static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT];
502 static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT];
503
504 static int lc_outstanding() {
505   return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[COPY_CONSTRUCTOR] +
506       lc_counts[MOVE_CONSTRUCTOR] + lc_counts[TWO_ARG_CONSTRUCTOR] -
507       lc_counts[DESTRUCTOR];
508 }
509
510 static void lc_snap() {
511   for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
512     lc_prev[i] = lc_counts[i];
513   }
514 }
515
516 #define LIFECYCLE_STEP(...) lc_step(__LINE__, __VA_ARGS__)
517
518 static void lc_step(int lineno, int what = NOTHING, int what2 = NOTHING) {
519   for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
520     int delta = i == what || i == what2 ? 1 : 0;
521     EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
522         << "lc_counts[" << i << "] - lc_prev[" << i << "] was "
523         << (lc_counts[i] - lc_prev[i]) << ", expected " << delta
524         << ", from line " << lineno;
525   }
526   lc_snap();
527 }
528
529 template <typename R>
530 struct Lifecycle {
531   typedef R IsRelocatable;
532
533   bool constructed;
534
535   Lifecycle() noexcept : constructed(true) {
536     ++lc_counts[DEFAULT_CONSTRUCTOR];
537   }
538
539   explicit Lifecycle(int n, char const* s) noexcept : constructed(true) {
540     ++lc_counts[TWO_ARG_CONSTRUCTOR];
541   }
542
543   Lifecycle(const Lifecycle& rhs) noexcept : constructed(true) {
544     ++lc_counts[COPY_CONSTRUCTOR];
545   }
546
547   Lifecycle(Lifecycle&& rhs) noexcept : constructed(true) {
548     ++lc_counts[MOVE_CONSTRUCTOR];
549   }
550
551   Lifecycle& operator= (const Lifecycle& rhs) noexcept {
552     ++lc_counts[COPY_OPERATOR];
553     return *this;
554   }
555
556   Lifecycle& operator= (Lifecycle&& rhs) noexcept {
557     ++lc_counts[MOVE_OPERATOR];
558     return *this;
559   }
560
561   ~Lifecycle() noexcept {
562     ++lc_counts[DESTRUCTOR];
563     assert(lc_outstanding() >= 0);
564     assert(constructed);
565     constructed = false;
566   }
567 };
568
569 template <typename R>
570 void runPerfectForwardingTest() {
571   lc_snap();
572   EXPECT_EQ(lc_outstanding(), 0);
573
574   {
575     MPMCQueue<Lifecycle<R>> queue(50);
576     LIFECYCLE_STEP(NOTHING);
577
578     for (int pass = 0; pass < 10; ++pass) {
579       for (int i = 0; i < 10; ++i) {
580         queue.blockingWrite();
581         LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
582
583         queue.blockingWrite(1, "one");
584         LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
585
586         {
587           Lifecycle<R> src;
588           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
589           queue.blockingWrite(std::move(src));
590           LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
591         }
592         LIFECYCLE_STEP(DESTRUCTOR);
593
594         {
595           Lifecycle<R> src;
596           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
597           queue.blockingWrite(src);
598           LIFECYCLE_STEP(COPY_CONSTRUCTOR);
599         }
600         LIFECYCLE_STEP(DESTRUCTOR);
601
602         EXPECT_TRUE(queue.write());
603         LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
604       }
605
606       EXPECT_EQ(queue.size(), 50);
607       EXPECT_FALSE(queue.write(2, "two"));
608       LIFECYCLE_STEP(NOTHING);
609
610       for (int i = 0; i < 50; ++i) {
611         {
612           Lifecycle<R> node;
613           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
614
615           queue.blockingRead(node);
616           if (R::value) {
617             // relocatable, moved via memcpy
618             LIFECYCLE_STEP(DESTRUCTOR);
619           } else {
620             LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
621           }
622         }
623         LIFECYCLE_STEP(DESTRUCTOR);
624       }
625
626       EXPECT_EQ(queue.size(), 0);
627     }
628
629     // put one element back before destruction
630     {
631       Lifecycle<R> src(3, "three");
632       LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
633       queue.write(std::move(src));
634       LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
635     }
636     LIFECYCLE_STEP(DESTRUCTOR); // destroy src
637   }
638   LIFECYCLE_STEP(DESTRUCTOR); // destroy queue
639
640   EXPECT_EQ(lc_outstanding(), 0);
641 }
642
643 TEST(MPMCQueue, perfect_forwarding) {
644   runPerfectForwardingTest<std::false_type>();
645 }
646
647 TEST(MPMCQueue, perfect_forwarding_relocatable) {
648   runPerfectForwardingTest<std::true_type>();
649 }
650
651 TEST(MPMCQueue, queue_moving) {
652   lc_snap();
653   EXPECT_EQ(lc_outstanding(), 0);
654
655   {
656     MPMCQueue<Lifecycle<std::false_type>> a(50);
657     LIFECYCLE_STEP(NOTHING);
658
659     a.blockingWrite();
660     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
661
662     // move constructor
663     MPMCQueue<Lifecycle<std::false_type>> b = std::move(a);
664     LIFECYCLE_STEP(NOTHING);
665     EXPECT_EQ(a.capacity(), 0);
666     EXPECT_EQ(a.size(), 0);
667     EXPECT_EQ(b.capacity(), 50);
668     EXPECT_EQ(b.size(), 1);
669
670     b.blockingWrite();
671     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
672
673     // move operator
674     MPMCQueue<Lifecycle<std::false_type>> c;
675     LIFECYCLE_STEP(NOTHING);
676     c = std::move(b);
677     LIFECYCLE_STEP(NOTHING);
678     EXPECT_EQ(c.capacity(), 50);
679     EXPECT_EQ(c.size(), 2);
680
681     {
682       Lifecycle<std::false_type> dst;
683       LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
684       c.blockingRead(dst);
685       LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
686
687       {
688         // swap
689         MPMCQueue<Lifecycle<std::false_type>> d(10);
690         LIFECYCLE_STEP(NOTHING);
691         std::swap(c, d);
692         LIFECYCLE_STEP(NOTHING);
693         EXPECT_EQ(c.capacity(), 10);
694         EXPECT_TRUE(c.isEmpty());
695         EXPECT_EQ(d.capacity(), 50);
696         EXPECT_EQ(d.size(), 1);
697
698         d.blockingRead(dst);
699         LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
700
701         c.blockingWrite(dst);
702         LIFECYCLE_STEP(COPY_CONSTRUCTOR);
703
704         d.blockingWrite(std::move(dst));
705         LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
706       } // d goes out of scope
707       LIFECYCLE_STEP(DESTRUCTOR);
708     } // dst goes out of scope
709     LIFECYCLE_STEP(DESTRUCTOR);
710   } // c goes out of scope
711   LIFECYCLE_STEP(DESTRUCTOR);
712 }
713
714 int main(int argc, char ** argv) {
715   testing::InitGoogleTest(&argc, argv);
716   gflags::ParseCommandLineFlags(&argc, &argv, true);
717   return RUN_ALL_TESTS();
718 }