SplitListMap, SplitListSet:
[libcds.git] / cds / intrusive / moir_queue.h
1 //$$CDS-header$$
2
3 #ifndef CDSLIB_INTRUSIVE_MOIR_QUEUE_H
4 #define CDSLIB_INTRUSIVE_MOIR_QUEUE_H
5
6 #include <cds/intrusive/msqueue.h>
7
8 namespace cds { namespace intrusive {
9
10     /// A variation of Michael & Scott's lock-free queue (intrusive variant)
11     /** @ingroup cds_intrusive_queue
12         This is slightly optimized Michael & Scott's queue algorithm that overloads \ref dequeue function.
13
14         Source:
15             - [2000] Simon Doherty, Lindsay Groves, Victor Luchangco, Mark Moir
16                 "Formal Verification of a practical lock-free queue algorithm"
17
18         Cite from this work about difference from Michael & Scott algo:
19         "Our algorithm differs from Michael and Scott\92s [MS98] in that we test whether \p Tail points to the header
20         node only <b>after</b> \p Head has been updated, so a dequeuing process reads \p Tail only once. The dequeue in
21         [MS98] performs this test before checking whether the next pointer in the dummy node is null, which
22         means that it reads \p Tail every time a dequeuing process loops. Under high load, when operations retry
23         frequently, our modification will reduce the number of accesses to global memory. This modification, however,
24         introduces the possibility of \p Head and \p Tail \93crossing\94."
25
26         Explanation of template arguments see intrusive::MSQueue.
27
28         \par Examples
29         \code
30         #include <cds/intrusive/moir_queue.h>
31         #include <cds/gc/hp.h>
32
33         namespace ci = cds::inrtusive;
34         typedef cds::gc::HP hp_gc;
35
36         // MoirQueue with Hazard Pointer garbage collector, base hook + item disposer:
37         struct Foo: public ci::msqueue::node< hp_gc >
38         {
39             // Your data
40             ...
41         };
42
43         // Disposer for Foo struct just deletes the object passed in
44         struct fooDisposer {
45             void operator()( Foo * p )
46             {
47                 delete p;
48             }
49         };
50
51         typedef ci::MoirQueue<
52             hp_gc
53             ,Foo
54             typename ci::msqueue::make_traits<
55                 ,ci::opt::hook<
56                     ci::msqueue::base_hook< ci::opt::gc<hp_gc> >
57                 >
58                 ,ci::opt::disposer< fooDisposer >
59             >::type
60         > fooQueue;
61
62         // MoirQueue with Hazard Pointer garbage collector,
63         // member hook + item disposer + item counter,
64         // without padding of internal queue data:
65         struct Bar
66         {
67             // Your data
68             ...
69             ci::msqueue::node< hp_gc > hMember;
70         };
71
72         struct barQueueTraits: public ci::msqueue::traits
73         {
74             typedef ci::msqueue::member_hook< offsetof(Bar, hMember), ,ci::opt::gc<hp_gc> > hook;
75             typedef fooDisposer disposer;
76             typedef cds::atomicity::item_counter item_counter;
77             enum { padding = cds::opt::no_special_padding };
78         };
79         typedef ci::MoirQueue< hp_gc, Bar, barQueueTraits > barQueue;
80         \endcode
81     */
82     template <typename GC, typename T, typename Traits = cds::intrusive::msqueue::traits>
83     class MoirQueue: public MSQueue< GC, T, Traits >
84     {
85         //@cond
86         typedef MSQueue< GC, T, Traits > base_class;
87         typedef typename base_class::node_type node_type;
88         //@endcond
89
90     public:
91         //@cond
92         typedef typename base_class::value_type value_type;
93         typedef typename base_class::back_off   back_off;
94         typedef typename base_class::gc         gc;
95         typedef typename base_class::node_traits node_traits;
96         typedef typename base_class::memory_model   memory_model;
97         //@endcond
98
99         /// Rebind template arguments
100         template < typename GC2, typename T2, typename Traits2 >
101         struct rebind {
102             typedef MoirQueue< GC2, T2, Traits2> other   ;   ///< Rebinding result
103         };
104
105     protected:
106         //@cond
107         typedef typename base_class::dequeue_result dequeue_result;
108
109         bool do_dequeue( dequeue_result& res )
110         {
111             back_off bkoff;
112
113             node_type * pNext;
114             node_type * h;
115             while ( true ) {
116                 h = res.guards.protect( 0, base_class::m_pHead, []( node_type * p ) -> value_type * { return node_traits::to_value_ptr( p );});
117                 pNext = res.guards.protect( 1, h->m_pNext, []( node_type * p ) -> value_type * { return node_traits::to_value_ptr( p );});
118
119                 if ( pNext == nullptr ) {
120                     base_class::m_Stat.onEmptyDequeue();
121                     return false;    // queue is empty
122                 }
123
124                 if ( base_class::m_pHead.compare_exchange_strong( h, pNext, memory_model::memory_order_release, atomics::memory_order_relaxed )) {
125                     node_type * t = base_class::m_pTail.load(memory_model::memory_order_acquire);
126                     if ( h == t )
127                         base_class::m_pTail.compare_exchange_strong( t, pNext, memory_model::memory_order_release, atomics::memory_order_relaxed );
128                     break;
129                 }
130
131                 base_class::m_Stat.onDequeueRace();
132                 bkoff();
133             }
134
135             --base_class::m_ItemCounter;
136             base_class::m_Stat.onDequeue();
137
138             res.pHead = h;
139             res.pNext = pNext;
140             return true;
141         }
142         //@endcond
143
144     public:
145         /// Dequeues a value from the queue
146         /** @anchor cds_intrusive_MoirQueue_dequeue
147             See warning about item disposing in \p MSQueue::dequeue.
148         */
149         value_type * dequeue()
150         {
151             dequeue_result res;
152             if ( do_dequeue( res )) {
153                 base_class::dispose_result( res );
154                 return node_traits::to_value_ptr( *res.pNext );
155             }
156             return nullptr;
157         }
158
159         /// Synonym for \ref cds_intrusive_MoirQueue_dequeue "dequeue" function
160         value_type * pop()
161         {
162             return dequeue();
163         }
164     };
165
166 }} // namespace cds::intrusive
167
168 #endif // #ifndef CDSLIB_INTRUSIVE_MOIR_QUEUE_H