Text formatting, docfix
[libcds.git] / cds / container / vyukov_mpmc_cycle_queue.h
1 //$$CDS-header$$
2
3 #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
4 #define CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
5
6 #include <cds/container/details/base.h>
7 #include <cds/opt/buffer.h>
8 #include <cds/opt/value_cleaner.h>
9 #include <cds/algo/atomic.h>
10 #include <cds/details/bounded_container.h>
11
12 namespace cds { namespace container {
13
14     /// VyukovMPMCCycleQueue related definitions
15     /** @ingroup cds_nonintrusive_helper
16     */
17     namespace vyukov_queue {
18
19         /// VyukovMPMCCycleQueue default traits
20         struct traits {
21             /// Buffer type for internal array
22             /*
23                 The type of element for the buffer is not important: the queue rebinds
24                 buffer for required type via \p rebind metafunction.
25
26                 For \p VyukovMPMCCycleQueue queue the buffer size should have power-of-2 size.
27             */
28             typedef cds::opt::v::dynamic_buffer< void * > buffer;
29
30             /// A functor to clean item dequeued.
31             /**
32                 The functor  calls the destructor for queue item.
33                 After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied.
34                 If \p T is a complex type, \p value_cleaner may be the useful feature.
35
36                 Default value is \ref opt::v::destruct_cleaner
37             */
38             typedef cds::opt::v::destruct_cleaner value_cleaner;
39
40             /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
41             typedef cds::atomicity::empty_item_counter item_counter;
42
43             /// C++ memory ordering model
44             /**
45                 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
46                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
47             */
48             typedef opt::v::relaxed_ordering    memory_model;
49
50             /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
51             enum { padding = opt::cache_line_padding };
52
53             /// Back-off strategy
54             typedef cds::backoff::Default           back_off;
55
56             /// Single-consumer version
57             /**
58                 For single-consumer version of algorithm some additional functions
59
60                 (\p front(), \p pop_front()) is available.
61
62                 Default is \p false
63             */
64             static CDS_CONSTEXPR bool const single_consumer = false;
65         };
66
67         /// Metafunction converting option list to \p vyukov_queue::traits
68         /**
69             Supported \p Options are:
70             - \p opt::buffer - the buffer type for internal cyclic array. Possible types are:
71                 \p opt::v::dynamic_buffer (the default), \p opt::v::static_buffer. The type of
72                 element in the buffer is not important: it will be changed via \p rebind metafunction.
73             - \p opt::value_cleaner - a functor to clean item dequeued.
74                 The functor calls the destructor for queue item.
75                 After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied.
76                 If \p T is a complex type, \p value_cleaner can be an useful feature.
77                 Default value is \ref opt::v::destruct_cleaner
78             - \p opt::back_off - back-off strategy used. If the option is not specified, the \p cds::backoff::Default is used.
79             - \p opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
80                 To enable item counting use \p cds::atomicity::item_counter
81             - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
82             - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
83                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
84
85             Example: declare \p %VyukovMPMCCycleQueue with item counting and static iternal buffer of size 1024:
86             \code
87             typedef cds::container::VyukovMPMCCycleQueue< Foo,
88                 typename cds::container::vyukov_queue::make_traits<
89                     cds::opt::buffer< cds::opt::v::static_buffer< void *, 1024 >,
90                     cds::opt::item_counte< cds::atomicity::item_counter >
91                 >::type
92             > myQueue;
93             \endcode
94         */
95         template <typename... Options>
96         struct make_traits {
97 #   ifdef CDS_DOXYGEN_INVOKED
98             typedef implementation_defined type;   ///< Metafunction result
99 #   else
100             typedef typename cds::opt::make_options<
101                 typename cds::opt::find_type_traits< traits, Options... >::type
102                 , Options...
103             >::type type;
104 #   endif
105         };
106
107     } //namespace vyukov_queue
108
109     /// Vyukov's MPMC bounded queue
110     /** @ingroup cds_nonintrusive_queue
111         This algorithm is developed by Dmitry Vyukov (see http://www.1024cores.net)
112         It's multi-producer multi-consumer (MPMC), array-based, fails on overflow, does not require GC, w/o priorities, causal FIFO,
113         blocking producers and consumers queue. The algorithm is pretty simple and fast. It's not lock-free in the official meaning,
114         just implemented by means of atomic RMW operations w/o mutexes.
115
116         The cost of enqueue/dequeue is 1 CAS per operation.
117         No dynamic memory allocation/management during operation. Producers and consumers are separated from each other (as in the two-lock queue),
118         i.e. do not touch the same data while queue is not empty.
119
120         There is multiple producer/single consumer version \p cds::container::VyukovMPSCCycleQueue
121         that supports \p front() and \p pop_front() functions.
122
123         Source:
124             - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
125
126         Template parameters
127         - \p T - type stored in queue.
128         - \p Traits - queue traits, default is \p vykov_queue::traits. You can use \p vykov_queue::make_traits
129             metafunction to make your traits or just derive your traits from \p %vykov_queue::traits:
130             \code
131             struct myTraits: public cds::container::vykov_queue::traits {
132                 typedef cds::atomicity::item_counter    item_counter;
133             };
134             typedef cds::container::VyukovMPMCCycleQueue< Foo, myTraits > myQueue;
135
136             // Equivalent make_traits example:
137             typedef cds::container::VyukovMPMCCycleQueue< cds::gc::HP, Foo,
138                 typename cds::container::vykov_queue::make_traits<
139                     cds::opt::item_counter< cds::atomicity::item_counter >
140                 >::type
141             > myQueue;
142             \endcode
143
144         \par License
145             Simplified BSD license by Dmitry Vyukov (http://www.1024cores.net/site/1024cores/home/code-license)
146     */
147     template <typename T, typename Traits = vyukov_queue::traits >
148     class VyukovMPMCCycleQueue : public cds::bounded_container
149     {
150     public:
151         typedef T value_type;   ///< Value type to be stored in the queue
152         typedef Traits traits;  ///< Queue traits
153         typedef typename traits::item_counter  item_counter;  ///< Item counter type
154         typedef typename traits::memory_model  memory_model;  ///< Memory ordering. See cds::opt::memory_model option
155         typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p vyukov_queue::traits::value_cleaner
156         typedef typename traits::back_off  back_off;          ///< back-off strategy
157
158         /// \p true for single-consumer version, \p false otherwise
159         static CDS_CONSTEXPR bool const c_single_consumer = traits::single_consumer;
160
161         /// Rebind template arguments
162         template <typename T2, typename Traits2>
163         struct rebind {
164             typedef VyukovMPMCCycleQueue< T2, Traits2 > other   ;   ///< Rebinding result
165         };
166
167     protected:
168         //@cond
169         typedef atomics::atomic<size_t> sequence_type;
170         struct cell_type
171         {
172             sequence_type   sequence;
173             value_type      data;
174
175             cell_type()
176             {}
177         };
178
179         typedef typename traits::buffer::template rebind<cell_type>::other buffer;
180         //@endcond
181
182     protected:
183         //@cond
184         buffer          m_buffer;
185         size_t const    m_nBufferMask;
186         typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad1_;
187         sequence_type   m_posEnqueue;
188         typename opt::details::apply_padding< sequence_type, traits::padding >::padding_type pad2_;
189         sequence_type   m_posDequeue;
190         typename opt::details::apply_padding< sequence_type, traits::padding >::padding_type pad3_;
191         item_counter    m_ItemCounter;
192         //@endcond
193
194     public:
195         /// Constructs the queue of capacity \p nCapacity
196         /**
197             For \p cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
198
199             The buffer capacity must be the power of two.
200         */
201         VyukovMPMCCycleQueue(
202             size_t nCapacity = 0
203             )
204             : m_buffer( nCapacity )
205             , m_nBufferMask( m_buffer.capacity() - 1 )
206         {
207             nCapacity = m_buffer.capacity();
208
209             // Buffer capacity must be power of 2
210             assert( nCapacity >= 2 && (nCapacity & (nCapacity - 1)) == 0 );
211
212             for (size_t i = 0; i != nCapacity; ++i )
213                 m_buffer[i].sequence.store(i, memory_model::memory_order_relaxed);
214
215             m_posEnqueue.store(0, memory_model::memory_order_relaxed);
216             m_posDequeue.store(0, memory_model::memory_order_relaxed);
217         }
218
219         ~VyukovMPMCCycleQueue()
220         {
221             clear();
222         }
223
224         /// Enqueues data to the queue using a functor
225         /**
226             \p Func is a functor called to copy a value to the queue cell.
227             The functor \p f takes one argument - a reference to a empty cell of type \ref value_type :
228             \code
229             cds::container::VyukovMPMCCycleQueue< Foo > myQueue;
230             Bar bar;
231             myQueue.enqueue_with( [&bar]( Foo& dest ) { dest = std::move(bar); } );
232             \endcode
233         */
234         template <typename Func>
235         bool enqueue_with(Func f)
236         {
237             cell_type* cell;
238             back_off bkoff;
239
240             size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
241             for (;;)
242             {
243                 cell = &m_buffer[pos & m_nBufferMask];
244                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
245
246                 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos);
247
248                 if (dif == 0) {
249                     if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed ))
250                         break;
251                 }
252                 else if (dif < 0) {
253                     // Queue full?
254                     if ( pos - m_posDequeue.load( memory_model::memory_order_relaxed ) == capacity() )
255                         return false;   // queue full
256                     bkoff();
257                     pos = m_posEnqueue.load( memory_model::memory_order_relaxed );
258                 }
259                 else
260                     pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
261             }
262
263             f( cell->data );
264
265             cell->sequence.store(pos + 1, memory_model::memory_order_release);
266             ++m_ItemCounter;
267
268             return true;
269         }
270
271         /// Enqueues \p val value into the queue.
272         /**
273             The new queue item is created by calling placement new in free cell.
274             Returns \p true if success, \p false if the queue is full.
275         */
276         bool enqueue( value_type const& val )
277         {
278             return enqueue_with( [&val]( value_type& dest ){ new ( &dest ) value_type( val ); });
279         }
280
281         /// Synonym for \p enqueue()
282         bool push( value_type const& data )
283         {
284             return enqueue( data );
285         }
286
287         /// Synonym for \p enqueue_with()
288         template <typename Func>
289         bool push_with( Func f )
290         {
291             return enqueue_with( f );
292         }
293
294         /// Enqueues data of type \ref value_type constructed with <tt>std::forward<Args>(args)...</tt>
295         template <typename... Args>
296         bool emplace( Args&&... args )
297         {
298 #if (CDS_COMPILER == CDS_COMPILER_GCC) && (CDS_COMPILER_VERSION < 40900)
299             //work around unsupported feature in g++ 4.8 for forwarding parameter packs to lambda.
300             return enqueue_with ( std::bind([]( value_type& dest,Args ... args ){ new ( &dest ) value_type( std::forward<Args>(args)... );}, std::placeholders::_1 ,args...));
301 #else
302             return enqueue_with( [&args ...]( value_type& dest ){ new ( &dest ) value_type( std::forward<Args>(args)... ); });
303 #endif
304         }
305
306         /// Dequeues a value using a functor
307         /**
308             \p Func is a functor called to copy dequeued value.
309             The functor takes one argument - a reference to removed node:
310             \code
311             cds:container::VyukovMPMCCycleQueue< Foo > myQueue;
312             Bar bar;
313             myQueue.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
314             \endcode
315             The functor is called only if the queue is not empty.
316         */
317         template <typename Func>
318         bool dequeue_with( Func f )
319         {
320             cell_type * cell;
321             back_off bkoff;
322
323             size_t pos = m_posDequeue.load( memory_model::memory_order_relaxed );
324             for (;;)
325             {
326                 cell = &m_buffer[pos & m_nBufferMask];
327                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
328                 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
329
330                 if (dif == 0) {
331                     if ( m_posDequeue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed))
332                         break;
333                 }
334                 else if (dif < 0) {
335                     // Queue empty?
336                     if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
337                         return false;   // queue empty
338                     bkoff();
339                     pos = m_posDequeue.load( memory_model::memory_order_relaxed );
340                 }
341                 else
342                     pos = m_posDequeue.load(memory_model::memory_order_relaxed);
343             }
344
345             f( cell->data );
346             value_cleaner()( cell->data );
347             cell->sequence.store( pos + m_nBufferMask + 1, memory_model::memory_order_release );
348             --m_ItemCounter;
349
350             return true;
351         }
352
353         /// Dequeues a value from the queue
354         /**
355             If queue is not empty, the function returns \p true, \p dest contains copy of
356             dequeued value. The assignment operator for type \ref value_type is invoked.
357             If queue is empty, the function returns \p false, \p dest is unchanged.
358         */
359         bool dequeue(value_type & dest )
360         {
361             return dequeue_with( [&dest]( value_type& src ){ dest = src; } );
362         }
363
364         /// Synonym for \p dequeue()
365         bool pop(value_type& data)
366         {
367             return dequeue(data);
368         }
369
370         /// Synonym for \p dequeue_with()
371         template <typename Func>
372         bool pop_with( Func f )
373         {
374             return dequeue_with( f );
375         }
376
377         /// Returns a pointer to top element of the queue or \p nullptr if queue is empty (only for single-consumer version)
378         template <bool SC = c_single_consumer >
379         typename std::enable_if<SC, value_type *>::type front()
380         {
381             static_assert( c_single_consumer, "front() is enabled only if traits::single_consumer is true");
382
383             cell_type * cell;
384             back_off bkoff;
385
386             size_t pos = m_posDequeue.load( memory_model::memory_order_relaxed );
387             for ( ;;)
388             {
389                 cell = &m_buffer[pos & m_nBufferMask];
390                 size_t seq = cell->sequence.load( memory_model::memory_order_acquire );
391                 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
392
393                 if ( dif == 0 )
394                     return &cell->data;
395                 else if ( dif < 0 ) {
396                     // Queue empty?
397                     if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
398                         return nullptr;   // queue empty
399                     bkoff();
400                     pos = m_posDequeue.load( memory_model::memory_order_relaxed );
401                 }
402                 else
403                     pos = m_posDequeue.load( memory_model::memory_order_relaxed );
404             }
405         }
406
407         /// Pops top element; returns \p true if queue is not empty, \p false otherwise (only for single-consumer version)
408         template <bool SC = c_single_consumer >
409         typename std::enable_if<SC, bool>::type pop_front()
410         {
411             return dequeue_with( []( value_type& ) {} );
412         }
413
414         /// Checks if the queue is empty
415         bool empty() const
416         {
417             const cell_type * cell;
418             back_off bkoff;
419
420             size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
421             for (;;)
422             {
423                 cell = &m_buffer[pos & m_nBufferMask];
424                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
425                 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
426
427                 if (dif == 0)
428                     return false;
429                 else if (dif < 0) {
430                     if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
431                         return true;
432                 }
433                 bkoff();
434                 pos = m_posDequeue.load(memory_model::memory_order_relaxed);
435             }
436         }
437
438         /// Clears the queue
439         void clear()
440         {
441             value_type v;
442             while ( pop(v) );
443         }
444
445         /// Returns queue's item count
446         /**
447             The value returned depends on \p vyukov_queue::traits::item_counter option.
448             For \p atomicity::empty_item_counter, the function always returns 0.
449         */
450         size_t size() const
451         {
452             return m_ItemCounter.value();
453         }
454
455         /// Returns capacity of the queue
456         size_t capacity() const
457         {
458             return m_buffer.capacity();
459         }
460     };
461
462     //@cond
463     namespace vyukov_queue {
464
465         template <typename Traits>
466         struct single_consumer_traits : public Traits
467         {
468             static CDS_CONSTEXPR bool const single_consumer = true;
469         };
470     } // namespace vyukov_queue
471     //@endcond
472
473     /// Vyukov's queue multiple producer - single consumer version
474
475     template <typename T, typename Traits = vyukov_queue::traits >
476     using VyukovMPSCCycleQueue = VyukovMPMCCycleQueue< T, vyukov_queue::single_consumer_traits<Traits> >;
477
478 }}  // namespace cds::container
479
480 #endif // #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H