Uses different pass count for different parallel queue test cases
[libcds.git] / cds / container / weak_ringbuffer.h
index 3ae5dd6446b7bb2a953b5743451ea3d0a11c5e67..0198c0327de928875f71aed9ff2313b85f9c272d 100644 (file)
@@ -96,7 +96,7 @@ namespace cds { namespace container {
             - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
 
-            Example: declare \p %WeakRingBuffer with static iternal buffer of size 1024:
+            Example: declare \p %WeakRingBuffer with static internal buffer for 1024 objects:
             \code
             typedef cds::container::WeakRingBuffer< Foo,
                 typename cds::container::weak_ringbuffer::make_traits<
@@ -127,9 +127,12 @@ namespace cds { namespace container {
         Ring buffer is a bounded queue. Additionally, \p %WeakRingBuffer supports batch operations -
         you can push/pop an array of elements.
 
-        There are a specialization \ref cds_nonintrusive_WeakRingBuffer_void "WeakRingBuffer<void, Traits>" 
-        that is not a queue but a "memory pool" between producer and consumer threads. 
-        \p WeakRingBuffer<void> supports data of different size.
+        There is a specialization \ref cds_nonintrusive_WeakRingBuffer_void "WeakRingBuffer<void, Traits>"
+        that is not a queue but a "memory pool" between producer and consumer threads.
+        \p WeakRingBuffer<void> supports variable-sized data.
+
+        @warning: \p %WeakRingBuffer is designed for 64-bit architectures.
+        A 32-bit platform must provide support for 64-bit atomics.
     */
     template <typename T, typename Traits = weak_ringbuffer::traits>
     class WeakRingBuffer: public cds::bounded_container
@@ -154,6 +157,7 @@ namespace cds { namespace container {
     private:
         //@cond
         typedef typename traits::buffer::template rebind< value_type >::other buffer;
+        typedef uint64_t    counter_type;
         //@endcond
 
     public:
@@ -178,8 +182,8 @@ namespace cds { namespace container {
         ~WeakRingBuffer()
         {
             value_cleaner cleaner;
-            size_t back = back_.load( memory_model::memory_order_relaxed );
-            for ( size_t front = front_.load( memory_model::memory_order_relaxed ); front != back; ++front )
+            counter_type back = back_.load( memory_model::memory_order_relaxed );
+            for ( counter_type front = front_.load( memory_model::memory_order_relaxed ); front != back; ++front )
                 cleaner( buffer_[ buffer_.mod( front ) ] );
         }
 
@@ -197,7 +201,7 @@ namespace cds { namespace container {
             \code
             cds::container::WeakRingBuffer<std::string> ringbuf;
             char const* arr[10];
-            ringbuf.push( arr, 10, 
+            ringbuf.push( arr, 10,
                 []( std::string& element, char const* src ) {
                     new( &element ) std::string( src );
                 });
@@ -217,15 +221,15 @@ namespace cds { namespace container {
         template <typename Q, typename CopyFunc>
         bool push( Q* arr, size_t count, CopyFunc copy )
         {
-            assert( count < capacity() );
-            size_t back = back_.load( memory_model::memory_order_relaxed );
+            assert( count < capacity());
+            counter_type back = back_.load( memory_model::memory_order_relaxed );
 
-            assert( back - pfront_ <= capacity() );
+            assert( static_cast<size_t>( back - pfront_ ) <= capacity());
 
-            if ( pfront_ + capacity() - back < count ) {
+            if ( static_cast<size_t>( pfront_ + capacity() - back ) < count ) {
                 pfront_ = front_.load( memory_model::memory_order_acquire );
 
-                if ( pfront_ + capacity() - back < count ) {
+                if ( static_cast<size_t>( pfront_ + capacity() - back ) < count ) {
                     // not enough space
                     return false;
                 }
@@ -270,9 +274,9 @@ namespace cds { namespace container {
         typename std::enable_if< std::is_constructible<value_type, Args...>::value, bool>::type
         emplace( Args&&... args )
         {
-            size_t back = back_.load( memory_model::memory_order_relaxed );
+            counter_type back = back_.load( memory_model::memory_order_relaxed );
 
-            assert( back - pfront_ <= capacity() );
+            assert( static_cast<size_t>( back - pfront_ ) <= capacity());
 
             if ( pfront_ + capacity() - back < 1 ) {
                 pfront_ = front_.load( memory_model::memory_order_acquire );
@@ -303,9 +307,9 @@ namespace cds { namespace container {
         template <typename Func>
         bool enqueue_with( Func f )
         {
-            size_t back = back_.load( memory_model::memory_order_relaxed );
+            counter_type back = back_.load( memory_model::memory_order_relaxed );
 
-            assert( back - pfront_ <= capacity() );
+            assert( static_cast<size_t>( back - pfront_ ) <= capacity());
 
             if ( pfront_ + capacity() - back < 1 ) {
                 pfront_ = front_.load( memory_model::memory_order_acquire );
@@ -373,14 +377,14 @@ namespace cds { namespace container {
         template <typename Q, typename CopyFunc>
         bool pop( Q* arr, size_t count, CopyFunc copy )
         {
-            assert( count < capacity() );
+            assert( count < capacity());
 
-            size_t front = front_.load( memory_model::memory_order_relaxed );
-            assert( cback_ - front < capacity() );
+            counter_type front = front_.load( memory_model::memory_order_relaxed );
+            assert( static_cast<size_t>( cback_ - front ) < capacity());
 
-            if ( cback_ - front < count ) {
+            if ( static_cast<size_t>( cback_ - front ) < count ) {
                 cback_ = back_.load( memory_model::memory_order_acquire );
-                if ( cback_ - front < count )
+                if ( static_cast<size_t>( cback_ - front ) < count )
                     return false;
             }
 
@@ -453,8 +457,8 @@ namespace cds { namespace container {
         template <typename Func>
         bool dequeue_with( Func f )
         {
-            size_t front = front_.load( memory_model::memory_order_relaxed );
-            assert( cback_ - front < capacity() );
+            counter_type front = front_.load( memory_model::memory_order_relaxed );
+            assert( static_cast<size_t>( cback_ - front ) < capacity());
 
             if ( cback_ - front < 1 ) {
                 cback_ = back_.load( memory_model::memory_order_acquire );
@@ -486,8 +490,8 @@ namespace cds { namespace container {
         */
         value_type* front()
         {
-            size_t front = front_.load( memory_model::memory_order_relaxed );
-            assert( cback_ - front < capacity() );
+            counter_type front = front_.load( memory_model::memory_order_relaxed );
+            assert( static_cast<size_t>( cback_ - front ) < capacity());
 
             if ( cback_ - front < 1 ) {
                 cback_ = back_.load( memory_model::memory_order_acquire );
@@ -505,8 +509,8 @@ namespace cds { namespace container {
         */
         bool pop_front()
         {
-            size_t front = front_.load( memory_model::memory_order_relaxed );
-            assert( cback_ - front <= capacity() );
+            counter_type front = front_.load( memory_model::memory_order_relaxed );
+            assert( static_cast<size_t>( cback_ - front ) <= capacity());
 
             if ( cback_ - front < 1 ) {
                 cback_ = back_.load( memory_model::memory_order_acquire );
@@ -525,7 +529,7 @@ namespace cds { namespace container {
         void clear()
         {
             value_type v;
-            while ( pop( v ) );
+            while ( pop( v ));
         }
 
         /// Checks if the ring-buffer is empty
@@ -543,7 +547,7 @@ namespace cds { namespace container {
         /// Returns the current size of ring buffer
         size_t size() const
         {
-            return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed );
+            return static_cast<size_t>( back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ));
         }
 
         /// Returns capacity of the ring buffer
@@ -554,14 +558,14 @@ namespace cds { namespace container {
 
     private:
         //@cond
-        atomics::atomic<size_t>     front_;
-        typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad1_;
-        atomics::atomic<size_t>     back_;
-        typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad2_;
-        size_t                      pfront_;
-        typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad3_;
-        size_t                      cback_;
-        typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad4_;
+        atomics::atomic<counter_type>   front_;
+        typename opt::details::apply_padding< atomics::atomic<counter_type>, traits::padding >::padding_type pad1_;
+        atomics::atomic<counter_type>   back_;
+        typename opt::details::apply_padding< atomics::atomic<counter_type>, traits::padding >::padding_type pad2_;
+        counter_type                    pfront_;
+        typename opt::details::apply_padding< counter_type, traits::padding >::padding_type pad3_;
+        counter_type                    cback_;
+        typename opt::details::apply_padding< counter_type, traits::padding >::padding_type pad4_;
 
         buffer                      buffer_;
         //@endcond
@@ -571,8 +575,68 @@ namespace cds { namespace container {
     /// Single-producer single-consumer ring buffer for untyped variable-sized data
     /** @ingroup cds_nonintrusive_queue
         @anchor cds_nonintrusive_WeakRingBuffer_void
+
+        This SPSC ring-buffer is intended for data of variable size. The producer
+        allocates a buffer from the ring, fills it with data and pushes it back to the ring.
+        The consumer thread reads data from the front end and then pops it:
+        \code
+        // allocates 1M ring buffer
+        WeakRingBuffer<void>    theRing( 1024 * 1024 );
+
+        void producer_thread()
+        {
+            // Get data of size N bytes
+            size_t size;
+            void*  data;
+
+            while ( true ) {
+                // Get external data
+                std::tie( data, size ) = get_data();
+
+                if ( data == nullptr )
+                    break;
+
+                // Allocates a buffer from the ring
+                void* buf = theRing.back( size );
+                if ( !buf ) {
+                    std::cout << "The ring is full" << std::endl;
+                    break;
+                }
+
+                memcpy( buf, data, size );
+
+                // Push data into the ring
+                theRing.push_back();
+            }
+        }
+
+        void consumer_thread()
+        {
+            while ( true ) {
+                auto buf = theRing.front();
+
+                if ( buf.first == nullptr ) {
+                    std::cout << "The ring is empty" << std::endl;
+                    break;
+                }
+
+                // Process data
+                process_data( buf.first, buf.second );
+
+                // Free buffer
+                theRing.pop_front();
+            }
+        }
+        \endcode
+
+        @warning: \p %WeakRingBuffer is designed for 64-bit architectures.
+        A 32-bit platform must provide support for 64-bit atomics.
     */
+#ifdef CDS_DOXYGEN_INVOKED
     template <typename Traits = weak_ringbuffer::traits>
+#else
+    template <typename Traits>
+#endif
     class WeakRingBuffer<void, Traits>: public cds::bounded_container
     {
     public:
@@ -582,6 +646,7 @@ namespace cds { namespace container {
     private:
         //@cond
         typedef typename traits::buffer::template rebind< uint8_t >::other buffer;
+        typedef uint64_t    counter_type;
         //@endcond
 
     public:
@@ -602,21 +667,60 @@ namespace cds { namespace container {
         }
 
         /// [producer] Reserve \p size bytes
+        /**
+            The function returns a pointer to reserved buffer of \p size bytes.
+            If there is not enough space in the ring buffer, the function returns \p nullptr.
+
+            After successful \p %back() you should fill the buffer provided and call \p push_back():
+            \code
+            // allocates 1M ring buffer
+            WeakRingBuffer<void>    theRing( 1024 * 1024 );
+
+            void producer_thread()
+            {
+                // Get data of size N bytes
+                size_t size;
+                void*  data;
+
+                while ( true ) {
+                    // Get external data
+                    std::tie( data, size ) = get_data();
+
+                    if ( data == nullptr )
+                        break;
+
+                    // Allocates a buffer from the ring
+                    void* buf = theRing.back( size );
+                    if ( !buf ) {
+                        std::cout << "The ring is full" << std::endl;
+                        break;
+                    }
+
+                    memcpy( buf, data, size );
+
+                    // Push data into the ring
+                    theRing.push_back();
+                }
+            }
+            \endcode
+        */
         void* back( size_t size )
         {
+            assert( size > 0 );
+
             // Any data is rounded to 8-byte boundary
             size_t real_size = calc_real_size( size );
 
             // check if we can reserve read_size bytes
-            assert( real_size < capacity() );
-            size_t back = back_.load( memory_model::memory_order_relaxed );
+            assert( real_size < capacity());
+            counter_type back = back_.load( memory_model::memory_order_relaxed );
 
-            assert( back - pfront_ <= capacity() );
+            assert( static_cast<size_t>( back - pfront_ ) <= capacity());
 
-            if ( pfront_ + capacity() - back < real_size ) {
+            if ( static_cast<size_t>( pfront_ + capacity() - back ) < real_size ) {
                 pfront_ = front_.load( memory_model::memory_order_acquire );
 
-                if ( pfront_ + capacity() - back < real_size ) {
+                if ( static_cast<size_t>( pfront_ + capacity() - back ) < real_size ) {
                     // not enough space
                     return nullptr;
                 }
@@ -625,11 +729,11 @@ namespace cds { namespace container {
             uint8_t* reserved = buffer_.buffer() + buffer_.mod( back );
 
             // Check if the buffer free space is enough for storing real_size bytes
-            size_t tail_size = capacity() - buffer_.mod( back );
+            size_t tail_size = capacity() - static_cast<size_t>( buffer_.mod( back ));
             if ( tail_size < real_size ) {
                 // make unused tail
-                assert( tail_size >= sizeof( size_t ) );
-                assert( !is_tail( tail_size ) );
+                assert( tail_size >= sizeof( size_t ));
+                assert( !is_tail( tail_size ));
 
                 *reinterpret_cast<size_t*>( reserved ) = make_tail( tail_size - sizeof(size_t));
                 back += tail_size;
@@ -637,38 +741,77 @@ namespace cds { namespace container {
                 // We must be in beginning of buffer
                 assert( buffer_.mod( back ) == 0 );
 
-                if ( pfront_ + capacity() - back < real_size ) {
+                if ( static_cast<size_t>( pfront_ + capacity() - back ) < real_size ) {
                     pfront_ = front_.load( memory_model::memory_order_acquire );
 
-                    if ( pfront_ + capacity() - back < real_size ) {
+                    if ( static_cast<size_t>( pfront_ + capacity() - back ) < real_size ) {
                         // not enough space
                         return nullptr;
                     }
                 }
 
+                back_.store( back, memory_model::memory_order_release );
                 reserved = buffer_.buffer();
             }
 
             // reserve and store size
-            uint8_t* reserved = buffer_.buffer() + buffer_.mod( back );
             *reinterpret_cast<size_t*>( reserved ) = size;
 
-            return reinterpret_cast<void*>( reserved + sizeof( size_t ) );
+            return reinterpret_cast<void*>( reserved + sizeof( size_t ));
         }
 
         /// [producer] Push reserved bytes into ring
+        /**
+            The function pushes the reserved buffer into the ring. After this call,
+            the buffer becomes visible to the consumer:
+            \code
+            // allocates 1M ring buffer
+            WeakRingBuffer<void>    theRing( 1024 * 1024 );
+
+            void producer_thread()
+            {
+                // Get data of size N bytes
+                size_t size;
+                void*  data;
+
+                while ( true ) {
+                    // Get external data
+                    std::tie( data, size ) = get_data();
+
+                    if ( data == nullptr )
+                        break;
+
+                    // Allocates a buffer from the ring
+                    void* buf = theRing.back( size );
+                    if ( !buf ) {
+                        std::cout << "The ring is full" << std::endl;
+                        break;
+                    }
+
+                    memcpy( buf, data, size );
+
+                    // Push data into the ring
+                    theRing.push_back();
+                }
+            }
+            \endcode
+        */
         void push_back()
         {
-            size_t back = back_.load( memory_model::memory_order_relaxed );
+            counter_type back = back_.load( memory_model::memory_order_relaxed );
             uint8_t* reserved = buffer_.buffer() + buffer_.mod( back );
 
-            size_t real_size = calc_real_size( *reinterpret_cast<size_t*>( reserved ) );
-            assert( real_size < capacity() );
+            size_t real_size = calc_real_size( *reinterpret_cast<size_t*>( reserved ));
+            assert( real_size < capacity());
 
             back_.store( back + real_size, memory_model::memory_order_release );
         }
 
         /// [producer] Push \p data of \p size bytes into ring
+        /**
+            This function invokes \p back( size ), \p memcpy( buf, data, size )
+            and \p push_back() in one call.
+        */
         bool push_back( void const* data, size_t size )
         {
             void* buf = back( size );
@@ -681,71 +824,98 @@ namespace cds { namespace container {
         }
 
         /// [consumer] Get top data from the ring
+        /**
+            If the ring is empty, the function returns \p nullptr in \p std::pair::first.
+        */
         std::pair<void*, size_t> front()
         {
-            size_t front = front_.load( memory_model::memory_order_relaxed );
-            assert( cback_ - front < capacity() );
+            counter_type front = front_.load( memory_model::memory_order_relaxed );
+            assert( static_cast<size_t>( cback_ - front ) < capacity());
 
             if ( cback_ - front < sizeof( size_t )) {
                 cback_ = back_.load( memory_model::memory_order_acquire );
-                if ( cback_ - front < sizeof( size_t ) )
+                if ( cback_ - front < sizeof( size_t ))
                     return std::make_pair( nullptr, 0u );
             }
 
             uint8_t * buf = buffer_.buffer() + buffer_.mod( front );
 
             // check alignment
-            assert( ( reinterpret_cast<uintptr_t>( buf ) & ( sizeof( uintptr_t ) - 1 ) ) == 0 );
+            assert( ( reinterpret_cast<uintptr_t>( buf ) & ( sizeof( uintptr_t ) - 1 )) == 0 );
 
             size_t size = *reinterpret_cast<size_t*>( buf );
-            if ( is_tail( size ) ) {
+            if ( is_tail( size )) {
                 // unused tail, skip
-                CDS_VERIFY( pop_front() );
+                CDS_VERIFY( pop_front());
 
                 front = front_.load( memory_model::memory_order_relaxed );
                 buf = buffer_.buffer() + buffer_.mod( front );
                 size = *reinterpret_cast<size_t*>( buf );
 
-                assert( !is_tail( size ) );
+                assert( !is_tail( size ));
+                assert( buf == buffer_.buffer());
             }
 
 #ifdef _DEBUG
             size_t real_size = calc_real_size( size );
-            if ( cback_ - front < real_size ) {
+            if ( static_cast<size_t>( cback_ - front ) < real_size ) {
                 cback_ = back_.load( memory_model::memory_order_acquire );
-                assert( cback_ - front >= real_size );
+                assert( static_cast<size_t>( cback_ - front ) >= real_size );
             }
 #endif
 
-            return std::make_pair( reinterpret_cast<void*>( buf + sizeof( size_t ) ), size );
+            return std::make_pair( reinterpret_cast<void*>( buf + sizeof( size_t )), size );
         }
 
         /// [consumer] Pops top data
+        /**
+            Typical consumer workloop:
+            \code
+            // allocates 1M ring buffer
+            WeakRingBuffer<void>    theRing( 1024 * 1024 );
+
+            void consumer_thread()
+            {
+                while ( true ) {
+                    auto buf = theRing.front();
+
+                    if ( buf.first == nullptr ) {
+                        std::cout << "The ring is empty" << std::endl;
+                        break;
+                    }
+
+                    // Process data
+                    process_data( buf.first, buf.second );
+
+                    // Free buffer
+                    theRing.pop_front();
+                }
+            }
+            \endcode
+        */
         bool pop_front()
         {
-            size_t front = front_.load( memory_model::memory_order_relaxed );
-            assert( cback_ - front <= capacity() );
+            counter_type front = front_.load( memory_model::memory_order_relaxed );
+            assert( static_cast<size_t>( cback_ - front ) <= capacity());
 
-            if ( cback_ - front < sizeof(size_t) ) {
+            if ( cback_ - front < sizeof(size_t)) {
                 cback_ = back_.load( memory_model::memory_order_acquire );
-                if ( cback_ - front < sizeof( size_t ) )
+                if ( cback_ - front < sizeof( size_t ))
                     return false;
             }
 
             uint8_t * buf = buffer_.buffer() + buffer_.mod( front );
 
             // check alignment
-            assert( ( reinterpret_cast<uintptr_t>( buf ) & ( sizeof( uintptr_t ) - 1 ) ) == 0 );
+            assert( ( reinterpret_cast<uintptr_t>( buf ) & ( sizeof( uintptr_t ) - 1 )) == 0 );
 
             size_t size = *reinterpret_cast<size_t*>( buf );
-            assert( !is_tail( size ) );
-
-            size_t real_size = calc_real_size( size );
+            size_t real_size = calc_real_size( untail( size ));
 
 #ifdef _DEBUG
-            if ( cback_ - front < real_size ) {
+            if ( static_cast<size_t>( cback_ - front ) < real_size ) {
                 cback_ = back_.load( memory_model::memory_order_acquire );
-                assert( cback_ - front >= real_size );
+                assert( static_cast<size_t>( cback_ - front ) >= real_size );
             }
 #endif
 
@@ -757,7 +927,7 @@ namespace cds { namespace container {
         /// [consumer] Clears the ring buffer
         void clear()
         {
-            for ( auto el = front(); el.first; el = front() )
+            for ( auto el = front(); el.first; el = front())
                 pop_front();
         }
 
@@ -776,7 +946,7 @@ namespace cds { namespace container {
         /// Returns the current size of ring buffer
         size_t size() const
         {
-            return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed );
+            return static_cast<size_t>( back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ));
         }
 
         /// Returns capacity of the ring buffer
@@ -786,12 +956,13 @@ namespace cds { namespace container {
         }
 
     private:
+        //@cond
         static size_t calc_real_size( size_t size )
         {
             size_t real_size =  (( size + sizeof( uintptr_t ) - 1 ) & ~( sizeof( uintptr_t ) - 1 )) + sizeof( size_t );
 
             assert( real_size > size );
-            assert( real_size - size >= sizeof( size_t ) );
+            assert( real_size - size >= sizeof( size_t ));
 
             return real_size;
         }
@@ -806,16 +977,22 @@ namespace cds { namespace container {
             return size | ( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 ));
         }
 
+        static size_t untail( size_t size )
+        {
+            return size & (( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 )) - 1);
+        }
+        //@endcond
+
     private:
         //@cond
-        atomics::atomic<size_t>     front_;
-        typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad1_;
-        atomics::atomic<size_t>     back_;
-        typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad2_;
-        size_t                      pfront_;
-        typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad3_;
-        size_t                      cback_;
-        typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad4_;
+        atomics::atomic<counter_type>     front_;
+        typename opt::details::apply_padding< atomics::atomic<counter_type>, traits::padding >::padding_type pad1_;
+        atomics::atomic<counter_type>     back_;
+        typename opt::details::apply_padding< atomics::atomic<counter_type>, traits::padding >::padding_type pad2_;
+        counter_type                      pfront_;
+        typename opt::details::apply_padding< counter_type, traits::padding >::padding_type pad3_;
+        counter_type                      cback_;
+        typename opt::details::apply_padding< counter_type, traits::padding >::padding_type pad4_;
 
         buffer                      buffer_;
         //@endcond