/*
 * Copyright 2014 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <folly/MPMCQueue.h>

#include <folly/Format.h>
#include <folly/Memory.h>
#include <folly/test/DeterministicSchedule.h>

#include <sys/resource.h>
#include <sys/time.h>
#include <unistd.h>

#include <atomic>
#include <cassert>
#include <cstdint>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include <boost/intrusive_ptr.hpp>
#include <gflags/gflags.h>
#include <gtest/gtest.h>
34 FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
36 using namespace folly;
37 using namespace detail;
40 typedef DeterministicSchedule DSched;
43 template <template<typename> class Atom>
44 void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
45 TurnSequencer<Atom> seq(init);
46 Atom<int> spinThreshold(0);
49 std::vector<std::thread> threads(numThreads);
50 for (int i = 0; i < numThreads; ++i) {
51 threads[i] = DSched::thread([&, i]{
52 for (int op = i; op < numOps; op += numThreads) {
53 seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
54 EXPECT_EQ(prev, op - 1);
56 seq.completeTurn(init + op);
61 for (auto& thr : threads) {
65 EXPECT_EQ(prev, numOps - 1);
68 TEST(MPMCQueue, sequencer) {
69 run_mt_sequencer_test<std::atomic>(1, 100, 0);
70 run_mt_sequencer_test<std::atomic>(2, 100000, -100);
71 run_mt_sequencer_test<std::atomic>(100, 10000, -100);
74 TEST(MPMCQueue, sequencer_deterministic) {
75 DSched sched(DSched::uniform(0));
76 run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
77 run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
78 run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
82 void runElementTypeTest(T&& src) {
84 cq.blockingWrite(std::move(src));
86 cq.blockingRead(dest);
87 EXPECT_TRUE(cq.write(std::move(dest)));
88 EXPECT_TRUE(cq.read(dest));
// Minimal intrusively ref-counted type for exercising
// boost::intrusive_ptr as a queue element.
struct RefCounted {
  // mutable so const pointees can still be add-ref'd/released
  mutable std::atomic<int> rc;

  RefCounted() : rc(0) {}
};

void intrusive_ptr_add_ref(RefCounted const* p) {
  p->rc++;
}

void intrusive_ptr_release(RefCounted const* p) {
  // atomic pre-decrement; last release deletes the object
  if (--(p->rc) == 0) {
    delete p;
  }
}
107 TEST(MPMCQueue, lots_of_element_types) {
108 runElementTypeTest(10);
109 runElementTypeTest(std::string("abc"));
110 runElementTypeTest(std::make_pair(10, std::string("def")));
111 runElementTypeTest(std::vector<std::string>{ { "abc" } });
112 runElementTypeTest(std::make_shared<char>('a'));
113 runElementTypeTest(folly::make_unique<char>('a'));
114 runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
117 TEST(MPMCQueue, single_thread_enqdeq) {
118 MPMCQueue<int> cq(10);
120 for (int pass = 0; pass < 10; ++pass) {
121 for (int i = 0; i < 10; ++i) {
122 EXPECT_TRUE(cq.write(i));
124 EXPECT_FALSE(cq.write(-1));
125 EXPECT_FALSE(cq.isEmpty());
126 EXPECT_EQ(cq.size(), 10);
128 for (int i = 0; i < 5; ++i) {
130 EXPECT_TRUE(cq.read(dest));
133 for (int i = 5; i < 10; ++i) {
135 cq.blockingRead(dest);
139 EXPECT_FALSE(cq.read(dest));
142 EXPECT_TRUE(cq.isEmpty());
143 EXPECT_EQ(cq.size(), 0);
147 TEST(MPMCQueue, tryenq_capacity_test) {
148 for (size_t cap = 1; cap < 100; ++cap) {
149 MPMCQueue<int> cq(cap);
150 for (int i = 0; i < cap; ++i) {
151 EXPECT_TRUE(cq.write(i));
153 EXPECT_FALSE(cq.write(100));
157 TEST(MPMCQueue, enq_capacity_test) {
158 for (auto cap : { 1, 100, 10000 }) {
159 MPMCQueue<int> cq(cap);
160 for (int i = 0; i < cap; ++i) {
165 auto thr = std::thread([&]{
166 cq.blockingWrite(100);
172 cq.blockingRead(dummy);
178 template <template<typename> class Atom>
179 void runTryEnqDeqTest(int numThreads, int numOps) {
180 // write and read aren't linearizable, so we don't have
181 // hard guarantees on their individual behavior. We can still test
182 // correctness in aggregate
183 MPMCQueue<int,Atom> cq(numThreads);
186 std::vector<std::thread> threads(numThreads);
187 std::atomic<uint64_t> sum(0);
188 for (int t = 0; t < numThreads; ++t) {
189 threads[t] = DSched::thread([&,t]{
190 uint64_t threadSum = 0;
192 // received doesn't reflect any actual values, we just start with
193 // t and increment by numThreads to get the rounding of termination
194 // correct if numThreads doesn't evenly divide numOps
196 while (src < n || received < n) {
197 if (src < n && cq.write(src)) {
202 if (received < n && cq.read(dst)) {
203 received += numThreads;
210 for (auto& t : threads) {
213 EXPECT_TRUE(cq.isEmpty());
214 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
217 TEST(MPMCQueue, mt_try_enq_deq) {
218 int nts[] = { 1, 3, 100 };
222 runTryEnqDeqTest<std::atomic>(nt, n);
226 TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
227 int nts[] = { 3, 10 };
230 LOG(INFO) << "using seed " << seed;
235 DSched sched(DSched::uniform(seed));
236 runTryEnqDeqTest<DeterministicAtomic>(nt, n);
239 DSched sched(DSched::uniformSubset(seed, 2));
240 runTryEnqDeqTest<DeterministicAtomic>(nt, n);
// Wall-clock time in microseconds since the epoch (gettimeofday-based, so
// not monotonic under clock adjustment; used only for coarse benchmarking).
uint64_t nowMicro() {
  timeval tv;
  gettimeofday(&tv, 0);
  return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
}
251 template <typename Q>
252 std::string producerConsumerBench(Q&& queue, std::string qName,
253 int numProducers, int numConsumers,
254 int numOps, bool ignoreContents = false) {
257 struct rusage beginUsage;
258 getrusage(RUSAGE_SELF, &beginUsage);
260 auto beginMicro = nowMicro();
263 std::atomic<uint64_t> sum(0);
265 std::vector<std::thread> producers(numProducers);
266 for (int t = 0; t < numProducers; ++t) {
267 producers[t] = DSched::thread([&,t]{
268 for (int i = t; i < numOps; i += numProducers) {
274 std::vector<std::thread> consumers(numConsumers);
275 for (int t = 0; t < numConsumers; ++t) {
276 consumers[t] = DSched::thread([&,t]{
277 uint64_t localSum = 0;
278 for (int i = t; i < numOps; i += numConsumers) {
280 q.blockingRead(dest);
281 EXPECT_FALSE(dest == -1);
288 for (auto& t : producers) {
291 for (auto& t : consumers) {
294 if (!ignoreContents) {
295 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
298 auto endMicro = nowMicro();
300 struct rusage endUsage;
301 getrusage(RUSAGE_SELF, &endUsage);
303 uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
304 long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
305 (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
307 return folly::format(
308 "{}, {} producers, {} consumers => {} nanos/handoff, {} csw / {} handoff",
309 qName, numProducers, numConsumers, nanosPer, csw, n).str();
313 TEST(MPMCQueue, mt_prod_cons_deterministic) {
314 // we use the Bench method, but perf results are meaningless under DSched
315 DSched sched(DSched::uniform(0));
317 producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(10),
319 producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(100),
321 producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(10),
323 producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(100),
325 producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(1),
// Stringizes the queue expression so the benchmark output names the config.
#define PC_BENCH(q, np, nc, ...) \
    producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
332 TEST(MPMCQueue, mt_prod_cons) {
334 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 1, n);
335 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 1, n);
336 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 10, n);
337 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 10, n);
338 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 1, n);
339 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 1, n);
340 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 10, n);
341 LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 10, n);
342 LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n);
345 template <template<typename> class Atom>
346 uint64_t runNeverFailTest(int numThreads, int numOps) {
347 // always #enq >= #deq
348 MPMCQueue<int,Atom> cq(numThreads);
351 auto beginMicro = nowMicro();
353 std::vector<std::thread> threads(numThreads);
354 std::atomic<uint64_t> sum(0);
355 for (int t = 0; t < numThreads; ++t) {
356 threads[t] = DSched::thread([&,t]{
357 uint64_t threadSum = 0;
358 for (int i = t; i < n; i += numThreads) {
360 EXPECT_TRUE(cq.writeIfNotFull(i));
363 EXPECT_TRUE(cq.readIfNotEmpty(dest));
364 EXPECT_TRUE(dest >= 0);
370 for (auto& t : threads) {
373 EXPECT_TRUE(cq.isEmpty());
374 EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
376 return nowMicro() - beginMicro;
379 TEST(MPMCQueue, mt_never_fail) {
380 int nts[] = { 1, 3, 100 };
384 uint64_t elapsed = runNeverFailTest<std::atomic>(nt, n);
385 LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with "
390 TEST(MPMCQueue, mt_never_fail_deterministic) {
391 int nts[] = { 3, 10 };
393 long seed = 0; // nowMicro() % 10000;
394 LOG(INFO) << "using seed " << seed;
399 DSched sched(DSched::uniform(seed));
400 runNeverFailTest<DeterministicAtomic>(nt, n);
403 DSched sched(DSched::uniformSubset(seed, 2));
404 runNeverFailTest<DeterministicAtomic>(nt, n);
409 enum LifecycleEvent {
421 static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT];
422 static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT];
424 static int lc_outstanding() {
425 return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[COPY_CONSTRUCTOR] +
426 lc_counts[MOVE_CONSTRUCTOR] + lc_counts[TWO_ARG_CONSTRUCTOR] -
427 lc_counts[DESTRUCTOR];
430 static void lc_snap() {
431 for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
432 lc_prev[i] = lc_counts[i];
436 #define LIFECYCLE_STEP(...) lc_step(__LINE__, __VA_ARGS__)
438 static void lc_step(int lineno, int what = NOTHING, int what2 = NOTHING) {
439 for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
440 int delta = i == what || i == what2 ? 1 : 0;
441 EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
442 << "lc_counts[" << i << "] - lc_prev[" << i << "] was "
443 << (lc_counts[i] - lc_prev[i]) << ", expected " << delta
444 << ", from line " << lineno;
449 template <typename R>
451 typedef R IsRelocatable;
455 Lifecycle() noexcept : constructed(true) {
456 ++lc_counts[DEFAULT_CONSTRUCTOR];
459 explicit Lifecycle(int n, char const* s) noexcept : constructed(true) {
460 ++lc_counts[TWO_ARG_CONSTRUCTOR];
463 Lifecycle(const Lifecycle& rhs) noexcept : constructed(true) {
464 ++lc_counts[COPY_CONSTRUCTOR];
467 Lifecycle(Lifecycle&& rhs) noexcept : constructed(true) {
468 ++lc_counts[MOVE_CONSTRUCTOR];
471 Lifecycle& operator= (const Lifecycle& rhs) noexcept {
472 ++lc_counts[COPY_OPERATOR];
476 Lifecycle& operator= (Lifecycle&& rhs) noexcept {
477 ++lc_counts[MOVE_OPERATOR];
481 ~Lifecycle() noexcept {
482 ++lc_counts[DESTRUCTOR];
483 assert(lc_outstanding() >= 0);
489 template <typename R>
490 void runPerfectForwardingTest() {
492 EXPECT_EQ(lc_outstanding(), 0);
495 MPMCQueue<Lifecycle<R>> queue(50);
496 LIFECYCLE_STEP(NOTHING);
498 for (int pass = 0; pass < 10; ++pass) {
499 for (int i = 0; i < 10; ++i) {
500 queue.blockingWrite();
501 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
503 queue.blockingWrite(1, "one");
504 LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
508 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
509 queue.blockingWrite(std::move(src));
510 LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
512 LIFECYCLE_STEP(DESTRUCTOR);
516 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
517 queue.blockingWrite(src);
518 LIFECYCLE_STEP(COPY_CONSTRUCTOR);
520 LIFECYCLE_STEP(DESTRUCTOR);
522 EXPECT_TRUE(queue.write());
523 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
526 EXPECT_EQ(queue.size(), 50);
527 EXPECT_FALSE(queue.write(2, "two"));
528 LIFECYCLE_STEP(NOTHING);
530 for (int i = 0; i < 50; ++i) {
533 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
535 queue.blockingRead(node);
537 // relocatable, moved via memcpy
538 LIFECYCLE_STEP(DESTRUCTOR);
540 LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
543 LIFECYCLE_STEP(DESTRUCTOR);
546 EXPECT_EQ(queue.size(), 0);
549 // put one element back before destruction
551 Lifecycle<R> src(3, "three");
552 LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
553 queue.write(std::move(src));
554 LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
556 LIFECYCLE_STEP(DESTRUCTOR); // destroy src
558 LIFECYCLE_STEP(DESTRUCTOR); // destroy queue
560 EXPECT_EQ(lc_outstanding(), 0);
563 TEST(MPMCQueue, perfect_forwarding) {
564 runPerfectForwardingTest<std::false_type>();
567 TEST(MPMCQueue, perfect_forwarding_relocatable) {
568 runPerfectForwardingTest<std::true_type>();
571 TEST(MPMCQueue, queue_moving) {
573 EXPECT_EQ(lc_outstanding(), 0);
576 MPMCQueue<Lifecycle<std::false_type>> a(50);
577 LIFECYCLE_STEP(NOTHING);
580 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
583 MPMCQueue<Lifecycle<std::false_type>> b = std::move(a);
584 LIFECYCLE_STEP(NOTHING);
585 EXPECT_EQ(a.capacity(), 0);
586 EXPECT_EQ(a.size(), 0);
587 EXPECT_EQ(b.capacity(), 50);
588 EXPECT_EQ(b.size(), 1);
591 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
594 MPMCQueue<Lifecycle<std::false_type>> c;
595 LIFECYCLE_STEP(NOTHING);
597 LIFECYCLE_STEP(NOTHING);
598 EXPECT_EQ(c.capacity(), 50);
599 EXPECT_EQ(c.size(), 2);
602 Lifecycle<std::false_type> dst;
603 LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
605 LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
609 MPMCQueue<Lifecycle<std::false_type>> d(10);
610 LIFECYCLE_STEP(NOTHING);
612 LIFECYCLE_STEP(NOTHING);
613 EXPECT_EQ(c.capacity(), 10);
614 EXPECT_TRUE(c.isEmpty());
615 EXPECT_EQ(d.capacity(), 50);
616 EXPECT_EQ(d.size(), 1);
619 LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
621 c.blockingWrite(dst);
622 LIFECYCLE_STEP(COPY_CONSTRUCTOR);
624 d.blockingWrite(std::move(dst));
625 LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
626 } // d goes out of scope
627 LIFECYCLE_STEP(DESTRUCTOR);
628 } // dst goes out of scope
629 LIFECYCLE_STEP(DESTRUCTOR);
630 } // c goes out of scope
631 LIFECYCLE_STEP(DESTRUCTOR);
634 int main(int argc, char ** argv) {
635 testing::InitGoogleTest(&argc, argv);
636 gflags::ParseCommandLineFlags(&argc, &argv, true);
637 return RUN_ALL_TESTS();