Multi-producer multi-consumer queue with optional blocking
[folly.git] / folly / MPMCQueue.h
1 /*
2  * Copyright 2013 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <algorithm>
20 #include <atomic>
21 #include <assert.h>
22 #include <boost/noncopyable.hpp>
23 #include <errno.h>
24 #include <limits>
25 #include <linux/futex.h>
26 #include <string.h>
27 #include <sys/syscall.h>
28 #include <unistd.h>
29
30 #include <folly/Traits.h>
31 #include <folly/detail/Futex.h>
32
33 namespace folly {
34
35 namespace detail {
36
37 template<typename T, template<typename> class Atom>
38 class SingleElementQueue;
39
40 } // namespace detail
41
42 /// MPMCQueue<T> is a high-performance bounded concurrent queue that
43 /// supports multiple producers, multiple consumers, and optional blocking.
44 /// The queue has a fixed capacity, for which all memory will be allocated
45 /// up front.  The bulk of the work of enqueuing and dequeuing can be
46 /// performed in parallel.
47 ///
48 /// The underlying implementation uses a ticket dispenser for the head and
49 /// the tail, spreading accesses across N single-element queues to produce
50 /// a queue with capacity N.  The ticket dispensers use atomic increment,
51 /// which is more robust to contention than a CAS loop.  Each of the
52 /// single-element queues uses its own CAS to serialize access, with an
53 /// adaptive spin cutoff.  When spinning fails on a single-element queue
54 /// it uses futex()'s _BITSET operations to reduce unnecessary wakeups
55 /// even if multiple waiters are present on an individual queue (such as
56 /// when the MPMCQueue's capacity is smaller than the number of enqueuers
57 /// or dequeuers).
58 ///
59 /// NOEXCEPT INTERACTION: Ticket-based queues separate the assignment
60 /// of In benchmarks (contained in tao/queues/ConcurrentQueueTests)
61 /// it handles 1 to 1, 1 to N, N to 1, and N to M thread counts better
62 /// than any of the alternatives present in fbcode, for both small (~10)
63 /// and large capacities.  In these benchmarks it is also faster than
64 /// tbb::concurrent_bounded_queue for all configurations.  When there are
65 /// many more threads than cores, MPMCQueue is _much_ faster than the tbb
66 /// queue because it uses futex() to block and unblock waiting threads,
67 /// rather than spinning with sched_yield.
68 ///
69 /// queue positions from the actual construction of the in-queue elements,
70 /// which means that the T constructor used during enqueue must not throw
71 /// an exception.  This is enforced at compile time using type traits,
72 /// which requires that T be adorned with accurate noexcept information.
73 /// If your type does not use noexcept, you will have to wrap it in
74 /// something that provides the guarantee.  We provide an alternate
75 /// safe implementation for types that don't use noexcept but that are
76 /// marked folly::IsRelocatable and boost::has_nothrow_constructor,
77 /// which is common for folly types.  In particular, if you can declare
78 /// FOLLY_ASSUME_FBVECTOR_COMPATIBLE then your type can be put in
79 /// MPMCQueue.
80 template<typename T,
81          template<typename> class Atom = std::atomic,
82          typename = typename std::enable_if<
83              std::is_nothrow_constructible<T,T&&>::value ||
84              folly::IsRelocatable<T>::value>::type>
85 class MPMCQueue : boost::noncopyable {
86  public:
87   typedef T value_type;
88
89   explicit MPMCQueue(size_t capacity)
90     : capacity_(capacity)
91     , slots_(new detail::SingleElementQueue<T,Atom>[capacity +
92                                                     2 * kSlotPadding])
93     , stride_(computeStride(capacity))
94     , pushTicket_(0)
95     , popTicket_(0)
96     , pushSpinCutoff_(0)
97     , popSpinCutoff_(0)
98   {
99     // ideally this would be a static assert, but g++ doesn't allow it
100     assert(alignof(MPMCQueue<T,Atom>) >= kFalseSharingRange);
101   }
102
103   /// A default-constructed queue is useful because a usable (non-zero
104   /// capacity) queue can be moved onto it or swapped with it
105   MPMCQueue() noexcept
106     : capacity_(0)
107     , slots_(nullptr)
108     , stride_(0)
109     , pushTicket_(0)
110     , popTicket_(0)
111     , pushSpinCutoff_(0)
112     , popSpinCutoff_(0)
113   {}
114
115   /// IMPORTANT: The move constructor is here to make it easier to perform
116   /// the initialization phase, it is not safe to use when there are any
117   /// concurrent accesses (this is not checked).
118   MPMCQueue(MPMCQueue<T,Atom>&& rhs) noexcept
119     : capacity_(rhs.capacity_)
120     , slots_(rhs.slots_)
121     , stride_(rhs.stride_)
122     , pushTicket_(rhs.pushTicket_.load(std::memory_order_relaxed))
123     , popTicket_(rhs.popTicket_.load(std::memory_order_relaxed))
124     , pushSpinCutoff_(rhs.pushSpinCutoff_.load(std::memory_order_relaxed))
125     , popSpinCutoff_(rhs.popSpinCutoff_.load(std::memory_order_relaxed))
126   {
127     // relaxed ops are okay for the previous reads, since rhs queue can't
128     // be in concurrent use
129
130     // zero out rhs
131     rhs.capacity_ = 0;
132     rhs.slots_ = nullptr;
133     rhs.stride_ = 0;
134     rhs.pushTicket_.store(0, std::memory_order_relaxed);
135     rhs.popTicket_.store(0, std::memory_order_relaxed);
136     rhs.pushSpinCutoff_.store(0, std::memory_order_relaxed);
137     rhs.popSpinCutoff_.store(0, std::memory_order_relaxed);
138   }
139
140   /// IMPORTANT: The move operator is here to make it easier to perform
141   /// the initialization phase, it is not safe to use when there are any
142   /// concurrent accesses (this is not checked).
143   MPMCQueue<T,Atom> const& operator= (MPMCQueue<T,Atom>&& rhs) {
144     if (this != &rhs) {
145       this->~MPMCQueue();
146       new (this) MPMCQueue(std::move(rhs));
147     }
148     return *this;
149   }
150
151   /// MPMCQueue can only be safely destroyed when there are no
152   /// pending enqueuers or dequeuers (this is not checked).
153   ~MPMCQueue() {
154     delete[] slots_;
155   }
156
157   /// Returns the number of successful reads minus the number of successful
158   /// writes.  Waiting blockingRead and blockingWrite calls are included,
159   /// so this value can be negative.
160   ssize_t size() const noexcept {
161     // since both pushes and pops increase monotonically, we can get a
162     // consistent snapshot either by bracketing a read of popTicket_ with
163     // two reads of pushTicket_ that return the same value, or the other
164     // way around.  We maximize our chances by alternately attempting
165     // both bracketings.
166     uint64_t pushes = pushTicket_.load(std::memory_order_acquire); // A
167     uint64_t pops = popTicket_.load(std::memory_order_acquire); // B
168     while (true) {
169       uint64_t nextPushes = pushTicket_.load(std::memory_order_acquire); // C
170       if (pushes == nextPushes) {
171         // pushTicket_ didn't change from A (or the previous C) to C,
172         // so we can linearize at B (or D)
173         return pushes - pops;
174       }
175       pushes = nextPushes;
176       uint64_t nextPops = popTicket_.load(std::memory_order_acquire); // D
177       if (pops == nextPops) {
178         // popTicket_ didn't chance from B (or the previous D), so we
179         // can linearize at C
180         return pushes - pops;
181       }
182       pops = nextPops;
183     }
184   }
185
186   /// Returns true if there are no items available for dequeue
187   bool isEmpty() const noexcept {
188     return size() <= 0;
189   }
190
191   /// Returns true if there is currently no empty space to enqueue
192   bool isFull() const noexcept {
193     // careful with signed -> unsigned promotion, since size can be negative
194     return size() >= static_cast<ssize_t>(capacity_);
195   }
196
197   /// Returns is a guess at size() for contexts that don't need a precise
198   /// value, such as stats.
199   uint64_t sizeGuess() const noexcept {
200     return writeCount() - readCount();
201   }
202
203   /// Doesn't change
204   size_t capacity() const noexcept {
205     return capacity_;
206   }
207
208   /// Returns the total number of calls to blockingWrite or successful
209   /// calls to write, including those blockingWrite calls that are
210   /// currently blocking
211   uint64_t writeCount() const noexcept {
212     return pushTicket_.load(std::memory_order_acquire);
213   }
214
215   /// Returns the total number of calls to blockingRead or successful
216   /// calls to read, including those blockingRead calls that are currently
217   /// blocking
218   uint64_t readCount() const noexcept {
219     return popTicket_.load(std::memory_order_acquire);
220   }
221
222   /// Enqueues a T constructed from args, blocking until space is
223   /// available.  Note that this method signature allows enqueue via
224   /// move, if args is a T rvalue, via copy, if args is a T lvalue, or
225   /// via emplacement if args is an initializer list that can be passed
226   /// to a T constructor.
227   template <typename ...Args>
228   void blockingWrite(Args&&... args) noexcept {
229     enqueueWithTicket(pushTicket_++, std::forward<Args>(args)...);
230   }
231
232   /// If an item can be enqueued with no blocking, does so and returns
233   /// true, otherwise returns false.  This method is similar to
234   /// writeIfNotFull, but if you don't have a specific need for that
235   /// method you should use this one.
236   ///
237   /// One of the common usages of this method is to enqueue via the
238   /// move constructor, something like q.write(std::move(x)).  If write
239   /// returns false because the queue is full then x has not actually been
240   /// consumed, which looks strange.  To understand why it is actually okay
241   /// to use x afterward, remember that std::move is just a typecast that
242   /// provides an rvalue reference that enables use of a move constructor
243   /// or operator.  std::move doesn't actually move anything.  It could
244   /// more accurately be called std::rvalue_cast or std::move_permission.
245   template <typename ...Args>
246   bool write(Args&&... args) noexcept {
247     uint64_t ticket;
248     if (tryObtainReadyPushTicket(ticket)) {
249       // we have pre-validated that the ticket won't block
250       enqueueWithTicket(ticket, std::forward<Args>(args)...);
251       return true;
252     } else {
253       return false;
254     }
255   }
256
257   /// If the queue is not full, enqueues and returns true, otherwise
258   /// returns false.  Unlike write this method can be blocked by another
259   /// thread, specifically a read that has linearized (been assigned
260   /// a ticket) but not yet completed.  If you don't really need this
261   /// function you should probably use write.
262   ///
263   /// MPMCQueue isn't lock-free, so just because a read operation has
264   /// linearized (and isFull is false) doesn't mean that space has been
265   /// made available for another write.  In this situation write will
266   /// return false, but writeIfNotFull will wait for the dequeue to finish.
267   /// This method is required if you are composing queues and managing
268   /// your own wakeup, because it guarantees that after every successful
269   /// write a readIfNotFull will succeed.
270   template <typename ...Args>
271   bool writeIfNotFull(Args&&... args) noexcept {
272     uint64_t ticket;
273     if (tryObtainPromisedPushTicket(ticket)) {
274       // some other thread is already dequeuing the slot into which we
275       // are going to enqueue, but we might have to wait for them to finish
276       enqueueWithTicket(ticket, std::forward<Args>(args)...);
277       return true;
278     } else {
279       return false;
280     }
281   }
282
283   /// Moves a dequeued element onto elem, blocking until an element
284   /// is available
285   void blockingRead(T& elem) noexcept {
286     dequeueWithTicket(popTicket_++, elem);
287   }
288
289   /// If an item can be dequeued with no blocking, does so and returns
290   /// true, otherwise returns false.
291   bool read(T& elem) noexcept {
292     uint64_t ticket;
293     if (tryObtainReadyPopTicket(ticket)) {
294       // the ticket has been pre-validated to not block
295       dequeueWithTicket(ticket, elem);
296       return true;
297     } else {
298       return false;
299     }
300   }
301
302   /// If the queue is not empty, dequeues and returns true, otherwise
303   /// returns false.  If the matching write is still in progress then this
304   /// method may block waiting for it.  If you don't rely on being able
305   /// to dequeue (such as by counting completed write) then you should
306   /// prefer read.
307   bool readIfNotEmpty(T& elem) noexcept {
308     uint64_t ticket;
309     if (tryObtainPromisedPopTicket(ticket)) {
310       // the matching enqueue already has a ticket, but might not be done
311       dequeueWithTicket(ticket, elem);
312       return true;
313     } else {
314       return false;
315     }
316   }
317
318  private:
319   enum {
320     /// Once every kAdaptationFreq we will spin longer, to try to estimate
321     /// the proper spin backoff
322     kAdaptationFreq = 128,
323
324     /// Memory locations on the same cache line are subject to false
325     /// sharing, which is very bad for performance
326     kFalseSharingRange = 64,
327
328     /// To avoid false sharing in slots_ with neighboring memory
329     /// allocations, we pad it with this many SingleElementQueue-s at
330     /// each end
331     kSlotPadding = 1 +
332         (kFalseSharingRange - 1) / sizeof(detail::SingleElementQueue<T,Atom>)
333   };
334
335 #define FOLLY_ON_NEXT_CACHE_LINE __attribute__((aligned(kFalseSharingRange)))
336
337   /// The maximum number of items in the queue at once
338   size_t capacity_ FOLLY_ON_NEXT_CACHE_LINE;
339
340   /// An array of capacity_ SingleElementQueue-s, each of which holds
341   /// either 0 or 1 item.  We over-allocate by 2 * kSlotPadding and don't
342   /// touch the slots at either end, to avoid false sharing
343   detail::SingleElementQueue<T,Atom>* slots_;
344
345   /// The number of slots_ indices that we advance for each ticket, to
346   /// avoid false sharing.  Ideally slots_[i] and slots_[i + stride_]
347   /// aren't on the same cache line
348   int stride_;
349
350   /// Enqueuers get tickets from here
351   Atom<uint64_t> pushTicket_ FOLLY_ON_NEXT_CACHE_LINE;
352
353   /// Dequeuers get tickets from here
354   Atom<uint64_t> popTicket_ FOLLY_ON_NEXT_CACHE_LINE;
355
356   /// This is how many times we will spin before using FUTEX_WAIT when
357   /// the queue is full on enqueue, adaptively computed by occasionally
358   /// spinning for longer and smoothing with an exponential moving average
359   Atom<int> pushSpinCutoff_ FOLLY_ON_NEXT_CACHE_LINE;
360
361   /// The adaptive spin cutoff when the queue is empty on dequeue
362   Atom<int> popSpinCutoff_ FOLLY_ON_NEXT_CACHE_LINE;
363
364   /// Alignment doesn't avoid false sharing at the end of the struct,
365   /// so fill out the last cache line
366   char padding_[kFalseSharingRange - sizeof(Atom<int>)];
367
368 #undef FOLLY_ON_NEXT_CACHE_LINE
369
370   /// We assign tickets in increasing order, but we don't want to
371   /// access neighboring elements of slots_ because that will lead to
372   /// false sharing (multiple cores accessing the same cache line even
373   /// though they aren't accessing the same bytes in that cache line).
374   /// To avoid this we advance by stride slots per ticket.
375   ///
376   /// We need gcd(capacity, stride) to be 1 so that we will use all
377   /// of the slots.  We ensure this by only considering prime strides,
378   /// which either have no common divisors with capacity or else have
379   /// a zero remainder after dividing by capacity.  That is sufficient
380   /// to guarantee correctness, but we also want to actually spread the
381   /// accesses away from each other to avoid false sharing (consider a
382   /// stride of 7 with a capacity of 8).  To that end we try a few taking
383   /// care to observe that advancing by -1 is as bad as advancing by 1
384   /// when in comes to false sharing.
385   ///
386   /// The simple way to avoid false sharing would be to pad each
387   /// SingleElementQueue, but since we have capacity_ of them that could
388   /// waste a lot of space.
389   static int computeStride(size_t capacity) noexcept {
390     static const int smallPrimes[] = { 2, 3, 5, 7, 11, 13, 17, 19, 23 };
391
392     int bestStride = 1;
393     size_t bestSep = 1;
394     for (int stride : smallPrimes) {
395       if ((stride % capacity) == 0 || (capacity % stride) == 0) {
396         continue;
397       }
398       size_t sep = stride % capacity;
399       sep = std::min(sep, capacity - sep);
400       if (sep > bestSep) {
401         bestStride = stride;
402         bestSep = sep;
403       }
404     }
405     return bestStride;
406   }
407
408   /// Returns the index into slots_ that should be used when enqueuing or
409   /// dequeuing with the specified ticket
410   size_t idx(uint64_t ticket) noexcept {
411     return ((ticket * stride_) % capacity_) + kSlotPadding;
412   }
413
414   /// Maps an enqueue or dequeue ticket to the turn should be used at the
415   /// corresponding SingleElementQueue
416   uint32_t turn(uint64_t ticket) noexcept {
417     return ticket / capacity_;
418   }
419
420   /// Tries to obtain a push ticket for which SingleElementQueue::enqueue
421   /// won't block.  Returns true on immediate success, false on immediate
422   /// failure.
423   bool tryObtainReadyPushTicket(uint64_t& rv) noexcept {
424     auto ticket = pushTicket_.load(std::memory_order_acquire); // A
425     while (true) {
426       if (!slots_[idx(ticket)].mayEnqueue(turn(ticket))) {
427         // if we call enqueue(ticket, ...) on the SingleElementQueue
428         // right now it would block, but this might no longer be the next
429         // ticket.  We can increase the chance of tryEnqueue success under
430         // contention (without blocking) by rechecking the ticket dispenser
431         auto prev = ticket;
432         ticket = pushTicket_.load(std::memory_order_acquire); // B
433         if (prev == ticket) {
434           // mayEnqueue was bracketed by two reads (A or prev B or prev
435           // failing CAS to B), so we are definitely unable to enqueue
436           return false;
437         }
438       } else {
439         // we will bracket the mayEnqueue check with a read (A or prev B
440         // or prev failing CAS) and the following CAS.  If the CAS fails
441         // it will effect a load of pushTicket_
442         if (pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
443           rv = ticket;
444           return true;
445         }
446       }
447     }
448   }
449
450   /// Tries to obtain a push ticket which can be satisfied if all
451   /// in-progress pops complete.  This function does not block, but
452   /// blocking may be required when using the returned ticket if some
453   /// other thread's pop is still in progress (ticket has been granted but
454   /// pop has not yet completed).
455   bool tryObtainPromisedPushTicket(uint64_t& rv) noexcept {
456     auto numPushes = pushTicket_.load(std::memory_order_acquire); // A
457     while (true) {
458       auto numPops = popTicket_.load(std::memory_order_acquire); // B
459       // n will be negative if pops are pending
460       int64_t n = numPushes - numPops;
461       if (n >= static_cast<ssize_t>(capacity_)) {
462         // Full, linearize at B.  We don't need to recheck the read we
463         // performed at A, because if numPushes was stale at B then the
464         // real numPushes value is even worse
465         return false;
466       }
467       if (pushTicket_.compare_exchange_strong(numPushes, numPushes + 1)) {
468         rv = numPushes;
469         return true;
470       }
471     }
472   }
473
474   /// Tries to obtain a pop ticket for which SingleElementQueue::dequeue
475   /// won't block.  Returns true on immediate success, false on immediate
476   /// failure.
477   bool tryObtainReadyPopTicket(uint64_t& rv) noexcept {
478     auto ticket = popTicket_.load(std::memory_order_acquire);
479     while (true) {
480       if (!slots_[idx(ticket)].mayDequeue(turn(ticket))) {
481         auto prev = ticket;
482         ticket = popTicket_.load(std::memory_order_acquire);
483         if (prev == ticket) {
484           return false;
485         }
486       } else {
487         if (popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
488           rv = ticket;
489           return true;
490         }
491       }
492     }
493   }
494
495   /// Similar to tryObtainReadyPopTicket, but returns a pop ticket whose
496   /// corresponding push ticket has already been handed out, rather than
497   /// returning one whose corresponding push ticket has already been
498   /// completed.  This means that there is a possibility that the caller
499   /// will block when using the ticket, but it allows the user to rely on
500   /// the fact that if enqueue has succeeded, tryObtainPromisedPopTicket
501   /// will return true.  The "try" part of this is that we won't have
502   /// to block waiting for someone to call enqueue, although we might
503   /// have to block waiting for them to finish executing code inside the
504   /// MPMCQueue itself.
505   bool tryObtainPromisedPopTicket(uint64_t& rv) noexcept {
506     auto numPops = popTicket_.load(std::memory_order_acquire); // A
507     while (true) {
508       auto numPushes = pushTicket_.load(std::memory_order_acquire); // B
509       if (numPops >= numPushes) {
510         // Empty, or empty with pending pops.  Linearize at B.  We don't
511         // need to recheck the read we performed at A, because if numPops
512         // is stale then the fresh value is larger and the >= is still true
513         return false;
514       }
515       if (popTicket_.compare_exchange_strong(numPops, numPops + 1)) {
516         rv = numPops;
517         return true;
518       }
519     }
520   }
521
522   // Given a ticket, constructs an enqueued item using args
523   template <typename ...Args>
524   void enqueueWithTicket(uint64_t ticket, Args&&... args) noexcept {
525     slots_[idx(ticket)].enqueue(turn(ticket),
526                                 pushSpinCutoff_,
527                                 (ticket % kAdaptationFreq) == 0,
528                                 std::forward<Args>(args)...);
529   }
530
531   // Given a ticket, dequeues the corresponding element
532   void dequeueWithTicket(uint64_t ticket, T& elem) noexcept {
533     slots_[idx(ticket)].dequeue(turn(ticket),
534                                 popSpinCutoff_,
535                                 (ticket % kAdaptationFreq) == 0,
536                                 elem);
537   }
538 };
539
540
541 namespace detail {
542
543 /// A TurnSequencer allows threads to order their execution according to
544 /// a monotonically increasing (with wraparound) "turn" value.  The two
545 /// operations provided are to wait for turn T, and to move to the next
546 /// turn.  Every thread that is waiting for T must have arrived before
547 /// that turn is marked completed (for MPMCQueue only one thread waits
548 /// for any particular turn, so this is trivially true).
549 ///
550 /// TurnSequencer's state_ holds 26 bits of the current turn (shifted
551 /// left by 6), along with a 6 bit saturating value that records the
552 /// maximum waiter minus the current turn.  Wraparound of the turn space
553 /// is expected and handled.  This allows us to atomically adjust the
554 /// number of outstanding waiters when we perform a FUTEX_WAKE operation.
555 /// Compare this strategy to sem_t's separate num_waiters field, which
556 /// isn't decremented until after the waiting thread gets scheduled,
557 /// during which time more enqueues might have occurred and made pointless
558 /// FUTEX_WAKE calls.
559 ///
560 /// TurnSequencer uses futex() directly.  It is optimized for the
561 /// case that the highest awaited turn is 32 or less higher than the
562 /// current turn.  We use the FUTEX_WAIT_BITSET variant, which lets
563 /// us embed 32 separate wakeup channels in a single futex.  See
564 /// http://locklessinc.com/articles/futex_cheat_sheet for a description.
565 ///
566 /// We only need to keep exact track of the delta between the current
567 /// turn and the maximum waiter for the 32 turns that follow the current
568 /// one, because waiters at turn t+32 will be awoken at turn t.  At that
569 /// point they can then adjust the delta using the higher base.  Since we
570 /// need to encode waiter deltas of 0 to 32 inclusive, we use 6 bits.
571 /// We actually store waiter deltas up to 63, since that might reduce
572 /// the number of CAS operations a tiny bit.
573 ///
574 /// To avoid some futex() calls entirely, TurnSequencer uses an adaptive
575 /// spin cutoff before waiting.  The overheads (and convergence rate)
576 /// of separately tracking the spin cutoff for each TurnSequencer would
577 /// be prohibitive, so the actual storage is passed in as a parameter and
578 /// updated atomically.  This also lets the caller use different adaptive
579 /// cutoffs for different operations (read versus write, for example).
580 /// To avoid contention, the spin cutoff is only updated when requested
581 /// by the caller.
582 template <template<typename> class Atom>
583 struct TurnSequencer {
584   explicit TurnSequencer(const uint32_t firstTurn = 0) noexcept
585       : state_(encode(firstTurn << kTurnShift, 0))
586   {}
587
588   /// Returns true iff a call to waitForTurn(turn, ...) won't block
589   bool isTurn(const uint32_t turn) const noexcept {
590     auto state = state_.load(std::memory_order_acquire);
591     return decodeCurrentSturn(state) == (turn << kTurnShift);
592   }
593
594   // Internally we always work with shifted turn values, which makes the
595   // truncation and wraparound work correctly.  This leaves us bits at
596   // the bottom to store the number of waiters.  We call shifted turns
597   // "sturns" inside this class.
598
599   /// Blocks the current thread until turn has arrived.  If
600   /// updateSpinCutoff is true then this will spin for up to kMaxSpins tries
601   /// before blocking and will adjust spinCutoff based on the results,
602   /// otherwise it will spin for at most spinCutoff spins.
603   void waitForTurn(const uint32_t turn,
604                    Atom<int>& spinCutoff,
605                    const bool updateSpinCutoff) noexcept {
606     int prevThresh = spinCutoff.load(std::memory_order_relaxed);
607     const int effectiveSpinCutoff =
608         updateSpinCutoff || prevThresh == 0 ? kMaxSpins : prevThresh;
609     int tries;
610
611     const uint32_t sturn = turn << kTurnShift;
612     for (tries = 0; ; ++tries) {
613       uint32_t state = state_.load(std::memory_order_acquire);
614       uint32_t current_sturn = decodeCurrentSturn(state);
615       if (current_sturn == sturn) {
616         break;
617       }
618
619       // wrap-safe version of assert(current_sturn < sturn)
620       assert(sturn - current_sturn < std::numeric_limits<uint32_t>::max() / 2);
621
622       // the first effectSpinCutoff tries are spins, after that we will
623       // record ourself as a waiter and block with futexWait
624       if (tries < effectiveSpinCutoff) {
625         asm volatile ("pause");
626         continue;
627       }
628
629       uint32_t current_max_waiter_delta = decodeMaxWaitersDelta(state);
630       uint32_t our_waiter_delta = (sturn - current_sturn) >> kTurnShift;
631       uint32_t new_state;
632       if (our_waiter_delta <= current_max_waiter_delta) {
633         // state already records us as waiters, probably because this
634         // isn't our first time around this loop
635         new_state = state;
636       } else {
637         new_state = encode(current_sturn, our_waiter_delta);
638         if (state != new_state &&
639             !state_.compare_exchange_strong(state, new_state)) {
640           continue;
641         }
642       }
643       state_.futexWait(new_state, futexChannel(turn));
644     }
645
646     if (updateSpinCutoff || prevThresh == 0) {
647       // if we hit kMaxSpins then spinning was pointless, so the right
648       // spinCutoff is kMinSpins
649       int target;
650       if (tries >= kMaxSpins) {
651         target = kMinSpins;
652       } else {
653         // to account for variations, we allow ourself to spin 2*N when
654         // we think that N is actually required in order to succeed
655         target = std::min(int{kMaxSpins}, std::max(int{kMinSpins}, tries * 2));
656       }
657
658       if (prevThresh == 0) {
659         // bootstrap
660         spinCutoff = target;
661       } else {
662         // try once, keep moving if CAS fails.  Exponential moving average
663         // with alpha of 7/8
664         spinCutoff.compare_exchange_weak(
665             prevThresh, prevThresh + (target - prevThresh) / 8);
666       }
667     }
668   }
669
670   /// Unblocks a thread running waitForTurn(turn + 1)
671   void completeTurn(const uint32_t turn) noexcept {
672     uint32_t state = state_.load(std::memory_order_acquire);
673     while (true) {
674       assert(state == encode(turn << kTurnShift, decodeMaxWaitersDelta(state)));
675       uint32_t max_waiter_delta = decodeMaxWaitersDelta(state);
676       uint32_t new_state = encode(
677               (turn + 1) << kTurnShift,
678               max_waiter_delta == 0 ? 0 : max_waiter_delta - 1);
679       if (state_.compare_exchange_strong(state, new_state)) {
680         if (max_waiter_delta != 0) {
681           state_.futexWake(std::numeric_limits<int>::max(),
682                            futexChannel(turn + 1));
683         }
684         break;
685       }
686       // failing compare_exchange_strong updates first arg to the value
687       // that caused the failure, so no need to reread state_
688     }
689   }
690
691   /// Returns the least-most significant byte of the current uncompleted
692   /// turn.  The full 32 bit turn cannot be recovered.
693   uint8_t uncompletedTurnLSB() const noexcept {
694     return state_.load(std::memory_order_acquire) >> kTurnShift;
695   }
696
697  private:
698   enum : uint32_t {
699     /// kTurnShift counts the bits that are stolen to record the delta
700     /// between the current turn and the maximum waiter. It needs to be big
701     /// enough to record wait deltas of 0 to 32 inclusive.  Waiters more
702     /// than 32 in the future will be woken up 32*n turns early (since
703     /// their BITSET will hit) and will adjust the waiter count again.
704     /// We go a bit beyond and let the waiter count go up to 63, which
705     /// is free and might save us a few CAS
706     kTurnShift = 6,
707     kWaitersMask = (1 << kTurnShift) - 1,
708
709     /// The minimum spin count that we will adaptively select
710     kMinSpins = 20,
711
712     /// The maximum spin count that we will adaptively select, and the
713     /// spin count that will be used when probing to get a new data point
714     /// for the adaptation
715     kMaxSpins = 2000,
716   };
717
718   /// This holds both the current turn, and the highest waiting turn,
719   /// stored as (current_turn << 6) | min(63, max(waited_turn - current_turn))
720   Futex<Atom> state_;
721
722   /// Returns the bitmask to pass futexWait or futexWake when communicating
723   /// about the specified turn
724   int futexChannel(uint32_t turn) const noexcept {
725     return 1 << (turn & 31);
726   }
727
728   uint32_t decodeCurrentSturn(uint32_t state) const noexcept {
729     return state & ~kWaitersMask;
730   }
731
732   uint32_t decodeMaxWaitersDelta(uint32_t state) const noexcept {
733     return state & kWaitersMask;
734   }
735
736   uint32_t encode(uint32_t currentSturn, uint32_t maxWaiterD) const noexcept {
737     return currentSturn | std::min(uint32_t{ kWaitersMask }, maxWaiterD);
738   }
739 };
740
741
742 /// SingleElementQueue implements a blocking queue that holds at most one
743 /// item, and that requires its users to assign incrementing identifiers
744 /// (turns) to each enqueue and dequeue operation.  Note that the turns
745 /// used by SingleElementQueue are doubled inside the TurnSequencer
746 template <typename T, template <typename> class Atom>
747 struct SingleElementQueue {
748
749   ~SingleElementQueue() noexcept {
750     if ((sequencer_.uncompletedTurnLSB() & 1) == 1) {
751       // we are pending a dequeue, so we have a constructed item
752       destroyContents();
753     }
754   }
755
756   /// enqueue using in-place noexcept construction
757   template <typename ...Args,
758             typename = typename std::enable_if<
759                 std::is_nothrow_constructible<T,Args...>::value>::type>
760   void enqueue(const uint32_t turn,
761                Atom<int>& spinCutoff,
762                const bool updateSpinCutoff,
763                Args&&... args) noexcept {
764     sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
765     new (contents_) T(std::forward<Args>(args)...);
766     sequencer_.completeTurn(turn * 2);
767   }
768
769   /// enqueue using move construction, either real (if
770   /// is_nothrow_move_constructible) or simulated using relocation and
771   /// default construction (if IsRelocatable and has_nothrow_constructor)
772   template <typename = typename std::enable_if<
773                 (folly::IsRelocatable<T>::value &&
774                  boost::has_nothrow_constructor<T>::value) ||
775                 std::is_nothrow_constructible<T,T&&>::value>::type>
776   void enqueue(const uint32_t turn,
777                Atom<int>& spinCutoff,
778                const bool updateSpinCutoff,
779                T&& goner) noexcept {
780     if (std::is_nothrow_constructible<T,T&&>::value) {
781       // this is preferred
782       sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
783       new (contents_) T(std::move(goner));
784       sequencer_.completeTurn(turn * 2);
785     } else {
786       // simulate nothrow move with relocation, followed by default
787       // construction to fill the gap we created
788       sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
789       memcpy(contents_, &goner, sizeof(T));
790       sequencer_.completeTurn(turn * 2);
791       new (&goner) T();
792     }
793   }
794
795   bool mayEnqueue(const uint32_t turn) const noexcept {
796     return sequencer_.isTurn(turn * 2);
797   }
798
799   void dequeue(uint32_t turn,
800                Atom<int>& spinCutoff,
801                const bool updateSpinCutoff,
802                T& elem) noexcept {
803     if (folly::IsRelocatable<T>::value) {
804       // this version is preferred, because we do as much work as possible
805       // before waiting
806       try {
807         elem.~T();
808       } catch (...) {
809         // unlikely, but if we don't complete our turn the queue will die
810       }
811       sequencer_.waitForTurn(turn * 2 + 1, spinCutoff, updateSpinCutoff);
812       memcpy(&elem, contents_, sizeof(T));
813       sequencer_.completeTurn(turn * 2 + 1);
814     } else {
815       // use nothrow move assignment
816       sequencer_.waitForTurn(turn * 2 + 1, spinCutoff, updateSpinCutoff);
817       elem = std::move(*ptr());
818       destroyContents();
819       sequencer_.completeTurn(turn * 2 + 1);
820     }
821   }
822
823   bool mayDequeue(const uint32_t turn) const noexcept {
824     return sequencer_.isTurn(turn * 2 + 1);
825   }
826
827  private:
828   /// Storage for a T constructed with placement new
829   char contents_[sizeof(T)] __attribute__((aligned(alignof(T))));
830
831   /// Even turns are pushes, odd turns are pops
832   TurnSequencer<Atom> sequencer_;
833
834   T* ptr() noexcept {
835     return static_cast<T*>(static_cast<void*>(contents_));
836   }
837
838   void destroyContents() noexcept {
839     try {
840       ptr()->~T();
841     } catch (...) {
842       // g++ doesn't seem to have std::is_nothrow_destructible yet
843     }
844 #ifndef NDEBUG
845     memset(contents_, 'Q', sizeof(T));
846 #endif
847   }
848 };
849
850 } // namespace detail
851
852 } // namespace folly