2 * Copyright 2013-present Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/Format.h>
18 #include <folly/MPMCQueue.h>
19 #include <folly/Memory.h>
20 #include <folly/portability/GTest.h>
21 #include <folly/portability/SysResource.h>
22 #include <folly/portability/SysTime.h>
23 #include <folly/portability/Unistd.h>
24 #include <folly/stop_watch.h>
25 #include <folly/test/DeterministicSchedule.h>
27 #include <boost/intrusive_ptr.hpp>
28 #include <boost/thread/barrier.hpp>
34 FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
36 using namespace folly;
37 using namespace detail;
39 using std::chrono::time_point;
40 using std::chrono::steady_clock;
41 using std::chrono::seconds;
42 using std::chrono::milliseconds;
44 using std::unique_ptr;
47 typedef DeterministicSchedule DSched;
// Worker body for the multi-threaded TurnSequencer test: thread i claims
// turns i, i+numThreads, i+2*numThreads, ... against a shared sequencer and
// checks that turns are observed in strictly increasing order.
// NOTE(review): the parameter list, the `prev` bookkeeping, and closing
// braces are elided in this view — confirm against the full file.
49 template <template <typename> class Atom>
50 void run_mt_sequencer_thread(
54 TurnSequencer<Atom>& seq,
55 Atom<uint32_t>& spinThreshold,
// Each thread processes every numThreads-th op starting at its own index.
58 for (int op = i; op < numOps; op += numThreads) {
// Third argument requests a (possibly) blocking wait on every 32nd op,
// exercising both the spin and futex paths of waitForTurn.
59 seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
60 EXPECT_EQ(prev, op - 1);
62 seq.completeTurn(init + op);
// Drives run_mt_sequencer_thread with numThreads workers over numOps turns,
// starting the sequencer at `init` (which may wrap, since it is uint32_t).
// Threads are launched via DSched::thread so the same code runs under
// DeterministicSchedule when Atom = DeterministicAtomic.
// NOTE(review): the tail of the std::bind call and the join loop body are
// elided in this view.
66 template <template <typename> class Atom>
67 void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
68 TurnSequencer<Atom> seq(init);
69 Atom<uint32_t> spinThreshold(0);
72 vector<std::thread> threads(numThreads);
73 for (int i = 0; i < numThreads; ++i) {
74 threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
75 numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold),
79 for (auto& thr : threads) {
// After all turns complete, the last completed turn must be numOps - 1.
83 EXPECT_EQ(prev, numOps - 1);
// TurnSequencer smoke tests with three atomics backends: plain std::atomic,
// EmulatedFutexAtomic (portable futex emulation), and DeterministicAtomic
// under a seeded DeterministicSchedule. Negative `init` values deliberately
// wrap the uint32_t turn counter to exercise wraparound handling.
86 TEST(MPMCQueue, sequencer) {
87 run_mt_sequencer_test<std::atomic>(1, 100, 0);
88 run_mt_sequencer_test<std::atomic>(2, 100000, -100);
89 run_mt_sequencer_test<std::atomic>(100, 10000, -100);
92 TEST(MPMCQueue, sequencer_emulated_futex) {
93 run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
94 run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
95 run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
98 TEST(MPMCQueue, sequencer_deterministic) {
99 DSched sched(DSched::uniform(0));
100 run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
// (1 << 29) - 100 starts close to a high-bit boundary of the turn counter.
101 run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
102 run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
// Round-trips a single value of an arbitrary element type T through an
// MPMCQueue via every write entry point: blockingWrite, write,
// tryWriteUntil with a system_clock deadline, and tryWriteUntil with a
// steady_clock deadline. Verifies each write is matched by a read.
// NOTE(review): the declaration of `dest` (line between 108 and 110) is
// elided in this view.
105 template <bool Dynamic = false, typename T>
106 void runElementTypeTest(T&& src) {
107 MPMCQueue<T, std::atomic, Dynamic> cq(10);
108 cq.blockingWrite(std::forward<T>(src));
110 cq.blockingRead(dest);
111 EXPECT_TRUE(cq.write(std::move(dest)));
112 EXPECT_TRUE(cq.read(dest));
// Deadlines 1s in the future: the queue has room, so these should succeed
// immediately rather than actually waiting.
113 auto soon1 = std::chrono::system_clock::now() + std::chrono::seconds(1);
114 EXPECT_TRUE(cq.tryWriteUntil(soon1, std::move(dest)));
115 EXPECT_TRUE(cq.read(dest));
116 auto soon2 = std::chrono::steady_clock::now() + std::chrono::seconds(1);
117 EXPECT_TRUE(cq.tryWriteUntil(soon2, std::move(dest)));
118 EXPECT_TRUE(cq.read(dest));
// Fragments of the RefCounted test type: an intrusively ref-counted object
// that tracks live instances in a thread-local counter so tests can assert
// no leaks. The boost::intrusive_ptr customization points below hook its
// `rc` counter.
// NOTE(review): the struct header, destructor, and function bodies are
// elided in this view — confirm against the full file.
122 static FOLLY_TLS int active_instances;
// mutable so const objects can still be ref-counted.
124 mutable std::atomic<int> rc;
126 RefCounted() : rc(0) {
// Out-of-line definition of the thread-local instance counter.
134 FOLLY_TLS int RefCounted::active_instances;
// boost::intrusive_ptr ADL hooks.
136 void intrusive_ptr_add_ref(RefCounted const* p) {
140 void intrusive_ptr_release(RefCounted const* p) {
// Last reference dropped: delete the object.
141 if (--(p->rc) == 0) {
// Exercises runElementTypeTest with a spread of element types: trivially
// copyable, heap-owning, move-only (unique_ptr), and intrusively
// ref-counted — for both the static and dynamic queue variants. The final
// check confirms RefCounted instances were all destroyed.
146 TEST(MPMCQueue, lots_of_element_types) {
147 runElementTypeTest(10);
148 runElementTypeTest(string("abc"));
149 runElementTypeTest(std::make_pair(10, string("def")));
150 runElementTypeTest(vector<string>{{"abc"}});
151 runElementTypeTest(std::make_shared<char>('a'));
152 runElementTypeTest(std::make_unique<char>('a'));
153 runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
154 EXPECT_EQ(RefCounted::active_instances, 0);
// Same matrix with the dynamically-sized queue (Dynamic = true).
157 TEST(MPMCQueue, lots_of_element_types_dynamic) {
158 runElementTypeTest<true>(10);
159 runElementTypeTest<true>(string("abc"));
160 runElementTypeTest<true>(std::make_pair(10, string("def")));
161 runElementTypeTest<true>(vector<string>{{"abc"}});
162 runElementTypeTest<true>(std::make_shared<char>('a'));
163 runElementTypeTest<true>(std::make_unique<char>('a'));
164 runElementTypeTest<true>(boost::intrusive_ptr<RefCounted>(new RefCounted));
165 EXPECT_EQ(RefCounted::active_instances, 0);
// Single-threaded fill/drain cycles on a capacity-10 queue: fills to
// capacity, verifies writes then fail, drains via both read() and
// blockingRead(), and confirms the queue reports empty between passes.
168 TEST(MPMCQueue, single_thread_enqdeq) {
169 // Non-dynamic version only.
170 // False positive for dynamic version. Capacity can be temporarily
171 // higher than specified.
172 MPMCQueue<int> cq(10);
174 for (int pass = 0; pass < 10; ++pass) {
175 for (int i = 0; i < 10; ++i) {
176 EXPECT_TRUE(cq.write(i));
// Queue is full: a further write must fail and size must equal capacity.
178 EXPECT_FALSE(cq.write(-1));
179 EXPECT_FALSE(cq.isEmpty());
180 EXPECT_EQ(cq.size(), 10);
// Drain the first half with non-blocking reads...
182 for (int i = 0; i < 5; ++i) {
184 EXPECT_TRUE(cq.read(dest));
// ...and the second half with blocking reads (won't block: items present).
187 for (int i = 5; i < 10; ++i) {
189 cq.blockingRead(dest);
// Now empty: a non-blocking read must fail.
193 EXPECT_FALSE(cq.read(dest));
196 EXPECT_TRUE(cq.isEmpty());
197 EXPECT_EQ(cq.size(), 0);
// For every capacity 1..99, fills the queue exactly to capacity with
// write() and verifies one more write fails — i.e. the non-dynamic queue
// never admits more than its declared capacity.
201 TEST(MPMCQueue, tryenq_capacity_test) {
202 // Non-dynamic version only.
203 // False positive for dynamic version. Capacity can be temporarily
204 // higher than specified.
205 for (size_t cap = 1; cap < 100; ++cap) {
206 MPMCQueue<int> cq(cap);
207 for (size_t i = 0; i < cap; ++i) {
208 EXPECT_TRUE(cq.write(i));
// Full: the cap+1'th write must be refused.
210 EXPECT_FALSE(cq.write(100));
// Verifies blockingWrite actually blocks on a full queue: fills the queue,
// starts a thread that attempts one more blockingWrite, then frees a slot
// with blockingRead so the writer can complete.
// NOTE(review): the fill-loop body, the post-spawn wait, and the join are
// elided in this view.
214 TEST(MPMCQueue, enq_capacity_test) {
215 // Non-dynamic version only.
216 // False positive for dynamic version. Capacity can be temporarily
217 // higher than specified.
218 for (auto cap : { 1, 100, 10000 }) {
219 MPMCQueue<int> cq(cap);
220 for (int i = 0; i < cap; ++i) {
// This write blocks until the main thread drains one element below.
225 auto thr = std::thread([&]{
226 cq.blockingWrite(100);
232 cq.blockingRead(dummy);
// Worker for the aggregate try-enq/deq test: each thread both writes and
// reads with the non-blocking write()/read(), looping until it has pushed
// and popped its share of the n ops. Values read are accumulated into a
// thread-local sum that the caller folds into a shared atomic.
// NOTE(review): local declarations (src, received, dst) and the tail of
// the function are elided in this view.
238 template <template <typename> class Atom, bool Dynamic = false>
239 void runTryEnqDeqThread(
242 MPMCQueue<int, Atom, Dynamic>& cq,
243 std::atomic<uint64_t>& sum,
245 uint64_t threadSum = 0;
247 // received doesn't reflect any actual values, we just start with
248 // t and increment by numThreads to get the rounding of termination
249 // correct if numThreads doesn't evenly divide numOps
// Keep going until this thread has both produced and consumed its quota.
251 while (src < n || received < n) {
252 if (src < n && cq.write(src)) {
257 if (received < n && cq.read(dst)) {
258 received += numThreads;
// Launches numThreads runTryEnqDeqThread workers over a queue sized to the
// thread count, then checks aggregate correctness: the queue ends empty
// and the sum of all values read equals 0+1+...+(n-1).
265 template <template <typename> class Atom, bool Dynamic = false>
266 void runTryEnqDeqTest(int numThreads, int numOps) {
267 // write and read aren't linearizable, so we don't have
268 // hard guarantees on their individual behavior. We can still test
269 // correctness in aggregate
270 MPMCQueue<int,Atom, Dynamic> cq(numThreads);
273 vector<std::thread> threads(numThreads);
274 std::atomic<uint64_t> sum(0);
275 for (int t = 0; t < numThreads; ++t) {
276 threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom, Dynamic>,
277 numThreads, n, std::ref(cq), std::ref(sum), t));
279 for (auto& t : threads) {
282 EXPECT_TRUE(cq.isEmpty());
// Gauss sum check: every value 0..n-1 was handed off exactly once.
283 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
// runTryEnqDeqTest matrix across backends and thread counts. The
// deterministic variant additionally runs under both a uniform scheduler
// and a uniformSubset scheduler (which restricts the set of runnable
// threads) to probe different interleavings from the same seed.
// NOTE(review): the inner loops over nts[] / op counts are elided in this
// view.
286 TEST(MPMCQueue, mt_try_enq_deq) {
287 int nts[] = { 1, 3, 100 };
291 runTryEnqDeqTest<std::atomic>(nt, n);
295 TEST(MPMCQueue, mt_try_enq_deq_dynamic) {
296 int nts[] = { 1, 3, 100 };
300 runTryEnqDeqTest<std::atomic, /* Dynamic = */ true>(nt, n);
304 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
305 int nts[] = { 1, 3, 100 };
309 runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
313 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex_dynamic) {
314 int nts[] = { 1, 3, 100 };
318 runTryEnqDeqTest<EmulatedFutexAtomic, /* Dynamic = */ true>(nt, n);
322 TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
323 int nts[] = { 3, 10 };
326 LOG(INFO) << "using seed " << seed;
331 DSched sched(DSched::uniform(seed));
332 runTryEnqDeqTest<DeterministicAtomic>(nt, n);
335 DSched sched(DSched::uniformSubset(seed, 2));
336 runTryEnqDeqTest<DeterministicAtomic>(nt, n);
339 DSched sched(DSched::uniform(seed));
340 runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
343 DSched sched(DSched::uniformSubset(seed, 2));
344 runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
// Wall-clock time in microseconds since the epoch, via gettimeofday.
// Used for coarse benchmark timing, not for correctness.
// NOTE(review): the `struct timeval tv;` declaration is elided in this view.
349 uint64_t nowMicro() {
351 gettimeofday(&tv, nullptr);
352 return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
// Small polymorphic hierarchy that abstracts over the queue's write entry
// points so the benchmark below can be parameterized by write method.
// Each subclass forwards to one API and reports its name for logging.
355 template <typename Q>
356 struct WriteMethodCaller {
357 WriteMethodCaller() {}
358 virtual ~WriteMethodCaller() = default;
// Returns whether the write succeeded (blockingWrite always does).
359 virtual bool callWrite(Q& q, int i) = 0;
360 virtual string methodName() = 0;
363 template <typename Q>
364 struct BlockingWriteCaller : public WriteMethodCaller<Q> {
365 bool callWrite(Q& q, int i) override {
369 string methodName() override { return "blockingWrite"; }
372 template <typename Q>
373 struct WriteIfNotFullCaller : public WriteMethodCaller<Q> {
374 bool callWrite(Q& q, int i) override { return q.writeIfNotFull(i); }
375 string methodName() override { return "writeIfNotFull"; }
378 template <typename Q>
379 struct WriteCaller : public WriteMethodCaller<Q> {
380 bool callWrite(Q& q, int i) override { return q.write(i); }
381 string methodName() override { return "write"; }
// Calls tryWriteUntil with a deadline of Clock::now() + duration_.
// NOTE(review): the opening of this template parameter list
// (template <typename Q, ...) is elided in this view.
386 class Clock = steady_clock,
387 class Duration = typename Clock::duration>
388 struct TryWriteUntilCaller : public WriteMethodCaller<Q> {
389 const Duration duration_;
390 explicit TryWriteUntilCaller(Duration&& duration) : duration_(duration) {}
391 bool callWrite(Q& q, int i) override {
392 auto then = Clock::now() + duration_;
393 return q.tryWriteUntil(then, i);
395 string methodName() override {
396 return folly::sformat(
397 "tryWriteUntil({}ms)",
398 std::chrono::duration_cast<milliseconds>(duration_).count());
// Core producer/consumer benchmark: numProducers writer threads push
// 0..numOps-1 (striped by thread index) using the supplied write method,
// numConsumers reader threads drain with blockingRead. Measures wall time
// (nowMicro) and context switches (getrusage) and returns a formatted
// summary line. Unless ignoreContents is set, verifies the Gauss-sum of
// everything consumed.
// NOTE(review): several locals (q, n), the failure accounting in the
// producer loop, and the sformat argument list are elided in this view.
402 template <typename Q>
403 string producerConsumerBench(Q&& queue,
408 WriteMethodCaller<Q>& writer,
409 bool ignoreContents = false) {
412 struct rusage beginUsage;
413 getrusage(RUSAGE_SELF, &beginUsage);
415 auto beginMicro = nowMicro();
418 std::atomic<uint64_t> sum(0);
// Counts writes that had to be retried (writer.callWrite returned false).
419 std::atomic<uint64_t> failed(0);
421 vector<std::thread> producers(numProducers);
422 for (int t = 0; t < numProducers; ++t) {
423 producers[t] = DSched::thread([&,t]{
424 for (int i = t; i < numOps; i += numProducers) {
// Spin until the chosen write method accepts the element.
425 while (!writer.callWrite(q, i)) {
432 vector<std::thread> consumers(numConsumers);
433 for (int t = 0; t < numConsumers; ++t) {
434 consumers[t] = DSched::thread([&,t]{
435 uint64_t localSum = 0;
436 for (int i = t; i < numOps; i += numConsumers) {
438 q.blockingRead(dest);
439 EXPECT_FALSE(dest == -1);
446 for (auto& t : producers) {
449 for (auto& t : consumers) {
452 if (!ignoreContents) {
// All of 0..n-1 must have been consumed exactly once.
453 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
456 auto endMicro = nowMicro();
458 struct rusage endUsage;
459 getrusage(RUSAGE_SELF, &endUsage);
461 uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
// Voluntary + involuntary context switches accrued during the run.
462 long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
463 (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
464 uint64_t failures = failed;
465 size_t allocated = q.allocatedCapacity();
467 return folly::sformat(
468 "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
469 "handoff, {} failures, {} allocated",
// Runs producerConsumerBench under DeterministicSchedule for every write
// method, at several capacity / producer / consumer combinations (the
// loop headers supplying cap and thread counts are elided in this view).
// Perf numbers are meaningless here; the value is deterministic
// interleaving coverage.
481 template <bool Dynamic = false>
482 void runMtProdConsDeterministic(long seed) {
483 // we use the Bench method, but perf results are meaningless under DSched
484 DSched sched(DSched::uniform(seed));
486 using QueueType = MPMCQueue<int, DeterministicAtomic, Dynamic>;
// One caller per write entry point: blockingWrite, writeIfNotFull,
// write, and tryWriteUntil with short and long deadlines.
488 vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
489 callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
490 callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
491 callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
492 callers.emplace_back(
493 std::make_unique<TryWriteUntilCaller<QueueType>>(milliseconds(1)));
494 callers.emplace_back(
495 std::make_unique<TryWriteUntilCaller<QueueType>>(seconds(2)));
498 for (const auto& caller : callers) {
501 producerConsumerBench(
502 MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
503 "MPMCQueue<int, DeterministicAtomic, Dynamic>("
504 + folly::to<std::string>(cap)+")",
511 producerConsumerBench(
512 MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
513 "MPMCQueue<int, DeterministicAtomic, Dynamic>("
514 + folly::to<std::string>(cap)+")",
521 producerConsumerBench(
522 MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
523 "MPMCQueue<int, DeterministicAtomic, Dynamic>("
524 + folly::to<std::string>(cap)+")",
531 producerConsumerBench(
532 MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
533 "MPMCQueue<int, DeterministicAtomic, Dynamic>("
534 + folly::to<std::string>(cap)+")",
541 producerConsumerBench(
542 MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
543 "MPMCQueue<int, DeterministicAtomic, Dynamic>("
544 + folly::to<std::string>(cap)+")",
// Like runMtProdConsDeterministic, but always uses the dynamic queue and
// exposes its three-argument constructor (cap, minCap, expansion mult) so
// the env-driven test below can sweep those parameters.
// NOTE(review): the parameter list (seed, producer/consumer counts,
// numOps, cap, minCap, mult) is elided in this view.
552 void runMtProdConsDeterministicDynamic(
561 // we use the Bench method, but perf results are meaningless under DSched
562 DSched sched(DSched::uniform(seed));
564 using QueueType = MPMCQueue<int, DeterministicAtomic, true>;
566 vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
567 callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
568 callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
569 callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
570 callers.emplace_back(
571 std::make_unique<TryWriteUntilCaller<QueueType>>(milliseconds(1)));
572 callers.emplace_back(
573 std::make_unique<TryWriteUntilCaller<QueueType>>(seconds(2)));
575 for (const auto& caller : callers) {
577 producerConsumerBench(
// Dynamic queue starts at minCap and grows toward cap by factor mult.
578 MPMCQueue<int, DeterministicAtomic, true>(cap, minCap, mult),
579 "MPMCQueue<int, DeterministicAtomic, true>("
580 + folly::to<std::string>(cap) + ", "
581 + folly::to<std::string>(minCap) + ", "
582 + folly::to<std::string>(mult)+")",
// Deterministic-schedule producer/consumer runs with a fixed seed of 0,
// for the static and dynamic queue variants.
590 TEST(MPMCQueue, mt_prod_cons_deterministic) {
591 runMtProdConsDeterministic(0);
594 TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic) {
595 runMtProdConsDeterministic<true>(0);
// Overrides `var` from the named environment variable when it is set,
// parsing with atoi (so non-numeric values silently become 0). Leaves
// `var` untouched when the variable is absent.
598 template <typename T>
599 void setFromEnv(T& var, const char* envvar) {
600 char* str = std::getenv(envvar);
601 if (str) { var = atoi(str); }
// Manually-tunable deterministic dynamic-queue run: every parameter can be
// overridden through environment variables (SEED, PRODS, CONS, NUM_OPS,
// CAP, MIN_CAP, MULT), defaulting to a small smoke configuration.
// NOTE(review): the default initializations of seed/prods/cons/cap/
// minCap/mult are elided in this view.
604 TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic_with_arguments) {
608 uint32_t numOps = 1000;
612 setFromEnv(seed, "SEED");
613 setFromEnv(prods, "PRODS");
614 setFromEnv(cons, "CONS");
615 setFromEnv(numOps, "NUM_OPS");
616 setFromEnv(cap, "CAP");
617 setFromEnv(minCap, "MIN_CAP");
618 setFromEnv(mult, "MULT");
619 runMtProdConsDeterministicDynamic(
620 seed, prods, cons, numOps, cap, minCap, mult);
// PC_BENCH stringizes the queue expression so the log line names the
// exact configuration being benchmarked.
623 #define PC_BENCH(q, np, nc, ...) \
624 producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
// Real (non-deterministic) benchmark over std::atomic queues: every write
// method crossed with small/large capacities and several producer/
// consumer ratios; results are logged, not asserted.
626 template <bool Dynamic = false>
627 void runMtProdCons() {
628 using QueueType = MPMCQueue<int, std::atomic, Dynamic>;
// Op count is env-overridable for longer local runs.
631 setFromEnv(n, "NUM_OPS");
632 vector<unique_ptr<WriteMethodCaller<QueueType>>>
634 callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
635 callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
636 callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
637 callers.emplace_back(
638 std::make_unique<TryWriteUntilCaller<QueueType>>(milliseconds(1)));
639 callers.emplace_back(
640 std::make_unique<TryWriteUntilCaller<QueueType>>(seconds(2)));
641 for (const auto& caller : callers) {
642 LOG(INFO) << PC_BENCH((QueueType(10)), 1, 1, n, *caller);
643 LOG(INFO) << PC_BENCH((QueueType(10)), 10, 1, n, *caller);
644 LOG(INFO) << PC_BENCH((QueueType(10)), 1, 10, n, *caller);
645 LOG(INFO) << PC_BENCH((QueueType(10)), 10, 10, n, *caller);
646 LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 1, n, *caller);
647 LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 1, n, *caller);
648 LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 10, n, *caller);
649 LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 10, n, *caller);
650 LOG(INFO) << PC_BENCH((QueueType(100000)), 32, 100, n, *caller);
// std::atomic benchmark entry points (static and dynamic queue).
654 TEST(MPMCQueue, mt_prod_cons) {
658 TEST(MPMCQueue, mt_prod_cons_dynamic) {
659 runMtProdCons</* Dynamic = */ true>();
// Same benchmark matrix as runMtProdCons, but over EmulatedFutexAtomic to
// exercise the portable futex-emulation waiting path.
662 template <bool Dynamic = false>
663 void runMtProdConsEmulatedFutex() {
664 using QueueType = MPMCQueue<int, EmulatedFutexAtomic, Dynamic>;
667 vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
668 callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
669 callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
670 callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
671 callers.emplace_back(
672 std::make_unique<TryWriteUntilCaller<QueueType>>(milliseconds(1)));
673 callers.emplace_back(
674 std::make_unique<TryWriteUntilCaller<QueueType>>(seconds(2)));
675 for (const auto& caller : callers) {
676 LOG(INFO) << PC_BENCH((QueueType(10)), 1, 1, n, *caller);
677 LOG(INFO) << PC_BENCH((QueueType(10)), 10, 1, n, *caller);
678 LOG(INFO) << PC_BENCH((QueueType(10)), 1, 10, n, *caller);
679 LOG(INFO) << PC_BENCH((QueueType(10)), 10, 10, n, *caller);
680 LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 1, n, *caller);
681 LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 1, n, *caller);
682 LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 10, n, *caller);
683 LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 10, n, *caller);
684 LOG(INFO) << PC_BENCH((QueueType(100000)), 32, 100, n, *caller);
// Benchmark entry points for the emulated-futex backend.
688 TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
689 runMtProdConsEmulatedFutex();
692 TEST(MPMCQueue, mt_prod_cons_emulated_futex_dynamic) {
693 runMtProdConsEmulatedFutex</* Dynamic = */ true>();
// Worker for the "never fail" invariant: with capacity >= numThreads and
// each thread writing before it reads, writeIfNotFull and readIfNotEmpty
// must always succeed. Sums values read for the caller's aggregate check.
// NOTE(review): parameter list, `dest` declaration, and the function tail
// are elided in this view.
696 template <template <typename> class Atom, bool Dynamic = false>
697 void runNeverFailThread(int numThreads,
699 MPMCQueue<int, Atom, Dynamic>& cq,
700 std::atomic<uint64_t>& sum,
702 uint64_t threadSum = 0;
703 for (int i = t; i < n; i += numThreads) {
// Write first, then read: #enq >= #deq holds at all times, so neither
// operation can legitimately fail.
705 EXPECT_TRUE(cq.writeIfNotFull(i));
708 EXPECT_TRUE(cq.readIfNotEmpty(dest));
709 EXPECT_TRUE(dest >= 0);
// Runs numThreads runNeverFailThread workers on a queue sized exactly to
// the thread count, checks the aggregate Gauss sum and that the queue ends
// empty, and returns elapsed wall time in microseconds for logging.
// NOTE(review): the bind argument tail and join body are elided in this
// view.
715 template <template <typename> class Atom, bool Dynamic = false>
716 uint64_t runNeverFailTest(int numThreads, int numOps) {
717 // always #enq >= #deq
718 MPMCQueue<int, Atom, Dynamic> cq(numThreads);
721 auto beginMicro = nowMicro();
723 vector<std::thread> threads(numThreads);
724 std::atomic<uint64_t> sum(0);
725 for (int t = 0; t < numThreads; ++t) {
726 threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom, Dynamic>,
733 for (auto& t : threads) {
736 EXPECT_TRUE(cq.isEmpty());
737 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
739 return nowMicro() - beginMicro;
// Sweeps runNeverFailTest over the given thread counts and logs a rough
// nanos-per-op figure (n*2 because each op is one write plus one read).
742 template <template <typename> class Atom, bool Dynamic = false>
743 void runMtNeverFail(std::vector<int>& nts, int n) {
745 uint64_t elapsed = runNeverFailTest<Atom, Dynamic>(nt, n);
746 LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
751 // All the never_fail tests are for the non-dynamic version only.
752 // False positive for dynamic version. Some writeIfNotFull() and
753 // tryWriteUntil() operations may fail in transient conditions related
// (to capacity expansion — see the full file for the rest of this note.)
756 TEST(MPMCQueue, mt_never_fail) {
757 std::vector<int> nts {1, 3, 100};
759 runMtNeverFail<std::atomic>(nts, n);
762 TEST(MPMCQueue, mt_never_fail_emulated_futex) {
763 std::vector<int> nts {1, 3, 100};
765 runMtNeverFail<EmulatedFutexAtomic>(nts, n);
// Deterministic-schedule variant of the never-fail sweep: runs each thread
// count under both a uniform and a uniformSubset scheduler from the same
// seed. NOTE(review): the loop over nts is elided in this view.
768 template <bool Dynamic = false>
769 void runMtNeverFailDeterministic(std::vector<int>& nts, int n, long seed) {
770 LOG(INFO) << "using seed " << seed;
773 DSched sched(DSched::uniform(seed));
774 runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
777 DSched sched(DSched::uniformSubset(seed, 2));
778 runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
783 TEST(MPMCQueue, mt_never_fail_deterministic) {
784 std::vector<int> nts {3, 10};
// Fixed seed for reproducibility; swap in nowMicro() % 10000 to fuzz.
785 long seed = 0; // nowMicro() % 10000;
787 runMtNeverFailDeterministic(nts, n, seed);
// Timed-write variant of runNeverFailThread: uses tryWriteUntil with a
// one-second Clock deadline instead of writeIfNotFull. Under the same
// #enq >= #deq invariant the deadline should never actually expire.
// NOTE(review): parameter list, `dest` declaration, and the tail are
// elided in this view.
790 template <class Clock, template <typename> class Atom, bool Dynamic>
791 void runNeverFailUntilThread(int numThreads,
793 MPMCQueue<int, Atom, Dynamic>& cq,
794 std::atomic<uint64_t>& sum,
796 uint64_t threadSum = 0;
797 for (int i = t; i < n; i += numThreads) {
799 auto soon = Clock::now() + std::chrono::seconds(1);
800 EXPECT_TRUE(cq.tryWriteUntil(soon, i));
803 EXPECT_TRUE(cq.readIfNotEmpty(dest));
804 EXPECT_TRUE(dest >= 0);
// Clock-parameterized overload of runNeverFailTest, driving
// runNeverFailUntilThread; otherwise identical structure: launch workers,
// join, verify emptiness and the Gauss sum, return elapsed microseconds.
810 template <class Clock, template <typename> class Atom, bool Dynamic = false>
811 uint64_t runNeverFailTest(int numThreads, int numOps) {
812 // always #enq >= #deq
813 MPMCQueue<int, Atom, Dynamic> cq(numThreads);
816 auto beginMicro = nowMicro();
818 vector<std::thread> threads(numThreads);
819 std::atomic<uint64_t> sum(0);
820 for (int t = 0; t < numThreads; ++t) {
821 threads[t] = DSched::thread(std::bind(
822 runNeverFailUntilThread<Clock, Atom, Dynamic>,
829 for (auto& t : threads) {
832 EXPECT_TRUE(cq.isEmpty());
833 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
835 return nowMicro() - beginMicro;
// Sweeps the timed never-fail test using system_clock and steady_clock
// deadlines respectively, logging nanos per op (write+read = 2 ops each).
// NOTE(review): the loops over nts and the elapsed-variable declarations
// are elided in this view.
838 template <bool Dynamic = false>
839 void runMtNeverFailUntilSystem(std::vector<int>& nts, int n) {
842 runNeverFailTest<std::chrono::system_clock, std::atomic, Dynamic>(nt, n);
843 LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
848 TEST(MPMCQueue, mt_never_fail_until_system) {
849 std::vector<int> nts {1, 3, 100};
851 runMtNeverFailUntilSystem(nts, n);
854 template <bool Dynamic = false>
855 void runMtNeverFailUntilSteady(std::vector<int>& nts, int n) {
858 runNeverFailTest<std::chrono::steady_clock, std::atomic, Dynamic>(nt, n);
859 LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
864 TEST(MPMCQueue, mt_never_fail_until_steady) {
865 std::vector<int> nts {1, 3, 100};
867 runMtNeverFailUntilSteady(nts, n);
// Per-thread instrumentation for the perfect-forwarding tests: counts each
// kind of special-member invocation (the enumerator list is elided in this
// view) so tests can assert exactly which constructors/operators ran.
870 enum LifecycleEvent {
// Thread-local so concurrent tests don't interfere with each other's counts.
882 static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT];
883 static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT];
// Number of constructed-but-not-yet-destroyed Lifecycle objects.
885 static int lc_outstanding() {
886 return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[COPY_CONSTRUCTOR] +
887 lc_counts[MOVE_CONSTRUCTOR] + lc_counts[TWO_ARG_CONSTRUCTOR] -
888 lc_counts[DESTRUCTOR];
// Snapshot current counts so lc_step can report deltas since the snapshot.
891 static void lc_snap() {
892 for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
893 lc_prev[i] = lc_counts[i];
// LIFECYCLE_STEP(what...) asserts that exactly the named events (and no
// others) occurred since the last snapshot, tagging failures with __LINE__.
897 #define LIFECYCLE_STEP(...) lc_step(__LINE__, __VA_ARGS__)
899 static void lc_step(int lineno, int what = NOTHING, int what2 = NOTHING) {
900 for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
// Expected delta is 1 for each named event, 0 for everything else.
901 int delta = i == what || i == what2 ? 1 : 0;
902 EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
903 << "lc_counts[" << i << "] - lc_prev[" << i << "] was "
904 << (lc_counts[i] - lc_prev[i]) << ", expected " << delta
905 << ", from line " << lineno;
// Instrumented element type: every special member bumps the matching
// lc_counts slot. The template parameter R (std::true_type/false_type) is
// exported as IsRelocatable so folly's relocation optimization can be
// tested both ways. NOTE(review): the struct header, `constructed` member,
// and assignment-operator return statements are elided in this view.
910 template <typename R>
912 typedef R IsRelocatable;
916 Lifecycle() noexcept : constructed(true) {
917 ++lc_counts[DEFAULT_CONSTRUCTOR];
920 explicit Lifecycle(int /* n */, char const* /* s */) noexcept
921 : constructed(true) {
922 ++lc_counts[TWO_ARG_CONSTRUCTOR];
925 Lifecycle(const Lifecycle& /* rhs */) noexcept : constructed(true) {
926 ++lc_counts[COPY_CONSTRUCTOR];
929 Lifecycle(Lifecycle&& /* rhs */) noexcept : constructed(true) {
930 ++lc_counts[MOVE_CONSTRUCTOR];
933 Lifecycle& operator=(const Lifecycle& /* rhs */) noexcept {
934 ++lc_counts[COPY_OPERATOR];
938 Lifecycle& operator=(Lifecycle&& /* rhs */) noexcept {
939 ++lc_counts[MOVE_OPERATOR];
943 ~Lifecycle() noexcept {
944 ++lc_counts[DESTRUCTOR];
// Sanity: destructor count can never exceed total constructions.
945 assert(lc_outstanding() >= 0);
// Verifies MPMCQueue perfectly forwards into in-place construction: each
// write style must trigger exactly one expected Lifecycle event (default /
// two-arg / move / copy construction), and reads must move (or, for
// relocatable R, memcpy) elements out. R selects IsRelocatable.
// NOTE(review): lc_snap() calls, scope braces, and some locals are elided
// in this view.
952 template <typename R>
953 void runPerfectForwardingTest() {
954 EXPECT_EQ(lc_outstanding(), 0);
957 // Non-dynamic only. False positive for dynamic.
958 MPMCQueue<Lifecycle<R>, std::atomic> queue(50);
959 LIFECYCLE_STEP(NOTHING);
961 for (int pass = 0; pass < 10; ++pass) {
962 for (int i = 0; i < 10; ++i) {
// Zero-arg blockingWrite must default-construct in place.
963 queue.blockingWrite();
964 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
// Multi-arg blockingWrite must forward to the (int, char const*) ctor.
966 queue.blockingWrite(1, "one");
967 LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
971 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
972 queue.blockingWrite(std::move(src));
973 LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
975 LIFECYCLE_STEP(DESTRUCTOR);
979 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
// Lvalue write must copy-construct.
980 queue.blockingWrite(src);
981 LIFECYCLE_STEP(COPY_CONSTRUCTOR);
983 LIFECYCLE_STEP(DESTRUCTOR);
985 EXPECT_TRUE(queue.write());
986 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
// Queue now full (50): a further write fails and constructs nothing.
989 EXPECT_EQ(queue.size(), 50);
990 EXPECT_FALSE(queue.write(2, "two"));
991 LIFECYCLE_STEP(NOTHING);
993 for (int i = 0; i < 50; ++i) {
996 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
998 queue.blockingRead(node);
1000 // relocatable, moved via memcpy
1001 LIFECYCLE_STEP(DESTRUCTOR);
// Non-relocatable path: move-assign into `node` then destroy the slot.
1003 LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
1006 LIFECYCLE_STEP(DESTRUCTOR);
1009 EXPECT_EQ(queue.size(), 0);
1012 // put one element back before destruction
1014 Lifecycle<R> src(3, "three");
1015 LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
1016 queue.write(std::move(src));
1017 LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
1019 LIFECYCLE_STEP(DESTRUCTOR); // destroy src
// Destroying the queue must destroy the one remaining element.
1021 LIFECYCLE_STEP(DESTRUCTOR); // destroy queue
1023 EXPECT_EQ(lc_outstanding(), 0);
// Entry points for the non-relocatable and relocatable variants.
1026 TEST(MPMCQueue, perfect_forwarding) {
1027 runPerfectForwardingTest<std::false_type>();
1030 TEST(MPMCQueue, perfect_forwarding_relocatable) {
1031 runPerfectForwardingTest<std::true_type>();
// Verifies MPMCQueue's own move constructor and move assignment: moving a
// queue transfers capacity and contents without touching the elements
// (LIFECYCLE_STEP(NOTHING)), and leaves the source with zero capacity.
// NOTE(review): lc_snap() calls, the actual move expressions, and several
// scope braces are elided in this view.
1034 template <bool Dynamic = false>
1035 void run_queue_moving() {
1037 EXPECT_EQ(lc_outstanding(), 0);
1040 MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> a(50);
1041 LIFECYCLE_STEP(NOTHING);
// One element written into a before the move.
1044 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
// Move construction: b steals a's storage; no element is copied or moved.
1047 MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> b
1049 LIFECYCLE_STEP(NOTHING);
1050 EXPECT_EQ(a.capacity(), 0);
1051 EXPECT_EQ(a.size(), 0);
1052 EXPECT_EQ(b.capacity(), 50);
1053 EXPECT_EQ(b.size(), 1);
1056 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
// Move assignment into a default-constructed queue.
1059 MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> c;
1060 LIFECYCLE_STEP(NOTHING);
1062 LIFECYCLE_STEP(NOTHING);
1063 EXPECT_EQ(c.capacity(), 50);
1064 EXPECT_EQ(c.size(), 2);
1067 Lifecycle<std::false_type> dst;
1068 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
1069 c.blockingRead(dst);
1070 LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
// Swap-style move between two live queues (c had 1 left, d is fresh).
1074 MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> d(10);
1075 LIFECYCLE_STEP(NOTHING);
1077 LIFECYCLE_STEP(NOTHING);
1078 EXPECT_EQ(c.capacity(), 10);
1079 EXPECT_TRUE(c.isEmpty());
1080 EXPECT_EQ(d.capacity(), 50);
1081 EXPECT_EQ(d.size(), 1);
1083 d.blockingRead(dst);
1084 LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
1086 c.blockingWrite(dst);
1087 LIFECYCLE_STEP(COPY_CONSTRUCTOR);
1089 d.blockingWrite(std::move(dst));
1090 LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
1091 } // d goes out of scope
1092 LIFECYCLE_STEP(DESTRUCTOR);
1093 } // dst goes out of scope
1094 LIFECYCLE_STEP(DESTRUCTOR);
1095 } // c goes out of scope
1096 LIFECYCLE_STEP(DESTRUCTOR);
// Entry points for static and dynamic queue moves.
1099 TEST(MPMCQueue, queue_moving) {
1103 TEST(MPMCQueue, queue_moving_dynamic) {
1104 run_queue_moving<true>();
// Constructing either queue variant with capacity 0 must throw
// std::invalid_argument rather than produce an unusable queue.
1107 TEST(MPMCQueue, explicit_zero_capacity_fail) {
1108 ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);
1110 using DynamicMPMCQueueInt = MPMCQueue<int, std::atomic, true>;
1111 ASSERT_THROW(DynamicMPMCQueueInt cq(0), std::invalid_argument);
// Two readers race tryReadUntil on an empty size-1 queue; after both are
// waiting (barrier), one element is written. Exactly one reader must get
// the value 42, the other must time out, and the elapsed time must be at
// least the 100ms wait.
// NOTE(review): vals/rets declarations, barrier waits, joins, and the
// stop_watch setup are elided in this view.
1114 template <bool Dynamic>
1115 void testTryReadUntil() {
1116 MPMCQueue<int, std::atomic, Dynamic> q{1};
1118 const auto wait = std::chrono::milliseconds(100);
1122 std::vector<std::thread> threads;
// Barrier count 3: the two reader threads plus the main thread.
1123 boost::barrier b{3};
1124 for (int i = 0; i < 2; i++) {
1125 threads.emplace_back([&, i] {
1127 rets[i] = q.tryReadUntil(watch.getCheckpoint() + wait, vals[i]);
// Publish one element after both readers have reached the barrier.
1132 EXPECT_TRUE(q.write(42));
1134 for (int i = 0; i < 2; i++) {
1138 for (int i = 0; i < 2; i++) {
1139 int other = (i + 1) % 2;
// Exactly one of the two readers succeeded, and it read the 42.
1141 EXPECT_EQ(42, vals[i]);
1142 EXPECT_FALSE(rets[other]);
// The losing reader must have waited out the full deadline.
1146 EXPECT_TRUE(watch.elapsed(wait));
// Mirror of testTryReadUntil for writers: the size-1 queue starts full,
// two writers race tryWriteUntil, one slot is freed, and exactly one
// writer must succeed while the other times out after the full wait.
// NOTE(review): rets declarations, barrier waits, joins, and per-writer
// success checks are elided in this view.
1149 template <bool Dynamic>
1150 void testTryWriteUntil() {
1151 MPMCQueue<int, std::atomic, Dynamic> q{1};
// Pre-fill so both writers initially block.
1152 EXPECT_TRUE(q.write(42));
1154 const auto wait = std::chrono::milliseconds(100);
1157 std::vector<std::thread> threads;
1158 boost::barrier b{3};
1159 for (int i = 0; i < 2; i++) {
1160 threads.emplace_back([&, i] {
1162 rets[i] = q.tryWriteUntil(watch.getCheckpoint() + wait, i);
// Free exactly one slot once both writers are waiting.
1168 EXPECT_TRUE(q.read(x));
1171 for (int i = 0; i < 2; i++) {
// Drain the single element the winning writer enqueued.
1174 EXPECT_TRUE(q.read(x));
1176 for (int i = 0; i < 2; i++) {
1177 int other = (i + 1) % 2;
1180 EXPECT_FALSE(rets[other]);
1184 EXPECT_TRUE(watch.elapsed(wait));
// Entry points for the timed read/write races, static and dynamic.
1187 TEST(MPMCQueue, try_read_until) {
1188 testTryReadUntil<false>();
1191 TEST(MPMCQueue, try_read_until_dynamic) {
1192 testTryReadUntil<true>();
1195 TEST(MPMCQueue, try_write_until) {
1196 testTryWriteUntil<false>();
1199 TEST(MPMCQueue, try_write_until_dynamic) {
1200 testTryWriteUntil<true>();
// Regression check: tryWriteUntil on a full queue must honor its deadline
// and return (not hang). The dynamic case uses (200, 1, 2) — large max
// capacity but minimum initial capacity — to hit the expansion path.
// NOTE(review): the first write filling the queue and the surrounding
// braces are elided in this view.
1203 template <bool Dynamic>
1204 void testTimeout(MPMCQueue<int, std::atomic, Dynamic>& q) {
1206 /* The following must not block forever */
// 10ms deadline; success/failure is irrelevant, only that it returns.
1208 std::chrono::system_clock::now() + std::chrono::microseconds(10000), 2);
1211 TEST(MPMCQueue, try_write_until_timeout) {
1212 folly::MPMCQueue<int, std::atomic, false> queue(1);
1213 testTimeout<false>(queue);
1216 TEST(MPMCQueue, must_fail_try_write_until_dynamic) {
1217 folly::MPMCQueue<int, std::atomic, true> queue(200, 1, 2);
1218 testTimeout<true>(queue);