X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=cds%2Fintrusive%2Fsegmented_queue.h;h=1a42b33ae9a35026c1c327b765b5642653bfa4d8;hb=40a5453909484139fc270121370575632c8e9e5c;hp=2e41c4e0a464b17ec64812efa1ebcba07311663c;hpb=3d027d83c1f8667ce35681b113ef999f3ea10540;p=libcds.git diff --git a/cds/intrusive/segmented_queue.h b/cds/intrusive/segmented_queue.h index 2e41c4e0..1a42b33a 100644 --- a/cds/intrusive/segmented_queue.h +++ b/cds/intrusive/segmented_queue.h @@ -1,13 +1,41 @@ -//$$CDS-header$$ - -#ifndef __CDS_INTRUSIVE_SEGMENTED_QUEUE_H -#define __CDS_INTRUSIVE_SEGMENTED_QUEUE_H +/* + This file is a part of libcds - Concurrent Data Structures library + + (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016 + + Source code repo: http://github.com/khizmax/libcds/ + Download: http://sourceforge.net/projects/libcds/files/ + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + +#ifndef CDSLIB_INTRUSIVE_SEGMENTED_QUEUE_H +#define CDSLIB_INTRUSIVE_SEGMENTED_QUEUE_H #include #include #include #include -#include +#include #include #include @@ -92,11 +120,20 @@ namespace cds { namespace intrusive { /// Alignment of critical data, default is cache line alignment. See cds::opt::alignment option specification enum { alignment = opt::cache_line_alignment }; + /// Padding of segment data, default is no special padding + /** + The segment is just an array of atomic data pointers, + so, the high load leads to false sharing and performance degradation. + A padding of segment data can eliminate false sharing issue. + On the other hand, the padding leads to increase segment size. + */ + enum { padding = opt::no_special_padding }; + /// Segment allocator. Default is \ref CDS_DEFAULT_ALLOCATOR typedef CDS_DEFAULT_ALLOCATOR allocator; /// Lock type used to maintain an internal list of allocated segments - typedef cds::lock::Spin lock_type; + typedef cds::sync::spin lock_type; /// Random \ref cds::opt::permutation_generator "permutation generator" for sequence [0, quasi_factor) typedef cds::opt::v::random2_permutation permutation_generator; @@ -115,13 +152,15 @@ namespace cds { namespace intrusive { all other \p %segmented_queue::traits members left unchanged. \p Options are: - - \p opt::disposer - the functor used for dispose removed items. + - \p opt::disposer - the functor used to dispose removed items. - \p opt::stat - internal statistics, possible type: \p segmented_queue::stat, \p segmented_queue::empty_stat (the default) - \p opt::item_counter - item counting feature. Note that \p atomicity::empty_item_counetr is not suitable for segmented queue. - \p opt::memory_model - memory model, default is \p opt::v::relaxed_ordering. See option description for the full list of possible models - \p opt::alignment - the alignment for critical data, see option description for explanation + - \p opt::padding - the padding of segment data, default no special padding. + See \p traits::padding for explanation. - \p opt::allocator - the allocator to be used for maintaining segments. - \p opt::lock_type - a mutual exclusion lock type used to maintain internal list of allocated segments. Default is \p cds::opt::Spin, \p std::mutex is also suitable. @@ -176,7 +215,7 @@ namespace cds { namespace intrusive { quasi factor. This means that the consumer dequeues any item from the current first segment. Template parameters: - - \p GC - a garbage collector, possible types are cds::gc::HP, cds::gc::PTB + - \p GC - a garbage collector, possible types are cds::gc::HP, cds::gc::DHP - \p T - the type of values stored in the queue - \p Traits - queue type traits, default is \p segmented_queue::traits. \p segmented_queue::make_traits metafunction can be used to construct the @@ -200,39 +239,40 @@ namespace cds { namespace intrusive { typedef typename traits::lock_type lock_type; ///< Type of mutex for maintaining an internal list of allocated segments. typedef typename traits::permutation_generator permutation_generator; ///< Random permutation generator for sequence [0, quasi-factor) - static const size_t m_nHazardPtrCount = 2 ; ///< Count of hazard pointer required for the algorithm + static const size_t c_nHazardPtrCount = 2 ; ///< Count of hazard pointer required for the algorithm protected: //@cond // Segment cell. LSB is used as deleted mark - typedef cds::details::marked_ptr< value_type, 1 > cell; + typedef cds::details::marked_ptr< value_type, 1 > regular_cell; + typedef atomics::atomic< regular_cell > atomic_cell; + typedef typename cds::opt::details::apply_padding< atomic_cell, traits::padding >::type cell; // Segment struct segment: public boost::intrusive::slist_base_hook<> { - atomics::atomic< cell > * cells; // Cell array of size \ref m_nQuasiFactor - size_t version; // version tag (ABA prevention tag) + cell * cells; // Cell array of size \ref m_nQuasiFactor + size_t version; // version tag (ABA prevention tag) // cell array is placed here in one continuous memory block // Initializes the segment segment( size_t nCellCount ) // MSVC warning C4355: 'this': used in base member initializer list - : cells( reinterpret_cast< atomics::atomic< cell > * >( this + 1 )) + : cells( reinterpret_cast< cell *>( this + 1 )) , version( 0 ) { init( nCellCount ); } + segment() = delete; + void init( size_t nCellCount ) { - atomics::atomic< cell > * pLastCell = cells + nCellCount; - for ( atomics::atomic< cell > * pCell = cells; pCell < pLastCell; ++pCell ) - pCell->store( cell(), atomics::memory_order_relaxed ); + cell * pLastCell = cells + nCellCount; + for ( cell* pCell = cells; pCell < pLastCell; ++pCell ) + pCell->data.store( regular_cell(), atomics::memory_order_relaxed ); atomics::atomic_thread_fence( memory_model::memory_order_release ); } - - private: - segment(); //=delete }; typedef typename opt::details::alignment_setter< atomics::atomic, traits::alignment >::type aligned_segment_ptr; @@ -301,9 +341,9 @@ namespace cds { namespace intrusive { bool populated( segment const& s ) const { // The lock should be held - atomics::atomic< cell > const * pLastCell = s.cells + quasi_factor(); - for ( atomics::atomic< cell > const * pCell = s.cells; pCell < pLastCell; ++pCell ) { - if ( !pCell->load( memory_model::memory_order_relaxed ).all() ) + cell const * pLastCell = s.cells + quasi_factor(); + for ( cell const * pCell = s.cells; pCell < pLastCell; ++pCell ) { + if ( !pCell->data.load( memory_model::memory_order_relaxed ).all() ) return false; } return true; @@ -311,9 +351,9 @@ namespace cds { namespace intrusive { bool exhausted( segment const& s ) const { // The lock should be held - atomics::atomic< cell > const * pLastCell = s.cells + quasi_factor(); - for ( atomics::atomic< cell > const * pCell = s.cells; pCell < pLastCell; ++pCell ) { - if ( !pCell->load( memory_model::memory_order_relaxed ).bits() ) + cell const * pLastCell = s.cells + quasi_factor(); + for ( cell const * pCell = s.cells; pCell < pLastCell; ++pCell ) { + if ( !pCell->data.load( memory_model::memory_order_relaxed ).bits() ) return false; } return true; @@ -334,13 +374,15 @@ namespace cds { namespace intrusive { return guard.assign( &m_List.back() ); } +# ifdef _DEBUG assert( m_List.empty() || populated( m_List.back() )); +# endif segment * pNew = allocate_segment(); m_Stat.onSegmentCreated(); if ( m_List.empty() ) - m_pHead.store( pNew, memory_model::memory_order_relaxed ); + m_pHead.store( pNew, memory_model::memory_order_release ); m_List.push_back( *pNew ); m_pTail.store( pNew, memory_model::memory_order_release ); return guard.assign( pNew ); @@ -366,7 +408,9 @@ namespace cds { namespace intrusive { return guard.assign( &m_List.front() ); } - assert( exhausted(m_List.front()) ); +# ifdef _DEBUG + assert( exhausted( m_List.front())); +# endif m_List.pop_front(); if ( m_List.empty() ) { @@ -399,8 +443,7 @@ namespace cds { namespace intrusive { segment * allocate_segment() { - return segment_allocator().NewBlock( sizeof(segment) + sizeof(cell) * m_nQuasiFactor, - quasi_factor() ); + return segment_allocator().NewBlock( sizeof(segment) + sizeof(cell) * m_nQuasiFactor, quasi_factor() ); } static void free_segment( segment * pSegment ) @@ -467,14 +510,14 @@ namespace cds { namespace intrusive { do { typename permutation_generator::integer_type i = gen; CDS_DEBUG_ONLY( ++nLoopCount ); - if ( pTailSegment->cells[i].load(memory_model::memory_order_relaxed).all() ) { + if ( pTailSegment->cells[i].data.load(memory_model::memory_order_relaxed).all() ) { // Cell is not empty, go next m_Stat.onPushPopulated(); } else { // Empty cell found, try to enqueue here - cell nullCell; - if ( pTailSegment->cells[i].compare_exchange_strong( nullCell, cell( &val ), + regular_cell nullCell; + if ( pTailSegment->cells[i].data.compare_exchange_strong( nullCell, regular_cell( &val ), memory_model::memory_order_release, atomics::memory_order_relaxed )) { // Ok to push item @@ -560,7 +603,7 @@ namespace cds { namespace intrusive { /// Clear the queue /** - The function repeatedly calls \ref dequeue until it returns \p nullptr. + The function repeatedly calls \p dequeue() until it returns \p nullptr. The disposer specified in \p Traits template argument is called for each removed item. */ void clear() @@ -621,7 +664,7 @@ namespace cds { namespace intrusive { } bool bHadNullValue = false; - cell item; + regular_cell item; CDS_DEBUG_ONLY( size_t nLoopCount = 0 ); do { typename permutation_generator::integer_type i = gen; @@ -630,7 +673,7 @@ namespace cds { namespace intrusive { // Guard the item // In segmented queue the cell cannot be reused // So no loop is needed here to protect the cell - item = pHeadSegment->cells[i].load( memory_model::memory_order_relaxed ); + item = pHeadSegment->cells[i].data.load( memory_model::memory_order_relaxed ); itemGuard.assign( item.ptr() ); // Check if this cell is empty, which means an element @@ -641,7 +684,7 @@ namespace cds { namespace intrusive { // If the item is not deleted yet if ( !item.bits() ) { // Try to mark the cell as deleted - if ( pHeadSegment->cells[i].compare_exchange_strong( item, item | 1, + if ( pHeadSegment->cells[i].data.compare_exchange_strong( item, item | 1, memory_model::memory_order_acquire, atomics::memory_order_relaxed )) { --m_ItemCounter; @@ -679,4 +722,4 @@ namespace cds { namespace intrusive { # pragma warning( pop ) #endif -#endif // #ifndef __CDS_INTRUSIVE_SEGMENTED_QUEUE_H +#endif // #ifndef CDSLIB_INTRUSIVE_SEGMENTED_QUEUE_H