2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/MPMCQueue.h>
18 #include <folly/Format.h>
19 #include <folly/Memory.h>
20 #include <folly/test/DeterministicSchedule.h>
22 #include <boost/intrusive_ptr.hpp>
29 #include <sys/resource.h>
31 #include <gflags/gflags.h>
32 #include <gtest/gtest.h>
34 FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
36 using namespace folly;
37 using namespace detail;
40 typedef DeterministicSchedule DSched;
// Worker for the TurnSequencer stress tests: thread i claims turns
// i, i+numThreads, i+2*numThreads, ... and completes each one, so the
// sequencer must hand out turns in strictly increasing order.
// NOTE(review): this excerpt is missing lines (the parameter list at
// 44-46 and parts of the loop body) — verify against the full file.
42 template <template<typename> class Atom>
43 void run_mt_sequencer_thread(
47     TurnSequencer<Atom>& seq,
48     Atom<uint32_t>& spinThreshold,
// Strided loop: each thread handles every numThreads-th operation.
51 for (int op = i; op < numOps; op += numThreads) {
// Every 32nd op updates the shared spin-threshold adaptation state.
52 seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
// prev must be the immediately preceding op — turns are serialized.
53 EXPECT_EQ(prev, op - 1);
55 seq.completeTurn(init + op);
// Spawns numThreads workers interleaving on one TurnSequencer that
// starts at `init`, joins them, then checks all numOps turns completed.
// NOTE(review): excerpt is missing lines (e.g. the bind's trailing
// argument at 69 and the join body at 73) — verify against full file.
59 template <template<typename> class Atom>
60 void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
61 TurnSequencer<Atom> seq(init);
// spinThreshold starts at 0 so the sequencer adapts it during the run.
62 Atom<uint32_t> spinThreshold(0);
65 std::vector<std::thread> threads(numThreads);
66 for (int i = 0; i < numThreads; ++i) {
// DSched::thread lets the same harness run under DeterministicSchedule.
67 threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
68 numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold),
72 for (auto& thr : threads) {
// After all turns are granted, the last observed turn is numOps - 1.
76 EXPECT_EQ(prev, numOps - 1);
// TurnSequencer contract under plain std::atomic, including a negative
// (wrapping) initial turn value.
79 TEST(MPMCQueue, sequencer) {
80 run_mt_sequencer_test<std::atomic>(1, 100, 0);
81 run_mt_sequencer_test<std::atomic>(2, 100000, -100);
82 run_mt_sequencer_test<std::atomic>(100, 10000, -100);
// Same sequencer scenarios, but through the futex-emulation atomic so
// the non-futex waiting path gets covered.
85 TEST(MPMCQueue, sequencer_emulated_futex) {
86 run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
87 run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
88 run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
// Sequencer scenarios replayed under DeterministicSchedule (fixed seed
// 0) so thread interleavings are reproducible; also probes a turn value
// near the 2^29 wrap point.
91 TEST(MPMCQueue, sequencer_deterministic) {
92 DSched sched(DSched::uniform(0));
93 run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
94 run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
95 run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
// Round-trips `src` through an MPMCQueue via both the blocking and the
// non-blocking read/write paths, exercising move construction for T.
// NOTE(review): the template header (line 98), the queue/dest
// declarations (100, 102), and the closing lines are missing from this
// excerpt — verify against the full file.
99 void runElementTypeTest(T&& src) {
101 cq.blockingWrite(std::move(src));
103 cq.blockingRead(dest);
104 EXPECT_TRUE(cq.write(std::move(dest)));
105 EXPECT_TRUE(cq.read(dest));
// Test element with intrusive reference counting. active_instances is
// thread-local so each test thread can check for leaks independently.
// NOTE(review): the struct declaration line and parts of the body are
// missing from this excerpt — verify against the full file.
109 static __thread int active_instances;
// mutable so the count can change through `RefCounted const*` in the
// intrusive_ptr hooks below.
111 mutable std::atomic<int> rc;
113 RefCounted() : rc(0) {
// Out-of-line definition of the thread-local instance counter.
121 __thread int RefCounted::active_instances;
// boost::intrusive_ptr hooks for RefCounted: add_ref bumps rc, release
// decrements it and destroys the object when the count reaches zero.
// NOTE(review): both function bodies are partially missing here.
124 void intrusive_ptr_add_ref(RefCounted const* p) {
128 void intrusive_ptr_release(RefCounted const* p) {
129 if (--(p->rc) == 0) {
// Pushes a variety of element types through the queue: trivially
// copyable, heap-backed, pair/vector aggregates, shared/unique_ptr,
// and an intrusive_ptr whose leak check closes the test.
134 TEST(MPMCQueue, lots_of_element_types) {
135 runElementTypeTest(10);
136 runElementTypeTest(std::string("abc"));
137 runElementTypeTest(std::make_pair(10, std::string("def")));
138 runElementTypeTest(std::vector<std::string>{ { "abc" } });
139 runElementTypeTest(std::make_shared<char>('a'));
140 runElementTypeTest(folly::make_unique<char>('a'));
141 runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
// All RefCounted objects must have been destroyed by their queues.
142 EXPECT_EQ(RefCounted::active_instances, 0);
// Single-threaded fill/drain cycles on a capacity-10 queue: fill to
// capacity, confirm a further write fails, then drain half with read()
// and half with blockingRead(), ending each pass empty.
145 TEST(MPMCQueue, single_thread_enqdeq) {
146 MPMCQueue<int> cq(10);
148 for (int pass = 0; pass < 10; ++pass) {
149 for (int i = 0; i < 10; ++i) {
150 EXPECT_TRUE(cq.write(i));
// Queue is full: the non-blocking write must be rejected.
152 EXPECT_FALSE(cq.write(-1));
153 EXPECT_FALSE(cq.isEmpty());
154 EXPECT_EQ(cq.size(), 10);
156 for (int i = 0; i < 5; ++i) {
158 EXPECT_TRUE(cq.read(dest));
161 for (int i = 5; i < 10; ++i) {
// blockingRead should not block here — elements are available.
163 cq.blockingRead(dest);
// Queue is drained: the non-blocking read must be rejected.
167 EXPECT_FALSE(cq.read(dest));
170 EXPECT_TRUE(cq.isEmpty());
171 EXPECT_EQ(cq.size(), 0);
// For every capacity 1..99: exactly `cap` non-blocking writes succeed
// and the (cap+1)-th is rejected.
175 TEST(MPMCQueue, tryenq_capacity_test) {
176 for (size_t cap = 1; cap < 100; ++cap) {
177 MPMCQueue<int> cq(cap);
178 for (size_t i = 0; i < cap; ++i) {
179 EXPECT_TRUE(cq.write(i));
181 EXPECT_FALSE(cq.write(100));
// Fills the queue to capacity, then starts a thread whose blockingWrite
// must stall until the main thread frees a slot with blockingRead.
// NOTE(review): lines between 194 and 200 (the stall check and the
// thread join) are missing from this excerpt.
185 TEST(MPMCQueue, enq_capacity_test) {
186 for (auto cap : { 1, 100, 10000 }) {
187 MPMCQueue<int> cq(cap);
188 for (int i = 0; i < cap; ++i) {
193 auto thr = std::thread([&]{
// Queue is full here, so this write blocks until a read occurs.
194 cq.blockingWrite(100);
200 cq.blockingRead(dummy);
// Worker for the try-enq/deq aggregate test: thread t repeatedly
// attempts non-blocking writes of a strided source value and
// non-blocking reads, accumulating what it dequeues into `sum`.
// NOTE(review): lines declaring t/src/received/dst and the final
// sum update are missing from this excerpt — verify against full file.
206 template <template<typename> class Atom>
207 void runTryEnqDeqThread(
210     MPMCQueue<int, Atom>& cq,
211     std::atomic<uint64_t>& sum,
213 uint64_t threadSum = 0;
215 // received doesn't reflect any actual values, we just start with
216 // t and increment by numThreads to get the rounding of termination
217 // correct if numThreads doesn't evenly divide numOps
// Loop until this thread has both produced and consumed its share.
219 while (src < n || received < n) {
220 if (src < n && cq.write(src)) {
225 if (received < n && cq.read(dst)) {
226 received += numThreads;
// Aggregate correctness check for the non-linearizable write()/read()
// pair: numThreads workers enqueue/dequeue numOps values into a
// capacity-numThreads queue; afterwards the queue is empty and the
// dequeued values sum to 0 + 1 + ... + (n-1).
233 template <template<typename> class Atom>
234 void runTryEnqDeqTest(int numThreads, int numOps) {
235 // write and read aren't linearizable, so we don't have
236 // hard guarantees on their individual behavior. We can still test
237 // correctness in aggregate
238 MPMCQueue<int,Atom> cq(numThreads);
241 std::vector<std::thread> threads(numThreads);
242 std::atomic<uint64_t> sum(0);
243 for (int t = 0; t < numThreads; ++t) {
244 threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom>,
245 numThreads, n, std::ref(cq), std::ref(sum), t));
247 for (auto& t : threads) {
250 EXPECT_TRUE(cq.isEmpty());
// Gauss sum identity: every value 0..n-1 was dequeued exactly once.
251 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
// Aggregate try-enq/deq test under std::atomic for several thread
// counts. NOTE(review): the loop over nts and the choice of n are
// missing from this excerpt.
254 TEST(MPMCQueue, mt_try_enq_deq) {
255 int nts[] = { 1, 3, 100 };
259 runTryEnqDeqTest<std::atomic>(nt, n);
// Same aggregate test via the futex-emulation atomic. NOTE(review):
// loop/n lines are missing from this excerpt.
263 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
264 int nts[] = { 1, 3, 100 };
268 runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
// Aggregate test replayed deterministically: once with a uniform
// schedule and once with uniformSubset (only 2 runnable threads at a
// time) to force different interleavings from the same seed.
// NOTE(review): the seed declaration and loop lines are missing here.
272 TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
273 int nts[] = { 3, 10 };
276 LOG(INFO) << "using seed " << seed;
281 DSched sched(DSched::uniform(seed));
282 runTryEnqDeqTest<DeterministicAtomic>(nt, n);
285 DSched sched(DSched::uniformSubset(seed, 2));
286 runTryEnqDeqTest<DeterministicAtomic>(nt, n);
// Wall-clock time in microseconds since the epoch, via gettimeofday.
// Used only for coarse benchmark timing below.
291 uint64_t nowMicro() {
293 gettimeofday(&tv, 0);
294 return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
// Runs numProducers writer threads and numConsumers reader threads over
// `queue` for numOps handoffs, verifies the aggregate sum (unless
// ignoreContents), and returns a human-readable line with nanos/handoff
// and the context-switch count from getrusage.
// NOTE(review): several lines (n, q aliasing, write body, localSum
// accounting, thread joins) are missing from this excerpt.
297 template <typename Q>
298 std::string producerConsumerBench(Q&& queue, std::string qName,
299 int numProducers, int numConsumers,
300 int numOps, bool ignoreContents = false) {
// Snapshot rusage + wall clock before spawning any threads.
303 struct rusage beginUsage;
304 getrusage(RUSAGE_SELF, &beginUsage);
306 auto beginMicro = nowMicro();
309 std::atomic<uint64_t> sum(0);
311 std::vector<std::thread> producers(numProducers);
312 for (int t = 0; t < numProducers; ++t) {
// Producer t writes the strided values t, t+numProducers, ...
313 producers[t] = DSched::thread([&,t]{
314 for (int i = t; i < numOps; i += numProducers) {
320 std::vector<std::thread> consumers(numConsumers);
321 for (int t = 0; t < numConsumers; ++t) {
322 consumers[t] = DSched::thread([&,t]{
323 uint64_t localSum = 0;
324 for (int i = t; i < numConsumers ? i : i; i += numConsumers) {
326 q.blockingRead(dest);
// -1 would indicate a value that was never legitimately produced.
327 EXPECT_FALSE(dest == -1);
334 for (auto& t : producers) {
337 for (auto& t : consumers) {
340 if (!ignoreContents) {
// Gauss sum identity over all consumed values.
341 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
344 auto endMicro = nowMicro();
346 struct rusage endUsage;
347 getrusage(RUSAGE_SELF, &endUsage);
349 uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
// Voluntary + involuntary context switches attributable to the run.
350 long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
351 (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
353 return folly::format(
354 "{}, {} producers, {} consumers => {} nanos/handoff, {} csw / {} handoff",
355 qName, numProducers, numConsumers, nanosPer, csw, n).str();
// Reuses the bench harness purely as a correctness driver under
// DeterministicSchedule; the timing output is meaningless here.
// NOTE(review): the qName/producer/consumer argument lines are missing
// from this excerpt.
359 TEST(MPMCQueue, mt_prod_cons_deterministic) {
360 // we use the Bench method, but perf results are meaningless under DSched
361 DSched sched(DSched::uniform(0));
363 producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(10),
365 producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(100),
367 producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(10),
369 producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(100),
371 producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(1),
// Convenience wrapper: stringifies the queue-constructor expression
// (#q) so it labels the benchmark output line.
375 #define PC_BENCH(q, np, nc, ...) \
376     producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
// Real (non-deterministic) throughput benchmark across producer/
// consumer counts and capacities; results go to the log.
// NOTE(review): the declaration of n (line 379) is missing here.
378 TEST(MPMCQueue, mt_prod_cons) {
380 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 1, n);
381 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 1, n);
382 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 10, n);
383 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 10, n);
384 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 1, n);
385 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 1, n);
386 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 10, n);
387 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 10, n);
388 LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n);
// Same benchmark matrix with the futex-emulation atomic; the extra
// parentheses keep the comma in the template args out of the macro.
// NOTE(review): the declaration of n (line 392) is missing here.
391 TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
393 LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 1, 1, n);
394 LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 10, 1, n);
395 LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 1, 10, n);
396 LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 10, 10, n);
397 LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 1, 1, n);
398 LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 10, 1, n);
399 LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 1, 10, n);
400 LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 10, 10, n);
402     << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(100000)), 32, 100, n);
// Worker for the never-fail tests: with capacity == numThreads and one
// outstanding element per thread, writeIfNotFull/readIfNotEmpty are
// expected to always succeed.
// NOTE(review): parameter lines, the dest declaration, and the sum
// accumulation are missing from this excerpt.
405 template <template<typename> class Atom>
406 void runNeverFailThread(
409     MPMCQueue<int, Atom>& cq,
410     std::atomic<uint64_t>& sum,
412 uint64_t threadSum = 0;
413 for (int i = t; i < n; i += numThreads) {
// Each thread writes before reading, so the queue never overfills.
415 EXPECT_TRUE(cq.writeIfNotFull(i));
418 EXPECT_TRUE(cq.readIfNotEmpty(dest));
419 EXPECT_TRUE(dest >= 0);
// Drives runNeverFailThread across numThreads workers, checks the
// aggregate sum and emptiness, and returns elapsed microseconds so
// callers can log a rough ops/sec figure.
426 template <template<typename> class Atom>
427 uint64_t runNeverFailTest(int numThreads, int numOps) {
428 // always #enq >= #deq
// Capacity == thread count guarantees writeIfNotFull cannot fail.
428 MPMCQueue<int,Atom> cq(numThreads);
431 auto beginMicro = nowMicro();
433 std::vector<std::thread> threads(numThreads);
434 std::atomic<uint64_t> sum(0);
435 for (int t = 0; t < numThreads; ++t) {
436 threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom>,
437 numThreads, n, std::ref(cq), std::ref(sum), t));
439 for (auto& t : threads) {
442 EXPECT_TRUE(cq.isEmpty());
// Gauss sum identity over all values 0..n-1.
443 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
445 return nowMicro() - beginMicro;
// Never-fail invariant under std::atomic; logs per-op latency.
// NOTE(review): the loop over nts and n are missing from this excerpt.
448 TEST(MPMCQueue, mt_never_fail) {
449 int nts[] = { 1, 3, 100 };
453 uint64_t elapsed = runNeverFailTest<std::atomic>(nt, n);
454 LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with "
// Never-fail invariant via the futex-emulation atomic.
// NOTE(review): the loop over nts and n are missing from this excerpt.
459 TEST(MPMCQueue, mt_never_fail_emulated_futex) {
460 int nts[] = { 1, 3, 100 };
464 uint64_t elapsed = runNeverFailTest<EmulatedFutexAtomic>(nt, n);
465 LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with "
// Never-fail invariant replayed deterministically: uniform schedule,
// then uniformSubset (2 runnable threads) for different interleavings.
// Seed is pinned to 0 for reproducibility (the commented expression
// shows how to randomize it when debugging).
470 TEST(MPMCQueue, mt_never_fail_deterministic) {
471 int nts[] = { 3, 10 };
473 long seed = 0; // nowMicro() % 10000;
474 LOG(INFO) << "using seed " << seed;
479 DSched sched(DSched::uniform(seed));
480 runNeverFailTest<DeterministicAtomic>(nt, n);
483 DSched sched(DSched::uniformSubset(seed, 2));
484 runNeverFailTest<DeterministicAtomic>(nt, n);
// Lifecycle bookkeeping for the perfect-forwarding tests: thread-local
// counters record how many times each special member of Lifecycle ran,
// and lc_step() asserts exactly which events occurred since the last
// lc_snap(). NOTE(review): the enum's enumerator lines are missing
// from this excerpt.
489 enum LifecycleEvent {
// Per-thread so concurrent tests don't interfere with each other.
501 static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT];
502 static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT];
// Constructions minus destructions == live Lifecycle objects.
504 static int lc_outstanding() {
505   return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[COPY_CONSTRUCTOR] +
506       lc_counts[MOVE_CONSTRUCTOR] + lc_counts[TWO_ARG_CONSTRUCTOR] -
507       lc_counts[DESTRUCTOR];
// Snapshot the counters so the next lc_step() measures only deltas.
510 static void lc_snap() {
511   for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
512     lc_prev[i] = lc_counts[i];
// __LINE__ is threaded through so a failing expectation reports the
// call site, not this helper.
516 #define LIFECYCLE_STEP(...) lc_step(__LINE__, __VA_ARGS__)
518 static void lc_step(int lineno, int what = NOTHING, int what2 = NOTHING) {
519   for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
// Expect exactly one occurrence of each named event, zero otherwise.
520     int delta = i == what || i == what2 ? 1 : 0;
521     EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
522         << "lc_counts[" << i << "] - lc_prev[" << i << "] was "
523         << (lc_counts[i] - lc_prev[i]) << ", expected " << delta
524         << ", from line " << lineno;
// Instrumented element type: every special member increments the
// matching lc_counts slot. The R parameter is exposed as
// IsRelocatable so folly's relocation optimization can be toggled
// between the two perfect-forwarding tests.
// NOTE(review): the struct declaration line, member declarations, and
// several return statements are missing from this excerpt.
529 template <typename R>
531   typedef R IsRelocatable;
535   Lifecycle() noexcept : constructed(true) {
536     ++lc_counts[DEFAULT_CONSTRUCTOR];
539   explicit Lifecycle(int n, char const* s) noexcept : constructed(true) {
540     ++lc_counts[TWO_ARG_CONSTRUCTOR];
543   Lifecycle(const Lifecycle& rhs) noexcept : constructed(true) {
544     ++lc_counts[COPY_CONSTRUCTOR];
547   Lifecycle(Lifecycle&& rhs) noexcept : constructed(true) {
548     ++lc_counts[MOVE_CONSTRUCTOR];
551   Lifecycle& operator= (const Lifecycle& rhs) noexcept {
552     ++lc_counts[COPY_OPERATOR];
556   Lifecycle& operator= (Lifecycle&& rhs) noexcept {
557     ++lc_counts[MOVE_OPERATOR];
561   ~Lifecycle() noexcept {
562     ++lc_counts[DESTRUCTOR];
// Sanity check: destructions can never outnumber constructions.
563     assert(lc_outstanding() >= 0);
// Verifies that MPMCQueue's write paths perfectly forward their
// arguments: each enqueue/dequeue must trigger exactly the expected
// Lifecycle events (in-place construction for emplace-style writes,
// move for rvalues, copy for lvalues, and memcpy-with-no-events when
// IsRelocatable is true).
// NOTE(review): several lines (src declarations, read calls, scope
// braces) are missing from this excerpt — verify against the full file.
569 template <typename R>
570 void runPerfectForwardingTest() {
572   EXPECT_EQ(lc_outstanding(), 0);
575     MPMCQueue<Lifecycle<R>> queue(50);
// Constructing the queue itself must create no elements.
576     LIFECYCLE_STEP(NOTHING);
578     for (int pass = 0; pass < 10; ++pass) {
579       for (int i = 0; i < 10; ++i) {
// Zero-arg blockingWrite emplaces: exactly one default construction.
580         queue.blockingWrite();
581         LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
// Multi-arg blockingWrite forwards to the two-arg constructor in place.
583         queue.blockingWrite(1, "one");
584         LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
588         LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
// rvalue argument: element is move-constructed into the queue slot.
589         queue.blockingWrite(std::move(src));
590         LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
592         LIFECYCLE_STEP(DESTRUCTOR);
596         LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
// lvalue argument: element is copy-constructed into the queue slot.
597         queue.blockingWrite(src);
598         LIFECYCLE_STEP(COPY_CONSTRUCTOR);
600         LIFECYCLE_STEP(DESTRUCTOR);
602       EXPECT_TRUE(queue.write());
603       LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
606       EXPECT_EQ(queue.size(), 50);
// Queue full: the failed write must construct nothing.
607       EXPECT_FALSE(queue.write(2, "two"));
608       LIFECYCLE_STEP(NOTHING);
610       for (int i = 0; i < 50; ++i) {
613           LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
615           queue.blockingRead(node);
617             // relocatable, moved via memcpy
// Relocatable path: only the queue-slot destructor fires.
618             LIFECYCLE_STEP(DESTRUCTOR);
// Non-relocatable path: move-assign into node, then destroy the slot.
620             LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
623           LIFECYCLE_STEP(DESTRUCTOR);
626       EXPECT_EQ(queue.size(), 0);
629     // put one element back before destruction
631       Lifecycle<R> src(3, "three");
632       LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
633       queue.write(std::move(src));
634       LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
636     LIFECYCLE_STEP(DESTRUCTOR); // destroy src
// Queue destruction must destroy the one element left inside it.
638   LIFECYCLE_STEP(DESTRUCTOR); // destroy queue
// No Lifecycle objects may leak out of the whole test.
640   EXPECT_EQ(lc_outstanding(), 0);
// Forwarding test with IsRelocatable == false (element moves use the
// move constructor/assignment, not memcpy).
643 TEST(MPMCQueue, perfect_forwarding) {
644 runPerfectForwardingTest<std::false_type>();
// Forwarding test with IsRelocatable == true (elements relocated via
// memcpy, so reads must not run move operations).
647 TEST(MPMCQueue, perfect_forwarding_relocatable) {
648 runPerfectForwardingTest<std::true_type>();
// Exercises MPMCQueue's own move constructor and move assignment:
// moving a queue transfers capacity and contents, leaves the source
// empty with zero capacity, and moves no elements (LIFECYCLE NOTHING).
// NOTE(review): several write/read lines and the move-assignment
// statements themselves are missing from this excerpt.
651 TEST(MPMCQueue, queue_moving) {
653   EXPECT_EQ(lc_outstanding(), 0);
656     MPMCQueue<Lifecycle<std::false_type>> a(50);
657     LIFECYCLE_STEP(NOTHING);
660     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
// Move construction: b steals a's storage without touching elements.
663     MPMCQueue<Lifecycle<std::false_type>> b = std::move(a);
664     LIFECYCLE_STEP(NOTHING);
665     EXPECT_EQ(a.capacity(), 0);
666     EXPECT_EQ(a.size(), 0);
667     EXPECT_EQ(b.capacity(), 50);
668     EXPECT_EQ(b.size(), 1);
671     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
// Move assignment into a default-constructed queue, same guarantees.
674     MPMCQueue<Lifecycle<std::false_type>> c;
675     LIFECYCLE_STEP(NOTHING);
677     LIFECYCLE_STEP(NOTHING);
678     EXPECT_EQ(c.capacity(), 50);
679     EXPECT_EQ(c.size(), 2);
682     Lifecycle<std::false_type> dst;
683     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
685     LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
// Swap-style move between two live queues (c gets d's old storage).
689     MPMCQueue<Lifecycle<std::false_type>> d(10);
690     LIFECYCLE_STEP(NOTHING);
692     LIFECYCLE_STEP(NOTHING);
693     EXPECT_EQ(c.capacity(), 10);
694     EXPECT_TRUE(c.isEmpty());
695     EXPECT_EQ(d.capacity(), 50);
696     EXPECT_EQ(d.size(), 1);
699     LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
// Moved-to queues remain fully usable for writes.
701     c.blockingWrite(dst);
702     LIFECYCLE_STEP(COPY_CONSTRUCTOR);
704     d.blockingWrite(std::move(dst));
705     LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
706   } // d goes out of scope
707   LIFECYCLE_STEP(DESTRUCTOR);
708   } // dst goes out of scope
709   LIFECYCLE_STEP(DESTRUCTOR);
710   } // c goes out of scope
711   LIFECYCLE_STEP(DESTRUCTOR);
// Constructing a queue with capacity 0 must throw invalid_argument
// rather than silently producing an unusable queue.
714 TEST(MPMCQueue, explicit_zero_capacity_fail) {
715   ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);
// Standard gtest entry point; gflags parsing lets DSched/glog flags be
// set from the command line.
719 int main(int argc, char ** argv) {
720   testing::InitGoogleTest(&argc, argv);
721   gflags::ParseCommandLineFlags(&argc, &argv, true);
722   return RUN_ALL_TESTS();