Removed wrong assertion
[libcds.git] / cds / intrusive / mspriority_queue.h
1 //$$CDS-header$$
2
3 #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
4 #define CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
5
6 #include <mutex>  // std::unique_lock
7 #include <cds/intrusive/details/base.h>
8 #include <cds/sync/spinlock.h>
9 #include <cds/os/thread.h>
10 #include <cds/details/bit_reverse_counter.h>
11 #include <cds/intrusive/options.h>
12 #include <cds/opt/buffer.h>
13 #include <cds/opt/compare.h>
14 #include <cds/details/bounded_container.h>
15
16 namespace cds { namespace intrusive {
17
18     /// MSPriorityQueue related definitions
19     /** @ingroup cds_intrusive_helper
20     */
21     namespace mspriority_queue {
22
23         /// MSPriorityQueue statistics
24         template <typename Counter = cds::atomicity::event_counter>
25         struct stat {
26             typedef Counter   event_counter ; ///< Event counter type
27
28             event_counter   m_nPushCount            ;   ///< Count of success push operation
29             event_counter   m_nPopCount             ;   ///< Count of success pop operation
30             event_counter   m_nPushFailCount        ;   ///< Count of failed ("the queue is full") push operation
31             event_counter   m_nPopFailCount         ;   ///< Count of failed ("the queue is empty") pop operation
32             event_counter   m_nPushHeapifySwapCount ;   ///< Count of item swapping when heapifying in push
33             event_counter   m_nPopHeapifySwapCount  ;   ///< Count of item swapping when heapifying in pop
34
35             //@cond
36             void onPushSuccess()            { ++m_nPushCount            ;}
37             void onPopSuccess()             { ++m_nPopCount             ;}
38             void onPushFailed()             { ++m_nPushFailCount        ;}
39             void onPopFailed()              { ++m_nPopFailCount         ;}
40             void onPushHeapifySwap()        { ++m_nPushHeapifySwapCount ;}
41             void onPopHeapifySwap()         { ++m_nPopHeapifySwapCount  ;}
42             //@endcond
43         };
44
45         /// MSPriorityQueue empty statistics
46         struct empty_stat {
47             //@cond
48             void onPushSuccess()            {}
49             void onPopSuccess()             {}
50             void onPushFailed()             {}
51             void onPopFailed()              {}
52             void onPushHeapifySwap()        {}
53             void onPopHeapifySwap()         {}
54             //@endcond
55         };
56
57         /// MSPriorityQueue traits
58         struct traits {
59             /// Storage type
60             /**
61                 The storage type for the heap array. Default is \p cds::opt::v::dynamic_buffer.
62
63                 You may specify any type of buffer's value since at instantiation time
64                 the \p buffer::rebind member metafunction is called to change type
65                 of values stored in the buffer.
66             */
67             typedef opt::v::dynamic_buffer<void *>  buffer;
68
69             /// Priority compare functor
70             /**
71                 No default functor is provided. If the option is not specified, the \p less is used.
72             */
73             typedef opt::none       compare;
74
75             /// Specifies binary predicate used for priority comparing.
76             /**
77                 Default is \p std::less<T>.
78             */
79             typedef opt::none       less;
80
81             /// Type of mutual-exclusion lock
82             typedef cds::sync::spin lock_type;
83
84             /// Back-off strategy
85             typedef backoff::yield      back_off;
86
87             /// Internal statistics
88             /**
89                 Possible types: \p mspriority_queue::empty_stat (the default, no overhead), \p mspriority_queue::stat
90                 or any other with interface like \p %mspriority_queue::stat
91             */
92             typedef empty_stat      stat;
93         };
94
95         /// Metafunction converting option list to traits
96         /**
97             \p Options:
98             - \p opt::buffer - the buffer type for heap array. Possible type are: \p opt::v::static_buffer, \p opt::v::dynamic_buffer.
99                 Default is \p %opt::v::dynamic_buffer.
100                 You may specify any type of values for the buffer since at instantiation time
101                 the \p buffer::rebind member metafunction is called to change the type of values stored in the buffer.
102             - \p opt::compare - priority compare functor. No default functor is provided.
103                 If the option is not specified, the \p opt::less is used.
104             - \p opt::less - specifies binary predicate used for priority compare. Default is \p std::less<T>.
105             - \p opt::lock_type - lock type. Default is \p cds::sync::spin
106             - \p opt::back_off - back-off strategy. Default is \p cds::backoff::yield
107             - \p opt::stat - internal statistics. Available types: \p mspriority_queue::stat, \p mspriority_queue::empty_stat (the default, no overhead)
108         */
109         template <typename... Options>
110         struct make_traits {
111 #   ifdef CDS_DOXYGEN_INVOKED
112             typedef implementation_defined type ;   ///< Metafunction result
113 #   else
114             typedef typename cds::opt::make_options<
115                 typename cds::opt::find_type_traits< traits, Options... >::type
116                 ,Options...
117             >::type   type;
118 #   endif
119         };
120
121     }   // namespace mspriority_queue
122
123     /// Michael & Scott array-based lock-based concurrent priority queue heap
124     /** @ingroup cds_intrusive_priority_queue
125         Source:
126             - [1996] G.Hunt, M.Michael, S. Parthasarathy, M.Scott
127                 "An efficient algorithm for concurrent priority queue heaps"
128
129         \p %MSPriorityQueue augments the standard array-based heap data structure with
130         a mutual-exclusion lock on the heap's size and locks on each node in the heap.
131         Each node also has a tag that indicates whether
132         it is empty, valid, or in a transient state due to an update to the heap
133         by an inserting thread.
134         The algorithm allows concurrent insertions and deletions in opposite directions,
135         without risking deadlock and without the need for special server threads.
136         It also uses a "bit-reversal" technique to scatter accesses across the fringe
137         of the tree to reduce contention.
138         On large heaps the algorithm achieves significant performance improvements
139         over serialized single-lock algorithm, for various insertion/deletion
140         workloads. For small heaps it still performs well, but not as well as
141         single-lock algorithm.
142
143         Template parameters:
144         - \p T - type to be stored in the queue. The priority is a part of \p T type.
145         - \p Traits - type traits. See \p mspriority_queue::traits for explanation.
146             It is possible to declare option-based queue with \p cds::container::mspriority_queue::make_traits
147             metafunction instead of \p Traits template argument.
148     */
149     template <typename T, class Traits = mspriority_queue::traits >
150     class MSPriorityQueue: public cds::bounded_container
151     {
152     public:
153         typedef T           value_type  ;   ///< Value type stored in the queue
154         typedef Traits      traits      ;   ///< Traits template parameter
155
156 #   ifdef CDS_DOXYGEN_INVOKED
157         typedef implementation_defined key_comparator  ;    ///< priority comparing functor based on opt::compare and opt::less option setter.
158 #   else
159         typedef typename opt::details::make_comparator< value_type, traits >::type key_comparator;
160 #   endif
161
162         typedef typename traits::lock_type lock_type       ;   ///< heap's size lock type
163         typedef typename traits::back_off  back_off        ;   ///< Back-off strategy
164         typedef typename traits::stat          stat        ;   ///< internal statistics type
165
166     protected:
167         //@cond
168         typedef cds::OS::ThreadId   tag_type;
169
170         enum tag_value {
171             Available   = -1,
172             Empty       = 0
173         };
174         //@endcond
175
176         //@cond
177         /// Heap item type
178         struct node {
179             value_type *        m_pVal  ;   ///< A value pointer
180             tag_type volatile   m_nTag  ;   ///< A tag
181             mutable lock_type   m_Lock  ;   ///< Node-level lock
182
183             /// Creates empty node
184             node()
185                 : m_pVal( nullptr )
186                 , m_nTag( tag_type(Empty) )
187             {}
188
189             /// Lock the node
190             void lock()
191             {
192                 m_Lock.lock();
193             }
194
195             /// Unlock the node
196             void unlock()
197             {
198                 m_Lock.unlock();
199             }
200         };
201         //@endcond
202
203     public:
204         typedef typename traits::buffer::template rebind<node>::other   buffer_type ;   ///< Heap array buffer type
205
206         //@cond
207         typedef cds::bitop::bit_reverse_counter<>           item_counter_type;
208         typedef typename item_counter_type::counter_type    counter_type;
209         //@endcond
210
211     protected:
212         item_counter_type   m_ItemCounter   ;   ///< Item counter
213         mutable lock_type   m_Lock          ;   ///< Heap's size lock
214         buffer_type         m_Heap          ;   ///< Heap array
215         stat                m_Stat          ;   ///< internal statistics accumulator
216
217     public:
218         /// Constructs empty priority queue
219         /**
220             For cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
221         */
222         MSPriorityQueue( size_t nCapacity )
223             : m_Heap( nCapacity )
224         {}
225
226         /// Clears priority queue and destructs the object
227         ~MSPriorityQueue()
228         {
229             clear();
230         }
231
232         /// Inserts a item into priority queue
233         /**
234             If the priority queue is full, the function returns \p false,
235             no item has been added.
236             Otherwise, the function inserts the pointer to \p val into the heap
237             and returns \p true.
238
239             The function does not make a copy of \p val.
240         */
241         bool push( value_type& val )
242         {
243             tag_type const curId = cds::OS::get_current_thread_id();
244
245             // Insert new item at bottom of the heap
246             m_Lock.lock();
247             if ( m_ItemCounter.value() >= capacity() ) {
248                 // the heap is full
249                 m_Lock.unlock();
250                 m_Stat.onPushFailed();
251                 return false;
252             }
253
254             counter_type i = m_ItemCounter.inc();
255             assert( i < m_Heap.capacity() );
256
257             node& refNode = m_Heap[i];
258             refNode.lock();
259             m_Lock.unlock();
260             refNode.m_pVal = &val;
261             refNode.m_nTag = curId;
262             refNode.unlock();
263
264             // Move item towards top of the heap while it has higher priority than parent
265             heapify_after_push( i, curId );
266
267             m_Stat.onPushSuccess();
268             return true;
269         }
270
271         /// Extracts item with high priority
272         /**
273             If the priority queue is empty, the function returns \p nullptr.
274             Otherwise, it returns the item extracted.
275         */
276         value_type * pop()
277         {
278             m_Lock.lock();
279             if ( m_ItemCounter.value() == 0 ) {
280                 // the heap is empty
281                 m_Lock.unlock();
282                 m_Stat.onPopFailed();
283                 return nullptr;
284             }
285             counter_type nBottom = m_ItemCounter.reversed_value();
286             m_ItemCounter.dec();
287             // Since m_Heap[0] is not used, capacity() returns m_Heap.capacity() - 1
288             // Consequently, "<=" is here
289             assert( nBottom <= capacity() );
290             assert( nBottom > 0 );
291
292             node& refBottom = m_Heap[ nBottom ];
293             refBottom.lock();
294             m_Lock.unlock();
295             refBottom.m_nTag = tag_type(Empty);
296             value_type * pVal = refBottom.m_pVal;
297             refBottom.m_pVal = nullptr;
298             refBottom.unlock();
299
300             node& refTop = m_Heap[ 1 ];
301             refTop.lock();
302             if ( refTop.m_nTag == tag_type(Empty) ) {
303                 // nBottom == nTop
304                 refTop.unlock();
305                 m_Stat.onPopSuccess();
306                 return pVal;
307             }
308
309             std::swap( refTop.m_pVal, pVal );
310             refTop.m_nTag = tag_type( Available );
311
312             // refTop will be unlocked inside heapify_after_pop
313             heapify_after_pop( 1, &refTop );
314
315             m_Stat.onPopSuccess();
316             return pVal;
317         }
318
319         /// Clears the queue (not atomic)
320         /**
321             This function is no atomic, but thread-safe
322         */
323         void clear()
324         {
325             clear_with( []( value_type const& /*src*/ ) {} );
326         }
327
328         /// Clears the queue (not atomic)
329         /**
330             This function is no atomic, but thread-safe.
331
332             For each item removed the functor \p f is called.
333             \p Func interface is:
334             \code
335                 struct clear_functor
336                 {
337                     void operator()( value_type& item );
338                 };
339             \endcode
340             A lambda function or a function pointer can be used as \p f.
341         */
342         template <typename Func>
343         void clear_with( Func f )
344         {
345             while ( !empty() ) {
346                 value_type * pVal = pop();
347                 if ( pVal )
348                     f( *pVal );
349             }
350         }
351
352         /// Checks is the priority queue is empty
353         bool empty() const
354         {
355             return size() == 0;
356         }
357
358         /// Checks if the priority queue is full
359         bool full() const
360         {
361             return size() == capacity();
362         }
363
364         /// Returns current size of priority queue
365         size_t size() const
366         {
367             std::unique_lock<lock_type> l( m_Lock );
368             size_t nSize = (size_t) m_ItemCounter.value();
369             return nSize;
370         }
371
372         /// Return capacity of the priority queue
373         size_t capacity() const
374         {
375             // m_Heap[0] is not used
376             return m_Heap.capacity() - 1;
377         }
378
379         /// Returns const reference to internal statistics
380         stat const& statistics() const
381         {
382             return m_Stat;
383         }
384
385     protected:
386         //@cond
387
388         void heapify_after_push( counter_type i, tag_type curId )
389         {
390             key_comparator  cmp;
391             back_off        bkoff;
392
393             // Move item towards top of the heap while it has higher priority than parent
394             while ( i > 1 ) {
395                 bool bProgress = true;
396                 counter_type nParent = i / 2;
397                 node& refParent = m_Heap[nParent];
398                 refParent.lock();
399                 node& refItem = m_Heap[i];
400                 refItem.lock();
401
402                 if ( refParent.m_nTag == tag_type(Available) && refItem.m_nTag == curId ) {
403                     if ( cmp( *refItem.m_pVal, *refParent.m_pVal ) > 0 ) {
404                         std::swap( refItem.m_nTag, refParent.m_nTag );
405                         std::swap( refItem.m_pVal, refParent.m_pVal );
406                         m_Stat.onPushHeapifySwap();
407                         i = nParent;
408                     }
409                     else {
410                         refItem.m_nTag = tag_type(Available);
411                         i = 0;
412                     }
413                 }
414                 else if ( refParent.m_nTag == tag_type(Empty) )
415                     i = 0;
416                 else if ( refItem.m_nTag != curId )
417                     i = nParent;
418                 else
419                     bProgress = false;
420
421                 refItem.unlock();
422                 refParent.unlock();
423
424                 if ( !bProgress )
425                     bkoff();
426                 else
427                     bkoff.reset();
428             }
429
430             if ( i == 1 ) {
431                 node& refItem = m_Heap[i];
432                 refItem.lock();
433                 if ( refItem.m_nTag == curId )
434                     refItem.m_nTag = tag_type(Available);
435                 refItem.unlock();
436             }
437         }
438
439         void heapify_after_pop( counter_type nParent, node * pParent )
440         {
441             key_comparator cmp;
442
443             while ( nParent < m_Heap.capacity() / 2 ) {
444                 counter_type nLeft = nParent * 2;
445                 counter_type nRight = nLeft + 1;
446                 node& refLeft = m_Heap[nLeft];
447                 node& refRight = m_Heap[nRight];
448                 refLeft.lock();
449                 refRight.lock();
450
451                 counter_type nChild;
452                 node * pChild;
453                 if ( refLeft.m_nTag == tag_type(Empty) ) {
454                     refRight.unlock();
455                     refLeft.unlock();
456                     break;
457                 }
458                 else if ( refRight.m_nTag == tag_type(Empty) || cmp( *refLeft.m_pVal, *refRight.m_pVal ) > 0 ) {
459                     refRight.unlock();
460                     nChild = nLeft;
461                     pChild = &refLeft;
462                 }
463                 else {
464                     refLeft.unlock();
465                     nChild = nRight;
466                     pChild = &refRight;
467                 }
468
469                 // If child has higher priority that parent then swap
470                 // Otherwise stop
471                 if ( cmp( *pChild->m_pVal, *pParent->m_pVal ) > 0 ) {
472                     std::swap( pParent->m_nTag, pChild->m_nTag );
473                     std::swap( pParent->m_pVal, pChild->m_pVal );
474                     pParent->unlock();
475                     m_Stat.onPopHeapifySwap();
476                     nParent = nChild;
477                     pParent = pChild;
478                 }
479                 else {
480                     pChild->unlock();
481                     break;
482                 }
483             }
484             pParent->unlock();
485         }
486         //@endcond
487     };
488
489 }} // namespace cds::intrusive
490
491 #endif // #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H