add support for whenAll to waitWithSemaphore
[folly.git] / folly / MPMCQueue.h
index defbdfe0007269fc39625094411e44e495182e43..34ed779ac079dcf28cf41cb99736557e97e79347 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2013 Facebook, Inc.
+ * Copyright 2014 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 #include <linux/futex.h>
 #include <string.h>
 #include <sys/syscall.h>
+#include <type_traits>
 #include <unistd.h>
 
 #include <folly/Traits.h>
+#include <folly/detail/CacheLocality.h>
 #include <folly/detail/Futex.h>
 
 namespace folly {
@@ -80,11 +82,13 @@ template <typename T> class MPMCPipelineStageImpl;
 /// FOLLY_ASSUME_FBVECTOR_COMPATIBLE then your type can be put in
 /// MPMCQueue.
 template<typename T,
-         template<typename> class Atom = std::atomic,
-         typename = typename std::enable_if<
-             std::is_nothrow_constructible<T,T&&>::value ||
-             folly::IsRelocatable<T>::value>::type>
+         template<typename> class Atom = std::atomic>
 class MPMCQueue : boost::noncopyable {
+
+  static_assert(std::is_nothrow_constructible<T,T&&>::value ||
+                folly::IsRelocatable<T>::value,
+      "T must be relocatable or have a noexcept move constructor");
+
   friend class detail::MPMCPipelineStageImpl<T>;
  public:
   typedef T value_type;
@@ -100,7 +104,11 @@ class MPMCQueue : boost::noncopyable {
     , popSpinCutoff_(0)
   {
     // ideally this would be a static assert, but g++ doesn't allow it
-    assert(alignof(MPMCQueue<T,Atom>) >= kFalseSharingRange);
+    assert(alignof(MPMCQueue<T,Atom>)
+           >= detail::CacheLocality::kFalseSharingRange);
+    assert(static_cast<uint8_t*>(static_cast<void*>(&popTicket_))
+           - static_cast<uint8_t*>(static_cast<void*>(&pushTicket_))
+           >= detail::CacheLocality::kFalseSharingRange);
   }
 
   /// A default-constructed queue is useful because a usable (non-zero
@@ -324,27 +332,15 @@ class MPMCQueue : boost::noncopyable {
     /// the proper spin backoff
     kAdaptationFreq = 128,
 
-    /// Memory locations on the same cache line are subject to false
-    /// sharing, which is very bad for performance
-    kFalseSharingRange = 64,
-
     /// To avoid false sharing in slots_ with neighboring memory
     /// allocations, we pad it with this many SingleElementQueue-s at
     /// each end
-    kSlotPadding = 1 +
-        (kFalseSharingRange - 1) / sizeof(detail::SingleElementQueue<T,Atom>)
+    kSlotPadding = (detail::CacheLocality::kFalseSharingRange - 1)
+        / sizeof(detail::SingleElementQueue<T,Atom>) + 1
   };
 
-  static_assert(kFalseSharingRange == 64,
-                "FOLLY_ON_NEXT_CACHE_LINE must track kFalseSharingRange");
-
-// This literal "64' should be kFalseSharingRange,
-// but gcc-4.8.0 and 4.8.1 reject it.
-// FIXME: s/64/kFalseSharingRange/ if that ever changes.
-#define FOLLY_ON_NEXT_CACHE_LINE __attribute__((aligned(64)))
-
   /// The maximum number of items in the queue at once
-  size_t capacity_ FOLLY_ON_NEXT_CACHE_LINE;
+  size_t FOLLY_ALIGN_TO_AVOID_FALSE_SHARING capacity_;
 
   /// An array of capacity_ SingleElementQueue-s, each of which holds
   /// either 0 or 1 item.  We over-allocate by 2 * kSlotPadding and don't
@@ -357,24 +353,23 @@ class MPMCQueue : boost::noncopyable {
   int stride_;
 
   /// Enqueuers get tickets from here
-  Atom<uint64_t> pushTicket_ FOLLY_ON_NEXT_CACHE_LINE;
+  Atom<uint64_t> FOLLY_ALIGN_TO_AVOID_FALSE_SHARING pushTicket_;
 
   /// Dequeuers get tickets from here
-  Atom<uint64_t> popTicket_ FOLLY_ON_NEXT_CACHE_LINE;
+  Atom<uint64_t> FOLLY_ALIGN_TO_AVOID_FALSE_SHARING popTicket_;
 
   /// This is how many times we will spin before using FUTEX_WAIT when
   /// the queue is full on enqueue, adaptively computed by occasionally
   /// spinning for longer and smoothing with an exponential moving average
-  Atom<int> pushSpinCutoff_ FOLLY_ON_NEXT_CACHE_LINE;
+  Atom<int> FOLLY_ALIGN_TO_AVOID_FALSE_SHARING pushSpinCutoff_;
 
   /// The adaptive spin cutoff when the queue is empty on dequeue
-  Atom<int> popSpinCutoff_ FOLLY_ON_NEXT_CACHE_LINE;
+  Atom<int> FOLLY_ALIGN_TO_AVOID_FALSE_SHARING popSpinCutoff_;
 
-  /// Alignment doesn't avoid false sharing at the end of the struct,
+  /// Alignment doesn't prevent false sharing at the end of the struct,
   /// so fill out the last cache line
-  char padding_[kFalseSharingRange - sizeof(Atom<int>)];
+  char padding_[detail::CacheLocality::kFalseSharingRange - sizeof(Atom<int>)];
 
-#undef FOLLY_ON_NEXT_CACHE_LINE
 
   /// We assign tickets in increasing order, but we don't want to
   /// access neighboring elements of slots_ because that will lead to
@@ -771,7 +766,7 @@ struct SingleElementQueue {
                const bool updateSpinCutoff,
                Args&&... args) noexcept {
     sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
-    new (contents_) T(std::forward<Args>(args)...);
+    new (&contents_) T(std::forward<Args>(args)...);
     sequencer_.completeTurn(turn * 2);
   }
 
@@ -789,13 +784,13 @@ struct SingleElementQueue {
     if (std::is_nothrow_constructible<T,T&&>::value) {
       // this is preferred
       sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
-      new (contents_) T(std::move(goner));
+      new (&contents_) T(std::move(goner));
       sequencer_.completeTurn(turn * 2);
     } else {
       // simulate nothrow move with relocation, followed by default
       // construction to fill the gap we created
       sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
-      memcpy(contents_, &goner, sizeof(T));
+      memcpy(&contents_, &goner, sizeof(T));
       sequencer_.completeTurn(turn * 2);
       new (&goner) T();
     }
@@ -818,7 +813,7 @@ struct SingleElementQueue {
         // unlikely, but if we don't complete our turn the queue will die
       }
       sequencer_.waitForTurn(turn * 2 + 1, spinCutoff, updateSpinCutoff);
-      memcpy(&elem, contents_, sizeof(T));
+      memcpy(&elem, &contents_, sizeof(T));
       sequencer_.completeTurn(turn * 2 + 1);
     } else {
       // use nothrow move assignment
@@ -835,13 +830,13 @@ struct SingleElementQueue {
 
  private:
   /// Storage for a T constructed with placement new
-  char contents_[sizeof(T)] __attribute__((aligned(alignof(T))));
+  typename std::aligned_storage<sizeof(T),alignof(T)>::type contents_;
 
   /// Even turns are pushes, odd turns are pops
   TurnSequencer<Atom> sequencer_;
 
   T* ptr() noexcept {
-    return static_cast<T*>(static_cast<void*>(contents_));
+    return static_cast<T*>(static_cast<void*>(&contents_));
   }
 
   void destroyContents() noexcept {
@@ -851,7 +846,7 @@ struct SingleElementQueue {
       // g++ doesn't seem to have std::is_nothrow_destructible yet
     }
 #ifndef NDEBUG
-    memset(contents_, 'Q', sizeof(T));
+    memset(&contents_, 'Q', sizeof(T));
 #endif
   }
 };