Saturating semaphore
author Maged Michael <magedmichael@fb.com>
Thu, 30 Nov 2017 05:12:16 +0000 (21:12 -0800)
committer Facebook Github Bot <facebook-github-bot@users.noreply.github.com>
Thu, 30 Nov 2017 05:20:26 +0000 (21:20 -0800)
Summary:
SaturatingSemaphore is a flag that allows:
- multiple concurrent posters
- multiple concurrent waiters
- idempotent posting
- non-destructive waiting
- blocking and spinning
- pre-block spin time control

```
/// SaturatingSemaphore is a flag that allows concurrent posting by
/// multiple posters and concurrent non-destructive waiting by
/// multiple waiters.
///
/// A SaturatingSemaphore allows one or more waiter threads to check,
/// spin, or block, indefinitely or with timeout, for a flag to be set
/// by one or more poster threads. By setting the flag, posters
/// announce to waiters (that may be already waiting or will check
/// the flag in the future) that some condition is true. Posts to an
/// already set flag are idempotent.
```

Reviewed By: djwatson

Differential Revision: D6379704

fbshipit-source-id: 59aed76caa2d159639e75425a778a9c63f18f375

folly/synchronization/SaturatingSemaphore.h [new file with mode: 0644]
folly/synchronization/test/SaturatingSemaphoreTest.cpp [new file with mode: 0644]

