Don't use pthread_spinlock_t in TimedRWMutex
[folly.git] / folly / LifoSem.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <string.h>
20 #include <stdint.h>
21 #include <atomic>
22 #include <algorithm>
23 #include <memory>
24 #include <system_error>
25
26 #include <folly/AtomicStruct.h>
27 #include <folly/Baton.h>
28 #include <folly/IndexedMemPool.h>
29 #include <folly/Likely.h>
30 #include <folly/detail/CacheLocality.h>
31
32 namespace folly {
33
34 template <template<typename> class Atom = std::atomic,
35           class BatonType = Baton<Atom>>
36 struct LifoSemImpl;
37
38 /// LifoSem is a semaphore that wakes its waiters in a manner intended to
39 /// maximize performance rather than fairness.  It should be preferred
40 /// to a mutex+condvar or POSIX sem_t solution when all of the waiters
41 /// are equivalent.  It is faster than a condvar or sem_t, and it has a
42 /// shutdown state that might save you a lot of complexity when it comes
43 /// time to shut down your work pipelines.  LifoSem is larger than sem_t,
44 /// but that is only because it uses padding and alignment to avoid
45 /// false sharing.
46 ///
47 /// LifoSem allows multi-post and multi-tryWait, and provides a shutdown
48 /// state that awakens all waiters.  LifoSem is faster than sem_t because
49 /// it performs exact wakeups, so it often requires fewer system calls.
50 /// It provides all of the functionality of sem_t except for timed waiting.
51 /// It is called LifoSem because its wakeup policy is approximately LIFO,
52 /// rather than the usual FIFO.
53 ///
54 /// The core semaphore operations provided are:
55 ///
56 /// -- post() -- if there is a pending waiter, wake it up, otherwise
57 /// increment the value of the semaphore.  If the value of the semaphore
58 /// is already 2^32-1, does nothing.  Compare to sem_post().
59 ///
60 /// -- post(n) -- equivalent to n calls to post(), but much more efficient.
61 /// sem_t has no equivalent to this method.
62 ///
63 /// -- bool tryWait() -- if the semaphore's value is positive, decrements it
64 /// and returns true, otherwise returns false.  Compare to sem_trywait().
65 ///
66 /// -- uint32_t tryWait(uint32_t n) -- attempts to decrement the semaphore's
67 /// value by n, returning the amount by which it actually was decremented
68 /// (a value from 0 to n inclusive).  Not atomic.  Equivalent to n calls
69 /// to tryWait().  sem_t has no equivalent to this method.
70 ///
71 /// -- wait() -- waits until tryWait() can succeed.  Compare to sem_wait().
72 ///
73 /// LifoSem also has the notion of a shutdown state, in which any calls
74 /// that would block (or are already blocked) throw ShutdownSemError.
75 /// Note the difference between a call to wait() and a call to wait()
76 /// that might block.  In the former case tryWait() would succeed, and no
77 /// isShutdown() check is performed.  In the latter case an exception is
78 /// thrown.  This behavior allows a LifoSem controlling work distribution
79 /// to drain.  If you want to immediately stop all waiting on shutdown,
80 /// you can just check isShutdown() yourself (preferrably wrapped in
81 /// an UNLIKELY).  This fast-stop behavior is easy to add, but difficult
82 /// to remove if you want the draining behavior, which is why we have
83 /// chosen the former.  Since wait() is the only method that can block,
84 /// it is the only one that is affected by the shutdown state.
85 ///
86 /// All LifoSem operations operations except valueGuess() are guaranteed
87 /// to be linearizable.
88 typedef LifoSemImpl<> LifoSem;
89
90
91 /// The exception thrown when wait()ing on an isShutdown() LifoSem
92 struct ShutdownSemError : public std::runtime_error {
93   explicit ShutdownSemError(const std::string& msg);
94   virtual ~ShutdownSemError() noexcept;
95 };
96
97 namespace detail {
98
99 // Internally, a LifoSem is either a value or a linked list of wait nodes.
100 // This union is captured in the LifoSemHead type, which holds either a
101 // value or an indexed pointer to the list.  LifoSemHead itself is a value
102 // type, the head is a mutable atomic box containing a LifoSemHead value.
103 // Each wait node corresponds to exactly one waiter.  Values can flow
104 // through the semaphore either by going into and out of the head's value,
105 // or by direct communication from a poster to a waiter.  The former path
106 // is taken when there are no pending waiters, the latter otherwise.  The
107 // general flow of a post is to try to increment the value or pop-and-post
108 // a wait node.  Either of those have the effect of conveying one semaphore
109 // unit.  Waiting is the opposite, either a decrement of the value or
110 // push-and-wait of a wait node.  The generic LifoSemBase abstracts the
111 // actual mechanism by which a wait node's post->wait communication is
112 // performed, which is why we have LifoSemRawNode and LifoSemNode.
113
114 /// LifoSemRawNode is the actual pooled storage that backs LifoSemNode
115 /// for user-specified Handoff types.  This is done so that we can have
116 /// a large static IndexedMemPool of nodes, instead of per-type pools
117 template <template<typename> class Atom>
118 struct LifoSemRawNode {
119   std::aligned_storage<sizeof(void*),alignof(void*)>::type raw;
120
121   /// The IndexedMemPool index of the next node in this chain, or 0
122   /// if none.  This will be set to uint32_t(-1) if the node is being
123   /// posted due to a shutdown-induced wakeup
124   uint32_t next;
125
126   bool isShutdownNotice() const { return next == uint32_t(-1); }
127   void clearShutdownNotice() { next = 0; }
128   void setShutdownNotice() { next = uint32_t(-1); }
129
130   typedef folly::IndexedMemPool<LifoSemRawNode<Atom>,32,200,Atom> Pool;
131
132   /// Storage for all of the waiter nodes for LifoSem-s that use Atom
133   static Pool& pool();
134 };
135
136 /// Use this macro to declare the static storage that backs the raw nodes
137 /// for the specified atomic type
138 #define LIFOSEM_DECLARE_POOL(Atom, capacity)                 \
139   namespace folly {                                          \
140   namespace detail {                                         \
141   template <>                                                \
142   LifoSemRawNode<Atom>::Pool& LifoSemRawNode<Atom>::pool() { \
143     static Pool* instance = new Pool((capacity));            \
144     return *instance;                                        \
145   }                                                          \
146   }                                                          \
147   }
148
149 /// Handoff is a type not bigger than a void* that knows how to perform a
150 /// single post() -> wait() communication.  It must have a post() method.
151 /// If it has a wait() method then LifoSemBase's wait() implementation
152 /// will work out of the box, otherwise you will need to specialize
153 /// LifoSemBase::wait accordingly.
154 template <typename Handoff, template<typename> class Atom>
155 struct LifoSemNode : public LifoSemRawNode<Atom> {
156
157   static_assert(sizeof(Handoff) <= sizeof(LifoSemRawNode<Atom>::raw),
158       "Handoff too big for small-object optimization, use indirection");
159   static_assert(alignof(Handoff) <=
160                 alignof(decltype(LifoSemRawNode<Atom>::raw)),
161       "Handoff alignment constraint not satisfied");
162
163   template <typename ...Args>
164   void init(Args&&... args) {
165     new (&this->raw) Handoff(std::forward<Args>(args)...);
166   }
167
168   void destroy() {
169     handoff().~Handoff();
170 #ifndef NDEBUG
171     memset(&this->raw, 'F', sizeof(this->raw));
172 #endif
173   }
174
175   Handoff& handoff() {
176     return *static_cast<Handoff*>(static_cast<void*>(&this->raw));
177   }
178
179   const Handoff& handoff() const {
180     return *static_cast<const Handoff*>(static_cast<const void*>(&this->raw));
181   }
182 };
183
184 template <typename Handoff, template<typename> class Atom>
185 struct LifoSemNodeRecycler {
186   void operator()(LifoSemNode<Handoff,Atom>* elem) const {
187     elem->destroy();
188     auto idx = LifoSemRawNode<Atom>::pool().locateElem(elem);
189     LifoSemRawNode<Atom>::pool().recycleIndex(idx);
190   }
191 };
192
193 /// LifoSemHead is a 64-bit struct that holds a 32-bit value, some state
194 /// bits, and a sequence number used to avoid ABA problems in the lock-free
195 /// management of the LifoSem's wait lists.  The value can either hold
196 /// an integral semaphore value (if there are no waiters) or a node index
197 /// (see IndexedMemPool) for the head of a list of wait nodes
198 class LifoSemHead {
199   // What we really want are bitfields:
200   //  uint64_t data : 32; uint64_t isNodeIdx : 1; uint64_t seq : 31;
201   // Unfortunately g++ generates pretty bad code for this sometimes (I saw
202   // -O3 code from gcc 4.7.1 copying the bitfields one at a time instead of
203   // in bulk, for example).  We can generate better code anyway by assuming
204   // that setters won't be given values that cause under/overflow, and
205   // putting the sequence at the end where its planned overflow doesn't
206   // need any masking.
207   //
208   // data == 0 (empty list) with isNodeIdx is conceptually the same
209   // as data == 0 (no unclaimed increments) with !isNodeIdx, we always
210   // convert the former into the latter to make the logic simpler.
211   enum {
212     IsNodeIdxShift = 32,
213     IsShutdownShift = 33,
214     SeqShift = 34,
215   };
216   enum : uint64_t {
217     IsNodeIdxMask = uint64_t(1) << IsNodeIdxShift,
218     IsShutdownMask = uint64_t(1) << IsShutdownShift,
219     SeqIncr = uint64_t(1) << SeqShift,
220     SeqMask = ~(SeqIncr - 1),
221   };
222
223  public:
224
225   uint64_t bits;
226
227   //////// getters
228
229   inline uint32_t idx() const {
230     assert(isNodeIdx());
231     assert(uint32_t(bits) != 0);
232     return uint32_t(bits);
233   }
234   inline uint32_t value() const {
235     assert(!isNodeIdx());
236     return uint32_t(bits);
237   }
238   inline constexpr bool isNodeIdx() const {
239     return (bits & IsNodeIdxMask) != 0;
240   }
241   inline constexpr bool isShutdown() const {
242     return (bits & IsShutdownMask) != 0;
243   }
244   inline constexpr uint32_t seq() const {
245     return uint32_t(bits >> SeqShift);
246   }
247
248   //////// setter-like things return a new struct
249
250   /// This should only be used for initial construction, not for setting
251   /// the value, because it clears the sequence number
252   static inline constexpr LifoSemHead fresh(uint32_t value) {
253     return LifoSemHead{ value };
254   }
255
256   /// Returns the LifoSemHead that results from popping a waiter node,
257   /// given the current waiter node's next ptr
258   inline LifoSemHead withPop(uint32_t idxNext) const {
259     assert(isNodeIdx());
260     if (idxNext == 0) {
261       // no isNodeIdx bit or data bits.  Wraparound of seq bits is okay
262       return LifoSemHead{ (bits & (SeqMask | IsShutdownMask)) + SeqIncr };
263     } else {
264       // preserve sequence bits (incremented with wraparound okay) and
265       // isNodeIdx bit, replace all data bits
266       return LifoSemHead{
267           (bits & (SeqMask | IsShutdownMask | IsNodeIdxMask)) +
268           SeqIncr + idxNext };
269     }
270   }
271
272   /// Returns the LifoSemHead that results from pushing a new waiter node
273   inline LifoSemHead withPush(uint32_t _idx) const {
274     assert(isNodeIdx() || value() == 0);
275     assert(!isShutdown());
276     assert(_idx != 0);
277     return LifoSemHead{ (bits & SeqMask) | IsNodeIdxMask | _idx };
278   }
279
280   /// Returns the LifoSemHead with value increased by delta, with
281   /// saturation if the maximum value is reached
282   inline LifoSemHead withValueIncr(uint32_t delta) const {
283     assert(!isNodeIdx());
284     auto rv = LifoSemHead{ bits + SeqIncr + delta };
285     if (UNLIKELY(rv.isNodeIdx())) {
286       // value has overflowed into the isNodeIdx bit
287       rv = LifoSemHead{ (rv.bits & ~IsNodeIdxMask) | (IsNodeIdxMask - 1) };
288     }
289     return rv;
290   }
291
292   /// Returns the LifoSemHead that results from decrementing the value
293   inline LifoSemHead withValueDecr(uint32_t delta) const {
294     assert(delta > 0 && delta <= value());
295     return LifoSemHead{ bits + SeqIncr - delta };
296   }
297
298   /// Returns the LifoSemHead with the same state as the current node,
299   /// but with the shutdown bit set
300   inline LifoSemHead withShutdown() const {
301     return LifoSemHead{ bits | IsShutdownMask };
302   }
303
304   inline constexpr bool operator== (const LifoSemHead& rhs) const {
305     return bits == rhs.bits;
306   }
307   inline constexpr bool operator!= (const LifoSemHead& rhs) const {
308     return !(*this == rhs);
309   }
310 };
311
312 /// LifoSemBase is the engine for several different types of LIFO
313 /// semaphore.  LifoSemBase handles storage of positive semaphore values
314 /// and wait nodes, but the actual waiting and notification mechanism is
315 /// up to the client.
316 ///
317 /// The Handoff type is responsible for arranging one wakeup notification.
318 /// See LifoSemNode for more information on how to make your own.
319 template <typename Handoff,
320           template<typename> class Atom = std::atomic>
321 struct LifoSemBase {
322
323   /// Constructor
324   constexpr explicit LifoSemBase(uint32_t initialValue = 0)
325       : head_(LifoSemHead::fresh(initialValue)), padding_() {}
326
327   LifoSemBase(LifoSemBase const&) = delete;
328   LifoSemBase& operator=(LifoSemBase const&) = delete;
329
330   /// Silently saturates if value is already 2^32-1
331   void post() {
332     auto idx = incrOrPop(1);
333     if (idx != 0) {
334       idxToNode(idx).handoff().post();
335     }
336   }
337
338   /// Equivalent to n calls to post(), except may be much more efficient.
339   /// At any point in time at which the semaphore's value would exceed
340   /// 2^32-1 if tracked with infinite precision, it may be silently
341   /// truncated to 2^32-1.  This saturation is not guaranteed to be exact,
342   /// although it is guaranteed that overflow won't result in wrap-around.
343   /// There would be a substantial performance and complexity cost in
344   /// guaranteeing exact saturation (similar to the cost of maintaining
345   /// linearizability near the zero value, but without as much of
346   /// a benefit).
347   void post(uint32_t n) {
348     uint32_t idx;
349     while (n > 0 && (idx = incrOrPop(n)) != 0) {
350       // pop accounts for only 1
351       idxToNode(idx).handoff().post();
352       --n;
353     }
354   }
355
356   /// Returns true iff shutdown() has been called
357   bool isShutdown() const {
358     return UNLIKELY(head_.load(std::memory_order_acquire).isShutdown());
359   }
360
361   /// Prevents blocking on this semaphore, causing all blocking wait()
362   /// calls to throw ShutdownSemError.  Both currently blocked wait() and
363   /// future calls to wait() for which tryWait() would return false will
364   /// cause an exception.  Calls to wait() for which the matching post()
365   /// has already occurred will proceed normally.
366   void shutdown() {
367     // first set the shutdown bit
368     auto h = head_.load(std::memory_order_acquire);
369     while (!h.isShutdown()) {
370       if (head_.compare_exchange_strong(h, h.withShutdown())) {
371         // success
372         h = h.withShutdown();
373         break;
374       }
375       // compare_exchange_strong rereads h, retry
376     }
377
378     // now wake up any waiters
379     while (h.isNodeIdx()) {
380       auto& node = idxToNode(h.idx());
381       auto repl = h.withPop(node.next);
382       if (head_.compare_exchange_strong(h, repl)) {
383         // successful pop, wake up the waiter and move on.  The next
384         // field is used to convey that this wakeup didn't consume a value
385         node.setShutdownNotice();
386         node.handoff().post();
387         h = repl;
388       }
389     }
390   }
391
392   /// Returns true iff value was decremented
393   bool tryWait() {
394     uint32_t n = 1;
395     auto rv = decrOrPush(n, 0);
396     assert((rv == WaitResult::DECR && n == 0) ||
397            (rv != WaitResult::DECR && n == 1));
398     // SHUTDOWN is okay here, since we don't actually wait
399     return rv == WaitResult::DECR;
400   }
401
402   /// Equivalent to (but may be much more efficient than) n calls to
403   /// tryWait().  Returns the total amount by which the semaphore's value
404   /// was decreased
405   uint32_t tryWait(uint32_t n) {
406     auto const orig = n;
407     while (n > 0) {
408 #ifndef NDEBUG
409       auto prev = n;
410 #endif
411       auto rv = decrOrPush(n, 0);
412       assert((rv == WaitResult::DECR && n < prev) ||
413              (rv != WaitResult::DECR && n == prev));
414       if (rv != WaitResult::DECR) {
415         break;
416       }
417     }
418     return orig - n;
419   }
420
421   /// Blocks the current thread until there is a matching post or the
422   /// semaphore is shut down.  Throws ShutdownSemError if the semaphore
423   /// has been shut down and this method would otherwise be blocking.
424   /// Note that wait() doesn't throw during shutdown if tryWait() would
425   /// return true
426   void wait() {
427     // early check isn't required for correctness, but is an important
428     // perf win if we can avoid allocating and deallocating a node
429     if (tryWait()) {
430       return;
431     }
432
433     // allocateNode() won't compile unless Handoff has a default
434     // constructor
435     UniquePtr node = allocateNode();
436
437     auto rv = tryWaitOrPush(*node);
438     if (UNLIKELY(rv == WaitResult::SHUTDOWN)) {
439       assert(isShutdown());
440       throw ShutdownSemError("wait() would block but semaphore is shut down");
441     }
442
443     if (rv == WaitResult::PUSH) {
444       node->handoff().wait();
445       if (UNLIKELY(node->isShutdownNotice())) {
446         // this wait() didn't consume a value, it was triggered by shutdown
447         assert(isShutdown());
448         throw ShutdownSemError(
449             "blocking wait() interrupted by semaphore shutdown");
450       }
451
452       // node->handoff().wait() can't return until after the node has
453       // been popped and post()ed, so it is okay for the UniquePtr to
454       // recycle the node now
455     }
456     // else node wasn't pushed, so it is safe to recycle
457   }
458
459   /// Returns a guess at the current value, designed for debugging.
460   /// If there are no concurrent posters or waiters then this will
461   /// be correct
462   uint32_t valueGuess() const {
463     // this is actually linearizable, but we don't promise that because
464     // we may want to add striping in the future to help under heavy
465     // contention
466     auto h = head_.load(std::memory_order_acquire);
467     return h.isNodeIdx() ? 0 : h.value();
468   }
469
470  protected:
471
472   enum class WaitResult {
473     PUSH,
474     DECR,
475     SHUTDOWN,
476   };
477
478   /// The type of a std::unique_ptr that will automatically return a
479   /// LifoSemNode to the appropriate IndexedMemPool
480   typedef std::unique_ptr<LifoSemNode<Handoff, Atom>,
481                           LifoSemNodeRecycler<Handoff, Atom>> UniquePtr;
482
483   /// Returns a node that can be passed to decrOrLink
484   template <typename... Args>
485   UniquePtr allocateNode(Args&&... args) {
486     auto idx = LifoSemRawNode<Atom>::pool().allocIndex();
487     if (idx != 0) {
488       auto& node = idxToNode(idx);
489       node.clearShutdownNotice();
490       try {
491         node.init(std::forward<Args>(args)...);
492       } catch (...) {
493         LifoSemRawNode<Atom>::pool().recycleIndex(idx);
494         throw;
495       }
496       return UniquePtr(&node);
497     } else {
498       return UniquePtr();
499     }
500   }
501
502   /// Returns DECR if the semaphore value was decremented (and waiterNode
503   /// was untouched), PUSH if a reference to the wait node was pushed,
504   /// or SHUTDOWN if decrement was not possible and push wasn't allowed
505   /// because isShutdown().  Ownership of the wait node remains the
506   /// responsibility of the caller, who must not release it until after
507   /// the node's Handoff has been posted.
508   WaitResult tryWaitOrPush(LifoSemNode<Handoff, Atom>& waiterNode) {
509     uint32_t n = 1;
510     return decrOrPush(n, nodeToIdx(waiterNode));
511   }
512
513  private:
514
515   FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
516   folly::AtomicStruct<LifoSemHead,Atom> head_;
517
518   char padding_[folly::detail::CacheLocality::kFalseSharingRange -
519       sizeof(LifoSemHead)];
520
521
522   static LifoSemNode<Handoff, Atom>& idxToNode(uint32_t idx) {
523     auto raw = &LifoSemRawNode<Atom>::pool()[idx];
524     return *static_cast<LifoSemNode<Handoff, Atom>*>(raw);
525   }
526
527   static uint32_t nodeToIdx(const LifoSemNode<Handoff, Atom>& node) {
528     return LifoSemRawNode<Atom>::pool().locateElem(&node);
529   }
530
531   /// Either increments by n and returns 0, or pops a node and returns it.
532   /// If n + the stripe's value overflows, then the stripe's value
533   /// saturates silently at 2^32-1
534   uint32_t incrOrPop(uint32_t n) {
535     while (true) {
536       assert(n > 0);
537
538       auto head = head_.load(std::memory_order_acquire);
539       if (head.isNodeIdx()) {
540         auto& node = idxToNode(head.idx());
541         if (head_.compare_exchange_strong(head, head.withPop(node.next))) {
542           // successful pop
543           return head.idx();
544         }
545       } else {
546         auto after = head.withValueIncr(n);
547         if (head_.compare_exchange_strong(head, after)) {
548           // successful incr
549           return 0;
550         }
551       }
552       // retry
553     }
554   }
555
556   /// Returns DECR if some amount was decremented, with that amount
557   /// subtracted from n.  If n is 1 and this function returns DECR then n
558   /// must be 0 afterward.  Returns PUSH if no value could be decremented
559   /// and idx was pushed, or if idx was zero and no push was performed but
560   /// a push would have been performed with a valid node.  Returns SHUTDOWN
561   /// if the caller should have blocked but isShutdown().  If idx == 0,
562   /// may return PUSH even after isShutdown() or may return SHUTDOWN
563   WaitResult decrOrPush(uint32_t& n, uint32_t idx) {
564     assert(n > 0);
565
566     while (true) {
567       auto head = head_.load(std::memory_order_acquire);
568
569       if (!head.isNodeIdx() && head.value() > 0) {
570         // decr
571         auto delta = std::min(n, head.value());
572         if (head_.compare_exchange_strong(head, head.withValueDecr(delta))) {
573           n -= delta;
574           return WaitResult::DECR;
575         }
576       } else {
577         // push
578         if (idx == 0) {
579           return WaitResult::PUSH;
580         }
581
582         if (UNLIKELY(head.isShutdown())) {
583           return WaitResult::SHUTDOWN;
584         }
585
586         auto& node = idxToNode(idx);
587         node.next = head.isNodeIdx() ? head.idx() : 0;
588         if (head_.compare_exchange_strong(head, head.withPush(idx))) {
589           // push succeeded
590           return WaitResult::PUSH;
591         }
592       }
593     }
594     // retry
595   }
596 };
597
598 } // namespace detail
599
600 template <template<typename> class Atom, class BatonType>
601 struct LifoSemImpl : public detail::LifoSemBase<BatonType, Atom> {
602   constexpr explicit LifoSemImpl(uint32_t v = 0)
603     : detail::LifoSemBase<BatonType, Atom>(v) {}
604 };
605
606 } // namespace folly