Merged branch 'master' of https://github.com/Nemo1369/libcds
[libcds.git] / cds / intrusive / mspriority_queue.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
32 #define CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
33
34 #include <mutex>  // std::unique_lock
35 #include <cds/intrusive/details/base.h>
36 #include <cds/sync/spinlock.h>
37 #include <cds/os/thread.h>
38 #include <cds/details/bit_reverse_counter.h>
39 #include <cds/intrusive/options.h>
40 #include <cds/opt/buffer.h>
41 #include <cds/opt/compare.h>
42 #include <cds/details/bounded_container.h>
43
44 namespace cds { namespace intrusive {
45
46     /// MSPriorityQueue related definitions
47     /** @ingroup cds_intrusive_helper
48     */
49     namespace mspriority_queue {
50
51         /// MSPriorityQueue statistics
52         template <typename Counter = cds::atomicity::event_counter>
53         struct stat {
54             typedef Counter   event_counter ; ///< Event counter type
55
56             event_counter   m_nPushCount;            ///< Count of success push operation
57             event_counter   m_nPopCount;             ///< Count of success pop operation
58             event_counter   m_nPushFailCount;        ///< Count of failed ("the queue is full") push operation
59             event_counter   m_nPopFailCount;         ///< Count of failed ("the queue is empty") pop operation
60             event_counter   m_nPushHeapifySwapCount; ///< Count of item swapping when heapifying in push
61             event_counter   m_nPopHeapifySwapCount;  ///< Count of item swapping when heapifying in pop
62             event_counter   m_nItemMovedTop;         ///< Count of events when \p push() encountered that inserted item was moved to top by a concurrent \p pop()
63             event_counter   m_nItemMovedUp;          ///< Count of events when \p push() encountered that inserted item was moved upwards by a concurrent \p pop()
64             event_counter   m_nPushEmptyPass;        ///< Count of empty pass during heapify via concurrent operations
65
66             //@cond
67             void onPushSuccess()            { ++m_nPushCount            ;}
68             void onPopSuccess()             { ++m_nPopCount             ;}
69             void onPushFailed()             { ++m_nPushFailCount        ;}
70             void onPopFailed()              { ++m_nPopFailCount         ;}
71             void onPushHeapifySwap()        { ++m_nPushHeapifySwapCount ;}
72             void onPopHeapifySwap()         { ++m_nPopHeapifySwapCount  ;}
73
74             void onItemMovedTop()           { ++m_nItemMovedTop         ;}
75             void onItemMovedUp()            { ++m_nItemMovedUp          ;}
76             void onPushEmptyPass()          { ++m_nPushEmptyPass        ;}
77             //@endcond
78         };
79
80         /// MSPriorityQueue empty statistics
81         struct empty_stat {
82             //@cond
83             void onPushSuccess()            const {}
84             void onPopSuccess()             const {}
85             void onPushFailed()             const {}
86             void onPopFailed()              const {}
87             void onPushHeapifySwap()        const {}
88             void onPopHeapifySwap()         const {}
89
90             void onItemMovedTop()           const {}
91             void onItemMovedUp()            const {}
92             void onPushEmptyPass()          const {}
93             //@endcond
94         };
95
96         /// MSPriorityQueue traits
97         struct traits {
98             /// Storage type
99             /**
100                 The storage type for the heap array. Default is \p cds::opt::v::initialized_dynamic_buffer.
101
102                 You may specify any type of buffer's value since at instantiation time
103                 the \p buffer::rebind member metafunction is called to change type
104                 of values stored in the buffer.
105             */
106             typedef opt::v::initialized_dynamic_buffer<void *>  buffer;
107
108             /// Priority compare functor
109             /**
110                 No default functor is provided. If the option is not specified, the \p less is used.
111             */
112             typedef opt::none       compare;
113
114             /// Specifies binary predicate used for priority comparing.
115             /**
116                 Default is \p std::less<T>.
117             */
118             typedef opt::none       less;
119
120             /// Type of mutual-exclusion lock. The lock is not need to be recursive.
121             typedef cds::sync::spin lock_type;
122
123             /// Back-off strategy
124             typedef backoff::yield      back_off;
125
126             /// Internal statistics
127             /**
128                 Possible types: \p mspriority_queue::empty_stat (the default, no overhead), \p mspriority_queue::stat
129                 or any other with interface like \p %mspriority_queue::stat
130             */
131             typedef empty_stat      stat;
132         };
133
134         /// Metafunction converting option list to traits
135         /**
136             \p Options:
137             - \p opt::buffer - the buffer type for heap array. Possible type are: \p opt::v::initialized_static_buffer, \p opt::v::initialized_dynamic_buffer.
138                 Default is \p %opt::v::initialized_dynamic_buffer.
139                 You may specify any type of value for the buffer since at instantiation time
140                 the \p buffer::rebind member metafunction is called to change the type of values stored in the buffer.
141             - \p opt::compare - priority compare functor. No default functor is provided.
142                 If the option is not specified, the \p opt::less is used.
143             - \p opt::less - specifies binary predicate used for priority compare. Default is \p std::less<T>.
144             - \p opt::lock_type - lock type. Default is \p cds::sync::spin
145             - \p opt::back_off - back-off strategy. Default is \p cds::backoff::yield
146             - \p opt::stat - internal statistics. Available types: \p mspriority_queue::stat, \p mspriority_queue::empty_stat (the default, no overhead)
147         */
148         template <typename... Options>
149         struct make_traits {
150 #   ifdef CDS_DOXYGEN_INVOKED
151             typedef implementation_defined type ;   ///< Metafunction result
152 #   else
153             typedef typename cds::opt::make_options<
154                 typename cds::opt::find_type_traits< traits, Options... >::type
155                 ,Options...
156             >::type   type;
157 #   endif
158         };
159
160     }   // namespace mspriority_queue
161
162     /// Michael & Scott array-based lock-based concurrent priority queue heap
163     /** @ingroup cds_intrusive_priority_queue
164         Source:
165             - [1996] G.Hunt, M.Michael, S. Parthasarathy, M.Scott
166                 "An efficient algorithm for concurrent priority queue heaps"
167
168         \p %MSPriorityQueue augments the standard array-based heap data structure with
169         a mutual-exclusion lock on the heap's size and locks on each node in the heap.
170         Each node also has a tag that indicates whether
171         it is empty, valid, or in a transient state due to an update to the heap
172         by an inserting thread.
173         The algorithm allows concurrent insertions and deletions in opposite directions,
174         without risking deadlock and without the need for special server threads.
175         It also uses a "bit-reversal" technique to scatter accesses across the fringe
176         of the tree to reduce contention.
177         On large heaps the algorithm achieves significant performance improvements
178         over serialized single-lock algorithm, for various insertion/deletion
179         workloads. For small heaps it still performs well, but not as well as
180         single-lock algorithm.
181
182         Template parameters:
183         - \p T - type to be stored in the queue. The priority is a part of \p T type.
184         - \p Traits - type traits. See \p mspriority_queue::traits for explanation.
185             It is possible to declare option-based queue with \p cds::container::mspriority_queue::make_traits
186             metafunction instead of \p Traits template argument.
187     */
188     template <typename T, class Traits = mspriority_queue::traits >
189     class MSPriorityQueue: public cds::bounded_container
190     {
191     public:
192         typedef T           value_type  ;   ///< Value type stored in the queue
193         typedef Traits      traits      ;   ///< Traits template parameter
194
195 #   ifdef CDS_DOXYGEN_INVOKED
196         typedef implementation_defined key_comparator  ;    ///< priority comparing functor based on opt::compare and opt::less option setter.
197 #   else
198         typedef typename opt::details::make_comparator< value_type, traits >::type key_comparator;
199 #   endif
200
201         typedef typename traits::lock_type      lock_type;   ///< heap's size lock type
202         typedef typename traits::back_off       back_off;    ///< Back-off strategy
203         typedef typename traits::stat           stat;        ///< internal statistics type, see \p mspriority_queue::traits::stat
204         typedef typename cds::bitop::bit_reverse_counter<> item_counter;///< Item counter type
205
206     protected:
207         //@cond
208         typedef cds::OS::ThreadId   tag_type;
209
210         enum tag_value {
211             Available   = -1,
212             Empty       = 0
213         };
214         //@endcond
215
216         //@cond
217         /// Heap item type
218         struct node {
219             value_type *        m_pVal  ;   ///< A value pointer
220             tag_type volatile   m_nTag  ;   ///< A tag
221             mutable lock_type   m_Lock  ;   ///< Node-level lock
222
223             /// Creates empty node
224             node()
225                 : m_pVal( nullptr )
226                 , m_nTag( tag_type(Empty))
227             {}
228
229             /// Lock the node
230             void lock()
231             {
232                 m_Lock.lock();
233             }
234
235             /// Unlock the node
236             void unlock()
237             {
238                 m_Lock.unlock();
239             }
240         };
241         //@endcond
242
243     public:
244         typedef typename traits::buffer::template rebind<node>::other   buffer_type ;   ///< Heap array buffer type
245
246         //@cond
247         typedef typename item_counter::counter_type    counter_type;
248         //@endcond
249
250     protected:
251         item_counter        m_ItemCounter   ;   ///< Item counter
252         mutable lock_type   m_Lock          ;   ///< Heap's size lock
253         buffer_type         m_Heap          ;   ///< Heap array
254         stat                m_Stat          ;   ///< internal statistics accumulator
255
256     public:
257         /// Constructs empty priority queue
258         /**
259             For \p cds::opt::v::initialized_static_buffer the \p nCapacity parameter is ignored.
260         */
261         MSPriorityQueue( size_t nCapacity )
262             : m_Heap( nCapacity )
263         {}
264
265         /// Clears priority queue and destructs the object
266         ~MSPriorityQueue()
267         {
268             clear();
269         }
270
271         /// Inserts a item into priority queue
272         /**
273             If the priority queue is full, the function returns \p false,
274             no item has been added.
275             Otherwise, the function inserts the pointer to \p val into the heap
276             and returns \p true.
277
278             The function does not make a copy of \p val.
279         */
280         bool push( value_type& val )
281         {
282             tag_type const curId = cds::OS::get_current_thread_id();
283
284             // Insert new item at bottom of the heap
285             m_Lock.lock();
286             if ( m_ItemCounter.value() >= capacity()) {
287                 // the heap is full
288                 m_Lock.unlock();
289                 m_Stat.onPushFailed();
290                 return false;
291             }
292
293             counter_type i = m_ItemCounter.inc();
294             assert( i < m_Heap.capacity());
295
296             node& refNode = m_Heap[i];
297             refNode.lock();
298             m_Lock.unlock();
299             assert( refNode.m_nTag == tag_type( Empty ));
300             assert( refNode.m_pVal == nullptr );
301             refNode.m_pVal = &val;
302             refNode.m_nTag = curId;
303             refNode.unlock();
304
305             // Move item towards top of heap while it has a higher priority than its parent
306             heapify_after_push( i, curId );
307
308             m_Stat.onPushSuccess();
309             return true;
310         }
311
312         /// Extracts item with high priority
313         /**
314             If the priority queue is empty, the function returns \p nullptr.
315             Otherwise, it returns the item extracted.
316         */
317         value_type * pop()
318         {
319             node& refTop = m_Heap[1];
320
321             m_Lock.lock();
322             if ( m_ItemCounter.value() == 0 ) {
323                 // the heap is empty
324                 m_Lock.unlock();
325                 m_Stat.onPopFailed();
326                 return nullptr;
327             }
328             counter_type nBottom = m_ItemCounter.dec();
329             assert( nBottom < m_Heap.capacity());
330             assert( nBottom > 0 );
331
332             refTop.lock();
333             if ( nBottom == 1 ) {
334                 refTop.m_nTag = tag_type( Empty );
335                 value_type * pVal = refTop.m_pVal;
336                 refTop.m_pVal = nullptr;
337                 refTop.unlock();
338                 m_Lock.unlock();
339                 m_Stat.onPopSuccess();
340                 return pVal;
341             }
342
343             node& refBottom = m_Heap[nBottom];
344             refBottom.lock();
345             m_Lock.unlock();
346             refBottom.m_nTag = tag_type(Empty);
347             value_type * pVal = refBottom.m_pVal;
348             refBottom.m_pVal = nullptr;
349             refBottom.unlock();
350
351             if ( refTop.m_nTag == tag_type(Empty)) {
352                 // nBottom == nTop
353                 refTop.unlock();
354                 m_Stat.onPopSuccess();
355                 return pVal;
356             }
357
358             std::swap( refTop.m_pVal, pVal );
359             refTop.m_nTag = tag_type( Available );
360
361             // refTop will be unlocked inside heapify_after_pop
362             heapify_after_pop( &refTop );
363
364             m_Stat.onPopSuccess();
365             return pVal;
366         }
367
368         /// Clears the queue (not atomic)
369         /**
370             This function is no atomic, but thread-safe
371         */
372         void clear()
373         {
374             clear_with( []( value_type const& /*src*/ ) {} );
375         }
376
377         /// Clears the queue (not atomic)
378         /**
379             This function is no atomic, but thread-safe.
380
381             For each item removed the functor \p f is called.
382             \p Func interface is:
383             \code
384                 struct clear_functor
385                 {
386                     void operator()( value_type& item );
387                 };
388             \endcode
389             A lambda function or a function pointer can be used as \p f.
390         */
391         template <typename Func>
392         void clear_with( Func f )
393         {
394             value_type * pVal;
395             while (( pVal = pop()) != nullptr )
396                 f( *pVal );
397         }
398
399         /// Checks is the priority queue is empty
400         bool empty() const
401         {
402             return size() == 0;
403         }
404
405         /// Checks if the priority queue is full
406         bool full() const
407         {
408             return size() == capacity();
409         }
410
411         /// Returns current size of priority queue
412         size_t size() const
413         {
414             std::unique_lock<lock_type> l( m_Lock );
415             return static_cast<size_t>( m_ItemCounter.value());
416         }
417
418         /// Return capacity of the priority queue
419         size_t capacity() const
420         {
421             // m_Heap[0] is not used
422             return m_Heap.capacity() - 1;
423         }
424
425         /// Returns const reference to internal statistics
426         stat const& statistics() const
427         {
428             return m_Stat;
429         }
430
431     protected:
432         //@cond
433
434         void heapify_after_push( counter_type i, tag_type curId )
435         {
436             key_comparator  cmp;
437             back_off        bkoff;
438
439             // Move item towards top of the heap while it has higher priority than parent
440             while ( i > 1 ) {
441                 bool bProgress = true;
442                 counter_type nParent = i / 2;
443                 node& refParent = m_Heap[nParent];
444                 refParent.lock();
445                 node& refItem = m_Heap[i];
446                 refItem.lock();
447
448                 if ( refParent.m_nTag == tag_type(Available) && refItem.m_nTag == curId ) {
449                     if ( cmp( *refItem.m_pVal, *refParent.m_pVal ) > 0 ) {
450                         std::swap( refItem.m_nTag, refParent.m_nTag );
451                         std::swap( refItem.m_pVal, refParent.m_pVal );
452                         m_Stat.onPushHeapifySwap();
453                         i = nParent;
454                     }
455                     else {
456                         refItem.m_nTag = tag_type(Available);
457                         i = 0;
458                     }
459                 }
460                 else if ( refParent.m_nTag == tag_type( Empty )) {
461                     m_Stat.onItemMovedTop();
462                     i = 0;
463                 }
464                 else if ( refItem.m_nTag != curId ) {
465                     m_Stat.onItemMovedUp();
466                     i = nParent;
467                 }
468                 else {
469                     m_Stat.onPushEmptyPass();
470                     bProgress = false;
471                 }
472
473                 refItem.unlock();
474                 refParent.unlock();
475
476                 if ( !bProgress )
477                     bkoff();
478                 else
479                     bkoff.reset();
480             }
481
482             if ( i == 1 ) {
483                 node& refItem = m_Heap[i];
484                 refItem.lock();
485                 if ( refItem.m_nTag == curId )
486                     refItem.m_nTag = tag_type(Available);
487                 refItem.unlock();
488             }
489         }
490
491         void heapify_after_pop( node * pParent )
492         {
493             key_comparator cmp;
494             counter_type const nCapacity = m_Heap.capacity();
495
496             counter_type nParent = 1;
497             for ( counter_type nChild = nParent * 2; nChild < nCapacity; nChild *= 2 ) {
498                 node* pChild = &m_Heap[ nChild ];
499                 pChild->lock();
500
501                 if ( pChild->m_nTag == tag_type( Empty )) {
502                     pChild->unlock();
503                     break;
504                 }
505
506                 counter_type const nRight = nChild + 1;
507                 if ( nRight < nCapacity ) {
508                     node& refRight = m_Heap[nRight];
509                     refRight.lock();
510
511                     if ( refRight.m_nTag != tag_type( Empty ) && cmp( *refRight.m_pVal, *pChild->m_pVal ) > 0 ) {
512                         // get right child
513                         pChild->unlock();
514                         nChild = nRight;
515                         pChild = &refRight;
516                     }
517                     else
518                         refRight.unlock();
519                 }
520
521                 // If child has higher priority than parent then swap
522                 // Otherwise stop
523                 if ( cmp( *pChild->m_pVal, *pParent->m_pVal ) > 0 ) {
524                     std::swap( pParent->m_nTag, pChild->m_nTag );
525                     std::swap( pParent->m_pVal, pChild->m_pVal );
526                     pParent->unlock();
527                     m_Stat.onPopHeapifySwap();
528                     nParent = nChild;
529                     pParent = pChild;
530                 }
531                 else {
532                     pChild->unlock();
533                     break;
534                 }
535             }
536             pParent->unlock();
537         }
538         //@endcond
539     };
540
541 }} // namespace cds::intrusive
542
543 #endif // #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H