Codemod folly::make_unique to std::make_unique
[folly.git] / folly / test / MPMCQueueTest.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/Format.h>
18 #include <folly/MPMCQueue.h>
19 #include <folly/Memory.h>
20 #include <folly/portability/GTest.h>
21 #include <folly/portability/SysResource.h>
22 #include <folly/portability/SysTime.h>
23 #include <folly/portability/Unistd.h>
24 #include <folly/stop_watch.h>
25 #include <folly/test/DeterministicSchedule.h>
26
27 #include <boost/intrusive_ptr.hpp>
28 #include <boost/thread/barrier.hpp>
29 #include <functional>
30 #include <memory>
31 #include <thread>
32 #include <utility>
33
34 FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
35
36 using namespace folly;
37 using namespace detail;
38 using namespace test;
39 using std::chrono::time_point;
40 using std::chrono::steady_clock;
41 using std::chrono::seconds;
42 using std::chrono::milliseconds;
43 using std::string;
44 using std::unique_ptr;
45 using std::vector;
46
47 typedef DeterministicSchedule DSched;
48
49 template <template<typename> class Atom>
50 void run_mt_sequencer_thread(
51     int numThreads,
52     int numOps,
53     uint32_t init,
54     TurnSequencer<Atom>& seq,
55     Atom<uint32_t>& spinThreshold,
56     int& prev,
57     int i) {
58   for (int op = i; op < numOps; op += numThreads) {
59     seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
60     EXPECT_EQ(prev, op - 1);
61     prev = op;
62     seq.completeTurn(init + op);
63   }
64 }
65
66 template <template<typename> class Atom>
67 void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
68   TurnSequencer<Atom> seq(init);
69   Atom<uint32_t> spinThreshold(0);
70
71   int prev = -1;
72   vector<std::thread> threads(numThreads);
73   for (int i = 0; i < numThreads; ++i) {
74     threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
75           numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold),
76           std::ref(prev), i));
77   }
78
79   for (auto& thr : threads) {
80     DSched::join(thr);
81   }
82
83   EXPECT_EQ(prev, numOps - 1);
84 }
85
86 TEST(MPMCQueue, sequencer) {
87   run_mt_sequencer_test<std::atomic>(1, 100, 0);
88   run_mt_sequencer_test<std::atomic>(2, 100000, -100);
89   run_mt_sequencer_test<std::atomic>(100, 10000, -100);
90 }
91
92 TEST(MPMCQueue, sequencer_emulated_futex) {
93   run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
94   run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
95   run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
96 }
97
98 TEST(MPMCQueue, sequencer_deterministic) {
99   DSched sched(DSched::uniform(0));
100   run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
101   run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
102   run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
103 }
104
105 template <bool Dynamic = false, typename T>
106 void runElementTypeTest(T&& src) {
107   MPMCQueue<T, std::atomic, Dynamic> cq(10);
108   cq.blockingWrite(std::forward<T>(src));
109   T dest;
110   cq.blockingRead(dest);
111   EXPECT_TRUE(cq.write(std::move(dest)));
112   EXPECT_TRUE(cq.read(dest));
113   auto soon1 = std::chrono::system_clock::now() + std::chrono::seconds(1);
114   EXPECT_TRUE(cq.tryWriteUntil(soon1, std::move(dest)));
115   EXPECT_TRUE(cq.read(dest));
116   auto soon2 = std::chrono::steady_clock::now() + std::chrono::seconds(1);
117   EXPECT_TRUE(cq.tryWriteUntil(soon2, std::move(dest)));
118   EXPECT_TRUE(cq.read(dest));
119 }
120
121 struct RefCounted {
122   static FOLLY_TLS int active_instances;
123
124   mutable std::atomic<int> rc;
125
126   RefCounted() : rc(0) {
127     ++active_instances;
128   }
129
130   ~RefCounted() {
131     --active_instances;
132   }
133 };
134 FOLLY_TLS int RefCounted::active_instances;
135
136 void intrusive_ptr_add_ref(RefCounted const* p) {
137   p->rc++;
138 }
139
140 void intrusive_ptr_release(RefCounted const* p) {
141   if (--(p->rc) == 0) {
142     delete p;
143   }
144 }
145
146 TEST(MPMCQueue, lots_of_element_types) {
147   runElementTypeTest(10);
148   runElementTypeTest(string("abc"));
149   runElementTypeTest(std::make_pair(10, string("def")));
150   runElementTypeTest(vector<string>{{"abc"}});
151   runElementTypeTest(std::make_shared<char>('a'));
152   runElementTypeTest(std::make_unique<char>('a'));
153   runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
154   EXPECT_EQ(RefCounted::active_instances, 0);
155 }
156
157 TEST(MPMCQueue, lots_of_element_types_dynamic) {
158   runElementTypeTest<true>(10);
159   runElementTypeTest<true>(string("abc"));
160   runElementTypeTest<true>(std::make_pair(10, string("def")));
161   runElementTypeTest<true>(vector<string>{{"abc"}});
162   runElementTypeTest<true>(std::make_shared<char>('a'));
163   runElementTypeTest<true>(std::make_unique<char>('a'));
164   runElementTypeTest<true>(boost::intrusive_ptr<RefCounted>(new RefCounted));
165   EXPECT_EQ(RefCounted::active_instances, 0);
166 }
167
168 TEST(MPMCQueue, single_thread_enqdeq) {
169   // Non-dynamic version only.
170   // False positive for dynamic version. Capacity can be temporarily
171   // higher than specified.
172   MPMCQueue<int> cq(10);
173
174   for (int pass = 0; pass < 10; ++pass) {
175     for (int i = 0; i < 10; ++i) {
176       EXPECT_TRUE(cq.write(i));
177     }
178     EXPECT_FALSE(cq.write(-1));
179     EXPECT_FALSE(cq.isEmpty());
180     EXPECT_EQ(cq.size(), 10);
181
182     for (int i = 0; i < 5; ++i) {
183       int dest = -1;
184       EXPECT_TRUE(cq.read(dest));
185       EXPECT_EQ(dest, i);
186     }
187     for (int i = 5; i < 10; ++i) {
188       int dest = -1;
189       cq.blockingRead(dest);
190       EXPECT_EQ(dest, i);
191     }
192     int dest = -1;
193     EXPECT_FALSE(cq.read(dest));
194     EXPECT_EQ(dest, -1);
195
196     EXPECT_TRUE(cq.isEmpty());
197     EXPECT_EQ(cq.size(), 0);
198   }
199 }
200
201 TEST(MPMCQueue, tryenq_capacity_test) {
202   // Non-dynamic version only.
203   // False positive for dynamic version. Capacity can be temporarily
204   // higher than specified.
205   for (size_t cap = 1; cap < 100; ++cap) {
206     MPMCQueue<int> cq(cap);
207     for (size_t i = 0; i < cap; ++i) {
208       EXPECT_TRUE(cq.write(i));
209     }
210     EXPECT_FALSE(cq.write(100));
211   }
212 }
213
214 TEST(MPMCQueue, enq_capacity_test) {
215   // Non-dynamic version only.
216   // False positive for dynamic version. Capacity can be temporarily
217   // higher than specified.
218   for (auto cap : { 1, 100, 10000 }) {
219     MPMCQueue<int> cq(cap);
220     for (int i = 0; i < cap; ++i) {
221       cq.blockingWrite(i);
222     }
223     int t = 0;
224     int when;
225     auto thr = std::thread([&]{
226       cq.blockingWrite(100);
227       when = t;
228     });
229     usleep(2000);
230     t = 1;
231     int dummy;
232     cq.blockingRead(dummy);
233     thr.join();
234     EXPECT_EQ(when, 1);
235   }
236 }
237
238 template <template<typename> class Atom, bool Dynamic = false>
239 void runTryEnqDeqThread(
240     int numThreads,
241     int n, /*numOps*/
242     MPMCQueue<int, Atom, Dynamic>& cq,
243     std::atomic<uint64_t>& sum,
244     int t) {
245   uint64_t threadSum = 0;
246   int src = t;
247   // received doesn't reflect any actual values, we just start with
248   // t and increment by numThreads to get the rounding of termination
249   // correct if numThreads doesn't evenly divide numOps
250   int received = t;
251   while (src < n || received < n) {
252     if (src < n && cq.write(src)) {
253       src += numThreads;
254     }
255
256     int dst;
257     if (received < n && cq.read(dst)) {
258       received += numThreads;
259       threadSum += dst;
260     }
261   }
262   sum += threadSum;
263 }
264
265 template <template<typename> class Atom, bool Dynamic = false>
266 void runTryEnqDeqTest(int numThreads, int numOps) {
267   // write and read aren't linearizable, so we don't have
268   // hard guarantees on their individual behavior.  We can still test
269   // correctness in aggregate
270   MPMCQueue<int,Atom, Dynamic> cq(numThreads);
271
272   uint64_t n = numOps;
273   vector<std::thread> threads(numThreads);
274   std::atomic<uint64_t> sum(0);
275   for (int t = 0; t < numThreads; ++t) {
276     threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom, Dynamic>,
277           numThreads, n, std::ref(cq), std::ref(sum), t));
278   }
279   for (auto& t : threads) {
280     DSched::join(t);
281   }
282   EXPECT_TRUE(cq.isEmpty());
283   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
284 }
285
286 TEST(MPMCQueue, mt_try_enq_deq) {
287   int nts[] = { 1, 3, 100 };
288
289   int n = 100000;
290   for (int nt : nts) {
291     runTryEnqDeqTest<std::atomic>(nt, n);
292   }
293 }
294
295 TEST(MPMCQueue, mt_try_enq_deq_dynamic) {
296   int nts[] = { 1, 3, 100 };
297
298   int n = 100000;
299   for (int nt : nts) {
300     runTryEnqDeqTest<std::atomic, /* Dynamic = */ true>(nt, n);
301   }
302 }
303
304 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
305   int nts[] = { 1, 3, 100 };
306
307   int n = 100000;
308   for (int nt : nts) {
309     runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
310   }
311 }
312
313 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex_dynamic) {
314   int nts[] = { 1, 3, 100 };
315
316   int n = 100000;
317   for (int nt : nts) {
318     runTryEnqDeqTest<EmulatedFutexAtomic, /* Dynamic = */ true>(nt, n);
319   }
320 }
321
322 TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
323   int nts[] = { 3, 10 };
324
325   long seed = 0;
326   LOG(INFO) << "using seed " << seed;
327
328   int n = 1000;
329   for (int nt : nts) {
330     {
331       DSched sched(DSched::uniform(seed));
332       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
333     }
334     {
335       DSched sched(DSched::uniformSubset(seed, 2));
336       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
337     }
338     {
339       DSched sched(DSched::uniform(seed));
340       runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
341     }
342     {
343       DSched sched(DSched::uniformSubset(seed, 2));
344       runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
345     }
346   }
347 }
348
349 uint64_t nowMicro() {
350   timeval tv;
351   gettimeofday(&tv, 0);
352   return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
353 }
354
355 template <typename Q>
356 struct WriteMethodCaller {
357   WriteMethodCaller() {}
358   virtual ~WriteMethodCaller() = default;
359   virtual bool callWrite(Q& q, int i) = 0;
360   virtual string methodName() = 0;
361 };
362
363 template <typename Q>
364 struct BlockingWriteCaller : public WriteMethodCaller<Q> {
365   bool callWrite(Q& q, int i) override {
366     q.blockingWrite(i);
367     return true;
368   }
369   string methodName() override { return "blockingWrite"; }
370 };
371
372 template <typename Q>
373 struct WriteIfNotFullCaller : public WriteMethodCaller<Q> {
374   bool callWrite(Q& q, int i) override { return q.writeIfNotFull(i); }
375   string methodName() override { return "writeIfNotFull"; }
376 };
377
378 template <typename Q>
379 struct WriteCaller : public WriteMethodCaller<Q> {
380   bool callWrite(Q& q, int i) override { return q.write(i); }
381   string methodName() override { return "write"; }
382 };
383
384 template <typename Q,
385           class Clock = steady_clock,
386           class Duration = typename Clock::duration>
387 struct TryWriteUntilCaller : public WriteMethodCaller<Q> {
388   const Duration duration_;
389   explicit TryWriteUntilCaller(Duration&& duration) : duration_(duration) {}
390   bool callWrite(Q& q, int i) override {
391     auto then = Clock::now() + duration_;
392     return q.tryWriteUntil(then, i);
393   }
394   string methodName() override {
395     return folly::sformat(
396         "tryWriteUntil({}ms)",
397         std::chrono::duration_cast<milliseconds>(duration_).count());
398   }
399 };
400
401 template <typename Q>
402 string producerConsumerBench(Q&& queue,
403                              string qName,
404                              int numProducers,
405                              int numConsumers,
406                              int numOps,
407                              WriteMethodCaller<Q>& writer,
408                              bool ignoreContents = false) {
409   Q& q = queue;
410
411   struct rusage beginUsage;
412   getrusage(RUSAGE_SELF, &beginUsage);
413
414   auto beginMicro = nowMicro();
415
416   uint64_t n = numOps;
417   std::atomic<uint64_t> sum(0);
418   std::atomic<uint64_t> failed(0);
419
420   vector<std::thread> producers(numProducers);
421   for (int t = 0; t < numProducers; ++t) {
422     producers[t] = DSched::thread([&,t]{
423       for (int i = t; i < numOps; i += numProducers) {
424         while (!writer.callWrite(q, i)) {
425           ++failed;
426         }
427       }
428     });
429   }
430
431   vector<std::thread> consumers(numConsumers);
432   for (int t = 0; t < numConsumers; ++t) {
433     consumers[t] = DSched::thread([&,t]{
434       uint64_t localSum = 0;
435       for (int i = t; i < numOps; i += numConsumers) {
436         int dest = -1;
437         q.blockingRead(dest);
438         EXPECT_FALSE(dest == -1);
439         localSum += dest;
440       }
441       sum += localSum;
442     });
443   }
444
445   for (auto& t : producers) {
446     DSched::join(t);
447   }
448   for (auto& t : consumers) {
449     DSched::join(t);
450   }
451   if (!ignoreContents) {
452     EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
453   }
454
455   auto endMicro = nowMicro();
456
457   struct rusage endUsage;
458   getrusage(RUSAGE_SELF, &endUsage);
459
460   uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
461   long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
462       (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
463   uint64_t failures = failed;
464   size_t allocated = q.allocatedCapacity();
465
466   return folly::sformat(
467       "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
468       "handoff, {} failures, {} allocated",
469       qName,
470       numProducers,
471       writer.methodName(),
472       numConsumers,
473       nanosPer,
474       csw,
475       n,
476       failures,
477       allocated);
478 }
479
480 template <bool Dynamic = false>
481 void runMtProdConsDeterministic(long seed) {
482   // we use the Bench method, but perf results are meaningless under DSched
483   DSched sched(DSched::uniform(seed));
484
485   vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic,
486                                                 Dynamic>>>> callers;
487   callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
488                        DeterministicAtomic, Dynamic>>>());
489   callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
490                        DeterministicAtomic, Dynamic>>>());
491   callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
492                        DeterministicAtomic, Dynamic>>>());
493   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
494                        DeterministicAtomic, Dynamic>>>(milliseconds(1)));
495   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
496                        DeterministicAtomic, Dynamic>>>(seconds(2)));
497   size_t cap;
498
499   for (const auto& caller : callers) {
500     cap = 10;
501     LOG(INFO) <<
502       producerConsumerBench(
503         MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
504         "MPMCQueue<int, DeterministicAtomic, Dynamic>("
505           + folly::to<std::string>(cap)+")",
506         1,
507         1,
508         1000,
509         *caller);
510     cap = 100;
511     LOG(INFO) <<
512       producerConsumerBench(
513         MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
514         "MPMCQueue<int, DeterministicAtomic, Dynamic>("
515           + folly::to<std::string>(cap)+")",
516         10,
517         10,
518         1000,
519         *caller);
520     cap = 10;
521     LOG(INFO) <<
522       producerConsumerBench(
523         MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
524         "MPMCQueue<int, DeterministicAtomic, Dynamic>("
525           + folly::to<std::string>(cap)+")",
526         1,
527         1,
528         1000,
529         *caller);
530     cap = 100;
531     LOG(INFO) <<
532       producerConsumerBench(
533         MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
534         "MPMCQueue<int, DeterministicAtomic, Dynamic>("
535           + folly::to<std::string>(cap)+")",
536         10,
537         10,
538         1000,
539         *caller);
540     cap = 1;
541     LOG(INFO) <<
542       producerConsumerBench(
543         MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
544         "MPMCQueue<int, DeterministicAtomic, Dynamic>("
545           + folly::to<std::string>(cap)+")",
546         10,
547         10,
548         1000,
549         *caller);
550   }
551 }
552
553 void runMtProdConsDeterministicDynamic(
554   long seed,
555   uint32_t prods,
556   uint32_t cons,
557   uint32_t numOps,
558   size_t cap,
559   size_t minCap,
560   size_t mult
561 ) {
562   // we use the Bench method, but perf results are meaningless under DSched
563   DSched sched(DSched::uniform(seed));
564
565   vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic,
566                                                 true>>>> callers;
567   callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
568                        DeterministicAtomic, true>>>());
569   callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
570                        DeterministicAtomic, true>>>());
571   callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
572                        DeterministicAtomic, true>>>());
573   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
574                        DeterministicAtomic, true>>>(milliseconds(1)));
575   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
576                        DeterministicAtomic, true>>>(seconds(2)));
577
578   for (const auto& caller : callers) {
579     LOG(INFO) <<
580       producerConsumerBench(
581         MPMCQueue<int, DeterministicAtomic, true>(cap, minCap, mult),
582         "MPMCQueue<int, DeterministicAtomic, true>("
583           + folly::to<std::string>(cap) + ", "
584           + folly::to<std::string>(minCap) + ", "
585           + folly::to<std::string>(mult)+")",
586         prods,
587         cons,
588         numOps,
589         *caller);
590   }
591 }
592
593 TEST(MPMCQueue, mt_prod_cons_deterministic) {
594   runMtProdConsDeterministic(0);
595 }
596
597 TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic) {
598   runMtProdConsDeterministic<true>(0);
599 }
600
601 template <typename T>
602 void setFromEnv(T& var, const char* envvar) {
603   char* str = std::getenv(envvar);
604   if (str) { var = atoi(str); }
605 }
606
607 TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic_with_arguments) {
608   long seed = 0;
609   uint32_t prods = 10;
610   uint32_t cons = 10;
611   uint32_t numOps = 1000;
612   size_t cap = 10000;
613   size_t minCap = 9;
614   size_t mult = 3;
615   setFromEnv(seed, "SEED");
616   setFromEnv(prods, "PRODS");
617   setFromEnv(cons, "CONS");
618   setFromEnv(numOps, "NUM_OPS");
619   setFromEnv(cap, "CAP");
620   setFromEnv(minCap, "MIN_CAP");
621   setFromEnv(mult, "MULT");
622   runMtProdConsDeterministicDynamic(
623     seed, prods, cons, numOps, cap, minCap, mult);
624 }
625
626 #define PC_BENCH(q, np, nc, ...) \
627     producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
628
629 template <bool Dynamic = false>
630 void runMtProdCons() {
631   int n = 100000;
632   setFromEnv(n, "NUM_OPS");
633   vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, std::atomic, Dynamic>>>>
634     callers;
635   callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
636                        std::atomic, Dynamic>>>());
637   callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
638                        std::atomic, Dynamic>>>());
639   callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int, std::atomic,
640                        Dynamic>>>());
641   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
642                        std::atomic, Dynamic>>>(milliseconds(1)));
643   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
644                        std::atomic, Dynamic>>>(seconds(2)));
645   for (const auto& caller : callers) {
646     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
647                           1, 1, n, *caller);
648     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
649                           10, 1, n, *caller);
650     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
651                           1, 10, n, *caller);
652     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
653                           10, 10, n, *caller);
654     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
655                           1, 1, n, *caller);
656     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
657                           10, 1, n, *caller);
658     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
659                           1, 10, n, *caller);
660     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
661                           10, 10, n, *caller);
662     LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(100000)),
663                           32, 100, n, *caller);
664   }
665 }
666
667 TEST(MPMCQueue, mt_prod_cons) {
668   runMtProdCons();
669 }
670
671 TEST(MPMCQueue, mt_prod_cons_dynamic) {
672   runMtProdCons</* Dynamic = */ true>();
673 }
674
675 template <bool Dynamic = false>
676 void runMtProdConsEmulatedFutex() {
677   int n = 100000;
678   vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, EmulatedFutexAtomic,
679                                                 Dynamic>>>> callers;
680   callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
681                        EmulatedFutexAtomic, Dynamic>>>());
682   callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
683                        EmulatedFutexAtomic, Dynamic>>>());
684   callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
685                        EmulatedFutexAtomic, Dynamic>>>());
686   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
687                        EmulatedFutexAtomic, Dynamic>>>(milliseconds(1)));
688   callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
689                        EmulatedFutexAtomic, Dynamic>>>(seconds(2)));
690   for (const auto& caller : callers) {
691     LOG(INFO) << PC_BENCH(
692       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 1, 1, n, *caller);
693     LOG(INFO) << PC_BENCH(
694       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 10, 1, n, *caller);
695     LOG(INFO) << PC_BENCH(
696       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 1, 10, n, *caller);
697     LOG(INFO) << PC_BENCH(
698       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 10, 10, n, *caller);
699     LOG(INFO) << PC_BENCH(
700       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 1, 1, n, *caller);
701     LOG(INFO) << PC_BENCH(
702       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 10, 1, n, *caller);
703     LOG(INFO) << PC_BENCH(
704       (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 1, 10, n, *caller);
705     LOG(INFO) << PC_BENCH((MPMCQueue<int, EmulatedFutexAtomic, Dynamic>
706                            (10000)), 10, 10, n, *caller);
707     LOG(INFO) << PC_BENCH((MPMCQueue<int, EmulatedFutexAtomic, Dynamic>
708                            (100000)), 32, 100, n, *caller);
709   }
710 }
711
712 TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
713   runMtProdConsEmulatedFutex();
714 }
715
716 TEST(MPMCQueue, mt_prod_cons_emulated_futex_dynamic) {
717   runMtProdConsEmulatedFutex</* Dynamic = */ true>();
718 }
719
720 template <template <typename> class Atom, bool Dynamic = false>
721 void runNeverFailThread(int numThreads,
722                         int n, /*numOps*/
723                         MPMCQueue<int, Atom, Dynamic>& cq,
724                         std::atomic<uint64_t>& sum,
725                         int t) {
726   uint64_t threadSum = 0;
727   for (int i = t; i < n; i += numThreads) {
728     // enq + deq
729     EXPECT_TRUE(cq.writeIfNotFull(i));
730
731     int dest = -1;
732     EXPECT_TRUE(cq.readIfNotEmpty(dest));
733     EXPECT_TRUE(dest >= 0);
734     threadSum += dest;
735   }
736   sum += threadSum;
737 }
738
739 template <template <typename> class Atom, bool Dynamic = false>
740 uint64_t runNeverFailTest(int numThreads, int numOps) {
741   // always #enq >= #deq
742   MPMCQueue<int, Atom, Dynamic> cq(numThreads);
743
744   uint64_t n = numOps;
745   auto beginMicro = nowMicro();
746
747   vector<std::thread> threads(numThreads);
748   std::atomic<uint64_t> sum(0);
749   for (int t = 0; t < numThreads; ++t) {
750     threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom, Dynamic>,
751                                           numThreads,
752                                           n,
753                                           std::ref(cq),
754                                           std::ref(sum),
755                                           t));
756   }
757   for (auto& t : threads) {
758     DSched::join(t);
759   }
760   EXPECT_TRUE(cq.isEmpty());
761   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
762
763   return nowMicro() - beginMicro;
764 }
765
766 template <template<typename> class Atom, bool Dynamic = false>
767 void runMtNeverFail(std::vector<int>& nts, int n) {
768   for (int nt : nts) {
769     uint64_t elapsed = runNeverFailTest<Atom, Dynamic>(nt, n);
770     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
771               << " threads";
772   }
773 }
774
775 // All the never_fail tests are for the non-dynamic version only.
776 // False positive for dynamic version. Some writeIfNotFull() and
777 // tryWriteUntil() operations may fail in transient conditions related
778 // to expansion.
779
780 TEST(MPMCQueue, mt_never_fail) {
781   std::vector<int> nts {1, 3, 100};
782   int n = 100000;
783   runMtNeverFail<std::atomic>(nts, n);
784 }
785
786 TEST(MPMCQueue, mt_never_fail_emulated_futex) {
787   std::vector<int> nts {1, 3, 100};
788   int n = 100000;
789   runMtNeverFail<EmulatedFutexAtomic>(nts, n);
790 }
791
792 template<bool Dynamic = false>
793 void runMtNeverFailDeterministic(std::vector<int>& nts, int n, long seed) {
794   LOG(INFO) << "using seed " << seed;
795   for (int nt : nts) {
796     {
797       DSched sched(DSched::uniform(seed));
798       runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
799     }
800     {
801       DSched sched(DSched::uniformSubset(seed, 2));
802       runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
803     }
804   }
805 }
806
807 TEST(MPMCQueue, mt_never_fail_deterministic) {
808   std::vector<int> nts {3, 10};
809   long seed = 0; // nowMicro() % 10000;
810   int n = 1000;
811   runMtNeverFailDeterministic(nts, n, seed);
812 }
813
814 template <class Clock, template <typename> class Atom, bool Dynamic>
815 void runNeverFailUntilThread(int numThreads,
816                              int n, /*numOps*/
817                              MPMCQueue<int, Atom, Dynamic>& cq,
818                              std::atomic<uint64_t>& sum,
819                              int t) {
820   uint64_t threadSum = 0;
821   for (int i = t; i < n; i += numThreads) {
822     // enq + deq
823     auto soon = Clock::now() + std::chrono::seconds(1);
824     EXPECT_TRUE(cq.tryWriteUntil(soon, i));
825
826     int dest = -1;
827     EXPECT_TRUE(cq.readIfNotEmpty(dest));
828     EXPECT_TRUE(dest >= 0);
829     threadSum += dest;
830   }
831   sum += threadSum;
832 }
833
834 template <class Clock, template <typename> class Atom, bool Dynamic = false>
835 uint64_t runNeverFailTest(int numThreads, int numOps) {
836   // always #enq >= #deq
837   MPMCQueue<int, Atom, Dynamic> cq(numThreads);
838
839   uint64_t n = numOps;
840   auto beginMicro = nowMicro();
841
842   vector<std::thread> threads(numThreads);
843   std::atomic<uint64_t> sum(0);
844   for (int t = 0; t < numThreads; ++t) {
845     threads[t] = DSched::thread(std::bind(
846                                   runNeverFailUntilThread<Clock, Atom, Dynamic>,
847                                   numThreads,
848                                   n,
849                                   std::ref(cq),
850                                   std::ref(sum),
851                                   t));
852   }
853   for (auto& t : threads) {
854     DSched::join(t);
855   }
856   EXPECT_TRUE(cq.isEmpty());
857   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
858
859   return nowMicro() - beginMicro;
860 }
861
862 template <bool Dynamic = false>
863 void runMtNeverFailUntilSystem(std::vector<int>& nts, int n) {
864   for (int nt : nts) {
865     uint64_t elapsed =
866       runNeverFailTest<std::chrono::system_clock, std::atomic, Dynamic>(nt, n);
867     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
868               << " threads";
869   }
870 }
871
872 TEST(MPMCQueue, mt_never_fail_until_system) {
873   std::vector<int> nts {1, 3, 100};
874   int n = 100000;
875   runMtNeverFailUntilSystem(nts, n);
876 }
877
878 template <bool Dynamic = false>
879 void runMtNeverFailUntilSteady(std::vector<int>& nts, int n) {
880   for (int nt : nts) {
881     uint64_t elapsed =
882       runNeverFailTest<std::chrono::steady_clock, std::atomic, Dynamic>(nt, n);
883     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
884               << " threads";
885   }
886 }
887
888 TEST(MPMCQueue, mt_never_fail_until_steady) {
889   std::vector<int> nts {1, 3, 100};
890   int n = 100000;
891   runMtNeverFailUntilSteady(nts, n);
892 }
893
894 enum LifecycleEvent {
895   NOTHING = -1,
896   DEFAULT_CONSTRUCTOR,
897   COPY_CONSTRUCTOR,
898   MOVE_CONSTRUCTOR,
899   TWO_ARG_CONSTRUCTOR,
900   COPY_OPERATOR,
901   MOVE_OPERATOR,
902   DESTRUCTOR,
903   MAX_LIFECYCLE_EVENT
904 };
905
906 static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT];
907 static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT];
908
909 static int lc_outstanding() {
910   return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[COPY_CONSTRUCTOR] +
911       lc_counts[MOVE_CONSTRUCTOR] + lc_counts[TWO_ARG_CONSTRUCTOR] -
912       lc_counts[DESTRUCTOR];
913 }
914
915 static void lc_snap() {
916   for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
917     lc_prev[i] = lc_counts[i];
918   }
919 }
920
921 #define LIFECYCLE_STEP(...) lc_step(__LINE__, __VA_ARGS__)
922
923 static void lc_step(int lineno, int what = NOTHING, int what2 = NOTHING) {
924   for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
925     int delta = i == what || i == what2 ? 1 : 0;
926     EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
927         << "lc_counts[" << i << "] - lc_prev[" << i << "] was "
928         << (lc_counts[i] - lc_prev[i]) << ", expected " << delta
929         << ", from line " << lineno;
930   }
931   lc_snap();
932 }
933
934 template <typename R>
935 struct Lifecycle {
936   typedef R IsRelocatable;
937
938   bool constructed;
939
940   Lifecycle() noexcept : constructed(true) {
941     ++lc_counts[DEFAULT_CONSTRUCTOR];
942   }
943
944   explicit Lifecycle(int /* n */, char const* /* s */) noexcept
945       : constructed(true) {
946     ++lc_counts[TWO_ARG_CONSTRUCTOR];
947   }
948
949   Lifecycle(const Lifecycle& /* rhs */) noexcept : constructed(true) {
950     ++lc_counts[COPY_CONSTRUCTOR];
951   }
952
953   Lifecycle(Lifecycle&& /* rhs */) noexcept : constructed(true) {
954     ++lc_counts[MOVE_CONSTRUCTOR];
955   }
956
957   Lifecycle& operator=(const Lifecycle& /* rhs */) noexcept {
958     ++lc_counts[COPY_OPERATOR];
959     return *this;
960   }
961
962   Lifecycle& operator=(Lifecycle&& /* rhs */) noexcept {
963     ++lc_counts[MOVE_OPERATOR];
964     return *this;
965   }
966
967   ~Lifecycle() noexcept {
968     ++lc_counts[DESTRUCTOR];
969     assert(lc_outstanding() >= 0);
970     assert(constructed);
971     constructed = false;
972   }
973 };
974
975 template <typename R>
976 void runPerfectForwardingTest() {
977   lc_snap();
978   EXPECT_EQ(lc_outstanding(), 0);
979
980   {
981     // Non-dynamic only. False positive for dynamic.
982     MPMCQueue<Lifecycle<R>, std::atomic> queue(50);
983     LIFECYCLE_STEP(NOTHING);
984
985     for (int pass = 0; pass < 10; ++pass) {
986       for (int i = 0; i < 10; ++i) {
987         queue.blockingWrite();
988         LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
989
990         queue.blockingWrite(1, "one");
991         LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
992
993         {
994           Lifecycle<R> src;
995           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
996           queue.blockingWrite(std::move(src));
997           LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
998         }
999         LIFECYCLE_STEP(DESTRUCTOR);
1000
1001         {
1002           Lifecycle<R> src;
1003           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1004           queue.blockingWrite(src);
1005           LIFECYCLE_STEP(COPY_CONSTRUCTOR);
1006         }
1007         LIFECYCLE_STEP(DESTRUCTOR);
1008
1009         EXPECT_TRUE(queue.write());
1010         LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1011       }
1012
1013       EXPECT_EQ(queue.size(), 50);
1014       EXPECT_FALSE(queue.write(2, "two"));
1015       LIFECYCLE_STEP(NOTHING);
1016
1017       for (int i = 0; i < 50; ++i) {
1018         {
1019           Lifecycle<R> node;
1020           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1021
1022           queue.blockingRead(node);
1023           if (R::value) {
1024             // relocatable, moved via memcpy
1025             LIFECYCLE_STEP(DESTRUCTOR);
1026           } else {
1027             LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
1028           }
1029         }
1030         LIFECYCLE_STEP(DESTRUCTOR);
1031       }
1032
1033       EXPECT_EQ(queue.size(), 0);
1034     }
1035
1036     // put one element back before destruction
1037     {
1038       Lifecycle<R> src(3, "three");
1039       LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
1040       queue.write(std::move(src));
1041       LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
1042     }
1043     LIFECYCLE_STEP(DESTRUCTOR); // destroy src
1044   }
1045   LIFECYCLE_STEP(DESTRUCTOR); // destroy queue
1046
1047   EXPECT_EQ(lc_outstanding(), 0);
1048 }
1049
1050 TEST(MPMCQueue, perfect_forwarding) {
1051   runPerfectForwardingTest<std::false_type>();
1052 }
1053
1054 TEST(MPMCQueue, perfect_forwarding_relocatable) {
1055   runPerfectForwardingTest<std::true_type>();
1056 }
1057
1058 template <bool Dynamic = false>
1059 void run_queue_moving() {
1060   lc_snap();
1061   EXPECT_EQ(lc_outstanding(), 0);
1062
1063   {
1064     MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> a(50);
1065     LIFECYCLE_STEP(NOTHING);
1066
1067     a.blockingWrite();
1068     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1069
1070     // move constructor
1071     MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> b
1072       = std::move(a);
1073     LIFECYCLE_STEP(NOTHING);
1074     EXPECT_EQ(a.capacity(), 0);
1075     EXPECT_EQ(a.size(), 0);
1076     EXPECT_EQ(b.capacity(), 50);
1077     EXPECT_EQ(b.size(), 1);
1078
1079     b.blockingWrite();
1080     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1081
1082     // move operator
1083     MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> c;
1084     LIFECYCLE_STEP(NOTHING);
1085     c = std::move(b);
1086     LIFECYCLE_STEP(NOTHING);
1087     EXPECT_EQ(c.capacity(), 50);
1088     EXPECT_EQ(c.size(), 2);
1089
1090     {
1091       Lifecycle<std::false_type> dst;
1092       LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1093       c.blockingRead(dst);
1094       LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
1095
1096       {
1097         // swap
1098         MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> d(10);
1099         LIFECYCLE_STEP(NOTHING);
1100         std::swap(c, d);
1101         LIFECYCLE_STEP(NOTHING);
1102         EXPECT_EQ(c.capacity(), 10);
1103         EXPECT_TRUE(c.isEmpty());
1104         EXPECT_EQ(d.capacity(), 50);
1105         EXPECT_EQ(d.size(), 1);
1106
1107         d.blockingRead(dst);
1108         LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
1109
1110         c.blockingWrite(dst);
1111         LIFECYCLE_STEP(COPY_CONSTRUCTOR);
1112
1113         d.blockingWrite(std::move(dst));
1114         LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
1115       } // d goes out of scope
1116       LIFECYCLE_STEP(DESTRUCTOR);
1117     } // dst goes out of scope
1118     LIFECYCLE_STEP(DESTRUCTOR);
1119   } // c goes out of scope
1120   LIFECYCLE_STEP(DESTRUCTOR);
1121 }
1122
1123 TEST(MPMCQueue, queue_moving) {
1124   run_queue_moving();
1125 }
1126
1127 TEST(MPMCQueue, queue_moving_dynamic) {
1128   run_queue_moving<true>();
1129 }
1130
1131 TEST(MPMCQueue, explicit_zero_capacity_fail) {
1132   ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);
1133
1134   using DynamicMPMCQueueInt = MPMCQueue<int, std::atomic, true>;
1135   ASSERT_THROW(DynamicMPMCQueueInt cq(0), std::invalid_argument);
1136 }
1137
1138 template <bool Dynamic>
1139 void testTryReadUntil() {
1140   MPMCQueue<int, std::atomic, Dynamic> q{1};
1141
1142   const auto wait = std::chrono::milliseconds(100);
1143   stop_watch<> watch;
1144   bool rets[2];
1145   int vals[2];
1146   std::vector<std::thread> threads;
1147   boost::barrier b{3};
1148   for (int i = 0; i < 2; i++) {
1149     threads.emplace_back([&, i] {
1150       b.wait();
1151       rets[i] = q.tryReadUntil(watch.getCheckpoint() + wait, vals[i]);
1152     });
1153   }
1154
1155   b.wait();
1156   EXPECT_TRUE(q.write(42));
1157
1158   for (int i = 0; i < 2; i++) {
1159     threads[i].join();
1160   }
1161
1162   for (int i = 0; i < 2; i++) {
1163     int other = (i + 1) % 2;
1164     if (rets[i]) {
1165       EXPECT_EQ(42, vals[i]);
1166       EXPECT_FALSE(rets[other]);
1167     }
1168   }
1169
1170   EXPECT_TRUE(watch.elapsed(wait));
1171 }
1172
1173 template <bool Dynamic>
1174 void testTryWriteUntil() {
1175   MPMCQueue<int, std::atomic, Dynamic> q{1};
1176   EXPECT_TRUE(q.write(42));
1177
1178   const auto wait = std::chrono::milliseconds(100);
1179   stop_watch<> watch;
1180   bool rets[2];
1181   std::vector<std::thread> threads;
1182   boost::barrier b{3};
1183   for (int i = 0; i < 2; i++) {
1184     threads.emplace_back([&, i] {
1185       b.wait();
1186       rets[i] = q.tryWriteUntil(watch.getCheckpoint() + wait, i);
1187     });
1188   }
1189
1190   b.wait();
1191   int x;
1192   EXPECT_TRUE(q.read(x));
1193   EXPECT_EQ(42, x);
1194
1195   for (int i = 0; i < 2; i++) {
1196     threads[i].join();
1197   }
1198   EXPECT_TRUE(q.read(x));
1199
1200   for (int i = 0; i < 2; i++) {
1201     int other = (i + 1) % 2;
1202     if (rets[i]) {
1203       EXPECT_EQ(i, x);
1204       EXPECT_FALSE(rets[other]);
1205     }
1206   }
1207
1208   EXPECT_TRUE(watch.elapsed(wait));
1209 }
1210
1211 TEST(MPMCQueue, try_read_until) {
1212   testTryReadUntil<false>();
1213 }
1214
1215 TEST(MPMCQueue, try_read_until_dynamic) {
1216   testTryReadUntil<true>();
1217 }
1218
1219 TEST(MPMCQueue, try_write_until) {
1220   testTryWriteUntil<false>();
1221 }
1222
1223 TEST(MPMCQueue, try_write_until_dynamic) {
1224   testTryWriteUntil<true>();
1225 }
1226
1227 template <bool Dynamic>
1228 void testTimeout(MPMCQueue<int, std::atomic, Dynamic>& q) {
1229   CHECK(q.write(1));
1230   /* The following must not block forever */
1231   q.tryWriteUntil(
1232       std::chrono::system_clock::now() + std::chrono::microseconds(10000), 2);
1233 }
1234
1235 TEST(MPMCQueue, try_write_until_timeout) {
1236   folly::MPMCQueue<int, std::atomic, false> queue(1);
1237   testTimeout<false>(queue);
1238 }
1239
1240 TEST(MPMCQueue, must_fail_try_write_until_dynamic) {
1241   folly::MPMCQueue<int, std::atomic, true> queue(200, 1, 2);
1242   testTimeout<true>(queue);
1243 }