/*
- * Copyright 2016 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* limitations under the License.
*/
-#include <folly/MPMCQueue.h>
#include <folly/Format.h>
+#include <folly/MPMCQueue.h>
#include <folly/Memory.h>
#include <folly/portability/GTest.h>
#include <folly/portability/SysResource.h>
#include <folly/portability/SysTime.h>
#include <folly/portability/Unistd.h>
+#include <folly/stop_watch.h>
#include <folly/test/DeterministicSchedule.h>
#include <boost/intrusive_ptr.hpp>
-#include <memory>
+#include <boost/thread/barrier.hpp>
#include <functional>
+#include <memory>
#include <thread>
#include <utility>
runElementTypeTest(std::make_pair(10, string("def")));
runElementTypeTest(vector<string>{{"abc"}});
runElementTypeTest(std::make_shared<char>('a'));
- runElementTypeTest(folly::make_unique<char>('a'));
+ runElementTypeTest(std::make_unique<char>('a'));
runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
EXPECT_EQ(RefCounted::active_instances, 0);
}
runElementTypeTest<true>(std::make_pair(10, string("def")));
runElementTypeTest<true>(vector<string>{{"abc"}});
runElementTypeTest<true>(std::make_shared<char>('a'));
- runElementTypeTest<true>(folly::make_unique<char>('a'));
+ runElementTypeTest<true>(std::make_unique<char>('a'));
runElementTypeTest<true>(boost::intrusive_ptr<RefCounted>(new RefCounted));
EXPECT_EQ(RefCounted::active_instances, 0);
}
}
}
+// All the never_fail tests are for the non-dynamic version only.
+// False positive for dynamic version. Some writeIfNotFull() and
+// tryWriteUntil() operations may fail in transient conditions related
+// to expansion.
+
TEST(MPMCQueue, mt_never_fail) {
std::vector<int> nts {1, 3, 100};
int n = 100000;
runMtNeverFail<std::atomic>(nts, n);
}
-TEST(MPMCQueue, mt_never_fail_dynamic) {
- std::vector<int> nts {1, 3, 100};
- int n = 100000;
- runMtNeverFail<std::atomic, true>(nts, n);
-}
-
TEST(MPMCQueue, mt_never_fail_emulated_futex) {
std::vector<int> nts {1, 3, 100};
int n = 100000;
runMtNeverFail<EmulatedFutexAtomic>(nts, n);
}
-TEST(MPMCQueue, mt_never_fail_emulated_futex_dynamic) {
- std::vector<int> nts {1, 3, 100};
- int n = 100000;
- runMtNeverFail<EmulatedFutexAtomic, true>(nts, n);
-}
-
template<bool Dynamic = false>
void runMtNeverFailDeterministic(std::vector<int>& nts, int n, long seed) {
LOG(INFO) << "using seed " << seed;
runMtNeverFailDeterministic(nts, n, seed);
}
-TEST(MPMCQueue, mt_never_fail_deterministic_dynamic) {
- std::vector<int> nts {3, 10};
- long seed = 0; // nowMicro() % 10000;
- int n = 1000;
- runMtNeverFailDeterministic<true>(nts, n, seed);
-}
-
template <class Clock, template <typename> class Atom, bool Dynamic>
void runNeverFailUntilThread(int numThreads,
int n, /*numOps*/
runMtNeverFailUntilSystem(nts, n);
}
-TEST(MPMCQueue, mt_never_fail_until_system_dynamic) {
- std::vector<int> nts {1, 3, 100};
- int n = 100000;
- runMtNeverFailUntilSystem<true>(nts, n);
-}
-
template <bool Dynamic = false>
void runMtNeverFailUntilSteady(std::vector<int>& nts, int n) {
for (int nt : nts) {
runMtNeverFailUntilSteady(nts, n);
}
-TEST(MPMCQueue, mt_never_fail_until_steady_dynamic) {
- std::vector<int> nts {1, 3, 100};
- int n = 100000;
- runMtNeverFailUntilSteady<true>(nts, n);
-}
-
enum LifecycleEvent {
NOTHING = -1,
DEFAULT_CONSTRUCTOR,
using DynamicMPMCQueueInt = MPMCQueue<int, std::atomic, true>;
ASSERT_THROW(DynamicMPMCQueueInt cq(0), std::invalid_argument);
}
+
+template <bool Dynamic>
+void testTryReadUntil() {
+ MPMCQueue<int, std::atomic, Dynamic> q{1};
+
+ const auto wait = std::chrono::milliseconds(100);
+ stop_watch<> watch;
+ bool rets[2];
+ int vals[2];
+ std::vector<std::thread> threads;
+ boost::barrier b{3};
+ for (int i = 0; i < 2; i++) {
+ threads.emplace_back([&, i] {
+ b.wait();
+ rets[i] = q.tryReadUntil(watch.getCheckpoint() + wait, vals[i]);
+ });
+ }
+
+ b.wait();
+ EXPECT_TRUE(q.write(42));
+
+ for (int i = 0; i < 2; i++) {
+ threads[i].join();
+ }
+
+ for (int i = 0; i < 2; i++) {
+ int other = (i + 1) % 2;
+ if (rets[i]) {
+ EXPECT_EQ(42, vals[i]);
+ EXPECT_FALSE(rets[other]);
+ }
+ }
+
+ EXPECT_TRUE(watch.elapsed(wait));
+}
+
+template <bool Dynamic>
+void testTryWriteUntil() {
+ MPMCQueue<int, std::atomic, Dynamic> q{1};
+ EXPECT_TRUE(q.write(42));
+
+ const auto wait = std::chrono::milliseconds(100);
+ stop_watch<> watch;
+ bool rets[2];
+ std::vector<std::thread> threads;
+ boost::barrier b{3};
+ for (int i = 0; i < 2; i++) {
+ threads.emplace_back([&, i] {
+ b.wait();
+ rets[i] = q.tryWriteUntil(watch.getCheckpoint() + wait, i);
+ });
+ }
+
+ b.wait();
+ int x;
+ EXPECT_TRUE(q.read(x));
+ EXPECT_EQ(42, x);
+
+ for (int i = 0; i < 2; i++) {
+ threads[i].join();
+ }
+ EXPECT_TRUE(q.read(x));
+
+ for (int i = 0; i < 2; i++) {
+ int other = (i + 1) % 2;
+ if (rets[i]) {
+ EXPECT_EQ(i, x);
+ EXPECT_FALSE(rets[other]);
+ }
+ }
+
+ EXPECT_TRUE(watch.elapsed(wait));
+}
+
+TEST(MPMCQueue, try_read_until) {
+ testTryReadUntil<false>();
+}
+
+TEST(MPMCQueue, try_read_until_dynamic) {
+ testTryReadUntil<true>();
+}
+
+TEST(MPMCQueue, try_write_until) {
+ testTryWriteUntil<false>();
+}
+
+TEST(MPMCQueue, try_write_until_dynamic) {
+ testTryWriteUntil<true>();
+}
+
+template <bool Dynamic>
+void testTimeout(MPMCQueue<int, std::atomic, Dynamic>& q) {
+ CHECK(q.write(1));
+ /* The following must not block forever */
+ q.tryWriteUntil(
+ std::chrono::system_clock::now() + std::chrono::microseconds(10000), 2);
+}
+
+TEST(MPMCQueue, try_write_until_timeout) {
+ folly::MPMCQueue<int, std::atomic, false> queue(1);
+ testTimeout<false>(queue);
+}
+
+TEST(MPMCQueue, must_fail_try_write_until_dynamic) {
+ folly::MPMCQueue<int, std::atomic, true> queue(200, 1, 2);
+ testTimeout<true>(queue);
+}