Add padding option to SegmentedQueue to eliminate false sharing
[libcds.git] / cds / intrusive / segmented_queue.h
index 8ba29ca29c00cc6ce9cdea066b6e3c37b45723f4..3b71512772f3c6dffbfd05e31bdeab4907c08a24 100644 (file)
@@ -92,6 +92,15 @@ namespace cds { namespace intrusive {
             /// Alignment of critical data, default is cache line alignment. See cds::opt::alignment option specification
             enum { alignment = opt::cache_line_alignment };
 
+            /// Padding of segment data, default is no special padding
+            /**
+                The segment is just an array of atomic data pointers,
+                so, the high load leads to false sharing and performance degradation.
+                A padding of segment data can eliminate false sharing issue.
+                On the other hand, the padding leads to increase segment size.
+            */
+            enum { padding = opt::no_special_padding };
+
             /// Segment allocator. Default is \ref CDS_DEFAULT_ALLOCATOR
             typedef CDS_DEFAULT_ALLOCATOR allocator;
 
@@ -122,6 +131,8 @@ namespace cds { namespace intrusive {
             - \p opt::memory_model - memory model, default is \p opt::v::relaxed_ordering.
                 See option description for the full list of possible models
             - \p opt::alignment - the alignment for critical data, see option description for explanation
+            - \p opt::padding - the padding of segment data, default no special padding.
+                See \p traits::padding for explanation.
             - \p opt::allocator - the allocator to be used for maintaining segments.
             - \p opt::lock_type - a mutual exclusion lock type used to maintain internal list of allocated
                 segments. Default is \p cds::opt::Spin, \p std::mutex is also suitable.
@@ -205,19 +216,21 @@ namespace cds { namespace intrusive {
     protected:
         //@cond
         // Segment cell. LSB is used as deleted mark
-        typedef cds::details::marked_ptr< value_type, 1 >   cell;
+        typedef cds::details::marked_ptr< value_type, 1 > regular_cell;
+        typedef atomics::atomic< regular_cell > atomic_cell;
+        typedef typename cds::opt::details::apply_padding< atomic_cell, traits::padding >::type cell;
 
         // Segment
         struct segment: public boost::intrusive::slist_base_hook<>
         {
-            atomics::atomic< cell > *    cells;  // Cell array of size \ref m_nQuasiFactor
-            size_t   version;   // version tag (ABA prevention tag)
+            cell * cells;    // Cell array of size \ref m_nQuasiFactor
+            size_t version;  // version tag (ABA prevention tag)
             // cell array is placed here in one continuous memory block
 
             // Initializes the segment
             segment( size_t nCellCount )
                 // MSVC warning C4355: 'this': used in base member initializer list
-                : cells( reinterpret_cast< atomics::atomic< cell > * >( this + 1 ))
+                : cells( reinterpret_cast< cell *>( this + 1 ))
                 , version( 0 )
             {
                 init( nCellCount );
@@ -227,9 +240,9 @@ namespace cds { namespace intrusive {
 
             void init( size_t nCellCount )
             {
-                atomics::atomic< cell > * pLastCell = cells + nCellCount;
-                for ( atomics::atomic< cell > * pCell = cells; pCell < pLastCell; ++pCell )
-                    pCell->store( cell(), atomics::memory_order_relaxed );
+                cell * pLastCell = cells + nCellCount;
+                for ( cell* pCell = cells; pCell < pLastCell; ++pCell )
+                    pCell->data.store( regular_cell(), atomics::memory_order_relaxed );
                 atomics::atomic_thread_fence( memory_model::memory_order_release );
             }
         };
@@ -300,9 +313,9 @@ namespace cds { namespace intrusive {
             bool populated( segment const& s ) const
             {
                 // The lock should be held
-                atomics::atomic< cell > const * pLastCell = s.cells + quasi_factor();
-                for ( atomics::atomic< cell > const * pCell = s.cells; pCell < pLastCell; ++pCell ) {
-                    if ( !pCell->load( memory_model::memory_order_relaxed ).all() )
+                cell const * pLastCell = s.cells + quasi_factor();
+                for ( cell const * pCell = s.cells; pCell < pLastCell; ++pCell ) {
+                    if ( !pCell->data.load( memory_model::memory_order_relaxed ).all() )
                         return false;
                 }
                 return true;
@@ -310,9 +323,9 @@ namespace cds { namespace intrusive {
             bool exhausted( segment const& s ) const
             {
                 // The lock should be held
-                atomics::atomic< cell > const * pLastCell = s.cells + quasi_factor();
-                for ( atomics::atomic< cell > const * pCell = s.cells; pCell < pLastCell; ++pCell ) {
-                    if ( !pCell->load( memory_model::memory_order_relaxed ).bits() )
+                cell const * pLastCell = s.cells + quasi_factor();
+                for ( cell const * pCell = s.cells; pCell < pLastCell; ++pCell ) {
+                    if ( !pCell->data.load( memory_model::memory_order_relaxed ).bits() )
                         return false;
                 }
                 return true;
@@ -466,14 +479,14 @@ namespace cds { namespace intrusive {
                 do {
                     typename permutation_generator::integer_type i = gen;
                     CDS_DEBUG_ONLY( ++nLoopCount );
-                    if ( pTailSegment->cells[i].load(memory_model::memory_order_relaxed).all() ) {
+                    if ( pTailSegment->cells[i].data.load(memory_model::memory_order_relaxed).all() ) {
                         // Cell is not empty, go next
                         m_Stat.onPushPopulated();
                     }
                     else {
                         // Empty cell found, try to enqueue here
-                        cell nullCell;
-                        if ( pTailSegment->cells[i].compare_exchange_strong( nullCell, cell( &val ),
+                        regular_cell nullCell;
+                        if ( pTailSegment->cells[i].data.compare_exchange_strong( nullCell, regular_cell( &val ),
                             memory_model::memory_order_release, atomics::memory_order_relaxed ))
                         {
                             // Ok to push item
@@ -620,7 +633,7 @@ namespace cds { namespace intrusive {
                 }
 
                 bool bHadNullValue = false;
-                cell item;
+                regular_cell item;
                 CDS_DEBUG_ONLY( size_t nLoopCount = 0 );
                 do {
                     typename permutation_generator::integer_type i = gen;
@@ -629,7 +642,7 @@ namespace cds { namespace intrusive {
                     // Guard the item
                     // In segmented queue the cell cannot be reused
                     // So no loop is needed here to protect the cell
-                    item = pHeadSegment->cells[i].load( memory_model::memory_order_relaxed );
+                    item = pHeadSegment->cells[i].data.load( memory_model::memory_order_relaxed );
                     itemGuard.assign( item.ptr() );
 
                     // Check if this cell is empty, which means an element
@@ -640,7 +653,7 @@ namespace cds { namespace intrusive {
                         // If the item is not deleted yet
                         if ( !item.bits() ) {
                             // Try to mark the cell as deleted
-                            if ( pHeadSegment->cells[i].compare_exchange_strong( item, item | 1,
+                            if ( pHeadSegment->cells[i].data.compare_exchange_strong( item, item | 1,
                                 memory_model::memory_order_acquire, atomics::memory_order_relaxed ))
                             {
                                 --m_ItemCounter;