X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=blobdiff_plain;f=folly%2Ftest%2FMPMCQueueTest.cpp;h=f556ebbf326d9442822abd153d52e23be0d5da97;hp=d42f402ab0f96d69e27ef5bc59deeabdfb233714;hb=f34acf2cbdfd496b88611f725852f774290ef234;hpb=6ca321dc723345b81022178f5d0e67004b4f0cdf diff --git a/folly/test/MPMCQueueTest.cpp b/folly/test/MPMCQueueTest.cpp index d42f402a..f556ebbf 100644 --- a/folly/test/MPMCQueueTest.cpp +++ b/folly/test/MPMCQueueTest.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2016 Facebook, Inc. + * Copyright 2017 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,21 +14,22 @@ * limitations under the License. */ -#include #include +#include #include +#include +#include #include +#include +#include #include #include -#include +#include #include +#include #include #include -#include -#include - -#include FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr); @@ -101,9 +102,9 @@ TEST(MPMCQueue, sequencer_deterministic) { run_mt_sequencer_test(10, 1000, -100); } -template +template void runElementTypeTest(T&& src) { - MPMCQueue cq(10); + MPMCQueue cq(10); cq.blockingWrite(std::forward(src)); T dest; cq.blockingRead(dest); @@ -118,7 +119,7 @@ void runElementTypeTest(T&& src) { } struct RefCounted { - static __thread int active_instances; + static FOLLY_TLS int active_instances; mutable std::atomic rc; @@ -130,8 +131,7 @@ struct RefCounted { --active_instances; } }; -__thread int RefCounted::active_instances; - +FOLLY_TLS int RefCounted::active_instances; void intrusive_ptr_add_ref(RefCounted const* p) { p->rc++; @@ -149,12 +149,26 @@ TEST(MPMCQueue, lots_of_element_types) { runElementTypeTest(std::make_pair(10, string("def"))); runElementTypeTest(vector{{"abc"}}); runElementTypeTest(std::make_shared('a')); - runElementTypeTest(folly::make_unique('a')); + runElementTypeTest(std::make_unique('a')); runElementTypeTest(boost::intrusive_ptr(new RefCounted)); EXPECT_EQ(RefCounted::active_instances, 0); } +TEST(MPMCQueue, lots_of_element_types_dynamic) { + runElementTypeTest(10); + runElementTypeTest(string("abc")); + runElementTypeTest(std::make_pair(10, string("def"))); + runElementTypeTest(vector{{"abc"}}); + runElementTypeTest(std::make_shared('a')); + runElementTypeTest(std::make_unique('a')); + runElementTypeTest(boost::intrusive_ptr(new RefCounted)); + EXPECT_EQ(RefCounted::active_instances, 0); +} + TEST(MPMCQueue, single_thread_enqdeq) { + // Non-dynamic version only. + // False positive for dynamic version. Capacity can be temporarily + // higher than specified. MPMCQueue cq(10); for (int pass = 0; pass < 10; ++pass) { @@ -185,6 +199,9 @@ TEST(MPMCQueue, single_thread_enqdeq) { } TEST(MPMCQueue, tryenq_capacity_test) { + // Non-dynamic version only. + // False positive for dynamic version. Capacity can be temporarily + // higher than specified. for (size_t cap = 1; cap < 100; ++cap) { MPMCQueue cq(cap); for (size_t i = 0; i < cap; ++i) { @@ -195,6 +212,9 @@ TEST(MPMCQueue, tryenq_capacity_test) { } TEST(MPMCQueue, enq_capacity_test) { + // Non-dynamic version only. + // False positive for dynamic version. Capacity can be temporarily + // higher than specified. for (auto cap : { 1, 100, 10000 }) { MPMCQueue cq(cap); for (int i = 0; i < cap; ++i) { @@ -215,11 +235,11 @@ TEST(MPMCQueue, enq_capacity_test) { } } -template class Atom> +template class Atom, bool Dynamic = false> void runTryEnqDeqThread( int numThreads, int n, /*numOps*/ - MPMCQueue& cq, + MPMCQueue& cq, std::atomic& sum, int t) { uint64_t threadSum = 0; @@ -242,18 +262,18 @@ void runTryEnqDeqThread( sum += threadSum; } -template class Atom> +template class Atom, bool Dynamic = false> void runTryEnqDeqTest(int numThreads, int numOps) { // write and read aren't linearizable, so we don't have // hard guarantees on their individual behavior. We can still test // correctness in aggregate - MPMCQueue cq(numThreads); + MPMCQueue cq(numThreads); uint64_t n = numOps; vector threads(numThreads); std::atomic sum(0); for (int t = 0; t < numThreads; ++t) { - threads[t] = DSched::thread(std::bind(runTryEnqDeqThread, + threads[t] = DSched::thread(std::bind(runTryEnqDeqThread, numThreads, n, std::ref(cq), std::ref(sum), t)); } for (auto& t : threads) { @@ -272,6 +292,15 @@ TEST(MPMCQueue, mt_try_enq_deq) { } } +TEST(MPMCQueue, mt_try_enq_deq_dynamic) { + int nts[] = { 1, 3, 100 }; + + int n = 100000; + for (int nt : nts) { + runTryEnqDeqTest(nt, n); + } +} + TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) { int nts[] = { 1, 3, 100 }; @@ -281,6 +310,15 @@ TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) { } } +TEST(MPMCQueue, mt_try_enq_deq_emulated_futex_dynamic) { + int nts[] = { 1, 3, 100 }; + + int n = 100000; + for (int nt : nts) { + runTryEnqDeqTest(nt, n); + } +} + TEST(MPMCQueue, mt_try_enq_deq_deterministic) { int nts[] = { 3, 10 }; @@ -297,6 +335,14 @@ TEST(MPMCQueue, mt_try_enq_deq_deterministic) { DSched sched(DSched::uniformSubset(seed, 2)); runTryEnqDeqTest(nt, n); } + { + DSched sched(DSched::uniform(seed)); + runTryEnqDeqTest(nt, n); + } + { + DSched sched(DSched::uniformSubset(seed, 2)); + runTryEnqDeqTest(nt, n); + } } } @@ -415,10 +461,11 @@ string producerConsumerBench(Q&& queue, long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw - (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw); uint64_t failures = failed; + size_t allocated = q.allocatedCapacity(); return folly::sformat( "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} " - "handoff, {} failures", + "handoff, {} failures, {} allocated", qName, numProducers, writer.methodName(), @@ -426,134 +473,254 @@ string producerConsumerBench(Q&& queue, nanosPer, csw, n, - failures); + failures, + allocated); } -TEST(MPMCQueue, mt_prod_cons_deterministic) { +template +void runMtProdConsDeterministic(long seed) { // we use the Bench method, but perf results are meaningless under DSched - DSched sched(DSched::uniform(0)); + DSched sched(DSched::uniform(seed)); + + vector>>> callers; + callers.emplace_back(make_unique>>()); + callers.emplace_back(make_unique>>()); + callers.emplace_back(make_unique>>()); + callers.emplace_back(make_unique>>(milliseconds(1))); + callers.emplace_back(make_unique>>(seconds(2))); + size_t cap; - vector>>> - callers; - callers.emplace_back( - make_unique>>()); - callers.emplace_back( - make_unique>>()); - callers.emplace_back( - make_unique>>()); - callers.emplace_back( - make_unique>>( - milliseconds(1))); - callers.emplace_back( - make_unique>>( - seconds(2))); + for (const auto& caller : callers) { + cap = 10; + LOG(INFO) << + producerConsumerBench( + MPMCQueue(cap), + "MPMCQueue(" + + folly::to(cap)+")", + 1, + 1, + 1000, + *caller); + cap = 100; + LOG(INFO) << + producerConsumerBench( + MPMCQueue(cap), + "MPMCQueue(" + + folly::to(cap)+")", + 10, + 10, + 1000, + *caller); + cap = 10; + LOG(INFO) << + producerConsumerBench( + MPMCQueue(cap), + "MPMCQueue(" + + folly::to(cap)+")", + 1, + 1, + 1000, + *caller); + cap = 100; + LOG(INFO) << + producerConsumerBench( + MPMCQueue(cap), + "MPMCQueue(" + + folly::to(cap)+")", + 10, + 10, + 1000, + *caller); + cap = 1; + LOG(INFO) << + producerConsumerBench( + MPMCQueue(cap), + "MPMCQueue(" + + folly::to(cap)+")", + 10, + 10, + 1000, + *caller); + } +} + +void runMtProdConsDeterministicDynamic( + long seed, + uint32_t prods, + uint32_t cons, + uint32_t numOps, + size_t cap, + size_t minCap, + size_t mult +) { + // we use the Bench method, but perf results are meaningless under DSched + DSched sched(DSched::uniform(seed)); + + vector>>> callers; + callers.emplace_back(make_unique>>()); + callers.emplace_back(make_unique>>()); + callers.emplace_back(make_unique>>()); + callers.emplace_back(make_unique>>(milliseconds(1))); + callers.emplace_back(make_unique>>(seconds(2))); for (const auto& caller : callers) { - LOG(INFO) - << producerConsumerBench(MPMCQueue(10), - "MPMCQueue(10)", - 1, - 1, - 1000, - *caller); - LOG(INFO) - << producerConsumerBench(MPMCQueue(100), - "MPMCQueue(100)", - 10, - 10, - 1000, - *caller); - LOG(INFO) - << producerConsumerBench(MPMCQueue(10), - "MPMCQueue(10)", - 1, - 1, - 1000, - *caller); - LOG(INFO) - << producerConsumerBench(MPMCQueue(100), - "MPMCQueue(100)", - 10, - 10, - 1000, - *caller); - LOG(INFO) << producerConsumerBench(MPMCQueue(1), - "MPMCQueue(1)", - 10, - 10, - 1000, - *caller); + LOG(INFO) << + producerConsumerBench( + MPMCQueue(cap, minCap, mult), + "MPMCQueue(" + + folly::to(cap) + ", " + + folly::to(minCap) + ", " + + folly::to(mult)+")", + prods, + cons, + numOps, + *caller); } } +TEST(MPMCQueue, mt_prod_cons_deterministic) { + runMtProdConsDeterministic(0); +} + +TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic) { + runMtProdConsDeterministic(0); +} + +template +void setFromEnv(T& var, const char* envvar) { + char* str = std::getenv(envvar); + if (str) { var = atoi(str); } +} + +TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic_with_arguments) { + long seed = 0; + uint32_t prods = 10; + uint32_t cons = 10; + uint32_t numOps = 1000; + size_t cap = 10000; + size_t minCap = 9; + size_t mult = 3; + setFromEnv(seed, "SEED"); + setFromEnv(prods, "PRODS"); + setFromEnv(cons, "CONS"); + setFromEnv(numOps, "NUM_OPS"); + setFromEnv(cap, "CAP"); + setFromEnv(minCap, "MIN_CAP"); + setFromEnv(mult, "MULT"); + runMtProdConsDeterministicDynamic( + seed, prods, cons, numOps, cap, minCap, mult); +} + #define PC_BENCH(q, np, nc, ...) \ producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__) -TEST(MPMCQueue, mt_prod_cons) { +template +void runMtProdCons() { int n = 100000; - vector>>> callers; - callers.emplace_back(make_unique>>()); - callers.emplace_back(make_unique>>()); - callers.emplace_back(make_unique>>()); - callers.emplace_back( - make_unique>>(milliseconds(1))); - callers.emplace_back( - make_unique>>(seconds(2))); + setFromEnv(n, "NUM_OPS"); + vector>>> + callers; + callers.emplace_back(make_unique>>()); + callers.emplace_back(make_unique>>()); + callers.emplace_back(make_unique>>()); + callers.emplace_back(make_unique>>(milliseconds(1))); + callers.emplace_back(make_unique>>(seconds(2))); for (const auto& caller : callers) { - LOG(INFO) << PC_BENCH(MPMCQueue(10), 1, 1, n, *caller); - LOG(INFO) << PC_BENCH(MPMCQueue(10), 10, 1, n, *caller); - LOG(INFO) << PC_BENCH(MPMCQueue(10), 1, 10, n, *caller); - LOG(INFO) << PC_BENCH(MPMCQueue(10), 10, 10, n, *caller); - LOG(INFO) << PC_BENCH(MPMCQueue(10000), 1, 1, n, *caller); - LOG(INFO) << PC_BENCH(MPMCQueue(10000), 10, 1, n, *caller); - LOG(INFO) << PC_BENCH(MPMCQueue(10000), 1, 10, n, *caller); - LOG(INFO) << PC_BENCH(MPMCQueue(10000), 10, 10, n, *caller); - LOG(INFO) << PC_BENCH(MPMCQueue(100000), 32, 100, n, *caller); + LOG(INFO) << PC_BENCH((MPMCQueue(10)), + 1, 1, n, *caller); + LOG(INFO) << PC_BENCH((MPMCQueue(10)), + 10, 1, n, *caller); + LOG(INFO) << PC_BENCH((MPMCQueue(10)), + 1, 10, n, *caller); + LOG(INFO) << PC_BENCH((MPMCQueue(10)), + 10, 10, n, *caller); + LOG(INFO) << PC_BENCH((MPMCQueue(10000)), + 1, 1, n, *caller); + LOG(INFO) << PC_BENCH((MPMCQueue(10000)), + 10, 1, n, *caller); + LOG(INFO) << PC_BENCH((MPMCQueue(10000)), + 1, 10, n, *caller); + LOG(INFO) << PC_BENCH((MPMCQueue(10000)), + 10, 10, n, *caller); + LOG(INFO) << PC_BENCH((MPMCQueue(100000)), + 32, 100, n, *caller); } } -TEST(MPMCQueue, mt_prod_cons_emulated_futex) { +TEST(MPMCQueue, mt_prod_cons) { + runMtProdCons(); +} + +TEST(MPMCQueue, mt_prod_cons_dynamic) { + runMtProdCons(); +} + +template +void runMtProdConsEmulatedFutex() { int n = 100000; - vector>>> - callers; - callers.emplace_back( - make_unique>>()); - callers.emplace_back( - make_unique>>()); - callers.emplace_back( - make_unique>>()); - callers.emplace_back( - make_unique>>( - milliseconds(1))); - callers.emplace_back( - make_unique>>( - seconds(2))); + vector>>> callers; + callers.emplace_back(make_unique>>()); + callers.emplace_back(make_unique>>()); + callers.emplace_back(make_unique>>()); + callers.emplace_back(make_unique>>(milliseconds(1))); + callers.emplace_back(make_unique>>(seconds(2))); for (const auto& caller : callers) { LOG(INFO) << PC_BENCH( - (MPMCQueue(10)), 1, 1, n, *caller); - LOG(INFO) << PC_BENCH( - (MPMCQueue(10)), 10, 1, n, *caller); + (MPMCQueue(10)), 1, 1, n, *caller); LOG(INFO) << PC_BENCH( - (MPMCQueue(10)), 1, 10, n, *caller); + (MPMCQueue(10)), 10, 1, n, *caller); LOG(INFO) << PC_BENCH( - (MPMCQueue(10)), 10, 10, n, *caller); + (MPMCQueue(10)), 1, 10, n, *caller); LOG(INFO) << PC_BENCH( - (MPMCQueue(10000)), 1, 1, n, *caller); + (MPMCQueue(10)), 10, 10, n, *caller); LOG(INFO) << PC_BENCH( - (MPMCQueue(10000)), 10, 1, n, *caller); + (MPMCQueue(10000)), 1, 1, n, *caller); LOG(INFO) << PC_BENCH( - (MPMCQueue(10000)), 1, 10, n, *caller); + (MPMCQueue(10000)), 10, 1, n, *caller); LOG(INFO) << PC_BENCH( - (MPMCQueue(10000)), 10, 10, n, *caller); - LOG(INFO) << PC_BENCH( - (MPMCQueue(100000)), 32, 100, n, *caller); + (MPMCQueue(10000)), 1, 10, n, *caller); + LOG(INFO) << PC_BENCH((MPMCQueue + (10000)), 10, 10, n, *caller); + LOG(INFO) << PC_BENCH((MPMCQueue + (100000)), 32, 100, n, *caller); } } -template