X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=cds%2Fintrusive%2Fbasket_queue.h;h=890d33f50111591555056b2024ea617a24f2081d;hb=b07c16add99c6681f46a7e49f40088a85306e265;hp=4c26872b8bc924912e42a02bc107a26fedf5eae9;hpb=3caf0699346f9cb714809664697aa29efc7eb429;p=libcds.git diff --git a/cds/intrusive/basket_queue.h b/cds/intrusive/basket_queue.h index 4c26872b..890d33f5 100644 --- a/cds/intrusive/basket_queue.h +++ b/cds/intrusive/basket_queue.h @@ -1,4 +1,32 @@ -//$$CDS-header$$ +/* + This file is a part of libcds - Concurrent Data Structures library + + (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017 + + Source code repo: http://github.com/khizmax/libcds/ + Download: http://sourceforge.net/projects/libcds/files/ + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ #ifndef CDSLIB_INTRUSIVE_BASKET_QUEUE_H #define CDSLIB_INTRUSIVE_BASKET_QUEUE_H @@ -228,28 +256,27 @@ namespace cds { namespace intrusive { /// Link checking, see \p cds::opt::link_checker static CDS_CONSTEXPR const opt::link_check_type link_checker = opt::debug_check_link; - /// Alignment for internal queue data. Default is \p opt::cache_line_alignment - enum { alignment = opt::cache_line_alignment }; + /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding + enum { padding = opt::cache_line_padding }; }; /// Metafunction converting option list to \p basket_queue::traits /** Supported \p Options are: - - - opt::hook - hook used. Possible hooks are: \p basket_queue::base_hook, \p basket_queue::member_hook, \p basket_queue::traits_hook. + - \p opt::hook - hook used. Possible hooks are: \p basket_queue::base_hook, \p basket_queue::member_hook, \p basket_queue::traits_hook. If the option is not specified, \p %basket_queue::base_hook<> is used. - - opt::back_off - back-off strategy used, default is \p cds::backoff::empty. - - opt::disposer - the functor used for dispose removed items. Default is \p opt::v::empty_disposer. This option is used + - \p opt::back_off - back-off strategy used, default is \p cds::backoff::empty. + - \p opt::disposer - the functor used for dispose removed items. Default is \p opt::v::empty_disposer. This option is used when dequeuing. - - opt::link_checker - the type of node's link fields checking. Default is \p opt::debug_check_link - - opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled) + - \p opt::link_checker - the type of node's link fields checking. Default is \p opt::debug_check_link + - \p opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled) To enable item counting use \p cds::atomicity::item_counter - - opt::stat - the type to gather internal statistics. + - \p opt::stat - the type to gather internal statistics. Possible statistics types are: \p basket_queue::stat, \p basket_queue::empty_stat, user-provided class that supports \p %basket_queue::stat interface. Default is \p %basket_queue::empty_stat (internal statistics disabled). - - opt::alignment - the alignment for internal queue data. Default is \p opt::cache_line_alignment - - opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default) + - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding + - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default) or \p opt::v::sequential_consistent (sequentially consisnent memory model). Example: declare \p %BasketQueue with item counting and internal statistics @@ -285,14 +312,14 @@ namespace cds { namespace intrusive { Key idea - In the “basket” approach, instead of + In the 'basket' approach, instead of the traditional ordered list of nodes, the queue consists of an ordered list of groups of nodes (logical baskets). The order of nodes in each basket need not be specified, and in fact, it is easiest to maintain them in FIFO order. The baskets fulfill the following basic rules: - - Each basket has a time interval in which all its nodes’ enqueue operations overlap. + - Each basket has a time interval in which all its nodes' enqueue operations overlap. - The baskets are ordered by the order of their respective time intervals. - - For each basket, its nodes’ dequeue operations occur after its time interval. + - For each basket, its nodes' dequeue operations occur after its time interval. - The dequeue operations are performed according to the order of baskets. Two properties define the FIFO order of nodes: @@ -301,7 +328,7 @@ namespace cds { namespace intrusive { In algorithms such as the MS-queue or optimistic queue, threads enqueue items by applying a Compare-and-swap (CAS) operation to the - queue’s tail pointer, and all the threads that fail on a particular CAS operation (and also + queue's tail pointer, and all the threads that fail on a particular CAS operation (and also the winner of that CAS) overlap in time. In particular, they share the time interval of the CAS operation itself. Hence, all the threads that fail to CAS on the tail-node of the queue may be inserted into the same basket. By integrating the basket-mechanism @@ -309,13 +336,13 @@ namespace cds { namespace intrusive { onto the new tail, can now be utilized to insert the failed operations into the basket, allowing enqueues to complete sooner. In the meantime, the next successful CAS operations by enqueues allow new baskets to be formed down the list, and these can be - filled concurrently. Moreover, the failed operations don’t retry their link attempt on the + filled concurrently. Moreover, the failed operations don't retry their link attempt on the new tail, lowering the overall contention on it. This leads to a queue algorithm that unlike all former concurrent queue algorithms requires virtually no tuning of the backoff mechanisms to reduce contention, making the algorithm an attractive out-of-the-box queue. - In order to enqueue, just as in MSQueue, a thread first tries to link the new node to + In order to enqueue, just as in \p MSQueue, a thread first tries to link the new node to the last node. If it failed to do so, then another thread has already succeeded. Thus it tries to insert the new node into the new basket that was created by the winner thread. To dequeue a node, a thread first reads the head of the queue to obtain the @@ -379,7 +406,7 @@ namespace cds { namespace intrusive { // BasketQueue with Hazard Pointer garbage collector, // member hook + item disposer + item counter, - // without alignment of internal queue data: + // without padding of internal queue data: struct Bar { // Your data @@ -397,7 +424,7 @@ namespace cds { namespace intrusive { > ,ci::opt::disposer< fooDisposer > ,cds::opt::item_counter< cds::atomicity::item_counter > - ,cds::opt::alignment< cds::opt::no_special_alignment > + ,cds::opt::padding< cds::opt::no_special_padding > >::type {}; typedef ci::BasketQueue< hp_gc, Bar, barTraits > barQueue; @@ -427,24 +454,29 @@ namespace cds { namespace intrusive { typedef BasketQueue< GC2, T2, Traits2> other ; ///< Rebinding result }; - static CDS_CONSTEXPR const size_t m_nHazardPtrCount = 6 ; ///< Count of hazard pointer required for the algorithm + static CDS_CONSTEXPR const size_t c_nHazardPtrCount = 6 ; ///< Count of hazard pointer required for the algorithm protected: //@cond typedef typename node_type::marked_ptr marked_ptr; typedef typename node_type::atomic_marked_ptr atomic_marked_ptr; - typedef intrusive::node_to_value node_to_value; - typedef typename opt::details::alignment_setter< atomic_marked_ptr, traits::alignment >::type aligned_node_ptr; - typedef typename opt::details::alignment_setter< node_type, traits::alignment >::type dummy_node_type; - // GC and node_type::gc must be the same static_assert( std::is_same::value, "GC and node_type::gc must be the same"); //@endcond - aligned_node_ptr m_pHead ; ///< Queue's head pointer (aligned) - aligned_node_ptr m_pTail ; ///< Queue's tail pointer (aligned) - dummy_node_type m_Dummy ; ///< dummy node + atomic_marked_ptr m_pHead ; ///< Queue's head pointer (aligned) + //@cond + typename opt::details::apply_padding< atomic_marked_ptr, traits::padding >::padding_type pad1_; + //@endcond + atomic_marked_ptr m_pTail ; ///< Queue's tail pointer (aligned) + //@cond + typename opt::details::apply_padding< atomic_marked_ptr, traits::padding >::padding_type pad2_; + //@endcond + node_type m_Dummy ; ///< dummy node + //@cond + typename opt::details::apply_padding< node_type, traits::padding >::padding_type pad3_; + //@endcond item_counter m_ItemCounter ; ///< Item counter stat m_Stat ; ///< Internal statistics //@cond @@ -453,31 +485,6 @@ namespace cds { namespace intrusive { //@cond - template - static marked_ptr guard_node( typename gc::template GuardArray& g, size_t idx, atomic_marked_ptr const& p ) - { - marked_ptr pg; - while ( true ) { - pg = p.load( memory_model::memory_order_relaxed ); - g.assign( idx, node_traits::to_value_ptr( pg.ptr() ) ); - if ( p.load( memory_model::memory_order_acquire) == pg ) { - return pg; - } - } - } - - static marked_ptr guard_node( typename gc::Guard& g, atomic_marked_ptr const& p ) - { - marked_ptr pg; - while ( true ) { - pg = p.load( memory_model::memory_order_relaxed ); - g.assign( node_traits::to_value_ptr( pg.ptr() ) ); - if ( p.load( memory_model::memory_order_acquire) == pg ) { - return pg; - } - } - } - struct dequeue_result { typename gc::template GuardArray<3> guards; node_type * pNext; @@ -495,13 +502,13 @@ namespace cds { namespace intrusive { marked_ptr pNext; while ( true ) { - h = guard_node( res.guards, 0, m_pHead ); - t = guard_node( res.guards, 1, m_pTail ); - pNext = guard_node( res.guards, 2, h->m_pNext ); + h = res.guards.protect( 0, m_pHead, []( marked_ptr p ) -> value_type * { return node_traits::to_value_ptr( p.ptr());}); + t = res.guards.protect( 1, m_pTail, []( marked_ptr p ) -> value_type * { return node_traits::to_value_ptr( p.ptr());}); + pNext = res.guards.protect( 2, h->m_pNext, []( marked_ptr p ) -> value_type * { return node_traits::to_value_ptr( p.ptr());}); - if ( h == m_pHead.load( memory_model::memory_order_acquire ) ) { - if ( h.ptr() == t.ptr() ) { - if ( !pNext.ptr() ) { + if ( h == m_pHead.load( memory_model::memory_order_acquire )) { + if ( h.ptr() == t.ptr()) { + if ( !pNext.ptr()) { m_Stat.onEmptyDequeue(); return false; } @@ -509,12 +516,12 @@ namespace cds { namespace intrusive { { typename gc::Guard g; while ( pNext->m_pNext.load(memory_model::memory_order_relaxed).ptr() && m_pTail.load(memory_model::memory_order_relaxed) == t ) { - pNext = guard_node( g, pNext->m_pNext ); - res.guards.assign( 2, g.template get() ); + pNext = g.protect( pNext->m_pNext, []( marked_ptr p ) -> value_type * { return node_traits::to_value_ptr( p.ptr());}); + res.guards.copy( 2, g ); } } - m_pTail.compare_exchange_weak( t, marked_ptr(pNext.ptr()), memory_model::memory_order_release, memory_model::memory_order_relaxed ); + m_pTail.compare_exchange_weak( t, marked_ptr(pNext.ptr()), memory_model::memory_order_acquire, atomics::memory_order_relaxed ); } else { marked_ptr iter( h ); @@ -524,20 +531,20 @@ namespace cds { namespace intrusive { while ( pNext.ptr() && pNext.bits() && iter.ptr() != t.ptr() && m_pHead.load(memory_model::memory_order_relaxed) == h ) { iter = pNext; - g.assign( res.guards.template get(2) ); - pNext = guard_node( res.guards, 2, pNext->m_pNext ); + g.assign( res.guards.template get(2)); + pNext = res.guards.protect( 2, pNext->m_pNext, []( marked_ptr p ) -> value_type * { return node_traits::to_value_ptr( p.ptr());}); ++hops; } if ( m_pHead.load(memory_model::memory_order_relaxed) != h ) continue; - if ( iter.ptr() == t.ptr() ) + if ( iter.ptr() == t.ptr()) free_chain( h, iter ); else if ( bDeque ) { res.pNext = pNext.ptr(); - if ( iter->m_pNext.compare_exchange_weak( pNext, marked_ptr( pNext.ptr(), 1 ), memory_model::memory_order_release, memory_model::memory_order_relaxed ) ) { + if ( iter->m_pNext.compare_exchange_weak( pNext, marked_ptr( pNext.ptr(), 1 ), memory_model::memory_order_acquire, atomics::memory_order_relaxed )) { if ( hops >= m_nMaxHops ) free_chain( h, pNext ); break; @@ -565,15 +572,15 @@ namespace cds { namespace intrusive { { // "head" and "newHead" are guarded - if ( m_pHead.compare_exchange_strong( head, marked_ptr(newHead.ptr()), memory_model::memory_order_release, memory_model::memory_order_relaxed )) + if ( m_pHead.compare_exchange_strong( head, marked_ptr(newHead.ptr()), memory_model::memory_order_release, atomics::memory_order_relaxed )) { typename gc::template GuardArray<2> guards; - guards.assign( 0, node_traits::to_value_ptr(head.ptr()) ); - while ( head.ptr() != newHead.ptr() ) { - marked_ptr pNext = guard_node( guards, 1, head->m_pNext ); + guards.assign( 0, node_traits::to_value_ptr(head.ptr())); + while ( head.ptr() != newHead.ptr()) { + marked_ptr pNext = guards.protect( 1, head->m_pNext, []( marked_ptr p ) -> value_type * { return node_traits::to_value_ptr( p.ptr());}); assert( pNext.bits() != 0 ); - dispose_node( head.ptr() ); - guards.assign( 0, guards.template get(1) ); + dispose_node( head.ptr()); + guards.copy( 0, 1 ); head = pNext; } } @@ -592,11 +599,11 @@ namespace cds { namespace intrusive { void operator()( value_type * p ) { assert( p != nullptr ); - BasketQueue::clear_links( node_traits::to_node_ptr( p ) ); + BasketQueue::clear_links( node_traits::to_node_ptr( p )); disposer()(p); } }; - gc::template retire( node_traits::to_value_ptr(p) ); + gc::template retire( node_traits::to_value_ptr(p)); } } //@endcond @@ -649,18 +656,19 @@ namespace cds { namespace intrusive { link_checker::is_empty( pNew ); typename gc::Guard guard; + typename gc::Guard gNext; back_off bkoff; marked_ptr t; while ( true ) { - t = guard_node( guard, m_pTail ); + t = guard.protect( m_pTail, []( marked_ptr p ) -> value_type * { return node_traits::to_value_ptr( p.ptr());}); marked_ptr pNext = t->m_pNext.load(memory_model::memory_order_acquire ); if ( pNext.ptr() == nullptr ) { - pNew->m_pNext.store( marked_ptr(), memory_model::memory_order_release ); - if ( t->m_pNext.compare_exchange_weak( pNext, marked_ptr(pNew), memory_model::memory_order_release, memory_model::memory_order_relaxed ) ) { - if ( !m_pTail.compare_exchange_strong( t, marked_ptr(pNew), memory_model::memory_order_release, memory_model::memory_order_relaxed )) + pNew->m_pNext.store( marked_ptr(), memory_model::memory_order_relaxed ); + if ( t->m_pNext.compare_exchange_weak( pNext, marked_ptr(pNew), memory_model::memory_order_release, atomics::memory_order_relaxed )) { + if ( !m_pTail.compare_exchange_strong( t, marked_ptr(pNew), memory_model::memory_order_release, atomics::memory_order_acquire )) m_Stat.onAdvanceTailFailed(); break; } @@ -669,19 +677,17 @@ namespace cds { namespace intrusive { m_Stat.onTryAddBasket(); // Reread tail next - typename gc::Guard gNext; - try_again: - pNext = guard_node( gNext, t->m_pNext ); + pNext = gNext.protect( t->m_pNext, []( marked_ptr p ) -> value_type * { return node_traits::to_value_ptr( p.ptr());}); // add to the basket - if ( m_pTail.load(memory_model::memory_order_relaxed) == t + if ( m_pTail.load(memory_model::memory_order_acquire) == t && t->m_pNext.load( memory_model::memory_order_relaxed) == pNext - && !pNext.bits() ) + && !pNext.bits()) { bkoff(); - pNew->m_pNext.store( pNext, memory_model::memory_order_release ); - if ( t->m_pNext.compare_exchange_weak( pNext, marked_ptr( pNew ), memory_model::memory_order_release, memory_model::memory_order_relaxed )) { + pNew->m_pNext.store( pNext, memory_model::memory_order_relaxed ); + if ( t->m_pNext.compare_exchange_weak( pNext, marked_ptr( pNew ), memory_model::memory_order_release, atomics::memory_order_relaxed )) { m_Stat.onAddBasket(); break; } @@ -692,8 +698,12 @@ namespace cds { namespace intrusive { // Tail is misplaced, advance it typename gc::template GuardArray<2> g; - g.assign( 0, node_traits::to_value_ptr( pNext.ptr() ) ); - if ( t->m_pNext.load( memory_model::memory_order_relaxed ) != pNext ) { + g.assign( 0, node_traits::to_value_ptr( pNext.ptr())); + if ( m_pTail.load( memory_model::memory_order_acquire ) != t + + || t->m_pNext.load( memory_model::memory_order_relaxed ) != pNext ) + + { m_Stat.onEnqueueRace(); bkoff(); continue; @@ -703,17 +713,17 @@ namespace cds { namespace intrusive { bool bTailOk = true; while ( (p = pNext->m_pNext.load( memory_model::memory_order_relaxed )).ptr() != nullptr ) { - bTailOk = m_pTail.load( memory_model::memory_order_relaxed ) == t; + bTailOk = m_pTail.load( memory_model::memory_order_acquire ) == t; if ( !bTailOk ) break; - g.assign( 1, node_traits::to_value_ptr( p.ptr() )); - if ( pNext->m_pNext.load(memory_model::memory_order_relaxed) != p ) + g.assign( 1, node_traits::to_value_ptr( p.ptr())); + if ( pNext->m_pNext.load(memory_model::memory_order_acquire) != p ) continue; pNext = p; - g.assign( 0, g.template get( 1 ) ); + g.assign( 0, g.template get( 1 )); } - if ( !bTailOk || !m_pTail.compare_exchange_weak( t, marked_ptr( pNext.ptr() ), memory_model::memory_order_release, memory_model::memory_order_relaxed )) + if ( !bTailOk || !m_pTail.compare_exchange_weak( t, marked_ptr( pNext.ptr()), memory_model::memory_order_release, atomics::memory_order_relaxed )) m_Stat.onAdvanceTailFailed(); m_Stat.onBadTail(); @@ -775,7 +785,7 @@ namespace cds { namespace intrusive { */ void clear() { - while ( dequeue() ); + while ( dequeue()); } /// Returns queue's item count