MSPriorityQueue: fixed typo, add more assertions
[libcds.git] / cds / intrusive / mspriority_queue.h
index 2788fa3a282bde9bc2d82dc1daeacea45d883170..2dda9d37883d976c90f877f035380de414bbe125 100644 (file)
@@ -1,11 +1,39 @@
-//$$CDS-header$$
-
-#ifndef __CDS_INTRUSIVE_MSPRIORITY_QUEUE_H
-#define __CDS_INTRUSIVE_MSPRIORITY_QUEUE_H
+/*
+    This file is a part of libcds - Concurrent Data Structures library
+
+    (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
+
+    Source code repo: http://github.com/khizmax/libcds/
+    Download: http://sourceforge.net/projects/libcds/files/
+    
+    Redistribution and use in source and binary forms, with or without
+    modification, are permitted provided that the following conditions are met:
+
+    * Redistributions of source code must retain the above copyright notice, this
+      list of conditions and the following disclaimer.
+
+    * Redistributions in binary form must reproduce the above copyright notice,
+      this list of conditions and the following disclaimer in the documentation
+      and/or other materials provided with the distribution.
+
+    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+    AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+    IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+    FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+    DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+    SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+    OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+#ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
+#define CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
 
 #include <mutex>  // std::unique_lock
 #include <cds/intrusive/details/base.h>
-#include <cds/lock/spinlock.h>
+#include <cds/sync/spinlock.h>
 #include <cds/os/thread.h>
 #include <cds/details/bit_reverse_counter.h>
 #include <cds/intrusive/options.h>
@@ -25,12 +53,15 @@ namespace cds { namespace intrusive {
         struct stat {
             typedef Counter   event_counter ; ///< Event counter type
 
-            event_counter   m_nPushCount            ;   ///< Count of success push operation
-            event_counter   m_nPopCount             ;   ///< Count of success pop operation
-            event_counter   m_nPushFailCount        ;   ///< Count of failed ("the queue is full") push operation
-            event_counter   m_nPopFailCount         ;   ///< Count of failed ("the queue is empty") pop operation
-            event_counter   m_nPushHeapifySwapCount ;   ///< Count of item swapping when heapifying in push
-            event_counter   m_nPopHeapifySwapCount  ;   ///< Count of item swapping when heapifying in pop
+            event_counter   m_nPushCount;            ///< Count of success push operation
+            event_counter   m_nPopCount;             ///< Count of success pop operation
+            event_counter   m_nPushFailCount;        ///< Count of failed ("the queue is full") push operation
+            event_counter   m_nPopFailCount;         ///< Count of failed ("the queue is empty") pop operation
+            event_counter   m_nPushHeapifySwapCount; ///< Count of item swapping when heapifying in push
+            event_counter   m_nPopHeapifySwapCount;  ///< Count of item swapping when heapifying in pop
+            event_counter   m_nItemMovedTop;         ///< Count of events when \p push() encountered that inserted item was moved to top by a concurrent \p pop()
+            event_counter   m_nItemMovedUp;          ///< Count of events when \p push() encountered that inserted item was moved upwards by a concurrent \p pop()
+            event_counter   m_nPushEmptyPass;        ///< Count of empty pass during heapify via concurrent operations
 
             //@cond
             void onPushSuccess()            { ++m_nPushCount            ;}
@@ -39,18 +70,26 @@ namespace cds { namespace intrusive {
             void onPopFailed()              { ++m_nPopFailCount         ;}
             void onPushHeapifySwap()        { ++m_nPushHeapifySwapCount ;}
             void onPopHeapifySwap()         { ++m_nPopHeapifySwapCount  ;}
+
+            void onItemMovedTop()           { ++m_nItemMovedTop         ;}
+            void onItemMovedUp()            { ++m_nItemMovedUp          ;}
+            void onPushEmptyPass()          { ++m_nPushEmptyPass        ;}
             //@endcond
         };
 
         /// MSPriorityQueue empty statistics
         struct empty_stat {
             //@cond
-            void onPushSuccess()            {}
-            void onPopSuccess()             {}
-            void onPushFailed()             {}
-            void onPopFailed()              {}
-            void onPushHeapifySwap()        {}
-            void onPopHeapifySwap()         {}
+            void onPushSuccess()            const {}
+            void onPopSuccess()             const {}
+            void onPushFailed()             const {}
+            void onPopFailed()              const {}
+            void onPushHeapifySwap()        const {}
+            void onPopHeapifySwap()         const {}
+
+            void onItemMovedTop()           const {}
+            void onItemMovedUp()            const {}
+            void onPushEmptyPass()          const {}
             //@endcond
         };
 
@@ -58,28 +97,28 @@ namespace cds { namespace intrusive {
         struct traits {
             /// Storage type
             /**
-                The storage type for the heap array. Default is \p cds::opt::v::dynamic_buffer.
+                The storage type for the heap array. Default is \p cds::opt::v::initialized_dynamic_buffer.
 
                 You may specify any type of buffer's value since at instantiation time
                 the \p buffer::rebind member metafunction is called to change type
                 of values stored in the buffer.
             */
