Ring buffer is a bounded queue. Additionally, \p %WeakRingBuffer supports batch operations -
you can push/pop an array of elements.
+
+ There is a specialization \ref cds_nonintrusive_WeakRingBuffer_void "WeakRingBuffer<void, Traits>"
+ that is not a queue but a "memory pool" between producer and consumer threads.
+ \p WeakRingBuffer<void> supports data of different size.
*/
template <typename T, typename Traits = weak_ringbuffer::traits>
class WeakRingBuffer: public cds::bounded_container
});
\endcode
- Returns \p true if success or \p false if not enought sufficient space in the ring
+ Returns \p true if success or \p false if not enough space in the ring
*/
template <typename Q, typename CopyFunc>
bool push( Q* arr, size_t count, CopyFunc copy )
pfront_ = front_.load( memory_model::memory_order_acquire );
if ( pfront_ + capacity() - back < count ) {
- // not enought space
+ // not enough space
return false;
}
}
The function is available only if <tt>std::is_constructible<value_type, Q>::value</tt>
is \p true.
- Returns \p true if success or \p false if not enought sufficient space in the ring
+ Returns \p true if success or \p false if not enough space in the ring
*/
template <typename Q>
typename std::enable_if< std::is_constructible<value_type, Q>::value, bool>::type
pfront_ = front_.load( memory_model::memory_order_acquire );
if ( pfront_ + capacity() - back < 1 ) {
- // not enought space
+ // not enough space
return false;
}
}
pfront_ = front_.load( memory_model::memory_order_acquire );
if ( pfront_ + capacity() - back < 1 ) {
- // not enought space
+ // not enough space
return false;
}
}
void copy_func( Q& dest, value_type& elemen );
\endcode
- Returns \p true if success or \p false if not enought sufficient space in the ring
+ Returns \p true if success or \p false if not enough space in the ring
*/
template <typename Q, typename CopyFunc>
bool pop( Q* arr, size_t count, CopyFunc copy )
The function is available only if <tt>std::is_assignable<Q&, value_type const&>::value</tt>
is \p true.
- Returns \p true if success or \p false if not enought sufficient space in the ring
+ Returns \p true if success or \p false if not enough space in the ring
*/
template <typename Q>
typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
return true;
}
- /// Clears the ring buffer
+ /// Clears the ring buffer (only consumer can call this function!)
void clear()
{
value_type v;
// drain the ring by popping elements one by one; as documented above,
// only the consumer thread may call this
while ( pop( v ) );
}
+ /// Checks if the ring-buffer is empty
+ bool empty() const
+ {
+ size_t f = front_.load( memory_model::memory_order_relaxed );
+ size_t b = back_.load( memory_model::memory_order_relaxed );
+ return f == b;
+ }
+
+ /// Checks if the ring-buffer is full
+ bool full() const
+ {
+ size_t f = front_.load( memory_model::memory_order_relaxed );
+ size_t b = back_.load( memory_model::memory_order_relaxed );
+ return b - f >= capacity();
+ }
+
+ /// Returns the current size of ring buffer
+ size_t size() const
+ {
+ size_t f = front_.load( memory_model::memory_order_relaxed );
+ size_t b = back_.load( memory_model::memory_order_relaxed );
+ return b - f;
+ }
+
+ /// Returns capacity of the ring buffer
+ size_t capacity() const
+ {
+ // delegates to the underlying buffer storage
+ return buffer_.capacity();
+ }
+
+ private:
+ //@cond
+ atomics::atomic<size_t> front_; // consumer-side index, advanced by pop
+ typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad1_;
+ atomics::atomic<size_t> back_; // producer-side index, advanced by push
+ typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad2_;
+ size_t pfront_; // producer's cached copy of front_; refreshed (acquire) only when the ring looks full
+ typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad3_;
+ size_t cback_; // consumer's cached copy of back_; refreshed (acquire) only when the ring looks empty
+ typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad4_;
+
+ buffer buffer_; // element storage, see traits::buffer
+ //@endcond
+ };
+
+
+ /// Single-producer single-consumer ring buffer for untyped variable-sized data
+ /** @ingroup cds_nonintrusive_queue
+ @anchor cds_nonintrusive_WeakRingBuffer_void
+ */
+ template <typename Traits = weak_ringbuffer::traits>
+ class WeakRingBuffer<void, Traits>: public cds::bounded_container
+ {
+ public:
+ typedef Traits traits; ///< Ring buffer traits
+ typedef typename traits::memory_model memory_model; ///< Memory ordering. See \p cds::opt::memory_model option
+
+ private:
+ //@cond
+ typedef typename traits::buffer::template rebind< uint8_t >::other buffer;
+ //@endcond
+
+ public:
+ /// Creates the ring buffer of \p capacity bytes
+ /**
+ For \p cds::opt::v::uninitialized_static_buffer the \p capacity parameter is ignored.
+
+ If the buffer capacity is a power of two, lightweight binary arithmetic is used
+ instead of modulo arithmetic.
+ */
+ WeakRingBuffer( size_t capacity = 0 )
+ : front_( 0 )
+ , pfront_( 0 )
+ , cback_( 0 )
+ , buffer_( capacity )
+ {
+ // release-store so the consumer thread observes the zero-initialized state
+ back_.store( 0, memory_model::memory_order_release );
+ }
+
+ /// [producer] Reserve \p size bytes
+ /**
+ Returns a pointer to the reserved buffer of \p size bytes, or \p nullptr
+ if there is not enough space in the ring. After filling the buffer,
+ call \p push_back() to publish it to the consumer.
+ */
+ void* back( size_t size )
+ {
+ // Any data is rounded to a sizeof(uintptr_t) boundary, plus a size_t header
+ size_t real_size = calc_real_size( size );
+
+ // check if we can reserve real_size bytes
+ assert( real_size < capacity() );
+ size_t back = back_.load( memory_model::memory_order_relaxed );
+
+ assert( back - pfront_ <= capacity() );
+
+ if ( pfront_ + capacity() - back < real_size ) {
+ // looks full - refresh the cached front_ and retry
+ pfront_ = front_.load( memory_model::memory_order_acquire );
+
+ if ( pfront_ + capacity() - back < real_size ) {
+ // not enough space
+ return nullptr;
+ }
+ }
+
+ uint8_t* reserved = buffer_.buffer() + buffer_.mod( back );
+
+ // Check if the buffer free space is enough for storing real_size bytes
+ size_t tail_size = capacity() - buffer_.mod( back );
+ if ( tail_size < real_size ) {
+ // make unused tail
+ assert( tail_size >= sizeof( size_t ) );
+ assert( !is_tail( tail_size ) );
+
+ *reinterpret_cast<size_t*>( reserved ) = make_tail( tail_size - sizeof(size_t));
+ back += tail_size;
+
+ // We must be in beginning of buffer
+ assert( buffer_.mod( back ) == 0 );
+
+ if ( pfront_ + capacity() - back < real_size ) {
+ pfront_ = front_.load( memory_model::memory_order_acquire );
+
+ if ( pfront_ + capacity() - back < real_size ) {
+ // not enough space; the tail record stays unpublished since back_ was not advanced
+ return nullptr;
+ }
+ }
+
+ // Publish back_ past the tail record so that push_back() finds the element's
+ // size header at back_ instead of the tail marker (whose tail bit would make
+ // calc_real_size() compute garbage). The element itself is not yet visible:
+ // the consumer sees only up to the wrap point.
+ back_.store( back, memory_model::memory_order_release );
+ reserved = buffer_.buffer();
+ }
+
+ // store the element size header; the element is published later by push_back()
+ *reinterpret_cast<size_t*>( reserved ) = size;
+
+ return reinterpret_cast<void*>( reserved + sizeof( size_t ) );
+ }
+
+ /// [producer] Push reserved bytes into ring
+ /**
+ Publishes the buffer previously reserved by \p back( size ) to the consumer.
+ */
+ void push_back()
+ {
+ size_t back = back_.load( memory_model::memory_order_relaxed );
+ uint8_t* reserved = buffer_.buffer() + buffer_.mod( back );
+
+ // The size_t header written by back( size ) precedes the data.
+ // NOTE(review): this assumes *reserved is an element size, not an unused-tail
+ // marker; if back() created a tail without advancing back_, the tail bit makes
+ // calc_real_size() compute garbage here - verify against back().
+ size_t real_size = calc_real_size( *reinterpret_cast<size_t*>( reserved ) );
+ assert( real_size < capacity() );
+
+ // release-store publishes the element to the consumer thread
+ back_.store( back + real_size, memory_model::memory_order_release );
+ }
+
+ /// [producer] Push \p data of \p size bytes into ring
+ bool push_back( void const* data, size_t size )
+ {
+ // reserve, copy, publish - fail early if the ring has no room
+ void* buf = back( size );
+ if ( !buf )
+ return false;
+
+ memcpy( buf, data, size );
+ push_back();
+ return true;
+ }
+
+ /// [consumer] Get top data from the ring
+ /**
+ Returns the pointer to the data and its size, or \p {nullptr, 0} if the ring is empty.
+ The data is valid until \p pop_front() is called.
+ */
+ std::pair<void*, size_t> front()
+ {
+ size_t front = front_.load( memory_model::memory_order_relaxed );
+ // a completely full ring makes cback_ - front == capacity(), which is legal
+ assert( cback_ - front <= capacity() );
+
+ if ( cback_ - front < sizeof( size_t )) {
+ // looks empty - refresh the cached back_ and retry
+ cback_ = back_.load( memory_model::memory_order_acquire );
+ if ( cback_ - front < sizeof( size_t ) )
+ return std::make_pair( nullptr, 0u );
+ }
+
+ uint8_t * buf = buffer_.buffer() + buffer_.mod( front );
+
+ // check alignment
+ assert( ( reinterpret_cast<uintptr_t>( buf ) & ( sizeof( uintptr_t ) - 1 ) ) == 0 );
+
+ size_t size = *reinterpret_cast<size_t*>( buf );
+ if ( is_tail( size ) ) {
+ // Unused tail at the end of the buffer - skip it inline.
+ // The tail bit must be cleared before calc_real_size(): with the bit set
+ // the computed size is garbage. pop_front() cannot be used for the skip
+ // because it asserts that the current record is not a tail.
+ size_t data_size = size & ~( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 ));
+ front += calc_real_size( data_size );
+ front_.store( front, memory_model::memory_order_release );
+
+ // re-check that a published element follows the tail
+ if ( cback_ - front < sizeof( size_t )) {
+ cback_ = back_.load( memory_model::memory_order_acquire );
+ if ( cback_ - front < sizeof( size_t ) )
+ return std::make_pair( nullptr, 0u );
+ }
+
+ buf = buffer_.buffer() + buffer_.mod( front );
+ size = *reinterpret_cast<size_t*>( buf );
+ assert( !is_tail( size ) );
+ }
+
+#ifdef _DEBUG
+ size_t real_size = calc_real_size( size );
+ if ( cback_ - front < real_size ) {
+ cback_ = back_.load( memory_model::memory_order_acquire );
+ assert( cback_ - front >= real_size );
+ }
+#endif
+
+ return std::make_pair( reinterpret_cast<void*>( buf + sizeof( size_t ) ), size );
+ }
+
+ /// [consumer] Pops top data
+ /**
+ Returns \p false if the ring is empty.
+ Typically called after \p front(); the record at \p front_ is expected to be
+ a data element, not an unused-tail marker (see the assert below).
+ */
+ bool pop_front()
+ {
+ size_t front = front_.load( memory_model::memory_order_relaxed );
+ assert( cback_ - front <= capacity() );
+
+ if ( cback_ - front < sizeof(size_t) ) {
+ // looks empty - refresh the cached back_ and retry
+ cback_ = back_.load( memory_model::memory_order_acquire );
+ if ( cback_ - front < sizeof( size_t ) )
+ return false;
+ }
+
+ uint8_t * buf = buffer_.buffer() + buffer_.mod( front );
+
+ // check alignment
+ assert( ( reinterpret_cast<uintptr_t>( buf ) & ( sizeof( uintptr_t ) - 1 ) ) == 0 );
+
+ size_t size = *reinterpret_cast<size_t*>( buf );
+ // NOTE(review): this assert means tails must already be skipped before calling
+ // pop_front(); presumably front() is responsible for that - verify callers
+ assert( !is_tail( size ) );
+
+ size_t real_size = calc_real_size( size );
+
+#ifdef _DEBUG
+ if ( cback_ - front < real_size ) {
+ cback_ = back_.load( memory_model::memory_order_acquire );
+ assert( cback_ - front >= real_size );
+ }
+#endif
+
+ // release-store publishes the freed space to the producer thread
+ front_.store( front + real_size, memory_model::memory_order_release );
+ return true;
+
+ }
+
+ /// [consumer] Clears the ring buffer
+ void clear()
+ {
+ // keep popping while front() still yields data
+ while ( front().first != nullptr )
+ pop_front();
+ }
/// Checks if the ring-buffer is empty
bool empty() const
return buffer_.capacity();
}
+ private:
+ // Full record size: data rounded up to a sizeof(uintptr_t) boundary
+ // plus the leading size_t header
+ static size_t calc_real_size( size_t size )
+ {
+ size_t const align_mask = sizeof( uintptr_t ) - 1;
+ size_t record_size = (( size + align_mask ) & ~align_mask ) + sizeof( size_t );
+
+ assert( record_size > size );
+ assert( record_size - size >= sizeof( size_t ) );
+
+ return record_size;
+ }
+
+ // A record whose most significant bit is set marks an unused tail at the buffer end
+ static bool is_tail( size_t size )
+ {
+ size_t const tail_bit = size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 );
+ return ( size & tail_bit ) != 0;
+ }
+
+ // Marks \p size as an unused-tail record by setting the most significant bit
+ static size_t make_tail( size_t size )
+ {
+ size_t const tail_bit = size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 );
+ return size | tail_bit;
+ }
+
private:
//@cond
atomics::atomic<size_t> front_;