Remove hash_functor_selector.h, use only std::hash
[libcds.git] / cds / intrusive / mspriority_queue.h
1 //$$CDS-header$$
2
3 #ifndef __CDS_INTRUSIVE_MSPRIORITY_QUEUE_H
4 #define __CDS_INTRUSIVE_MSPRIORITY_QUEUE_H
5
6 #include <cds/intrusive/base.h>
7 #include <cds/lock/spinlock.h>
8 #include <cds/os/thread.h>
9 #include <cds/details/bit_reverse_counter.h>
10 #include <cds/intrusive/options.h>
11 #include <cds/opt/buffer.h>
12 #include <cds/opt/compare.h>
13 #include <cds/details/bounded_container.h>
14 #include <cds/ref.h>
15
16 namespace cds { namespace intrusive {
17
18     /// MSPriorityQueue related definitions
19     /** @ingroup cds_intrusive_helper
20     */
21     namespace mspriority_queue {
22
23         /// MSPriorityQueue statistics
24         template <typename Counter = cds::atomicity::event_counter>
25         struct stat {
26             typedef Counter   event_counter ; ///< Event counter type
27
28             event_counter   m_nPushCount            ;   ///< Count of success push operation
29             event_counter   m_nPopCount             ;   ///< Count of success pop operation
30             event_counter   m_nPushFailCount        ;   ///< Count of failed ("the queue is full") push operation
31             event_counter   m_nPopFailCount         ;   ///< Count of failed ("the queue is empty") pop operation
32             event_counter   m_nPushHeapifySwapCount ;   ///< Count of item swapping when heapifying in push
33             event_counter   m_nPopHeapifySwapCount  ;   ///< Count of item swapping when heapifying in pop
34
35             //@cond
36             void onPushSuccess()            { ++m_nPushCount            ;}
37             void onPopSuccess()             { ++m_nPopCount             ;}
38             void onPushFailed()             { ++m_nPushFailCount        ;}
39             void onPopFailed()              { ++m_nPopFailCount         ;}
40             void onPushHeapifySwap()        { ++m_nPushHeapifySwapCount ;}
41             void onPopHeapifySwap()         { ++m_nPopHeapifySwapCount  ;}
42             //@endcond
43         };
44
45         /// MSPriorityQueue empty statistics
46         struct empty_stat {
47             //@cond
48             void onPushSuccess()            {}
49             void onPopSuccess()             {}
50             void onPushFailed()             {}
51             void onPopFailed()              {}
52             void onPushHeapifySwap()        {}
53             void onPopHeapifySwap()         {}
54             //@endcond
55         };
56
57         /// Type traits for MSPriorityQueue
58         struct type_traits {
59             /// Storage type
60             /**
61                 The storage type for the heap array. Default is cds::opt::v::dynamic_buffer.
62
63                 You may specify any type of buffer's value since at instantiation time
64                 the \p buffer::rebind member metafunction is called to change type
65                 of values stored in the buffer.
66             */
67             typedef opt::v::dynamic_buffer<void *>  buffer;
68
69             /// Priority compare functor
70             /**
71                 No default functor is provided. If the option is not specified, the \p less is used.
72             */
73             typedef opt::none           compare;
74
75             /// specifies binary predicate used for priority comparing.
76             /**
77                 Default is \p std::less<T>.
78             */
79             typedef opt::none           less;
80
81             /// Type of mutual-exclusion lock
82             typedef lock::Spin          lock_type;
83
84             /// Back-off strategy
85             typedef backoff::yield      back_off;
86
87             /// Internal statistics
88             /**
89                 Possible types: mspriority_queue::empty_stat (the default), mspriority_queue::stat
90                 or any other with interface like \p %mspriority_queue::stat
91             */
92             typedef empty_stat      stat;
93         };
94
95         /// Metafunction converting option list to traits
96         /**
97             This is a wrapper for <tt> cds::opt::make_options< type_traits, Options...> </tt>
98
99             See \ref MSPriorityQueue, \ref type_traits, \ref cds::opt::make_options.
100         */
101         template <typename... Options>
102         struct make_traits {
103 #   ifdef CDS_DOXYGEN_INVOKED
104             typedef implementation_defined type ;   ///< Metafunction result
105 #   else
106             typedef typename cds::opt::make_options<
107                 typename cds::opt::find_type_traits< type_traits, Options... >::type
108                 ,Options...
109             >::type   type;
110 #   endif
111         };
112
113     }   // namespace mspriority_queue
114
115     /// Michael & Scott array-based lock-based concurrent priority queue heap
116     /** @ingroup cds_intrusive_priority_queue
117         Source:
118             - [1996] G.Hunt, M.Michael, S. Parthasarathy, M.Scott
119                 "An efficient algorithm for concurrent priority queue heaps"
120
121         \p %MSPriorityQueue augments the standard array-based heap data structure with
122         a mutual-exclusion lock on the heap's size and locks on each node in the heap.
123         Each node also has a tag that indicates whether
124         it is empty, valid, or in a transient state due to an update to the heap
125         by an inserting thread.
126         The algorithm allows concurrent insertions and deletions in opposite directions,
127         without risking deadlock and without the need for special server threads.
128         It also uses a "bit-reversal" technique to scatter accesses across the fringe
129         of the tree to reduce contention.
130         On large heaps the algorithm achieves significant performance improvements
131         over serialized single-lock algorithm, for various insertion/deletion
132         workloads. For small heaps it still performs well, but not as well as
133         single-lock algorithm.
134
135         Template parameters:
136         - \p T - type to be stored in the list. The priority is a part of \p T type.
137         - \p Traits - type traits. See mspriority_queue::type_traits for explanation.
138
139         It is possible to declare option-based queue with cds::container::mspriority_queue::make_traits
140         metafunction instead of \p Traits template argument.
141         Template argument list \p Options of \p %cds::container::mspriority_queue::make_traits metafunction are:
142         - opt::buffer - the buffer type for heap array. Possible type are: opt::v::static_buffer, opt::v::dynamic_buffer.
143             Default is \p %opt::v::dynamic_buffer.
144             You may specify any type of values for the buffer since at instantiation time
145             the \p buffer::rebind member metafunction is called to change the type of values stored in the buffer.
146         - opt::compare - priority compare functor. No default functor is provided.
147             If the option is not specified, the opt::less is used.
148         - opt::less - specifies binary predicate used for priority compare. Default is \p std::less<T>.
149         - opt::lock_type - lock type. Default is cds::lock::Spin.
150         - opt::back_off - back-off strategy. Default is cds::backoff::yield
151         - opt::stat - internal statistics. Available types: mspriority_queue::stat, mspriority_queue::empty_stat (the default)
152     */
153     template <typename T, class Traits>
154     class MSPriorityQueue: public cds::bounded_container
155     {
156     public:
157         typedef T           value_type  ;   ///< Value type stored in the queue
158         typedef Traits      traits      ;   ///< Traits template parameter
159
160 #   ifdef CDS_DOXYGEN_INVOKED
161         typedef implementation_defined key_comparator  ;    ///< priority comparing functor based on opt::compare and opt::less option setter.
162 #   else
163         typedef typename opt::details::make_comparator< value_type, traits >::type key_comparator;
164 #   endif
165
166         typedef typename traits::lock_type lock_type       ;   ///< heap's size lock type
167         typedef typename traits::back_off  back_off        ;   ///< Back-off strategy
168         typedef typename traits::stat          stat        ;   ///< internal statistics type
169
170     protected:
171         //@cond
172         typedef cds::OS::ThreadId   tag_type;
173
174         enum tag_value {
175             Available   = -1,
176             Empty       = 0
177         };
178         //@endcond
179
180         //@cond
181         /// Heap item type
182         struct node {
183             value_type *        m_pVal  ;   ///< A value pointer
184             tag_type volatile   m_nTag  ;   ///< A tag
185             mutable lock_type   m_Lock  ;   ///< Node-level lock
186
187             /// Creates empty node
188             node()
189                 : m_pVal( nullptr )
190                 , m_nTag( tag_type(Empty) )
191             {}
192
193             /// Lock the node
194             void lock()
195             {
196                 m_Lock.lock();
197             }
198
199             /// Unlock the node
200             void unlock()
201             {
202                 m_Lock.unlock();
203             }
204         };
205         //@endcond
206
207     public:
208         typedef typename traits::buffer::template rebind<node>::other   buffer_type ;   ///< Heap array buffer type
209
210         //@cond
211         typedef cds::bitop::bit_reverse_counter<>           item_counter_type;
212         typedef typename item_counter_type::counter_type    counter_type;
213         //@endcond
214
215     protected:
216         item_counter_type   m_ItemCounter   ;   ///< Item counter
217         mutable lock_type   m_Lock          ;   ///< Heap's size lock
218         buffer_type         m_Heap          ;   ///< Heap array
219         stat                m_Stat          ;   ///< internal statistics accumulator
220
221     public:
222         /// Constructs empty priority queue
223         /**
224             For cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
225         */
226         MSPriorityQueue( size_t nCapacity )
227             : m_Heap( nCapacity )
228         {}
229
230         /// Clears priority queue and destructs the object
231         ~MSPriorityQueue()
232         {
233             clear();
234         }
235
236         /// Inserts a item into priority queue
237         /**
238             If the priority queue is full, the function returns \p false,
239             no item has been added.
240             Otherwise, the function inserts the copy of \p val into the heap
241             and returns \p true.
242
243             The function use copy constructor to create new heap item from \p val.
244         */
245         bool push( value_type& val )
246         {
247             tag_type const curId = cds::OS::getCurrentThreadId();
248
249             // Insert new item at bottom of the heap
250             m_Lock.lock();
251             if ( m_ItemCounter.value() >= capacity() ) {
252                 // the heap is full
253                 m_Lock.unlock();
254                 m_Stat.onPushFailed();
255                 return false;
256             }
257
258             counter_type i = m_ItemCounter.inc();
259             assert( i < m_Heap.capacity() );
260
261             node& refNode = m_Heap[i];
262             refNode.lock();
263             m_Lock.unlock();
264             refNode.m_pVal = &val;
265             refNode.m_nTag = curId;
266             refNode.unlock();
267
268             // Move item towards top of the heap while it has higher priority than parent
269             heapify_after_push( i, curId );
270
271             m_Stat.onPushSuccess();
272             return true;
273         }
274
275         /// Extracts item with high priority
276         /**
277             If the priority queue is empty, the function returns \p nullptr.
278             Otherwise, it returns the item extracted.
279
280             The item returned may be disposed immediately.
281         */
282         value_type * pop()
283         {
284             m_Lock.lock();
285             if ( m_ItemCounter.value() == 0 ) {
286                 // the heap is empty
287                 m_Lock.unlock();
288                 m_Stat.onPopFailed();
289                 return nullptr;
290             }
291             counter_type nBottom = m_ItemCounter.reversed_value();
292             m_ItemCounter.dec();
293             // Since m_Heap[0] is not used, capacity() returns m_Heap.capacity() - 1
294             // Consequently, "<=" is here
295             assert( nBottom <= capacity() );
296             assert( nBottom > 0 );
297
298             node& refBottom = m_Heap[ nBottom ];
299             refBottom.lock();
300             m_Lock.unlock();
301             refBottom.m_nTag = tag_type(Empty);
302             value_type * pVal = refBottom.m_pVal;
303             refBottom.m_pVal = nullptr;
304             refBottom.unlock();
305
306             node& refTop = m_Heap[ 1 ];
307             refTop.lock();
308             if ( refTop.m_nTag == tag_type(Empty) ) {
309                 // nBottom == nTop
310                 refTop.unlock();
311                 m_Stat.onPopSuccess();
312                 return pVal;
313             }
314
315             std::swap( refTop.m_pVal, pVal );
316             refTop.m_nTag = tag_type( Available );
317
318             assert( nBottom > 1 );
319
320             // refTop will be unlocked inside heapify_after_pop
321             heapify_after_pop( 1, &refTop );
322
323             m_Stat.onPopSuccess();
324             return pVal;
325         }
326
327         /// Clears the queue (not atomic)
328         /**
329             This function is no atomic, but thread-safe
330         */
331         void clear()
332         {
333             clear_with( []( value_type const& src ) {} );
334         }
335
336         /// Clears the queue (not atomic)
337         /**
338             This function is no atomic, but thread-safe.
339
340             For each item removed the functor \p f is called.
341             \p Func interface is:
342             \code
343                 struct clear_functor
344                 {
345                     void operator()( value_type& item );
346                 };
347             \endcode
348             A lambda function or a function pointer can be used as \p f.
349         */
350         template <typename Func>
351         void clear_with( Func f )
352         {
353             while ( !empty() ) {
354                 value_type * pVal = pop();
355                 if ( pVal )
356                     cds::unref(f)( *pVal );
357             }
358         }
359
360         /// Checks is the priority queue is empty
361         bool empty() const
362         {
363             return size() == 0;
364         }
365
366         /// Checks if the priority queue is full
367         bool full() const
368         {
369             return size() == capacity();
370         }
371
372         /// Returns current size of priority queue
373         size_t size() const
374         {
375             m_Lock.lock();
376             size_t nSize = (size_t) m_ItemCounter.value();
377             m_Lock.unlock();
378             return nSize;
379         }
380
381         /// Return capacity of the priority queue
382         size_t capacity() const
383         {
384             // m_Heap[0] is not used
385             return m_Heap.capacity() - 1;
386         }
387
388         /// Returns const reference to internal statistics
389         stat const& statistics() const
390         {
391             return m_Stat;
392         }
393
394     protected:
395         //@cond
396
397         void heapify_after_push( counter_type i, tag_type curId )
398         {
399             key_comparator  cmp;
400             back_off        bkoff;
401
402             // Move item towards top of the heap while it has higher priority than parent
403             while ( i > 1 ) {
404                 bool bProgress = true;
405                 counter_type nParent = i / 2;
406                 node& refParent = m_Heap[nParent];
407                 refParent.lock();
408                 node& refItem = m_Heap[i];
409                 refItem.lock();
410
411                 if ( refParent.m_nTag == tag_type(Available) && refItem.m_nTag == curId ) {
412                     if ( cmp( *refItem.m_pVal, *refParent.m_pVal ) > 0 ) {
413                         std::swap( refItem.m_nTag, refParent.m_nTag );
414                         std::swap( refItem.m_pVal, refParent.m_pVal );
415                         m_Stat.onPushHeapifySwap();
416                         i = nParent;
417                     }
418                     else {
419                         refItem.m_nTag = tag_type(Available);
420                         i = 0;
421                     }
422                 }
423                 else if ( refParent.m_nTag == tag_type(Empty) )
424                     i = 0;
425                 else if ( refItem.m_nTag != curId )
426                     i = nParent;
427                 else
428                     bProgress = false;
429
430                 refItem.unlock();
431                 refParent.unlock();
432
433                 if ( !bProgress )
434                     bkoff();
435                 else
436                     bkoff.reset();
437             }
438
439             if ( i == 1 ) {
440                 node& refItem = m_Heap[i];
441                 refItem.lock();
442                 if ( refItem.m_nTag == curId )
443                     refItem.m_nTag = tag_type(Available);
444                 refItem.unlock();
445             }
446         }
447
448         void heapify_after_pop( counter_type nParent, node * pParent )
449         {
450             key_comparator cmp;
451
452             while ( nParent < m_Heap.capacity() / 2 ) {
453                 counter_type nLeft = nParent * 2;
454                 counter_type nRight = nLeft + 1;
455                 node& refLeft = m_Heap[nLeft];
456                 node& refRight = m_Heap[nRight];
457                 refLeft.lock();
458                 refRight.lock();
459
460                 counter_type nChild;
461                 node * pChild;
462                 if ( refLeft.m_nTag == tag_type(Empty) ) {
463                     refRight.unlock();
464                     refLeft.unlock();
465                     break;
466                 }
467                 else if ( refRight.m_nTag == tag_type(Empty) || cmp( *refLeft.m_pVal, *refRight.m_pVal ) > 0 ) {
468                     refRight.unlock();
469                     nChild = nLeft;
470                     pChild = &refLeft;
471                 }
472                 else {
473                     refLeft.unlock();
474                     nChild = nRight;
475                     pChild = &refRight;
476                 }
477
478                 // If child has higher priority that parent then swap
479                 // Otherwise stop
480                 if ( cmp( *pChild->m_pVal, *pParent->m_pVal ) > 0 ) {
481                     std::swap( pParent->m_nTag, pChild->m_nTag );
482                     std::swap( pParent->m_pVal, pChild->m_pVal );
483                     pParent->unlock();
484                     m_Stat.onPopHeapifySwap();
485                     nParent = nChild;
486                     pParent = pChild;
487                 }
488                 else {
489                     pChild->unlock();
490                     break;
491                 }
492             }
493             pParent->unlock();
494         }
495         //@endcond
496     };
497
498 }} // namespace cds::intrusive
499
500 #endif // #ifndef __CDS_INTRUSIVE_MSPRIORITY_QUEUE_H