typedef DeterministicSchedule DSched;
-template <template<typename> class Atom>
+template <template <typename> class Atom>
void run_mt_sequencer_thread(
int numThreads,
int numOps,
}
}
-template <template<typename> class Atom>
+template <template <typename> class Atom>
void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
TurnSequencer<Atom> seq(init);
Atom<uint32_t> spinThreshold(0);
runElementTypeTest(std::make_pair(10, string("def")));
runElementTypeTest(vector<string>{{"abc"}});
runElementTypeTest(std::make_shared<char>('a'));
- runElementTypeTest(folly::make_unique<char>('a'));
+ runElementTypeTest(std::make_unique<char>('a'));
runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
EXPECT_EQ(RefCounted::active_instances, 0);
}
runElementTypeTest<true>(std::make_pair(10, string("def")));
runElementTypeTest<true>(vector<string>{{"abc"}});
runElementTypeTest<true>(std::make_shared<char>('a'));
- runElementTypeTest<true>(folly::make_unique<char>('a'));
+ runElementTypeTest<true>(std::make_unique<char>('a'));
runElementTypeTest<true>(boost::intrusive_ptr<RefCounted>(new RefCounted));
EXPECT_EQ(RefCounted::active_instances, 0);
}
}
}
-template <template<typename> class Atom, bool Dynamic = false>
+template <template <typename> class Atom, bool Dynamic = false>
void runTryEnqDeqThread(
int numThreads,
int n, /*numOps*/
sum += threadSum;
}
-template <template<typename> class Atom, bool Dynamic = false>
+template <template <typename> class Atom, bool Dynamic = false>
void runTryEnqDeqTest(int numThreads, int numOps) {
// write and read aren't linearizable, so we don't have
// hard guarantees on their individual behavior. We can still test
// we use the Bench method, but perf results are meaningless under DSched
DSched sched(DSched::uniform(seed));
- vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic,
- Dynamic>>>> callers;
- callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
- DeterministicAtomic, Dynamic>>>());
- callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
- DeterministicAtomic, Dynamic>>>());
- callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
- DeterministicAtomic, Dynamic>>>());
- callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
- DeterministicAtomic, Dynamic>>>(milliseconds(1)));
- callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
- DeterministicAtomic, Dynamic>>>(seconds(2)));
+ using QueueType = MPMCQueue<int, DeterministicAtomic, Dynamic>;
+
+ vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
+ callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
+ callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
+ callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
+ callers.emplace_back(
+ std::make_unique<TryWriteUntilCaller<QueueType>>(milliseconds(1)));
+ callers.emplace_back(
+ std::make_unique<TryWriteUntilCaller<QueueType>>(seconds(2)));
size_t cap;
for (const auto& caller : callers) {
// we use the Bench method, but perf results are meaningless under DSched
DSched sched(DSched::uniform(seed));
- vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic,
- true>>>> callers;
- callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
- DeterministicAtomic, true>>>());
- callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
- DeterministicAtomic, true>>>());
- callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
- DeterministicAtomic, true>>>());
- callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
- DeterministicAtomic, true>>>(milliseconds(1)));
- callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
- DeterministicAtomic, true>>>(seconds(2)));
+ using QueueType = MPMCQueue<int, DeterministicAtomic, true>;
+
+ vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
+ callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
+ callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
+ callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
+ callers.emplace_back(
+ std::make_unique<TryWriteUntilCaller<QueueType>>(milliseconds(1)));
+ callers.emplace_back(
+ std::make_unique<TryWriteUntilCaller<QueueType>>(seconds(2)));
for (const auto& caller : callers) {
LOG(INFO) <<
template <bool Dynamic = false>
void runMtProdCons() {
+ using QueueType = MPMCQueue<int, std::atomic, Dynamic>;
+
int n = 100000;
setFromEnv(n, "NUM_OPS");
- vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, std::atomic, Dynamic>>>>
+ vector<unique_ptr<WriteMethodCaller<QueueType>>>
callers;
- callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
- std::atomic, Dynamic>>>());
- callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
- std::atomic, Dynamic>>>());
- callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int, std::atomic,
- Dynamic>>>());
- callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
- std::atomic, Dynamic>>>(milliseconds(1)));
- callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
- std::atomic, Dynamic>>>(seconds(2)));
+ callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
+ callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
+ callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
+ callers.emplace_back(
+ std::make_unique<TryWriteUntilCaller<QueueType>>(milliseconds(1)));
+ callers.emplace_back(
+ std::make_unique<TryWriteUntilCaller<QueueType>>(seconds(2)));
for (const auto& caller : callers) {
- LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
- 1, 1, n, *caller);
- LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
- 10, 1, n, *caller);
- LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
- 1, 10, n, *caller);
- LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
- 10, 10, n, *caller);
- LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
- 1, 1, n, *caller);
- LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
- 10, 1, n, *caller);
- LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
- 1, 10, n, *caller);
- LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
- 10, 10, n, *caller);
- LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(100000)),
- 32, 100, n, *caller);
+ LOG(INFO) << PC_BENCH((QueueType(10)), 1, 1, n, *caller);
+ LOG(INFO) << PC_BENCH((QueueType(10)), 10, 1, n, *caller);
+ LOG(INFO) << PC_BENCH((QueueType(10)), 1, 10, n, *caller);
+ LOG(INFO) << PC_BENCH((QueueType(10)), 10, 10, n, *caller);
+ LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 1, n, *caller);
+ LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 1, n, *caller);
+ LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 10, n, *caller);
+ LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 10, n, *caller);
+ LOG(INFO) << PC_BENCH((QueueType(100000)), 32, 100, n, *caller);
}
}
template <bool Dynamic = false>
void runMtProdConsEmulatedFutex() {
+ using QueueType = MPMCQueue<int, EmulatedFutexAtomic, Dynamic>;
+
int n = 100000;
- vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, EmulatedFutexAtomic,
- Dynamic>>>> callers;
- callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
- EmulatedFutexAtomic, Dynamic>>>());
- callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
- EmulatedFutexAtomic, Dynamic>>>());
- callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
- EmulatedFutexAtomic, Dynamic>>>());
- callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
- EmulatedFutexAtomic, Dynamic>>>(milliseconds(1)));
- callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
- EmulatedFutexAtomic, Dynamic>>>(seconds(2)));
+ vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
+ callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
+ callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
+ callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
+ callers.emplace_back(
+ std::make_unique<TryWriteUntilCaller<QueueType>>(milliseconds(1)));
+ callers.emplace_back(
+ std::make_unique<TryWriteUntilCaller<QueueType>>(seconds(2)));
for (const auto& caller : callers) {
- LOG(INFO) << PC_BENCH(
- (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 1, 1, n, *caller);
- LOG(INFO) << PC_BENCH(
- (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 10, 1, n, *caller);
- LOG(INFO) << PC_BENCH(
- (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 1, 10, n, *caller);
- LOG(INFO) << PC_BENCH(
- (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 10, 10, n, *caller);
- LOG(INFO) << PC_BENCH(
- (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 1, 1, n, *caller);
- LOG(INFO) << PC_BENCH(
- (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 10, 1, n, *caller);
- LOG(INFO) << PC_BENCH(
- (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 1, 10, n, *caller);
- LOG(INFO) << PC_BENCH((MPMCQueue<int, EmulatedFutexAtomic, Dynamic>
- (10000)), 10, 10, n, *caller);
- LOG(INFO) << PC_BENCH((MPMCQueue<int, EmulatedFutexAtomic, Dynamic>
- (100000)), 32, 100, n, *caller);
+ LOG(INFO) << PC_BENCH((QueueType(10)), 1, 1, n, *caller);
+ LOG(INFO) << PC_BENCH((QueueType(10)), 10, 1, n, *caller);
+ LOG(INFO) << PC_BENCH((QueueType(10)), 1, 10, n, *caller);
+ LOG(INFO) << PC_BENCH((QueueType(10)), 10, 10, n, *caller);
+ LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 1, n, *caller);
+ LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 1, n, *caller);
+ LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 10, n, *caller);
+ LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 10, n, *caller);
+ LOG(INFO) << PC_BENCH((QueueType(100000)), 32, 100, n, *caller);
}
}
return nowMicro() - beginMicro;
}
-template <template<typename> class Atom, bool Dynamic = false>
+template <template <typename> class Atom, bool Dynamic = false>
void runMtNeverFail(std::vector<int>& nts, int n) {
for (int nt : nts) {
uint64_t elapsed = runNeverFailTest<Atom, Dynamic>(nt, n);
}
}
+// All the never_fail tests are for the non-dynamic version only.
+// False positive for dynamic version. Some writeIfNotFull() and
+// tryWriteUntil() operations may fail in transient conditions related
+// to expansion.
+
TEST(MPMCQueue, mt_never_fail) {
std::vector<int> nts {1, 3, 100};
int n = 100000;
runMtNeverFail<std::atomic>(nts, n);
}
-TEST(MPMCQueue, mt_never_fail_dynamic) {
- std::vector<int> nts {1, 3, 100};
- int n = 100000;
- runMtNeverFail<std::atomic, true>(nts, n);
-}
-
TEST(MPMCQueue, mt_never_fail_emulated_futex) {
std::vector<int> nts {1, 3, 100};
int n = 100000;
runMtNeverFail<EmulatedFutexAtomic>(nts, n);
}
-TEST(MPMCQueue, mt_never_fail_emulated_futex_dynamic) {
- std::vector<int> nts {1, 3, 100};
- int n = 100000;
- runMtNeverFail<EmulatedFutexAtomic, true>(nts, n);
-}
-
-template<bool Dynamic = false>
+template <bool Dynamic = false>
void runMtNeverFailDeterministic(std::vector<int>& nts, int n, long seed) {
LOG(INFO) << "using seed " << seed;
for (int nt : nts) {
runMtNeverFailDeterministic(nts, n, seed);
}
-TEST(MPMCQueue, mt_never_fail_deterministic_dynamic) {
- std::vector<int> nts {3, 10};
- long seed = 0; // nowMicro() % 10000;
- int n = 1000;
- runMtNeverFailDeterministic<true>(nts, n, seed);
-}
-
template <class Clock, template <typename> class Atom, bool Dynamic>
void runNeverFailUntilThread(int numThreads,
int n, /*numOps*/
runMtNeverFailUntilSystem(nts, n);
}
-TEST(MPMCQueue, mt_never_fail_until_system_dynamic) {
- std::vector<int> nts {1, 3, 100};
- int n = 100000;
- runMtNeverFailUntilSystem<true>(nts, n);
-}
-
template <bool Dynamic = false>
void runMtNeverFailUntilSteady(std::vector<int>& nts, int n) {
for (int nt : nts) {
runMtNeverFailUntilSteady(nts, n);
}
-TEST(MPMCQueue, mt_never_fail_until_steady_dynamic) {
- std::vector<int> nts {1, 3, 100};
- int n = 100000;
- runMtNeverFailUntilSteady<true>(nts, n);
-}
-
enum LifecycleEvent {
NOTHING = -1,
DEFAULT_CONSTRUCTOR,
TEST(MPMCQueue, try_write_until_dynamic) {
testTryWriteUntil<true>();
}
+
+template <bool Dynamic>
+void testTimeout(MPMCQueue<int, std::atomic, Dynamic>& q) {
+ CHECK(q.write(1));
+ /* The following must not block forever */
+ q.tryWriteUntil(
+ std::chrono::system_clock::now() + std::chrono::microseconds(10000), 2);
+}
+
+TEST(MPMCQueue, try_write_until_timeout) {
+ folly::MPMCQueue<int, std::atomic, false> queue(1);
+ testTimeout<false>(queue);
+}
+
+TEST(MPMCQueue, must_fail_try_write_until_dynamic) {
+ folly::MPMCQueue<int, std::atomic, true> queue(200, 1, 2);
+ testTimeout<true>(queue);
+}