3 #ifndef __CDS_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
4 #define __CDS_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
6 #include <cds/container/base.h>
7 #include <cds/opt/buffer.h>
8 #include <cds/opt/value_cleaner.h>
9 #include <cds/cxx11_atomic.h>
11 #include <cds/details/trivial_assign.h>
12 #include <cds/details/bounded_container.h>
14 namespace cds { namespace container {
16 /// Vyukov's MPMC bounded queue
17 /** @ingroup cds_nonintrusive_queue
18 This algorithm is developed by Dmitry Vyukov (see http://www.1024cores.net)
19 It's multi-producer multi-consumer (MPMC), array-based, fails on overflow, does not require GC, w/o priorities, causal FIFO,
20 blocking producers and consumers queue. The algorithm is pretty simple and fast. It's not lock-free in the official meaning,
21 just implemented by means of atomic RMW operations w/o mutexes.
23 The cost of enqueue/dequeue is 1 CAS per operation.
24 No dynamic memory allocation/management during operation. Producers and consumers are separated from each other (as in the two-lock queue),
25 i.e. do not touch the same data while queue is not empty.
28 http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
30 \par Template parameters
31 \li \p T - type stored in queue.
32 \li \p Options - queue's options
34 Options \p Options are:
35 - opt::buffer - buffer to store items. Mandatory option, see option description for full list of possible types.
36 - opt::item_counter - the type of item counting feature. Default is \ref atomicity::empty_item_counter
37 - opt::value_cleaner - a functor to clean item dequeued. Default value is \ref opt::v::destruct_cleaner
38 that calls the destructor of type \p T.
39 After an item is dequeued, \p value_cleaner cleans the cell that the item occupied. If \p T
40 is a complex type, \p value_cleaner may be the useful feature.
41 - opt::alignment - the alignment for internal queue data. Default is opt::cache_line_alignment
42 - opt::memory_model - C++ memory ordering model. Can be opt::v::relaxed_ordering (relaxed memory model, the default)
43 or opt::v::sequential_consistent (sequentially consistent memory model).
46 Simplified BSD license by Dmitry Vyukov (http://www.1024cores.net/site/1024cores/home/code-license)
50 #include <cds/container/vyukov_mpmc_cycle_queue.h>
52 // // Queue with 1024 item static buffer
53 cds::container::VyukovMPMCCycleQueue<
55 ,cds::opt::buffer< cds::opt::v::static_buffer<int, 1024> >
59 template <typename T, typename... Options>
60 class VyukovMPMCCycleQueue
61 : public cds::bounded_container
// Default option values. They are combined with the user-supplied Options...
// through find_type_traits/make_options below.
65 struct default_options
// Cleaner functor type; dequeue() invokes value_cleaner()( cell->data ) after copying the item out.
67 typedef cds::opt::v::destruct_cleaner value_cleaner;
// Default counter; per the size() documentation it always reports 0.
68 typedef atomicity::empty_item_counter item_counter;
69 typedef opt::v::empty_disposer disposer ; // for intrusive version only
// Supplies the memory_order_* constants used by every atomic operation in this class.
70 typedef opt::v::relaxed_ordering memory_model;
// Alignment passed to alignment_setter for the buffer and the two position counters.
71 enum { alignment = opt::cache_line_alignment };
77 typedef typename opt::make_options<
78 typename cds::opt::find_type_traits< default_options, Options... >::type
85 typedef typename options::value_cleaner value_cleaner;
89 typedef T value_type ; ///< @anchor cds_container_VyukovMPMCCycleQueue_value_type type of value stored in the queue
90 typedef typename options::item_counter item_counter ; ///< Item counter type
91 typedef typename options::memory_model memory_model ; ///< Memory ordering. See cds::opt::memory_model option
93 /// Rebind template arguments
94 template <typename T2, typename... Options2>
96 typedef VyukovMPMCCycleQueue< T2, Options2...> other ; ///< Rebinding result
// Atomic counter type used for the per-cell sequence number and, wrapped by
// alignment_setter below, for the enqueue/dequeue positions.
101 typedef atomics::atomic<size_t> sequence_type;
// Per-cell sequence number. enqueue()/dequeue() compare it against a queue position
// (as a signed difference) before touching the cell.
// NOTE(review): the enclosing cell struct declaration is not visible in this excerpt.
104 sequence_type sequence;
// Functor type used by dequeue(value_type&) to copy the dequeued item to the caller.
111 typedef cds::details::trivial_assign< value_type, value_type > copy_assign;
// The user-selected buffer option, rebound so that it stores cell_type elements.
113 typedef typename options::buffer::template rebind<cell_type>::other buffer;
// Variants of the counter and buffer types with options::alignment applied.
114 typedef typename opt::details::alignment_setter< sequence_type, options::alignment >::type aligned_sequence_type;
115 typedef typename opt::details::alignment_setter< buffer, options::alignment >::type aligned_buffer;
// Storage for the queue cells.
120 aligned_buffer m_buffer;
// capacity() - 1 (set in the constructor); positions are AND-ed with it to get a cell index.
121 size_t const m_nBufferMask;
// Position advanced by producers (enqueue/emplace).
122 aligned_sequence_type m_posEnqueue;
// Position advanced by consumers (dequeue).
123 aligned_sequence_type m_posDequeue;
// Item counter whose value() is returned by size().
124 item_counter m_ItemCounter;
128 /// Constructs the queue of capacity \p nCapacity
130 For cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
// Builds the buffer, derives the index mask from its capacity, and resets every
// cell sequence and both queue positions.
// NOTE(review): the parameter list (nCapacity) is not visible in this excerpt.
132 VyukovMPMCCycleQueue(
135 : m_buffer( nCapacity )
// The mask is derived from the buffer's actual capacity, not from the requested nCapacity.
136 , m_nBufferMask( m_buffer.capacity() - 1 )
// From here on nCapacity holds the real buffer capacity.
138 nCapacity = m_buffer.capacity();
140 // Buffer capacity must be power of 2
// (otherwise `pos & m_nBufferMask` would not cycle through all cells); checked in debug builds only.
141 assert( nCapacity >= 2 && (nCapacity & (nCapacity - 1)) == 0 );
// Cell i starts with sequence i, so enqueue's `seq - pos` is zero for the first lap.
143 for (size_t i = 0; i != nCapacity; i += 1)
144 m_buffer[i].sequence.store(i, memory_model::memory_order_relaxed);
// Both positions start at 0.
146 m_posEnqueue.store(0, memory_model::memory_order_relaxed);
147 m_posDequeue.store(0, memory_model::memory_order_relaxed);
150 ~VyukovMPMCCycleQueue()
155 /// Enqueues \p data to queue using copy functor
156 /** @anchor cds_container_VyukovMPMCCycleQueue_enqueue_func
157 \p Func is a functor called to copy value \p data of type \p Source
158 which may differ from type \p T stored in the queue.
159 The functor's interface is:
162 void operator()(T& dest, Source const& data)
164 // // Code to copy \p data to \p dest
169 You may use \p boost::ref construction to pass functor \p f by reference.
171 <b>Requirements</b> The functor \p Func should not throw any exception.
// Claims a cell at the current enqueue position, lets func copy data into it, then
// publishes the cell to consumers.
// NOTE(review): the retry loop, the branches on `dif`, and the return statements are
// not visible in this excerpt; the comments below describe only the visible statements.
173 template <typename Source, typename Func>
174 bool enqueue(Source const& data, Func func)
// Snapshot of the producer position.
177 size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
// Map the position onto a cell; m_nBufferMask is capacity - 1.
181 cell = &m_buffer[pos & m_nBufferMask];
182 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
// Signed distance between the cell's sequence and the position.
184 intptr_t dif = (intptr_t)seq - (intptr_t)pos;
// Try to reserve the cell by moving the producer position from pos to pos + 1.
188 if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed) )
// Re-read the producer position (presumably on a retry path; the enclosing branch is not shown).
194 pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
// Let the user functor write data into the cell's storage.
197 unref(func)( cell->data, data );
// Release-store pos + 1: this is the value dequeue() subtracts (pos + 1) from the sequence to test for.
199 cell->sequence.store(pos + 1, memory_model::memory_order_release);
205 /// @anchor cds_container_VyukovMPMCCycleQueue_enqueue Enqueues \p data to queue
// Forwards to the functor overload with a lambda that copy-constructs the value
// in place (placement new) in the cell's storage.
206 bool enqueue(value_type const& data )
208 return enqueue( data, [](value_type& dest, value_type const& src){ new ( &dest ) value_type( src ); });
211 /// Enqueues data of type \ref cds_container_VyukovMPMCCycleQueue_value_type "value_type" constructed with <tt>std::forward<Args>(args)...</tt>
// Same visible steps as the functor enqueue(), except that the value is constructed
// directly in the cell from the forwarded arguments.
// NOTE(review): the retry loop, the branches on `dif`, and the return statements are
// not visible in this excerpt.
212 template <typename... Args>
213 bool emplace( Args&&... args )
// Snapshot of the producer position.
216 size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
// Map the position onto a cell; m_nBufferMask is capacity - 1.
220 cell = &m_buffer[pos & m_nBufferMask];
221 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
// Signed distance between the cell's sequence and the position.
223 intptr_t dif = (intptr_t)seq - (intptr_t)pos;
// Try to reserve the cell by moving the producer position from pos to pos + 1.
227 if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed) )
// Re-read the producer position (presumably on a retry path; the enclosing branch is not shown).
233 pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
// Placement-new the value in the cell, perfectly forwarding the constructor arguments.
236 new ( &cell->data ) value_type( std::forward<Args>(args)... );
// Release-store pos + 1 to publish the cell.
238 cell->sequence.store(pos + 1, memory_model::memory_order_release);
244 /// Dequeues an item from queue
245 /** @anchor cds_container_VyukovMPMCCycleQueue_dequeue_func
246 \p Func is a functor called to copy dequeued value of type \p T to \p dest of type \p Dest.
247 The functor's interface is:
250 void operator()(Dest& dest, T const& data)
252 // // Code to copy \p data to \p dest
257 You may use \p boost::ref construction to pass functor \p func by reference.
259 <b>Requirements</b> The functor \p Func should not throw any exception.
// Claims the cell at the current dequeue position, lets func copy the item out,
// cleans the cell, and hands it back to producers.
// NOTE(review): the retry loop, the branches on `dif`, and the return statements are
// not visible in this excerpt; the comments below describe only the visible statements.
261 template <typename Dest, typename Func>
262 bool dequeue( Dest& data, Func func )
// Snapshot of the consumer position.
265 size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
// Map the position onto a cell; m_nBufferMask is capacity - 1.
269 cell = &m_buffer[pos & m_nBufferMask];
270 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
// Signed distance from pos + 1, the value a producer stores after filling the cell at pos.
271 intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
// Try to reserve the cell by moving the consumer position from pos to pos + 1.
274 if ( m_posDequeue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed))
// Re-read the consumer position (presumably on a retry path; the enclosing branch is not shown).
280 pos = m_posDequeue.load(memory_model::memory_order_relaxed);
// Copy the stored item to the caller, then run the configured cleaner on the cell's data.
283 unref(func)( data, cell->data );
284 value_cleaner()( cell->data );
// pos + m_nBufferMask + 1 == pos + capacity: the next producer position that maps to this
// same cell, so enqueue's `seq - pos` is zero when the producers reach it.
286 cell->sequence.store( pos + m_nBufferMask + 1, memory_model::memory_order_release );
291 /// Dequeues an item from queue to \p data
292 /** @anchor cds_container_VyukovMPMCCycleQueue_dequeue
293 If queue is empty, returns \p false, \p data is unchanged.
// Forwards to the functor overload, using copy_assign (trivial_assign) to copy the item into data.
295 bool dequeue(value_type & data )
297 return dequeue( data, copy_assign() );
300 /// Synonym of \ref cds_container_VyukovMPMCCycleQueue_enqueue "enqueue"
// Forwards to enqueue(data) and returns its result.
301 bool push(value_type const& data)
303 return enqueue(data);
306 /// Synonym for template version of \ref cds_container_VyukovMPMCCycleQueue_enqueue_func "enqueue" function
// Forwards to enqueue(data, f) and returns its result.
307 template <typename Source, typename Func>
308 bool push( const Source& data, Func f )
310 return enqueue( data, f );
313 /// Synonym of \ref cds_container_VyukovMPMCCycleQueue_dequeue "dequeue"
// Forwards to dequeue(data) and returns its result.
314 bool pop(value_type& data)
316 return dequeue(data);
319 /// Synonym for template version of \ref cds_container_VyukovMPMCCycleQueue_dequeue_func "dequeue" function
// Forwards to dequeue(dest, f) and returns its result.
320 template <typename Type, typename Func>
321 bool pop( Type& dest, Func f )
323 return dequeue( dest, f );
326 /// Checks if the queue is empty
// Inspects the cell at the current dequeue position without modifying the queue:
// only loads are performed in the visible statements.
// NOTE(review): the function signature, the loop, and the branches on `dif` that
// produce the result are not visible in this excerpt.
329 const cell_type * cell;
// Snapshot of the consumer position.
330 size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
// Map the position onto a cell; m_nBufferMask is capacity - 1.
334 cell = &m_buffer[pos & m_nBufferMask];
335 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
// Same signed comparison as dequeue(): distance of the cell's sequence from pos + 1.
336 intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
// Re-read the consumer position (presumably before re-checking; the enclosing branch is not shown).
343 pos = m_posDequeue.load(memory_model::memory_order_relaxed);
354 /// Returns queue's item count
356 The value returned depends on opt::item_counter option. For atomicity::empty_item_counter,
357 this function always returns 0.
361 return m_ItemCounter.value();
364 /// Returns capacity of cyclic buffer
// Reports the capacity of the underlying buffer, the same value the constructor derives m_nBufferMask from.
365 size_t capacity() const
367 return m_buffer.capacity();
370 }} // namespace cds::container
372 #endif // #ifndef __CDS_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H