Include <folly/portability/SysTime.h> rather than <sys/time.h>
[folly.git] / folly / test / MPMCQueueTest.cpp
index 5bb2fa7b966ebf40baa02308640f0f92949fed9d..d42f402ab0f96d69e27ef5bc59deeabdfb233714 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2014 Facebook, Inc.
+ * Copyright 2016 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
 #include <folly/MPMCQueue.h>
 #include <folly/Format.h>
 #include <folly/Memory.h>
+#include <folly/portability/SysTime.h>
 #include <folly/test/DeterministicSchedule.h>
 
 #include <boost/intrusive_ptr.hpp>
 #include <thread>
 #include <utility>
 #include <unistd.h>
-#include <sys/time.h>
 #include <sys/resource.h>
 
-#include <gflags/gflags.h>
 #include <gtest/gtest.h>
 
 FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
@@ -36,26 +35,44 @@ FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
 using namespace folly;
 using namespace detail;
 using namespace test;
+using std::chrono::time_point;
+using std::chrono::steady_clock;
+using std::chrono::seconds;
+using std::chrono::milliseconds;
+using std::string;
+using std::unique_ptr;
+using std::vector;
 
 typedef DeterministicSchedule DSched;
 
+template <template<typename> class Atom>
+void run_mt_sequencer_thread(
+    int numThreads,
+    int numOps,
+    uint32_t init,
+    TurnSequencer<Atom>& seq,
+    Atom<uint32_t>& spinThreshold,
+    int& prev,
+    int i) {
+  for (int op = i; op < numOps; op += numThreads) {
+    seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
+    EXPECT_EQ(prev, op - 1);
+    prev = op;
+    seq.completeTurn(init + op);
+  }
+}
 
 template <template<typename> class Atom>
 void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
   TurnSequencer<Atom> seq(init);
-  Atom<int> spinThreshold(0);
+  Atom<uint32_t> spinThreshold(0);
 
   int prev = -1;
