Convert a polling loop to a futex wait
authorPhil Willoughby <philwill@fb.com>
Tue, 15 Mar 2016 08:39:48 +0000 (01:39 -0700)
committerFacebook Github Bot 6 <facebook-github-bot-6-bot@fb.com>
Tue, 15 Mar 2016 08:50:22 +0000 (01:50 -0700)
Summary:Add a new method to MPMCQueue:
```
template <class Clock, typename... Args>
  bool tryWriteUntil(const std::chrono::time_point<Clock>& when,
                     Args&&... args) noexcept
```
This allows you to write producers which terminate reliably in the absence of consumers.

Returns `true` if `args` was enqueued, `false` otherwise.

`Clock` must be one of the types supported by the underlying call to `folly::detail::Futex::futexWaitUntil`; at time of writing these are `std::chrono::steady_clock` and `std::chrono::system_clock`.

Reviewed By: nbronson

Differential Revision: D2895574

fb-gh-sync-id: bdfabcd043191c149f1271e30ffc28476cc8a36e
shipit-source-id: bdfabcd043191c149f1271e30ffc28476cc8a36e

folly/MPMCQueue.h
folly/detail/TurnSequencer.h
folly/test/MPMCQueueTest.cpp

index 05de4c7b10c7b39ed23f1268c35e74c0d24d259c..66b76886a44554bc7f9149be9f8ce4939066cb84 100644 (file)
@@ -284,6 +284,21 @@ class MPMCQueue : boost::noncopyable {
     }
   }
 
