8dfbad07831fdd3b5ddb7f0a43c8bc3f55d2bbab
[libcds.git] / cds / intrusive / mspriority_queue.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8     
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.     
29 */
30
31 #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
32 #define CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
33
34 #include <mutex>  // std::unique_lock
35 #include <cds/intrusive/details/base.h>
36 #include <cds/sync/spinlock.h>
37 #include <cds/os/thread.h>
38 #include <cds/details/bit_reverse_counter.h>
39 #include <cds/intrusive/options.h>
40 #include <cds/opt/buffer.h>
41 #include <cds/opt/compare.h>
42 #include <cds/details/bounded_container.h>
43
44 namespace cds { namespace intrusive {
45
46     /// MSPriorityQueue related definitions
47     /** @ingroup cds_intrusive_helper
48     */
49     namespace mspriority_queue {
50
51         /// MSPriorityQueue statistics
52         template <typename Counter = cds::atomicity::event_counter>
53         struct stat {
54             typedef Counter   event_counter ; ///< Event counter type
55
56             event_counter   m_nPushCount            ;   ///< Count of success push operation
57             event_counter   m_nPopCount             ;   ///< Count of success pop operation
58             event_counter   m_nPushFailCount        ;   ///< Count of failed ("the queue is full") push operation
59             event_counter   m_nPopFailCount         ;   ///< Count of failed ("the queue is empty") pop operation
60             event_counter   m_nPushHeapifySwapCount ;   ///< Count of item swapping when heapifying in push
61             event_counter   m_nPopHeapifySwapCount  ;   ///< Count of item swapping when heapifying in pop
62
63             //@cond
64             void onPushSuccess()            { ++m_nPushCount            ;}
65             void onPopSuccess()             { ++m_nPopCount             ;}
66             void onPushFailed()             { ++m_nPushFailCount        ;}
67             void onPopFailed()              { ++m_nPopFailCount         ;}
68             void onPushHeapifySwap()        { ++m_nPushHeapifySwapCount ;}
69             void onPopHeapifySwap()         { ++m_nPopHeapifySwapCount  ;}
70             //@endcond
71         };
72
73         /// MSPriorityQueue empty statistics
74         struct empty_stat {
75             //@cond
76             void onPushSuccess()            {}
77             void onPopSuccess()             {}
78             void onPushFailed()             {}
79             void onPopFailed()              {}
80             void onPushHeapifySwap()        {}
81             void onPopHeapifySwap()         {}
82             //@endcond
83         };
84
85         /// MSPriorityQueue traits
86         struct traits {
87             /// Storage type
88             /**
89                 The storage type for the heap array. Default is \p cds::opt::v::dynamic_buffer.
90
91                 You may specify any type of buffer's value since at instantiation time
92                 the \p buffer::rebind member metafunction is called to change type
93                 of values stored in the buffer.
94             */
95             typedef opt::v::dynamic_buffer<void *>  buffer;
96
97             /// Priority compare functor
98             /**
99                 No default functor is provided. If the option is not specified, the \p less is used.
100             */
101             typedef opt::none       compare;
102
103             /// Specifies binary predicate used for priority comparing.
104             /**
105                 Default is \p std::less<T>.
106             */
107             typedef opt::none       less;
108
109             /// Type of mutual-exclusion lock
110             typedef cds::sync::spin lock_type;
111
112             /// Back-off strategy
113             typedef backoff::yield      back_off;
114
115             /// Internal statistics
116             /**
117                 Possible types: \p mspriority_queue::empty_stat (the default, no overhead), \p mspriority_queue::stat
118                 or any other with interface like \p %mspriority_queue::stat
119             */
120             typedef empty_stat      stat;
121         };
122
123         /// Metafunction converting option list to traits
124         /**
125             \p Options:
126             - \p opt::buffer - the buffer type for heap array. Possible type are: \p opt::v::static_buffer, \p opt::v::dynamic_buffer.
127                 Default is \p %opt::v::dynamic_buffer.
128                 You may specify any type of values for the buffer since at instantiation time
129                 the \p buffer::rebind member metafunction is called to change the type of values stored in the buffer.
130             - \p opt::compare - priority compare functor. No default functor is provided.
131                 If the option is not specified, the \p opt::less is used.
132             - \p opt::less - specifies binary predicate used for priority compare. Default is \p std::less<T>.
133             - \p opt::lock_type - lock type. Default is \p cds::sync::spin
134             - \p opt::back_off - back-off strategy. Default is \p cds::backoff::yield
135             - \p opt::stat - internal statistics. Available types: \p mspriority_queue::stat, \p mspriority_queue::empty_stat (the default, no overhead)
136         */
137         template <typename... Options>
138         struct make_traits {
139 #   ifdef CDS_DOXYGEN_INVOKED
140             typedef implementation_defined type ;   ///< Metafunction result
141 #   else
142             typedef typename cds::opt::make_options<
143                 typename cds::opt::find_type_traits< traits, Options... >::type
144                 ,Options...
145             >::type   type;
146 #   endif
147         };
148
149     }   // namespace mspriority_queue
150
151     /// Michael & Scott array-based lock-based concurrent priority queue heap
152     /** @ingroup cds_intrusive_priority_queue
153         Source:
154             - [1996] G.Hunt, M.Michael, S. Parthasarathy, M.Scott
155                 "An efficient algorithm for concurrent priority queue heaps"
156
157         \p %MSPriorityQueue augments the standard array-based heap data structure with
158         a mutual-exclusion lock on the heap's size and locks on each node in the heap.
159         Each node also has a tag that indicates whether
160         it is empty, valid, or in a transient state due to an update to the heap
161         by an inserting thread.
162         The algorithm allows concurrent insertions and deletions in opposite directions,
163         without risking deadlock and without the need for special server threads.
164         It also uses a "bit-reversal" technique to scatter accesses across the fringe
165         of the tree to reduce contention.
166         On large heaps the algorithm achieves significant performance improvements
167         over serialized single-lock algorithm, for various insertion/deletion
168         workloads. For small heaps it still performs well, but not as well as
169         single-lock algorithm.
170
171         Template parameters:
172         - \p T - type to be stored in the queue. The priority is a part of \p T type.
173         - \p Traits - type traits. See \p mspriority_queue::traits for explanation.
174             It is possible to declare option-based queue with \p cds::container::mspriority_queue::make_traits
175             metafunction instead of \p Traits template argument.
176     */
177     template <typename T, class Traits = mspriority_queue::traits >
178     class MSPriorityQueue: public cds::bounded_container
179     {
180     public:
181         typedef T           value_type  ;   ///< Value type stored in the queue
182         typedef Traits      traits      ;   ///< Traits template parameter
183
184 #   ifdef CDS_DOXYGEN_INVOKED
185         typedef implementation_defined key_comparator  ;    ///< priority comparing functor based on opt::compare and opt::less option setter.
186 #   else
187         typedef typename opt::details::make_comparator< value_type, traits >::type key_comparator;
188 #   endif
189
190         typedef typename traits::lock_type lock_type       ;   ///< heap's size lock type
191         typedef typename traits::back_off  back_off        ;   ///< Back-off strategy
192         typedef typename traits::stat          stat        ;   ///< internal statistics type
193
194     protected:
195         //@cond
196         typedef cds::OS::ThreadId   tag_type;
197
198         enum tag_value {
199             Available   = -1,
200             Empty       = 0
201         };
202         //@endcond
203
204         //@cond
205         /// Heap item type
206         struct node {
207             value_type *        m_pVal  ;   ///< A value pointer
208             tag_type volatile   m_nTag  ;   ///< A tag
209             mutable lock_type   m_Lock  ;   ///< Node-level lock
210
211             /// Creates empty node
212             node()
213                 : m_pVal( nullptr )
214                 , m_nTag( tag_type(Empty) )
215             {}
216
217             /// Lock the node
218             void lock()
219             {
220                 m_Lock.lock();
221             }
222
223             /// Unlock the node
224             void unlock()
225             {
226                 m_Lock.unlock();
227             }
228         };
229         //@endcond
230
231     public:
232         typedef typename traits::buffer::template rebind<node>::other   buffer_type ;   ///< Heap array buffer type
233
234         //@cond
235         typedef cds::bitop::bit_reverse_counter<>           item_counter_type;
236         typedef typename item_counter_type::counter_type    counter_type;
237         //@endcond
238
239     protected:
240         item_counter_type   m_ItemCounter   ;   ///< Item counter
241         mutable lock_type   m_Lock          ;   ///< Heap's size lock
242         buffer_type         m_Heap          ;   ///< Heap array
243         stat                m_Stat          ;   ///< internal statistics accumulator
244
245     public:
246         /// Constructs empty priority queue
247         /**
248             For \p cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
249         */
250         MSPriorityQueue( size_t nCapacity )
251             : m_Heap( nCapacity )
252         {}
253
254         /// Clears priority queue and destructs the object
255         ~MSPriorityQueue()
256         {
257             clear();
258         }
259
260         /// Inserts a item into priority queue
261         /**
262             If the priority queue is full, the function returns \p false,
263             no item has been added.
264             Otherwise, the function inserts the pointer to \p val into the heap
265             and returns \p true.
266
267             The function does not make a copy of \p val.
268         */
269         bool push( value_type& val )
270         {
271             tag_type const curId = cds::OS::get_current_thread_id();
272
273             // Insert new item at bottom of the heap
274             m_Lock.lock();
275             if ( m_ItemCounter.value() >= capacity() ) {
276                 // the heap is full
277                 m_Lock.unlock();
278                 m_Stat.onPushFailed();
279                 return false;
280             }
281
282             counter_type i = m_ItemCounter.inc();
283             assert( i < m_Heap.capacity() );
284
285             node& refNode = m_Heap[i];
286             refNode.lock();
287             m_Lock.unlock();
288             refNode.m_pVal = &val;
289             refNode.m_nTag = curId;
290             refNode.unlock();
291
292             // Move item towards top of the heap while it has higher priority than parent
293             heapify_after_push( i, curId );
294
295             m_Stat.onPushSuccess();
296             return true;
297         }
298
299         /// Extracts item with high priority
300         /**
301             If the priority queue is empty, the function returns \p nullptr.
302             Otherwise, it returns the item extracted.
303         */
304         value_type * pop()
305         {
306             m_Lock.lock();
307             if ( m_ItemCounter.value() == 0 ) {
308                 // the heap is empty
309                 m_Lock.unlock();
310                 m_Stat.onPopFailed();
311                 return nullptr;
312             }
313             counter_type nBottom = m_ItemCounter.reversed_value();
314             m_ItemCounter.dec();
315             // Since m_Heap[0] is not used, capacity() returns m_Heap.capacity() - 1
316             // Consequently, "<=" is here
317             assert( nBottom <= capacity() );
318             assert( nBottom > 0 );
319
320             node& refBottom = m_Heap[ nBottom ];
321             refBottom.lock();
322             m_Lock.unlock();
323             refBottom.m_nTag = tag_type(Empty);
324             value_type * pVal = refBottom.m_pVal;
325             refBottom.m_pVal = nullptr;
326             refBottom.unlock();
327
328             node& refTop = m_Heap[ 1 ];
329             refTop.lock();
330             if ( refTop.m_nTag == tag_type(Empty) ) {
331                 // nBottom == nTop
332                 refTop.unlock();
333                 m_Stat.onPopSuccess();
334                 return pVal;
335             }
336
337             std::swap( refTop.m_pVal, pVal );
338             refTop.m_nTag = tag_type( Available );
339
340             // refTop will be unlocked inside heapify_after_pop
341             heapify_after_pop( 1, &refTop );
342
343             m_Stat.onPopSuccess();
344             return pVal;
345         }
346
347         /// Clears the queue (not atomic)
348         /**
349             This function is no atomic, but thread-safe
350         */
351         void clear()
352         {
353             clear_with( []( value_type const& /*src*/ ) {} );
354         }
355
356         /// Clears the queue (not atomic)
357         /**
358             This function is no atomic, but thread-safe.
359
360             For each item removed the functor \p f is called.
361             \p Func interface is:
362             \code
363                 struct clear_functor
364                 {
365                     void operator()( value_type& item );
366                 };
367             \endcode
368             A lambda function or a function pointer can be used as \p f.
369         */
370         template <typename Func>
371         void clear_with( Func f )
372         {
373             while ( !empty() ) {
374                 value_type * pVal = pop();
375                 if ( pVal )
376                     f( *pVal );
377             }
378         }
379
380         /// Checks is the priority queue is empty
381         bool empty() const
382         {
383             return size() == 0;
384         }
385
386         /// Checks if the priority queue is full
387         bool full() const
388         {
389             return size() == capacity();
390         }
391
392         /// Returns current size of priority queue
393         size_t size() const
394         {
395             std::unique_lock<lock_type> l( m_Lock );
396             size_t nSize = (size_t) m_ItemCounter.value();
397             return nSize;
398         }
399
400         /// Return capacity of the priority queue
401         size_t capacity() const
402         {
403             // m_Heap[0] is not used
404             return m_Heap.capacity() - 1;
405         }
406
407         /// Returns const reference to internal statistics
408         stat const& statistics() const
409         {
410             return m_Stat;
411         }
412
413     protected:
414         //@cond
415
416         void heapify_after_push( counter_type i, tag_type curId )
417         {
418             key_comparator  cmp;
419             back_off        bkoff;
420
421             // Move item towards top of the heap while it has higher priority than parent
422             while ( i > 1 ) {
423                 bool bProgress = true;
424                 counter_type nParent = i / 2;
425                 node& refParent = m_Heap[nParent];
426                 refParent.lock();
427                 node& refItem = m_Heap[i];
428                 refItem.lock();
429
430                 if ( refParent.m_nTag == tag_type(Available) && refItem.m_nTag == curId ) {
431                     if ( cmp( *refItem.m_pVal, *refParent.m_pVal ) > 0 ) {
432                         std::swap( refItem.m_nTag, refParent.m_nTag );
433                         std::swap( refItem.m_pVal, refParent.m_pVal );
434                         m_Stat.onPushHeapifySwap();
435                         i = nParent;
436                     }
437                     else {
438                         refItem.m_nTag = tag_type(Available);
439                         i = 0;
440                     }
441                 }
442                 else if ( refParent.m_nTag == tag_type(Empty) )
443                     i = 0;
444                 else if ( refItem.m_nTag != curId )
445                     i = nParent;
446                 else
447                     bProgress = false;
448
449                 refItem.unlock();
450                 refParent.unlock();
451
452                 if ( !bProgress )
453                     bkoff();
454                 else
455                     bkoff.reset();
456             }
457
458             if ( i == 1 ) {
459                 node& refItem = m_Heap[i];
460                 refItem.lock();
461                 if ( refItem.m_nTag == curId )
462                     refItem.m_nTag = tag_type(Available);
463                 refItem.unlock();
464             }
465         }
466
467         void heapify_after_pop( counter_type nParent, node * pParent )
468         {
469             key_comparator cmp;
470
471             while ( nParent < m_Heap.capacity() / 2 ) {
472                 counter_type nLeft = nParent * 2;
473                 counter_type nRight = nLeft + 1;
474                 node& refLeft = m_Heap[nLeft];
475                 node& refRight = m_Heap[nRight];
476                 refLeft.lock();
477                 refRight.lock();
478
479                 counter_type nChild;
480                 node * pChild;
481                 if ( refLeft.m_nTag == tag_type(Empty) ) {
482                     refRight.unlock();
483                     refLeft.unlock();
484                     break;
485                 }
486                 else if ( refRight.m_nTag == tag_type(Empty) || cmp( *refLeft.m_pVal, *refRight.m_pVal ) > 0 ) {
487                     refRight.unlock();
488                     nChild = nLeft;
489                     pChild = &refLeft;
490                 }
491                 else {
492                     refLeft.unlock();
493                     nChild = nRight;
494                     pChild = &refRight;
495                 }
496
497                 // If child has higher priority that parent then swap
498                 // Otherwise stop
499                 if ( cmp( *pChild->m_pVal, *pParent->m_pVal ) > 0 ) {
500                     std::swap( pParent->m_nTag, pChild->m_nTag );
501                     std::swap( pParent->m_pVal, pChild->m_pVal );
502                     pParent->unlock();
503                     m_Stat.onPopHeapifySwap();
504                     nParent = nChild;
505                     pParent = pChild;
506                 }
507                 else {
508                     pChild->unlock();
509                     break;
510                 }
511             }
512             pParent->unlock();
513         }
514         //@endcond
515     };
516
517 }} // namespace cds::intrusive
518
519 #endif // #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H