2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
32 #define CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
34 #include <mutex> // std::unique_lock
35 #include <cds/intrusive/details/base.h>
36 #include <cds/sync/spinlock.h>
37 #include <cds/os/thread.h>
38 #include <cds/details/bit_reverse_counter.h>
39 #include <cds/intrusive/options.h>
40 #include <cds/opt/buffer.h>
41 #include <cds/opt/compare.h>
42 #include <cds/details/bounded_container.h>
44 namespace cds { namespace intrusive {
46 /// MSPriorityQueue related definitions
47 /** @ingroup cds_intrusive_helper
49 namespace mspriority_queue {
51 /// MSPriorityQueue statistics
52 template <typename Counter = cds::atomicity::event_counter>
54 typedef Counter event_counter ; ///< Event counter type
56 event_counter m_nPushCount ; ///< Number of successful push operations
57 event_counter m_nPopCount ; ///< Number of successful pop operations
58 event_counter m_nPushFailCount ; ///< Number of failed push operations ("the queue is full")
59 event_counter m_nPopFailCount ; ///< Number of failed pop operations ("the queue is empty")
60 event_counter m_nPushHeapifySwapCount ; ///< Number of item swaps performed while heapifying in push
61 event_counter m_nPopHeapifySwapCount ; ///< Number of item swaps performed while heapifying in pop
// Event handlers: each one increments the counter that matches the event.
64 void onPushSuccess() { ++m_nPushCount ;}
65 void onPopSuccess() { ++m_nPopCount ;}
66 void onPushFailed() { ++m_nPushFailCount ;}
67 void onPopFailed() { ++m_nPopFailCount ;}
68 void onPushHeapifySwap() { ++m_nPushHeapifySwapCount ;}
69 void onPopHeapifySwap() { ++m_nPopHeapifySwapCount ;}
73 /// MSPriorityQueue empty statistics
// Every handler below has an empty body, so reporting an event does nothing.
76 void onPushSuccess() {}
77 void onPopSuccess() {}
78 void onPushFailed() {}
80 void onPushHeapifySwap() {}
81 void onPopHeapifySwap() {}
85 /// MSPriorityQueue traits
89 The storage type for the heap array. Default is \p cds::opt::v::initialized_dynamic_buffer.
91 You may specify any type of buffer's value since at instantiation time
92 the \p buffer::rebind member metafunction is called to change type
93 of values stored in the buffer.
95 typedef opt::v::initialized_dynamic_buffer<void *> buffer;
97 /// Priority compare functor
99 No default functor is provided. If the option is not specified, the \p less is used.
101 typedef opt::none compare;
103 /// Specifies binary predicate used for priority comparing.
105 Default is \p std::less<T>.
107 typedef opt::none less;
109 /// Type of mutual-exclusion lock
110 typedef cds::sync::spin lock_type;
112 /// Back-off strategy
113 typedef backoff::yield back_off;
115 /// Internal statistics
117 Possible types: \p mspriority_queue::empty_stat (the default, no overhead), \p mspriority_queue::stat
118 or any other with interface like \p %mspriority_queue::stat
120 typedef empty_stat stat;
123 /// Metafunction converting option list to traits
126 - \p opt::buffer - the buffer type for heap array. Possible types are: \p opt::v::initialized_static_buffer, \p opt::v::initialized_dynamic_buffer.
127 Default is \p %opt::v::initialized_dynamic_buffer.
128 You may specify any type of value for the buffer since at instantiation time
129 the \p buffer::rebind member metafunction is called to change the type of values stored in the buffer.
130 - \p opt::compare - priority compare functor. No default functor is provided.
131 If the option is not specified, the \p opt::less is used.
132 - \p opt::less - specifies binary predicate used for priority compare. Default is \p std::less<T>.
133 - \p opt::lock_type - lock type. Default is \p cds::sync::spin
134 - \p opt::back_off - back-off strategy. Default is \p cds::backoff::yield
135 - \p opt::stat - internal statistics. Available types: \p mspriority_queue::stat, \p mspriority_queue::empty_stat (the default, no overhead)
137 template <typename... Options>
139 # ifdef CDS_DOXYGEN_INVOKED
140 typedef implementation_defined type ; ///< Metafunction result
142 typedef typename cds::opt::make_options<
143 typename cds::opt::find_type_traits< traits, Options... >::type
149 } // namespace mspriority_queue
151 /// Michael & Scott array-based lock-based concurrent priority queue heap
152 /** @ingroup cds_intrusive_priority_queue
154 - [1996] G.Hunt, M.Michael, S. Parthasarathy, M.Scott
155 "An efficient algorithm for concurrent priority queue heaps"
157 \p %MSPriorityQueue augments the standard array-based heap data structure with
158 a mutual-exclusion lock on the heap's size and locks on each node in the heap.
159 Each node also has a tag that indicates whether
160 it is empty, valid, or in a transient state due to an update to the heap
161 by an inserting thread.
162 The algorithm allows concurrent insertions and deletions in opposite directions,
163 without risking deadlock and without the need for special server threads.
164 It also uses a "bit-reversal" technique to scatter accesses across the fringe
165 of the tree to reduce contention.
166 On large heaps the algorithm achieves significant performance improvements
167 over serialized single-lock algorithm, for various insertion/deletion
168 workloads. For small heaps it still performs well, but not as well as
169 single-lock algorithm.
172 - \p T - type to be stored in the queue. The priority is a part of \p T type.
173 - \p Traits - type traits. See \p mspriority_queue::traits for explanation.
174 It is possible to declare option-based queue with \p cds::intrusive::mspriority_queue::make_traits
175 metafunction instead of \p Traits template argument.
177 template <typename T, class Traits = mspriority_queue::traits >
178 class MSPriorityQueue: public cds::bounded_container
181 typedef T value_type ; ///< Value type stored in the queue
182 typedef Traits traits ; ///< Traits template parameter
184 # ifdef CDS_DOXYGEN_INVOKED
185 typedef implementation_defined key_comparator ; ///< priority comparing functor based on opt::compare and opt::less option setter.
187 typedef typename opt::details::make_comparator< value_type, traits >::type key_comparator;
190 typedef typename traits::lock_type lock_type; ///< heap's size lock type
191 typedef typename traits::back_off back_off; ///< Back-off strategy
192 typedef typename traits::stat stat; ///< internal statistics type
196 typedef cds::OS::ThreadId tag_type;
207 value_type * m_pVal ; ///< A value pointer
208 tag_type volatile m_nTag ; ///< A tag
209 mutable lock_type m_Lock ; ///< Node-level lock
211 /// Creates empty node
214 , m_nTag( tag_type(Empty) )
232 typedef typename traits::buffer::template rebind<node>::other buffer_type ; ///< Heap array buffer type
235 typedef cds::bitop::bit_reverse_counter<> item_counter_type;
236 typedef typename item_counter_type::counter_type counter_type;
240 item_counter_type m_ItemCounter ; ///< Item counter
241 mutable lock_type m_Lock ; ///< Heap's size lock
242 buffer_type m_Heap ; ///< Heap array
243 stat m_Stat ; ///< internal statistics accumulator
246 /// Constructs empty priority queue
248 For \p cds::opt::v::initialized_static_buffer the \p nCapacity parameter is ignored.
250 MSPriorityQueue( size_t nCapacity )
251 : m_Heap( nCapacity )
254 /// Clears priority queue and destructs the object
260 /// Inserts an item into the priority queue
262 If the priority queue is full, the function returns \p false,
263 no item has been added.
264 Otherwise, the function inserts the pointer to \p val into the heap
267 The function does not make a copy of \p val.
269 bool push( value_type& val )
// The calling thread's id is used below as the tag of the freshly inserted node
271 tag_type const curId = cds::OS::get_current_thread_id();
273 // Insert new item at bottom of the heap
// The item counter has reached capacity(): the queue is full, record the failed push
275 if ( m_ItemCounter.value() >= capacity() ) {
278 m_Stat.onPushFailed();
// Advance the item counter; the value it returns is the heap slot for the new item
282 counter_type i = m_ItemCounter.inc();
283 assert( i < m_Heap.capacity() );
285 node& refNode = m_Heap[i];
// Only the address of val is stored (no copy is made); the node is tagged with this thread's id
288 refNode.m_pVal = &val;
289 refNode.m_nTag = curId;
292 // Move item towards top of the heap while it has higher priority than parent
293 heapify_after_push( i, curId );
295 m_Stat.onPushSuccess();
299 /// Extracts item with high priority
301 If the priority queue is empty, the function returns \p nullptr.
302 Otherwise, it returns the item extracted.
// The item counter is zero: the queue is empty, record the failed pop
307 if ( m_ItemCounter.value() == 0 ) {
310 m_Stat.onPopFailed();
// Index of the bottom node that is detached from the heap below
313 counter_type nBottom = m_ItemCounter.reversed_value();
315 // Since m_Heap[0] is not used, capacity() returns m_Heap.capacity() - 1
316 // Consequently, "<=" is here
317 assert( nBottom <= capacity() );
318 assert( nBottom > 0 );
320 node& refBottom = m_Heap[ nBottom ];
// Mark the bottom node Empty and take its value pointer out of the node
323 refBottom.m_nTag = tag_type(Empty);
324 value_type * pVal = refBottom.m_pVal;
325 refBottom.m_pVal = nullptr;
// m_Heap[1] is the top node of the heap (slot 0 is not used)
328 node& refTop = m_Heap[ 1 ];
// The top node is tagged Empty: the pop is still counted as successful
330 if ( refTop.m_nTag == tag_type(Empty) ) {
333 m_Stat.onPopSuccess();
// Put the former bottom item into the top node; pVal now holds the former top item
337 std::swap( refTop.m_pVal, pVal );
338 refTop.m_nTag = tag_type( Available );
340 // refTop will be unlocked inside heapify_after_pop
341 heapify_after_pop( 1, &refTop );
343 m_Stat.onPopSuccess();
347 /// Clears the queue (not atomic)
349 This function is not atomic, but it is thread-safe
353 clear_with( []( value_type const& /*src*/ ) {} );
356 /// Clears the queue (not atomic)
358 This function is not atomic, but it is thread-safe.
360 For each item removed the functor \p f is called.
361 \p Func interface is:
365 void operator()( value_type& item );
368 A lambda function or a function pointer can be used as \p f.
370 template <typename Func>
371 void clear_with( Func f )
374 value_type * pVal = pop();
380 /// Checks if the priority queue is empty
386 /// Checks if the priority queue is full
389 return size() == capacity();
392 /// Returns current size of priority queue
// The item counter is read while the heap's size lock (m_Lock) is held
395 std::unique_lock<lock_type> l( m_Lock );
396 size_t nSize = (size_t) m_ItemCounter.value();
400 /// Returns the capacity of the priority queue
401 size_t capacity() const
403 // m_Heap[0] is not used, so one slot of the buffer is excluded from the capacity
404 return m_Heap.capacity() - 1;
407 /// Returns const reference to internal statistics
408 stat const& statistics() const
416 void heapify_after_push( counter_type i, tag_type curId )
421 // Move item towards top of the heap while it has higher priority than parent
423 bool bProgress = true;
// The parent of heap slot i is slot i / 2
424 counter_type nParent = i / 2;
425 node& refParent = m_Heap[nParent];
427 node& refItem = m_Heap[i];
// The parent is tagged Available and the item still carries the inserting thread's id
430 if ( refParent.m_nTag == tag_type(Available) && refItem.m_nTag == curId ) {
// The item compares greater than its parent: exchange both tags and value pointers
431 if ( cmp( *refItem.m_pVal, *refParent.m_pVal ) > 0 ) {
432 std::swap( refItem.m_nTag, refParent.m_nTag );
433 std::swap( refItem.m_pVal, refParent.m_pVal );
434 m_Stat.onPushHeapifySwap();
438 refItem.m_nTag = tag_type(Available);
442 else if ( refParent.m_nTag == tag_type(Empty) )
// The item's tag no longer equals curId (presumably another thread has moved the item - TODO confirm)
444 else if ( refItem.m_nTag != curId )
// If the node in slot i is still tagged with this thread's id, mark it Available
459 node& refItem = m_Heap[i];
461 if ( refItem.m_nTag == curId )
462 refItem.m_nTag = tag_type(Available);
467 void heapify_after_pop( counter_type nParent, node * pParent )
// While nParent is below half of the buffer capacity, both child indices stay inside the buffer
471 while ( nParent < m_Heap.capacity() / 2 ) {
// The children of slot p are slots 2 * p and 2 * p + 1
472 counter_type nLeft = nParent * 2;
473 counter_type nRight = nLeft + 1;
474 node& refLeft = m_Heap[nLeft];
475 node& refRight = m_Heap[nRight];
481 if ( refLeft.m_nTag == tag_type(Empty) ) {
// The right child is Empty, or the left child compares greater than the right one
486 else if ( refRight.m_nTag == tag_type(Empty) || cmp( *refLeft.m_pVal, *refRight.m_pVal ) > 0 ) {
497 // If the child has higher priority than the parent, swap them
499 if ( cmp( *pChild->m_pVal, *pParent->m_pVal ) > 0 ) {
// Both the tags and the value pointers are exchanged between parent and child
500 std::swap( pParent->m_nTag, pChild->m_nTag );
501 std::swap( pParent->m_pVal, pChild->m_pVal );
503 m_Stat.onPopHeapifySwap();
517 }} // namespace cds::intrusive
519 #endif // #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H