/*
- * Copyright 2016 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* limitations under the License.
*/
-#include <folly/MPMCQueue.h>
#include <folly/Format.h>
+#include <folly/MPMCQueue.h>
#include <folly/Memory.h>
+#include <folly/portability/GTest.h>
#include <folly/portability/SysResource.h>
#include <folly/portability/SysTime.h>
#include <folly/portability/Unistd.h>
+#include <folly/stop_watch.h>
#include <folly/test/DeterministicSchedule.h>
#include <boost/intrusive_ptr.hpp>
-#include <memory>
+#include <boost/thread/barrier.hpp>
#include <functional>
+#include <memory>
#include <thread>
#include <utility>
-#include <gtest/gtest.h>
-
FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
using namespace folly;
run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
}
-template <typename T>
+template <bool Dynamic = false, typename T>
void runElementTypeTest(T&& src) {
- MPMCQueue<T> cq(10);
+ MPMCQueue<T, std::atomic, Dynamic> cq(10);
cq.blockingWrite(std::forward<T>(src));
T dest;
cq.blockingRead(dest);
runElementTypeTest(std::make_pair(10, string("def")));
runElementTypeTest(vector<string>{{"abc"}});
runElementTypeTest(std::make_shared<char>('a'));
- runElementTypeTest(folly::make_unique<char>('a'));
+ runElementTypeTest(std::make_unique<char>('a'));
runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
EXPECT_EQ(RefCounted::active_instances, 0);
}
+// Dynamic-queue (Dynamic = true) counterpart of the element-type sweep:
+// round-trips one value of each element type through the dynamic queue.
+TEST(MPMCQueue, lots_of_element_types_dynamic) {
+  runElementTypeTest<true>(10);
+  runElementTypeTest<true>(string("abc"));
+  runElementTypeTest<true>(std::make_pair(10, string("def")));
+  runElementTypeTest<true>(vector<string>{{"abc"}});
+  runElementTypeTest<true>(std::make_shared<char>('a'));
+  runElementTypeTest<true>(std::make_unique<char>('a'));
+  runElementTypeTest<true>(boost::intrusive_ptr<RefCounted>(new RefCounted));
+  // Every RefCounted created above must have been destroyed by now.
+  EXPECT_EQ(RefCounted::active_instances, 0);
+}
+
TEST(MPMCQueue, single_thread_enqdeq) {
+ // Non-dynamic version only.
+ // False positive for dynamic version. Capacity can be temporarily
+ // higher than specified.
MPMCQueue<int> cq(10);
for (int pass = 0; pass < 10; ++pass) {
}
TEST(MPMCQueue, tryenq_capacity_test) {
+ // Non-dynamic version only.
+ // False positive for dynamic version. Capacity can be temporarily
+ // higher than specified.
for (size_t cap = 1; cap < 100; ++cap) {
MPMCQueue<int> cq(cap);
for (size_t i = 0; i < cap; ++i) {
}
TEST(MPMCQueue, enq_capacity_test) {
+ // Non-dynamic version only.
+ // False positive for dynamic version. Capacity can be temporarily
+ // higher than specified.
for (auto cap : { 1, 100, 10000 }) {
MPMCQueue<int> cq(cap);
for (int i = 0; i < cap; ++i) {
}
}
-template <template<typename> class Atom>
+template <template<typename> class Atom, bool Dynamic = false>
void runTryEnqDeqThread(
int numThreads,
int n, /*numOps*/
- MPMCQueue<int, Atom>& cq,
+ MPMCQueue<int, Atom, Dynamic>& cq,
std::atomic<uint64_t>& sum,
int t) {
uint64_t threadSum = 0;
sum += threadSum;
}
-template <template<typename> class Atom>
+template <template<typename> class Atom, bool Dynamic = false>
void runTryEnqDeqTest(int numThreads, int numOps) {
// write and read aren't linearizable, so we don't have
// hard guarantees on their individual behavior. We can still test
// correctness in aggregate
- MPMCQueue<int,Atom> cq(numThreads);
+ MPMCQueue<int,Atom, Dynamic> cq(numThreads);
uint64_t n = numOps;
vector<std::thread> threads(numThreads);
std::atomic<uint64_t> sum(0);
for (int t = 0; t < numThreads; ++t) {
- threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom>,
+ threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom, Dynamic>,
numThreads, n, std::ref(cq), std::ref(sum), t));
}
for (auto& t : threads) {
}
}
+// Dynamic-queue variant of mt_try_enq_deq, run at several thread counts.
+TEST(MPMCQueue, mt_try_enq_deq_dynamic) {
+  int nts[] = { 1, 3, 100 };
+
+  int n = 100000;
+  for (int nt : nts) {
+    runTryEnqDeqTest<std::atomic, /* Dynamic = */ true>(nt, n);
+  }
+}
+
TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
int nts[] = { 1, 3, 100 };
}
}
+// Dynamic-queue variant of mt_try_enq_deq_emulated_futex.
+TEST(MPMCQueue, mt_try_enq_deq_emulated_futex_dynamic) {
+  int nts[] = { 1, 3, 100 };
+
+  int n = 100000;
+  for (int nt : nts) {
+    runTryEnqDeqTest<EmulatedFutexAtomic, /* Dynamic = */ true>(nt, n);
+  }
+}
+
TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
int nts[] = { 3, 10 };
DSched sched(DSched::uniformSubset(seed, 2));
runTryEnqDeqTest<DeterministicAtomic>(nt, n);
}
+ {
+ DSched sched(DSched::uniform(seed));
+ runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
+ }
+ {
+ DSched sched(DSched::uniformSubset(seed, 2));
+ runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
+ }
}
}
long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
(beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
uint64_t failures = failed;
+ size_t allocated = q.allocatedCapacity();
return folly::sformat(
"{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
- "handoff, {} failures",
+ "handoff, {} failures, {} allocated",
qName,
numProducers,
writer.methodName(),
nanosPer,
csw,
n,
- failures);
+ failures,
+ allocated);
}
-TEST(MPMCQueue, mt_prod_cons_deterministic) {
+// Drives the producer/consumer benchmark harness over every write-method
+// caller against DeterministicAtomic queues of several capacities.
+// Perf numbers are meaningless under DSched (per the comment below); this
+// serves as a deterministic correctness soak for both queue variants.
+template <bool Dynamic = false>
+void runMtProdConsDeterministic(long seed) {
// we use the Bench method, but perf results are meaningless under DSched
+  DSched sched(DSched::uniform(seed));
+
+  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic,
+    Dynamic>>>> callers;
+  callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
+    DeterministicAtomic, Dynamic>>>());
+  callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
+    DeterministicAtomic, Dynamic>>>());
+  callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
+    DeterministicAtomic, Dynamic>>>());
+  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+    DeterministicAtomic, Dynamic>>>(milliseconds(1)));
+  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+    DeterministicAtomic, Dynamic>>>(seconds(2)));
+  size_t cap;
+
+  for (const auto& caller : callers) {
+    cap = 10;
+    LOG(INFO) <<
+      producerConsumerBench(
+        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
+        "MPMCQueue<int, DeterministicAtomic, Dynamic>("
+          + folly::to<std::string>(cap)+")",
+        1,
+        1,
+        1000,
+        *caller);
+    cap = 100;
+    LOG(INFO) <<
+      producerConsumerBench(
+        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
+        "MPMCQueue<int, DeterministicAtomic, Dynamic>("
+          + folly::to<std::string>(cap)+")",
+        10,
+        10,
+        1000,
+        *caller);
+    // NOTE(review): the cap=10 (1x1) and cap=100 (10x10) configurations are
+    // intentionally run a second time, mirroring the pre-existing test.
+    cap = 10;
+    LOG(INFO) <<
+      producerConsumerBench(
+        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
+        "MPMCQueue<int, DeterministicAtomic, Dynamic>("
+          + folly::to<std::string>(cap)+")",
+        1,
+        1,
+        1000,
+        *caller);
+    cap = 100;
+    LOG(INFO) <<
+      producerConsumerBench(
+        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
+        "MPMCQueue<int, DeterministicAtomic, Dynamic>("
+          + folly::to<std::string>(cap)+")",
+        10,
+        10,
+        1000,
+        *caller);
+    cap = 1;
+    LOG(INFO) <<
+      producerConsumerBench(
+        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
+        "MPMCQueue<int, DeterministicAtomic, Dynamic>("
+          + folly::to<std::string>(cap)+")",
+        10,
+        10,
+        1000,
+        *caller);
+  }
+}
- vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic>>>>
- callers;
- callers.emplace_back(
- make_unique<BlockingWriteCaller<MPMCQueue<int, DeterministicAtomic>>>());
- callers.emplace_back(
- make_unique<WriteIfNotFullCaller<MPMCQueue<int, DeterministicAtomic>>>());
- callers.emplace_back(
- make_unique<WriteCaller<MPMCQueue<int, DeterministicAtomic>>>());
- callers.emplace_back(
- make_unique<TryWriteUntilCaller<MPMCQueue<int, DeterministicAtomic>>>(
- milliseconds(1)));
- callers.emplace_back(
- make_unique<TryWriteUntilCaller<MPMCQueue<int, DeterministicAtomic>>>(
- seconds(2)));
+// Fully parameterized deterministic prod/cons run for the dynamic queue:
+// seed, producer/consumer counts, op count, and the dynamic queue's
+// constructor arguments (cap, minCap, mult — presumably max capacity,
+// initial/minimum capacity, and expansion multiplier; confirm against the
+// dynamic MPMCQueue constructor) all come from the caller.
+void runMtProdConsDeterministicDynamic(
+  long seed,
+  uint32_t prods,
+  uint32_t cons,
+  uint32_t numOps,
+  size_t cap,
+  size_t minCap,
+  size_t mult
+) {
+  // we use the Bench method, but perf results are meaningless under DSched
+  DSched sched(DSched::uniform(seed));
+
+  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic,
+    true>>>> callers;
+  callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
+    DeterministicAtomic, true>>>());
+  callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
+    DeterministicAtomic, true>>>());
+  callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
+    DeterministicAtomic, true>>>());
+  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+    DeterministicAtomic, true>>>(milliseconds(1)));
+  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+    DeterministicAtomic, true>>>(seconds(2)));
for (const auto& caller : callers) {
-    LOG(INFO)
-        << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(10),
-                                 "MPMCQueue<int, DeterministicAtomic>(10)",
-                                 1,
-                                 1,
-                                 1000,
-                                 *caller);
-    LOG(INFO)
-        << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(100),
-                                 "MPMCQueue<int, DeterministicAtomic>(100)",
-                                 10,
-                                 10,
-                                 1000,
-                                 *caller);
-    LOG(INFO)
-        << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(10),
-                                 "MPMCQueue<int, DeterministicAtomic>(10)",
-                                 1,
-                                 1,
-                                 1000,
-                                 *caller);
-    LOG(INFO)
-        << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(100),
-                                 "MPMCQueue<int, DeterministicAtomic>(100)",
-                                 10,
-                                 10,
-                                 1000,
-                                 *caller);
-    LOG(INFO) << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(1),
-                                       "MPMCQueue<int, DeterministicAtomic>(1)",
-                                       10,
-                                       10,
-                                       1000,
-                                       *caller);
+    LOG(INFO) <<
+      producerConsumerBench(
+        MPMCQueue<int, DeterministicAtomic, true>(cap, minCap, mult),
+        "MPMCQueue<int, DeterministicAtomic, true>("
+          + folly::to<std::string>(cap) + ", "
+          + folly::to<std::string>(minCap) + ", "
+          + folly::to<std::string>(mult)+")",
+        prods,
+        cons,
+        numOps,
+        *caller);
}
}
+// Non-dynamic and dynamic deterministic prod/cons runs with a fixed seed.
+TEST(MPMCQueue, mt_prod_cons_deterministic) {
+  runMtProdConsDeterministic(0);
+}
+
+TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic) {
+  runMtProdConsDeterministic<true>(0);
+}
+
+// Overwrites `var` with the integer value of environment variable `envvar`
+// when it is set; leaves `var` untouched otherwise.
+// NOTE(review): atoi() parses to int, so values outside int range truncate
+// (or are UB) even when T is long/size_t — acceptable for test knobs, but
+// confirm before reusing elsewhere.
+template <typename T>
+void setFromEnv(T& var, const char* envvar) {
+  char* str = std::getenv(envvar);
+  if (str) { var = atoi(str); }
+}
+
+// Same deterministic dynamic run, but every parameter can be overridden
+// from the environment (SEED, PRODS, CONS, NUM_OPS, CAP, MIN_CAP, MULT)
+// for ad-hoc stress exploration.
+TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic_with_arguments) {
+  long seed = 0;
+  uint32_t prods = 10;
+  uint32_t cons = 10;
+  uint32_t numOps = 1000;
+  size_t cap = 10000;
+  size_t minCap = 9;
+  size_t mult = 3;
+  setFromEnv(seed, "SEED");
+  setFromEnv(prods, "PRODS");
+  setFromEnv(cons, "CONS");
+  setFromEnv(numOps, "NUM_OPS");
+  setFromEnv(cap, "CAP");
+  setFromEnv(minCap, "MIN_CAP");
+  setFromEnv(mult, "MULT");
+  runMtProdConsDeterministicDynamic(
+      seed, prods, cons, numOps, cap, minCap, mult);
+}
+
#define PC_BENCH(q, np, nc, ...) \
producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
-TEST(MPMCQueue, mt_prod_cons) {
+// std::atomic prod/cons benchmark sweep over all write-method callers and a
+// range of capacities / thread mixes; Dynamic selects the queue variant.
+// NUM_OPS env var can override the op count.
+template <bool Dynamic = false>
+void runMtProdCons() {
int n = 100000;
-  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int>>>> callers;
-  callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int>>>());
-  callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int>>>());
-  callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int>>>());
-  callers.emplace_back(
-      make_unique<TryWriteUntilCaller<MPMCQueue<int>>>(milliseconds(1)));
-  callers.emplace_back(
-      make_unique<TryWriteUntilCaller<MPMCQueue<int>>>(seconds(2)));
+  setFromEnv(n, "NUM_OPS");
+  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, std::atomic, Dynamic>>>>
+    callers;
+  callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
+    std::atomic, Dynamic>>>());
+  callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
+    std::atomic, Dynamic>>>());
+  callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int, std::atomic,
+    Dynamic>>>());
+  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+    std::atomic, Dynamic>>>(milliseconds(1)));
+  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+    std::atomic, Dynamic>>>(seconds(2)));
for (const auto& caller : callers) {
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 1, n, *caller);
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 1, n, *caller);
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 10, n, *caller);
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 10, n, *caller);
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 1, n, *caller);
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 1, n, *caller);
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 10, n, *caller);
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 10, n, *caller);
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
+                          1, 1, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
+                          10, 1, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
+                          1, 10, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
+                          10, 10, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
+                          1, 1, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
+                          10, 1, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
+                          1, 10, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
+                          10, 10, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(100000)),
+                          32, 100, n, *caller);
}
}
-TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
+// Non-dynamic and dynamic std::atomic prod/cons sweeps.
+TEST(MPMCQueue, mt_prod_cons) {
+  runMtProdCons();
+}
+
+TEST(MPMCQueue, mt_prod_cons_dynamic) {
+  runMtProdCons</* Dynamic = */ true>();
+}
+
+// EmulatedFutexAtomic prod/cons benchmark sweep; Dynamic selects the queue
+// variant. Mirrors runMtProdCons but exercises the futex-emulation path.
+template <bool Dynamic = false>
+void runMtProdConsEmulatedFutex() {
int n = 100000;
-  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, EmulatedFutexAtomic>>>>
-      callers;
-  callers.emplace_back(
-      make_unique<BlockingWriteCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
-  callers.emplace_back(
-      make_unique<WriteIfNotFullCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
-  callers.emplace_back(
-      make_unique<WriteCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
-  callers.emplace_back(
-      make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic>>>(
-          milliseconds(1)));
-  callers.emplace_back(
-      make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic>>>(
-          seconds(2)));
+  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, EmulatedFutexAtomic,
+    Dynamic>>>> callers;
+  callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
+    EmulatedFutexAtomic, Dynamic>>>());
+  callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
+    EmulatedFutexAtomic, Dynamic>>>());
+  callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
+    EmulatedFutexAtomic, Dynamic>>>());
+  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+    EmulatedFutexAtomic, Dynamic>>>(milliseconds(1)));
+  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+    EmulatedFutexAtomic, Dynamic>>>(seconds(2)));
for (const auto& caller : callers) {
LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 1, n, *caller);
-    LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 1, n, *caller);
+      (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 1, 1, n, *caller);
LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 10, n, *caller);
+      (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 10, 1, n, *caller);
LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 10, n, *caller);
+      (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 1, 10, n, *caller);
LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 1, n, *caller);
+      (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 10, 10, n, *caller);
LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 1, n, *caller);
+      (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 1, 1, n, *caller);
LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 10, n, *caller);
+      (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 10, 1, n, *caller);
LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 10, n, *caller);
-    LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(100000)), 32, 100, n, *caller);
+      (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 1, 10, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, EmulatedFutexAtomic, Dynamic>
+                           (10000)), 10, 10, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, EmulatedFutexAtomic, Dynamic>
+                           (100000)), 32, 100, n, *caller);
}
}
-template <template <typename> class Atom>
+// Non-dynamic and dynamic EmulatedFutexAtomic prod/cons sweeps.
+TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
+  runMtProdConsEmulatedFutex();
+}
+
+TEST(MPMCQueue, mt_prod_cons_emulated_futex_dynamic) {
+  runMtProdConsEmulatedFutex</* Dynamic = */ true>();
+}
+
+template <template <typename> class Atom, bool Dynamic = false>
void runNeverFailThread(int numThreads,
int n, /*numOps*/
- MPMCQueue<int, Atom>& cq,
+ MPMCQueue<int, Atom, Dynamic>& cq,
std::atomic<uint64_t>& sum,
int t) {
uint64_t threadSum = 0;
sum += threadSum;
}
-template <template <typename> class Atom>
+template <template <typename> class Atom, bool Dynamic = false>
uint64_t runNeverFailTest(int numThreads, int numOps) {
// always #enq >= #deq
- MPMCQueue<int, Atom> cq(numThreads);
+ MPMCQueue<int, Atom, Dynamic> cq(numThreads);
uint64_t n = numOps;
auto beginMicro = nowMicro();
vector<std::thread> threads(numThreads);
std::atomic<uint64_t> sum(0);
for (int t = 0; t < numThreads; ++t) {
- threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom>,
+ threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom, Dynamic>,
numThreads,
n,
std::ref(cq),
return nowMicro() - beginMicro;
}
-TEST(MPMCQueue, mt_never_fail) {
- int nts[] = {1, 3, 100};
-
- int n = 100000;
+// Shared driver for the never-fail tests: runs runNeverFailTest at each
+// thread count and logs rough per-op timing.
+template <template<typename> class Atom, bool Dynamic = false>
+void runMtNeverFail(std::vector<int>& nts, int n) {
for (int nt : nts) {
-    uint64_t elapsed = runNeverFailTest<std::atomic>(nt, n);
+    uint64_t elapsed = runNeverFailTest<Atom, Dynamic>(nt, n);
LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
<< " threads";
}
}
-TEST(MPMCQueue, mt_never_fail_emulated_futex) {
- int nts[] = {1, 3, 100};
+// All the never_fail tests are for the non-dynamic version only.
+// False positive for dynamic version. Some writeIfNotFull() and
+// tryWriteUntil() operations may fail in transient conditions related
+// to expansion.
+TEST(MPMCQueue, mt_never_fail) {
+  std::vector<int> nts {1, 3, 100};
int n = 100000;
-  for (int nt : nts) {
-    uint64_t elapsed = runNeverFailTest<EmulatedFutexAtomic>(nt, n);
-    LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
-              << " threads";
-  }
+  runMtNeverFail<std::atomic>(nts, n);
}
-TEST(MPMCQueue, mt_never_fail_deterministic) {
-  int nts[] = {3, 10};
+// Same never-fail check over the futex-emulation atomics.
+TEST(MPMCQueue, mt_never_fail_emulated_futex) {
+  std::vector<int> nts {1, 3, 100};
+  int n = 100000;
+  runMtNeverFail<EmulatedFutexAtomic>(nts, n);
+}
- long seed = 0; // nowMicro() % 10000;
+// Deterministic-schedule never-fail driver: each thread count runs once
+// under a uniform schedule and once under a uniform-subset schedule.
+template<bool Dynamic = false>
+void runMtNeverFailDeterministic(std::vector<int>& nts, int n, long seed) {
LOG(INFO) << "using seed " << seed;
-
-  int n = 1000;
for (int nt : nts) {
{
DSched sched(DSched::uniform(seed));
-      runNeverFailTest<DeterministicAtomic>(nt, n);
+      runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
}
{
DSched sched(DSched::uniformSubset(seed, 2));
-      runNeverFailTest<DeterministicAtomic>(nt, n);
+      runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
}
}
}
-template <class Clock, template <typename> class Atom>
+// Deterministic never-fail run with a fixed seed (non-dynamic only).
+TEST(MPMCQueue, mt_never_fail_deterministic) {
+  std::vector<int> nts {3, 10};
+  long seed = 0; // nowMicro() % 10000;
+  int n = 1000;
+  runMtNeverFailDeterministic(nts, n, seed);
+}
+
+template <class Clock, template <typename> class Atom, bool Dynamic>
void runNeverFailUntilThread(int numThreads,
int n, /*numOps*/
- MPMCQueue<int, Atom>& cq,
+ MPMCQueue<int, Atom, Dynamic>& cq,
std::atomic<uint64_t>& sum,
int t) {
uint64_t threadSum = 0;
sum += threadSum;
}
-template <class Clock, template <typename> class Atom>
+template <class Clock, template <typename> class Atom, bool Dynamic = false>
uint64_t runNeverFailTest(int numThreads, int numOps) {
// always #enq >= #deq
- MPMCQueue<int, Atom> cq(numThreads);
+ MPMCQueue<int, Atom, Dynamic> cq(numThreads);
uint64_t n = numOps;
auto beginMicro = nowMicro();
vector<std::thread> threads(numThreads);
std::atomic<uint64_t> sum(0);
for (int t = 0; t < numThreads; ++t) {
- threads[t] = DSched::thread(std::bind(runNeverFailUntilThread<Clock, Atom>,
- numThreads,
- n,
- std::ref(cq),
- std::ref(sum),
- t));
+ threads[t] = DSched::thread(std::bind(
+ runNeverFailUntilThread<Clock, Atom, Dynamic>,
+ numThreads,
+ n,
+ std::ref(cq),
+ std::ref(sum),
+ t));
}
for (auto& t : threads) {
DSched::join(t);
return nowMicro() - beginMicro;
}
-TEST(MPMCQueue, mt_never_fail_until_system) {
- int nts[] = {1, 3, 100};
-
- int n = 100000;
+// Never-fail driver using the deadline (tryWriteUntil) path with
+// system_clock deadlines.
+template <bool Dynamic = false>
+void runMtNeverFailUntilSystem(std::vector<int>& nts, int n) {
for (int nt : nts) {
uint64_t elapsed =
-        runNeverFailTest<std::chrono::system_clock, std::atomic>(nt, n);
+        runNeverFailTest<std::chrono::system_clock, std::atomic, Dynamic>(nt, n);
LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
<< " threads";
}
}
-TEST(MPMCQueue, mt_never_fail_until_steady) {
- int nts[] = {1, 3, 100};
-
+// system_clock deadline never-fail run (non-dynamic only).
+TEST(MPMCQueue, mt_never_fail_until_system) {
+  std::vector<int> nts {1, 3, 100};
int n = 100000;
+  runMtNeverFailUntilSystem(nts, n);
+}
+
+// Never-fail driver using the deadline (tryWriteUntil) path with
+// steady_clock deadlines.
+template <bool Dynamic = false>
+void runMtNeverFailUntilSteady(std::vector<int>& nts, int n) {
for (int nt : nts) {
uint64_t elapsed =
-      runNeverFailTest<std::chrono::steady_clock, std::atomic>(nt, n);
+      runNeverFailTest<std::chrono::steady_clock, std::atomic, Dynamic>(nt, n);
LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
<< " threads";
}
}
+// steady_clock deadline never-fail run (non-dynamic only).
+TEST(MPMCQueue, mt_never_fail_until_steady) {
+  std::vector<int> nts {1, 3, 100};
+  int n = 100000;
+  runMtNeverFailUntilSteady(nts, n);
+}
+
enum LifecycleEvent {
NOTHING = -1,
DEFAULT_CONSTRUCTOR,
EXPECT_EQ(lc_outstanding(), 0);
{
- MPMCQueue<Lifecycle<R>> queue(50);
+ // Non-dynamic only. False positive for dynamic.
+ MPMCQueue<Lifecycle<R>, std::atomic> queue(50);
LIFECYCLE_STEP(NOTHING);
for (int pass = 0; pass < 10; ++pass) {
runPerfectForwardingTest<std::true_type>();
}
-TEST(MPMCQueue, queue_moving) {
+template <bool Dynamic = false>
+void run_queue_moving() {
lc_snap();
EXPECT_EQ(lc_outstanding(), 0);
{
- MPMCQueue<Lifecycle<std::false_type>> a(50);
+ MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> a(50);
LIFECYCLE_STEP(NOTHING);
a.blockingWrite();
LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
// move constructor
- MPMCQueue<Lifecycle<std::false_type>> b = std::move(a);
+ MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> b
+ = std::move(a);
LIFECYCLE_STEP(NOTHING);
EXPECT_EQ(a.capacity(), 0);
EXPECT_EQ(a.size(), 0);
LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
// move operator
- MPMCQueue<Lifecycle<std::false_type>> c;
+ MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> c;
LIFECYCLE_STEP(NOTHING);
c = std::move(b);
LIFECYCLE_STEP(NOTHING);
{
// swap
- MPMCQueue<Lifecycle<std::false_type>> d(10);
+ MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> d(10);
LIFECYCLE_STEP(NOTHING);
std::swap(c, d);
LIFECYCLE_STEP(NOTHING);
LIFECYCLE_STEP(DESTRUCTOR);
}
+// Move-construct/move-assign/swap lifecycle checks, both queue variants.
+TEST(MPMCQueue, queue_moving) {
+  run_queue_moving();
+}
+
+TEST(MPMCQueue, queue_moving_dynamic) {
+  run_queue_moving<true>();
+}
+
TEST(MPMCQueue, explicit_zero_capacity_fail) {
ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);
+
+  // The dynamic queue must reject zero capacity the same way.
+  using DynamicMPMCQueueInt = MPMCQueue<int, std::atomic, true>;
+  ASSERT_THROW(DynamicMPMCQueueInt cq(0), std::invalid_argument);
+}
+
+// Two reader threads race tryReadUntil() with the same deadline on a
+// capacity-1 queue; a single value is written after both have started.
+// Exactly one reader must receive 42 and the other must time out, and the
+// whole test must take at least `wait` (the loser runs out its deadline).
+template <bool Dynamic>
+void testTryReadUntil() {
+  MPMCQueue<int, std::atomic, Dynamic> q{1};
+
+  const auto wait = std::chrono::milliseconds(100);
+  stop_watch<> watch;
+  bool rets[2];
+  int vals[2];
+  std::vector<std::thread> threads;
+  // Barrier of 3: two readers plus the main thread, so the write below
+  // cannot happen before both readers are ready.
+  boost::barrier b{3};
+  for (int i = 0; i < 2; i++) {
+    threads.emplace_back([&, i] {
+      b.wait();
+      rets[i] = q.tryReadUntil(watch.getCheckpoint() + wait, vals[i]);
+    });
+  }
+
+  b.wait();
+  EXPECT_TRUE(q.write(42));
+
+  for (int i = 0; i < 2; i++) {
+    threads[i].join();
+  }
+
+  for (int i = 0; i < 2; i++) {
+    int other = (i + 1) % 2;
+    if (rets[i]) {
+      EXPECT_EQ(42, vals[i]);
+      EXPECT_FALSE(rets[other]);
+    }
+  }
+
+  EXPECT_TRUE(watch.elapsed(wait));
+}
+
+// Mirror of testTryReadUntil for writers: the capacity-1 queue starts full,
+// two writers race tryWriteUntil() with the same deadline, and one slot is
+// freed by a read. Exactly one writer must succeed (its value `i` is then
+// read back), the other must time out, and total time must reach `wait`.
+template <bool Dynamic>
+void testTryWriteUntil() {
+  MPMCQueue<int, std::atomic, Dynamic> q{1};
+  EXPECT_TRUE(q.write(42));
+
+  const auto wait = std::chrono::milliseconds(100);
+  stop_watch<> watch;
+  bool rets[2];
+  std::vector<std::thread> threads;
+  // Barrier of 3: two writers plus the main thread.
+  boost::barrier b{3};
+  for (int i = 0; i < 2; i++) {
+    threads.emplace_back([&, i] {
+      b.wait();
+      rets[i] = q.tryWriteUntil(watch.getCheckpoint() + wait, i);
+    });
+  }
+
+  b.wait();
+  int x;
+  EXPECT_TRUE(q.read(x));
+  EXPECT_EQ(42, x);
+
+  for (int i = 0; i < 2; i++) {
+    threads[i].join();
+  }
+  EXPECT_TRUE(q.read(x));
+
+  for (int i = 0; i < 2; i++) {
+    int other = (i + 1) % 2;
+    if (rets[i]) {
+      EXPECT_EQ(i, x);
+      EXPECT_FALSE(rets[other]);
+    }
+  }
+
+  EXPECT_TRUE(watch.elapsed(wait));
+}
+
+// Deadline read/write races for both queue variants.
+TEST(MPMCQueue, try_read_until) {
+  testTryReadUntil<false>();
+}
+
+TEST(MPMCQueue, try_read_until_dynamic) {
+  testTryReadUntil<true>();
+}
+
+TEST(MPMCQueue, try_write_until) {
+  testTryWriteUntil<false>();
+}
+
+TEST(MPMCQueue, try_write_until_dynamic) {
+  testTryWriteUntil<true>();
+}
+
+// Fills one slot, then issues a tryWriteUntil() with a short deadline on a
+// queue assumed by the caller to be (or become) full. The call's return
+// value is irrelevant; the regression being guarded is that it returns at
+// all instead of blocking forever.
+template <bool Dynamic>
+void testTimeout(MPMCQueue<int, std::atomic, Dynamic>& q) {
+  CHECK(q.write(1));
+  /* The following must not block forever */
+  q.tryWriteUntil(
+      std::chrono::system_clock::now() + std::chrono::microseconds(10000), 2);
+}
+
+// Non-dynamic: capacity-1 queue is full after the first write.
+TEST(MPMCQueue, try_write_until_timeout) {
+  folly::MPMCQueue<int, std::atomic, false> queue(1);
+  testTimeout<false>(queue);
+}
+
+// Dynamic: (200, 1, 2) — presumably max capacity 200, initial capacity 1,
+// expansion multiplier 2 (confirm against the dynamic MPMCQueue ctor); the
+// deadline write must still return rather than hang mid-expansion.
+TEST(MPMCQueue, must_fail_try_write_until_dynamic) {
+  folly::MPMCQueue<int, std::atomic, true> queue(200, 1, 2);
+  testTimeout<true>(queue);
}