Remove unsafe tag and add presorted tag
[folly.git] / folly / synchronization / SaturatingSemaphore.h
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/Likely.h>
20 #include <folly/detail/Futex.h>
21 #include <folly/detail/MemoryIdler.h>
22 #include <folly/portability/Asm.h>
23 #include <folly/synchronization/WaitOptions.h>
24
25 #include <glog/logging.h>
26
27 #include <atomic>
28
29 namespace folly {
30
31 /// SaturatingSemaphore is a flag that allows concurrent posting by
32 /// multiple posters and concurrent non-destructive waiting by
33 /// multiple waiters.
34 ///
35 /// A SaturatingSemaphore allows one or more waiter threads to check,
36 /// spin, or block, indefinitely or with timeout, for a flag to be set
37 /// by one or more poster threads. By setting the flag, posters
38 /// announce to waiters (that may be already waiting or will check
39 /// the flag in the future) that some condition is true. Posts to an
40 /// already set flag are idempotent.
41 ///
42 /// SaturatingSemaphore is called so because it behaves like a hybrid
43 /// binary/counted _semaphore_ with values zero and infinity, and
44 /// post() and wait() functions. It is called _saturating_ because one
45 /// post() is enough to set it to infinity and to satisfy any number
46 /// of wait()-s. Once set (to infinity) it remains unchanged by
47 /// subsequent post()-s and wait()-s, until it is reset() back to
48 /// zero.
49 ///
50 /// The implementation of SaturatingSemaphore is based on that of
51 /// Baton. It includes no internal padding, and is only 4 bytes in
52 /// size. Any alignment or padding to avoid false sharing is up to
53 /// the user.
54 /// SaturatingSemaphore differs from Baton as follows:
55 /// - Baton allows at most one call to post(); this allows any number
56 ///   and concurrently.
57 /// - Baton allows at most one successful call to any wait variant;
58 ///   this allows any number and concurrently.
59 ///
60 /// Template parameter:
61 /// - bool MayBlock: If false, waiting operations spin only. If
62 ///   true, timed and wait operations may block; adds an atomic
63 ///   instruction to the critical path of posters.
64 ///
65 /// Wait options:
66 ///   WaitOptions contains optional per call setting for spin-max duration:
67 ///   Calls to wait(), try_wait_until(), and try_wait_for() block only after the
68 ///   passage of the spin-max period. The default spin-max duration is 10 usec.
69 ///   The spin-max option is applicable only if MayBlock is true.
70 ///
71 /// Functions:
72 ///   bool ready():
73 ///     Returns true if the flag is set by a call to post, otherwise false.
74 ///     Equivalent to try_wait, but available on const receivers.
75 ///   void reset();
76 ///     Clears the flag.
77 ///   void post();
78 ///     Sets the flag and wakes all current waiters, i.e., causes all
79 ///     concurrent calls to wait, try_wait_for, and try_wait_until to
80 ///     return.
81 ///   void wait(
82 ///       WaitOptions opt = wait_options());
83 ///     Waits for the flag to be set by a call to post.
84 ///   bool try_wait();
85 ///     Returns true if the flag is set by a call to post, otherwise false.
86 ///   bool try_wait_until(
87 ///       time_point& deadline,
88 ///       WaitOptions& = wait_options());
89 ///     Returns true if the flag is set by a call to post before the
90 ///     deadline, otherwise false.
91 ///   bool try_wait_for(
92 ///       duration&,
93 ///       WaitOptions& = wait_options());
94 ///     Returns true if the flag is set by a call to post before the
95 ///     expiration of the specified duration, otherwise false.
96 ///
97 /// Usage:
98 /// @code
99 /// SaturatingSemaphore</* MayBlock = */ true> f;
100 /// ASSERT_FALSE(f.try_wait());
101 /// ASSERT_FALSE(f.try_wait_until(
102 ///     std::chrono::steady_clock::now() + std::chrono::microseconds(1)));
103 /// ASSERT_FALSE(f.try_wait_until(
104 ///     std::chrono::steady_clock::now() + std::chrono::microseconds(1),
105 ///     f.wait_options().spin_max(std::chrono::microseconds(1))));
106 /// f.post();
107 /// f.post();
108 /// f.wait();
109 /// f.wait(f.wait_options().spin_max(std::chrono::nanoseconds(100)));
110 /// ASSERT_TRUE(f.try_wait());
111 /// ASSERT_TRUE(f.try_wait_until(
112 ///     std::chrono::steady_clock::now() + std::chrono::microseconds(1)));
113 /// f.wait();
114 /// f.reset();
115 /// ASSERT_FALSE(f.try_wait());
116 /// @endcode
117
118 template <bool MayBlock, template <typename> class Atom = std::atomic>
119 class SaturatingSemaphore {
120   detail::Futex<Atom> state_;
121
122   enum State : uint32_t {
123     NOTREADY = 0,
124     READY = 1,
125     BLOCKED = 2,
126   };
127
128  public:
129   FOLLY_ALWAYS_INLINE static WaitOptions wait_options() {
130     return {};
131   }
132
133   /** constructor */
134   constexpr SaturatingSemaphore() noexcept : state_(NOTREADY) {}
135
136   /** destructor */
137   ~SaturatingSemaphore() {}
138
139   /** ready */
140   FOLLY_ALWAYS_INLINE bool ready() const noexcept {
141     return state_.load(std::memory_order_acquire) == READY;
142   }
143
144   /** reset */
145   void reset() noexcept {
146     state_.store(NOTREADY, std::memory_order_relaxed);
147   }
148
149   /** post */
150   FOLLY_ALWAYS_INLINE void post() noexcept {
151     if (!MayBlock) {
152       state_.store(READY, std::memory_order_release);
153     } else {
154       postFastWaiterMayBlock();
155     }
156   }
157
158   /** wait */
159   FOLLY_ALWAYS_INLINE
160   void wait(const WaitOptions& opt = wait_options()) noexcept {
161     try_wait_until(std::chrono::steady_clock::time_point::max(), opt);
162   }
163
164   /** try_wait */
165   FOLLY_ALWAYS_INLINE bool try_wait() noexcept {
166     return ready();
167   }
168
169   /** try_wait_until */
170   template <typename Clock, typename Duration>
171   FOLLY_ALWAYS_INLINE bool try_wait_until(
172       const std::chrono::time_point<Clock, Duration>& deadline,
173       const WaitOptions& opt = wait_options()) noexcept {
174     if (LIKELY(try_wait())) {
175       return true;
176     }
177     return tryWaitSlow(deadline, opt);
178   }
179
180   /** try_wait_for */
181   template <class Rep, class Period>
182   FOLLY_ALWAYS_INLINE bool try_wait_for(
183       const std::chrono::duration<Rep, Period>& duration,
184       const WaitOptions& opt = wait_options()) noexcept {
185     if (LIKELY(try_wait())) {
186       return true;
187     }
188     auto deadline = std::chrono::steady_clock::now() + duration;
189     return tryWaitSlow(deadline, opt);
190   }
191
192  private:
193   FOLLY_ALWAYS_INLINE void postFastWaiterMayBlock() noexcept {
194     uint32_t before = NOTREADY;
195     if (LIKELY(state_.compare_exchange_strong(
196             before,
197             READY,
198             std::memory_order_release,
199             std::memory_order_relaxed))) {
200       return;
201     }
202     postSlowWaiterMayBlock(before);
203   }
204
205   void postSlowWaiterMayBlock(uint32_t before) noexcept; // defined below
206
207   template <typename Clock, typename Duration>
208   bool tryWaitSlow(
209       const std::chrono::time_point<Clock, Duration>& deadline,
210       const WaitOptions& opt) noexcept; // defined below
211 };
212
213 ///
214 /// Member function definitioons
215 ///
216
217 /** postSlowWaiterMayBlock */
218 template <bool MayBlock, template <typename> class Atom>
219 FOLLY_NOINLINE void SaturatingSemaphore<MayBlock, Atom>::postSlowWaiterMayBlock(
220     uint32_t before) noexcept {
221   while (true) {
222     if (before == NOTREADY) {
223       if (state_.compare_exchange_strong(
224               before,
225               READY,
226               std::memory_order_release,
227               std::memory_order_relaxed)) {
228         return;
229       }
230     }
231     if (before == READY) { // Only if multiple posters
232       // The reason for not simply returning (without the following
233       // steps) is to prevent the following case:
234       //
235       //  T1:             T2:             T3:
236       //  local1.post();  local2.post();  global.wait();
237       //  global.post();  global.post();  global.reset();
238       //                                  seq_cst fence
239       //                                  local1.try_wait() == true;
240       //                                  local2.try_wait() == false;
241       //
242       // This following steps correspond to T2's global.post(), where
243       // global is already posted by T1.
244       //
245       // The following fence and load guarantee that T3 does not miss
246       // T2's prior stores, i.e., local2.post() in this example.
247       //
248       // The following case is prevented:
249       //
250       // Starting with local2 == NOTREADY and global == READY
251       //
252       // T2:                              T3:
253       // store READY to local2 // post    store NOTREADY to global // reset
254       // seq_cst fenc                     seq_cst fence
255       // load READY from global // post   load NOTREADY from local2 // try_wait
256       //
257       std::atomic_thread_fence(std::memory_order_seq_cst);
258       before = state_.load(std::memory_order_relaxed);
259       if (before == READY) {
260         return;
261       }
262       continue;
263     }
264     DCHECK_EQ(before, BLOCKED);
265     if (state_.compare_exchange_strong(
266             before,
267             READY,
268             std::memory_order_release,
269             std::memory_order_relaxed)) {
270       state_.futexWake();
271       return;
272     }
273   }
274 }
275
276 /** tryWaitSlow */
277 template <bool MayBlock, template <typename> class Atom>
278 template <typename Clock, typename Duration>
279 FOLLY_NOINLINE bool SaturatingSemaphore<MayBlock, Atom>::tryWaitSlow(
280     const std::chrono::time_point<Clock, Duration>& deadline,
281     const WaitOptions& opt) noexcept {
282   auto tbegin = Clock::now();
283   while (true) {
284     auto before = state_.load(std::memory_order_acquire);
285     if (before == READY) {
286       return true;
287     }
288     if (Clock::now() >= deadline) {
289       return false;
290     }
291     if (MayBlock) {
292       auto tnow = Clock::now();
293       if (tnow < tbegin) {
294         // backward time discontinuity in Clock, revise spin_max starting point
295         tbegin = tnow;
296       }
297       auto dur = std::chrono::duration_cast<Duration>(tnow - tbegin);
298       if (dur >= opt.spin_max()) {
299         if (before == NOTREADY) {
300           if (!state_.compare_exchange_strong(
301                   before,
302                   BLOCKED,
303                   std::memory_order_relaxed,
304                   std::memory_order_relaxed)) {
305             continue;
306           }
307         }
308         detail::MemoryIdler::futexWaitUntil(state_, BLOCKED, deadline);
309       }
310     }
311     asm_volatile_pause();
312   }
313 }
314
315 } // namespace folly