202feb12aa549dd616f5a53bd7165a4eda899f91
[libcds.git] / cds / intrusive / tsigas_cycle_queue.h
1 //$$CDS-header$$
2
3 #ifndef __CDS_INTRUSIVE_TSIGAS_CYCLE_QUEUE_H
4 #define __CDS_INTRUSIVE_TSIGAS_CYCLE_QUEUE_H
5
6 #include <cds/intrusive/details/base.h>
7 #include <cds/algo/atomic.h>
8 #include <cds/details/bounded_container.h>
9 #include <cds/opt/buffer.h>
10
11 namespace cds { namespace intrusive {
12
13     /// TsigasCycleQueue related definitions
14     /** @ingroup cds_intrusive_helper
15     */
16     namespace tsigas_queue {
17
18         /// TsigasCycleQueue default traits
19         struct traits
20         {
21             /// Buffer type for cyclic array
22             /*
23                 The type of element for the buffer is not important: the queue rebinds
24                 buffer for required type via \p rebind metafunction.
25
26                 For \p TsigasCycleQueue queue the buffer size should have power-of-2 size.
27             */
28             typedef cds::opt::v::dynamic_buffer< void * > buffer;
29
30             /// Back-off strategy
31             typedef cds::backoff::empty         back_off;
32
33             /// The functor used for dispose removed items. Default is \p opt::v::empty_disposer. This option is used for dequeuing
34             typedef opt::v::empty_disposer      disposer;
35
36             /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
37             typedef atomicity::empty_item_counter item_counter;
38
39             /// C++ memory ordering model
40             /**
41                 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
42                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
43             */
44             typedef opt::v::relaxed_ordering    memory_model;
45
46             /// Alignment for internal queue data. Default is \p opt::cache_line_alignment
47             enum { alignment = opt::cache_line_alignment };
48         };
49
50         /// Metafunction converting option list to \p tsigas_queue::traits
51         /**
52             Supported \p Options are:
53             - \p opt::buffer - the buffer type for internal cyclic array. Possible types are:
54                 \p opt::v::dynamic_buffer (the default), \p opt::v::static_buffer. The type of
55                 element in the buffer is not important: it will be changed via \p rebind metafunction.
56             - \p opt::back_off - back-off strategy used, default is \p cds::backoff::empty.
57             - \p opt::disposer - the functor used for dispose removed items. Default is \p opt::v::empty_disposer. This option is used
58                 when dequeuing.
59             - \p opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
60                 To enable item counting use \p cds::atomicity::item_counter
61             - \p opt::alignment - the alignment for internal queue data. Default is \p opt::cache_line_alignment
62             - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
63                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
64
65             Example: declare \p %TsigasCycleQueue with item counting and static iternal buffer of size 1024:
66             \code
67             typedef cds::intrusive::TsigasCycleQueue< Foo,
68                 typename cds::intrusive::tsigas_queue::make_traits<
69                     cds::opt::buffer< cds::opt::v::static_buffer< void *, 1024 >,
70                     cds::opt::item_counte< cds::atomicity::item_counter >
71                 >::type
72             > myQueue;
73             \endcode
74         */
75         template <typename... Options>
76         struct make_traits {
77 #   ifdef CDS_DOXYGEN_INVOKED
78             typedef implementation_defined type;   ///< Metafunction result
79 #   else
80             typedef typename cds::opt::make_options<
81                 typename cds::opt::find_type_traits< traits, Options... >::type
82                 , Options...
83             >::type type;
84 #   endif
85         };
86
87
88     } //namespace tsigas_queue
89
90     /// Non-blocking cyclic queue discovered by Philippas Tsigas and Yi Zhang
91     /** @ingroup cds_intrusive_queue
92
93         Source:
94         \li [2000] Philippas Tsigas, Yi Zhang "A Simple, Fast and Scalable Non-Blocking Concurrent FIFO Queue
95             for Shared Memory Multiprocessor Systems"
96
97         Template arguments:
98         - \p T - value type to be stored in queue. The queue stores pointers to passed data of type \p T.
99             <b>Restriction</b>: the queue can manage at least two-byte aligned data: the least significant bit (LSB)
100             of any pointer stored in the queue must be zero since the algorithm may use LSB
101             as a flag that marks the free cell.
102         - \p Traits - queue traits, default is \p tsigas_queue::traits. You can use \p tsigas_queue::make_traits
103             metafunction to make your traits or just derive your traits from \p %tsigas_queue::traits:
104             \code
105             struct myTraits: public cds::intrusive::tsigas_queue::traits {
106                 typedef cds::atomicity::item_counter    item_counter;
107             };
108             typedef cds::intrusive::TsigasCycleQueue< Foo, myTraits > myQueue;
109
110             // Equivalent make_traits example:
111             typedef cds::intrusive::TsigasCycleQueue< Foo,
112                 typename cds::intrusive::tsigas_queue::make_traits<
113                     cds::opt::item_counter< cds::atomicity::item_counter >
114                 >::type
115             > myQueue;
116             \endcode
117
118         This queue algorithm does not require any garbage collector.
119
120         \par Examples:
121         \code
122         #include <cds/intrusive/tsigas_cycle_queue.h>
123
124         struct Foo {
125             ...
126         };
127
128         // Queue of Foo pointers, capacity is 1024, statically allocated buffer:
129         struct queue_traits: public cds::intrusive::tsigas_queue::traits
130         {
131             typedef cds::opt::v::static_buffer< Foo, 1024 > buffer;
132         };
133         typedef cds::intrusive::TsigasCycleQueue< Foo, queue_traits > static_queue;
134         static_queue    stQueue;
135
136         // Queue of Foo pointers, capacity is 1024, dynamically allocated buffer, with item counting:
137         typedef cds::intrusive::TsigasCycleQueue< Foo,
138             typename cds::intrusive::tsigas_queue::make_traits<
139                 cds::opt::buffer< cds::opt::v::dynamic_buffer< Foo > >,
140                 cds::opt::item_counter< cds::atomicity::item_counter >
141             >::type
142         > dynamic_queue;
143         dynamic_queue    dynQueue( 1024 );
144         \endcode
145     */
146     template <typename T, typename Traits = tsigas_queue::traits >
147     class TsigasCycleQueue: public cds::bounded_container
148     {
149     public:
150         /// Rebind template arguments
151         template <typename T2, typename Traits2>
152         struct rebind {
153             typedef TsigasCycleQueue< T2, Traits2 > other   ;   ///< Rebinding result
154         };
155
156     public:
157         typedef T value_type;   ///< type of value to be stored in the queue
158         typedef Traits traits;  ///< Queue traits
159         typedef typename traits::item_counter  item_counter;    ///< Item counter type
160         typedef typename traits::disposer      disposer;        ///< Item disposer
161         typedef typename traits::back_off      back_off;        ///< back-off strategy used
162         typedef typename traits::memory_model  memory_model;    ///< Memory ordering. See cds::opt::memory_model option
163         typedef typename traits::buffer::template rebind< atomics::atomic<value_type *> >::other buffer; ///< Internal buffer
164
165     protected:
166         //@cond
167         typedef typename opt::details::alignment_setter< buffer, traits::alignment >::type aligned_buffer;
168         typedef size_t index_type;
169         typedef typename opt::details::alignment_setter< atomics::atomic<index_type>, traits::alignment >::type aligned_index;
170         //@endcond
171
172     protected:
173         //@cond
174         buffer          m_buffer    ;   ///< array of pointer T *, array size is equal to m_nCapacity+1
175         aligned_index   m_nHead     ;   ///< index of queue's head
176         aligned_index   m_nTail     ;   ///< index of queue's tail
177         item_counter    m_ItemCounter   ;   ///< item counter
178         //@endcond
179
180     protected:
181         //@cond
182         static CDS_CONSTEXPR intptr_t const free0 = 0;
183         static CDS_CONSTEXPR intptr_t const free1 = 1;
184
185         static bool is_free( const value_type * p ) CDS_NOEXCEPT
186         {
187             return p == reinterpret_cast<value_type *>(free0) || p == reinterpret_cast<value_type *>(free1);
188         }
189
190         size_t CDS_CONSTEXPR buffer_capacity() const CDS_NOEXCEPT
191         {
192             return m_buffer.capacity();
193         }
194
195         index_type CDS_CONSTEXPR modulo() const CDS_NOEXCEPT
196         {
197             return buffer_capacity() - 1;
198         }
199         //@endcond
200
201     public:
202         /// Initialize empty queue of capacity \p nCapacity
203         /**
204             If internal buffer type is \p cds::opt::v::static_buffer, the \p nCapacity parameter is ignored.
205
206             Note that the real capacity of queue is \p nCapacity - 2.
207         */
208         TsigasCycleQueue( size_t nCapacity = 0 )
209             : m_buffer( nCapacity )
210             , m_nHead(0)
211             , m_nTail(1)
212         {
213             m_buffer.zeroize();
214         }
215
216         /// Clears the queue
217         ~TsigasCycleQueue()
218         {
219             clear();
220         }
221
222         /// Enqueues an item to the queue
223         /** @anchor cds_intrusive_TsigasQueue_enqueue
224             Returns \p true if success, \p false if queue is full
225         */
226         bool enqueue( value_type& data )
227         {
228             value_type * pNewNode  = &data;
229             assert( (reinterpret_cast<ptr_atomic_t>( pNewNode ) & 1) == 0 );
230             back_off bkoff;
231
232             const index_type nModulo = modulo();
233
234             do {
235                 index_type te = m_nTail.load(memory_model::memory_order_acquire);
236                 index_type ate = te;
237                 value_type * tt = m_buffer[ ate ].load(memory_model::memory_order_relaxed);
238                 index_type temp = ( ate + 1 ) & nModulo ;    // next item after tail
239
240                 // Looking for actual tail
241                 while ( !is_free( tt ) ) {
242                     if ( te != m_nTail.load(memory_model::memory_order_relaxed) )    // check the tail consistency
243                         goto TryAgain;
244                     if ( temp == m_nHead.load(memory_model::memory_order_acquire) )    // queue full?
245                         break;
246                     tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
247                     ate = temp;
248                     temp = (temp + 1) & nModulo;
249                 }
250
251                 if ( te != m_nTail.load(memory_model::memory_order_relaxed) )
252                     continue;
253
254                 // Check whether queue is full
255                 if ( temp == m_nHead.load(memory_model::memory_order_acquire) ) {
256                     ate = ( temp + 1 ) & nModulo;
257                     tt = m_buffer[ ate ].load(memory_model::memory_order_relaxed);
258                     if ( !is_free( tt ) ) {
259                         return false;   // Queue is full
260                     }
261
262                     // help the dequeue to update head
263                     m_nHead.compare_exchange_strong( temp, ate, memory_model::memory_order_release, atomics::memory_order_relaxed );
264                     continue;
265                 }
266
267                 if ( tt == reinterpret_cast<value_type *>(free1) )
268                     pNewNode = reinterpret_cast<value_type *>(reinterpret_cast<intptr_t>( pNewNode ) | 1);
269                 if ( te != m_nTail.load(memory_model::memory_order_relaxed) )
270                     continue;
271
272                 // get actual tail and try to enqueue new node
273                 if ( m_buffer[ate].compare_exchange_strong( tt, pNewNode, memory_model::memory_order_release, atomics::memory_order_relaxed ) ) {
274                     if ( temp % 2 == 0 )
275                         m_nTail.compare_exchange_strong( te, temp, memory_model::memory_order_release, atomics::memory_order_relaxed );
276                     ++m_ItemCounter;
277                     return true;
278                 }
279             TryAgain:;
280             } while ( bkoff(), true );
281
282             // No control path reaches this line!
283             return false;
284         }
285
286         /// Dequeues item from the queue
287         /** @anchor cds_intrusive_TsigasQueue_dequeue
288             If the queue is empty the function returns \p nullptr
289
290             Dequeue does not call value disposer. You may manually dispose returned value if it is needed.
291         */
292         value_type * dequeue()
293         {
294             back_off bkoff;
295
296             const index_type nModulo = modulo();
297             do {
298                 index_type th = m_nHead.load(memory_model::memory_order_acquire);
299                 index_type temp = ( th + 1 ) & nModulo;
300                 value_type * tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
301                 value_type * pNull;
302
303                 // find the actual head after this loop
304                 while ( is_free( tt ) ) {
305                     if ( th != m_nHead.load(memory_model::memory_order_relaxed) )
306                         goto TryAgain;
307
308                     // two consecutive nullptr means the queue is empty
309                     if ( temp == m_nTail.load(memory_model::memory_order_acquire) )
310                         return nullptr;
311
312                     temp = ( temp + 1 ) & nModulo;
313                     tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
314                 }
315
316                 if ( th != m_nHead.load(memory_model::memory_order_relaxed) )
317                     continue;
318
319                 // check whether the queue is empty
320                 if ( temp == m_nTail.load(memory_model::memory_order_acquire) ) {
321                     // help the enqueue to update end
322                     m_nTail.compare_exchange_weak( temp, (temp + 1) & nModulo, memory_model::memory_order_release, atomics::memory_order_relaxed );
323                     continue;
324                 }
325
326                 pNull = reinterpret_cast<value_type *>((reinterpret_cast<ptr_atomic_t>(tt) & 1) ? free0 : free1 );
327
328                 if ( th != m_nHead.load(memory_model::memory_order_relaxed) )
329                     continue;
330
331                 // Get the actual head, null means empty
332                 if ( m_buffer[temp].compare_exchange_weak( tt, pNull, memory_model::memory_order_acquire, atomics::memory_order_relaxed )) {
333                     if ( temp % 2 == 0 )
334                         m_nHead.compare_exchange_weak( th, temp, memory_model::memory_order_release, atomics::memory_order_relaxed );
335                     --m_ItemCounter;
336                     return reinterpret_cast<value_type *>(reinterpret_cast<intptr_t>( tt ) & ~intptr_t(1));
337                 }
338
339             TryAgain:;
340             } while ( bkoff(), true );
341
342             // No control path reaches this line!
343             return nullptr;
344         }
345
346         /// Synonym for \p enqueue()
347         bool push( value_type& data )
348         {
349             return enqueue( data );
350         }
351
352         /// Synonym for \p dequeue()
353         value_type * pop()
354         {
355             return dequeue();
356         }
357
358         /// Checks if the queue is empty
359         bool empty() const
360         {
361             const index_type nModulo = modulo();
362
363         TryAgain:
364             index_type th = m_nHead.load(memory_model::memory_order_relaxed);
365             index_type temp = ( th + 1 ) & nModulo;
366             const value_type * tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
367
368             // find the actual head after this loop
369             while ( is_free( tt ) ) {
370                 if ( th != m_nHead.load(memory_model::memory_order_relaxed) )
371                     goto TryAgain;
372                 // two consecutive nullptr means queue empty
373                 if ( temp == m_nTail.load(memory_model::memory_order_relaxed) )
374                     return true;
375                 temp = ( temp + 1 ) & nModulo;
376                 tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
377             }
378             return false;
379         }
380
381         /// Clears queue in lock-free manner.
382         /**
383             \p f parameter is a functor to dispose removed items:
384             \code
385             myQueue.clear( []( value_type * p ) { delete p; } );
386             \endcode
387         */
388         template <typename Disposer>
389         void clear( Disposer f )
390         {
391             value_type * pv;
392             while ( (pv = pop()) != nullptr ) {
393                 f( pv );
394             }
395         }
396
397         /// Clears the queue
398         /**
399             This function uses the disposer that is specified in \p Traits,
400             see \p tsigas_queue::traits::disposer.
401         */
402         void clear()
403         {
404             clear( disposer() );
405         }
406
407         /// Returns queue's item count
408         /**
409             The value returned depends on \p tsigas_queue::traits::item_counter.
410             For \p atomicity::empty_item_counter, the function always returns 0.
411         */
412         size_t size() const CDS_NOEXCEPT
413         {
414             return m_ItemCounter.value();
415         }
416
417         /// Returns capacity of internal cyclic buffer
418         size_t CDS_CONSTEXPR capacity() const CDS_NOEXCEPT
419         {
420             return buffer_capacity() - 2;
421         }
422     };
423
424 }}  // namespace cds::intrusive
425
426 #endif  // #ifndef __CDS_INTRUSIVE_TSIGAS_CYCLE_QUEUE_H