2788fa3a282bde9bc2d82dc1daeacea45d883170
[libcds.git] / cds / intrusive / mspriority_queue.h
1 //$$CDS-header$$
2
3 #ifndef __CDS_INTRUSIVE_MSPRIORITY_QUEUE_H
4 #define __CDS_INTRUSIVE_MSPRIORITY_QUEUE_H
5
6 #include <mutex>  // std::unique_lock
7 #include <cds/intrusive/details/base.h>
8 #include <cds/lock/spinlock.h>
9 #include <cds/os/thread.h>
10 #include <cds/details/bit_reverse_counter.h>
11 #include <cds/intrusive/options.h>
12 #include <cds/opt/buffer.h>
13 #include <cds/opt/compare.h>
14 #include <cds/details/bounded_container.h>
15
16 namespace cds { namespace intrusive {
17
18     /// MSPriorityQueue related definitions
19     /** @ingroup cds_intrusive_helper
20     */
21     namespace mspriority_queue {
22
23         /// MSPriorityQueue statistics
24         template <typename Counter = cds::atomicity::event_counter>
25         struct stat {
26             typedef Counter   event_counter ; ///< Event counter type
27
28             event_counter   m_nPushCount            ;   ///< Count of success push operation
29             event_counter   m_nPopCount             ;   ///< Count of success pop operation
30             event_counter   m_nPushFailCount        ;   ///< Count of failed ("the queue is full") push operation
31             event_counter   m_nPopFailCount         ;   ///< Count of failed ("the queue is empty") pop operation
32             event_counter   m_nPushHeapifySwapCount ;   ///< Count of item swapping when heapifying in push
33             event_counter   m_nPopHeapifySwapCount  ;   ///< Count of item swapping when heapifying in pop
34
35             //@cond
36             void onPushSuccess()            { ++m_nPushCount            ;}
37             void onPopSuccess()             { ++m_nPopCount             ;}
38             void onPushFailed()             { ++m_nPushFailCount        ;}
39             void onPopFailed()              { ++m_nPopFailCount         ;}
40             void onPushHeapifySwap()        { ++m_nPushHeapifySwapCount ;}
41             void onPopHeapifySwap()         { ++m_nPopHeapifySwapCount  ;}
42             //@endcond
43         };
44
45         /// MSPriorityQueue empty statistics
46         struct empty_stat {
47             //@cond
48             void onPushSuccess()            {}
49             void onPopSuccess()             {}
50             void onPushFailed()             {}
51             void onPopFailed()              {}
52             void onPushHeapifySwap()        {}
53             void onPopHeapifySwap()         {}
54             //@endcond
55         };
56
57         /// MSPriorityQueue traits
58         struct traits {
59             /// Storage type
60             /**
61                 The storage type for the heap array. Default is \p cds::opt::v::dynamic_buffer.
62
63                 You may specify any type of buffer's value since at instantiation time
64                 the \p buffer::rebind member metafunction is called to change type
65                 of values stored in the buffer.
66             */
67             typedef opt::v::dynamic_buffer<void *>  buffer;
68
69             /// Priority compare functor
70             /**
71                 No default functor is provided. If the option is not specified, the \p less is used.
72             */
73             typedef opt::none           compare;
74
75             /// Specifies binary predicate used for priority comparing.
76             /**
77                 Default is \p std::less<T>.
78             */
79             typedef opt::none           less;
80
81             /// Type of mutual-exclusion lock
82             typedef lock::Spin          lock_type;
83
84             /// Back-off strategy
85             typedef backoff::yield      back_off;
86
87             /// Internal statistics
88             /**
89                 Possible types: \p mspriority_queue::empty_stat (the default, no overhead), \p mspriority_queue::stat
90                 or any other with interface like \p %mspriority_queue::stat
91             */
92             typedef empty_stat      stat;
93         };
94
95         /// Metafunction converting option list to traits
96         /**
97             \p Options:
98             - \p opt::buffer - the buffer type for heap array. Possible type are: \p opt::v::static_buffer, \p opt::v::dynamic_buffer.
99                 Default is \p %opt::v::dynamic_buffer.
100                 You may specify any type of values for the buffer since at instantiation time
101                 the \p buffer::rebind member metafunction is called to change the type of values stored in the buffer.
102             - \p opt::compare - priority compare functor. No default functor is provided.
103                 If the option is not specified, the \p opt::less is used.
104             - \p opt::less - specifies binary predicate used for priority compare. Default is \p std::less<T>.
105             - \p opt::lock_type - lock type. Default is \p cds::lock::Spin.
106             - \p opt::back_off - back-off strategy. Default is \p cds::backoff::yield
107             - \p opt::stat - internal statistics. Available types: \p mspriority_queue::stat, \p mspriority_queue::empty_stat (the default, no overhead)
108         */
109         template <typename... Options>
110         struct make_traits {
111 #   ifdef CDS_DOXYGEN_INVOKED
112             typedef implementation_defined type ;   ///< Metafunction result
113 #   else
114             typedef typename cds::opt::make_options<
115                 typename cds::opt::find_type_traits< traits, Options... >::type
116                 ,Options...
117             >::type   type;
118 #   endif
119         };
120
121     }   // namespace mspriority_queue
122
123     /// Michael & Scott array-based lock-based concurrent priority queue heap
124     /** @ingroup cds_intrusive_priority_queue
125         Source:
126             - [1996] G.Hunt, M.Michael, S. Parthasarathy, M.Scott
127                 "An efficient algorithm for concurrent priority queue heaps"
128
129         \p %MSPriorityQueue augments the standard array-based heap data structure with
130         a mutual-exclusion lock on the heap's size and locks on each node in the heap.
131         Each node also has a tag that indicates whether
132         it is empty, valid, or in a transient state due to an update to the heap
133         by an inserting thread.
134         The algorithm allows concurrent insertions and deletions in opposite directions,
135         without risking deadlock and without the need for special server threads.
136         It also uses a "bit-reversal" technique to scatter accesses across the fringe
137         of the tree to reduce contention.
138         On large heaps the algorithm achieves significant performance improvements
139         over serialized single-lock algorithm, for various insertion/deletion
140         workloads. For small heaps it still performs well, but not as well as
141         single-lock algorithm.
142
143         Template parameters:
144         - \p T - type to be stored in the queue. The priority is a part of \p T type.
145         - \p Traits - type traits. See \p mspriority_queue::traits for explanation.
146             It is possible to declare option-based queue with \p cds::container::mspriority_queue::make_traits
147             metafunction instead of \p Traits template argument.
148     */
149     template <typename T, class Traits = mspriority_queue::traits >
150     class MSPriorityQueue: public cds::bounded_container
151     {
152     public:
153         typedef T           value_type  ;   ///< Value type stored in the queue
154         typedef Traits      traits      ;   ///< Traits template parameter
155
156 #   ifdef CDS_DOXYGEN_INVOKED
157         typedef implementation_defined key_comparator  ;    ///< priority comparing functor based on opt::compare and opt::less option setter.
158 #   else
159         typedef typename opt::details::make_comparator< value_type, traits >::type key_comparator;
160 #   endif
161
162         typedef typename traits::lock_type lock_type       ;   ///< heap's size lock type
163         typedef typename traits::back_off  back_off        ;   ///< Back-off strategy
164         typedef typename traits::stat          stat        ;   ///< internal statistics type
165
166     protected:
167         //@cond
168         typedef cds::OS::ThreadId   tag_type;
169
170         enum tag_value {
171             Available   = -1,
172             Empty       = 0
173         };
174         //@endcond
175
176         //@cond
177         /// Heap item type
178         struct node {
179             value_type *        m_pVal  ;   ///< A value pointer
180             tag_type volatile   m_nTag  ;   ///< A tag
181             mutable lock_type   m_Lock  ;   ///< Node-level lock
182
183             /// Creates empty node
184             node()
185                 : m_pVal( nullptr )
186                 , m_nTag( tag_type(Empty) )
187             {}
188
189             /// Lock the node
190             void lock()
191             {
192                 m_Lock.lock();
193             }
194
195             /// Unlock the node
196             void unlock()
197             {
198                 m_Lock.unlock();
199             }
200         };
201         //@endcond
202
203     public:
204         typedef typename traits::buffer::template rebind<node>::other   buffer_type ;   ///< Heap array buffer type
205
206         //@cond
207         typedef cds::bitop::bit_reverse_counter<>           item_counter_type;
208         typedef typename item_counter_type::counter_type    counter_type;
209         //@endcond
210
211     protected:
212         item_counter_type   m_ItemCounter   ;   ///< Item counter
213         mutable lock_type   m_Lock          ;   ///< Heap's size lock
214         buffer_type         m_Heap          ;   ///< Heap array
215         stat                m_Stat          ;   ///< internal statistics accumulator
216
217     public:
218         /// Constructs empty priority queue
219         /**
220             For cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
221         */
222         MSPriorityQueue( size_t nCapacity )
223             : m_Heap( nCapacity )
224         {}
225
226         /// Clears priority queue and destructs the object
227         ~MSPriorityQueue()
228         {
229             clear();
230         }
231
232         /// Inserts a item into priority queue
233         /**
234             If the priority queue is full, the function returns \p false,
235             no item has been added.
236             Otherwise, the function inserts the copy of \p val into the heap
237             and returns \p true.
238
239             The function use copy constructor to create new heap item from \p val.
240         */
241         bool push( value_type& val )
242         {
243             tag_type const curId = cds::OS::getCurrentThreadId();
244
245             // Insert new item at bottom of the heap
246             m_Lock.lock();
247             if ( m_ItemCounter.value() >= capacity() ) {
248                 // the heap is full
249                 m_Lock.unlock();
250                 m_Stat.onPushFailed();
251                 return false;
252             }
253
254             counter_type i = m_ItemCounter.inc();
255             assert( i < m_Heap.capacity() );
256
257             node& refNode = m_Heap[i];
258             refNode.lock();
259             m_Lock.unlock();
260             refNode.m_pVal = &val;
261             refNode.m_nTag = curId;
262             refNode.unlock();
263
264             // Move item towards top of the heap while it has higher priority than parent
265             heapify_after_push( i, curId );
266
267             m_Stat.onPushSuccess();
268             return true;
269         }
270
271         /// Extracts item with high priority
272         /**
273             If the priority queue is empty, the function returns \p nullptr.
274             Otherwise, it returns the item extracted.
275
276             The item returned may be disposed immediately.
277         */
278         value_type * pop()
279         {
280             m_Lock.lock();
281             if ( m_ItemCounter.value() == 0 ) {
282                 // the heap is empty
283                 m_Lock.unlock();
284                 m_Stat.onPopFailed();
285                 return nullptr;
286             }
287             counter_type nBottom = m_ItemCounter.reversed_value();
288             m_ItemCounter.dec();
289             // Since m_Heap[0] is not used, capacity() returns m_Heap.capacity() - 1
290             // Consequently, "<=" is here
291             assert( nBottom <= capacity() );
292             assert( nBottom > 0 );
293
294             node& refBottom = m_Heap[ nBottom ];
295             refBottom.lock();
296             m_Lock.unlock();
297             refBottom.m_nTag = tag_type(Empty);
298             value_type * pVal = refBottom.m_pVal;
299             refBottom.m_pVal = nullptr;
300             refBottom.unlock();
301
302             node& refTop = m_Heap[ 1 ];
303             refTop.lock();
304             if ( refTop.m_nTag == tag_type(Empty) ) {
305                 // nBottom == nTop
306                 refTop.unlock();
307                 m_Stat.onPopSuccess();
308                 return pVal;
309             }
310
311             std::swap( refTop.m_pVal, pVal );
312             refTop.m_nTag = tag_type( Available );
313
314             assert( nBottom > 1 );
315
316             // refTop will be unlocked inside heapify_after_pop
317             heapify_after_pop( 1, &refTop );
318
319             m_Stat.onPopSuccess();
320             return pVal;
321         }
322
323         /// Clears the queue (not atomic)
324         /**
325             This function is no atomic, but thread-safe
326         */
327         void clear()
328         {
329             clear_with( []( value_type const& /*src*/ ) {} );
330         }
331
332         /// Clears the queue (not atomic)
333         /**
334             This function is no atomic, but thread-safe.
335
336             For each item removed the functor \p f is called.
337             \p Func interface is:
338             \code
339                 struct clear_functor
340                 {
341                     void operator()( value_type& item );
342                 };
343             \endcode
344             A lambda function or a function pointer can be used as \p f.
345         */
346         template <typename Func>
347         void clear_with( Func f )
348         {
349             while ( !empty() ) {
350                 value_type * pVal = pop();
351                 if ( pVal )
352                     f( *pVal );
353             }
354         }
355
356         /// Checks is the priority queue is empty
357         bool empty() const
358         {
359             return size() == 0;
360         }
361
362         /// Checks if the priority queue is full
363         bool full() const
364         {
365             return size() == capacity();
366         }
367
368         /// Returns current size of priority queue
369         size_t size() const
370         {
371             std::unique_lock<lock_type> l( m_Lock );
372             size_t nSize = (size_t) m_ItemCounter.value();
373             return nSize;
374         }
375
376         /// Return capacity of the priority queue
377         size_t capacity() const
378         {
379             // m_Heap[0] is not used
380             return m_Heap.capacity() - 1;
381         }
382
383         /// Returns const reference to internal statistics
384         stat const& statistics() const
385         {
386             return m_Stat;
387         }
388
389     protected:
390         //@cond
391
392         void heapify_after_push( counter_type i, tag_type curId )
393         {
394             key_comparator  cmp;
395             back_off        bkoff;
396
397             // Move item towards top of the heap while it has higher priority than parent
398             while ( i > 1 ) {
399                 bool bProgress = true;
400                 counter_type nParent = i / 2;
401                 node& refParent = m_Heap[nParent];
402                 refParent.lock();
403                 node& refItem = m_Heap[i];
404                 refItem.lock();
405
406                 if ( refParent.m_nTag == tag_type(Available) && refItem.m_nTag == curId ) {
407                     if ( cmp( *refItem.m_pVal, *refParent.m_pVal ) > 0 ) {
408                         std::swap( refItem.m_nTag, refParent.m_nTag );
409                         std::swap( refItem.m_pVal, refParent.m_pVal );
410                         m_Stat.onPushHeapifySwap();
411                         i = nParent;
412                     }
413                     else {
414                         refItem.m_nTag = tag_type(Available);
415                         i = 0;
416                     }
417                 }
418                 else if ( refParent.m_nTag == tag_type(Empty) )
419                     i = 0;
420                 else if ( refItem.m_nTag != curId )
421                     i = nParent;
422                 else
423                     bProgress = false;
424
425                 refItem.unlock();
426                 refParent.unlock();
427
428                 if ( !bProgress )
429                     bkoff();
430                 else
431                     bkoff.reset();
432             }
433
434             if ( i == 1 ) {
435                 node& refItem = m_Heap[i];
436                 refItem.lock();
437                 if ( refItem.m_nTag == curId )
438                     refItem.m_nTag = tag_type(Available);
439                 refItem.unlock();
440             }
441         }
442
443         void heapify_after_pop( counter_type nParent, node * pParent )
444         {
445             key_comparator cmp;
446
447             while ( nParent < m_Heap.capacity() / 2 ) {
448                 counter_type nLeft = nParent * 2;
449                 counter_type nRight = nLeft + 1;
450                 node& refLeft = m_Heap[nLeft];
451                 node& refRight = m_Heap[nRight];
452                 refLeft.lock();
453                 refRight.lock();
454
455                 counter_type nChild;
456                 node * pChild;
457                 if ( refLeft.m_nTag == tag_type(Empty) ) {
458                     refRight.unlock();
459                     refLeft.unlock();
460                     break;
461                 }
462                 else if ( refRight.m_nTag == tag_type(Empty) || cmp( *refLeft.m_pVal, *refRight.m_pVal ) > 0 ) {
463                     refRight.unlock();
464                     nChild = nLeft;
465                     pChild = &refLeft;
466                 }
467                 else {
468                     refLeft.unlock();
469                     nChild = nRight;
470                     pChild = &refRight;
471                 }
472
473                 // If child has higher priority that parent then swap
474                 // Otherwise stop
475                 if ( cmp( *pChild->m_pVal, *pParent->m_pVal ) > 0 ) {
476                     std::swap( pParent->m_nTag, pChild->m_nTag );
477                     std::swap( pParent->m_pVal, pChild->m_pVal );
478                     pParent->unlock();
479                     m_Stat.onPopHeapifySwap();
480                     nParent = nChild;
481                     pParent = pChild;
482                 }
483                 else {
484                     pChild->unlock();
485                     break;
486                 }
487             }
488             pParent->unlock();
489         }
490         //@endcond
491     };
492
493 }} // namespace cds::intrusive
494
495 #endif // #ifndef __CDS_INTRUSIVE_MSPRIORITY_QUEUE_H