From 4eae12e6aab7b1f964d4e7ad475c34631256e1a1 Mon Sep 17 00:00:00 2001 From: khizmax Date: Fri, 29 Jul 2016 00:11:16 +0300 Subject: [PATCH] MSPriorityQueue: revised pop() --- cds/intrusive/mspriority_queue.h | 52 ++++++++++++------------ test/stress/pqueue/pop.cpp | 69 +++++--------------------------- 2 files changed, 34 insertions(+), 87 deletions(-) diff --git a/cds/intrusive/mspriority_queue.h b/cds/intrusive/mspriority_queue.h index 6d2ddb89..1cc0e392 100644 --- a/cds/intrusive/mspriority_queue.h +++ b/cds/intrusive/mspriority_queue.h @@ -323,9 +323,7 @@ namespace cds { namespace intrusive { } counter_type nBottom = m_ItemCounter.reversed_value(); m_ItemCounter.dec(); - // Since m_Heap[0] is not used, capacity() returns m_Heap.capacity() - 1 - // Consequently, "<=" is here - assert( nBottom <= capacity() ); + assert( nBottom < m_Heap.capacity() ); assert( nBottom > 0 ); node& refBottom = m_Heap[ nBottom ]; @@ -349,7 +347,7 @@ namespace cds { namespace intrusive { refTop.m_nTag = tag_type( Available ); // refTop will be unlocked inside heapify_after_pop - heapify_after_pop( 1, &refTop ); + heapify_after_pop( &refTop ); m_Stat.onPopSuccess(); return pVal; @@ -480,34 +478,34 @@ namespace cds { namespace intrusive { } } - void heapify_after_pop( counter_type nParent, node * pParent ) + void heapify_after_pop( node * pParent ) { key_comparator cmp; + counter_type const nCapacity = m_Heap.capacity(); - while ( nParent < m_Heap.capacity() / 2 ) { - counter_type nLeft = nParent * 2; - counter_type nRight = nLeft + 1; - node& refLeft = m_Heap[nLeft]; - node& refRight = m_Heap[nRight]; - refLeft.lock(); - refRight.lock(); - - counter_type nChild; - node * pChild; - if ( refLeft.m_nTag == tag_type(Empty) ) { - refRight.unlock(); - refLeft.unlock(); + counter_type nParent = 1; + for ( counter_type nChild = nParent * 2; nChild < nCapacity; nChild *= 2 ) { + node* pChild = &m_Heap[ nChild ]; + pChild->lock(); + + if ( pChild->m_nTag == tag_type( Empty )) { + pChild->unlock(); break; } - else if ( refRight.m_nTag == tag_type(Empty) || cmp( *refLeft.m_pVal, *refRight.m_pVal ) > 0 ) { - refRight.unlock(); - nChild = nLeft; - pChild = &refLeft; - } - else { - refLeft.unlock(); - nChild = nRight; - pChild = &refRight; + + counter_type const nRight = nChild + 1; + if ( nRight < nCapacity ) { + node& refRight = m_Heap[nRight]; + refRight.lock(); + + if ( refRight.m_nTag != tag_type( Empty ) && cmp( *refRight.m_pVal, *pChild->m_pVal ) > 0 ) { + // get right child + pChild->unlock(); + nChild = nRight; + pChild = &refRight; + } + else + refRight.unlock(); } // If child has higher priority that parent then swap diff --git a/test/stress/pqueue/pop.cpp b/test/stress/pqueue/pop.cpp index c9e6823c..acb2fb57 100644 --- a/test/stress/pqueue/pop.cpp +++ b/test/stress/pqueue/pop.cpp @@ -40,52 +40,6 @@ namespace { typedef cds_test::stress_fixture base_class; protected: - template - class Producer: public cds_test::thread - { - typedef cds_test::thread base_class; - - public: - Producer( cds_test::thread_pool& pool, PQueue& queue ) - : base_class( pool ) - , m_Queue( queue ) - {} - - Producer( Producer& src ) - : base_class( src ) - , m_Queue( src.m_Queue ) - {} - - virtual thread * clone() - { - return new Producer( *this ); - } - - virtual void test() - { - typedef typename PQueue::value_type value_type; - for ( array_type::const_iterator it = m_arr.begin(); it != m_arr.end(); ++it ) { - if ( !m_Queue.push( value_type( *it ) )) - ++m_nPushError; - } - } - - void prepare( size_t nStart, size_t nEnd ) - { - m_arr.reserve( nEnd - nStart ); - for ( size_t i = nStart; i < nEnd; ++i ) - m_arr.push_back( i ); - shuffle( m_arr.begin(), m_arr.end() ); - } - - public: - PQueue& m_Queue; - size_t m_nPushError = 0; - - typedef std::vector array_type; - array_type m_arr; - }; - template class Consumer: public cds_test::thread { @@ -156,9 +110,6 @@ namespace { template void test( PQueue& q ) { - size_t const nThreadItemCount = s_nQueueSize / s_nThreadCount; - s_nQueueSize = nThreadItemCount * s_nThreadCount; - cds_test::thread_pool& pool = get_pool(); propout() << std::make_pair( "thread_count", s_nThreadCount ) @@ -166,21 +117,19 @@ namespace { // push { - pool.add( new Producer( pool, q ), s_nThreadCount ); + std::vector< size_t > arr; + arr.reserve( s_nQueueSize ); + for ( size_t i = 0; i < s_nQueueSize; ++i ) + arr.push_back( i ); + shuffle( arr.begin(), arr.end() ); - size_t nStart = 0; - for ( size_t i = 0; i < pool.size(); ++i ) { - static_cast&>( pool.get(i) ).prepare( nStart, nStart + nThreadItemCount ); - nStart += nThreadItemCount; - } - - std::chrono::milliseconds duration = pool.run(); - propout() << std::make_pair( "producer_duration", duration ); + typedef typename PQueue::value_type value_type; + for ( auto it = arr.begin(); it != arr.end(); ++it ) + q.push( value_type( *it )); } // pop { - pool.clear(); pool.add( new Consumer( pool, q ), s_nThreadCount ); std::chrono::milliseconds duration = pool.run(); @@ -242,7 +191,7 @@ namespace { TEST_F( fixture_t, pqueue_t ) \ { \ typedef pqueue::Types::pqueue_t pqueue_type; \ - pqueue_type pq( s_nQueueSize ); \ + pqueue_type pq( s_nQueueSize + 1 ); \ test( pq ); \ } CDSSTRESS_MSPriorityQueue( pqueue_pop, MSPriorityQueue_dyn_less ) -- 2.34.1