Merge branch 'dev' into integration
[libcds.git] / cds / intrusive / mspriority_queue.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8     
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
32 #define CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
33
34 #include <mutex>  // std::unique_lock
35 #include <cds/intrusive/details/base.h>
36 #include <cds/sync/spinlock.h>
37 #include <cds/os/thread.h>
38 #include <cds/details/bit_reverse_counter.h>
39 #include <cds/intrusive/options.h>
40 #include <cds/opt/buffer.h>
41 #include <cds/opt/compare.h>
42 #include <cds/details/bounded_container.h>
43
44 namespace cds { namespace intrusive {
45
46     /// MSPriorityQueue related definitions
47     /** @ingroup cds_intrusive_helper
48     */
49     namespace mspriority_queue {
50
51         /// MSPriorityQueue statistics
52         template <typename Counter = cds::atomicity::event_counter>
53         struct stat {
54             typedef Counter   event_counter ; ///< Event counter type
55
56             event_counter   m_nPushCount;            ///< Count of success push operation
57             event_counter   m_nPopCount;             ///< Count of success pop operation
58             event_counter   m_nPushFailCount;        ///< Count of failed ("the queue is full") push operation
59             event_counter   m_nPopFailCount;         ///< Count of failed ("the queue is empty") pop operation
60             event_counter   m_nPushHeapifySwapCount; ///< Count of item swapping when heapifying in push
61             event_counter   m_nPopHeapifySwapCount;  ///< Count of item swapping when heapifying in pop
62             event_counter   m_nItemMovedTop;         ///< Count of events when \p push() encountered that inserted item was moved to top by a concurrent \p pop()
63             event_counter   m_nItemMovedUp;          ///< Count of events when \p push() encountered that inserted item was moved upwards by a concurrent \p pop()
64             event_counter   m_nPushEmptyPass;        ///< Count of empty pass during heapify via concurrent operations
65
66             //@cond
67             void onPushSuccess()            { ++m_nPushCount            ;}
68             void onPopSuccess()             { ++m_nPopCount             ;}
69             void onPushFailed()             { ++m_nPushFailCount        ;}
70             void onPopFailed()              { ++m_nPopFailCount         ;}
71             void onPushHeapifySwap()        { ++m_nPushHeapifySwapCount ;}
72             void onPopHeapifySwap()         { ++m_nPopHeapifySwapCount  ;}
73
74             void onItemMovedTop()           { ++m_nItemMovedTop         ;}
75             void onItemMovedUp()            { ++m_nItemMovedUp          ;}
76             void onPushEmptyPass()          { ++m_nPushEmptyPass        ;}
77             //@endcond
78         };
79
80         /// MSPriorityQueue empty statistics
81         struct empty_stat {
82             //@cond
83             void onPushSuccess()            const {}
84             void onPopSuccess()             const {}
85             void onPushFailed()             const {}
86             void onPopFailed()              const {}
87             void onPushHeapifySwap()        const {}
88             void onPopHeapifySwap()         const {}
89
90             void onItemMovedTop()           const {}
91             void onItemMovedUp()            const {}
92             void onPushEmptyPass()          const {}
93             //@endcond
94         };
95
96         /// Monotonic item counter, see \p traits::item_counter for explanation
97         class monotonic_counter
98         {
99         //@cond
100         public:
101             typedef size_t counter_type;
102
103             monotonic_counter()
104                 : m_nCounter(0)
105             {}
106
107             size_t inc()
108             {
109                 return ++m_nCounter;
110             }
111
112             size_t dec()
113             {
114                 return m_nCounter--;
115             }
116
117             size_t value() const
118             {
119                 return m_nCounter;
120             }
121
122         private:
123             size_t m_nCounter;
124         //@endcond
125         };
126
127         /// MSPriorityQueue traits
128         struct traits {
129             /// Storage type
130             /**
131                 The storage type for the heap array. Default is \p cds::opt::v::initialized_dynamic_buffer.
132
133                 You may specify any type of buffer's value since at instantiation time
134                 the \p buffer::rebind member metafunction is called to change type
135                 of values stored in the buffer.
136             */
137             typedef opt::v::initialized_dynamic_buffer<void *>  buffer;
138
139             /// Priority compare functor
140             /**
141                 No default functor is provided. If the option is not specified, the \p less is used.
142             */
143             typedef opt::none       compare;
144
145             /// Specifies binary predicate used for priority comparing.
146             /**
147                 Default is \p std::less<T>.
148             */
149             typedef opt::none       less;
150
151             /// Type of mutual-exclusion lock. The lock is not need to be recursive.
152             typedef cds::sync::spin lock_type;
153
154             /// Back-off strategy
155             typedef backoff::yield      back_off;
156
157             /// Internal statistics
158             /**
159                 Possible types: \p mspriority_queue::empty_stat (the default, no overhead), \p mspriority_queue::stat
160                 or any other with interface like \p %mspriority_queue::stat
161             */
162             typedef empty_stat      stat;
163
164             /// Item counter type
165             /**
166                 Two type are possible:
167                 - \p cds::bitop::bit_reverse_counter - a counter described in <a href="http://www.research.ibm.com/people/m/michael/ipl-1996.pdf">original paper</a>,
168                   which was developed for reducing lock contention. However, bit-reversing technigue requires more memory than classic heapifying algorithm
169                   because of sparsing of elements: for priority queue of max size \p N the bit-reversing technique requires array size up to 2<sup>K</sup>
170                   where \p K - the nearest power of two such that <tt>2<sup>K</sup> >= N</tt>.
171                 - \p mspriority_queue::monotonic_counter - a classic monotonic item counter. This counter can lead to false sharing under high contention.
172                   By the other hand, for priority queue of max size \p N it requires \p N array size.
173
174                 By default, \p MSPriorityQueue uses \p %cds::bitop::bit_reverse_counter as described in original paper.
175             */
176             typedef cds::bitop::bit_reverse_counter<> item_counter;
177         };
178
179         /// Metafunction converting option list to traits
180         /**
181             \p Options:
182             - \p opt::buffer - the buffer type for heap array. Possible type are: \p opt::v::initialized_static_buffer, \p opt::v::initialized_dynamic_buffer.
183                 Default is \p %opt::v::initialized_dynamic_buffer.
184                 You may specify any type of value for the buffer since at instantiation time
185                 the \p buffer::rebind member metafunction is called to change the type of values stored in the buffer.
186             - \p opt::compare - priority compare functor. No default functor is provided.
187                 If the option is not specified, the \p opt::less is used.
188             - \p opt::less - specifies binary predicate used for priority compare. Default is \p std::less<T>.
189             - \p opt::lock_type - lock type. Default is \p cds::sync::spin
190             - \p opt::back_off - back-off strategy. Default is \p cds::backoff::yield
191             - \p opt::stat - internal statistics. Available types: \p mspriority_queue::stat, \p mspriority_queue::empty_stat (the default, no overhead)
192             - \p opt::item_counter - an item counter type for \p MSPriorityQueue. 
193                  Available type: \p cds::bitop::bit_reverse_counter, \p mspriority_queue::monotonic_counter. See \p traits::item_counter for details.
194         */
195         template <typename... Options>
196         struct make_traits {
197 #   ifdef CDS_DOXYGEN_INVOKED
198             typedef implementation_defined type ;   ///< Metafunction result
199 #   else
200             typedef typename cds::opt::make_options<
201                 typename cds::opt::find_type_traits< traits, Options... >::type
202                 ,Options...
203             >::type   type;
204 #   endif
205         };
206
207     }   // namespace mspriority_queue
208
209     /// Michael & Scott array-based lock-based concurrent priority queue heap
210     /** @ingroup cds_intrusive_priority_queue
211         Source:
212             - [1996] G.Hunt, M.Michael, S. Parthasarathy, M.Scott
213                 "An efficient algorithm for concurrent priority queue heaps"
214
215         \p %MSPriorityQueue augments the standard array-based heap data structure with
216         a mutual-exclusion lock on the heap's size and locks on each node in the heap.
217         Each node also has a tag that indicates whether
218         it is empty, valid, or in a transient state due to an update to the heap
219         by an inserting thread.
220         The algorithm allows concurrent insertions and deletions in opposite directions,
221         without risking deadlock and without the need for special server threads.
222         It also uses a "bit-reversal" technique to scatter accesses across the fringe
223         of the tree to reduce contention.
224         On large heaps the algorithm achieves significant performance improvements
225         over serialized single-lock algorithm, for various insertion/deletion
226         workloads. For small heaps it still performs well, but not as well as
227         single-lock algorithm.
228
229         Template parameters:
230         - \p T - type to be stored in the queue. The priority is a part of \p T type.
231         - \p Traits - type traits. See \p mspriority_queue::traits for explanation.
232             It is possible to declare option-based queue with \p cds::container::mspriority_queue::make_traits
233             metafunction instead of \p Traits template argument.
234     */
235     template <typename T, class Traits = mspriority_queue::traits >
236     class MSPriorityQueue: public cds::bounded_container
237     {
238     public:
239         typedef T           value_type  ;   ///< Value type stored in the queue
240         typedef Traits      traits      ;   ///< Traits template parameter
241
242 #   ifdef CDS_DOXYGEN_INVOKED
243         typedef implementation_defined key_comparator  ;    ///< priority comparing functor based on opt::compare and opt::less option setter.
244 #   else
245         typedef typename opt::details::make_comparator< value_type, traits >::type key_comparator;
246 #   endif
247
248         typedef typename traits::lock_type      lock_type;   ///< heap's size lock type
249         typedef typename traits::back_off       back_off;    ///< Back-off strategy
250         typedef typename traits::stat           stat;        ///< internal statistics type, see \p mspriority_queue::traits::stat
251         typedef typename traits::item_counter   item_counter;///< Item counter type, see \p mspriority_queue::traits::item_counter
252
253     protected:
254         //@cond
255         typedef cds::OS::ThreadId   tag_type;
256
257         enum tag_value {
258             Available   = -1,
259             Empty       = 0
260         };
261         //@endcond
262
263         //@cond
264         /// Heap item type
265         struct node {
266             value_type *        m_pVal  ;   ///< A value pointer
267             tag_type volatile   m_nTag  ;   ///< A tag
268             mutable lock_type   m_Lock  ;   ///< Node-level lock
269
270             /// Creates empty node
271             node()
272                 : m_pVal( nullptr )
273                 , m_nTag( tag_type(Empty) )
274             {}
275
276             /// Lock the node
277             void lock()
278             {
279                 m_Lock.lock();
280             }
281
282             /// Unlock the node
283             void unlock()
284             {
285                 m_Lock.unlock();
286             }
287         };
288         //@endcond
289
290     public:
291         typedef typename traits::buffer::template rebind<node>::other   buffer_type ;   ///< Heap array buffer type
292
293         //@cond
294         typedef typename item_counter::counter_type    counter_type;
295         //@endcond
296
297     protected:
298         item_counter        m_ItemCounter   ;   ///< Item counter
299         mutable lock_type   m_Lock          ;   ///< Heap's size lock
300         buffer_type         m_Heap          ;   ///< Heap array
301         stat                m_Stat          ;   ///< internal statistics accumulator
302
303     public:
304         /// Constructs empty priority queue
305         /**
306             For \p cds::opt::v::initialized_static_buffer the \p nCapacity parameter is ignored.
307         */
308         MSPriorityQueue( size_t nCapacity )
309             : m_Heap( nCapacity )
310         {}
311
312         /// Clears priority queue and destructs the object
313         ~MSPriorityQueue()
314         {
315             clear();
316         }
317
318         /// Inserts a item into priority queue
319         /**
320             If the priority queue is full, the function returns \p false,
321             no item has been added.
322             Otherwise, the function inserts the pointer to \p val into the heap
323             and returns \p true.
324
325             The function does not make a copy of \p val.
326         */
327         bool push( value_type& val )
328         {
329             tag_type const curId = cds::OS::get_current_thread_id();
330
331             // Insert new item at bottom of the heap
332             m_Lock.lock();
333             if ( m_ItemCounter.value() >= capacity() ) {
334                 // the heap is full
335                 m_Lock.unlock();
336                 m_Stat.onPushFailed();
337                 return false;
338             }
339
340             counter_type i = m_ItemCounter.inc();
341             assert( i < m_Heap.capacity() );
342
343             node& refNode = m_Heap[i];
344             refNode.lock();
345             m_Lock.unlock();
346             assert( refNode.m_nTag == tag_type( Empty ));
347             assert( refNode.m_pVal == nullptr );
348             refNode.m_pVal = &val;
349             refNode.m_nTag = curId;
350             refNode.unlock();
351
352             // Move item towards top of heap while it has a higher priority than its parent
353             heapify_after_push( i, curId );
354
355             m_Stat.onPushSuccess();
356             return true;
357         }
358
359         /// Extracts item with high priority
360         /**
361             If the priority queue is empty, the function returns \p nullptr.
362             Otherwise, it returns the item extracted.
363         */
364         value_type * pop()
365         {
366             node& refTop = m_Heap[1];
367
368             m_Lock.lock();
369             if ( m_ItemCounter.value() == 0 ) {
370                 // the heap is empty
371                 m_Lock.unlock();
372                 m_Stat.onPopFailed();
373                 return nullptr;
374             }
375             counter_type nBottom = m_ItemCounter.dec();
376             assert( nBottom < m_Heap.capacity() );
377             assert( nBottom > 0 );
378
379             refTop.lock();
380             if ( nBottom == 1 ) {
381                 refTop.m_nTag = tag_type( Empty );
382                 value_type * pVal = refTop.m_pVal;
383                 refTop.m_pVal = nullptr;
384                 refTop.unlock();
385                 m_Lock.unlock();
386                 m_Stat.onPopSuccess();
387                 return pVal;
388             }
389
390             node& refBottom = m_Heap[nBottom];
391             refBottom.lock();
392             m_Lock.unlock();
393             refBottom.m_nTag = tag_type(Empty);
394             value_type * pVal = refBottom.m_pVal;
395             refBottom.m_pVal = nullptr;
396             refBottom.unlock();
397
398             if ( refTop.m_nTag == tag_type(Empty) ) {
399                 // nBottom == nTop
400                 refTop.unlock();
401                 m_Stat.onPopSuccess();
402                 return pVal;
403             }
404
405             std::swap( refTop.m_pVal, pVal );
406             refTop.m_nTag = tag_type( Available );
407
408             // refTop will be unlocked inside heapify_after_pop
409             heapify_after_pop( &refTop );
410
411             m_Stat.onPopSuccess();
412             return pVal;
413         }
414
415         /// Clears the queue (not atomic)
416         /**
417             This function is no atomic, but thread-safe
418         */
419         void clear()
420         {
421             clear_with( []( value_type const& /*src*/ ) {} );
422         }
423
424         /// Clears the queue (not atomic)
425         /**
426             This function is no atomic, but thread-safe.
427
428             For each item removed the functor \p f is called.
429             \p Func interface is:
430             \code
431                 struct clear_functor
432                 {
433                     void operator()( value_type& item );
434                 };
435             \endcode
436             A lambda function or a function pointer can be used as \p f.
437         */
438         template <typename Func>
439         void clear_with( Func f )
440         {
441             value_type * pVal;
442             while (( pVal = pop()) != nullptr )
443                 f( *pVal );
444         }
445
446         /// Checks is the priority queue is empty
447         bool empty() const
448         {
449             return size() == 0;
450         }
451
452         /// Checks if the priority queue is full
453         bool full() const
454         {
455             return size() == capacity();
456         }
457
458         /// Returns current size of priority queue
459         size_t size() const
460         {
461             std::unique_lock<lock_type> l( m_Lock );
462             return static_cast<size_t>( m_ItemCounter.value());
463         }
464
465         /// Return capacity of the priority queue
466         size_t capacity() const
467         {
468             // m_Heap[0] is not used
469             return m_Heap.capacity() - 1;
470         }
471
472         /// Returns const reference to internal statistics
473         stat const& statistics() const
474         {
475             return m_Stat;
476         }
477
478     protected:
479         //@cond
480
481         void heapify_after_push( counter_type i, tag_type curId )
482         {
483             key_comparator  cmp;
484             back_off        bkoff;
485
486             // Move item towards top of the heap while it has higher priority than parent
487             while ( i > 1 ) {
488                 bool bProgress = true;
489                 counter_type nParent = i / 2;
490                 node& refParent = m_Heap[nParent];
491                 refParent.lock();
492                 node& refItem = m_Heap[i];
493                 refItem.lock();
494
495                 if ( refParent.m_nTag == tag_type(Available) && refItem.m_nTag == curId ) {
496                     if ( cmp( *refItem.m_pVal, *refParent.m_pVal ) > 0 ) {
497                         std::swap( refItem.m_nTag, refParent.m_nTag );
498                         std::swap( refItem.m_pVal, refParent.m_pVal );
499                         m_Stat.onPushHeapifySwap();
500                         i = nParent;
501                     }
502                     else {
503                         refItem.m_nTag = tag_type(Available);
504                         i = 0;
505                     }
506                 }
507                 else if ( refParent.m_nTag == tag_type( Empty ) ) {
508                     m_Stat.onItemMovedTop();
509                     i = 0;
510                 }
511                 else if ( refItem.m_nTag != curId ) {
512                     m_Stat.onItemMovedUp();
513                     i = nParent;
514                 }
515                 else {
516                     m_Stat.onPushEmptyPass();
517                     bProgress = false;
518                 }
519
520                 refItem.unlock();
521                 refParent.unlock();
522
523                 if ( !bProgress )
524                     bkoff();
525                 else
526                     bkoff.reset();
527             }
528
529             if ( i == 1 ) {
530                 node& refItem = m_Heap[i];
531                 refItem.lock();
532                 if ( refItem.m_nTag == curId )
533                     refItem.m_nTag = tag_type(Available);
534                 refItem.unlock();
535             }
536         }
537
538         void heapify_after_pop( node * pParent )
539         {
540             key_comparator cmp;
541             counter_type const nCapacity = m_Heap.capacity();
542
543             counter_type nParent = 1;
544             for ( counter_type nChild = nParent * 2; nChild < nCapacity; nChild *= 2 ) {
545                 node* pChild = &m_Heap[ nChild ];
546                 pChild->lock();
547
548                 if ( pChild->m_nTag == tag_type( Empty )) {
549                     pChild->unlock();
550                     break;
551                 }
552
553                 counter_type const nRight = nChild + 1;
554                 if ( nRight < nCapacity ) {
555                     node& refRight = m_Heap[nRight];
556                     refRight.lock();
557
558                     if ( refRight.m_nTag != tag_type( Empty ) && cmp( *refRight.m_pVal, *pChild->m_pVal ) > 0 ) {
559                         // get right child
560                         pChild->unlock();
561                         nChild = nRight;
562                         pChild = &refRight;
563                     }
564                     else
565                         refRight.unlock();
566                 }
567
568                 // If child has higher priority than parent then swap
569                 // Otherwise stop
570                 if ( cmp( *pChild->m_pVal, *pParent->m_pVal ) > 0 ) {
571                     std::swap( pParent->m_nTag, pChild->m_nTag );
572                     std::swap( pParent->m_pVal, pChild->m_pVal );
573                     pParent->unlock();
574                     m_Stat.onPopHeapifySwap();
575                     nParent = nChild;
576                     pParent = pChild;
577                 }
578                 else {
579                     pChild->unlock();
580                     break;
581                 }
582             }
583             pParent->unlock();
584         }
585         //@endcond
586     };
587
588 }} // namespace cds::intrusive
589
590 #endif // #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H