e3d211efa82212464db8689218cfa425f59deda4
[libcds.git] / cds / intrusive / mspriority_queue.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8     
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
32 #define CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
33
34 #include <mutex>  // std::unique_lock
35 #include <cds/intrusive/details/base.h>
36 #include <cds/sync/spinlock.h>
37 #include <cds/os/thread.h>
38 #include <cds/details/bit_reverse_counter.h>
39 #include <cds/intrusive/options.h>
40 #include <cds/opt/buffer.h>
41 #include <cds/opt/compare.h>
42 #include <cds/details/bounded_container.h>
43
44 namespace cds { namespace intrusive {
45
46     /// MSPriorityQueue related definitions
47     /** @ingroup cds_intrusive_helper
48     */
49     namespace mspriority_queue {
50
51         /// MSPriorityQueue statistics
52         template <typename Counter = cds::atomicity::event_counter>
53         struct stat {
54             typedef Counter   event_counter ; ///< Event counter type
55
56             event_counter   m_nPushCount;            ///< Count of success push operation
57             event_counter   m_nPopCount;             ///< Count of success pop operation
58             event_counter   m_nPushFailCount;        ///< Count of failed ("the queue is full") push operation
59             event_counter   m_nPopFailCount;         ///< Count of failed ("the queue is empty") pop operation
60             event_counter   m_nPushHeapifySwapCount; ///< Count of item swapping when heapifying in push
61             event_counter   m_nPopHeapifySwapCount;  ///< Count of item swapping when heapifying in pop
62             event_counter   m_nItemMovedTop;         ///< Count of events when \p push() encountered that inserted item was moved to top by a concurrent \p pop()
63             event_counter   m_nItemMovedUp;          ///< Count of events when \p push() encountered that inserted item was moved upwards by a concurrent \p pop()
64             event_counter   m_nPushEmptyPass;        ///< Count of empty pass during heapify via concurrent operations
65
66             //@cond
67             void onPushSuccess()            { ++m_nPushCount            ;}
68             void onPopSuccess()             { ++m_nPopCount             ;}
69             void onPushFailed()             { ++m_nPushFailCount        ;}
70             void onPopFailed()              { ++m_nPopFailCount         ;}
71             void onPushHeapifySwap()        { ++m_nPushHeapifySwapCount ;}
72             void onPopHeapifySwap()         { ++m_nPopHeapifySwapCount  ;}
73
74             void onItemMovedTop()           { ++m_nItemMovedTop         ;}
75             void onItemMovedUp()            { ++m_nItemMovedUp          ;}
76             void onPushEmptyPass()          { ++m_nPushEmptyPass        ;}
77             //@endcond
78         };
79
80         /// MSPriorityQueue empty statistics
81         struct empty_stat {
82             //@cond
83             void onPushSuccess()            const {}
84             void onPopSuccess()             const {}
85             void onPushFailed()             const {}
86             void onPopFailed()              const {}
87             void onPushHeapifySwap()        const {}
88             void onPopHeapifySwap()         const {}
89
90             void onItemMovedTop()           const {}
91             void onItemMovedUp()            const {}
92             void onPushEmptyPass()          const {}
93             //@endcond
94         };
95
96         /// MSPriorityQueue traits
97         struct traits {
98             /// Storage type
99             /**
100                 The storage type for the heap array. Default is \p cds::opt::v::initialized_dynamic_buffer.
101
102                 You may specify any type of buffer's value since at instantiation time
103                 the \p buffer::rebind member metafunction is called to change type
104                 of values stored in the buffer.
105             */
106             typedef opt::v::initialized_dynamic_buffer<void *>  buffer;
107
108             /// Priority compare functor
109             /**
110                 No default functor is provided. If the option is not specified, the \p less is used.
111             */
112             typedef opt::none       compare;
113
114             /// Specifies binary predicate used for priority comparing.
115             /**
116                 Default is \p std::less<T>.
117             */
118             typedef opt::none       less;
119
120             /// Type of mutual-exclusion lock
121             typedef cds::sync::spin lock_type;
122
123             /// Back-off strategy
124             typedef backoff::yield      back_off;
125
126             /// Internal statistics
127             /**
128                 Possible types: \p mspriority_queue::empty_stat (the default, no overhead), \p mspriority_queue::stat
129                 or any other with interface like \p %mspriority_queue::stat
130             */
131             typedef empty_stat      stat;
132         };
133
134         /// Metafunction converting option list to traits
135         /**
136             \p Options:
137             - \p opt::buffer - the buffer type for heap array. Possible type are: \p opt::v::initialized_static_buffer, \p opt::v::initialized_dynamic_buffer.
138                 Default is \p %opt::v::initialized_dynamic_buffer.
139                 You may specify any type of value for the buffer since at instantiation time
140                 the \p buffer::rebind member metafunction is called to change the type of values stored in the buffer.
141             - \p opt::compare - priority compare functor. No default functor is provided.
142                 If the option is not specified, the \p opt::less is used.
143             - \p opt::less - specifies binary predicate used for priority compare. Default is \p std::less<T>.
144             - \p opt::lock_type - lock type. Default is \p cds::sync::spin
145             - \p opt::back_off - back-off strategy. Default is \p cds::backoff::yield
146             - \p opt::stat - internal statistics. Available types: \p mspriority_queue::stat, \p mspriority_queue::empty_stat (the default, no overhead)
147         */
148         template <typename... Options>
149         struct make_traits {
150 #   ifdef CDS_DOXYGEN_INVOKED
151             typedef implementation_defined type ;   ///< Metafunction result
152 #   else
153             typedef typename cds::opt::make_options<
154                 typename cds::opt::find_type_traits< traits, Options... >::type
155                 ,Options...
156             >::type   type;
157 #   endif
158         };
159
160     }   // namespace mspriority_queue
161
162     /// Michael & Scott array-based lock-based concurrent priority queue heap
163     /** @ingroup cds_intrusive_priority_queue
164         Source:
165             - [1996] G.Hunt, M.Michael, S. Parthasarathy, M.Scott
166                 "An efficient algorithm for concurrent priority queue heaps"
167
168         \p %MSPriorityQueue augments the standard array-based heap data structure with
169         a mutual-exclusion lock on the heap's size and locks on each node in the heap.
170         Each node also has a tag that indicates whether
171         it is empty, valid, or in a transient state due to an update to the heap
172         by an inserting thread.
173         The algorithm allows concurrent insertions and deletions in opposite directions,
174         without risking deadlock and without the need for special server threads.
175         It also uses a "bit-reversal" technique to scatter accesses across the fringe
176         of the tree to reduce contention.
177         On large heaps the algorithm achieves significant performance improvements
178         over serialized single-lock algorithm, for various insertion/deletion
179         workloads. For small heaps it still performs well, but not as well as
180         single-lock algorithm.
181
182         Template parameters:
183         - \p T - type to be stored in the queue. The priority is a part of \p T type.
184         - \p Traits - type traits. See \p mspriority_queue::traits for explanation.
185             It is possible to declare option-based queue with \p cds::container::mspriority_queue::make_traits
186             metafunction instead of \p Traits template argument.
187     */
188     template <typename T, class Traits = mspriority_queue::traits >
189     class MSPriorityQueue: public cds::bounded_container
190     {
191     public:
192         typedef T           value_type  ;   ///< Value type stored in the queue
193         typedef Traits      traits      ;   ///< Traits template parameter
194
195 #   ifdef CDS_DOXYGEN_INVOKED
196         typedef implementation_defined key_comparator  ;    ///< priority comparing functor based on opt::compare and opt::less option setter.
197 #   else
198         typedef typename opt::details::make_comparator< value_type, traits >::type key_comparator;
199 #   endif
200
201         typedef typename traits::lock_type lock_type;   ///< heap's size lock type
202         typedef typename traits::back_off  back_off;    ///< Back-off strategy
203         typedef typename traits::stat      stat;        ///< internal statistics type
204
205     protected:
206         //@cond
207         typedef cds::OS::ThreadId   tag_type;
208
209         enum tag_value {
210             Available   = -1,
211             Empty       = 0
212         };
213         //@endcond
214
215         //@cond
216         /// Heap item type
217         struct node {
218             value_type *        m_pVal  ;   ///< A value pointer
219             tag_type volatile   m_nTag  ;   ///< A tag
220             mutable lock_type   m_Lock  ;   ///< Node-level lock
221
222             /// Creates empty node
223             node()
224                 : m_pVal( nullptr )
225                 , m_nTag( tag_type(Empty) )
226             {}
227
228             /// Lock the node
229             void lock()
230             {
231                 m_Lock.lock();
232             }
233
234             /// Unlock the node
235             void unlock()
236             {
237                 m_Lock.unlock();
238             }
239         };
240         //@endcond
241
242     public:
243         typedef typename traits::buffer::template rebind<node, typename traits::buffer::allocator, false>::other   buffer_type ;   ///< Heap array buffer type
244
245         //@cond
246         typedef cds::bitop::bit_reverse_counter<>           item_counter_type;
247         typedef typename item_counter_type::counter_type    counter_type;
248         //@endcond
249
250     protected:
251         item_counter_type   m_ItemCounter   ;   ///< Item counter
252         mutable lock_type   m_Lock          ;   ///< Heap's size lock
253         buffer_type         m_Heap          ;   ///< Heap array
254         stat                m_Stat          ;   ///< internal statistics accumulator
255
256     public:
257         /// Constructs empty priority queue
258         /**
259             For \p cds::opt::v::initialized_static_buffer the \p nCapacity parameter is ignored.
260         */
261         MSPriorityQueue( size_t nCapacity )
262             : m_Heap( nCapacity )
263         {}
264
265         /// Clears priority queue and destructs the object
266         ~MSPriorityQueue()
267         {
268             clear();
269         }
270
271         /// Inserts a item into priority queue
272         /**
273             If the priority queue is full, the function returns \p false,
274             no item has been added.
275             Otherwise, the function inserts the pointer to \p val into the heap
276             and returns \p true.
277
278             The function does not make a copy of \p val.
279         */
280         bool push( value_type& val )
281         {
282             tag_type const curId = cds::OS::get_current_thread_id();
283
284             // Insert new item at bottom of the heap
285             m_Lock.lock();
286             if ( m_ItemCounter.value() >= capacity() ) {
287                 // the heap is full
288                 m_Lock.unlock();
289                 m_Stat.onPushFailed();
290                 return false;
291             }
292
293             counter_type i = m_ItemCounter.inc();
294             assert( i < m_Heap.capacity() );
295
296             node& refNode = m_Heap[i];
297             refNode.lock();
298             m_Lock.unlock();
299             assert( refNode.m_nTag == tag_type( Empty ));
300             assert( refNode.m_pVal == nullptr );
301             refNode.m_pVal = &val;
302             refNode.m_nTag = curId;
303             refNode.unlock();
304
305             // Move item towards top of heap while it has a higher priority than its parent
306             heapify_after_push( i, curId );
307
308             m_Stat.onPushSuccess();
309             return true;
310         }
311
312         /// Extracts item with high priority
313         /**
314             If the priority queue is empty, the function returns \p nullptr.
315             Otherwise, it returns the item extracted.
316         */
317         value_type * pop()
318         {
319             m_Lock.lock();
320             if ( m_ItemCounter.value() == 0 ) {
321                 // the heap is empty
322                 m_Lock.unlock();
323                 m_Stat.onPopFailed();
324                 return nullptr;
325             }
326             counter_type nBottom = m_ItemCounter.dec();
327             assert( nBottom < m_Heap.capacity() );
328             assert( nBottom > 0 );
329
330             node& refBottom = m_Heap[ nBottom ];
331             refBottom.lock();
332             m_Lock.unlock();
333             refBottom.m_nTag = tag_type(Empty);
334             value_type * pVal = refBottom.m_pVal;
335             refBottom.m_pVal = nullptr;
336             refBottom.unlock();
337
338             node& refTop = m_Heap[ 1 ];
339             refTop.lock();
340             if ( refTop.m_nTag == tag_type(Empty) ) {
341                 // nBottom == nTop
342                 refTop.unlock();
343                 m_Stat.onPopSuccess();
344                 return pVal;
345             }
346
347             std::swap( refTop.m_pVal, pVal );
348             refTop.m_nTag = tag_type( Available );
349
350             // refTop will be unlocked inside heapify_after_pop
351             heapify_after_pop( &refTop );
352
353             m_Stat.onPopSuccess();
354             return pVal;
355         }
356
357         /// Clears the queue (not atomic)
358         /**
359             This function is no atomic, but thread-safe
360         */
361         void clear()
362         {
363             clear_with( []( value_type const& /*src*/ ) {} );
364         }
365
366         /// Clears the queue (not atomic)
367         /**
368             This function is no atomic, but thread-safe.
369
370             For each item removed the functor \p f is called.
371             \p Func interface is:
372             \code
373                 struct clear_functor
374                 {
375                     void operator()( value_type& item );
376                 };
377             \endcode
378             A lambda function or a function pointer can be used as \p f.
379         */
380         template <typename Func>
381         void clear_with( Func f )
382         {
383             while ( !empty() ) {
384                 value_type * pVal = pop();
385                 if ( pVal )
386                     f( *pVal );
387             }
388         }
389
390         /// Checks is the priority queue is empty
391         bool empty() const
392         {
393             return size() == 0;
394         }
395
396         /// Checks if the priority queue is full
397         bool full() const
398         {
399             return size() == capacity();
400         }
401
402         /// Returns current size of priority queue
403         size_t size() const
404         {
405             std::unique_lock<lock_type> l( m_Lock );
406             return static_cast<size_t>( m_ItemCounter.value());
407         }
408
409         /// Return capacity of the priority queue
410         size_t capacity() const
411         {
412             // m_Heap[0] is not used
413             return m_Heap.capacity() - 1;
414         }
415
416         /// Returns const reference to internal statistics
417         stat const& statistics() const
418         {
419             return m_Stat;
420         }
421
422     protected:
423         //@cond
424
425         void heapify_after_push( counter_type i, tag_type curId )
426         {
427             key_comparator  cmp;
428             back_off        bkoff;
429
430             // Move item towards top of the heap while it has higher priority than parent
431             while ( i > 1 ) {
432                 bool bProgress = true;
433                 counter_type nParent = i / 2;
434                 node& refParent = m_Heap[nParent];
435                 refParent.lock();
436                 node& refItem = m_Heap[i];
437                 refItem.lock();
438
439                 if ( refParent.m_nTag == tag_type(Available) && refItem.m_nTag == curId ) {
440                     if ( cmp( *refItem.m_pVal, *refParent.m_pVal ) > 0 ) {
441                         std::swap( refItem.m_nTag, refParent.m_nTag );
442                         std::swap( refItem.m_pVal, refParent.m_pVal );
443                         m_Stat.onPushHeapifySwap();
444                         i = nParent;
445                     }
446                     else {
447                         refItem.m_nTag = tag_type(Available);
448                         i = 0;
449                     }
450                 }
451                 else if ( refParent.m_nTag == tag_type( Empty ) ) {
452                     m_Stat.onItemMovedTop();
453                     i = 0;
454                 }
455                 else if ( refItem.m_nTag != curId ) {
456                     m_Stat.onItemMovedUp();
457                     i = nParent;
458                 }
459                 else {
460                     m_Stat.onPushEmptyPass();
461                     bProgress = false;
462                 }
463
464                 refItem.unlock();
465                 refParent.unlock();
466
467                 if ( !bProgress )
468                     bkoff();
469                 else
470                     bkoff.reset();
471             }
472
473             if ( i == 1 ) {
474                 node& refItem = m_Heap[i];
475                 refItem.lock();
476                 if ( refItem.m_nTag == curId )
477                     refItem.m_nTag = tag_type(Available);
478                 refItem.unlock();
479             }
480         }
481
482         void heapify_after_pop( node * pParent )
483         {
484             key_comparator cmp;
485             counter_type const nCapacity = m_Heap.capacity();
486
487             counter_type nParent = 1;
488             for ( counter_type nChild = nParent * 2; nChild < nCapacity; nChild *= 2 ) {
489                 node* pChild = &m_Heap[ nChild ];
490                 pChild->lock();
491
492                 if ( pChild->m_nTag == tag_type( Empty )) {
493                     pChild->unlock();
494                     break;
495                 }
496
497                 counter_type const nRight = nChild + 1;
498                 if ( nRight < nCapacity ) {
499                     node& refRight = m_Heap[nRight];
500                     refRight.lock();
501
502                     if ( refRight.m_nTag != tag_type( Empty ) && cmp( *refRight.m_pVal, *pChild->m_pVal ) > 0 ) {
503                         // get right child
504                         pChild->unlock();
505                         nChild = nRight;
506                         pChild = &refRight;
507                     }
508                     else
509                         refRight.unlock();
510                 }
511
512                 // If child has higher priority than parent then swap
513                 // Otherwise stop
514                 if ( cmp( *pChild->m_pVal, *pParent->m_pVal ) > 0 ) {
515                     std::swap( pParent->m_nTag, pChild->m_nTag );
516                     std::swap( pParent->m_pVal, pChild->m_pVal );
517                     pParent->unlock();
518                     m_Stat.onPopHeapifySwap();
519                     nParent = nChild;
520                     pParent = pChild;
521                 }
522                 else {
523                     pChild->unlock();
524                     break;
525                 }
526             }
527             pParent->unlock();
528         }
529         //@endcond
530     };
531
532 }} // namespace cds::intrusive
533
534 #endif // #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H