2 * Copyright 2017-present Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/Likely.h>
20 #include <folly/detail/Futex.h>
21 #include <folly/detail/MemoryIdler.h>
22 #include <folly/portability/Asm.h>
23 #include <folly/synchronization/WaitOptions.h>
25 #include <glog/logging.h>
31 /// SaturatingSemaphore is a flag that allows concurrent posting by
32 /// multiple posters and concurrent non-destructive waiting by
35 /// A SaturatingSemaphore allows one or more waiter threads to check,
36 /// spin, or block, indefinitely or with timeout, for a flag to be set
37 /// by one or more poster threads. By setting the flag, posters
38 /// announce to waiters (that may be already waiting or will check
39 /// the flag in the future) that some condition is true. Posts to an
40 /// already set flag are idempotent.
42 /// SaturatingSemaphore is called so because it behaves like a hybrid
43 /// binary/counted _semaphore_ with values zero and infinity, and
44 /// post() and wait() functions. It is called _saturating_ because one
45 /// post() is enough to set it to infinity and to satisfy any number
46 /// of wait()-s. Once set (to infinity) it remains unchanged by
47 /// subsequent post()-s and wait()-s, until it is reset() back to
50 /// The implementation of SaturatingSemaphore is based on that of
51 /// Baton. It includes no internal padding, and is only 4 bytes in
52 /// size. Any alignment or padding to avoid false sharing is up to
54 /// SaturatingSemaphore differs from Baton as follows:
55 /// - Baton allows at most one call to post(); this allows any number
57 /// - Baton allows at most one successful call to any wait variant;
58 /// this allows any number and concurrently.
60 /// Template parameter:
61 /// - bool MayBlock: If false, waiting operations spin only. If
62 /// true, timed and wait operations may block; adds an atomic
63 /// instruction to the critical path of posters.
66 /// WaitOptions contains optional per call setting for spin-max duration:
67 /// Calls to wait(), try_wait_until(), and try_wait_for() block only after the
68 /// passage of the spin-max period. The default spin-max duration is 10 usec.
69 /// The spin-max option is applicable only if MayBlock is true.
73 /// Returns true if the flag is set by a call to post, otherwise false.
74 /// Equivalent to try_wait, but available on const receivers.
78 /// Sets the flag and wakes all current waiters, i.e., causes all
79 /// concurrent calls to wait, try_wait_for, and try_wait_until to
82 /// WaitOptions opt = wait_options());
83 /// Waits for the flag to be set by a call to post.
85 /// Returns true if the flag is set by a call to post, otherwise false.
86 /// bool try_wait_until(
87 /// time_point& deadline,
88 /// WaitOptions& = wait_options());
89 /// Returns true if the flag is set by a call to post before the
90 /// deadline, otherwise false.
91 /// bool try_wait_for(
93 /// WaitOptions& = wait_options());
94 /// Returns true if the flag is set by a call to post before the
95 /// expiration of the specified duration, otherwise false.
99 /// SaturatingSemaphore</* MayBlock = */ true> f;
100 /// ASSERT_FALSE(f.try_wait());
101 /// ASSERT_FALSE(f.try_wait_until(
102 /// std::chrono::steady_clock::now() + std::chrono::microseconds(1)));
103 /// ASSERT_FALSE(f.try_wait_until(
104 /// std::chrono::steady_clock::now() + std::chrono::microseconds(1),
105 /// f.wait_options().spin_max(std::chrono::microseconds(1))));
109 /// f.wait(f.wait_options().spin_max(std::chrono::nanoseconds(100)));
110 /// ASSERT_TRUE(f.try_wait());
111 /// ASSERT_TRUE(f.try_wait_until(
112 /// std::chrono::steady_clock::now() + std::chrono::microseconds(1)));
115 /// ASSERT_FALSE(f.try_wait());
118 template <bool MayBlock, template <typename> class Atom = std::atomic>
119 class SaturatingSemaphore {
120 detail::Futex<Atom> state_;
122 enum State : uint32_t {
129 FOLLY_ALWAYS_INLINE static WaitOptions wait_options() {
134 constexpr SaturatingSemaphore() noexcept : state_(NOTREADY) {}
137 ~SaturatingSemaphore() {}
140 FOLLY_ALWAYS_INLINE bool ready() const noexcept {
141 return state_.load(std::memory_order_acquire) == READY;
145 void reset() noexcept {
146 state_.store(NOTREADY, std::memory_order_relaxed);
150 FOLLY_ALWAYS_INLINE void post() noexcept {
152 state_.store(READY, std::memory_order_release);
154 postFastWaiterMayBlock();
160 void wait(const WaitOptions& opt = wait_options()) noexcept {
161 try_wait_until(std::chrono::steady_clock::time_point::max(), opt);
165 FOLLY_ALWAYS_INLINE bool try_wait() noexcept {
169 /** try_wait_until */
170 template <typename Clock, typename Duration>
171 FOLLY_ALWAYS_INLINE bool try_wait_until(
172 const std::chrono::time_point<Clock, Duration>& deadline,
173 const WaitOptions& opt = wait_options()) noexcept {
174 if (LIKELY(try_wait())) {
177 return tryWaitSlow(deadline, opt);
181 template <class Rep, class Period>
182 FOLLY_ALWAYS_INLINE bool try_wait_for(
183 const std::chrono::duration<Rep, Period>& duration,
184 const WaitOptions& opt = wait_options()) noexcept {
185 if (LIKELY(try_wait())) {
188 auto deadline = std::chrono::steady_clock::now() + duration;
189 return tryWaitSlow(deadline, opt);
193 FOLLY_ALWAYS_INLINE void postFastWaiterMayBlock() noexcept {
194 uint32_t before = NOTREADY;
195 if (LIKELY(state_.compare_exchange_strong(
198 std::memory_order_release,
199 std::memory_order_relaxed))) {
202 postSlowWaiterMayBlock(before);
205 void postSlowWaiterMayBlock(uint32_t before) noexcept; // defined below
207 template <typename Clock, typename Duration>
209 const std::chrono::time_point<Clock, Duration>& deadline,
210 const WaitOptions& opt) noexcept; // defined below
214 /// Member function definitioons
217 /** postSlowWaiterMayBlock */
218 template <bool MayBlock, template <typename> class Atom>
219 FOLLY_NOINLINE void SaturatingSemaphore<MayBlock, Atom>::postSlowWaiterMayBlock(
220 uint32_t before) noexcept {
222 if (before == NOTREADY) {
223 if (state_.compare_exchange_strong(
226 std::memory_order_release,
227 std::memory_order_relaxed)) {
231 if (before == READY) { // Only if multiple posters
232 // The reason for not simply returning (without the following
233 // steps) is to prevent the following case:
236 // local1.post(); local2.post(); global.wait();
237 // global.post(); global.post(); global.reset();
239 // local1.try_wait() == true;
240 // local2.try_wait() == false;
242 // This following steps correspond to T2's global.post(), where
243 // global is already posted by T1.
245 // The following fence and load guarantee that T3 does not miss
246 // T2's prior stores, i.e., local2.post() in this example.
248 // The following case is prevented:
250 // Starting with local2 == NOTREADY and global == READY
253 // store READY to local2 // post store NOTREADY to global // reset
254 // seq_cst fenc seq_cst fence
255 // load READY from global // post load NOTREADY from local2 // try_wait
257 std::atomic_thread_fence(std::memory_order_seq_cst);
258 before = state_.load(std::memory_order_relaxed);
259 if (before == READY) {
264 DCHECK_EQ(before, BLOCKED);
265 if (state_.compare_exchange_strong(
268 std::memory_order_release,
269 std::memory_order_relaxed)) {
277 template <bool MayBlock, template <typename> class Atom>
278 template <typename Clock, typename Duration>
279 FOLLY_NOINLINE bool SaturatingSemaphore<MayBlock, Atom>::tryWaitSlow(
280 const std::chrono::time_point<Clock, Duration>& deadline,
281 const WaitOptions& opt) noexcept {
282 auto tbegin = Clock::now();
284 auto before = state_.load(std::memory_order_acquire);
285 if (before == READY) {
288 if (Clock::now() >= deadline) {
292 auto tnow = Clock::now();
294 // backward time discontinuity in Clock, revise spin_max starting point
297 auto dur = std::chrono::duration_cast<Duration>(tnow - tbegin);
298 if (dur >= opt.spin_max()) {
299 if (before == NOTREADY) {
300 if (!state_.compare_exchange_strong(
303 std::memory_order_relaxed,
304 std::memory_order_relaxed)) {
308 detail::MemoryIdler::futexWaitUntil(state_, BLOCKED, deadline);
311 asm_volatile_pause();