2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/Format.h>
18 #include <folly/MPMCQueue.h>
19 #include <folly/Memory.h>
20 #include <folly/portability/GTest.h>
21 #include <folly/portability/SysResource.h>
22 #include <folly/portability/SysTime.h>
23 #include <folly/portability/Unistd.h>
24 #include <folly/stop_watch.h>
25 #include <folly/test/DeterministicSchedule.h>
27 #include <boost/intrusive_ptr.hpp>
28 #include <boost/thread/barrier.hpp>
34 FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
36 using namespace folly;
37 using namespace detail;
39 using std::chrono::time_point;
40 using std::chrono::steady_clock;
41 using std::chrono::seconds;
42 using std::chrono::milliseconds;
44 using std::unique_ptr;
47 typedef DeterministicSchedule DSched;
// Worker body for the multi-threaded TurnSequencer test: thread i claims
// turns i, i+numThreads, i+2*numThreads, ... and verifies that turns are
// granted in strictly increasing order.
// NOTE(review): part of the parameter list and the `prev` bookkeeping are
// not visible in this view.
template <template<typename> class Atom>
void run_mt_sequencer_thread(
    TurnSequencer<Atom>& seq,
    Atom<uint32_t>& spinThreshold,
  // Each thread handles every numThreads-th turn starting at its own index.
  for (int op = i; op < numOps; op += numThreads) {
    // Spin-then-block wait; the third arg hints a sleep on every 32nd op.
    seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
    EXPECT_EQ(prev, op - 1);
    seq.completeTurn(init + op);
// Spawns numThreads run_mt_sequencer_thread workers over one shared
// TurnSequencer starting at `init`, joins them, and checks that all
// numOps turns completed in order.
template <template<typename> class Atom>
void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
  TurnSequencer<Atom> seq(init);
  Atom<uint32_t> spinThreshold(0);  // let the sequencer adapt its spin cutoff
  vector<std::thread> threads(numThreads);
  for (int i = 0; i < numThreads; ++i) {
    // DSched::thread so the same code also runs under DeterministicSchedule.
    threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
          numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold),
  for (auto& thr : threads) {
  // Every turn was observed exactly once, in order.
  EXPECT_EQ(prev, numOps - 1);
// TurnSequencer under plain std::atomic with 1, 2, and 100 threads.
// The -100 init wraps around uint32_t to exercise turn wraparound.
TEST(MPMCQueue, sequencer) {
  run_mt_sequencer_test<std::atomic>(1, 100, 0);
  run_mt_sequencer_test<std::atomic>(2, 100000, -100);
  run_mt_sequencer_test<std::atomic>(100, 10000, -100);
// Same sequencer sweep using the emulated-futex atomic (the non-Linux
// fallback path).
TEST(MPMCQueue, sequencer_emulated_futex) {
  run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
  run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
  run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
// Sequencer sweep under DeterministicSchedule, which explores thread
// interleavings reproducibly from a fixed seed.
TEST(MPMCQueue, sequencer_deterministic) {
  DSched sched(DSched::uniform(0));
  run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
  run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
  run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
// Smoke-tests MPMCQueue<T> for a particular element type T: exercises
// blockingWrite/blockingRead, non-blocking write/read, and tryWriteUntil
// with both system_clock and steady_clock deadlines.
template <bool Dynamic = false, typename T>
void runElementTypeTest(T&& src) {
  MPMCQueue<T, std::atomic, Dynamic> cq(10);
  cq.blockingWrite(std::forward<T>(src));
  cq.blockingRead(dest);
  EXPECT_TRUE(cq.write(std::move(dest)));
  EXPECT_TRUE(cq.read(dest));
  // Deadlines are well in the future, so these writes succeed immediately.
  auto soon1 = std::chrono::system_clock::now() + std::chrono::seconds(1);
  EXPECT_TRUE(cq.tryWriteUntil(soon1, std::move(dest)));
  EXPECT_TRUE(cq.read(dest));
  auto soon2 = std::chrono::steady_clock::now() + std::chrono::seconds(1);
  EXPECT_TRUE(cq.tryWriteUntil(soon2, std::move(dest)));
  EXPECT_TRUE(cq.read(dest));
  // Per-thread count of live RefCounted instances; the element-type tests
  // assert this returns to zero after the queue destroys its contents.
  static FOLLY_TLS int active_instances;
  mutable std::atomic<int> rc;  // intrusive reference count
  RefCounted() : rc(0) {
// Storage for the thread-local instance counter declared above.
FOLLY_TLS int RefCounted::active_instances;
// boost::intrusive_ptr hooks: bump / drop the intrusive refcount,
// destroying the object when the count hits zero.
void intrusive_ptr_add_ref(RefCounted const* p) {
void intrusive_ptr_release(RefCounted const* p) {
  if (--(p->rc) == 0) {
// Round-trips a variety of element types through the queue: trivially
// copyable, pair, vector, shared/unique_ptr (move-only), and an intrusive
// refcounted type; then checks no RefCounted instance leaked.
TEST(MPMCQueue, lots_of_element_types) {
  runElementTypeTest(10);
  runElementTypeTest(string("abc"));
  runElementTypeTest(std::make_pair(10, string("def")));
  runElementTypeTest(vector<string>{{"abc"}});
  runElementTypeTest(std::make_shared<char>('a'));
  runElementTypeTest(std::make_unique<char>('a'));
  runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
  EXPECT_EQ(RefCounted::active_instances, 0);
// Same element-type sweep for the dynamically-expanding queue variant.
TEST(MPMCQueue, lots_of_element_types_dynamic) {
  runElementTypeTest<true>(10);
  runElementTypeTest<true>(string("abc"));
  runElementTypeTest<true>(std::make_pair(10, string("def")));
  runElementTypeTest<true>(vector<string>{{"abc"}});
  runElementTypeTest<true>(std::make_shared<char>('a'));
  runElementTypeTest<true>(std::make_unique<char>('a'));
  runElementTypeTest<true>(boost::intrusive_ptr<RefCounted>(new RefCounted));
  EXPECT_EQ(RefCounted::active_instances, 0);
// Single-threaded sanity check: fill to capacity, verify a write on a full
// queue fails and a read on an empty queue fails, draining via both read()
// and blockingRead().
TEST(MPMCQueue, single_thread_enqdeq) {
  // Non-dynamic version only.
  // False positive for dynamic version. Capacity can be temporarily
  // higher than specified.
  MPMCQueue<int> cq(10);
  for (int pass = 0; pass < 10; ++pass) {
    // Fill to capacity; the next non-blocking write must fail.
    for (int i = 0; i < 10; ++i) {
      EXPECT_TRUE(cq.write(i));
    EXPECT_FALSE(cq.write(-1));
    EXPECT_FALSE(cq.isEmpty());
    EXPECT_EQ(cq.size(), 10);
    // Drain the first half with non-blocking read()...
    for (int i = 0; i < 5; ++i) {
      EXPECT_TRUE(cq.read(dest));
    // ...and the rest with blockingRead().
    for (int i = 5; i < 10; ++i) {
      cq.blockingRead(dest);
    // Empty again: non-blocking read must fail.
    EXPECT_FALSE(cq.read(dest));
  EXPECT_TRUE(cq.isEmpty());
  EXPECT_EQ(cq.size(), 0);
// For every capacity in [1, 100): exactly `cap` non-blocking writes
// succeed and the next one fails.
TEST(MPMCQueue, tryenq_capacity_test) {
  // Non-dynamic version only.
  // False positive for dynamic version. Capacity can be temporarily
  // higher than specified.
  for (size_t cap = 1; cap < 100; ++cap) {
    MPMCQueue<int> cq(cap);
    for (size_t i = 0; i < cap; ++i) {
      EXPECT_TRUE(cq.write(i));
    EXPECT_FALSE(cq.write(100));
// Verifies that blockingWrite() on a full queue blocks until a reader
// frees a slot.
TEST(MPMCQueue, enq_capacity_test) {
  // Non-dynamic version only.
  // False positive for dynamic version. Capacity can be temporarily
  // higher than specified.
  for (auto cap : { 1, 100, 10000 }) {
    MPMCQueue<int> cq(cap);
    for (int i = 0; i < cap; ++i) {
    // Queue is now full; this writer must block...
    auto thr = std::thread([&]{
      cq.blockingWrite(100);
    // ...until we consume one element here, unblocking it.
    cq.blockingRead(dummy);
// Worker for the try-enq/deq stress test: thread t both produces and
// consumes, using only the non-blocking write()/read() calls, retrying
// until its share of the n ops is done.
template <template<typename> class Atom, bool Dynamic = false>
void runTryEnqDeqThread(
    MPMCQueue<int, Atom, Dynamic>& cq,
    std::atomic<uint64_t>& sum,
  uint64_t threadSum = 0;
  // received doesn't reflect any actual values, we just start with
  // t and increment by numThreads to get the rounding of termination
  // correct if numThreads doesn't evenly divide numOps
  while (src < n || received < n) {
    if (src < n && cq.write(src)) {
    if (received < n && cq.read(dst)) {
      received += numThreads;
// Aggregate-correctness stress test: numThreads workers concurrently
// write and read; afterwards the queue must be empty and the sum of all
// values read must equal the sum of all values written (0..n-1).
template <template<typename> class Atom, bool Dynamic = false>
void runTryEnqDeqTest(int numThreads, int numOps) {
  // write and read aren't linearizable, so we don't have
  // hard guarantees on their individual behavior. We can still test
  // correctness in aggregate
  MPMCQueue<int,Atom, Dynamic> cq(numThreads);
  vector<std::thread> threads(numThreads);
  std::atomic<uint64_t> sum(0);
  for (int t = 0; t < numThreads; ++t) {
    threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom, Dynamic>,
          numThreads, n, std::ref(cq), std::ref(sum), t));
  for (auto& t : threads) {
  EXPECT_TRUE(cq.isEmpty());
  // Sum of 0..n-1 must match what the readers accumulated.
  EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
// runTryEnqDeqTest across several thread counts, for std::atomic and the
// emulated-futex atomic, in both fixed-capacity and dynamic variants.
TEST(MPMCQueue, mt_try_enq_deq) {
  int nts[] = { 1, 3, 100 };
  runTryEnqDeqTest<std::atomic>(nt, n);

TEST(MPMCQueue, mt_try_enq_deq_dynamic) {
  int nts[] = { 1, 3, 100 };
  runTryEnqDeqTest<std::atomic, /* Dynamic = */ true>(nt, n);

TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
  int nts[] = { 1, 3, 100 };
  runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);

TEST(MPMCQueue, mt_try_enq_deq_emulated_futex_dynamic) {
  int nts[] = { 1, 3, 100 };
  runTryEnqDeqTest<EmulatedFutexAtomic, /* Dynamic = */ true>(nt, n);
// Same stress test under DeterministicSchedule, covering both uniform and
// uniformSubset interleavings, static and dynamic queues.
TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
  int nts[] = { 3, 10 };
  LOG(INFO) << "using seed " << seed;
    DSched sched(DSched::uniform(seed));
    runTryEnqDeqTest<DeterministicAtomic>(nt, n);
    DSched sched(DSched::uniformSubset(seed, 2));
    runTryEnqDeqTest<DeterministicAtomic>(nt, n);
    DSched sched(DSched::uniform(seed));
    runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
    DSched sched(DSched::uniformSubset(seed, 2));
    runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
/// Returns the current wall-clock time in microseconds since the Unix
/// epoch. Used only for coarse elapsed-time reporting in the benchmarks.
/// Fix: the visible definition used `tv` without declaring it and was
/// never closed; the declaration and closing brace are restored here.
uint64_t nowMicro() {
  struct timeval tv;
  gettimeofday(&tv, nullptr);
  return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
}
// Polymorphic adapter so producerConsumerBench can be driven through each
// of the queue's write APIs (blockingWrite, write, writeIfNotFull,
// tryWriteUntil) via one virtual interface.
template <typename Q>
struct WriteMethodCaller {
  WriteMethodCaller() {}
  virtual ~WriteMethodCaller() = default;
  virtual bool callWrite(Q& q, int i) = 0;  // true iff the write succeeded
  virtual string methodName() = 0;          // human-readable label for logs
// Adapter that drives the queue through blockingWrite(); always reports
// success since blockingWrite cannot fail.
template <typename Q>
struct BlockingWriteCaller : public WriteMethodCaller<Q> {
  bool callWrite(Q& q, int i) override {
  string methodName() override { return "blockingWrite"; }
372 template <typename Q>
373 struct WriteIfNotFullCaller : public WriteMethodCaller<Q> {
374 bool callWrite(Q& q, int i) override { return q.writeIfNotFull(i); }
375 string methodName() override { return "writeIfNotFull"; }
378 template <typename Q>
379 struct WriteCaller : public WriteMethodCaller<Q> {
380 bool callWrite(Q& q, int i) override { return q.write(i); }
381 string methodName() override { return "write"; }
// Adapter for tryWriteUntil(): each write gets a fresh deadline of
// Clock::now() + duration_; the duration is echoed in the method name so
// benchmark logs distinguish short vs long deadlines.
template <typename Q,
          class Clock = steady_clock,
          class Duration = typename Clock::duration>
struct TryWriteUntilCaller : public WriteMethodCaller<Q> {
  const Duration duration_;
  explicit TryWriteUntilCaller(Duration&& duration) : duration_(duration) {}
  bool callWrite(Q& q, int i) override {
    auto then = Clock::now() + duration_;
    return q.tryWriteUntil(then, i);
  string methodName() override {
    return folly::sformat(
        "tryWriteUntil({}ms)",
        std::chrono::duration_cast<milliseconds>(duration_).count());
// Core benchmark/correctness harness: producers push 0..numOps-1 through
// `queue` using the supplied write method, consumers drain with
// blockingRead, and a summary string (nanos/handoff, context switches,
// write failures, allocated capacity) is returned for logging. Unless
// ignoreContents is set, the aggregate sum of consumed values is checked.
template <typename Q>
string producerConsumerBench(Q&& queue,
    WriteMethodCaller<Q>& writer,
    bool ignoreContents = false) {
  struct rusage beginUsage;
  getrusage(RUSAGE_SELF, &beginUsage);  // baseline for context-switch delta
  auto beginMicro = nowMicro();
  std::atomic<uint64_t> sum(0);
  std::atomic<uint64_t> failed(0);
  vector<std::thread> producers(numProducers);
  for (int t = 0; t < numProducers; ++t) {
    producers[t] = DSched::thread([&,t]{
      for (int i = t; i < numOps; i += numProducers) {
        // Retry until the configured write method succeeds.
        while (!writer.callWrite(q, i)) {
  vector<std::thread> consumers(numConsumers);
  for (int t = 0; t < numConsumers; ++t) {
    consumers[t] = DSched::thread([&,t]{
      uint64_t localSum = 0;
      for (int i = t; i < numOps; i += numConsumers) {
        q.blockingRead(dest);
        EXPECT_FALSE(dest == -1);
  for (auto& t : producers) {
  for (auto& t : consumers) {
  if (!ignoreContents) {
    // Each of 0..n-1 must have been handed off exactly once.
    EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
  auto endMicro = nowMicro();
  struct rusage endUsage;
  getrusage(RUSAGE_SELF, &endUsage);
  uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
  long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
      (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
  uint64_t failures = failed;
  size_t allocated = q.allocatedCapacity();
  return folly::sformat(
      "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
      "handoff, {} failures, {} allocated",
// Runs producerConsumerBench for every write-method adapter under
// DeterministicSchedule; a correctness sweep over interleavings and queue
// capacities rather than a real benchmark.
template <bool Dynamic = false>
void runMtProdConsDeterministic(long seed) {
  // we use the Bench method, but perf results are meaningless under DSched
  DSched sched(DSched::uniform(seed));
  using QueueType = MPMCQueue<int, DeterministicAtomic, Dynamic>;
  vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
  callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
  callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
  callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
  callers.emplace_back(
      std::make_unique<TryWriteUntilCaller<QueueType>>(milliseconds(1)));
  callers.emplace_back(
      std::make_unique<TryWriteUntilCaller<QueueType>>(seconds(2)));
  // Each bench below uses a different producer/consumer configuration
  // (those argument lines are not visible in this view).
  for (const auto& caller : callers) {
    producerConsumerBench(
        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
        "MPMCQueue<int, DeterministicAtomic, Dynamic>("
        + folly::to<std::string>(cap)+")",
    producerConsumerBench(
        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
        "MPMCQueue<int, DeterministicAtomic, Dynamic>("
        + folly::to<std::string>(cap)+")",
    producerConsumerBench(
        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
        "MPMCQueue<int, DeterministicAtomic, Dynamic>("
        + folly::to<std::string>(cap)+")",
    producerConsumerBench(
        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
        "MPMCQueue<int, DeterministicAtomic, Dynamic>("
        + folly::to<std::string>(cap)+")",
    producerConsumerBench(
        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
        "MPMCQueue<int, DeterministicAtomic, Dynamic>("
        + folly::to<std::string>(cap)+")",
// Like runMtProdConsDeterministic, but always uses the dynamic queue with
// explicit initial capacity (minCap) and growth multiplier (mult).
void runMtProdConsDeterministicDynamic(
  // we use the Bench method, but perf results are meaningless under DSched
  DSched sched(DSched::uniform(seed));
  using QueueType = MPMCQueue<int, DeterministicAtomic, true>;
  vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
  callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
  callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
  callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
  callers.emplace_back(
      std::make_unique<TryWriteUntilCaller<QueueType>>(milliseconds(1)));
  callers.emplace_back(
      std::make_unique<TryWriteUntilCaller<QueueType>>(seconds(2)));
  for (const auto& caller : callers) {
    producerConsumerBench(
        MPMCQueue<int, DeterministicAtomic, true>(cap, minCap, mult),
        "MPMCQueue<int, DeterministicAtomic, true>("
        + folly::to<std::string>(cap) + ", "
        + folly::to<std::string>(minCap) + ", "
        + folly::to<std::string>(mult)+")",
// Deterministic producer/consumer sweeps with a fixed seed of 0, for the
// static and dynamic queue variants.
TEST(MPMCQueue, mt_prod_cons_deterministic) {
  runMtProdConsDeterministic(0);

TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic) {
  runMtProdConsDeterministic<true>(0);
/// Overwrites `var` with the integer value of environment variable
/// `envvar`; leaves `var` untouched when the variable is unset.
/// Fix: the visible definition was never closed; the body is completed
/// here with unchanged behavior (atoi semantics preserved).
template <typename T>
void setFromEnv(T& var, const char* envvar) {
  const char* str = std::getenv(envvar);
  if (str) {
    var = atoi(str);
  }
}
// Same dynamic deterministic sweep, but every knob is overridable from
// the environment (SEED, PRODS, CONS, NUM_OPS, CAP, MIN_CAP, MULT) so the
// test can be re-run by hand with different parameters.
TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic_with_arguments) {
  uint32_t numOps = 1000;
  setFromEnv(seed, "SEED");
  setFromEnv(prods, "PRODS");
  setFromEnv(cons, "CONS");
  setFromEnv(numOps, "NUM_OPS");
  setFromEnv(cap, "CAP");
  setFromEnv(minCap, "MIN_CAP");
  setFromEnv(mult, "MULT");
  runMtProdConsDeterministicDynamic(
      seed, prods, cons, numOps, cap, minCap, mult);
// Convenience wrapper: runs producerConsumerBench using the stringified
// queue expression itself as the benchmark label.
#define PC_BENCH(q, np, nc, ...) \
  producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
// Real (non-deterministic) producer/consumer benchmark on std::atomic
// queues at several capacities and producer/consumer ratios; op count is
// overridable via the NUM_OPS environment variable.
template <bool Dynamic = false>
void runMtProdCons() {
  using QueueType = MPMCQueue<int, std::atomic, Dynamic>;
  setFromEnv(n, "NUM_OPS");
  vector<unique_ptr<WriteMethodCaller<QueueType>>>
  callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
  callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
  callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
  callers.emplace_back(
      std::make_unique<TryWriteUntilCaller<QueueType>>(milliseconds(1)));
  callers.emplace_back(
      std::make_unique<TryWriteUntilCaller<QueueType>>(seconds(2)));
  for (const auto& caller : callers) {
    // Sweep capacity x producer/consumer counts for each write method.
    LOG(INFO) << PC_BENCH((QueueType(10)), 1, 1, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10)), 10, 1, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10)), 1, 10, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10)), 10, 10, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 1, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 1, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 10, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 10, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(100000)), 32, 100, n, *caller);
// Benchmark entry points for the std::atomic queue, static and dynamic.
TEST(MPMCQueue, mt_prod_cons) {

TEST(MPMCQueue, mt_prod_cons_dynamic) {
  runMtProdCons</* Dynamic = */ true>();
// Same producer/consumer benchmark sweep using the emulated-futex atomic
// (the portable fallback wait/notify implementation).
template <bool Dynamic = false>
void runMtProdConsEmulatedFutex() {
  using QueueType = MPMCQueue<int, EmulatedFutexAtomic, Dynamic>;
  vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
  callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
  callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
  callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
  callers.emplace_back(
      std::make_unique<TryWriteUntilCaller<QueueType>>(milliseconds(1)));
  callers.emplace_back(
      std::make_unique<TryWriteUntilCaller<QueueType>>(seconds(2)));
  for (const auto& caller : callers) {
    LOG(INFO) << PC_BENCH((QueueType(10)), 1, 1, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10)), 10, 1, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10)), 1, 10, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10)), 10, 10, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 1, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 1, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 10, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 10, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(100000)), 32, 100, n, *caller);
// Benchmark entry points for the emulated-futex queue, static and dynamic.
TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
  runMtProdConsEmulatedFutex();

TEST(MPMCQueue, mt_prod_cons_emulated_futex_dynamic) {
  runMtProdConsEmulatedFutex</* Dynamic = */ true>();
// Worker for the never-fail tests: each iteration does writeIfNotFull()
// immediately followed by readIfNotEmpty(), so the queue never holds more
// than numThreads elements and neither call is expected to ever fail.
template <template <typename> class Atom, bool Dynamic = false>
void runNeverFailThread(int numThreads,
    MPMCQueue<int, Atom, Dynamic>& cq,
    std::atomic<uint64_t>& sum,
  uint64_t threadSum = 0;
  for (int i = t; i < n; i += numThreads) {
    EXPECT_TRUE(cq.writeIfNotFull(i));
    EXPECT_TRUE(cq.readIfNotEmpty(dest));
    EXPECT_TRUE(dest >= 0);
// Capacity == numThreads guarantees writes never outpace reads by more
// than the queue can hold. Returns elapsed wall time in microseconds so
// callers can report nanos per op.
template <template <typename> class Atom, bool Dynamic = false>
uint64_t runNeverFailTest(int numThreads, int numOps) {
  // always #enq >= #deq
  MPMCQueue<int, Atom, Dynamic> cq(numThreads);
  auto beginMicro = nowMicro();
  vector<std::thread> threads(numThreads);
  std::atomic<uint64_t> sum(0);
  for (int t = 0; t < numThreads; ++t) {
    threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom, Dynamic>,
  for (auto& t : threads) {
  EXPECT_TRUE(cq.isEmpty());
  // Aggregate check: everything written (0..n-1) was read exactly once.
  EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
  return nowMicro() - beginMicro;
// Runs runNeverFailTest for each requested thread count and logs the
// per-operation latency.
template <template<typename> class Atom, bool Dynamic = false>
void runMtNeverFail(std::vector<int>& nts, int n) {
  uint64_t elapsed = runNeverFailTest<Atom, Dynamic>(nt, n);
  LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
// All the never_fail tests are for the non-dynamic version only.
// False positive for dynamic version. Some writeIfNotFull() and
// tryWriteUntil() operations may fail in transient conditions related
TEST(MPMCQueue, mt_never_fail) {
  std::vector<int> nts {1, 3, 100};
  runMtNeverFail<std::atomic>(nts, n);

TEST(MPMCQueue, mt_never_fail_emulated_futex) {
  std::vector<int> nts {1, 3, 100};
  runMtNeverFail<EmulatedFutexAtomic>(nts, n);
// Never-fail sweep under DeterministicSchedule, with both uniform and
// uniformSubset interleavings for each thread count.
template<bool Dynamic = false>
void runMtNeverFailDeterministic(std::vector<int>& nts, int n, long seed) {
  LOG(INFO) << "using seed " << seed;
    DSched sched(DSched::uniform(seed));
    runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
    DSched sched(DSched::uniformSubset(seed, 2));
    runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
// Deterministic never-fail run; a fixed seed keeps failures reproducible.
TEST(MPMCQueue, mt_never_fail_deterministic) {
  std::vector<int> nts {3, 10};
  long seed = 0; // nowMicro() % 10000;
  runMtNeverFailDeterministic(nts, n, seed);
// Like runNeverFailThread, but writes use tryWriteUntil() with a one-
// second deadline on the given Clock; with capacity == numThreads the
// deadline should never actually be reached.
template <class Clock, template <typename> class Atom, bool Dynamic>
void runNeverFailUntilThread(int numThreads,
    MPMCQueue<int, Atom, Dynamic>& cq,
    std::atomic<uint64_t>& sum,
  uint64_t threadSum = 0;
  for (int i = t; i < n; i += numThreads) {
    auto soon = Clock::now() + std::chrono::seconds(1);
    EXPECT_TRUE(cq.tryWriteUntil(soon, i));
    EXPECT_TRUE(cq.readIfNotEmpty(dest));
    EXPECT_TRUE(dest >= 0);
// Clock-parameterized overload of runNeverFailTest: same aggregate-sum
// check, but workers write via tryWriteUntil() deadlines on `Clock`.
// Returns elapsed wall time in microseconds.
template <class Clock, template <typename> class Atom, bool Dynamic = false>
uint64_t runNeverFailTest(int numThreads, int numOps) {
  // always #enq >= #deq
  MPMCQueue<int, Atom, Dynamic> cq(numThreads);
  auto beginMicro = nowMicro();
  vector<std::thread> threads(numThreads);
  std::atomic<uint64_t> sum(0);
  for (int t = 0; t < numThreads; ++t) {
    threads[t] = DSched::thread(std::bind(
        runNeverFailUntilThread<Clock, Atom, Dynamic>,
  for (auto& t : threads) {
  EXPECT_TRUE(cq.isEmpty());
  EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
  return nowMicro() - beginMicro;
// tryWriteUntil-based never-fail sweep using system_clock deadlines.
template <bool Dynamic = false>
void runMtNeverFailUntilSystem(std::vector<int>& nts, int n) {
  runNeverFailTest<std::chrono::system_clock, std::atomic, Dynamic>(nt, n);
  LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt

TEST(MPMCQueue, mt_never_fail_until_system) {
  std::vector<int> nts {1, 3, 100};
  runMtNeverFailUntilSystem(nts, n);
// tryWriteUntil-based never-fail sweep using steady_clock deadlines.
template <bool Dynamic = false>
void runMtNeverFailUntilSteady(std::vector<int>& nts, int n) {
  runNeverFailTest<std::chrono::steady_clock, std::atomic, Dynamic>(nt, n);
  LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt

TEST(MPMCQueue, mt_never_fail_until_steady) {
  std::vector<int> nts {1, 3, 100};
  runMtNeverFailUntilSteady(nts, n);
// Instrumentation for the perfect-forwarding tests: every Lifecycle
// constructor/assignment/destructor bumps a per-event counter, and
// LIFECYCLE_STEP asserts exactly which events happened since lc_snap().
enum LifecycleEvent {
// Thread-local event counters and the snapshot they are diffed against.
static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT];
static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT];
// Number of currently-live Lifecycle objects (constructions minus
// destructions).
static int lc_outstanding() {
  return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[COPY_CONSTRUCTOR] +
      lc_counts[MOVE_CONSTRUCTOR] + lc_counts[TWO_ARG_CONSTRUCTOR] -
      lc_counts[DESTRUCTOR];
// Records the current counters as the baseline for the next LIFECYCLE_STEP.
static void lc_snap() {
  for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
    lc_prev[i] = lc_counts[i];
// Asserts that exactly the named event(s) — and nothing else — occurred
// since the last lc_snap(), reporting the calling source line on failure.
#define LIFECYCLE_STEP(...) lc_step(__LINE__, __VA_ARGS__)

static void lc_step(int lineno, int what = NOTHING, int what2 = NOTHING) {
  for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
    int delta = i == what || i == what2 ? 1 : 0;
    EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
        << "lc_counts[" << i << "] - lc_prev[" << i << "] was "
        << (lc_counts[i] - lc_prev[i]) << ", expected " << delta
        << ", from line " << lineno;
// Counting element type for the lifecycle tests; R selects whether folly
// treats it as relocatable (IsRelocatable), which changes how the queue
// moves elements.
template <typename R>
  typedef R IsRelocatable;

  Lifecycle() noexcept : constructed(true) {
    ++lc_counts[DEFAULT_CONSTRUCTOR];

  // Two-arg form so tests can exercise in-place emplacement.
  explicit Lifecycle(int /* n */, char const* /* s */) noexcept
      : constructed(true) {
    ++lc_counts[TWO_ARG_CONSTRUCTOR];

  Lifecycle(const Lifecycle& /* rhs */) noexcept : constructed(true) {
    ++lc_counts[COPY_CONSTRUCTOR];

  Lifecycle(Lifecycle&& /* rhs */) noexcept : constructed(true) {
    ++lc_counts[MOVE_CONSTRUCTOR];

  Lifecycle& operator=(const Lifecycle& /* rhs */) noexcept {
    ++lc_counts[COPY_OPERATOR];

  Lifecycle& operator=(Lifecycle&& /* rhs */) noexcept {
    ++lc_counts[MOVE_OPERATOR];

  ~Lifecycle() noexcept {
    ++lc_counts[DESTRUCTOR];
    // lc_outstanding going negative means a double-destroy.
    assert(lc_outstanding() >= 0);
// Verifies that every enqueue path (default-, two-arg-, move- and
// copy-construction) performs exactly the expected constructor/assignment
// events on the element type — i.e., arguments are perfectly forwarded
// with no hidden copies. R toggles the IsRelocatable optimization.
template <typename R>
void runPerfectForwardingTest() {
  EXPECT_EQ(lc_outstanding(), 0);

    // Non-dynamic only. False positive for dynamic.
    MPMCQueue<Lifecycle<R>, std::atomic> queue(50);
    LIFECYCLE_STEP(NOTHING);  // queue construction makes no elements

    for (int pass = 0; pass < 10; ++pass) {
      for (int i = 0; i < 10; ++i) {
        // In-place default construction inside the queue slot.
        queue.blockingWrite();
        LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
        // In-place two-arg construction: no temporary element.
        queue.blockingWrite(1, "one");
        LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
        LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
        // Moving from src must cost exactly one move-construct...
        queue.blockingWrite(std::move(src));
        LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
        LIFECYCLE_STEP(DESTRUCTOR);
        LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
        // ...and copying exactly one copy-construct.
        queue.blockingWrite(src);
        LIFECYCLE_STEP(COPY_CONSTRUCTOR);
        LIFECYCLE_STEP(DESTRUCTOR);
        EXPECT_TRUE(queue.write());
        LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);

    EXPECT_EQ(queue.size(), 50);
    // Full queue: the failed write must construct nothing.
    EXPECT_FALSE(queue.write(2, "two"));
    LIFECYCLE_STEP(NOTHING);

    for (int i = 0; i < 50; ++i) {
      LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
      queue.blockingRead(node);
      // relocatable, moved via memcpy
      LIFECYCLE_STEP(DESTRUCTOR);
      LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
      LIFECYCLE_STEP(DESTRUCTOR);

    EXPECT_EQ(queue.size(), 0);

    // put one element back before destruction
      Lifecycle<R> src(3, "three");
      LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
      queue.write(std::move(src));
      LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
    LIFECYCLE_STEP(DESTRUCTOR); // destroy src
  LIFECYCLE_STEP(DESTRUCTOR); // destroy queue

  EXPECT_EQ(lc_outstanding(), 0);
// Perfect-forwarding checks with the relocatable optimization off and on.
TEST(MPMCQueue, perfect_forwarding) {
  runPerfectForwardingTest<std::false_type>();

TEST(MPMCQueue, perfect_forwarding_relocatable) {
  runPerfectForwardingTest<std::true_type>();
// Verifies that move-constructing and move-assigning whole queues
// transfers capacity and contents without touching the elements
// themselves (no element constructor/destructor events), using the
// Lifecycle counters to prove it.
template <bool Dynamic = false>
void run_queue_moving() {
  EXPECT_EQ(lc_outstanding(), 0);

    MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> a(50);
    LIFECYCLE_STEP(NOTHING);
    LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);

    // Move-construct b from a: contents/capacity move, elements untouched.
    MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> b
    LIFECYCLE_STEP(NOTHING);
    EXPECT_EQ(a.capacity(), 0);   // moved-from queue is empty and sizeless
    EXPECT_EQ(a.size(), 0);
    EXPECT_EQ(b.capacity(), 50);
    EXPECT_EQ(b.size(), 1);

    LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);

    // Move-assign into a default-constructed queue.
    MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> c;
    LIFECYCLE_STEP(NOTHING);
    LIFECYCLE_STEP(NOTHING);
    EXPECT_EQ(c.capacity(), 50);
    EXPECT_EQ(c.size(), 2);

    Lifecycle<std::false_type> dst;
    LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
    c.blockingRead(dst);
    LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);

    // Swap two live queues by move; capacities travel with contents.
    MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> d(10);
    LIFECYCLE_STEP(NOTHING);
    LIFECYCLE_STEP(NOTHING);
    EXPECT_EQ(c.capacity(), 10);
    EXPECT_TRUE(c.isEmpty());
    EXPECT_EQ(d.capacity(), 50);
    EXPECT_EQ(d.size(), 1);

    d.blockingRead(dst);
    LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);

    c.blockingWrite(dst);
    LIFECYCLE_STEP(COPY_CONSTRUCTOR);

    d.blockingWrite(std::move(dst));
    LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
  } // d goes out of scope
  LIFECYCLE_STEP(DESTRUCTOR);
  } // dst goes out of scope
  LIFECYCLE_STEP(DESTRUCTOR);
  } // c goes out of scope
  LIFECYCLE_STEP(DESTRUCTOR);
// Queue move semantics for both variants.
TEST(MPMCQueue, queue_moving) {

TEST(MPMCQueue, queue_moving_dynamic) {
  run_queue_moving<true>();

// Constructing a zero-capacity queue must throw for both variants.
TEST(MPMCQueue, explicit_zero_capacity_fail) {
  ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);

  using DynamicMPMCQueueInt = MPMCQueue<int, std::atomic, true>;
  ASSERT_THROW(DynamicMPMCQueueInt cq(0), std::invalid_argument);
// Two reader threads race tryReadUntil() on a single-slot queue: exactly
// one receives the written value; the other must return false only after
// the full `wait` deadline has elapsed.
template <bool Dynamic>
void testTryReadUntil() {
  MPMCQueue<int, std::atomic, Dynamic> q{1};

  const auto wait = std::chrono::milliseconds(100);

  std::vector<std::thread> threads;
  boost::barrier b{3};  // two readers + this (main) thread
  for (int i = 0; i < 2; i++) {
    threads.emplace_back([&, i] {
      rets[i] = q.tryReadUntil(watch.getCheckpoint() + wait, vals[i]);

  EXPECT_TRUE(q.write(42));

  for (int i = 0; i < 2; i++) {

  // Exactly one reader got the value; the other timed out.
  for (int i = 0; i < 2; i++) {
    int other = (i + 1) % 2;
    EXPECT_EQ(42, vals[i]);
    EXPECT_FALSE(rets[other]);

  // The losing reader must have waited out the full deadline.
  EXPECT_TRUE(watch.elapsed(wait));
// Mirror of testTryReadUntil: the one-slot queue starts full and two
// writer threads race tryWriteUntil(); exactly one succeeds once the main
// thread frees the slot, the other must time out after `wait`.
template <bool Dynamic>
void testTryWriteUntil() {
  MPMCQueue<int, std::atomic, Dynamic> q{1};
  EXPECT_TRUE(q.write(42));  // start full so both writers block

  const auto wait = std::chrono::milliseconds(100);

  std::vector<std::thread> threads;
  boost::barrier b{3};  // two writers + this (main) thread
  for (int i = 0; i < 2; i++) {
    threads.emplace_back([&, i] {
      rets[i] = q.tryWriteUntil(watch.getCheckpoint() + wait, i);

  // Free the slot so exactly one writer can succeed.
  EXPECT_TRUE(q.read(x));

  for (int i = 0; i < 2; i++) {

  EXPECT_TRUE(q.read(x));

  for (int i = 0; i < 2; i++) {
    int other = (i + 1) % 2;
    EXPECT_FALSE(rets[other]);

  // The losing writer must have waited out the full deadline.
  EXPECT_TRUE(watch.elapsed(wait));
// tryReadUntil / tryWriteUntil deadline behavior, static and dynamic.
TEST(MPMCQueue, try_read_until) {
  testTryReadUntil<false>();

TEST(MPMCQueue, try_read_until_dynamic) {
  testTryReadUntil<true>();

TEST(MPMCQueue, try_write_until) {
  testTryWriteUntil<false>();

TEST(MPMCQueue, try_write_until_dynamic) {
  testTryWriteUntil<true>();
// Regression check: tryWriteUntil() on an already-full queue must return
// once the deadline passes rather than hang.
template <bool Dynamic>
void testTimeout(MPMCQueue<int, std::atomic, Dynamic>& q) {
  /* The following must not block forever */
      std::chrono::system_clock::now() + std::chrono::microseconds(10000), 2);
// Static queue of capacity 1, pre-filled by testTimeout's caller contract.
TEST(MPMCQueue, try_write_until_timeout) {
  folly::MPMCQueue<int, std::atomic, false> queue(1);
  testTimeout<false>(queue);

// Dynamic queue whose current (not-yet-expanded) capacity can still fill.
TEST(MPMCQueue, must_fail_try_write_until_dynamic) {
  folly::MPMCQueue<int, std::atomic, true> queue(200, 1, 2);
  testTimeout<true>(queue);