-  std::vector<std::thread> threads(numThreads);
+  vector<std::thread> threads(numThreads);
   for (int i = 0; i < numThreads; ++i) {
-    threads[i] = DSched::thread([&, i]{
-      for (int op = i; op < numOps; op += numThreads) {
-        seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
-        EXPECT_EQ(prev, op - 1);
-        prev = op;
-        seq.completeTurn(init + op);
-      }
-    });
+    threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
+          numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold),
+          std::ref(prev), i));
   }
 
   for (auto& thr : threads) {
@@ -71,6 +88,12 @@ TEST(MPMCQueue, sequencer) {
   run_mt_sequencer_test<std::atomic>(100, 10000, -100);
 }
 
+TEST(MPMCQueue, sequencer_emulated_futex) {
+  run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
+  run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
+  run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
+}
+
 TEST(MPMCQueue, sequencer_deterministic) {
   DSched sched(DSched::uniform(0));
   run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
@@ -81,37 +104,54 @@ TEST(MPMCQueue, sequencer_deterministic) {
 template <typename T>
 void runElementTypeTest(T&& src) {
   MPMCQueue<T> cq(10);
-  cq.blockingWrite(std::move(src));
+  cq.blockingWrite(std::forward<T>(src));
   T dest;
   cq.blockingRead(dest);
   EXPECT_TRUE(cq.write(std::move(dest)));
   EXPECT_TRUE(cq.read(dest));
+  auto soon1 = std::chrono::system_clock::now() + std::chrono::seconds(1);
+  EXPECT_TRUE(cq.tryWriteUntil(soon1, std::move(dest)));
+  EXPECT_TRUE(cq.read(dest));
+  auto soon2 = std::chrono::steady_clock::now() + std::chrono::seconds(1);
+  EXPECT_TRUE(cq.tryWriteUntil(soon2, std::move(dest)));
+  EXPECT_TRUE(cq.read(dest));
 }
 
 struct RefCounted {
+  static __thread int active_instances;
+
   mutable std::atomic<int> rc;
 
-  RefCounted() : rc(0) {}
+  RefCounted() : rc(0) {
+    ++active_instances;
+  }
+
+  ~RefCounted() {
+    --active_instances;
+  }
 };
+__thread int RefCounted::active_instances;
+
 
 void intrusive_ptr_add_ref(RefCounted const* p) {
   p->rc++;
 }
 
 void intrusive_ptr_release(RefCounted const* p) {
-  if (--(p->rc)) {
+  if (--(p->rc) == 0) {
     delete p;
   }
 }
 
 TEST(MPMCQueue, lots_of_element_types) {
   runElementTypeTest(10);
-  runElementTypeTest(std::string("abc"));
-  runElementTypeTest(std::make_pair(10, std::string("def")));
-  runElementTypeTest(std::vector<std::string>{ { "abc" } });
+  runElementTypeTest(string("abc"));
+  runElementTypeTest(std::make_pair(10, string("def")));
+  runElementTypeTest(vector<string>{{"abc"}});
   runElementTypeTest(std::make_shared<char>('a'));
   runElementTypeTest(folly::make_unique<char>('a'));
   runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
+  EXPECT_EQ(RefCounted::active_instances, 0);
 }
 
 TEST(MPMCQueue, single_thread_enqdeq) {
@@ -147,7 +187,7 @@ TEST(MPMCQueue, single_thread_enqdeq) {
 TEST(MPMCQueue, tryenq_capacity_test) {
   for (size_t cap = 1; cap < 100; ++cap) {
     MPMCQueue<int> cq(cap);
-    for (int i = 0; i < cap; ++i) {
+    for (size_t i = 0; i < cap; ++i) {
       EXPECT_TRUE(cq.write(i));
     }
     EXPECT_FALSE(cq.write(100));
@@ -175,6 +215,33 @@ TEST(MPMCQueue, enq_capacity_test) {
   }
 }
 
+template <template<typename> class Atom>
+void runTryEnqDeqThread(
+    int numThreads,
+    int n, /*numOps*/
+    MPMCQueue<int, Atom>& cq,
+    std::atomic<uint64_t>& sum,
+    int t) {
+  uint64_t threadSum = 0;
+  int src = t;
+  // received doesn't reflect any actual values, we just start with
+  // t and increment by numThreads to get the rounding of termination
+  // correct if numThreads doesn't evenly divide numOps
+  int received = t;
+  while (src < n || received < n) {
+    if (src < n && cq.write(src)) {
+      src += numThreads;
+    }
+
+    int dst;
+    if (received < n && cq.read(dst)) {
+      received += numThreads;
+      threadSum += dst;
+    }
+  }
+  sum += threadSum;
+}
+
 template <template<typename> class Atom>
 void runTryEnqDeqTest(int numThreads, int numOps) {
   // write and read aren't linearizable, so we don't have
@@ -183,29 +250,11 @@ void runTryEnqDeqTest(int numThreads, int numOps) {
   MPMCQueue<int,Atom> cq(numThreads);
 
   uint64_t n = numOps;
-  std::vector<std::thread> threads(numThreads);
+  vector<std::thread> threads(numThreads);
   std::atomic<uint64_t> sum(0);
   for (int t = 0; t < numThreads; ++t) {
-    threads[t] = DSched::thread([&,t]{
-      uint64_t threadSum = 0;
-      int src = t;
-      // received doesn't reflect any actual values, we just start with
-      // t and increment by numThreads to get the rounding of termination
-      // correct if numThreads doesn't evenly divide numOps
-      int received = t;
-      while (src < n || received < n) {
-        if (src < n && cq.write(src)) {
-          src += numThreads;
-        }
-
-        int dst;
-        if (received < n && cq.read(dst)) {
-          received += numThreads;
-          threadSum += dst;
-        }
-      }
-      sum += threadSum;
-    });
+    threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom>,
+          numThreads, n, std::ref(cq), std::ref(sum), t));
   }
   for (auto& t : threads) {
     DSched::join(t);
@@ -223,6 +272,15 @@ TEST(MPMCQueue, mt_try_enq_deq) {
   }
 }
 
+TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
+  int nts[] = { 1, 3, 100 };
+
+  int n = 100000;
+  for (int nt : nts) {
+    runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
+  }
+}
+
 TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
   int nts[] = { 3, 10 };
 
@@ -249,9 +307,59 @@ uint64_t nowMicro() {
 }
 
 template <typename Q>
-std::string producerConsumerBench(Q&& queue, std::string qName,
-                                  int numProducers, int numConsumers,
-                                  int numOps, bool ignoreContents = false) {
+struct WriteMethodCaller {
+  WriteMethodCaller() {}
+  virtual ~WriteMethodCaller() = default;
+  virtual bool callWrite(Q& q, int i) = 0;
+  virtual string methodName() = 0;
+};
+
+template <typename Q>
+struct BlockingWriteCaller : public WriteMethodCaller<Q> {
+  bool callWrite(Q& q, int i) override {
+    q.blockingWrite(i);
+    return true;
+  }
+  string methodName() override { return "blockingWrite"; }
+};
+
+template <typename Q>
+struct WriteIfNotFullCaller : public WriteMethodCaller<Q> {
+  bool callWrite(Q& q, int i) override { return q.writeIfNotFull(i); }
+  string methodName() override { return "writeIfNotFull"; }
+};
+
+template <typename Q>
+struct WriteCaller : public WriteMethodCaller<Q> {
+  bool callWrite(Q& q, int i) override { return q.write(i); }
+  string methodName() override { return "write"; }
+};
+
+template <typename Q,
+          class Clock = steady_clock,
+          class Duration = typename Clock::duration>
+struct TryWriteUntilCaller : public WriteMethodCaller<Q> {
+  const Duration duration_;
+  explicit TryWriteUntilCaller(Duration&& duration) : duration_(duration) {}
+  bool callWrite(Q& q, int i) override {
+    auto then = Clock::now() + duration_;
+    return q.tryWriteUntil(then, i);
+  }
+  string methodName() override {
+    return folly::sformat(
+        "tryWriteUntil({}ms)",
+        std::chrono::duration_cast<milliseconds>(duration_).count());
+  }
+};
+
+template <typename Q>
+string producerConsumerBench(Q&& queue,
+                             string qName,
+                             int numProducers,
+                             int numConsumers,
+                             int numOps,
+                             WriteMethodCaller<Q>& writer,
+                             bool ignoreContents = false) {
   Q& q = queue;
 
   struct rusage beginUsage;
@@ -261,17 +369,20 @@ std::string producerConsumerBench(Q&& queue, std::string qName,
 
   uint64_t n = numOps;
   std::atomic<uint64_t> sum(0);
+  std::atomic<uint64_t> failed(0);
 
-  std::vector<std::thread> producers(numProducers);
+  vector<std::thread> producers(numProducers);
   for (int t = 0; t < numProducers; ++t) {
     producers[t] = DSched::thread([&,t]{
       for (int i = t; i < numOps; i += numProducers) {
-        q.blockingWrite(i);
+        while (!writer.callWrite(q, i)) {
+          ++failed;
+        }
       }
     });
   }
 
-  std::vector<std::thread> consumers(numConsumers);
+  vector<std::thread> consumers(numConsumers);
   for (int t = 0; t < numConsumers; ++t) {
     consumers[t] = DSched::thread([&,t]{
       uint64_t localSum = 0;
@@ -303,27 +414,76 @@ std::string producerConsumerBench(Q&& queue, std::string qName,
   uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
   long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
       (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
-
-  return folly::format(
-      "{}, {} producers, {} consumers => {} nanos/handoff, {} csw / {} handoff",
-      qName, numProducers, numConsumers, nanosPer, csw, n).str();
+  uint64_t failures = failed;
+
+  return folly::sformat(
+      "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
+      "handoff, {} failures",
+      qName,
+      numProducers,
+      writer.methodName(),
+      numConsumers,
+      nanosPer,
+      csw,
+      n,
+      failures);
 }
 
-
 TEST(MPMCQueue, mt_prod_cons_deterministic) {
   // we use the Bench method, but perf results are meaningless under DSched
   DSched sched(DSched::uniform(0));
 
-  producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(10),
-          "", 1, 1, 1000);
-  producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(100),
-          "", 10, 10, 1000);
-  producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(10),
-          "", 1, 1, 1000);
-  producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(100),
-          "", 10, 10, 1000);
-  producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(1),
-          "", 10, 10, 1000);
+  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic>>>>
+      callers;
+  callers.emplace_back(
+      make_unique<BlockingWriteCaller<MPMCQueue<int, DeterministicAtomic>>>());
+  callers.emplace_back(
+      make_unique<WriteIfNotFullCaller<MPMCQueue<int, DeterministicAtomic>>>());
+  callers.emplace_back(
+      make_unique<WriteCaller<MPMCQueue<int, DeterministicAtomic>>>());
+  callers.emplace_back(
+      make_unique<TryWriteUntilCaller<MPMCQueue<int, DeterministicAtomic>>>(
+          milliseconds(1)));
+  callers.emplace_back(
+      make_unique<TryWriteUntilCaller<MPMCQueue<int, DeterministicAtomic>>>(
+          seconds(2)));
+
+  for (const auto& caller : callers) {
+    LOG(INFO)
+        << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(10),
+                                 "MPMCQueue<int, DeterministicAtomic>(10)",
+                                 1,
+                                 1,
+                                 1000,
+                                 *caller);
+    LOG(INFO)
+        << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(100),
+                                 "MPMCQueue<int, DeterministicAtomic>(100)",
+                                 10,
+                                 10,
+                                 1000,
+                                 *caller);
+    LOG(INFO)
+        << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(10),
+                                 "MPMCQueue<int, DeterministicAtomic>(10)",
+                                 1,
+                                 1,
+                                 1000,
+                                 *caller);
+    LOG(INFO)
+        << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(100),
+                                 "MPMCQueue<int, DeterministicAtomic>(100)",
+                                 10,
+                                 10,
+                                 1000,
+                                 *caller);
+    LOG(INFO) << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(1),
+                                       "MPMCQueue<int, DeterministicAtomic>(1)",
+                                       10,
+                                       10,
+                                       1000,
+                                       *caller);
+  }
 }
 
 #define PC_BENCH(q, np, nc, ...) \
@@ -331,41 +491,101 @@ TEST(MPMCQueue, mt_prod_cons_deterministic) {
 
 TEST(MPMCQueue, mt_prod_cons) {
   int n = 100000;
-  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 1, n);
-  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 1, n);
-  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 10, n);
-  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 10, n);
-  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 1, n);
-  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 1, n);
-  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 10, n);
-  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 10, n);
-  LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n);
+  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int>>>> callers;
+  callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int>>>());
+  callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int>>>());
+  callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int>>>());
+  callers.emplace_back(
+      make_unique<TryWriteUntilCaller<MPMCQueue<int>>>(milliseconds(1)));
+  callers.emplace_back(
+      make_unique<TryWriteUntilCaller<MPMCQueue<int>>>(seconds(2)));
+  for (const auto& caller : callers) {
+    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 1, n, *caller);
+    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 1, n, *caller);
+    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 10, n, *caller);
+    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 10, n, *caller);
+    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 1, n, *caller);
+    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 1, n, *caller);
+    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 10, n, *caller);
+    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 10, n, *caller);
+    LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n, *caller);
+  }
 }
 
