Move cds/intrusive/queue_stat.h to cds/intrusive/details directory
[libcds.git] / cds / intrusive / msqueue.h
1 //$$CDS-header$$
2
3 #ifndef __CDS_INTRUSIVE_MSQUEUE_H
4 #define __CDS_INTRUSIVE_MSQUEUE_H
5
6 #include <type_traits>
7 #include <cds/intrusive/details/single_link_struct.h>
8 #include <cds/intrusive/details/queue_stat.h>
9 #include <cds/intrusive/details/dummy_node_holder.h>
10
11 namespace cds { namespace intrusive {
12
13     /// Michael & Scott's lock-free queue (intrusive variant)
14     /** @ingroup cds_intrusive_queue
15         Implementation of well-known Michael & Scott's queue algorithm.
16
17         \par Source:
18             [1998] Maged Michael, Michael Scott "Simple, fast, and practical non-blocking and blocking
19                    concurrent queue algorithms"
20
21         Template arguments:
22         - \p GC - garbage collector type: gc::HP, gc::HRC, gc::PTB
23         - \p T - type to be stored in the queue, should be convertible to \ref single_link::node
24         - \p Options - options
25
26         Type of node: \ref single_link::node
27
28         \p Options are:
29         - opt::hook - hook used. Possible values are: single_link::base_hook, single_link::member_hook, single_link::traits_hook.
30             If the option is not specified, <tt>single_link::base_hook<></tt> is used.
31             For Gidenstam's gc::HRC, only single_link::base_hook is supported.
32         - opt::back_off - back-off strategy used. If the option is not specified, the cds::backoff::empty is used.
33         - opt::disposer - the functor used for dispose removed items. Default is opt::v::empty_disposer. This option is used
34             in \ref dequeue function.
35         - opt::link_checker - the type of node's link fields checking. Default is \ref opt::debug_check_link
36             Note: for gc::HRC garbage collector, link checking policy is always selected as \ref opt::always_check_link.
37         - opt::item_counter - the type of item counting feature. Default is \ref atomicity::empty_item_counter (no item counting feature)
38         - opt::stat - the type to gather internal statistics.
39             Possible option value are: \ref queue_stat, \ref queue_dummy_stat, user-provided class that supports queue_stat interface.
40             Default is \ref queue_dummy_stat.
41         - opt::alignment - the alignment for internal queue data. Default is opt::cache_line_alignment
42         - opt::memory_model - C++ memory ordering model. Can be opt::v::relaxed_ordering (relaxed memory model, the default)
43             or opt::v::sequential_consistent (sequentially consisnent memory model).
44
45         Garbage collecting schema \p GC must be consistent with the single_link::node GC.
46
47         \par About item disposing
48         The Michael & Scott's queue algo has a key feature: even if the queue is empty it contains one item that is "dummy" one from
49         the standpoint of the algo. See \ref dequeue function doc for explanation.
50
51         \par Examples
52         \code
53         #include <cds/intrusive/msqueue.h>
54         #include <cds/gc/hp.h>
55
56         namespace ci = cds::inrtusive;
57         typedef cds::gc::HP hp_gc;
58
59         // MSQueue with Hazard Pointer garbage collector, base hook + item disposer:
60         struct Foo: public ci::single_link::node< hp_gc >
61         {
62             // Your data
63             ...
64         };
65
66         // Disposer for Foo struct just deletes the object passed in
67         struct fooDisposer {
68             void operator()( Foo * p )
69             {
70                 delete p;
71             }
72         };
73
74         typedef ci::MSQueue< hp_gc,
75             Foo
76             ,ci::opt::hook<
77                 ci::single_link::base_hook< ci::opt::gc<hp_gc> >
78             >
79             ,ci::opt::disposer< fooDisposer >
80         > fooQueue;
81
82         // MSQueue with Hazard Pointer garbage collector,
83         // member hook + item disposer + item counter,
84         // without alignment of internal queue data:
85         struct Bar
86         {
87             // Your data
88             ...
89             ci::single_link::node< hp_gc > hMember;
90         };
91
92         typedef ci::MSQueue< hp_gc,
93             Foo
94             ,ci::opt::hook<
95                 ci::single_link::member_hook<
96                     offsetof(Bar, hMember)
97                     ,ci::opt::gc<hp_gc>
98                 >
99             >
100             ,ci::opt::disposer< fooDisposer >
101             ,cds::opt::item_counter< cds::atomicity::item_counter >
102             ,cds::opt::alignment< cds::opt::no_special_alignment >
103         > barQueue;
104         \endcode
105     */
106     template <typename GC, typename T, typename... Options>
107     class MSQueue
108     {
109         //@cond
110         struct default_options
111         {
112             typedef cds::backoff::empty             back_off;
113             typedef single_link::base_hook<>        hook;
114             typedef opt::v::empty_disposer          disposer;
115             typedef atomicity::empty_item_counter   item_counter;
116             typedef queue_dummy_stat                stat;
117             typedef opt::v::relaxed_ordering        memory_model;
118             static const opt::link_check_type link_checker = opt::debug_check_link;
119             enum { alignment = opt::cache_line_alignment };
120         };
121         //@endcond
122
123     public:
124         //@cond
125         typedef typename opt::make_options<
126             typename cds::opt::find_type_traits< default_options, Options... >::type
127             ,Options...
128         >::type   options;
129         //@endcond
130
131     public:
132         typedef T  value_type   ;   ///< type of value stored in the queue
133         typedef typename options::hook      hook        ;   ///< hook type
134         typedef typename hook::node_type    node_type   ;   ///< node type
135         typedef typename options::disposer  disposer    ;   ///< disposer used
136         typedef typename get_node_traits< value_type, node_type, hook>::type node_traits ;    ///< node traits
137         typedef typename single_link::get_link_checker< node_type, options::link_checker >::type link_checker   ;   ///< link checker
138
139         typedef GC gc          ;   ///< Garbage collector
140         typedef typename options::back_off  back_off    ;   ///< back-off strategy
141         typedef typename options::item_counter item_counter ;   ///< Item counting policy used
142         typedef typename options::stat      stat        ;   ///< Internal statistics policy used
143         typedef typename options::memory_model  memory_model ;   ///< Memory ordering. See cds::opt::memory_model option
144
145         /// Rebind template arguments
146         template <typename GC2, typename T2, typename... Options2>
147         struct rebind {
148             typedef MSQueue< GC2, T2, Options2...> other   ;   ///< Rebinding result
149         };
150
151     protected:
152         //@cond
153         struct internal_disposer
154         {
155             void operator()( value_type * p )
156             {
157                 assert( p != nullptr );
158
159                 MSQueue::clear_links( node_traits::to_node_ptr(p) );
160                 disposer()( p );
161             }
162         };
163
164         typedef intrusive::node_to_value<MSQueue> node_to_value;
165         typedef typename opt::details::alignment_setter< typename node_type::atomic_node_ptr, options::alignment >::type aligned_node_ptr;
166
167         typedef typename opt::details::alignment_setter<
168             cds::intrusive::details::dummy_node< gc, node_type>,
169             options::alignment
170         >::type    dummy_node_type;
171
172         aligned_node_ptr    m_pHead ;           ///< Queue's head pointer (cache-line aligned)
173         aligned_node_ptr    m_pTail ;           ///< Queue's tail pointer (cache-line aligned)
174         dummy_node_type     m_Dummy ;           ///< dummy node
175         item_counter        m_ItemCounter   ;   ///< Item counter
176         stat                m_Stat  ;           ///< Internal statistics
177         //@endcond
178
179         //@cond
180         struct dequeue_result {
181             typename gc::template GuardArray<2>  guards;
182
183             node_type * pHead;
184             node_type * pNext;
185         };
186
187         bool do_dequeue( dequeue_result& res )
188         {
189             node_type * pNext;
190             back_off bkoff;
191
192             node_type * h;
193             while ( true ) {
194                 h = res.guards.protect( 0, m_pHead, node_to_value() );
195                 pNext = h->m_pNext.load( memory_model::memory_order_relaxed );
196                 res.guards.assign( 1, node_to_value()( pNext ));
197                 //pNext = res.guards.protect( 1, h->m_pNext, node_to_value() );
198                 if ( m_pHead.load(memory_model::memory_order_acquire) != h )
199                     continue;
200
201                 if ( pNext == nullptr )
202                     return false ;    // empty queue
203
204                 node_type * t = m_pTail.load(memory_model::memory_order_acquire);
205                 if ( h == t ) {
206                     // It is needed to help enqueue
207                     m_pTail.compare_exchange_strong( t, pNext, memory_model::memory_order_release, atomics::memory_order_relaxed );
208                     m_Stat.onBadTail();
209                     continue;
210                 }
211
212                 if ( m_pHead.compare_exchange_strong( h, pNext, memory_model::memory_order_release, atomics::memory_order_relaxed ))
213                     break;
214
215                 m_Stat.onDequeueRace();
216                 bkoff();
217             }
218
219             --m_ItemCounter;
220             m_Stat.onDequeue();
221
222             res.pHead = h;
223             res.pNext = pNext;
224             return true;
225         }
226
227         static void clear_links( node_type * pNode )
228         {
229             pNode->m_pNext.store( nullptr, memory_model::memory_order_release );
230         }
231
232         void dispose_result( dequeue_result& res )
233         {
234             dispose_node( res.pHead );
235         }
236
237         void dispose_node( node_type * p )
238         {
239             if ( p != m_Dummy.get() ) {
240                 gc::template retire<internal_disposer>( node_traits::to_value_ptr(p) );
241             }
242             else {
243                 // We cannot clear m_Dummy here since it leads to ABA.
244                 // On the other hand, we cannot use deferred clear_links( &m_Dummy ) call via
245                 // HP retiring cycle since m_Dummy is member of MSQueue and may be destroyed
246                 // before HP retiring cycle invocation.
247                 // So, we will never clear m_Dummy for gc::HP and gc::PTB
248                 // However, gc::HRC nodes are managed by reference counting, so, we must
249                 // call HP retire cycle.
250                 m_Dummy.retire();
251             }
252         }
253         //@endcond
254
255     public:
256         /// Initializes empty queue
257         MSQueue()
258             : m_pHead( nullptr )
259             , m_pTail( nullptr )
260         {
261             // GC and node_type::gc must be the same
262             static_assert(( std::is_same<gc, typename node_type::gc>::value ), "GC and node_type::gc must be the same");
263
264             // For cds::gc::HRC, only base_hook is allowed
265             static_assert((
266                 std::conditional<
267                     std::is_same<gc, cds::gc::HRC>::value,
268                     std::is_same< typename hook::hook_type, opt::base_hook_tag >,
269                     boost::true_type
270                 >::type::value
271             ), "For cds::gc::HRC, only base_hook is allowed");
272
273             // Head/tail initialization should be made via store call
274             // since gc::HRC manages reference counting
275             m_pHead.store( m_Dummy.get(), memory_model::memory_order_relaxed );
276             m_pTail.store( m_Dummy.get(), memory_model::memory_order_relaxed );
277         }
278
279         /// Destructor clears the queue
280         /**
281             Since the Michael & Scott queue contains at least one item even
282             if the queue is empty, the destructor may call item disposer.
283         */
284         ~MSQueue()
285         {
286             clear();
287
288             node_type * pHead = m_pHead.load(memory_model::memory_order_relaxed);
289
290             assert( pHead != nullptr );
291             assert( pHead == m_pTail.load(memory_model::memory_order_relaxed) );
292
293             m_pHead.store( nullptr, memory_model::memory_order_relaxed );
294             m_pTail.store( nullptr, memory_model::memory_order_relaxed );
295
296             dispose_node( pHead );
297         }
298
299         /// Returns queue's item count
300         /**
301             The value returned depends on opt::item_counter option. For atomicity::empty_item_counter,
302             this function always returns 0.
303
304             <b>Warning</b>: even if you use real item counter and it returns 0, this fact is not mean that the queue
305             is empty. To check queue emptyness use \ref empty() method.
306         */
307         size_t size() const
308         {
309             return m_ItemCounter.value();
310         }
311
312         /// Returns reference to internal statistics
313         const stat& statistics() const
314         {
315             return m_Stat;
316         }
317
318         /// Enqueues \p val value into the queue.
319         /** @anchor cds_intrusive_MSQueue_enqueue
320             The function always returns \p true.
321         */
322         bool enqueue( value_type& val )
323         {
324             node_type * pNew = node_traits::to_node_ptr( val );
325             link_checker::is_empty( pNew );
326
327             typename gc::Guard guard;
328             back_off bkoff;
329
330             node_type * t;
331             while ( true ) {
332                 t = guard.protect( m_pTail, node_to_value() );
333
334                 node_type * pNext = t->m_pNext.load(memory_model::memory_order_acquire);
335                 if ( pNext != nullptr ) {
336                     // Tail is misplaced, advance it
337                     m_pTail.compare_exchange_weak( t, pNext, memory_model::memory_order_release, atomics::memory_order_relaxed );
338                     m_Stat.onBadTail();
339                     continue;
340                 }
341
342                 node_type * tmp = nullptr;
343                 if ( t->m_pNext.compare_exchange_strong( tmp, pNew, memory_model::memory_order_release, atomics::memory_order_relaxed ))
344                     break;
345
346                 m_Stat.onEnqueueRace();
347                 bkoff();
348             }
349             ++m_ItemCounter;
350             m_Stat.onEnqueue();
351
352             if ( !m_pTail.compare_exchange_strong( t, pNew, memory_model::memory_order_acq_rel, atomics::memory_order_relaxed ))
353                 m_Stat.onAdvanceTailFailed();
354             return true;
355         }
356
357         /// Dequeues a value from the queue
358         /** @anchor cds_intrusive_MSQueue_dequeue
359             If the queue is empty the function returns \p nullptr.
360
361             \par Warning
362             The queue algorithm has following feature: when \p dequeue is called,
363             the item returning is still queue's top, and previous top is disposed:
364
365             \code
366             before dequeuing         Dequeue               after dequeuing
367             +------------------+                           +------------------+
368       Top ->|      Item 1      |  -> Dispose Item 1        |      Item 2      | <- Top
369             +------------------+                           +------------------+
370             |      Item 2      |  -> Return Item 2         |       ...        |
371             +------------------+
372             |       ...        |
373             \endcode
374
375             \p dequeue function returns Item 2, that becomes new top of queue, and calls
376             the disposer for Item 1, that was queue's top on function entry.
377             Thus, you cannot manually delete item returned because it is still included in
378             item sequence and it has valuable link field that must not be zeroed.
379             The item may be deleted only in disposer call.
380         */
381         value_type * dequeue()
382         {
383             dequeue_result res;
384
385             if ( do_dequeue( res )) {
386                 dispose_result( res );
387
388                 return node_traits::to_value_ptr( *res.pNext );
389             }
390             return nullptr;
391         }
392
393         /// Synonym for \ref cds_intrusive_MSQueue_enqueue "enqueue" function
394         bool push( value_type& val )
395         {
396             return enqueue( val );
397         }
398
399         /// Synonym for \ref cds_intrusive_MSQueue_dequeue "dequeue" function
400         value_type * pop()
401         {
402             return dequeue();
403         }
404
405         /// Checks if the queue is empty
406         bool empty() const
407         {
408             typename gc::Guard guard;
409             return guard.protect( m_pHead, node_to_value() )->m_pNext.load( memory_model::memory_order_relaxed ) == nullptr;
410         }
411
412         /// Clear the queue
413         /**
414             The function repeatedly calls \ref dequeue until it returns \p nullptr.
415             The disposer defined in template \p Options is called for each item
416             that can be safely disposed.
417         */
418         void clear()
419         {
420             while ( dequeue() );
421         }
422     };
423
424 }} // namespace cds::intrusive
425
426 #endif // #ifndef __CDS_INTRUSIVE_MSQUEUE_H