Switch implicit references of folly::make_unique to std::make_unique
[folly.git] / folly / test / MPMCQueueTest.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/Format.h>
18 #include <folly/MPMCQueue.h>
19 #include <folly/Memory.h>
20 #include <folly/portability/GTest.h>
21 #include <folly/portability/SysResource.h>
22 #include <folly/portability/SysTime.h>
23 #include <folly/portability/Unistd.h>
24 #include <folly/stop_watch.h>
25 #include <folly/test/DeterministicSchedule.h>
26
27 #include <boost/intrusive_ptr.hpp>
28 #include <boost/thread/barrier.hpp>
29 #include <functional>
30 #include <memory>
31 #include <thread>
32 #include <utility>
33
34 FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
35
36 using namespace folly;
37 using namespace detail;
38 using namespace test;
39 using std::chrono::time_point;
40 using std::chrono::steady_clock;
41 using std::chrono::seconds;
42 using std::chrono::milliseconds;
43 using std::string;
44 using std::unique_ptr;
45 using std::vector;
46
47 typedef DeterministicSchedule DSched;
48
49 template <template<typename> class Atom>
50 void run_mt_sequencer_thread(
51     int numThreads,
52     int numOps,
53     uint32_t init,
54     TurnSequencer<Atom>& seq,
55     Atom<uint32_t>& spinThreshold,
56     int& prev,
57     int i) {
58   for (int op = i; op < numOps; op += numThreads) {
59     seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
60     EXPECT_EQ(prev, op - 1);
61     prev = op;
62     seq.completeTurn(init + op);
63   }
64 }
65
66 template <template<typename> class Atom>
67 void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
68   TurnSequencer<Atom> seq(init);
69   Atom<uint32_t> spinThreshold(0);
70
71   int prev = -1;
72   vector<std::thread> threads(numThreads);
73   for (int i = 0; i < numThreads; ++i) {
74     threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
75           numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold),
76           std::ref(prev), i));
77   }
78
79   for (auto& thr : threads) {
80     DSched::join(thr);
81   }
82
83   EXPECT_EQ(prev, numOps - 1);
84 }
85
86 TEST(MPMCQueue, sequencer) {
87   run_mt_sequencer_test<std::atomic>(1, 100, 0);
88   run_mt_sequencer_test<std::atomic>(2, 100000, -100);
89   run_mt_sequencer_test<std::atomic>(100, 10000, -100);
90 }
91
92 TEST(MPMCQueue, sequencer_emulated_futex) {
93   run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
94   run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
95   run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
96 }
97
98 TEST(MPMCQueue, sequencer_deterministic) {
99   DSched sched(DSched::uniform(0));
100   run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
101   run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
102   run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
103 }
104
105 template <bool Dynamic = false, typename T>
106 void runElementTypeTest(T&& src) {
107   MPMCQueue<T, std::atomic, Dynamic> cq(10);
108   cq.blockingWrite(std::forward<T>(src));
109   T dest;
110   cq.blockingRead(dest);
111   EXPECT_TRUE(cq.write(std::move(dest)));
112   EXPECT_TRUE(cq.read(dest));
113   auto soon1 = std::chrono::system_clock::now() + std::chrono::seconds(1);
114   EXPECT_TRUE(cq.tryWriteUntil(soon1, std::move(dest)));
115   EXPECT_TRUE(cq.read(dest));
116   auto soon2 = std::chrono::steady_clock::now() + std::chrono::seconds(1);
117   EXPECT_TRUE(cq.tryWriteUntil(soon2, std::move(dest)));
118   EXPECT_TRUE(cq.read(dest));
119 }
120
121 struct RefCounted {
122   static FOLLY_TLS int active_instances;
123
124   mutable std::atomic<int> rc;
125
126   RefCounted() : rc(0) {
127     ++active_instances;
128   }
129
130   ~RefCounted() {
131     --active_instances;
132   }
133 };
134 FOLLY_TLS int RefCounted::active_instances;
135
136 void intrusive_ptr_add_ref(RefCounted const* p) {
137   p->rc++;
138 }
139
140 void intrusive_ptr_release(RefCounted const* p) {
141   if (--(p->rc) == 0) {
142     delete p;
143   }
144 }
145
146 TEST(MPMCQueue, lots_of_element_types) {
147   runElementTypeTest(10);
148   runElementTypeTest(string("abc"));
149   runElementTypeTest(std::make_pair(10, string("def")));
150   runElementTypeTest(vector<string>{{"abc"}});
151   runElementTypeTest(std::make_shared<char>('a'));
152   runElementTypeTest(std::make_unique<char>('a'));
153   runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
154   EXPECT_EQ(RefCounted::active_instances, 0);
155 }
156
157 TEST(MPMCQueue, lots_of_element_types_dynamic) {
158   runElementTypeTest<true>(10);
159   runElementTypeTest<true>(string("abc"));
160   runElementTypeTest<true>(std::make_pair(10, string("def")));
161   runElementTypeTest<true>(vector<string>{{"abc"}});
162   runElementTypeTest<true>(std::make_shared<char>('a'));
163   runElementTypeTest<true>(std::make_unique<char>('a'));
164   runElementTypeTest<true>(boost::intrusive_ptr<RefCounted>(new RefCounted));
165   EXPECT_EQ(RefCounted::active_instances, 0);
166 }
167
168 TEST(MPMCQueue, single_thread_enqdeq) {
169   // Non-dynamic version only.
170   // False positive for dynamic version. Capacity can be temporarily
171   // higher than specified.
172   MPMCQueue<int> cq(10);
173
174   for (int pass = 0; pass < 10; ++pass) {
175     for (int i = 0; i < 10; ++i) {
176       EXPECT_TRUE(cq.write(i));
177     }
178     EXPECT_FALSE(cq.write(-1));
179     EXPECT_FALSE(cq.isEmpty());
180     EXPECT_EQ(cq.size(), 10);
181
182     for (int i = 0; i < 5; ++i) {
183       int dest = -1;
184       EXPECT_TRUE(cq.read(dest));
185       EXPECT_EQ(dest, i);
186     }
187     for (int i = 5; i < 10; ++i) {
188       int dest = -1;
189       cq.blockingRead(dest);
190       EXPECT_EQ(dest, i);
191     }
192     int dest = -1;
193     EXPECT_FALSE(cq.read(dest));
194     EXPECT_EQ(dest, -1);
195
196     EXPECT_TRUE(cq.isEmpty());
197     EXPECT_EQ(cq.size(), 0);
198   }
199 }
200
201 TEST(MPMCQueue, tryenq_capacity_test) {
202   // Non-dynamic version only.
203   // False positive for dynamic version. Capacity can be temporarily
204   // higher than specified.
205   for (size_t cap = 1; cap < 100; ++cap) {
206     MPMCQueue<int> cq(cap);
207     for (size_t i = 0; i < cap; ++i) {
208       EXPECT_TRUE(cq.write(i));
209     }
210     EXPECT_FALSE(cq.write(100));
211   }
212 }
213
214 TEST(MPMCQueue, enq_capacity_test) {
215   // Non-dynamic version only.
216   // False positive for dynamic version. Capacity can be temporarily
217   // higher than specified.
218   for (auto cap : { 1, 100, 10000 }) {
219     MPMCQueue<int> cq(cap);
220     for (int i = 0; i < cap; ++i) {
221       cq.blockingWrite(i);
222     }
223     int t = 0;
224     int when;
225     auto thr = std::thread([&]{
226       cq.blockingWrite(100);
227       when = t;
228     });
229     usleep(2000);
230     t = 1;
231     int dummy;
232     cq.blockingRead(dummy);
233     thr.join();
234     EXPECT_EQ(when, 1);
235   }
236 }
237
238 template <template<typename> class Atom, bool Dynamic = false>
239 void runTryEnqDeqThread(
240     int numThreads,
241     int n, /*numOps*/
242     MPMCQueue<int, Atom, Dynamic>& cq,
243     std::atomic<uint64_t>& sum,
244     int t) {
245   uint64_t threadSum = 0;
246   int src = t;
247   // received doesn't reflect any actual values, we just start with
248   // t and increment by numThreads to get the rounding of termination
249   // correct if numThreads doesn't evenly divide numOps
250   int received = t;
251   while (src < n || received < n) {
252     if (src < n && cq.write(src)) {
253       src += numThreads;
254     }
255
256     int dst;
257     if (received < n && cq.read(dst)) {
258       received += numThreads;
259       threadSum += dst;
260     }
261   }
262   sum += threadSum;
263 }
264
265 template <template<typename> class Atom, bool Dynamic = false>
266 void runTryEnqDeqTest(int numThreads, int numOps) {
267   // write and read aren't linearizable, so we don't have
268   // hard guarantees on their individual behavior.  We can still test
269   // correctness in aggregate
270   MPMCQueue<int,Atom, Dynamic> cq(numThreads);
271
272   uint64_t n = numOps;
273   vector<std::thread> threads(numThreads);
274   std::atomic<uint64_t> sum(0);
275   for (int t = 0; t < numThreads; ++t) {
276     threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom, Dynamic>,
277           numThreads, n, std::ref(cq), std::ref(sum), t));
278   }
279   for (auto& t : threads) {
280     DSched::join(t);
281   }
282   EXPECT_TRUE(cq.isEmpty());
283   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
284 }
285
286 TEST(MPMCQueue, mt_try_enq_deq) {
287   int nts[] = { 1, 3, 100 };
288
289   int n = 100000;
290   for (int nt : nts) {
291     runTryEnqDeqTest<std::atomic>(nt, n);
292   }
293 }
294
295 TEST(MPMCQueue, mt_try_enq_deq_dynamic) {
296   int nts[] = { 1, 3, 100 };
297
298   int n = 100000;
299   for (int nt : nts) {
300     runTryEnqDeqTest<std::atomic, /* Dynamic = */ true>(nt, n);
301   }
302 }
303
304 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
305   int nts[] = { 1, 3, 100 };
306
307   int n = 100000;
308   for (int nt : nts) {
309     runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
310   }
311 }
312
313 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex_dynamic) {
314   int nts[] = { 1, 3, 100 };
315
316   int n = 100000;
317   for (int nt : nts) {
318     runTryEnqDeqTest<EmulatedFutexAtomic, /* Dynamic = */ true>(nt, n);
319   }
320 }
321
322 TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
323   int nts[] = { 3, 10 };
324
325   long seed = 0;
326   LOG(INFO) << "using seed " << seed;
327
328   int n = 1000;
329   for (int nt : nts) {
330     {
331       DSched sched(DSched::uniform(seed));
332       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
333     }
334     {
335       DSched sched(DSched::uniformSubset(seed, 2));
336       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
337     }
338     {
339       DSched sched(DSched::uniform(seed));
340       runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
341     }
342     {
343       DSched sched(DSched::uniformSubset(seed, 2));
344       runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
345     }
346   }
347 }
348
349 uint64_t nowMicro() {
350   timeval tv;
351   gettimeofday(&tv, 0);
352   return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
353 }
354
355 template <typename Q>
356 struct WriteMethodCaller {
357   WriteMethodCaller() {}
358   virtual ~WriteMethodCaller() = default;
359   virtual bool callWrite(Q& q, int i) = 0;
360   virtual string methodName() = 0;
361 };
362
363 template <typename Q>
364 struct BlockingWriteCaller : public WriteMethodCaller<Q> {
365   bool callWrite(Q& q, int i) override {
366     q.blockingWrite(i);
367     return true;
368   }
369   string methodName() override { return "blockingWrite"; }
370 };
371
372 template <typename Q>
373 struct WriteIfNotFullCaller : public WriteMethodCaller<Q> {
374   bool callWrite(Q& q, int i) override { return q.writeIfNotFull(i); }
375   string methodName() override { return "writeIfNotFull"; }
376 };
377
378 template <typename Q>
379 struct WriteCaller : public WriteMethodCaller<Q> {
380   bool callWrite(Q& q, int i) override { return q.write(i); }
381   string methodName() override { return "write"; }
382 };
383
384 template <typename Q,
385           class Clock = steady_clock,
386           class Duration = typename Clock::duration>
387 struct TryWriteUntilCaller : public WriteMethodCaller<Q> {
388   const Duration duration_;
389   explicit TryWriteUntilCaller(Duration&& duration) : duration_(duration) {}
390   bool callWrite(Q& q, int i) override {
391     auto then = Clock::now() + duration_;
392     return q.tryWriteUntil(then, i);
393   }
394   string methodName() override {
395     return folly::sformat(
396         "tryWriteUntil({}ms)",
397         std::chrono::duration_cast<milliseconds>(duration_).count());
398   }
399 };
400
401 template <typename Q>
402 string producerConsumerBench(Q&& queue,
403                              string qName,
404                              int numProducers,
405                              int numConsumers,
406                              int numOps,
407                              WriteMethodCaller<Q>& writer,
408                              bool ignoreContents = false) {
409   Q& q = queue;
410
411   struct rusage beginUsage;
412   getrusage(RUSAGE_SELF, &beginUsage);
413
414   auto beginMicro = nowMicro();
415
416   uint64_t n = numOps;
417   std::atomic<uint64_t> sum(0);
418   std::atomic<uint64_t> failed(0);
419
420   vector<std::thread> producers(numProducers);
421   for (int t = 0; t < numProducers; ++t) {
422     producers[t] = DSched::thread([&,t]{
423       for (int i = t; i < numOps; i += numProducers) {
424         while (!writer.callWrite(q, i)) {
425           ++failed;
426         }
427       }
428     });
429   }
430
431   vector<std::thread> consumers(numConsumers);
432   for (int t = 0; t < numConsumers; ++t) {
433     consumers[t] = DSched::thread([&,t]{
434       uint64_t localSum = 0;
435       for (int i = t; i < numOps; i += numConsumers) {
436         int dest = -1;
437         q.blockingRead(dest);
438         EXPECT_FALSE(dest == -1);
439         localSum += dest;
440       }
441       sum += localSum;
442     });
443   }
444
445   for (auto& t : producers) {
446     DSched::join(t);
447   }
448   for (auto& t : consumers) {
449     DSched::join(t);
450   }
451   if (!ignoreContents) {
452     EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
453   }
454
455   auto endMicro = nowMicro();
456
457   struct rusage endUsage;
458   getrusage(RUSAGE_SELF, &endUsage);
459
460   uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
461   long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
462       (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
463   uint64_t failures = failed;
464   size_t allocated = q.allocatedCapacity();
465
466   return folly::sformat(
467       "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
468       "handoff, {} failures, {} allocated",
469       qName,
470       numProducers,
471       writer.methodName(),
472       numConsumers,
473       nanosPer,
474       csw,
475       n,
476       failures,
477       allocated);
478 }
479
480 template <bool Dynamic = false>
481 void runMtProdConsDeterministic(long seed) {
482   // we use the Bench method, but perf results are meaningless under DSched
483   DSched sched(DSched::uniform(seed));
484
485   using QueueType = MPMCQueue<int, DeterministicAtomic, Dynamic>;
486
487   vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
488   callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
489   callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
490   callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
491   callers.emplace_back(
492       std::make_unique<TryWriteUntilCaller<QueueType>>(milliseconds(1)));
493   callers.emplace_back(
494       std::make_unique<TryWriteUntilCaller<QueueType>>(seconds(2)));
495   size_t cap;
496
497   for (const auto& caller : callers) {
498     cap = 10;
499     LOG(INFO) <<
500       producerConsumerBench(
501         MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
502         "MPMCQueue<int, DeterministicAtomic, Dynamic>("
503           + folly::to<std::string>(cap)+")",
504         1,
505         1,
506         1000,
507         *caller);
508     cap = 100;
509     LOG(INFO) <<
510       producerConsumerBench(
511         MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
512         "MPMCQueue<int, DeterministicAtomic, Dynamic>("
513           + folly::to<std::string>(cap)+")",
514         10,
515         10,
516         1000,
517         *caller);
518     cap = 10;
519     LOG(INFO) <<
520       producerConsumerBench(
521         MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
522         "MPMCQueue<int, DeterministicAtomic, Dynamic>("
523           + folly::to<std::string>(cap)+")",
524         1,
525         1,
526         1000,
527         *caller);
528     cap = 100;
529     LOG(INFO) <<
530       producerConsumerBench(
531         MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
532         "MPMCQueue<int, DeterministicAtomic, Dynamic>("
533           + folly::to<std::string>(cap)+")",
534         10,
535         10,
536         1000,
537         *caller);
538     cap = 1;
539     LOG(INFO) <<
540       producerConsumerBench(
541         MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
542         "MPMCQueue<int, DeterministicAtomic, Dynamic>("
543           + folly::to<std::string>(cap)+")",
544         10,
545         10,
546         1000,
547         *caller);
548   }
549 }
550
551 void runMtProdConsDeterministicDynamic(
552   long seed,
553   uint32_t prods,
554   uint32_t cons,
555   uint32_t numOps,
556   size_t cap,
557   size_t minCap,
558   size_t mult
559 ) {
560   // we use the Bench method, but perf results are meaningless under DSched
561   DSched sched(DSched::uniform(seed));
562
563   using QueueType = MPMCQueue<int, DeterministicAtomic, true>;
564
565   vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
566   callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
567   callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
568   callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
569   callers.emplace_back(
570       std::make_unique<TryWriteUntilCaller<QueueType>>(milliseconds(1)));
571   callers.emplace_back(
572       std::make_unique<TryWriteUntilCaller<QueueType>>(seconds(2)));
573
574   for (const auto& caller : callers) {
575     LOG(INFO) <<
576       producerConsumerBench(
577         MPMCQueue<int, DeterministicAtomic, true>(cap, minCap, mult),
578         "MPMCQueue<int, DeterministicAtomic, true>("
579           + folly::to<std::string>(cap) + ", "
580           + folly::to<std::string>(minCap) + ", "
581           + folly::to<std::string>(mult)+")",
582         prods,
583         cons,
584         numOps,
585         *caller);
586   }
587 }
588
589 TEST(MPMCQueue, mt_prod_cons_deterministic) {
590   runMtProdConsDeterministic(0);
591 }
592
593 TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic) {
594   runMtProdConsDeterministic<true>(0);
595 }
596
597 template <typename T>
598 void setFromEnv(T& var, const char* envvar) {
599   char* str = std::getenv(envvar);
600   if (str) { var = atoi(str); }
601 }
602
603 TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic_with_arguments) {
604   long seed = 0;
605   uint32_t prods = 10;
606   uint32_t cons = 10;
607   uint32_t numOps = 1000;
608   size_t cap = 10000;
609   size_t minCap = 9;
610   size_t mult = 3;
611   setFromEnv(seed, "SEED");
612   setFromEnv(prods, "PRODS");
613   setFromEnv(cons, "CONS");
614   setFromEnv(numOps, "NUM_OPS");
615   setFromEnv(cap, "CAP");
616   setFromEnv(minCap, "MIN_CAP");
617   setFromEnv(mult, "MULT");
618   runMtProdConsDeterministicDynamic(
619     seed, prods, cons, numOps, cap, minCap, mult);
620 }
621
622 #define PC_BENCH(q, np, nc, ...) \
623     producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
624
625 template <bool Dynamic = false>
626 void runMtProdCons() {
627   using QueueType = MPMCQueue<int, std::atomic, Dynamic>;
628
629   int n = 100000;
630   setFromEnv(n, "NUM_OPS");
631   vector<unique_ptr<WriteMethodCaller<QueueType>>>
632     callers;
633   callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
634   callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
635   callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
636   callers.emplace_back(
637       std::make_unique<TryWriteUntilCaller<QueueType>>(milliseconds(1)));
638   callers.emplace_back(
639       std::make_unique<TryWriteUntilCaller<QueueType>>(seconds(2)));
640   for (const auto& caller : callers) {
641     LOG(INFO) << PC_BENCH((QueueType(10)), 1, 1, n, *caller);
642     LOG(INFO) << PC_BENCH((QueueType(10)), 10, 1, n, *caller);
643     LOG(INFO) << PC_BENCH((QueueType(10)), 1, 10, n, *caller);
644     LOG(INFO) << PC_BENCH((QueueType(10)), 10, 10, n, *caller);
645     LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 1, n, *caller);
646     LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 1, n, *caller);
647     LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 10, n, *caller);
648     LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 10, n, *caller);
649     LOG(INFO) << PC_BENCH((QueueType(100000)), 32, 100, n, *caller);
650   }
651 }
652
653 TEST(MPMCQueue, mt_prod_cons) {
654   runMtProdCons();
655 }
656
657 TEST(MPMCQueue, mt_prod_cons_dynamic) {
658   runMtProdCons</* Dynamic = */ true>();
659 }
660
661 template <bool Dynamic = false>
662 void runMtProdConsEmulatedFutex() {
663   using QueueType = MPMCQueue<int, EmulatedFutexAtomic, Dynamic>;
664
665   int n = 100000;
666   vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
667   callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
668   callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
669   callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
670   callers.emplace_back(
671       std::make_unique<TryWriteUntilCaller<QueueType>>(milliseconds(1)));
672   callers.emplace_back(
673       std::make_unique<TryWriteUntilCaller<QueueType>>(seconds(2)));
674   for (const auto& caller : callers) {
675     LOG(INFO) << PC_BENCH((QueueType(10)), 1, 1, n, *caller);
676     LOG(INFO) << PC_BENCH((QueueType(10)), 10, 1, n, *caller);
677     LOG(INFO) << PC_BENCH((QueueType(10)), 1, 10, n, *caller);
678     LOG(INFO) << PC_BENCH((QueueType(10)), 10, 10, n, *caller);
679     LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 1, n, *caller);
680     LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 1, n, *caller);
681     LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 10, n, *caller);
682     LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 10, n, *caller);
683     LOG(INFO) << PC_BENCH((QueueType(100000)), 32, 100, n, *caller);
684   }
685 }
686
687 TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
688   runMtProdConsEmulatedFutex();
689 }
690
691 TEST(MPMCQueue, mt_prod_cons_emulated_futex_dynamic) {
692   runMtProdConsEmulatedFutex</* Dynamic = */ true>();
693 }
694
695 template <template <typename> class Atom, bool Dynamic = false>
696 void runNeverFailThread(int numThreads,
697                         int n, /*numOps*/
698                         MPMCQueue<int, Atom, Dynamic>& cq,
699                         std::atomic<uint64_t>& sum,
700                         int t) {
701   uint64_t threadSum = 0;
702   for (int i = t; i < n; i += numThreads) {
703     // enq + deq
704     EXPECT_TRUE(cq.writeIfNotFull(i));
705
706     int dest = -1;
707     EXPECT_TRUE(cq.readIfNotEmpty(dest));
708     EXPECT_TRUE(dest >= 0);
709     threadSum += dest;
710   }
711   sum += threadSum;
712 }
713
714 template <template <typename> class Atom, bool Dynamic = false>
715 uint64_t runNeverFailTest(int numThreads, int numOps) {
716   // always #enq >= #deq
717   MPMCQueue<int, Atom, Dynamic> cq(numThreads);
718
719   uint64_t n = numOps;
720   auto beginMicro = nowMicro();
721
722   vector<std::thread> threads(numThreads);
723   std::atomic<uint64_t> sum(0);
724   for (int t = 0; t < numThreads; ++t) {
725     threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom, Dynamic>,
726                                           numThreads,
727                                           n,
728                                           std::ref(cq),
729                                           std::ref(sum),
730                                           t));
731   }
732   for (auto& t : threads) {
733     DSched::join(t);
734   }
735   EXPECT_TRUE(cq.isEmpty());
736   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
737
738   return nowMicro() - beginMicro;
739 }
740
741 template <template<typename> class Atom, bool Dynamic = false>
742 void runMtNeverFail(std::vector<int>& nts, int n) {
743   for (int nt : nts) {
744     uint64_t elapsed = runNeverFailTest<Atom, Dynamic>(nt, n);
745     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
746               << " threads";
747   }
748 }
749
750 // All the never_fail tests are for the non-dynamic version only.
751 // False positive for dynamic version. Some writeIfNotFull() and
752 // tryWriteUntil() operations may fail in transient conditions related
753 // to expansion.
754
755 TEST(MPMCQueue, mt_never_fail) {
756   std::vector<int> nts {1, 3, 100};
757   int n = 100000;
758   runMtNeverFail<std::atomic>(nts, n);
759 }
760
761 TEST(MPMCQueue, mt_never_fail_emulated_futex) {
762   std::vector<int> nts {1, 3, 100};
763   int n = 100000;
764   runMtNeverFail<EmulatedFutexAtomic>(nts, n);
765 }
766
767 template<bool Dynamic = false>
768 void runMtNeverFailDeterministic(std::vector<int>& nts, int n, long seed) {
769   LOG(INFO) << "using seed " << seed;
770   for (int nt : nts) {
771     {
772       DSched sched(DSched::uniform(seed));
773       runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
774     }
775     {
776       DSched sched(DSched::uniformSubset(seed, 2));
777       runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
778     }
779   }
780 }
781
782 TEST(MPMCQueue, mt_never_fail_deterministic) {
783   std::vector<int> nts {3, 10};
784   long seed = 0; // nowMicro() % 10000;
785   int n = 1000;
786   runMtNeverFailDeterministic(nts, n, seed);
787 }
788
789 template <class Clock, template <typename> class Atom, bool Dynamic>
790 void runNeverFailUntilThread(int numThreads,
791                              int n, /*numOps*/
792                              MPMCQueue<int, Atom, Dynamic>& cq,
793                              std::atomic<uint64_t>& sum,
794                              int t) {
795   uint64_t threadSum = 0;
796   for (int i = t; i < n; i += numThreads) {
797     // enq + deq
798     auto soon = Clock::now() + std::chrono::seconds(1);
799     EXPECT_TRUE(cq.tryWriteUntil(soon, i));
800
801     int dest = -1;
802     EXPECT_TRUE(cq.readIfNotEmpty(dest));
803     EXPECT_TRUE(dest >= 0);
804     threadSum += dest;
805   }
806   sum += threadSum;
807 }
808
809 template <class Clock, template <typename> class Atom, bool Dynamic = false>
810 uint64_t runNeverFailTest(int numThreads, int numOps) {
811   // always #enq >= #deq
812   MPMCQueue<int, Atom, Dynamic> cq(numThreads);
813
814   uint64_t n = numOps;
815   auto beginMicro = nowMicro();
816
817   vector<std::thread> threads(numThreads);
818   std::atomic<uint64_t> sum(0);
819   for (int t = 0; t < numThreads; ++t) {
820     threads[t] = DSched::thread(std::bind(
821                                   runNeverFailUntilThread<Clock, Atom, Dynamic>,
822                                   numThreads,
823                                   n,
824                                   std::ref(cq),
825                                   std::ref(sum),
826                                   t));
827   }
828   for (auto& t : threads) {
829     DSched::join(t);
830   }
831   EXPECT_TRUE(cq.isEmpty());
832   EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
833
834   return nowMicro() - beginMicro;
835 }
836
837 template <bool Dynamic = false>
838 void runMtNeverFailUntilSystem(std::vector<int>& nts, int n) {
839   for (int nt : nts) {
840     uint64_t elapsed =
841       runNeverFailTest<std::chrono::system_clock, std::atomic, Dynamic>(nt, n);
842     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
843               << " threads";
844   }
845 }
846
847 TEST(MPMCQueue, mt_never_fail_until_system) {
848   std::vector<int> nts {1, 3, 100};
849   int n = 100000;
850   runMtNeverFailUntilSystem(nts, n);
851 }
852
853 template <bool Dynamic = false>
854 void runMtNeverFailUntilSteady(std::vector<int>& nts, int n) {
855   for (int nt : nts) {
856     uint64_t elapsed =
857       runNeverFailTest<std::chrono::steady_clock, std::atomic, Dynamic>(nt, n);
858     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
859               << " threads";
860   }
861 }
862
863 TEST(MPMCQueue, mt_never_fail_until_steady) {
864   std::vector<int> nts {1, 3, 100};
865   int n = 100000;
866   runMtNeverFailUntilSteady(nts, n);
867 }
868
869 enum LifecycleEvent {
870   NOTHING = -1,
871   DEFAULT_CONSTRUCTOR,
872   COPY_CONSTRUCTOR,
873   MOVE_CONSTRUCTOR,
874   TWO_ARG_CONSTRUCTOR,
875   COPY_OPERATOR,
876   MOVE_OPERATOR,
877   DESTRUCTOR,
878   MAX_LIFECYCLE_EVENT
879 };
880
881 static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT];
882 static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT];
883
884 static int lc_outstanding() {
885   return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[COPY_CONSTRUCTOR] +
886       lc_counts[MOVE_CONSTRUCTOR] + lc_counts[TWO_ARG_CONSTRUCTOR] -
887       lc_counts[DESTRUCTOR];
888 }
889
890 static void lc_snap() {
891   for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
892     lc_prev[i] = lc_counts[i];
893   }
894 }
895
896 #define LIFECYCLE_STEP(...) lc_step(__LINE__, __VA_ARGS__)
897
898 static void lc_step(int lineno, int what = NOTHING, int what2 = NOTHING) {
899   for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
900     int delta = i == what || i == what2 ? 1 : 0;
901     EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
902         << "lc_counts[" << i << "] - lc_prev[" << i << "] was "
903         << (lc_counts[i] - lc_prev[i]) << ", expected " << delta
904         << ", from line " << lineno;
905   }
906   lc_snap();
907 }
908
909 template <typename R>
910 struct Lifecycle {
911   typedef R IsRelocatable;
912
913   bool constructed;
914
915   Lifecycle() noexcept : constructed(true) {
916     ++lc_counts[DEFAULT_CONSTRUCTOR];
917   }
918
919   explicit Lifecycle(int /* n */, char const* /* s */) noexcept
920       : constructed(true) {
921     ++lc_counts[TWO_ARG_CONSTRUCTOR];
922   }
923
924   Lifecycle(const Lifecycle& /* rhs */) noexcept : constructed(true) {
925     ++lc_counts[COPY_CONSTRUCTOR];
926   }
927
928   Lifecycle(Lifecycle&& /* rhs */) noexcept : constructed(true) {
929     ++lc_counts[MOVE_CONSTRUCTOR];
930   }
931
932   Lifecycle& operator=(const Lifecycle& /* rhs */) noexcept {
933     ++lc_counts[COPY_OPERATOR];
934     return *this;
935   }
936
937   Lifecycle& operator=(Lifecycle&& /* rhs */) noexcept {
938     ++lc_counts[MOVE_OPERATOR];
939     return *this;
940   }
941
942   ~Lifecycle() noexcept {
943     ++lc_counts[DESTRUCTOR];
944     assert(lc_outstanding() >= 0);
945     assert(constructed);
946     constructed = false;
947   }
948 };
949
950 template <typename R>
951 void runPerfectForwardingTest() {
952   lc_snap();
953   EXPECT_EQ(lc_outstanding(), 0);
954
955   {
956     // Non-dynamic only. False positive for dynamic.
957     MPMCQueue<Lifecycle<R>, std::atomic> queue(50);
958     LIFECYCLE_STEP(NOTHING);
959
960     for (int pass = 0; pass < 10; ++pass) {
961       for (int i = 0; i < 10; ++i) {
962         queue.blockingWrite();
963         LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
964
965         queue.blockingWrite(1, "one");
966         LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
967
968         {
969           Lifecycle<R> src;
970           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
971           queue.blockingWrite(std::move(src));
972           LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
973         }
974         LIFECYCLE_STEP(DESTRUCTOR);
975
976         {
977           Lifecycle<R> src;
978           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
979           queue.blockingWrite(src);
980           LIFECYCLE_STEP(COPY_CONSTRUCTOR);
981         }
982         LIFECYCLE_STEP(DESTRUCTOR);
983
984         EXPECT_TRUE(queue.write());
985         LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
986       }
987
988       EXPECT_EQ(queue.size(), 50);
989       EXPECT_FALSE(queue.write(2, "two"));
990       LIFECYCLE_STEP(NOTHING);
991
992       for (int i = 0; i < 50; ++i) {
993         {
994           Lifecycle<R> node;
995           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
996
997           queue.blockingRead(node);
998           if (R::value) {
999             // relocatable, moved via memcpy
1000             LIFECYCLE_STEP(DESTRUCTOR);
1001           } else {
1002             LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
1003           }
1004         }
1005         LIFECYCLE_STEP(DESTRUCTOR);
1006       }
1007
1008       EXPECT_EQ(queue.size(), 0);
1009     }
1010
1011     // put one element back before destruction
1012     {
1013       Lifecycle<R> src(3, "three");
1014       LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
1015       queue.write(std::move(src));
1016       LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
1017     }
1018     LIFECYCLE_STEP(DESTRUCTOR); // destroy src
1019   }
1020   LIFECYCLE_STEP(DESTRUCTOR); // destroy queue
1021
1022   EXPECT_EQ(lc_outstanding(), 0);
1023 }
1024
1025 TEST(MPMCQueue, perfect_forwarding) {
1026   runPerfectForwardingTest<std::false_type>();
1027 }
1028
1029 TEST(MPMCQueue, perfect_forwarding_relocatable) {
1030   runPerfectForwardingTest<std::true_type>();
1031 }
1032
1033 template <bool Dynamic = false>
1034 void run_queue_moving() {
1035   lc_snap();
1036   EXPECT_EQ(lc_outstanding(), 0);
1037
1038   {
1039     MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> a(50);
1040     LIFECYCLE_STEP(NOTHING);
1041
1042     a.blockingWrite();
1043     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1044
1045     // move constructor
1046     MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> b
1047       = std::move(a);
1048     LIFECYCLE_STEP(NOTHING);
1049     EXPECT_EQ(a.capacity(), 0);
1050     EXPECT_EQ(a.size(), 0);
1051     EXPECT_EQ(b.capacity(), 50);
1052     EXPECT_EQ(b.size(), 1);
1053
1054     b.blockingWrite();
1055     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1056
1057     // move operator
1058     MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> c;
1059     LIFECYCLE_STEP(NOTHING);
1060     c = std::move(b);
1061     LIFECYCLE_STEP(NOTHING);
1062     EXPECT_EQ(c.capacity(), 50);
1063     EXPECT_EQ(c.size(), 2);
1064
1065     {
1066       Lifecycle<std::false_type> dst;
1067       LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1068       c.blockingRead(dst);
1069       LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
1070
1071       {
1072         // swap
1073         MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> d(10);
1074         LIFECYCLE_STEP(NOTHING);
1075         std::swap(c, d);
1076         LIFECYCLE_STEP(NOTHING);
1077         EXPECT_EQ(c.capacity(), 10);
1078         EXPECT_TRUE(c.isEmpty());
1079         EXPECT_EQ(d.capacity(), 50);
1080         EXPECT_EQ(d.size(), 1);
1081
1082         d.blockingRead(dst);
1083         LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
1084
1085         c.blockingWrite(dst);
1086         LIFECYCLE_STEP(COPY_CONSTRUCTOR);
1087
1088         d.blockingWrite(std::move(dst));
1089         LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
1090       } // d goes out of scope
1091       LIFECYCLE_STEP(DESTRUCTOR);
1092     } // dst goes out of scope
1093     LIFECYCLE_STEP(DESTRUCTOR);
1094   } // c goes out of scope
1095   LIFECYCLE_STEP(DESTRUCTOR);
1096 }
1097
1098 TEST(MPMCQueue, queue_moving) {
1099   run_queue_moving();
1100 }
1101
1102 TEST(MPMCQueue, queue_moving_dynamic) {
1103   run_queue_moving<true>();
1104 }
1105
1106 TEST(MPMCQueue, explicit_zero_capacity_fail) {
1107   ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);
1108
1109   using DynamicMPMCQueueInt = MPMCQueue<int, std::atomic, true>;
1110   ASSERT_THROW(DynamicMPMCQueueInt cq(0), std::invalid_argument);
1111 }
1112
1113 template <bool Dynamic>
1114 void testTryReadUntil() {
1115   MPMCQueue<int, std::atomic, Dynamic> q{1};
1116
1117   const auto wait = std::chrono::milliseconds(100);
1118   stop_watch<> watch;
1119   bool rets[2];
1120   int vals[2];
1121   std::vector<std::thread> threads;
1122   boost::barrier b{3};
1123   for (int i = 0; i < 2; i++) {
1124     threads.emplace_back([&, i] {
1125       b.wait();
1126       rets[i] = q.tryReadUntil(watch.getCheckpoint() + wait, vals[i]);
1127     });
1128   }
1129
1130   b.wait();
1131   EXPECT_TRUE(q.write(42));
1132
1133   for (int i = 0; i < 2; i++) {
1134     threads[i].join();
1135   }
1136
1137   for (int i = 0; i < 2; i++) {
1138     int other = (i + 1) % 2;
1139     if (rets[i]) {
1140       EXPECT_EQ(42, vals[i]);
1141       EXPECT_FALSE(rets[other]);
1142     }
1143   }
1144
1145   EXPECT_TRUE(watch.elapsed(wait));
1146 }
1147
1148 template <bool Dynamic>
1149 void testTryWriteUntil() {
1150   MPMCQueue<int, std::atomic, Dynamic> q{1};
1151   EXPECT_TRUE(q.write(42));
1152
1153   const auto wait = std::chrono::milliseconds(100);
1154   stop_watch<> watch;
1155   bool rets[2];
1156   std::vector<std::thread> threads;
1157   boost::barrier b{3};
1158   for (int i = 0; i < 2; i++) {
1159     threads.emplace_back([&, i] {
1160       b.wait();
1161       rets[i] = q.tryWriteUntil(watch.getCheckpoint() + wait, i);
1162     });
1163   }
1164
1165   b.wait();
1166   int x;
1167   EXPECT_TRUE(q.read(x));
1168   EXPECT_EQ(42, x);
1169
1170   for (int i = 0; i < 2; i++) {
1171     threads[i].join();
1172   }
1173   EXPECT_TRUE(q.read(x));
1174
1175   for (int i = 0; i < 2; i++) {
1176     int other = (i + 1) % 2;
1177     if (rets[i]) {
1178       EXPECT_EQ(i, x);
1179       EXPECT_FALSE(rets[other]);
1180     }
1181   }
1182
1183   EXPECT_TRUE(watch.elapsed(wait));
1184 }
1185
1186 TEST(MPMCQueue, try_read_until) {
1187   testTryReadUntil<false>();
1188 }
1189
1190 TEST(MPMCQueue, try_read_until_dynamic) {
1191   testTryReadUntil<true>();
1192 }
1193
1194 TEST(MPMCQueue, try_write_until) {
1195   testTryWriteUntil<false>();
1196 }
1197
1198 TEST(MPMCQueue, try_write_until_dynamic) {
1199   testTryWriteUntil<true>();
1200 }
1201
1202 template <bool Dynamic>
1203 void testTimeout(MPMCQueue<int, std::atomic, Dynamic>& q) {
1204   CHECK(q.write(1));
1205   /* The following must not block forever */
1206   q.tryWriteUntil(
1207       std::chrono::system_clock::now() + std::chrono::microseconds(10000), 2);
1208 }
1209
1210 TEST(MPMCQueue, try_write_until_timeout) {
1211   folly::MPMCQueue<int, std::atomic, false> queue(1);
1212   testTimeout<false>(queue);
1213 }
1214
1215 TEST(MPMCQueue, must_fail_try_write_until_dynamic) {
1216   folly::MPMCQueue<int, std::atomic, true> queue(200, 1, 2);
1217   testTimeout<true>(queue);
1218 }