2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
32 #define CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
34 #include <mutex> // std::unique_lock
35 #include <cds/intrusive/details/base.h>
36 #include <cds/sync/spinlock.h>
37 #include <cds/os/thread.h>
38 #include <cds/details/bit_reverse_counter.h>
39 #include <cds/intrusive/options.h>
40 #include <cds/opt/buffer.h>
41 #include <cds/opt/compare.h>
42 #include <cds/details/bounded_container.h>
44 namespace cds { namespace intrusive {
46 /// MSPriorityQueue related definitions
47 /** @ingroup cds_intrusive_helper
49 namespace mspriority_queue {
51 /// MSPriorityQueue statistics
52 template <typename Counter = cds::atomicity::event_counter>
54 typedef Counter event_counter ; ///< Event counter type
56 event_counter m_nPushCount; ///< Count of success push operation
57 event_counter m_nPopCount; ///< Count of success pop operation
58 event_counter m_nPushFailCount; ///< Count of failed ("the queue is full") push operation
59 event_counter m_nPopFailCount; ///< Count of failed ("the queue is empty") pop operation
60 event_counter m_nPushHeapifySwapCount; ///< Count of item swapping when heapifying in push
61 event_counter m_nPopHeapifySwapCount; ///< Count of item swapping when heapifying in pop
62 event_counter m_nItemMovedTop; ///< Count of events when \p push() encountered that inserted item was moved to top by a concurrent \p pop()
63 event_counter m_nItemMovedUp; ///< Count of events when \p push() encountered that inserted item was moved upwards by a concurrent \p pop()
64 event_counter m_nPushEmptyPass; ///< Count of empty pass during heapify via concurrent operations
67 void onPushSuccess() { ++m_nPushCount ;}
68 void onPopSuccess() { ++m_nPopCount ;}
69 void onPushFailed() { ++m_nPushFailCount ;}
70 void onPopFailed() { ++m_nPopFailCount ;}
71 void onPushHeapifySwap() { ++m_nPushHeapifySwapCount ;}
72 void onPopHeapifySwap() { ++m_nPopHeapifySwapCount ;}
74 void onItemMovedTop() { ++m_nItemMovedTop ;}
75 void onItemMovedUp() { ++m_nItemMovedUp ;}
76 void onPushEmptyPass() { ++m_nPushEmptyPass ;}
80 /// MSPriorityQueue empty statistics
83 void onPushSuccess() const {}
84 void onPopSuccess() const {}
85 void onPushFailed() const {}
86 void onPopFailed() const {}
87 void onPushHeapifySwap() const {}
88 void onPopHeapifySwap() const {}
90 void onItemMovedTop() const {}
91 void onItemMovedUp() const {}
92 void onPushEmptyPass() const {}
96 /// Monotonic item counter, see \p traits::item_counter for explanation
97 class monotonic_counter
101 typedef size_t counter_type;
127 /// MSPriorityQueue traits
131 The storage type for the heap array. Default is \p cds::opt::v::initialized_dynamic_buffer.
133 You may specify any type of buffer's value since at instantiation time
134 the \p buffer::rebind member metafunction is called to change type
135 of values stored in the buffer.
137 typedef opt::v::initialized_dynamic_buffer<void *> buffer;
139 /// Priority compare functor
141 No default functor is provided. If the option is not specified, the \p less is used.
143 typedef opt::none compare;
145 /// Specifies binary predicate used for priority comparing.
147 Default is \p std::less<T>.
149 typedef opt::none less;
151 /// Type of mutual-exclusion lock. The lock need not be recursive.
152 typedef cds::sync::spin lock_type;
154 /// Back-off strategy
155 typedef backoff::yield back_off;
157 /// Internal statistics
159 Possible types: \p mspriority_queue::empty_stat (the default, no overhead), \p mspriority_queue::stat
160 or any other with interface like \p %mspriority_queue::stat
162 typedef empty_stat stat;
164 /// Item counter type
166 Two types are possible:
167 - \p cds::bitop::bit_reverse_counter - a counter described in <a href="http://www.research.ibm.com/people/m/michael/ipl-1996.pdf">original paper</a>,
168 which was developed for reducing lock contention. However, the bit-reversing technique requires more memory than the classic heapifying algorithm
169 because the elements are stored sparsely: for a priority queue of max size \p N the bit-reversing technique requires an array size of up to 2<sup>K</sup>
170 where \p K is the smallest integer such that <tt>2<sup>K</sup> >= N</tt>.
171 - \p mspriority_queue::monotonic_counter - a classic monotonic item counter. This counter can lead to false sharing under high contention.
172 On the other hand, for a priority queue of max size \p N it requires an array of size \p N.
174 By default, \p MSPriorityQueue uses \p %cds::bitop::bit_reverse_counter as described in original paper.
176 typedef cds::bitop::bit_reverse_counter<> item_counter;
179 /// Metafunction converting option list to traits
182 - \p opt::buffer - the buffer type for heap array. Possible types are: \p opt::v::initialized_static_buffer, \p opt::v::initialized_dynamic_buffer.
183 Default is \p %opt::v::initialized_dynamic_buffer.
184 You may specify any type of value for the buffer since at instantiation time
185 the \p buffer::rebind member metafunction is called to change the type of values stored in the buffer.
186 - \p opt::compare - priority compare functor. No default functor is provided.
187 If the option is not specified, the \p opt::less is used.
188 - \p opt::less - specifies binary predicate used for priority compare. Default is \p std::less<T>.
189 - \p opt::lock_type - lock type. Default is \p cds::sync::spin
190 - \p opt::back_off - back-off strategy. Default is \p cds::backoff::yield
191 - \p opt::stat - internal statistics. Available types: \p mspriority_queue::stat, \p mspriority_queue::empty_stat (the default, no overhead)
192 - \p opt::item_counter - an item counter type for \p MSPriorityQueue.
193 Available types: \p cds::bitop::bit_reverse_counter, \p mspriority_queue::monotonic_counter. See \p traits::item_counter for details.
195 template <typename... Options>
197 # ifdef CDS_DOXYGEN_INVOKED
198 typedef implementation_defined type ; ///< Metafunction result
200 typedef typename cds::opt::make_options<
201 typename cds::opt::find_type_traits< traits, Options... >::type
207 } // namespace mspriority_queue
209 /// Michael & Scott array-based lock-based concurrent priority queue heap
210 /** @ingroup cds_intrusive_priority_queue
212 - [1996] G.Hunt, M.Michael, S. Parthasarathy, M.Scott
213 "An efficient algorithm for concurrent priority queue heaps"
215 \p %MSPriorityQueue augments the standard array-based heap data structure with
216 a mutual-exclusion lock on the heap's size and locks on each node in the heap.
217 Each node also has a tag that indicates whether
218 it is empty, valid, or in a transient state due to an update to the heap
219 by an inserting thread.
220 The algorithm allows concurrent insertions and deletions in opposite directions,
221 without risking deadlock and without the need for special server threads.
222 It also uses a "bit-reversal" technique to scatter accesses across the fringe
223 of the tree to reduce contention.
224 On large heaps the algorithm achieves significant performance improvements
225 over serialized single-lock algorithm, for various insertion/deletion
226 workloads. For small heaps it still performs well, but not as well as
227 single-lock algorithm.
230 - \p T - type to be stored in the queue. The priority is a part of \p T type.
231 - \p Traits - type traits. See \p mspriority_queue::traits for explanation.
232 It is possible to declare an option-based queue with the \p cds::intrusive::mspriority_queue::make_traits
233 metafunction instead of \p Traits template argument.
235 template <typename T, class Traits = mspriority_queue::traits >
236 class MSPriorityQueue: public cds::bounded_container
239 typedef T value_type ; ///< Value type stored in the queue
240 typedef Traits traits ; ///< Traits template parameter
242 # ifdef CDS_DOXYGEN_INVOKED
243 typedef implementation_defined key_comparator ; ///< priority comparing functor based on opt::compare and opt::less option setter.
245 typedef typename opt::details::make_comparator< value_type, traits >::type key_comparator;
248 typedef typename traits::lock_type lock_type; ///< heap's size lock type
249 typedef typename traits::back_off back_off; ///< Back-off strategy
250 typedef typename traits::stat stat; ///< internal statistics type, see \p mspriority_queue::traits::stat
251 typedef typename traits::item_counter item_counter;///< Item counter type, see \p mspriority_queue::traits::item_counter
255 typedef cds::OS::ThreadId tag_type;
266 value_type * m_pVal ; ///< A value pointer
267 tag_type volatile m_nTag ; ///< A tag
268 mutable lock_type m_Lock ; ///< Node-level lock
270 /// Creates empty node
273 , m_nTag( tag_type(Empty) )
291 typedef typename traits::buffer::template rebind<node>::other buffer_type ; ///< Heap array buffer type
294 typedef typename item_counter::counter_type counter_type;
298 item_counter m_ItemCounter ; ///< Item counter
299 mutable lock_type m_Lock ; ///< Heap's size lock
300 buffer_type m_Heap ; ///< Heap array
301 stat m_Stat ; ///< internal statistics accumulator
304 /// Constructs empty priority queue
306 For \p cds::opt::v::initialized_static_buffer the \p nCapacity parameter is ignored.
308 MSPriorityQueue( size_t nCapacity )
309 : m_Heap( nCapacity )
312 /// Clears priority queue and destructs the object
318 /// Inserts an item into the priority queue
320 If the priority queue is full, the function returns \p false,
321 no item has been added.
322 Otherwise, the function inserts the pointer to \p val into the heap
325 The function does not make a copy of \p val.
// push(): stores the address of val in the heap slot selected by the item counter,
// tags that slot with the calling thread's id and then lets heapify_after_push()
// move the item towards the top of the heap.
// NOTE(review): several original lines of this function (see the gaps in the embedded
// line numbers) are not visible in this excerpt; whatever locking and return statements
// they contain is not described here - confirm against the full file.
327 bool push( value_type& val )
// The thread id serves as the slot tag while the item is in transit; heapify_after_push()
// resets the tag to Available once the item has settled.
329 tag_type const curId = cds::OS::get_current_thread_id();
331 // Insert new item at bottom of the heap
// Full-queue check: the failure is recorded in the statistics.
333 if ( m_ItemCounter.value() >= capacity() ) {
336 m_Stat.onPushFailed();
// inc() yields the heap index used for the new item; it must lie inside the buffer.
340 counter_type i = m_ItemCounter.inc();
341 assert( i < m_Heap.capacity() );
343 node& refNode = m_Heap[i];
// The selected slot is expected to be vacant: Empty tag and null value pointer.
346 assert( refNode.m_nTag == tag_type( Empty ));
347 assert( refNode.m_pVal == nullptr );
// Intrusive container: only the pointer to val is stored, no copy is made.
348 refNode.m_pVal = &val;
349 refNode.m_nTag = curId;
352 // Move item towards top of heap while it has a higher priority than its parent
353 heapify_after_push( i, curId );
355 m_Stat.onPushSuccess();
359 /// Extracts the item with the highest priority
361 If the priority queue is empty, the function returns \p nullptr.
362 Otherwise, it returns the item extracted.
// NOTE(review): the signature line of this function is not visible in this excerpt.
// Judging by the onPopSuccess()/onPopFailed() statistics calls this is the body of the
// extraction operation (presumably pop() - confirm). Lines holding the locking and the
// return statements are not visible either and are not described here.
// Slot 1 is the root of the heap; slot 0 is not used (see capacity()).
366 node& refTop = m_Heap[1];
// Empty queue: record the failed extraction.
369 if ( m_ItemCounter.value() == 0 ) {
372 m_Stat.onPopFailed();
// dec() yields the index of the bottom item that is detached from the heap.
375 counter_type nBottom = m_ItemCounter.dec();
376 assert( nBottom < m_Heap.capacity() );
377 assert( nBottom > 0 );
// The bottom item is the root itself: vacate the root slot and take its value.
380 if ( nBottom == 1 ) {
381 refTop.m_nTag = tag_type( Empty );
382 value_type * pVal = refTop.m_pVal;
383 refTop.m_pVal = nullptr;
386 m_Stat.onPopSuccess();
// General case: detach the bottom item and mark its slot Empty.
390 node& refBottom = m_Heap[nBottom];
393 refBottom.m_nTag = tag_type(Empty);
394 value_type * pVal = refBottom.m_pVal;
395 refBottom.m_pVal = nullptr;
// The root slot is already Empty: the operation is counted as successful with the
// detached bottom value in pVal (presumably returned as is - return line not visible).
398 if ( refTop.m_nTag == tag_type(Empty) ) {
401 m_Stat.onPopSuccess();
// Otherwise the detached bottom value is placed into the root and the former root value
// ends up in pVal; the root is tagged Available and then sifted down.
405 std::swap( refTop.m_pVal, pVal );
406 refTop.m_nTag = tag_type( Available );
408 // refTop will be unlocked inside heapify_after_pop
409 heapify_after_pop( &refTop );
411 m_Stat.onPopSuccess();
415 /// Clears the queue (not atomic)
417 This function is not atomic, but it is thread-safe
421 clear_with( []( value_type const& /*src*/ ) {} );
424 /// Clears the queue (not atomic)
426 This function is not atomic, but it is thread-safe.
428 For each item removed the functor \p f is called.
429 \p Func interface is:
433 void operator()( value_type& item );
436 A lambda function or a function pointer can be used as \p f.
// clear_with(): drains the queue by calling pop() repeatedly until it returns nullptr.
// NOTE(review): the declaration of pVal and the loop body are on lines not visible in
// this excerpt; presumably the body invokes f on each extracted item - confirm.
438 template <typename Func>
439 void clear_with( Func f )
442 while (( pVal = pop()) != nullptr )
446 /// Checks if the priority queue is empty
452 /// Checks if the priority queue is full
455 return size() == capacity();
458 /// Returns current size of priority queue
461 std::unique_lock<lock_type> l( m_Lock );
462 return static_cast<size_t>( m_ItemCounter.value());
465 /// Returns the capacity of the priority queue
466 size_t capacity() const
468 // m_Heap[0] is not used, so the usable capacity is one less than the buffer capacity
469 return m_Heap.capacity() - 1;
472 /// Returns const reference to internal statistics
473 stat const& statistics() const
// heapify_after_push(): sift-up step of push().
//   i     - heap index at which push() stored the new item
//   curId - tag written to that slot by push() (the id of the pushing thread)
// NOTE(review): the loop header, the locking and the index updates are on lines that
// are not visible in this excerpt; the comments below cover the visible statements only.
481 void heapify_after_push( counter_type i, tag_type curId )
486 // Move item towards top of the heap while it has higher priority than parent
488 bool bProgress = true;
// The parent of heap index i is i / 2.
489 counter_type nParent = i / 2;
490 node& refParent = m_Heap[nParent];
492 node& refItem = m_Heap[i];
// Regular case: the parent holds a settled item (Available) and slot i still carries our tag.
495 if ( refParent.m_nTag == tag_type(Available) && refItem.m_nTag == curId ) {
// The item outranks its parent: exchange both the tags and the value pointers.
496 if ( cmp( *refItem.m_pVal, *refParent.m_pVal ) > 0 ) {
497 std::swap( refItem.m_nTag, refParent.m_nTag );
498 std::swap( refItem.m_pVal, refParent.m_pVal );
499 m_Stat.onPushHeapifySwap();
// Presumably the else-branch of the comparison (the 'else' line is not visible):
// the item stays where it is and is marked Available.
503 refItem.m_nTag = tag_type(Available);
// Parent slot is Empty: counted as "inserted item was moved to top by a concurrent pop()".
507 else if ( refParent.m_nTag == tag_type( Empty ) ) {
508 m_Stat.onItemMovedTop();
// Slot i no longer carries our tag: counted as "inserted item was moved upwards by a concurrent pop()".
511 else if ( refItem.m_nTag != curId ) {
512 m_Stat.onItemMovedUp();
// Counted as an empty pass (presumably the remaining else-branch - confirm).
516 m_Stat.onPushEmptyPass();
// Final step: if slot i still carries our tag, mark the item Available.
530 node& refItem = m_Heap[i];
532 if ( refItem.m_nTag == curId )
533 refItem.m_nTag = tag_type(Available);
// heapify_after_pop(): sift-down step of the extraction operation.
//   pParent - node holding the item to sift down; the visible caller passes the root
//             slot, and per the caller's comment that node is unlocked inside this function.
// NOTE(review): the bodies of several branches, the locking and the parent/child index
// updates are on lines not visible in this excerpt; only the visible statements are described.
538 void heapify_after_pop( node * pParent )
541 counter_type const nCapacity = m_Heap.capacity();
// The walk starts at the root (index 1); the left child of nParent is nParent * 2.
543 counter_type nParent = 1;
544 for ( counter_type nChild = nParent * 2; nChild < nCapacity; nChild *= 2 ) {
545 node* pChild = &m_Heap[ nChild ];
// Left child slot is Empty (the handling of this case is not visible here).
548 if ( pChild->m_nTag == tag_type( Empty )) {
// The right sibling is nChild + 1, examined only if it lies inside the buffer.
553 counter_type const nRight = nChild + 1;
554 if ( nRight < nCapacity ) {
555 node& refRight = m_Heap[nRight];
// Right child is non-empty and outranks the left one; presumably it becomes the
// candidate for the swap below (the branch body is not visible - confirm).
558 if ( refRight.m_nTag != tag_type( Empty ) && cmp( *refRight.m_pVal, *pChild->m_pVal ) > 0 ) {
568 // If child has higher priority than parent then swap
570 if ( cmp( *pChild->m_pVal, *pParent->m_pVal ) > 0 ) {
571 std::swap( pParent->m_nTag, pChild->m_nTag );
572 std::swap( pParent->m_pVal, pChild->m_pVal );
574 m_Stat.onPopHeapifySwap();
588 }} // namespace cds::intrusive
590 #endif // #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H