-//$$CDS-header$$
+/*
+ This file is a part of libcds - Concurrent Data Structures library
-#ifndef __CDS_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
-#define __CDS_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
+ (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
+
+ Source code repo: http://github.com/khizmax/libcds/
+ Download: http://sourceforge.net/projects/libcds/files/
+
+ Redistribution and use in source and binary forms, with or without
+ modification, are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+
+ * Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+ FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+ OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+#ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
+#define CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
#include <cds/container/details/base.h>
#include <cds/opt/buffer.h>
/// Buffer type for internal array
/*
The type of element for the buffer is not important: the queue rebinds
- buffer for required type via \p rebind metafunction.
+ the buffer for required type via \p rebind metafunction.
For \p VyukovMPMCCycleQueue queue the buffer size should have power-of-2 size.
+
+ You should use only uninitialized buffer for the queue -
+ \p cds::opt::v::uninitialized_dynamic_buffer (the default),
+ \p cds::opt::v::uninitialized_static_buffer.
*/
- typedef cds::opt::v::dynamic_buffer< void * > buffer;
+ typedef cds::opt::v::uninitialized_dynamic_buffer< void * > buffer;
/// A functor to clean item dequeued.
/**
- The functor calls the destructor for queue item.
+ The functor calls the destructor for queue item.
After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied.
- If \p T is a complex type, \p value_cleaner may be the useful feature.
+ If \p T is a complex type, \p value_cleaner may be useful feature.
Default value is \ref opt::v::destruct_cleaner
*/
/// C++ memory ordering model
/**
Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
- or \p opt::v::sequential_consistent (sequentially consisnent memory model).
+ or \p opt::v::sequential_consistent (sequentially consistent memory model).
*/
typedef opt::v::relaxed_ordering memory_model;
- /// Alignment for internal queue data. Default is \p opt::cache_line_alignment
- enum { alignment = opt::cache_line_alignment };
+ /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
+ enum { padding = opt::cache_line_padding };
+
+ /// Back-off strategy
+ typedef cds::backoff::Default back_off;
+
+ /// Single-consumer version
+ /**
+ For single-consumer version of algorithm some additional functions
+ (\p front(), \p pop_front()) is available.
+
+ Default is \p false
+ */
+ static CDS_CONSTEXPR bool const single_consumer = false;
};
/// Metafunction converting option list to \p vyukov_queue::traits
/**
Supported \p Options are:
- - \p opt::buffer - the buffer type for internal cyclic array. Possible types are:
- \p opt::v::dynamic_buffer (the default), \p opt::v::static_buffer. The type of
+ - \p opt::buffer - an uninitialized buffer type for internal cyclic array. Possible types are:
+ \p opt::v::uninitialized_dynamic_buffer (the default), \p opt::v::uninitialized_static_buffer. The type of
element in the buffer is not important: it will be changed via \p rebind metafunction.
- \p opt::value_cleaner - a functor to clean item dequeued.
The functor calls the destructor for queue item.
After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied.
- If \p T is a complex type, \p value_cleaner may be the useful feature.
+ If \p T is a complex type, \p value_cleaner can be an useful feature.
Default value is \ref opt::v::destruct_cleaner
+ - \p opt::back_off - back-off strategy used. If the option is not specified, the \p cds::backoff::Default is used.
- \p opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
To enable item counting use \p cds::atomicity::item_counter
- - \p opt::alignment - the alignment for internal queue data. Default is \p opt::cache_line_alignment
+ - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
- \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
or \p opt::v::sequential_consistent (sequentially consisnent memory model).
\code
typedef cds::container::VyukovMPMCCycleQueue< Foo,
typename cds::container::vyukov_queue::make_traits<
- cds::opt::buffer< cds::opt::v::static_buffer< void *, 1024 >,
+ cds::opt::buffer< cds::opt::v::uninitialized_static_buffer< void *, 1024 >,
cds::opt::item_counte< cds::atomicity::item_counter >
>::type
> myQueue;
No dynamic memory allocation/management during operation. Producers and consumers are separated from each other (as in the two-lock queue),
i.e. do not touch the same data while queue is not empty.
+ There is multiple producer/single consumer version \p cds::container::VyukovMPSCCycleQueue
+ that supports \p front() and \p pop_front() functions.
+
Source:
- http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
Template parameters
- \p T - type stored in queue.
- - \p Traits - queue traits, default is \p vykov_queue::traits. You can use \p vykov_queue::make_traits
- metafunction to make your traits or just derive your traits from \p %vykov_queue::traits:
+ - \p Traits - queue traits, default is \p vyukov_queue::traits. You can use \p vyukov_queue::make_traits
+ metafunction to make your traits or just derive your traits from \p %vyukov_queue::traits:
\code
- struct myTraits: public cds::container::vykov_queue::traits {
+ struct myTraits: public cds::container::vyukov_queue::traits {
typedef cds::atomicity::item_counter item_counter;
};
typedef cds::container::VyukovMPMCCycleQueue< Foo, myTraits > myQueue;
// Equivalent make_traits example:
typedef cds::container::VyukovMPMCCycleQueue< cds::gc::HP, Foo,
- typename cds::container::vykov_queue::make_traits<
+ typename cds::container::vyukov_queue::make_traits<
cds::opt::item_counter< cds::atomicity::item_counter >
>::type
> myQueue;
typedef T value_type; ///< Value type to be stored in the queue
typedef Traits traits; ///< Queue traits
typedef typename traits::item_counter item_counter; ///< Item counter type
- typedef typename traits::memory_model memory_model; ///< Memory ordering. See cds::opt::memory_model option
+ typedef typename traits::memory_model memory_model; ///< Memory ordering. See \p cds::opt::memory_model option
typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p vyukov_queue::traits::value_cleaner
+ typedef typename traits::back_off back_off; ///< back-off strategy
+
+ /// \p true for single-consumer version, \p false otherwise
+ static CDS_CONSTEXPR bool const c_single_consumer = traits::single_consumer;
/// Rebind template arguments
template <typename T2, typename Traits2>
};
typedef typename traits::buffer::template rebind<cell_type>::other buffer;
- typedef typename opt::details::alignment_setter< sequence_type, traits::alignment >::type aligned_sequence_type;
- typedef typename opt::details::alignment_setter< buffer, traits::alignment >::type aligned_buffer;
//@endcond
protected:
//@cond
- aligned_buffer m_buffer;
+ buffer m_buffer;
size_t const m_nBufferMask;
- aligned_sequence_type m_posEnqueue;
- aligned_sequence_type m_posDequeue;
+ typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad1_;
+ sequence_type m_posEnqueue;
+ typename opt::details::apply_padding< sequence_type, traits::padding >::padding_type pad2_;
+ sequence_type m_posDequeue;
+ typename opt::details::apply_padding< sequence_type, traits::padding >::padding_type pad3_;
item_counter m_ItemCounter;
//@endcond
public:
/// Constructs the queue of capacity \p nCapacity
/**
- For \p cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
+ For \p cds::opt::v::uninitialized_static_buffer the \p nCapacity parameter is ignored.
+
+ The buffer capacity must be the power of two.
*/
VyukovMPMCCycleQueue(
size_t nCapacity = 0
// Buffer capacity must be power of 2
assert( nCapacity >= 2 && (nCapacity & (nCapacity - 1)) == 0 );
- for (size_t i = 0; i != nCapacity; i += 1)
+ for (size_t i = 0; i != nCapacity; ++i )
m_buffer[i].sequence.store(i, memory_model::memory_order_relaxed);
m_posEnqueue.store(0, memory_model::memory_order_relaxed);
bool enqueue_with(Func f)
{
cell_type* cell;
- size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
+ back_off bkoff;
+ size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
for (;;)
{
cell = &m_buffer[pos & m_nBufferMask];
size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
- intptr_t dif = (intptr_t)seq - (intptr_t)pos;
+ intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos);
- if (dif == 0)
- {
- if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed) )
+ if (dif == 0) {
+ if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed ))
break;
}
- else if (dif < 0)
- return false;
+ else if (dif < 0) {
+ // Queue full?
+ if ( pos - m_posDequeue.load( memory_model::memory_order_relaxed ) == capacity())
+ return false; // queue full
+ bkoff();
+ pos = m_posEnqueue.load( memory_model::memory_order_relaxed );
+ }
else
pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
}
return enqueue_with( [&val]( value_type& dest ){ new ( &dest ) value_type( val ); });
}
- /// Synonym for \p enqueue()
+ /// Enqueues \p val value into the queue, move semantics
+ bool enqueue( value_type&& val )
+ {
+ return enqueue_with( [&val]( value_type& dest ) { new (&dest) value_type( std::move( val ));});
+ }
+
+ /// Synonym for \p enqueue( valuetype const& )
bool push( value_type const& data )
{
return enqueue( data );
}
+ /// Synonym for \p enqueue( value_type&& )
+ bool push( value_type&& data )
+ {
+ return enqueue( std::move( data ));
+ }
+
/// Synonym for \p enqueue_with()
template <typename Func>
bool push_with( Func f )
template <typename... Args>
bool emplace( Args&&... args )
{
- cell_type* cell;
- size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
-
- for (;;)
- {
- cell = &m_buffer[pos & m_nBufferMask];
- size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
-
- intptr_t dif = (intptr_t)seq - (intptr_t)pos;
-
- if (dif == 0)
- {
- if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed) )
- break;
- }
- else if (dif < 0)
- return false;
- else
- pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
- }
-
- new ( &cell->data ) value_type( std::forward<Args>(args)... );
-
- cell->sequence.store(pos + 1, memory_model::memory_order_release);
- ++m_ItemCounter;
-
- return true;
+#if (CDS_COMPILER == CDS_COMPILER_GCC) && (CDS_COMPILER_VERSION < 40900)
+ //work around unsupported feature in g++ 4.8 for forwarding parameter packs to lambda.
+ value_type val( std::forward<Args>(args)... );
+ return enqueue_with( [&val]( value_type& dest ){ new ( &dest ) value_type( std::move( val )); });
+#else
+ return enqueue_with( [&args ...]( value_type& dest ){ new ( &dest ) value_type( std::forward<Args>( args )... ); });
+#endif
}
/// Dequeues a value using a functor
bool dequeue_with( Func f )
{
cell_type * cell;
- size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
+ back_off bkoff;
+ size_t pos = m_posDequeue.load( memory_model::memory_order_relaxed );
for (;;)
{
cell = &m_buffer[pos & m_nBufferMask];
size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
- intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
+ intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
if (dif == 0) {
- if ( m_posDequeue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed))
+ if ( m_posDequeue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed))
break;
}
- else if (dif < 0)
- return false;
+ else if (dif < 0) {
+ // Queue empty?
+ if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
+ return false; // queue empty
+ bkoff();
+ pos = m_posDequeue.load( memory_model::memory_order_relaxed );
+ }
else
pos = m_posDequeue.load(memory_model::memory_order_relaxed);
}
f( cell->data );
value_cleaner()( cell->data );
- --m_ItemCounter;
cell->sequence.store( pos + m_nBufferMask + 1, memory_model::memory_order_release );
+ --m_ItemCounter;
return true;
}
/// Dequeues a value from the queue
/**
- If queue is not empty, the function returns \p true, \p dest contains copy of
+ If queue is not empty, the function returns \p true, \p dest contains a copy of
dequeued value. The assignment operator for type \ref value_type is invoked.
If queue is empty, the function returns \p false, \p dest is unchanged.
*/
- bool dequeue(value_type & dest )
+ bool dequeue(value_type& dest )
{
- return dequeue_with( [&dest]( value_type& src ){ dest = src; } );
+ return dequeue_with( [&dest]( value_type& src ){ dest = std::move( src );});
}
/// Synonym for \p dequeue()
return dequeue_with( f );
}
+ /// Returns a pointer to top element of the queue or \p nullptr if queue is empty (only for single-consumer version)
+ template <bool SC = c_single_consumer >
+ typename std::enable_if<SC, value_type *>::type front()
+ {
+ static_assert( c_single_consumer, "front() is enabled only if traits::single_consumer is true");
+
+ cell_type * cell;
+ back_off bkoff;
+
+ size_t pos = m_posDequeue.load( memory_model::memory_order_relaxed );
+ for ( ;;)
+ {
+ cell = &m_buffer[pos & m_nBufferMask];
+ size_t seq = cell->sequence.load( memory_model::memory_order_acquire );
+ intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
+
+ if ( dif == 0 )
+ return &cell->data;
+ else if ( dif < 0 ) {
+ // Queue empty?
+ if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
+ return nullptr; // queue empty
+ bkoff();
+ pos = m_posDequeue.load( memory_model::memory_order_relaxed );
+ }
+ else
+ pos = m_posDequeue.load( memory_model::memory_order_relaxed );
+ }
+ }
+
+ /// Pops top element; returns \p true if queue is not empty, \p false otherwise (only for single-consumer version)
+ template <bool SC = c_single_consumer >
+ typename std::enable_if<SC, bool>::type pop_front()
+ {
+ return dequeue_with( []( value_type& ) {} );
+ }
+
/// Checks if the queue is empty
bool empty() const
{
const cell_type * cell;
- size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
+ back_off bkoff;
+ size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
for (;;)
{
cell = &m_buffer[pos & m_nBufferMask];
size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
- intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
+ intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
if (dif == 0)
return false;
- else if (dif < 0)
- return true;
- else
- pos = m_posDequeue.load(memory_model::memory_order_relaxed);
+ else if (dif < 0) {
+ if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
+ return true;
+ }
+ bkoff();
+ pos = m_posDequeue.load(memory_model::memory_order_relaxed);
}
}
void clear()
{
value_type v;
- while ( pop(v) );
+ while ( pop(v));
}
/// Returns queue's item count
/**
The value returned depends on \p vyukov_queue::traits::item_counter option.
- For \p atomicity::empty_item_counter, this function always returns 0.
+ For \p atomicity::empty_item_counter, the function always returns 0.
*/
size_t size() const
{
return m_buffer.capacity();
}
};
+
+ //@cond
+ namespace vyukov_queue {
+
+ template <typename Traits>
+ struct single_consumer_traits : public Traits
+ {
+ static CDS_CONSTEXPR bool const single_consumer = true;
+ };
+ } // namespace vyukov_queue
+ //@endcond
+
+ /// Vyukov's queue multiple producer - single consumer version
+ template <typename T, typename Traits = vyukov_queue::traits >
+ using VyukovMPSCCycleQueue = VyukovMPMCCycleQueue< T, vyukov_queue::single_consumer_traits<Traits> >;
+
}} // namespace cds::container
-#endif // #ifndef __CDS_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
+#endif // #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H