X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2FMPMCQueue.h;h=a2681954c3031e19d8f1b76fcca35bd54b43341f;hb=0347a79f57ee6e1179c0482adacf3a7e6ff890ff;hp=9e3b88eb1c997857a29145b3a7bfbc25aec23ef6;hpb=1509cebb0407aee1ca0a381af6f737d53aa2325b;p=folly.git

diff --git a/folly/MPMCQueue.h b/folly/MPMCQueue.h
index 9e3b88eb..a2681954 100644
--- a/folly/MPMCQueue.h
+++ b/folly/MPMCQueue.h
@@ -1,5 +1,5 @@
 /*
- * Copyright 2014 Facebook, Inc.
+ * Copyright 2015 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -27,14 +27,14 @@
 #include 
 #include 
 
-#include 
+#include 
 
 namespace folly {
 
 namespace detail {
 
 template<typename T, template<typename> class Atom>
-class SingleElementQueue;
+struct SingleElementQueue;
 
 template <typename T> class MPMCPipelineStageImpl;
@@ -46,6 +46,14 @@ template <typename T> class MPMCPipelineStageImpl;
 /// up front. The bulk of the work of enqueuing and dequeuing can be
 /// performed in parallel.
 ///
+/// MPMCQueue is linearizable. That means that if a call to write(A)
+/// returns before a call to write(B) begins, then A will definitely end up
+/// in the queue before B, and if a call to read(X) returns before a call
+/// to read(Y) is started, then X will be something from earlier in the
+/// queue than Y. This also means that if a read call returns a value, you
+/// can be sure that all previous elements of the queue have been assigned
+/// a reader (that reader might not yet have returned, but it exists).
+///
 /// The underlying implementation uses a ticket dispenser for the head and
 /// the tail, spreading accesses across N single-element queues to produce
 /// a queue with capacity N. The ticket dispensers use atomic increment,
@@ -57,8 +65,7 @@ template <typename T> class MPMCPipelineStageImpl;
 /// when the MPMCQueue's capacity is smaller than the number of enqueuers
 /// or dequeuers).
 ///
-/// NOEXCEPT INTERACTION: Ticket-based queues separate the assignment
-/// of In benchmarks (contained in tao/queues/ConcurrentQueueTests)
+/// In benchmarks (contained in tao/queues/ConcurrentQueueTests)
 /// it handles 1 to 1, 1 to N, N to 1, and N to M thread counts better
 /// than any of the alternatives present in fbcode, for both small (~10)
 /// and large capacities. In these benchmarks it is also faster than
@@ -67,17 +74,25 @@ template <typename T> class MPMCPipelineStageImpl;
 /// queue because it uses futex() to block and unblock waiting threads,
 /// rather than spinning with sched_yield.
 ///
-/// queue positions from the actual construction of the in-queue elements,
-/// which means that the T constructor used during enqueue must not throw
-/// an exception. This is enforced at compile time using type traits,
-/// which requires that T be adorned with accurate noexcept information.
-/// If your type does not use noexcept, you will have to wrap it in
-/// something that provides the guarantee. We provide an alternate
-/// safe implementation for types that don't use noexcept but that are
-/// marked folly::IsRelocatable and boost::has_nothrow_constructor,
-/// which is common for folly types. In particular, if you can declare
-/// FOLLY_ASSUME_FBVECTOR_COMPATIBLE then your type can be put in
-/// MPMCQueue.
+/// NOEXCEPT INTERACTION: tl;dr: If it compiles you're fine. Ticket-based
+/// queues separate the assignment of queue positions from the actual
+/// construction of the in-queue elements, which means that the T
+/// constructor used during enqueue must not throw an exception. This is
+/// enforced at compile time using type traits, which requires that T be
+/// adorned with accurate noexcept information. If your type does not
+/// use noexcept, you will have to wrap it in something that provides
+/// the guarantee. We provide an alternate safe implementation for types
+/// that don't use noexcept but that are marked folly::IsRelocatable
+/// and boost::has_nothrow_constructor, which is common for folly types.
+/// In particular, if you can declare FOLLY_ASSUME_FBVECTOR_COMPATIBLE
+/// then your type can be put in MPMCQueue.
+///
+/// If you have a pool of N queue consumers that you want to shut down
+/// after the queue has drained, one way is to enqueue N sentinel values
+/// to the queue. If the producer doesn't know how many consumers there
+/// are you can enqueue one sentinel and then have each consumer requeue
+/// two sentinels after it receives it (by requeuing 2 the shutdown can
+/// complete in O(log P) time instead of O(P)).
 template<typename T,
          template<typename> class Atom = std::atomic>
 class MPMCQueue : boost::noncopyable {
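To make the sentinel scheme above concrete, here is a minimal consumer sketch; the int work type, the -1 sentinel value, and the pool wiring are illustrative assumptions, not part of this diff:

#include <folly/MPMCQueue.h>

// runs in each of the P consumer threads (hypothetical setup)
void consumer(folly::MPMCQueue<int>& queue) {
  int task;
  for (;;) {
    queue.blockingRead(task);
    if (task == -1) {            // sentinel: assumed never to be real work
      // requeue two sentinels, so shutdown propagates in O(log P)
      // doubling waves rather than one O(P) sequential chain
      queue.blockingWrite(-1);
      queue.blockingWrite(-1);
      return;
    }
    // ... process task ...
  }
}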
@@ -92,14 +107,21 @@ class MPMCQueue : boost::noncopyable {
 
   explicit MPMCQueue(size_t queueCapacity)
     : capacity_(queueCapacity)
-    , slots_(new detail::SingleElementQueue<T,Atom>[queueCapacity +
-                                                    2 * kSlotPadding])
-    , stride_(computeStride(queueCapacity))
     , pushTicket_(0)
     , popTicket_(0)
     , pushSpinCutoff_(0)
     , popSpinCutoff_(0)
   {
+    if (queueCapacity == 0)
+      throw std::invalid_argument(
+        "MPMCQueue with explicit capacity 0 is impossible"
+      );
+
+    // would sigfpe if capacity is 0
+    stride_ = computeStride(queueCapacity);
+    slots_ = new detail::SingleElementQueue<T,Atom>[queueCapacity +
+                                                    2 * kSlotPadding];
+
     // ideally this would be a static assert, but g++ doesn't allow it
     assert(alignof(MPMCQueue<T,Atom>)
            >= detail::CacheLocality::kFalseSharingRange);
@@ -274,7 +296,7 @@ class MPMCQueue : boost::noncopyable {
   /// return false, but writeIfNotFull will wait for the dequeue to finish.
   /// This method is required if you are composing queues and managing
   /// your own wakeup, because it guarantees that after every successful
-  /// write a readIfNotFull will succeed.
+  /// write a readIfNotEmpty will succeed.
   template <typename ...Args>
   bool writeIfNotFull(Args&&... args) noexcept {
     uint64_t ticket;
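A small sketch of the two behaviors changed above, assuming a single thread and an arbitrary capacity of 8: capacity 0 now throws at construction, and a successful writeIfNotFull guarantees a readIfNotEmpty can claim an element:

#include <cassert>
#include <folly/MPMCQueue.h>

void demo() {
  folly::MPMCQueue<int> q(8);       // q(0) would throw std::invalid_argument
  if (q.writeIfNotFull(42)) {
    int v = 0;
    bool ok = q.readIfNotEmpty(v);  // some reader must succeed; with only
    assert(ok && v == 42);          // this thread reading, it is us
  }
}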
@@ -542,207 +564,6 @@ class MPMCQueue : boost::noncopyable {
 
 namespace detail {
 
-/// A TurnSequencer allows threads to order their execution according to
-/// a monotonically increasing (with wraparound) "turn" value. The two
-/// operations provided are to wait for turn T, and to move to the next
-/// turn. Every thread that is waiting for T must have arrived before
-/// that turn is marked completed (for MPMCQueue only one thread waits
-/// for any particular turn, so this is trivially true).
-///
-/// TurnSequencer's state_ holds 26 bits of the current turn (shifted
-/// left by 6), along with a 6 bit saturating value that records the
-/// maximum waiter minus the current turn. Wraparound of the turn space
-/// is expected and handled. This allows us to atomically adjust the
-/// number of outstanding waiters when we perform a FUTEX_WAKE operation.
-/// Compare this strategy to sem_t's separate num_waiters field, which
-/// isn't decremented until after the waiting thread gets scheduled,
-/// during which time more enqueues might have occurred and made pointless
-/// FUTEX_WAKE calls.
-///
-/// TurnSequencer uses futex() directly. It is optimized for the
-/// case that the highest awaited turn is 32 or less higher than the
-/// current turn. We use the FUTEX_WAIT_BITSET variant, which lets
-/// us embed 32 separate wakeup channels in a single futex. See
-/// http://locklessinc.com/articles/futex_cheat_sheet for a description.
-///
-/// We only need to keep exact track of the delta between the current
-/// turn and the maximum waiter for the 32 turns that follow the current
-/// one, because waiters at turn t+32 will be awoken at turn t. At that
-/// point they can then adjust the delta using the higher base. Since we
-/// need to encode waiter deltas of 0 to 32 inclusive, we use 6 bits.
-/// We actually store waiter deltas up to 63, since that might reduce
-/// the number of CAS operations a tiny bit.
-///
-/// To avoid some futex() calls entirely, TurnSequencer uses an adaptive
-/// spin cutoff before waiting. The overheads (and convergence rate)
-/// of separately tracking the spin cutoff for each TurnSequencer would
-/// be prohibitive, so the actual storage is passed in as a parameter and
-/// updated atomically. This also lets the caller use different adaptive
-/// cutoffs for different operations (read versus write, for example).
-/// To avoid contention, the spin cutoff is only updated when requested
-/// by the caller.
-template <template<typename> class Atom>
-struct TurnSequencer {
-  explicit TurnSequencer(const uint32_t firstTurn = 0) noexcept
-      : state_(encode(firstTurn << kTurnShift, 0))
-  {}
-
-  /// Returns true iff a call to waitForTurn(turn, ...) won't block
-  bool isTurn(const uint32_t turn) const noexcept {
-    auto state = state_.load(std::memory_order_acquire);
-    return decodeCurrentSturn(state) == (turn << kTurnShift);
-  }
-
-  // Internally we always work with shifted turn values, which makes the
-  // truncation and wraparound work correctly. This leaves us bits at
-  // the bottom to store the number of waiters. We call shifted turns
-  // "sturns" inside this class.
-
-  /// Blocks the current thread until turn has arrived. If
-  /// updateSpinCutoff is true then this will spin for up to kMaxSpins tries
-  /// before blocking and will adjust spinCutoff based on the results,
-  /// otherwise it will spin for at most spinCutoff spins.
-  void waitForTurn(const uint32_t turn,
-                   Atom<uint32_t>& spinCutoff,
-                   const bool updateSpinCutoff) noexcept {
-    uint32_t prevThresh = spinCutoff.load(std::memory_order_relaxed);
-    const uint32_t effectiveSpinCutoff =
-        updateSpinCutoff || prevThresh == 0 ? kMaxSpins : prevThresh;
-
-    uint32_t tries;
-    const uint32_t sturn = turn << kTurnShift;
-    for (tries = 0; ; ++tries) {
-      uint32_t state = state_.load(std::memory_order_acquire);
-      uint32_t current_sturn = decodeCurrentSturn(state);
-      if (current_sturn == sturn) {
-        break;
-      }
-
-      // wrap-safe version of assert(current_sturn < sturn)
-      assert(sturn - current_sturn < std::numeric_limits<uint32_t>::max() / 2);
-
-      // the first effectiveSpinCutoff tries are spins, after that we will
-      // record ourselves as a waiter and block with futexWait
-      if (tries < effectiveSpinCutoff) {
-        asm volatile ("pause");
-        continue;
-      }
-
-      uint32_t current_max_waiter_delta = decodeMaxWaitersDelta(state);
-      uint32_t our_waiter_delta = (sturn - current_sturn) >> kTurnShift;
-      uint32_t new_state;
-      if (our_waiter_delta <= current_max_waiter_delta) {
-        // state already records us as waiters, probably because this
-        // isn't our first time around this loop
-        new_state = state;
-      } else {
-        new_state = encode(current_sturn, our_waiter_delta);
-        if (state != new_state &&
-            !state_.compare_exchange_strong(state, new_state)) {
-          continue;
-        }
-      }
-      state_.futexWait(new_state, futexChannel(turn));
-    }
-
-    if (updateSpinCutoff || prevThresh == 0) {
-      // if we hit kMaxSpins then spinning was pointless, so the right
-      // spinCutoff is kMinSpins
-      uint32_t target;
-      if (tries >= kMaxSpins) {
-        target = kMinSpins;
-      } else {
-        // to account for variations, we allow ourselves to spin 2*N when
-        // we think that N is actually required in order to succeed
-        target = std::min(kMaxSpins,
-                          std::max(kMinSpins, tries * 2));
-      }
-
-      if (prevThresh == 0) {
-        // bootstrap
-        spinCutoff.store(target);
-      } else {
-        // try once, keep moving if CAS fails. Exponential moving average
-        // with alpha of 7/8
-        // Be careful that the quantity we add to prevThresh is signed.
-        spinCutoff.compare_exchange_weak(
-            prevThresh, prevThresh + int(target - prevThresh) / 8);
-      }
-    }
-  }
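The spinCutoff update at the end of waitForTurn is an exponential moving average that moves one eighth of the way toward each new observation; a standalone sketch with made-up values:

#include <cstdint>

uint32_t updatedCutoff(uint32_t spinCutoff, uint32_t target) {
  // the int cast matters: target may be below the current cutoff
  return spinCutoff + int(target - spinCutoff) / 8;
}
// updatedCutoff(200, 1000) == 300, one eighth of the way toward the probe
// updatedCutoff(200, 100)  == 188, decay toward a lower target works too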
-
-  /// Unblocks a thread running waitForTurn(turn + 1)
-  void completeTurn(const uint32_t turn) noexcept {
-    uint32_t state = state_.load(std::memory_order_acquire);
-    while (true) {
-      assert(state == encode(turn << kTurnShift, decodeMaxWaitersDelta(state)));
-      uint32_t max_waiter_delta = decodeMaxWaitersDelta(state);
-      uint32_t new_state = encode(
-          (turn + 1) << kTurnShift,
-          max_waiter_delta == 0 ? 0 : max_waiter_delta - 1);
-      if (state_.compare_exchange_strong(state, new_state)) {
-        if (max_waiter_delta != 0) {
-          state_.futexWake(std::numeric_limits<int>::max(),
-                           futexChannel(turn + 1));
-        }
-        break;
-      }
-      // failing compare_exchange_strong updates first arg to the value
-      // that caused the failure, so no need to reread state_
-    }
-  }
-
-  /// Returns the least significant byte of the current uncompleted
-  /// turn. The full 32 bit turn cannot be recovered.
-  uint8_t uncompletedTurnLSB() const noexcept {
-    return state_.load(std::memory_order_acquire) >> kTurnShift;
-  }
-
- private:
-  enum : uint32_t {
-    /// kTurnShift counts the bits that are stolen to record the delta
-    /// between the current turn and the maximum waiter. It needs to be big
-    /// enough to record wait deltas of 0 to 32 inclusive. Waiters more
-    /// than 32 in the future will be woken up 32*n turns early (since
-    /// their BITSET will hit) and will adjust the waiter count again.
-    /// We go a bit beyond and let the waiter count go up to 63, which
-    /// is free and might save us a few CAS operations
-    kTurnShift = 6,
-    kWaitersMask = (1 << kTurnShift) - 1,
-
-    /// The minimum spin count that we will adaptively select
-    kMinSpins = 20,
-
-    /// The maximum spin count that we will adaptively select, and the
-    /// spin count that will be used when probing to get a new data point
-    /// for the adaptation
-    kMaxSpins = 2000,
-  };
-
-  /// This holds both the current turn, and the highest waiting turn,
-  /// stored as (current_turn << 6) | min(63, max_waited_turn - current_turn)
-  Futex<Atom> state_;
-
-  /// Returns the bitmask to pass futexWait or futexWake when communicating
-  /// about the specified turn
-  int futexChannel(uint32_t turn) const noexcept {
-    return 1 << (turn & 31);
-  }
-
-  uint32_t decodeCurrentSturn(uint32_t state) const noexcept {
-    return state & ~kWaitersMask;
-  }
-
-  uint32_t decodeMaxWaitersDelta(uint32_t state) const noexcept {
-    return state & kWaitersMask;
-  }
-
-  uint32_t encode(uint32_t currentSturn, uint32_t maxWaiterD) const noexcept {
-    return currentSturn | std::min(uint32_t{ kWaitersMask }, maxWaiterD);
-  }
-};
-
-
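The 32 wakeup channels mentioned in the removed comments come from mapping each turn onto one bit of a futex bitset; a standalone sketch mirroring the futexChannel member above:

#include <cstdint>

int futexChannel(uint32_t turn) {
  return 1 << (turn & 31);  // turns equal mod 32 share a wakeup bit
}
// futexChannel(5) == futexChannel(37) == 0x20, which is why a waiter at
// turn t+32 is woken at turn t and then re-checks and re-registers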
 /// SingleElementQueue implements a blocking queue that holds at most one
 /// item, and that requires its users to assign incrementing identifiers
 /// (turns) to each enqueue and dequeue operation. Note that the turns
@@ -760,7 +581,7 @@ struct SingleElementQueue {
   /// enqueue using in-place noexcept construction
   template <typename ...Args,
             typename = typename std::enable_if<
-                std::is_nothrow_constructible<T,Args...>::value>::type>
+              std::is_nothrow_constructible<T,Args...>::value>::type>
   void enqueue(const uint32_t turn,
                Atom<uint32_t>& spinCutoff,
                const bool updateSpinCutoff,
@@ -781,19 +602,13 @@ struct SingleElementQueue {
                Atom<uint32_t>& spinCutoff,
                const bool updateSpinCutoff,
                T&& goner) noexcept {
-    if (std::is_nothrow_constructible<T,T&&>::value) {
-      // this is preferred
-      sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
-      new (&contents_) T(std::move(goner));
-      sequencer_.completeTurn(turn * 2);
-    } else {
-      // simulate nothrow move with relocation, followed by default
-      // construction to fill the gap we created
-      sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
-      memcpy(&contents_, &goner, sizeof(T));
-      sequencer_.completeTurn(turn * 2);
-      new (&goner) T();
-    }
+    enqueueImpl(
+        turn,
+        spinCutoff,
+        updateSpinCutoff,
+        std::move(goner),
+        typename std::conditional<std::is_nothrow_constructible<T,T&&>::value,
+                                  ImplByMove, ImplByRelocation>::type());
   }
 
   bool mayEnqueue(const uint32_t turn) const noexcept {
@@ -804,24 +619,13 @@ struct SingleElementQueue {
                Atom<uint32_t>& spinCutoff,
                const bool updateSpinCutoff,
                T& elem) noexcept {
-    if (folly::IsRelocatable<T>::value) {
-      // this version is preferred, because we do as much work as possible
-      // before waiting
-      try {
-        elem.~T();
-      } catch (...) {
-        // unlikely, but if we don't complete our turn the queue will die
-      }
-      sequencer_.waitForTurn(turn * 2 + 1, spinCutoff, updateSpinCutoff);
-      memcpy(&elem, &contents_, sizeof(T));
-      sequencer_.completeTurn(turn * 2 + 1);
-    } else {
-      // use nothrow move assignment
-      sequencer_.waitForTurn(turn * 2 + 1, spinCutoff, updateSpinCutoff);
-      elem = std::move(*ptr());
-      destroyContents();
-      sequencer_.completeTurn(turn * 2 + 1);
-    }
+    dequeueImpl(turn,
+                spinCutoff,
+                updateSpinCutoff,
+                elem,
+                typename std::conditional<folly::IsRelocatable<T>::value,
+                                          ImplByRelocation,
+                                          ImplByMove>::type());
   }
 
   bool mayDequeue(const uint32_t turn) const noexcept {
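The tag-dispatch idiom introduced by these hunks, reduced to a free-standing sketch (Slot and its members are illustrative names, not folly's): std::conditional selects a tag type at compile time, overload resolution picks the implementation, and the overload that would not compile for a given T is never instantiated:

#include <cstring>
#include <new>
#include <type_traits>
#include <utility>

template <typename T>
struct Slot {
  struct ImplByRelocation {};
  struct ImplByMove {};

  void store(T&& v) {
    storeImpl(std::move(v),
              typename std::conditional<
                  std::is_nothrow_constructible<T,T&&>::value,
                  ImplByMove,
                  ImplByRelocation>::type());
  }

 private:
  typename std::aligned_storage<sizeof(T), alignof(T)>::type contents_;

  // chosen when T's move constructor is noexcept
  void storeImpl(T&& v, ImplByMove) noexcept {
    new (&contents_) T(std::move(v));
  }

  // fallback: bytewise relocation, then default-construct the source
  void storeImpl(T&& v, ImplByRelocation) noexcept {
    std::memcpy(&contents_, &v, sizeof(T));
    new (&v) T();
  }
};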
@@ -849,6 +653,63 @@ struct SingleElementQueue {
     memset(&contents_, 'Q', sizeof(T));
 #endif
   }
+
+  /// Tag classes for dispatching to enqueue/dequeue implementation.
+  struct ImplByRelocation {};
+  struct ImplByMove {};
+
+  /// enqueue using nothrow move construction.
+  void enqueueImpl(const uint32_t turn,
+                   Atom<uint32_t>& spinCutoff,
+                   const bool updateSpinCutoff,
+                   T&& goner,
+                   ImplByMove) noexcept {
+    sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
+    new (&contents_) T(std::move(goner));
+    sequencer_.completeTurn(turn * 2);
+  }
+
+  /// enqueue by simulating nothrow move with relocation, followed by
+  /// default construction to fill the gap left by the relocation.
+  void enqueueImpl(const uint32_t turn,
+                   Atom<uint32_t>& spinCutoff,
+                   const bool updateSpinCutoff,
+                   T&& goner,
+                   ImplByRelocation) noexcept {
+    sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
+    memcpy(&contents_, &goner, sizeof(T));
+    sequencer_.completeTurn(turn * 2);
+    new (&goner) T();
+  }
+
+  /// dequeue by destructing followed by relocation. This version is preferred,
+  /// because as much work as possible can be done before waiting.
+  void dequeueImpl(uint32_t turn,
+                   Atom<uint32_t>& spinCutoff,
+                   const bool updateSpinCutoff,
+                   T& elem,
+                   ImplByRelocation) noexcept {
+    try {
+      elem.~T();
+    } catch (...) {
+      // unlikely, but if we don't complete our turn the queue will die
+    }
+    sequencer_.waitForTurn(turn * 2 + 1, spinCutoff, updateSpinCutoff);
+    memcpy(&elem, &contents_, sizeof(T));
+    sequencer_.completeTurn(turn * 2 + 1);
+  }
+
+  /// dequeue by nothrow move assignment.
+  void dequeueImpl(uint32_t turn,
+                   Atom<uint32_t>& spinCutoff,
+                   const bool updateSpinCutoff,
+                   T& elem,
+                   ImplByMove) noexcept {
+    sequencer_.waitForTurn(turn * 2 + 1, spinCutoff, updateSpinCutoff);
+    elem = std::move(*ptr());
+    destroyContents();
+    sequencer_.completeTurn(turn * 2 + 1);
+  }
 };
 
 } // namespace detail
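Per the NOEXCEPT INTERACTION note near the top of the file, a type with no noexcept annotations can still opt into the relocation-based paths above via folly's trait macro; MyWidget here is a hypothetical example, not part of this diff:

#include <folly/MPMCQueue.h>
#include <folly/Traits.h>

// no noexcept annotations, but no interior pointers either, so moving
// it bytewise (relocation) is safe
struct MyWidget {
  int id = 0;
};
FOLLY_ASSUME_FBVECTOR_COMPATIBLE(MyWidget);

// dequeue of MyWidget now selects the ImplByRelocation overload above
folly::MPMCQueue<MyWidget> widgetQueue(16);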