X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2FMPMCQueue.h;h=d1e3b32d23f1e4ed72e6399d471f0986177f7916;hb=d9b08663d81210afcdba85169e1697a00aeda017;hp=fef1730e3bf1ffddf4a91bc9a0edb7e94f06ab7b;hpb=21effa23799fe52594ef73baffd3f2c8a6e8b43f;p=folly.git diff --git a/folly/MPMCQueue.h b/folly/MPMCQueue.h index fef1730e..d1e3b32d 100644 --- a/folly/MPMCQueue.h +++ b/folly/MPMCQueue.h @@ -1,5 +1,5 @@ /* - * Copyright 2013 Facebook, Inc. + * Copyright 2014 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,14 +20,13 @@ #include #include #include -#include #include -#include #include -#include +#include #include #include +#include #include namespace folly { @@ -80,27 +79,33 @@ template class MPMCPipelineStageImpl; /// FOLLY_ASSUME_FBVECTOR_COMPATIBLE then your type can be put in /// MPMCQueue. template class Atom = std::atomic, - typename = typename std::enable_if< - std::is_nothrow_constructible::value || - folly::IsRelocatable::value>::type> + template class Atom = std::atomic> class MPMCQueue : boost::noncopyable { + + static_assert(std::is_nothrow_constructible::value || + folly::IsRelocatable::value, + "T must be relocatable or have a noexcept move constructor"); + friend class detail::MPMCPipelineStageImpl; public: typedef T value_type; - explicit MPMCQueue(size_t capacity) - : capacity_(capacity) - , slots_(new detail::SingleElementQueue[capacity + + explicit MPMCQueue(size_t queueCapacity) + : capacity_(queueCapacity) + , slots_(new detail::SingleElementQueue[queueCapacity + 2 * kSlotPadding]) - , stride_(computeStride(capacity)) + , stride_(computeStride(queueCapacity)) , pushTicket_(0) , popTicket_(0) , pushSpinCutoff_(0) , popSpinCutoff_(0) { // ideally this would be a static assert, but g++ doesn't allow it - assert(alignof(MPMCQueue) >= kFalseSharingRange); + assert(alignof(MPMCQueue) + >= detail::CacheLocality::kFalseSharingRange); + assert(static_cast(static_cast(&popTicket_)) + - static_cast(static_cast(&pushTicket_)) + >= detail::CacheLocality::kFalseSharingRange); } /// A default-constructed queue is useful because a usable (non-zero @@ -324,27 +329,15 @@ class MPMCQueue : boost::noncopyable { /// the proper spin backoff kAdaptationFreq = 128, - /// Memory locations on the same cache line are subject to false - /// sharing, which is very bad for performance - kFalseSharingRange = 64, - /// To avoid false sharing in slots_ with neighboring memory /// allocations, we pad it with this many SingleElementQueue-s at /// each end - kSlotPadding = 1 + - (kFalseSharingRange - 1) / sizeof(detail::SingleElementQueue) + kSlotPadding = (detail::CacheLocality::kFalseSharingRange - 1) + / sizeof(detail::SingleElementQueue) + 1 }; - static_assert(kFalseSharingRange == 64, - "FOLLY_ON_NEXT_CACHE_LINE must track kFalseSharingRange"); - -// This literal "64' should be kFalseSharingRange, -// but gcc-4.8.0 and 4.8.1 reject it. -// FIXME: s/64/kFalseSharingRange/ if that ever changes. -#define FOLLY_ON_NEXT_CACHE_LINE __attribute__((aligned(64))) - /// The maximum number of items in the queue at once - size_t capacity_ FOLLY_ON_NEXT_CACHE_LINE; + size_t FOLLY_ALIGN_TO_AVOID_FALSE_SHARING capacity_; /// An array of capacity_ SingleElementQueue-s, each of which holds /// either 0 or 1 item. We over-allocate by 2 * kSlotPadding and don't @@ -357,24 +350,23 @@ class MPMCQueue : boost::noncopyable { int stride_; /// Enqueuers get tickets from here - Atom pushTicket_ FOLLY_ON_NEXT_CACHE_LINE; + Atom FOLLY_ALIGN_TO_AVOID_FALSE_SHARING pushTicket_; /// Dequeuers get tickets from here - Atom popTicket_ FOLLY_ON_NEXT_CACHE_LINE; + Atom FOLLY_ALIGN_TO_AVOID_FALSE_SHARING popTicket_; /// This is how many times we will spin before using FUTEX_WAIT when /// the queue is full on enqueue, adaptively computed by occasionally /// spinning for longer and smoothing with an exponential moving average - Atom pushSpinCutoff_ FOLLY_ON_NEXT_CACHE_LINE; + Atom FOLLY_ALIGN_TO_AVOID_FALSE_SHARING pushSpinCutoff_; /// The adaptive spin cutoff when the queue is empty on dequeue - Atom popSpinCutoff_ FOLLY_ON_NEXT_CACHE_LINE; + Atom FOLLY_ALIGN_TO_AVOID_FALSE_SHARING popSpinCutoff_; - /// Alignment doesn't avoid false sharing at the end of the struct, + /// Alignment doesn't prevent false sharing at the end of the struct, /// so fill out the last cache line - char padding_[kFalseSharingRange - sizeof(Atom)]; + char padding_[detail::CacheLocality::kFalseSharingRange - sizeof(Atom)]; -#undef FOLLY_ON_NEXT_CACHE_LINE /// We assign tickets in increasing order, but we don't want to /// access neighboring elements of slots_ because that will lead to @@ -666,7 +658,7 @@ struct TurnSequencer { if (prevThresh == 0) { // bootstrap - spinCutoff = target; + spinCutoff.store(target); } else { // try once, keep moving if CAS fails. Exponential moving average // with alpha of 7/8 @@ -771,7 +763,7 @@ struct SingleElementQueue { const bool updateSpinCutoff, Args&&... args) noexcept { sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff); - new (contents_) T(std::forward(args)...); + new (&contents_) T(std::forward(args)...); sequencer_.completeTurn(turn * 2); } @@ -789,13 +781,13 @@ struct SingleElementQueue { if (std::is_nothrow_constructible::value) { // this is preferred sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff); - new (contents_) T(std::move(goner)); + new (&contents_) T(std::move(goner)); sequencer_.completeTurn(turn * 2); } else { // simulate nothrow move with relocation, followed by default // construction to fill the gap we created sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff); - memcpy(contents_, &goner, sizeof(T)); + memcpy(&contents_, &goner, sizeof(T)); sequencer_.completeTurn(turn * 2); new (&goner) T(); } @@ -818,7 +810,7 @@ struct SingleElementQueue { // unlikely, but if we don't complete our turn the queue will die } sequencer_.waitForTurn(turn * 2 + 1, spinCutoff, updateSpinCutoff); - memcpy(&elem, contents_, sizeof(T)); + memcpy(&elem, &contents_, sizeof(T)); sequencer_.completeTurn(turn * 2 + 1); } else { // use nothrow move assignment @@ -835,13 +827,13 @@ struct SingleElementQueue { private: /// Storage for a T constructed with placement new - char contents_[sizeof(T)] __attribute__((aligned(alignof(T)))); + typename std::aligned_storage::type contents_; /// Even turns are pushes, odd turns are pops TurnSequencer sequencer_; T* ptr() noexcept { - return static_cast(static_cast(contents_)); + return static_cast(static_cast(&contents_)); } void destroyContents() noexcept { @@ -851,7 +843,7 @@ struct SingleElementQueue { // g++ doesn't seem to have std::is_nothrow_destructible yet } #ifndef NDEBUG - memset(contents_, 'Q', sizeof(T)); + memset(&contents_, 'Q', sizeof(T)); #endif } };