/*
 * Copyright 2017 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17 #include <folly/Format.h>
18 #include <folly/MPMCQueue.h>
19 #include <folly/Memory.h>
20 #include <folly/portability/GTest.h>
21 #include <folly/portability/SysResource.h>
22 #include <folly/portability/SysTime.h>
23 #include <folly/portability/Unistd.h>
24 #include <folly/stop_watch.h>
25 #include <folly/test/DeterministicSchedule.h>
27 #include <boost/intrusive_ptr.hpp>
28 #include <boost/thread/barrier.hpp>
34 FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
36 using namespace folly;
37 using namespace detail;
39 using std::chrono::time_point;
40 using std::chrono::steady_clock;
41 using std::chrono::seconds;
42 using std::chrono::milliseconds;
44 using std::unique_ptr;
47 typedef DeterministicSchedule DSched;
49 template <template<typename> class Atom>
50 void run_mt_sequencer_thread(
54 TurnSequencer<Atom>& seq,
55 Atom<uint32_t>& spinThreshold,
58 for (int op = i; op < numOps; op += numThreads) {
59 seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
60 EXPECT_EQ(prev, op - 1);
62 seq.completeTurn(init + op);
66 template <template<typename> class Atom>
67 void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
68 TurnSequencer<Atom> seq(init);
69 Atom<uint32_t> spinThreshold(0);
72 vector<std::thread> threads(numThreads);
73 for (int i = 0; i < numThreads; ++i) {
74 threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
75 numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold),
79 for (auto& thr : threads) {
83 EXPECT_EQ(prev, numOps - 1);
86 TEST(MPMCQueue, sequencer) {
87 run_mt_sequencer_test<std::atomic>(1, 100, 0);
88 run_mt_sequencer_test<std::atomic>(2, 100000, -100);
89 run_mt_sequencer_test<std::atomic>(100, 10000, -100);
92 TEST(MPMCQueue, sequencer_emulated_futex) {
93 run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
94 run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
95 run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
98 TEST(MPMCQueue, sequencer_deterministic) {
99 DSched sched(DSched::uniform(0));
100 run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
101 run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
102 run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
105 template <bool Dynamic = false, typename T>
106 void runElementTypeTest(T&& src) {
107 MPMCQueue<T, std::atomic, Dynamic> cq(10);
108 cq.blockingWrite(std::forward<T>(src));
110 cq.blockingRead(dest);
111 EXPECT_TRUE(cq.write(std::move(dest)));
112 EXPECT_TRUE(cq.read(dest));
113 auto soon1 = std::chrono::system_clock::now() + std::chrono::seconds(1);
114 EXPECT_TRUE(cq.tryWriteUntil(soon1, std::move(dest)));
115 EXPECT_TRUE(cq.read(dest));
116 auto soon2 = std::chrono::steady_clock::now() + std::chrono::seconds(1);
117 EXPECT_TRUE(cq.tryWriteUntil(soon2, std::move(dest)));
118 EXPECT_TRUE(cq.read(dest));
122 static FOLLY_TLS int active_instances;
124 mutable std::atomic<int> rc;
126 RefCounted() : rc(0) {
134 FOLLY_TLS int RefCounted::active_instances;
136 void intrusive_ptr_add_ref(RefCounted const* p) {
140 void intrusive_ptr_release(RefCounted const* p) {
141 if (--(p->rc) == 0) {
146 TEST(MPMCQueue, lots_of_element_types) {
147 runElementTypeTest(10);
148 runElementTypeTest(string("abc"));
149 runElementTypeTest(std::make_pair(10, string("def")));
150 runElementTypeTest(vector<string>{{"abc"}});
151 runElementTypeTest(std::make_shared<char>('a'));
152 runElementTypeTest(std::make_unique<char>('a'));
153 runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
154 EXPECT_EQ(RefCounted::active_instances, 0);
157 TEST(MPMCQueue, lots_of_element_types_dynamic) {
158 runElementTypeTest<true>(10);
159 runElementTypeTest<true>(string("abc"));
160 runElementTypeTest<true>(std::make_pair(10, string("def")));
161 runElementTypeTest<true>(vector<string>{{"abc"}});
162 runElementTypeTest<true>(std::make_shared<char>('a'));
163 runElementTypeTest<true>(std::make_unique<char>('a'));
164 runElementTypeTest<true>(boost::intrusive_ptr<RefCounted>(new RefCounted));
165 EXPECT_EQ(RefCounted::active_instances, 0);
168 TEST(MPMCQueue, single_thread_enqdeq) {
169 // Non-dynamic version only.
170 // False positive for dynamic version. Capacity can be temporarily
171 // higher than specified.
172 MPMCQueue<int> cq(10);
174 for (int pass = 0; pass < 10; ++pass) {
175 for (int i = 0; i < 10; ++i) {
176 EXPECT_TRUE(cq.write(i));
178 EXPECT_FALSE(cq.write(-1));
179 EXPECT_FALSE(cq.isEmpty());
180 EXPECT_EQ(cq.size(), 10);
182 for (int i = 0; i < 5; ++i) {
184 EXPECT_TRUE(cq.read(dest));
187 for (int i = 5; i < 10; ++i) {
189 cq.blockingRead(dest);
193 EXPECT_FALSE(cq.read(dest));
196 EXPECT_TRUE(cq.isEmpty());
197 EXPECT_EQ(cq.size(), 0);
201 TEST(MPMCQueue, tryenq_capacity_test) {
202 // Non-dynamic version only.
203 // False positive for dynamic version. Capacity can be temporarily
204 // higher than specified.
205 for (size_t cap = 1; cap < 100; ++cap) {
206 MPMCQueue<int> cq(cap);
207 for (size_t i = 0; i < cap; ++i) {
208 EXPECT_TRUE(cq.write(i));
210 EXPECT_FALSE(cq.write(100));
214 TEST(MPMCQueue, enq_capacity_test) {
215 // Non-dynamic version only.
216 // False positive for dynamic version. Capacity can be temporarily
217 // higher than specified.
218 for (auto cap : { 1, 100, 10000 }) {
219 MPMCQueue<int> cq(cap);
220 for (int i = 0; i < cap; ++i) {
225 auto thr = std::thread([&]{
226 cq.blockingWrite(100);
232 cq.blockingRead(dummy);
238 template <template<typename> class Atom, bool Dynamic = false>
239 void runTryEnqDeqThread(
242 MPMCQueue<int, Atom, Dynamic>& cq,
243 std::atomic<uint64_t>& sum,
245 uint64_t threadSum = 0;
247 // received doesn't reflect any actual values, we just start with
248 // t and increment by numThreads to get the rounding of termination
249 // correct if numThreads doesn't evenly divide numOps
251 while (src < n || received < n) {
252 if (src < n && cq.write(src)) {
257 if (received < n && cq.read(dst)) {
258 received += numThreads;
265 template <template<typename> class Atom, bool Dynamic = false>
266 void runTryEnqDeqTest(int numThreads, int numOps) {
267 // write and read aren't linearizable, so we don't have
268 // hard guarantees on their individual behavior. We can still test
269 // correctness in aggregate
270 MPMCQueue<int,Atom, Dynamic> cq(numThreads);
273 vector<std::thread> threads(numThreads);
274 std::atomic<uint64_t> sum(0);
275 for (int t = 0; t < numThreads; ++t) {
276 threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom, Dynamic>,
277 numThreads, n, std::ref(cq), std::ref(sum), t));
279 for (auto& t : threads) {
282 EXPECT_TRUE(cq.isEmpty());
283 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
286 TEST(MPMCQueue, mt_try_enq_deq) {
287 int nts[] = { 1, 3, 100 };
291 runTryEnqDeqTest<std::atomic>(nt, n);
295 TEST(MPMCQueue, mt_try_enq_deq_dynamic) {
296 int nts[] = { 1, 3, 100 };
300 runTryEnqDeqTest<std::atomic, /* Dynamic = */ true>(nt, n);
304 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
305 int nts[] = { 1, 3, 100 };
309 runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
313 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex_dynamic) {
314 int nts[] = { 1, 3, 100 };
318 runTryEnqDeqTest<EmulatedFutexAtomic, /* Dynamic = */ true>(nt, n);
322 TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
323 int nts[] = { 3, 10 };
326 LOG(INFO) << "using seed " << seed;
331 DSched sched(DSched::uniform(seed));
332 runTryEnqDeqTest<DeterministicAtomic>(nt, n);
335 DSched sched(DSched::uniformSubset(seed, 2));
336 runTryEnqDeqTest<DeterministicAtomic>(nt, n);
339 DSched sched(DSched::uniform(seed));
340 runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
343 DSched sched(DSched::uniformSubset(seed, 2));
344 runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
// Wall-clock time in microseconds since the epoch, used for coarse
// benchmark timing. (The `timeval` declaration and closing brace were
// missing in this truncated copy.)
uint64_t nowMicro() {
  timeval tv;
  gettimeofday(&tv, 0);
  return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
}
// Polymorphic adapter so one benchmark body can exercise every write entry
// point of a queue type Q. Subclasses implement one write method each and
// report its name for logging. (The closing `};` was missing; restored.
// `string` qualified as std::string — same type via the file's using-decl.)
template <typename Q>
struct WriteMethodCaller {
  WriteMethodCaller() {}
  virtual ~WriteMethodCaller() = default;
  virtual bool callWrite(Q& q, int i) = 0;
  virtual std::string methodName() = 0;
};
363 template <typename Q>
364 struct BlockingWriteCaller : public WriteMethodCaller<Q> {
365 bool callWrite(Q& q, int i) override {
369 string methodName() override { return "blockingWrite"; }
372 template <typename Q>
373 struct WriteIfNotFullCaller : public WriteMethodCaller<Q> {
374 bool callWrite(Q& q, int i) override { return q.writeIfNotFull(i); }
375 string methodName() override { return "writeIfNotFull"; }
378 template <typename Q>
379 struct WriteCaller : public WriteMethodCaller<Q> {
380 bool callWrite(Q& q, int i) override { return q.write(i); }
381 string methodName() override { return "write"; }
384 template <typename Q,
385 class Clock = steady_clock,
386 class Duration = typename Clock::duration>
387 struct TryWriteUntilCaller : public WriteMethodCaller<Q> {
388 const Duration duration_;
389 explicit TryWriteUntilCaller(Duration&& duration) : duration_(duration) {}
390 bool callWrite(Q& q, int i) override {
391 auto then = Clock::now() + duration_;
392 return q.tryWriteUntil(then, i);
394 string methodName() override {
395 return folly::sformat(
396 "tryWriteUntil({}ms)",
397 std::chrono::duration_cast<milliseconds>(duration_).count());
// Runs a multi-producer/multi-consumer handoff benchmark against `queue`,
// verifies (unless ignoreContents) that the consumed values sum to
// 0+1+...+(n-1), and returns a formatted summary line with nanos/handoff,
// context switches, failed write attempts, and allocated capacity.
// NOTE(review): this copy of the definition is truncated — the middle
// parameters (the queue-name string and producer/consumer/op counts, cf.
// the PC_BENCH macro which passes q, #q, np, nc), the `Q& q` alias and
// `uint64_t n` declarations, several lambda/loop closers, and the sformat
// argument list are all missing. Restore from upstream before building.
template <typename Q>
string producerConsumerBench(Q&& queue,
                             WriteMethodCaller<Q>& writer,
                             bool ignoreContents = false) {
  // rusage snapshot so context switches for the run can be reported
  struct rusage beginUsage;
  getrusage(RUSAGE_SELF, &beginUsage);

  auto beginMicro = nowMicro();

  std::atomic<uint64_t> sum(0);    // total of all values consumed
  std::atomic<uint64_t> failed(0); // write attempts that had to be retried

  vector<std::thread> producers(numProducers);
  for (int t = 0; t < numProducers; ++t) {
    producers[t] = DSched::thread([&,t]{
      // producer t emits t, t+numProducers, t+2*numProducers, ...
      for (int i = t; i < numOps; i += numProducers) {
        while (!writer.callWrite(q, i)) {

  vector<std::thread> consumers(numConsumers);
  for (int t = 0; t < numConsumers; ++t) {
    consumers[t] = DSched::thread([&,t]{
      uint64_t localSum = 0;
      for (int i = t; i < numOps; i += numConsumers) {
        q.blockingRead(dest);
        EXPECT_FALSE(dest == -1);

  for (auto& t : producers) {
  for (auto& t : consumers) {
  if (!ignoreContents) {
    EXPECT_EQ(n * (n - 1) / 2 - sum, 0);

  auto endMicro = nowMicro();

  struct rusage endUsage;
  getrusage(RUSAGE_SELF, &endUsage);

  uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
  // voluntary + involuntary context switches attributable to the run
  long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
      (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
  uint64_t failures = failed;
  size_t allocated = q.allocatedCapacity();

  return folly::sformat(
      "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
      "handoff, {} failures, {} allocated",
// Exercises every WriteMethodCaller against several queue configurations
// under DeterministicSchedule. Perf numbers are meaningless here; the value
// is deterministic interleaving coverage of the producer/consumer paths.
// NOTE(review): this copy is truncated — the `cap` assignments, the
// LOG(INFO) sinks, the producer/consumer/op-count arguments of each of the
// five producerConsumerBench calls, and the closing braces are missing.
// Restore from upstream before building.
template <bool Dynamic = false>
void runMtProdConsDeterministic(long seed) {
  // we use the Bench method, but perf results are meaningless under DSched
  DSched sched(DSched::uniform(seed));

  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic,
  callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
                       DeterministicAtomic, Dynamic>>>());
  callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
                       DeterministicAtomic, Dynamic>>>());
  callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
                       DeterministicAtomic, Dynamic>>>());
  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
                       DeterministicAtomic, Dynamic>>>(milliseconds(1)));
  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
                       DeterministicAtomic, Dynamic>>>(seconds(2)));

  for (const auto& caller : callers) {
      // five configurations per caller; each benchmarks a fresh queue
      producerConsumerBench(
        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
        "MPMCQueue<int, DeterministicAtomic, Dynamic>("
        + folly::to<std::string>(cap)+")",
      producerConsumerBench(
        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
        "MPMCQueue<int, DeterministicAtomic, Dynamic>("
        + folly::to<std::string>(cap)+")",
      producerConsumerBench(
        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
        "MPMCQueue<int, DeterministicAtomic, Dynamic>("
        + folly::to<std::string>(cap)+")",
      producerConsumerBench(
        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
        "MPMCQueue<int, DeterministicAtomic, Dynamic>("
        + folly::to<std::string>(cap)+")",
      producerConsumerBench(
        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
        "MPMCQueue<int, DeterministicAtomic, Dynamic>("
        + folly::to<std::string>(cap)+")",
553 void runMtProdConsDeterministicDynamic(
562 // we use the Bench method, but perf results are meaningless under DSched
563 DSched sched(DSched::uniform(seed));
565 vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic,
567 callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
568 DeterministicAtomic, true>>>());
569 callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
570 DeterministicAtomic, true>>>());
571 callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
572 DeterministicAtomic, true>>>());
573 callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
574 DeterministicAtomic, true>>>(milliseconds(1)));
575 callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
576 DeterministicAtomic, true>>>(seconds(2)));
578 for (const auto& caller : callers) {
580 producerConsumerBench(
581 MPMCQueue<int, DeterministicAtomic, true>(cap, minCap, mult),
582 "MPMCQueue<int, DeterministicAtomic, true>("
583 + folly::to<std::string>(cap) + ", "
584 + folly::to<std::string>(minCap) + ", "
585 + folly::to<std::string>(mult)+")",
593 TEST(MPMCQueue, mt_prod_cons_deterministic) {
594 runMtProdConsDeterministic(0);
597 TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic) {
598 runMtProdConsDeterministic<true>(0);
// Overwrites `var` with the integer value of environment variable `envvar`
// if it is set; leaves `var` untouched otherwise. (Closing brace restored.)
template <typename T>
void setFromEnv(T& var, const char* envvar) {
  char* str = std::getenv(envvar);
  if (str) { var = atoi(str); }
}
607 TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic_with_arguments) {
611 uint32_t numOps = 1000;
615 setFromEnv(seed, "SEED");
616 setFromEnv(prods, "PRODS");
617 setFromEnv(cons, "CONS");
618 setFromEnv(numOps, "NUM_OPS");
619 setFromEnv(cap, "CAP");
620 setFromEnv(minCap, "MIN_CAP");
621 setFromEnv(mult, "MULT");
622 runMtProdConsDeterministicDynamic(
623 seed, prods, cons, numOps, cap, minCap, mult);
// Convenience wrapper around producerConsumerBench: benchmarks `q` with
// `np` producers and `nc` consumers, using the stringized queue expression
// (#q) as the reported queue name.
#define PC_BENCH(q, np, nc, ...) \
    producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
629 template <bool Dynamic = false>
630 void runMtProdCons() {
632 setFromEnv(n, "NUM_OPS");
633 vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, std::atomic, Dynamic>>>>
635 callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
636 std::atomic, Dynamic>>>());
637 callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
638 std::atomic, Dynamic>>>());
639 callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int, std::atomic,
641 callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
642 std::atomic, Dynamic>>>(milliseconds(1)));
643 callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
644 std::atomic, Dynamic>>>(seconds(2)));
645 for (const auto& caller : callers) {
646 LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
648 LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
650 LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
652 LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
654 LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
656 LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
658 LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
660 LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
662 LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(100000)),
663 32, 100, n, *caller);
667 TEST(MPMCQueue, mt_prod_cons) {
671 TEST(MPMCQueue, mt_prod_cons_dynamic) {
672 runMtProdCons</* Dynamic = */ true>();
675 template <bool Dynamic = false>
676 void runMtProdConsEmulatedFutex() {
678 vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, EmulatedFutexAtomic,
680 callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
681 EmulatedFutexAtomic, Dynamic>>>());
682 callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
683 EmulatedFutexAtomic, Dynamic>>>());
684 callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
685 EmulatedFutexAtomic, Dynamic>>>());
686 callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
687 EmulatedFutexAtomic, Dynamic>>>(milliseconds(1)));
688 callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
689 EmulatedFutexAtomic, Dynamic>>>(seconds(2)));
690 for (const auto& caller : callers) {
691 LOG(INFO) << PC_BENCH(
692 (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 1, 1, n, *caller);
693 LOG(INFO) << PC_BENCH(
694 (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 10, 1, n, *caller);
695 LOG(INFO) << PC_BENCH(
696 (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 1, 10, n, *caller);
697 LOG(INFO) << PC_BENCH(
698 (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 10, 10, n, *caller);
699 LOG(INFO) << PC_BENCH(
700 (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 1, 1, n, *caller);
701 LOG(INFO) << PC_BENCH(
702 (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 10, 1, n, *caller);
703 LOG(INFO) << PC_BENCH(
704 (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 1, 10, n, *caller);
705 LOG(INFO) << PC_BENCH((MPMCQueue<int, EmulatedFutexAtomic, Dynamic>
706 (10000)), 10, 10, n, *caller);
707 LOG(INFO) << PC_BENCH((MPMCQueue<int, EmulatedFutexAtomic, Dynamic>
708 (100000)), 32, 100, n, *caller);
712 TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
713 runMtProdConsEmulatedFutex();
716 TEST(MPMCQueue, mt_prod_cons_emulated_futex_dynamic) {
717 runMtProdConsEmulatedFutex</* Dynamic = */ true>();
720 template <template <typename> class Atom, bool Dynamic = false>
721 void runNeverFailThread(int numThreads,
723 MPMCQueue<int, Atom, Dynamic>& cq,
724 std::atomic<uint64_t>& sum,
726 uint64_t threadSum = 0;
727 for (int i = t; i < n; i += numThreads) {
729 EXPECT_TRUE(cq.writeIfNotFull(i));
732 EXPECT_TRUE(cq.readIfNotEmpty(dest));
733 EXPECT_TRUE(dest >= 0);
739 template <template <typename> class Atom, bool Dynamic = false>
740 uint64_t runNeverFailTest(int numThreads, int numOps) {
741 // always #enq >= #deq
742 MPMCQueue<int, Atom, Dynamic> cq(numThreads);
745 auto beginMicro = nowMicro();
747 vector<std::thread> threads(numThreads);
748 std::atomic<uint64_t> sum(0);
749 for (int t = 0; t < numThreads; ++t) {
750 threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom, Dynamic>,
757 for (auto& t : threads) {
760 EXPECT_TRUE(cq.isEmpty());
761 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
763 return nowMicro() - beginMicro;
766 template <template<typename> class Atom, bool Dynamic = false>
767 void runMtNeverFail(std::vector<int>& nts, int n) {
769 uint64_t elapsed = runNeverFailTest<Atom, Dynamic>(nt, n);
770 LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
// All the never_fail tests are for the non-dynamic version only.
// False positive for dynamic version. Some writeIfNotFull() and
// tryWriteUntil() operations may fail in transient conditions related
// to the internal expansion of the dynamic queue's capacity.
780 TEST(MPMCQueue, mt_never_fail) {
781 std::vector<int> nts {1, 3, 100};
783 runMtNeverFail<std::atomic>(nts, n);
786 TEST(MPMCQueue, mt_never_fail_emulated_futex) {
787 std::vector<int> nts {1, 3, 100};
789 runMtNeverFail<EmulatedFutexAtomic>(nts, n);
792 template<bool Dynamic = false>
793 void runMtNeverFailDeterministic(std::vector<int>& nts, int n, long seed) {
794 LOG(INFO) << "using seed " << seed;
797 DSched sched(DSched::uniform(seed));
798 runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
801 DSched sched(DSched::uniformSubset(seed, 2));
802 runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
807 TEST(MPMCQueue, mt_never_fail_deterministic) {
808 std::vector<int> nts {3, 10};
809 long seed = 0; // nowMicro() % 10000;
811 runMtNeverFailDeterministic(nts, n, seed);
814 template <class Clock, template <typename> class Atom, bool Dynamic>
815 void runNeverFailUntilThread(int numThreads,
817 MPMCQueue<int, Atom, Dynamic>& cq,
818 std::atomic<uint64_t>& sum,
820 uint64_t threadSum = 0;
821 for (int i = t; i < n; i += numThreads) {
823 auto soon = Clock::now() + std::chrono::seconds(1);
824 EXPECT_TRUE(cq.tryWriteUntil(soon, i));
827 EXPECT_TRUE(cq.readIfNotEmpty(dest));
828 EXPECT_TRUE(dest >= 0);
834 template <class Clock, template <typename> class Atom, bool Dynamic = false>
835 uint64_t runNeverFailTest(int numThreads, int numOps) {
836 // always #enq >= #deq
837 MPMCQueue<int, Atom, Dynamic> cq(numThreads);
840 auto beginMicro = nowMicro();
842 vector<std::thread> threads(numThreads);
843 std::atomic<uint64_t> sum(0);
844 for (int t = 0; t < numThreads; ++t) {
845 threads[t] = DSched::thread(std::bind(
846 runNeverFailUntilThread<Clock, Atom, Dynamic>,
853 for (auto& t : threads) {
856 EXPECT_TRUE(cq.isEmpty());
857 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
859 return nowMicro() - beginMicro;
862 template <bool Dynamic = false>
863 void runMtNeverFailUntilSystem(std::vector<int>& nts, int n) {
866 runNeverFailTest<std::chrono::system_clock, std::atomic, Dynamic>(nt, n);
867 LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
872 TEST(MPMCQueue, mt_never_fail_until_system) {
873 std::vector<int> nts {1, 3, 100};
875 runMtNeverFailUntilSystem(nts, n);
878 template <bool Dynamic = false>
879 void runMtNeverFailUntilSteady(std::vector<int>& nts, int n) {
882 runNeverFailTest<std::chrono::steady_clock, std::atomic, Dynamic>(nt, n);
883 LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
888 TEST(MPMCQueue, mt_never_fail_until_steady) {
889 std::vector<int> nts {1, 3, 100};
891 runMtNeverFailUntilSteady(nts, n);
894 enum LifecycleEvent {
906 static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT];
907 static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT];
909 static int lc_outstanding() {
910 return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[COPY_CONSTRUCTOR] +
911 lc_counts[MOVE_CONSTRUCTOR] + lc_counts[TWO_ARG_CONSTRUCTOR] -
912 lc_counts[DESTRUCTOR];
915 static void lc_snap() {
916 for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
917 lc_prev[i] = lc_counts[i];
921 #define LIFECYCLE_STEP(...) lc_step(__LINE__, __VA_ARGS__)
923 static void lc_step(int lineno, int what = NOTHING, int what2 = NOTHING) {
924 for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
925 int delta = i == what || i == what2 ? 1 : 0;
926 EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
927 << "lc_counts[" << i << "] - lc_prev[" << i << "] was "
928 << (lc_counts[i] - lc_prev[i]) << ", expected " << delta
929 << ", from line " << lineno;
934 template <typename R>
936 typedef R IsRelocatable;
940 Lifecycle() noexcept : constructed(true) {
941 ++lc_counts[DEFAULT_CONSTRUCTOR];
944 explicit Lifecycle(int /* n */, char const* /* s */) noexcept
945 : constructed(true) {
946 ++lc_counts[TWO_ARG_CONSTRUCTOR];
949 Lifecycle(const Lifecycle& /* rhs */) noexcept : constructed(true) {
950 ++lc_counts[COPY_CONSTRUCTOR];
953 Lifecycle(Lifecycle&& /* rhs */) noexcept : constructed(true) {
954 ++lc_counts[MOVE_CONSTRUCTOR];
957 Lifecycle& operator=(const Lifecycle& /* rhs */) noexcept {
958 ++lc_counts[COPY_OPERATOR];
962 Lifecycle& operator=(Lifecycle&& /* rhs */) noexcept {
963 ++lc_counts[MOVE_OPERATOR];
967 ~Lifecycle() noexcept {
968 ++lc_counts[DESTRUCTOR];
969 assert(lc_outstanding() >= 0);
// Verifies that MPMCQueue perfectly forwards write arguments — each write
// variant must trigger exactly one construction of the expected kind (no
// extra copies/moves) — and that reads perform either a memcpy relocation
// (relocatable R) or a move-assign + destroy. Runs with both R =
// std::false_type and std::true_type via the two TESTs below.
// NOTE(review): this copy is heavily truncated — scope braces, the
// `Lifecycle<R> src` declarations, the read-side `node` handling, and the
// IsRelocatable branch are missing. Restore from upstream before building;
// the surviving lines are kept verbatim below.
template <typename R>
void runPerfectForwardingTest() {
  lc_snap();
  EXPECT_EQ(lc_outstanding(), 0);

    // Non-dynamic only. False positive for dynamic.
    MPMCQueue<Lifecycle<R>, std::atomic> queue(50);
    LIFECYCLE_STEP(NOTHING);

    for (int pass = 0; pass < 10; ++pass) {
      for (int i = 0; i < 10; ++i) {
        // in-place default and two-arg construction
        queue.blockingWrite();
        LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);

        queue.blockingWrite(1, "one");
        LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);

        LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
        queue.blockingWrite(std::move(src));
        LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
        LIFECYCLE_STEP(DESTRUCTOR);

        LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
        queue.blockingWrite(src);
        LIFECYCLE_STEP(COPY_CONSTRUCTOR);
        LIFECYCLE_STEP(DESTRUCTOR);

        EXPECT_TRUE(queue.write());
        LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
      }

      EXPECT_EQ(queue.size(), 50);
      EXPECT_FALSE(queue.write(2, "two")); // full: no element constructed
      LIFECYCLE_STEP(NOTHING);

      for (int i = 0; i < 50; ++i) {
        LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
        queue.blockingRead(node);
          // relocatable, moved via memcpy
          LIFECYCLE_STEP(DESTRUCTOR);
          LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
        LIFECYCLE_STEP(DESTRUCTOR);

      EXPECT_EQ(queue.size(), 0);

    // put one element back before destruction
      Lifecycle<R> src(3, "three");
      LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
      queue.write(std::move(src));
      LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
    LIFECYCLE_STEP(DESTRUCTOR); // destroy src
  LIFECYCLE_STEP(DESTRUCTOR); // destroy queue

  EXPECT_EQ(lc_outstanding(), 0);
}
1050 TEST(MPMCQueue, perfect_forwarding) {
1051 runPerfectForwardingTest<std::false_type>();
1054 TEST(MPMCQueue, perfect_forwarding_relocatable) {
1055 runPerfectForwardingTest<std::true_type>();
// Verifies MPMCQueue's move constructor and move assignment: moving a queue
// transfers its capacity and contents (the moved-from queue reports
// capacity/size 0) without constructing, copying, or destroying any
// elements (LIFECYCLE_STEP(NOTHING) after each move).
// NOTE(review): this copy is truncated — scope-opening braces, the
// blockingWrite calls that populate `a`, and the move expressions
// (`std::move(a)` / `c = std::move(b)` / swap with `d`) are missing.
// Restore from upstream; the surviving lines are kept verbatim below.
template <bool Dynamic = false>
void run_queue_moving() {
  lc_snap();
  EXPECT_EQ(lc_outstanding(), 0);

    MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> a(50);
    LIFECYCLE_STEP(NOTHING);

    LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);

      // move constructor: b takes over a's storage and one element
      MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> b
      LIFECYCLE_STEP(NOTHING);
      EXPECT_EQ(a.capacity(), 0);
      EXPECT_EQ(a.size(), 0);
      EXPECT_EQ(b.capacity(), 50);
      EXPECT_EQ(b.size(), 1);

      LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);

        // move assignment into a default-constructed queue
        MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> c;
        LIFECYCLE_STEP(NOTHING);
        LIFECYCLE_STEP(NOTHING);
        EXPECT_EQ(c.capacity(), 50);
        EXPECT_EQ(c.size(), 2);

          Lifecycle<std::false_type> dst;
          LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
          c.blockingRead(dst);
          LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);

            // swap two live queues
            MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> d(10);
            LIFECYCLE_STEP(NOTHING);
            LIFECYCLE_STEP(NOTHING);
            EXPECT_EQ(c.capacity(), 10);
            EXPECT_TRUE(c.isEmpty());
            EXPECT_EQ(d.capacity(), 50);
            EXPECT_EQ(d.size(), 1);

            d.blockingRead(dst);
            LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);

            c.blockingWrite(dst);
            LIFECYCLE_STEP(COPY_CONSTRUCTOR);

            d.blockingWrite(std::move(dst));
            LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
          } // d goes out of scope
          LIFECYCLE_STEP(DESTRUCTOR);
        } // dst goes out of scope
        LIFECYCLE_STEP(DESTRUCTOR);
      } // c goes out of scope
      LIFECYCLE_STEP(DESTRUCTOR);
1123 TEST(MPMCQueue, queue_moving) {
1127 TEST(MPMCQueue, queue_moving_dynamic) {
1128 run_queue_moving<true>();
1131 TEST(MPMCQueue, explicit_zero_capacity_fail) {
1132 ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);
1134 using DynamicMPMCQueueInt = MPMCQueue<int, std::atomic, true>;
1135 ASSERT_THROW(DynamicMPMCQueueInt cq(0), std::invalid_argument);
1138 template <bool Dynamic>
1139 void testTryReadUntil() {
1140 MPMCQueue<int, std::atomic, Dynamic> q{1};
1142 const auto wait = std::chrono::milliseconds(100);
1146 std::vector<std::thread> threads;
1147 boost::barrier b{3};
1148 for (int i = 0; i < 2; i++) {
1149 threads.emplace_back([&, i] {
1151 rets[i] = q.tryReadUntil(watch.getCheckpoint() + wait, vals[i]);
1156 EXPECT_TRUE(q.write(42));
1158 for (int i = 0; i < 2; i++) {
1162 for (int i = 0; i < 2; i++) {
1163 int other = (i + 1) % 2;
1165 EXPECT_EQ(42, vals[i]);
1166 EXPECT_FALSE(rets[other]);
1170 EXPECT_TRUE(watch.elapsed(wait));
1173 template <bool Dynamic>
1174 void testTryWriteUntil() {
1175 MPMCQueue<int, std::atomic, Dynamic> q{1};
1176 EXPECT_TRUE(q.write(42));
1178 const auto wait = std::chrono::milliseconds(100);
1181 std::vector<std::thread> threads;
1182 boost::barrier b{3};
1183 for (int i = 0; i < 2; i++) {
1184 threads.emplace_back([&, i] {
1186 rets[i] = q.tryWriteUntil(watch.getCheckpoint() + wait, i);
1192 EXPECT_TRUE(q.read(x));
1195 for (int i = 0; i < 2; i++) {
1198 EXPECT_TRUE(q.read(x));
1200 for (int i = 0; i < 2; i++) {
1201 int other = (i + 1) % 2;
1204 EXPECT_FALSE(rets[other]);
1208 EXPECT_TRUE(watch.elapsed(wait));
1211 TEST(MPMCQueue, try_read_until) {
1212 testTryReadUntil<false>();
1215 TEST(MPMCQueue, try_read_until_dynamic) {
1216 testTryReadUntil<true>();
1219 TEST(MPMCQueue, try_write_until) {
1220 testTryWriteUntil<false>();
1223 TEST(MPMCQueue, try_write_until_dynamic) {
1224 testTryWriteUntil<true>();
1227 template <bool Dynamic>
1228 void testTimeout(MPMCQueue<int, std::atomic, Dynamic>& q) {
1230 /* The following must not block forever */
1232 std::chrono::system_clock::now() + std::chrono::microseconds(10000), 2);
1235 TEST(MPMCQueue, try_write_until_timeout) {
1236 folly::MPMCQueue<int, std::atomic, false> queue(1);
1237 testTimeout<false>(queue);
1240 TEST(MPMCQueue, must_fail_try_write_until_dynamic) {
1241 folly::MPMCQueue<int, std::atomic, true> queue(200, 1, 2);
1242 testTimeout<true>(queue);