Let SaturatingSemaphore::try_wait be non-const and add ready
[folly.git] / folly / synchronization / SaturatingSemaphore.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/Likely.h>
20 #include <folly/detail/Futex.h>
21 #include <folly/portability/Asm.h>
22
23 #include <glog/logging.h>
24
25 #include <atomic>
26
27 namespace folly {
28
29 /// SaturatingSemaphore is a flag that allows concurrent posting by
30 /// multiple posters and concurrent non-destructive waiting by
31 /// multiple waiters.
32 ///
33 /// A SaturatingSemaphore allows one or more waiter threads to check,
34 /// spin, or block, indefinitely or with timeout, for a flag to be set
35 /// by one or more poster threads. By setting the flag, posters
36 /// announce to waiters (that may be already waiting or will check
37 /// the flag in the future) that some condition is true. Posts to an
38 /// already set flag are idempotent.
39 ///
40 /// SaturatingSemaphore is called so because it behaves like a hybrid
41 /// binary/counted _semaphore_ with values zero and infinity, and
42 /// post() and wait() functions. It is called _saturating_ because one
43 /// post() is enough to set it to infinity and to satisfy any number
44 /// of wait()-s. Once set (to infinity) it remains unchanged by
45 /// subsequent post()-s and wait()-s, until it is reset() back to
46 /// zero.
47 ///
48 /// The implementation of SaturatingSemaphore is based on that of
49 /// Baton. It includes no internal padding, and is only 4 bytes in
50 /// size. Any alignment or padding to avoid false sharing is up to
51 /// the user.
52 /// SaturatingSemaphore differs from Baton as follows:
53 /// - Baton allows at most one call to post(); this allows any number
54 ///   and concurrently.
55 /// - Baton allows at most one successful call to any wait variant;
56 ///   this allows any number and concurrently.
57 ///
58 /// Template parameter:
59 /// - bool MayBlock: If false, waiting operations spin only. If
60 ///   true, timed and wait operations may block; adds an atomic
61 ///   instruction to the critical path of posters.
62 ///
63 /// Wait options:
64 ///   The subclass WaitOptions contains optional per call setting for
65 ///   pre-block spin duration: Calls to wait(), try_wait_until(), and
66 ///   try_wait_for() block only after the passage of the pre-block
67 ///   period. The default pre-block duration is 10 microseconds. The
68 ///   pre block option is applicable only if MayBlock is true.
69 ///
70 /// Functions:
71 ///   bool ready():
72 ///     Returns true if the flag is set by a call to post, otherwise false.
73 ///     Equivalent to try_wait, but available on const receivers.
74 ///   void reset();
75 ///     Clears the flag.
76 ///   void post();
77 ///     Sets the flag and wakes all current waiters, i.e., causes all
78 ///     concurrent calls to wait, try_wait_for, and try_wait_until to
79 ///     return.
80 ///   void wait(
81 ///       WaitOptions opt = wait_options());
82 ///     Waits for the flag to be set by a call to post.
83 ///   bool try_wait();
84 ///     Returns true if the flag is set by a call to post, otherwise false.
85 ///   bool try_wait_until(
86 ///       time_point& deadline,
87 ///       WaitOptions& = wait_options());
88 ///     Returns true if the flag is set by a call to post before the
89 ///     deadline, otherwise false.
90 ///   bool try_wait_for(
91 ///       duration&,
92 ///       WaitOptions& = wait_options());
93 ///     Returns true if the flag is set by a call to post before the
94 ///     expiration of the specified duration, otherwise false.
95 ///
96 /// Usage:
97 /// @code
98 /// SaturatingSemaphore</* MayBlock = */ true> f;
99 /// ASSERT_FALSE(f.try_wait());
100 /// ASSERT_FALSE(f.try_wait_until(
101 ///     std::chrono::steady_clock::now() + std::chrono::microseconds(1)));
102 /// ASSERT_FALSE(f.try_wait_until(
103 ///     std::chrono::steady_clock::now() + std::chrono::microseconds(1),
104 ///     f.wait_options().pre_block(std::chrono::microseconds(1))));
105 /// f.post();
106 /// f.post();
107 /// f.wait();
108 /// f.wait(f.wait_options().pre_block(std::chrono::nanoseconds(100)));
109 /// ASSERT_TRUE(f.try_wait());
110 /// ASSERT_TRUE(f.try_wait_until(
111 ///     std::chrono::steady_clock::now() + std::chrono::microseconds(1)));
112 /// f.wait();
113 /// f.reset();
114 /// ASSERT_FALSE(f.try_wait());
115 /// @endcode
116
117 template <bool MayBlock, template <typename> class Atom = std::atomic>
118 class SaturatingSemaphore {
119   detail::Futex<Atom> state_;
120
121   enum State : uint32_t {
122     NOTREADY = 0,
123     READY = 1,
124     BLOCKED = 2,
125   };
126
127  public:
128   /** WaitOptions */
129
130   class WaitOptions {
131     std::chrono::nanoseconds dur_{std::chrono::microseconds(10)};
132
133    public:
134     FOLLY_ALWAYS_INLINE
135     std::chrono::nanoseconds pre_block() const {
136       return dur_;
137     }
138     FOLLY_ALWAYS_INLINE
139     WaitOptions& pre_block(std::chrono::nanoseconds dur) {
140       dur_ = dur;
141       return *this;
142     }
143   };
144
145   FOLLY_ALWAYS_INLINE static WaitOptions wait_options() {
146     return {};
147   }
148
149   /** constructor */
150   constexpr SaturatingSemaphore() noexcept : state_(NOTREADY) {}
151
152   /** destructor */
153   ~SaturatingSemaphore() {}
154
155   /** ready */
156   FOLLY_ALWAYS_INLINE bool ready() const noexcept {
157     return state_.load(std::memory_order_acquire) == READY;
158   }
159
160   /** reset */
161   void reset() noexcept {
162     state_.store(NOTREADY, std::memory_order_relaxed);
163   }
164
165   /** post */
166   FOLLY_ALWAYS_INLINE void post() noexcept {
167     if (!MayBlock) {
168       state_.store(READY, std::memory_order_release);
169     } else {
170       postFastWaiterMayBlock();
171     }
172   }
173
174   /** wait */
175   FOLLY_ALWAYS_INLINE
176   void wait(const WaitOptions& opt = wait_options()) noexcept {
177     try_wait_until(std::chrono::steady_clock::time_point::max(), opt);
178   }
179
180   /** try_wait */
181   FOLLY_ALWAYS_INLINE bool try_wait() noexcept {
182     return ready();
183   }
184
185   /** try_wait_until */
186   template <typename Clock, typename Duration>
187   FOLLY_ALWAYS_INLINE bool try_wait_until(
188       const std::chrono::time_point<Clock, Duration>& deadline,
189       const WaitOptions& opt = wait_options()) noexcept {
190     if (LIKELY(try_wait())) {
191       return true;
192     }
193     return tryWaitSlow(deadline, opt);
194   }
195
196   /** try_wait_for */
197   template <class Rep, class Period>
198   FOLLY_ALWAYS_INLINE bool try_wait_for(
199       const std::chrono::duration<Rep, Period>& duration,
200       const WaitOptions& opt = wait_options()) noexcept {
201     if (LIKELY(try_wait())) {
202       return true;
203     }
204     auto deadline = std::chrono::steady_clock::now() + duration;
205     return tryWaitSlow(deadline, opt);
206   }
207
208  private:
209   FOLLY_ALWAYS_INLINE void postFastWaiterMayBlock() noexcept {
210     uint32_t before = NOTREADY;
211     if (LIKELY(state_.compare_exchange_strong(
212             before,
213             READY,
214             std::memory_order_release,
215             std::memory_order_relaxed))) {
216       return;
217     }
218     postSlowWaiterMayBlock(before);
219   }
220
221   void postSlowWaiterMayBlock(uint32_t before) noexcept; // defined below
222
223   template <typename Clock, typename Duration>
224   bool tryWaitSlow(
225       const std::chrono::time_point<Clock, Duration>& deadline,
226       const WaitOptions& opt) noexcept; // defined below
227 };
228
229 ///
230 /// Member function definitioons
231 ///
232
233 /** postSlowWaiterMayBlock */
234 template <bool MayBlock, template <typename> class Atom>
235 FOLLY_NOINLINE void SaturatingSemaphore<MayBlock, Atom>::postSlowWaiterMayBlock(
236     uint32_t before) noexcept {
237   while (true) {
238     if (before == NOTREADY) {
239       if (state_.compare_exchange_strong(
240               before,
241               READY,
242               std::memory_order_release,
243               std::memory_order_relaxed)) {
244         return;
245       }
246     }
247     if (before == READY) { // Only if multiple posters
248       // The reason for not simply returning (without the following
249       // steps) is to prevent the following case:
250       //
251       //  T1:             T2:             T3:
252       //  local1.post();  local2.post();  global.wait();
253       //  global.post();  global.post();  global.reset();
254       //                                  seq_cst fence
255       //                                  local1.try_wait() == true;
256       //                                  local2.try_wait() == false;
257       //
258       // This following steps correspond to T2's global.post(), where
259       // global is already posted by T1.
260       //
261       // The following fence and load guarantee that T3 does not miss
262       // T2's prior stores, i.e., local2.post() in this example.
263       //
264       // The following case is prevented:
265       //
266       // Starting with local2 == NOTREADY and global == READY
267       //
268       // T2:                              T3:
269       // store READY to local2 // post    store NOTREADY to global // reset
270       // seq_cst fenc                     seq_cst fence
271       // load READY from global // post   load NOTREADY from local2 // try_wait
272       //
273       std::atomic_thread_fence(std::memory_order_seq_cst);
274       before = state_.load(std::memory_order_relaxed);
275       if (before == READY) {
276         return;
277       }
278       continue;
279     }
280     DCHECK_EQ(before, BLOCKED);
281     if (state_.compare_exchange_strong(
282             before,
283             READY,
284             std::memory_order_release,
285             std::memory_order_relaxed)) {
286       state_.futexWake();
287       return;
288     }
289   }
290 }
291
292 /** tryWaitSlow */
293 template <bool MayBlock, template <typename> class Atom>
294 template <typename Clock, typename Duration>
295 FOLLY_NOINLINE bool SaturatingSemaphore<MayBlock, Atom>::tryWaitSlow(
296     const std::chrono::time_point<Clock, Duration>& deadline,
297     const WaitOptions& opt) noexcept {
298   auto tbegin = Clock::now();
299   while (true) {
300     auto before = state_.load(std::memory_order_acquire);
301     if (before == READY) {
302       return true;
303     }
304     if (Clock::now() >= deadline) {
305       return false;
306     }
307     if (MayBlock) {
308       auto tnow = Clock::now();
309       if (tnow < tbegin) {
310         // backward time discontinuity in Clock, revise pre_block starting point
311         tbegin = tnow;
312       }
313       auto dur = std::chrono::duration_cast<Duration>(tnow - tbegin);
314       if (dur >= opt.pre_block()) {
315         if (before == NOTREADY) {
316           if (!state_.compare_exchange_strong(
317                   before,
318                   BLOCKED,
319                   std::memory_order_relaxed,
320                   std::memory_order_relaxed)) {
321             continue;
322           }
323         }
324         if (deadline == std::chrono::time_point<Clock, Duration>::max()) {
325           state_.futexWait(BLOCKED);
326         } else {
327           state_.futexWaitUntil(BLOCKED, deadline);
328         }
329       }
330     }
331     asm_volatile_pause();
332   }
333 }
334
335 } // namespace folly