Replace cds::OS::ThreadId with std::thread::id, cds::OS::nullThreadId() with std...
[libcds.git] / cds / intrusive / mspriority_queue.h
1 //$$CDS-header$$
2
3 #ifndef __CDS_INTRUSIVE_MSPRIORITY_QUEUE_H
4 #define __CDS_INTRUSIVE_MSPRIORITY_QUEUE_H
5
6 #include <cds/intrusive/base.h>
7 #include <cds/lock/spinlock.h>
8 #include <cds/os/thread.h>
9 #include <cds/details/bit_reverse_counter.h>
10 #include <cds/intrusive/options.h>
11 #include <cds/opt/buffer.h>
12 #include <cds/opt/compare.h>
13 #include <cds/details/bounded_container.h>
14 #include <cds/ref.h>
15
16 namespace cds { namespace intrusive {
17
18     /// MSPriorityQueue related definitions
19     /** @ingroup cds_intrusive_helper
20     */
21     namespace mspriority_queue {
22
23         /// MSPriorityQueue statistics
24         template <typename Counter = cds::atomicity::event_counter>
25         struct stat {
26             typedef Counter   event_counter ; ///< Event counter type
27
28             event_counter   m_nPushCount            ;   ///< Count of success push operation
29             event_counter   m_nPopCount             ;   ///< Count of success pop operation
30             event_counter   m_nPushFailCount        ;   ///< Count of failed ("the queue is full") push operation
31             event_counter   m_nPopFailCount         ;   ///< Count of failed ("the queue is empty") pop operation
32             event_counter   m_nPushHeapifySwapCount ;   ///< Count of item swapping when heapifying in push
33             event_counter   m_nPopHeapifySwapCount  ;   ///< Count of item swapping when heapifying in pop
34
35             //@cond
36             void onPushSuccess()            { ++m_nPushCount            ;}
37             void onPopSuccess()             { ++m_nPopCount             ;}
38             void onPushFailed()             { ++m_nPushFailCount        ;}
39             void onPopFailed()              { ++m_nPopFailCount         ;}
40             void onPushHeapifySwap()        { ++m_nPushHeapifySwapCount ;}
41             void onPopHeapifySwap()         { ++m_nPopHeapifySwapCount  ;}
42             //@endcond
43         };
44
45         /// MSPriorityQueue empty statistics
46         struct empty_stat {
47             //@cond
48             void onPushSuccess()            {}
49             void onPopSuccess()             {}
50             void onPushFailed()             {}
51             void onPopFailed()              {}
52             void onPushHeapifySwap()        {}
53             void onPopHeapifySwap()         {}
54             //@endcond
55         };
56
57         /// Type traits for MSPriorityQueue
58         struct type_traits {
59             /// Storage type
60             /**
61                 The storage type for the heap array. Default is cds::opt::v::dynamic_buffer.
62
63                 You may specify any type of buffer's value since at instantiation time
64                 the \p buffer::rebind member metafunction is called to change type
65                 of values stored in the buffer.
66             */
67             typedef opt::v::dynamic_buffer<void *>  buffer;
68
69             /// Priority compare functor
70             /**
71                 No default functor is provided. If the option is not specified, the \p less is used.
72             */
73             typedef opt::none           compare;
74
75             /// specifies binary predicate used for priority comparing.
76             /**
77                 Default is \p std::less<T>.
78             */
79             typedef opt::none           less;
80
81             /// Type of mutual-exclusion lock
82             typedef lock::Spin          lock_type;
83
84             /// Back-off strategy
85             typedef backoff::yield      back_off;
86
87             /// Internal statistics
88             /**
89                 Possible types: mspriority_queue::empty_stat (the default), mspriority_queue::stat
90                 or any other with interface like \p %mspriority_queue::stat
91             */
92             typedef empty_stat      stat;
93         };
94
95         /// Metafunction converting option list to traits
96         /**
97             This is a wrapper for <tt> cds::opt::make_options< type_traits, Options...> </tt>
98
99             See \ref MSPriorityQueue, \ref type_traits, \ref cds::opt::make_options.
100         */
101         template <CDS_DECL_OPTIONS7>
102         struct make_traits {
103 #   ifdef CDS_DOXYGEN_INVOKED
104             typedef implementation_defined type ;   ///< Metafunction result
105 #   else
106             typedef typename cds::opt::make_options<
107                 typename cds::opt::find_type_traits< type_traits, CDS_OPTIONS7 >::type
108                 ,CDS_OPTIONS7
109             >::type   type;
110 #   endif
111         };
112
113     }   // namespace mspriority_queue
114
115     /// Michael & Scott array-based lock-based concurrent priority queue heap
116     /** @ingroup cds_intrusive_priority_queue
117         Source:
118             - [1996] G.Hunt, M.Michael, S. Parthasarathy, M.Scott
119                 "An efficient algorithm for concurrent priority queue heaps"
120
121         \p %MSPriorityQueue augments the standard array-based heap data structure with
122         a mutual-exclusion lock on the heap's size and locks on each node in the heap.
123         Each node also has a tag that indicates whether
124         it is empty, valid, or in a transient state due to an update to the heap
125         by an inserting thread.
126         The algorithm allows concurrent insertions and deletions in opposite directions,
127         without risking deadlock and without the need for special server threads.
128         It also uses a "bit-reversal" technique to scatter accesses across the fringe
129         of the tree to reduce contention.
130         On large heaps the algorithm achieves significant performance improvements
131         over serialized single-lock algorithm, for various insertion/deletion
132         workloads. For small heaps it still performs well, but not as well as
133         single-lock algorithm.
134
135         Template parameters:
136         - \p T - type to be stored in the list. The priority is a part of \p T type.
137         - \p Traits - type traits. See mspriority_queue::type_traits for explanation.
138
139         It is possible to declare option-based queue with cds::container::mspriority_queue::make_traits
140         metafunction instead of \p Traits template argument.
141         Template argument list \p Options of \p %cds::container::mspriority_queue::make_traits metafunction are:
142         - opt::buffer - the buffer type for heap array. Possible type are: opt::v::static_buffer, opt::v::dynamic_buffer.
143             Default is \p %opt::v::dynamic_buffer.
144             You may specify any type of values for the buffer since at instantiation time
145             the \p buffer::rebind member metafunction is called to change the type of values stored in the buffer.
146         - opt::compare - priority compare functor. No default functor is provided.
147             If the option is not specified, the opt::less is used.
148         - opt::less - specifies binary predicate used for priority compare. Default is \p std::less<T>.
149         - opt::lock_type - lock type. Default is cds::lock::Spin.
150         - opt::back_off - back-off strategy. Default is cds::backoff::yield
151         - opt::stat - internal statistics. Available types: mspriority_queue::stat, mspriority_queue::empty_stat (the default)
152     */
153     template <typename T, class Traits>
154     class MSPriorityQueue: public cds::bounded_container
155     {
156     public:
157         typedef T           value_type  ;   ///< Value type stored in the queue
158         typedef Traits      traits      ;   ///< Traits template parameter
159
160 #   ifdef CDS_DOXYGEN_INVOKED
161         typedef implementation_defined key_comparator  ;    ///< priority comparing functor based on opt::compare and opt::less option setter.
162 #   else
163         typedef typename opt::details::make_comparator< value_type, traits >::type key_comparator;
164 #   endif
165
166         typedef typename traits::lock_type lock_type       ;   ///< heap's size lock type
167         typedef typename traits::back_off  back_off        ;   ///< Back-off strategy
168         typedef typename traits::stat          stat        ;   ///< internal statistics type
169
170     protected:
171         //@cond
172         typedef std::thread::id tag_type;
173
174         enum tag_value {
175             Available   = -1,
176             Empty       = 0
177         };
178         //@endcond
179
180         //@cond
181         /// Heap item type
182         struct node {
183             value_type *        m_pVal  ;   ///< A value pointer
184             tag_type volatile   m_nTag  ;   ///< A tag
185             mutable lock_type   m_Lock  ;   ///< Node-level lock
186
187             /// Creates empty node
188             node()
189                 : m_pVal( nullptr )
190                 , m_nTag( tag_type(Empty) )
191             {}
192
193             /// Lock the node
194             void lock()
195             {
196                 m_Lock.lock();
197             }
198
199             /// Unlock the node
200             void unlock()
201             {
202                 m_Lock.unlock();
203             }
204         };
205         //@endcond
206
207     protected:
208         //@cond
209 #   ifndef CDS_CXX11_LAMBDA_SUPPORT
210         struct empty_cleaner
211         {
212             void operator()( value_type const& ) const
213             {}
214         };
215 #   endif
216         //@endcond
217
218     public:
219         typedef typename traits::buffer::template rebind<node>::other   buffer_type ;   ///< Heap array buffer type
220
221         //@cond
222         typedef cds::bitop::bit_reverse_counter<>           item_counter_type;
223         typedef typename item_counter_type::counter_type    counter_type;
224         //@endcond
225
226     protected:
227         item_counter_type   m_ItemCounter   ;   ///< Item counter
228         mutable lock_type   m_Lock          ;   ///< Heap's size lock
229         buffer_type         m_Heap          ;   ///< Heap array
230         stat                m_Stat          ;   ///< internal statistics accumulator
231
232     public:
233         /// Constructs empty priority queue
234         /**
235             For cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
236         */
237         MSPriorityQueue( size_t nCapacity )
238             : m_Heap( nCapacity )
239         {}
240
241         /// Clears priority queue and destructs the object
242         ~MSPriorityQueue()
243         {
244             clear();
245         }
246
247         /// Inserts a item into priority queue
248         /**
249             If the priority queue is full, the function returns \p false,
250             no item has been added.
251             Otherwise, the function inserts the copy of \p val into the heap
252             and returns \p true.
253
254             The function use copy constructor to create new heap item from \p val.
255         */
256         bool push( value_type& val )
257         {
258             tag_type const curId = std::this_thread::get_id();
259
260             // Insert new item at bottom of the heap
261             m_Lock.lock();
262             if ( m_ItemCounter.value() >= capacity() ) {
263                 // the heap is full
264                 m_Lock.unlock();
265                 m_Stat.onPushFailed();
266                 return false;
267             }
268
269             counter_type i = m_ItemCounter.inc();
270             assert( i < m_Heap.capacity() );
271
272             node& refNode = m_Heap[i];
273             refNode.lock();
274             m_Lock.unlock();
275             refNode.m_pVal = &val;
276             refNode.m_nTag = curId;
277             refNode.unlock();
278
279             // Move item towards top of the heap while it has higher priority than parent
280             heapify_after_push( i, curId );
281
282             m_Stat.onPushSuccess();
283             return true;
284         }
285
286         /// Extracts item with high priority
287         /**
288             If the priority queue is empty, the function returns \p nullptr.
289             Otherwise, it returns the item extracted.
290
291             The item returned may be disposed immediately.
292         */
293         value_type * pop()
294         {
295             m_Lock.lock();
296             if ( m_ItemCounter.value() == 0 ) {
297                 // the heap is empty
298                 m_Lock.unlock();
299                 m_Stat.onPopFailed();
300                 return false;
301             }
302             counter_type nBottom = m_ItemCounter.reversed_value();
303             m_ItemCounter.dec();
304             // Since m_Heap[0] is not used, capacity() returns m_Heap.capacity() - 1
305             // Consequently, "<=" is here
306             assert( nBottom <= capacity() );
307             assert( nBottom > 0 );
308
309             node& refBottom = m_Heap[ nBottom ];
310             refBottom.lock();
311             m_Lock.unlock();
312             refBottom.m_nTag = tag_type(Empty);
313             value_type * pVal = refBottom.m_pVal;
314             refBottom.m_pVal = nullptr;
315             refBottom.unlock();
316
317             node& refTop = m_Heap[ 1 ];
318             refTop.lock();
319             if ( refTop.m_nTag == tag_type(Empty) ) {
320                 // nBottom == nTop
321                 refTop.unlock();
322                 m_Stat.onPopSuccess();
323                 return pVal;
324             }
325
326             std::swap( refTop.m_pVal, pVal );
327             refTop.m_nTag = tag_type( Available );
328
329             assert( nBottom > 1 );
330
331             // refTop will be unlocked inside heapify_after_pop
332             heapify_after_pop( 1, &refTop );
333
334             m_Stat.onPopSuccess();
335             return pVal;
336         }
337
338         /// Clears the queue (not atomic)
339         /**
340             This function is no atomic, but thread-safe
341         */
342         void clear()
343         {
344 #       ifdef CDS_CXX11_LAMBDA_SUPPORT
345             clear_with( []( value_type const& src ) {} );
346 #       else
347             clear_with( empty_cleaner() );
348 #       endif
349         }
350
351         /// Clears the queue (not atomic)
352         /**
353             This function is no atomic, but thread-safe.
354
355             For each item removed the functor \p f is called.
356             \p Func interface is:
357             \code
358                 struct clear_functor
359                 {
360                     void operator()( value_type& item );
361                 };
362             \endcode
363             A lambda function or a function pointer can be used as \p f.
364         */
365         template <typename Func>
366         void clear_with( Func f )
367         {
368             while ( !empty() ) {
369                 value_type * pVal = pop();
370                 if ( pVal )
371                     cds::unref(f)( *pVal );
372             }
373         }
374
375         /// Checks is the priority queue is empty
376         bool empty() const
377         {
378             return size() == 0;
379         }
380
381         /// Checks if the priority queue is full
382         bool full() const
383         {
384             return size() == capacity();
385         }
386
387         /// Returns current size of priority queue
388         size_t size() const
389         {
390             m_Lock.lock();
391             size_t nSize = (size_t) m_ItemCounter.value();
392             m_Lock.unlock();
393             return nSize;
394         }
395
396         /// Return capacity of the priority queue
397         size_t capacity() const
398         {
399             // m_Heap[0] is not used
400             return m_Heap.capacity() - 1;
401         }
402
403         /// Returns const reference to internal statistics
404         stat const& statistics() const
405         {
406             return m_Stat;
407         }
408
409     protected:
410         //@cond
411
412         void heapify_after_push( counter_type i, tag_type curId )
413         {
414             key_comparator  cmp;
415             back_off        bkoff;
416
417             // Move item towards top of the heap while it has higher priority than parent
418             while ( i > 1 ) {
419                 bool bProgress = true;
420                 counter_type nParent = i / 2;
421                 node& refParent = m_Heap[nParent];
422                 refParent.lock();
423                 node& refItem = m_Heap[i];
424                 refItem.lock();
425
426                 if ( refParent.m_nTag == tag_type(Available) && refItem.m_nTag == curId ) {
427                     if ( cmp( *refItem.m_pVal, *refParent.m_pVal ) > 0 ) {
428                         std::swap( refItem.m_nTag, refParent.m_nTag );
429                         std::swap( refItem.m_pVal, refParent.m_pVal );
430                         m_Stat.onPushHeapifySwap();
431                         i = nParent;
432                     }
433                     else {
434                         refItem.m_nTag = tag_type(Available);
435                         i = 0;
436                     }
437                 }
438                 else if ( refParent.m_nTag == tag_type(Empty) )
439                     i = 0;
440                 else if ( refItem.m_nTag != curId )
441                     i = nParent;
442                 else
443                     bProgress = false;
444
445                 refItem.unlock();
446                 refParent.unlock();
447
448                 if ( !bProgress )
449                     bkoff();
450                 else
451                     bkoff.reset();
452             }
453
454             if ( i == 1 ) {
455                 node& refItem = m_Heap[i];
456                 refItem.lock();
457                 if ( refItem.m_nTag == curId )
458                     refItem.m_nTag = tag_type(Available);
459                 refItem.unlock();
460             }
461         }
462
463         void heapify_after_pop( counter_type nParent, node * pParent )
464         {
465             key_comparator cmp;
466
467             while ( nParent < m_Heap.capacity() / 2 ) {
468                 counter_type nLeft = nParent * 2;
469                 counter_type nRight = nLeft + 1;
470                 node& refLeft = m_Heap[nLeft];
471                 node& refRight = m_Heap[nRight];
472                 refLeft.lock();
473                 refRight.lock();
474
475                 counter_type nChild;
476                 node * pChild;
477                 if ( refLeft.m_nTag == tag_type(Empty) ) {
478                     refRight.unlock();
479                     refLeft.unlock();
480                     break;
481                 }
482                 else if ( refRight.m_nTag == tag_type(Empty) || cmp( *refLeft.m_pVal, *refRight.m_pVal ) > 0 ) {
483                     refRight.unlock();
484                     nChild = nLeft;
485                     pChild = &refLeft;
486                 }
487                 else {
488                     refLeft.unlock();
489                     nChild = nRight;
490                     pChild = &refRight;
491                 }
492
493                 // If child has higher priority that parent then swap
494                 // Otherwise stop
495                 if ( cmp( *pChild->m_pVal, *pParent->m_pVal ) > 0 ) {
496                     std::swap( pParent->m_nTag, pChild->m_nTag );
497                     std::swap( pParent->m_pVal, pChild->m_pVal );
498                     pParent->unlock();
499                     m_Stat.onPopHeapifySwap();
500                     nParent = nChild;
501                     pParent = pChild;
502                 }
503                 else {
504                     pChild->unlock();
505                     break;
506                 }
507             }
508             pParent->unlock();
509         }
510         //@endcond
511     };
512
513 }} // namespace cds::intrusive
514
515 #endif // #ifndef __CDS_INTRUSIVE_MSPRIORITY_QUEUE_H