Add Baton variants with multiple posters and with a non-blocking waiter
authorMaged Michael <magedmichael@fb.com>
Fri, 17 Feb 2017 13:13:28 +0000 (05:13 -0800)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Fri, 17 Feb 2017 13:21:19 +0000 (05:21 -0800)
Summary:
Added two variants of Baton by adding two Boolean template parameters with defaults that correspond to the original single poster blocking waiter version:
- SinglePoster (single poster vs multiple posters). Default true.
- Blocking (vs spinning). Default true.

The multi-poster baton (SinglePoster = false) can be handed over by multiple concurrent posters, where redundant post()-s are idempotent but if a post() arrives after the baton is reset() then it unblocks the waiter.

The nonblocking Baton (Blocking = false) has faster common case post(), by avoiding CAS in the critical path. It does so by disallowing waiter blocking and timed_wait().

Reviewed By: nbronson

Differential Revision: D4506728

fbshipit-source-id: e5b65ad93e443fbba22164319c985ebc8424554a

folly/Baton.h
folly/test/BatonBenchmark.cpp
folly/test/BatonTest.cpp
folly/test/BatonTestHelpers.h

index e0f66951dfc185e25a4adefc1e155c0730d92e78..833239153d8567fb356258c42f484db3ee8b0343 100644 (file)
 
 #pragma once
 
+#include <assert.h>
+#include <errno.h>
 #include <stdint.h>
 #include <atomic>
