MSPriorityQueue: revised pop()
authorkhizmax <libcds.dev@gmail.com>
Thu, 28 Jul 2016 21:11:16 +0000 (00:11 +0300)
committerkhizmax <libcds.dev@gmail.com>
Thu, 28 Jul 2016 21:11:16 +0000 (00:11 +0300)
cds/intrusive/mspriority_queue.h
test/stress/pqueue/pop.cpp

index 6d2ddb896fbd8205f74aa780d32ec45de5f6ba74..1cc0e39231a639c67ca04d672c66521d2140f935 100644 (file)
@@ -323,9 +323,7 @@ namespace cds { namespace intrusive {
             }
             counter_type nBottom = m_ItemCounter.reversed_value();
             m_ItemCounter.dec();
-            // Since m_Heap[0] is not used, capacity() returns m_Heap.capacity() - 1
-            // Consequently, "<=" is here
-            assert( nBottom <= capacity() );
+            assert( nBottom < m_Heap.capacity() );
             assert( nBottom > 0 );
 
             node& refBottom = m_Heap[ nBottom ];
@@ -349,7 +347,7 @@ namespace cds { namespace intrusive {
             refTop.m_nTag = tag_type( Available );
 
             // refTop will be unlocked inside heapify_after_pop
-            heapify_after_pop( 1, &refTop );
+            heapify_after_pop( &refTop );
 
             m_Stat.onPopSuccess();
             return pVal;
@@ -480,34 +478,34 @@ namespace cds { namespace intrusive {
             }
         }
 
-        void heapify_after_pop( counter_type nParent, node * pParent )
+        void heapify_after_pop( node * pParent )
         {
             key_comparator cmp;
+            counter_type const nCapacity = m_Heap.capacity();
 
-            while ( nParent < m_Heap.capacity() / 2 ) {
-                counter_type nLeft = nParent * 2;
-                counter_type nRight = nLeft + 1;
-                node& refLeft = m_Heap[nLeft];
-                node& refRight = m_Heap[nRight];
-                refLeft.lock();
-                refRight.lock();
-
-                counter_type nChild;
-                node * pChild;
-                if ( refLeft.m_nTag == tag_type(Empty) ) {
-                    refRight.unlock();
-                    refLeft.unlock();
+            counter_type nParent = 1;
+            for ( counter_type nChild = nParent * 2; nChild < nCapacity; nChild *= 2 ) {
+                node* pChild = &m_Heap[ nChild ];
+                pChild->lock();
+
+                if ( pChild->m_nTag == tag_type( Empty )) {
+                    pChild->unlock();
                     break;
                 }
-                else if ( refRight.m_nTag == tag_type(Empty) || cmp( *refLeft.m_pVal, *refRight.m_pVal ) > 0 ) {
-                    refRight.unlock();
-                    nChild = nLeft;
-                    pChild = &refLeft;
-                }
-                else {
-                    refLeft.unlock();
-                    nChild = nRight;
-                    pChild = &refRight;
+
+                counter_type const nRight = nChild + 1;
+                if ( nRight < nCapacity ) {
+                    node& refRight = m_Heap[nRight];
+                    refRight.lock();
+
+                    if ( refRight.m_nTag != tag_type( Empty ) && cmp( *refRight.m_pVal, *pChild->m_pVal ) > 0 ) {
+                        // get right child
+                        pChild->unlock();
+                        nChild = nRight;
+                        pChild = &refRight;
+                    }
+                    else
+                        refRight.unlock();
                 }
 
                 // If child has higher priority that parent then swap
index c9e6823ce7b84db1d020227c770d38b7d0d2f621..acb2fb571cc8a6bc31e57769cc119e090199f59f 100644 (file)
@@ -40,52 +40,6 @@ namespace {
         typedef cds_test::stress_fixture base_class;
 
     protected:
-        template <class PQueue>
-        class Producer: public cds_test::thread
-        {
-            typedef cds_test::thread base_class;
-
-        public:
-            Producer( cds_test::thread_pool& pool, PQueue& queue )
-                : base_class( pool )
-                , m_Queue( queue )
-            {}
-
-            Producer( Producer& src )
-                : base_class( src )
-                , m_Queue( src.m_Queue )
-            {}
-
-            virtual thread * clone()
-            {
-                return new Producer( *this );
-            }
-
-            virtual void test()
-            {
-                typedef typename PQueue::value_type value_type;
-                for ( array_type::const_iterator it = m_arr.begin(); it != m_arr.end(); ++it ) {
-                    if ( !m_Queue.push( value_type( *it ) ))
-                        ++m_nPushError;
-                }
-            }
-
-            void prepare( size_t nStart, size_t nEnd )
-            {
-                m_arr.reserve( nEnd - nStart );
-                for ( size_t i = nStart; i < nEnd; ++i )
-                    m_arr.push_back( i );
-                shuffle( m_arr.begin(), m_arr.end() );
-            }
-
-        public:
-            PQueue&             m_Queue;
-            size_t              m_nPushError = 0;
-
-            typedef std::vector<size_t> array_type;
-            array_type          m_arr;
-        };
-
         template <class PQueue>
         class Consumer: public cds_test::thread
         {
@@ -156,9 +110,6 @@ namespace {
         template <class PQueue>
         void test( PQueue& q )
         {
-            size_t const nThreadItemCount = s_nQueueSize / s_nThreadCount;
-            s_nQueueSize = nThreadItemCount * s_nThreadCount;
-
             cds_test::thread_pool& pool = get_pool();
 
             propout() << std::make_pair( "thread_count", s_nThreadCount )
@@ -166,21 +117,19 @@ namespace {
 
             // push
             {
-                pool.add( new Producer<PQueue>( pool, q ), s_nThreadCount );
+                std::vector< size_t > arr;
+                arr.reserve( s_nQueueSize );
+                for ( size_t i = 0; i < s_nQueueSize; ++i )
+                    arr.push_back( i );
+                shuffle( arr.begin(), arr.end() );
 
-                size_t nStart = 0;
-                for ( size_t i = 0; i < pool.size(); ++i ) {
-                    static_cast<Producer<PQueue>&>( pool.get(i) ).prepare( nStart, nStart + nThreadItemCount );
-                    nStart += nThreadItemCount;
-                }
-
-                std::chrono::milliseconds duration = pool.run();
-                propout() << std::make_pair( "producer_duration", duration );
+                typedef typename PQueue::value_type value_type;
+                for ( auto it = arr.begin(); it != arr.end(); ++it )
+                    q.push( value_type( *it ));
             }
 
             // pop
             {
-                pool.clear();
                 pool.add( new Consumer<PQueue>( pool, q ), s_nThreadCount );
 
                 std::chrono::milliseconds duration = pool.run();
@@ -242,7 +191,7 @@ namespace {
     TEST_F( fixture_t, pqueue_t ) \
     { \
         typedef pqueue::Types<pqueue::simple_value>::pqueue_t pqueue_type; \
-        pqueue_type pq( s_nQueueSize ); \
+        pqueue_type pq( s_nQueueSize + 1 ); \
         test( pq ); \
     }
     CDSSTRESS_MSPriorityQueue( pqueue_pop, MSPriorityQueue_dyn_less )