Removed redundant spaces
[libcds.git] / cds / intrusive / tsigas_cycle_queue.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #ifndef CDSLIB_INTRUSIVE_TSIGAS_CYCLE_QUEUE_H
32 #define CDSLIB_INTRUSIVE_TSIGAS_CYCLE_QUEUE_H
33
34 #include <cds/intrusive/details/base.h>
35 #include <cds/algo/atomic.h>
36 #include <cds/details/bounded_container.h>
37 #include <cds/opt/buffer.h>
38
39 namespace cds { namespace intrusive {
40
41     /// TsigasCycleQueue related definitions
42     /** @ingroup cds_intrusive_helper
43     */
44     namespace tsigas_queue {
45
46         /// TsigasCycleQueue default traits
47         struct traits
48         {
49             /// Buffer type for cyclic array
50             /*
51                 The type of element for the buffer is not important: the queue rebinds
52                 buffer for required type via \p rebind metafunction.
53
54                 For \p TsigasCycleQueue queue the buffer size should have power-of-2 size.
55
56                 You should use any initialized buffer type, see \p opt::buffer.
57             */
58             typedef cds::opt::v::initialized_dynamic_buffer< void * > buffer;
59
60             /// Back-off strategy
61             typedef cds::backoff::empty         back_off;
62
63             /// The functor used for dispose removed items. Default is \p opt::v::empty_disposer. This option is used for dequeuing
64             typedef opt::v::empty_disposer      disposer;
65
66             /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
67             typedef atomicity::empty_item_counter item_counter;
68
69             /// C++ memory ordering model
70             /**
71                 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
72                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
73             */
74             typedef opt::v::relaxed_ordering    memory_model;
75
76             /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
77             enum { padding = opt::cache_line_padding };
78         };
79
80         /// Metafunction converting option list to \p tsigas_queue::traits
81         /**
82             Supported \p Options are:
83             - \p opt::buffer - the buffer type for internal cyclic array. Possible types are:
84                 \p opt::v::initialized_dynamic_buffer (the default), \p opt::v::initialized_static_buffer. The type of
85                 element in the buffer is not important: it will be changed via \p rebind metafunction.
86             - \p opt::back_off - back-off strategy used, default is \p cds::backoff::empty.
87             - \p opt::disposer - the functor used for dispose removed items. Default is \p opt::v::empty_disposer. This option is used
88                 when dequeuing.
89             - \p opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
90                 To enable item counting use \p cds::atomicity::item_counter
91             - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
92             - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
93                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
94
95             Example: declare \p %TsigasCycleQueue with item counting and static iternal buffer of size 1024:
96             \code
97             typedef cds::intrusive::TsigasCycleQueue< Foo,
98                 typename cds::intrusive::tsigas_queue::make_traits<
99                     cds::opt::buffer< cds::opt::v::initialized_static_buffer< void *, 1024 >,
100                     cds::opt::item_counte< cds::atomicity::item_counter >
101                 >::type
102             > myQueue;
103             \endcode
104         */
105         template <typename... Options>
106         struct make_traits {
107 #   ifdef CDS_DOXYGEN_INVOKED
108             typedef implementation_defined type;   ///< Metafunction result
109 #   else
110             typedef typename cds::opt::make_options<
111                 typename cds::opt::find_type_traits< traits, Options... >::type
112                 , Options...
113             >::type type;
114 #   endif
115         };
116
117
118     } //namespace tsigas_queue
119
120     /// Non-blocking cyclic queue discovered by Philippas Tsigas and Yi Zhang
121     /** @ingroup cds_intrusive_queue
122
123         Source:
124         \li [2000] Philippas Tsigas, Yi Zhang "A Simple, Fast and Scalable Non-Blocking Concurrent FIFO Queue
125             for Shared Memory Multiprocessor Systems"
126
127         Template arguments:
128         - \p T - value type to be stored in queue. The queue stores pointers to passed data of type \p T.
129             <b>Restriction</b>: the queue can manage at least two-byte aligned data: the least significant bit (LSB)
130             of any pointer stored in the queue must be zero since the algorithm may use LSB
131             as a flag that marks the free cell.
132         - \p Traits - queue traits, default is \p tsigas_queue::traits. You can use \p tsigas_queue::make_traits
133             metafunction to make your traits or just derive your traits from \p %tsigas_queue::traits:
134             \code
135             struct myTraits: public cds::intrusive::tsigas_queue::traits {
136                 typedef cds::atomicity::item_counter    item_counter;
137             };
138             typedef cds::intrusive::TsigasCycleQueue< Foo, myTraits > myQueue;
139
140             // Equivalent make_traits example:
141             typedef cds::intrusive::TsigasCycleQueue< Foo,
142                 typename cds::intrusive::tsigas_queue::make_traits<
143                     cds::opt::item_counter< cds::atomicity::item_counter >
144                 >::type
145             > myQueue;
146             \endcode
147
148         This queue algorithm does not require any garbage collector.
149
150         \par Examples:
151         \code
152         #include <cds/intrusive/tsigas_cycle_queue.h>
153
154         struct Foo {
155             ...
156         };
157
158         // Queue of Foo pointers, capacity is 1024, statically allocated buffer:
159         struct queue_traits: public cds::intrusive::tsigas_queue::traits
160         {
161             typedef cds::opt::v::initialized_static_buffer< Foo, 1024 > buffer;
162         };
163         typedef cds::intrusive::TsigasCycleQueue< Foo, queue_traits > static_queue;
164         static_queue    stQueue;
165
166         // Queue of Foo pointers, capacity is 1024, dynamically allocated buffer, with item counting:
167         typedef cds::intrusive::TsigasCycleQueue< Foo,
168             typename cds::intrusive::tsigas_queue::make_traits<
169                 cds::opt::buffer< cds::opt::v::initialized_dynamic_buffer< Foo > >,
170                 cds::opt::item_counter< cds::atomicity::item_counter >
171             >::type
172         > dynamic_queue;
173         dynamic_queue    dynQueue( 1024 );
174         \endcode
175     */
176     template <typename T, typename Traits = tsigas_queue::traits >
177     class TsigasCycleQueue: public cds::bounded_container
178     {
179     public:
180         /// Rebind template arguments
181         template <typename T2, typename Traits2>
182         struct rebind {
183             typedef TsigasCycleQueue< T2, Traits2 > other   ;   ///< Rebinding result
184         };
185
186     public:
187         typedef T value_type;   ///< type of value to be stored in the queue
188         typedef Traits traits;  ///< Queue traits
189         typedef typename traits::item_counter  item_counter;    ///< Item counter type
190         typedef typename traits::disposer      disposer;        ///< Item disposer
191         typedef typename traits::back_off      back_off;        ///< back-off strategy used
192         typedef typename traits::memory_model  memory_model;    ///< Memory ordering. See cds::opt::memory_model option
193         typedef typename traits::buffer::template rebind< atomics::atomic<value_type *> >::other buffer; ///< Internal buffer
194
195     protected:
196         //@cond
197         typedef size_t index_type;
198         //@endcond
199
200     protected:
201         //@cond
202         buffer       m_buffer    ;   ///< array of pointer T *, array size is equal to m_nCapacity+1
203         typename opt::details::apply_padding< index_type, traits::padding >::padding_type pad1_;
204         atomics::atomic<index_type>   m_nHead     ;   ///< index of queue's head
205         typename opt::details::apply_padding< index_type, traits::padding >::padding_type pad2_;
206         atomics::atomic<index_type>   m_nTail     ;   ///< index of queue's tail
207         typename opt::details::apply_padding< index_type, traits::padding >::padding_type pad3_;
208         item_counter m_ItemCounter;  ///< item counter
209         //@endcond
210
211     protected:
212         //@cond
213         static CDS_CONSTEXPR intptr_t const free0 = 0;
214         static CDS_CONSTEXPR intptr_t const free1 = 1;
215
216         static bool is_free( const value_type * p ) CDS_NOEXCEPT
217         {
218             return (reinterpret_cast<intptr_t>(p) & ~intptr_t(1)) == 0;
219         }
220
221         size_t CDS_CONSTEXPR buffer_capacity() const CDS_NOEXCEPT
222         {
223             return m_buffer.capacity();
224         }
225
226         index_type CDS_CONSTEXPR modulo() const CDS_NOEXCEPT
227         {
228             return buffer_capacity() - 1;
229         }
230         //@endcond
231
232     public:
233         /// Initialize empty queue of capacity \p nCapacity
234         /**
235             If internal buffer type is \p cds::opt::v::initialized_static_buffer, the \p nCapacity parameter is ignored.
236
237             Note that the real capacity of queue is \p nCapacity - 2.
238         */
239         TsigasCycleQueue( size_t nCapacity = 0 )
240             : m_buffer( nCapacity )
241             , m_nHead(0)
242             , m_nTail(1)
243         {
244             m_buffer.zeroize();
245         }
246
247         /// Clears the queue
248         ~TsigasCycleQueue()
249         {
250             clear();
251         }
252
253         /// Enqueues an item to the queue
254         /** @anchor cds_intrusive_TsigasQueue_enqueue
255             Returns \p true if success, \p false if queue is full
256         */
257         bool enqueue( value_type& data )
258         {
259             value_type * pNewNode  = &data;
260             assert( (reinterpret_cast<uintptr_t>(pNewNode) & 1) == 0 );
261             back_off bkoff;
262
263             const index_type nModulo = modulo();
264
265             do {
266                 index_type te = m_nTail.load(memory_model::memory_order_acquire);
267                 index_type ate = te;
268                 value_type * tt = m_buffer[ ate ].load(memory_model::memory_order_relaxed);
269                 index_type temp = ( ate + 1 ) & nModulo ;    // next item after tail
270
271                 // Looking for actual tail
272                 while ( !is_free( tt )) {
273                     if ( te != m_nTail.load(memory_model::memory_order_relaxed))    // check the tail consistency
274                         goto TryAgain;
275                     if ( temp == m_nHead.load(memory_model::memory_order_acquire))    // queue full?
276                         break;
277                     tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
278                     ate = temp;
279                     temp = (temp + 1) & nModulo;
280                 }
281
282                 if ( te != m_nTail.load(memory_model::memory_order_acquire))
283                     continue;
284
285                 // Check whether queue is full
286                 if ( temp == m_nHead.load(memory_model::memory_order_acquire)) {
287                     ate = ( temp + 1 ) & nModulo;
288                     tt = m_buffer[ ate ].load(memory_model::memory_order_relaxed);
289                     if ( !is_free( tt )) {
290                         return false;   // Queue is full
291                     }
292
293                     // help the dequeue to update head
294                     m_nHead.compare_exchange_strong( temp, ate, memory_model::memory_order_release, atomics::memory_order_relaxed );
295                     continue;
296                 }
297
298                 if ( tt == reinterpret_cast<value_type *>(free1))
299                     pNewNode = reinterpret_cast<value_type *>(reinterpret_cast<intptr_t>( pNewNode ) | 1);
300                 if ( te != m_nTail.load(memory_model::memory_order_acquire))
301                     continue;
302
303                 // get actual tail and try to enqueue new node
304                 if ( m_buffer[ate].compare_exchange_strong( tt, pNewNode, memory_model::memory_order_release, atomics::memory_order_relaxed )) {
305                     if ( temp % 2 == 0 )
306                         m_nTail.compare_exchange_strong( te, temp, memory_model::memory_order_release, atomics::memory_order_relaxed );
307                     ++m_ItemCounter;
308                     return true;
309                 }
310             TryAgain:;
311             } while ( bkoff(), true );
312
313             // No control path reaches this line!
314             return false;
315         }
316
317         /// Dequeues item from the queue
318         /** @anchor cds_intrusive_TsigasQueue_dequeue
319             If the queue is empty the function returns \p nullptr
320
321             Dequeue does not call value disposer. You may manually dispose returned value if it is needed.
322         */
323         value_type * dequeue()
324         {
325             back_off bkoff;
326
327             const index_type nModulo = modulo();
328             do {
329                 index_type th = m_nHead.load(memory_model::memory_order_acquire);
330                 index_type temp = ( th + 1 ) & nModulo;
331                 value_type * tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
332                 value_type * pNull;
333
334                 // find the actual head after this loop
335                 while ( is_free( tt )) {
336                     if ( th != m_nHead.load(memory_model::memory_order_relaxed))
337                         goto TryAgain;
338
339                     // two consecutive nullptr means the queue is empty
340                     if ( temp == m_nTail.load(memory_model::memory_order_acquire))
341                         return nullptr;
342
343                     temp = ( temp + 1 ) & nModulo;
344                     tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
345                 }
346
347                 if ( th != m_nHead.load(memory_model::memory_order_relaxed))
348                     continue;
349
350                 // check whether the queue is empty
351                 if ( temp == m_nTail.load(memory_model::memory_order_acquire)) {
352                     // help the enqueue to update end
353                     m_nTail.compare_exchange_weak( temp, (temp + 1) & nModulo, memory_model::memory_order_release, atomics::memory_order_relaxed );
354                     continue;
355                 }
356
357                 pNull = reinterpret_cast<value_type *>((reinterpret_cast<uintptr_t>(tt) & 1) ? free0 : free1);
358
359                 if ( th != m_nHead.load(memory_model::memory_order_relaxed))
360                     continue;
361
362                 // Get the actual head, null means empty
363                 if ( m_buffer[temp].compare_exchange_weak( tt, pNull, memory_model::memory_order_acquire, atomics::memory_order_relaxed )) {
364                     if ( temp % 2 == 0 )
365                         m_nHead.compare_exchange_weak( th, temp, memory_model::memory_order_release, atomics::memory_order_relaxed );
366                     --m_ItemCounter;
367                     return reinterpret_cast<value_type *>(reinterpret_cast<intptr_t>( tt ) & ~intptr_t(1));
368                 }
369
370             TryAgain:;
371             } while ( bkoff(), true );
372
373             // No control path reaches this line!
374             return nullptr;
375         }
376
377         /// Synonym for \p enqueue()
378         bool push( value_type& data )
379         {
380             return enqueue( data );
381         }
382
383         /// Synonym for \p dequeue()
384         value_type * pop()
385         {
386             return dequeue();
387         }
388
389         /// Checks if the queue is empty
390         bool empty() const
391         {
392             const index_type nModulo = modulo();
393
394         TryAgain:
395             index_type th = m_nHead.load(memory_model::memory_order_relaxed);
396             index_type temp = ( th + 1 ) & nModulo;
397             const value_type * tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
398
399             // find the actual head after this loop
400             while ( is_free( tt )) {
401                 if ( th != m_nHead.load(memory_model::memory_order_relaxed))
402                     goto TryAgain;
403                 // two consecutive nullptr means queue empty
404                 if ( temp == m_nTail.load(memory_model::memory_order_relaxed))
405                     return true;
406                 temp = ( temp + 1 ) & nModulo;
407                 tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
408             }
409             return false;
410         }
411
412         /// Clears queue in lock-free manner.
413         /**
414             \p f parameter is a functor to dispose removed items:
415             \code
416             myQueue.clear( []( value_type * p ) { delete p; } );
417             \endcode
418         */
419         template <typename Disposer>
420         void clear( Disposer f )
421         {
422             value_type * pv;
423             while ( (pv = pop()) != nullptr ) {
424                 f( pv );
425             }
426         }
427
428         /// Clears the queue
429         /**
430             This function uses the disposer that is specified in \p Traits,
431             see \p tsigas_queue::traits::disposer.
432         */
433         void clear()
434         {
435             clear( disposer());
436         }
437
438         /// Returns queue's item count
439         /**
440             The value returned depends on \p tsigas_queue::traits::item_counter.
441             For \p atomicity::empty_item_counter, the function always returns 0.
442         */
443         size_t size() const CDS_NOEXCEPT
444         {
445             return m_ItemCounter.value();
446         }
447
448         /// Returns capacity of internal cyclic buffer
449         size_t CDS_CONSTEXPR capacity() const CDS_NOEXCEPT
450         {
451             return buffer_capacity() - 2;
452         }
453     };
454
455 }}  // namespace cds::intrusive
456
457 #endif  // #ifndef CDSLIB_INTRUSIVE_TSIGAS_CYCLE_QUEUE_H