+  template <class Clock, typename... Args>
+  bool tryWriteUntil(const std::chrono::time_point<Clock>& when,
+                     Args&&... args) noexcept {
+    uint64_t ticket;
+    if (tryObtainPromisedPushTicketUntil(ticket, when)) {
+      // we have pre-validated that the ticket won't block, or rather that
+      // it won't block longer than it takes another thread to dequeue an
+      // element from the slot it identifies.
+      enqueueWithTicket(ticket, std::forward<Args>(args)...);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
   /// If the queue is not full, enqueues and returns true, otherwise
   /// returns false.  Unlike write this method can be blocked by another
   /// thread, specifically a read that has linearized (been assigned
@@ -471,6 +486,28 @@ class MPMCQueue : boost::noncopyable {
     }
   }
 
+  /// Tries until when to obtain a push ticket for which
+  /// SingleElementQueue::enqueue  won't block.  Returns true on success, false
+  /// on failure.
+  /// ticket is filled on success AND failure.
+  template <class Clock>
+  bool tryObtainPromisedPushTicketUntil(
+      uint64_t& ticket, const std::chrono::time_point<Clock>& when) noexcept {
+    bool deadlineReached = false;
+    while (!deadlineReached) {
+      if (tryObtainPromisedPushTicket(ticket)) {
+        return true;
+      }
+      // ticket is a blocking ticket until the preceding ticket has been
+      // processed: wait until this ticket's turn arrives. We have not reserved
+      // this ticket so we will have to re-attempt to get a non-blocking ticket
+      // if we wake up before we time-out.
+      deadlineReached = !slots_[idx(ticket)].tryWaitForEnqueueTurnUntil(
+          turn(ticket), pushSpinCutoff_, (ticket % kAdaptationFreq) == 0, when);
+    }
+    return false;
+  }
+
   /// Tries to obtain a push ticket which can be satisfied if all
   /// in-progress pops complete.  This function does not block, but
   /// blocking may be required when using the returned ticket if some
@@ -482,6 +519,7 @@ class MPMCQueue : boost::noncopyable {
       auto numPops = popTicket_.load(std::memory_order_acquire); // B
       // n will be negative if pops are pending
       int64_t n = numPushes - numPops;
+      rv = numPushes;
       if (n >= static_cast<ssize_t>(capacity_)) {
         // Full, linearize at B.  We don't need to recheck the read we
         // performed at A, because if numPushes was stale at B then the
@@ -489,7 +527,6 @@ class MPMCQueue : boost::noncopyable {
         return false;
       }
       if (pushTicket_.compare_exchange_strong(numPushes, numPushes + 1)) {
-        rv = numPushes;
         return true;
       }
     }
@@ -597,7 +634,7 @@ struct SingleElementQueue {
   template <typename = typename std::enable_if<
                 (folly::IsRelocatable<T>::value &&
                  boost::has_nothrow_constructor<T>::value) ||
-                std::is_nothrow_constructible<T,T&&>::value>::type>
+                std::is_nothrow_constructible<T, T&&>::value>::type>
   void enqueue(const uint32_t turn,
                Atom<uint32_t>& spinCutoff,
                const bool updateSpinCutoff,
@@ -611,6 +648,20 @@ struct SingleElementQueue {
                                   ImplByMove, ImplByRelocation>::type());
   }
 
+  /// Waits until either:
+  /// 1: the dequeue turn preceding the given enqueue turn has arrived
+  /// 2: the given deadline has arrived
+  /// Case 1 returns true, case 2 returns false.
+  template <class Clock>
+  bool tryWaitForEnqueueTurnUntil(
+      const uint32_t turn,
+      Atom<uint32_t>& spinCutoff,
+      const bool updateSpinCutoff,
+      const std::chrono::time_point<Clock>& when) noexcept {
+    return sequencer_.tryWaitForTurn(
+        turn * 2, spinCutoff, updateSpinCutoff, &when);
+  }
+
   bool mayEnqueue(const uint32_t turn) const noexcept {
     return sequencer_.isTurn(turn * 2);
   }
index 85aed3169a9c4cb5cf11da2f798177e5eb98c319..32a6f57f16522f355d201c7818424c43472b9fe2 100644 (file)
@@ -82,10 +82,10 @@ struct TurnSequencer {
   /// See tryWaitForTurn
   /// Requires that `turn` is not a turn in the past.
   void waitForTurn(const uint32_t turn,
-                Atom<uint32_t>& spinCutoff,
-                const bool updateSpinCutoff) noexcept {
+                   Atom<uint32_t>& spinCutoff,
+                   const bool updateSpinCutoff) noexcept {
     bool success = tryWaitForTurn(turn, spinCutoff, updateSpinCutoff);
-    (void) success;
+    (void)success;
     assert(success);
   }
 
@@ -99,9 +99,15 @@ struct TurnSequencer {
   /// before blocking and will adjust spinCutoff based on the results,
   /// otherwise it will spin for at most spinCutoff spins.
   /// Returns true if the wait succeeded, false if the turn is in the past
+  /// or the absTime time value is not nullptr and is reached before the turn
+  /// arrives
+  template <class Clock = std::chrono::steady_clock,
+            class Duration = typename Clock::duration>
   bool tryWaitForTurn(const uint32_t turn,
-                   Atom<uint32_t>& spinCutoff,
-                   const bool updateSpinCutoff) noexcept {
+                      Atom<uint32_t>& spinCutoff,
+                      const bool updateSpinCutoff,
+                      const std::chrono::time_point<Clock, Duration>* absTime =
+                          nullptr) noexcept {
     uint32_t prevThresh = spinCutoff.load(std::memory_order_relaxed);
     const uint32_t effectiveSpinCutoff =
         updateSpinCutoff || prevThresh == 0 ? kMaxSpins : prevThresh;
@@ -142,7 +148,15 @@ struct TurnSequencer {
           continue;
         }
       }
-      state_.futexWait(new_state, futexChannel(turn));
+      if (absTime) {
+        auto futexResult =
+            state_.futexWaitUntil(new_state, *absTime, futexChannel(turn));
+        if (futexResult == FutexResult::TIMEDOUT) {
+          return false;
+        }
+      } else {
+        state_.futexWait(new_state, futexChannel(turn));
+      }
     }
 
     if (updateSpinCutoff || prevThresh == 0) {
@@ -179,9 +193,9 @@ struct TurnSequencer {
     while (true) {
       assert(state == encode(turn << kTurnShift, decodeMaxWaitersDelta(state)));
       uint32_t max_waiter_delta = decodeMaxWaitersDelta(state);
-      uint32_t new_state = encode(
-              (turn + 1) << kTurnShift,
-              max_waiter_delta == 0 ? 0 : max_waiter_delta - 1);
+      uint32_t new_state =
+          encode((turn + 1) << kTurnShift,
+                 max_waiter_delta == 0 ? 0 : max_waiter_delta - 1);
       if (state_.compare_exchange_strong(state, new_state)) {
         if (max_waiter_delta != 0) {
           state_.futexWake(std::numeric_limits<int>::max(),
@@ -227,9 +241,7 @@ struct TurnSequencer {
 
   /// Returns the bitmask to pass futexWait or futexWake when communicating
   /// about the specified turn
-  int futexChannel(uint32_t turn) const noexcept {
-    return 1 << (turn & 31);
-  }
+  int futexChannel(uint32_t turn) const noexcept { return 1 << (turn & 31); }
 
   uint32_t decodeCurrentSturn(uint32_t state) const noexcept {
     return state & ~kWaitersMask;
@@ -240,7 +252,7 @@ struct TurnSequencer {
   }
 
   uint32_t encode(uint32_t currentSturn, uint32_t maxWaiterD) const noexcept {
-    return currentSturn | std::min(uint32_t{ kWaitersMask }, maxWaiterD);
+    return currentSturn | std::min(uint32_t{kWaitersMask}, maxWaiterD);
   }
 };
 
index 394b58381043410464be75661d755653bdcd567a..ee09c7a48c74d2cfe9a185b5110979b9a8dcb6e2 100644 (file)
@@ -35,6 +35,14 @@ FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
 using namespace folly;
 using namespace detail;
 using namespace test;
+using std::chrono::time_point;
+using std::chrono::steady_clock;
+using std::chrono::seconds;
+using std::chrono::milliseconds;
+using std::string;
+using std::make_unique;
+using std::unique_ptr;
+using std::vector;
 
 typedef DeterministicSchedule DSched;
 
@@ -61,7 +69,7 @@ void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
   Atom<uint32_t> spinThreshold(0);
 
   int prev = -1;
-  std::vector<std::thread> threads(numThreads);
+  vector<std::thread> threads(numThreads);
   for (int i = 0; i < numThreads; ++i) {
     threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
           numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold),
@@ -102,6 +110,12 @@ void runElementTypeTest(T&& src) {
   cq.blockingRead(dest);
   EXPECT_TRUE(cq.write(std::move(dest)));
   EXPECT_TRUE(cq.read(dest));
+  auto soon1 = std::chrono::system_clock::now() + std::chrono::seconds(1);
+  EXPECT_TRUE(cq.tryWriteUntil(soon1, std::move(dest)));
+  EXPECT_TRUE(cq.read(dest));
+  auto soon2 = std::chrono::steady_clock::now() + std::chrono::seconds(1);
+  EXPECT_TRUE(cq.tryWriteUntil(soon2, std::move(dest)));
+  EXPECT_TRUE(cq.read(dest));
 }
 
 struct RefCounted {
@@ -132,9 +146,9 @@ void intrusive_ptr_release(RefCounted const* p) {
 
 TEST(MPMCQueue, lots_of_element_types) {
   runElementTypeTest(10);
-  runElementTypeTest(std::string("abc"));
-  runElementTypeTest(std::make_pair(10, std::string("def")));
-  runElementTypeTest(std::vector<std::string>{ { "abc" } });
+  runElementTypeTest(string("abc"));
+  runElementTypeTest(std::make_pair(10, string("def")));
+  runElementTypeTest(vector<string>{{"abc"}});
   runElementTypeTest(std::make_shared<char>('a'));
   runElementTypeTest(folly::make_unique<char>('a'));
   runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
@@ -237,7 +251,7 @@ void runTryEnqDeqTest(int numThreads, int numOps) {
   MPMCQueue<int,Atom> cq(numThreads);
 
   uint64_t n = numOps;
-  std::vector<std::thread> threads(numThreads);
+  vector<std::thread> threads(numThreads);
   std::atomic<uint64_t> sum(0);
   for (int t = 0; t < numThreads; ++t) {
     threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom>,
@@ -294,9 +308,59 @@ uint64_t nowMicro() {
 }
 
 template <typename Q>
-std::string producerConsumerBench(Q&& queue, std::string qName,
-                                  int numProducers, int numConsumers,
-                                  int numOps, bool ignoreContents = false) {
+struct WriteMethodCaller {
+  WriteMethodCaller() {}
+  virtual ~WriteMethodCaller() = default;
+  virtual bool callWrite(Q& q, int i) = 0;
+  virtual string methodName() = 0;
+};
+
+template <typename Q>
+struct BlockingWriteCaller : public WriteMethodCaller<Q> {
+  bool callWrite(Q& q, int i) override {
+    q.blockingWrite(i);
+    return true;
+  }
+  string methodName() override { return "blockingWrite"; }
+};
+
+template <typename Q>
+struct WriteIfNotFullCaller : public WriteMethodCaller<Q> {
+  bool callWrite(Q& q, int i) override { return q.writeIfNotFull(i); }
+  string methodName() override { return "writeIfNotFull"; }
+};
+
+template <typename Q>
+struct WriteCaller : public WriteMethodCaller<Q> {
+  bool callWrite(Q& q, int i) override { return q.write(i); }
+  string methodName() override { return "write"; }
+};
+
+template <typename Q,
+          class Clock = steady_clock,
+          class Duration = typename Clock::duration>
+struct TryWriteUntilCaller : public WriteMethodCaller<Q> {
+  const Duration duration_;
+  explicit TryWriteUntilCaller(Duration&& duration) : duration_(duration) {}
+  bool callWrite(Q& q, int i) override {
+    auto then = Clock::now() + duration_;
+    return q.tryWriteUntil(then, i);
+  }
+  string methodName() override {
+    return folly::sformat(
+        "tryWriteUntil({}ms)",
+        std::chrono::duration_cast<milliseconds>(duration_).count());
+  }
+};
+
+template <typename Q>
+string producerConsumerBench(Q&& queue,
+                             string qName,
+                             int numProducers,
+                             int numConsumers,
+                             int numOps,
+                             WriteMethodCaller<Q>& writer,
+                             bool ignoreContents = false) {
   Q& q = queue;
 
   struct rusage beginUsage;
@@ -306,17 +370,20 @@ std::string producerConsumerBench(Q&& queue, std::string qName,
 
   uint64_t n = numOps;
   std::atomic<uint64_t> sum(0);
+  std::atomic<uint64_t> failed(0);
 
-  std::vector<std::thread> producers(numProducers);
+  vector<std::thread> producers(numProducers);
   for (int t = 0; t < numProducers; ++t) {
     producers[t] = DSched::thread([&,t]{
       for (int i = t; i < numOps; i += numProducers) {
-        q.blockingWrite(i);
+        while (!writer.callWrite(q, i)) {
+          ++failed;
+        }
       }
     });
   }
 
-  std::vector<std::thread> consumers(numConsumers);
+  vector<std::thread> consumers(numConsumers);
   for (int t = 0; t < numConsumers; ++t) {
     consumers[t] = DSched::thread([&,t]{
       uint64_t localSum = 0;
@@ -348,27 +415,76 @@ std::string producerConsumerBench(Q&& queue, std::string qName,
   uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
   long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
       (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
+  uint64_t failures = failed;
 
-  return folly::format(
-      "{}, {} producers, {} consumers => {} nanos/handoff, {} csw / {} handoff",
-      qName, numProducers, numConsumers, nanosPer, csw, n).str();
+  return folly::sformat(
+      "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
+      "handoff, {} failures",
+      qName,
+      numProducers,
+      writer.methodName(),
+      numConsumers,
+      nanosPer,
+      csw,
+      n,
+      failures);
 }
 
-
 TEST(MPMCQueue, mt_prod_cons_deterministic) {
   // we use the Bench method, but perf results are meaningless under DSched
   DSched sched(DSched::uniform(0));
 
-  producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(10),
-          "", 1, 1, 1000);
-  producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(100),
-          "", 10, 10, 1000);
-  producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(10),
-          "", 1, 1, 1000);
-  producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(100),
-          "", 10, 10, 1000);
-  producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(1),
-          "", 10, 10, 1000);
+  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic>>>>
+      callers;
+  callers.emplace_back(
+      make_unique<BlockingWriteCaller<MPMCQueue<int, DeterministicAtomic>>>());
+  callers.emplace_back(
+      make_unique<WriteIfNotFullCaller<MPMCQueue<int, DeterministicAtomic>>>());
+  callers.emplace_back(
+      make_unique<WriteCaller<MPMCQueue<int, DeterministicAtomic>>>());
+  callers.emplace_back(
+      make_unique<TryWriteUntilCaller<MPMCQueue<int, DeterministicAtomic>>>(
+          milliseconds(1)));
+  callers.emplace_back(
+      make_unique<TryWriteUntilCaller<MPMCQueue<int, DeterministicAtomic>>>(
+          seconds(2)));
+
+  for (const auto& caller : callers) {
+    LOG(INFO)
+        << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(10),
+                                 "MPMCQueue<int, DeterministicAtomic>(10)",
+                                 1,
+                                 1,
+                                 1000,
+                                 *caller);
+    LOG(INFO)
+        << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(100),
+                                 "MPMCQueue<int, DeterministicAtomic>(100)",
+                                 10,
+                                 10,
+                                 1000,
+                                 *caller);
+    LOG(INFO)
+        << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(10),
+                                 "MPMCQueue<int, DeterministicAtomic>(10)",
+                                 1,
+                                 1,
+                                 1000,
+                                 *caller);
+    LOG(INFO)
+        << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(100),
+                                 "MPMCQueue<int, DeterministicAtomic>(100)",
+                                 10,
+                                 10,
+                                 1000,
+                                 *caller);
+    LOG(INFO) << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(1),
+                                       "MPMCQueue<int, DeterministicAtomic>(1)",
+                                       10,
+                                       10,
+                                       1000,
+                                       *caller);
+  }
 }
 
 #define PC_BENCH(q, np, nc, ...) \
@@ -376,38 +492,71 @@ TEST(MPMCQueue, mt_prod_cons_deterministic) {
 
 TEST(MPMCQueue, mt_prod_cons) {
   int n = 100000;
-  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 1, n);
-  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 1, n);
-  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 10, n);
-  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 10, n);
-  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 1, n);
-  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 1, n);
-  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 10, n);
-  LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 10, n);
-  LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n);
+  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int>>>> callers;
+  callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int>>>());
+  callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int>>>());
+  callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int>>>());
+  callers.emplace_back(
+      make_unique<TryWriteUntilCaller<MPMCQueue<int>>>(milliseconds(1)));
+  callers.emplace_back(
+      make_unique<TryWriteUntilCaller<MPMCQueue<int>>>(seconds(2)));
+  for (const auto& caller : callers) {
+    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 1, n, *caller);
+    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 1, n, *caller);
+    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 10, n, *caller);
+    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 10, n, *caller);
+    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 1, n, *caller);
+    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 1, n, *caller);
+    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 10, n, *caller);
+    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 10, n, *caller);
+    LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n, *caller);
+  }
 }
 
 TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
   int n = 100000;
