X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2FMPMCQueue.h;h=45c4ef74607499be45b7284a97411976b17c9034;hb=4e907f707a04a334256b939acc6b810c0b882f86;hp=defbdfe0007269fc39625094411e44e495182e43;hpb=048a4518ce3b44df833fa68518e84e7eef5ace04;p=folly.git diff --git a/folly/MPMCQueue.h b/folly/MPMCQueue.h index defbdfe0..45c4ef74 100644 --- a/folly/MPMCQueue.h +++ b/folly/MPMCQueue.h @@ -1,5 +1,5 @@ /* - * Copyright 2013 Facebook, Inc. + * Copyright 2015 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,14 +20,13 @@ #include #include #include -#include #include -#include #include -#include +#include #include #include +#include #include namespace folly { @@ -47,6 +46,14 @@ template class MPMCPipelineStageImpl; /// up front. The bulk of the work of enqueuing and dequeuing can be /// performed in parallel. /// +/// MPMCQueue is linearizable. That means that if a call to write(A) +/// returns before a call to write(B) begins, then A will definitely end up +/// in the queue before B, and if a call to read(X) returns before a call +/// to read(Y) is started, that X will be something from earlier in the +/// queue than Y. This also means that if a read call returns a value, you +/// can be sure that all previous elements of the queue have been assigned +/// a reader (that reader might not yet have returned, but it exists). +/// /// The underlying implementation uses a ticket dispenser for the head and /// the tail, spreading accesses across N single-element queues to produce /// a queue with capacity N. The ticket dispensers use atomic increment, @@ -58,8 +65,7 @@ template class MPMCPipelineStageImpl; /// when the MPMCQueue's capacity is smaller than the number of enqueuers /// or dequeuers). /// -/// NOEXCEPT INTERACTION: Ticket-based queues separate the assignment -/// of In benchmarks (contained in tao/queues/ConcurrentQueueTests) +/// In benchmarks (contained in tao/queues/ConcurrentQueueTests) /// it handles 1 to 1, 1 to N, N to 1, and N to M thread counts better /// than any of the alternatives present in fbcode, for both small (~10) /// and large capacities. In these benchmarks it is also faster than @@ -68,39 +74,60 @@ template class MPMCPipelineStageImpl; /// queue because it uses futex() to block and unblock waiting threads, /// rather than spinning with sched_yield. /// -/// queue positions from the actual construction of the in-queue elements, -/// which means that the T constructor used during enqueue must not throw -/// an exception. This is enforced at compile time using type traits, -/// which requires that T be adorned with accurate noexcept information. -/// If your type does not use noexcept, you will have to wrap it in -/// something that provides the guarantee. We provide an alternate -/// safe implementation for types that don't use noexcept but that are -/// marked folly::IsRelocatable and boost::has_nothrow_constructor, -/// which is common for folly types. In particular, if you can declare -/// FOLLY_ASSUME_FBVECTOR_COMPATIBLE then your type can be put in -/// MPMCQueue. +/// NOEXCEPT INTERACTION: tl;dr; If it compiles you're fine. Ticket-based +/// queues separate the assignment of queue positions from the actual +/// construction of the in-queue elements, which means that the T +/// constructor used during enqueue must not throw an exception. 
This is +/// enforced at compile time using type traits, which requires that T be +/// adorned with accurate noexcept information. If your type does not +/// use noexcept, you will have to wrap it in something that provides +/// the guarantee. We provide an alternate safe implementation for types +/// that don't use noexcept but that are marked folly::IsRelocatable +/// and boost::has_nothrow_constructor, which is common for folly types. +/// In particular, if you can declare FOLLY_ASSUME_FBVECTOR_COMPATIBLE +/// then your type can be put in MPMCQueue. +/// +/// If you have a pool of N queue consumers that you want to shut down +/// after the queue has drained, one way is to enqueue N sentinel values +/// to the queue. If the producer doesn't know how many consumers there +/// are you can enqueue one sentinel and then have each consumer requeue +/// two sentinels after it receives it (by requeuing 2 the shutdown can +/// complete in O(log P) time instead of O(P)). template class Atom = std::atomic, - typename = typename std::enable_if< - std::is_nothrow_constructible::value || - folly::IsRelocatable::value>::type> + template class Atom = std::atomic> class MPMCQueue : boost::noncopyable { + + static_assert(std::is_nothrow_constructible::value || + folly::IsRelocatable::value, + "T must be relocatable or have a noexcept move constructor"); + friend class detail::MPMCPipelineStageImpl; public: typedef T value_type; explicit MPMCQueue(size_t queueCapacity) : capacity_(queueCapacity) - , slots_(new detail::SingleElementQueue[queueCapacity + - 2 * kSlotPadding]) - , stride_(computeStride(queueCapacity)) , pushTicket_(0) , popTicket_(0) , pushSpinCutoff_(0) , popSpinCutoff_(0) { + if (queueCapacity == 0) + throw std::invalid_argument( + "MPMCQueue with explicit capacity 0 is impossible" + ); + + // would sigfpe if capacity is 0 + stride_ = computeStride(queueCapacity); + slots_ = new detail::SingleElementQueue[queueCapacity + + 2 * kSlotPadding]; + // ideally this would be a static assert, but g++ doesn't allow it - assert(alignof(MPMCQueue) >= kFalseSharingRange); + assert(alignof(MPMCQueue) + >= detail::CacheLocality::kFalseSharingRange); + assert(static_cast(static_cast(&popTicket_)) + - static_cast(static_cast(&pushTicket_)) + >= detail::CacheLocality::kFalseSharingRange); } /// A default-constructed queue is useful because a usable (non-zero @@ -269,7 +296,7 @@ class MPMCQueue : boost::noncopyable { /// return false, but writeIfNotFull will wait for the dequeue to finish. /// This method is required if you are composing queues and managing /// your own wakeup, because it guarantees that after every successful - /// write a readIfNotFull will succeed. + /// write a readIfNotEmpty will succeed. template bool writeIfNotFull(Args&&... 
args) noexcept { uint64_t ticket; @@ -324,27 +351,15 @@ class MPMCQueue : boost::noncopyable { /// the proper spin backoff kAdaptationFreq = 128, - /// Memory locations on the same cache line are subject to false - /// sharing, which is very bad for performance - kFalseSharingRange = 64, - /// To avoid false sharing in slots_ with neighboring memory /// allocations, we pad it with this many SingleElementQueue-s at /// each end - kSlotPadding = 1 + - (kFalseSharingRange - 1) / sizeof(detail::SingleElementQueue) + kSlotPadding = (detail::CacheLocality::kFalseSharingRange - 1) + / sizeof(detail::SingleElementQueue) + 1 }; - static_assert(kFalseSharingRange == 64, - "FOLLY_ON_NEXT_CACHE_LINE must track kFalseSharingRange"); - -// This literal "64' should be kFalseSharingRange, -// but gcc-4.8.0 and 4.8.1 reject it. -// FIXME: s/64/kFalseSharingRange/ if that ever changes. -#define FOLLY_ON_NEXT_CACHE_LINE __attribute__((aligned(64))) - /// The maximum number of items in the queue at once - size_t capacity_ FOLLY_ON_NEXT_CACHE_LINE; + size_t FOLLY_ALIGN_TO_AVOID_FALSE_SHARING capacity_; /// An array of capacity_ SingleElementQueue-s, each of which holds /// either 0 or 1 item. We over-allocate by 2 * kSlotPadding and don't @@ -357,24 +372,24 @@ class MPMCQueue : boost::noncopyable { int stride_; /// Enqueuers get tickets from here - Atom pushTicket_ FOLLY_ON_NEXT_CACHE_LINE; + Atom FOLLY_ALIGN_TO_AVOID_FALSE_SHARING pushTicket_; /// Dequeuers get tickets from here - Atom popTicket_ FOLLY_ON_NEXT_CACHE_LINE; + Atom FOLLY_ALIGN_TO_AVOID_FALSE_SHARING popTicket_; /// This is how many times we will spin before using FUTEX_WAIT when /// the queue is full on enqueue, adaptively computed by occasionally /// spinning for longer and smoothing with an exponential moving average - Atom pushSpinCutoff_ FOLLY_ON_NEXT_CACHE_LINE; + Atom FOLLY_ALIGN_TO_AVOID_FALSE_SHARING pushSpinCutoff_; /// The adaptive spin cutoff when the queue is empty on dequeue - Atom popSpinCutoff_ FOLLY_ON_NEXT_CACHE_LINE; + Atom FOLLY_ALIGN_TO_AVOID_FALSE_SHARING popSpinCutoff_; - /// Alignment doesn't avoid false sharing at the end of the struct, + /// Alignment doesn't prevent false sharing at the end of the struct, /// so fill out the last cache line - char padding_[kFalseSharingRange - sizeof(Atom)]; + char padding_[detail::CacheLocality::kFalseSharingRange - + sizeof(Atom)]; -#undef FOLLY_ON_NEXT_CACHE_LINE /// We assign tickets in increasing order, but we don't want to /// access neighboring elements of slots_ because that will lead to @@ -610,13 +625,13 @@ struct TurnSequencer { /// before blocking and will adjust spinCutoff based on the results, /// otherwise it will spin for at most spinCutoff spins. void waitForTurn(const uint32_t turn, - Atom& spinCutoff, + Atom& spinCutoff, const bool updateSpinCutoff) noexcept { - int prevThresh = spinCutoff.load(std::memory_order_relaxed); - const int effectiveSpinCutoff = + uint32_t prevThresh = spinCutoff.load(std::memory_order_relaxed); + const uint32_t effectiveSpinCutoff = updateSpinCutoff || prevThresh == 0 ? 
kMaxSpins : prevThresh; - int tries; + uint32_t tries; const uint32_t sturn = turn << kTurnShift; for (tries = 0; ; ++tries) { uint32_t state = state_.load(std::memory_order_acquire); @@ -655,23 +670,25 @@ struct TurnSequencer { if (updateSpinCutoff || prevThresh == 0) { // if we hit kMaxSpins then spinning was pointless, so the right // spinCutoff is kMinSpins - int target; + uint32_t target; if (tries >= kMaxSpins) { target = kMinSpins; } else { // to account for variations, we allow ourself to spin 2*N when // we think that N is actually required in order to succeed - target = std::min(int{kMaxSpins}, std::max(int{kMinSpins}, tries * 2)); + target = std::min(kMaxSpins, + std::max(kMinSpins, tries * 2)); } if (prevThresh == 0) { // bootstrap - spinCutoff = target; + spinCutoff.store(target); } else { // try once, keep moving if CAS fails. Exponential moving average // with alpha of 7/8 + // Be careful that the quantity we add to prevThresh is signed. spinCutoff.compare_exchange_weak( - prevThresh, prevThresh + (target - prevThresh) / 8); + prevThresh, prevThresh + int(target - prevThresh) / 8); } } } @@ -767,11 +784,11 @@ struct SingleElementQueue { typename = typename std::enable_if< std::is_nothrow_constructible::value>::type> void enqueue(const uint32_t turn, - Atom& spinCutoff, + Atom& spinCutoff, const bool updateSpinCutoff, Args&&... args) noexcept { sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff); - new (contents_) T(std::forward(args)...); + new (&contents_) T(std::forward(args)...); sequencer_.completeTurn(turn * 2); } @@ -783,19 +800,19 @@ struct SingleElementQueue { boost::has_nothrow_constructor::value) || std::is_nothrow_constructible::value>::type> void enqueue(const uint32_t turn, - Atom& spinCutoff, + Atom& spinCutoff, const bool updateSpinCutoff, T&& goner) noexcept { if (std::is_nothrow_constructible::value) { // this is preferred sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff); - new (contents_) T(std::move(goner)); + new (&contents_) T(std::move(goner)); sequencer_.completeTurn(turn * 2); } else { // simulate nothrow move with relocation, followed by default // construction to fill the gap we created sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff); - memcpy(contents_, &goner, sizeof(T)); + memcpy(&contents_, &goner, sizeof(T)); sequencer_.completeTurn(turn * 2); new (&goner) T(); } @@ -806,7 +823,7 @@ struct SingleElementQueue { } void dequeue(uint32_t turn, - Atom& spinCutoff, + Atom& spinCutoff, const bool updateSpinCutoff, T& elem) noexcept { if (folly::IsRelocatable::value) { @@ -818,7 +835,7 @@ struct SingleElementQueue { // unlikely, but if we don't complete our turn the queue will die } sequencer_.waitForTurn(turn * 2 + 1, spinCutoff, updateSpinCutoff); - memcpy(&elem, contents_, sizeof(T)); + memcpy(&elem, &contents_, sizeof(T)); sequencer_.completeTurn(turn * 2 + 1); } else { // use nothrow move assignment @@ -835,13 +852,13 @@ struct SingleElementQueue { private: /// Storage for a T constructed with placement new - char contents_[sizeof(T)] __attribute__((aligned(alignof(T)))); + typename std::aligned_storage::type contents_; /// Even turns are pushes, odd turns are pops TurnSequencer sequencer_; T* ptr() noexcept { - return static_cast(static_cast(contents_)); + return static_cast(static_cast(&contents_)); } void destroyContents() noexcept { @@ -851,7 +868,7 @@ struct SingleElementQueue { // g++ doesn't seem to have std::is_nothrow_destructible yet } #ifndef NDEBUG - memset(contents_, 'Q', sizeof(T)); + 
memset(&contents_, 'Q', sizeof(T)); #endif } };
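
Usage note (not part of this diff): the sketch below illustrates two points from the new header comment — the element type must be nothrow-move-constructible (or relocatable), and a pool of consumers can be drained with a single sentinel that each consumer re-enqueues twice. It assumes MPMCQueue's blockingWrite/blockingRead members, which are not shown in this hunk, and the sentinel value and thread/queue sizes are purely illustrative.

// Minimal sketch, assuming folly::MPMCQueue's blockingWrite/blockingRead API.
// int trivially satisfies the noexcept-construction requirement that the new
// static_assert enforces.
#include <folly/MPMCQueue.h>
#include <cstdio>
#include <thread>
#include <vector>

int main() {
  const int kSentinel = -1;            // hypothetical "stop" value
  folly::MPMCQueue<int> queue(1024);   // capacity must be non-zero

  std::vector<std::thread> consumers;
  for (int i = 0; i < 4; ++i) {
    consumers.emplace_back([&] {
      int item;
      while (true) {
        queue.blockingRead(item);
        if (item == kSentinel) {
          // Requeue two sentinels so the remaining consumers shut down in
          // O(log P) rounds instead of the producer writing P sentinels.
          queue.blockingWrite(kSentinel);
          queue.blockingWrite(kSentinel);
          return;
        }
        std::printf("got %d\n", item);
      }
    });
  }

  for (int i = 0; i < 100; ++i) {
    queue.blockingWrite(i);            // nothrow construction of int in-queue
  }
  queue.blockingWrite(kSentinel);      // one sentinel starts the shutdown wave

  for (auto& t : consumers) {
    t.join();
  }
  return 0;
}

After all consumers exit, leftover sentinels simply remain in the queue and are discarded when it is destroyed, which is why the doubling scheme needs no coordination beyond the queue itself.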