Ring buffer is a bounded queue. Additionally, \p %WeakRingBuffer supports batch operations -
you can push/pop an array of elements.
- There are a specialization \ref cds_nonintrusive_WeakRingBuffer_void "WeakRingBuffer<void, Traits>"
- that is not a queue but a "memory pool" between producer and consumer threads.
+ There is a specialization \ref cds_nonintrusive_WeakRingBuffer_void "WeakRingBuffer<void, Traits>"
+ that is not a queue but a "memory pool" between producer and consumer threads.
\p WeakRingBuffer<void> supports variable-sized data.
@warning: \p %WeakRingBuffer is developed for 64-bit architecture.
- On 32-bit platform an integer overflow of internal counters is possible.
+ A 32-bit platform must provide support for 64-bit atomics.
*/
template <typename T, typename Traits = weak_ringbuffer::traits>
class WeakRingBuffer: public cds::bounded_container
private:
//@cond
typedef typename traits::buffer::template rebind< value_type >::other buffer;
+ typedef uint64_t counter_type;
//@endcond
public:
~WeakRingBuffer()
{
value_cleaner cleaner;
- size_t back = back_.load( memory_model::memory_order_relaxed );
- for ( size_t front = front_.load( memory_model::memory_order_relaxed ); front != back; ++front )
+ counter_type back = back_.load( memory_model::memory_order_relaxed );
+ for ( counter_type front = front_.load( memory_model::memory_order_relaxed ); front != back; ++front )
cleaner( buffer_[ buffer_.mod( front ) ] );
}
\code
cds::container::WeakRingBuffer<std::string> ringbuf;
char const* arr[10];
- ringbuf.push( arr, 10,
+ ringbuf.push( arr, 10,
[]( std::string& element, char const* src ) {
new( &element ) std::string( src );
});
template <typename Q, typename CopyFunc>
bool push( Q* arr, size_t count, CopyFunc copy )
{
- assert( count < capacity() );
- size_t back = back_.load( memory_model::memory_order_relaxed );
+ assert( count < capacity());
+ counter_type back = back_.load( memory_model::memory_order_relaxed );
- assert( back - pfront_ <= capacity() );
+ assert( static_cast<size_t>( back - pfront_ ) <= capacity());
- if ( pfront_ + capacity() - back < count ) {
+ if ( static_cast<size_t>( pfront_ + capacity() - back ) < count ) {
pfront_ = front_.load( memory_model::memory_order_acquire );
- if ( pfront_ + capacity() - back < count ) {
+ if ( static_cast<size_t>( pfront_ + capacity() - back ) < count ) {
// not enough space
return false;
}
typename std::enable_if< std::is_constructible<value_type, Args...>::value, bool>::type
emplace( Args&&... args )
{
- size_t back = back_.load( memory_model::memory_order_relaxed );
+ counter_type back = back_.load( memory_model::memory_order_relaxed );
- assert( back - pfront_ <= capacity() );
+ assert( static_cast<size_t>( back - pfront_ ) <= capacity());
if ( pfront_ + capacity() - back < 1 ) {
pfront_ = front_.load( memory_model::memory_order_acquire );
template <typename Func>
bool enqueue_with( Func f )
{
- size_t back = back_.load( memory_model::memory_order_relaxed );
+ counter_type back = back_.load( memory_model::memory_order_relaxed );
- assert( back - pfront_ <= capacity() );
+ assert( static_cast<size_t>( back - pfront_ ) <= capacity());
if ( pfront_ + capacity() - back < 1 ) {
pfront_ = front_.load( memory_model::memory_order_acquire );
template <typename Q, typename CopyFunc>
bool pop( Q* arr, size_t count, CopyFunc copy )
{
- assert( count < capacity() );
+ assert( count < capacity());
- size_t front = front_.load( memory_model::memory_order_relaxed );
- assert( cback_ - front < capacity() );
+ counter_type front = front_.load( memory_model::memory_order_relaxed );
+ assert( static_cast<size_t>( cback_ - front ) < capacity());
- if ( cback_ - front < count ) {
+ if ( static_cast<size_t>( cback_ - front ) < count ) {
cback_ = back_.load( memory_model::memory_order_acquire );
- if ( cback_ - front < count )
+ if ( static_cast<size_t>( cback_ - front ) < count )
return false;
}
template <typename Func>
bool dequeue_with( Func f )
{
- size_t front = front_.load( memory_model::memory_order_relaxed );
- assert( cback_ - front < capacity() );
+ counter_type front = front_.load( memory_model::memory_order_relaxed );
+ assert( static_cast<size_t>( cback_ - front ) < capacity());
if ( cback_ - front < 1 ) {
cback_ = back_.load( memory_model::memory_order_acquire );
*/
value_type* front()
{
- size_t front = front_.load( memory_model::memory_order_relaxed );
- assert( cback_ - front < capacity() );
+ counter_type front = front_.load( memory_model::memory_order_relaxed );
+ assert( static_cast<size_t>( cback_ - front ) < capacity());
if ( cback_ - front < 1 ) {
cback_ = back_.load( memory_model::memory_order_acquire );
*/
bool pop_front()
{
- size_t front = front_.load( memory_model::memory_order_relaxed );
- assert( cback_ - front <= capacity() );
+ counter_type front = front_.load( memory_model::memory_order_relaxed );
+ assert( static_cast<size_t>( cback_ - front ) <= capacity());
if ( cback_ - front < 1 ) {
cback_ = back_.load( memory_model::memory_order_acquire );
void clear()
{
value_type v;
- while ( pop( v ) );
+ while ( pop( v ));
}
/// Checks if the ring-buffer is empty
/// Returns the current size of ring buffer
size_t size() const
{
- return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed );
+ return static_cast<size_t>( back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ));
}
/// Returns capacity of the ring buffer
private:
//@cond
- atomics::atomic<size_t> front_;
- typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad1_;
- atomics::atomic<size_t> back_;
- typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad2_;
- size_t pfront_;
- typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad3_;
- size_t cback_;
- typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad4_;
+ atomics::atomic<counter_type> front_;
+ typename opt::details::apply_padding< atomics::atomic<counter_type>, traits::padding >::padding_type pad1_;
+ atomics::atomic<counter_type> back_;
+ typename opt::details::apply_padding< atomics::atomic<counter_type>, traits::padding >::padding_type pad2_;
+ counter_type pfront_;
+ typename opt::details::apply_padding< counter_type, traits::padding >::padding_type pad3_;
+ counter_type cback_;
+ typename opt::details::apply_padding< counter_type, traits::padding >::padding_type pad4_;
buffer buffer_;
//@endcond
\endcode
@warning: \p %WeakRingBuffer is developed for 64-bit architecture.
- On 32-bit platform an integer overflow of internal counters is possible.
+ A 32-bit platform must provide support for 64-bit atomics.
*/
#ifdef CDS_DOXYGEN_INVOKED
template <typename Traits = weak_ringbuffer::traits>
private:
//@cond
typedef typename traits::buffer::template rebind< uint8_t >::other buffer;
+ typedef uint64_t counter_type;
//@endcond
public:
/// [producer] Reserve \p size bytes
/**
- The function returns a pointer to reserved buffer of \p size bytes.
+ The function returns a pointer to reserved buffer of \p size bytes.
If there is not enough space in the ring buffer, the function returns \p nullptr.
After successful \p %back() you should fill the buffer provided and call \p push_back():
size_t real_size = calc_real_size( size );
// check if we can reserve read_size bytes
- assert( real_size < capacity() );
- size_t back = back_.load( memory_model::memory_order_relaxed );
+ assert( real_size < capacity());
+ counter_type back = back_.load( memory_model::memory_order_relaxed );
- assert( back - pfront_ <= capacity() );
+ assert( static_cast<size_t>( back - pfront_ ) <= capacity());
- if ( pfront_ + capacity() - back < real_size ) {
+ if ( static_cast<size_t>( pfront_ + capacity() - back ) < real_size ) {
pfront_ = front_.load( memory_model::memory_order_acquire );
- if ( pfront_ + capacity() - back < real_size ) {
+ if ( static_cast<size_t>( pfront_ + capacity() - back ) < real_size ) {
// not enough space
return nullptr;
}
uint8_t* reserved = buffer_.buffer() + buffer_.mod( back );
// Check if the buffer free space is enough for storing real_size bytes
- size_t tail_size = capacity() - buffer_.mod( back );
+ size_t tail_size = capacity() - static_cast<size_t>( buffer_.mod( back ));
if ( tail_size < real_size ) {
// make unused tail
- assert( tail_size >= sizeof( size_t ) );
- assert( !is_tail( tail_size ) );
+ assert( tail_size >= sizeof( size_t ));
+ assert( !is_tail( tail_size ));
*reinterpret_cast<size_t*>( reserved ) = make_tail( tail_size - sizeof(size_t));
back += tail_size;
// We must be in beginning of buffer
assert( buffer_.mod( back ) == 0 );
- if ( pfront_ + capacity() - back < real_size ) {
+ if ( static_cast<size_t>( pfront_ + capacity() - back ) < real_size ) {
pfront_ = front_.load( memory_model::memory_order_acquire );
- if ( pfront_ + capacity() - back < real_size ) {
+ if ( static_cast<size_t>( pfront_ + capacity() - back ) < real_size ) {
// not enough space
return nullptr;
}
// reserve and store size
*reinterpret_cast<size_t*>( reserved ) = size;
- return reinterpret_cast<void*>( reserved + sizeof( size_t ) );
+ return reinterpret_cast<void*>( reserved + sizeof( size_t ));
}
/// [producer] Push reserved bytes into ring
*/
void push_back()
{
- size_t back = back_.load( memory_model::memory_order_relaxed );
+ counter_type back = back_.load( memory_model::memory_order_relaxed );
uint8_t* reserved = buffer_.buffer() + buffer_.mod( back );
- size_t real_size = calc_real_size( *reinterpret_cast<size_t*>( reserved ) );
- assert( real_size < capacity() );
+ size_t real_size = calc_real_size( *reinterpret_cast<size_t*>( reserved ));
+ assert( real_size < capacity());
back_.store( back + real_size, memory_model::memory_order_release );
}
*/
std::pair<void*, size_t> front()
{
- size_t front = front_.load( memory_model::memory_order_relaxed );
- assert( cback_ - front < capacity() );
+ counter_type front = front_.load( memory_model::memory_order_relaxed );
+ assert( static_cast<size_t>( cback_ - front ) < capacity());
if ( cback_ - front < sizeof( size_t )) {
cback_ = back_.load( memory_model::memory_order_acquire );
- if ( cback_ - front < sizeof( size_t ) )
+ if ( cback_ - front < sizeof( size_t ))
return std::make_pair( nullptr, 0u );
}
uint8_t * buf = buffer_.buffer() + buffer_.mod( front );
// check alignment
- assert( ( reinterpret_cast<uintptr_t>( buf ) & ( sizeof( uintptr_t ) - 1 ) ) == 0 );
+ assert( ( reinterpret_cast<uintptr_t>( buf ) & ( sizeof( uintptr_t ) - 1 )) == 0 );
size_t size = *reinterpret_cast<size_t*>( buf );
- if ( is_tail( size ) ) {
+ if ( is_tail( size )) {
// unused tail, skip
- CDS_VERIFY( pop_front() );
+ CDS_VERIFY( pop_front());
front = front_.load( memory_model::memory_order_relaxed );
buf = buffer_.buffer() + buffer_.mod( front );
size = *reinterpret_cast<size_t*>( buf );
- assert( !is_tail( size ) );
- assert( buf == buffer_.buffer() );
+ assert( !is_tail( size ));
+ assert( buf == buffer_.buffer());
}
#ifdef _DEBUG
size_t real_size = calc_real_size( size );
- if ( cback_ - front < real_size ) {
+ if ( static_cast<size_t>( cback_ - front ) < real_size ) {
cback_ = back_.load( memory_model::memory_order_acquire );
- assert( cback_ - front >= real_size );
+ assert( static_cast<size_t>( cback_ - front ) >= real_size );
}
#endif
*/
bool pop_front()
{
- size_t front = front_.load( memory_model::memory_order_relaxed );
- assert( cback_ - front <= capacity() );
+ counter_type front = front_.load( memory_model::memory_order_relaxed );
+ assert( static_cast<size_t>( cback_ - front ) <= capacity());
- if ( cback_ - front < sizeof(size_t) ) {
+ if ( cback_ - front < sizeof(size_t)) {
cback_ = back_.load( memory_model::memory_order_acquire );
- if ( cback_ - front < sizeof( size_t ) )
+ if ( cback_ - front < sizeof( size_t ))
return false;
}
uint8_t * buf = buffer_.buffer() + buffer_.mod( front );
// check alignment
- assert( ( reinterpret_cast<uintptr_t>( buf ) & ( sizeof( uintptr_t ) - 1 ) ) == 0 );
+ assert( ( reinterpret_cast<uintptr_t>( buf ) & ( sizeof( uintptr_t ) - 1 )) == 0 );
size_t size = *reinterpret_cast<size_t*>( buf );
size_t real_size = calc_real_size( untail( size ));
#ifdef _DEBUG
- if ( cback_ - front < real_size ) {
+ if ( static_cast<size_t>( cback_ - front ) < real_size ) {
cback_ = back_.load( memory_model::memory_order_acquire );
- assert( cback_ - front >= real_size );
+ assert( static_cast<size_t>( cback_ - front ) >= real_size );
}
#endif
/// [consumer] Clears the ring buffer
void clear()
{
- for ( auto el = front(); el.first; el = front() )
+ for ( auto el = front(); el.first; el = front())
pop_front();
}
/// Returns the current size of ring buffer
size_t size() const
{
- return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed );
+ return static_cast<size_t>( back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ));
}
/// Returns capacity of the ring buffer
size_t real_size = (( size + sizeof( uintptr_t ) - 1 ) & ~( sizeof( uintptr_t ) - 1 )) + sizeof( size_t );
assert( real_size > size );
- assert( real_size - size >= sizeof( size_t ) );
+ assert( real_size - size >= sizeof( size_t ));
return real_size;
}
static size_t untail( size_t size )
{
- return size & (( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 ) ) - 1);
+ return size & (( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 )) - 1);
}
//@endcond
private:
//@cond
- atomics::atomic<size_t> front_;
- typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad1_;
- atomics::atomic<size_t> back_;
- typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad2_;
- size_t pfront_;
- typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad3_;
- size_t cback_;
- typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad4_;
+ atomics::atomic<counter_type> front_;
+ typename opt::details::apply_padding< atomics::atomic<counter_type>, traits::padding >::padding_type pad1_;
+ atomics::atomic<counter_type> back_;
+ typename opt::details::apply_padding< atomics::atomic<counter_type>, traits::padding >::padding_type pad2_;
+ counter_type pfront_;
+ typename opt::details::apply_padding< counter_type, traits::padding >::padding_type pad3_;
+ counter_type cback_;
+ typename opt::details::apply_padding< counter_type, traits::padding >::padding_type pad4_;
buffer buffer_;
//@endcond