Docfix
[libcds.git] / cds / intrusive / mspriority_queue.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8     
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
32 #define CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
33
34 #include <mutex>  // std::unique_lock
35 #include <cds/intrusive/details/base.h>
36 #include <cds/sync/spinlock.h>
37 #include <cds/os/thread.h>
38 #include <cds/details/bit_reverse_counter.h>
39 #include <cds/intrusive/options.h>
40 #include <cds/opt/buffer.h>
41 #include <cds/opt/compare.h>
42 #include <cds/details/bounded_container.h>
43
44 namespace cds { namespace intrusive {
45
46     /// MSPriorityQueue related definitions
47     /** @ingroup cds_intrusive_helper
48     */
49     namespace mspriority_queue {
50
51         /// MSPriorityQueue statistics
52         template <typename Counter = cds::atomicity::event_counter>
53         struct stat {
54             typedef Counter   event_counter ; ///< Event counter type
55
56             event_counter   m_nPushCount            ;   ///< Count of success push operation
57             event_counter   m_nPopCount             ;   ///< Count of success pop operation
58             event_counter   m_nPushFailCount        ;   ///< Count of failed ("the queue is full") push operation
59             event_counter   m_nPopFailCount         ;   ///< Count of failed ("the queue is empty") pop operation
60             event_counter   m_nPushHeapifySwapCount ;   ///< Count of item swapping when heapifying in push
61             event_counter   m_nPopHeapifySwapCount  ;   ///< Count of item swapping when heapifying in pop
62
63             //@cond
64             void onPushSuccess()            { ++m_nPushCount            ;}
65             void onPopSuccess()             { ++m_nPopCount             ;}
66             void onPushFailed()             { ++m_nPushFailCount        ;}
67             void onPopFailed()              { ++m_nPopFailCount         ;}
68             void onPushHeapifySwap()        { ++m_nPushHeapifySwapCount ;}
69             void onPopHeapifySwap()         { ++m_nPopHeapifySwapCount  ;}
70             //@endcond
71         };
72
73         /// MSPriorityQueue empty statistics
74         struct empty_stat {
75             //@cond
76             void onPushSuccess()            {}
77             void onPopSuccess()             {}
78             void onPushFailed()             {}
79             void onPopFailed()              {}
80             void onPushHeapifySwap()        {}
81             void onPopHeapifySwap()         {}
82             //@endcond
83         };
84
85         /// MSPriorityQueue traits
86         struct traits {
87             /// Storage type
88             /**
89                 The storage type for the heap array. Default is \p cds::opt::v::initialized_dynamic_buffer.
90
91                 You may specify any type of buffer's value since at instantiation time
92                 the \p buffer::rebind member metafunction is called to change type
93                 of values stored in the buffer.
94             */
95             typedef opt::v::initialized_dynamic_buffer<void *>  buffer;
96
97             /// Priority compare functor
98             /**
99                 No default functor is provided. If the option is not specified, the \p less is used.
100             */
101             typedef opt::none       compare;
102
103             /// Specifies binary predicate used for priority comparing.
104             /**
105                 Default is \p std::less<T>.
106             */
107             typedef opt::none       less;
108
109             /// Type of mutual-exclusion lock
110             typedef cds::sync::spin lock_type;
111
112             /// Back-off strategy
113             typedef backoff::yield      back_off;
114
115             /// Internal statistics
116             /**
117                 Possible types: \p mspriority_queue::empty_stat (the default, no overhead), \p mspriority_queue::stat
118                 or any other with interface like \p %mspriority_queue::stat
119             */
120             typedef empty_stat      stat;
121         };
122
123         /// Metafunction converting option list to traits
124         /**
125             \p Options:
126             - \p opt::buffer - the buffer type for heap array. Possible type are: \p opt::v::initialized_static_buffer, \p opt::v::initialized_dynamic_buffer.
127                 Default is \p %opt::v::initialized_dynamic_buffer.
128                 You may specify any type of value for the buffer since at instantiation time
129                 the \p buffer::rebind member metafunction is called to change the type of values stored in the buffer.
130             - \p opt::compare - priority compare functor. No default functor is provided.
131                 If the option is not specified, the \p opt::less is used.
132             - \p opt::less - specifies binary predicate used for priority compare. Default is \p std::less<T>.
133             - \p opt::lock_type - lock type. Default is \p cds::sync::spin
134             - \p opt::back_off - back-off strategy. Default is \p cds::backoff::yield
135             - \p opt::stat - internal statistics. Available types: \p mspriority_queue::stat, \p mspriority_queue::empty_stat (the default, no overhead)
136         */
137         template <typename... Options>
138         struct make_traits {
139 #   ifdef CDS_DOXYGEN_INVOKED
140             typedef implementation_defined type ;   ///< Metafunction result
141 #   else
142             typedef typename cds::opt::make_options<
143                 typename cds::opt::find_type_traits< traits, Options... >::type
144                 ,Options...
145             >::type   type;
146 #   endif
147         };
148
149     }   // namespace mspriority_queue
150
151     /// Michael & Scott array-based lock-based concurrent priority queue heap
152     /** @ingroup cds_intrusive_priority_queue
153         Source:
154             - [1996] G.Hunt, M.Michael, S. Parthasarathy, M.Scott
155                 "An efficient algorithm for concurrent priority queue heaps"
156
157         \p %MSPriorityQueue augments the standard array-based heap data structure with
158         a mutual-exclusion lock on the heap's size and locks on each node in the heap.
159         Each node also has a tag that indicates whether
160         it is empty, valid, or in a transient state due to an update to the heap
161         by an inserting thread.
162         The algorithm allows concurrent insertions and deletions in opposite directions,
163         without risking deadlock and without the need for special server threads.
164         It also uses a "bit-reversal" technique to scatter accesses across the fringe
165         of the tree to reduce contention.
166         On large heaps the algorithm achieves significant performance improvements
167         over serialized single-lock algorithm, for various insertion/deletion
168         workloads. For small heaps it still performs well, but not as well as
169         single-lock algorithm.
170
171         Template parameters:
172         - \p T - type to be stored in the queue. The priority is a part of \p T type.
173         - \p Traits - type traits. See \p mspriority_queue::traits for explanation.
174             It is possible to declare option-based queue with \p cds::container::mspriority_queue::make_traits
175             metafunction instead of \p Traits template argument.
176     */
177     template <typename T, class Traits = mspriority_queue::traits >
178     class MSPriorityQueue: public cds::bounded_container
179     {
180     public:
181         typedef T           value_type  ;   ///< Value type stored in the queue
182         typedef Traits      traits      ;   ///< Traits template parameter
183
184 #   ifdef CDS_DOXYGEN_INVOKED
185         typedef implementation_defined key_comparator  ;    ///< priority comparing functor based on opt::compare and opt::less option setter.
186 #   else
187         typedef typename opt::details::make_comparator< value_type, traits >::type key_comparator;
188 #   endif
189
190         typedef typename traits::lock_type lock_type;   ///< heap's size lock type
191         typedef typename traits::back_off  back_off;    ///< Back-off strategy
192         typedef typename traits::stat      stat;        ///< internal statistics type
193
194     protected:
195         //@cond
196         typedef cds::OS::ThreadId   tag_type;
197
198         enum tag_value {
199             Available   = -1,
200             Empty       = 0
201         };
202         //@endcond
203
204         //@cond
205         /// Heap item type
206         struct node {
207             value_type *        m_pVal  ;   ///< A value pointer
208             tag_type volatile   m_nTag  ;   ///< A tag
209             mutable lock_type   m_Lock  ;   ///< Node-level lock
210
211             /// Creates empty node
212             node()
213                 : m_pVal( nullptr )
214                 , m_nTag( tag_type(Empty) )
215             {}
216
217             /// Lock the node
218             void lock()
219             {
220                 m_Lock.lock();
221             }
222
223             /// Unlock the node
224             void unlock()
225             {
226                 m_Lock.unlock();
227             }
228         };
229         //@endcond
230
231     public:
232         typedef typename traits::buffer::template rebind<node>::other   buffer_type ;   ///< Heap array buffer type
233
234         //@cond
235         typedef cds::bitop::bit_reverse_counter<>           item_counter_type;
236         typedef typename item_counter_type::counter_type    counter_type;
237         //@endcond
238
239     protected:
240         item_counter_type   m_ItemCounter   ;   ///< Item counter
241         mutable lock_type   m_Lock          ;   ///< Heap's size lock
242         buffer_type         m_Heap          ;   ///< Heap array
243         stat                m_Stat          ;   ///< internal statistics accumulator
244
245     public:
246         /// Constructs empty priority queue
247         /**
248             For \p cds::opt::v::initialized_static_buffer the \p nCapacity parameter is ignored.
249         */
250         MSPriorityQueue( size_t nCapacity )
251             : m_Heap( nCapacity )
252         {}
253
254         /// Clears priority queue and destructs the object
255         ~MSPriorityQueue()
256         {
257             clear();
258         }
259
260         /// Inserts a item into priority queue
261         /**
262             If the priority queue is full, the function returns \p false,
263             no item has been added.
264             Otherwise, the function inserts the pointer to \p val into the heap
265             and returns \p true.
266
267             The function does not make a copy of \p val.
268         */
269         bool push( value_type& val )
270         {
271             tag_type const curId = cds::OS::get_current_thread_id();
272
273             // Insert new item at bottom of the heap
274             m_Lock.lock();
275             if ( m_ItemCounter.value() >= capacity() ) {
276                 // the heap is full
277                 m_Lock.unlock();
278                 m_Stat.onPushFailed();
279                 return false;
280             }
281
282             counter_type i = m_ItemCounter.inc();
283             assert( i < m_Heap.capacity() );
284
285             node& refNode = m_Heap[i];
286             refNode.lock();
287             m_Lock.unlock();
288             refNode.m_pVal = &val;
289             refNode.m_nTag = curId;
290             refNode.unlock();
291
292             // Move item towards top of the heap while it has higher priority than parent
293             heapify_after_push( i, curId );
294
295             m_Stat.onPushSuccess();
296             return true;
297         }
298
299         /// Extracts item with high priority
300         /**
301             If the priority queue is empty, the function returns \p nullptr.
302             Otherwise, it returns the item extracted.
303         */
304         value_type * pop()
305         {
306             m_Lock.lock();
307             if ( m_ItemCounter.value() == 0 ) {
308                 // the heap is empty
309                 m_Lock.unlock();
310                 m_Stat.onPopFailed();
311                 return nullptr;
312             }
313             counter_type nBottom = m_ItemCounter.reversed_value();
314             m_ItemCounter.dec();
315             // Since m_Heap[0] is not used, capacity() returns m_Heap.capacity() - 1
316             // Consequently, "<=" is here
317             assert( nBottom <= capacity() );
318             assert( nBottom > 0 );
319
320             node& refBottom = m_Heap[ nBottom ];
321             refBottom.lock();
322             m_Lock.unlock();
323             refBottom.m_nTag = tag_type(Empty);
324             value_type * pVal = refBottom.m_pVal;
325             refBottom.m_pVal = nullptr;
326             refBottom.unlock();
327
328             node& refTop = m_Heap[ 1 ];
329             refTop.lock();
330             if ( refTop.m_nTag == tag_type(Empty) ) {
331                 // nBottom == nTop
332                 refTop.unlock();
333                 m_Stat.onPopSuccess();
334                 return pVal;
335             }
336
337             std::swap( refTop.m_pVal, pVal );
338             refTop.m_nTag = tag_type( Available );
339
340             // refTop will be unlocked inside heapify_after_pop
341             heapify_after_pop( 1, &refTop );
342
343             m_Stat.onPopSuccess();
344             return pVal;
345         }
346
347         /// Clears the queue (not atomic)
348         /**
349             This function is no atomic, but thread-safe
350         */
351         void clear()
352         {
353             clear_with( []( value_type const& /*src*/ ) {} );
354         }
355
356         /// Clears the queue (not atomic)
357         /**
358             This function is no atomic, but thread-safe.
359
360             For each item removed the functor \p f is called.
361             \p Func interface is:
362             \code
363                 struct clear_functor
364                 {
365                     void operator()( value_type& item );
366                 };
367             \endcode
368             A lambda function or a function pointer can be used as \p f.
369         */
370         template <typename Func>
371         void clear_with( Func f )
372         {
373             while ( !empty() ) {
374                 value_type * pVal = pop();
375                 if ( pVal )
376                     f( *pVal );
377             }
378         }
379
380         /// Checks is the priority queue is empty
381         bool empty() const
382         {
383             return size() == 0;
384         }
385
386         /// Checks if the priority queue is full
387         bool full() const
388         {
389             return size() == capacity();
390         }
391
392         /// Returns current size of priority queue
393         size_t size() const
394         {
395             std::unique_lock<lock_type> l( m_Lock );
396             return static_cast<size_t>( m_ItemCounter.value());
397         }
398
399         /// Return capacity of the priority queue
400         size_t capacity() const
401         {
402             // m_Heap[0] is not used
403             return m_Heap.capacity() - 1;
404         }
405
406         /// Returns const reference to internal statistics
407         stat const& statistics() const
408         {
409             return m_Stat;
410         }
411
412     protected:
413         //@cond
414
415         void heapify_after_push( counter_type i, tag_type curId )
416         {
417             key_comparator  cmp;
418             back_off        bkoff;
419
420             // Move item towards top of the heap while it has higher priority than parent
421             while ( i > 1 ) {
422                 bool bProgress = true;
423                 counter_type nParent = i / 2;
424                 node& refParent = m_Heap[nParent];
425                 refParent.lock();
426                 node& refItem = m_Heap[i];
427                 refItem.lock();
428
429                 if ( refParent.m_nTag == tag_type(Available) && refItem.m_nTag == curId ) {
430                     if ( cmp( *refItem.m_pVal, *refParent.m_pVal ) > 0 ) {
431                         std::swap( refItem.m_nTag, refParent.m_nTag );
432                         std::swap( refItem.m_pVal, refParent.m_pVal );
433                         m_Stat.onPushHeapifySwap();
434                         i = nParent;
435                     }
436                     else {
437                         refItem.m_nTag = tag_type(Available);
438                         i = 0;
439                     }
440                 }
441                 else if ( refParent.m_nTag == tag_type(Empty) )
442                     i = 0;
443                 else if ( refItem.m_nTag != curId )
444                     i = nParent;
445                 else
446                     bProgress = false;
447
448                 refItem.unlock();
449                 refParent.unlock();
450
451                 if ( !bProgress )
452                     bkoff();
453                 else
454                     bkoff.reset();
455             }
456
457             if ( i == 1 ) {
458                 node& refItem = m_Heap[i];
459                 refItem.lock();
460                 if ( refItem.m_nTag == curId )
461                     refItem.m_nTag = tag_type(Available);
462                 refItem.unlock();
463             }
464         }
465
466         void heapify_after_pop( counter_type nParent, node * pParent )
467         {
468             key_comparator cmp;
469
470             while ( nParent < m_Heap.capacity() / 2 ) {
471                 counter_type nLeft = nParent * 2;
472                 counter_type nRight = nLeft + 1;
473                 node& refLeft = m_Heap[nLeft];
474                 node& refRight = m_Heap[nRight];
475                 refLeft.lock();
476                 refRight.lock();
477
478                 counter_type nChild;
479                 node * pChild;
480                 if ( refLeft.m_nTag == tag_type(Empty) ) {
481                     refRight.unlock();
482                     refLeft.unlock();
483                     break;
484                 }
485                 else if ( refRight.m_nTag == tag_type(Empty) || cmp( *refLeft.m_pVal, *refRight.m_pVal ) > 0 ) {
486                     refRight.unlock();
487                     nChild = nLeft;
488                     pChild = &refLeft;
489                 }
490                 else {
491                     refLeft.unlock();
492                     nChild = nRight;
493                     pChild = &refRight;
494                 }
495
496                 // If child has higher priority that parent then swap
497                 // Otherwise stop
498                 if ( cmp( *pChild->m_pVal, *pParent->m_pVal ) > 0 ) {
499                     std::swap( pParent->m_nTag, pChild->m_nTag );
500                     std::swap( pParent->m_pVal, pChild->m_pVal );
501                     pParent->unlock();
502                     m_Stat.onPopHeapifySwap();
503                     nParent = nChild;
504                     pParent = pChild;
505                 }
506                 else {
507                     pChild->unlock();
508                     break;
509                 }
510             }
511             pParent->unlock();
512         }
513         //@endcond
514     };
515
516 }} // namespace cds::intrusive
517
518 #endif // #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H