+TEST(MPMCQueue, mt_never_fail_deterministic) {
+ std::vector<int> nts {3, 10};
+ long seed = 0; // nowMicro() % 10000;
+ int n = 1000;
+ runMtNeverFailDeterministic(nts, n, seed);
+}
+
+template <class Clock, template <typename> class Atom, bool Dynamic>
+void runNeverFailUntilThread(int numThreads,
+ int n, /*numOps*/
+ MPMCQueue<int, Atom, Dynamic>& cq,
+ std::atomic<uint64_t>& sum,
+ int t) {
+ uint64_t threadSum = 0;
+ for (int i = t; i < n; i += numThreads) {
+ // enq + deq
+ auto soon = Clock::now() + std::chrono::seconds(1);
+ EXPECT_TRUE(cq.tryWriteUntil(soon, i));
+
+ int dest = -1;
+ EXPECT_TRUE(cq.readIfNotEmpty(dest));
+ EXPECT_TRUE(dest >= 0);
+ threadSum += dest;
+ }
+ sum += threadSum;
+}
+
+template <class Clock, template <typename> class Atom, bool Dynamic = false>
+uint64_t runNeverFailTest(int numThreads, int numOps) {
+ // always #enq >= #deq
+ MPMCQueue<int, Atom, Dynamic> cq(numThreads);
+
+ uint64_t n = numOps;
+ auto beginMicro = nowMicro();
+
+ vector<std::thread> threads(numThreads);
+ std::atomic<uint64_t> sum(0);
+ for (int t = 0; t < numThreads; ++t) {
+ threads[t] = DSched::thread(std::bind(
+ runNeverFailUntilThread<Clock, Atom, Dynamic>,
+ numThreads,
+ n,
+ std::ref(cq),
+ std::ref(sum),
+ t));
+ }
+ for (auto& t : threads) {
+ DSched::join(t);
+ }
+ EXPECT_TRUE(cq.isEmpty());
+ EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
+
+ return nowMicro() - beginMicro;
+}
+
+template <bool Dynamic = false>
+void runMtNeverFailUntilSystem(std::vector<int>& nts, int n) {
+ for (int nt : nts) {
+ uint64_t elapsed =
+ runNeverFailTest<std::chrono::system_clock, std::atomic, Dynamic>(nt, n);
+ LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
+ << " threads";
+ }
+}
+
+TEST(MPMCQueue, mt_never_fail_until_system) {
+ std::vector<int> nts {1, 3, 100};
+ int n = 100000;
+ runMtNeverFailUntilSystem(nts, n);
+}
+
+template <bool Dynamic = false>
+void runMtNeverFailUntilSteady(std::vector<int>& nts, int n) {
+ for (int nt : nts) {
+ uint64_t elapsed =
+ runNeverFailTest<std::chrono::steady_clock, std::atomic, Dynamic>(nt, n);
+ LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
+ << " threads";
+ }
+}
+
+TEST(MPMCQueue, mt_never_fail_until_steady) {
+ std::vector<int> nts {1, 3, 100};
+ int n = 100000;
+ runMtNeverFailUntilSteady(nts, n);
+}
+