Fixed priority inversion bug in MSPriorityQueue
[libcds.git] / cds / intrusive / mspriority_queue.h
index 9128fca83a91cfd8e6ae7cc0b04b297d263ae441..fb920a8cd0f8be7f141c2b5712d066ad81f2ad2d 100644 (file)
@@ -93,8 +93,10 @@ namespace cds { namespace intrusive {
             //@endcond
         };
 
+        /// Monotonic item counter, see \p traits::item_counter for explanation
         class monotonic_counter
         {
+        //@cond
         public:
             typedef size_t counter_type;
 
@@ -119,6 +121,7 @@ namespace cds { namespace intrusive {
 
         private:
             size_t m_nCounter;
+        //@endcond
         };
 
         /// MSPriorityQueue traits
@@ -159,10 +162,18 @@ namespace cds { namespace intrusive {
             typedef empty_stat      stat;
 
             /// Item counter type
+            /**
+                Two type are possible:
+                - \p cds::bitop::bit_reverse_counter - a counter described in <a href="http://www.research.ibm.com/people/m/michael/ipl-1996.pdf">original paper</a>,
+                  which was developed for reducing lock contention. However, bit-reversing technigue requires more memory than classic heapifying algorithm
+                  because of sparsing of elements: for priority queue of max size \p N the bit-reversing technique requires array size up to 2<sup>K</sup>
+                  where \p K - the nearest power of two such that <tt>2<sup>K</sup> >= N</tt>.
+                - \p mspriority_queue::monotonic_counter - a classic monotonic item counter. This counter can lead to false sharing under high contention.
+                  By the other hand, for priority queue of max size \p N it requires \p N array size.
+
+                By default, \p MSPriorityQueue uses \p %cds::bitop::bit_reverse_counter as described in original paper.
+            */
             typedef cds::bitop::bit_reverse_counter<> item_counter;
-
-            /// Fairness
-            static bool const fairness = true;
         };
 
         /// Metafunction converting option list to traits
@@ -178,6 +189,8 @@ namespace cds { namespace intrusive {
             - \p opt::lock_type - lock type. Default is \p cds::sync::spin
             - \p opt::back_off - back-off strategy. Default is \p cds::backoff::yield
             - \p opt::stat - internal statistics. Available types: \p mspriority_queue::stat, \p mspriority_queue::empty_stat (the default, no overhead)
+            - \p opt::item_counter - an item counter type for \p MSPriorityQueue. 
+                 Available type: \p cds::bitop::bit_reverse_counter, \p mspriority_queue::monotonic_counter. See \p traits::item_counter for details.
         */
         template <typename... Options>
         struct make_traits {
@@ -232,9 +245,10 @@ namespace cds { namespace intrusive {
         typedef typename opt::details::make_comparator< value_type, traits >::type key_comparator;
 #   endif
 
-        typedef typename traits::lock_type lock_type;   ///< heap's size lock type
-        typedef typename traits::back_off  back_off;    ///< Back-off strategy
-        typedef typename traits::stat      stat;        ///< internal statistics type
+        typedef typename traits::lock_type      lock_type;   ///< heap's size lock type
+        typedef typename traits::back_off       back_off;    ///< Back-off strategy
+        typedef typename traits::stat           stat;        ///< internal statistics type, see \p mspriority_queue::traits::stat
+        typedef typename traits::item_counter   item_counter;///< Item counter type, see \p mspriority_queue::traits::item_counter
 
     protected:
         //@cond
@@ -277,14 +291,11 @@ namespace cds { namespace intrusive {
         typedef typename traits::buffer::template rebind<node>::other   buffer_type ;   ///< Heap array buffer type
 
         //@cond
-        typedef typename traits::item_counter  item_counter_type;
-        typedef typename item_counter_type::counter_type    counter_type;
+        typedef typename item_counter::counter_type    counter_type;
         //@endcond
 
-        static const bool c_bFairQueue = traits::fairness;
-
     protected:
-        item_counter_type   m_ItemCounter   ;   ///< Item counter
+        item_counter        m_ItemCounter   ;   ///< Item counter
         mutable lock_type   m_Lock          ;   ///< Heap's size lock
         buffer_type         m_Heap          ;   ///< Heap array
         stat                m_Stat          ;   ///< internal statistics accumulator
@@ -365,18 +376,17 @@ namespace cds { namespace intrusive {
             assert( nBottom < m_Heap.capacity() );
             assert( nBottom > 0 );
 
-            if ( c_bFairQueue ) {
-                refTop.lock();
-                if ( nBottom == 1 ) {
-                    refTop.m_nTag = tag_type( Empty );
-                    value_type * pVal = refTop.m_pVal;
-                    refTop.m_pVal = nullptr;
-                    refTop.unlock();
-                    m_Lock.unlock();
-                    m_Stat.onPopSuccess();
-                    return pVal;
-                }
+            refTop.lock();
+            if ( nBottom == 1 ) {
+                refTop.m_nTag = tag_type( Empty );
+                value_type * pVal = refTop.m_pVal;
+                refTop.m_pVal = nullptr;
+                refTop.unlock();
+                m_Lock.unlock();
+                m_Stat.onPopSuccess();
+                return pVal;
             }
+
             node& refBottom = m_Heap[nBottom];
             refBottom.lock();
             m_Lock.unlock();
@@ -385,10 +395,6 @@ namespace cds { namespace intrusive {
             refBottom.m_pVal = nullptr;
             refBottom.unlock();
 
-            //node& refTop = m_Heap[ 1 ];
-            if ( !c_bFairQueue )
-                refTop.lock();
-
             if ( refTop.m_nTag == tag_type(Empty) ) {
                 // nBottom == nTop
                 refTop.unlock();