5d3d0805816cf366660e87f72e379ef145900987
[libcds.git] / cds / intrusive / mspriority_queue.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8     
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
32 #define CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
33
34 #include <mutex>  // std::unique_lock
35 #include <cds/intrusive/details/base.h>
36 #include <cds/sync/spinlock.h>
37 #include <cds/os/thread.h>
38 #include <cds/details/bit_reverse_counter.h>
39 #include <cds/intrusive/options.h>
40 #include <cds/opt/buffer.h>
41 #include <cds/opt/compare.h>
42 #include <cds/details/bounded_container.h>
43
44 namespace cds { namespace intrusive {
45
46     /// MSPriorityQueue related definitions
47     /** @ingroup cds_intrusive_helper
48     */
49     namespace mspriority_queue {
50
51         /// MSPriorityQueue statistics
52         template <typename Counter = cds::atomicity::event_counter>
53         struct stat {
54             typedef Counter   event_counter ; ///< Event counter type
55
56             event_counter   m_nPushCount;            ///< Count of success push operation
57             event_counter   m_nPopCount;             ///< Count of success pop operation
58             event_counter   m_nPushFailCount;        ///< Count of failed ("the queue is full") push operation
59             event_counter   m_nPopFailCount;         ///< Count of failed ("the queue is empty") pop operation
60             event_counter   m_nPushHeapifySwapCount; ///< Count of item swapping when heapifying in push
61             event_counter   m_nPopHeapifySwapCount;  ///< Count of item swapping when heapifying in pop
62             event_counter   m_nItemMovedTop;         ///< Count of events when \p push() encountered that inserted item was moved to top by a concurrent \p pop()
63             event_counter   m_nItemMovedUp;          ///< Count of events when \p push() encountered that inserted item was moved upwards by a concurrent \p pop()
64             event_counter   m_nPushEmptyPass;        ///< Count of empty pass during heapify via concurrent operations
65
66             //@cond
67             void onPushSuccess()            { ++m_nPushCount            ;}
68             void onPopSuccess()             { ++m_nPopCount             ;}
69             void onPushFailed()             { ++m_nPushFailCount        ;}
70             void onPopFailed()              { ++m_nPopFailCount         ;}
71             void onPushHeapifySwap()        { ++m_nPushHeapifySwapCount ;}
72             void onPopHeapifySwap()         { ++m_nPopHeapifySwapCount  ;}
73
74             void onItemMovedTop()           { ++m_nItemMovedTop         ;}
75             void onItemMovedUp()            { ++m_nItemMovedUp          ;}
76             void onPushEmptyPass()          { ++m_nPushEmptyPass        ;}
77             //@endcond
78         };
79
80         /// MSPriorityQueue empty statistics
81         struct empty_stat {
82             //@cond
83             void onPushSuccess()            const {}
84             void onPopSuccess()             const {}
85             void onPushFailed()             const {}
86             void onPopFailed()              const {}
87             void onPushHeapifySwap()        const {}
88             void onPopHeapifySwap()         const {}
89
90             void onItemMovedTop()           const {}
91             void onItemMovedUp()            const {}
92             void onPushEmptyPass()          const {}
93             //@endcond
94         };
95
96         /// MSPriorityQueue traits
97         struct traits {
98             /// Storage type
99             /**
100                 The storage type for the heap array. Default is \p cds::opt::v::initialized_dynamic_buffer.
101
102                 You may specify any type of buffer's value since at instantiation time
103                 the \p buffer::rebind member metafunction is called to change type
104                 of values stored in the buffer.
105             */
106             typedef opt::v::initialized_dynamic_buffer<void *, CDS_DEFAULT_ALLOCATOR, false>  buffer;
107
108             /// Priority compare functor
109             /**
110                 No default functor is provided. If the option is not specified, the \p less is used.
111             */
112             typedef opt::none       compare;
113
114             /// Specifies binary predicate used for priority comparing.
115             /**
116                 Default is \p std::less<T>.
117             */
118             typedef opt::none       less;
119
120             /// Type of mutual-exclusion lock
121             typedef cds::sync::spin lock_type;
122
123             /// Back-off strategy
124             typedef backoff::yield      back_off;
125
126             /// Internal statistics
127             /**
128                 Possible types: \p mspriority_queue::empty_stat (the default, no overhead), \p mspriority_queue::stat
129                 or any other with interface like \p %mspriority_queue::stat
130             */
131             typedef empty_stat      stat;
132         };
133
134         /// Metafunction converting option list to traits
135         /**
136             \p Options:
137             - \p opt::buffer - the buffer type for heap array. Possible type are: \p opt::v::initialized_static_buffer, \p opt::v::initialized_dynamic_buffer.
138                 Default is \p %opt::v::initialized_dynamic_buffer.
139                 You may specify any type of value for the buffer since at instantiation time
140                 the \p buffer::rebind member metafunction is called to change the type of values stored in the buffer.
141             - \p opt::compare - priority compare functor. No default functor is provided.
142                 If the option is not specified, the \p opt::less is used.
143             - \p opt::less - specifies binary predicate used for priority compare. Default is \p std::less<T>.
144             - \p opt::lock_type - lock type. Default is \p cds::sync::spin
145             - \p opt::back_off - back-off strategy. Default is \p cds::backoff::yield
146             - \p opt::stat - internal statistics. Available types: \p mspriority_queue::stat, \p mspriority_queue::empty_stat (the default, no overhead)
147         */
148         template <typename... Options>
149         struct make_traits {
150 #   ifdef CDS_DOXYGEN_INVOKED
151             typedef implementation_defined type ;   ///< Metafunction result
152 #   else
153             typedef typename cds::opt::make_options<
154                 typename cds::opt::find_type_traits< traits, Options... >::type
155                 ,Options...
156             >::type   type;
157 #   endif
158         };
159
160     }   // namespace mspriority_queue
161
162     /// Michael & Scott array-based lock-based concurrent priority queue heap
163     /** @ingroup cds_intrusive_priority_queue
164         Source:
165             - [1996] G.Hunt, M.Michael, S. Parthasarathy, M.Scott
166                 "An efficient algorithm for concurrent priority queue heaps"
167
168         \p %MSPriorityQueue augments the standard array-based heap data structure with
169         a mutual-exclusion lock on the heap's size and locks on each node in the heap.
170         Each node also has a tag that indicates whether
171         it is empty, valid, or in a transient state due to an update to the heap
172         by an inserting thread.
173         The algorithm allows concurrent insertions and deletions in opposite directions,
174         without risking deadlock and without the need for special server threads.
175         It also uses a "bit-reversal" technique to scatter accesses across the fringe
176         of the tree to reduce contention.
177         On large heaps the algorithm achieves significant performance improvements
178         over serialized single-lock algorithm, for various insertion/deletion
179         workloads. For small heaps it still performs well, but not as well as
180         single-lock algorithm.
181
182         Template parameters:
183         - \p T - type to be stored in the queue. The priority is a part of \p T type.
184         - \p Traits - type traits. See \p mspriority_queue::traits for explanation.
185             It is possible to declare option-based queue with \p cds::container::mspriority_queue::make_traits
186             metafunction instead of \p Traits template argument.
187     */
188     template <typename T, class Traits = mspriority_queue::traits >
189     class MSPriorityQueue: public cds::bounded_container
190     {
191     public:
192         typedef T           value_type  ;   ///< Value type stored in the queue
193         typedef Traits      traits      ;   ///< Traits template parameter
194
195 #   ifdef CDS_DOXYGEN_INVOKED
196         typedef implementation_defined key_comparator  ;    ///< priority comparing functor based on opt::compare and opt::less option setter.
197 #   else
198         typedef typename opt::details::make_comparator< value_type, traits >::type key_comparator;
199 #   endif
200
201         typedef typename traits::lock_type lock_type;   ///< heap's size lock type
202         typedef typename traits::back_off  back_off;    ///< Back-off strategy
203         typedef typename traits::stat      stat;        ///< internal statistics type
204
205     protected:
206         //@cond
207         typedef cds::OS::ThreadId   tag_type;
208
209         enum tag_value {
210             Available   = -1,
211             Empty       = 0
212         };
213         //@endcond
214
215         //@cond
216         /// Heap item type
217         struct node {
218             value_type *        m_pVal  ;   ///< A value pointer
219             tag_type volatile   m_nTag  ;   ///< A tag
220             mutable lock_type   m_Lock  ;   ///< Node-level lock
221
222             /// Creates empty node
223             node()
224                 : m_pVal( nullptr )
225                 , m_nTag( tag_type(Empty) )
226             {}
227
228             /// Lock the node
229             void lock()
230             {
231                 m_Lock.lock();
232             }
233
234             /// Unlock the node
235             void unlock()
236             {
237                 m_Lock.unlock();
238             }
239         };
240         //@endcond
241
242     public:
243         typedef typename traits::buffer::template rebind<node>::other   buffer_type ;   ///< Heap array buffer type
244
245         //@cond
246         typedef cds::bitop::bit_reverse_counter<>           item_counter_type;
247         typedef typename item_counter_type::counter_type    counter_type;
248         //@endcond
249
250     protected:
251         item_counter_type   m_ItemCounter   ;   ///< Item counter
252         mutable lock_type   m_Lock          ;   ///< Heap's size lock
253         buffer_type         m_Heap          ;   ///< Heap array
254         stat                m_Stat          ;   ///< internal statistics accumulator
255
256     public:
257         /// Constructs empty priority queue
258         /**
259             For \p cds::opt::v::initialized_static_buffer the \p nCapacity parameter is ignored.
260         */
261         MSPriorityQueue( size_t nCapacity )
262             : m_Heap( nCapacity )
263         {}
264
265         /// Clears priority queue and destructs the object
266         ~MSPriorityQueue()
267         {
268             clear();
269         }
270
271         /// Inserts a item into priority queue
272         /**
273             If the priority queue is full, the function returns \p false,
274             no item has been added.
275             Otherwise, the function inserts the pointer to \p val into the heap
276             and returns \p true.
277
278             The function does not make a copy of \p val.
279         */
280         bool push( value_type& val )
281         {
282             tag_type const curId = cds::OS::get_current_thread_id();
283
284             // Insert new item at bottom of the heap
285             m_Lock.lock();
286             if ( m_ItemCounter.value() >= capacity() ) {
287                 // the heap is full
288                 m_Lock.unlock();
289                 m_Stat.onPushFailed();
290                 return false;
291             }
292
293             counter_type i = m_ItemCounter.inc();
294             if ( i >= m_Heap.capacity() ) {
295                 // the heap is full
296                 m_Lock.unlock();
297                 m_Stat.onPushFailed();
298                 return false;
299             }
300
301             node& refNode = m_Heap[i];
302             refNode.lock();
303             m_Lock.unlock();
304             assert( refNode.m_nTag == tag_type( Empty ));
305             assert( refNode.m_pVal == nullptr );
306             refNode.m_pVal = &val;
307             refNode.m_nTag = curId;
308             refNode.unlock();
309
310             // Move item towards top of heap while it has a higher priority than its parent
311             heapify_after_push( i, curId );
312
313             m_Stat.onPushSuccess();
314             return true;
315         }
316
317         /// Extracts item with high priority
318         /**
319             If the priority queue is empty, the function returns \p nullptr.
320             Otherwise, it returns the item extracted.
321         */
322         value_type * pop()
323         {
324             m_Lock.lock();
325             if ( m_ItemCounter.value() == 0 ) {
326                 // the heap is empty
327                 m_Lock.unlock();
328                 m_Stat.onPopFailed();
329                 return nullptr;
330             }
331             counter_type nBottom = m_ItemCounter.reversed_value();
332             m_ItemCounter.dec();
333             assert( nBottom < m_Heap.capacity() );
334             assert( nBottom > 0 );
335
336             node& refBottom = m_Heap[ nBottom ];
337             refBottom.lock();
338             m_Lock.unlock();
339             refBottom.m_nTag = tag_type(Empty);
340             value_type * pVal = refBottom.m_pVal;
341             refBottom.m_pVal = nullptr;
342             refBottom.unlock();
343
344             node& refTop = m_Heap[ 1 ];
345             refTop.lock();
346             if ( refTop.m_nTag == tag_type(Empty) ) {
347                 // nBottom == nTop
348                 refTop.unlock();
349                 m_Stat.onPopSuccess();
350                 return pVal;
351             }
352
353             std::swap( refTop.m_pVal, pVal );
354             refTop.m_nTag = tag_type( Available );
355
356             // refTop will be unlocked inside heapify_after_pop
357             heapify_after_pop( &refTop );
358
359             m_Stat.onPopSuccess();
360             return pVal;
361         }
362
363         /// Clears the queue (not atomic)
364         /**
365             This function is no atomic, but thread-safe
366         */
367         void clear()
368         {
369             clear_with( []( value_type const& /*src*/ ) {} );
370         }
371
372         /// Clears the queue (not atomic)
373         /**
374             This function is no atomic, but thread-safe.
375
376             For each item removed the functor \p f is called.
377             \p Func interface is:
378             \code
379                 struct clear_functor
380                 {
381                     void operator()( value_type& item );
382                 };
383             \endcode
384             A lambda function or a function pointer can be used as \p f.
385         */
386         template <typename Func>
387         void clear_with( Func f )
388         {
389             value_type * pVal;
390             while (( pVal = pop()) != nullptr )
391                 f( *pVal );
392         }
393
394         /// Checks is the priority queue is empty
395         bool empty() const
396         {
397             return size() == 0;
398         }
399
400         /// Checks if the priority queue is full
401         bool full() const
402         {
403             return size() == capacity();
404         }
405
406         /// Returns current size of priority queue
407         size_t size() const
408         {
409             std::unique_lock<lock_type> l( m_Lock );
410             return static_cast<size_t>( m_ItemCounter.value());
411         }
412
413         /// Return capacity of the priority queue
414         size_t capacity() const
415         {
416             // m_Heap[0] is not used
417             return m_Heap.capacity() - 1;
418         }
419
420         /// Returns const reference to internal statistics
421         stat const& statistics() const
422         {
423             return m_Stat;
424         }
425
426     protected:
427         //@cond
428
429         void heapify_after_push( counter_type i, tag_type curId )
430         {
431             key_comparator  cmp;
432             back_off        bkoff;
433
434             // Move item towards top of the heap while it has higher priority than parent
435             while ( i > 1 ) {
436                 bool bProgress = true;
437                 counter_type nParent = i / 2;
438                 node& refParent = m_Heap[nParent];
439                 refParent.lock();
440                 node& refItem = m_Heap[i];
441                 refItem.lock();
442
443                 if ( refParent.m_nTag == tag_type(Available) && refItem.m_nTag == curId ) {
444                     if ( cmp( *refItem.m_pVal, *refParent.m_pVal ) > 0 ) {
445                         std::swap( refItem.m_nTag, refParent.m_nTag );
446                         std::swap( refItem.m_pVal, refParent.m_pVal );
447                         m_Stat.onPushHeapifySwap();
448                         i = nParent;
449                     }
450                     else {
451                         refItem.m_nTag = tag_type(Available);
452                         i = 0;
453                     }
454                 }
455                 else if ( refParent.m_nTag == tag_type( Empty ) ) {
456                     m_Stat.onItemMovedTop();
457                     i = 0;
458                 }
459                 else if ( refItem.m_nTag != curId ) {
460                     m_Stat.onItemMovedUp();
461                     i = nParent;
462                 }
463                 else {
464                     m_Stat.onPushEmptyPass();
465                     bProgress = false;
466                 }
467
468                 refItem.unlock();
469                 refParent.unlock();
470
471                 if ( !bProgress )
472                     bkoff();
473                 else
474                     bkoff.reset();
475             }
476
477             if ( i == 1 ) {
478                 node& refItem = m_Heap[i];
479                 refItem.lock();
480                 if ( refItem.m_nTag == curId )
481                     refItem.m_nTag = tag_type(Available);
482                 refItem.unlock();
483             }
484         }
485
486         void heapify_after_pop( node * pParent )
487         {
488             key_comparator cmp;
489             counter_type const nCapacity = m_Heap.capacity();
490
491             counter_type nParent = 1;
492             for ( counter_type nChild = nParent * 2; nChild < nCapacity; nChild *= 2 ) {
493                 node* pChild = &m_Heap[ nChild ];
494                 pChild->lock();
495
496                 if ( pChild->m_nTag == tag_type( Empty )) {
497                     pChild->unlock();
498                     break;
499                 }
500
501                 counter_type const nRight = nChild + 1;
502                 if ( nRight < nCapacity ) {
503                     node& refRight = m_Heap[nRight];
504                     refRight.lock();
505
506                     if ( refRight.m_nTag != tag_type( Empty ) && cmp( *refRight.m_pVal, *pChild->m_pVal ) > 0 ) {
507                         // get right child
508                         pChild->unlock();
509                         nChild = nRight;
510                         pChild = &refRight;
511                     }
512                     else
513                         refRight.unlock();
514                 }
515
516                 // If child has higher priority than parent then swap
517                 // Otherwise stop
518                 if ( cmp( *pChild->m_pVal, *pParent->m_pVal ) > 0 ) {
519                     std::swap( pParent->m_nTag, pChild->m_nTag );
520                     std::swap( pParent->m_pVal, pChild->m_pVal );
521                     pParent->unlock();
522                     m_Stat.onPopHeapifySwap();
523                     nParent = nChild;
524                     pParent = pChild;
525                 }
526                 else {
527                     pChild->unlock();
528                     break;
529                 }
530             }
531             pParent->unlock();
532         }
533         //@endcond
534     };
535
536 }} // namespace cds::intrusive
537
538 #endif // #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H