2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/Likely.h>
20 #include <folly/detail/Futex.h>
21 #include <folly/portability/Asm.h>
23 #include <glog/logging.h>
29 /// SaturatingSemaphore is a flag that allows concurrent posting by
30 /// multiple posters and concurrent non-destructive waiting by
33 /// A SaturatingSemaphore allows one or more waiter threads to check,
34 /// spin, or block, indefinitely or with timeout, for a flag to be set
35 /// by one or more poster threads. By setting the flag, posters
36 /// announce to waiters (that may be already waiting or will check
37 /// the flag in the future) that some condition is true. Posts to an
38 /// already set flag are idempotent.
40 /// SaturatingSemaphore is called so because it behaves like a hybrid
41 /// binary/counted _semaphore_ with values zero and infinity, and
42 /// post() and wait() functions. It is called _saturating_ because one
43 /// post() is enough to set it to infinity and to satisfy any number
44 /// of wait()-s. Once set (to infinity) it remains unchanged by
45 /// subsequent post()-s and wait()-s, until it is reset() back to
48 /// The implementation of SaturatingSemaphore is based on that of
49 /// Baton. It includes no internal padding, and is only 4 bytes in
50 /// size. Any alignment or padding to avoid false sharing is up to
52 /// SaturatingSemaphore differs from Baton as follows:
53 /// - Baton allows at most one call to post(); this allows any number
55 /// - Baton allows at most one successful call to any wait variant;
56 /// this allows any number and concurrently.
58 /// Template parameter:
59 /// - bool MayBlock: If false, waiting operations spin only. If
60 /// true, timed and wait operations may block; adds an atomic
61 /// instruction to the critical path of posters.
64 /// The subclass WaitOptions contains optional per-call settings for
65 /// pre-block spin duration: Calls to wait(), try_wait_until(), and
66 /// try_wait_for() block only after the passage of the pre-block
67 /// period. The default pre-block duration is 10 microseconds. The
68 /// pre-block option is applicable only if MayBlock is true.
72 /// Returns true if the flag is set by a call to post, otherwise false.
73 /// Equivalent to try_wait, but available on const receivers.
77 /// Sets the flag and wakes all current waiters, i.e., causes all
78 /// concurrent calls to wait, try_wait_for, and try_wait_until to
81 /// WaitOptions opt = wait_options());
82 /// Waits for the flag to be set by a call to post.
84 /// Returns true if the flag is set by a call to post, otherwise false.
85 /// bool try_wait_until(
86 /// time_point& deadline,
87 /// WaitOptions& = wait_options());
88 /// Returns true if the flag is set by a call to post before the
89 /// deadline, otherwise false.
90 /// bool try_wait_for(
92 /// WaitOptions& = wait_options());
93 /// Returns true if the flag is set by a call to post before the
94 /// expiration of the specified duration, otherwise false.
98 /// SaturatingSemaphore</* MayBlock = */ true> f;
99 /// ASSERT_FALSE(f.try_wait());
100 /// ASSERT_FALSE(f.try_wait_until(
101 /// std::chrono::steady_clock::now() + std::chrono::microseconds(1)));
102 /// ASSERT_FALSE(f.try_wait_until(
103 /// std::chrono::steady_clock::now() + std::chrono::microseconds(1),
104 /// f.wait_options().pre_block(std::chrono::microseconds(1))));
108 /// f.wait(f.wait_options().pre_block(std::chrono::nanoseconds(100)));
109 /// ASSERT_TRUE(f.try_wait());
110 /// ASSERT_TRUE(f.try_wait_until(
111 /// std::chrono::steady_clock::now() + std::chrono::microseconds(1)));
114 /// ASSERT_FALSE(f.try_wait());
117 template <bool MayBlock, template <typename> class Atom = std::atomic>
118 class SaturatingSemaphore {
119 detail::Futex<Atom> state_;
121 enum State : uint32_t {
131 std::chrono::nanoseconds dur_{std::chrono::microseconds(10)};
135 std::chrono::nanoseconds pre_block() const {
139 WaitOptions& pre_block(std::chrono::nanoseconds dur) {
145 FOLLY_ALWAYS_INLINE static WaitOptions wait_options() {
150 constexpr SaturatingSemaphore() noexcept : state_(NOTREADY) {}
153 ~SaturatingSemaphore() {}
156 FOLLY_ALWAYS_INLINE bool ready() const noexcept {
157 return state_.load(std::memory_order_acquire) == READY;
161 void reset() noexcept {
162 state_.store(NOTREADY, std::memory_order_relaxed);
166 FOLLY_ALWAYS_INLINE void post() noexcept {
168 state_.store(READY, std::memory_order_release);
170 postFastWaiterMayBlock();
176 void wait(const WaitOptions& opt = wait_options()) noexcept {
177 try_wait_until(std::chrono::steady_clock::time_point::max(), opt);
181 FOLLY_ALWAYS_INLINE bool try_wait() noexcept {
185 /** try_wait_until */
186 template <typename Clock, typename Duration>
187 FOLLY_ALWAYS_INLINE bool try_wait_until(
188 const std::chrono::time_point<Clock, Duration>& deadline,
189 const WaitOptions& opt = wait_options()) noexcept {
190 if (LIKELY(try_wait())) {
193 return tryWaitSlow(deadline, opt);
197 template <class Rep, class Period>
198 FOLLY_ALWAYS_INLINE bool try_wait_for(
199 const std::chrono::duration<Rep, Period>& duration,
200 const WaitOptions& opt = wait_options()) noexcept {
201 if (LIKELY(try_wait())) {
204 auto deadline = std::chrono::steady_clock::now() + duration;
205 return tryWaitSlow(deadline, opt);
209 FOLLY_ALWAYS_INLINE void postFastWaiterMayBlock() noexcept {
210 uint32_t before = NOTREADY;
211 if (LIKELY(state_.compare_exchange_strong(
214 std::memory_order_release,
215 std::memory_order_relaxed))) {
218 postSlowWaiterMayBlock(before);
221 void postSlowWaiterMayBlock(uint32_t before) noexcept; // defined below
223 template <typename Clock, typename Duration>
225 const std::chrono::time_point<Clock, Duration>& deadline,
226 const WaitOptions& opt) noexcept; // defined below
230 /// Member function definitions
233 /** postSlowWaiterMayBlock */
234 template <bool MayBlock, template <typename> class Atom>
235 FOLLY_NOINLINE void SaturatingSemaphore<MayBlock, Atom>::postSlowWaiterMayBlock(
236 uint32_t before) noexcept {
238 if (before == NOTREADY) {
239 if (state_.compare_exchange_strong(
242 std::memory_order_release,
243 std::memory_order_relaxed)) {
247 if (before == READY) { // Only if multiple posters
248 // The reason for not simply returning (without the following
249 // steps) is to prevent the following case:
252 // local1.post(); local2.post(); global.wait();
253 // global.post(); global.post(); global.reset();
255 // local1.try_wait() == true;
256 // local2.try_wait() == false;
258 // This following steps correspond to T2's global.post(), where
259 // global is already posted by T1.
261 // The following fence and load guarantee that T3 does not miss
262 // T2's prior stores, i.e., local2.post() in this example.
264 // The following case is prevented:
266 // Starting with local2 == NOTREADY and global == READY
269 // store READY to local2 // post store NOTREADY to global // reset
270 // seq_cst fenc seq_cst fence
271 // load READY from global // post load NOTREADY from local2 // try_wait
273 std::atomic_thread_fence(std::memory_order_seq_cst);
274 before = state_.load(std::memory_order_relaxed);
275 if (before == READY) {
280 DCHECK_EQ(before, BLOCKED);
281 if (state_.compare_exchange_strong(
284 std::memory_order_release,
285 std::memory_order_relaxed)) {
293 template <bool MayBlock, template <typename> class Atom>
294 template <typename Clock, typename Duration>
295 FOLLY_NOINLINE bool SaturatingSemaphore<MayBlock, Atom>::tryWaitSlow(
296 const std::chrono::time_point<Clock, Duration>& deadline,
297 const WaitOptions& opt) noexcept {
298 auto tbegin = Clock::now();
300 auto before = state_.load(std::memory_order_acquire);
301 if (before == READY) {
304 if (Clock::now() >= deadline) {
308 auto tnow = Clock::now();
310 // backward time discontinuity in Clock, revise pre_block starting point
313 auto dur = std::chrono::duration_cast<Duration>(tnow - tbegin);
314 if (dur >= opt.pre_block()) {
315 if (before == NOTREADY) {
316 if (!state_.compare_exchange_strong(
319 std::memory_order_relaxed,
320 std::memory_order_relaxed)) {
324 if (deadline == std::chrono::time_point<Clock, Duration>::max()) {
325 state_.futexWait(BLOCKED);
327 state_.futexWaitUntil(BLOCKED, deadline);
331 asm_volatile_pause();