-#include <errno.h>
-#include <assert.h>
+#include <thread>
 
 #include <folly/detail/Futex.h>
 #include <folly/detail/MemoryIdler.h>
 
 namespace folly {
 
-/// A Baton allows a thread to block once and be awoken: it captures
-/// a single handoff.  During its lifecycle (from construction/reset to
-/// destruction/reset) a baton must either be post()ed and wait()ed exactly
-/// once each, or not at all.
+/// A Baton allows a thread to block once and be awoken. The single
+/// poster version (with SinglePoster == true) captures a single
+/// handoff, and during its lifecycle (from construction/reset to
+/// destruction/reset) a baton must either be post()ed and wait()ed
+/// exactly once each, or not at all.
+///
+/// The multi-poster version (SinglePoster == false) allows multiple
+/// concurrent handoff attempts, the first of which completes the
+/// handoff and the rest if any are idempotent.
 ///
 /// Baton includes no internal padding, and is only 4 bytes in size.
 /// Any alignment or padding to avoid false sharing is up to the user.
 ///
-/// This is basically a stripped-down semaphore that supports only a
-/// single call to sem_post and a single call to sem_wait.  The current
-/// posix semaphore sem_t isn't too bad, but this provides more a bit more
-/// speed, inlining, smaller size, a guarantee that the implementation
-/// won't change, and compatibility with DeterministicSchedule.  By having
-/// a much more restrictive lifecycle we can also add a bunch of assertions
-/// that can help to catch race conditions ahead of time.
-template <template<typename> class Atom = std::atomic>
+/// This is basically a stripped-down semaphore that supports (only a
+/// single call to sem_post, when SinglePoster == true) and a single
+/// call to sem_wait.
+///
+/// The non-blocking version (Blocking == false) provides more speed
+/// by using only load acquire and store release operations in the
+/// critical path, at the cost of disallowing blocking and timing out.
+///
+/// The current posix semaphore sem_t isn't too bad, but this provides
+/// more a bit more speed, inlining, smaller size, a guarantee that
+/// the implementation won't change, and compatibility with
+/// DeterministicSchedule.  By having a much more restrictive
+/// lifecycle we can also add a bunch of assertions that can help to
+/// catch race conditions ahead of time.
+template <
+    template <typename> class Atom = std::atomic,
+    bool SinglePoster = true, //  single vs multiple posters
+    bool Blocking = true> // blocking vs spinning
 struct Baton {
   constexpr Baton() : state_(INIT) {}
 
@@ -96,32 +112,82 @@ struct Baton {
   }
 
   /// Causes wait() to wake up.  For each lifetime of a Baton (where a
-  /// lifetime starts at construction or reset() and ends at destruction
-  /// or reset()) there can be at most one call to post().  Any thread
-  /// may call post().
-  ///
-  /// Although we could implement a more generic semaphore semantics
-  /// without any extra size or CPU overhead, the single-call limitation
-  /// allows us to have better assert-ions during debug builds.
+  /// lifetime starts at construction or reset() and ends at
+  /// destruction or reset()) there can be at most one call to post(),
+  /// in the single poster version.  Any thread may call post().
   void post() {
-    uint32_t before = state_.load(std::memory_order_acquire);
-
-    assert(before == INIT || before == WAITING || before == TIMED_OUT);
-
-    if (before == INIT &&
-        state_.compare_exchange_strong(before, EARLY_DELIVERY)) {
+    if (!Blocking) {
+      /// Non-blocking version
+      ///
+      assert([&] {
+        auto state = state_.load(std::memory_order_relaxed);
+        return (state == INIT || state == EARLY_DELIVERY);
+      }());
+      state_.store(EARLY_DELIVERY, std::memory_order_release);
       return;
     }
 
-    assert(before == WAITING || before == TIMED_OUT);
+    /// Blocking versions
+    ///
+    if (SinglePoster) {
+      /// Single poster version
+      ///
+      uint32_t before = state_.load(std::memory_order_acquire);
 
-    if (before == TIMED_OUT) {
-      return;
-    }
+      assert(before == INIT || before == WAITING || before == TIMED_OUT);
 
-    assert(before == WAITING);
-    state_.store(LATE_DELIVERY, std::memory_order_release);
-    state_.futexWake(1);
+      if (before == INIT &&
+          state_.compare_exchange_strong(before, EARLY_DELIVERY)) {
+        return;
+      }
+
+      assert(before == WAITING || before == TIMED_OUT);
+
+      if (before == TIMED_OUT) {
+        return;
+      }
+
+      assert(before == WAITING);
+      state_.store(LATE_DELIVERY, std::memory_order_release);
+      state_.futexWake(1);
+    } else {
+      /// Multi-poster version
+      ///
+      while (true) {
+        uint32_t before = state_.load(std::memory_order_acquire);
+
+        if (before == INIT &&
+            state_.compare_exchange_strong(before, EARLY_DELIVERY)) {
+          return;
+        }
+
+        if (before == TIMED_OUT) {
+          return;
+        }
+
+        if (before == EARLY_DELIVERY || before == LATE_DELIVERY) {
+          // The reason for not simply returning (without the following
+          // atomic operation) is to avoid the following case:
+          //
+          //  T1:             T2:             T3:
+          //  local1.post();  local2.post();  global.wait();
+          //  global.post();  global.post();  local1.try_wait() == true;
+          //                                  local2.try_wait() == false;
+          //
+          if (state_.fetch_add(0) != before) {
+            continue;
+          }
+          return;
+        }
+
+        assert(before == WAITING);
+        if (!state_.compare_exchange_weak(before, LATE_DELIVERY)) {
+          continue;
+        }
+        state_.futexWake(1);
+        return;
+      }
+    }
   }
 
   /// Waits until post() has been called in the current Baton lifetime.
@@ -139,6 +205,13 @@ struct Baton {
       return;
     }
 
+    if (!Blocking) {
+      while (!try_wait()) {
+        std::this_thread::yield();
+      }
+      return;
+    }
+
     // guess we have to block :(
     uint32_t expected = INIT;
     if (!state_.compare_exchange_strong(expected, WAITING)) {
@@ -189,6 +262,8 @@ struct Baton {
   ///                       false otherwise
   template <typename Clock, typename Duration = typename Clock::duration>
   bool timed_wait(const std::chrono::time_point<Clock,Duration>& deadline) {
+    static_assert(Blocking, "Non-blocking Baton does not support timed wait.");
+
     if (spinWaitForEarlyDelivery()) {
       assert(state_.load(std::memory_order_acquire) == EARLY_DELIVERY);
       return true;
@@ -235,7 +310,7 @@ struct Baton {
   ///   call wait, try_wait or timed_wait on the same baton without resetting
   ///
   /// @return       true if baton has been posted, false othewise
-  bool try_wait() {
+  bool try_wait() const {
     auto s = state_.load(std::memory_order_acquire);
     assert(s == INIT || s == EARLY_DELIVERY);
     return s == EARLY_DELIVERY;
index e4ba1fa50f868879b451afc3569bc8ecca9b1990..11fb747c15855adf0f38eaf485b8b3a94757290b 100644 (file)
@@ -31,12 +31,42 @@ using folly::detail::EmulatedFutexAtomic;
 
 typedef DeterministicSchedule DSched;
 
-BENCHMARK(baton_pingpong, iters) { run_pingpong_test<std::atomic>(iters); }
+BENCHMARK(baton_pingpong_single_poster_blocking, iters) {
+  run_pingpong_test<std::atomic, true, true>(iters);
+}
+
+BENCHMARK(baton_pingpong_multi_poster_blocking, iters) {
+  run_pingpong_test<std::atomic, false, true>(iters);
+}
+
+BENCHMARK(baton_pingpong_single_poster_nonblocking, iters) {
+  run_pingpong_test<std::atomic, true, false>(iters);
+}
+
+BENCHMARK(baton_pingpong_multi_poster_nonblocking, iters) {
+  run_pingpong_test<std::atomic, false, false>(iters);
+}
+
+BENCHMARK_DRAW_LINE()
 
-BENCHMARK(baton_pingpong_emulated_futex, iters) {
-  run_pingpong_test<EmulatedFutexAtomic>(iters);
+BENCHMARK(baton_pingpong_emulated_futex_single_poster_blocking, iters) {
+  run_pingpong_test<EmulatedFutexAtomic, true, true>(iters);
 }
 
+BENCHMARK(baton_pingpong_emulated_futex_multi_poster_blocking, iters) {
+  run_pingpong_test<EmulatedFutexAtomic, false, true>(iters);
+}
+
+BENCHMARK(baton_pingpong_emulated_futex_single_poster_nonblocking, iters) {
+  run_pingpong_test<EmulatedFutexAtomic, true, false>(iters);
+}
+
+BENCHMARK(baton_pingpong_emulated_futex_multi_poster_nonblocking, iters) {
+  run_pingpong_test<EmulatedFutexAtomic, false, false>(iters);
+}
+
+BENCHMARK_DRAW_LINE()
+
 BENCHMARK(posix_sem_pingpong, iters) {
   sem_t sems[3];
   sem_t* a = sems + 0;
index 282e8b985729ccdc748a3ac72ae95b090d2b5a3e..74b717984686f88d8bf83b31ac50f4cb42f68bd9 100644 (file)
@@ -25,56 +25,260 @@ using namespace folly;
 using namespace folly::test;
 using folly::detail::EmulatedFutexAtomic;
 
-TEST(Baton, basic) {
-  Baton<> b;
-  b.post();
-  b.wait();
+/// Basic test
+
+TEST(Baton, basic_single_poster_blocking) {
+  run_basic_test<std::atomic, true, true>();
+  run_basic_test<EmulatedFutexAtomic, true, true>();
+  run_basic_test<DeterministicAtomic, true, true>();
+}
+
+TEST(Baton, basic_single_poster_nonblocking) {
+  run_basic_test<std::atomic, true, false>();
+  run_basic_test<EmulatedFutexAtomic, true, false>();
+  run_basic_test<DeterministicAtomic, true, false>();
+}
+
+TEST(Baton, basic_multi_poster_blocking) {
+  run_basic_test<std::atomic, false, true>();
+}
+
+TEST(Baton, basic_multi_poster_nonblocking) {
+  run_basic_test<std::atomic, false, false>();
+}
+
+/// Ping pong tests
+
+TEST(Baton, pingpong_single_poster_blocking) {
+  DSched sched(DSched::uniform(0));
+
+  run_pingpong_test<DeterministicAtomic, true, true>(1000);
+}
+
+TEST(Baton, pingpong_single_poster_nonblocking) {
+  DSched sched(DSched::uniform(0));
+
+  run_pingpong_test<DeterministicAtomic, true, false>(1000);
 }
 
-TEST(Baton, pingpong) {
+TEST(Baton, pingpong_multi_poster_blocking) {
   DSched sched(DSched::uniform(0));
 
-  run_pingpong_test<DeterministicAtomic>(1000);
+  run_pingpong_test<DeterministicAtomic, false, true>(1000);
+}
+
+TEST(Baton, pingpong_multi_poster_nonblocking) {
+  DSched sched(DSched::uniform(0));
+
+  run_pingpong_test<DeterministicAtomic, false, false>(1000);
+}
+
+/// Timed wait tests - Nonblocking Baton does not support timed_wait()
+
+// Timed wait basic system clock tests
+
+TEST(Baton, timed_wait_basic_system_clock_single_poster) {
+  run_basic_timed_wait_tests<std::atomic, std::chrono::system_clock, true>();
+  run_basic_timed_wait_tests<
+      EmulatedFutexAtomic,
+      std::chrono::system_clock,
+      true>();
+  run_basic_timed_wait_tests<
+      DeterministicAtomic,
+      std::chrono::system_clock,
+      true>();
 }
 
-TEST(Baton, timed_wait_basic_system_clock) {
-  run_basic_timed_wait_tests<std::atomic, std::chrono::system_clock>();
-  run_basic_timed_wait_tests<EmulatedFutexAtomic, std::chrono::system_clock>();
-  run_basic_timed_wait_tests<DeterministicAtomic, std::chrono::system_clock>();
+TEST(Baton, timed_wait_basic_system_clock_multi_poster) {
+  run_basic_timed_wait_tests<std::atomic, std::chrono::system_clock, false>();
+  run_basic_timed_wait_tests<
+      EmulatedFutexAtomic,
+      std::chrono::system_clock,
+      false>();
+  run_basic_timed_wait_tests<
+      DeterministicAtomic,
+      std::chrono::system_clock,
+      false>();
 }
 
-TEST(Baton, timed_wait_timeout_system_clock) {
-  run_timed_wait_tmo_tests<std::atomic, std::chrono::system_clock>();
-  run_timed_wait_tmo_tests<EmulatedFutexAtomic, std::chrono::system_clock>();
-  run_timed_wait_tmo_tests<DeterministicAtomic, std::chrono::system_clock>();
+// Timed wait timeout system clock tests
+
+TEST(Baton, timed_wait_timeout_system_clock_single_poster) {
+  run_timed_wait_tmo_tests<std::atomic, std::chrono::system_clock, true>();
+  run_timed_wait_tmo_tests<
+      EmulatedFutexAtomic,
+      std::chrono::system_clock,
+      true>();
+  run_timed_wait_tmo_tests<
+      DeterministicAtomic,
+      std::chrono::system_clock,
+      true>();
 }
 
-TEST(Baton, timed_wait_system_clock) {
-  run_timed_wait_regular_test<std::atomic, std::chrono::system_clock>();
-  run_timed_wait_regular_test<EmulatedFutexAtomic, std::chrono::system_clock>();
-  run_timed_wait_regular_test<DeterministicAtomic, std::chrono::system_clock>();
+TEST(Baton, timed_wait_timeout_system_clock_multi_poster) {
+  run_timed_wait_tmo_tests<std::atomic, std::chrono::system_clock, false>();
+  run_timed_wait_tmo_tests<
+      EmulatedFutexAtomic,
+      std::chrono::system_clock,
+      false>();
+  run_timed_wait_tmo_tests<
+      DeterministicAtomic,
+      std::chrono::system_clock,
+      false>();
+}
+
+// Timed wait regular system clock tests
+
+TEST(Baton, timed_wait_system_clock_single_poster) {
+  run_timed_wait_regular_test<std::atomic, std::chrono::system_clock, true>();
+  run_timed_wait_regular_test<
+      EmulatedFutexAtomic,
+      std::chrono::system_clock,
+      true>();
+  run_timed_wait_regular_test<
+      DeterministicAtomic,
+      std::chrono::system_clock,
+      true>();
 }
 
-TEST(Baton, timed_wait_basic_steady_clock) {
-  run_basic_timed_wait_tests<std::atomic, std::chrono::steady_clock>();
-  run_basic_timed_wait_tests<EmulatedFutexAtomic, std::chrono::steady_clock>();
-  run_basic_timed_wait_tests<DeterministicAtomic, std::chrono::steady_clock>();
+TEST(Baton, timed_wait_system_clock_multi_poster) {
+  run_timed_wait_regular_test<std::atomic, std::chrono::system_clock, false>();
+  run_timed_wait_regular_test<
+      EmulatedFutexAtomic,
+      std::chrono::system_clock,
+      false>();
+  run_timed_wait_regular_test<
+      DeterministicAtomic,
+      std::chrono::system_clock,
+      false>();
+}
+
+// Timed wait basic steady clock tests
+
+TEST(Baton, timed_wait_basic_steady_clock_single_poster) {
+  run_basic_timed_wait_tests<std::atomic, std::chrono::steady_clock, true>();
+  run_basic_timed_wait_tests<
+      EmulatedFutexAtomic,
+      std::chrono::steady_clock,
+      true>();
+  run_basic_timed_wait_tests<
+      DeterministicAtomic,
+      std::chrono::steady_clock,
+      true>();
+}
+
+TEST(Baton, timed_wait_basic_steady_clock_multi_poster) {
+  run_basic_timed_wait_tests<std::atomic, std::chrono::steady_clock, false>();
+  run_basic_timed_wait_tests<
+      EmulatedFutexAtomic,
+      std::chrono::steady_clock,
+      false>();
+  run_basic_timed_wait_tests<
+      DeterministicAtomic,
+      std::chrono::steady_clock,
+      false>();
+}
+
+// Timed wait timeout steady clock tests
+
+TEST(Baton, timed_wait_timeout_steady_clock_single_poster) {
+  run_timed_wait_tmo_tests<std::atomic, std::chrono::steady_clock, true>();
+  run_timed_wait_tmo_tests<
+      EmulatedFutexAtomic,
+      std::chrono::steady_clock,
+      true>();
+  run_timed_wait_tmo_tests<
+      DeterministicAtomic,
+      std::chrono::steady_clock,
+      true>();
+}
+
+TEST(Baton, timed_wait_timeout_steady_clock_multi_poster) {
+  run_timed_wait_tmo_tests<std::atomic, std::chrono::steady_clock, false>();
+  run_timed_wait_tmo_tests<
+      EmulatedFutexAtomic,
+      std::chrono::steady_clock,
+      false>();
+  run_timed_wait_tmo_tests<
+      DeterministicAtomic,
+      std::chrono::steady_clock,
+      false>();
+}
+
+// Timed wait regular steady clock tests
+
+TEST(Baton, timed_wait_steady_clock_single_poster) {
+  run_timed_wait_regular_test<std::atomic, std::chrono::steady_clock, true>();
+  run_timed_wait_regular_test<
+      EmulatedFutexAtomic,
+      std::chrono::steady_clock,
+      true>();
+  run_timed_wait_regular_test<
+      DeterministicAtomic,
+      std::chrono::steady_clock,
+      true>();
+}
+
+TEST(Baton, timed_wait_steady_clock_multi_poster) {
+  run_timed_wait_regular_test<std::atomic, std::chrono::steady_clock, false>();
+  run_timed_wait_regular_test<
+      EmulatedFutexAtomic,
+      std::chrono::steady_clock,
+      false>();
+  run_timed_wait_regular_test<
+      DeterministicAtomic,
+      std::chrono::steady_clock,
+      false>();
+}
+
+/// Try wait tests
+
+TEST(Baton, try_wait_single_poster_blocking) {
+  run_try_wait_tests<std::atomic, true, true>();
+  run_try_wait_tests<EmulatedFutexAtomic, true, true>();
+  run_try_wait_tests<DeterministicAtomic, true, true>();
+}
+
+TEST(Baton, try_wait_single_poster_nonblocking) {
+  run_try_wait_tests<std::atomic, true, false>();
+  run_try_wait_tests<EmulatedFutexAtomic, true, false>();
+  run_try_wait_tests<DeterministicAtomic, true, false>();
+}
+
+TEST(Baton, try_wait_multi_poster_blocking) {
+  run_try_wait_tests<std::atomic, false, true>();
+  run_try_wait_tests<EmulatedFutexAtomic, false, true>();
+  run_try_wait_tests<DeterministicAtomic, false, true>();
+}
+
+TEST(Baton, try_wait_multi_poster_nonblocking) {
+  run_try_wait_tests<std::atomic, false, false>();
+  run_try_wait_tests<EmulatedFutexAtomic, false, false>();
+  run_try_wait_tests<DeterministicAtomic, false, false>();
+}
+
+/// Multi-producer tests
+
+TEST(Baton, multi_producer_single_poster_blocking) {
+  run_try_wait_tests<std::atomic, true, true>();
+  run_try_wait_tests<EmulatedFutexAtomic, true, true>();
+  run_try_wait_tests<DeterministicAtomic, true, true>();
 }
 
-TEST(Baton, timed_wait_timeout_steady_clock) {
-  run_timed_wait_tmo_tests<std::atomic, std::chrono::steady_clock>();
-  run_timed_wait_tmo_tests<EmulatedFutexAtomic, std::chrono::steady_clock>();
-  run_timed_wait_tmo_tests<DeterministicAtomic, std::chrono::steady_clock>();
+TEST(Baton, multi_producer_single_poster_nonblocking) {
+  run_try_wait_tests<std::atomic, true, false>();
+  run_try_wait_tests<EmulatedFutexAtomic, true, false>();
+  run_try_wait_tests<DeterministicAtomic, true, false>();
 }
 
-TEST(Baton, timed_wait_steady_clock) {
-  run_timed_wait_regular_test<std::atomic, std::chrono::steady_clock>();
-  run_timed_wait_regular_test<EmulatedFutexAtomic, std::chrono::steady_clock>();
-  run_timed_wait_regular_test<DeterministicAtomic, std::chrono::steady_clock>();
+TEST(Baton, multi_producer_multi_poster_blocking) {
+  run_try_wait_tests<std::atomic, false, true>();
+  run_try_wait_tests<EmulatedFutexAtomic, false, true>();
+  run_try_wait_tests<DeterministicAtomic, false, true>();
 }
 
-TEST(Baton, try_wait) {
-  run_try_wait_tests<std::atomic>();
-  run_try_wait_tests<EmulatedFutexAtomic>();
-  run_try_wait_tests<DeterministicAtomic>();
+TEST(Baton, multi_producer_multi_poster_nonblocking) {
+  run_try_wait_tests<std::atomic, false, false>();
+  run_try_wait_tests<EmulatedFutexAtomic, false, false>();
+  run_try_wait_tests<DeterministicAtomic, false, false>();
 }
index cc7a8a47ebc5e132da3a2a3b979c3f49084b8dfa..e4de09df7aed184ee0b4c4ab65f95af494c63f9a 100644 (file)
@@ -25,11 +25,19 @@ namespace test {
 
 typedef DeterministicSchedule DSched;
 
-template <template <typename> class Atom>
+template <template <typename> class Atom, bool SinglePoster, bool Blocking>
+void run_basic_test() {
+  Baton<Atom, SinglePoster, Blocking> b;
+  b.post();
+  b.wait();
+}
+
+template <template <typename> class Atom, bool SinglePoster, bool Blocking>
 void run_pingpong_test(int numRounds) {
-  Baton<Atom> batons[17];
-  Baton<Atom>& a = batons[0];
-  Baton<Atom>& b = batons[16]; // to get it on a different cache line
+  using B = Baton<Atom, SinglePoster, Blocking>;
+  B batons[17];
+  B& a = batons[0];
+  B& b = batons[16]; // to get it on a different cache line
   auto thr = DSched::thread([&] {
     for (int i = 0; i < numRounds; ++i) {
       a.wait();
@@ -45,17 +53,17 @@ void run_pingpong_test(int numRounds) {
   DSched::join(thr);
 }
 
-template <template <typename> class Atom, typename Clock>
+template <template <typename> class Atom, typename Clock, bool SinglePoster>
 void run_basic_timed_wait_tests() {
-  Baton<Atom> b;
+  Baton<Atom, SinglePoster> b;
   b.post();
   // tests if early delivery works fine
   EXPECT_TRUE(b.timed_wait(Clock::now()));
 }
 
-template <template <typename> class Atom, typename Clock>
+template <template <typename> class Atom, typename Clock, bool SinglePoster>
 void run_timed_wait_tmo_tests() {
-  Baton<Atom> b;
+  Baton<Atom, SinglePoster> b;
 
   auto thr = DSched::thread([&] {
     bool rv = b.timed_wait(Clock::now() + std::chrono::milliseconds(1));
@@ -65,9 +73,9 @@ void run_timed_wait_tmo_tests() {
   DSched::join(thr);
 }
 
-template <template <typename> class Atom, typename Clock>
+template <template <typename> class Atom, typename Clock, bool SinglePoster>
 void run_timed_wait_regular_test() {
-  Baton<Atom> b;
+  Baton<Atom, SinglePoster> b;
 
   auto thr = DSched::thread([&] {
     // To wait forever we'd like to use time_point<Clock>::max, but
@@ -96,12 +104,68 @@ void run_timed_wait_regular_test() {
   DSched::join(thr);
 }
 
-template <template <typename> class Atom>
+template <template <typename> class Atom, bool SinglePoster, bool Blocking>
 void run_try_wait_tests() {
-  Baton<Atom> b;
+  Baton<Atom, SinglePoster, Blocking> b;
   EXPECT_FALSE(b.try_wait());
   b.post();
   EXPECT_TRUE(b.try_wait());
 }
+
+template <template <typename> class Atom, bool SinglePoster, bool Blocking>
+void run_multi_producer_tests() {
+  constexpr int NPROD = 5;
+  Baton<Atom, SinglePoster, Blocking> local_ping[NPROD];
+  Baton<Atom, SinglePoster, Blocking> local_pong[NPROD];
+  Baton<Atom, /* SingleProducer = */ false, Blocking> global;
+  Baton<Atom, SinglePoster, Blocking> shutdown;
+
+  std::thread prod[NPROD];
+  for (int i = 0; i < NPROD; ++i) {
+    prod[i] = DSched::thread([&, i] {
+      if (!std::is_same<Atom<int>, DeterministicAtomic<int>>::value) {
+        // If we are using std::atomic (or EmulatedFutexAtomic) then
+        // a variable sleep here will make it more likely that
+        // global.post()-s will span more than one global.wait() by
+        // the consumer thread and for the latter to block (if the
+        // global baton is blocking). For DeterministicAtomic, we just
+        // rely on DeterministicSchedule to do the scheduling.  The
+        // test won't fail if we lose the race, we just don't get
+        // coverage.
+        for (int j = 0; j < i; ++j) {
+          std::this_thread::sleep_for(std::chrono::microseconds(1));
+        }
+      }
+      local_ping[i].post();
+      global.post();
+      local_pong[i].wait();
+    });
+  }
+
+  auto cons = DSched::thread([&] {
+    while (true) {
+      global.wait();
+      global.reset();
+      if (shutdown.try_wait()) {
+        return;
+      }
+      for (int i = 0; i < NPROD; ++i) {
+        if (local_ping.try_wait()) {
+          local_ping.reset();
+          local_pong.post();
+        }
+      }
+    }
+  });
+
+  for (auto& t : prod) {
+    DSched::join(t);
+  }
+
+  global.post();
+  shutdown.post();
+  DSched::join(cons);
 }
-}
+
+} // namespace test {
+} // namespace folly {