Replace CDS_ATOMIC with namespace atomics
[libcds.git] / cds / container / vyukov_mpmc_cycle_queue.h
1 //$$CDS-header$$
2
3 #ifndef __CDS_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
4 #define __CDS_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
5
6 #include <cds/container/base.h>
7 #include <cds/opt/buffer.h>
8 #include <cds/opt/value_cleaner.h>
9 #include <cds/cxx11_atomic.h>
10 #include <cds/ref.h>
11 #include <cds/details/trivial_assign.h>
12 #include <cds/details/bounded_container.h>
13
14 namespace cds { namespace container {
15
16     /// Vyukov's MPMC bounded queue
17     /** @ingroup cds_nonintrusive_queue
18         This algorithm is developed by Dmitry Vyukov (see http://www.1024cores.net)
19         It's multi-producer multi-consumer (MPMC), array-based, fails on overflow, does not require GC, w/o priorities, causal FIFO,
20         blocking producers and consumers queue. The algorithm is pretty simple and fast. It's not lock-free in the official meaning,
21         just implemented by means of atomic RMW operations w/o mutexes.
22
23         The cost of enqueue/dequeue is 1 CAS per operation.
24         No dynamic memory allocation/management during operation. Producers and consumers are separated from each other (as in the two-lock queue),
25         i.e. do not touch the same data while queue is not empty.
26
27         \par Source:
28             http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
29
30         \par Template parameters
31             \li \p T - type stored in queue.
32             \li \p Options - queue's options
33
34         Options \p Options are:
35         - opt::buffer - buffer to store items. Mandatory option, see option description for full list of possible types.
36         - opt::item_counter - the type of item counting feature. Default is \ref atomicity::empty_item_counter
37         - opt::value_cleaner - a functor to clean item dequeued. Default value is \ref opt::v::destruct_cleaner
38             that calls the destructor of type \p T.
39             After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied. If \p T
40             is a complex type, \p value_cleaner may be the useful feature.
41         - opt::alignment - the alignment for internal queue data. Default is opt::cache_line_alignment
42         - opt::memory_model - C++ memory ordering model. Can be opt::v::relaxed_ordering (relaxed memory model, the default)
43             or opt::v::sequential_consistent (sequentially consisnent memory model).
44
45         \par License
46             Simplified BSD license by Dmitry Vyukov (http://www.1024cores.net/site/1024cores/home/code-license)
47
48         \par Example
49         \code
50         #include <cds/container/vyukov_mpmc_cycle_queue.h>
51
52         // // Queue with 1024 item static buffer
53         cds::container::vyukov_mpmc_bounded<
54             int
55             ,cds::opt::buffer< cds::opt::v::static_buffer<int, 1024> >
56         > myQueue;
57         \endcode
58     */
59     template <typename T, CDS_DECL_OPTIONS6>
60     class VyukovMPMCCycleQueue
61         : public cds::bounded_container
62     {
63     protected:
64         //@cond
65         struct default_options
66         {
67             typedef cds::opt::v::destruct_cleaner  value_cleaner;
68             typedef atomicity::empty_item_counter item_counter;
69             typedef opt::v::empty_disposer      disposer    ;   // for intrusive version only
70             typedef opt::v::relaxed_ordering    memory_model;
71             enum { alignment = opt::cache_line_alignment };
72         };
73         //@endcond
74
75     public:
76         //@cond
77         typedef typename opt::make_options<
78             typename cds::opt::find_type_traits< default_options, CDS_OPTIONS6 >::type
79             ,CDS_OPTIONS6
80         >::type   options;
81         //@endcond
82
83     protected:
84         //@cond
85         typedef typename options::value_cleaner  value_cleaner;
86         //@endcond
87
88     public:
89         typedef T value_type    ;   ///< @anchor cds_container_VyukovMPMCCycleQueue_value_type type of value stored in the queue
90         typedef typename options::item_counter  item_counter ;  ///< Item counter type
91         typedef typename options::memory_model  memory_model ;  ///< Memory ordering. See cds::opt::memory_model option
92
93         /// Rebind template arguments
94         template <typename T2, CDS_DECL_OTHER_OPTIONS6>
95         struct rebind {
96             typedef VyukovMPMCCycleQueue< T2, CDS_OTHER_OPTIONS6> other   ;   ///< Rebinding result
97         };
98
99     protected:
100         //@cond
101         typedef atomics::atomic<size_t> sequence_type;
102         struct cell_type
103         {
104             sequence_type   sequence;
105             value_type      data;
106
107             cell_type()
108             {}
109         };
110
111 #   ifndef CDS_CXX11_LAMBDA_SUPPORT
112         struct copy_construct {
113             void operator()( value_type& dest, value_type const& src )
114             {
115                 new ( &dest ) value_type( src );
116             }
117         };
118 #   endif
119
120         typedef cds::details::trivial_assign< value_type, value_type > copy_assign;
121
122         typedef typename options::buffer::template rebind<cell_type>::other buffer;
123         typedef typename opt::details::alignment_setter< sequence_type, options::alignment >::type aligned_sequence_type;
124         typedef typename opt::details::alignment_setter< buffer, options::alignment >::type aligned_buffer;
125         //@endcond
126
127     protected:
128         //@cond
129         aligned_buffer  m_buffer;
130         size_t const    m_nBufferMask;
131         aligned_sequence_type m_posEnqueue;
132         aligned_sequence_type m_posDequeue;
133         item_counter    m_ItemCounter;
134         //@endcond
135
136     public:
137         /// Constructs the queue of capacity \p nCapacity
138         /**
139             For cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
140         */
141         VyukovMPMCCycleQueue(
142             size_t nCapacity = 0
143             )
144             : m_buffer( nCapacity )
145             , m_nBufferMask( m_buffer.capacity() - 1 )
146         {
147             nCapacity = m_buffer.capacity();
148
149             // Buffer capacity must be power of 2
150             assert( nCapacity >= 2 && (nCapacity & (nCapacity - 1)) == 0 );
151
152             for (size_t i = 0; i != nCapacity; i += 1)
153                 m_buffer[i].sequence.store(i, memory_model::memory_order_relaxed);
154
155             m_posEnqueue.store(0, memory_model::memory_order_relaxed);
156             m_posDequeue.store(0, memory_model::memory_order_relaxed);
157         }
158
159         ~VyukovMPMCCycleQueue()
160         {
161             clear();
162         }
163
164         /// Enqueues \p data to queue using copy functor
165         /** @anchor cds_container_VyukovMPMCCycleQueue_enqueue_func
166             \p Func is a functor called to copy value \p data of type \p Source
167             which may be differ from type \p T stored in the queue.
168             The functor's interface is:
169             \code
170                 struct myFunctor {
171                     void operator()(T& dest, Source const& data)
172                     {
173                         // // Code to copy \p data to \p dest
174                         dest = data;
175                     }
176                 };
177             \endcode
178             You may use \p boost:ref construction to pass functor \p f by reference.
179
180             <b>Requirements</b> The functor \p Func should not throw any exception.
181         */
182         template <typename Source, typename Func>
183         bool enqueue(Source const& data, Func func)
184         {
185             cell_type* cell;
186             size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
187
188             for (;;)
189             {
190                 cell = &m_buffer[pos & m_nBufferMask];
191                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
192
193                 intptr_t dif = (intptr_t)seq - (intptr_t)pos;
194
195                 if (dif == 0)
196                 {
197                     if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed) )
198                         break;
199                 }
200                 else if (dif < 0)
201                     return false;
202                 else
203                     pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
204             }
205
206             unref(func)( cell->data, data );
207
208             cell->sequence.store(pos + 1, memory_model::memory_order_release);
209             ++m_ItemCounter;
210
211             return true;
212         }
213
214         /// @anchor cds_container_VyukovMPMCCycleQueue_enqueue Enqueues \p data to queue
215         bool enqueue(value_type const& data )
216         {
217 #       ifdef CDS_CXX11_LAMBDA_SUPPORT
218             return enqueue( data, [](value_type& dest, value_type const& src){ new ( &dest ) value_type( src ); });
219 #       else
220             return enqueue( data, copy_construct() );
221 #       endif
222         }
223
224 #   ifdef CDS_EMPLACE_SUPPORT
225         /// Enqueues data of type \ref cds_container_VyukovMPMCCycleQueue_value_type "value_type" constructed with <tt>std::forward<Args>(args)...</tt>
226         /**
227             This function is available only for compiler that supports
228             variadic template and move semantics
229         */
230         template <typename... Args>
231         bool emplace( Args&&... args )
232         {
233             cell_type* cell;
234             size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
235
236             for (;;)
237             {
238                 cell = &m_buffer[pos & m_nBufferMask];
239                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
240
241                 intptr_t dif = (intptr_t)seq - (intptr_t)pos;
242
243                 if (dif == 0)
244                 {
245                     if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed) )
246                         break;
247                 }
248                 else if (dif < 0)
249                     return false;
250                 else
251                     pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
252             }
253
254             new ( &cell->data ) value_type( std::forward<Args>(args)... );
255
256             cell->sequence.store(pos + 1, memory_model::memory_order_release);
257             ++m_ItemCounter;
258
259             return true;
260
261         }
262 #   endif
263
264         /// Dequeues an item from queue
265         /** @anchor cds_container_VyukovMPMCCycleQueue_dequeue_func
266             \p Func is a functor called to copy dequeued value of type \p T to \p dest of type \p Dest.
267             The functor's interface is:
268             \code
269             struct myFunctor {
270             void operator()(Dest& dest, T const& data)
271             {
272                 // // Code to copy \p data to \p dest
273                 dest = data;
274             }
275             };
276             \endcode
277             You may use \p boost:ref construction to pass functor \p func by reference.
278
279             <b>Requirements</b> The functor \p Func should not throw any exception.
280         */
281         template <typename Dest, typename Func>
282         bool dequeue( Dest& data, Func func )
283         {
284             cell_type * cell;
285             size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
286
287             for (;;)
288             {
289                 cell = &m_buffer[pos & m_nBufferMask];
290                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
291                 intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
292
293                 if (dif == 0) {
294                     if ( m_posDequeue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed))
295                         break;
296                 }
297                 else if (dif < 0)
298                     return false;
299                 else
300                     pos = m_posDequeue.load(memory_model::memory_order_relaxed);
301             }
302
303             unref(func)( data, cell->data );
304             value_cleaner()( cell->data );
305             --m_ItemCounter;
306             cell->sequence.store( pos + m_nBufferMask + 1, memory_model::memory_order_release );
307
308             return true;
309         }
310
311         /// Dequeues an item from queue to \p data
312         /** @anchor cds_container_VyukovMPMCCycleQueue_dequeue
313             If queue is empty, returns \p false, \p data is unchanged.
314         */
315         bool dequeue(value_type & data )
316         {
317             return dequeue( data, copy_assign() );
318         }
319
320         /// Synonym of \ref cds_container_VyukovMPMCCycleQueue_enqueue "enqueue"
321         bool push(value_type const& data)
322         {
323             return enqueue(data);
324         }
325
326         /// Synonym for template version of \ref cds_container_VyukovMPMCCycleQueue_enqueue_func "enqueue" function
327         template <typename Source, typename Func>
328         bool push( const Source& data, Func f  )
329         {
330             return enqueue( data, f );
331         }
332
333         /// Synonym of \ref cds_container_VyukovMPMCCycleQueue_dequeue "dequeue"
334         bool pop(value_type& data)
335         {
336             return dequeue(data);
337         }
338
339         /// Synonym for template version of \ref cds_container_VyukovMPMCCycleQueue_dequeue_func "dequeue" function
340         template <typename Type, typename Func>
341         bool pop( Type& dest, Func f )
342         {
343             return dequeue( dest, f );
344         }
345
346         /// Checks if the queue is empty
347         bool empty() const
348         {
349             const cell_type * cell;
350             size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
351
352             for (;;)
353             {
354                 cell = &m_buffer[pos & m_nBufferMask];
355                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
356                 intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
357
358                 if (dif == 0)
359                     return false;
360                 else if (dif < 0)
361                     return true;
362                 else
363                     pos = m_posDequeue.load(memory_model::memory_order_relaxed);
364             }
365         }
366
367         /// Clears the queue
368         void clear()
369         {
370             value_type v;
371             while ( pop(v) );
372         }
373
374         /// Returns queue's item count
375         /**
376             The value returned depends on opt::item_counter option. For atomicity::empty_item_counter,
377             this function always returns 0.
378         */
379         size_t size() const
380         {
381             return m_ItemCounter.value();
382         }
383
384         /// Returns capacity of cyclic buffer
385         size_t capacity() const
386         {
387             return m_buffer.capacity();
388         }
389     };
390 }}  // namespace cds::container
391
392 #endif // #ifndef __CDS_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H