From 644a73aada5984d2c125d505a21a00174ba4f185 Mon Sep 17 00:00:00 2001 From: Maged Michael Date: Wed, 29 Nov 2017 21:12:16 -0800 Subject: [PATCH] Saturating semaphore Summary: SaturatingSemaphore is a flag that allows: - multiple concurrent posters - multiple concurrent waiters - idempotent posting - non-destructive waiting - blocking and spinning - pre-block spin time control ``` /// SaturatingSemaphore is a flag that allows concurrent posting by /// multiple posters and concurrent non-destructive waiting by /// multiple waiters. /// /// A SaturatingSemaphore allows one or more waiter threads to check, /// spin, or block, indefinitely or with timeout, for a flag to be set /// by one or more poster threads. By setting the flag, posters /// announce to waiters (that may be already waiting or will check /// the flag in the future) that some condition is true. Posts to an /// already set flag are idempotent. ``` Reviewed By: djwatson Differential Revision: D6379704 fbshipit-source-id: 59aed76caa2d159639e75425a778a9c63f18f375 --- folly/synchronization/SaturatingSemaphore.h | 327 ++++++++++++++++++ .../test/SaturatingSemaphoreTest.cpp | 152 ++++++++ 2 files changed, 479 insertions(+) create mode 100644 folly/synchronization/SaturatingSemaphore.h create mode 100644 folly/synchronization/test/SaturatingSemaphoreTest.cpp diff --git a/folly/synchronization/SaturatingSemaphore.h b/folly/synchronization/SaturatingSemaphore.h new file mode 100644 index 00000000..169bb8d8 --- /dev/null +++ b/folly/synchronization/SaturatingSemaphore.h @@ -0,0 +1,327 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include + +#include + +namespace folly { + +/// SaturatingSemaphore is a flag that allows concurrent posting by +/// multiple posters and concurrent non-destructive waiting by +/// multiple waiters. +/// +/// A SaturatingSemaphore allows one or more waiter threads to check, +/// spin, or block, indefinitely or with timeout, for a flag to be set +/// by one or more poster threads. By setting the flag, posters +/// announce to waiters (that may be already waiting or will check +/// the flag in the future) that some condition is true. Posts to an +/// already set flag are idempotent. +/// +/// SaturatingSemaphore is called so because it behaves like a hybrid +/// binary/counted _semaphore_ with values zero and infinity, and +/// post() and wait() functions. It is called _saturating_ because one +/// post() is enough to set it to infinity and to satisfy any number +/// of wait()-s. Once set (to infinity) it remains unchanged by +/// subsequent post()-s and wait()-s, until it is reset() back to +/// zero. +/// +/// The implementation of SaturatingSemaphore is based on that of +/// Baton. It includes no internal padding, and is only 4 bytes in +/// size. Any alignment or padding to avoid false sharing is up to +/// the user. +/// SaturatingSemaphore differs from Baton as follows: +/// - Baton allows at most one call to post(); this allows any number +/// and concurrently. +/// - Baton allows at most one successful call to any wait variant; +/// this allows any number and concurrently. +/// +/// Template parameter: +/// - bool MayBlock: If false, waiting operations spin only. If +/// true, timed and wait operations may block; adds an atomic +/// instruction to the critical path of posters. +/// +/// Wait options: +/// The subclass WaitOptions contains optional per call setting for +/// pre-block spin duration: Calls to wait(), try_wait_until(), and +/// try_wait_for() block only after the passage of the pre-block +/// period. The default pre-block duration is 10 microseconds. The +/// pre block option is applicable only if MayBlock is true. +/// +/// Functions: +/// void reset(); +/// Clears the flag. +/// void post(); +/// Sets the flag and wakes all current waiters, i.e., causes all +/// concurrent calls to wait, try_wait_for, and try_wait_until to +/// return. +/// void wait( +/// WaitOptions opt = wait_options()); +/// Waits for the flag to be set by a call to post. +/// bool try_wait(); +/// Returns true if the flag is set by a call to post, otherwise false. +/// bool try_wait_until( +/// time_point& deadline, +/// WaitOptions& = wait_options()); +/// Returns true if the flag is set by a call to post before the +/// deadline, otherwise false. +/// bool try_wait_for( +/// duration&, +/// WaitOptions& = wait_options()); +/// Returns true if the flag is set by a call to post before the +/// expiration of the specified duration, otherwise false. +/// +/// Usage: +/// @code +/// SaturatingSemaphore f; +/// ASSERT_FALSE(f.try_wait()); +/// ASSERT_FALSE(f.try_wait_until( +/// std::chrono::steady_clock::now() + std::chrono::microseconds(1))); +/// ASSERT_FALSE(f.try_wait_until( +/// std::chrono::steady_clock::now() + std::chrono::microseconds(1), +/// f.wait_options().pre_block(std::chrono::microseconds(1)))); +/// f.post(); +/// f.post(); +/// f.wait(); +/// f.wait(f.wait_options().pre_block(std::chrono::nanoseconds(100))); +/// ASSERT_TRUE(f.try_wait()); +/// ASSERT_TRUE(f.try_wait_until( +/// std::chrono::steady_clock::now() + std::chrono::microseconds(1))); +/// f.wait(); +/// f.reset(); +/// ASSERT_FALSE(f.try_wait()); +/// @endcode + +template class Atom = std::atomic> +class SaturatingSemaphore { + detail::Futex state_; + + enum State : uint32_t { + NOTREADY = 0, + READY = 1, + BLOCKED = 2, + }; + + public: + /** WaitOptions */ + + class WaitOptions { + std::chrono::nanoseconds dur_{std::chrono::microseconds(10)}; + + public: + FOLLY_ALWAYS_INLINE + std::chrono::nanoseconds pre_block() const { + return dur_; + } + FOLLY_ALWAYS_INLINE + WaitOptions& pre_block(std::chrono::nanoseconds dur) { + dur_ = dur; + return *this; + } + }; + + FOLLY_ALWAYS_INLINE static WaitOptions wait_options() { + return {}; + } + + /** constructor */ + constexpr SaturatingSemaphore() noexcept : state_(NOTREADY) {} + + /** destructor */ + ~SaturatingSemaphore() {} + + /** reset */ + void reset() noexcept { + state_.store(NOTREADY, std::memory_order_relaxed); + } + + /** post */ + FOLLY_ALWAYS_INLINE void post() noexcept { + if (!MayBlock) { + state_.store(READY, std::memory_order_release); + } else { + postFastWaiterMayBlock(); + } + } + + /** wait */ + FOLLY_ALWAYS_INLINE + void wait(const WaitOptions& opt = wait_options()) noexcept { + try_wait_until(std::chrono::steady_clock::time_point::max(), opt); + } + + /** try_wait */ + FOLLY_ALWAYS_INLINE bool try_wait() const noexcept { + return state_.load(std::memory_order_acquire) == READY; + } + + /** try_wait_until */ + template + FOLLY_ALWAYS_INLINE bool try_wait_until( + const std::chrono::time_point& deadline, + const WaitOptions& opt = wait_options()) noexcept { + if (LIKELY(try_wait())) { + return true; + } + return tryWaitSlow(deadline, opt); + } + + /** try_wait_for */ + template + FOLLY_ALWAYS_INLINE bool try_wait_for( + const std::chrono::duration& duration, + const WaitOptions& opt = wait_options()) noexcept { + if (LIKELY(try_wait())) { + return true; + } + auto deadline = std::chrono::steady_clock::now() + duration; + return tryWaitSlow(deadline, opt); + } + + private: + FOLLY_ALWAYS_INLINE void postFastWaiterMayBlock() noexcept { + uint32_t before = NOTREADY; + if (LIKELY(state_.compare_exchange_strong( + before, + READY, + std::memory_order_release, + std::memory_order_relaxed))) { + return; + } + postSlowWaiterMayBlock(before); + } + + void postSlowWaiterMayBlock(uint32_t before) noexcept; // defined below + + template + bool tryWaitSlow( + const std::chrono::time_point& deadline, + const WaitOptions& opt) noexcept; // defined below +}; + +/// +/// Member function definitioons +/// + +/** postSlowWaiterMayBlock */ +template class Atom> +FOLLY_NOINLINE void SaturatingSemaphore::postSlowWaiterMayBlock( + uint32_t before) noexcept { + while (true) { + if (before == NOTREADY) { + if (state_.compare_exchange_strong( + before, + READY, + std::memory_order_release, + std::memory_order_relaxed)) { + return; + } + } + if (before == READY) { // Only if multiple posters + // The reason for not simply returning (without the following + // steps) is to prevent the following case: + // + // T1: T2: T3: + // local1.post(); local2.post(); global.wait(); + // global.post(); global.post(); global.reset(); + // seq_cst fence + // local1.try_wait() == true; + // local2.try_wait() == false; + // + // This following steps correspond to T2's global.post(), where + // global is already posted by T1. + // + // The following fence and load guarantee that T3 does not miss + // T2's prior stores, i.e., local2.post() in this example. + // + // The following case is prevented: + // + // Starting with local2 == NOTREADY and global == READY + // + // T2: T3: + // store READY to local2 // post store NOTREADY to global // reset + // seq_cst fenc seq_cst fence + // load READY from global // post load NOTREADY from local2 // try_wait + // + std::atomic_thread_fence(std::memory_order_seq_cst); + before = state_.load(std::memory_order_relaxed); + if (before == READY) { + return; + } + continue; + } + DCHECK_EQ(before, BLOCKED); + if (state_.compare_exchange_strong( + before, + READY, + std::memory_order_release, + std::memory_order_relaxed)) { + state_.futexWake(); + return; + } + } +} + +/** tryWaitSlow */ +template class Atom> +template +FOLLY_NOINLINE bool SaturatingSemaphore::tryWaitSlow( + const std::chrono::time_point& deadline, + const WaitOptions& opt) noexcept { + auto tbegin = Clock::now(); + while (true) { + auto before = state_.load(std::memory_order_acquire); + if (before == READY) { + return true; + } + if (Clock::now() >= deadline) { + return false; + } + if (MayBlock) { + auto tnow = Clock::now(); + if (tnow < tbegin) { + // backward time discontinuity in Clock, revise pre_block starting point + tbegin = tnow; + } + auto dur = std::chrono::duration_cast(tnow - tbegin); + if (dur >= opt.pre_block()) { + if (before == NOTREADY) { + if (!state_.compare_exchange_strong( + before, + BLOCKED, + std::memory_order_relaxed, + std::memory_order_relaxed)) { + continue; + } + } + if (deadline == std::chrono::time_point::max()) { + state_.futexWait(BLOCKED); + } else { + state_.futexWaitUntil(BLOCKED, deadline); + } + } + } + asm_volatile_pause(); + } +} + +} // namespace folly diff --git a/folly/synchronization/test/SaturatingSemaphoreTest.cpp b/folly/synchronization/test/SaturatingSemaphoreTest.cpp new file mode 100644 index 00000000..2be96155 --- /dev/null +++ b/folly/synchronization/test/SaturatingSemaphoreTest.cpp @@ -0,0 +1,152 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +/// Test helper functions + +using folly::SaturatingSemaphore; +using DSched = folly::test::DeterministicSchedule; + +template class Atom = std::atomic> +void run_basic_test() { + SaturatingSemaphore f; + ASSERT_FALSE(f.try_wait()); + ASSERT_FALSE(f.try_wait_until( + std::chrono::steady_clock::now() + std::chrono::microseconds(1))); + ASSERT_FALSE(f.try_wait_until( + std::chrono::steady_clock::now() + std::chrono::microseconds(1), + f.wait_options().pre_block(std::chrono::microseconds(1)))); + f.post(); + f.post(); + f.wait(); + f.wait(f.wait_options().pre_block(std::chrono::nanoseconds(100))); + ASSERT_TRUE(f.try_wait()); + ASSERT_TRUE(f.try_wait_until( + std::chrono::steady_clock::now() + std::chrono::microseconds(1))); + f.wait(); + f.reset(); + ASSERT_FALSE(f.try_wait()); +} + +template class Atom = std::atomic> +void run_pingpong_test(int numRounds) { + using WF = SaturatingSemaphore; + std::array flags; + WF& a = flags[0]; + WF& b = flags[16]; // different cache line + auto thr = DSched::thread([&] { + for (int i = 0; i < numRounds; ++i) { + a.try_wait(); + a.wait(); + a.reset(); + b.post(); + } + }); + for (int i = 0; i < numRounds; ++i) { + a.post(); + b.try_wait(); + b.wait(); + b.reset(); + } + DSched::join(thr); +} + +template class Atom = std::atomic> +void run_multi_poster_multi_waiter_test(int np, int nw) { + SaturatingSemaphore f; + std::atomic posted{0}; + std::atomic waited{0}; + std::atomic go_post{false}; + std::atomic go_wait{false}; + + std::vector prod(np); + std::vector cons(nw); + for (int i = 0; i < np; ++i) { + prod[i] = DSched::thread([&] { + while (!go_post.load()) { + /* spin */; + } + f.post(); + posted.fetch_add(1); + }); + } + + for (int i = 0; i < nw; ++i) { + cons[i] = DSched::thread([&] { + ASSERT_FALSE(f.try_wait()); + ASSERT_FALSE(f.try_wait_for(std::chrono::microseconds(1))); + ASSERT_FALSE(f.try_wait_until( + std::chrono::steady_clock::now() + std::chrono::microseconds(1))); + ASSERT_FALSE(f.try_wait_until( + std::chrono::steady_clock::now() + std::chrono::microseconds(1), + f.wait_options().pre_block(std::chrono::microseconds(0)))); + waited.fetch_add(1); + while (!go_wait.load()) { + /* spin */; + } + ASSERT_TRUE(f.try_wait()); + ASSERT_TRUE(f.try_wait_for(std::chrono::microseconds(1))); + ASSERT_TRUE(f.try_wait_until( + std::chrono::steady_clock::now() + std::chrono::microseconds(1))); + ASSERT_TRUE(f.try_wait_until( + std::chrono::steady_clock::now() + std::chrono::microseconds(1), + f.wait_options().pre_block(std::chrono::microseconds(0)))); + f.wait(); + }); + } + + while (waited.load() < nw) { + /* spin */; + } + go_post.store(true); + while (posted.load() < np) { + /* spin */; + } + go_wait.store(true); + + for (auto& t : prod) { + DSched::join(t); + } + for (auto& t : cons) { + DSched::join(t); + } +} + +/// Tests + +TEST(SaturatingSemaphore, basic) { + run_basic_test(); + run_basic_test(); +} + +TEST(SaturatingSemaphore, pingpong) { + run_pingpong_test(1000); + run_pingpong_test(1000); +} + +TEST(SaturatingSemaphore, multi_poster_multi_waiter) { + run_multi_poster_multi_waiter_test(1, 1); + run_multi_poster_multi_waiter_test(1, 10); + run_multi_poster_multi_waiter_test(10, 1); + run_multi_poster_multi_waiter_test(10, 10); + run_multi_poster_multi_waiter_test(1, 1); + run_multi_poster_multi_waiter_test(1, 10); + run_multi_poster_multi_waiter_test(10, 1); + run_multi_poster_multi_waiter_test(10, 10); +} -- 2.34.1