--- /dev/null
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/Likely.h>
+#include <folly/detail/Futex.h>
+#include <folly/portability/Asm.h>
+
+#include <glog/logging.h>
+
+#include <atomic>
+
+namespace folly {
+
+/// SaturatingSemaphore is a flag that allows concurrent posting by
+/// multiple posters and concurrent non-destructive waiting by
+/// multiple waiters.
+///
+/// A SaturatingSemaphore allows one or more waiter threads to check,
+/// spin, or block, indefinitely or with timeout, for a flag to be set
+/// by one or more poster threads. By setting the flag, posters
+/// announce to waiters (that may be already waiting or will check
+/// the flag in the future) that some condition is true. Posts to an
+/// already set flag are idempotent.
+///
+/// SaturatingSemaphore is called so because it behaves like a hybrid
+/// binary/counted _semaphore_ with values zero and infinity, and
+/// post() and wait() functions. It is called _saturating_ because one
+/// post() is enough to set it to infinity and to satisfy any number
+/// of wait()-s. Once set (to infinity) it remains unchanged by
+/// subsequent post()-s and wait()-s, until it is reset() back to
+/// zero.
+///
+/// The implementation of SaturatingSemaphore is based on that of
+/// Baton. It includes no internal padding, and is only 4 bytes in
+/// size. Any alignment or padding to avoid false sharing is up to
+/// the user.
+/// SaturatingSemaphore differs from Baton as follows:
+/// - Baton allows at most one call to post(); SaturatingSemaphore
+/// allows any number of calls to post(), including concurrent ones.
+/// - Baton allows at most one successful call to any wait variant;
+/// SaturatingSemaphore allows any number, including concurrent ones.
+///
+/// Template parameter:
+/// - bool MayBlock: If false, waiting operations spin only. If
+/// true, timed and wait operations may block; adds an atomic
+/// instruction to the critical path of posters.
+///
+/// Wait options:
+/// The nested class WaitOptions contains an optional per-call setting
+/// for the pre-block spin duration: calls to wait(), try_wait_until(),
+/// and try_wait_for() block only after the pre-block period has
+/// elapsed. The default pre-block duration is 10 microseconds. The
+/// pre-block option is applicable only if MayBlock is true.
+///
+/// Functions:
+/// void reset();
+/// Clears the flag.
+/// void post();
+/// Sets the flag and wakes all current waiters, i.e., causes all
+/// concurrent calls to wait, try_wait_for, and try_wait_until to
+/// return.
+/// void wait(
+/// const WaitOptions& opt = wait_options());
+/// Waits for the flag to be set by a call to post.
+/// bool try_wait();
+/// Returns true if the flag is set by a call to post, otherwise false.
+/// bool try_wait_until(
+/// const time_point& deadline,
+/// const WaitOptions& = wait_options());
+/// Returns true if the flag is set by a call to post before the
+/// deadline, otherwise false.
+/// bool try_wait_for(
+/// const duration&,
+/// const WaitOptions& = wait_options());
+/// Returns true if the flag is set by a call to post before the
+/// expiration of the specified duration, otherwise false.
+///
+/// Usage:
+/// @code
+/// SaturatingSemaphore</* MayBlock = */ true> f;
+/// ASSERT_FALSE(f.try_wait());
+/// ASSERT_FALSE(f.try_wait_until(
+/// std::chrono::steady_clock::now() + std::chrono::microseconds(1)));
+/// ASSERT_FALSE(f.try_wait_until(
+/// std::chrono::steady_clock::now() + std::chrono::microseconds(1),
+/// f.wait_options().pre_block(std::chrono::microseconds(1))));
+/// f.post();
+/// f.post();
+/// f.wait();
+/// f.wait(f.wait_options().pre_block(std::chrono::nanoseconds(100)));
+/// ASSERT_TRUE(f.try_wait());
+/// ASSERT_TRUE(f.try_wait_until(
+/// std::chrono::steady_clock::now() + std::chrono::microseconds(1)));
+/// f.wait();
+/// f.reset();
+/// ASSERT_FALSE(f.try_wait());
+/// @endcode
+
+template <bool MayBlock, template <typename> class Atom = std::atomic>
+class SaturatingSemaphore {
+  // Holds one of the State values below; it is also the object that
+  // futexWait*/futexWake are invoked on by the slow paths.
+  detail::Futex<Atom> state_;
+
+  enum State : uint32_t {
+    NOTREADY = 0, // flag is clear and no waiter has marked itself blocking
+    READY = 1, // flag is set
+    BLOCKED = 2, // flag is clear and a waiter has marked the state before
+                 // calling futexWait*, so a poster must call futexWake
+  };
+
+ public:
+  /**
+   * WaitOptions
+   *
+   * Per-call tuning for the waiting functions. Its only setting is the
+   * pre-block duration, which defaults to 10 microseconds.
+   */
+  class WaitOptions {
+    std::chrono::nanoseconds dur_{std::chrono::microseconds(10)};
+
+   public:
+    /// Returns the currently configured pre-block duration.
+    FOLLY_ALWAYS_INLINE
+    std::chrono::nanoseconds pre_block() const {
+      return dur_;
+    }
+    /// Sets the pre-block duration and returns *this, so the call can be
+    /// chained directly onto wait_options().
+    FOLLY_ALWAYS_INLINE
+    WaitOptions& pre_block(std::chrono::nanoseconds dur) {
+      dur_ = dur;
+      return *this;
+    }
+  };
+
+  /// Returns a default-constructed WaitOptions.
+  FOLLY_ALWAYS_INLINE static WaitOptions wait_options() {
+    return {};
+  }
+
+  /** constructor: the flag starts out clear (NOTREADY). */
+  constexpr SaturatingSemaphore() noexcept : state_(NOTREADY) {}
+
+  /**
+   * destructor
+   *
+   * Defaulted rather than user-provided with an empty body: the class
+   * has nothing to release, and a defaulted destructor does not by
+   * itself make the type non-trivially destructible.
+   */
+  ~SaturatingSemaphore() = default;
+
+  /**
+   * reset: clears the flag by storing NOTREADY.
+   *
+   * The store is relaxed, so reset() by itself imposes no ordering on
+   * surrounding memory operations.
+   */
+  void reset() noexcept {
+    state_.store(NOTREADY, std::memory_order_relaxed);
+  }
+
+  /**
+   * post: sets the flag.
+   *
+   * When MayBlock is false no waiter can ever be blocked, so a plain
+   * release store is enough. Otherwise the state is changed with a
+   * compare-exchange so that a BLOCKED state can be detected and the
+   * waiters woken (see postSlowWaiterMayBlock).
+   */
+  FOLLY_ALWAYS_INLINE void post() noexcept {
+    if (!MayBlock) {
+      state_.store(READY, std::memory_order_release);
+    } else {
+      postFastWaiterMayBlock();
+    }
+  }
+
+  /**
+   * wait: waits for the flag to be set.
+   *
+   * Implemented as try_wait_until() with the maximum steady_clock time
+   * point as the deadline; the boolean result is discarded.
+   *
+   * @param opt  wait options (pre-block spin duration).
+   */
+  FOLLY_ALWAYS_INLINE
+  void wait(const WaitOptions& opt = wait_options()) noexcept {
+    try_wait_until(std::chrono::steady_clock::time_point::max(), opt);
+  }
+
+  /**
+   * try_wait: non-blocking check.
+   *
+   * @return true iff an acquire load of the state observes READY.
+   */
+  FOLLY_ALWAYS_INLINE bool try_wait() const noexcept {
+    return state_.load(std::memory_order_acquire) == READY;
+  }
+
+  /**
+   * try_wait_until: waits for the flag until the given deadline.
+   *
+   * The inlined fast path is a single try_wait(); only if the flag is
+   * not yet set does the call go to the out-of-line tryWaitSlow().
+   *
+   * @param deadline  time point after which the wait gives up.
+   * @param opt       wait options (pre-block spin duration).
+   * @return true if the flag was observed set, false if the deadline
+   *         was reached first.
+   */
+  template <typename Clock, typename Duration>
+  FOLLY_ALWAYS_INLINE bool try_wait_until(
+      const std::chrono::time_point<Clock, Duration>& deadline,
+      const WaitOptions& opt = wait_options()) noexcept {
+    if (LIKELY(try_wait())) {
+      return true;
+    }
+    return tryWaitSlow(deadline, opt);
+  }
+
+  /**
+   * try_wait_for: waits for the flag for at most the given duration.
+   *
+   * The deadline is steady_clock::now() + duration and is computed only
+   * after the fast-path try_wait() has failed, so an already-set flag
+   * costs no clock read.
+   *
+   * @param duration  maximum time to wait.
+   * @param opt       wait options (pre-block spin duration).
+   * @return true if the flag was observed set, false on timeout.
+   */
+  template <class Rep, class Period>
+  FOLLY_ALWAYS_INLINE bool try_wait_for(
+      const std::chrono::duration<Rep, Period>& duration,
+      const WaitOptions& opt = wait_options()) noexcept {
+    if (LIKELY(try_wait())) {
+      return true;
+    }
+    auto deadline = std::chrono::steady_clock::now() + duration;
+    return tryWaitSlow(deadline, opt);
+  }
+
+ private:
+  /// Fast path of post() for MayBlock == true: one compare-exchange
+  /// NOTREADY -> READY. On failure `before` holds the state that was
+  /// actually observed and is handed to the out-of-line slow path.
+  FOLLY_ALWAYS_INLINE void postFastWaiterMayBlock() noexcept {
+    uint32_t before = NOTREADY;
+    if (LIKELY(state_.compare_exchange_strong(
+            before,
+            READY,
+            std::memory_order_release,
+            std::memory_order_relaxed))) {
+      return;
+    }
+    postSlowWaiterMayBlock(before);
+  }
+
+  void postSlowWaiterMayBlock(uint32_t before) noexcept; // defined below
+
+  template <typename Clock, typename Duration>
+  bool tryWaitSlow(
+      const std::chrono::time_point<Clock, Duration>& deadline,
+      const WaitOptions& opt) noexcept; // defined below
+};
+
+///
+/// Member function definitions
+///
+
+/**
+ * postSlowWaiterMayBlock
+ *
+ * Out-of-line slow path of post(), reached from postFastWaiterMayBlock()
+ * after its NOTREADY -> READY compare-exchange has failed.
+ *
+ * @param before  the state value observed by the caller's failed
+ *                compare-exchange. It is refreshed by every failed
+ *                compare-exchange and by the reload inside the loop.
+ *
+ * Loops until the state has been set to, or is confirmed to be, READY:
+ *  - NOTREADY: try to publish READY and return without a wake.
+ *  - READY:    another poster got there first; fence, reload, and return
+ *              only if the state is still READY.
+ *  - BLOCKED:  publish READY, then call state_.futexWake().
+ */
+template <bool MayBlock, template <typename> class Atom>
+FOLLY_NOINLINE void SaturatingSemaphore<MayBlock, Atom>::postSlowWaiterMayBlock(
+ uint32_t before) noexcept {
+ while (true) {
+ if (before == NOTREADY) {
+ if (state_.compare_exchange_strong(
+ before,
+ READY,
+ std::memory_order_release,
+ std::memory_order_relaxed)) {
+ return;
+ }
+ // The exchange failed, so `before` now holds the value that was
+ // actually observed; fall through and handle it below.
+ }
+ if (before == READY) { // Only if multiple posters
+ // The reason for not simply returning (without the following
+ // steps) is to prevent the following case:
+ //
+ //   T1:              T2:              T3:
+ //   local1.post();   local2.post();   global.wait();
+ //   global.post();   global.post();   global.reset();
+ //                                     seq_cst fence
+ //                                     local1.try_wait() == true;
+ //                                     local2.try_wait() == false;
+ //
+ // The following steps correspond to T2's global.post(), where
+ // global is already posted by T1.
+ //
+ // The following fence and load guarantee that T3 does not miss
+ // T2's prior stores, i.e., local2.post() in this example.
+ //
+ // The following case is prevented:
+ //
+ // Starting with local2 == NOTREADY and global == READY
+ //
+ //   T2:                               T3:
+ //   store READY to local2 // post     store NOTREADY to global // reset
+ //   seq_cst fence                     seq_cst fence
+ //   load READY from global // post    load NOTREADY from local2 // try_wait
+ //
+ std::atomic_thread_fence(std::memory_order_seq_cst);
+ before = state_.load(std::memory_order_relaxed);
+ if (before == READY) {
+ return;
+ }
+ // The state is no longer READY; go around again and handle the
+ // freshly loaded value.
+ continue;
+ }
+ // Neither NOTREADY nor READY: the only remaining State value is BLOCKED.
+ DCHECK_EQ(before, BLOCKED);
+ if (state_.compare_exchange_strong(
+ before,
+ READY,
+ std::memory_order_release,
+ std::memory_order_relaxed)) {
+ // The state was BLOCKED, so futexWake() is issued after publishing
+ // READY.
+ state_.futexWake();
+ return;
+ }
+ // The exchange failed and refreshed `before`; retry with the new value.
+ }
+}
+
+/**
+ * tryWaitSlow
+ *
+ * Out-of-line slow path shared by try_wait_until() and try_wait_for(),
+ * entered after the inlined try_wait() fast path has failed.
+ *
+ * Each iteration re-checks the flag (acquire load) and the deadline. If
+ * MayBlock is false the function only spins, with a pause instruction
+ * between iterations. If MayBlock is true, then once opt.pre_block() has
+ * elapsed since entry the state is moved from NOTREADY to BLOCKED and
+ * the thread waits on the futex: futexWait() when the deadline is the
+ * maximum time point, futexWaitUntil() otherwise. The futex call's
+ * result is not inspected; the loop simply re-checks state and deadline.
+ *
+ * @param deadline  time point after which the wait gives up.
+ * @param opt       wait options (pre-block spin duration).
+ * @return true if the flag was observed READY, false if the deadline was
+ *         reached first.
+ */
+template <bool MayBlock, template <typename> class Atom>
+template <typename Clock, typename Duration>
+FOLLY_NOINLINE bool SaturatingSemaphore<MayBlock, Atom>::tryWaitSlow(
+    const std::chrono::time_point<Clock, Duration>& deadline,
+    const WaitOptions& opt) noexcept {
+  auto tbegin = Clock::now();
+  while (true) {
+    auto before = state_.load(std::memory_order_acquire);
+    if (before == READY) {
+      return true;
+    }
+    // Sample the clock once per iteration and reuse it for both the
+    // deadline check and the pre-block check.
+    auto tnow = Clock::now();
+    if (tnow >= deadline) {
+      return false;
+    }
+    if (MayBlock) {
+      if (tnow < tbegin) {
+        // backward time discontinuity in Clock, revise pre_block starting
+        // point
+        tbegin = tnow;
+      }
+      // Compare the durations directly. Casting the elapsed time to the
+      // deadline's Duration first would truncate it when Duration is
+      // coarser than the clock's tick (e.g. seconds), stretching the
+      // spin phase far beyond the requested pre-block period.
+      if (tnow - tbegin >= opt.pre_block()) {
+        if (before == NOTREADY) {
+          // Mark the state BLOCKED before waiting, so that a poster
+          // knows it has to call futexWake().
+          if (!state_.compare_exchange_strong(
+                  before,
+                  BLOCKED,
+                  std::memory_order_relaxed,
+                  std::memory_order_relaxed)) {
+            // The state changed under us; start over with a fresh load.
+            continue;
+          }
+        }
+        if (deadline == std::chrono::time_point<Clock, Duration>::max()) {
+          state_.futexWait(BLOCKED);
+        } else {
+          state_.futexWaitUntil(BLOCKED, deadline);
+        }
+      }
+    }
+    asm_volatile_pause();
+  }
+}
+
+} // namespace folly
--- /dev/null
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/synchronization/SaturatingSemaphore.h>
+#include <folly/portability/GTest.h>
+#include <folly/test/DeterministicSchedule.h>
+
+/// Test helper functions
+
+using folly::SaturatingSemaphore;
+using DSched = folly::test::DeterministicSchedule;
+
+/// Single-threaded walk through the whole API: every wait variant must
+/// fail before the first post(), succeed after it (repeated posts are
+/// harmless), and fail again after reset().
+template <bool MayBlock, template <typename> class Atom = std::atomic>
+void run_basic_test() {
+  SaturatingSemaphore<MayBlock, Atom> sem;
+  // Deadline one microsecond from the moment of the call.
+  const auto soon = [] {
+    return std::chrono::steady_clock::now() + std::chrono::microseconds(1);
+  };
+
+  // Not posted yet: nothing succeeds.
+  ASSERT_FALSE(sem.try_wait());
+  ASSERT_FALSE(sem.try_wait_until(soon()));
+  ASSERT_FALSE(sem.try_wait_until(
+      soon(), sem.wait_options().pre_block(std::chrono::microseconds(1))));
+
+  // Posting twice is allowed.
+  sem.post();
+  sem.post();
+
+  // Posted: every variant succeeds, any number of times.
+  sem.wait();
+  sem.wait(sem.wait_options().pre_block(std::chrono::nanoseconds(100)));
+  ASSERT_TRUE(sem.try_wait());
+  ASSERT_TRUE(sem.try_wait_until(soon()));
+  sem.wait();
+
+  // After reset the flag is clear again.
+  sem.reset();
+  ASSERT_FALSE(sem.try_wait());
+}
+
+/// Two threads hand control back and forth numRounds times using a pair
+/// of semaphores: the main thread posts `ping` and consumes `pong`, the
+/// peer thread consumes `ping` and posts `pong`.
+template <bool MayBlock, template <typename> class Atom = std::atomic>
+void run_pingpong_test(int numRounds) {
+  using Sem = SaturatingSemaphore<MayBlock, Atom>;
+  std::array<Sem, 17> sems;
+  Sem& ping = sems.front();
+  Sem& pong = sems.back(); // 16 elements away: different cache line
+
+  // Check, wait for, and then clear a semaphore.
+  const auto consume = [](Sem& s) {
+    s.try_wait();
+    s.wait();
+    s.reset();
+  };
+
+  auto peer = DSched::thread([&] {
+    int remaining = numRounds;
+    while (remaining-- > 0) {
+      consume(ping);
+      pong.post();
+    }
+  });
+
+  int remaining = numRounds;
+  while (remaining-- > 0) {
+    ping.post();
+    consume(pong);
+  }
+  DSched::join(peer);
+}
+
+/// Runs np poster threads and nw waiter threads against one semaphore.
+///
+/// Phases, sequenced by the main thread through atomic counters/flags:
+///  1. every waiter verifies that all wait variants fail on the clear
+///     flag, then bumps `waited`;
+///  2. once all waiters have checked in, posters are released, each
+///     posts once and bumps `posted`;
+///  3. once all posters are done, waiters are released and verify that
+///     all wait variants now succeed.
+///
+/// Checks inside the worker threads use non-fatal EXPECT_* on purpose: a
+/// fatal ASSERT_* would merely return from the worker lambda, skipping
+/// `waited.fetch_add(1)`, and the main thread would then spin forever on
+/// `waited` instead of reporting the failure.
+///
+/// @param np  number of poster threads.
+/// @param nw  number of waiter threads.
+template <bool MayBlock, template <typename> class Atom = std::atomic>
+void run_multi_poster_multi_waiter_test(int np, int nw) {
+  SaturatingSemaphore<MayBlock, Atom> f;
+  std::atomic<int> posted{0};
+  std::atomic<int> waited{0};
+  std::atomic<bool> go_post{false};
+  std::atomic<bool> go_wait{false};
+
+  std::vector<std::thread> prod(np);
+  std::vector<std::thread> cons(nw);
+  for (int i = 0; i < np; ++i) {
+    prod[i] = DSched::thread([&] {
+      while (!go_post.load()) {
+        /* spin */;
+      }
+      f.post();
+      posted.fetch_add(1);
+    });
+  }
+
+  for (int i = 0; i < nw; ++i) {
+    cons[i] = DSched::thread([&] {
+      // Phase 1: nothing has been posted yet.
+      EXPECT_FALSE(f.try_wait());
+      EXPECT_FALSE(f.try_wait_for(std::chrono::microseconds(1)));
+      EXPECT_FALSE(f.try_wait_until(
+          std::chrono::steady_clock::now() + std::chrono::microseconds(1)));
+      EXPECT_FALSE(f.try_wait_until(
+          std::chrono::steady_clock::now() + std::chrono::microseconds(1),
+          f.wait_options().pre_block(std::chrono::microseconds(0))));
+      waited.fetch_add(1);
+      while (!go_wait.load()) {
+        /* spin */;
+      }
+      // Phase 3: every poster has posted.
+      EXPECT_TRUE(f.try_wait());
+      EXPECT_TRUE(f.try_wait_for(std::chrono::microseconds(1)));
+      EXPECT_TRUE(f.try_wait_until(
+          std::chrono::steady_clock::now() + std::chrono::microseconds(1)));
+      EXPECT_TRUE(f.try_wait_until(
+          std::chrono::steady_clock::now() + std::chrono::microseconds(1),
+          f.wait_options().pre_block(std::chrono::microseconds(0))));
+      f.wait();
+    });
+  }
+
+  // Phase 1 -> 2: release the posters once every waiter has checked in.
+  while (waited.load() < nw) {
+    /* spin */;
+  }
+  go_post.store(true);
+  // Phase 2 -> 3: release the waiters once every poster has posted.
+  while (posted.load() < np) {
+    /* spin */;
+  }
+  go_wait.store(true);
+
+  for (auto& t : prod) {
+    DSched::join(t);
+  }
+  for (auto& t : cons) {
+    DSched::join(t);
+  }
+}
+
+/// Tests
+
+// Basic single-threaded API walk-through, for both the spin-only and the
+// may-block instantiations.
+TEST(SaturatingSemaphore, basic) {
+  constexpr bool kSpinOnly = false;
+  constexpr bool kMayBlock = true;
+  run_basic_test<kSpinOnly>();
+  run_basic_test<kMayBlock>();
+}
+
+// Two-thread ping-pong, 1000 rounds, for both the spin-only and the
+// may-block instantiations.
+TEST(SaturatingSemaphore, pingpong) {
+  constexpr int kRounds = 1000;
+  run_pingpong_test</* MayBlock = */ false>(kRounds);
+  run_pingpong_test</* MayBlock = */ true>(kRounds);
+}
+
+// Every {posters, waiters} combination of 1 and 10, first for the
+// spin-only instantiation and then for the may-block one.
+TEST(SaturatingSemaphore, multi_poster_multi_waiter) {
+  constexpr int kThreadCounts[][2] = {{1, 1}, {1, 10}, {10, 1}, {10, 10}};
+  for (const auto& counts : kThreadCounts) {
+    run_multi_poster_multi_waiter_test</* MayBlock = */ false>(
+        counts[0], counts[1]);
+  }
+  for (const auto& counts : kThreadCounts) {
+    run_multi_poster_multi_waiter_test</* MayBlock = */ true>(
+        counts[0], counts[1]);
+  }
+}