issue#11: cds: changed __CDS_ guard prefix to CDSLIB_ for all .h files
[libcds.git] / cds / container / vyukov_mpmc_cycle_queue.h
1 //$$CDS-header$$
2
3 #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
4 #define CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
5
6 #include <cds/container/details/base.h>
7 #include <cds/opt/buffer.h>
8 #include <cds/opt/value_cleaner.h>
9 #include <cds/algo/atomic.h>
10 #include <cds/details/bounded_container.h>
11
12 namespace cds { namespace container {
13
14     /// VyukovMPMCCycleQueue related definitions
15     /** @ingroup cds_nonintrusive_helper
16     */
17     namespace vyukov_queue {
18
19         /// VyukovMPMCCycleQueue default traits
20         struct traits {
21             /// Buffer type for internal array
22             /*
23                 The type of element for the buffer is not important: the queue rebinds
24                 buffer for required type via \p rebind metafunction.
25
26                 For \p VyukovMPMCCycleQueue queue the buffer size should have power-of-2 size.
27             */
28             typedef cds::opt::v::dynamic_buffer< void * > buffer;
29
30             /// A functor to clean item dequeued.
31             /**
32                 The functor  calls the destructor for queue item.
33                 After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied.
34                 If \p T is a complex type, \p value_cleaner may be the useful feature.
35
36                 Default value is \ref opt::v::destruct_cleaner
37             */
38             typedef cds::opt::v::destruct_cleaner value_cleaner;
39
40             /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
41             typedef cds::atomicity::empty_item_counter item_counter;
42
43             /// C++ memory ordering model
44             /**
45                 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
46                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
47             */
48             typedef opt::v::relaxed_ordering    memory_model;
49
50             /// Alignment for internal queue data. Default is \p opt::cache_line_alignment
51             enum { alignment = opt::cache_line_alignment };
52         };
53
54         /// Metafunction converting option list to \p vyukov_queue::traits
55         /**
56             Supported \p Options are:
57             - \p opt::buffer - the buffer type for internal cyclic array. Possible types are:
58                 \p opt::v::dynamic_buffer (the default), \p opt::v::static_buffer. The type of
59                 element in the buffer is not important: it will be changed via \p rebind metafunction.
60             - \p opt::value_cleaner - a functor to clean item dequeued.
61                 The functor calls the destructor for queue item.
62                 After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied.
63                 If \p T is a complex type, \p value_cleaner may be the useful feature.
64                 Default value is \ref opt::v::destruct_cleaner
65             - \p opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
66                 To enable item counting use \p cds::atomicity::item_counter
67             - \p opt::alignment - the alignment for internal queue data. Default is \p opt::cache_line_alignment
68             - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
69                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
70
71             Example: declare \p %VyukovMPMCCycleQueue with item counting and static iternal buffer of size 1024:
72             \code
73             typedef cds::container::VyukovMPMCCycleQueue< Foo,
74                 typename cds::container::vyukov_queue::make_traits<
75                     cds::opt::buffer< cds::opt::v::static_buffer< void *, 1024 >,
76                     cds::opt::item_counte< cds::atomicity::item_counter >
77                 >::type
78             > myQueue;
79             \endcode
80         */
81         template <typename... Options>
82         struct make_traits {
83 #   ifdef CDS_DOXYGEN_INVOKED
84             typedef implementation_defined type;   ///< Metafunction result
85 #   else
86             typedef typename cds::opt::make_options<
87                 typename cds::opt::find_type_traits< traits, Options... >::type
88                 , Options...
89             >::type type;
90 #   endif
91         };
92
93     } //namespace vyukov_queue
94
95     /// Vyukov's MPMC bounded queue
96     /** @ingroup cds_nonintrusive_queue
97         This algorithm is developed by Dmitry Vyukov (see http://www.1024cores.net)
98         It's multi-producer multi-consumer (MPMC), array-based, fails on overflow, does not require GC, w/o priorities, causal FIFO,
99         blocking producers and consumers queue. The algorithm is pretty simple and fast. It's not lock-free in the official meaning,
100         just implemented by means of atomic RMW operations w/o mutexes.
101
102         The cost of enqueue/dequeue is 1 CAS per operation.
103         No dynamic memory allocation/management during operation. Producers and consumers are separated from each other (as in the two-lock queue),
104         i.e. do not touch the same data while queue is not empty.
105
106         Source:
107             - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
108
109         Template parameters
110         - \p T - type stored in queue.
111         - \p Traits - queue traits, default is \p vykov_queue::traits. You can use \p vykov_queue::make_traits
112             metafunction to make your traits or just derive your traits from \p %vykov_queue::traits:
113             \code
114             struct myTraits: public cds::container::vykov_queue::traits {
115                 typedef cds::atomicity::item_counter    item_counter;
116             };
117             typedef cds::container::VyukovMPMCCycleQueue< Foo, myTraits > myQueue;
118
119             // Equivalent make_traits example:
120             typedef cds::container::VyukovMPMCCycleQueue< cds::gc::HP, Foo,
121                 typename cds::container::vykov_queue::make_traits<
122                     cds::opt::item_counter< cds::atomicity::item_counter >
123                 >::type
124             > myQueue;
125             \endcode
126
127         \par License
128             Simplified BSD license by Dmitry Vyukov (http://www.1024cores.net/site/1024cores/home/code-license)
129     */
130     template <typename T, typename Traits = vyukov_queue::traits >
131     class VyukovMPMCCycleQueue : public cds::bounded_container
132     {
133     public:
134         typedef T value_type;   ///< Value type to be stored in the queue
135         typedef Traits traits;  ///< Queue traits
136         typedef typename traits::item_counter  item_counter;  ///< Item counter type
137         typedef typename traits::memory_model  memory_model;  ///< Memory ordering. See cds::opt::memory_model option
138         typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p vyukov_queue::traits::value_cleaner
139
140         /// Rebind template arguments
141         template <typename T2, typename Traits2>
142         struct rebind {
143             typedef VyukovMPMCCycleQueue< T2, Traits2 > other   ;   ///< Rebinding result
144         };
145
146     protected:
147         //@cond
148         typedef atomics::atomic<size_t> sequence_type;
149         struct cell_type
150         {
151             sequence_type   sequence;
152             value_type      data;
153
154             cell_type()
155             {}
156         };
157
158         typedef typename traits::buffer::template rebind<cell_type>::other buffer;
159         typedef typename opt::details::alignment_setter< sequence_type, traits::alignment >::type aligned_sequence_type;
160         typedef typename opt::details::alignment_setter< buffer, traits::alignment >::type aligned_buffer;
161         //@endcond
162
163     protected:
164         //@cond
165         aligned_buffer  m_buffer;
166         size_t const    m_nBufferMask;
167         aligned_sequence_type m_posEnqueue;
168         aligned_sequence_type m_posDequeue;
169         item_counter    m_ItemCounter;
170         //@endcond
171
172     public:
173         /// Constructs the queue of capacity \p nCapacity
174         /**
175             For \p cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
176         */
177         VyukovMPMCCycleQueue(
178             size_t nCapacity = 0
179             )
180             : m_buffer( nCapacity )
181             , m_nBufferMask( m_buffer.capacity() - 1 )
182         {
183             nCapacity = m_buffer.capacity();
184
185             // Buffer capacity must be power of 2
186             assert( nCapacity >= 2 && (nCapacity & (nCapacity - 1)) == 0 );
187
188             for (size_t i = 0; i != nCapacity; i += 1)
189                 m_buffer[i].sequence.store(i, memory_model::memory_order_relaxed);
190
191             m_posEnqueue.store(0, memory_model::memory_order_relaxed);
192             m_posDequeue.store(0, memory_model::memory_order_relaxed);
193         }
194
195         ~VyukovMPMCCycleQueue()
196         {
197             clear();
198         }
199
200         /// Enqueues data to the queue using a functor
201         /**
202             \p Func is a functor called to copy a value to the queue cell.
203             The functor \p f takes one argument - a reference to a empty cell of type \ref value_type :
204             \code
205             cds::container::VyukovMPMCCycleQueue< Foo > myQueue;
206             Bar bar;
207             myQueue.enqueue_with( [&bar]( Foo& dest ) { dest = std::move(bar); } );
208             \endcode
209         */
210         template <typename Func>
211         bool enqueue_with(Func f)
212         {
213             cell_type* cell;
214             size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
215
216             for (;;)
217             {
218                 cell = &m_buffer[pos & m_nBufferMask];
219                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
220
221                 intptr_t dif = (intptr_t)seq - (intptr_t)pos;
222
223                 if (dif == 0)
224                 {
225                     if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed) )
226                         break;
227                 }
228                 else if (dif < 0)
229                     return false;
230                 else
231                     pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
232             }
233
234             f( cell->data );
235
236             cell->sequence.store(pos + 1, memory_model::memory_order_release);
237             ++m_ItemCounter;
238
239             return true;
240         }
241
242         /// Enqueues \p val value into the queue.
243         /**
244             The new queue item is created by calling placement new in free cell.
245             Returns \p true if success, \p false if the queue is full.
246         */
247         bool enqueue( value_type const& val )
248         {
249             return enqueue_with( [&val]( value_type& dest ){ new ( &dest ) value_type( val ); });
250         }
251
252         /// Synonym for \p enqueue()
253         bool push( value_type const& data )
254         {
255             return enqueue( data );
256         }
257
258         /// Synonym for \p enqueue_with()
259         template <typename Func>
260         bool push_with( Func f )
261         {
262             return enqueue_with( f );
263         }
264
265         /// Enqueues data of type \ref value_type constructed with <tt>std::forward<Args>(args)...</tt>
266         template <typename... Args>
267         bool emplace( Args&&... args )
268         {
269             cell_type* cell;
270             size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
271
272             for (;;)
273             {
274                 cell = &m_buffer[pos & m_nBufferMask];
275                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
276
277                 intptr_t dif = (intptr_t)seq - (intptr_t)pos;
278
279                 if (dif == 0)
280                 {
281                     if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed) )
282                         break;
283                 }
284                 else if (dif < 0)
285                     return false;
286                 else
287                     pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
288             }
289
290             new ( &cell->data ) value_type( std::forward<Args>(args)... );
291
292             cell->sequence.store(pos + 1, memory_model::memory_order_release);
293             ++m_ItemCounter;
294
295             return true;
296         }
297
298         /// Dequeues a value using a functor
299         /**
300             \p Func is a functor called to copy dequeued value.
301             The functor takes one argument - a reference to removed node:
302             \code
303             cds:container::VyukovMPMCCycleQueue< Foo > myQueue;
304             Bar bar;
305             myQueue.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
306             \endcode
307             The functor is called only if the queue is not empty.
308         */
309         template <typename Func>
310         bool dequeue_with( Func f )
311         {
312             cell_type * cell;
313             size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
314
315             for (;;)
316             {
317                 cell = &m_buffer[pos & m_nBufferMask];
318                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
319                 intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
320
321                 if (dif == 0) {
322                     if ( m_posDequeue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed))
323                         break;
324                 }
325                 else if (dif < 0)
326                     return false;
327                 else
328                     pos = m_posDequeue.load(memory_model::memory_order_relaxed);
329             }
330
331             f( cell->data );
332             value_cleaner()( cell->data );
333             --m_ItemCounter;
334             cell->sequence.store( pos + m_nBufferMask + 1, memory_model::memory_order_release );
335
336             return true;
337         }
338
339         /// Dequeues a value from the queue
340         /**
341             If queue is not empty, the function returns \p true, \p dest contains copy of
342             dequeued value. The assignment operator for type \ref value_type is invoked.
343             If queue is empty, the function returns \p false, \p dest is unchanged.
344         */
345         bool dequeue(value_type & dest )
346         {
347             return dequeue_with( [&dest]( value_type& src ){ dest = src; } );
348         }
349
350         /// Synonym for \p dequeue()
351         bool pop(value_type& data)
352         {
353             return dequeue(data);
354         }
355
356         /// Synonym for \p dequeue_with()
357         template <typename Func>
358         bool pop_with( Func f )
359         {
360             return dequeue_with( f );
361         }
362
363         /// Checks if the queue is empty
364         bool empty() const
365         {
366             const cell_type * cell;
367             size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
368
369             for (;;)
370             {
371                 cell = &m_buffer[pos & m_nBufferMask];
372                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
373                 intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
374
375                 if (dif == 0)
376                     return false;
377                 else if (dif < 0)
378                     return true;
379                 else
380                     pos = m_posDequeue.load(memory_model::memory_order_relaxed);
381             }
382         }
383
384         /// Clears the queue
385         void clear()
386         {
387             value_type v;
388             while ( pop(v) );
389         }
390
391         /// Returns queue's item count
392         /**
393             The value returned depends on \p vyukov_queue::traits::item_counter option.
394             For \p atomicity::empty_item_counter, this function always returns 0.
395         */
396         size_t size() const
397         {
398             return m_ItemCounter.value();
399         }
400
401         /// Returns capacity of the queue
402         size_t capacity() const
403         {
404             return m_buffer.capacity();
405         }
406     };
407 }}  // namespace cds::container
408
409 #endif // #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H