Fixed rare priority inversion bug in MSPriorityQueue
[libcds.git] / cds / intrusive / mspriority_queue.h
index 5d3d0805816cf366660e87f72e379ef145900987..9128fca83a91cfd8e6ae7cc0b04b297d263ae441 100644 (file)
@@ -93,6 +93,34 @@ namespace cds { namespace intrusive {
             //@endcond
         };
 
+        class monotonic_counter
+        {
+        public:
+            typedef size_t counter_type;
+
+            monotonic_counter()
+                : m_nCounter(0)
+            {}
+
+            size_t inc()
+            {
+                return ++m_nCounter;
+            }
+
+            size_t dec()
+            {
+                return m_nCounter--;
+            }
+
+            size_t value() const
+            {
+                return m_nCounter;
+            }
+
+        private:
+            size_t m_nCounter;
+        };
+
         /// MSPriorityQueue traits
         struct traits {
             /// Storage type
@@ -103,7 +131,7 @@ namespace cds { namespace intrusive {
                 the \p buffer::rebind member metafunction is called to change type
                 of values stored in the buffer.
             */
-            typedef opt::v::initialized_dynamic_buffer<void *, CDS_DEFAULT_ALLOCATOR, false>  buffer;
+            typedef opt::v::initialized_dynamic_buffer<void *>  buffer;
 
             /// Priority compare functor
             /**
@@ -117,7 +145,7 @@ namespace cds { namespace intrusive {
             */
             typedef opt::none       less;
 
-            /// Type of mutual-exclusion lock
+            /// Type of mutual-exclusion lock. The lock is not need to be recursive.
             typedef cds::sync::spin lock_type;
 
             /// Back-off strategy
@@ -129,6 +157,12 @@ namespace cds { namespace intrusive {
                 or any other with interface like \p %mspriority_queue::stat
             */
             typedef empty_stat      stat;
+
+            /// Item counter type
+            typedef cds::bitop::bit_reverse_counter<> item_counter;
+
+            /// Fairness
+            static bool const fairness = true;
         };
 
         /// Metafunction converting option list to traits
@@ -243,10 +277,12 @@ namespace cds { namespace intrusive {
         typedef typename traits::buffer::template rebind<node>::other   buffer_type ;   ///< Heap array buffer type
 
         //@cond
-        typedef cds::bitop::bit_reverse_counter<>           item_counter_type;
+        typedef typename traits::item_counter  item_counter_type;
         typedef typename item_counter_type::counter_type    counter_type;
         //@endcond
 
+        static const bool c_bFairQueue = traits::fairness;
+
     protected:
         item_counter_type   m_ItemCounter   ;   ///< Item counter
         mutable lock_type   m_Lock          ;   ///< Heap's size lock
@@ -291,12 +327,7 @@ namespace cds { namespace intrusive {
             }
 
             counter_type i = m_ItemCounter.inc();
-            if ( i >= m_Heap.capacity() ) {
-                // the heap is full
-                m_Lock.unlock();
-                m_Stat.onPushFailed();
-                return false;
-            }
+            assert( i < m_Heap.capacity() );
 
             node& refNode = m_Heap[i];
             refNode.lock();
@@ -321,6 +352,8 @@ namespace cds { namespace intrusive {
         */
         value_type * pop()
         {
+            node& refTop = m_Heap[1];
+
             m_Lock.lock();
             if ( m_ItemCounter.value() == 0 ) {
                 // the heap is empty
@@ -328,12 +361,23 @@ namespace cds { namespace intrusive {
                 m_Stat.onPopFailed();
                 return nullptr;
             }
-            counter_type nBottom = m_ItemCounter.reversed_value();
-            m_ItemCounter.dec();
+            counter_type nBottom = m_ItemCounter.dec();
             assert( nBottom < m_Heap.capacity() );
             assert( nBottom > 0 );
 
-            node& refBottom = m_Heap[ nBottom ];
+            if ( c_bFairQueue ) {
+                refTop.lock();
+                if ( nBottom == 1 ) {
+                    refTop.m_nTag = tag_type( Empty );
+                    value_type * pVal = refTop.m_pVal;
+                    refTop.m_pVal = nullptr;
+                    refTop.unlock();
+                    m_Lock.unlock();
+                    m_Stat.onPopSuccess();
+                    return pVal;
+                }
+            }
+            node& refBottom = m_Heap[nBottom];
             refBottom.lock();
             m_Lock.unlock();
             refBottom.m_nTag = tag_type(Empty);
@@ -341,8 +385,10 @@ namespace cds { namespace intrusive {
             refBottom.m_pVal = nullptr;
             refBottom.unlock();
 
-            node& refTop = m_Heap[ 1 ];
-            refTop.lock();
+            //node& refTop = m_Heap[ 1 ];
+            if ( !c_bFairQueue )
+                refTop.lock();
+
             if ( refTop.m_nTag == tag_type(Empty) ) {
                 // nBottom == nTop
                 refTop.unlock();