-template <template<typename> class Atom>
+TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
+  int n = 100000;
+  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, EmulatedFutexAtomic>>>>
+      callers;
+  callers.emplace_back(
+      make_unique<BlockingWriteCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
+  callers.emplace_back(
+      make_unique<WriteIfNotFullCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
+  callers.emplace_back(
+      make_unique<WriteCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
+  callers.emplace_back(
+      make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic>>>(
+          milliseconds(1)));
+  callers.emplace_back(
+      make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic>>>(
+          seconds(2)));
+  for (const auto& caller : callers) {
+    LOG(INFO) << PC_BENCH(
+        (MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 1, n, *caller);
+    LOG(INFO) << PC_BENCH(
+        (MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 1, n, *caller);
+    LOG(INFO) << PC_BENCH(
+        (MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 10, n, *caller);
+    LOG(INFO) << PC_BENCH(
+        (MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 10, n, *caller);
+    LOG(INFO) << PC_BENCH(
+        (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 1, n, *caller);
+    LOG(INFO) << PC_BENCH(
+        (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 1, n, *caller);
+    LOG(INFO) << PC_BENCH(
+        (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 10, n, *caller);
+    LOG(INFO) << PC_BENCH(
+        (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 10, n, *caller);
+    LOG(INFO) << PC_BENCH(
+        (MPMCQueue<int, EmulatedFutexAtomic>(100000)), 32, 100, n, *caller);
+  }
+}
+
+template <template <typename> class Atom>
+void runNeverFailThread(int numThreads,
+                        int n, /*numOps*/
+                        MPMCQueue<int, Atom>& cq,
+                        std::atomic<uint64_t>& sum,
+                        int t) {
+  uint64_t threadSum = 0;
+  for (int i = t; i < n; i += numThreads) {
+    // enq + deq
+    EXPECT_TRUE(cq.writeIfNotFull(i));
+
+    int dest = -1;
+    EXPECT_TRUE(cq.readIfNotEmpty(dest));
+    EXPECT_TRUE(dest >= 0);
+    threadSum += dest;
+  }
+  sum += threadSum;
+}
+
+template <template <typename> class Atom>
 uint64_t runNeverFailTest(int numThreads, int numOps) {
   // always #enq >= #deq
-  MPMCQueue<int,Atom> cq(numThreads);
+  MPMCQueue<int, Atom> cq(numThreads);
 
   uint64_t n = numOps;
   auto beginMicro = nowMicro();
 
-  std::vector<std::thread> threads(numThreads);
+  vector<std::thread> threads(numThreads);
   std::atomic<uint64_t> sum(0);
   for (int t = 0; t < numThreads; ++t) {
-    threads[t] = DSched::thread([&,t]{
-      uint64_t threadSum = 0;
-      for (int i = t; i < n; i += numThreads) {
-        // enq + deq
-        EXPECT_TRUE(cq.writeIfNotFull(i));
-
-        int dest = -1;
-        EXPECT_TRUE(cq.readIfNotEmpty(dest));
-        EXPECT_TRUE(dest >= 0);
-        threadSum += dest;
-      }
-      sum += threadSum;
-    });
+    threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom>,
+                                          numThreads,
+                                          n,
+                                          std::ref(cq),
+                                          std::ref(sum),
+                                          t));
   }
   for (auto& t : threads) {
     DSched::join(t);
@@ -377,18 +597,29 @@ uint64_t runNeverFailTest(int numThreads, int numOps) {
 }
 
 TEST(MPMCQueue, mt_never_fail) {
-  int nts[] = { 1, 3, 100 };
+  int nts[] = {1, 3, 100};
 
   int n = 100000;
   for (int nt : nts) {
     uint64_t elapsed = runNeverFailTest<std::atomic>(nt, n);
-    LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with "
-              << nt << " threads";
+    LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
+              << " threads";
+  }
+}
+
+TEST(MPMCQueue, mt_never_fail_emulated_futex) {
+  int nts[] = {1, 3, 100};
+
+  int n = 100000;
+  for (int nt : nts) {
+    uint64_t elapsed = runNeverFailTest<EmulatedFutexAtomic>(nt, n);
+    LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
+              << " threads";
   }
 }
 
 TEST(MPMCQueue, mt_never_fail_deterministic) {
-  int nts[] = { 3, 10 };
+  int nts[] = {3, 10};
 
   long seed = 0; // nowMicro() % 10000;
   LOG(INFO) << "using seed " << seed;
@@ -406,6 +637,77 @@ TEST(MPMCQueue, mt_never_fail_deterministic) {
   }
 }
 
+template <class Clock, template <typename> class Atom>
+void runNeverFailUntilThread(int numThreads,
+                             int n, /*numOps*/
+                             MPMCQueue<int, Atom>& cq,
+                             std::atomic<uint64_t>& sum,
+                             int t) {
+  uint64_t threadSum = 0;
+  for (int i = t; i < n; i += numThreads) {
+    // enq + deq
+    auto soon = Clock::now() + std::chrono::seconds(1);
+    EXPECT_TRUE(cq.tryWriteUntil(soon, i));
+
+    int dest = -1;
+    EXPECT_TRUE(cq.readIfNotEmpty(dest));
+    EXPECT_TRUE(dest >= 0);
+    threadSum += dest;
+  }
+  sum += threadSum;
+}
+
+template <class Clock, template <typename> class Atom>
+uint64_t runNeverFailTest(int numThreads, int numOps) {
+  // always #enq >= #deq
+  MPMCQueue<int, Atom> cq(numThreads);
+
+  uint64_t n = numOps;
+  auto beginMicro = nowMicro();
+
+  vector<std::thread> threads(numThreads);
+  std::atomic<uint64_t> sum(0);
+  for (int t = 0; t < numThreads; ++t) {
+    threads[t] = DSched::thread(std::bind(runNeverFailUntilThread<Clock, Atom>,
+                                          numThreads,
+                                          n,
+                                          std::ref(cq),
+                                          std::ref(sum),
+                                          t));
+  }
+  for (auto& t : threads) {
+    DSched::join(t);
+  }
+  EXPECT_TRUE(cq.isEmpty());
+  EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
+
+  return nowMicro() - beginMicro;
+}
+
+TEST(MPMCQueue, mt_never_fail_until_system) {
+  int nts[] = {1, 3, 100};
+
+  int n = 100000;
+  for (int nt : nts) {
+    uint64_t elapsed =
+        runNeverFailTest<std::chrono::system_clock, std::atomic>(nt, n);
+    LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
+              << " threads";
+  }
+}
+
+TEST(MPMCQueue, mt_never_fail_until_steady) {
+  int nts[] = {1, 3, 100};
+
+  int n = 100000;
+  for (int nt : nts) {
+    uint64_t elapsed =
+        runNeverFailTest<std::chrono::steady_clock, std::atomic>(nt, n);
+    LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
+              << " threads";
+  }
+}
+
 enum LifecycleEvent {
   NOTHING = -1,
   DEFAULT_CONSTRUCTOR,
@@ -418,8 +720,8 @@ enum LifecycleEvent {
   MAX_LIFECYCLE_EVENT
 };
 
-static __thread int lc_counts[MAX_LIFECYCLE_EVENT];
-static __thread int lc_prev[MAX_LIFECYCLE_EVENT];
+static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT];
+static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT];
 
 static int lc_outstanding() {
   return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[COPY_CONSTRUCTOR] +
@@ -456,24 +758,25 @@ struct Lifecycle {
     ++lc_counts[DEFAULT_CONSTRUCTOR];
   }
 
-  explicit Lifecycle(int n, char const* s) noexcept : constructed(true) {
+  explicit Lifecycle(int /* n */, char const* /* s */) noexcept
+      : constructed(true) {
     ++lc_counts[TWO_ARG_CONSTRUCTOR];
   }
 
-  Lifecycle(const Lifecycle& rhs) noexcept : constructed(true) {
+  Lifecycle(const Lifecycle& /* rhs */) noexcept : constructed(true) {
     ++lc_counts[COPY_CONSTRUCTOR];
   }
 
-  Lifecycle(Lifecycle&& rhs) noexcept : constructed(true) {
+  Lifecycle(Lifecycle&& /* rhs */) noexcept : constructed(true) {
     ++lc_counts[MOVE_CONSTRUCTOR];
   }
 
-  Lifecycle& operator= (const Lifecycle& rhs) noexcept {
+  Lifecycle& operator=(const Lifecycle& /* rhs */) noexcept {
     ++lc_counts[COPY_OPERATOR];
     return *this;
   }
 
-  Lifecycle& operator= (Lifecycle&& rhs) noexcept {
+  Lifecycle& operator=(Lifecycle&& /* rhs */) noexcept {
     ++lc_counts[MOVE_OPERATOR];
     return *this;
   }
@@ -631,8 +934,6 @@ TEST(MPMCQueue, queue_moving) {
   LIFECYCLE_STEP(DESTRUCTOR);
 }
 
-int main(int argc, char ** argv) {
-  testing::InitGoogleTest(&argc, argv);
-  google::ParseCommandLineFlags(&argc, &argv, true);
-  return RUN_ALL_TESTS();
+TEST(MPMCQueue, explicit_zero_capacity_fail) {
+  ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);
 }