-  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 1, 1, n);
-  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 10, 1, n);
-  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 1, 10, n);
-  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 10, 10, n);
-  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 1, 1, n);
-  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 10, 1, n);
-  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 1, 10, n);
-  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 10, 10, n);
-  LOG(INFO)
-    << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(100000)), 32, 100, n);
-}
-
-template <template<typename> class Atom>
-void runNeverFailThread(
-    int numThreads,
-    int n, /*numOps*/
-    MPMCQueue<int, Atom>& cq,
-    std::atomic<uint64_t>& sum,
-    int t) {
+  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, EmulatedFutexAtomic>>>>
+      callers;
+  callers.emplace_back(
+      make_unique<BlockingWriteCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
+  callers.emplace_back(
+      make_unique<WriteIfNotFullCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
+  callers.emplace_back(
+      make_unique<WriteCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
+  callers.emplace_back(
+      make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic>>>(
+          milliseconds(1)));
+  callers.emplace_back(
+      make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic>>>(
+          seconds(2)));
+  for (const auto& caller : callers) {
+    LOG(INFO) << PC_BENCH(
+        (MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 1, n, *caller);
+    LOG(INFO) << PC_BENCH(
+        (MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 1, n, *caller);
+    LOG(INFO) << PC_BENCH(
+        (MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 10, n, *caller);
+    LOG(INFO) << PC_BENCH(
+        (MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 10, n, *caller);
+    LOG(INFO) << PC_BENCH(
+        (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 1, n, *caller);
+    LOG(INFO) << PC_BENCH(
+        (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 1, n, *caller);
+    LOG(INFO) << PC_BENCH(
+        (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 10, n, *caller);
+    LOG(INFO) << PC_BENCH(
+        (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 10, n, *caller);
+    LOG(INFO) << PC_BENCH(
+        (MPMCQueue<int, EmulatedFutexAtomic>(100000)), 32, 100, n, *caller);
+  }
+}
+
+template <template <typename> class Atom>
+void runNeverFailThread(int numThreads,
+                        int n, /*numOps*/
+                        MPMCQueue<int, Atom>& cq,
+                        std::atomic<uint64_t>& sum,
+                        int t) {
   uint64_t threadSum = 0;
   for (int i = t; i < n; i += numThreads) {
     // enq + deq
@@ -421,19 +570,23 @@ void runNeverFailThread(
   sum += threadSum;
 }
 
-template <template<typename> class Atom>
+template <template <typename> class Atom>
 uint64_t runNeverFailTest(int numThreads, int numOps) {
   // always #enq >= #deq
-  MPMCQueue<int,Atom> cq(numThreads);
+  MPMCQueue<int, Atom> cq(numThreads);
 
   uint64_t n = numOps;
   auto beginMicro = nowMicro();
 
-  std::vector<std::thread> threads(numThreads);
+  vector<std::thread> threads(numThreads);
   std::atomic<uint64_t> sum(0);
   for (int t = 0; t < numThreads; ++t) {
     threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom>,
-          numThreads, n, std::ref(cq), std::ref(sum), t));
+                                          numThreads,
+                                          n,
+                                          std::ref(cq),
+                                          std::ref(sum),
+                                          t));
   }
   for (auto& t : threads) {
     DSched::join(t);
@@ -445,29 +598,29 @@ uint64_t runNeverFailTest(int numThreads, int numOps) {
 }
 
 TEST(MPMCQueue, mt_never_fail) {
-  int nts[] = { 1, 3, 100 };
+  int nts[] = {1, 3, 100};
 
   int n = 100000;
   for (int nt : nts) {
     uint64_t elapsed = runNeverFailTest<std::atomic>(nt, n);
-    LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with "
-              << nt << " threads";
+    LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
+              << " threads";
   }
 }
 
 TEST(MPMCQueue, mt_never_fail_emulated_futex) {
-  int nts[] = { 1, 3, 100 };
+  int nts[] = {1, 3, 100};
 
   int n = 100000;
   for (int nt : nts) {
     uint64_t elapsed = runNeverFailTest<EmulatedFutexAtomic>(nt, n);
-    LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with "
-              << nt << " threads";
+    LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
+              << " threads";
   }
 }
 
 TEST(MPMCQueue, mt_never_fail_deterministic) {
-  int nts[] = { 3, 10 };
+  int nts[] = {3, 10};
 
   long seed = 0; // nowMicro() % 10000;
   LOG(INFO) << "using seed " << seed;
@@ -485,6 +638,77 @@ TEST(MPMCQueue, mt_never_fail_deterministic) {
   }
 }
 
+template <class Clock, template <typename> class Atom>
+void runNeverFailUntilThread(int numThreads,
+                             int n, /*numOps*/
+                             MPMCQueue<int, Atom>& cq,
+                             std::atomic<uint64_t>& sum,
+                             int t) {
+  uint64_t threadSum = 0;
+  for (int i = t; i < n; i += numThreads) {
+    // enq + deq
+    auto soon = Clock::now() + std::chrono::seconds(1);
+    EXPECT_TRUE(cq.tryWriteUntil(soon, i));
+
+    int dest = -1;
+    EXPECT_TRUE(cq.readIfNotEmpty(dest));
+    EXPECT_TRUE(dest >= 0);
+    threadSum += dest;
+  }
+  sum += threadSum;
+}
+
+template <class Clock, template <typename> class Atom>
+uint64_t runNeverFailTest(int numThreads, int numOps) {
+  // always #enq >= #deq
+  MPMCQueue<int, Atom> cq(numThreads);
+
+  uint64_t n = numOps;
+  auto beginMicro = nowMicro();
+
+  vector<std::thread> threads(numThreads);
+  std::atomic<uint64_t> sum(0);
+  for (int t = 0; t < numThreads; ++t) {
+    threads[t] = DSched::thread(std::bind(runNeverFailUntilThread<Clock, Atom>,
+                                          numThreads,
+                                          n,
+                                          std::ref(cq),
+                                          std::ref(sum),
+                                          t));
+  }
+  for (auto& t : threads) {
+    DSched::join(t);
+  }
+  EXPECT_TRUE(cq.isEmpty());
+  EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
+
+  return nowMicro() - beginMicro;
+}
+
+TEST(MPMCQueue, mt_never_fail_until_system) {
+  int nts[] = {1, 3, 100};
+
+  int n = 100000;
+  for (int nt : nts) {
+    uint64_t elapsed =
+        runNeverFailTest<std::chrono::system_clock, std::atomic>(nt, n);
+    LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
+              << " threads";
+  }
+}
+
+TEST(MPMCQueue, mt_never_fail_until_steady) {
+  int nts[] = {1, 3, 100};
+
+  int n = 100000;
+  for (int nt : nts) {
+    uint64_t elapsed =
+        runNeverFailTest<std::chrono::steady_clock, std::atomic>(nt, n);
+    LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
+              << " threads";
+  }
+}
+
 enum LifecycleEvent {
   NOTHING = -1,
   DEFAULT_CONSTRUCTOR,