Add timed_wait and try_wait to Baton
authorSarang Masti <mssarang@fb.com>
Thu, 31 Jul 2014 00:45:16 +0000 (17:45 -0700)
committerSara Golemon <sgolemon@fb.com>
Thu, 14 Aug 2014 18:49:04 +0000 (11:49 -0700)
Summary:
This diff adds timed_wait that allows waiting on the Baton
with a timeout. The diff also adds try_wait which doesn't
block the thread at all.

Test Plan:
-- added new unit tests
-- ran all folly tests

Reviewed By: ngbronson@fb.com

Subscribers: bwatling

FB internal diff: D1468909

Tasks: 4679428

folly/Baton.h
folly/test/BatonTest.cpp

index 86b2e013d0caf6785ec71bb2473658829dc0889e..c87a086e8a3c47b5ca1080cf771e1c46be8e686f 100644 (file)
@@ -103,15 +103,23 @@ struct Baton : boost::noncopyable {
   /// allows us to have better assert-ions during debug builds.
   void post() {
     uint32_t before = state_.load(std::memory_order_acquire);
-    assert(before == INIT || before == WAITING);
-    if (before != INIT ||
-        !state_.compare_exchange_strong(before, EARLY_DELIVERY)) {
-      // we didn't get to state_ before wait(), so we need to call futex()
-      assert(before == WAITING);
-
-      state_.store(LATE_DELIVERY, std::memory_order_release);
-      state_.futexWake(1);
+
+    assert(before == INIT || before == WAITING || before == TIMED_OUT);
+
+    if (before == INIT &&
+        state_.compare_exchange_strong(before, EARLY_DELIVERY)) {
+      return;
+    }
+
+    assert(before == WAITING || before == TIMED_OUT);
+
+    if (before == TIMED_OUT) {
+      return;
     }
+
+    assert(before == WAITING);
+    state_.store(LATE_DELIVERY, std::memory_order_release);
+    state_.futexWake(1);
   }
 
   /// Waits until post() has been called in the current Baton lifetime.
@@ -124,30 +132,16 @@ struct Baton : boost::noncopyable {
   /// but by making this condition very restrictive we can provide better
   /// checking in debug builds.
   void wait() {
-    uint32_t before;
-
-    static_assert(PreBlockAttempts > 0,
-        "isn't this assert clearer than an uninitialized variable warning?");
-    for (int i = 0; i < PreBlockAttempts; ++i) {
-      before = state_.load(std::memory_order_acquire);
-      if (before == EARLY_DELIVERY) {
-        // hooray!
-        return;
-      }
-      assert(before == INIT);
-#if FOLLY_X64
-      // The pause instruction is the polite way to spin, but it doesn't
-      // actually affect correctness to omit it if we don't have it.
-      // Pausing donates the full capabilities of the current core to
-      // its other hyperthreads for a dozen cycles or so
-      asm volatile ("pause");
-#endif
+    if (spinWaitForEarlyDelivery()) {
+      assert(state_.load(std::memory_order_acquire) == EARLY_DELIVERY);
+      return;
     }
 
     // guess we have to block :(
-    if (!state_.compare_exchange_strong(before, WAITING)) {
+    uint32_t expected = INIT;
+    if (!state_.compare_exchange_strong(expected, WAITING)) {
       // CAS failed, last minute reprieve
-      assert(before == EARLY_DELIVERY);
+      assert(expected == EARLY_DELIVERY);
       return;
     }
 
@@ -181,12 +175,70 @@ struct Baton : boost::noncopyable {
     }
   }
 
+  /// Similar to wait, but with a timeout. The thread is unblocked if the
+  /// timeout expires.
+  /// Note: Only a single call to timed_wait/wait is allowed during a baton's
+  /// life-cycle (from construction/reset to destruction/reset). In other
+  /// words, after timed_wait the caller can't invoke wait/timed_wait/try_wait
+  /// again on the same baton without resetting it.
+  ///
+  /// @param  deadline      Time until which the thread can block
+  /// @return               true if the baton was posted to before timeout,
+  ///                       false otherwise
+  template <typename Clock, typename Duration = typename Clock::duration>
+  bool timed_wait(const std::chrono::time_point<Clock,Duration>& deadline) {
+    if (spinWaitForEarlyDelivery()) {
+      assert(state_.load(std::memory_order_acquire) == EARLY_DELIVERY);
+      return true;
+    }
+
+    // guess we have to block :(
+    uint32_t expected = INIT;
+    if (!state_.compare_exchange_strong(expected, WAITING)) {
+      // CAS failed, last minute reprieve
+      assert(expected == EARLY_DELIVERY);
+      return true;
+    }
+
+    while (true) {
+      auto rv = state_.futexWaitUntil(WAITING, deadline);
+      if (rv == folly::detail::FutexResult::TIMEDOUT) {
+        state_.store(TIMED_OUT, std::memory_order_release);
+        return false;
+      }
+
+      uint32_t s = state_.load(std::memory_order_acquire);
+      assert(s == WAITING || s == LATE_DELIVERY);
+      if (s == LATE_DELIVERY) {
+        return true;
+      }
+    }
+  }
+
+  /// Similar to wait, but doesn't block the thread if it hasn't been posted.
+  ///
+  /// try_wait has the following semantics:
+  /// - It is ok to call try_wait any number times on the same baton until
+  ///   try_wait reports that the baton has been posted.
+  /// - It is ok to call timed_wait or wait on the same baton if try_wait
+  ///   reports that baton hasn't been posted.
+  /// - If try_wait indicates that the baton has been posted, it is invalid to
+  ///   call wait, try_wait or timed_wait on the same baton without resetting
+  ///
+  /// @return       true if baton has been posted, false othewise
+  bool try_wait() {
+    auto s = state_.load(std::memory_order_acquire);
+    assert(s == INIT || s == EARLY_DELIVERY);
+    return s == EARLY_DELIVERY;
+  }
+
  private:
   enum State : uint32_t {
     INIT = 0,
     EARLY_DELIVERY = 1,
     WAITING = 2,
     LATE_DELIVERY = 3,
+    TIMED_OUT = 4
   };
 
   enum {
@@ -206,6 +258,33 @@ struct Baton : boost::noncopyable {
     PreBlockAttempts = 300,
   };
 
+  // Spin for "some time" (see discussion on PreBlockAttempts) waiting
+  // for a post.
+  //
+  // @return       true if we received an early delivery during the wait,
+  //               false otherwise. If the function returns true then
+  //               state_ is guaranteed to be EARLY_DELIVERY
+  bool spinWaitForEarlyDelivery() {
+
+    static_assert(PreBlockAttempts > 0,
+        "isn't this assert clearer than an uninitialized variable warning?");
+    for (int i = 0; i < PreBlockAttempts; ++i) {
+      if (try_wait()) {
+        // hooray!
+        return true;
+      }
+#if FOLLY_X64
+      // The pause instruction is the polite way to spin, but it doesn't
+      // actually affect correctness to omit it if we don't have it.
+      // Pausing donates the full capabilities of the current core to
+      // its other hyperthreads for a dozen cycles or so
+      asm volatile ("pause");
+#endif
+    }
+
+    return false;
+  }
+
   detail::Futex<Atom> state_;
 };
 
index ede45327096babb4baa01f1e2da5bb32ae3b93bf..f214dd6907acdad748644cbc2b773fcb4817fbb7 100644 (file)
@@ -83,6 +83,80 @@ BENCHMARK(posix_sem_pingpong, iters) {
   thr.join();
 }
 
+template <template<typename> class Atom>
+void run_basic_timed_wait_tests() {
+  Baton<Atom> b;
+  b.post();
+  // tests if early delivery works fine
+  EXPECT_TRUE(b.timed_wait(std::chrono::system_clock::now()));
+}
+
+template <template<typename> class Atom>
+void run_timed_wait_tmo_tests() {
+  Baton<Atom> b;
+
+  auto thr = DSched::thread([&]{
+    bool rv = b.timed_wait(std::chrono::system_clock::now() +
+                           std::chrono::milliseconds(1));
+    // main thread is guaranteed to not post until timeout occurs
+    EXPECT_FALSE(rv);
+  });
+  DSched::join(thr);
+}
+
+template <template<typename> class Atom>
+void run_timed_wait_regular_test() {
+  Baton<Atom> b;
+
+  auto thr = DSched::thread([&] {
+    bool rv = b.timed_wait(
+                std::chrono::time_point<std::chrono::system_clock>::max());
+    if (std::is_same<Atom<int>, std::atomic<int>>::value) {
+      // We can only ensure this for std::atomic
+      EXPECT_TRUE(rv);
+    }
+  });
+
+  if (std::is_same<Atom<int>, std::atomic<int>>::value) {
+    // If we are using std::atomic, then a sleep here guarantees to a large
+    // extent that 'thr' will execute wait before we post it, thus testing
+    // late delivery. For DeterministicAtomic, we just rely on
+    // DeterministicSchedule to do the scheduling
+    std::this_thread::sleep_for(std::chrono::milliseconds(2));
+  }
+
+  b.post();
+  DSched::join(thr);
+}
+
+TEST(Baton, timed_wait_basic) {
+  run_basic_timed_wait_tests<std::atomic>();
+  run_basic_timed_wait_tests<DeterministicAtomic>();
+}
+
+TEST(Baton, timed_wait_timeout) {
+  run_timed_wait_tmo_tests<std::atomic>();
+  run_timed_wait_tmo_tests<DeterministicAtomic>();
+}
+
+TEST(Baton, timed_wait) {
+  run_timed_wait_regular_test<std::atomic>();
+  run_timed_wait_regular_test<DeterministicAtomic>();
+}
+
+template <template<typename> class Atom>
+void run_try_wait_tests() {
+  Baton<Atom> b;
+  EXPECT_FALSE(b.try_wait());
+  b.post();
+  EXPECT_TRUE(b.try_wait());
+}
+
+TEST(Baton, try_wait) {
+  run_try_wait_tests<std::atomic>();
+  run_try_wait_tests<DeterministicAtomic>();
+}
+
 // I am omitting a benchmark result snapshot because these microbenchmarks
 // mainly illustrate that PreBlockAttempts is very effective for rapid
 // handoffs.  The performance of Baton and sem_t is essentially identical