Replace cds::ref/boost::ref with std::ref, remove cds::unref and cds/ref.h header
[libcds.git] / cds / intrusive / tsigas_cycle_queue.h
1 //$$CDS-header$$
2
3 #ifndef __CDS_INTRUSIVE_TSIGAS_CYCLE_QUEUE_H
4 #define __CDS_INTRUSIVE_TSIGAS_CYCLE_QUEUE_H
5
6 #include <functional>   // ref
7 #include <cds/intrusive/details/base.h>
8 #include <cds/cxx11_atomic.h>
9 #include <cds/details/bounded_container.h>
10 #include <cds/opt/buffer.h>
11
12 namespace cds { namespace intrusive {
13
14     /// Non-blocking cyclic queue discovered by Philippas Tsigas and Yi Zhang
15     /** @ingroup cds_intrusive_queue
16
17         Source:
18         \li [2000] Philippas Tsigas, Yi Zhang "A Simple, Fast and Scalable Non-Blocking Concurrent FIFO Queue
19             for Shared Memory Multiprocessor Systems"
20
21         Template arguments:
22         - T - data stored in queue. The queue stores pointers to passed data of type \p T.
23             <b>Restriction</b>: the queue can manage at least two-byte aligned data: the least significant bit (LSB)
24             of any pointer stored in the queue must be zero since the algorithm may use LSB
25             as a flag that marks the free cell.
26         - Options - options
27
28         \p Options are:
29         - opt::buffer - buffer to store items. Mandatory option, see option description for full list of possible types.
30         - opt::disposer - the functor used for dispose removed items. Default is opt::v::empty_disposer. This option is used
31             only in \ref clear function.
32         - opt::item_counter - the type of item counting feature. Default is \ref atomicity::empty_item_counter
33         - opt::back_off - back-off strategy used. If the option is not specified, the cds::backoff::empty is used.
34         - opt::alignment - the alignment for internal queue data. Default is opt::cache_line_alignment
35         - opt::memory_model - C++ memory ordering model. Can be opt::v::relaxed_ordering (relaxed memory model, the default)
36             or opt::v::sequential_consistent (sequentially consisnent memory model).
37
38         This queue algorithm does not require any garbage collector.
39
40         \par Examples:
41         \code
42         #include <cds/intrusive/tsigas_cycle_queue.h>
43
44         struct Foo {
45             ...
46         };
47
48         // Queue of Foo pointers, capacity is 1024, statically allocated buffer:
49         typedef cds::intrusive::TsigasCycleQueue<
50             Foo
51             ,cds::opt::buffer< cds::opt::v::static_buffer< Foo, 1024 > >
52         > static_queue;
53         static_queue    stQueue;
54
55         // Queue of Foo pointers, capacity is 1024, dynamically allocated buffer:
56         typedef cds::intrusive::TsigasCycleQueue<
57             Foo
58             ,cds::opt::buffer< cds::opt::v::dynamic_buffer< Foo > >
59         > dynamic_queue;
60         dynamic_queue    dynQueue( 1024 );
61         \endcode
62     */
63     template <typename T, typename... Options>
64     class TsigasCycleQueue: public cds::bounded_container
65     {
66         //@cond
67         struct default_options
68         {
69             typedef cds::backoff::empty         back_off;
70             typedef opt::v::empty_disposer      disposer;
71             typedef atomicity::empty_item_counter item_counter;
72             typedef opt::v::relaxed_ordering    memory_model;
73             enum { alignment = opt::cache_line_alignment };
74         };
75         //@endcond
76
77     public:
78         //@cond
79         typedef typename opt::make_options<
80             typename cds::opt::find_type_traits< default_options, Options...>::type
81             ,Options...
82         >::type   options;
83         //@endcond
84
85     public:
86         /// Rebind template arguments
87         template <typename T2, typename... Options2>
88         struct rebind {
89             typedef TsigasCycleQueue< T2, Options2...> other   ;   ///< Rebinding result
90         };
91
92     public:
93         typedef T value_type    ;   ///< type of value stored in the queue
94         typedef typename options::item_counter  item_counter;   ///< Item counter type
95         typedef typename options::disposer      disposer    ;   ///< Item disposer
96         typedef typename options::back_off      back_off    ;   ///< back-off strategy used
97         typedef typename options::memory_model  memory_model;   ///< Memory ordering. See cds::opt::memory_model option
98
99     protected:
100         //@cond
101         typedef typename options::buffer::template rebind< atomics::atomic<value_type *> >::other buffer;
102         typedef typename opt::details::alignment_setter< buffer, options::alignment >::type aligned_buffer;
103         typedef size_t index_type;
104         typedef typename opt::details::alignment_setter< atomics::atomic<index_type>, options::alignment >::type aligned_index;
105         //@endcond
106
107     protected:
108         //@cond
109         buffer          m_buffer    ;   ///< array of pointer T *, array size is equal to m_nCapacity+1
110         aligned_index   m_nHead     ;   ///< index of queue's head
111         aligned_index   m_nTail     ;   ///< index of queue's tail
112         item_counter    m_ItemCounter   ;   ///< item counter
113         //@endcond
114
115     protected:
116         //@cond
117         static CDS_CONSTEXPR value_type * free0() CDS_NOEXCEPT
118         {
119             return nullptr;
120         }
121         static CDS_CONSTEXPR value_type * free1() CDS_NOEXCEPT
122         {
123             return (value_type*) 1;
124         }
125         static bool is_free( const value_type * p ) CDS_NOEXCEPT
126         {
127             return p == free0() || p == free1();
128         }
129
130         size_t buffer_capacity() const CDS_NOEXCEPT
131         {
132             return m_buffer.capacity();
133         }
134
135         index_type modulo() const CDS_NOEXCEPT
136         {
137             return buffer_capacity() - 1;
138         }
139         //@endcond
140
141     public:
142         /// Initialize empty queue of capacity \p nCapacity
143         /**
144             For cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
145
146             Note that the real capacity of queue is \p nCapacity - 2.
147         */
148         TsigasCycleQueue( size_t nCapacity = 0 )
149             : m_buffer( nCapacity )
150             , m_nHead(0)
151             , m_nTail(1)
152         {
153             m_buffer.zeroize();
154         }
155
156         /// Clears the queue
157         ~TsigasCycleQueue()
158         {
159             clear();
160         }
161
162         /// Returns queue's item count
163         /**
164             The value returned depends on opt::item_counter option. For atomicity::empty_item_counter,
165             this function always returns 0.
166         */
167         size_t size() const CDS_NOEXCEPT
168         {
169             return m_ItemCounter.value();
170         }
171
172         /// Returns capacity of cyclic buffer
173         size_t capacity() const CDS_NOEXCEPT
174         {
175             return buffer_capacity() - 2;
176         }
177
178         /// Enqueues item from the queue
179         /** @anchor cds_intrusive_TsigasQueue_enqueue
180             Returns \p true if success, \p false otherwise (for example, if queue is full)
181         */
182         bool enqueue( value_type& data )
183         {
184             value_type * pNewNode  = &data;
185             assert( (reinterpret_cast<ptr_atomic_t>( pNewNode ) & 1) == 0 );
186             back_off bkoff;
187
188             const index_type nModulo = modulo();
189
190             do {
191                 index_type te = m_nTail.load(memory_model::memory_order_acquire);
192                 index_type ate = te;
193                 value_type * tt = m_buffer[ ate ].load(memory_model::memory_order_relaxed);
194                 index_type temp = ( ate + 1 ) & nModulo ;    // next item after tail
195
196                 // Looking for actual tail
197                 while ( !is_free( tt ) ) {
198                     if ( te != m_nTail.load(memory_model::memory_order_relaxed) )    // check the tail consistency
199                         goto TryAgain;
200                     if ( temp == m_nHead.load(memory_model::memory_order_acquire) )    // queue full?
201                         break;
202                     tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
203                     ate = temp;
204                     temp = (temp + 1) & nModulo;
205                 }
206
207                 if ( te != m_nTail.load(memory_model::memory_order_relaxed) )
208                     continue;
209
210                 // Check whether queue is full
211                 if ( temp == m_nHead.load(memory_model::memory_order_acquire) ) {
212                     ate = ( temp + 1 ) & nModulo;
213                     tt = m_buffer[ ate ].load(memory_model::memory_order_relaxed);
214                     if ( !is_free( tt ) ) {
215                         return false    ;    // Queue is full
216                     }
217
218                     // help the dequeue to update head
219                     m_nHead.compare_exchange_strong( temp, ate, memory_model::memory_order_release, atomics::memory_order_relaxed );
220                     continue;
221                 }
222
223                 if ( tt == free1() )
224                     pNewNode = reinterpret_cast<value_type *>(reinterpret_cast<intptr_t>( pNewNode ) | 1);
225                 if ( te != m_nTail.load(memory_model::memory_order_relaxed) )
226                     continue;
227
228                 // get actual tail and try to enqueue new node
229                 if ( m_buffer[ate].compare_exchange_strong( tt, pNewNode, memory_model::memory_order_release, atomics::memory_order_relaxed ) ) {
230                     if ( temp % 2 == 0 )
231                         m_nTail.compare_exchange_strong( te, temp, memory_model::memory_order_release, atomics::memory_order_relaxed );
232                     ++m_ItemCounter;
233                     return true;
234                 }
235             TryAgain:;
236             } while ( bkoff(), true );
237
238             // No control path reaches this line!
239             return false;
240         }
241
242         /// Dequeues item from the queue
243         /** @anchor cds_intrusive_TsigasQueue_dequeue
244             If the queue is empty the function returns \p nullptr
245
246             Dequeue does not call value disposer. You can manually dispose returned value if it is needed.
247         */
248         value_type * dequeue()
249         {
250             back_off bkoff;
251
252             const index_type nModulo = modulo();
253             do {
254                 index_type th = m_nHead.load(memory_model::memory_order_acquire);
255                 index_type temp = ( th + 1 ) & nModulo;
256                 value_type * tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
257                 value_type * pNull;
258
259                 // find the actual head after this loop
260                 while ( is_free( tt ) ) {
261                     if ( th != m_nHead.load(memory_model::memory_order_relaxed) )
262                         goto TryAgain;
263
264                     // two consecutive nullptr means queue empty
265                     if ( temp == m_nTail.load(memory_model::memory_order_acquire) )
266                         return nullptr;
267
268                     temp = ( temp + 1 ) & nModulo;
269                     tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
270                 }
271
272                 if ( th != m_nHead.load(memory_model::memory_order_relaxed) )
273                     continue;
274
275                 // check whether the queue is empty
276                 if ( temp == m_nTail.load(memory_model::memory_order_acquire) ) {
277                     // help the enqueue to update end
278                     m_nTail.compare_exchange_strong( temp, (temp + 1) & nModulo, memory_model::memory_order_release, atomics::memory_order_relaxed );
279                     continue;
280                 }
281
282                 pNull = (reinterpret_cast<ptr_atomic_t>( tt ) & 1) ? free0() : free1();
283
284                 if ( th != m_nHead.load(memory_model::memory_order_relaxed) )
285                     continue;
286
287                 // Get the actual head, null means empty
288                 if ( m_buffer[temp].compare_exchange_strong( tt, pNull, memory_model::memory_order_release, atomics::memory_order_relaxed )) {
289                     if ( temp % 2 == 0 )
290                         m_nHead.compare_exchange_strong( th, temp, memory_model::memory_order_release, atomics::memory_order_relaxed );
291                     --m_ItemCounter;
292                     return reinterpret_cast<value_type *>(reinterpret_cast<intptr_t>( tt ) & ~intptr_t(1));
293                 }
294
295             TryAgain:;
296             } while ( bkoff(), true );
297
298             // No control path reaches this line!
299             return nullptr;
300         }
301
302         /// Synonym of \ref cds_intrusive_TsigasQueue_enqueue "enqueue"
303         bool push( value_type& data )
304         {
305             return enqueue( data );
306         }
307
308         /// Synonym of \ref cds_intrusive_TsigasQueue_dequeue "dequeue"
309         value_type * pop()
310         {
311             return dequeue();
312         }
313
314         /// Checks if the queue is empty
315         bool empty() const
316         {
317             const index_type nModulo = modulo();
318
319         TryAgain:
320             index_type th = m_nHead.load(memory_model::memory_order_relaxed);
321             index_type temp = ( th + 1 ) & nModulo;
322             const value_type * tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
323
324             // find the actual head after this loop
325             while ( is_free( tt ) ) {
326                 if ( th != m_nHead.load(memory_model::memory_order_relaxed) )
327                     goto TryAgain;
328                 // two consecutive nullptr means queue empty
329                 if ( temp == m_nTail.load(memory_model::memory_order_relaxed) )
330                     return true;
331                 temp = ( temp + 1 ) & nModulo;
332                 tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
333             }
334             return false;
335         }
336
337         /// Clears queue in lock-free manner.
338         /**
339             \p f parameter is a functor to dispose removed items.
340             The interface of \p DISPOSER is:
341             \code
342             struct myDisposer {
343                 void operator ()( T * val );
344             };
345             \endcode
346             You can pass \p disposer by reference using \p std::ref.
347             The disposer will be called immediately for each item.
348         */
349         template <typename Disposer>
350         void clear( Disposer f )
351         {
352             value_type * pv;
353             while ( (pv = pop()) != nullptr ) {
354                 f( pv );
355             }
356         }
357
358         /// Clears the queue
359         /**
360             This function uses the disposer that is specified in \p Options.
361         */
362         void clear()
363         {
364             clear( disposer() );
365         }
366     };
367
368 }}  // namespace cds::intrusive
369
370 #endif  // #ifndef __CDS_INTRUSIVE_TSIGAS_CYCLE_QUEUE_H