Merge branch 'integration' of github.com:khizmax/libcds into integration
[libcds.git] / cds / container / vyukov_mpmc_cycle_queue.h
1 //$$CDS-header$$
2
3 #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
4 #define CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
5
6 #include <cds/container/details/base.h>
7 #include <cds/opt/buffer.h>
8 #include <cds/opt/value_cleaner.h>
9 #include <cds/algo/atomic.h>
10 #include <cds/details/bounded_container.h>
11
12 namespace cds { namespace container {
13
14     /// VyukovMPMCCycleQueue related definitions
15     /** @ingroup cds_nonintrusive_helper
16     */
17     namespace vyukov_queue {
18
19         /// VyukovMPMCCycleQueue default traits
20         struct traits {
21             /// Buffer type for internal array
22             /*
23                 The type of element for the buffer is not important: the queue rebinds
24                 buffer for required type via \p rebind metafunction.
25
26                 For \p VyukovMPMCCycleQueue queue the buffer size should have power-of-2 size.
27             */
28             typedef cds::opt::v::dynamic_buffer< void * > buffer;
29
30             /// A functor to clean item dequeued.
31             /**
32                 The functor  calls the destructor for queue item.
33                 After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied.
34                 If \p T is a complex type, \p value_cleaner may be the useful feature.
35
36                 Default value is \ref opt::v::destruct_cleaner
37             */
38             typedef cds::opt::v::destruct_cleaner value_cleaner;
39
40             /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
41             typedef cds::atomicity::empty_item_counter item_counter;
42
43             /// C++ memory ordering model
44             /**
45                 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
46                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
47             */
48             typedef opt::v::relaxed_ordering    memory_model;
49
50             /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
51             enum { padding = opt::cache_line_padding };
52
53             /// Back-off strategy
54             typedef cds::backoff::Default           back_off;
55
56             /// Single-consumer version
57             /**
58                 For single-consumer version of algorithm some additional functions 
59                 (\p front(), \p pop_front()) is available.
60
61                 Default is \p false
62             */
63             static CDS_CONSTEXPR bool const single_consumer = false;
64         };
65
66         /// Metafunction converting option list to \p vyukov_queue::traits
67         /**
68             Supported \p Options are:
69             - \p opt::buffer - the buffer type for internal cyclic array. Possible types are:
70                 \p opt::v::dynamic_buffer (the default), \p opt::v::static_buffer. The type of
71                 element in the buffer is not important: it will be changed via \p rebind metafunction.
72             - \p opt::value_cleaner - a functor to clean item dequeued.
73                 The functor calls the destructor for queue item.
74                 After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied.
75                 If \p T is a complex type, \p value_cleaner can be an useful feature.
76                 Default value is \ref opt::v::destruct_cleaner
77             - \p opt::back_off - back-off strategy used. If the option is not specified, the \p cds::backoff::Default is used.
78             - \p opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
79                 To enable item counting use \p cds::atomicity::item_counter
80             - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
81             - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
82                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
83
84             Example: declare \p %VyukovMPMCCycleQueue with item counting and static iternal buffer of size 1024:
85             \code
86             typedef cds::container::VyukovMPMCCycleQueue< Foo,
87                 typename cds::container::vyukov_queue::make_traits<
88                     cds::opt::buffer< cds::opt::v::static_buffer< void *, 1024 >,
89                     cds::opt::item_counte< cds::atomicity::item_counter >
90                 >::type
91             > myQueue;
92             \endcode
93         */
94         template <typename... Options>
95         struct make_traits {
96 #   ifdef CDS_DOXYGEN_INVOKED
97             typedef implementation_defined type;   ///< Metafunction result
98 #   else
99             typedef typename cds::opt::make_options<
100                 typename cds::opt::find_type_traits< traits, Options... >::type
101                 , Options...
102             >::type type;
103 #   endif
104         };
105
106     } //namespace vyukov_queue
107
108     /// Vyukov's MPMC bounded queue
109     /** @ingroup cds_nonintrusive_queue
110         This algorithm is developed by Dmitry Vyukov (see http://www.1024cores.net)
111         It's multi-producer multi-consumer (MPMC), array-based, fails on overflow, does not require GC, w/o priorities, causal FIFO,
112         blocking producers and consumers queue. The algorithm is pretty simple and fast. It's not lock-free in the official meaning,
113         just implemented by means of atomic RMW operations w/o mutexes.
114
115         The cost of enqueue/dequeue is 1 CAS per operation.
116         No dynamic memory allocation/management during operation. Producers and consumers are separated from each other (as in the two-lock queue),
117         i.e. do not touch the same data while queue is not empty.
118
119         There is multiple producer/single consumer version \p cds::container::VyukovMPSCCycleQueue
120         that supports \p front() and \p pop_front() functions.
121
122         Source:
123             - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
124
125         Template parameters
126         - \p T - type stored in queue.
127         - \p Traits - queue traits, default is \p vykov_queue::traits. You can use \p vykov_queue::make_traits
128             metafunction to make your traits or just derive your traits from \p %vykov_queue::traits:
129             \code
130             struct myTraits: public cds::container::vykov_queue::traits {
131                 typedef cds::atomicity::item_counter    item_counter;
132             };
133             typedef cds::container::VyukovMPMCCycleQueue< Foo, myTraits > myQueue;
134
135             // Equivalent make_traits example:
136             typedef cds::container::VyukovMPMCCycleQueue< cds::gc::HP, Foo,
137                 typename cds::container::vykov_queue::make_traits<
138                     cds::opt::item_counter< cds::atomicity::item_counter >
139                 >::type
140             > myQueue;
141             \endcode
142
143         \par License
144             Simplified BSD license by Dmitry Vyukov (http://www.1024cores.net/site/1024cores/home/code-license)
145     */
146     template <typename T, typename Traits = vyukov_queue::traits >
147     class VyukovMPMCCycleQueue : public cds::bounded_container
148     {
149     public:
150         typedef T value_type;   ///< Value type to be stored in the queue
151         typedef Traits traits;  ///< Queue traits
152         typedef typename traits::item_counter  item_counter;  ///< Item counter type
153         typedef typename traits::memory_model  memory_model;  ///< Memory ordering. See cds::opt::memory_model option
154         typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p vyukov_queue::traits::value_cleaner
155         typedef typename traits::back_off  back_off;          ///< back-off strategy
156
157         /// \p true for single-consumer version, \p false otherwise
158         static CDS_CONSTEXPR bool const c_single_consumer = traits::single_consumer;
159
160         /// Rebind template arguments
161         template <typename T2, typename Traits2>
162         struct rebind {
163             typedef VyukovMPMCCycleQueue< T2, Traits2 > other   ;   ///< Rebinding result
164         };
165
166     protected:
167         //@cond
168         typedef atomics::atomic<size_t> sequence_type;
169         struct cell_type
170         {
171             sequence_type   sequence;
172             value_type      data;
173
174             cell_type()
175             {}
176         };
177
178         typedef typename traits::buffer::template rebind<cell_type>::other buffer;
179         //@endcond
180
181     protected:
182         //@cond
183         buffer          m_buffer;
184         size_t const    m_nBufferMask;
185         typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad1_;
186         sequence_type   m_posEnqueue;
187         typename opt::details::apply_padding< sequence_type, traits::padding >::padding_type pad2_;
188         sequence_type   m_posDequeue;
189         typename opt::details::apply_padding< sequence_type, traits::padding >::padding_type pad3_;
190         item_counter    m_ItemCounter;
191         //@endcond
192
193     public:
194         /// Constructs the queue of capacity \p nCapacity
195         /**
196             For \p cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
197
198             The buffer capacity must be the power of two.
199         */
200         VyukovMPMCCycleQueue(
201             size_t nCapacity = 0
202             )
203             : m_buffer( nCapacity )
204             , m_nBufferMask( m_buffer.capacity() - 1 )
205         {
206             nCapacity = m_buffer.capacity();
207
208             // Buffer capacity must be power of 2
209             assert( nCapacity >= 2 && (nCapacity & (nCapacity - 1)) == 0 );
210
211             for (size_t i = 0; i != nCapacity; ++i )
212                 m_buffer[i].sequence.store(i, memory_model::memory_order_relaxed);
213
214             m_posEnqueue.store(0, memory_model::memory_order_relaxed);
215             m_posDequeue.store(0, memory_model::memory_order_relaxed);
216         }
217
218         ~VyukovMPMCCycleQueue()
219         {
220             clear();
221         }
222
223         /// Enqueues data to the queue using a functor
224         /**
225             \p Func is a functor called to copy a value to the queue cell.
226             The functor \p f takes one argument - a reference to a empty cell of type \ref value_type :
227             \code
228             cds::container::VyukovMPMCCycleQueue< Foo > myQueue;
229             Bar bar;
230             myQueue.enqueue_with( [&bar]( Foo& dest ) { dest = std::move(bar); } );
231             \endcode
232         */
233         template <typename Func>
234         bool enqueue_with(Func f)
235         {
236             cell_type* cell;
237             back_off bkoff;
238
239             size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
240             for (;;)
241             {
242                 cell = &m_buffer[pos & m_nBufferMask];
243                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
244
245                 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos);
246
247                 if (dif == 0) {
248                     if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed ))
249                         break;
250                 }
251                 else if (dif < 0) {
252                     // Queue full?
253                     if ( pos - m_posDequeue.load( memory_model::memory_order_relaxed ) == capacity() )
254                         return false;   // queue full
255                     bkoff();
256                     pos = m_posEnqueue.load( memory_model::memory_order_relaxed );
257                 }
258                 else
259                     pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
260             }
261
262             f( cell->data );
263
264             cell->sequence.store(pos + 1, memory_model::memory_order_release);
265             ++m_ItemCounter;
266
267             return true;
268         }
269
270         /// Enqueues \p val value into the queue.
271         /**
272             The new queue item is created by calling placement new in free cell.
273             Returns \p true if success, \p false if the queue is full.
274         */
275         bool enqueue( value_type const& val )
276         {
277             return enqueue_with( [&val]( value_type& dest ){ new ( &dest ) value_type( val ); });
278         }
279
280         /// Synonym for \p enqueue()
281         bool push( value_type const& data )
282         {
283             return enqueue( data );
284         }
285
286         /// Synonym for \p enqueue_with()
287         template <typename Func>
288         bool push_with( Func f )
289         {
290             return enqueue_with( f );
291         }
292
293         /// Enqueues data of type \ref value_type constructed with <tt>std::forward<Args>(args)...</tt>
294         template <typename... Args>
295         bool emplace( Args&&... args )
296         {
297 #if (CDS_COMPILER == CDS_COMPILER_GCC) && (CDS_COMPILER_VERSION < 40900)
298             //work around unsupported feature in g++ 4.8 for forwarding parameter packs to lambda.
299             return enqueue_with ( std::bind([]( value_type& dest,Args ... args ){ new ( &dest ) value_type( std::forward<Args>(args)... );}, std::placeholders::_1 ,args...));
300 #else
301             return enqueue_with( [&args ...]( value_type& dest ){ new ( &dest ) value_type( std::forward<Args>(args)... ); });
302 #endif
303         }
304
305         /// Dequeues a value using a functor
306         /**
307             \p Func is a functor called to copy dequeued value.
308             The functor takes one argument - a reference to removed node:
309             \code
310             cds:container::VyukovMPMCCycleQueue< Foo > myQueue;
311             Bar bar;
312             myQueue.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
313             \endcode
314             The functor is called only if the queue is not empty.
315         */
316         template <typename Func>
317         bool dequeue_with( Func f )
318         {
319             cell_type * cell;
320             back_off bkoff;
321
322             size_t pos = m_posDequeue.load( memory_model::memory_order_relaxed );
323             for (;;)
324             {
325                 cell = &m_buffer[pos & m_nBufferMask];
326                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
327                 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
328
329                 if (dif == 0) {
330                     if ( m_posDequeue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed))
331                         break;
332                 }
333                 else if (dif < 0) {
334                     // Queue empty?
335                     if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
336                         return false;   // queue empty
337                     bkoff();
338                     pos = m_posDequeue.load( memory_model::memory_order_relaxed );
339                 }
340                 else
341                     pos = m_posDequeue.load(memory_model::memory_order_relaxed);
342             }
343
344             f( cell->data );
345             value_cleaner()( cell->data );
346             cell->sequence.store( pos + m_nBufferMask + 1, memory_model::memory_order_release );
347             --m_ItemCounter;
348
349             return true;
350         }
351
352         /// Dequeues a value from the queue
353         /**
354             If queue is not empty, the function returns \p true, \p dest contains copy of
355             dequeued value. The assignment operator for type \ref value_type is invoked.
356             If queue is empty, the function returns \p false, \p dest is unchanged.
357         */
358         bool dequeue(value_type & dest )
359         {
360             return dequeue_with( [&dest]( value_type& src ){ dest = src; } );
361         }
362
363         /// Synonym for \p dequeue()
364         bool pop(value_type& data)
365         {
366             return dequeue(data);
367         }
368
369         /// Synonym for \p dequeue_with()
370         template <typename Func>
371         bool pop_with( Func f )
372         {
373             return dequeue_with( f );
374         }
375
376         /// Returns a pointer to top element of the queue or \p nullptr if queue is empty (only for single-consumer version)
377         template <bool SC = c_single_consumer >
378         typename std::enable_if<SC, value_type *>::type front()
379         {
380             static_assert( c_single_consumer, "front() is enabled only if traits::single_consumer is true");
381
382             cell_type * cell;
383             back_off bkoff;
384
385             size_t pos = m_posDequeue.load( memory_model::memory_order_relaxed );
386             for ( ;;)
387             {
388                 cell = &m_buffer[pos & m_nBufferMask];
389                 size_t seq = cell->sequence.load( memory_model::memory_order_acquire );
390                 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
391
392                 if ( dif == 0 )
393                     return &cell->data;
394                 else if ( dif < 0 ) {
395                     // Queue empty?
396                     if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
397                         return nullptr;   // queue empty
398                     bkoff();
399                     pos = m_posDequeue.load( memory_model::memory_order_relaxed );
400                 }
401                 else
402                     pos = m_posDequeue.load( memory_model::memory_order_relaxed );
403             }
404         }
405
406         /// Pops top element; returns \p true if queue is not empty, \p false otherwise (only for single-consumer version)
407         template <bool SC = c_single_consumer >
408         typename std::enable_if<SC, bool>::type pop_front()
409         {
410             return dequeue_with( []( value_type& ) {} );
411         }
412
413         /// Checks if the queue is empty
414         bool empty() const
415         {
416             const cell_type * cell;
417             back_off bkoff;
418
419             size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
420             for (;;)
421             {
422                 cell = &m_buffer[pos & m_nBufferMask];
423                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
424                 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
425
426                 if (dif == 0)
427                     return false;
428                 else if (dif < 0) {
429                     if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
430                         return true;
431                 }
432                 bkoff();
433                 pos = m_posDequeue.load(memory_model::memory_order_relaxed);
434             }
435         }
436
437         /// Clears the queue
438         void clear()
439         {
440             value_type v;
441             while ( pop(v) );
442         }
443
444         /// Returns queue's item count
445         /**
446             The value returned depends on \p vyukov_queue::traits::item_counter option.
447             For \p atomicity::empty_item_counter, the function always returns 0.
448         */
449         size_t size() const
450         {
451             return m_ItemCounter.value();
452         }
453
454         /// Returns capacity of the queue
455         size_t capacity() const
456         {
457             return m_buffer.capacity();
458         }
459     };
460
461     //@cond
462     namespace vyukov_queue { 
463         template <typename Traits>
464         struct single_consumer_traits : public Traits
465         {
466             static CDS_CONSTEXPR bool const single_consumer = true;
467         };
468     } // namespace vyukov_queue
469     //@endcond
470
471     /// Vyukov's queue multiple producer - single consumer version 
472     template <typename T, typename Traits = vyukov_queue::traits >
473     using VyukovMPSCCycleQueue = VyukovMPMCCycleQueue< T, vyukov_queue::single_consumer_traits<Traits> >;
474
475 }}  // namespace cds::container
476
477 #endif // #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H