/*
 * Copyright 2016 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <folly/MPMCQueue.h>
#include <folly/Format.h>
#include <folly/Memory.h>
#include <folly/test/DeterministicSchedule.h>

#include <boost/intrusive_ptr.hpp>

#include <atomic>
#include <cassert>
#include <chrono>
#include <functional>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include <unistd.h>
#include <sys/time.h>
#include <sys/resource.h>

#include <gtest/gtest.h>

FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);

using namespace folly;
using namespace detail;
using namespace test;

typedef DeterministicSchedule DSched;

template <template<typename> class Atom>
void run_mt_sequencer_thread(
    int numThreads,
    int numOps,
    uint32_t init,
    TurnSequencer<Atom>& seq,
    Atom<uint32_t>& spinThreshold,
    int& prev,
    int i) {
  for (int op = i; op < numOps; op += numThreads) {
    seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
    EXPECT_EQ(prev, op - 1);
    prev = op;
    seq.completeTurn(init + op);
  }
}

template <template<typename> class Atom>
void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
  TurnSequencer<Atom> seq(init);
  Atom<uint32_t> spinThreshold(0);
  int prev = -1;

  std::vector<std::thread> threads(numThreads);
  for (int i = 0; i < numThreads; ++i) {
    threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
        numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold),
        std::ref(prev), i));
  }

  for (auto& thr : threads) {
    DSched::join(thr);
  }

  EXPECT_EQ(prev, numOps - 1);
}
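
// A minimal single-threaded illustration of the waitForTurn/completeTurn
// protocol exercised above (a hypothetical sketch, not part of the original
// suite; it assumes only the TurnSequencer interface already used by
// run_mt_sequencer_thread). With one thread the turns arrive in order, so
// no call ever blocks.
TEST(MPMCQueue, sequencer_single_thread_sketch) {
  TurnSequencer<std::atomic> seq(0);
  std::atomic<uint32_t> spinThreshold(0);
  for (uint32_t turn = 0; turn < 10; ++turn) {
    seq.waitForTurn(turn, spinThreshold, (turn % 32) == 0);
    seq.completeTurn(turn);
  }
}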

TEST(MPMCQueue, sequencer) {
  run_mt_sequencer_test<std::atomic>(1, 100, 0);
  run_mt_sequencer_test<std::atomic>(2, 100000, -100);
  run_mt_sequencer_test<std::atomic>(100, 10000, -100);
}

TEST(MPMCQueue, sequencer_emulated_futex) {
  run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
  run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
  run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
}

TEST(MPMCQueue, sequencer_deterministic) {
  DSched sched(DSched::uniform(0));
  run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
  run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
  run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
}

template <typename T>
void runElementTypeTest(T&& src) {
  MPMCQueue<T> cq(10);

  cq.blockingWrite(std::move(src));
  T dest;
  cq.blockingRead(dest);
  EXPECT_TRUE(cq.write(std::move(dest)));
  EXPECT_TRUE(cq.read(dest));
}

struct RefCounted {
  static __thread int active_instances;

  mutable std::atomic<int> rc;

  RefCounted() : rc(0) { ++active_instances; }
  ~RefCounted() { --active_instances; }
};
__thread int RefCounted::active_instances;

void intrusive_ptr_add_ref(RefCounted const* p) {
  p->rc++;
}

void intrusive_ptr_release(RefCounted const* p) {
  if (--(p->rc) == 0) {
    delete p;
  }
}
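
// boost::intrusive_ptr finds the two free functions above via
// argument-dependent lookup; a sketch of the resulting counting:
//
//   boost::intrusive_ptr<RefCounted> p(new RefCounted); // rc == 1
//   auto q = p;                                         // rc == 2
//   q.reset();                                          // rc == 1
//   p.reset();                                          // rc == 0, deleted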

TEST(MPMCQueue, lots_of_element_types) {
  runElementTypeTest(10);
  runElementTypeTest(std::string("abc"));
  runElementTypeTest(std::make_pair(10, std::string("def")));
  runElementTypeTest(std::vector<std::string>{ { "abc" } });
  runElementTypeTest(std::make_shared<char>('a'));
  runElementTypeTest(folly::make_unique<char>('a'));
  runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
  EXPECT_EQ(RefCounted::active_instances, 0);
}

TEST(MPMCQueue, single_thread_enqdeq) {
  MPMCQueue<int> cq(10);

  for (int pass = 0; pass < 10; ++pass) {
    for (int i = 0; i < 10; ++i) {
      EXPECT_TRUE(cq.write(i));
    }
    EXPECT_FALSE(cq.write(-1));
    EXPECT_FALSE(cq.isEmpty());
    EXPECT_EQ(cq.size(), 10);

    for (int i = 0; i < 5; ++i) {
      int dest = -1;
      EXPECT_TRUE(cq.read(dest));
      EXPECT_EQ(dest, i);
    }
    for (int i = 5; i < 10; ++i) {
      int dest = -1;
      cq.blockingRead(dest);
      EXPECT_EQ(dest, i);
    }
    int dest = -1;
    EXPECT_FALSE(cq.read(dest));
    EXPECT_EQ(dest, -1);

    EXPECT_TRUE(cq.isEmpty());
    EXPECT_EQ(cq.size(), 0);
  }
}

TEST(MPMCQueue, tryenq_capacity_test) {
  for (size_t cap = 1; cap < 100; ++cap) {
    MPMCQueue<int> cq(cap);
    for (size_t i = 0; i < cap; ++i) {
      EXPECT_TRUE(cq.write(i));
    }
    EXPECT_FALSE(cq.write(100));
  }
}

TEST(MPMCQueue, enq_capacity_test) {
  for (auto cap : { 1, 100, 10000 }) {
    MPMCQueue<int> cq(cap);
    for (int i = 0; i < cap; ++i) {
      cq.blockingWrite(i);
    }
    int t = 0;
    int when;
    auto thr = std::thread([&]{
      // the queue is full, so this blocks until the read below runs
      cq.blockingWrite(100);
      when = t;
    });
    usleep(2000);
    t = 1;
    int dummy;
    cq.blockingRead(dummy);
    thr.join();
    EXPECT_EQ(when, 1);
  }
}

template <template<typename> class Atom>
void runTryEnqDeqThread(
    int numThreads,
    int n, // numOps
    MPMCQueue<int, Atom>& cq,
    std::atomic<uint64_t>& sum,
    int t) {
  uint64_t threadSum = 0;
  int src = t;
  // received doesn't reflect any actual values, we just start with
  // t and increment by numThreads to get the rounding of termination
  // correct if numThreads doesn't evenly divide numOps
  int received = t;
  while (src < n || received < n) {
    if (src < n && cq.write(src)) {
      src += numThreads;
    }

    int dst;
    if (received < n && cq.read(dst)) {
      received += numThreads;
      threadSum += dst;
    }
  }
  sum += threadSum;
}
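
// A worked example of the striding used above (a hypothetical sanity check,
// not in the original suite): with numThreads = 3 and n = 10, the threads
// start at 0, 1, 2 and step by 3, so together they cover 0..9 exactly once
// even though 3 does not evenly divide 10.
TEST(MPMCQueue, stride_partition_sketch) {
  const int numThreads = 3;
  const int n = 10;
  int covered = 0;
  for (int t = 0; t < numThreads; ++t) {
    for (int i = t; i < n; i += numThreads) {
      ++covered;
    }
  }
  EXPECT_EQ(covered, n);
}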

template <template<typename> class Atom>
void runTryEnqDeqTest(int numThreads, int numOps) {
  // write and read aren't linearizable, so we don't have hard
  // guarantees on their individual behavior. We can still test
  // correctness in aggregate
  MPMCQueue<int,Atom> cq(numThreads);

  uint64_t n = numOps;
  std::vector<std::thread> threads(numThreads);
  std::atomic<uint64_t> sum(0);
  for (int t = 0; t < numThreads; ++t) {
    threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom>,
        numThreads, n, std::ref(cq), std::ref(sum), t));
  }
  for (auto& t : threads) {
    DSched::join(t);
  }
  EXPECT_TRUE(cq.isEmpty());
  EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
}
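
// The aggregate check above, unpacked: the threads together enqueue each
// value in 0..n-1 exactly once, so however the non-linearizable write()
// and read() calls interleave, the dequeued total must equal the
// triangular number n*(n-1)/2 (e.g. n = 4 gives 0+1+2+3 == 4*3/2 == 6).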

TEST(MPMCQueue, mt_try_enq_deq) {
  int nts[] = { 1, 3, 100 };

  int n = 100000;
  for (int nt : nts) {
    runTryEnqDeqTest<std::atomic>(nt, n);
  }
}

TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
  int nts[] = { 1, 3, 100 };

  int n = 100000;
  for (int nt : nts) {
    runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
  }
}

TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
  int nts[] = { 3, 10 };

  long seed = 0;
  LOG(INFO) << "using seed " << seed;

  int n = 1000;
  for (int nt : nts) {
    {
      DSched sched(DSched::uniform(seed));
      runTryEnqDeqTest<DeterministicAtomic>(nt, n);
    }
    {
      DSched sched(DSched::uniformSubset(seed, 2));
      runTryEnqDeqTest<DeterministicAtomic>(nt, n);
    }
  }
}

uint64_t nowMicro() {
  timeval tv;
  gettimeofday(&tv, 0);
  return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
}
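
// An equivalent timestamp via <chrono> (a sketch; the benchmarks below keep
// gettimeofday, and steady_clock's epoch is arbitrary but consistent, which
// is all that interval measurement needs):
uint64_t nowMicroChrono() {
  return std::chrono::duration_cast<std::chrono::microseconds>(
             std::chrono::steady_clock::now().time_since_epoch()).count();
}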

template <typename Q>
std::string producerConsumerBench(Q&& queue, std::string qName,
                                  int numProducers, int numConsumers,
                                  int numOps, bool ignoreContents = false) {
  Q& q = queue;

  struct rusage beginUsage;
  getrusage(RUSAGE_SELF, &beginUsage);

  auto beginMicro = nowMicro();

  uint64_t n = numOps;
  std::atomic<uint64_t> sum(0);

  std::vector<std::thread> producers(numProducers);
  for (int t = 0; t < numProducers; ++t) {
    producers[t] = DSched::thread([&,t]{
      for (int i = t; i < numOps; i += numProducers) {
        q.blockingWrite(i);
      }
    });
  }

  std::vector<std::thread> consumers(numConsumers);
  for (int t = 0; t < numConsumers; ++t) {
    consumers[t] = DSched::thread([&,t]{
      uint64_t localSum = 0;
      for (int i = t; i < numOps; i += numConsumers) {
        int dest = -1;
        q.blockingRead(dest);
        EXPECT_FALSE(dest == -1);
        localSum += dest;
      }
      sum += localSum;
    });
  }

  for (auto& t : producers) {
    DSched::join(t);
  }
  for (auto& t : consumers) {
    DSched::join(t);
  }
  if (!ignoreContents) {
    EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
  }

  auto endMicro = nowMicro();

  struct rusage endUsage;
  getrusage(RUSAGE_SELF, &endUsage);

  uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
  long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
      (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);

  return folly::format(
      "{}, {} producers, {} consumers => {} nanos/handoff, {} csw / {} handoff",
      qName, numProducers, numConsumers, nanosPer, csw, n).str();
}

TEST(MPMCQueue, mt_prod_cons_deterministic) {
  // we use the Bench method, but perf results are meaningless under DSched
  DSched sched(DSched::uniform(0));

  producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(10),
          "", 1, 1, 1000);
  producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(100),
          "", 10, 10, 1000);
  producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(10),
          "", 1, 1, 1000);
  producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(100),
          "", 10, 10, 1000);
  producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(1),
          "", 10, 10, 1000);
}

#define PC_BENCH(q, np, nc, ...) \
    producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
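
// Note on PC_BENCH: a queue expression containing a bare comma, such as
// MPMCQueue<int,EmulatedFutexAtomic>(10), must be wrapped in an extra pair
// of parentheses so the preprocessor parses it as a single macro argument
// (see the emulated-futex benchmark below).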

TEST(MPMCQueue, mt_prod_cons) {
  int n = 100000;
  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 1, n);
  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 1, n);
  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 10, n);
  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 10, n);
  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 1, n);
  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 1, n);
  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 10, n);
  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 10, n);
  LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n);
}

TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
  int n = 100000;
  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 1, 1, n);
  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 10, 1, n);
  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 1, 10, n);
  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 10, 10, n);
  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 1, 1, n);
  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 10, 1, n);
  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 1, 10, n);
  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 10, 10, n);
  LOG(INFO)
      << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(100000)), 32, 100, n);
}

template <template<typename> class Atom>
void runNeverFailThread(
    int numThreads,
    int n, // numOps
    MPMCQueue<int, Atom>& cq,
    std::atomic<uint64_t>& sum,
    int t) {
  uint64_t threadSum = 0;
  for (int i = t; i < n; i += numThreads) {
    // enq + deq
    EXPECT_TRUE(cq.writeIfNotFull(i));

    int dest = -1;
    EXPECT_TRUE(cq.readIfNotEmpty(dest));
    EXPECT_TRUE(dest >= 0);
    threadSum += dest;
  }
  sum += threadSum;
}

template <template<typename> class Atom>
uint64_t runNeverFailTest(int numThreads, int numOps) {
  // always #enq >= #deq, so with capacity == numThreads neither
  // writeIfNotFull nor readIfNotEmpty can ever fail
  MPMCQueue<int,Atom> cq(numThreads);

  uint64_t n = numOps;
  auto beginMicro = nowMicro();

  std::vector<std::thread> threads(numThreads);
  std::atomic<uint64_t> sum(0);
  for (int t = 0; t < numThreads; ++t) {
    threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom>,
        numThreads, n, std::ref(cq), std::ref(sum), t));
  }
  for (auto& t : threads) {
    DSched::join(t);
  }
  EXPECT_TRUE(cq.isEmpty());
  EXPECT_EQ(n * (n - 1) / 2 - sum, 0);

  return nowMicro() - beginMicro;
}
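
// Why the IfNot variants above never fail, shown single-threaded (a
// hypothetical illustration, not in the original suite): each thread
// strictly alternates write-then-read, so at most numThreads elements are
// ever in flight, which exactly matches the queue's capacity.
TEST(MPMCQueue, never_fail_invariant_sketch) {
  MPMCQueue<int> cq(1);
  for (int i = 0; i < 100; ++i) {
    EXPECT_TRUE(cq.writeIfNotFull(i));
    int dest = -1;
    EXPECT_TRUE(cq.readIfNotEmpty(dest));
    EXPECT_EQ(dest, i);
  }
}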

TEST(MPMCQueue, mt_never_fail) {
  int nts[] = { 1, 3, 100 };

  int n = 100000;
  for (int nt : nts) {
    uint64_t elapsed = runNeverFailTest<std::atomic>(nt, n);
    LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with "
              << nt << " threads";
  }
}

TEST(MPMCQueue, mt_never_fail_emulated_futex) {
  int nts[] = { 1, 3, 100 };

  int n = 100000;
  for (int nt : nts) {
    uint64_t elapsed = runNeverFailTest<EmulatedFutexAtomic>(nt, n);
    LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with "
              << nt << " threads";
  }
}

TEST(MPMCQueue, mt_never_fail_deterministic) {
  int nts[] = { 3, 10 };

  long seed = 0; // nowMicro() % 10000;
  LOG(INFO) << "using seed " << seed;

  int n = 1000;
  for (int nt : nts) {
    {
      DSched sched(DSched::uniform(seed));
      runNeverFailTest<DeterministicAtomic>(nt, n);
    }
    {
      DSched sched(DSched::uniformSubset(seed, 2));
      runNeverFailTest<DeterministicAtomic>(nt, n);
    }
  }
}

enum LifecycleEvent {
  NOTHING = -1,
  DEFAULT_CONSTRUCTOR,
  COPY_CONSTRUCTOR,
  MOVE_CONSTRUCTOR,
  TWO_ARG_CONSTRUCTOR,
  COPY_OPERATOR,
  MOVE_OPERATOR,
  DESTRUCTOR,
  MAX_LIFECYCLE_EVENT
};

static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT];
static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT];

// assignment operators don't change the number of live objects, so only
// the constructors and the destructor participate in the balance
static int lc_outstanding() {
  return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[COPY_CONSTRUCTOR] +
      lc_counts[MOVE_CONSTRUCTOR] + lc_counts[TWO_ARG_CONSTRUCTOR] -
      lc_counts[DESTRUCTOR];
}

static void lc_snap() {
  for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
    lc_prev[i] = lc_counts[i];
  }
}

#define LIFECYCLE_STEP(...) lc_step(__LINE__, __VA_ARGS__)

static void lc_step(int lineno, int what = NOTHING, int what2 = NOTHING) {
  for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
    int delta = i == what || i == what2 ? 1 : 0;
    EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
        << "lc_counts[" << i << "] - lc_prev[" << i << "] was "
        << (lc_counts[i] - lc_prev[i]) << ", expected " << delta
        << ", from line " << lineno;
  }
  lc_snap();
}

template <typename R>
struct Lifecycle {
  typedef R IsRelocatable;

  bool constructed;

  Lifecycle() noexcept : constructed(true) {
    ++lc_counts[DEFAULT_CONSTRUCTOR];
  }

  explicit Lifecycle(int /* n */, char const* /* s */) noexcept
      : constructed(true) {
    ++lc_counts[TWO_ARG_CONSTRUCTOR];
  }

  Lifecycle(const Lifecycle& /* rhs */) noexcept : constructed(true) {
    ++lc_counts[COPY_CONSTRUCTOR];
  }

  Lifecycle(Lifecycle&& /* rhs */) noexcept : constructed(true) {
    ++lc_counts[MOVE_CONSTRUCTOR];
  }

  Lifecycle& operator=(const Lifecycle& /* rhs */) noexcept {
    ++lc_counts[COPY_OPERATOR];
    return *this;
  }

  Lifecycle& operator=(Lifecycle&& /* rhs */) noexcept {
    ++lc_counts[MOVE_OPERATOR];
    return *this;
  }

  ~Lifecycle() noexcept {
    ++lc_counts[DESTRUCTOR];
    assert(lc_outstanding() >= 0);
    assert(constructed);
    constructed = false;
  }
};
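
// Compile-time view of the trait that drives the branch in the read loop
// below: folly inspects the member typedef IsRelocatable declared above
// (a sketch; it assumes folly::IsRelocatable honors that member typedef,
// which is how MPMCQueue decides whether it may relocate via memcpy).
static_assert(folly::IsRelocatable<Lifecycle<std::true_type>>::value,
              "Lifecycle<std::true_type> should opt into relocation");
static_assert(!folly::IsRelocatable<Lifecycle<std::false_type>>::value,
              "Lifecycle<std::false_type> should use the move path");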

template <typename R>
void runPerfectForwardingTest() {
  lc_snap();
  EXPECT_EQ(lc_outstanding(), 0);

  {
    MPMCQueue<Lifecycle<R>> queue(50);
    LIFECYCLE_STEP(NOTHING);

    for (int pass = 0; pass < 10; ++pass) {
      for (int i = 0; i < 10; ++i) {
        queue.blockingWrite();
        LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);

        queue.blockingWrite(1, "one");
        LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);

        {
          Lifecycle<R> src;
          LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
          queue.blockingWrite(std::move(src));
          LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
        }
        LIFECYCLE_STEP(DESTRUCTOR);

        {
          Lifecycle<R> src;
          LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
          queue.blockingWrite(src);
          LIFECYCLE_STEP(COPY_CONSTRUCTOR);
        }
        LIFECYCLE_STEP(DESTRUCTOR);

        EXPECT_TRUE(queue.write());
        LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
      }

      EXPECT_EQ(queue.size(), 50);
      EXPECT_FALSE(queue.write(2, "two"));
      LIFECYCLE_STEP(NOTHING);

      for (int i = 0; i < 50; ++i) {
        {
          Lifecycle<R> node;
          LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);

          queue.blockingRead(node);
          if (R::value) {
            // relocatable, moved via memcpy
            LIFECYCLE_STEP(DESTRUCTOR);
          } else {
            LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
          }
        }
        LIFECYCLE_STEP(DESTRUCTOR);
      }

      EXPECT_EQ(queue.size(), 0);
    }

    // put one element back before destruction
    {
      Lifecycle<R> src(3, "three");
      LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
      queue.write(std::move(src));
      LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
    }
    LIFECYCLE_STEP(DESTRUCTOR); // destroy src
  }
  LIFECYCLE_STEP(DESTRUCTOR); // destroy queue

  EXPECT_EQ(lc_outstanding(), 0);
}

TEST(MPMCQueue, perfect_forwarding) {
  runPerfectForwardingTest<std::false_type>();
}

TEST(MPMCQueue, perfect_forwarding_relocatable) {
  runPerfectForwardingTest<std::true_type>();
}

TEST(MPMCQueue, queue_moving) {
  lc_snap();
  EXPECT_EQ(lc_outstanding(), 0);

  {
    MPMCQueue<Lifecycle<std::false_type>> a(50);
    LIFECYCLE_STEP(NOTHING);

    a.blockingWrite();
    LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);

    // move constructor
    MPMCQueue<Lifecycle<std::false_type>> b = std::move(a);
    LIFECYCLE_STEP(NOTHING);
    EXPECT_EQ(a.capacity(), 0);
    EXPECT_EQ(a.size(), 0);
    EXPECT_EQ(b.capacity(), 50);
    EXPECT_EQ(b.size(), 1);

    b.blockingWrite();
    LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);

    // move operator
    MPMCQueue<Lifecycle<std::false_type>> c;
    LIFECYCLE_STEP(NOTHING);
    c = std::move(b);
    LIFECYCLE_STEP(NOTHING);
    EXPECT_EQ(c.capacity(), 50);
    EXPECT_EQ(c.size(), 2);

    {
      Lifecycle<std::false_type> dst;
      LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
      c.blockingRead(dst);
      LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);

      {
        // swap contents with a new queue
        MPMCQueue<Lifecycle<std::false_type>> d(10);
        LIFECYCLE_STEP(NOTHING);
        std::swap(c, d);
        LIFECYCLE_STEP(NOTHING);
        EXPECT_EQ(c.capacity(), 10);
        EXPECT_TRUE(c.isEmpty());
        EXPECT_EQ(d.capacity(), 50);
        EXPECT_EQ(d.size(), 1);

        d.blockingRead(dst);
        LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);

        c.blockingWrite(dst);
        LIFECYCLE_STEP(COPY_CONSTRUCTOR);

        d.blockingWrite(std::move(dst));
        LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
      } // d goes out of scope
      LIFECYCLE_STEP(DESTRUCTOR);
    } // dst goes out of scope
    LIFECYCLE_STEP(DESTRUCTOR);
  } // c goes out of scope
  LIFECYCLE_STEP(DESTRUCTOR);

  EXPECT_EQ(lc_outstanding(), 0);
}

TEST(MPMCQueue, explicit_zero_capacity_fail) {
  ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);
}