Easy SIOF-proofing
[folly.git] / folly / LifoSem.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #ifndef FOLLY_LIFOSEM_H
18 #define FOLLY_LIFOSEM_H
19
20 #include <string.h>
21 #include <stdint.h>
22 #include <atomic>
23 #include <algorithm>
24 #include <memory>
25 #include <system_error>
26
27 #include <folly/AtomicStruct.h>
28 #include <folly/Baton.h>
29 #include <folly/IndexedMemPool.h>
30 #include <folly/Likely.h>
31 #include <folly/detail/CacheLocality.h>
32
33 namespace folly {
34
35 template <template<typename> class Atom = std::atomic,
36           class BatonType = Baton<Atom>>
37 struct LifoSemImpl;
38
39 /// LifoSem is a semaphore that wakes its waiters in a manner intended to
40 /// maximize performance rather than fairness.  It should be preferred
41 /// to a mutex+condvar or POSIX sem_t solution when all of the waiters
42 /// are equivalent.  It is faster than a condvar or sem_t, and it has a
43 /// shutdown state that might save you a lot of complexity when it comes
44 /// time to shut down your work pipelines.  LifoSem is larger than sem_t,
45 /// but that is only because it uses padding and alignment to avoid
46 /// false sharing.
47 ///
48 /// LifoSem allows multi-post and multi-tryWait, and provides a shutdown
49 /// state that awakens all waiters.  LifoSem is faster than sem_t because
50 /// it performs exact wakeups, so it often requires fewer system calls.
51 /// It provides all of the functionality of sem_t except for timed waiting.
52 /// It is called LifoSem because its wakeup policy is approximately LIFO,
53 /// rather than the usual FIFO.
54 ///
55 /// The core semaphore operations provided are:
56 ///
57 /// -- post() -- if there is a pending waiter, wake it up, otherwise
58 /// increment the value of the semaphore.  If the value of the semaphore
59 /// is already 2^32-1, does nothing.  Compare to sem_post().
60 ///
61 /// -- post(n) -- equivalent to n calls to post(), but much more efficient.
62 /// sem_t has no equivalent to this method.
63 ///
64 /// -- bool tryWait() -- if the semaphore's value is positive, decrements it
65 /// and returns true, otherwise returns false.  Compare to sem_trywait().
66 ///
67 /// -- uint32_t tryWait(uint32_t n) -- attempts to decrement the semaphore's
68 /// value by n, returning the amount by which it actually was decremented
69 /// (a value from 0 to n inclusive).  Not atomic.  Equivalent to n calls
70 /// to tryWait().  sem_t has no equivalent to this method.
71 ///
72 /// -- wait() -- waits until tryWait() can succeed.  Compare to sem_wait().
73 ///
74 /// LifoSem also has the notion of a shutdown state, in which any calls
75 /// that would block (or are already blocked) throw ShutdownSemError.
76 /// Note the difference between a call to wait() and a call to wait()
77 /// that might block.  In the former case tryWait() would succeed, and no
78 /// isShutdown() check is performed.  In the latter case an exception is
79 /// thrown.  This behavior allows a LifoSem controlling work distribution
80 /// to drain.  If you want to immediately stop all waiting on shutdown,
81 /// you can just check isShutdown() yourself (preferrably wrapped in
82 /// an UNLIKELY).  This fast-stop behavior is easy to add, but difficult
83 /// to remove if you want the draining behavior, which is why we have
84 /// chosen the former.  Since wait() is the only method that can block,
85 /// it is the only one that is affected by the shutdown state.
86 ///
87 /// All LifoSem operations operations except valueGuess() are guaranteed
88 /// to be linearizable.
89 typedef LifoSemImpl<> LifoSem;
90
91
92 /// The exception thrown when wait()ing on an isShutdown() LifoSem
93 struct ShutdownSemError : public std::runtime_error {
94   explicit ShutdownSemError(const std::string& msg);
95   virtual ~ShutdownSemError() noexcept;
96 };
97
98 namespace detail {
99
100 // Internally, a LifoSem is either a value or a linked list of wait nodes.
101 // This union is captured in the LifoSemHead type, which holds either a
102 // value or an indexed pointer to the list.  LifoSemHead itself is a value
103 // type, the head is a mutable atomic box containing a LifoSemHead value.
104 // Each wait node corresponds to exactly one waiter.  Values can flow
105 // through the semaphore either by going into and out of the head's value,
106 // or by direct communication from a poster to a waiter.  The former path
107 // is taken when there are no pending waiters, the latter otherwise.  The
108 // general flow of a post is to try to increment the value or pop-and-post
109 // a wait node.  Either of those have the effect of conveying one semaphore
110 // unit.  Waiting is the opposite, either a decrement of the value or
111 // push-and-wait of a wait node.  The generic LifoSemBase abstracts the
112 // actual mechanism by which a wait node's post->wait communication is
113 // performed, which is why we have LifoSemRawNode and LifoSemNode.
114
115 /// LifoSemRawNode is the actual pooled storage that backs LifoSemNode
116 /// for user-specified Handoff types.  This is done so that we can have
117 /// a large static IndexedMemPool of nodes, instead of per-type pools
118 template <template<typename> class Atom>
119 struct LifoSemRawNode {
120   std::aligned_storage<sizeof(void*),alignof(void*)>::type raw;
121
122   /// The IndexedMemPool index of the next node in this chain, or 0
123   /// if none.  This will be set to uint32_t(-1) if the node is being
124   /// posted due to a shutdown-induced wakeup
125   uint32_t next;
126
127   bool isShutdownNotice() const { return next == uint32_t(-1); }
128   void clearShutdownNotice() { next = 0; }
129   void setShutdownNotice() { next = uint32_t(-1); }
130
131   typedef folly::IndexedMemPool<LifoSemRawNode<Atom>,32,200,Atom> Pool;
132
133   /// Storage for all of the waiter nodes for LifoSem-s that use Atom
134   static Pool pool;
135 };
136
137 /// Use this macro to declare the static storage that backs the raw nodes
138 /// for the specified atomic type
139 #define LIFOSEM_DECLARE_POOL(Atom, capacity)                       \
140     template<>                                                     \
141     folly::detail::LifoSemRawNode<Atom>::Pool                      \
142         folly::detail::LifoSemRawNode<Atom>::pool((capacity));
143
144 /// Handoff is a type not bigger than a void* that knows how to perform a
145 /// single post() -> wait() communication.  It must have a post() method.
146 /// If it has a wait() method then LifoSemBase's wait() implementation
147 /// will work out of the box, otherwise you will need to specialize
148 /// LifoSemBase::wait accordingly.
149 template <typename Handoff, template<typename> class Atom>
150 struct LifoSemNode : public LifoSemRawNode<Atom> {
151
152   static_assert(sizeof(Handoff) <= sizeof(LifoSemRawNode<Atom>::raw),
153       "Handoff too big for small-object optimization, use indirection");
154   static_assert(alignof(Handoff) <=
155                 alignof(decltype(LifoSemRawNode<Atom>::raw)),
156       "Handoff alignment constraint not satisfied");
157
158   template <typename ...Args>
159   void init(Args&&... args) {
160     new (&this->raw) Handoff(std::forward<Args>(args)...);
161   }
162
163   void destroy() {
164     handoff().~Handoff();
165 #ifndef NDEBUG
166     memset(&this->raw, 'F', sizeof(this->raw));
167 #endif
168   }
169
170   Handoff& handoff() {
171     return *static_cast<Handoff*>(static_cast<void*>(&this->raw));
172   }
173
174   const Handoff& handoff() const {
175     return *static_cast<const Handoff*>(static_cast<const void*>(&this->raw));
176   }
177 };
178
179 template <typename Handoff, template<typename> class Atom>
180 struct LifoSemNodeRecycler {
181   void operator()(LifoSemNode<Handoff,Atom>* elem) const {
182     elem->destroy();
183     auto idx = LifoSemRawNode<Atom>::pool.locateElem(elem);
184     LifoSemRawNode<Atom>::pool.recycleIndex(idx);
185   }
186 };
187
188 /// LifoSemHead is a 64-bit struct that holds a 32-bit value, some state
189 /// bits, and a sequence number used to avoid ABA problems in the lock-free
190 /// management of the LifoSem's wait lists.  The value can either hold
191 /// an integral semaphore value (if there are no waiters) or a node index
192 /// (see IndexedMemPool) for the head of a list of wait nodes
193 class LifoSemHead {
194   // What we really want are bitfields:
195   //  uint64_t data : 32; uint64_t isNodeIdx : 1; uint64_t seq : 31;
196   // Unfortunately g++ generates pretty bad code for this sometimes (I saw
197   // -O3 code from gcc 4.7.1 copying the bitfields one at a time instead of
198   // in bulk, for example).  We can generate better code anyway by assuming
199   // that setters won't be given values that cause under/overflow, and
200   // putting the sequence at the end where its planned overflow doesn't
201   // need any masking.
202   //
203   // data == 0 (empty list) with isNodeIdx is conceptually the same
204   // as data == 0 (no unclaimed increments) with !isNodeIdx, we always
205   // convert the former into the latter to make the logic simpler.
206   enum {
207     IsNodeIdxShift = 32,
208     IsShutdownShift = 33,
209     SeqShift = 34,
210   };
211   enum : uint64_t {
212     IsNodeIdxMask = uint64_t(1) << IsNodeIdxShift,
213     IsShutdownMask = uint64_t(1) << IsShutdownShift,
214     SeqIncr = uint64_t(1) << SeqShift,
215     SeqMask = ~(SeqIncr - 1),
216   };
217
218  public:
219
220   uint64_t bits;
221
222   //////// getters
223
224   inline uint32_t idx() const {
225     assert(isNodeIdx());
226     assert(uint32_t(bits) != 0);
227     return uint32_t(bits);
228   }
229   inline uint32_t value() const {
230     assert(!isNodeIdx());
231     return uint32_t(bits);
232   }
233   inline constexpr bool isNodeIdx() const {
234     return (bits & IsNodeIdxMask) != 0;
235   }
236   inline constexpr bool isShutdown() const {
237     return (bits & IsShutdownMask) != 0;
238   }
239   inline constexpr uint32_t seq() const {
240     return uint32_t(bits >> SeqShift);
241   }
242
243   //////// setter-like things return a new struct
244
245   /// This should only be used for initial construction, not for setting
246   /// the value, because it clears the sequence number
247   static inline constexpr LifoSemHead fresh(uint32_t value) {
248     return LifoSemHead{ value };
249   }
250
251   /// Returns the LifoSemHead that results from popping a waiter node,
252   /// given the current waiter node's next ptr
253   inline LifoSemHead withPop(uint32_t idxNext) const {
254     assert(isNodeIdx());
255     if (idxNext == 0) {
256       // no isNodeIdx bit or data bits.  Wraparound of seq bits is okay
257       return LifoSemHead{ (bits & (SeqMask | IsShutdownMask)) + SeqIncr };
258     } else {
259       // preserve sequence bits (incremented with wraparound okay) and
260       // isNodeIdx bit, replace all data bits
261       return LifoSemHead{
262           (bits & (SeqMask | IsShutdownMask | IsNodeIdxMask)) +
263           SeqIncr + idxNext };
264     }
265   }
266
267   /// Returns the LifoSemHead that results from pushing a new waiter node
268   inline LifoSemHead withPush(uint32_t _idx) const {
269     assert(isNodeIdx() || value() == 0);
270     assert(!isShutdown());
271     assert(_idx != 0);
272     return LifoSemHead{ (bits & SeqMask) | IsNodeIdxMask | _idx };
273   }
274
275   /// Returns the LifoSemHead with value increased by delta, with
276   /// saturation if the maximum value is reached
277   inline LifoSemHead withValueIncr(uint32_t delta) const {
278     assert(!isNodeIdx());
279     auto rv = LifoSemHead{ bits + SeqIncr + delta };
280     if (UNLIKELY(rv.isNodeIdx())) {
281       // value has overflowed into the isNodeIdx bit
282       rv = LifoSemHead{ (rv.bits & ~IsNodeIdxMask) | (IsNodeIdxMask - 1) };
283     }
284     return rv;
285   }
286
287   /// Returns the LifoSemHead that results from decrementing the value
288   inline LifoSemHead withValueDecr(uint32_t delta) const {
289     assert(delta > 0 && delta <= value());
290     return LifoSemHead{ bits + SeqIncr - delta };
291   }
292
293   /// Returns the LifoSemHead with the same state as the current node,
294   /// but with the shutdown bit set
295   inline LifoSemHead withShutdown() const {
296     return LifoSemHead{ bits | IsShutdownMask };
297   }
298
299   inline constexpr bool operator== (const LifoSemHead& rhs) const {
300     return bits == rhs.bits;
301   }
302   inline constexpr bool operator!= (const LifoSemHead& rhs) const {
303     return !(*this == rhs);
304   }
305 };
306
307 /// LifoSemBase is the engine for several different types of LIFO
308 /// semaphore.  LifoSemBase handles storage of positive semaphore values
309 /// and wait nodes, but the actual waiting and notification mechanism is
310 /// up to the client.
311 ///
312 /// The Handoff type is responsible for arranging one wakeup notification.
313 /// See LifoSemNode for more information on how to make your own.
314 template <typename Handoff,
315           template<typename> class Atom = std::atomic>
316 struct LifoSemBase {
317
318   /// Constructor
319   constexpr explicit LifoSemBase(uint32_t initialValue = 0)
320     : head_(LifoSemHead::fresh(initialValue)) {}
321
322   LifoSemBase(LifoSemBase const&) = delete;
323   LifoSemBase& operator=(LifoSemBase const&) = delete;
324
325   /// Silently saturates if value is already 2^32-1
326   void post() {
327     auto idx = incrOrPop(1);
328     if (idx != 0) {
329       idxToNode(idx).handoff().post();
330     }
331   }
332
333   /// Equivalent to n calls to post(), except may be much more efficient.
334   /// At any point in time at which the semaphore's value would exceed
335   /// 2^32-1 if tracked with infinite precision, it may be silently
336   /// truncated to 2^32-1.  This saturation is not guaranteed to be exact,
337   /// although it is guaranteed that overflow won't result in wrap-around.
338   /// There would be a substantial performance and complexity cost in
339   /// guaranteeing exact saturation (similar to the cost of maintaining
340   /// linearizability near the zero value, but without as much of
341   /// a benefit).
342   void post(uint32_t n) {
343     uint32_t idx;
344     while (n > 0 && (idx = incrOrPop(n)) != 0) {
345       // pop accounts for only 1
346       idxToNode(idx).handoff().post();
347       --n;
348     }
349   }
350
351   /// Returns true iff shutdown() has been called
352   bool isShutdown() const {
353     return UNLIKELY(head_.load(std::memory_order_acquire).isShutdown());
354   }
355
356   /// Prevents blocking on this semaphore, causing all blocking wait()
357   /// calls to throw ShutdownSemError.  Both currently blocked wait() and
358   /// future calls to wait() for which tryWait() would return false will
359   /// cause an exception.  Calls to wait() for which the matching post()
360   /// has already occurred will proceed normally.
361   void shutdown() {
362     // first set the shutdown bit
363     auto h = head_.load(std::memory_order_acquire);
364     while (!h.isShutdown()) {
365       if (head_.compare_exchange_strong(h, h.withShutdown())) {
366         // success
367         h = h.withShutdown();
368         break;
369       }
370       // compare_exchange_strong rereads h, retry
371     }
372
373     // now wake up any waiters
374     while (h.isNodeIdx()) {
375       auto& node = idxToNode(h.idx());
376       auto repl = h.withPop(node.next);
377       if (head_.compare_exchange_strong(h, repl)) {
378         // successful pop, wake up the waiter and move on.  The next
379         // field is used to convey that this wakeup didn't consume a value
380         node.setShutdownNotice();
381         node.handoff().post();
382         h = repl;
383       }
384     }
385   }
386
387   /// Returns true iff value was decremented
388   bool tryWait() {
389     uint32_t n = 1;
390     auto rv = decrOrPush(n, 0);
391     assert((rv == WaitResult::DECR && n == 0) ||
392            (rv != WaitResult::DECR && n == 1));
393     // SHUTDOWN is okay here, since we don't actually wait
394     return rv == WaitResult::DECR;
395   }
396
397   /// Equivalent to (but may be much more efficient than) n calls to
398   /// tryWait().  Returns the total amount by which the semaphore's value
399   /// was decreased
400   uint32_t tryWait(uint32_t n) {
401     auto const orig = n;
402     while (n > 0) {
403 #ifndef NDEBUG
404       auto prev = n;
405 #endif
406       auto rv = decrOrPush(n, 0);
407       assert((rv == WaitResult::DECR && n < prev) ||
408              (rv != WaitResult::DECR && n == prev));
409       if (rv != WaitResult::DECR) {
410         break;
411       }
412     }
413     return orig - n;
414   }
415
416   /// Blocks the current thread until there is a matching post or the
417   /// semaphore is shut down.  Throws ShutdownSemError if the semaphore
418   /// has been shut down and this method would otherwise be blocking.
419   /// Note that wait() doesn't throw during shutdown if tryWait() would
420   /// return true
421   void wait() {
422     // early check isn't required for correctness, but is an important
423     // perf win if we can avoid allocating and deallocating a node
424     if (tryWait()) {
425       return;
426     }
427
428     // allocateNode() won't compile unless Handoff has a default
429     // constructor
430     UniquePtr node = allocateNode();
431
432     auto rv = tryWaitOrPush(*node);
433     if (UNLIKELY(rv == WaitResult::SHUTDOWN)) {
434       assert(isShutdown());
435       throw ShutdownSemError("wait() would block but semaphore is shut down");
436     }
437
438     if (rv == WaitResult::PUSH) {
439       node->handoff().wait();
440       if (UNLIKELY(node->isShutdownNotice())) {
441         // this wait() didn't consume a value, it was triggered by shutdown
442         assert(isShutdown());
443         throw ShutdownSemError(
444             "blocking wait() interrupted by semaphore shutdown");
445       }
446
447       // node->handoff().wait() can't return until after the node has
448       // been popped and post()ed, so it is okay for the UniquePtr to
449       // recycle the node now
450     }
451     // else node wasn't pushed, so it is safe to recycle
452   }
453
454   /// Returns a guess at the current value, designed for debugging.
455   /// If there are no concurrent posters or waiters then this will
456   /// be correct
457   uint32_t valueGuess() const {
458     // this is actually linearizable, but we don't promise that because
459     // we may want to add striping in the future to help under heavy
460     // contention
461     auto h = head_.load(std::memory_order_acquire);
462     return h.isNodeIdx() ? 0 : h.value();
463   }
464
465  protected:
466
467   enum class WaitResult {
468     PUSH,
469     DECR,
470     SHUTDOWN,
471   };
472
473   /// The type of a std::unique_ptr that will automatically return a
474   /// LifoSemNode to the appropriate IndexedMemPool
475   typedef std::unique_ptr<LifoSemNode<Handoff, Atom>,
476                           LifoSemNodeRecycler<Handoff, Atom>> UniquePtr;
477
478   /// Returns a node that can be passed to decrOrLink
479   template <typename... Args>
480   UniquePtr allocateNode(Args&&... args) {
481     auto idx = LifoSemRawNode<Atom>::pool.allocIndex();
482     if (idx != 0) {
483       auto& node = idxToNode(idx);
484       node.clearShutdownNotice();
485       try {
486         node.init(std::forward<Args>(args)...);
487       } catch (...) {
488         LifoSemRawNode<Atom>::pool.recycleIndex(idx);
489         throw;
490       }
491       return UniquePtr(&node);
492     } else {
493       return UniquePtr();
494     }
495   }
496
497   /// Returns DECR if the semaphore value was decremented (and waiterNode
498   /// was untouched), PUSH if a reference to the wait node was pushed,
499   /// or SHUTDOWN if decrement was not possible and push wasn't allowed
500   /// because isShutdown().  Ownership of the wait node remains the
501   /// responsibility of the caller, who must not release it until after
502   /// the node's Handoff has been posted.
503   WaitResult tryWaitOrPush(LifoSemNode<Handoff, Atom>& waiterNode) {
504     uint32_t n = 1;
505     return decrOrPush(n, nodeToIdx(waiterNode));
506   }
507
508  private:
509
510   FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
511   folly::AtomicStruct<LifoSemHead,Atom> head_;
512
513   char padding_[folly::detail::CacheLocality::kFalseSharingRange -
514       sizeof(LifoSemHead)];
515
516
517   static LifoSemNode<Handoff, Atom>& idxToNode(uint32_t idx) {
518     auto raw = &LifoSemRawNode<Atom>::pool[idx];
519     return *static_cast<LifoSemNode<Handoff, Atom>*>(raw);
520   }
521
522   static uint32_t nodeToIdx(const LifoSemNode<Handoff, Atom>& node) {
523     return LifoSemRawNode<Atom>::pool.locateElem(&node);
524   }
525
526   /// Either increments by n and returns 0, or pops a node and returns it.
527   /// If n + the stripe's value overflows, then the stripe's value
528   /// saturates silently at 2^32-1
529   uint32_t incrOrPop(uint32_t n) {
530     while (true) {
531       assert(n > 0);
532
533       auto head = head_.load(std::memory_order_acquire);
534       if (head.isNodeIdx()) {
535         auto& node = idxToNode(head.idx());
536         if (head_.compare_exchange_strong(head, head.withPop(node.next))) {
537           // successful pop
538           return head.idx();
539         }
540       } else {
541         auto after = head.withValueIncr(n);
542         if (head_.compare_exchange_strong(head, after)) {
543           // successful incr
544           return 0;
545         }
546       }
547       // retry
548     }
549   }
550
551   /// Returns DECR if some amount was decremented, with that amount
552   /// subtracted from n.  If n is 1 and this function returns DECR then n
553   /// must be 0 afterward.  Returns PUSH if no value could be decremented
554   /// and idx was pushed, or if idx was zero and no push was performed but
555   /// a push would have been performed with a valid node.  Returns SHUTDOWN
556   /// if the caller should have blocked but isShutdown().  If idx == 0,
557   /// may return PUSH even after isShutdown() or may return SHUTDOWN
558   WaitResult decrOrPush(uint32_t& n, uint32_t idx) {
559     assert(n > 0);
560
561     while (true) {
562       auto head = head_.load(std::memory_order_acquire);
563
564       if (!head.isNodeIdx() && head.value() > 0) {
565         // decr
566         auto delta = std::min(n, head.value());
567         if (head_.compare_exchange_strong(head, head.withValueDecr(delta))) {
568           n -= delta;
569           return WaitResult::DECR;
570         }
571       } else {
572         // push
573         if (idx == 0) {
574           return WaitResult::PUSH;
575         }
576
577         if (UNLIKELY(head.isShutdown())) {
578           return WaitResult::SHUTDOWN;
579         }
580
581         auto& node = idxToNode(idx);
582         node.next = head.isNodeIdx() ? head.idx() : 0;
583         if (head_.compare_exchange_strong(head, head.withPush(idx))) {
584           // push succeeded
585           return WaitResult::PUSH;
586         }
587       }
588     }
589     // retry
590   }
591 };
592
593 } // namespace detail
594
595 template <template<typename> class Atom, class BatonType>
596 struct LifoSemImpl : public detail::LifoSemBase<BatonType, Atom> {
597   constexpr explicit LifoSemImpl(uint32_t v = 0)
598     : detail::LifoSemBase<BatonType, Atom>(v) {}
599 };
600
601 } // namespace folly
602
603 #endif