diff --git a/folly/synchronization/SaturatingSemaphore.h b/folly/synchronization/SaturatingSemaphore.h
new file mode 100644 (file)
index 0000000..169bb8d
--- /dev/null
@@ -0,0 +1,327 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/Likely.h>
+#include <folly/detail/Futex.h>
+#include <folly/portability/Asm.h>
+
+#include <glog/logging.h>
+
+#include <atomic>
+
+namespace folly {
+
+/// SaturatingSemaphore is a flag that allows concurrent posting by
+/// multiple posters and concurrent non-destructive waiting by
+/// multiple waiters.
+///
+/// A SaturatingSemaphore allows one or more waiter threads to check,
+/// spin, or block, indefinitely or with timeout, for a flag to be set
+/// by one or more poster threads. By setting the flag, posters
+/// announce to waiters (that may be already waiting or will check
+/// the flag in the future) that some condition is true. Posts to an
+/// already set flag are idempotent.
+///
+/// SaturatingSemaphore is called so because it behaves like a hybrid
+/// binary/counted _semaphore_ with values zero and infinity, and
+/// post() and wait() functions. It is called _saturating_ because one
+/// post() is enough to set it to infinity and to satisfy any number
+/// of wait()-s. Once set (to infinity) it remains unchanged by
+/// subsequent post()-s and wait()-s, until it is reset() back to
+/// zero.
+///
+/// The implementation of SaturatingSemaphore is based on that of
+/// Baton. It includes no internal padding, and is only 4 bytes in
+/// size. Any alignment or padding to avoid false sharing is up to
+/// the user.
+/// SaturatingSemaphore differs from Baton as follows:
+/// - Baton allows at most one call to post(); this allows any number
+///   and concurrently.
+/// - Baton allows at most one successful call to any wait variant;
+///   this allows any number and concurrently.
+///
+/// Template parameter:
+/// - bool MayBlock: If false, waiting operations spin only. If
+///   true, timed and wait operations may block; adds an atomic
+///   instruction to the critical path of posters.
+///
+/// Wait options:
+///   The nested class WaitOptions holds an optional per-call setting
+///   for the pre-block spin duration: calls to wait(), try_wait_until(),
+///   and try_wait_for() block only after the pre-block period has
+///   elapsed. The default pre-block duration is 10 microseconds. The
+///   pre-block option is applicable only if MayBlock is true.
+///
+/// Functions:
+///   void reset();
+///     Clears the flag.
+///   void post();
+///     Sets the flag and wakes all current waiters, i.e., causes all
+///     concurrent calls to wait, try_wait_for, and try_wait_until to
+///     return.
+///   void wait(
+///       const WaitOptions& opt = wait_options());
+///     Waits for the flag to be set by a call to post.
+///   bool try_wait();
+///     Returns true if the flag is set by a call to post, otherwise false.
+///   bool try_wait_until(
+///       const time_point& deadline,
+///       const WaitOptions& opt = wait_options());
+///     Returns true if the flag is set by a call to post before the
+///     deadline, otherwise false.
+///   bool try_wait_for(
+///       const duration& duration,
+///       const WaitOptions& opt = wait_options());
+///     Returns true if the flag is set by a call to post before the
+///     expiration of the specified duration, otherwise false.
+///
+/// Usage:
+/// @code
+/// SaturatingSemaphore</* MayBlock = */ true> f;
+/// ASSERT_FALSE(f.try_wait());
+/// ASSERT_FALSE(f.try_wait_until(
+///     std::chrono::steady_clock::now() + std::chrono::microseconds(1)));
+/// ASSERT_FALSE(f.try_wait_until(
+///     std::chrono::steady_clock::now() + std::chrono::microseconds(1),
+///     f.wait_options().pre_block(std::chrono::microseconds(1))));
+/// f.post();
+/// f.post();
+/// f.wait();
+/// f.wait(f.wait_options().pre_block(std::chrono::nanoseconds(100)));
+/// ASSERT_TRUE(f.try_wait());
+/// ASSERT_TRUE(f.try_wait_until(
+///     std::chrono::steady_clock::now() + std::chrono::microseconds(1)));
+/// f.wait();
+/// f.reset();
+/// ASSERT_FALSE(f.try_wait());
+/// @endcode
+
+template <bool MayBlock, template <typename> class Atom = std::atomic>
+class SaturatingSemaphore {
+  detail::Futex<Atom> state_;
+
+  enum State : uint32_t {
+    NOTREADY = 0,
+    READY = 1,
+    BLOCKED = 2,
+  };
+
+ public:
+  /** WaitOptions */
+
+  class WaitOptions {
+    std::chrono::nanoseconds dur_{std::chrono::microseconds(10)};
+
+   public:
+    FOLLY_ALWAYS_INLINE
+    std::chrono::nanoseconds pre_block() const {
+      return dur_;
+    }
+    FOLLY_ALWAYS_INLINE
+    WaitOptions& pre_block(std::chrono::nanoseconds dur) {
+      dur_ = dur;
+      return *this;
+    }
+  };
+
+  FOLLY_ALWAYS_INLINE static WaitOptions wait_options() {
+    return {};
+  }
+
+  /** constructor */
+  constexpr SaturatingSemaphore() noexcept : state_(NOTREADY) {}
+
+  /** destructor */
+  ~SaturatingSemaphore() {}
+
+  /** reset */
+  void reset() noexcept {
+    state_.store(NOTREADY, std::memory_order_relaxed);
+  }
+
+  /** post */
+  FOLLY_ALWAYS_INLINE void post() noexcept {
+    if (!MayBlock) {
+      state_.store(READY, std::memory_order_release);
+    } else {
+      postFastWaiterMayBlock();
+    }
+  }
+
+  /** wait */
+  FOLLY_ALWAYS_INLINE
+  void wait(const WaitOptions& opt = wait_options()) noexcept {
+    try_wait_until(std::chrono::steady_clock::time_point::max(), opt);
+  }
+
+  /** try_wait */
+  FOLLY_ALWAYS_INLINE bool try_wait() const noexcept {
+    return state_.load(std::memory_order_acquire) == READY;
+  }
+
+  /** try_wait_until */
+  template <typename Clock, typename Duration>
+  FOLLY_ALWAYS_INLINE bool try_wait_until(
+      const std::chrono::time_point<Clock, Duration>& deadline,
+      const WaitOptions& opt = wait_options()) noexcept {
+    if (LIKELY(try_wait())) {
+      return true;
+    }
+    return tryWaitSlow(deadline, opt);
+  }
+
+  /** try_wait_for */
+  template <class Rep, class Period>
+  FOLLY_ALWAYS_INLINE bool try_wait_for(
+      const std::chrono::duration<Rep, Period>& duration,
+      const WaitOptions& opt = wait_options()) noexcept {
+    if (LIKELY(try_wait())) {
+      return true;
+    }
+    auto deadline = std::chrono::steady_clock::now() + duration;
+    return tryWaitSlow(deadline, opt);
+  }
+
+ private:
+  FOLLY_ALWAYS_INLINE void postFastWaiterMayBlock() noexcept {
+    uint32_t before = NOTREADY;
+    if (LIKELY(state_.compare_exchange_strong(
+            before,
+            READY,
+            std::memory_order_release,
+            std::memory_order_relaxed))) {
+      return;
+    }
+    postSlowWaiterMayBlock(before);
+  }
+
+  void postSlowWaiterMayBlock(uint32_t before) noexcept; // defined below
+
+  template <typename Clock, typename Duration>
+  bool tryWaitSlow(
+      const std::chrono::time_point<Clock, Duration>& deadline,
+      const WaitOptions& opt) noexcept; // defined below
+};
+
+///
+/// Member function definitions
+///
+
+/**
+ * postSlowWaiterMayBlock
+ *
+ * Out-of-line continuation of post() for MayBlock == true. It is entered
+ * from postFastWaiterMayBlock() after that function's single attempt to
+ * compare-exchange NOTREADY -> READY has failed.
+ *
+ * @param before  The state value observed by the failed compare-exchange.
+ *                It is refreshed inside the loop by every failed
+ *                compare-exchange and by the reload in the READY branch.
+ *
+ * Each loop iteration dispatches on `before`:
+ * - NOTREADY: retry the release compare-exchange to READY and return on
+ *   success; on failure `before` holds the newly observed value and the
+ *   checks below run against it.
+ * - READY: another poster already set the flag. Issue a seq_cst fence and
+ *   reload the state; return if it is still READY, otherwise start over
+ *   with the reloaded value (see the inline comment for the rationale).
+ * - BLOCKED (checked with DCHECK_EQ): a waiter stored BLOCKED before
+ *   futex-waiting. Compare-exchange BLOCKED -> READY and, on success,
+ *   call futexWake() on the state word before returning; on failure, loop.
+ */
+template <bool MayBlock, template <typename> class Atom>
+FOLLY_NOINLINE void SaturatingSemaphore<MayBlock, Atom>::postSlowWaiterMayBlock(
+    uint32_t before) noexcept {
+  while (true) {
+    if (before == NOTREADY) {
+      if (state_.compare_exchange_strong(
+              before,
+              READY,
+              std::memory_order_release,
+              std::memory_order_relaxed)) {
+        return;
+      }
+    }
+    if (before == READY) { // Only if multiple posters
+      // The reason for not simply returning (without the following
+      // steps) is to prevent the following case:
+      //
+      //  T1:             T2:             T3:
+      //  local1.post();  local2.post();  global.wait();
+      //  global.post();  global.post();  global.reset();
+      //                                  seq_cst fence
+      //                                  local1.try_wait() == true;
+      //                                  local2.try_wait() == false;
+      //
+      // The following steps correspond to T2's global.post(), where
+      // global is already posted by T1.
+      //
+      // The following fence and load guarantee that T3 does not miss
+      // T2's prior stores, i.e., local2.post() in this example.
+      //
+      // The following case is prevented:
+      //
+      // Starting with local2 == NOTREADY and global == READY
+      //
+      // T2:                              T3:
+      // store READY to local2 // post    store NOTREADY to global // reset
+      // seq_cst fence                    seq_cst fence
+      // load READY from global // post   load NOTREADY from local2 // try_wait
+      //
+      std::atomic_thread_fence(std::memory_order_seq_cst);
+      before = state_.load(std::memory_order_relaxed);
+      if (before == READY) {
+        return;
+      }
+      continue;
+    }
+    DCHECK_EQ(before, BLOCKED);
+    if (state_.compare_exchange_strong(
+            before,
+            READY,
+            std::memory_order_release,
+            std::memory_order_relaxed)) {
+      state_.futexWake();
+      return;
+    }
+  }
+}
+
+/** tryWaitSlow */
+template <bool MayBlock, template <typename> class Atom>
+template <typename Clock, typename Duration>
+FOLLY_NOINLINE bool SaturatingSemaphore<MayBlock, Atom>::tryWaitSlow(
+    const std::chrono::time_point<Clock, Duration>& deadline,
+    const WaitOptions& opt) noexcept {
+  auto tbegin = Clock::now();
+  while (true) {
+    auto before = state_.load(std::memory_order_acquire);
+    if (before == READY) {
+      return true;
+    }
+    if (Clock::now() >= deadline) {
+      return false;
+    }
+    if (MayBlock) {
+      auto tnow = Clock::now();
+      if (tnow < tbegin) {
+        // backward time discontinuity in Clock, revise pre_block starting point
+        tbegin = tnow;
+      }
+      auto dur = std::chrono::duration_cast<Duration>(tnow - tbegin);
+      if (dur >= opt.pre_block()) {
+        if (before == NOTREADY) {
+          if (!state_.compare_exchange_strong(
+                  before,
+                  BLOCKED,
+                  std::memory_order_relaxed,
+                  std::memory_order_relaxed)) {
+            continue;
+          }
+        }
+        if (deadline == std::chrono::time_point<Clock, Duration>::max()) {
+          state_.futexWait(BLOCKED);
+        } else {
+          state_.futexWaitUntil(BLOCKED, deadline);
+        }
+      }
+    }
+    asm_volatile_pause();
+  }
+}
+
+} // namespace folly
diff --git a/folly/synchronization/test/SaturatingSemaphoreTest.cpp b/folly/synchronization/test/SaturatingSemaphoreTest.cpp
new file mode 100644 (file)
index 0000000..2be9615
--- /dev/null
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/synchronization/SaturatingSemaphore.h>
+#include <folly/portability/GTest.h>
+#include <folly/test/DeterministicSchedule.h>
+
+/// Test helper functions
+
+using folly::SaturatingSemaphore;
+using DSched = folly::test::DeterministicSchedule;
+
+/// Single-threaded walk through the API: every query fails before the
+/// first post(), succeeds after it (including after a repeated post()),
+/// and fails again after reset().
+template <bool MayBlock, template <typename> class Atom = std::atomic>
+void run_basic_test() {
+  using std::chrono::steady_clock;
+  const auto tick = std::chrono::microseconds(1);
+  SaturatingSemaphore<MayBlock, Atom> f;
+
+  // Flag not set yet: the untimed and both timed checks must fail.
+  ASSERT_FALSE(f.try_wait());
+  ASSERT_FALSE(f.try_wait_until(steady_clock::now() + tick));
+  ASSERT_FALSE(f.try_wait_until(
+      steady_clock::now() + tick, f.wait_options().pre_block(tick)));
+
+  // Post twice, then check that every form of waiting succeeds.
+  f.post();
+  f.post();
+  f.wait();
+  f.wait(f.wait_options().pre_block(std::chrono::nanoseconds(100)));
+  ASSERT_TRUE(f.try_wait());
+  ASSERT_TRUE(f.try_wait_until(steady_clock::now() + tick));
+  f.wait();
+
+  // reset() clears the flag again.
+  f.reset();
+  ASSERT_FALSE(f.try_wait());
+}
+
+template <bool MayBlock, template <typename> class Atom = std::atomic>
+void run_pingpong_test(int numRounds) {
+  using WF = SaturatingSemaphore<MayBlock, Atom>;
+  std::array<WF, 17> flags;
+  WF& a = flags[0];
+  WF& b = flags[16]; // different cache line
+  auto thr = DSched::thread([&] {
+    for (int i = 0; i < numRounds; ++i) {
+      a.try_wait();
+      a.wait();
+      a.reset();
+      b.post();
+    }
+  });
+  for (int i = 0; i < numRounds; ++i) {
+    a.post();
+    b.try_wait();
+    b.wait();
+    b.reset();
+  }
+  DSched::join(thr);
+}
+
+template <bool MayBlock, template <typename> class Atom = std::atomic>
+void run_multi_poster_multi_waiter_test(int np, int nw) {
+  SaturatingSemaphore<MayBlock, Atom> f;
+  std::atomic<int> posted{0};
+  std::atomic<int> waited{0};
+  std::atomic<bool> go_post{false};
+  std::atomic<bool> go_wait{false};
+
+  std::vector<std::thread> prod(np);
+  std::vector<std::thread> cons(nw);
+  for (int i = 0; i < np; ++i) {
+    prod[i] = DSched::thread([&] {
+      while (!go_post.load()) {
+        /* spin */;
+      }
+      f.post();
+      posted.fetch_add(1);
+    });
+  }
+
+  for (int i = 0; i < nw; ++i) {
+    cons[i] = DSched::thread([&] {
+      ASSERT_FALSE(f.try_wait());
+      ASSERT_FALSE(f.try_wait_for(std::chrono::microseconds(1)));
+      ASSERT_FALSE(f.try_wait_until(
+          std::chrono::steady_clock::now() + std::chrono::microseconds(1)));
+      ASSERT_FALSE(f.try_wait_until(
+          std::chrono::steady_clock::now() + std::chrono::microseconds(1),
+          f.wait_options().pre_block(std::chrono::microseconds(0))));
+      waited.fetch_add(1);
+      while (!go_wait.load()) {
+        /* spin */;
+      }
+      ASSERT_TRUE(f.try_wait());
+      ASSERT_TRUE(f.try_wait_for(std::chrono::microseconds(1)));
+      ASSERT_TRUE(f.try_wait_until(
+          std::chrono::steady_clock::now() + std::chrono::microseconds(1)));
+      ASSERT_TRUE(f.try_wait_until(
+          std::chrono::steady_clock::now() + std::chrono::microseconds(1),
+          f.wait_options().pre_block(std::chrono::microseconds(0))));
+      f.wait();
+    });
+  }
+
+  while (waited.load() < nw) {
+    /* spin */;
+  }
+  go_post.store(true);
+  while (posted.load() < np) {
+    /* spin */;
+  }
+  go_wait.store(true);
+
+  for (auto& t : prod) {
+    DSched::join(t);
+  }
+  for (auto& t : cons) {
+    DSched::join(t);
+  }
+}
+
+/// Tests
+
+// Runs the single-threaded API walk-through for both values of MayBlock.
+TEST(SaturatingSemaphore, basic) {
+  constexpr bool kSpinOnly = false;
+  constexpr bool kMayBlock = true;
+  run_basic_test<kSpinOnly>();
+  run_basic_test<kMayBlock>();
+}
+
+// Runs the two-thread ping-pong for both values of MayBlock.
+TEST(SaturatingSemaphore, pingpong) {
+  constexpr int kRounds = 1000;
+  run_pingpong_test</* MayBlock = */ false>(kRounds);
+  run_pingpong_test</* MayBlock = */ true>(kRounds);
+}
+
+// Runs every combination of {1, 10} posters and {1, 10} waiters, first
+// with MayBlock == false and then with MayBlock == true.
+TEST(SaturatingSemaphore, multi_poster_multi_waiter) {
+  constexpr int kThreadCounts[] = {1, 10};
+  for (int np : kThreadCounts) {
+    for (int nw : kThreadCounts) {
+      run_multi_poster_multi_waiter_test<false>(np, nw);
+    }
+  }
+  for (int np : kThreadCounts) {
+    for (int nw : kThreadCounts) {
+      run_multi_poster_multi_waiter_test<true>(np, nw);
+    }
+  }
+}