Remove CDS_EMPLACE_SUPPORT macro and unused code
[libcds.git] / cds / container / vyukov_mpmc_cycle_queue.h
1 //$$CDS-header$$
2
3 #ifndef __CDS_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
4 #define __CDS_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
5
6 #include <cds/container/base.h>
7 #include <cds/opt/buffer.h>
8 #include <cds/opt/value_cleaner.h>
9 #include <cds/cxx11_atomic.h>
10 #include <cds/ref.h>
11 #include <cds/details/trivial_assign.h>
12 #include <cds/details/bounded_container.h>
13
14 namespace cds { namespace container {
15
16     /// Vyukov's MPMC bounded queue
17     /** @ingroup cds_nonintrusive_queue
18         This algorithm is developed by Dmitry Vyukov (see http://www.1024cores.net)
19         It's multi-producer multi-consumer (MPMC), array-based, fails on overflow, does not require GC, w/o priorities, causal FIFO,
20         blocking producers and consumers queue. The algorithm is pretty simple and fast. It's not lock-free in the official meaning,
21         just implemented by means of atomic RMW operations w/o mutexes.
22
23         The cost of enqueue/dequeue is 1 CAS per operation.
24         No dynamic memory allocation/management during operation. Producers and consumers are separated from each other (as in the two-lock queue),
25         i.e. do not touch the same data while queue is not empty.
26
27         \par Source:
28             http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
29
30         \par Template parameters
31             \li \p T - type stored in queue.
32             \li \p Options - queue's options
33
34         Options \p Options are:
35         - opt::buffer - buffer to store items. Mandatory option, see option description for full list of possible types.
36         - opt::item_counter - the type of item counting feature. Default is \ref atomicity::empty_item_counter
37         - opt::value_cleaner - a functor to clean item dequeued. Default value is \ref opt::v::destruct_cleaner
38             that calls the destructor of type \p T.
39             After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied. If \p T
40             is a complex type, \p value_cleaner may be the useful feature.
41         - opt::alignment - the alignment for internal queue data. Default is opt::cache_line_alignment
42         - opt::memory_model - C++ memory ordering model. Can be opt::v::relaxed_ordering (relaxed memory model, the default)
43             or opt::v::sequential_consistent (sequentially consisnent memory model).
44
45         \par License
46             Simplified BSD license by Dmitry Vyukov (http://www.1024cores.net/site/1024cores/home/code-license)
47
48         \par Example
49         \code
50         #include <cds/container/vyukov_mpmc_cycle_queue.h>
51
52         // // Queue with 1024 item static buffer
53         cds::container::vyukov_mpmc_bounded<
54             int
55             ,cds::opt::buffer< cds::opt::v::static_buffer<int, 1024> >
56         > myQueue;
57         \endcode
58     */
59     template <typename T, CDS_DECL_OPTIONS6>
60     class VyukovMPMCCycleQueue
61         : public cds::bounded_container
62     {
63     protected:
64         //@cond
65         struct default_options
66         {
67             typedef cds::opt::v::destruct_cleaner  value_cleaner;
68             typedef atomicity::empty_item_counter item_counter;
69             typedef opt::v::empty_disposer      disposer    ;   // for intrusive version only
70             typedef opt::v::relaxed_ordering    memory_model;
71             enum { alignment = opt::cache_line_alignment };
72         };
73         //@endcond
74
75     public:
76         //@cond
77         typedef typename opt::make_options<
78             typename cds::opt::find_type_traits< default_options, CDS_OPTIONS6 >::type
79             ,CDS_OPTIONS6
80         >::type   options;
81         //@endcond
82
83     protected:
84         //@cond
85         typedef typename options::value_cleaner  value_cleaner;
86         //@endcond
87
88     public:
89         typedef T value_type    ;   ///< @anchor cds_container_VyukovMPMCCycleQueue_value_type type of value stored in the queue
90         typedef typename options::item_counter  item_counter ;  ///< Item counter type
91         typedef typename options::memory_model  memory_model ;  ///< Memory ordering. See cds::opt::memory_model option
92
93         /// Rebind template arguments
94         template <typename T2, CDS_DECL_OTHER_OPTIONS6>
95         struct rebind {
96             typedef VyukovMPMCCycleQueue< T2, CDS_OTHER_OPTIONS6> other   ;   ///< Rebinding result
97         };
98
99     protected:
100         //@cond
101         typedef atomics::atomic<size_t> sequence_type;
102         struct cell_type
103         {
104             sequence_type   sequence;
105             value_type      data;
106
107             cell_type()
108             {}
109         };
110
111 #   ifndef CDS_CXX11_LAMBDA_SUPPORT
112         struct copy_construct {
113             void operator()( value_type& dest, value_type const& src )
114             {
115                 new ( &dest ) value_type( src );
116             }
117         };
118 #   endif
119
120         typedef cds::details::trivial_assign< value_type, value_type > copy_assign;
121
122         typedef typename options::buffer::template rebind<cell_type>::other buffer;
123         typedef typename opt::details::alignment_setter< sequence_type, options::alignment >::type aligned_sequence_type;
124         typedef typename opt::details::alignment_setter< buffer, options::alignment >::type aligned_buffer;
125         //@endcond
126
127     protected:
128         //@cond
129         aligned_buffer  m_buffer;
130         size_t const    m_nBufferMask;
131         aligned_sequence_type m_posEnqueue;
132         aligned_sequence_type m_posDequeue;
133         item_counter    m_ItemCounter;
134         //@endcond
135
136     public:
137         /// Constructs the queue of capacity \p nCapacity
138         /**
139             For cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
140         */
141         VyukovMPMCCycleQueue(
142             size_t nCapacity = 0
143             )
144             : m_buffer( nCapacity )
145             , m_nBufferMask( m_buffer.capacity() - 1 )
146         {
147             nCapacity = m_buffer.capacity();
148
149             // Buffer capacity must be power of 2
150             assert( nCapacity >= 2 && (nCapacity & (nCapacity - 1)) == 0 );
151
152             for (size_t i = 0; i != nCapacity; i += 1)
153                 m_buffer[i].sequence.store(i, memory_model::memory_order_relaxed);
154
155             m_posEnqueue.store(0, memory_model::memory_order_relaxed);
156             m_posDequeue.store(0, memory_model::memory_order_relaxed);
157         }
158
159         ~VyukovMPMCCycleQueue()
160         {
161             clear();
162         }
163
164         /// Enqueues \p data to queue using copy functor
165         /** @anchor cds_container_VyukovMPMCCycleQueue_enqueue_func
166             \p Func is a functor called to copy value \p data of type \p Source
167             which may be differ from type \p T stored in the queue.
168             The functor's interface is:
169             \code
170                 struct myFunctor {
171                     void operator()(T& dest, Source const& data)
172                     {
173                         // // Code to copy \p data to \p dest
174                         dest = data;
175                     }
176                 };
177             \endcode
178             You may use \p boost:ref construction to pass functor \p f by reference.
179
180             <b>Requirements</b> The functor \p Func should not throw any exception.
181         */
182         template <typename Source, typename Func>
183         bool enqueue(Source const& data, Func func)
184         {
185             cell_type* cell;
186             size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
187
188             for (;;)
189             {
190                 cell = &m_buffer[pos & m_nBufferMask];
191                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
192
193                 intptr_t dif = (intptr_t)seq - (intptr_t)pos;
194
195                 if (dif == 0)
196                 {
197                     if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed) )
198                         break;
199                 }
200                 else if (dif < 0)
201                     return false;
202                 else
203                     pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
204             }
205
206             unref(func)( cell->data, data );
207
208             cell->sequence.store(pos + 1, memory_model::memory_order_release);
209             ++m_ItemCounter;
210
211             return true;
212         }
213
214         /// @anchor cds_container_VyukovMPMCCycleQueue_enqueue Enqueues \p data to queue
215         bool enqueue(value_type const& data )
216         {
217 #       ifdef CDS_CXX11_LAMBDA_SUPPORT
218             return enqueue( data, [](value_type& dest, value_type const& src){ new ( &dest ) value_type( src ); });
219 #       else
220             return enqueue( data, copy_construct() );
221 #       endif
222         }
223
224         /// Enqueues data of type \ref cds_container_VyukovMPMCCycleQueue_value_type "value_type" constructed with <tt>std::forward<Args>(args)...</tt>
225         template <typename... Args>
226         bool emplace( Args&&... args )
227         {
228             cell_type* cell;
229             size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
230
231             for (;;)
232             {
233                 cell = &m_buffer[pos & m_nBufferMask];
234                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
235
236                 intptr_t dif = (intptr_t)seq - (intptr_t)pos;
237
238                 if (dif == 0)
239                 {
240                     if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed) )
241                         break;
242                 }
243                 else if (dif < 0)
244                     return false;
245                 else
246                     pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
247             }
248
249             new ( &cell->data ) value_type( std::forward<Args>(args)... );
250
251             cell->sequence.store(pos + 1, memory_model::memory_order_release);
252             ++m_ItemCounter;
253
254             return true;
255         }
256
257         /// Dequeues an item from queue
258         /** @anchor cds_container_VyukovMPMCCycleQueue_dequeue_func
259             \p Func is a functor called to copy dequeued value of type \p T to \p dest of type \p Dest.
260             The functor's interface is:
261             \code
262             struct myFunctor {
263             void operator()(Dest& dest, T const& data)
264             {
265                 // // Code to copy \p data to \p dest
266                 dest = data;
267             }
268             };
269             \endcode
270             You may use \p boost:ref construction to pass functor \p func by reference.
271
272             <b>Requirements</b> The functor \p Func should not throw any exception.
273         */
274         template <typename Dest, typename Func>
275         bool dequeue( Dest& data, Func func )
276         {
277             cell_type * cell;
278             size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
279
280             for (;;)
281             {
282                 cell = &m_buffer[pos & m_nBufferMask];
283                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
284                 intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
285
286                 if (dif == 0) {
287                     if ( m_posDequeue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed))
288                         break;
289                 }
290                 else if (dif < 0)
291                     return false;
292                 else
293                     pos = m_posDequeue.load(memory_model::memory_order_relaxed);
294             }
295
296             unref(func)( data, cell->data );
297             value_cleaner()( cell->data );
298             --m_ItemCounter;
299             cell->sequence.store( pos + m_nBufferMask + 1, memory_model::memory_order_release );
300
301             return true;
302         }
303
304         /// Dequeues an item from queue to \p data
305         /** @anchor cds_container_VyukovMPMCCycleQueue_dequeue
306             If queue is empty, returns \p false, \p data is unchanged.
307         */
308         bool dequeue(value_type & data )
309         {
310             return dequeue( data, copy_assign() );
311         }
312
313         /// Synonym of \ref cds_container_VyukovMPMCCycleQueue_enqueue "enqueue"
314         bool push(value_type const& data)
315         {
316             return enqueue(data);
317         }
318
319         /// Synonym for template version of \ref cds_container_VyukovMPMCCycleQueue_enqueue_func "enqueue" function
320         template <typename Source, typename Func>
321         bool push( const Source& data, Func f  )
322         {
323             return enqueue( data, f );
324         }
325
326         /// Synonym of \ref cds_container_VyukovMPMCCycleQueue_dequeue "dequeue"
327         bool pop(value_type& data)
328         {
329             return dequeue(data);
330         }
331
332         /// Synonym for template version of \ref cds_container_VyukovMPMCCycleQueue_dequeue_func "dequeue" function
333         template <typename Type, typename Func>
334         bool pop( Type& dest, Func f )
335         {
336             return dequeue( dest, f );
337         }
338
339         /// Checks if the queue is empty
340         bool empty() const
341         {
342             const cell_type * cell;
343             size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
344
345             for (;;)
346             {
347                 cell = &m_buffer[pos & m_nBufferMask];
348                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
349                 intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
350
351                 if (dif == 0)
352                     return false;
353                 else if (dif < 0)
354                     return true;
355                 else
356                     pos = m_posDequeue.load(memory_model::memory_order_relaxed);
357             }
358         }
359
360         /// Clears the queue
361         void clear()
362         {
363             value_type v;
364             while ( pop(v) );
365         }
366
367         /// Returns queue's item count
368         /**
369             The value returned depends on opt::item_counter option. For atomicity::empty_item_counter,
370             this function always returns 0.
371         */
372         size_t size() const
373         {
374             return m_ItemCounter.value();
375         }
376
377         /// Returns capacity of cyclic buffer
378         size_t capacity() const
379         {
380             return m_buffer.capacity();
381         }
382     };
383 }}  // namespace cds::container
384
385 #endif // #ifndef __CDS_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H