Saturating semaphore
[folly.git] / folly / synchronization / SaturatingSemaphore.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/Likely.h>
20 #include <folly/detail/Futex.h>
21 #include <folly/portability/Asm.h>
22
23 #include <glog/logging.h>
24
25 #include <atomic>
26
27 namespace folly {
28
29 /// SaturatingSemaphore is a flag that allows concurrent posting by
30 /// multiple posters and concurrent non-destructive waiting by
31 /// multiple waiters.
32 ///
33 /// A SaturatingSemaphore allows one or more waiter threads to check,
34 /// spin, or block, indefinitely or with timeout, for a flag to be set
35 /// by one or more poster threads. By setting the flag, posters
36 /// announce to waiters (that may be already waiting or will check
37 /// the flag in the future) that some condition is true. Posts to an
38 /// already set flag are idempotent.
39 ///
40 /// SaturatingSemaphore is called so because it behaves like a hybrid
41 /// binary/counted _semaphore_ with values zero and infinity, and
42 /// post() and wait() functions. It is called _saturating_ because one
43 /// post() is enough to set it to infinity and to satisfy any number
44 /// of wait()-s. Once set (to infinity) it remains unchanged by
45 /// subsequent post()-s and wait()-s, until it is reset() back to
46 /// zero.
47 ///
48 /// The implementation of SaturatingSemaphore is based on that of
49 /// Baton. It includes no internal padding, and is only 4 bytes in
50 /// size. Any alignment or padding to avoid false sharing is up to
51 /// the user.
52 /// SaturatingSemaphore differs from Baton as follows:
53 /// - Baton allows at most one call to post(); this allows any number
54 ///   and concurrently.
55 /// - Baton allows at most one successful call to any wait variant;
56 ///   this allows any number and concurrently.
57 ///
58 /// Template parameter:
59 /// - bool MayBlock: If false, waiting operations spin only. If
60 ///   true, timed and wait operations may block; adds an atomic
61 ///   instruction to the critical path of posters.
62 ///
63 /// Wait options:
64 ///   The subclass WaitOptions contains optional per call setting for
65 ///   pre-block spin duration: Calls to wait(), try_wait_until(), and
66 ///   try_wait_for() block only after the passage of the pre-block
67 ///   period. The default pre-block duration is 10 microseconds. The
68 ///   pre block option is applicable only if MayBlock is true.
69 ///
70 /// Functions:
71 ///   void reset();
72 ///     Clears the flag.
73 ///   void post();
74 ///     Sets the flag and wakes all current waiters, i.e., causes all
75 ///     concurrent calls to wait, try_wait_for, and try_wait_until to
76 ///     return.
77 ///   void wait(
78 ///       WaitOptions opt = wait_options());
79 ///     Waits for the flag to be set by a call to post.
80 ///   bool try_wait();
81 ///     Returns true if the flag is set by a call to post, otherwise false.
82 ///   bool try_wait_until(
83 ///       time_point& deadline,
84 ///       WaitOptions& = wait_options());
85 ///     Returns true if the flag is set by a call to post before the
86 ///     deadline, otherwise false.
87 ///   bool try_wait_for(
88 ///       duration&,
89 ///       WaitOptions& = wait_options());
90 ///     Returns true if the flag is set by a call to post before the
91 ///     expiration of the specified duration, otherwise false.
92 ///
93 /// Usage:
94 /// @code
95 /// SaturatingSemaphore</* MayBlock = */ true> f;
96 /// ASSERT_FALSE(f.try_wait());
97 /// ASSERT_FALSE(f.try_wait_until(
98 ///     std::chrono::steady_clock::now() + std::chrono::microseconds(1)));
99 /// ASSERT_FALSE(f.try_wait_until(
100 ///     std::chrono::steady_clock::now() + std::chrono::microseconds(1),
101 ///     f.wait_options().pre_block(std::chrono::microseconds(1))));
102 /// f.post();
103 /// f.post();
104 /// f.wait();
105 /// f.wait(f.wait_options().pre_block(std::chrono::nanoseconds(100)));
106 /// ASSERT_TRUE(f.try_wait());
107 /// ASSERT_TRUE(f.try_wait_until(
108 ///     std::chrono::steady_clock::now() + std::chrono::microseconds(1)));
109 /// f.wait();
110 /// f.reset();
111 /// ASSERT_FALSE(f.try_wait());
112 /// @endcode
113
114 template <bool MayBlock, template <typename> class Atom = std::atomic>
115 class SaturatingSemaphore {
116   detail::Futex<Atom> state_;
117
118   enum State : uint32_t {
119     NOTREADY = 0,
120     READY = 1,
121     BLOCKED = 2,
122   };
123
124  public:
125   /** WaitOptions */
126
127   class WaitOptions {
128     std::chrono::nanoseconds dur_{std::chrono::microseconds(10)};
129
130    public:
131     FOLLY_ALWAYS_INLINE
132     std::chrono::nanoseconds pre_block() const {
133       return dur_;
134     }
135     FOLLY_ALWAYS_INLINE
136     WaitOptions& pre_block(std::chrono::nanoseconds dur) {
137       dur_ = dur;
138       return *this;
139     }
140   };
141
142   FOLLY_ALWAYS_INLINE static WaitOptions wait_options() {
143     return {};
144   }
145
146   /** constructor */
147   constexpr SaturatingSemaphore() noexcept : state_(NOTREADY) {}
148
149   /** destructor */
150   ~SaturatingSemaphore() {}
151
152   /** reset */
153   void reset() noexcept {
154     state_.store(NOTREADY, std::memory_order_relaxed);
155   }
156
157   /** post */
158   FOLLY_ALWAYS_INLINE void post() noexcept {
159     if (!MayBlock) {
160       state_.store(READY, std::memory_order_release);
161     } else {
162       postFastWaiterMayBlock();
163     }
164   }
165
166   /** wait */
167   FOLLY_ALWAYS_INLINE
168   void wait(const WaitOptions& opt = wait_options()) noexcept {
169     try_wait_until(std::chrono::steady_clock::time_point::max(), opt);
170   }
171
172   /** try_wait */
173   FOLLY_ALWAYS_INLINE bool try_wait() const noexcept {
174     return state_.load(std::memory_order_acquire) == READY;
175   }
176
177   /** try_wait_until */
178   template <typename Clock, typename Duration>
179   FOLLY_ALWAYS_INLINE bool try_wait_until(
180       const std::chrono::time_point<Clock, Duration>& deadline,
181       const WaitOptions& opt = wait_options()) noexcept {
182     if (LIKELY(try_wait())) {
183       return true;
184     }
185     return tryWaitSlow(deadline, opt);
186   }
187
188   /** try_wait_for */
189   template <class Rep, class Period>
190   FOLLY_ALWAYS_INLINE bool try_wait_for(
191       const std::chrono::duration<Rep, Period>& duration,
192       const WaitOptions& opt = wait_options()) noexcept {
193     if (LIKELY(try_wait())) {
194       return true;
195     }
196     auto deadline = std::chrono::steady_clock::now() + duration;
197     return tryWaitSlow(deadline, opt);
198   }
199
200  private:
201   FOLLY_ALWAYS_INLINE void postFastWaiterMayBlock() noexcept {
202     uint32_t before = NOTREADY;
203     if (LIKELY(state_.compare_exchange_strong(
204             before,
205             READY,
206             std::memory_order_release,
207             std::memory_order_relaxed))) {
208       return;
209     }
210     postSlowWaiterMayBlock(before);
211   }
212
213   void postSlowWaiterMayBlock(uint32_t before) noexcept; // defined below
214
215   template <typename Clock, typename Duration>
216   bool tryWaitSlow(
217       const std::chrono::time_point<Clock, Duration>& deadline,
218       const WaitOptions& opt) noexcept; // defined below
219 };
220
221 ///
222 /// Member function definitioons
223 ///
224
225 /** postSlowWaiterMayBlock */
226 template <bool MayBlock, template <typename> class Atom>
227 FOLLY_NOINLINE void SaturatingSemaphore<MayBlock, Atom>::postSlowWaiterMayBlock(
228     uint32_t before) noexcept {
229   while (true) {
230     if (before == NOTREADY) {
231       if (state_.compare_exchange_strong(
232               before,
233               READY,
234               std::memory_order_release,
235               std::memory_order_relaxed)) {
236         return;
237       }
238     }
239     if (before == READY) { // Only if multiple posters
240       // The reason for not simply returning (without the following
241       // steps) is to prevent the following case:
242       //
243       //  T1:             T2:             T3:
244       //  local1.post();  local2.post();  global.wait();
245       //  global.post();  global.post();  global.reset();
246       //                                  seq_cst fence
247       //                                  local1.try_wait() == true;
248       //                                  local2.try_wait() == false;
249       //
250       // This following steps correspond to T2's global.post(), where
251       // global is already posted by T1.
252       //
253       // The following fence and load guarantee that T3 does not miss
254       // T2's prior stores, i.e., local2.post() in this example.
255       //
256       // The following case is prevented:
257       //
258       // Starting with local2 == NOTREADY and global == READY
259       //
260       // T2:                              T3:
261       // store READY to local2 // post    store NOTREADY to global // reset
262       // seq_cst fenc                     seq_cst fence
263       // load READY from global // post   load NOTREADY from local2 // try_wait
264       //
265       std::atomic_thread_fence(std::memory_order_seq_cst);
266       before = state_.load(std::memory_order_relaxed);
267       if (before == READY) {
268         return;
269       }
270       continue;
271     }
272     DCHECK_EQ(before, BLOCKED);
273     if (state_.compare_exchange_strong(
274             before,
275             READY,
276             std::memory_order_release,
277             std::memory_order_relaxed)) {
278       state_.futexWake();
279       return;
280     }
281   }
282 }
283
284 /** tryWaitSlow */
285 template <bool MayBlock, template <typename> class Atom>
286 template <typename Clock, typename Duration>
287 FOLLY_NOINLINE bool SaturatingSemaphore<MayBlock, Atom>::tryWaitSlow(
288     const std::chrono::time_point<Clock, Duration>& deadline,
289     const WaitOptions& opt) noexcept {
290   auto tbegin = Clock::now();
291   while (true) {
292     auto before = state_.load(std::memory_order_acquire);
293     if (before == READY) {
294       return true;
295     }
296     if (Clock::now() >= deadline) {
297       return false;
298     }
299     if (MayBlock) {
300       auto tnow = Clock::now();
301       if (tnow < tbegin) {
302         // backward time discontinuity in Clock, revise pre_block starting point
303         tbegin = tnow;
304       }
305       auto dur = std::chrono::duration_cast<Duration>(tnow - tbegin);
306       if (dur >= opt.pre_block()) {
307         if (before == NOTREADY) {
308           if (!state_.compare_exchange_strong(
309                   before,
310                   BLOCKED,
311                   std::memory_order_relaxed,
312                   std::memory_order_relaxed)) {
313             continue;
314           }
315         }
316         if (deadline == std::chrono::time_point<Clock, Duration>::max()) {
317           state_.futexWait(BLOCKED);
318         } else {
319           state_.futexWaitUntil(BLOCKED, deadline);
320         }
321       }
322     }
323     asm_volatile_pause();
324   }
325 }
326
327 } // namespace folly