Make Vyukov queue linearizable
[libcds.git] / cds / container / vyukov_mpmc_cycle_queue.h
1 //$$CDS-header$$
2
3 #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
4 #define CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
5
6 #include <cds/container/details/base.h>
7 #include <cds/opt/buffer.h>
8 #include <cds/opt/value_cleaner.h>
9 #include <cds/algo/atomic.h>
10 #include <cds/details/bounded_container.h>
11
12 namespace cds { namespace container {
13
14     /// VyukovMPMCCycleQueue related definitions
15     /** @ingroup cds_nonintrusive_helper
16     */
17     namespace vyukov_queue {
18
19         /// VyukovMPMCCycleQueue default traits
20         struct traits {
21             /// Buffer type for internal array
22             /*
23                 The type of element for the buffer is not important: the queue rebinds
24                 buffer for required type via \p rebind metafunction.
25
26                 For \p VyukovMPMCCycleQueue queue the buffer size should have power-of-2 size.
27             */
28             typedef cds::opt::v::dynamic_buffer< void * > buffer;
29
30             /// A functor to clean item dequeued.
31             /**
32                 The functor  calls the destructor for queue item.
33                 After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied.
34                 If \p T is a complex type, \p value_cleaner may be the useful feature.
35
36                 Default value is \ref opt::v::destruct_cleaner
37             */
38             typedef cds::opt::v::destruct_cleaner value_cleaner;
39
40             /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
41             typedef cds::atomicity::empty_item_counter item_counter;
42
43             /// C++ memory ordering model
44             /**
45                 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
46                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
47             */
48             typedef opt::v::relaxed_ordering    memory_model;
49
50             /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
51             enum { padding = opt::cache_line_padding };
52
53             /// Back-off strategy
54             typedef cds::backoff::Default           back_off;
55         };
56
57         /// Metafunction converting option list to \p vyukov_queue::traits
58         /**
59             Supported \p Options are:
60             - \p opt::buffer - the buffer type for internal cyclic array. Possible types are:
61                 \p opt::v::dynamic_buffer (the default), \p opt::v::static_buffer. The type of
62                 element in the buffer is not important: it will be changed via \p rebind metafunction.
63             - \p opt::value_cleaner - a functor to clean item dequeued.
64                 The functor calls the destructor for queue item.
65                 After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied.
66                 If \p T is a complex type, \p value_cleaner can be an useful feature.
67                 Default value is \ref opt::v::destruct_cleaner
68             - \p opt::back_off - back-off strategy used. If the option is not specified, the \p cds::backoff::Default is used.
69             - \p opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
70                 To enable item counting use \p cds::atomicity::item_counter
71             - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
72             - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
73                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
74
75             Example: declare \p %VyukovMPMCCycleQueue with item counting and static iternal buffer of size 1024:
76             \code
77             typedef cds::container::VyukovMPMCCycleQueue< Foo,
78                 typename cds::container::vyukov_queue::make_traits<
79                     cds::opt::buffer< cds::opt::v::static_buffer< void *, 1024 >,
80                     cds::opt::item_counte< cds::atomicity::item_counter >
81                 >::type
82             > myQueue;
83             \endcode
84         */
85         template <typename... Options>
86         struct make_traits {
87 #   ifdef CDS_DOXYGEN_INVOKED
88             typedef implementation_defined type;   ///< Metafunction result
89 #   else
90             typedef typename cds::opt::make_options<
91                 typename cds::opt::find_type_traits< traits, Options... >::type
92                 , Options...
93             >::type type;
94 #   endif
95         };
96
97     } //namespace vyukov_queue
98
99     /// Vyukov's MPMC bounded queue
100     /** @ingroup cds_nonintrusive_queue
101         This algorithm is developed by Dmitry Vyukov (see http://www.1024cores.net)
102         It's multi-producer multi-consumer (MPMC), array-based, fails on overflow, does not require GC, w/o priorities, causal FIFO,
103         blocking producers and consumers queue. The algorithm is pretty simple and fast. It's not lock-free in the official meaning,
104         just implemented by means of atomic RMW operations w/o mutexes.
105
106         The cost of enqueue/dequeue is 1 CAS per operation.
107         No dynamic memory allocation/management during operation. Producers and consumers are separated from each other (as in the two-lock queue),
108         i.e. do not touch the same data while queue is not empty.
109
110         Source:
111             - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
112
113         Template parameters
114         - \p T - type stored in queue.
115         - \p Traits - queue traits, default is \p vykov_queue::traits. You can use \p vykov_queue::make_traits
116             metafunction to make your traits or just derive your traits from \p %vykov_queue::traits:
117             \code
118             struct myTraits: public cds::container::vykov_queue::traits {
119                 typedef cds::atomicity::item_counter    item_counter;
120             };
121             typedef cds::container::VyukovMPMCCycleQueue< Foo, myTraits > myQueue;
122
123             // Equivalent make_traits example:
124             typedef cds::container::VyukovMPMCCycleQueue< cds::gc::HP, Foo,
125                 typename cds::container::vykov_queue::make_traits<
126                     cds::opt::item_counter< cds::atomicity::item_counter >
127                 >::type
128             > myQueue;
129             \endcode
130
131         \par License
132             Simplified BSD license by Dmitry Vyukov (http://www.1024cores.net/site/1024cores/home/code-license)
133     */
134     template <typename T, typename Traits = vyukov_queue::traits >
135     class VyukovMPMCCycleQueue : public cds::bounded_container
136     {
137     public:
138         typedef T value_type;   ///< Value type to be stored in the queue
139         typedef Traits traits;  ///< Queue traits
140         typedef typename traits::item_counter  item_counter;  ///< Item counter type
141         typedef typename traits::memory_model  memory_model;  ///< Memory ordering. See cds::opt::memory_model option
142         typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p vyukov_queue::traits::value_cleaner
143         typedef typename traits::back_off  back_off;          ///< back-off strategy
144
145         /// Rebind template arguments
146         template <typename T2, typename Traits2>
147         struct rebind {
148             typedef VyukovMPMCCycleQueue< T2, Traits2 > other   ;   ///< Rebinding result
149         };
150
151     protected:
152         //@cond
153         typedef atomics::atomic<size_t> sequence_type;
154         struct cell_type
155         {
156             sequence_type   sequence;
157             value_type      data;
158
159             cell_type()
160             {}
161         };
162
163         typedef typename traits::buffer::template rebind<cell_type>::other buffer;
164         //@endcond
165
166     protected:
167         //@cond
168         buffer          m_buffer;
169         size_t const    m_nBufferMask;
170         typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad1_;
171         sequence_type   m_posEnqueue;
172         typename opt::details::apply_padding< sequence_type, traits::padding >::padding_type pad2_;
173         sequence_type   m_posDequeue;
174         typename opt::details::apply_padding< sequence_type, traits::padding >::padding_type pad3_;
175         item_counter    m_ItemCounter;
176         //@endcond
177
178     public:
179         /// Constructs the queue of capacity \p nCapacity
180         /**
181             For \p cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
182
183             The buffer capacity must be the power of two.
184         */
185         VyukovMPMCCycleQueue(
186             size_t nCapacity = 0
187             )
188             : m_buffer( nCapacity )
189             , m_nBufferMask( m_buffer.capacity() - 1 )
190         {
191             nCapacity = m_buffer.capacity();
192
193             // Buffer capacity must be power of 2
194             assert( nCapacity >= 2 && (nCapacity & (nCapacity - 1)) == 0 );
195
196             for (size_t i = 0; i != nCapacity; ++i )
197                 m_buffer[i].sequence.store(i, memory_model::memory_order_relaxed);
198
199             m_posEnqueue.store(0, memory_model::memory_order_relaxed);
200             m_posDequeue.store(0, memory_model::memory_order_relaxed);
201         }
202
203         ~VyukovMPMCCycleQueue()
204         {
205             clear();
206         }
207
208         /// Enqueues data to the queue using a functor
209         /**
210             \p Func is a functor called to copy a value to the queue cell.
211             The functor \p f takes one argument - a reference to a empty cell of type \ref value_type :
212             \code
213             cds::container::VyukovMPMCCycleQueue< Foo > myQueue;
214             Bar bar;
215             myQueue.enqueue_with( [&bar]( Foo& dest ) { dest = std::move(bar); } );
216             \endcode
217         */
218         template <typename Func>
219         bool enqueue_with(Func f)
220         {
221             cell_type* cell;
222             back_off bkoff;
223
224             size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
225             for (;;)
226             {
227                 cell = &m_buffer[pos & m_nBufferMask];
228                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
229
230                 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos);
231
232                 if (dif == 0) {
233                     if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed ))
234                         break;
235                 }
236                 else if (dif < 0) {
237                     // Queue full?
238                     if ( pos - m_posDequeue.load( memory_model::memory_order_relaxed ) == capacity() )
239                         return false;   // queue full
240                     bkoff();
241                     pos = m_posEnqueue.load( memory_model::memory_order_relaxed );
242                 }
243                 else
244                     pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
245             }
246
247             f( cell->data );
248
249             cell->sequence.store(pos + 1, memory_model::memory_order_release);
250             ++m_ItemCounter;
251
252             return true;
253         }
254
255         /// Enqueues \p val value into the queue.
256         /**
257             The new queue item is created by calling placement new in free cell.
258             Returns \p true if success, \p false if the queue is full.
259         */
260         bool enqueue( value_type const& val )
261         {
262             return enqueue_with( [&val]( value_type& dest ){ new ( &dest ) value_type( val ); });
263         }
264
265         /// Synonym for \p enqueue()
266         bool push( value_type const& data )
267         {
268             return enqueue( data );
269         }
270
271         /// Synonym for \p enqueue_with()
272         template <typename Func>
273         bool push_with( Func f )
274         {
275             return enqueue_with( f );
276         }
277
278         /// Enqueues data of type \ref value_type constructed with <tt>std::forward<Args>(args)...</tt>
279         template <typename... Args>
280         bool emplace( Args&&... args )
281         {
282 #if (CDS_COMPILER == CDS_COMPILER_GCC) && (CDS_COMPILER_VERSION < 40900)
283             //work around unsupported feature in g++ 4.8 for forwarding parameter packs to lambda.
284             return enqueue_with ( std::bind([]( value_type& dest,Args ... args ){ new ( &dest ) value_type( std::forward<Args>(args)... );}, std::placeholders::_1 ,args...));
285 #else
286             return enqueue_with( [&args ...]( value_type& dest ){ new ( &dest ) value_type( std::forward<Args>(args)... ); });
287 #endif
288         }
289
290         /// Dequeues a value using a functor
291         /**
292             \p Func is a functor called to copy dequeued value.
293             The functor takes one argument - a reference to removed node:
294             \code
295             cds:container::VyukovMPMCCycleQueue< Foo > myQueue;
296             Bar bar;
297             myQueue.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
298             \endcode
299             The functor is called only if the queue is not empty.
300         */
301         template <typename Func>
302         bool dequeue_with( Func f )
303         {
304             cell_type * cell;
305             back_off bkoff;
306
307             size_t pos = m_posDequeue.load( memory_model::memory_order_relaxed );
308             for (;;)
309             {
310                 cell = &m_buffer[pos & m_nBufferMask];
311                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
312                 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
313
314                 if (dif == 0) {
315                     if ( m_posDequeue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed))
316                         break;
317                 }
318                 else if (dif < 0) {
319                     // Queue empty?
320                     if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
321                         return false;   // queue empty
322                     bkoff();
323                     pos = m_posDequeue.load( memory_model::memory_order_relaxed );
324                 }
325                 else
326                     pos = m_posDequeue.load(memory_model::memory_order_relaxed);
327             }
328
329             f( cell->data );
330             value_cleaner()( cell->data );
331             cell->sequence.store( pos + m_nBufferMask + 1, memory_model::memory_order_release );
332             --m_ItemCounter;
333
334             return true;
335         }
336
337         /// Dequeues a value from the queue
338         /**
339             If queue is not empty, the function returns \p true, \p dest contains copy of
340             dequeued value. The assignment operator for type \ref value_type is invoked.
341             If queue is empty, the function returns \p false, \p dest is unchanged.
342         */
343         bool dequeue(value_type & dest )
344         {
345             return dequeue_with( [&dest]( value_type& src ){ dest = src; } );
346         }
347
348         /// Synonym for \p dequeue()
349         bool pop(value_type& data)
350         {
351             return dequeue(data);
352         }
353
354         /// Synonym for \p dequeue_with()
355         template <typename Func>
356         bool pop_with( Func f )
357         {
358             return dequeue_with( f );
359         }
360
361         /// Checks if the queue is empty
362         bool empty() const
363         {
364             const cell_type * cell;
365             back_off bkoff;
366
367             size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
368             for (;;)
369             {
370                 cell = &m_buffer[pos & m_nBufferMask];
371                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
372                 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
373
374                 if (dif == 0)
375                     return false;
376                 else if (dif < 0) {
377                     if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
378                         return true;
379                 }
380                 bkoff();
381                 pos = m_posDequeue.load(memory_model::memory_order_relaxed);
382             }
383         }
384
385         /// Clears the queue
386         void clear()
387         {
388             value_type v;
389             while ( pop(v) );
390         }
391
392         /// Returns queue's item count
393         /**
394             The value returned depends on \p vyukov_queue::traits::item_counter option.
395             For \p atomicity::empty_item_counter, the function always returns 0.
396         */
397         size_t size() const
398         {
399             return m_ItemCounter.value();
400         }
401
402         /// Returns capacity of the queue
403         size_t capacity() const
404         {
405             return m_buffer.capacity();
406         }
407     };
408 }}  // namespace cds::container
409
410 #endif // #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H