Replaced alignment option with padding for VyukovMPMCCycleQueue
[libcds.git] / cds / container / vyukov_mpmc_cycle_queue.h
1 //$$CDS-header$$
2
3 #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
4 #define CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
5
6 #include <cds/container/details/base.h>
7 #include <cds/opt/buffer.h>
8 #include <cds/opt/value_cleaner.h>
9 #include <cds/algo/atomic.h>
10 #include <cds/details/bounded_container.h>
11
12 namespace cds { namespace container {
13
14     /// VyukovMPMCCycleQueue related definitions
15     /** @ingroup cds_nonintrusive_helper
16     */
17     namespace vyukov_queue {
18
19         /// VyukovMPMCCycleQueue default traits
20         struct traits {
21             /// Buffer type for internal array
22             /*
23                 The type of element for the buffer is not important: the queue rebinds
24                 buffer for required type via \p rebind metafunction.
25
26                 For \p VyukovMPMCCycleQueue queue the buffer size should have power-of-2 size.
27             */
28             typedef cds::opt::v::dynamic_buffer< void * > buffer;
29
30             /// A functor to clean item dequeued.
31             /**
32                 The functor  calls the destructor for queue item.
33                 After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied.
34                 If \p T is a complex type, \p value_cleaner may be the useful feature.
35
36                 Default value is \ref opt::v::destruct_cleaner
37             */
38             typedef cds::opt::v::destruct_cleaner value_cleaner;
39
40             /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
41             typedef cds::atomicity::empty_item_counter item_counter;
42
43             /// C++ memory ordering model
44             /**
45                 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
46                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
47             */
48             typedef opt::v::relaxed_ordering    memory_model;
49
50             /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
51             enum { padding = opt::cache_line_padding };
52         };
53
54         /// Metafunction converting option list to \p vyukov_queue::traits
55         /**
56             Supported \p Options are:
57             - \p opt::buffer - the buffer type for internal cyclic array. Possible types are:
58                 \p opt::v::dynamic_buffer (the default), \p opt::v::static_buffer. The type of
59                 element in the buffer is not important: it will be changed via \p rebind metafunction.
60             - \p opt::value_cleaner - a functor to clean item dequeued.
61                 The functor calls the destructor for queue item.
62                 After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied.
63                 If \p T is a complex type, \p value_cleaner can be an useful feature.
64                 Default value is \ref opt::v::destruct_cleaner
65             - \p opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
66                 To enable item counting use \p cds::atomicity::item_counter
67             - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
68             - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
69                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
70
71             Example: declare \p %VyukovMPMCCycleQueue with item counting and static iternal buffer of size 1024:
72             \code
73             typedef cds::container::VyukovMPMCCycleQueue< Foo,
74                 typename cds::container::vyukov_queue::make_traits<
75                     cds::opt::buffer< cds::opt::v::static_buffer< void *, 1024 >,
76                     cds::opt::item_counte< cds::atomicity::item_counter >
77                 >::type
78             > myQueue;
79             \endcode
80         */
81         template <typename... Options>
82         struct make_traits {
83 #   ifdef CDS_DOXYGEN_INVOKED
84             typedef implementation_defined type;   ///< Metafunction result
85 #   else
86             typedef typename cds::opt::make_options<
87                 typename cds::opt::find_type_traits< traits, Options... >::type
88                 , Options...
89             >::type type;
90 #   endif
91         };
92
93     } //namespace vyukov_queue
94
95     /// Vyukov's MPMC bounded queue
96     /** @ingroup cds_nonintrusive_queue
97         This algorithm is developed by Dmitry Vyukov (see http://www.1024cores.net)
98         It's multi-producer multi-consumer (MPMC), array-based, fails on overflow, does not require GC, w/o priorities, causal FIFO,
99         blocking producers and consumers queue. The algorithm is pretty simple and fast. It's not lock-free in the official meaning,
100         just implemented by means of atomic RMW operations w/o mutexes.
101
102         The cost of enqueue/dequeue is 1 CAS per operation.
103         No dynamic memory allocation/management during operation. Producers and consumers are separated from each other (as in the two-lock queue),
104         i.e. do not touch the same data while queue is not empty.
105
106         Source:
107             - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
108
109         Template parameters
110         - \p T - type stored in queue.
111         - \p Traits - queue traits, default is \p vykov_queue::traits. You can use \p vykov_queue::make_traits
112             metafunction to make your traits or just derive your traits from \p %vykov_queue::traits:
113             \code
114             struct myTraits: public cds::container::vykov_queue::traits {
115                 typedef cds::atomicity::item_counter    item_counter;
116             };
117             typedef cds::container::VyukovMPMCCycleQueue< Foo, myTraits > myQueue;
118
119             // Equivalent make_traits example:
120             typedef cds::container::VyukovMPMCCycleQueue< cds::gc::HP, Foo,
121                 typename cds::container::vykov_queue::make_traits<
122                     cds::opt::item_counter< cds::atomicity::item_counter >
123                 >::type
124             > myQueue;
125             \endcode
126
127         \par License
128             Simplified BSD license by Dmitry Vyukov (http://www.1024cores.net/site/1024cores/home/code-license)
129     */
130     template <typename T, typename Traits = vyukov_queue::traits >
131     class VyukovMPMCCycleQueue : public cds::bounded_container
132     {
133     public:
134         typedef T value_type;   ///< Value type to be stored in the queue
135         typedef Traits traits;  ///< Queue traits
136         typedef typename traits::item_counter  item_counter;  ///< Item counter type
137         typedef typename traits::memory_model  memory_model;  ///< Memory ordering. See cds::opt::memory_model option
138         typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p vyukov_queue::traits::value_cleaner
139
140         /// Rebind template arguments
141         template <typename T2, typename Traits2>
142         struct rebind {
143             typedef VyukovMPMCCycleQueue< T2, Traits2 > other   ;   ///< Rebinding result
144         };
145
146     protected:
147         //@cond
148         typedef atomics::atomic<size_t> sequence_type;
149         struct cell_type
150         {
151             sequence_type   sequence;
152             value_type      data;
153
154             cell_type()
155             {}
156         };
157
158         typedef typename traits::buffer::template rebind<cell_type>::other buffer;
159         //@endcond
160
161     protected:
162         //@cond
163         buffer          m_buffer;
164         typename opt::details::apply_padding< buffer, traits::padding >::padding_type pad1_;
165         size_t const    m_nBufferMask;
166         sequence_type   m_posEnqueue;
167         typename opt::details::apply_padding< sequence_type, traits::padding >::padding_type pad2_;
168         sequence_type   m_posDequeue;
169         typename opt::details::apply_padding< sequence_type, traits::padding >::padding_type pad3_;
170         item_counter    m_ItemCounter;
171         //@endcond
172
173     public:
174         /// Constructs the queue of capacity \p nCapacity
175         /**
176             For \p cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
177
178             The buffer capacity must be the power of two.
179         */
180         VyukovMPMCCycleQueue(
181             size_t nCapacity = 0
182             )
183             : m_buffer( nCapacity )
184             , m_nBufferMask( m_buffer.capacity() - 1 )
185         {
186             nCapacity = m_buffer.capacity();
187
188             // Buffer capacity must be power of 2
189             assert( nCapacity >= 2 && (nCapacity & (nCapacity - 1)) == 0 );
190
191             for (size_t i = 0; i != nCapacity; ++i )
192                 m_buffer[i].sequence.store(i, memory_model::memory_order_relaxed);
193
194             m_posEnqueue.store(0, memory_model::memory_order_relaxed);
195             m_posDequeue.store(0, memory_model::memory_order_relaxed);
196         }
197
198         ~VyukovMPMCCycleQueue()
199         {
200             clear();
201         }
202
203         /// Enqueues data to the queue using a functor
204         /**
205             \p Func is a functor called to copy a value to the queue cell.
206             The functor \p f takes one argument - a reference to a empty cell of type \ref value_type :
207             \code
208             cds::container::VyukovMPMCCycleQueue< Foo > myQueue;
209             Bar bar;
210             myQueue.enqueue_with( [&bar]( Foo& dest ) { dest = std::move(bar); } );
211             \endcode
212         */
213         template <typename Func>
214         bool enqueue_with(Func f)
215         {
216             cell_type* cell;
217             size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
218
219             for (;;)
220             {
221                 cell = &m_buffer[pos & m_nBufferMask];
222                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
223
224                 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos);
225
226                 if (dif == 0) {
227                     if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed ))
228                         break;
229                 }
230                 else if (dif < 0)
231                     return false;
232                 else
233                     pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
234             }
235
236             f( cell->data );
237
238             cell->sequence.store(pos + 1, memory_model::memory_order_release);
239             ++m_ItemCounter;
240
241             return true;
242         }
243
244         /// Enqueues \p val value into the queue.
245         /**
246             The new queue item is created by calling placement new in free cell.
247             Returns \p true if success, \p false if the queue is full.
248         */
249         bool enqueue( value_type const& val )
250         {
251             return enqueue_with( [&val]( value_type& dest ){ new ( &dest ) value_type( val ); });
252         }
253
254         /// Synonym for \p enqueue()
255         bool push( value_type const& data )
256         {
257             return enqueue( data );
258         }
259
260         /// Synonym for \p enqueue_with()
261         template <typename Func>
262         bool push_with( Func f )
263         {
264             return enqueue_with( f );
265         }
266
267         /// Enqueues data of type \ref value_type constructed with <tt>std::forward<Args>(args)...</tt>
268         template <typename... Args>
269         bool emplace( Args&&... args )
270         {   
271 #if (CDS_COMPILER == CDS_COMPILER_GCC) && (CDS_COMPILER_VERSION < 40900)
272             //work around unsupported feature in g++ 4.8 for forwarding parameter packs to lambda. 
273             return enqueue_with ( std::bind([]( value_type& dest,Args ... args ){ new ( &dest ) value_type( std::forward<Args>(args)... );}, std::placeholders::_1 ,args...));        
274 #else            
275             return enqueue_with( [&args ...]( value_type& dest ){ new ( &dest ) value_type( std::forward<Args>(args)... ); });            
276 #endif        
277         }
278
279         /// Dequeues a value using a functor
280         /**
281             \p Func is a functor called to copy dequeued value.
282             The functor takes one argument - a reference to removed node:
283             \code
284             cds:container::VyukovMPMCCycleQueue< Foo > myQueue;
285             Bar bar;
286             myQueue.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
287             \endcode
288             The functor is called only if the queue is not empty.
289         */
290         template <typename Func>
291         bool dequeue_with( Func f )
292         {
293             cell_type * cell;
294             size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
295
296             for (;;)
297             {
298                 cell = &m_buffer[pos & m_nBufferMask];
299                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
300                 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
301
302                 if (dif == 0) {
303                     if ( m_posDequeue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed))
304                         break;
305                 }
306                 else if (dif < 0)
307                     return false;
308                 else
309                     pos = m_posDequeue.load(memory_model::memory_order_relaxed);
310             }
311
312             f( cell->data );
313             value_cleaner()( cell->data );
314             cell->sequence.store( pos + m_nBufferMask + 1, memory_model::memory_order_release );
315             --m_ItemCounter;
316
317             return true;
318         }
319
320         /// Dequeues a value from the queue
321         /**
322             If queue is not empty, the function returns \p true, \p dest contains copy of
323             dequeued value. The assignment operator for type \ref value_type is invoked.
324             If queue is empty, the function returns \p false, \p dest is unchanged.
325         */
326         bool dequeue(value_type & dest )
327         {
328             return dequeue_with( [&dest]( value_type& src ){ dest = src; } );
329         }
330
331         /// Synonym for \p dequeue()
332         bool pop(value_type& data)
333         {
334             return dequeue(data);
335         }
336
337         /// Synonym for \p dequeue_with()
338         template <typename Func>
339         bool pop_with( Func f )
340         {
341             return dequeue_with( f );
342         }
343
344         /// Checks if the queue is empty
345         bool empty() const
346         {
347             const cell_type * cell;
348             size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
349
350             for (;;)
351             {
352                 cell = &m_buffer[pos & m_nBufferMask];
353                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
354                 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
355
356                 if (dif == 0)
357                     return false;
358                 else if (dif < 0)
359                     return true;
360                 else
361                     pos = m_posDequeue.load(memory_model::memory_order_relaxed);
362             }
363         }
364
365         /// Clears the queue
366         void clear()
367         {
368             value_type v;
369             while ( pop(v) );
370         }
371
372         /// Returns queue's item count
373         /**
374             The value returned depends on \p vyukov_queue::traits::item_counter option.
375             For \p atomicity::empty_item_counter, the function always returns 0.
376         */
377         size_t size() const
378         {
379             return m_ItemCounter.value();
380         }
381
382         /// Returns capacity of the queue
383         size_t capacity() const
384         {
385             return m_buffer.capacity();
386         }
387     };
388 }}  // namespace cds::container
389
390 #endif // #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H