Move libcds 1.6.0 from SVN
[libcds.git] / cds / intrusive / msqueue.h
1 //$$CDS-header$$
2
3 #ifndef __CDS_INTRUSIVE_MSQUEUE_H
4 #define __CDS_INTRUSIVE_MSQUEUE_H
5
6 #include <cds/intrusive/single_link_struct.h>
7 #include <cds/intrusive/queue_stat.h>
8 #include <cds/intrusive/details/dummy_node_holder.h>
9
10 #include <cds/details/std/type_traits.h>
11
12 namespace cds { namespace intrusive {
13
14     /// Michael & Scott's lock-free queue (intrusive variant)
15     /** @ingroup cds_intrusive_queue
16         Implementation of well-known Michael & Scott's queue algorithm.
17
18         \par Source:
19             [1998] Maged Michael, Michael Scott "Simple, fast, and practical non-blocking and blocking
20                    concurrent queue algorithms"
21
22         Template arguments:
23         - \p GC - garbage collector type: gc::HP, gc::HRC, gc::PTB
24         - \p T - type to be stored in the queue, should be convertible to \ref single_link::node
25         - \p Options - options
26
27         Type of node: \ref single_link::node
28
29         \p Options are:
30         - opt::hook - hook used. Possible values are: single_link::base_hook, single_link::member_hook, single_link::traits_hook.
31             If the option is not specified, <tt>single_link::base_hook<></tt> is used.
32             For Gidenstam's gc::HRC, only single_link::base_hook is supported.
33         - opt::back_off - back-off strategy used. If the option is not specified, the cds::backoff::empty is used.
34         - opt::disposer - the functor used for dispose removed items. Default is opt::v::empty_disposer. This option is used
35             in \ref dequeue function.
36         - opt::link_checker - the type of node's link fields checking. Default is \ref opt::debug_check_link
37             Note: for gc::HRC garbage collector, link checking policy is always selected as \ref opt::always_check_link.
38         - opt::item_counter - the type of item counting feature. Default is \ref atomicity::empty_item_counter (no item counting feature)
39         - opt::stat - the type to gather internal statistics.
40             Possible option value are: \ref queue_stat, \ref queue_dummy_stat, user-provided class that supports queue_stat interface.
41             Default is \ref queue_dummy_stat.
42         - opt::alignment - the alignment for internal queue data. Default is opt::cache_line_alignment
43         - opt::memory_model - C++ memory ordering model. Can be opt::v::relaxed_ordering (relaxed memory model, the default)
44             or opt::v::sequential_consistent (sequentially consisnent memory model).
45
46         Garbage collecting schema \p GC must be consistent with the single_link::node GC.
47
48         \par About item disposing
49         The Michael & Scott's queue algo has a key feature: even if the queue is empty it contains one item that is "dummy" one from
50         the standpoint of the algo. See \ref dequeue function doc for explanation.
51
52         \par Examples
53         \code
54         #include <cds/intrusive/msqueue.h>
55         #include <cds/gc/hp.h>
56
57         namespace ci = cds::inrtusive;
58         typedef cds::gc::HP hp_gc;
59
60         // MSQueue with Hazard Pointer garbage collector, base hook + item disposer:
61         struct Foo: public ci::single_link::node< hp_gc >
62         {
63             // Your data
64             ...
65         };
66
67         // Disposer for Foo struct just deletes the object passed in
68         struct fooDisposer {
69             void operator()( Foo * p )
70             {
71                 delete p;
72             }
73         };
74
75         typedef ci::MSQueue< hp_gc,
76             Foo
77             ,ci::opt::hook<
78                 ci::single_link::base_hook< ci::opt::gc<hp_gc> >
79             >
80             ,ci::opt::disposer< fooDisposer >
81         > fooQueue;
82
83         // MSQueue with Hazard Pointer garbage collector,
84         // member hook + item disposer + item counter,
85         // without alignment of internal queue data:
86         struct Bar
87         {
88             // Your data
89             ...
90             ci::single_link::node< hp_gc > hMember;
91         };
92
93         typedef ci::MSQueue< hp_gc,
94             Foo
95             ,ci::opt::hook<
96                 ci::single_link::member_hook<
97                     offsetof(Bar, hMember)
98                     ,ci::opt::gc<hp_gc>
99                 >
100             >
101             ,ci::opt::disposer< fooDisposer >
102             ,cds::opt::item_counter< cds::atomicity::item_counter >
103             ,cds::opt::alignment< cds::opt::no_special_alignment >
104         > barQueue;
105         \endcode
106     */
107     template <typename GC, typename T, CDS_DECL_OPTIONS9>
108     class MSQueue
109     {
110         //@cond
111         struct default_options
112         {
113             typedef cds::backoff::empty             back_off;
114             typedef single_link::base_hook<>        hook;
115             typedef opt::v::empty_disposer          disposer;
116             typedef atomicity::empty_item_counter   item_counter;
117             typedef queue_dummy_stat                stat;
118             typedef opt::v::relaxed_ordering        memory_model;
119             static const opt::link_check_type link_checker = opt::debug_check_link;
120             enum { alignment = opt::cache_line_alignment };
121         };
122         //@endcond
123
124     public:
125         //@cond
126         typedef typename opt::make_options<
127             typename cds::opt::find_type_traits< default_options, CDS_OPTIONS9 >::type
128             ,CDS_OPTIONS9
129         >::type   options;
130         //@endcond
131
132     public:
133         typedef T  value_type   ;   ///< type of value stored in the queue
134         typedef typename options::hook      hook        ;   ///< hook type
135         typedef typename hook::node_type    node_type   ;   ///< node type
136         typedef typename options::disposer  disposer    ;   ///< disposer used
137         typedef typename get_node_traits< value_type, node_type, hook>::type node_traits ;    ///< node traits
138         typedef typename single_link::get_link_checker< node_type, options::link_checker >::type link_checker   ;   ///< link checker
139
140         typedef GC gc          ;   ///< Garbage collector
141         typedef typename options::back_off  back_off    ;   ///< back-off strategy
142         typedef typename options::item_counter item_counter ;   ///< Item counting policy used
143         typedef typename options::stat      stat        ;   ///< Internal statistics policy used
144         typedef typename options::memory_model  memory_model ;   ///< Memory ordering. See cds::opt::memory_model option
145
146         /// Rebind template arguments
147         template <typename GC2, typename T2, CDS_DECL_OTHER_OPTIONS9>
148         struct rebind {
149             typedef MSQueue< GC2, T2, CDS_OTHER_OPTIONS9> other   ;   ///< Rebinding result
150         };
151
152     protected:
153         //@cond
154         struct internal_disposer
155         {
156             void operator()( value_type * p )
157             {
158                 assert( p != null_ptr<value_type *>());
159
160                 MSQueue::clear_links( node_traits::to_node_ptr(p) );
161                 disposer()( p );
162             }
163         };
164
165         typedef intrusive::node_to_value<MSQueue> node_to_value;
166         typedef typename opt::details::alignment_setter< typename node_type::atomic_node_ptr, options::alignment >::type aligned_node_ptr;
167
168         typedef typename opt::details::alignment_setter<
169             cds::intrusive::details::dummy_node< gc, node_type>,
170             options::alignment
171         >::type    dummy_node_type;
172
173         aligned_node_ptr    m_pHead ;           ///< Queue's head pointer (cache-line aligned)
174         aligned_node_ptr    m_pTail ;           ///< Queue's tail pointer (cache-line aligned)
175         dummy_node_type     m_Dummy ;           ///< dummy node
176         item_counter        m_ItemCounter   ;   ///< Item counter
177         stat                m_Stat  ;           ///< Internal statistics
178         //@endcond
179
180         //@cond
181         struct dequeue_result {
182             typename gc::template GuardArray<2>  guards;
183
184             node_type * pHead;
185             node_type * pNext;
186         };
187
188         bool do_dequeue( dequeue_result& res )
189         {
190             node_type * pNext;
191             back_off bkoff;
192
193             node_type * h;
194             while ( true ) {
195                 h = res.guards.protect( 0, m_pHead, node_to_value() );
196                 pNext = h->m_pNext.load( memory_model::memory_order_relaxed );
197                 res.guards.assign( 1, node_to_value()( pNext ));
198                 //pNext = res.guards.protect( 1, h->m_pNext, node_to_value() );
199                 if ( m_pHead.load(memory_model::memory_order_acquire) != h )
200                     continue;
201
202                 if ( pNext == null_ptr<node_type *>() )
203                     return false ;    // empty queue
204
205                 node_type * t = m_pTail.load(memory_model::memory_order_acquire);
206                 if ( h == t ) {
207                     // It is needed to help enqueue
208                     m_pTail.compare_exchange_strong( t, pNext, memory_model::memory_order_release, CDS_ATOMIC::memory_order_relaxed );
209                     m_Stat.onBadTail();
210                     continue;
211                 }
212
213                 if ( m_pHead.compare_exchange_strong( h, pNext, memory_model::memory_order_release, CDS_ATOMIC::memory_order_relaxed ))
214                     break;
215
216                 m_Stat.onDequeueRace();
217                 bkoff();
218             }
219
220             --m_ItemCounter;
221             m_Stat.onDequeue();
222
223             res.pHead = h;
224             res.pNext = pNext;
225             return true;
226         }
227
228         static void clear_links( node_type * pNode )
229         {
230             pNode->m_pNext.store( null_ptr<node_type *>(), memory_model::memory_order_release );
231         }
232
233         void dispose_result( dequeue_result& res )
234         {
235             dispose_node( res.pHead );
236         }
237
238         void dispose_node( node_type * p )
239         {
240             if ( p != m_Dummy.get() ) {
241                 gc::template retire<internal_disposer>( node_traits::to_value_ptr(p) );
242             }
243             else {
244                 // We cannot clear m_Dummy here since it leads to ABA.
245                 // On the other hand, we cannot use deferred clear_links( &m_Dummy ) call via
246                 // HP retiring cycle since m_Dummy is member of MSQueue and may be destroyed
247                 // before HP retiring cycle invocation.
248                 // So, we will never clear m_Dummy for gc::HP and gc::PTB
249                 // However, gc::HRC nodes are managed by reference counting, so, we must
250                 // call HP retire cycle.
251                 m_Dummy.retire();
252             }
253         }
254         //@endcond
255
256     public:
257         /// Initializes empty queue
258         MSQueue()
259             : m_pHead( null_ptr<node_type *>() )
260             , m_pTail( null_ptr<node_type *>() )
261         {
262             // GC and node_type::gc must be the same
263             static_assert(( std::is_same<gc, typename node_type::gc>::value ), "GC and node_type::gc must be the same");
264
265             // For cds::gc::HRC, only base_hook is allowed
266             static_assert((
267                 std::conditional<
268                     std::is_same<gc, cds::gc::HRC>::value,
269                     std::is_same< typename hook::hook_type, opt::base_hook_tag >,
270                     boost::true_type
271                 >::type::value
272             ), "For cds::gc::HRC, only base_hook is allowed");
273
274             // Head/tail initialization should be made via store call
275             // since gc::HRC manages reference counting
276             m_pHead.store( m_Dummy.get(), memory_model::memory_order_relaxed );
277             m_pTail.store( m_Dummy.get(), memory_model::memory_order_relaxed );
278         }
279
280         /// Destructor clears the queue
281         /**
282             Since the Michael & Scott queue contains at least one item even
283             if the queue is empty, the destructor may call item disposer.
284         */
285         ~MSQueue()
286         {
287             clear();
288
289             node_type * pHead = m_pHead.load(memory_model::memory_order_relaxed);
290
291             assert( pHead != null_ptr<node_type *>() );
292             assert( pHead == m_pTail.load(memory_model::memory_order_relaxed) );
293
294             m_pHead.store( null_ptr<node_type *>(), memory_model::memory_order_relaxed );
295             m_pTail.store( null_ptr<node_type *>(), memory_model::memory_order_relaxed );
296
297             dispose_node( pHead );
298         }
299
300         /// Returns queue's item count
301         /**
302             The value returned depends on opt::item_counter option. For atomicity::empty_item_counter,
303             this function always returns 0.
304
305             <b>Warning</b>: even if you use real item counter and it returns 0, this fact is not mean that the queue
306             is empty. To check queue emptyness use \ref empty() method.
307         */
308         size_t size() const
309         {
310             return m_ItemCounter.value();
311         }
312
313         /// Returns reference to internal statistics
314         const stat& statistics() const
315         {
316             return m_Stat;
317         }
318
319         /// Enqueues \p val value into the queue.
320         /** @anchor cds_intrusive_MSQueue_enqueue
321             The function always returns \p true.
322         */
323         bool enqueue( value_type& val )
324         {
325             node_type * pNew = node_traits::to_node_ptr( val );
326             link_checker::is_empty( pNew );
327
328             typename gc::Guard guard;
329             back_off bkoff;
330
331             node_type * t;
332             while ( true ) {
333                 t = guard.protect( m_pTail, node_to_value() );
334
335                 node_type * pNext = t->m_pNext.load(memory_model::memory_order_acquire);
336                 if ( pNext != null_ptr<node_type *>() ) {
337                     // Tail is misplaced, advance it
338                     m_pTail.compare_exchange_weak( t, pNext, memory_model::memory_order_release, CDS_ATOMIC::memory_order_relaxed );
339                     m_Stat.onBadTail();
340                     continue;
341                 }
342
343                 node_type * tmp = null_ptr<node_type *>();
344                 if ( t->m_pNext.compare_exchange_strong( tmp, pNew, memory_model::memory_order_release, CDS_ATOMIC::memory_order_relaxed ))
345                     break;
346
347                 m_Stat.onEnqueueRace();
348                 bkoff();
349             }
350             ++m_ItemCounter;
351             m_Stat.onEnqueue();
352
353             if ( !m_pTail.compare_exchange_strong( t, pNew, memory_model::memory_order_acq_rel, CDS_ATOMIC::memory_order_relaxed ))
354                 m_Stat.onAdvanceTailFailed();
355             return true;
356         }
357
358         /// Dequeues a value from the queue
359         /** @anchor cds_intrusive_MSQueue_dequeue
360             If the queue is empty the function returns \p NULL.
361
362             \par Warning
363             The queue algorithm has following feature: when \p dequeue is called,
364             the item returning is still queue's top, and previous top is disposed:
365
366             \code
367             before dequeuing         Dequeue               after dequeuing
368             +------------------+                           +------------------+
369       Top ->|      Item 1      |  -> Dispose Item 1        |      Item 2      | <- Top
370             +------------------+                           +------------------+
371             |      Item 2      |  -> Return Item 2         |       ...        |
372             +------------------+
373             |       ...        |
374             \endcode
375
376             \p dequeue function returns Item 2, that becomes new top of queue, and calls
377             the disposer for Item 1, that was queue's top on function entry.
378             Thus, you cannot manually delete item returned because it is still included in
379             item sequence and it has valuable link field that must not be zeroed.
380             The item may be deleted only in disposer call.
381         */
382         value_type * dequeue()
383         {
384             dequeue_result res;
385
386             if ( do_dequeue( res )) {
387                 dispose_result( res );
388
389                 return node_traits::to_value_ptr( *res.pNext );
390             }
391             return null_ptr<value_type *>();
392         }
393
394         /// Synonym for \ref cds_intrusive_MSQueue_enqueue "enqueue" function
395         bool push( value_type& val )
396         {
397             return enqueue( val );
398         }
399
400         /// Synonym for \ref cds_intrusive_MSQueue_dequeue "dequeue" function
401         value_type * pop()
402         {
403             return dequeue();
404         }
405
406         /// Checks if the queue is empty
407         bool empty() const
408         {
409             typename gc::Guard guard;
410             return guard.protect( m_pHead, node_to_value() )->m_pNext.load(memory_model::memory_order_relaxed) == null_ptr<node_type *>();
411         }
412
413         /// Clear the queue
414         /**
415             The function repeatedly calls \ref dequeue until it returns \p NULL.
416             The disposer defined in template \p Options is called for each item
417             that can be safely disposed.
418         */
419         void clear()
420         {
421             while ( dequeue() );
422         }
423     };
424
425 }} // namespace cds::intrusive
426
427 #endif // #ifndef __CDS_INTRUSIVE_MSQUEUE_H