-            typedef opt::v::dynamic_buffer<void *>  buffer;
+            typedef opt::v::initialized_dynamic_buffer<void *>  buffer;
 
             /// Priority compare functor
             /**
                 No default functor is provided. If the option is not specified, the \p less is used.
             */
-            typedef opt::none           compare;
+            typedef opt::none       compare;
 
             /// Specifies binary predicate used for priority comparing.
             /**
                 Default is \p std::less<T>.
             */
-            typedef opt::none           less;
+            typedef opt::none       less;
 
             /// Type of mutual-exclusion lock
-            typedef lock::Spin          lock_type;
+            typedef cds::sync::spin lock_type;
 
             /// Back-off strategy
             typedef backoff::yield      back_off;
@@ -95,14 +134,14 @@ namespace cds { namespace intrusive {
         /// Metafunction converting option list to traits
         /**
             \p Options:
-            - \p opt::buffer - the buffer type for heap array. Possible type are: \p opt::v::static_buffer, \p opt::v::dynamic_buffer.
-                Default is \p %opt::v::dynamic_buffer.
-                You may specify any type of values for the buffer since at instantiation time
+            - \p opt::buffer - the buffer type for heap array. Possible type are: \p opt::v::initialized_static_buffer, \p opt::v::initialized_dynamic_buffer.
+                Default is \p %opt::v::initialized_dynamic_buffer.
+                You may specify any type of value for the buffer since at instantiation time
                 the \p buffer::rebind member metafunction is called to change the type of values stored in the buffer.
             - \p opt::compare - priority compare functor. No default functor is provided.
                 If the option is not specified, the \p opt::less is used.
             - \p opt::less - specifies binary predicate used for priority compare. Default is \p std::less<T>.
-            - \p opt::lock_type - lock type. Default is \p cds::lock::Spin.
+            - \p opt::lock_type - lock type. Default is \p cds::sync::spin
             - \p opt::back_off - back-off strategy. Default is \p cds::backoff::yield
             - \p opt::stat - internal statistics. Available types: \p mspriority_queue::stat, \p mspriority_queue::empty_stat (the default, no overhead)
         */
@@ -159,9 +198,9 @@ namespace cds { namespace intrusive {
         typedef typename opt::details::make_comparator< value_type, traits >::type key_comparator;
 #   endif
 
-        typedef typename traits::lock_type lock_type       ;   ///< heap's size lock type
-        typedef typename traits::back_off  back_off        ;   ///< Back-off strategy
-        typedef typename traits::stat          stat        ;   ///< internal statistics type
+        typedef typename traits::lock_type lock_type;   ///< heap's size lock type
+        typedef typename traits::back_off  back_off   ///< Back-off strategy
+        typedef typename traits::stat      stat;        ///< internal statistics type
 
     protected:
         //@cond
@@ -217,7 +256,7 @@ namespace cds { namespace intrusive {
     public:
         /// Constructs empty priority queue
         /**
-            For cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
+            For \p cds::opt::v::initialized_static_buffer the \p nCapacity parameter is ignored.
         */
         MSPriorityQueue( size_t nCapacity )
             : m_Heap( nCapacity )
@@ -233,14 +272,14 @@ namespace cds { namespace intrusive {
         /**
             If the priority queue is full, the function returns \p false,
             no item has been added.
-            Otherwise, the function inserts the copy of \p val into the heap
+            Otherwise, the function inserts the pointer to \p val into the heap
             and returns \p true.
 
-            The function use copy constructor to create new heap item from \p val.
+            The function does not make a copy of \p val.
         */
         bool push( value_type& val )
         {
-            tag_type const curId = cds::OS::getCurrentThreadId();
+            tag_type const curId = cds::OS::get_current_thread_id();
 
             // Insert new item at bottom of the heap
             m_Lock.lock();
@@ -257,11 +296,13 @@ namespace cds { namespace intrusive {
             node& refNode = m_Heap[i];
             refNode.lock();
             m_Lock.unlock();
+            assert( refNode.m_nTag == tag_type( Empty ));
+            assert( refNode.m_pVal == nullptr );
             refNode.m_pVal = &val;
             refNode.m_nTag = curId;
             refNode.unlock();
 
-            // Move item towards top of the heap while it has higher priority than parent
+            // Move item towards top of heap while it has a higher priority than its parent
             heapify_after_push( i, curId );
 
             m_Stat.onPushSuccess();
@@ -272,8 +313,6 @@ namespace cds { namespace intrusive {
         /**
             If the priority queue is empty, the function returns \p nullptr.
             Otherwise, it returns the item extracted.
-
-            The item returned may be disposed immediately.
         */
         value_type * pop()
         {
@@ -284,11 +323,8 @@ namespace cds { namespace intrusive {
                 m_Stat.onPopFailed();
                 return nullptr;
             }
-            counter_type nBottom = m_ItemCounter.reversed_value();
-            m_ItemCounter.dec();
-            // Since m_Heap[0] is not used, capacity() returns m_Heap.capacity() - 1
-            // Consequently, "<=" is here
-            assert( nBottom <= capacity() );
+            counter_type nBottom = m_ItemCounter.dec();
+            assert( nBottom < m_Heap.capacity() );
             assert( nBottom > 0 );
 
             node& refBottom = m_Heap[ nBottom ];
@@ -311,10 +347,8 @@ namespace cds { namespace intrusive {
             std::swap( refTop.m_pVal, pVal );
             refTop.m_nTag = tag_type( Available );
 
-            assert( nBottom > 1 );
-
             // refTop will be unlocked inside heapify_after_pop
-            heapify_after_pop( 1, &refTop );
+            heapify_after_pop( &refTop );
 
             m_Stat.onPopSuccess();
             return pVal;
@@ -369,8 +403,7 @@ namespace cds { namespace intrusive {
         size_t size() const
         {
             std::unique_lock<lock_type> l( m_Lock );
-            size_t nSize = (size_t) m_ItemCounter.value();
-            return nSize;
+            return static_cast<size_t>( m_ItemCounter.value());
         }
 
         /// Return capacity of the priority queue
@@ -415,12 +448,18 @@ namespace cds { namespace intrusive {
                         i = 0;
                     }
                 }
-                else if ( refParent.m_nTag == tag_type(Empty) )
+                else if ( refParent.m_nTag == tag_type( Empty ) ) {
+                    m_Stat.onItemMovedTop();
                     i = 0;
-                else if ( refItem.m_nTag != curId )
+                }
+                else if ( refItem.m_nTag != curId ) {
+                    m_Stat.onItemMovedUp();
                     i = nParent;
-                else
+                }
+                else {
+                    m_Stat.onPushEmptyPass();
                     bProgress = false;
+                }
 
                 refItem.unlock();
                 refParent.unlock();
@@ -440,37 +479,37 @@ namespace cds { namespace intrusive {
             }
         }
 
-        void heapify_after_pop( counter_type nParent, node * pParent )
+        void heapify_after_pop( node * pParent )
         {
             key_comparator cmp;
+            counter_type const nCapacity = m_Heap.capacity();
+
+            counter_type nParent = 1;
+            for ( counter_type nChild = nParent * 2; nChild < nCapacity; nChild *= 2 ) {
+                node* pChild = &m_Heap[ nChild ];
+                pChild->lock();
 
-            while ( nParent < m_Heap.capacity() / 2 ) {
-                counter_type nLeft = nParent * 2;
-                counter_type nRight = nLeft + 1;
-                node& refLeft = m_Heap[nLeft];
-                node& refRight = m_Heap[nRight];
-                refLeft.lock();
-                refRight.lock();
-
-                counter_type nChild;
-                node * pChild;
-                if ( refLeft.m_nTag == tag_type(Empty) ) {
-                    refRight.unlock();
-                    refLeft.unlock();
+                if ( pChild->m_nTag == tag_type( Empty )) {
+                    pChild->unlock();
                     break;
                 }
-                else if ( refRight.m_nTag == tag_type(Empty) || cmp( *refLeft.m_pVal, *refRight.m_pVal ) > 0 ) {
-                    refRight.unlock();
-                    nChild = nLeft;
-                    pChild = &refLeft;
-                }
-                else {
-                    refLeft.unlock();
-                    nChild = nRight;
-                    pChild = &refRight;
+
+                counter_type const nRight = nChild + 1;
+                if ( nRight < nCapacity ) {
+                    node& refRight = m_Heap[nRight];
+                    refRight.lock();
+
+                    if ( refRight.m_nTag != tag_type( Empty ) && cmp( *refRight.m_pVal, *pChild->m_pVal ) > 0 ) {
+                        // get right child
+                        pChild->unlock();
+                        nChild = nRight;
+                        pChild = &refRight;
+                    }
+                    else
+                        refRight.unlock();
                 }
 
-                // If child has higher priority that parent then swap
+                // If child has higher priority than parent then swap
                 // Otherwise stop
                 if ( cmp( *pChild->m_pVal, *pParent->m_pVal ) > 0 ) {
                     std::swap( pParent->m_nTag, pChild->m_nTag );
@@ -492,4 +531,4 @@ namespace cds { namespace intrusive {
 
 }} // namespace cds::intrusive
 
-#endif // #ifndef __CDS_INTRUSIVE_MSPRIORITY_QUEUE_H
+#endif // #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H