X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2Ftest%2FMPMCQueueTest.cpp;h=f2467eda65b0205bc98e7d7df8e26e2a8628c606;hb=5bd3bab012ee5b5771f95a9ecfea47b6e75e0802;hp=81dce2633d8d8915e2ae5170cabc4688dfff0cd8;hpb=ec06f66c17f2469fc62221259c66d6bb417f7692;p=folly.git

diff --git a/folly/test/MPMCQueueTest.cpp b/folly/test/MPMCQueueTest.cpp
index 81dce263..f2467eda 100644
--- a/folly/test/MPMCQueueTest.cpp
+++ b/folly/test/MPMCQueueTest.cpp
@@ -1,5 +1,5 @@
 /*
- * Copyright 2014 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -14,48 +14,66 @@
  * limitations under the License.
  */
 
-#include
 #include
+#include
 #include
+#include
+#include
+#include
+#include
+#include
 #include
 #include
-#include
 #include
+#include
 #include
 #include
-#include
-#include
-#include
-
-#include
-#include
 
 FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
 
 using namespace folly;
 using namespace detail;
 using namespace test;
+using std::chrono::time_point;
+using std::chrono::steady_clock;
+using std::chrono::seconds;
+using std::chrono::milliseconds;
+using std::string;
+using std::unique_ptr;
+using std::vector;
 
 typedef DeterministicSchedule DSched;
 
+template <template <typename> class Atom>
+void run_mt_sequencer_thread(
+    int numThreads,
+    int numOps,
+    uint32_t init,
+    TurnSequencer<Atom>& seq,
+    Atom<uint32_t>& spinThreshold,
+    int& prev,
+    int i) {
+  for (int op = i; op < numOps; op += numThreads) {
+    seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
+    EXPECT_EQ(prev, op - 1);
+    prev = op;
+    seq.completeTurn(init + op);
+  }
+}
 
 template <template <typename> class Atom>
 void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
   TurnSequencer<Atom> seq(init);
-  Atom spinThreshold(0);
+  Atom<uint32_t> spinThreshold(0);
   int prev = -1;
-  std::vector<std::thread> threads(numThreads);
+  vector<std::thread> threads(numThreads);
   for (int i = 0; i < numThreads; ++i) {
-    threads[i] = DSched::thread([&, i]{
-      for (int op = i; op < numOps; op += numThreads) {
-        seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
-        EXPECT_EQ(prev, op - 1);
-        prev = op;
-        seq.completeTurn(init + op);
-      }
-    });
+    threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
+        numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold),
+        std::ref(prev), i));
   }
 
   for (auto& thr : threads) {
@@ -71,6 +89,12 @@ TEST(MPMCQueue, sequencer) {
   run_mt_sequencer_test<std::atomic>(100, 10000, -100);
 }
 
+TEST(MPMCQueue, sequencer_emulated_futex) {
+  run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
+  run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
+  run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
+}
+
 TEST(MPMCQueue, sequencer_deterministic) {
   DSched sched(DSched::uniform(0));
   run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
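For readers coming to this change cold, here is a minimal standalone sketch (not part of the diff) of the turn-taking protocol that run_mt_sequencer_thread exercises above. The folly/detail/TurnSequencer.h header path is an assumption; the waitForTurn/completeTurn calls mirror the test body.

// Sketch: two threads take strictly ordered turns through a TurnSequencer,
// the same pattern as run_mt_sequencer_thread in the diff above.
#include <folly/detail/TurnSequencer.h> // assumed header path
#include <atomic>
#include <cstdint>
#include <thread>
#include <vector>

int main() {
  folly::detail::TurnSequencer<std::atomic> seq(0);
  std::atomic<uint32_t> spinCutoff(0);
  int shared = 0;

  auto worker = [&](int id, int numThreads, int numOps) {
    for (int op = id; op < numOps; op += numThreads) {
      // Block until it is this op's turn, occasionally updating the
      // adaptive spin threshold, exactly as the test does.
      seq.waitForTurn(op, spinCutoff, (op % 32) == 0);
      shared = op;           // exclusive access while we hold the turn
      seq.completeTurn(op);  // hand the turn to op + 1
    }
  };

  std::vector<std::thread> threads;
  for (int t = 0; t < 2; ++t) {
    threads.emplace_back(worker, t, 2, 100);
  }
  for (auto& t : threads) {
    t.join();
  }
  return shared == 99 ? 0 : 1;
}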
@@ -78,43 +102,73 @@ TEST(MPMCQueue, sequencer_deterministic) {
   run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
 }
 
-template <typename T>
+template <bool Dynamic = false, typename T>
 void runElementTypeTest(T&& src) {
-  MPMCQueue<T> cq(10);
-  cq.blockingWrite(std::move(src));
+  MPMCQueue<T, std::atomic, Dynamic> cq(10);
+  cq.blockingWrite(std::forward<T>(src));
   T dest;
   cq.blockingRead(dest);
   EXPECT_TRUE(cq.write(std::move(dest)));
   EXPECT_TRUE(cq.read(dest));
+  auto soon1 = std::chrono::system_clock::now() + std::chrono::seconds(1);
+  EXPECT_TRUE(cq.tryWriteUntil(soon1, std::move(dest)));
+  EXPECT_TRUE(cq.read(dest));
+  auto soon2 = std::chrono::steady_clock::now() + std::chrono::seconds(1);
+  EXPECT_TRUE(cq.tryWriteUntil(soon2, std::move(dest)));
+  EXPECT_TRUE(cq.read(dest));
 }
 
 struct RefCounted {
+  static FOLLY_TLS int active_instances;
+
   mutable std::atomic<int> rc;
 
-  RefCounted() : rc(0) {}
+  RefCounted() : rc(0) {
+    ++active_instances;
+  }
+
+  ~RefCounted() {
+    --active_instances;
+  }
 };
+FOLLY_TLS int RefCounted::active_instances;
 
 void intrusive_ptr_add_ref(RefCounted const* p) {
   p->rc++;
 }
 
 void intrusive_ptr_release(RefCounted const* p) {
-  if (--(p->rc)) {
+  if (--(p->rc) == 0) {
     delete p;
   }
 }
 
 TEST(MPMCQueue, lots_of_element_types) {
   runElementTypeTest(10);
-  runElementTypeTest(std::string("abc"));
-  runElementTypeTest(std::make_pair(10, std::string("def")));
-  runElementTypeTest(std::vector<std::string>{ { "abc" } });
+  runElementTypeTest(string("abc"));
+  runElementTypeTest(std::make_pair(10, string("def")));
+  runElementTypeTest(vector<string>{{"abc"}});
   runElementTypeTest(std::make_shared<char>('a'));
-  runElementTypeTest(folly::make_unique<char>('a'));
+  runElementTypeTest(std::make_unique<char>('a'));
   runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
+  EXPECT_EQ(RefCounted::active_instances, 0);
+}
+
+TEST(MPMCQueue, lots_of_element_types_dynamic) {
+  runElementTypeTest<true>(10);
+  runElementTypeTest<true>(string("abc"));
+  runElementTypeTest<true>(std::make_pair(10, string("def")));
+  runElementTypeTest<true>(vector<string>{{"abc"}});
+  runElementTypeTest<true>(std::make_shared<char>('a'));
+  runElementTypeTest<true>(std::make_unique<char>('a'));
+  runElementTypeTest<true>(boost::intrusive_ptr<RefCounted>(new RefCounted));
+  EXPECT_EQ(RefCounted::active_instances, 0);
 }
 
 TEST(MPMCQueue, single_thread_enqdeq) {
+  // Non-dynamic version only.
+  // False positive for dynamic version. Capacity can be temporarily
+  // higher than specified.
   MPMCQueue<int> cq(10);
 
   for (int pass = 0; pass < 10; ++pass) {
@@ -145,9 +199,12 @@ TEST(MPMCQueue, single_thread_enqdeq) {
 }
 
 TEST(MPMCQueue, tryenq_capacity_test) {
+  // Non-dynamic version only.
+  // False positive for dynamic version. Capacity can be temporarily
+  // higher than specified.
   for (size_t cap = 1; cap < 100; ++cap) {
     MPMCQueue<size_t> cq(cap);
-    for (int i = 0; i < cap; ++i) {
+    for (size_t i = 0; i < cap; ++i) {
       EXPECT_TRUE(cq.write(i));
     }
     EXPECT_FALSE(cq.write(100));
@@ -155,6 +212,9 @@
 TEST(MPMCQueue, enq_capacity_test) {
+  // Non-dynamic version only.
+  // False positive for dynamic version. Capacity can be temporarily
+  // higher than specified.
   for (auto cap : { 1, 100, 10000 }) {
     MPMCQueue<int> cq(cap);
     for (int i = 0; i < cap; ++i) {
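A minimal standalone sketch (not part of the diff) of the fixed-capacity behaviour the capacity tests above rely on, and why the dynamic variant is excluded from them; it only uses MPMCQueue calls already exercised in this file.

// Sketch: a non-dynamic MPMCQueue never accepts more than its capacity,
// whereas the dynamic variant may briefly exceed it while expanding.
#include <folly/MPMCQueue.h>
#include <cassert>
#include <chrono>

int main() {
  folly::MPMCQueue<int> cq(3); // non-dynamic: capacity is exactly 3

  bool filled = cq.write(1) && cq.write(2) && cq.write(3);
  assert(filled);
  bool overflow = cq.write(4); // full, so the non-blocking write must fail
  assert(!overflow);

  int v = 0;
  bool got = cq.read(v);
  assert(got && v == 1);       // FIFO order
  bool refilled = cq.write(4); // one slot was freed, so this succeeds again
  assert(refilled);

  // Deadline-based write, the same call runElementTypeTest exercises above.
  auto soon = std::chrono::steady_clock::now() + std::chrono::milliseconds(5);
  bool wrote = cq.tryWriteUntil(soon, 5);
  assert(!wrote);              // still full, so it times out instead
  return 0;
}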
@@ -175,37 +235,46 @@ TEST(MPMCQueue, enq_capacity_test) {
   }
 }
 
-template <template <typename> class Atom>
+template <template <typename> class Atom, bool Dynamic = false>
+void runTryEnqDeqThread(
+    int numThreads,
+    int n, /*numOps*/
+    MPMCQueue<int, Atom, Dynamic>& cq,
+    std::atomic<uint64_t>& sum,
+    int t) {
+  uint64_t threadSum = 0;
+  int src = t;
+  // received doesn't reflect any actual values, we just start with
+  // t and increment by numThreads to get the rounding of termination
+  // correct if numThreads doesn't evenly divide numOps
+  int received = t;
+  while (src < n || received < n) {
+    if (src < n && cq.write(src)) {
+      src += numThreads;
+    }
+
+    int dst;
+    if (received < n && cq.read(dst)) {
+      received += numThreads;
+      threadSum += dst;
+    }
+  }
+  sum += threadSum;
+}
+
+template <template <typename> class Atom, bool Dynamic = false>
 void runTryEnqDeqTest(int numThreads, int numOps) {
   // write and read aren't linearizable, so we don't have
   // hard guarantees on their individual behavior.  We can still test
   // correctness in aggregate
-  MPMCQueue<int, Atom> cq(numThreads);
+  MPMCQueue<int, Atom, Dynamic> cq(numThreads);
 
   uint64_t n = numOps;
-  std::vector<std::thread> threads(numThreads);
+  vector<std::thread> threads(numThreads);
   std::atomic<uint64_t> sum(0);
   for (int t = 0; t < numThreads; ++t) {
-    threads[t] = DSched::thread([&,t]{
-      uint64_t threadSum = 0;
-      int src = t;
-      // received doesn't reflect any actual values, we just start with
-      // t and increment by numThreads to get the rounding of termination
-      // correct if numThreads doesn't evenly divide numOps
-      int received = t;
-      while (src < n || received < n) {
-        if (src < n && cq.write(src)) {
-          src += numThreads;
-        }
-
-        int dst;
-        if (received < n && cq.read(dst)) {
-          received += numThreads;
-          threadSum += dst;
-        }
-      }
-      sum += threadSum;
-    });
+    threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom, Dynamic>,
+        numThreads, n, std::ref(cq), std::ref(sum), t));
   }
   for (auto& t : threads) {
     DSched::join(t);
@@ -223,6 +292,33 @@ TEST(MPMCQueue, mt_try_enq_deq) {
   }
 }
 
+TEST(MPMCQueue, mt_try_enq_deq_dynamic) {
+  int nts[] = { 1, 3, 100 };
+
+  int n = 100000;
+  for (int nt : nts) {
+    runTryEnqDeqTest<std::atomic, true>(nt, n);
+  }
+}
+
+TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
+  int nts[] = { 1, 3, 100 };
+
+  int n = 100000;
+  for (int nt : nts) {
+    runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
+  }
+}
+
+TEST(MPMCQueue, mt_try_enq_deq_emulated_futex_dynamic) {
+  int nts[] = { 1, 3, 100 };
+
+  int n = 100000;
+  for (int nt : nts) {
+    runTryEnqDeqTest<EmulatedFutexAtomic, true>(nt, n);
+  }
+}
+
 TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
   int nts[] = { 3, 10 };
 
@@ -239,6 +335,14 @@ TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
       DSched sched(DSched::uniformSubset(seed, 2));
       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
     }
+    {
+      DSched sched(DSched::uniform(seed));
+      runTryEnqDeqTest<DeterministicAtomic, true>(nt, n);
+    }
+    {
+      DSched sched(DSched::uniformSubset(seed, 2));
+      runTryEnqDeqTest<DeterministicAtomic, true>(nt, n);
+    }
   }
 }
 
@@ -249,9 +353,59 @@ uint64_t nowMicro() {
 }
 
 template <typename Q>
-std::string producerConsumerBench(Q&& queue, std::string qName,
-                                  int numProducers, int numConsumers,
-                                  int numOps, bool ignoreContents = false) {
+struct WriteMethodCaller {
+  WriteMethodCaller() {}
+  virtual ~WriteMethodCaller() = default;
+  virtual bool callWrite(Q& q, int i) = 0;
+  virtual string methodName() = 0;
+};
+
+template <typename Q>
+struct BlockingWriteCaller : public WriteMethodCaller<Q> {
+  bool callWrite(Q& q, int i) override {
+    q.blockingWrite(i);
+    return true;
+  }
+  string methodName() override { return "blockingWrite"; }
+};
+
+template <typename Q>
+struct WriteIfNotFullCaller : public WriteMethodCaller<Q> {
+  bool callWrite(Q& q, int i) override { return q.writeIfNotFull(i); }
+  string methodName() override { return "writeIfNotFull"; }
+};
+
+template <typename Q>
+struct WriteCaller : public WriteMethodCaller<Q> {
+  bool callWrite(Q& q, int i) override { return q.write(i); }
+  string methodName() override { return "write"; }
+};
+
+template <typename Q, class Clock, class Duration>
+struct TryWriteUntilCaller : public WriteMethodCaller<Q> {
+  const Duration duration_;
+  explicit TryWriteUntilCaller(Duration&& duration) : duration_(duration) {}
+  bool callWrite(Q& q, int i) override {
+    auto then = Clock::now() + duration_;
+    return q.tryWriteUntil(then, i);
+  }
+  string methodName() override {
+    return folly::sformat(
+        "tryWriteUntil({}ms)",
+        std::chrono::duration_cast<milliseconds>(duration_).count());
+  }
+};
+
+template <typename Q>
+string producerConsumerBench(Q&& queue,
+                             string qName,
+                             int numProducers,
+                             int numConsumers,
+                             int numOps,
+                             WriteMethodCaller<Q>& writer,
+                             bool ignoreContents = false) {
   Q& q = queue;
 
   struct rusage beginUsage;
@@ -261,17 +415,20 @@
   uint64_t n = numOps;
   std::atomic<uint64_t> sum(0);
+  std::atomic<uint64_t> failed(0);
 
-  std::vector<std::thread> producers(numProducers);
+  vector<std::thread> producers(numProducers);
   for (int t = 0; t < numProducers; ++t) {
     producers[t] = DSched::thread([&,t]{
       for (int i = t; i < numOps; i += numProducers) {
-        q.blockingWrite(i);
+        while (!writer.callWrite(q, i)) {
+          ++failed;
+        }
       }
     });
   }
 
-  std::vector<std::thread> consumers(numConsumers);
+  vector<std::thread> consumers(numConsumers);
   for (int t = 0; t < numConsumers; ++t) {
     consumers[t] = DSched::thread([&,t]{
       uint64_t localSum = 0;
@@ -303,69 +460,274 @@
   uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
   long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
       (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
+  uint64_t failures = failed;
+  size_t allocated = q.allocatedCapacity();
+
+  return folly::sformat(
+      "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
+      "handoff, {} failures, {} allocated",
+      qName,
+      numProducers,
+      writer.methodName(),
+      numConsumers,
+      nanosPer,
+      csw,
+      n,
+      failures,
+      allocated);
+}
-  return folly::format(
-      "{}, {} producers, {} consumers => {} nanos/handoff, {} csw / {} handoff",
-      qName, numProducers, numConsumers, nanosPer, csw, n).str();
+template <bool Dynamic = false>
+void runMtProdConsDeterministic(long seed) {
+  // we use the Bench method, but perf results are meaningless under DSched
+  DSched sched(DSched::uniform(seed));
+
+  using QueueType = MPMCQueue<int, DeterministicAtomic, Dynamic>;
+
+  vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
+  callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
+  callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
+  callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
+  callers.emplace_back(
+      std::make_unique<TryWriteUntilCaller<QueueType, steady_clock, milliseconds>>(milliseconds(1)));
+  callers.emplace_back(
+      std::make_unique<TryWriteUntilCaller<QueueType, steady_clock, seconds>>(seconds(2)));
+  size_t cap;
+
+  for (const auto& caller : callers) {
+    cap = 10;
+    LOG(INFO) <<
+      producerConsumerBench(
+        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
+        "MPMCQueue(" +
+          folly::to<std::string>(cap)+")",
+        1,
+        1,
+        1000,
+        *caller);
+    cap = 100;
+    LOG(INFO) <<
+      producerConsumerBench(
+        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
+        "MPMCQueue(" +
+          folly::to<std::string>(cap)+")",
+        10,
+        10,
+        1000,
+        *caller);
+    cap = 10;
+    LOG(INFO) <<
+      producerConsumerBench(
+        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
+        "MPMCQueue(" +
+          folly::to<std::string>(cap)+")",
+        1,
+        1,
+        1000,
+        *caller);
+    cap = 100;
+    LOG(INFO) <<
+      producerConsumerBench(
+        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
+        "MPMCQueue(" +
+          folly::to<std::string>(cap)+")",
+        10,
+        10,
+        1000,
+        *caller);
+    cap = 1;
+    LOG(INFO) <<
+      producerConsumerBench(
+        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
+        "MPMCQueue(" +
+          folly::to<std::string>(cap)+")",
+        10,
+        10,
+        1000,
+        *caller);
+  }
 }
 
+void runMtProdConsDeterministicDynamic(
+  long seed,
+  uint32_t prods,
+  uint32_t cons,
+  uint32_t numOps,
+  size_t cap,
+  size_t minCap,
+  size_t mult
+) {
+  // we use the Bench method, but perf results are meaningless under DSched
+  DSched sched(DSched::uniform(seed));
+
+  using QueueType = MPMCQueue<int, DeterministicAtomic, true>;
+
+  vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
+  callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
+  callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
+  callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
+  callers.emplace_back(
+      std::make_unique<TryWriteUntilCaller<QueueType, steady_clock, milliseconds>>(milliseconds(1)));
+  callers.emplace_back(
+      std::make_unique<TryWriteUntilCaller<QueueType, steady_clock, seconds>>(seconds(2)));
+
+  for (const auto& caller : callers) {
+    LOG(INFO) <<
+      producerConsumerBench(
+        MPMCQueue<int, DeterministicAtomic, true>(cap, minCap, mult),
+        "MPMCQueue(" +
+          folly::to<std::string>(cap) + ", " +
+          folly::to<std::string>(minCap) + ", " +
+          folly::to<std::string>(mult)+")",
+        prods,
+        cons,
+        numOps,
+        *caller);
+  }
 }
 
 TEST(MPMCQueue, mt_prod_cons_deterministic) {
-  // we use the Bench method, but perf results are meaningless under DSched
-  DSched sched(DSched::uniform(0));
+  runMtProdConsDeterministic(0);
+}
+
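A minimal standalone sketch (not part of the diff) of the strategy-object idea behind the WriteMethodCaller hierarchy introduced above: one driver loop that can be pointed at any of MPMCQueue's write flavours. The Caller and produce names here are illustrative, not folly APIs.

// Sketch: swap a small virtual "caller" to benchmark different write methods
// with the same loop, counting failures like producerConsumerBench does.
#include <folly/MPMCQueue.h>
#include <chrono>
#include <memory>
#include <vector>

using Queue = folly::MPMCQueue<int>;

struct Caller {
  virtual ~Caller() = default;
  virtual bool callWrite(Queue& q, int i) = 0;
};

struct PlainWriteCaller : Caller {
  bool callWrite(Queue& q, int i) override { return q.write(i); }
};

struct DeadlineWriteCaller : Caller {
  bool callWrite(Queue& q, int i) override {
    auto deadline =
        std::chrono::steady_clock::now() + std::chrono::milliseconds(1);
    return q.tryWriteUntil(deadline, i);
  }
};

// One loop, many write methods: retry until the chosen method succeeds and
// report how often it failed first.
int produce(Queue& q, Caller& caller, int numOps) {
  int failures = 0;
  for (int i = 0; i < numOps; ++i) {
    while (!caller.callWrite(q, i)) {
      ++failures;
    }
  }
  return failures;
}

int main() {
  Queue q(100);
  std::vector<std::unique_ptr<Caller>> callers;
  callers.emplace_back(std::make_unique<PlainWriteCaller>());
  callers.emplace_back(std::make_unique<DeadlineWriteCaller>());
  for (auto& caller : callers) {
    produce(q, *caller, 50); // 50 < capacity, so no consumer is needed here
    int v;
    while (q.read(v)) {
    } // drain before the next round
  }
  return 0;
}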
+TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic) {
+  runMtProdConsDeterministic<true>(0);
+}
 
-  producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(10),
-                        "", 1, 1, 1000);
-  producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(100),
-                        "", 10, 10, 1000);
-  producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(10),
-                        "", 1, 1, 1000);
-  producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(100),
-                        "", 10, 10, 1000);
-  producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(1),
-                        "", 10, 10, 1000);
+
+template <typename T>
+void setFromEnv(T& var, const char* envvar) {
+  char* str = std::getenv(envvar);
+  if (str) { var = atoi(str); }
+}
+
+TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic_with_arguments) {
+  long seed = 0;
+  uint32_t prods = 10;
+  uint32_t cons = 10;
+  uint32_t numOps = 1000;
+  size_t cap = 10000;
+  size_t minCap = 9;
+  size_t mult = 3;
+  setFromEnv(seed, "SEED");
+  setFromEnv(prods, "PRODS");
+  setFromEnv(cons, "CONS");
+  setFromEnv(numOps, "NUM_OPS");
+  setFromEnv(cap, "CAP");
+  setFromEnv(minCap, "MIN_CAP");
+  setFromEnv(mult, "MULT");
+  runMtProdConsDeterministicDynamic(
+      seed, prods, cons, numOps, cap, minCap, mult);
 }
 
 #define PC_BENCH(q, np, nc, ...) \
   producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
 
+template <bool Dynamic = false>
+void runMtProdCons() {
+  using QueueType = MPMCQueue<int, std::atomic, Dynamic>;
+
+  int n = 100000;
+  setFromEnv(n, "NUM_OPS");
+  vector<unique_ptr<WriteMethodCaller<QueueType>>>
+      callers;
+  callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
+  callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
+  callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
+  callers.emplace_back(
+      std::make_unique<TryWriteUntilCaller<QueueType, steady_clock, milliseconds>>(milliseconds(1)));
+  callers.emplace_back(
+      std::make_unique<TryWriteUntilCaller<QueueType, steady_clock, seconds>>(seconds(2)));
+  for (const auto& caller : callers) {
+    LOG(INFO) << PC_BENCH((QueueType(10)), 1, 1, n, *caller);
+    LOG(INFO) << PC_BENCH((QueueType(10)), 10, 1, n, *caller);
+    LOG(INFO) << PC_BENCH((QueueType(10)), 1, 10, n, *caller);
+    LOG(INFO) << PC_BENCH((QueueType(10)), 10, 10, n, *caller);
+    LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 1, n, *caller);
+    LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 1, n, *caller);
+    LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 10, n, *caller);
+    LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 10, n, *caller);
+    LOG(INFO) << PC_BENCH((QueueType(100000)), 32, 100, n, *caller);
+  }
+}
+
 TEST(MPMCQueue, mt_prod_cons) {
+  runMtProdCons();
+}
+
+TEST(MPMCQueue, mt_prod_cons_dynamic) {
+  runMtProdCons<true>();
+}
+
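A minimal standalone sketch (not part of the diff) of how the test variants above select their atomics implementation and the dynamic flag purely through MPMCQueue's template parameters. The header paths for EmulatedFutexAtomic and DeterministicAtomic are assumptions.

// Sketch: the same queue algorithm, parameterized three different ways.
#include <folly/MPMCQueue.h>
#include <folly/detail/Futex.h>               // assumed: EmulatedFutexAtomic
#include <folly/test/DeterministicSchedule.h> // assumed: DeterministicAtomic

// Plain queue: std::atomic, fixed capacity.
using StdQueue = folly::MPMCQueue<int>;

// Futex waits emulated in user space, as in the *_emulated_futex tests.
using EmulatedQueue =
    folly::MPMCQueue<int, folly::detail::EmulatedFutexAtomic>;

// Deterministic atomics let DeterministicSchedule drive every interleaving,
// as in the *_deterministic tests.
using DeterministicQueue =
    folly::MPMCQueue<int, folly::test::DeterministicAtomic>;

// Third parameter opts into the dynamically sized variant used by the
// *_dynamic tests.
using DynamicQueue = folly::MPMCQueue<int, std::atomic, true>;

int main() {
  StdQueue q(16);
  q.blockingWrite(1);
  int v = 0;
  q.blockingRead(v);
  return v == 1 ? 0 : 1;
}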
+template <bool Dynamic = false>
+void runMtProdConsEmulatedFutex() {
+  using QueueType = MPMCQueue<int, EmulatedFutexAtomic, Dynamic>;
+
   int n = 100000;
-  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 1, n);
-  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 1, n);
-  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 10, n);
-  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 10, n);
-  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 1, n);
-  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 1, n);
-  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 10, n);
-  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 10, n);
-  LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n);
+  vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
+  callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
+  callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
+  callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
+  callers.emplace_back(
+      std::make_unique<TryWriteUntilCaller<QueueType, steady_clock, milliseconds>>(milliseconds(1)));
+  callers.emplace_back(
+      std::make_unique<TryWriteUntilCaller<QueueType, steady_clock, seconds>>(seconds(2)));
+  for (const auto& caller : callers) {
+    LOG(INFO) << PC_BENCH((QueueType(10)), 1, 1, n, *caller);
+    LOG(INFO) << PC_BENCH((QueueType(10)), 10, 1, n, *caller);
+    LOG(INFO) << PC_BENCH((QueueType(10)), 1, 10, n, *caller);
+    LOG(INFO) << PC_BENCH((QueueType(10)), 10, 10, n, *caller);
+    LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 1, n, *caller);
+    LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 1, n, *caller);
+    LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 10, n, *caller);
+    LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 10, n, *caller);
+    LOG(INFO) << PC_BENCH((QueueType(100000)), 32, 100, n, *caller);
+  }
 }
 
-template <template <typename> class Atom>
+TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
+  runMtProdConsEmulatedFutex();
+}
+
+TEST(MPMCQueue, mt_prod_cons_emulated_futex_dynamic) {
+  runMtProdConsEmulatedFutex<true>();
+}
+
+template