Removed trailing whitespaces
[libcds.git] / cds / intrusive / mspriority_queue.h
1 //$$CDS-header$$
2
3 #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
4 #define CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
5
6 #include <mutex>  // std::unique_lock
7 #include <cds/intrusive/details/base.h>
8 #include <cds/sync/spinlock.h>
9 #include <cds/os/thread.h>
10 #include <cds/details/bit_reverse_counter.h>
11 #include <cds/intrusive/options.h>
12 #include <cds/opt/buffer.h>
13 #include <cds/opt/compare.h>
14 #include <cds/details/bounded_container.h>
15
16 namespace cds { namespace intrusive {
17
18     /// MSPriorityQueue related definitions
19     /** @ingroup cds_intrusive_helper
20     */
21     namespace mspriority_queue {
22
23         /// MSPriorityQueue statistics
24         template <typename Counter = cds::atomicity::event_counter>
25         struct stat {
26             typedef Counter   event_counter ; ///< Event counter type
27
28             event_counter   m_nPushCount            ;   ///< Count of success push operation
29             event_counter   m_nPopCount             ;   ///< Count of success pop operation
30             event_counter   m_nPushFailCount        ;   ///< Count of failed ("the queue is full") push operation
31             event_counter   m_nPopFailCount         ;   ///< Count of failed ("the queue is empty") pop operation
32             event_counter   m_nPushHeapifySwapCount ;   ///< Count of item swapping when heapifying in push
33             event_counter   m_nPopHeapifySwapCount  ;   ///< Count of item swapping when heapifying in pop
34
35             //@cond
36             void onPushSuccess()            { ++m_nPushCount            ;}
37             void onPopSuccess()             { ++m_nPopCount             ;}
38             void onPushFailed()             { ++m_nPushFailCount        ;}
39             void onPopFailed()              { ++m_nPopFailCount         ;}
40             void onPushHeapifySwap()        { ++m_nPushHeapifySwapCount ;}
41             void onPopHeapifySwap()         { ++m_nPopHeapifySwapCount  ;}
42             //@endcond
43         };
44
45         /// MSPriorityQueue empty statistics
46         struct empty_stat {
47             //@cond
48             void onPushSuccess()            {}
49             void onPopSuccess()             {}
50             void onPushFailed()             {}
51             void onPopFailed()              {}
52             void onPushHeapifySwap()        {}
53             void onPopHeapifySwap()         {}
54             //@endcond
55         };
56
57         /// MSPriorityQueue traits
58         struct traits {
59             /// Storage type
60             /**
61                 The storage type for the heap array. Default is \p cds::opt::v::dynamic_buffer.
62
63                 You may specify any type of buffer's value since at instantiation time
64                 the \p buffer::rebind member metafunction is called to change type
65                 of values stored in the buffer.
66             */
67             typedef opt::v::dynamic_buffer<void *>  buffer;
68
69             /// Priority compare functor
70             /**
71                 No default functor is provided. If the option is not specified, the \p less is used.
72             */
73             typedef opt::none       compare;
74
75             /// Specifies binary predicate used for priority comparing.
76             /**
77                 Default is \p std::less<T>.
78             */
79             typedef opt::none       less;
80
81             /// Type of mutual-exclusion lock
82             typedef cds::sync::spin lock_type;
83
84             /// Back-off strategy
85             typedef backoff::yield      back_off;
86
87             /// Internal statistics
88             /**
89                 Possible types: \p mspriority_queue::empty_stat (the default, no overhead), \p mspriority_queue::stat
90                 or any other with interface like \p %mspriority_queue::stat
91             */
92             typedef empty_stat      stat;
93         };
94
95         /// Metafunction converting option list to traits
96         /**
97             \p Options:
98             - \p opt::buffer - the buffer type for heap array. Possible type are: \p opt::v::static_buffer, \p opt::v::dynamic_buffer.
99                 Default is \p %opt::v::dynamic_buffer.
100                 You may specify any type of values for the buffer since at instantiation time
101                 the \p buffer::rebind member metafunction is called to change the type of values stored in the buffer.
102             - \p opt::compare - priority compare functor. No default functor is provided.
103                 If the option is not specified, the \p opt::less is used.
104             - \p opt::less - specifies binary predicate used for priority compare. Default is \p std::less<T>.
105             - \p opt::lock_type - lock type. Default is \p cds::sync::spin
106             - \p opt::back_off - back-off strategy. Default is \p cds::backoff::yield
107             - \p opt::stat - internal statistics. Available types: \p mspriority_queue::stat, \p mspriority_queue::empty_stat (the default, no overhead)
108         */
109         template <typename... Options>
110         struct make_traits {
111 #   ifdef CDS_DOXYGEN_INVOKED
112             typedef implementation_defined type ;   ///< Metafunction result
113 #   else
114             typedef typename cds::opt::make_options<
115                 typename cds::opt::find_type_traits< traits, Options... >::type
116                 ,Options...
117             >::type   type;
118 #   endif
119         };
120
121     }   // namespace mspriority_queue
122
123     /// Michael & Scott array-based lock-based concurrent priority queue heap
124     /** @ingroup cds_intrusive_priority_queue
125         Source:
126             - [1996] G.Hunt, M.Michael, S. Parthasarathy, M.Scott
127                 "An efficient algorithm for concurrent priority queue heaps"
128
129         \p %MSPriorityQueue augments the standard array-based heap data structure with
130         a mutual-exclusion lock on the heap's size and locks on each node in the heap.
131         Each node also has a tag that indicates whether
132         it is empty, valid, or in a transient state due to an update to the heap
133         by an inserting thread.
134         The algorithm allows concurrent insertions and deletions in opposite directions,
135         without risking deadlock and without the need for special server threads.
136         It also uses a "bit-reversal" technique to scatter accesses across the fringe
137         of the tree to reduce contention.
138         On large heaps the algorithm achieves significant performance improvements
139         over serialized single-lock algorithm, for various insertion/deletion
140         workloads. For small heaps it still performs well, but not as well as
141         single-lock algorithm.
142
143         Template parameters:
144         - \p T - type to be stored in the queue. The priority is a part of \p T type.
145         - \p Traits - type traits. See \p mspriority_queue::traits for explanation.
146             It is possible to declare option-based queue with \p cds::container::mspriority_queue::make_traits
147             metafunction instead of \p Traits template argument.
148     */
149     template <typename T, class Traits = mspriority_queue::traits >
150     class MSPriorityQueue: public cds::bounded_container
151     {
152     public:
153         typedef T           value_type  ;   ///< Value type stored in the queue
154         typedef Traits      traits      ;   ///< Traits template parameter
155
156 #   ifdef CDS_DOXYGEN_INVOKED
157         typedef implementation_defined key_comparator  ;    ///< priority comparing functor based on opt::compare and opt::less option setter.
158 #   else
159         typedef typename opt::details::make_comparator< value_type, traits >::type key_comparator;
160 #   endif
161
162         typedef typename traits::lock_type lock_type       ;   ///< heap's size lock type
163         typedef typename traits::back_off  back_off        ;   ///< Back-off strategy
164         typedef typename traits::stat          stat        ;   ///< internal statistics type
165
166     protected:
167         //@cond
168         typedef cds::OS::ThreadId   tag_type;
169
170         enum tag_value {
171             Available   = -1,
172             Empty       = 0
173         };
174         //@endcond
175
176         //@cond
177         /// Heap item type
178         struct node {
179             value_type *        m_pVal  ;   ///< A value pointer
180             tag_type volatile   m_nTag  ;   ///< A tag
181             mutable lock_type   m_Lock  ;   ///< Node-level lock
182
183             /// Creates empty node
184             node()
185                 : m_pVal( nullptr )
186                 , m_nTag( tag_type(Empty) )
187             {}
188
189             /// Lock the node
190             void lock()
191             {
192                 m_Lock.lock();
193             }
194
195             /// Unlock the node
196             void unlock()
197             {
198                 m_Lock.unlock();
199             }
200         };
201         //@endcond
202
203     public:
204         typedef typename traits::buffer::template rebind<node>::other   buffer_type ;   ///< Heap array buffer type
205
206         //@cond
207         typedef cds::bitop::bit_reverse_counter<>           item_counter_type;
208         typedef typename item_counter_type::counter_type    counter_type;
209         //@endcond
210
211     protected:
212         item_counter_type   m_ItemCounter   ;   ///< Item counter
213         mutable lock_type   m_Lock          ;   ///< Heap's size lock
214         buffer_type         m_Heap          ;   ///< Heap array
215         stat                m_Stat          ;   ///< internal statistics accumulator
216
217     public:
218         /// Constructs empty priority queue
219         /**
220             For cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
221         */
222         MSPriorityQueue( size_t nCapacity )
223             : m_Heap( nCapacity )
224         {}
225
226         /// Clears priority queue and destructs the object
227         ~MSPriorityQueue()
228         {
229             clear();
230         }
231
232         /// Inserts a item into priority queue
233         /**
234             If the priority queue is full, the function returns \p false,
235             no item has been added.
236             Otherwise, the function inserts the pointer to \p val into the heap
237             and returns \p true.
238
239             The function does not make a copy of \p val.
240         */
241         bool push( value_type& val )
242         {
243             tag_type const curId = cds::OS::get_current_thread_id();
244
245             // Insert new item at bottom of the heap
246             m_Lock.lock();
247             if ( m_ItemCounter.value() >= capacity() ) {
248                 // the heap is full
249                 m_Lock.unlock();
250                 m_Stat.onPushFailed();
251                 return false;
252             }
253
254             counter_type i = m_ItemCounter.inc();
255             assert( i < m_Heap.capacity() );
256
257             node& refNode = m_Heap[i];
258             refNode.lock();
259             m_Lock.unlock();
260             refNode.m_pVal = &val;
261             refNode.m_nTag = curId;
262             refNode.unlock();
263
264             // Move item towards top of the heap while it has higher priority than parent
265             heapify_after_push( i, curId );
266
267             m_Stat.onPushSuccess();
268             return true;
269         }
270
271         /// Extracts item with high priority
272         /**
273             If the priority queue is empty, the function returns \p nullptr.
274             Otherwise, it returns the item extracted.
275         */
276         value_type * pop()
277         {
278             m_Lock.lock();
279             if ( m_ItemCounter.value() == 0 ) {
280                 // the heap is empty
281                 m_Lock.unlock();
282                 m_Stat.onPopFailed();
283                 return nullptr;
284             }
285             counter_type nBottom = m_ItemCounter.reversed_value();
286             m_ItemCounter.dec();
287             // Since m_Heap[0] is not used, capacity() returns m_Heap.capacity() - 1
288             // Consequently, "<=" is here
289             assert( nBottom <= capacity() );
290             assert( nBottom > 0 );
291
292             node& refBottom = m_Heap[ nBottom ];
293             refBottom.lock();
294             m_Lock.unlock();
295             refBottom.m_nTag = tag_type(Empty);
296             value_type * pVal = refBottom.m_pVal;
297             refBottom.m_pVal = nullptr;
298             refBottom.unlock();
299
300             node& refTop = m_Heap[ 1 ];
301             refTop.lock();
302             if ( refTop.m_nTag == tag_type(Empty) ) {
303                 // nBottom == nTop
304                 refTop.unlock();
305                 m_Stat.onPopSuccess();
306                 return pVal;
307             }
308
309             std::swap( refTop.m_pVal, pVal );
310             refTop.m_nTag = tag_type( Available );
311
312             assert( nBottom > 1 );
313
314             // refTop will be unlocked inside heapify_after_pop
315             heapify_after_pop( 1, &refTop );
316
317             m_Stat.onPopSuccess();
318             return pVal;
319         }
320
321         /// Clears the queue (not atomic)
322         /**
323             This function is no atomic, but thread-safe
324         */
325         void clear()
326         {
327             clear_with( []( value_type const& /*src*/ ) {} );
328         }
329
330         /// Clears the queue (not atomic)
331         /**
332             This function is no atomic, but thread-safe.
333
334             For each item removed the functor \p f is called.
335             \p Func interface is:
336             \code
337                 struct clear_functor
338                 {
339                     void operator()( value_type& item );
340                 };
341             \endcode
342             A lambda function or a function pointer can be used as \p f.
343         */
344         template <typename Func>
345         void clear_with( Func f )
346         {
347             while ( !empty() ) {
348                 value_type * pVal = pop();
349                 if ( pVal )
350                     f( *pVal );
351             }
352         }
353
354         /// Checks is the priority queue is empty
355         bool empty() const
356         {
357             return size() == 0;
358         }
359
360         /// Checks if the priority queue is full
361         bool full() const
362         {
363             return size() == capacity();
364         }
365
366         /// Returns current size of priority queue
367         size_t size() const
368         {
369             std::unique_lock<lock_type> l( m_Lock );
370             size_t nSize = (size_t) m_ItemCounter.value();
371             return nSize;
372         }
373
374         /// Return capacity of the priority queue
375         size_t capacity() const
376         {
377             // m_Heap[0] is not used
378             return m_Heap.capacity() - 1;
379         }
380
381         /// Returns const reference to internal statistics
382         stat const& statistics() const
383         {
384             return m_Stat;
385         }
386
387     protected:
388         //@cond
389
390         void heapify_after_push( counter_type i, tag_type curId )
391         {
392             key_comparator  cmp;
393             back_off        bkoff;
394
395             // Move item towards top of the heap while it has higher priority than parent
396             while ( i > 1 ) {
397                 bool bProgress = true;
398                 counter_type nParent = i / 2;
399                 node& refParent = m_Heap[nParent];
400                 refParent.lock();
401                 node& refItem = m_Heap[i];
402                 refItem.lock();
403
404                 if ( refParent.m_nTag == tag_type(Available) && refItem.m_nTag == curId ) {
405                     if ( cmp( *refItem.m_pVal, *refParent.m_pVal ) > 0 ) {
406                         std::swap( refItem.m_nTag, refParent.m_nTag );
407                         std::swap( refItem.m_pVal, refParent.m_pVal );
408                         m_Stat.onPushHeapifySwap();
409                         i = nParent;
410                     }
411                     else {
412                         refItem.m_nTag = tag_type(Available);
413                         i = 0;
414                     }
415                 }
416                 else if ( refParent.m_nTag == tag_type(Empty) )
417                     i = 0;
418                 else if ( refItem.m_nTag != curId )
419                     i = nParent;
420                 else
421                     bProgress = false;
422
423                 refItem.unlock();
424                 refParent.unlock();
425
426                 if ( !bProgress )
427                     bkoff();
428                 else
429                     bkoff.reset();
430             }
431
432             if ( i == 1 ) {
433                 node& refItem = m_Heap[i];
434                 refItem.lock();
435                 if ( refItem.m_nTag == curId )
436                     refItem.m_nTag = tag_type(Available);
437                 refItem.unlock();
438             }
439         }
440
441         void heapify_after_pop( counter_type nParent, node * pParent )
442         {
443             key_comparator cmp;
444
445             while ( nParent < m_Heap.capacity() / 2 ) {
446                 counter_type nLeft = nParent * 2;
447                 counter_type nRight = nLeft + 1;
448                 node& refLeft = m_Heap[nLeft];
449                 node& refRight = m_Heap[nRight];
450                 refLeft.lock();
451                 refRight.lock();
452
453                 counter_type nChild;
454                 node * pChild;
455                 if ( refLeft.m_nTag == tag_type(Empty) ) {
456                     refRight.unlock();
457                     refLeft.unlock();
458                     break;
459                 }
460                 else if ( refRight.m_nTag == tag_type(Empty) || cmp( *refLeft.m_pVal, *refRight.m_pVal ) > 0 ) {
461                     refRight.unlock();
462                     nChild = nLeft;
463                     pChild = &refLeft;
464                 }
465                 else {
466                     refLeft.unlock();
467                     nChild = nRight;
468                     pChild = &refRight;
469                 }
470
471                 // If child has higher priority that parent then swap
472                 // Otherwise stop
473                 if ( cmp( *pChild->m_pVal, *pParent->m_pVal ) > 0 ) {
474                     std::swap( pParent->m_nTag, pChild->m_nTag );
475                     std::swap( pParent->m_pVal, pChild->m_pVal );
476                     pParent->unlock();
477                     m_Stat.onPopHeapifySwap();
478                     nParent = nChild;
479                     pParent = pChild;
480                 }
481                 else {
482                     pChild->unlock();
483                     break;
484                 }
485             }
486             pParent->unlock();
487         }
488         //@endcond
489     };
490
491 }} // namespace cds::intrusive
492
493 #endif // #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H