/*
- * Copyright 2014 Facebook, Inc.
+ * Copyright 2016 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#include <folly/MPMCQueue.h>
#include <folly/Format.h>
#include <folly/Memory.h>
+#include <folly/portability/SysTime.h>
#include <folly/test/DeterministicSchedule.h>
#include <boost/intrusive_ptr.hpp>
#include <thread>
#include <utility>
#include <unistd.h>
-#include <sys/time.h>
#include <sys/resource.h>
-#include <gflags/gflags.h>
#include <gtest/gtest.h>
FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
using namespace folly;
using namespace detail;
using namespace test;
+using std::chrono::time_point;
+using std::chrono::steady_clock;
+using std::chrono::seconds;
+using std::chrono::milliseconds;
+using std::string;
+using std::unique_ptr;
+using std::vector;
typedef DeterministicSchedule DSched;
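+
+// Per-thread body for run_mt_sequencer_test: thread i claims turns
+// i, i + numThreads, i + 2 * numThreads, ..., and the shared prev counter
+// checks that TurnSequencer grants turns in strictly increasing order.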
+template <template<typename> class Atom>
+void run_mt_sequencer_thread(
+ int numThreads,
+ int numOps,
+ uint32_t init,
+ TurnSequencer<Atom>& seq,
+ Atom<uint32_t>& spinThreshold,
+ int& prev,
+ int i) {
+ for (int op = i; op < numOps; op += numThreads) {
+ seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
+ EXPECT_EQ(prev, op - 1);
+ prev = op;
+ seq.completeTurn(init + op);
+ }
+}
template <template<typename> class Atom>
void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
TurnSequencer<Atom> seq(init);
- Atom<int> spinThreshold(0);
+ Atom<uint32_t> spinThreshold(0);
int prev = -1;
- std::vector<std::thread> threads(numThreads);
+ vector<std::thread> threads(numThreads);
for (int i = 0; i < numThreads; ++i) {
- threads[i] = DSched::thread([&, i]{
- for (int op = i; op < numOps; op += numThreads) {
- seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
- EXPECT_EQ(prev, op - 1);
- prev = op;
- seq.completeTurn(init + op);
- }
- });
+ threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
+ numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold),
+ std::ref(prev), i));
}
for (auto& thr : threads) {
DSched::join(thr);
}
EXPECT_EQ(prev, numOps - 1);
}

TEST(MPMCQueue, sequencer) {
run_mt_sequencer_test<std::atomic>(1, 100, 0);
run_mt_sequencer_test<std::atomic>(2, 100000, -100);
run_mt_sequencer_test<std::atomic>(100, 10000, -100);
}
+TEST(MPMCQueue, sequencer_emulated_futex) {
+ run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
+ run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
+ run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
+}
+
TEST(MPMCQueue, sequencer_deterministic) {
DSched sched(DSched::uniform(0));
run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
template <typename T>
void runElementTypeTest(T&& src) {
MPMCQueue<T> cq(10);
- cq.blockingWrite(std::move(src));
+ cq.blockingWrite(std::forward<T>(src));
T dest;
cq.blockingRead(dest);
EXPECT_TRUE(cq.write(std::move(dest)));
EXPECT_TRUE(cq.read(dest));
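+ // tryWriteUntil must accept deadlines from both system_clock and
+ // steady_clock; the queue is empty here, so neither call should time out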
+ auto soon1 = std::chrono::system_clock::now() + std::chrono::seconds(1);
+ EXPECT_TRUE(cq.tryWriteUntil(soon1, std::move(dest)));
+ EXPECT_TRUE(cq.read(dest));
+ auto soon2 = std::chrono::steady_clock::now() + std::chrono::seconds(1);
+ EXPECT_TRUE(cq.tryWriteUntil(soon2, std::move(dest)));
+ EXPECT_TRUE(cq.read(dest));
}
struct RefCounted {
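+ // thread-local so the balance check in lots_of_element_types sees only
+ // objects created and destroyed on the current thread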
+ static FOLLY_TLS int active_instances;
+
mutable std::atomic<int> rc;
- RefCounted() : rc(0) {}
+ RefCounted() : rc(0) {
+ ++active_instances;
+ }
+
+ ~RefCounted() {
+ --active_instances;
+ }
};
+FOLLY_TLS int RefCounted::active_instances;
+
void intrusive_ptr_add_ref(RefCounted const* p) {
p->rc++;
}
void intrusive_ptr_release(RefCounted const* p) {
- if (--(p->rc)) {
+ if (--(p->rc) == 0) {
delete p;
}
}
TEST(MPMCQueue, lots_of_element_types) {
runElementTypeTest(10);
- runElementTypeTest(std::string("abc"));
- runElementTypeTest(std::make_pair(10, std::string("def")));
- runElementTypeTest(std::vector<std::string>{ { "abc" } });
+ runElementTypeTest(string("abc"));
+ runElementTypeTest(std::make_pair(10, string("def")));
+ runElementTypeTest(vector<string>{{"abc"}});
runElementTypeTest(std::make_shared<char>('a'));
runElementTypeTest(folly::make_unique<char>('a'));
runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
+ EXPECT_EQ(RefCounted::active_instances, 0);
}
TEST(MPMCQueue, single_thread_enqdeq) {
TEST(MPMCQueue, tryenq_capacity_test) {
for (size_t cap = 1; cap < 100; ++cap) {
MPMCQueue<int> cq(cap);
- for (int i = 0; i < cap; ++i) {
+ for (size_t i = 0; i < cap; ++i) {
EXPECT_TRUE(cq.write(i));
}
EXPECT_FALSE(cq.write(100));
}
}
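+
+// Per-thread body for runTryEnqDeqTest: thread t writes the values
+// {t, t + numThreads, ...} below n and performs a matching number of
+// reads, accumulating everything it dequeues into sum so the caller can
+// verify that all enqueued values were handed off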
+template <template<typename> class Atom>
+void runTryEnqDeqThread(
+ int numThreads,
+ int n /*numOps*/,
+ MPMCQueue<int, Atom>& cq,
+ std::atomic<uint64_t>& sum,
+ int t) {
+ uint64_t threadSum = 0;
+ int src = t;
+ // received doesn't reflect any actual values; it just starts at t and
+ // increments by numThreads so that termination rounds correctly when
+ // numThreads doesn't evenly divide numOps
+ int received = t;
+ while (src < n || received < n) {
+ if (src < n && cq.write(src)) {
+ src += numThreads;
+ }
+
+ int dst;
+ if (received < n && cq.read(dst)) {
+ received += numThreads;
+ threadSum += dst;
+ }
+ }
+ sum += threadSum;
+}
+
template <template<typename> class Atom>
void runTryEnqDeqTest(int numThreads, int numOps) {
// write and read aren't linearizable, so we don't have
// a single permitted order to check; instead the threads accumulate the
// sum of everything they dequeue
MPMCQueue<int,Atom> cq(numThreads);
uint64_t n = numOps;
- std::vector<std::thread> threads(numThreads);
+ vector<std::thread> threads(numThreads);
std::atomic<uint64_t> sum(0);
for (int t = 0; t < numThreads; ++t) {
- threads[t] = DSched::thread([&,t]{
- uint64_t threadSum = 0;
- int src = t;
- // received doesn't reflect any actual values, we just start with
- // t and increment by numThreads to get the rounding of termination
- // correct if numThreads doesn't evenly divide numOps
- int received = t;
- while (src < n || received < n) {
- if (src < n && cq.write(src)) {
- src += numThreads;
- }
-
- int dst;
- if (received < n && cq.read(dst)) {
- received += numThreads;
- threadSum += dst;
- }
- }
- sum += threadSum;
- });
+ threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom>,
+ numThreads, n, std::ref(cq), std::ref(sum), t));
}
for (auto& t : threads) {
DSched::join(t);
}
}
+TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
+ int nts[] = {1, 3, 100};
+
+ int n = 100000;
+ for (int nt : nts) {
+ runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
+ }
+}
+
TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
int nts[] = { 3, 10 };
}
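+
+// Abstract caller used by producerConsumerBench so a single benchmark loop
+// can exercise each write API (blockingWrite, writeIfNotFull, write,
+// tryWriteUntil); callWrite returns false when the underlying call fails.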
template <typename Q>
-std::string producerConsumerBench(Q&& queue, std::string qName,
- int numProducers, int numConsumers,
- int numOps, bool ignoreContents = false) {
+struct WriteMethodCaller {
+ WriteMethodCaller() {}
+ virtual ~WriteMethodCaller() = default;
+ virtual bool callWrite(Q& q, int i) = 0;
+ virtual string methodName() = 0;
+};
+
+template <typename Q>
+struct BlockingWriteCaller : public WriteMethodCaller<Q> {
+ bool callWrite(Q& q, int i) override {
+ q.blockingWrite(i);
+ return true;
+ }
+ string methodName() override { return "blockingWrite"; }
+};
+
+template <typename Q>
+struct WriteIfNotFullCaller : public WriteMethodCaller<Q> {
+ bool callWrite(Q& q, int i) override { return q.writeIfNotFull(i); }
+ string methodName() override { return "writeIfNotFull"; }
+};
+
+template <typename Q>
+struct WriteCaller : public WriteMethodCaller<Q> {
+ bool callWrite(Q& q, int i) override { return q.write(i); }
+ string methodName() override { return "write"; }
+};
+
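+// Computes a fresh deadline Clock::now() + duration_ on every call, so a
+// generous duration such as seconds(2) should almost never time out while
+// milliseconds(1) exercises the timeout path under contention.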
+template <typename Q,
+ class Clock = steady_clock,
+ class Duration = typename Clock::duration>
+struct TryWriteUntilCaller : public WriteMethodCaller<Q> {
+ const Duration duration_;
+ explicit TryWriteUntilCaller(Duration&& duration) : duration_(duration) {}
+ bool callWrite(Q& q, int i) override {
+ auto then = Clock::now() + duration_;
+ return q.tryWriteUntil(then, i);
+ }
+ string methodName() override {
+ return folly::sformat(
+ "tryWriteUntil({}ms)",
+ std::chrono::duration_cast<milliseconds>(duration_).count());
+ }
+};
+
+template <typename Q>
+string producerConsumerBench(Q&& queue,
+ string qName,
+ int numProducers,
+ int numConsumers,
+ int numOps,
+ WriteMethodCaller<Q>& writer,
+ bool ignoreContents = false) {
Q& q = queue;
struct rusage beginUsage;
uint64_t n = numOps;
std::atomic<uint64_t> sum(0);
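+ // number of write attempts that returned false (e.g. expired
+ // tryWriteUntil deadlines); blockingWrite never fails, so it adds zero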
+ std::atomic<uint64_t> failed(0);
- std::vector<std::thread> producers(numProducers);
+ vector<std::thread> producers(numProducers);
for (int t = 0; t < numProducers; ++t) {
producers[t] = DSched::thread([&,t]{
for (int i = t; i < numOps; i += numProducers) {
- q.blockingWrite(i);
+ while (!writer.callWrite(q, i)) {
+ ++failed;
+ }
}
});
}
- std::vector<std::thread> consumers(numConsumers);
+ vector<std::thread> consumers(numConsumers);
for (int t = 0; t < numConsumers; ++t) {
consumers[t] = DSched::thread([&,t]{
uint64_t localSum = 0;
uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
(beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
-
- return folly::format(
- "{}, {} producers, {} consumers => {} nanos/handoff, {} csw / {} handoff",
- qName, numProducers, numConsumers, nanosPer, csw, n).str();
+ uint64_t failures = failed;
+
+ return folly::sformat(
+ "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
+ "handoff, {} failures",
+ qName,
+ numProducers,
+ writer.methodName(),
+ numConsumers,
+ nanosPer,
+ csw,
+ n,
+ failures);
}
-
TEST(MPMCQueue, mt_prod_cons_deterministic) {
// we use the Bench method, but perf results are meaningless under DSched
DSched sched(DSched::uniform(0));
- producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(10),
- "", 1, 1, 1000);
- producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(100),
- "", 10, 10, 1000);
- producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(10),
- "", 1, 1, 1000);
- producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(100),
- "", 10, 10, 1000);
- producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(1),
- "", 10, 10, 1000);
+ vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic>>>>
+ callers;
+ callers.emplace_back(
+ make_unique<BlockingWriteCaller<MPMCQueue<int, DeterministicAtomic>>>());
+ callers.emplace_back(
+ make_unique<WriteIfNotFullCaller<MPMCQueue<int, DeterministicAtomic>>>());
+ callers.emplace_back(
+ make_unique<WriteCaller<MPMCQueue<int, DeterministicAtomic>>>());
+ callers.emplace_back(
+ make_unique<TryWriteUntilCaller<MPMCQueue<int, DeterministicAtomic>>>(
+ milliseconds(1)));
+ callers.emplace_back(
+ make_unique<TryWriteUntilCaller<MPMCQueue<int, DeterministicAtomic>>>(
+ seconds(2)));
+
+ for (const auto& caller : callers) {
+ LOG(INFO)
+ << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(10),
+ "MPMCQueue<int, DeterministicAtomic>(10)",
+ 1,
+ 1,
+ 1000,
+ *caller);
+ LOG(INFO)
+ << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(100),
+ "MPMCQueue<int, DeterministicAtomic>(100)",
+ 10,
+ 10,
+ 1000,
+ *caller);
+ LOG(INFO)
+ << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(10),
+ "MPMCQueue<int, DeterministicAtomic>(10)",
+ 1,
+ 1,
+ 1000,
+ *caller);
+ LOG(INFO)
+ << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(100),
+ "MPMCQueue<int, DeterministicAtomic>(100)",
+ 10,
+ 10,
+ 1000,
+ *caller);
+ LOG(INFO) << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(1),
+ "MPMCQueue<int, DeterministicAtomic>(1)",
+ 10,
+ 10,
+ 1000,
+ *caller);
+ }
}
#define PC_BENCH(q, np, nc, ...) \
producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)

TEST(MPMCQueue, mt_prod_cons) {
int n = 100000;
- LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 1, n);
- LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 1, n);
- LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 10, n);
- LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 10, n);
- LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 1, n);
- LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 1, n);
- LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 10, n);
- LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 10, n);
- LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n);
+ vector<unique_ptr<WriteMethodCaller<MPMCQueue<int>>>> callers;
+ callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int>>>());
+ callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int>>>());
+ callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int>>>());
+ callers.emplace_back(
+ make_unique<TryWriteUntilCaller<MPMCQueue<int>>>(milliseconds(1)));
+ callers.emplace_back(
+ make_unique<TryWriteUntilCaller<MPMCQueue<int>>>(seconds(2)));
+ for (const auto& caller : callers) {
+ LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 1, n, *caller);
+ LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 1, n, *caller);
+ LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 10, n, *caller);
+ LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 10, n, *caller);
+ LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 1, n, *caller);
+ LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 1, n, *caller);
+ LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 10, n, *caller);
+ LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 10, n, *caller);
+ LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n, *caller);
+ }
}
-template <template<typename> class Atom>
+TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
+ int n = 100000;
+ vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, EmulatedFutexAtomic>>>>
+ callers;
+ callers.emplace_back(
+ make_unique<BlockingWriteCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
+ callers.emplace_back(
+ make_unique<WriteIfNotFullCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
+ callers.emplace_back(
+ make_unique<WriteCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
+ callers.emplace_back(
+ make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic>>>(
+ milliseconds(1)));
+ callers.emplace_back(
+ make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic>>>(
+ seconds(2)));
+ for (const auto& caller : callers) {
+ LOG(INFO) << PC_BENCH(
+ (MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 1, n, *caller);
+ LOG(INFO) << PC_BENCH(
+ (MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 1, n, *caller);
+ LOG(INFO) << PC_BENCH(
+ (MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 10, n, *caller);
+ LOG(INFO) << PC_BENCH(
+ (MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 10, n, *caller);
+ LOG(INFO) << PC_BENCH(
+ (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 1, n, *caller);
+ LOG(INFO) << PC_BENCH(
+ (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 1, n, *caller);
+ LOG(INFO) << PC_BENCH(
+ (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 10, n, *caller);
+ LOG(INFO) << PC_BENCH(
+ (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 10, n, *caller);
+ LOG(INFO) << PC_BENCH(
+ (MPMCQueue<int, EmulatedFutexAtomic>(100000)), 32, 100, n, *caller);
+ }
+}
+
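+// Per-thread body for runNeverFailTest: every write is immediately followed
+// by a read, so with capacity == numThreads the queue can never be full at
+// a write nor empty at a read, and writeIfNotFull/readIfNotEmpty must
+// always succeed.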
+template <template <typename> class Atom>
+void runNeverFailThread(int numThreads,
+ int n /*numOps*/,
+ MPMCQueue<int, Atom>& cq,
+ std::atomic<uint64_t>& sum,
+ int t) {
+ uint64_t threadSum = 0;
+ for (int i = t; i < n; i += numThreads) {
+ // enq + deq
+ EXPECT_TRUE(cq.writeIfNotFull(i));
+
+ int dest = -1;
+ EXPECT_TRUE(cq.readIfNotEmpty(dest));
+ EXPECT_TRUE(dest >= 0);
+ threadSum += dest;
+ }
+ sum += threadSum;
+}
+
+template <template <typename> class Atom>
uint64_t runNeverFailTest(int numThreads, int numOps) {
// always #enq >= #deq
- MPMCQueue<int,Atom> cq(numThreads);
+ MPMCQueue<int, Atom> cq(numThreads);
uint64_t n = numOps;
auto beginMicro = nowMicro();
- std::vector<std::thread> threads(numThreads);
+ vector<std::thread> threads(numThreads);
std::atomic<uint64_t> sum(0);
for (int t = 0; t < numThreads; ++t) {
- threads[t] = DSched::thread([&,t]{
- uint64_t threadSum = 0;
- for (int i = t; i < n; i += numThreads) {
- // enq + deq
- EXPECT_TRUE(cq.writeIfNotFull(i));
-
- int dest = -1;
- EXPECT_TRUE(cq.readIfNotEmpty(dest));
- EXPECT_TRUE(dest >= 0);
- threadSum += dest;
- }
- sum += threadSum;
- });
+ threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom>,
+ numThreads,
+ n,
+ std::ref(cq),
+ std::ref(sum),
+ t));
}
for (auto& t : threads) {
DSched::join(t);
}
EXPECT_TRUE(cq.isEmpty());
EXPECT_EQ(n * (n - 1) / 2 - sum, 0);

return nowMicro() - beginMicro;
}

TEST(MPMCQueue, mt_never_fail) {
- int nts[] = { 1, 3, 100 };
+ int nts[] = {1, 3, 100};
int n = 100000;
for (int nt : nts) {
uint64_t elapsed = runNeverFailTest<std::atomic>(nt, n);
- LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with "
- << nt << " threads";
+ LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
+ << " threads";
+ }
+}
+
+TEST(MPMCQueue, mt_never_fail_emulated_futex) {
+ int nts[] = {1, 3, 100};
+
+ int n = 100000;
+ for (int nt : nts) {
+ uint64_t elapsed = runNeverFailTest<EmulatedFutexAtomic>(nt, n);
+ LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
+ << " threads";
}
}
TEST(MPMCQueue, mt_never_fail_deterministic) {
- int nts[] = { 3, 10 };
+ int nts[] = {3, 10};
long seed = 0; // nowMicro() % 10000;
LOG(INFO) << "using seed " << seed;
}
}
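+
+// Like runNeverFailThread, but writes with tryWriteUntil; the queue can
+// never be full here, so the one-second deadline should never be reached.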
+template <class Clock, template <typename> class Atom>
+void runNeverFailUntilThread(int numThreads,
+ int n /*numOps*/,
+ MPMCQueue<int, Atom>& cq,
+ std::atomic<uint64_t>& sum,
+ int t) {
+ uint64_t threadSum = 0;
+ for (int i = t; i < n; i += numThreads) {
+ // enq + deq
+ auto soon = Clock::now() + std::chrono::seconds(1);
+ EXPECT_TRUE(cq.tryWriteUntil(soon, i));
+
+ int dest = -1;
+ EXPECT_TRUE(cq.readIfNotEmpty(dest));
+ EXPECT_TRUE(dest >= 0);
+ threadSum += dest;
+ }
+ sum += threadSum;
+}
+
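+// Overload of runNeverFailTest that takes the Clock used to form the
+// tryWriteUntil deadlines as an explicit first template argument.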
+template <class Clock, template <typename> class Atom>
+uint64_t runNeverFailTest(int numThreads, int numOps) {
+ // always #enq >= #deq
+ MPMCQueue<int, Atom> cq(numThreads);
+
+ uint64_t n = numOps;
+ auto beginMicro = nowMicro();
+
+ vector<std::thread> threads(numThreads);
+ std::atomic<uint64_t> sum(0);
+ for (int t = 0; t < numThreads; ++t) {
+ threads[t] = DSched::thread(std::bind(runNeverFailUntilThread<Clock, Atom>,
+ numThreads,
+ n,
+ std::ref(cq),
+ std::ref(sum),
+ t));
+ }
+ for (auto& t : threads) {
+ DSched::join(t);
+ }
+ EXPECT_TRUE(cq.isEmpty());
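+ // each value in [0, n) was enqueued exactly once, so the dequeued values
+ // must sum to n * (n - 1) / 2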
+ EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
+
+ return nowMicro() - beginMicro;
+}
+
+TEST(MPMCQueue, mt_never_fail_until_system) {
+ int nts[] = {1, 3, 100};
+
+ int n = 100000;
+ for (int nt : nts) {
+ uint64_t elapsed =
+ runNeverFailTest<std::chrono::system_clock, std::atomic>(nt, n);
+ LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
+ << " threads";
+ }
+}
+
+TEST(MPMCQueue, mt_never_fail_until_steady) {
+ int nts[] = {1, 3, 100};
+
+ int n = 100000;
+ for (int nt : nts) {
+ uint64_t elapsed =
+ runNeverFailTest<std::chrono::steady_clock, std::atomic>(nt, n);
+ LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
+ << " threads";
+ }
+}
+
enum LifecycleEvent {
NOTHING = -1,
DEFAULT_CONSTRUCTOR,
COPY_CONSTRUCTOR,
MOVE_CONSTRUCTOR,
TWO_ARG_CONSTRUCTOR,
COPY_OPERATOR,
MOVE_OPERATOR,
DESTRUCTOR,
MAX_LIFECYCLE_EVENT
};
-static __thread int lc_counts[MAX_LIFECYCLE_EVENT];
-static __thread int lc_prev[MAX_LIFECYCLE_EVENT];
+static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT];
+static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT];
static int lc_outstanding() {
return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[COPY_CONSTRUCTOR] +
lc_counts[MOVE_CONSTRUCTOR] + lc_counts[TWO_ARG_CONSTRUCTOR] -
lc_counts[DESTRUCTOR];
}
++lc_counts[DEFAULT_CONSTRUCTOR];
}
- explicit Lifecycle(int n, char const* s) noexcept : constructed(true) {
+ explicit Lifecycle(int /* n */, char const* /* s */) noexcept
+ : constructed(true) {
++lc_counts[TWO_ARG_CONSTRUCTOR];
}
- Lifecycle(const Lifecycle& rhs) noexcept : constructed(true) {
+ Lifecycle(const Lifecycle& /* rhs */) noexcept : constructed(true) {
++lc_counts[COPY_CONSTRUCTOR];
}
- Lifecycle(Lifecycle&& rhs) noexcept : constructed(true) {
+ Lifecycle(Lifecycle&& /* rhs */) noexcept : constructed(true) {
++lc_counts[MOVE_CONSTRUCTOR];
}
- Lifecycle& operator= (const Lifecycle& rhs) noexcept {
+ Lifecycle& operator=(const Lifecycle& /* rhs */) noexcept {
++lc_counts[COPY_OPERATOR];
return *this;
}
- Lifecycle& operator= (Lifecycle&& rhs) noexcept {
+ Lifecycle& operator=(Lifecycle&& /* rhs */) noexcept {
++lc_counts[MOVE_OPERATOR];
return *this;
}
LIFECYCLE_STEP(DESTRUCTOR);
}
-int main(int argc, char ** argv) {
- testing::InitGoogleTest(&argc, argv);
- google::ParseCommandLineFlags(&argc, &argv, true);
- return RUN_ALL_TESTS();
+TEST(MPMCQueue, explicit_zero_capacity_fail) {
+ ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);
}