/*
- * Copyright 2013 Facebook, Inc.
+ * Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#include <linux/futex.h>
#include <string.h>
#include <sys/syscall.h>
+#include <type_traits>
#include <unistd.h>
#include <folly/Traits.h>
+#include <folly/detail/CacheLocality.h>
#include <folly/detail/Futex.h>
namespace folly {
/// FOLLY_ASSUME_FBVECTOR_COMPATIBLE then your type can be put in
/// MPMCQueue.
template<typename T,
- template<typename> class Atom = std::atomic,
- typename = typename std::enable_if<
- std::is_nothrow_constructible<T,T&&>::value ||
- folly::IsRelocatable<T>::value>::type>
+ template<typename> class Atom = std::atomic>
class MPMCQueue : boost::noncopyable {
+
+ static_assert(std::is_nothrow_constructible<T,T&&>::value ||
+ folly::IsRelocatable<T>::value,
+ "T must be relocatable or have a noexcept move constructor");
+
friend class detail::MPMCPipelineStageImpl<T>;
public:
typedef T value_type;
- explicit MPMCQueue(size_t capacity)
- : capacity_(capacity)
- , slots_(new detail::SingleElementQueue<T,Atom>[capacity +
+ explicit MPMCQueue(size_t queueCapacity)
+ : capacity_(queueCapacity)
+ , slots_(new detail::SingleElementQueue<T,Atom>[queueCapacity +
2 * kSlotPadding])
- , stride_(computeStride(capacity))
+ , stride_(computeStride(queueCapacity))
, pushTicket_(0)
, popTicket_(0)
, pushSpinCutoff_(0)
, popSpinCutoff_(0)
{
// ideally this would be a static assert, but g++ doesn't allow it
- assert(alignof(MPMCQueue<T,Atom>) >= kFalseSharingRange);
+ assert(alignof(MPMCQueue<T,Atom>)
+ >= detail::CacheLocality::kFalseSharingRange);
+ assert(static_cast<uint8_t*>(static_cast<void*>(&popTicket_))
+ - static_cast<uint8_t*>(static_cast<void*>(&pushTicket_))
+ >= detail::CacheLocality::kFalseSharingRange);
}
/// A default-constructed queue is useful because a usable (non-zero
/// the proper spin backoff
kAdaptationFreq = 128,
- /// Memory locations on the same cache line are subject to false
- /// sharing, which is very bad for performance
- kFalseSharingRange = 64,
-
/// To avoid false sharing in slots_ with neighboring memory
/// allocations, we pad it with this many SingleElementQueue-s at
/// each end
- kSlotPadding = 1 +
- (kFalseSharingRange - 1) / sizeof(detail::SingleElementQueue<T,Atom>)
+ kSlotPadding = (detail::CacheLocality::kFalseSharingRange - 1)
+ / sizeof(detail::SingleElementQueue<T,Atom>) + 1
};
- static_assert(kFalseSharingRange == 64,
- "FOLLY_ON_NEXT_CACHE_LINE must track kFalseSharingRange");
-
-// This literal "64' should be kFalseSharingRange,
-// but gcc-4.8.0 and 4.8.1 reject it.
-// FIXME: s/64/kFalseSharingRange/ if that ever changes.
-#define FOLLY_ON_NEXT_CACHE_LINE __attribute__((aligned(64)))
-
/// The maximum number of items in the queue at once
- size_t capacity_ FOLLY_ON_NEXT_CACHE_LINE;
+ size_t FOLLY_ALIGN_TO_AVOID_FALSE_SHARING capacity_;
/// An array of capacity_ SingleElementQueue-s, each of which holds
/// either 0 or 1 item. We over-allocate by 2 * kSlotPadding and don't
int stride_;
/// Enqueuers get tickets from here
- Atom<uint64_t> pushTicket_ FOLLY_ON_NEXT_CACHE_LINE;
+ Atom<uint64_t> FOLLY_ALIGN_TO_AVOID_FALSE_SHARING pushTicket_;
/// Dequeuers get tickets from here
- Atom<uint64_t> popTicket_ FOLLY_ON_NEXT_CACHE_LINE;
+ Atom<uint64_t> FOLLY_ALIGN_TO_AVOID_FALSE_SHARING popTicket_;
/// This is how many times we will spin before using FUTEX_WAIT when
/// the queue is full on enqueue, adaptively computed by occasionally
/// spinning for longer and smoothing with an exponential moving average
- Atom<int> pushSpinCutoff_ FOLLY_ON_NEXT_CACHE_LINE;
+ Atom<int> FOLLY_ALIGN_TO_AVOID_FALSE_SHARING pushSpinCutoff_;
/// The adaptive spin cutoff when the queue is empty on dequeue
- Atom<int> popSpinCutoff_ FOLLY_ON_NEXT_CACHE_LINE;
+ Atom<int> FOLLY_ALIGN_TO_AVOID_FALSE_SHARING popSpinCutoff_;
- /// Alignment doesn't avoid false sharing at the end of the struct,
+ /// Alignment doesn't prevent false sharing at the end of the struct,
/// so fill out the last cache line
- char padding_[kFalseSharingRange - sizeof(Atom<int>)];
+ char padding_[detail::CacheLocality::kFalseSharingRange - sizeof(Atom<int>)];
-#undef FOLLY_ON_NEXT_CACHE_LINE
/// We assign tickets in increasing order, but we don't want to
/// access neighboring elements of slots_ because that will lead to
const bool updateSpinCutoff,
Args&&... args) noexcept {
sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
- new (contents_) T(std::forward<Args>(args)...);
+ new (&contents_) T(std::forward<Args>(args)...);
sequencer_.completeTurn(turn * 2);
}
if (std::is_nothrow_constructible<T,T&&>::value) {
// this is preferred
sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
- new (contents_) T(std::move(goner));
+ new (&contents_) T(std::move(goner));
sequencer_.completeTurn(turn * 2);
} else {
// simulate nothrow move with relocation, followed by default
// construction to fill the gap we created
sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
- memcpy(contents_, &goner, sizeof(T));
+ memcpy(&contents_, &goner, sizeof(T));
sequencer_.completeTurn(turn * 2);
new (&goner) T();
}
// unlikely, but if we don't complete our turn the queue will die
}
sequencer_.waitForTurn(turn * 2 + 1, spinCutoff, updateSpinCutoff);
- memcpy(&elem, contents_, sizeof(T));
+ memcpy(&elem, &contents_, sizeof(T));
sequencer_.completeTurn(turn * 2 + 1);
} else {
// use nothrow move assignment
private:
/// Storage for a T constructed with placement new
- char contents_[sizeof(T)] __attribute__((aligned(alignof(T))));
+ typename std::aligned_storage<sizeof(T),alignof(T)>::type contents_;
/// Even turns are pushes, odd turns are pops
TurnSequencer<Atom> sequencer_;
T* ptr() noexcept {
- return static_cast<T*>(static_cast<void*>(contents_));
+ return static_cast<T*>(static_cast<void*>(&contents_));
}
void destroyContents() noexcept {
// g++ doesn't seem to have std::is_nothrow_destructible yet
}
#ifndef NDEBUG
- memset(contents_, 'Q', sizeof(T));
+ memset(&contents_, 'Q', sizeof(T));
#endif
}
};