Copyright 2014->2015
[folly.git] / folly / MPMCQueue.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <algorithm>
20 #include <atomic>
21 #include <assert.h>
22 #include <boost/noncopyable.hpp>
23 #include <limits>
24 #include <string.h>
25 #include <type_traits>
26 #include <unistd.h>
27
28 #include <folly/Traits.h>
29 #include <folly/detail/CacheLocality.h>
30 #include <folly/detail/Futex.h>
31
32 namespace folly {
33
34 namespace detail {
35
36 template<typename T, template<typename> class Atom>
37 class SingleElementQueue;
38
39 template <typename T> class MPMCPipelineStageImpl;
40
41 } // namespace detail
42
43 /// MPMCQueue<T> is a high-performance bounded concurrent queue that
44 /// supports multiple producers, multiple consumers, and optional blocking.
45 /// The queue has a fixed capacity, for which all memory will be allocated
46 /// up front.  The bulk of the work of enqueuing and dequeuing can be
47 /// performed in parallel.
48 ///
49 /// MPMCQueue is linearizable.  That means that if a call to write(A)
50 /// returns before a call to write(B) begins, then A will definitely end up
51 /// in the queue before B, and if a call to read(X) returns before a call
52 /// to read(Y) is started, that X will be something from earlier in the
53 /// queue than Y.  This also means that if a read call returns a value, you
54 /// can be sure that all previous elements of the queue have been assigned
55 /// a reader (that reader might not yet have returned, but it exists).
56 ///
57 /// The underlying implementation uses a ticket dispenser for the head and
58 /// the tail, spreading accesses across N single-element queues to produce
59 /// a queue with capacity N.  The ticket dispensers use atomic increment,
60 /// which is more robust to contention than a CAS loop.  Each of the
61 /// single-element queues uses its own CAS to serialize access, with an
62 /// adaptive spin cutoff.  When spinning fails on a single-element queue
63 /// it uses futex()'s _BITSET operations to reduce unnecessary wakeups
64 /// even if multiple waiters are present on an individual queue (such as
65 /// when the MPMCQueue's capacity is smaller than the number of enqueuers
66 /// or dequeuers).
67 ///
68 /// In benchmarks (contained in tao/queues/ConcurrentQueueTests)
69 /// it handles 1 to 1, 1 to N, N to 1, and N to M thread counts better
70 /// than any of the alternatives present in fbcode, for both small (~10)
71 /// and large capacities.  In these benchmarks it is also faster than
72 /// tbb::concurrent_bounded_queue for all configurations.  When there are
73 /// many more threads than cores, MPMCQueue is _much_ faster than the tbb
74 /// queue because it uses futex() to block and unblock waiting threads,
75 /// rather than spinning with sched_yield.
76 ///
77 /// NOEXCEPT INTERACTION: tl;dr; If it compiles you're fine.  Ticket-based
78 /// queues separate the assignment of queue positions from the actual
79 /// construction of the in-queue elements, which means that the T
80 /// constructor used during enqueue must not throw an exception.  This is
81 /// enforced at compile time using type traits, which requires that T be
82 /// adorned with accurate noexcept information.  If your type does not
83 /// use noexcept, you will have to wrap it in something that provides
84 /// the guarantee.  We provide an alternate safe implementation for types
85 /// that don't use noexcept but that are marked folly::IsRelocatable
86 /// and boost::has_nothrow_constructor, which is common for folly types.
87 /// In particular, if you can declare FOLLY_ASSUME_FBVECTOR_COMPATIBLE
88 /// then your type can be put in MPMCQueue.
89 ///
90 /// If you have a pool of N queue consumers that you want to shut down
91 /// after the queue has drained, one way is to enqueue N sentinel values
92 /// to the queue.  If the producer doesn't know how many consumers there
93 /// are you can enqueue one sentinel and then have each consumer requeue
94 /// two sentinels after it receives it (by requeuing 2 the shutdown can
95 /// complete in O(log P) time instead of O(P)).
96 template<typename T,
97          template<typename> class Atom = std::atomic>
98 class MPMCQueue : boost::noncopyable {
99
100   static_assert(std::is_nothrow_constructible<T,T&&>::value ||
101                 folly::IsRelocatable<T>::value,
102       "T must be relocatable or have a noexcept move constructor");
103
104   friend class detail::MPMCPipelineStageImpl<T>;
105  public:
106   typedef T value_type;
107
108   explicit MPMCQueue(size_t queueCapacity)
109     : capacity_(queueCapacity)
110     , slots_(new detail::SingleElementQueue<T,Atom>[queueCapacity +
111                                                     2 * kSlotPadding])
112     , stride_(computeStride(queueCapacity))
113     , pushTicket_(0)
114     , popTicket_(0)
115     , pushSpinCutoff_(0)
116     , popSpinCutoff_(0)
117   {
118     // ideally this would be a static assert, but g++ doesn't allow it
119     assert(alignof(MPMCQueue<T,Atom>)
120            >= detail::CacheLocality::kFalseSharingRange);
121     assert(static_cast<uint8_t*>(static_cast<void*>(&popTicket_))
122            - static_cast<uint8_t*>(static_cast<void*>(&pushTicket_))
123            >= detail::CacheLocality::kFalseSharingRange);
124   }
125
126   /// A default-constructed queue is useful because a usable (non-zero
127   /// capacity) queue can be moved onto it or swapped with it
128   MPMCQueue() noexcept
129     : capacity_(0)
130     , slots_(nullptr)
131     , stride_(0)
132     , pushTicket_(0)
133     , popTicket_(0)
134     , pushSpinCutoff_(0)
135     , popSpinCutoff_(0)
136   {}
137
138   /// IMPORTANT: The move constructor is here to make it easier to perform
139   /// the initialization phase, it is not safe to use when there are any
140   /// concurrent accesses (this is not checked).
141   MPMCQueue(MPMCQueue<T,Atom>&& rhs) noexcept
142     : capacity_(rhs.capacity_)
143     , slots_(rhs.slots_)
144     , stride_(rhs.stride_)
145     , pushTicket_(rhs.pushTicket_.load(std::memory_order_relaxed))
146     , popTicket_(rhs.popTicket_.load(std::memory_order_relaxed))
147     , pushSpinCutoff_(rhs.pushSpinCutoff_.load(std::memory_order_relaxed))
148     , popSpinCutoff_(rhs.popSpinCutoff_.load(std::memory_order_relaxed))
149   {
150     // relaxed ops are okay for the previous reads, since rhs queue can't
151     // be in concurrent use
152
153     // zero out rhs
154     rhs.capacity_ = 0;
155     rhs.slots_ = nullptr;
156     rhs.stride_ = 0;
157     rhs.pushTicket_.store(0, std::memory_order_relaxed);
158     rhs.popTicket_.store(0, std::memory_order_relaxed);
159     rhs.pushSpinCutoff_.store(0, std::memory_order_relaxed);
160     rhs.popSpinCutoff_.store(0, std::memory_order_relaxed);
161   }
162
163   /// IMPORTANT: The move operator is here to make it easier to perform
164   /// the initialization phase, it is not safe to use when there are any
165   /// concurrent accesses (this is not checked).
166   MPMCQueue<T,Atom> const& operator= (MPMCQueue<T,Atom>&& rhs) {
167     if (this != &rhs) {
168       this->~MPMCQueue();
169       new (this) MPMCQueue(std::move(rhs));
170     }
171     return *this;
172   }
173
174   /// MPMCQueue can only be safely destroyed when there are no
175   /// pending enqueuers or dequeuers (this is not checked).
176   ~MPMCQueue() {
177     delete[] slots_;
178   }
179
180   /// Returns the number of successful reads minus the number of successful
181   /// writes.  Waiting blockingRead and blockingWrite calls are included,
182   /// so this value can be negative.
183   ssize_t size() const noexcept {
184     // since both pushes and pops increase monotonically, we can get a
185     // consistent snapshot either by bracketing a read of popTicket_ with
186     // two reads of pushTicket_ that return the same value, or the other
187     // way around.  We maximize our chances by alternately attempting
188     // both bracketings.
189     uint64_t pushes = pushTicket_.load(std::memory_order_acquire); // A
190     uint64_t pops = popTicket_.load(std::memory_order_acquire); // B
191     while (true) {
192       uint64_t nextPushes = pushTicket_.load(std::memory_order_acquire); // C
193       if (pushes == nextPushes) {
194         // pushTicket_ didn't change from A (or the previous C) to C,
195         // so we can linearize at B (or D)
196         return pushes - pops;
197       }
198       pushes = nextPushes;
199       uint64_t nextPops = popTicket_.load(std::memory_order_acquire); // D
200       if (pops == nextPops) {
201         // popTicket_ didn't chance from B (or the previous D), so we
202         // can linearize at C
203         return pushes - pops;
204       }
205       pops = nextPops;
206     }
207   }
208
209   /// Returns true if there are no items available for dequeue
210   bool isEmpty() const noexcept {
211     return size() <= 0;
212   }
213
214   /// Returns true if there is currently no empty space to enqueue
215   bool isFull() const noexcept {
216     // careful with signed -> unsigned promotion, since size can be negative
217     return size() >= static_cast<ssize_t>(capacity_);
218   }
219
220   /// Returns is a guess at size() for contexts that don't need a precise
221   /// value, such as stats.
222   ssize_t sizeGuess() const noexcept {
223     return writeCount() - readCount();
224   }
225
226   /// Doesn't change
227   size_t capacity() const noexcept {
228     return capacity_;
229   }
230
231   /// Returns the total number of calls to blockingWrite or successful
232   /// calls to write, including those blockingWrite calls that are
233   /// currently blocking
234   uint64_t writeCount() const noexcept {
235     return pushTicket_.load(std::memory_order_acquire);
236   }
237
238   /// Returns the total number of calls to blockingRead or successful
239   /// calls to read, including those blockingRead calls that are currently
240   /// blocking
241   uint64_t readCount() const noexcept {
242     return popTicket_.load(std::memory_order_acquire);
243   }
244
245   /// Enqueues a T constructed from args, blocking until space is
246   /// available.  Note that this method signature allows enqueue via
247   /// move, if args is a T rvalue, via copy, if args is a T lvalue, or
248   /// via emplacement if args is an initializer list that can be passed
249   /// to a T constructor.
250   template <typename ...Args>
251   void blockingWrite(Args&&... args) noexcept {
252     enqueueWithTicket(pushTicket_++, std::forward<Args>(args)...);
253   }
254
255   /// If an item can be enqueued with no blocking, does so and returns
256   /// true, otherwise returns false.  This method is similar to
257   /// writeIfNotFull, but if you don't have a specific need for that
258   /// method you should use this one.
259   ///
260   /// One of the common usages of this method is to enqueue via the
261   /// move constructor, something like q.write(std::move(x)).  If write
262   /// returns false because the queue is full then x has not actually been
263   /// consumed, which looks strange.  To understand why it is actually okay
264   /// to use x afterward, remember that std::move is just a typecast that
265   /// provides an rvalue reference that enables use of a move constructor
266   /// or operator.  std::move doesn't actually move anything.  It could
267   /// more accurately be called std::rvalue_cast or std::move_permission.
268   template <typename ...Args>
269   bool write(Args&&... args) noexcept {
270     uint64_t ticket;
271     if (tryObtainReadyPushTicket(ticket)) {
272       // we have pre-validated that the ticket won't block
273       enqueueWithTicket(ticket, std::forward<Args>(args)...);
274       return true;
275     } else {
276       return false;
277     }
278   }
279
280   /// If the queue is not full, enqueues and returns true, otherwise
281   /// returns false.  Unlike write this method can be blocked by another
282   /// thread, specifically a read that has linearized (been assigned
283   /// a ticket) but not yet completed.  If you don't really need this
284   /// function you should probably use write.
285   ///
286   /// MPMCQueue isn't lock-free, so just because a read operation has
287   /// linearized (and isFull is false) doesn't mean that space has been
288   /// made available for another write.  In this situation write will
289   /// return false, but writeIfNotFull will wait for the dequeue to finish.
290   /// This method is required if you are composing queues and managing
291   /// your own wakeup, because it guarantees that after every successful
292   /// write a readIfNotEmpty will succeed.
293   template <typename ...Args>
294   bool writeIfNotFull(Args&&... args) noexcept {
295     uint64_t ticket;
296     if (tryObtainPromisedPushTicket(ticket)) {
297       // some other thread is already dequeuing the slot into which we
298       // are going to enqueue, but we might have to wait for them to finish
299       enqueueWithTicket(ticket, std::forward<Args>(args)...);
300       return true;
301     } else {
302       return false;
303     }
304   }
305
306   /// Moves a dequeued element onto elem, blocking until an element
307   /// is available
308   void blockingRead(T& elem) noexcept {
309     dequeueWithTicket(popTicket_++, elem);
310   }
311
312   /// If an item can be dequeued with no blocking, does so and returns
313   /// true, otherwise returns false.
314   bool read(T& elem) noexcept {
315     uint64_t ticket;
316     if (tryObtainReadyPopTicket(ticket)) {
317       // the ticket has been pre-validated to not block
318       dequeueWithTicket(ticket, elem);
319       return true;
320     } else {
321       return false;
322     }
323   }
324
325   /// If the queue is not empty, dequeues and returns true, otherwise
326   /// returns false.  If the matching write is still in progress then this
327   /// method may block waiting for it.  If you don't rely on being able
328   /// to dequeue (such as by counting completed write) then you should
329   /// prefer read.
330   bool readIfNotEmpty(T& elem) noexcept {
331     uint64_t ticket;
332     if (tryObtainPromisedPopTicket(ticket)) {
333       // the matching enqueue already has a ticket, but might not be done
334       dequeueWithTicket(ticket, elem);
335       return true;
336     } else {
337       return false;
338     }
339   }
340
341  private:
342   enum {
343     /// Once every kAdaptationFreq we will spin longer, to try to estimate
344     /// the proper spin backoff
345     kAdaptationFreq = 128,
346
347     /// To avoid false sharing in slots_ with neighboring memory
348     /// allocations, we pad it with this many SingleElementQueue-s at
349     /// each end
350     kSlotPadding = (detail::CacheLocality::kFalseSharingRange - 1)
351         / sizeof(detail::SingleElementQueue<T,Atom>) + 1
352   };
353
354   /// The maximum number of items in the queue at once
355   size_t FOLLY_ALIGN_TO_AVOID_FALSE_SHARING capacity_;
356
357   /// An array of capacity_ SingleElementQueue-s, each of which holds
358   /// either 0 or 1 item.  We over-allocate by 2 * kSlotPadding and don't
359   /// touch the slots at either end, to avoid false sharing
360   detail::SingleElementQueue<T,Atom>* slots_;
361
362   /// The number of slots_ indices that we advance for each ticket, to
363   /// avoid false sharing.  Ideally slots_[i] and slots_[i + stride_]
364   /// aren't on the same cache line
365   int stride_;
366
367   /// Enqueuers get tickets from here
368   Atom<uint64_t> FOLLY_ALIGN_TO_AVOID_FALSE_SHARING pushTicket_;
369
370   /// Dequeuers get tickets from here
371   Atom<uint64_t> FOLLY_ALIGN_TO_AVOID_FALSE_SHARING popTicket_;
372
373   /// This is how many times we will spin before using FUTEX_WAIT when
374   /// the queue is full on enqueue, adaptively computed by occasionally
375   /// spinning for longer and smoothing with an exponential moving average
376   Atom<uint32_t> FOLLY_ALIGN_TO_AVOID_FALSE_SHARING pushSpinCutoff_;
377
378   /// The adaptive spin cutoff when the queue is empty on dequeue
379   Atom<uint32_t> FOLLY_ALIGN_TO_AVOID_FALSE_SHARING popSpinCutoff_;
380
381   /// Alignment doesn't prevent false sharing at the end of the struct,
382   /// so fill out the last cache line
383   char padding_[detail::CacheLocality::kFalseSharingRange -
384                 sizeof(Atom<uint32_t>)];
385
386
387   /// We assign tickets in increasing order, but we don't want to
388   /// access neighboring elements of slots_ because that will lead to
389   /// false sharing (multiple cores accessing the same cache line even
390   /// though they aren't accessing the same bytes in that cache line).
391   /// To avoid this we advance by stride slots per ticket.
392   ///
393   /// We need gcd(capacity, stride) to be 1 so that we will use all
394   /// of the slots.  We ensure this by only considering prime strides,
395   /// which either have no common divisors with capacity or else have
396   /// a zero remainder after dividing by capacity.  That is sufficient
397   /// to guarantee correctness, but we also want to actually spread the
398   /// accesses away from each other to avoid false sharing (consider a
399   /// stride of 7 with a capacity of 8).  To that end we try a few taking
400   /// care to observe that advancing by -1 is as bad as advancing by 1
401   /// when in comes to false sharing.
402   ///
403   /// The simple way to avoid false sharing would be to pad each
404   /// SingleElementQueue, but since we have capacity_ of them that could
405   /// waste a lot of space.
406   static int computeStride(size_t capacity) noexcept {
407     static const int smallPrimes[] = { 2, 3, 5, 7, 11, 13, 17, 19, 23 };
408
409     int bestStride = 1;
410     size_t bestSep = 1;
411     for (int stride : smallPrimes) {
412       if ((stride % capacity) == 0 || (capacity % stride) == 0) {
413         continue;
414       }
415       size_t sep = stride % capacity;
416       sep = std::min(sep, capacity - sep);
417       if (sep > bestSep) {
418         bestStride = stride;
419         bestSep = sep;
420       }
421     }
422     return bestStride;
423   }
424
425   /// Returns the index into slots_ that should be used when enqueuing or
426   /// dequeuing with the specified ticket
427   size_t idx(uint64_t ticket) noexcept {
428     return ((ticket * stride_) % capacity_) + kSlotPadding;
429   }
430
431   /// Maps an enqueue or dequeue ticket to the turn should be used at the
432   /// corresponding SingleElementQueue
433   uint32_t turn(uint64_t ticket) noexcept {
434     return ticket / capacity_;
435   }
436
437   /// Tries to obtain a push ticket for which SingleElementQueue::enqueue
438   /// won't block.  Returns true on immediate success, false on immediate
439   /// failure.
440   bool tryObtainReadyPushTicket(uint64_t& rv) noexcept {
441     auto ticket = pushTicket_.load(std::memory_order_acquire); // A
442     while (true) {
443       if (!slots_[idx(ticket)].mayEnqueue(turn(ticket))) {
444         // if we call enqueue(ticket, ...) on the SingleElementQueue
445         // right now it would block, but this might no longer be the next
446         // ticket.  We can increase the chance of tryEnqueue success under
447         // contention (without blocking) by rechecking the ticket dispenser
448         auto prev = ticket;
449         ticket = pushTicket_.load(std::memory_order_acquire); // B
450         if (prev == ticket) {
451           // mayEnqueue was bracketed by two reads (A or prev B or prev
452           // failing CAS to B), so we are definitely unable to enqueue
453           return false;
454         }
455       } else {
456         // we will bracket the mayEnqueue check with a read (A or prev B
457         // or prev failing CAS) and the following CAS.  If the CAS fails
458         // it will effect a load of pushTicket_
459         if (pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
460           rv = ticket;
461           return true;
462         }
463       }
464     }
465   }
466
467   /// Tries to obtain a push ticket which can be satisfied if all
468   /// in-progress pops complete.  This function does not block, but
469   /// blocking may be required when using the returned ticket if some
470   /// other thread's pop is still in progress (ticket has been granted but
471   /// pop has not yet completed).
472   bool tryObtainPromisedPushTicket(uint64_t& rv) noexcept {
473     auto numPushes = pushTicket_.load(std::memory_order_acquire); // A
474     while (true) {
475       auto numPops = popTicket_.load(std::memory_order_acquire); // B
476       // n will be negative if pops are pending
477       int64_t n = numPushes - numPops;
478       if (n >= static_cast<ssize_t>(capacity_)) {
479         // Full, linearize at B.  We don't need to recheck the read we
480         // performed at A, because if numPushes was stale at B then the
481         // real numPushes value is even worse
482         return false;
483       }
484       if (pushTicket_.compare_exchange_strong(numPushes, numPushes + 1)) {
485         rv = numPushes;
486         return true;
487       }
488     }
489   }
490
491   /// Tries to obtain a pop ticket for which SingleElementQueue::dequeue
492   /// won't block.  Returns true on immediate success, false on immediate
493   /// failure.
494   bool tryObtainReadyPopTicket(uint64_t& rv) noexcept {
495     auto ticket = popTicket_.load(std::memory_order_acquire);
496     while (true) {
497       if (!slots_[idx(ticket)].mayDequeue(turn(ticket))) {
498         auto prev = ticket;
499         ticket = popTicket_.load(std::memory_order_acquire);
500         if (prev == ticket) {
501           return false;
502         }
503       } else {
504         if (popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
505           rv = ticket;
506           return true;
507         }
508       }
509     }
510   }
511
512   /// Similar to tryObtainReadyPopTicket, but returns a pop ticket whose
513   /// corresponding push ticket has already been handed out, rather than
514   /// returning one whose corresponding push ticket has already been
515   /// completed.  This means that there is a possibility that the caller
516   /// will block when using the ticket, but it allows the user to rely on
517   /// the fact that if enqueue has succeeded, tryObtainPromisedPopTicket
518   /// will return true.  The "try" part of this is that we won't have
519   /// to block waiting for someone to call enqueue, although we might
520   /// have to block waiting for them to finish executing code inside the
521   /// MPMCQueue itself.
522   bool tryObtainPromisedPopTicket(uint64_t& rv) noexcept {
523     auto numPops = popTicket_.load(std::memory_order_acquire); // A
524     while (true) {
525       auto numPushes = pushTicket_.load(std::memory_order_acquire); // B
526       if (numPops >= numPushes) {
527         // Empty, or empty with pending pops.  Linearize at B.  We don't
528         // need to recheck the read we performed at A, because if numPops
529         // is stale then the fresh value is larger and the >= is still true
530         return false;
531       }
532       if (popTicket_.compare_exchange_strong(numPops, numPops + 1)) {
533         rv = numPops;
534         return true;
535       }
536     }
537   }
538
539   // Given a ticket, constructs an enqueued item using args
540   template <typename ...Args>
541   void enqueueWithTicket(uint64_t ticket, Args&&... args) noexcept {
542     slots_[idx(ticket)].enqueue(turn(ticket),
543                                 pushSpinCutoff_,
544                                 (ticket % kAdaptationFreq) == 0,
545                                 std::forward<Args>(args)...);
546   }
547
548   // Given a ticket, dequeues the corresponding element
549   void dequeueWithTicket(uint64_t ticket, T& elem) noexcept {
550     slots_[idx(ticket)].dequeue(turn(ticket),
551                                 popSpinCutoff_,
552                                 (ticket % kAdaptationFreq) == 0,
553                                 elem);
554   }
555 };
556
557
558 namespace detail {
559
560 /// A TurnSequencer allows threads to order their execution according to
561 /// a monotonically increasing (with wraparound) "turn" value.  The two
562 /// operations provided are to wait for turn T, and to move to the next
563 /// turn.  Every thread that is waiting for T must have arrived before
564 /// that turn is marked completed (for MPMCQueue only one thread waits
565 /// for any particular turn, so this is trivially true).
566 ///
567 /// TurnSequencer's state_ holds 26 bits of the current turn (shifted
568 /// left by 6), along with a 6 bit saturating value that records the
569 /// maximum waiter minus the current turn.  Wraparound of the turn space
570 /// is expected and handled.  This allows us to atomically adjust the
571 /// number of outstanding waiters when we perform a FUTEX_WAKE operation.
572 /// Compare this strategy to sem_t's separate num_waiters field, which
573 /// isn't decremented until after the waiting thread gets scheduled,
574 /// during which time more enqueues might have occurred and made pointless
575 /// FUTEX_WAKE calls.
576 ///
577 /// TurnSequencer uses futex() directly.  It is optimized for the
578 /// case that the highest awaited turn is 32 or less higher than the
579 /// current turn.  We use the FUTEX_WAIT_BITSET variant, which lets
580 /// us embed 32 separate wakeup channels in a single futex.  See
581 /// http://locklessinc.com/articles/futex_cheat_sheet for a description.
582 ///
583 /// We only need to keep exact track of the delta between the current
584 /// turn and the maximum waiter for the 32 turns that follow the current
585 /// one, because waiters at turn t+32 will be awoken at turn t.  At that
586 /// point they can then adjust the delta using the higher base.  Since we
587 /// need to encode waiter deltas of 0 to 32 inclusive, we use 6 bits.
588 /// We actually store waiter deltas up to 63, since that might reduce
589 /// the number of CAS operations a tiny bit.
590 ///
591 /// To avoid some futex() calls entirely, TurnSequencer uses an adaptive
592 /// spin cutoff before waiting.  The overheads (and convergence rate)
593 /// of separately tracking the spin cutoff for each TurnSequencer would
594 /// be prohibitive, so the actual storage is passed in as a parameter and
595 /// updated atomically.  This also lets the caller use different adaptive
596 /// cutoffs for different operations (read versus write, for example).
597 /// To avoid contention, the spin cutoff is only updated when requested
598 /// by the caller.
599 template <template<typename> class Atom>
600 struct TurnSequencer {
601   explicit TurnSequencer(const uint32_t firstTurn = 0) noexcept
602       : state_(encode(firstTurn << kTurnShift, 0))
603   {}
604
605   /// Returns true iff a call to waitForTurn(turn, ...) won't block
606   bool isTurn(const uint32_t turn) const noexcept {
607     auto state = state_.load(std::memory_order_acquire);
608     return decodeCurrentSturn(state) == (turn << kTurnShift);
609   }
610
611   // Internally we always work with shifted turn values, which makes the
612   // truncation and wraparound work correctly.  This leaves us bits at
613   // the bottom to store the number of waiters.  We call shifted turns
614   // "sturns" inside this class.
615
616   /// Blocks the current thread until turn has arrived.  If
617   /// updateSpinCutoff is true then this will spin for up to kMaxSpins tries
618   /// before blocking and will adjust spinCutoff based on the results,
619   /// otherwise it will spin for at most spinCutoff spins.
620   void waitForTurn(const uint32_t turn,
621                    Atom<uint32_t>& spinCutoff,
622                    const bool updateSpinCutoff) noexcept {
623     uint32_t prevThresh = spinCutoff.load(std::memory_order_relaxed);
624     const uint32_t effectiveSpinCutoff =
625         updateSpinCutoff || prevThresh == 0 ? kMaxSpins : prevThresh;
626
627     uint32_t tries;
628     const uint32_t sturn = turn << kTurnShift;
629     for (tries = 0; ; ++tries) {
630       uint32_t state = state_.load(std::memory_order_acquire);
631       uint32_t current_sturn = decodeCurrentSturn(state);
632       if (current_sturn == sturn) {
633         break;
634       }
635
636       // wrap-safe version of assert(current_sturn < sturn)
637       assert(sturn - current_sturn < std::numeric_limits<uint32_t>::max() / 2);
638
639       // the first effectSpinCutoff tries are spins, after that we will
640       // record ourself as a waiter and block with futexWait
641       if (tries < effectiveSpinCutoff) {
642         asm volatile ("pause");
643         continue;
644       }
645
646       uint32_t current_max_waiter_delta = decodeMaxWaitersDelta(state);
647       uint32_t our_waiter_delta = (sturn - current_sturn) >> kTurnShift;
648       uint32_t new_state;
649       if (our_waiter_delta <= current_max_waiter_delta) {
650         // state already records us as waiters, probably because this
651         // isn't our first time around this loop
652         new_state = state;
653       } else {
654         new_state = encode(current_sturn, our_waiter_delta);
655         if (state != new_state &&
656             !state_.compare_exchange_strong(state, new_state)) {
657           continue;
658         }
659       }
660       state_.futexWait(new_state, futexChannel(turn));
661     }
662
663     if (updateSpinCutoff || prevThresh == 0) {
664       // if we hit kMaxSpins then spinning was pointless, so the right
665       // spinCutoff is kMinSpins
666       uint32_t target;
667       if (tries >= kMaxSpins) {
668         target = kMinSpins;
669       } else {
670         // to account for variations, we allow ourself to spin 2*N when
671         // we think that N is actually required in order to succeed
672         target = std::min<uint32_t>(kMaxSpins,
673                                     std::max<uint32_t>(kMinSpins, tries * 2));
674       }
675
676       if (prevThresh == 0) {
677         // bootstrap
678         spinCutoff.store(target);
679       } else {
680         // try once, keep moving if CAS fails.  Exponential moving average
681         // with alpha of 7/8
682         // Be careful that the quantity we add to prevThresh is signed.
683         spinCutoff.compare_exchange_weak(
684             prevThresh, prevThresh + int(target - prevThresh) / 8);
685       }
686     }
687   }
688
689   /// Unblocks a thread running waitForTurn(turn + 1)
690   void completeTurn(const uint32_t turn) noexcept {
691     uint32_t state = state_.load(std::memory_order_acquire);
692     while (true) {
693       assert(state == encode(turn << kTurnShift, decodeMaxWaitersDelta(state)));
694       uint32_t max_waiter_delta = decodeMaxWaitersDelta(state);
695       uint32_t new_state = encode(
696               (turn + 1) << kTurnShift,
697               max_waiter_delta == 0 ? 0 : max_waiter_delta - 1);
698       if (state_.compare_exchange_strong(state, new_state)) {
699         if (max_waiter_delta != 0) {
700           state_.futexWake(std::numeric_limits<int>::max(),
701                            futexChannel(turn + 1));
702         }
703         break;
704       }
705       // failing compare_exchange_strong updates first arg to the value
706       // that caused the failure, so no need to reread state_
707     }
708   }
709
710   /// Returns the least-most significant byte of the current uncompleted
711   /// turn.  The full 32 bit turn cannot be recovered.
712   uint8_t uncompletedTurnLSB() const noexcept {
713     return state_.load(std::memory_order_acquire) >> kTurnShift;
714   }
715
716  private:
717   enum : uint32_t {
718     /// kTurnShift counts the bits that are stolen to record the delta
719     /// between the current turn and the maximum waiter. It needs to be big
720     /// enough to record wait deltas of 0 to 32 inclusive.  Waiters more
721     /// than 32 in the future will be woken up 32*n turns early (since
722     /// their BITSET will hit) and will adjust the waiter count again.
723     /// We go a bit beyond and let the waiter count go up to 63, which
724     /// is free and might save us a few CAS
725     kTurnShift = 6,
726     kWaitersMask = (1 << kTurnShift) - 1,
727
728     /// The minimum spin count that we will adaptively select
729     kMinSpins = 20,
730
731     /// The maximum spin count that we will adaptively select, and the
732     /// spin count that will be used when probing to get a new data point
733     /// for the adaptation
734     kMaxSpins = 2000,
735   };
736
737   /// This holds both the current turn, and the highest waiting turn,
738   /// stored as (current_turn << 6) | min(63, max(waited_turn - current_turn))
739   Futex<Atom> state_;
740
741   /// Returns the bitmask to pass futexWait or futexWake when communicating
742   /// about the specified turn
743   int futexChannel(uint32_t turn) const noexcept {
744     return 1 << (turn & 31);
745   }
746
747   uint32_t decodeCurrentSturn(uint32_t state) const noexcept {
748     return state & ~kWaitersMask;
749   }
750
751   uint32_t decodeMaxWaitersDelta(uint32_t state) const noexcept {
752     return state & kWaitersMask;
753   }
754
755   uint32_t encode(uint32_t currentSturn, uint32_t maxWaiterD) const noexcept {
756     return currentSturn | std::min(uint32_t{ kWaitersMask }, maxWaiterD);
757   }
758 };
759
760
761 /// SingleElementQueue implements a blocking queue that holds at most one
762 /// item, and that requires its users to assign incrementing identifiers
763 /// (turns) to each enqueue and dequeue operation.  Note that the turns
764 /// used by SingleElementQueue are doubled inside the TurnSequencer
765 template <typename T, template <typename> class Atom>
766 struct SingleElementQueue {
767
768   ~SingleElementQueue() noexcept {
769     if ((sequencer_.uncompletedTurnLSB() & 1) == 1) {
770       // we are pending a dequeue, so we have a constructed item
771       destroyContents();
772     }
773   }
774
775   /// enqueue using in-place noexcept construction
776   template <typename ...Args,
777             typename = typename std::enable_if<
778                 std::is_nothrow_constructible<T,Args...>::value>::type>
779   void enqueue(const uint32_t turn,
780                Atom<uint32_t>& spinCutoff,
781                const bool updateSpinCutoff,
782                Args&&... args) noexcept {
783     sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
784     new (&contents_) T(std::forward<Args>(args)...);
785     sequencer_.completeTurn(turn * 2);
786   }
787
788   /// enqueue using move construction, either real (if
789   /// is_nothrow_move_constructible) or simulated using relocation and
790   /// default construction (if IsRelocatable and has_nothrow_constructor)
791   template <typename = typename std::enable_if<
792                 (folly::IsRelocatable<T>::value &&
793                  boost::has_nothrow_constructor<T>::value) ||
794                 std::is_nothrow_constructible<T,T&&>::value>::type>
795   void enqueue(const uint32_t turn,
796                Atom<uint32_t>& spinCutoff,
797                const bool updateSpinCutoff,
798                T&& goner) noexcept {
799     if (std::is_nothrow_constructible<T,T&&>::value) {
800       // this is preferred
801       sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
802       new (&contents_) T(std::move(goner));
803       sequencer_.completeTurn(turn * 2);
804     } else {
805       // simulate nothrow move with relocation, followed by default
806       // construction to fill the gap we created
807       sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
808       memcpy(&contents_, &goner, sizeof(T));
809       sequencer_.completeTurn(turn * 2);
810       new (&goner) T();
811     }
812   }
813
814   bool mayEnqueue(const uint32_t turn) const noexcept {
815     return sequencer_.isTurn(turn * 2);
816   }
817
818   void dequeue(uint32_t turn,
819                Atom<uint32_t>& spinCutoff,
820                const bool updateSpinCutoff,
821                T& elem) noexcept {
822     if (folly::IsRelocatable<T>::value) {
823       // this version is preferred, because we do as much work as possible
824       // before waiting
825       try {
826         elem.~T();
827       } catch (...) {
828         // unlikely, but if we don't complete our turn the queue will die
829       }
830       sequencer_.waitForTurn(turn * 2 + 1, spinCutoff, updateSpinCutoff);
831       memcpy(&elem, &contents_, sizeof(T));
832       sequencer_.completeTurn(turn * 2 + 1);
833     } else {
834       // use nothrow move assignment
835       sequencer_.waitForTurn(turn * 2 + 1, spinCutoff, updateSpinCutoff);
836       elem = std::move(*ptr());
837       destroyContents();
838       sequencer_.completeTurn(turn * 2 + 1);
839     }
840   }
841
842   bool mayDequeue(const uint32_t turn) const noexcept {
843     return sequencer_.isTurn(turn * 2 + 1);
844   }
845
846  private:
847   /// Storage for a T constructed with placement new
848   typename std::aligned_storage<sizeof(T),alignof(T)>::type contents_;
849
850   /// Even turns are pushes, odd turns are pops
851   TurnSequencer<Atom> sequencer_;
852
853   T* ptr() noexcept {
854     return static_cast<T*>(static_cast<void*>(&contents_));
855   }
856
857   void destroyContents() noexcept {
858     try {
859       ptr()->~T();
860     } catch (...) {
861       // g++ doesn't seem to have std::is_nothrow_destructible yet
862     }
863 #ifndef NDEBUG
864     memset(&contents_, 'Q', sizeof(T));
865 #endif
866   }
867 };
868
869 } // namespace detail
870
871 } // namespace folly