/// allows us to have better assert-ions during debug builds.
void post() {
uint32_t before = state_.load(std::memory_order_acquire);
- assert(before == INIT || before == WAITING);
- if (before != INIT ||
- !state_.compare_exchange_strong(before, EARLY_DELIVERY)) {
- // we didn't get to state_ before wait(), so we need to call futex()
- assert(before == WAITING);
-
- state_.store(LATE_DELIVERY, std::memory_order_release);
- state_.futexWake(1);
+
+ assert(before == INIT || before == WAITING || before == TIMED_OUT);
+
+ if (before == INIT &&
+ state_.compare_exchange_strong(before, EARLY_DELIVERY)) {
+ return;
+ }
+
+ assert(before == WAITING || before == TIMED_OUT);
+
+ if (before == TIMED_OUT) {
+ return;
}
+
+ assert(before == WAITING);
+ state_.store(LATE_DELIVERY, std::memory_order_release);
+ state_.futexWake(1);
}
/// Waits until post() has been called in the current Baton lifetime.
/// but by making this condition very restrictive we can provide better
/// checking in debug builds.
void wait() {
- uint32_t before;
-
- static_assert(PreBlockAttempts > 0,
- "isn't this assert clearer than an uninitialized variable warning?");
- for (int i = 0; i < PreBlockAttempts; ++i) {
- before = state_.load(std::memory_order_acquire);
- if (before == EARLY_DELIVERY) {
- // hooray!
- return;
- }
- assert(before == INIT);
-#if FOLLY_X64
- // The pause instruction is the polite way to spin, but it doesn't
- // actually affect correctness to omit it if we don't have it.
- // Pausing donates the full capabilities of the current core to
- // its other hyperthreads for a dozen cycles or so
- asm volatile ("pause");
-#endif
+ if (spinWaitForEarlyDelivery()) {
+ assert(state_.load(std::memory_order_acquire) == EARLY_DELIVERY);
+ return;
}
// guess we have to block :(
- if (!state_.compare_exchange_strong(before, WAITING)) {
+ uint32_t expected = INIT;
+ if (!state_.compare_exchange_strong(expected, WAITING)) {
// CAS failed, last minute reprieve
- assert(before == EARLY_DELIVERY);
+ assert(expected == EARLY_DELIVERY);
return;
}
}
}
+ /// Similar to wait, but with a timeout. The thread is unblocked if the
+ /// timeout expires.
+ /// Note: Only a single call to timed_wait/wait is allowed during a baton's
+ /// life-cycle (from construction/reset to destruction/reset). In other
+ /// words, after timed_wait the caller can't invoke wait/timed_wait/try_wait
+ /// again on the same baton without resetting it.
+ ///
+ /// @param deadline Time until which the thread can block
+ /// @return true if the baton was posted to before timeout,
+ /// false otherwise
+ template <typename Clock, typename Duration = typename Clock::duration>
+ bool timed_wait(const std::chrono::time_point<Clock,Duration>& deadline) {
+ if (spinWaitForEarlyDelivery()) {
+ assert(state_.load(std::memory_order_acquire) == EARLY_DELIVERY);
+ return true;
+ }
+
+ // guess we have to block :(
+ uint32_t expected = INIT;
+ if (!state_.compare_exchange_strong(expected, WAITING)) {
+ // CAS failed, last minute reprieve
+ assert(expected == EARLY_DELIVERY);
+ return true;
+ }
+
+ while (true) {
+ auto rv = state_.futexWaitUntil(WAITING, deadline);
+ if (rv == folly::detail::FutexResult::TIMEDOUT) {
+ state_.store(TIMED_OUT, std::memory_order_release);
+ return false;
+ }
+
+ uint32_t s = state_.load(std::memory_order_acquire);
+ assert(s == WAITING || s == LATE_DELIVERY);
+ if (s == LATE_DELIVERY) {
+ return true;
+ }
+ }
+ }
+
+ /// Similar to wait, but doesn't block the thread if it hasn't been posted.
+ ///
+ /// try_wait has the following semantics:
+ /// - It is ok to call try_wait any number times on the same baton until
+ /// try_wait reports that the baton has been posted.
+ /// - It is ok to call timed_wait or wait on the same baton if try_wait
+ /// reports that baton hasn't been posted.
+ /// - If try_wait indicates that the baton has been posted, it is invalid to
+ /// call wait, try_wait or timed_wait on the same baton without resetting
+ ///
+ /// @return true if baton has been posted, false othewise
+ bool try_wait() {
+ auto s = state_.load(std::memory_order_acquire);
+ assert(s == INIT || s == EARLY_DELIVERY);
+ return s == EARLY_DELIVERY;
+ }
+
private:
enum State : uint32_t {
INIT = 0,
EARLY_DELIVERY = 1,
WAITING = 2,
LATE_DELIVERY = 3,
+ TIMED_OUT = 4
};
enum {
PreBlockAttempts = 300,
};
+ // Spin for "some time" (see discussion on PreBlockAttempts) waiting
+ // for a post.
+ //
+ // @return true if we received an early delivery during the wait,
+ // false otherwise. If the function returns true then
+ // state_ is guaranteed to be EARLY_DELIVERY
+ bool spinWaitForEarlyDelivery() {
+
+ static_assert(PreBlockAttempts > 0,
+ "isn't this assert clearer than an uninitialized variable warning?");
+ for (int i = 0; i < PreBlockAttempts; ++i) {
+ if (try_wait()) {
+ // hooray!
+ return true;
+ }
+#if FOLLY_X64
+ // The pause instruction is the polite way to spin, but it doesn't
+ // actually affect correctness to omit it if we don't have it.
+ // Pausing donates the full capabilities of the current core to
+ // its other hyperthreads for a dozen cycles or so
+ asm volatile ("pause");
+#endif
+ }
+
+ return false;
+ }
+
detail::Futex<Atom> state_;
};
thr.join();
}
+template <template<typename> class Atom>
+void run_basic_timed_wait_tests() {
+ Baton<Atom> b;
+ b.post();
+ // tests if early delivery works fine
+ EXPECT_TRUE(b.timed_wait(std::chrono::system_clock::now()));
+}
+
+template <template<typename> class Atom>
+void run_timed_wait_tmo_tests() {
+ Baton<Atom> b;
+
+ auto thr = DSched::thread([&]{
+ bool rv = b.timed_wait(std::chrono::system_clock::now() +
+ std::chrono::milliseconds(1));
+ // main thread is guaranteed to not post until timeout occurs
+ EXPECT_FALSE(rv);
+ });
+ DSched::join(thr);
+}
+
+template <template<typename> class Atom>
+void run_timed_wait_regular_test() {
+ Baton<Atom> b;
+
+ auto thr = DSched::thread([&] {
+ bool rv = b.timed_wait(
+ std::chrono::time_point<std::chrono::system_clock>::max());
+ if (std::is_same<Atom<int>, std::atomic<int>>::value) {
+ // We can only ensure this for std::atomic
+ EXPECT_TRUE(rv);
+ }
+ });
+
+ if (std::is_same<Atom<int>, std::atomic<int>>::value) {
+ // If we are using std::atomic, then a sleep here guarantees to a large
+ // extent that 'thr' will execute wait before we post it, thus testing
+ // late delivery. For DeterministicAtomic, we just rely on
+ // DeterministicSchedule to do the scheduling
+ std::this_thread::sleep_for(std::chrono::milliseconds(2));
+ }
+
+ b.post();
+ DSched::join(thr);
+}
+
+TEST(Baton, timed_wait_basic) {
+ run_basic_timed_wait_tests<std::atomic>();
+ run_basic_timed_wait_tests<DeterministicAtomic>();
+}
+
+TEST(Baton, timed_wait_timeout) {
+ run_timed_wait_tmo_tests<std::atomic>();
+ run_timed_wait_tmo_tests<DeterministicAtomic>();
+}
+
+TEST(Baton, timed_wait) {
+ run_timed_wait_regular_test<std::atomic>();
+ run_timed_wait_regular_test<DeterministicAtomic>();
+}
+
+template <template<typename> class Atom>
+void run_try_wait_tests() {
+ Baton<Atom> b;
+ EXPECT_FALSE(b.try_wait());
+ b.post();
+ EXPECT_TRUE(b.try_wait());
+}
+
+TEST(Baton, try_wait) {
+ run_try_wait_tests<std::atomic>();
+ run_try_wait_tests<DeterministicAtomic>();
+}
+
// I am omitting a benchmark result snapshot because these microbenchmarks
// mainly illustrate that PreBlockAttempts is very effective for rapid
// handoffs. The performance of Baton and sem_t is essentially identical