2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
32 #define CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
34 #include <mutex> // std::unique_lock
35 #include <cds/intrusive/details/base.h>
36 #include <cds/sync/spinlock.h>
37 #include <cds/os/thread.h>
38 #include <cds/details/bit_reverse_counter.h>
39 #include <cds/intrusive/options.h>
40 #include <cds/opt/buffer.h>
41 #include <cds/opt/compare.h>
42 #include <cds/details/bounded_container.h>
44 namespace cds { namespace intrusive {
46 /// MSPriorityQueue related definitions
47 /** @ingroup cds_intrusive_helper
49 namespace mspriority_queue {
51 /// MSPriorityQueue statistics
52 template <typename Counter = cds::atomicity::event_counter>
54 typedef Counter event_counter ; ///< Event counter type
56 event_counter m_nPushCount; ///< Count of success push operation
57 event_counter m_nPopCount; ///< Count of success pop operation
58 event_counter m_nPushFailCount; ///< Count of failed ("the queue is full") push operation
59 event_counter m_nPopFailCount; ///< Count of failed ("the queue is empty") pop operation
60 event_counter m_nPushHeapifySwapCount; ///< Count of item swapping when heapifying in push
61 event_counter m_nPopHeapifySwapCount; ///< Count of item swapping when heapifying in pop
62 event_counter m_nItemMovedTop; ///< Count of events when \p push() encountered that inserted item was moved to top by a concurrent \p pop()
63 event_counter m_nItemMovedUp; ///< Count of events when \p push() encountered that inserted item was moved upwards by a concurrent \p pop()
64 event_counter m_nPushEmptyPass; ///< Count of empty pass during heapify via concurrent operations
67 void onPushSuccess() { ++m_nPushCount ;}
68 void onPopSuccess() { ++m_nPopCount ;}
69 void onPushFailed() { ++m_nPushFailCount ;}
70 void onPopFailed() { ++m_nPopFailCount ;}
71 void onPushHeapifySwap() { ++m_nPushHeapifySwapCount ;}
72 void onPopHeapifySwap() { ++m_nPopHeapifySwapCount ;}
74 void onItemMovedTop() { ++m_nItemMovedTop ;}
75 void onItemMovedUp() { ++m_nItemMovedUp ;}
76 void onPushEmptyPass() { ++m_nPushEmptyPass ;}
80 /// MSPriorityQueue empty statistics
83 void onPushSuccess() const {}
84 void onPopSuccess() const {}
85 void onPushFailed() const {}
86 void onPopFailed() const {}
87 void onPushHeapifySwap() const {}
88 void onPopHeapifySwap() const {}
90 void onItemMovedTop() const {}
91 void onItemMovedUp() const {}
92 void onPushEmptyPass() const {}
96 /// MSPriorityQueue traits
100 The storage type for the heap array. Default is \p cds::opt::v::initialized_dynamic_buffer.
102 You may specify any type of buffer's value since at instantiation time
103 the \p buffer::rebind member metafunction is called to change type
104 of values stored in the buffer.
106 typedef opt::v::initialized_dynamic_buffer<void *, CDS_DEFAULT_ALLOCATOR, false> buffer;
108 /// Priority compare functor
110 No default functor is provided. If the option is not specified, the \p less is used.
112 typedef opt::none compare;
114 /// Specifies binary predicate used for priority comparing.
116 Default is \p std::less<T>.
118 typedef opt::none less;
120 /// Type of mutual-exclusion lock
121 typedef cds::sync::spin lock_type;
123 /// Back-off strategy
124 typedef backoff::yield back_off;
126 /// Internal statistics
128 Possible types: \p mspriority_queue::empty_stat (the default, no overhead), \p mspriority_queue::stat
129 or any other with interface like \p %mspriority_queue::stat
131 typedef empty_stat stat;
134 /// Metafunction converting option list to traits
137 - \p opt::buffer - the buffer type for heap array. Possible type are: \p opt::v::initialized_static_buffer, \p opt::v::initialized_dynamic_buffer.
138 Default is \p %opt::v::initialized_dynamic_buffer.
139 You may specify any type of value for the buffer since at instantiation time
140 the \p buffer::rebind member metafunction is called to change the type of values stored in the buffer.
141 - \p opt::compare - priority compare functor. No default functor is provided.
142 If the option is not specified, the \p opt::less is used.
143 - \p opt::less - specifies binary predicate used for priority compare. Default is \p std::less<T>.
144 - \p opt::lock_type - lock type. Default is \p cds::sync::spin
145 - \p opt::back_off - back-off strategy. Default is \p cds::backoff::yield
146 - \p opt::stat - internal statistics. Available types: \p mspriority_queue::stat, \p mspriority_queue::empty_stat (the default, no overhead)
148 template <typename... Options>
150 # ifdef CDS_DOXYGEN_INVOKED
151 typedef implementation_defined type ; ///< Metafunction result
153 typedef typename cds::opt::make_options<
154 typename cds::opt::find_type_traits< traits, Options... >::type
160 } // namespace mspriority_queue
162 /// Michael & Scott array-based lock-based concurrent priority queue heap
163 /** @ingroup cds_intrusive_priority_queue
165 - [1996] G.Hunt, M.Michael, S. Parthasarathy, M.Scott
166 "An efficient algorithm for concurrent priority queue heaps"
168 \p %MSPriorityQueue augments the standard array-based heap data structure with
169 a mutual-exclusion lock on the heap's size and locks on each node in the heap.
170 Each node also has a tag that indicates whether
171 it is empty, valid, or in a transient state due to an update to the heap
172 by an inserting thread.
173 The algorithm allows concurrent insertions and deletions in opposite directions,
174 without risking deadlock and without the need for special server threads.
175 It also uses a "bit-reversal" technique to scatter accesses across the fringe
176 of the tree to reduce contention.
177 On large heaps the algorithm achieves significant performance improvements
178 over serialized single-lock algorithm, for various insertion/deletion
179 workloads. For small heaps it still performs well, but not as well as
180 single-lock algorithm.
183 - \p T - type to be stored in the queue. The priority is a part of \p T type.
184 - \p Traits - type traits. See \p mspriority_queue::traits for explanation.
185 It is possible to declare option-based queue with \p cds::container::mspriority_queue::make_traits
186 metafunction instead of \p Traits template argument.
188 template <typename T, class Traits = mspriority_queue::traits >
189 class MSPriorityQueue: public cds::bounded_container
192 typedef T value_type ; ///< Value type stored in the queue
193 typedef Traits traits ; ///< Traits template parameter
195 # ifdef CDS_DOXYGEN_INVOKED
196 typedef implementation_defined key_comparator ; ///< priority comparing functor based on opt::compare and opt::less option setter.
198 typedef typename opt::details::make_comparator< value_type, traits >::type key_comparator;
201 typedef typename traits::lock_type lock_type; ///< heap's size lock type
202 typedef typename traits::back_off back_off; ///< Back-off strategy
203 typedef typename traits::stat stat; ///< internal statistics type
207 typedef cds::OS::ThreadId tag_type;
218 value_type * m_pVal ; ///< A value pointer
219 tag_type volatile m_nTag ; ///< A tag
220 mutable lock_type m_Lock ; ///< Node-level lock
222 /// Creates empty node
225 , m_nTag( tag_type(Empty) )
243 typedef typename traits::buffer::template rebind<node>::other buffer_type ; ///< Heap array buffer type
246 typedef cds::bitop::bit_reverse_counter<> item_counter_type;
247 typedef typename item_counter_type::counter_type counter_type;
251 item_counter_type m_ItemCounter ; ///< Item counter
252 mutable lock_type m_Lock ; ///< Heap's size lock
253 buffer_type m_Heap ; ///< Heap array
254 stat m_Stat ; ///< internal statistics accumulator
257 /// Constructs empty priority queue
259 For \p cds::opt::v::initialized_static_buffer the \p nCapacity parameter is ignored.
261 MSPriorityQueue( size_t nCapacity )
262 : m_Heap( nCapacity )
265 /// Clears priority queue and destructs the object
271 /// Inserts a item into priority queue
273 If the priority queue is full, the function returns \p false,
274 no item has been added.
275 Otherwise, the function inserts the pointer to \p val into the heap
278 The function does not make a copy of \p val.
280 bool push( value_type& val )
282 tag_type const curId = cds::OS::get_current_thread_id();
284 // Insert new item at bottom of the heap
286 if ( m_ItemCounter.value() >= capacity() ) {
289 m_Stat.onPushFailed();
293 counter_type i = m_ItemCounter.inc();
294 if ( i >= m_Heap.capacity() ) {
297 m_Stat.onPushFailed();
301 node& refNode = m_Heap[i];
304 assert( refNode.m_nTag == tag_type( Empty ));
305 assert( refNode.m_pVal == nullptr );
306 refNode.m_pVal = &val;
307 refNode.m_nTag = curId;
310 // Move item towards top of heap while it has a higher priority than its parent
311 heapify_after_push( i, curId );
313 m_Stat.onPushSuccess();
317 /// Extracts item with high priority
319 If the priority queue is empty, the function returns \p nullptr.
320 Otherwise, it returns the item extracted.
325 if ( m_ItemCounter.value() == 0 ) {
328 m_Stat.onPopFailed();
331 counter_type nBottom = m_ItemCounter.reversed_value();
333 assert( nBottom < m_Heap.capacity() );
334 assert( nBottom > 0 );
336 node& refBottom = m_Heap[ nBottom ];
339 refBottom.m_nTag = tag_type(Empty);
340 value_type * pVal = refBottom.m_pVal;
341 refBottom.m_pVal = nullptr;
344 node& refTop = m_Heap[ 1 ];
346 if ( refTop.m_nTag == tag_type(Empty) ) {
349 m_Stat.onPopSuccess();
353 std::swap( refTop.m_pVal, pVal );
354 refTop.m_nTag = tag_type( Available );
356 // refTop will be unlocked inside heapify_after_pop
357 heapify_after_pop( &refTop );
359 m_Stat.onPopSuccess();
363 /// Clears the queue (not atomic)
365 This function is no atomic, but thread-safe
369 clear_with( []( value_type const& /*src*/ ) {} );
372 /// Clears the queue (not atomic)
374 This function is no atomic, but thread-safe.
376 For each item removed the functor \p f is called.
377 \p Func interface is:
381 void operator()( value_type& item );
384 A lambda function or a function pointer can be used as \p f.
386 template <typename Func>
387 void clear_with( Func f )
390 while (( pVal = pop()) != nullptr )
394 /// Checks is the priority queue is empty
400 /// Checks if the priority queue is full
403 return size() == capacity();
406 /// Returns current size of priority queue
409 std::unique_lock<lock_type> l( m_Lock );
410 return static_cast<size_t>( m_ItemCounter.value());
413 /// Return capacity of the priority queue
414 size_t capacity() const
416 // m_Heap[0] is not used
417 return m_Heap.capacity() - 1;
420 /// Returns const reference to internal statistics
421 stat const& statistics() const
429 void heapify_after_push( counter_type i, tag_type curId )
434 // Move item towards top of the heap while it has higher priority than parent
436 bool bProgress = true;
437 counter_type nParent = i / 2;
438 node& refParent = m_Heap[nParent];
440 node& refItem = m_Heap[i];
443 if ( refParent.m_nTag == tag_type(Available) && refItem.m_nTag == curId ) {
444 if ( cmp( *refItem.m_pVal, *refParent.m_pVal ) > 0 ) {
445 std::swap( refItem.m_nTag, refParent.m_nTag );
446 std::swap( refItem.m_pVal, refParent.m_pVal );
447 m_Stat.onPushHeapifySwap();
451 refItem.m_nTag = tag_type(Available);
455 else if ( refParent.m_nTag == tag_type( Empty ) ) {
456 m_Stat.onItemMovedTop();
459 else if ( refItem.m_nTag != curId ) {
460 m_Stat.onItemMovedUp();
464 m_Stat.onPushEmptyPass();
478 node& refItem = m_Heap[i];
480 if ( refItem.m_nTag == curId )
481 refItem.m_nTag = tag_type(Available);
486 void heapify_after_pop( node * pParent )
489 counter_type const nCapacity = m_Heap.capacity();
491 counter_type nParent = 1;
492 for ( counter_type nChild = nParent * 2; nChild < nCapacity; nChild *= 2 ) {
493 node* pChild = &m_Heap[ nChild ];
496 if ( pChild->m_nTag == tag_type( Empty )) {
501 counter_type const nRight = nChild + 1;
502 if ( nRight < nCapacity ) {
503 node& refRight = m_Heap[nRight];
506 if ( refRight.m_nTag != tag_type( Empty ) && cmp( *refRight.m_pVal, *pChild->m_pVal ) > 0 ) {
516 // If child has higher priority than parent then swap
518 if ( cmp( *pChild->m_pVal, *pParent->m_pVal ) > 0 ) {
519 std::swap( pParent->m_nTag, pChild->m_nTag );
520 std::swap( pParent->m_pVal, pChild->m_pVal );
522 m_Stat.onPopHeapifySwap();
536 }} // namespace cds::intrusive
538 #endif // #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H