Added prototype for untyped variable-sized WeakRingBuffer<void> (not tested yet,...
authorkhizmax <khizmax@gmail.com>
Fri, 12 May 2017 14:08:59 +0000 (17:08 +0300)
committerkhizmax <khizmax@gmail.com>
Fri, 12 May 2017 14:08:59 +0000 (17:08 +0300)
cds/container/weak_ringbuffer.h

index 17b249b0a6c2f6d13149dfc54a519b6061129bf5..3ae5dd6446b7bb2a953b5743451ea3d0a11c5e67 100644 (file)
@@ -126,6 +126,10 @@ namespace cds { namespace container {
 
         Ring buffer is a bounded queue. Additionally, \p %WeakRingBuffer supports batch operations -
         you can push/pop an array of elements.
+
+        There are a specialization \ref cds_nonintrusive_WeakRingBuffer_void "WeakRingBuffer<void, Traits>" 
+        that is not a queue but a "memory pool" between producer and consumer threads. 
+        \p WeakRingBuffer<void> supports data of different size.
     */
     template <typename T, typename Traits = weak_ringbuffer::traits>
     class WeakRingBuffer: public cds::bounded_container
@@ -208,7 +212,7 @@ namespace cds { namespace container {
                 });
             \endcode
 
-            Returns \p true if success or \p false if not enought sufficient space in the ring
+            Returns \p true if success or \p false if not enough space in the ring
         */
         template <typename Q, typename CopyFunc>
         bool push( Q* arr, size_t count, CopyFunc copy )
@@ -222,7 +226,7 @@ namespace cds { namespace container {
                 pfront_ = front_.load( memory_model::memory_order_acquire );
 
                 if ( pfront_ + capacity() - back < count ) {
-                    // not enought space
+                    // not enough space
                     return false;
                 }
             }
@@ -246,7 +250,7 @@ namespace cds { namespace container {
             The function is available only if <tt>std::is_constructible<value_type, Q>::value</tt>
             is \p true.
 
-            Returns \p true if success or \p false if not enought sufficient space in the ring
+            Returns \p true if success or \p false if not enough space in the ring
         */
         template <typename Q>
         typename std::enable_if< std::is_constructible<value_type, Q>::value, bool>::type
@@ -274,7 +278,7 @@ namespace cds { namespace container {
                 pfront_ = front_.load( memory_model::memory_order_acquire );
 
                 if ( pfront_ + capacity() - back < 1 ) {
-                    // not enought space
+                    // not enough space
                     return false;
                 }
             }
@@ -307,7 +311,7 @@ namespace cds { namespace container {
                 pfront_ = front_.load( memory_model::memory_order_acquire );
 
                 if ( pfront_ + capacity() - back < 1 ) {
-                    // not enought space
+                    // not enough space
                     return false;
                 }
             }
@@ -364,7 +368,7 @@ namespace cds { namespace container {
             void copy_func( Q& dest, value_type& elemen );
             \endcode
 
-            Returns \p true if success or \p false if not enought sufficient space in the ring
+            Returns \p true if success or \p false if not enough space in the ring
         */
         template <typename Q, typename CopyFunc>
         bool pop( Q* arr, size_t count, CopyFunc copy )
@@ -402,7 +406,7 @@ namespace cds { namespace container {
             The function is available only if <tt>std::is_assignable<Q&, value_type const&>::value</tt>
             is \p true.
 
-            Returns \p true if success or \p false if not enought sufficient space in the ring
+            Returns \p true if success or \p false if not enough space in the ring
         */
         template <typename Q>
         typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
@@ -517,13 +521,245 @@ namespace cds { namespace container {
             return true;
         }
 
-        /// Clears the ring buffer
+        /// Clears the ring buffer (only consumer can call this function!)
         void clear()
         {
             value_type v;
             while ( pop( v ) );
         }
 
+        /// Checks if the ring-buffer is empty
+        bool empty() const
+        {
+            return front_.load( memory_model::memory_order_relaxed ) == back_.load( memory_model::memory_order_relaxed );
+        }
+
+        /// Checks if the ring-buffer is full
+        bool full() const
+        {
+            return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ) >= capacity();
+        }
+
+        /// Returns the current size of ring buffer
+        size_t size() const
+        {
+            return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed );
+        }
+
+        /// Returns capacity of the ring buffer
+        size_t capacity() const
+        {
+            return buffer_.capacity();
+        }
+
+    private:
+        //@cond
+        atomics::atomic<size_t>     front_;
+        typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad1_;
+        atomics::atomic<size_t>     back_;
+        typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad2_;
+        size_t                      pfront_;
+        typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad3_;
+        size_t                      cback_;
+        typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad4_;
+
+        buffer                      buffer_;
+        //@endcond
+    };
+
+
+    /// Single-producer single-consumer ring buffer for untyped variable-sized data
+    /** @ingroup cds_nonintrusive_queue
+        @anchor cds_nonintrusive_WeakRingBuffer_void
+    */
+    template <typename Traits = weak_ringbuffer::traits>
+    class WeakRingBuffer<void, Traits>: public cds::bounded_container
+    {
+    public:
+        typedef Traits      traits;         ///< Ring buffer traits
+        typedef typename    traits::memory_model  memory_model;  ///< Memory ordering. See \p cds::opt::memory_model option
+
+    private:
+        //@cond
+        typedef typename traits::buffer::template rebind< uint8_t >::other buffer;
+        //@endcond
+
+    public:
+        /// Creates the ring buffer of \p capacity bytes
+        /**
+            For \p cds::opt::v::uninitialized_static_buffer the \p nCapacity parameter is ignored.
+
+            If the buffer capacity is a power of two, lightweight binary arithmetics is used
+            instead of modulo arithmetics.
+        */
+        WeakRingBuffer( size_t capacity = 0 )
+            : front_( 0 )
+            , pfront_( 0 )
+            , cback_( 0 )
+            , buffer_( capacity )
+        {
+            back_.store( 0, memory_model::memory_order_release );
+        }
+
+        /// [producer] Reserve \p size bytes
+        void* back( size_t size )
+        {
+            // Any data is rounded to 8-byte boundary
+            size_t real_size = calc_real_size( size );
+
+            // check if we can reserve read_size bytes
+            assert( real_size < capacity() );
+            size_t back = back_.load( memory_model::memory_order_relaxed );
+
+            assert( back - pfront_ <= capacity() );
+
+            if ( pfront_ + capacity() - back < real_size ) {
+                pfront_ = front_.load( memory_model::memory_order_acquire );
+
+                if ( pfront_ + capacity() - back < real_size ) {
+                    // not enough space
+                    return nullptr;
+                }
+            }
+
+            uint8_t* reserved = buffer_.buffer() + buffer_.mod( back );
+
+            // Check if the buffer free space is enough for storing real_size bytes
+            size_t tail_size = capacity() - buffer_.mod( back );
+            if ( tail_size < real_size ) {
+                // make unused tail
+                assert( tail_size >= sizeof( size_t ) );
+                assert( !is_tail( tail_size ) );
+
+                *reinterpret_cast<size_t*>( reserved ) = make_tail( tail_size - sizeof(size_t));
+                back += tail_size;
+
+                // We must be in beginning of buffer
+                assert( buffer_.mod( back ) == 0 );
+
+                if ( pfront_ + capacity() - back < real_size ) {
+                    pfront_ = front_.load( memory_model::memory_order_acquire );
+
+                    if ( pfront_ + capacity() - back < real_size ) {
+                        // not enough space
+                        return nullptr;
+                    }
+                }
+
+                reserved = buffer_.buffer();
+            }
+
+            // reserve and store size
+            uint8_t* reserved = buffer_.buffer() + buffer_.mod( back );
+            *reinterpret_cast<size_t*>( reserved ) = size;
+
+            return reinterpret_cast<void*>( reserved + sizeof( size_t ) );
+        }
+
+        /// [producer] Push reserved bytes into ring
+        void push_back()
+        {
+            size_t back = back_.load( memory_model::memory_order_relaxed );
+            uint8_t* reserved = buffer_.buffer() + buffer_.mod( back );
+
+            size_t real_size = calc_real_size( *reinterpret_cast<size_t*>( reserved ) );
+            assert( real_size < capacity() );
+
+            back_.store( back + real_size, memory_model::memory_order_release );
+        }
+
+        /// [producer] Push \p data of \p size bytes into ring
+        bool push_back( void const* data, size_t size )
+        {
+            void* buf = back( size );
+            if ( buf ) {
+                memcpy( buf, data, size );
+                push_back();
+                return true;
+            }
+            return false;
+        }
+
+        /// [consumer] Get top data from the ring
+        std::pair<void*, size_t> front()
+        {
+            size_t front = front_.load( memory_model::memory_order_relaxed );
+            assert( cback_ - front < capacity() );
+
+            if ( cback_ - front < sizeof( size_t )) {
+                cback_ = back_.load( memory_model::memory_order_acquire );
+                if ( cback_ - front < sizeof( size_t ) )
+                    return std::make_pair( nullptr, 0u );
+            }
+
+            uint8_t * buf = buffer_.buffer() + buffer_.mod( front );
+
+            // check alignment
+            assert( ( reinterpret_cast<uintptr_t>( buf ) & ( sizeof( uintptr_t ) - 1 ) ) == 0 );
+
+            size_t size = *reinterpret_cast<size_t*>( buf );
+            if ( is_tail( size ) ) {
+                // unused tail, skip
+                CDS_VERIFY( pop_front() );
+
+                front = front_.load( memory_model::memory_order_relaxed );
+                buf = buffer_.buffer() + buffer_.mod( front );
+                size = *reinterpret_cast<size_t*>( buf );
+
+                assert( !is_tail( size ) );
+            }
+
+#ifdef _DEBUG
+            size_t real_size = calc_real_size( size );
+            if ( cback_ - front < real_size ) {
+                cback_ = back_.load( memory_model::memory_order_acquire );
+                assert( cback_ - front >= real_size );
+            }
+#endif
+
+            return std::make_pair( reinterpret_cast<void*>( buf + sizeof( size_t ) ), size );
+        }
+
+        /// [consumer] Pops top data
+        bool pop_front()
+        {
+            size_t front = front_.load( memory_model::memory_order_relaxed );
+            assert( cback_ - front <= capacity() );
+
+            if ( cback_ - front < sizeof(size_t) ) {
+                cback_ = back_.load( memory_model::memory_order_acquire );
+                if ( cback_ - front < sizeof( size_t ) )
+                    return false;
+            }
+
+            uint8_t * buf = buffer_.buffer() + buffer_.mod( front );
+
+            // check alignment
+            assert( ( reinterpret_cast<uintptr_t>( buf ) & ( sizeof( uintptr_t ) - 1 ) ) == 0 );
+
+            size_t size = *reinterpret_cast<size_t*>( buf );
+            assert( !is_tail( size ) );
+
+            size_t real_size = calc_real_size( size );
+
+#ifdef _DEBUG
+            if ( cback_ - front < real_size ) {
+                cback_ = back_.load( memory_model::memory_order_acquire );
+                assert( cback_ - front >= real_size );
+            }
+#endif
+
+            front_.store( front + real_size, memory_model::memory_order_release );
+            return true;
+
+        }
+
+        /// [consumer] Clears the ring buffer
+        void clear()
+        {
+            for ( auto el = front(); el.first; el = front() )
+                pop_front();
+        }
 
         /// Checks if the ring-buffer is empty
         bool empty() const
@@ -549,6 +785,27 @@ namespace cds { namespace container {
             return buffer_.capacity();
         }
 
+    private:
+        static size_t calc_real_size( size_t size )
+        {
+            size_t real_size =  (( size + sizeof( uintptr_t ) - 1 ) & ~( sizeof( uintptr_t ) - 1 )) + sizeof( size_t );
+
+            assert( real_size > size );
+            assert( real_size - size >= sizeof( size_t ) );
+
+            return real_size;
+        }
+
+        static bool is_tail( size_t size )
+        {
+            return ( size & ( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 ))) != 0;
+        }
+
+        static size_t make_tail( size_t size )
+        {
+            return size | ( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 ));
+        }
+
     private:
         //@cond
         atomics::atomic<size_t>     front_;