Futex::futexWait returns FutexResult
[folly.git] / folly / test / MPMCQueueTest.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/Format.h>
18 #include <folly/MPMCQueue.h>
19 #include <folly/Memory.h>
20 #include <folly/portability/GTest.h>
21 #include <folly/portability/SysResource.h>
22 #include <folly/portability/SysTime.h>
23 #include <folly/portability/Unistd.h>
24 #include <folly/stop_watch.h>
25 #include <folly/test/DeterministicSchedule.h>
26
27 #include <boost/intrusive_ptr.hpp>
28 #include <boost/thread/barrier.hpp>
29 #include <functional>
30 #include <memory>
31 #include <thread>
32 #include <utility>
33
34 FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
35
36 using namespace folly;
37 using namespace detail;
38 using namespace test;
39 using std::chrono::time_point;
40 using std::chrono::steady_clock;
41 using std::chrono::seconds;
42 using std::chrono::milliseconds;
43 using std::string;
44 using std::unique_ptr;
45 using std::vector;
46
47 typedef DeterministicSchedule DSched;
48
49 template <template <typename> class Atom>
50 void run_mt_sequencer_thread(
51     int numThreads,
52     int numOps,
53     uint32_t init,
54     TurnSequencer<Atom>& seq,
55     Atom<uint32_t>& spinThreshold,
56     int& prev,
57     int i) {
58   for (int op = i; op < numOps; op += numThreads) {
59     seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
60     EXPECT_EQ(prev, op - 1);
61     prev = op;
62     seq.completeTurn(init + op);
63   }
64 }
65
66 template <template <typename> class Atom>
67 void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
68   TurnSequencer<Atom> seq(init);
69   Atom<uint32_t> spinThreshold(0);
70
71   int prev = -1;
72   vector<std::thread> threads(numThreads);
73   for (int i = 0; i < numThreads; ++i) {
74     threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
75           numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold),
76           std::ref(prev), i));
77   }
78
79   for (auto& thr : threads) {
80     DSched::join(thr);
81   }
82
83   EXPECT_EQ(prev, numOps - 1);
84 }
85
86 TEST(MPMCQueue, sequencer) {
87   run_mt_sequencer_test<std::atomic>(1, 100, 0);
88   run_mt_sequencer_test<std::atomic>(2, 100000, -100);
89   run_mt_sequencer_test<std::atomic>(100, 10000, -100);
90 }
91
92 TEST(MPMCQueue, sequencer_emulated_futex) {
93   run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
94   run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
95   run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
96 }
97
98 TEST(MPMCQueue, sequencer_deterministic) {
99   DSched sched(DSched::uniform(0));
100   run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
101   run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
102   run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
103 }
104
105 template <bool Dynamic = false, typename T>
106 void runElementTypeTest(T&& src) {
107   MPMCQueue<T, std::atomic, Dynamic> cq(10);
108   cq.blockingWrite(std::forward<T>(src));
109   T dest;
110   cq.blockingRead(dest);
111   EXPECT_TRUE(cq.write(std::move(dest)));
112   EXPECT_TRUE(cq.read(dest));
113   auto soon1 = std::chrono::system_clock::now() + std::chrono::seconds(1);
114   EXPECT_TRUE(cq.tryWriteUntil(soon1, std::move(dest)));
115   EXPECT_TRUE(cq.read(dest));
116   auto soon2 = std::chrono::steady_clock::now() + std::chrono::seconds(1);
117   EXPECT_TRUE(cq.tryWriteUntil(soon2, std::move(dest)));
118   EXPECT_TRUE(cq.read(dest));
119 }
120
121 struct RefCounted {
122   static FOLLY_TLS int active_instances;
123
124   mutable std::atomic<int> rc;
125
126   RefCounted() : rc(0) {
127     ++active_instances;
128   }
129
130   ~RefCounted() {
131     --active_instances;
132   }
133 };
134 FOLLY_TLS int RefCounted::active_instances;
135
136 void intrusive_ptr_add_ref(RefCounted const* p) {
137   p->rc++;
138 }
139
140 void intrusive_ptr_release(RefCounted const* p) {
141   if (--(p->rc) == 0) {
142     delete p;
143   }
144 }
145
146 TEST(MPMCQueue, lots_of_element_types) {
147   runElementTypeTest(10);
148   runElementTypeTest(string("abc"));
149   runElementTypeTest(std::make_pair(10, string("def")));
150   runElementTypeTest(vector<string>{{"abc"}});
151   runElementTypeTest(std::make_shared<char>('a'));
152   runElementTypeTest(std::make_unique<char>('a'));
153   runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
154   EXPECT_EQ(RefCounted::active_instances, 0);
155 }
156
157 TEST(MPMCQueue, lots_of_element_types_dynamic) {
158   runElementTypeTest<true>(10);
159   runElementTypeTest<true>(string("abc"));
160   runElementTypeTest<true>(std::make_pair(10, string("def")));
161   runElementTypeTest<true>(vector<string>{{"abc"}});
162   runElementTypeTest<true>(std::make_shared<char>('a'));
163   runElementTypeTest<true>(std::make_unique<char>('a'));
164   runElementTypeTest<true>(boost::intrusive_ptr<RefCounted>(new RefCounted));
165   EXPECT_EQ(RefCounted::active_instances, 0);
166 }
167
168 TEST(MPMCQueue, single_thread_enqdeq) {
169   // Non-dynamic version only.
170   // False positive for dynamic version. Capacity can be temporarily
171   // higher than specified.
172   MPMCQueue<int> cq(10);
173
174   for (int pass = 0; pass < 10; ++pass) {
175     for (int i = 0; i < 10; ++i) {
176       EXPECT_TRUE(cq.write(i));
177     }
178     EXPECT_FALSE(cq.write(-1));
179     EXPECT_FALSE(cq.isEmpty());
180     EXPECT_EQ(cq.size(), 10);
181
182     for (int i = 0; i < 5; ++i) {
183       int dest = -1;
184       EXPECT_TRUE(cq.read(dest));
185       EXPECT_EQ(dest, i);
186     }
187     for (int i = 5; i < 10; ++i) {
188       int dest = -1;
189       cq.blockingRead(dest);
190       EXPECT_EQ(dest, i);
191     }
192     int dest = -1;
193     EXPECT_FALSE(cq.read(dest));
194     EXPECT_EQ(dest, -1);
195
196     EXPECT_TRUE(cq.isEmpty());
197     EXPECT_EQ(cq.size(), 0);
198   }
199 }
200
201 TEST(MPMCQueue, tryenq_capacity_test) {
202   // Non-dynamic version only.
203   // False positive for dynamic version. Capacity can be temporarily
204   // higher than specified.
205   for (size_t cap = 1; cap < 100; ++cap) {
206     MPMCQueue<int> cq(cap);
207     for (size_t i = 0; i < cap; ++i) {
208       EXPECT_TRUE(cq.write(i));
209     }
210     EXPECT_FALSE(cq.write(100));
211   }
212 }
213
214 TEST(MPMCQueue, enq_capacity_test) {
215   // Non-dynamic version only.
216   // False positive for dynamic version. Capacity can be temporarily
217   // higher than specified.
218   for (auto cap : { 1, 100, 10000 }) {
219     MPMCQueue<int> cq(cap);
220     for (int i = 0; i < cap; ++i) {
221       cq.blockingWrite(i);
222     }
223     int t = 0;
224     int when;
225     auto thr = std::thread([&]{
226       cq.blockingWrite(100);
227       when = t;
228     });
229     usleep(2000);
230     t = 1;
231     int dummy;
232     cq.blockingRead(dummy);
233     thr.join();
234     EXPECT_EQ(when, 1);
235   }
236 }
237
238 template <template <typename> class Atom, bool Dynamic = false>
239 void runTryEnqDeqThread(
240     int numThreads,
241     int n, /*numOps*/
242     MPMCQueue<int, Atom, Dynamic>& cq,
243     std::atomic<uint64_t>& sum,
244     int t) {
245   uint64_t threadSum = 0;
246   int src = t;
247   // received doesn't reflect any actual values, we just start with
248   // t and increment by numThreads to get the rounding of termination
249   // correct if numThreads doesn't evenly divide numOps
250   int received = t;
251   while (src < n || received < n) {
252     if (src < n && cq.write(src)) {
253       src += numThreads;
254     }
255
256     int dst;
257     if (received < n && cq.read(dst)) {
258       received += numThreads;
259       threadSum += dst;
260     }
261   }
262   sum += threadSum;
263 }
264
265 template <template <typename> class Atom, bool Dynamic = false>
266 void runTryEnqDeqTest(int numThreads, int numOps) {
267   // write and read aren't linearizable, so we don't have
268   // hard guarantees on their individual behavior.  We can still test
269   // correctness in aggregate
270   MPMCQueue<int,Atom, Dynamic> cq(numThreads);
271
272   uint64_t n = numOps;
273   vector<std::thread> threads(numThreads);
274   std::atomic<uint64_t> sum(0);
275   for (int t = 0; t < numThreads; ++t) {
276     threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom, Dynamic>,
277           numThreads, n, std::ref(cq), std::ref(sum), t));
278   }
279   for (auto& t : threads) {
280     DSched::join(t);
281   }
282   EXPECT_TRUE(cq.isEmpty());
283   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
284 }
285
286 TEST(MPMCQueue, mt_try_enq_deq) {
287   int nts[] = { 1, 3, 100 };
288
289   int n = 100000;
290   for (int nt : nts) {
291     runTryEnqDeqTest<std::atomic>(nt, n);
292   }
293 }
294
295 TEST(MPMCQueue, mt_try_enq_deq_dynamic) {
296   int nts[] = { 1, 3, 100 };
297
298   int n = 100000;
299   for (int nt : nts) {
300     runTryEnqDeqTest<std::atomic, /* Dynamic = */ true>(nt, n);
301   }
302 }
303
304 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
305   int nts[] = { 1, 3, 100 };
306
307   int n = 100000;
308   for (int nt : nts) {
309     runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
310   }
311 }
312
313 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex_dynamic) {
314   int nts[] = { 1, 3, 100 };
315
316   int n = 100000;
317   for (int nt : nts) {
318     runTryEnqDeqTest<EmulatedFutexAtomic, /* Dynamic = */ true>(nt, n);
319   }
320 }
321
322 TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
323   int nts[] = { 3, 10 };
324
325   long seed = 0;
326   LOG(INFO) << "using seed " << seed;
327
328   int n = 1000;
329   for (int nt : nts) {
330     {
331       DSched sched(DSched::uniform(seed));
332       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
333     }
334     {
335       DSched sched(DSched::uniformSubset(seed, 2));
336       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
337     }
338     {
339       DSched sched(DSched::uniform(seed));
340       runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
341     }
342     {
343       DSched sched(DSched::uniformSubset(seed, 2));
344       runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
345     }
346   }
347 }
348
349 uint64_t nowMicro() {
350   timeval tv;
351   gettimeofday(&tv, nullptr);
352   return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
353 }
354
355 template <typename Q>
356 struct WriteMethodCaller {
357   WriteMethodCaller() {}
358   virtual ~WriteMethodCaller() = default;
359   virtual bool callWrite(Q& q, int i) = 0;
360   virtual string methodName() = 0;
361 };
362
363 template <typename Q>
364 struct BlockingWriteCaller : public WriteMethodCaller<Q> {
365   bool callWrite(Q& q, int i) override {
366     q.blockingWrite(i);
367     return true;
368   }
369   string methodName() override { return "blockingWrite"; }
370 };
371
372 template <typename Q>
373 struct WriteIfNotFullCaller : public WriteMethodCaller<Q> {
374   bool callWrite(Q& q, int i) override { return q.writeIfNotFull(i); }
375   string methodName() override { return "writeIfNotFull"; }
376 };
377
378 template <typename Q>
379 struct WriteCaller : public WriteMethodCaller<Q> {
380   bool callWrite(Q& q, int i) override { return q.write(i); }
381   string methodName() override { return "write"; }
382 };
383
384 template <
385     typename Q,
386     class Clock = steady_clock,
387     class Duration = typename Clock::duration>
388 struct TryWriteUntilCaller : public WriteMethodCaller<Q> {
389   const Duration duration_;
390   explicit TryWriteUntilCaller(Duration&& duration) : duration_(duration) {}
391   bool callWrite(Q& q, int i) override {
392     auto then = Clock::now() + duration_;
393     return q.tryWriteUntil(then, i);
394   }
395   string methodName() override {
396     return folly::sformat(
397         "tryWriteUntil({}ms)",
398         std::chrono::duration_cast<milliseconds>(duration_).count());
399   }
400 };
401
402 template <typename Q>
403 string producerConsumerBench(Q&& queue,
404                              string qName,
405                              int numProducers,
406                              int numConsumers,
407                              int numOps,
408                              WriteMethodCaller<Q>& writer,
409                              bool ignoreContents = false) {
410   Q& q = queue;
411
412   struct rusage beginUsage;
413   getrusage(RUSAGE_SELF, &beginUsage);
414
415   auto beginMicro = nowMicro();
416
417   uint64_t n = numOps;
418   std::atomic<uint64_t> sum(0);
419   std::atomic<uint64_t> failed(0);
420
421   vector<std::thread> producers(numProducers);
422   for (int t = 0; t < numProducers; ++t) {
423     producers[t] = DSched::thread([&,t]{
424       for (int i = t; i < numOps; i += numProducers) {
425         while (!writer.callWrite(q, i)) {
426           ++failed;
427         }
428       }
429     });
430   }
431
432   vector<std::thread> consumers(numConsumers);
433   for (int t = 0; t < numConsumers; ++t) {
434     consumers[t] = DSched::thread([&,t]{
435       uint64_t localSum = 0;
436       for (int i = t; i < numOps; i += numConsumers) {
437         int dest = -1;
438         q.blockingRead(dest);
439         EXPECT_FALSE(dest == -1);
440         localSum += dest;
441       }
442       sum += localSum;
443     });
444   }
445
446   for (auto& t : producers) {
447     DSched::join(t);
448   }
449   for (auto& t : consumers) {
450     DSched::join(t);
451   }
452   if (!ignoreContents) {
453     EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
454   }
455
456   auto endMicro = nowMicro();
457
458   struct rusage endUsage;
459   getrusage(RUSAGE_SELF, &endUsage);
460
461   uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
462   long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
463       (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
464   uint64_t failures = failed;
465   size_t allocated = q.allocatedCapacity();
466
467   return folly::sformat(
468       "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
469       "handoff, {} failures, {} allocated",
470       qName,
471       numProducers,
472       writer.methodName(),
473       numConsumers,
474       nanosPer,
475       csw,
476       n,
477       failures,
478       allocated);
479 }
480
481 template <bool Dynamic = false>
482 void runMtProdConsDeterministic(long seed) {
483   // we use the Bench method, but perf results are meaningless under DSched
484   DSched sched(DSched::uniform(seed));
485
486   using QueueType = MPMCQueue<int, DeterministicAtomic, Dynamic>;
487
488   vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
489   callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
490   callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
491   callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
492   callers.emplace_back(
493       std::make_unique<TryWriteUntilCaller<QueueType>>(milliseconds(1)));
494   callers.emplace_back(
495       std::make_unique<TryWriteUntilCaller<QueueType>>(seconds(2)));
496   size_t cap;
497
498   for (const auto& caller : callers) {
499     cap = 10;
500     LOG(INFO) <<
501       producerConsumerBench(
502         MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
503         "MPMCQueue<int, DeterministicAtomic, Dynamic>("
504           + folly::to<std::string>(cap)+")",
505         1,
506         1,
507         1000,
508         *caller);
509     cap = 100;
510     LOG(INFO) <<
511       producerConsumerBench(
512         MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
513         "MPMCQueue<int, DeterministicAtomic, Dynamic>("
514           + folly::to<std::string>(cap)+")",
515         10,
516         10,
517         1000,
518         *caller);
519     cap = 10;
520     LOG(INFO) <<
521       producerConsumerBench(
522         MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
523         "MPMCQueue<int, DeterministicAtomic, Dynamic>("
524           + folly::to<std::string>(cap)+")",
525         1,
526         1,
527         1000,
528         *caller);
529     cap = 100;
530     LOG(INFO) <<
531       producerConsumerBench(
532         MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
533         "MPMCQueue<int, DeterministicAtomic, Dynamic>("
534           + folly::to<std::string>(cap)+")",
535         10,
536         10,
537         1000,
538         *caller);
539     cap = 1;
540     LOG(INFO) <<
541       producerConsumerBench(
542         MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
543         "MPMCQueue<int, DeterministicAtomic, Dynamic>("
544           + folly::to<std::string>(cap)+")",
545         10,
546         10,
547         1000,
548         *caller);
549   }
550 }
551
552 void runMtProdConsDeterministicDynamic(
553   long seed,
554   uint32_t prods,
555   uint32_t cons,
556   uint32_t numOps,
557   size_t cap,
558   size_t minCap,
559   size_t mult
560 ) {
561   // we use the Bench method, but perf results are meaningless under DSched
562   DSched sched(DSched::uniform(seed));
563
564   using QueueType = MPMCQueue<int, DeterministicAtomic, true>;
565
566   vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
567   callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
568   callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
569   callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
570   callers.emplace_back(
571       std::make_unique<TryWriteUntilCaller<QueueType>>(milliseconds(1)));
572   callers.emplace_back(
573       std::make_unique<TryWriteUntilCaller<QueueType>>(seconds(2)));
574
575   for (const auto& caller : callers) {
576     LOG(INFO) <<
577       producerConsumerBench(
578         MPMCQueue<int, DeterministicAtomic, true>(cap, minCap, mult),
579         "MPMCQueue<int, DeterministicAtomic, true>("
580           + folly::to<std::string>(cap) + ", "
581           + folly::to<std::string>(minCap) + ", "
582           + folly::to<std::string>(mult)+")",
583         prods,
584         cons,
585         numOps,
586         *caller);
587   }
588 }
589
590 TEST(MPMCQueue, mt_prod_cons_deterministic) {
591   runMtProdConsDeterministic(0);
592 }
593
594 TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic) {
595   runMtProdConsDeterministic<true>(0);
596 }
597
598 template <typename T>
599 void setFromEnv(T& var, const char* envvar) {
600   char* str = std::getenv(envvar);
601   if (str) { var = atoi(str); }
602 }
603
604 TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic_with_arguments) {
605   long seed = 0;
606   uint32_t prods = 10;
607   uint32_t cons = 10;
608   uint32_t numOps = 1000;
609   size_t cap = 10000;
610   size_t minCap = 9;
611   size_t mult = 3;
612   setFromEnv(seed, "SEED");
613   setFromEnv(prods, "PRODS");
614   setFromEnv(cons, "CONS");
615   setFromEnv(numOps, "NUM_OPS");
616   setFromEnv(cap, "CAP");
617   setFromEnv(minCap, "MIN_CAP");
618   setFromEnv(mult, "MULT");
619   runMtProdConsDeterministicDynamic(
620     seed, prods, cons, numOps, cap, minCap, mult);
621 }
622
623 #define PC_BENCH(q, np, nc, ...) \
624     producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
625
626 template <bool Dynamic = false>
627 void runMtProdCons() {
628   using QueueType = MPMCQueue<int, std::atomic, Dynamic>;
629
630   int n = 100000;
631   setFromEnv(n, "NUM_OPS");
632   vector<unique_ptr<WriteMethodCaller<QueueType>>>
633     callers;
634   callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
635   callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
636   callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
637   callers.emplace_back(
638       std::make_unique<TryWriteUntilCaller<QueueType>>(milliseconds(1)));
639   callers.emplace_back(
640       std::make_unique<TryWriteUntilCaller<QueueType>>(seconds(2)));
641   for (const auto& caller : callers) {
642     LOG(INFO) << PC_BENCH((QueueType(10)), 1, 1, n, *caller);
643     LOG(INFO) << PC_BENCH((QueueType(10)), 10, 1, n, *caller);
644     LOG(INFO) << PC_BENCH((QueueType(10)), 1, 10, n, *caller);
645     LOG(INFO) << PC_BENCH((QueueType(10)), 10, 10, n, *caller);
646     LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 1, n, *caller);
647     LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 1, n, *caller);
648     LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 10, n, *caller);
649     LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 10, n, *caller);
650     LOG(INFO) << PC_BENCH((QueueType(100000)), 32, 100, n, *caller);
651   }
652 }
653
654 TEST(MPMCQueue, mt_prod_cons) {
655   runMtProdCons();
656 }
657
658 TEST(MPMCQueue, mt_prod_cons_dynamic) {
659   runMtProdCons</* Dynamic = */ true>();
660 }
661
662 template <bool Dynamic = false>
663 void runMtProdConsEmulatedFutex() {
664   using QueueType = MPMCQueue<int, EmulatedFutexAtomic, Dynamic>;
665
666   int n = 100000;
667   vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
668   callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
669   callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
670   callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
671   callers.emplace_back(
672       std::make_unique<TryWriteUntilCaller<QueueType>>(milliseconds(1)));
673   callers.emplace_back(
674       std::make_unique<TryWriteUntilCaller<QueueType>>(seconds(2)));
675   for (const auto& caller : callers) {
676     LOG(INFO) << PC_BENCH((QueueType(10)), 1, 1, n, *caller);
677     LOG(INFO) << PC_BENCH((QueueType(10)), 10, 1, n, *caller);
678     LOG(INFO) << PC_BENCH((QueueType(10)), 1, 10, n, *caller);
679     LOG(INFO) << PC_BENCH((QueueType(10)), 10, 10, n, *caller);
680     LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 1, n, *caller);
681     LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 1, n, *caller);
682     LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 10, n, *caller);
683     LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 10, n, *caller);
684     LOG(INFO) << PC_BENCH((QueueType(100000)), 32, 100, n, *caller);
685   }
686 }
687
688 TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
689   runMtProdConsEmulatedFutex();
690 }
691
692 TEST(MPMCQueue, mt_prod_cons_emulated_futex_dynamic) {
693   runMtProdConsEmulatedFutex</* Dynamic = */ true>();
694 }
695
696 template <template <typename> class Atom, bool Dynamic = false>
697 void runNeverFailThread(int numThreads,
698                         int n, /*numOps*/
699                         MPMCQueue<int, Atom, Dynamic>& cq,
700                         std::atomic<uint64_t>& sum,
701                         int t) {
702   uint64_t threadSum = 0;
703   for (int i = t; i < n; i += numThreads) {
704     // enq + deq
705     EXPECT_TRUE(cq.writeIfNotFull(i));
706
707     int dest = -1;
708     EXPECT_TRUE(cq.readIfNotEmpty(dest));
709     EXPECT_TRUE(dest >= 0);
710     threadSum += dest;
711   }
712   sum += threadSum;
713 }
714
715 template <template <typename> class Atom, bool Dynamic = false>
716 uint64_t runNeverFailTest(int numThreads, int numOps) {
717   // always #enq >= #deq
718   MPMCQueue<int, Atom, Dynamic> cq(numThreads);
719
720   uint64_t n = numOps;
721   auto beginMicro = nowMicro();
722
723   vector<std::thread> threads(numThreads);
724   std::atomic<uint64_t> sum(0);
725   for (int t = 0; t < numThreads; ++t) {
726     threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom, Dynamic>,
727                                           numThreads,
728                                           n,
729                                           std::ref(cq),
730                                           std::ref(sum),
731                                           t));
732   }
733   for (auto& t : threads) {
734     DSched::join(t);
735   }
736   EXPECT_TRUE(cq.isEmpty());
737   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
738
739   return nowMicro() - beginMicro;
740 }
741
742 template <template <typename> class Atom, bool Dynamic = false>
743 void runMtNeverFail(std::vector<int>& nts, int n) {
744   for (int nt : nts) {
745     uint64_t elapsed = runNeverFailTest<Atom, Dynamic>(nt, n);
746     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
747               << " threads";
748   }
749 }
750
751 // All the never_fail tests are for the non-dynamic version only.
752 // False positive for dynamic version. Some writeIfNotFull() and
753 // tryWriteUntil() operations may fail in transient conditions related
754 // to expansion.
755
756 TEST(MPMCQueue, mt_never_fail) {
757   std::vector<int> nts {1, 3, 100};
758   int n = 100000;
759   runMtNeverFail<std::atomic>(nts, n);
760 }
761
762 TEST(MPMCQueue, mt_never_fail_emulated_futex) {
763   std::vector<int> nts {1, 3, 100};
764   int n = 100000;
765   runMtNeverFail<EmulatedFutexAtomic>(nts, n);
766 }
767
768 template <bool Dynamic = false>
769 void runMtNeverFailDeterministic(std::vector<int>& nts, int n, long seed) {
770   LOG(INFO) << "using seed " << seed;
771   for (int nt : nts) {
772     {
773       DSched sched(DSched::uniform(seed));
774       runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
775     }
776     {
777       DSched sched(DSched::uniformSubset(seed, 2));
778       runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
779     }
780   }
781 }
782
783 TEST(MPMCQueue, mt_never_fail_deterministic) {
784   std::vector<int> nts {3, 10};
785   long seed = 0; // nowMicro() % 10000;
786   int n = 1000;
787   runMtNeverFailDeterministic(nts, n, seed);
788 }
789
790 template <class Clock, template <typename> class Atom, bool Dynamic>
791 void runNeverFailUntilThread(int numThreads,
792                              int n, /*numOps*/
793                              MPMCQueue<int, Atom, Dynamic>& cq,
794                              std::atomic<uint64_t>& sum,
795                              int t) {
796   uint64_t threadSum = 0;
797   for (int i = t; i < n; i += numThreads) {
798     // enq + deq
799     auto soon = Clock::now() + std::chrono::seconds(1);
800     EXPECT_TRUE(cq.tryWriteUntil(soon, i));
801
802     int dest = -1;
803     EXPECT_TRUE(cq.readIfNotEmpty(dest));
804     EXPECT_TRUE(dest >= 0);
805     threadSum += dest;
806   }
807   sum += threadSum;
808 }
809
810 template <class Clock, template <typename> class Atom, bool Dynamic = false>
811 uint64_t runNeverFailTest(int numThreads, int numOps) {
812   // always #enq >= #deq
813   MPMCQueue<int, Atom, Dynamic> cq(numThreads);
814
815   uint64_t n = numOps;
816   auto beginMicro = nowMicro();
817
818   vector<std::thread> threads(numThreads);
819   std::atomic<uint64_t> sum(0);
820   for (int t = 0; t < numThreads; ++t) {
821     threads[t] = DSched::thread(std::bind(
822                                   runNeverFailUntilThread<Clock, Atom, Dynamic>,
823                                   numThreads,
824                                   n,
825                                   std::ref(cq),
826                                   std::ref(sum),
827                                   t));
828   }
829   for (auto& t : threads) {
830     DSched::join(t);
831   }
832   EXPECT_TRUE(cq.isEmpty());
833   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
834
835   return nowMicro() - beginMicro;
836 }
837
838 template <bool Dynamic = false>
839 void runMtNeverFailUntilSystem(std::vector<int>& nts, int n) {
840   for (int nt : nts) {
841     uint64_t elapsed =
842       runNeverFailTest<std::chrono::system_clock, std::atomic, Dynamic>(nt, n);
843     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
844               << " threads";
845   }
846 }
847
848 TEST(MPMCQueue, mt_never_fail_until_system) {
849   std::vector<int> nts {1, 3, 100};
850   int n = 100000;
851   runMtNeverFailUntilSystem(nts, n);
852 }
853
854 template <bool Dynamic = false>
855 void runMtNeverFailUntilSteady(std::vector<int>& nts, int n) {
856   for (int nt : nts) {
857     uint64_t elapsed =
858       runNeverFailTest<std::chrono::steady_clock, std::atomic, Dynamic>(nt, n);
859     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
860               << " threads";
861   }
862 }
863
864 TEST(MPMCQueue, mt_never_fail_until_steady) {
865   std::vector<int> nts {1, 3, 100};
866   int n = 100000;
867   runMtNeverFailUntilSteady(nts, n);
868 }
869
870 enum LifecycleEvent {
871   NOTHING = -1,
872   DEFAULT_CONSTRUCTOR,
873   COPY_CONSTRUCTOR,
874   MOVE_CONSTRUCTOR,
875   TWO_ARG_CONSTRUCTOR,
876   COPY_OPERATOR,
877   MOVE_OPERATOR,
878   DESTRUCTOR,
879   MAX_LIFECYCLE_EVENT
880 };
881
882 static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT];
883 static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT];
884
885 static int lc_outstanding() {
886   return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[COPY_CONSTRUCTOR] +
887       lc_counts[MOVE_CONSTRUCTOR] + lc_counts[TWO_ARG_CONSTRUCTOR] -
888       lc_counts[DESTRUCTOR];
889 }
890
891 static void lc_snap() {
892   for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
893     lc_prev[i] = lc_counts[i];
894   }
895 }
896
897 #define LIFECYCLE_STEP(...) lc_step(__LINE__, __VA_ARGS__)
898
899 static void lc_step(int lineno, int what = NOTHING, int what2 = NOTHING) {
900   for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
901     int delta = i == what || i == what2 ? 1 : 0;
902     EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
903         << "lc_counts[" << i << "] - lc_prev[" << i << "] was "
904         << (lc_counts[i] - lc_prev[i]) << ", expected " << delta
905         << ", from line " << lineno;
906   }
907   lc_snap();
908 }
909
910 template <typename R>
911 struct Lifecycle {
912   typedef R IsRelocatable;
913
914   bool constructed;
915
916   Lifecycle() noexcept : constructed(true) {
917     ++lc_counts[DEFAULT_CONSTRUCTOR];
918   }
919
920   explicit Lifecycle(int /* n */, char const* /* s */) noexcept
921       : constructed(true) {
922     ++lc_counts[TWO_ARG_CONSTRUCTOR];
923   }
924
925   Lifecycle(const Lifecycle& /* rhs */) noexcept : constructed(true) {
926     ++lc_counts[COPY_CONSTRUCTOR];
927   }
928
929   Lifecycle(Lifecycle&& /* rhs */) noexcept : constructed(true) {
930     ++lc_counts[MOVE_CONSTRUCTOR];
931   }
932
933   Lifecycle& operator=(const Lifecycle& /* rhs */) noexcept {
934     ++lc_counts[COPY_OPERATOR];
935     return *this;
936   }
937
938   Lifecycle& operator=(Lifecycle&& /* rhs */) noexcept {
939     ++lc_counts[MOVE_OPERATOR];
940     return *this;
941   }
942
943   ~Lifecycle() noexcept {
944     ++lc_counts[DESTRUCTOR];
945     assert(lc_outstanding() >= 0);
946     assert(constructed);
947     constructed = false;
948   }
949 };
950
951 template <typename R>
952 void runPerfectForwardingTest() {
953   lc_snap();
954   EXPECT_EQ(lc_outstanding(), 0);
955
956   {
957     // Non-dynamic only. False positive for dynamic.
958     MPMCQueue<Lifecycle<R>, std::atomic> queue(50);
959     LIFECYCLE_STEP(NOTHING);
960
961     for (int pass = 0; pass < 10; ++pass) {
962       for (int i = 0; i < 10; ++i) {
963         queue.blockingWrite();
964         LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
965
966         queue.blockingWrite(1, "one");
967         LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
968
969         {
970           Lifecycle<R> src;
971           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
972           queue.blockingWrite(std::move(src));
973           LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
974         }
975         LIFECYCLE_STEP(DESTRUCTOR);
976
977         {
978           Lifecycle<R> src;
979           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
980           queue.blockingWrite(src);
981           LIFECYCLE_STEP(COPY_CONSTRUCTOR);
982         }
983         LIFECYCLE_STEP(DESTRUCTOR);
984
985         EXPECT_TRUE(queue.write());
986         LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
987       }
988
989       EXPECT_EQ(queue.size(), 50);
990       EXPECT_FALSE(queue.write(2, "two"));
991       LIFECYCLE_STEP(NOTHING);
992
993       for (int i = 0; i < 50; ++i) {
994         {
995           Lifecycle<R> node;
996           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
997
998           queue.blockingRead(node);
999           if (R::value) {
1000             // relocatable, moved via memcpy
1001             LIFECYCLE_STEP(DESTRUCTOR);
1002           } else {
1003             LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
1004           }
1005         }
1006         LIFECYCLE_STEP(DESTRUCTOR);
1007       }
1008
1009       EXPECT_EQ(queue.size(), 0);
1010     }
1011
1012     // put one element back before destruction
1013     {
1014       Lifecycle<R> src(3, "three");
1015       LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
1016       queue.write(std::move(src));
1017       LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
1018     }
1019     LIFECYCLE_STEP(DESTRUCTOR); // destroy src
1020   }
1021   LIFECYCLE_STEP(DESTRUCTOR); // destroy queue
1022
1023   EXPECT_EQ(lc_outstanding(), 0);
1024 }
1025
1026 TEST(MPMCQueue, perfect_forwarding) {
1027   runPerfectForwardingTest<std::false_type>();
1028 }
1029
1030 TEST(MPMCQueue, perfect_forwarding_relocatable) {
1031   runPerfectForwardingTest<std::true_type>();
1032 }
1033
1034 template <bool Dynamic = false>
1035 void run_queue_moving() {
1036   lc_snap();
1037   EXPECT_EQ(lc_outstanding(), 0);
1038
1039   {
1040     MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> a(50);
1041     LIFECYCLE_STEP(NOTHING);
1042
1043     a.blockingWrite();
1044     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1045
1046     // move constructor
1047     MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> b
1048       = std::move(a);
1049     LIFECYCLE_STEP(NOTHING);
1050     EXPECT_EQ(a.capacity(), 0);
1051     EXPECT_EQ(a.size(), 0);
1052     EXPECT_EQ(b.capacity(), 50);
1053     EXPECT_EQ(b.size(), 1);
1054
1055     b.blockingWrite();
1056     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1057
1058     // move operator
1059     MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> c;
1060     LIFECYCLE_STEP(NOTHING);
1061     c = std::move(b);
1062     LIFECYCLE_STEP(NOTHING);
1063     EXPECT_EQ(c.capacity(), 50);
1064     EXPECT_EQ(c.size(), 2);
1065
1066     {
1067       Lifecycle<std::false_type> dst;
1068       LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1069       c.blockingRead(dst);
1070       LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
1071
1072       {
1073         // swap
1074         MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> d(10);
1075         LIFECYCLE_STEP(NOTHING);
1076         std::swap(c, d);
1077         LIFECYCLE_STEP(NOTHING);
1078         EXPECT_EQ(c.capacity(), 10);
1079         EXPECT_TRUE(c.isEmpty());
1080         EXPECT_EQ(d.capacity(), 50);
1081         EXPECT_EQ(d.size(), 1);
1082
1083         d.blockingRead(dst);
1084         LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
1085
1086         c.blockingWrite(dst);
1087         LIFECYCLE_STEP(COPY_CONSTRUCTOR);
1088
1089         d.blockingWrite(std::move(dst));
1090         LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
1091       } // d goes out of scope
1092       LIFECYCLE_STEP(DESTRUCTOR);
1093     } // dst goes out of scope
1094     LIFECYCLE_STEP(DESTRUCTOR);
1095   } // c goes out of scope
1096   LIFECYCLE_STEP(DESTRUCTOR);
1097 }
1098
1099 TEST(MPMCQueue, queue_moving) {
1100   run_queue_moving();
1101 }
1102
1103 TEST(MPMCQueue, queue_moving_dynamic) {
1104   run_queue_moving<true>();
1105 }
1106
1107 TEST(MPMCQueue, explicit_zero_capacity_fail) {
1108   ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);
1109
1110   using DynamicMPMCQueueInt = MPMCQueue<int, std::atomic, true>;
1111   ASSERT_THROW(DynamicMPMCQueueInt cq(0), std::invalid_argument);
1112 }
1113
1114 template <bool Dynamic>
1115 void testTryReadUntil() {
1116   MPMCQueue<int, std::atomic, Dynamic> q{1};
1117
1118   const auto wait = std::chrono::milliseconds(100);
1119   stop_watch<> watch;
1120   bool rets[2];
1121   int vals[2];
1122   std::vector<std::thread> threads;
1123   boost::barrier b{3};
1124   for (int i = 0; i < 2; i++) {
1125     threads.emplace_back([&, i] {
1126       b.wait();
1127       rets[i] = q.tryReadUntil(watch.getCheckpoint() + wait, vals[i]);
1128     });
1129   }
1130
1131   b.wait();
1132   EXPECT_TRUE(q.write(42));
1133
1134   for (int i = 0; i < 2; i++) {
1135     threads[i].join();
1136   }
1137
1138   for (int i = 0; i < 2; i++) {
1139     int other = (i + 1) % 2;
1140     if (rets[i]) {
1141       EXPECT_EQ(42, vals[i]);
1142       EXPECT_FALSE(rets[other]);
1143     }
1144   }
1145
1146   EXPECT_TRUE(watch.elapsed(wait));
1147 }
1148
1149 template <bool Dynamic>
1150 void testTryWriteUntil() {
1151   MPMCQueue<int, std::atomic, Dynamic> q{1};
1152   EXPECT_TRUE(q.write(42));
1153
1154   const auto wait = std::chrono::milliseconds(100);
1155   stop_watch<> watch;
1156   bool rets[2];
1157   std::vector<std::thread> threads;
1158   boost::barrier b{3};
1159   for (int i = 0; i < 2; i++) {
1160     threads.emplace_back([&, i] {
1161       b.wait();
1162       rets[i] = q.tryWriteUntil(watch.getCheckpoint() + wait, i);
1163     });
1164   }
1165
1166   b.wait();
1167   int x;
1168   EXPECT_TRUE(q.read(x));
1169   EXPECT_EQ(42, x);
1170
1171   for (int i = 0; i < 2; i++) {
1172     threads[i].join();
1173   }
1174   EXPECT_TRUE(q.read(x));
1175
1176   for (int i = 0; i < 2; i++) {
1177     int other = (i + 1) % 2;
1178     if (rets[i]) {
1179       EXPECT_EQ(i, x);
1180       EXPECT_FALSE(rets[other]);
1181     }
1182   }
1183
1184   EXPECT_TRUE(watch.elapsed(wait));
1185 }
1186
1187 TEST(MPMCQueue, try_read_until) {
1188   testTryReadUntil<false>();
1189 }
1190
1191 TEST(MPMCQueue, try_read_until_dynamic) {
1192   testTryReadUntil<true>();
1193 }
1194
1195 TEST(MPMCQueue, try_write_until) {
1196   testTryWriteUntil<false>();
1197 }
1198
1199 TEST(MPMCQueue, try_write_until_dynamic) {
1200   testTryWriteUntil<true>();
1201 }
1202
1203 template <bool Dynamic>
1204 void testTimeout(MPMCQueue<int, std::atomic, Dynamic>& q) {
1205   CHECK(q.write(1));
1206   /* The following must not block forever */
1207   q.tryWriteUntil(
1208       std::chrono::system_clock::now() + std::chrono::microseconds(10000), 2);
1209 }
1210
1211 TEST(MPMCQueue, try_write_until_timeout) {
1212   folly::MPMCQueue<int, std::atomic, false> queue(1);
1213   testTimeout<false>(queue);
1214 }
1215
1216 TEST(MPMCQueue, must_fail_try_write_until_dynamic) {
1217   folly::MPMCQueue<int, std::atomic, true> queue(200, 1, 2);
1218   testTimeout<true>(queue);
1219 }