Revert prev change
[libcds.git] / cds / intrusive / mspriority_queue.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8     
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
32 #define CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
33
34 #include <mutex>  // std::unique_lock
35 #include <cds/intrusive/details/base.h>
36 #include <cds/sync/spinlock.h>
37 #include <cds/os/thread.h>
38 #include <cds/details/bit_reverse_counter.h>
39 #include <cds/intrusive/options.h>
40 #include <cds/opt/buffer.h>
41 #include <cds/opt/compare.h>
42 #include <cds/details/bounded_container.h>
43
44 namespace cds { namespace intrusive {
45
46     /// MSPriorityQueue related definitions
47     /** @ingroup cds_intrusive_helper
48     */
49     namespace mspriority_queue {
50
51         /// MSPriorityQueue statistics
52         template <typename Counter = cds::atomicity::event_counter>
53         struct stat {
54             typedef Counter   event_counter ; ///< Event counter type
55
56             event_counter   m_nPushCount;            ///< Count of success push operation
57             event_counter   m_nPopCount;             ///< Count of success pop operation
58             event_counter   m_nPushFailCount;        ///< Count of failed ("the queue is full") push operation
59             event_counter   m_nPopFailCount;         ///< Count of failed ("the queue is empty") pop operation
60             event_counter   m_nPushHeapifySwapCount; ///< Count of item swapping when heapifying in push
61             event_counter   m_nPopHeapifySwapCount;  ///< Count of item swapping when heapifying in pop
62             event_counter   m_nItemMovedTop;         ///< Count of events when \p push() encountered that inserted item was moved to top by a concurrent \p pop()
63             event_counter   m_nItemMovedUp;          ///< Count of events when \p push() encountered that inserted item was moved upwards by a concurrent \p pop()
64             event_counter   m_nPushEmptyPass;        ///< Count of empty pass during heapify via concurrent operations
65
66             //@cond
67             void onPushSuccess()            { ++m_nPushCount            ;}
68             void onPopSuccess()             { ++m_nPopCount             ;}
69             void onPushFailed()             { ++m_nPushFailCount        ;}
70             void onPopFailed()              { ++m_nPopFailCount         ;}
71             void onPushHeapifySwap()        { ++m_nPushHeapifySwapCount ;}
72             void onPopHeapifySwap()         { ++m_nPopHeapifySwapCount  ;}
73
74             void onItemMovedTop()           { ++m_nItemMovedTop         ;}
75             void onItemMovedUp()            { ++m_nItemMovedUp          ;}
76             void onPushEmptyPass()          { ++m_nPushEmptyPass        ;}
77             //@endcond
78         };
79
80         /// MSPriorityQueue empty statistics
81         struct empty_stat {
82             //@cond
83             void onPushSuccess()            const {}
84             void onPopSuccess()             const {}
85             void onPushFailed()             const {}
86             void onPopFailed()              const {}
87             void onPushHeapifySwap()        const {}
88             void onPopHeapifySwap()         const {}
89
90             void onItemMovedTop()           const {}
91             void onItemMovedUp()            const {}
92             void onPushEmptyPass()          const {}
93             //@endcond
94         };
95
96         /// MSPriorityQueue traits
97         struct traits {
98             /// Storage type
99             /**
100                 The storage type for the heap array. Default is \p cds::opt::v::initialized_dynamic_buffer.
101
102                 You may specify any type of buffer's value since at instantiation time
103                 the \p buffer::rebind member metafunction is called to change type
104                 of values stored in the buffer.
105             */
106             typedef opt::v::initialized_dynamic_buffer<void *, CDS_DEFAULT_ALLOCATOR, false>  buffer;
107
108             /// Priority compare functor
109             /**
110                 No default functor is provided. If the option is not specified, the \p less is used.
111             */
112             typedef opt::none       compare;
113
114             /// Specifies binary predicate used for priority comparing.
115             /**
116                 Default is \p std::less<T>.
117             */
118             typedef opt::none       less;
119
120             /// Type of mutual-exclusion lock
121             typedef cds::sync::spin lock_type;
122
123             /// Back-off strategy
124             typedef backoff::yield      back_off;
125
126             /// Internal statistics
127             /**
128                 Possible types: \p mspriority_queue::empty_stat (the default, no overhead), \p mspriority_queue::stat
129                 or any other with interface like \p %mspriority_queue::stat
130             */
131             typedef empty_stat      stat;
132         };
133
134         /// Metafunction converting option list to traits
135         /**
136             \p Options:
137             - \p opt::buffer - the buffer type for heap array. Possible type are: \p opt::v::initialized_static_buffer, \p opt::v::initialized_dynamic_buffer.
138                 Default is \p %opt::v::initialized_dynamic_buffer.
139                 You may specify any type of value for the buffer since at instantiation time
140                 the \p buffer::rebind member metafunction is called to change the type of values stored in the buffer.
141             - \p opt::compare - priority compare functor. No default functor is provided.
142                 If the option is not specified, the \p opt::less is used.
143             - \p opt::less - specifies binary predicate used for priority compare. Default is \p std::less<T>.
144             - \p opt::lock_type - lock type. Default is \p cds::sync::spin
145             - \p opt::back_off - back-off strategy. Default is \p cds::backoff::yield
146             - \p opt::stat - internal statistics. Available types: \p mspriority_queue::stat, \p mspriority_queue::empty_stat (the default, no overhead)
147         */
148         template <typename... Options>
149         struct make_traits {
150 #   ifdef CDS_DOXYGEN_INVOKED
151             typedef implementation_defined type ;   ///< Metafunction result
152 #   else
153             typedef typename cds::opt::make_options<
154                 typename cds::opt::find_type_traits< traits, Options... >::type
155                 ,Options...
156             >::type   type;
157 #   endif
158         };
159
160     }   // namespace mspriority_queue
161
162     /// Michael & Scott array-based lock-based concurrent priority queue heap
163     /** @ingroup cds_intrusive_priority_queue
164         Source:
165             - [1996] G.Hunt, M.Michael, S. Parthasarathy, M.Scott
166                 "An efficient algorithm for concurrent priority queue heaps"
167
168         \p %MSPriorityQueue augments the standard array-based heap data structure with
169         a mutual-exclusion lock on the heap's size and locks on each node in the heap.
170         Each node also has a tag that indicates whether
171         it is empty, valid, or in a transient state due to an update to the heap
172         by an inserting thread.
173         The algorithm allows concurrent insertions and deletions in opposite directions,
174         without risking deadlock and without the need for special server threads.
175         It also uses a "bit-reversal" technique to scatter accesses across the fringe
176         of the tree to reduce contention.
177         On large heaps the algorithm achieves significant performance improvements
178         over serialized single-lock algorithm, for various insertion/deletion
179         workloads. For small heaps it still performs well, but not as well as
180         single-lock algorithm.
181
182         Template parameters:
183         - \p T - type to be stored in the queue. The priority is a part of \p T type.
184         - \p Traits - type traits. See \p mspriority_queue::traits for explanation.
185             It is possible to declare option-based queue with \p cds::container::mspriority_queue::make_traits
186             metafunction instead of \p Traits template argument.
187     */
188     template <typename T, class Traits = mspriority_queue::traits >
189     class MSPriorityQueue: public cds::bounded_container
190     {
191     public:
192         typedef T           value_type  ;   ///< Value type stored in the queue
193         typedef Traits      traits      ;   ///< Traits template parameter
194
195 #   ifdef CDS_DOXYGEN_INVOKED
196         typedef implementation_defined key_comparator  ;    ///< priority comparing functor based on opt::compare and opt::less option setter.
197 #   else
198         typedef typename opt::details::make_comparator< value_type, traits >::type key_comparator;
199 #   endif
200
201         typedef typename traits::lock_type lock_type;   ///< heap's size lock type
202         typedef typename traits::back_off  back_off;    ///< Back-off strategy
203         typedef typename traits::stat      stat;        ///< internal statistics type
204
205     protected:
206         //@cond
207         typedef cds::OS::ThreadId   tag_type;
208
209         enum tag_value {
210             Available   = -1,
211             Empty       = 0
212         };
213         //@endcond
214
215         //@cond
216         /// Heap item type
217         struct node {
218             value_type *        m_pVal  ;   ///< A value pointer
219             tag_type volatile   m_nTag  ;   ///< A tag
220             mutable lock_type   m_Lock  ;   ///< Node-level lock
221
222             /// Creates empty node
223             node()
224                 : m_pVal( nullptr )
225                 , m_nTag( tag_type(Empty) )
226             {}
227
228             /// Lock the node
229             void lock()
230             {
231                 m_Lock.lock();
232             }
233
234             /// Unlock the node
235             void unlock()
236             {
237                 m_Lock.unlock();
238             }
239         };
240         //@endcond
241
242     public:
243         typedef typename traits::buffer::template rebind<node>::other   buffer_type ;   ///< Heap array buffer type
244
245         //@cond
246         typedef cds::bitop::bit_reverse_counter<>           item_counter_type;
247         typedef typename item_counter_type::counter_type    counter_type;
248         //@endcond
249
250     protected:
251         item_counter_type   m_ItemCounter   ;   ///< Item counter
252         mutable lock_type   m_Lock          ;   ///< Heap's size lock
253         buffer_type         m_Heap          ;   ///< Heap array
254         stat                m_Stat          ;   ///< internal statistics accumulator
255
256     public:
257         /// Constructs empty priority queue
258         /**
259             For \p cds::opt::v::initialized_static_buffer the \p nCapacity parameter is ignored.
260         */
261         MSPriorityQueue( size_t nCapacity )
262             : m_Heap( nCapacity )
263         {}
264
265         /// Clears priority queue and destructs the object
266         ~MSPriorityQueue()
267         {
268             clear();
269         }
270
271         /// Inserts a item into priority queue
272         /**
273             If the priority queue is full, the function returns \p false,
274             no item has been added.
275             Otherwise, the function inserts the pointer to \p val into the heap
276             and returns \p true.
277
278             The function does not make a copy of \p val.
279         */
280         bool push( value_type& val )
281         {
282             tag_type const curId = cds::OS::get_current_thread_id();
283
284             // Insert new item at bottom of the heap
285             m_Lock.lock();
286             if ( m_ItemCounter.value() >= capacity() ) {
287                 // the heap is full
288                 m_Lock.unlock();
289                 m_Stat.onPushFailed();
290                 return false;
291             }
292
293             counter_type i = m_ItemCounter.inc();
294             assert( i < m_Heap.capacity() );
295
296             node& refNode = m_Heap[i];
297             refNode.lock();
298             m_Lock.unlock();
299             assert( refNode.m_nTag == tag_type( Empty ));
300             assert( refNode.m_pVal == nullptr );
301             refNode.m_pVal = &val;
302             refNode.m_nTag = curId;
303             refNode.unlock();
304
305             // Move item towards top of heap while it has a higher priority than its parent
306             heapify_after_push( i, curId );
307
308             m_Stat.onPushSuccess();
309             return true;
310         }
311
312         /// Extracts item with high priority
313         /**
314             If the priority queue is empty, the function returns \p nullptr.
315             Otherwise, it returns the item extracted.
316         */
317         value_type * pop()
318         {
319             m_Lock.lock();
320             if ( m_ItemCounter.value() == 0 ) {
321                 // the heap is empty
322                 m_Lock.unlock();
323                 m_Stat.onPopFailed();
324                 return nullptr;
325             }
326             counter_type nBottom = m_ItemCounter.reversed_value();
327             m_ItemCounter.dec();
328             assert( nBottom < m_Heap.capacity() );
329             assert( nBottom > 0 );
330
331             node& refBottom = m_Heap[ nBottom ];
332             refBottom.lock();
333             m_Lock.unlock();
334             refBottom.m_nTag = tag_type(Empty);
335             value_type * pVal = refBottom.m_pVal;
336             refBottom.m_pVal = nullptr;
337             refBottom.unlock();
338
339             node& refTop = m_Heap[ 1 ];
340             refTop.lock();
341             if ( refTop.m_nTag == tag_type(Empty) ) {
342                 // nBottom == nTop
343                 refTop.unlock();
344                 m_Stat.onPopSuccess();
345                 return pVal;
346             }
347
348             std::swap( refTop.m_pVal, pVal );
349             refTop.m_nTag = tag_type( Available );
350
351             // refTop will be unlocked inside heapify_after_pop
352             heapify_after_pop( &refTop );
353
354             m_Stat.onPopSuccess();
355             return pVal;
356         }
357
358         /// Clears the queue (not atomic)
359         /**
360             This function is no atomic, but thread-safe
361         */
362         void clear()
363         {
364             clear_with( []( value_type const& /*src*/ ) {} );
365         }
366
367         /// Clears the queue (not atomic)
368         /**
369             This function is no atomic, but thread-safe.
370
371             For each item removed the functor \p f is called.
372             \p Func interface is:
373             \code
374                 struct clear_functor
375                 {
376                     void operator()( value_type& item );
377                 };
378             \endcode
379             A lambda function or a function pointer can be used as \p f.
380         */
381         template <typename Func>
382         void clear_with( Func f )
383         {
384             while ( !empty() ) {
385                 value_type * pVal = pop();
386                 if ( pVal )
387                     f( *pVal );
388             }
389         }
390
391         /// Checks is the priority queue is empty
392         bool empty() const
393         {
394             return size() == 0;
395         }
396
397         /// Checks if the priority queue is full
398         bool full() const
399         {
400             return size() == capacity();
401         }
402
403         /// Returns current size of priority queue
404         size_t size() const
405         {
406             std::unique_lock<lock_type> l( m_Lock );
407             return static_cast<size_t>( m_ItemCounter.value());
408         }
409
410         /// Return capacity of the priority queue
411         size_t capacity() const
412         {
413             // m_Heap[0] is not used
414             return m_Heap.capacity() - 1;
415         }
416
417         /// Returns const reference to internal statistics
418         stat const& statistics() const
419         {
420             return m_Stat;
421         }
422
423     protected:
424         //@cond
425
426         void heapify_after_push( counter_type i, tag_type curId )
427         {
428             key_comparator  cmp;
429             back_off        bkoff;
430
431             // Move item towards top of the heap while it has higher priority than parent
432             while ( i > 1 ) {
433                 bool bProgress = true;
434                 counter_type nParent = i / 2;
435                 node& refParent = m_Heap[nParent];
436                 refParent.lock();
437                 node& refItem = m_Heap[i];
438                 refItem.lock();
439
440                 if ( refParent.m_nTag == tag_type(Available) && refItem.m_nTag == curId ) {
441                     if ( cmp( *refItem.m_pVal, *refParent.m_pVal ) > 0 ) {
442                         std::swap( refItem.m_nTag, refParent.m_nTag );
443                         std::swap( refItem.m_pVal, refParent.m_pVal );
444                         m_Stat.onPushHeapifySwap();
445                         i = nParent;
446                     }
447                     else {
448                         refItem.m_nTag = tag_type(Available);
449                         i = 0;
450                     }
451                 }
452                 else if ( refParent.m_nTag == tag_type( Empty ) ) {
453                     m_Stat.onItemMovedTop();
454                     i = 0;
455                 }
456                 else if ( refItem.m_nTag != curId ) {
457                     m_Stat.onItemMovedUp();
458                     i = nParent;
459                 }
460                 else {
461                     m_Stat.onPushEmptyPass();
462                     bProgress = false;
463                 }
464
465                 refItem.unlock();
466                 refParent.unlock();
467
468                 if ( !bProgress )
469                     bkoff();
470                 else
471                     bkoff.reset();
472             }
473
474             if ( i == 1 ) {
475                 node& refItem = m_Heap[i];
476                 refItem.lock();
477                 if ( refItem.m_nTag == curId )
478                     refItem.m_nTag = tag_type(Available);
479                 refItem.unlock();
480             }
481         }
482
483         void heapify_after_pop( node * pParent )
484         {
485             key_comparator cmp;
486             counter_type const nCapacity = m_Heap.capacity();
487
488             counter_type nParent = 1;
489             for ( counter_type nChild = nParent * 2; nChild < nCapacity; nChild *= 2 ) {
490                 node* pChild = &m_Heap[ nChild ];
491                 pChild->lock();
492
493                 if ( pChild->m_nTag == tag_type( Empty )) {
494                     pChild->unlock();
495                     break;
496                 }
497
498                 counter_type const nRight = nChild + 1;
499                 if ( nRight < nCapacity ) {
500                     node& refRight = m_Heap[nRight];
501                     refRight.lock();
502
503                     if ( refRight.m_nTag != tag_type( Empty ) && cmp( *refRight.m_pVal, *pChild->m_pVal ) > 0 ) {
504                         // get right child
505                         pChild->unlock();
506                         nChild = nRight;
507                         pChild = &refRight;
508                     }
509                     else
510                         refRight.unlock();
511                 }
512
513                 // If child has higher priority than parent then swap
514                 // Otherwise stop
515                 if ( cmp( *pChild->m_pVal, *pParent->m_pVal ) > 0 ) {
516                     std::swap( pParent->m_nTag, pChild->m_nTag );
517                     std::swap( pParent->m_pVal, pChild->m_pVal );
518                     pParent->unlock();
519                     m_Stat.onPopHeapifySwap();
520                     nParent = nChild;
521                     pParent = pChild;
522                 }
523                 else {
524                     pChild->unlock();
525                     break;
526                 }
527             }
528             pParent->unlock();
529         }
530         //@endcond
531     };
532
533 }} // namespace cds::intrusive
534
535 #endif // #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H