1d7aa89974f277d199e08292888e9a8a44815175
[libcds.git] / cds / container / vyukov_mpmc_cycle_queue.h
1 //$$CDS-header$$
2
3 #ifndef __CDS_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
4 #define __CDS_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
5
6 #include <cds/container/details/base.h>
7 #include <cds/opt/buffer.h>
8 #include <cds/opt/value_cleaner.h>
9 #include <cds/cxx11_atomic.h>
10 #include <cds/ref.h>
11 #include <cds/details/trivial_assign.h>
12 #include <cds/details/bounded_container.h>
13
14 namespace cds { namespace container {
15
16     /// Vyukov's MPMC bounded queue
17     /** @ingroup cds_nonintrusive_queue
18         This algorithm is developed by Dmitry Vyukov (see http://www.1024cores.net)
19         It's multi-producer multi-consumer (MPMC), array-based, fails on overflow, does not require GC, w/o priorities, causal FIFO,
20         blocking producers and consumers queue. The algorithm is pretty simple and fast. It's not lock-free in the official meaning,
21         just implemented by means of atomic RMW operations w/o mutexes.
22
23         The cost of enqueue/dequeue is 1 CAS per operation.
24         No dynamic memory allocation/management during operation. Producers and consumers are separated from each other (as in the two-lock queue),
25         i.e. do not touch the same data while queue is not empty.
26
27         \par Source:
28             http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
29
30         \par Template parameters
31             \li \p T - type stored in queue.
32             \li \p Options - queue's options
33
34         Options \p Options are:
35         - opt::buffer - buffer to store items. Mandatory option, see option description for full list of possible types.
36         - opt::item_counter - the type of item counting feature. Default is \ref atomicity::empty_item_counter
37         - opt::value_cleaner - a functor to clean item dequeued. Default value is \ref opt::v::destruct_cleaner
38             that calls the destructor of type \p T.
39             After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied. If \p T
40             is a complex type, \p value_cleaner may be the useful feature.
41         - opt::alignment - the alignment for internal queue data. Default is opt::cache_line_alignment
42         - opt::memory_model - C++ memory ordering model. Can be opt::v::relaxed_ordering (relaxed memory model, the default)
43             or opt::v::sequential_consistent (sequentially consisnent memory model).
44
45         \par License
46             Simplified BSD license by Dmitry Vyukov (http://www.1024cores.net/site/1024cores/home/code-license)
47
48         \par Example
49         \code
50         #include <cds/container/vyukov_mpmc_cycle_queue.h>
51
52         // // Queue with 1024 item static buffer
53         cds::container::vyukov_mpmc_bounded<
54             int
55             ,cds::opt::buffer< cds::opt::v::static_buffer<int, 1024> >
56         > myQueue;
57         \endcode
58     */
59     template <typename T, typename... Options>
60     class VyukovMPMCCycleQueue
61         : public cds::bounded_container
62     {
63     protected:
64         //@cond
65         struct default_options
66         {
67             typedef cds::opt::v::destruct_cleaner  value_cleaner;
68             typedef atomicity::empty_item_counter item_counter;
69             typedef opt::v::empty_disposer      disposer    ;   // for intrusive version only
70             typedef opt::v::relaxed_ordering    memory_model;
71             enum { alignment = opt::cache_line_alignment };
72         };
73         //@endcond
74
75     public:
76         //@cond
77         typedef typename opt::make_options<
78             typename cds::opt::find_type_traits< default_options, Options... >::type
79             ,Options...
80         >::type   options;
81         //@endcond
82
83     protected:
84         //@cond
85         typedef typename options::value_cleaner  value_cleaner;
86         //@endcond
87
88     public:
89         typedef T value_type    ;   ///< @anchor cds_container_VyukovMPMCCycleQueue_value_type type of value stored in the queue
90         typedef typename options::item_counter  item_counter ;  ///< Item counter type
91         typedef typename options::memory_model  memory_model ;  ///< Memory ordering. See cds::opt::memory_model option
92
93         /// Rebind template arguments
94         template <typename T2, typename... Options2>
95         struct rebind {
96             typedef VyukovMPMCCycleQueue< T2, Options2...> other   ;   ///< Rebinding result
97         };
98
99     protected:
100         //@cond
101         typedef atomics::atomic<size_t> sequence_type;
102         struct cell_type
103         {
104             sequence_type   sequence;
105             value_type      data;
106
107             cell_type()
108             {}
109         };
110
111         typedef cds::details::trivial_assign< value_type, value_type > copy_assign;
112
113         typedef typename options::buffer::template rebind<cell_type>::other buffer;
114         typedef typename opt::details::alignment_setter< sequence_type, options::alignment >::type aligned_sequence_type;
115         typedef typename opt::details::alignment_setter< buffer, options::alignment >::type aligned_buffer;
116         //@endcond
117
118     protected:
119         //@cond
120         aligned_buffer  m_buffer;
121         size_t const    m_nBufferMask;
122         aligned_sequence_type m_posEnqueue;
123         aligned_sequence_type m_posDequeue;
124         item_counter    m_ItemCounter;
125         //@endcond
126
127     public:
128         /// Constructs the queue of capacity \p nCapacity
129         /**
130             For cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
131         */
132         VyukovMPMCCycleQueue(
133             size_t nCapacity = 0
134             )
135             : m_buffer( nCapacity )
136             , m_nBufferMask( m_buffer.capacity() - 1 )
137         {
138             nCapacity = m_buffer.capacity();
139
140             // Buffer capacity must be power of 2
141             assert( nCapacity >= 2 && (nCapacity & (nCapacity - 1)) == 0 );
142
143             for (size_t i = 0; i != nCapacity; i += 1)
144                 m_buffer[i].sequence.store(i, memory_model::memory_order_relaxed);
145
146             m_posEnqueue.store(0, memory_model::memory_order_relaxed);
147             m_posDequeue.store(0, memory_model::memory_order_relaxed);
148         }
149
150         ~VyukovMPMCCycleQueue()
151         {
152             clear();
153         }
154
155         /// Enqueues \p data to queue using copy functor
156         /** @anchor cds_container_VyukovMPMCCycleQueue_enqueue_func
157             \p Func is a functor called to copy value \p data of type \p Source
158             which may be differ from type \p T stored in the queue.
159             The functor's interface is:
160             \code
161                 struct myFunctor {
162                     void operator()(T& dest, Source const& data)
163                     {
164                         // // Code to copy \p data to \p dest
165                         dest = data;
166                     }
167                 };
168             \endcode
169             You may use \p boost:ref construction to pass functor \p f by reference.
170
171             <b>Requirements</b> The functor \p Func should not throw any exception.
172         */
173         template <typename Source, typename Func>
174         bool enqueue(Source const& data, Func func)
175         {
176             cell_type* cell;
177             size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
178
179             for (;;)
180             {
181                 cell = &m_buffer[pos & m_nBufferMask];
182                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
183
184                 intptr_t dif = (intptr_t)seq - (intptr_t)pos;
185
186                 if (dif == 0)
187                 {
188                     if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed) )
189                         break;
190                 }
191                 else if (dif < 0)
192                     return false;
193                 else
194                     pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
195             }
196
197             unref(func)( cell->data, data );
198
199             cell->sequence.store(pos + 1, memory_model::memory_order_release);
200             ++m_ItemCounter;
201
202             return true;
203         }
204
205         /// @anchor cds_container_VyukovMPMCCycleQueue_enqueue Enqueues \p data to queue
206         bool enqueue(value_type const& data )
207         {
208             return enqueue( data, [](value_type& dest, value_type const& src){ new ( &dest ) value_type( src ); });
209         }
210
211         /// Enqueues data of type \ref cds_container_VyukovMPMCCycleQueue_value_type "value_type" constructed with <tt>std::forward<Args>(args)...</tt>
212         template <typename... Args>
213         bool emplace( Args&&... args )
214         {
215             cell_type* cell;
216             size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
217
218             for (;;)
219             {
220                 cell = &m_buffer[pos & m_nBufferMask];
221                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
222
223                 intptr_t dif = (intptr_t)seq - (intptr_t)pos;
224
225                 if (dif == 0)
226                 {
227                     if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed) )
228                         break;
229                 }
230                 else if (dif < 0)
231                     return false;
232                 else
233                     pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
234             }
235
236             new ( &cell->data ) value_type( std::forward<Args>(args)... );
237
238             cell->sequence.store(pos + 1, memory_model::memory_order_release);
239             ++m_ItemCounter;
240
241             return true;
242         }
243
244         /// Dequeues an item from queue
245         /** @anchor cds_container_VyukovMPMCCycleQueue_dequeue_func
246             \p Func is a functor called to copy dequeued value of type \p T to \p dest of type \p Dest.
247             The functor's interface is:
248             \code
249             struct myFunctor {
250             void operator()(Dest& dest, T const& data)
251             {
252                 // // Code to copy \p data to \p dest
253                 dest = data;
254             }
255             };
256             \endcode
257             You may use \p boost:ref construction to pass functor \p func by reference.
258
259             <b>Requirements</b> The functor \p Func should not throw any exception.
260         */
261         template <typename Dest, typename Func>
262         bool dequeue( Dest& data, Func func )
263         {
264             cell_type * cell;
265             size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
266
267             for (;;)
268             {
269                 cell = &m_buffer[pos & m_nBufferMask];
270                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
271                 intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
272
273                 if (dif == 0) {
274                     if ( m_posDequeue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed))
275                         break;
276                 }
277                 else if (dif < 0)
278                     return false;
279                 else
280                     pos = m_posDequeue.load(memory_model::memory_order_relaxed);
281             }
282
283             unref(func)( data, cell->data );
284             value_cleaner()( cell->data );
285             --m_ItemCounter;
286             cell->sequence.store( pos + m_nBufferMask + 1, memory_model::memory_order_release );
287
288             return true;
289         }
290
291         /// Dequeues an item from queue to \p data
292         /** @anchor cds_container_VyukovMPMCCycleQueue_dequeue
293             If queue is empty, returns \p false, \p data is unchanged.
294         */
295         bool dequeue(value_type & data )
296         {
297             return dequeue( data, copy_assign() );
298         }
299
300         /// Synonym of \ref cds_container_VyukovMPMCCycleQueue_enqueue "enqueue"
301         bool push(value_type const& data)
302         {
303             return enqueue(data);
304         }
305
306         /// Synonym for template version of \ref cds_container_VyukovMPMCCycleQueue_enqueue_func "enqueue" function
307         template <typename Source, typename Func>
308         bool push( const Source& data, Func f  )
309         {
310             return enqueue( data, f );
311         }
312
313         /// Synonym of \ref cds_container_VyukovMPMCCycleQueue_dequeue "dequeue"
314         bool pop(value_type& data)
315         {
316             return dequeue(data);
317         }
318
319         /// Synonym for template version of \ref cds_container_VyukovMPMCCycleQueue_dequeue_func "dequeue" function
320         template <typename Type, typename Func>
321         bool pop( Type& dest, Func f )
322         {
323             return dequeue( dest, f );
324         }
325
326         /// Checks if the queue is empty
327         bool empty() const
328         {
329             const cell_type * cell;
330             size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
331
332             for (;;)
333             {
334                 cell = &m_buffer[pos & m_nBufferMask];
335                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
336                 intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
337
338                 if (dif == 0)
339                     return false;
340                 else if (dif < 0)
341                     return true;
342                 else
343                     pos = m_posDequeue.load(memory_model::memory_order_relaxed);
344             }
345         }
346
347         /// Clears the queue
348         void clear()
349         {
350             value_type v;
351             while ( pop(v) );
352         }
353
354         /// Returns queue's item count
355         /**
356             The value returned depends on opt::item_counter option. For atomicity::empty_item_counter,
357             this function always returns 0.
358         */
359         size_t size() const
360         {
361             return m_ItemCounter.value();
362         }
363
364         /// Returns capacity of cyclic buffer
365         size_t capacity() const
366         {
367             return m_buffer.capacity();
368         }
369     };
370 }}  // namespace cds::container
371
372 #endif // #ifndef __CDS_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H