Added WeakRingBuffer - a single-producer/single-consumer queue based on a ring buffer
author    khizmax <libcds.dev@gmail.com>
          Tue, 9 May 2017 08:04:54 +0000 (11:04 +0300)
committer khizmax <libcds.dev@gmail.com>
          Tue, 9 May 2017 08:04:54 +0000 (11:04 +0300)
14 files changed:
cds/algo/atomic.h
cds/compiler/defs.h
cds/container/vyukov_mpmc_cycle_queue.h
cds/container/weak_ringbuffer.h [new file with mode: 0644]
cds/opt/buffer.h
cds/opt/value_cleaner.h
projects/Win/vc141/cds.vcxproj
projects/Win/vc141/cds.vcxproj.filters
projects/Win/vc141/gtest-queue.vcxproj
projects/Win/vc141/gtest-queue.vcxproj.filters
test/stress/queue/queue_type.h
test/unit/queue/CMakeLists.txt
test/unit/queue/test_bounded_queue.h
test/unit/queue/weak_ringbuffer.cpp [new file with mode: 0644]

index c27f70334eee199af7dcdbb662c4897858048cd9..56b54ef7e51960b41ccbe75bdda23e3474be2471 100644 (file)
@@ -199,8 +199,8 @@ namespace cds {
         class item_counter
         {
         public:
-            typedef atomics::atomic_size_t   atomic_type ;   ///< atomic type used
-            typedef size_t counter_type    ;   ///< Integral item counter type (size_t)
+            typedef atomics::atomic_size_t   atomic_type;   ///< atomic type used
+            typedef size_t counter_type;                    ///< Integral item counter type (size_t)
 
         private:
             //@cond
@@ -243,12 +243,24 @@ namespace cds {
                 return m_Counter.fetch_add( 1, order );
             }
 
+            /// Increments the counter by \p count. Semantics: postincrement
+            counter_type inc( counter_type count, atomics::memory_order order = atomics::memory_order_relaxed )
+            {
+                return m_Counter.fetch_add( count, order );
+            }
+
             /// Decrements the counter. Semantics: postdecrement
             counter_type dec(atomics::memory_order order = atomics::memory_order_relaxed)
             {
                 return m_Counter.fetch_sub( 1, order );
             }
 
+            /// Decrements the counter by \p count. Semantics: postdecrement
+            counter_type dec( counter_type count, atomics::memory_order order = atomics::memory_order_relaxed )
+            {
+                return m_Counter.fetch_sub( count, order );
+            }
+
             /// Preincrement
             counter_type operator ++()
             {
@@ -271,6 +283,18 @@ namespace cds {
                 return dec();
             }
 
+            /// Increment by \p count
+            counter_type operator +=( counter_type count )
+            {
+                return inc( count ) + count;
+            }
+
+            /// Decrement by \p count
+            counter_type operator -=( counter_type count )
+            {
+                return dec( count ) - count;
+            }
+
             /// Resets count to 0
             void reset(atomics::memory_order order = atomics::memory_order_relaxed)
             {
@@ -303,36 +327,62 @@ namespace cds {
             }
 
             /// Dummy increment. Always returns 0
-            static size_t inc(atomics::memory_order /*order*/ = atomics::memory_order_relaxed)
+            static counter_type inc(atomics::memory_order /*order*/ = atomics::memory_order_relaxed)
+            {
+                return 0;
+            }
+
+            /// Dummy increment. Always returns 0
+            static counter_type inc( counter_type /*count*/, atomics::memory_order /*order*/ = atomics::memory_order_relaxed )
+            {
+                return 0;
+            }
+
+            /// Dummy decrement. Always returns 0
+            static counter_type dec(atomics::memory_order /*order*/ = atomics::memory_order_relaxed)
             {
                 return 0;
             }
 
             /// Dummy decrement. Always returns 0
-            static size_t dec(atomics::memory_order /*order*/ = atomics::memory_order_relaxed)
+            static counter_type dec( counter_type /*count*/, atomics::memory_order /*order*/ = atomics::memory_order_relaxed )
             {
                 return 0;
             }
 
             /// Dummy pre-increment. Always returns 0
-            size_t operator ++() const
+            counter_type operator ++() const
             {
                 return 0;
             }
             /// Dummy post-increment. Always returns 0
-            size_t operator ++(int) const
+            counter_type operator ++(int) const
             {
                 return 0;
             }
 
             /// Dummy pre-decrement. Always returns 0
-            size_t operator --() const
+            counter_type operator --() const
             {
                 return 0;
             }
             /// Dummy post-decrement. Always returns 0
-            size_t operator --(int) const
+            counter_type operator --(int) const
+            {
+                return 0;
+            }
+
+            /// Dummy increment by \p count, always returns 0
+            counter_type operator +=( counter_type count )
+            {
+                CDS_UNUSED( count );
+                return 0;
+            }
+
+            /// Dummy decrement by \p count, always returns 0
+            counter_type operator -=( counter_type count )
             {
+                CDS_UNUSED( count );
                 return 0;
             }
 
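A minimal sketch of the batch counter interface in use (the values are illustrative; value() is the accessor already present in cds/algo/atomic.h):
\code
#include <cds/algo/atomic.h>
#include <cassert>

int main()
{
    cds::atomicity::item_counter counter;

    counter += 16;      // inc( 16 ): one atomic fetch_add instead of 16 single increments
    assert( counter.value() == 16 );

    counter -= 4;       // dec( 4 ): one atomic fetch_sub
    assert( counter.value() == 12 );

    // empty_item_counter has the same interface, but every member is a no-op
    // returning 0, so containers can switch item counting off without code changes.
    cds::atomicity::empty_item_counter dummy;
    dummy += 100;
    assert( dummy.value() == 0 );
}
\endcode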
index 69f016709304dda7128ad5ed33df4fee8487f3dc..a70ee5b9334b868f292273e0c0ed949774b7a2f3 100644 (file)
 #   define cds_unlikely( expr ) expr
 #endif
 
+#ifndef static_if
+#   define static_if  if
+#endif
+
 // Features
 #include <cds/compiler/feature_tsan.h>
 
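static_if is a plain if for now, presumably a placeholder for a compile-time branch; with a constant condition the compiler folds the dead branch away. A sketch of the intended pattern, mirroring the mod() helpers added to cds/opt/buffer.h below (ring_index is a hypothetical name, not part of the library):
\code
#include <cds/compiler/defs.h>
#include <cstddef>

template <size_t Capacity>
struct ring_index {
    static constexpr bool c_bExp2 = ( Capacity & ( Capacity - 1 )) == 0;

    static size_t mod( size_t idx )
    {
        static_if ( c_bExp2 )
            return idx & ( Capacity - 1 );  // power of 2: cheap bit mask
        else
            return idx % Capacity;          // general case: modulo
    }
};
\endcode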
index 4a06618a25c38a4f52a71263792e1e0fbafa56c9..c452e1b5916205f0c36eb001c3b04d218953c5d9 100644 (file)
@@ -65,9 +65,9 @@ namespace cds { namespace container {
                 After an item is dequeued, \p value_cleaner cleans the cell that the item occupied.
                 If \p T is a complex type, \p value_cleaner may be a useful feature.
 
-                Default value is \ref opt::v::destruct_cleaner
+                Default value is \ref opt::v::auto_cleaner
             */
-            typedef cds::opt::v::destruct_cleaner value_cleaner;
+            typedef cds::opt::v::auto_cleaner value_cleaner;
 
             /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
             typedef cds::atomicity::empty_item_counter item_counter;
@@ -105,7 +105,7 @@ namespace cds { namespace container {
                 The functor calls the destructor for queue item.
                 After an item is dequeued, \p value_cleaner cleans the cell that the item occupied.
                 If \p T is a complex type, \p value_cleaner can be a useful feature.
-                Default value is \ref opt::v::destruct_cleaner
+                Default value is \ref opt::v::auto_cleaner
             - \p opt::back_off - back-off strategy used. If the option is not specified, the \p cds::backoff::Default is used.
             - \p opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
                 To enable item counting use \p cds::atomicity::item_counter
@@ -315,7 +315,7 @@ namespace cds { namespace container {
             return enqueue_with( [&val]( value_type& dest ) { new (&dest) value_type( std::move( val ));});
         }
 
-        /// Synonym for \p enqueue( valuetype const& )
+        /// Synonym for \p enqueue( value_type const& )
         bool push( value_type const& data )
         {
             return enqueue( data );
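With the default cleaner switched to auto_cleaner, trivially-destructible types are no longer destroyed one by one on dequeue; code that wants the old unconditional behavior can pin destruct_cleaner explicitly. A sketch (Foo is an illustrative element type, not part of the commit):
\code
#include <cds/container/vyukov_mpmc_cycle_queue.h>

struct Foo {
    int payload;
    ~Foo() {}   // user-provided destructor: auto_cleaner would call it anyway
};

typedef cds::container::VyukovMPMCCycleQueue< Foo,
    typename cds::container::vyukov_queue::make_traits<
        cds::opt::value_cleaner< cds::opt::v::destruct_cleaner >
    >::type
> FooQueue;

// FooQueue q( 1024 );  // dynamic buffer by default; capacity rounded to a power of 2
\endcode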
diff --git a/cds/container/weak_ringbuffer.h b/cds/container/weak_ringbuffer.h
new file mode 100644 (file)
index 0000000..17b249b
--- /dev/null
@@ -0,0 +1,570 @@
+/*
+    This file is a part of libcds - Concurrent Data Structures library
+
+    (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
+
+    Source code repo: http://github.com/khizmax/libcds/
+    Download: http://sourceforge.net/projects/libcds/files/
+
+    Redistribution and use in source and binary forms, with or without
+    modification, are permitted provided that the following conditions are met:
+
+    * Redistributions of source code must retain the above copyright notice, this
+      list of conditions and the following disclaimer.
+
+    * Redistributions in binary form must reproduce the above copyright notice,
+      this list of conditions and the following disclaimer in the documentation
+      and/or other materials provided with the distribution.
+
+    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+    AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+    IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+    FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+    DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+    SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+    OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+#ifndef CDSLIB_CONTAINER_WEAK_RINGBUFFER_H
+#define CDSLIB_CONTAINER_WEAK_RINGBUFFER_H
+
+#include <cds/container/details/base.h>
+#include <cds/opt/buffer.h>
+#include <cds/opt/value_cleaner.h>
+#include <cds/algo/atomic.h>
+#include <cds/details/bounded_container.h>
+
+namespace cds { namespace container {
+
+    /// \p WeakRingBuffer related definitions
+    /** @ingroup cds_nonintrusive_helper
+    */
+    namespace weak_ringbuffer {
+
+        /// \p WeakRingBuffer default traits
+        struct traits {
+            /// Buffer type for internal array
+            /**
+                The type of the buffer element is not important: \p WeakRingBuffer rebinds
+                the buffer to the required type via the \p rebind metafunction.
+
+                For \p WeakRingBuffer a power-of-2 buffer size is preferable: it allows
+                bit-mask arithmetic instead of modulo arithmetic.
+
+                Only an uninitialized buffer is suitable for the ring buffer -
+                \p cds::opt::v::uninitialized_dynamic_buffer (the default),
+                \p cds::opt::v::uninitialized_static_buffer.
+            */
+            typedef cds::opt::v::uninitialized_dynamic_buffer< void * > buffer;
+
+            /// A functor to clean up dequeued items
+            /**
+                The functor calls the destructor for a popped element.
+                After a set of items is dequeued, \p value_cleaner cleans the cells that the items occupied.
+                If \p T is a complex type, \p value_cleaner may be a useful feature.
+                For POD types, \ref opt::v::empty_cleaner is suitable.
+
+                Default value is \ref opt::v::auto_cleaner that calls the destructor only if it is not trivial.
+            */
+            typedef cds::opt::v::auto_cleaner value_cleaner;
+
+            /// C++ memory ordering model
+            /**
+                Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
+                or \p opt::v::sequential_consistent (sequentially consistent memory model).
+            */
+            typedef opt::v::relaxed_ordering    memory_model;
+
+            /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
+            enum { padding = opt::cache_line_padding };
+        };
+
+        /// Metafunction converting option list to \p weak_ringbuffer::traits
+        /**
+            Supported \p Options are:
+            - \p opt::buffer - an uninitialized buffer type for internal cyclic array. Possible types are:
+                \p opt::v::uninitialized_dynamic_buffer (the default), \p opt::v::uninitialized_static_buffer. The type of
+                element in the buffer is not important: it will be changed via \p rebind metafunction.
+            - \p opt::value_cleaner - a functor to clean up dequeued items.
+                The functor calls the destructor for a ring-buffer item.
+                After a set of items is dequeued, \p value_cleaner cleans the cells that the items occupied.
+                If \p T is a complex type, \p value_cleaner can be a useful feature.
+                Default value is \ref opt::v::auto_cleaner that calls the destructor only if it is not trivial.
+            - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
+            - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
+                or \p opt::v::sequential_consistent (sequentially consistent memory model).
+
+            Example: declare \p %WeakRingBuffer with a static internal buffer of size 1024:
+            \code
+            typedef cds::container::WeakRingBuffer< Foo,
+                typename cds::container::weak_ringbuffer::make_traits<
+                    cds::opt::buffer< cds::opt::v::uninitialized_static_buffer< void *, 1024 >>
+                >::type
+            > myRing;
+            \endcode
+        */
+        template <typename... Options>
+        struct make_traits {
+#   ifdef CDS_DOXYGEN_INVOKED
+            typedef implementation_defined type;   ///< Metafunction result
+#   else
+            typedef typename cds::opt::make_options<
+                typename cds::opt::find_type_traits< traits, Options... >::type
+                , Options...
+            >::type type;
+#   endif
+        };
+
+    } // namespace weak_ringbuffer
+
+    /// Single-producer single-consumer ring buffer
+    /** @ingroup cds_nonintrusive_queue
+        Source: [2013] Nhat Minh Le, Adrien Guatto, Albert Cohen, Antoniu Pop. Correct and Efficient Bounded
+            FIFO Queues. [Research Report] RR-8365, INRIA. 2013. <hal-00862450>
+
+        The ring buffer is a bounded queue with exactly one producer and one consumer thread.
+        Additionally, \p %WeakRingBuffer supports batch operations - you can push/pop an array of elements.
+    */
+    template <typename T, typename Traits = weak_ringbuffer::traits>
+    class WeakRingBuffer: public cds::bounded_container
+    {
+    public:
+        typedef T value_type;   ///< Value type to be stored in the ring buffer
+        typedef Traits traits;  ///< Ring buffer traits
+        typedef typename traits::memory_model  memory_model;  ///< Memory ordering. See \p cds::opt::memory_model option
+        typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p weak_ringbuffer::traits::value_cleaner
+
+        /// Rebind template arguments
+        template <typename T2, typename Traits2>
+        struct rebind {
+            typedef WeakRingBuffer< T2, Traits2 > other;   ///< Rebinding result
+        };
+
+        //@cond
+        // Only for tests
+        typedef size_t item_counter;
+        //@endcond
+
+    private:
+        //@cond
+        typedef typename traits::buffer::template rebind< value_type >::other buffer;
+        //@endcond
+
+    public:
+
+        /// Creates the ring buffer of \p capacity
+        /**
+            For \p cds::opt::v::uninitialized_static_buffer the \p capacity parameter is ignored.
+
+            If the buffer capacity is a power of two, lightweight bit-mask arithmetic is used
+            instead of modulo arithmetic.
+        */
+        WeakRingBuffer( size_t capacity = 0 )
+            : front_( 0 )
+            , pfront_( 0 )
+            , cback_( 0 )
+            , buffer_( capacity )
+        {
+            back_.store( 0, memory_model::memory_order_release );
+        }
+
+        /// Destroys the ring buffer
+        ~WeakRingBuffer()
+        {
+            value_cleaner cleaner;
+            size_t back = back_.load( memory_model::memory_order_relaxed );
+            for ( size_t front = front_.load( memory_model::memory_order_relaxed ); front != back; ++front )
+                cleaner( buffer_[ buffer_.mod( front ) ] );
+        }
+
+        /// Batch push - push array \p arr of size \p count
+        /**
+            \p CopyFunc is a per-element copy functor: for each element of \p arr
+            <tt>copy( dest, arr[i] )</tt> is called.
+            The \p CopyFunc signature:
+            \code
+                void copy_func( value_type& element, Q const& source );
+            \endcode
+            Here \p element is uninitialized so you should construct it using placement new
+            if needed; for example, if the element type is \p std::string and \p Q is <tt>char const*</tt>,
+            \p copy functor can be:
+            \code
+            cds::container::WeakRingBuffer<std::string> ringbuf;
+            char const* arr[10];
+            ringbuf.push( arr, 10, 
+                []( std::string& element, char const* src ) {
+                    new( &element ) std::string( src );
+                });
+            \endcode
+            You may use move semantics if appropriate:
+            \code
+            cds::container::WeakRingBuffer<std::string> ringbuf;
+            std::string arr[10];
+            ringbuf.push( arr, 10,
+                []( std::string& element, std::string& src ) {
+                    new( &element ) std::string( std::move( src ));
+                });
+            \endcode
+
+            Returns \p true on success, or \p false if there is not enough free space in the ring buffer
+        */
+        template <typename Q, typename CopyFunc>
+        bool push( Q* arr, size_t count, CopyFunc copy )
+        {
+            assert( count < capacity() );
+            size_t back = back_.load( memory_model::memory_order_relaxed );
+
+            assert( back - pfront_ <= capacity() );
+
+            if ( pfront_ + capacity() - back < count ) {
+                pfront_ = front_.load( memory_model::memory_order_acquire );
+
+                if ( pfront_ + capacity() - back < count ) {
+                    // not enough space
+                    return false;
+                }
+            }
+
+            // copy data
+            for ( size_t i = 0; i < count; ++i, ++back )
+                copy( buffer_[buffer_.mod( back )], arr[i] );
+
+            back_.store( back, memory_model::memory_order_release );
+
+            return true;
+        }
+
+        /// Batch push - push array \p arr of size \p count with per-element copy construction
+        /**
+            This function is equivalent to:
+            \code
+            push( arr, count, []( value_type& dest, Q const& src ) { new( &dest ) value_type( src ); } );
+            \endcode
+
+            The function is available only if <tt>std::is_constructible<value_type, Q>::value</tt>
+            is \p true.
+
+            Returns \p true on success, or \p false if there is not enough free space in the ring buffer
+        */
+        template <typename Q>
+        typename std::enable_if< std::is_constructible<value_type, Q>::value, bool>::type
+        push( Q* arr, size_t count )
+        {
+            return push( arr, count, []( value_type& dest, Q const& src ) { new( &dest ) value_type( src ); } );
+        }
+
+        /// Push one element created from \p args
+        /**
+            The function is available only if <tt>std::is_constructible<value_type, Args...>::value</tt>
+            is \p true.
+
+            Returns \p false if the ring is full or \p true otherwise.
+        */
+        template <typename... Args>
+        typename std::enable_if< std::is_constructible<value_type, Args...>::value, bool>::type
+        emplace( Args&&... args )
+        {
+            size_t back = back_.load( memory_model::memory_order_relaxed );
+
+            assert( back - pfront_ <= capacity() );
+
+            if ( pfront_ + capacity() - back < 1 ) {
+                pfront_ = front_.load( memory_model::memory_order_acquire );
+
+                if ( pfront_ + capacity() - back < 1 ) {
+                    // not enough space
+                    return false;
+                }
+            }
+
+            new( &buffer_[buffer_.mod( back )] ) value_type( std::forward<Args>(args)... );
+
+            back_.store( back + 1, memory_model::memory_order_release );
+
+            return true;
+        }
+
+        /// Enqueues data to the ring using a functor
+        /**
+            \p Func is a functor called to copy a value to the ring element.
+            The functor \p f takes one argument - a reference to an empty cell of type \ref value_type:
+            \code
+            cds::container::WeakRingBuffer< Foo > myRing;
+            Bar bar;
+            myRing.enqueue_with( [&bar]( Foo& dest ) { dest = std::move(bar); } );
+            \endcode
+        */
+        template <typename Func>
+        bool enqueue_with( Func f )
+        {
+            size_t back = back_.load( memory_model::memory_order_relaxed );
+
+            assert( back - pfront_ <= capacity() );
+
+            if ( pfront_ + capacity() - back < 1 ) {
+                pfront_ = front_.load( memory_model::memory_order_acquire );
+
+                if ( pfront_ + capacity() - back < 1 ) {
+                    // not enough space
+                    return false;
+                }
+            }
+
+            f( buffer_[buffer_.mod( back )] );
+
+            back_.store( back + 1, memory_model::memory_order_release );
+
+            return true;
+        }
+
+        /// Enqueues \p val value into the queue.
+        /**
+            The new queue item is created by calling placement new in free cell.
+            Returns \p true if success, \p false if the ring is full.
+        */
+        bool enqueue( value_type const& val )
+        {
+            return emplace( val );
+        }
+
+        /// Enqueues \p val value into the queue, move semantics
+        bool enqueue( value_type&& val )
+        {
+            return emplace( std::move( val ));
+        }
+
+        /// Synonym for \p enqueue( value_type const& )
+        bool push( value_type const& val )
+        {
+            return enqueue( val );
+        }
+
+        /// Synonym for \p enqueue( value_type&& )
+        bool push( value_type&& val )
+        {
+            return enqueue( std::move( val ));
+        }
+
+        /// Synonym for \p enqueue_with()
+        template <typename Func>
+        bool push_with( Func f )
+        {
+            return enqueue_with( f );
+        }
+
+        /// Batch pop \p count elements from the ring buffer into \p arr
+        /**
+            \p CopyFunc is a per-element copy functor: for each element of \p arr
+            <tt>copy( arr[i], source )</tt> is called.
+            The \p CopyFunc signature:
+            \code
+            void copy_func( Q& dest, value_type& element );
+            \endcode
+
+            Returns \p true on success, or \p false if the ring buffer contains fewer than \p count elements
+        */
+        template <typename Q, typename CopyFunc>
+        bool pop( Q* arr, size_t count, CopyFunc copy )
+        {
+            assert( count < capacity() );
+
+            size_t front = front_.load( memory_model::memory_order_relaxed );
+            assert( cback_ - front <= capacity() );
+
+            if ( cback_ - front < count ) {
+                cback_ = back_.load( memory_model::memory_order_acquire );
+                if ( cback_ - front < count )
+                    return false;
+            }
+
+            // copy data
+            value_cleaner cleaner;
+            for ( size_t i = 0; i < count; ++i, ++front ) {
+                value_type& val = buffer_[buffer_.mod( front )];
+                copy( arr[i], val );
+                cleaner( val );
+            }
+
+            front_.store( front, memory_model::memory_order_release );
+            return true;
+        }
+
+        /// Batch pop - pop \p count elements into array \p arr with assignment as copy functor
+        /**
+            This function is equivalent to:
+            \code
+            pop( arr, count, []( Q& dest, value_type& src ) { dest = src; } );
+            \endcode
+
+            The function is available only if <tt>std::is_assignable<Q&, value_type const&>::value</tt>
+            is \p true.
+
+            Returns \p true on success, or \p false if the ring buffer contains fewer than \p count elements
+        */
+        template <typename Q>
+        typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
+        pop( Q* arr, size_t count )
+        {
+            return pop( arr, count, []( Q& dest, value_type& src ) { dest = src; } );
+        }
+
+        /// Dequeues an element from the ring to \p val
+        /**
+            The function is available only if <tt>std::is_assignable<Q&, value_type const&>::value</tt>
+            is \p true.
+
+            Returns \p false if the ring buffer is empty, \p true otherwise.
+        */
+        template <typename Q>
+        typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
+        dequeue( Q& val )
+        {
+            return pop( &val, 1 );
+        }
+
+        /// Synonym for \p dequeue( Q& )
+        template <typename Q>
+        typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
+        pop( Q& val )
+        {
+            return dequeue( val );
+        }
+
+        /// Dequeues a value using a functor
+        /**
+            \p Func is a functor called to copy dequeued value.
+            The functor takes one argument - a reference to removed node:
+            \code
+            cds::container::WeakRingBuffer< Foo > myRing;
+            Bar bar;
+            myRing.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
+            \endcode
+
+            Returns \p true if the ring is not empty, \p false otherwise.
+            The functor is called only if the ring is not empty.
+        */
+        template <typename Func>
+        bool dequeue_with( Func f )
+        {
+            size_t front = front_.load( memory_model::memory_order_relaxed );
+            assert( cback_ - front <= capacity() );
+
+            if ( cback_ - front < 1 ) {
+                cback_ = back_.load( memory_model::memory_order_acquire );
+                if ( cback_ - front < 1 )
+                    return false;
+            }
+
+            value_type& val = buffer_[buffer_.mod( front )];
+            f( val );
+            value_cleaner()( val );
+
+            front_.store( front + 1, memory_model::memory_order_release );
+            return true;
+        }
+
+        /// Synonym for \p dequeue_with()
+        template <typename Func>
+        bool pop_with( Func f )
+        {
+            return dequeue_with( f );
+        }
+
+        /// Gets pointer to first element of ring buffer
+        /**
+            If the ring buffer is empty, returns \p nullptr.
+
+            The function is thread-safe since there is only one consumer.
+            Recall, \p WeakRingBuffer is a single-producer/single-consumer container.
+        */
+        value_type* front()
+        {
+            size_t front = front_.load( memory_model::memory_order_relaxed );
+            assert( cback_ - front <= capacity() );
+
+            if ( cback_ - front < 1 ) {
+                cback_ = back_.load( memory_model::memory_order_acquire );
+                if ( cback_ - front < 1 )
+                    return nullptr;
+            }
+
+            return &buffer_[buffer_.mod( front )];
+        }
+
+        /// Removes front element of ring-buffer
+        /**
+            If the ring-buffer is empty, returns \p false.
+            Otherwise, pops the first element from the ring.
+        */
+        bool pop_front()
+        {
+            size_t front = front_.load( memory_model::memory_order_relaxed );
+            assert( cback_ - front <= capacity() );
+
+            if ( cback_ - front < 1 ) {
+                cback_ = back_.load( memory_model::memory_order_acquire );
+                if ( cback_ - front < 1 )
+                    return false;
+            }
+
+            // clean cell
+            value_cleaner()( buffer_[buffer_.mod( front )] );
+
+            front_.store( front + 1, memory_model::memory_order_release );
+            return true;
+        }
+
+        /// Clears the ring buffer
+        void clear()
+        {
+            value_type v;
+            while ( pop( v ) );
+        }
+
+        /// Checks if the ring-buffer is empty
+        bool empty() const
+        {
+            return front_.load( memory_model::memory_order_relaxed ) == back_.load( memory_model::memory_order_relaxed );
+        }
+
+        /// Checks if the ring-buffer is full
+        bool full() const
+        {
+            return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ) >= capacity();
+        }
+
+        /// Returns the current size of ring buffer
+        size_t size() const
+        {
+            return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed );
+        }
+
+        /// Returns capacity of the ring buffer
+        size_t capacity() const
+        {
+            return buffer_.capacity();
+        }
+
+    private:
+        //@cond
+        atomics::atomic<size_t>     front_;
+        typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad1_;
+        atomics::atomic<size_t>     back_;
+        typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad2_;
+        size_t                      pfront_;
+        typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad3_;
+        size_t                      cback_;
+        typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad4_;
+
+        buffer                      buffer_;
+        //@endcond
+    };
+
+}} // namespace cds::container
+
+
+#endif // #ifndef CDSLIB_CONTAINER_WEAK_RINGBUFFER_H
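A usage sketch for the container above. The producer caches front_ in pfront_ and the consumer caches back_ in cback_, so the acquire load on the opposite index happens only when the cached view shows insufficient space; with one producer and one consumer thread this is all the synchronization needed. Illustrative single-thread example:
\code
#include <cds/container/weak_ringbuffer.h>
#include <cassert>

int main()
{
    cds::container::WeakRingBuffer< int > ring( 128 );  // dynamic uninitialized buffer

    // Producer side: single and batch push (batch is all-or-nothing).
    assert( ring.push( 1 ));
    int in[4] = { 2, 3, 4, 5 };
    assert( ring.push( in, 4 ));

    // Consumer side: front()/pop_front() and batch pop (also all-or-nothing).
    assert( *ring.front() == 1 );
    assert( ring.pop_front());

    int out[4];
    assert( ring.pop( out, 4 ));
    assert( out[0] == 2 && out[3] == 5 );
    assert( ring.empty());
}
\endcode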
index ca3cb8748d539bd5cfd3657bd3ff4b1a52f9450b..90d79478cfe85e5b59d4076f6b07a6a13bc2e4c3 100644 (file)
@@ -173,6 +173,19 @@ namespace cds { namespace opt {
             {
                 return &( m_buffer[0].v );
             }
+
+            /// Returns <tt> idx % capacity() </tt>
+            /**
+            If the buffer size is a power of two, bit-mask arithmetic is used
+            instead of modulo arithmetic
+            */
+            size_t mod( size_t idx )
+            {
+                static_if( c_bExp2 )
+                    return idx & ( capacity() - 1 );
+                else
+                    return idx % capacity();
+            }
         };
 
         /// Static initialized buffer
@@ -265,6 +278,19 @@ namespace cds { namespace opt {
             {
                 return m_buffer;
             }
+
+            /// Returns <tt> idx % capacity() </tt>
+            /**
+            If the buffer size is a power of two, bit-mask arithmetic is used
+            instead of modulo arithmetic
+            */
+            size_t mod( size_t idx )
+            {
+                static_if( c_bExp2 )
+                    return idx & ( capacity() - 1 );
+                else
+                    return idx % capacity();
+            }
         };
 
         /// Dynamically allocated uninitialized buffer
@@ -367,6 +393,19 @@ namespace cds { namespace opt {
             {
                 return m_buffer;
             }
+
+            /// Returns <tt> idx % capacity() </tt>
+            /**
+                If the buffer size is a power of two, bit-mask arithmetic is used
+                instead of modulo arithmetic
+            */
+            size_t mod( size_t idx )
+            {
+                static_if ( c_bExp2 )
+                    return idx & ( capacity() - 1 );
+                else
+                    return idx % capacity();
+            }
         };
 
 
@@ -471,6 +510,19 @@ namespace cds { namespace opt {
             {
                 return m_buffer;
             }
+
+            /// Returns <tt> idx % capacity() </tt>
+            /**
+            If the buffer size is a power of two, bit-mask arithmetic is used
+            instead of modulo arithmetic
+            */
+            size_t mod( size_t idx )
+            {
+                static_if( c_bExp2 )
+                    return idx & ( capacity() - 1 );
+                else
+                    return idx % capacity();
+            }
         };
 
     }   // namespace v
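The identity behind mod(): for a power-of-2 capacity, idx % capacity equals idx & (capacity - 1) - the low bits of the ever-increasing index are exactly the remainder, so no division is needed. A standalone check of the same arithmetic:
\code
#include <cassert>
#include <cstddef>

size_t mod_pow2( size_t idx, size_t capacity )
{
    assert(( capacity & ( capacity - 1 )) == 0 );   // capacity must be a power of 2
    return idx & ( capacity - 1 );
}

int main()
{
    for ( size_t idx = 0; idx < 1000; ++idx )
        assert( mod_pow2( idx, 64 ) == idx % 64 );
}
\endcode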
index b2a7112ed5c25a639c19bbd37977a7283408ef6b..81e79b16037696caf6df90254fd3ec4e8b23e6dd 100644 (file)
@@ -48,13 +48,15 @@ namespace cds { namespace opt {
             void operator ()( value_type& val )
             {
                 ...
-                // code to cleanup \p val
+                // code to cleanup val
             }
         }
         \endcode
 
         Predefined option types:
             \li opt::v::empty_cleaner
+            \li opt::v::destruct_cleaner
+            \li opt::v::auto_cleaner
     */
     template <typename Type>
     struct value_cleaner {
@@ -92,7 +94,32 @@ namespace cds { namespace opt {
             template <typename T>
             void operator()( T& val )
             {
-                (&val)->T::~T();
+                ( &val )->~T();
+            }
+            //@endcond
+        };
+
+        /// Cleaner that calls \p T destructor if it is not trivial
+        /**
+            If \p T has non-trivial destructor (<tt> std::is_trivially_destructible<T>::value </tt> is \p false),
+            the cleaner calls \p T destructor.
+
+            If <tt> std::is_trivially_destructible<T>::value </tt> is \p true, the cleaner is empty - no
+            destructor of \p T is called.
+        */
+        struct auto_cleaner
+        {
+            //@cond
+            template <typename T>
+            typename std::enable_if< std::is_trivially_destructible<T>::value >::type
+            operator()( T& /*val*/ )
+            {}
+
+            template <typename T>
+            typename std::enable_if< !std::is_trivially_destructible<T>::value >::type
+            operator()( T& val )
+            {
+                ( &val )->~T();
             }
             //@endcond
         };
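The dispatch in auto_cleaner can be observed in isolation: the enable_if pair selects a no-op for trivially-destructible types and an explicit destructor call otherwise. A sketch (Pod/NonPod are illustrative types, not part of the commit):
\code
#include <cds/opt/value_cleaner.h>
#include <new>

struct Pod { int x; };              // trivially destructible
struct NonPod { ~NonPod() {} };     // user-provided destructor: non-trivial

int main()
{
    cds::opt::v::auto_cleaner cleaner;

    Pod p { 42 };
    cleaner( p );       // no-op overload: no destructor call

    alignas( NonPod ) unsigned char cell[ sizeof( NonPod ) ];
    NonPod* q = new( cell ) NonPod;
    cleaner( *q );      // calls ~NonPod() in place, like a dequeued ring-buffer cell
}
\endcode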
index 18ff94056a8393598db7b30e73032ec02b14fa56..a74adfc68bdc9eeb091a83ae15bde69a4a72b4ca 100644 (file)
     <ClInclude Include="..\..\..\cds\container\striped_set\std_list.h" />
     <ClInclude Include="..\..\..\cds\container\striped_set\std_set.h" />
     <ClInclude Include="..\..\..\cds\container\striped_set\std_vector.h" />
+    <ClInclude Include="..\..\..\cds\container\weak_ringbuffer.h" />
     <ClInclude Include="..\..\..\cds\details\binary_functor_wrapper.h" />
     <ClInclude Include="..\..\..\cds\details\bit_reverse_counter.h" />
     <ClInclude Include="..\..\..\cds\details\bounded_container.h" />
index 305825bf430ed91fc6e559fa166281c56fdc7439..137444c8c7c6978afc1d86947c0b101e7ba47d1a 100644 (file)
     <ClInclude Include="..\..\..\cds\algo\bit_reversal.h">
       <Filter>Header Files\cds\algo</Filter>
     </ClInclude>
+    <ClInclude Include="..\..\..\cds\container\weak_ringbuffer.h">
+      <Filter>Header Files\cds\container</Filter>
+    </ClInclude>
   </ItemGroup>
 </Project>
\ No newline at end of file
index eeaec107326a518c1a8802ab0ca54acca919e1de..6a1efb9f63eaf23e61b31f6dc2fc21c911c2559a 100644 (file)
@@ -55,6 +55,7 @@
     </ClCompile>
     <ClCompile Include="..\..\..\test\unit\queue\segmented_queue_hp.cpp" />
     <ClCompile Include="..\..\..\test\unit\queue\vyukov_mpmc_queue.cpp" />
+    <ClCompile Include="..\..\..\test\unit\queue\weak_ringbuffer.cpp" />
   </ItemGroup>
   <ItemGroup>
     <ClInclude Include="..\..\..\test\unit\queue\test_bounded_queue.h" />
index 82aefaa424a4c70570564f01ba2e44815cb1493d..692356f3ee18907ff887fb1faf1ba23496339dab 100644 (file)
@@ -89,6 +89,9 @@
     <ClCompile Include="..\..\..\test\unit\queue\intrusive_vyukov_queue.cpp">
       <Filter>Source Files</Filter>
     </ClCompile>
+    <ClCompile Include="..\..\..\test\unit\queue\weak_ringbuffer.cpp">
+      <Filter>Source Files</Filter>
+    </ClCompile>
   </ItemGroup>
   <ItemGroup>
     <ClInclude Include="..\..\..\test\unit\queue\test_generic_queue.h">
index 12fd8cf85fb257446475c1d5dd237e7dc6b16d8d..2e02f218091f2289843133a1b9fe1f1b7f69a5a4 100644 (file)
@@ -40,6 +40,7 @@
 #include <cds/container/fcqueue.h>
 #include <cds/container/fcdeque.h>
 #include <cds/container/segmented_queue.h>
+#include <cds/container/weak_ringbuffer.h>
 
 #include <cds/gc/hp.h>
 #include <cds/gc/dhp.h>
@@ -293,6 +294,29 @@ namespace fc_details{
         };
 
 
+        // WeakRingBuffer
+        struct traits_WeakRingBuffer_dyn: public cds::container::weak_ringbuffer::traits
+        {
+            typedef cds::opt::v::uninitialized_dynamic_buffer< int > buffer;
+        };
+        class WeakRingBuffer_dyn
+            : public cds::container::WeakRingBuffer< Value, traits_WeakRingBuffer_dyn >
+        {
+            typedef cds::container::WeakRingBuffer< Value, traits_WeakRingBuffer_dyn > base_class;
+        public:
+            WeakRingBuffer_dyn()
+                : base_class( 1024 * 64 )
+            {}
+            WeakRingBuffer_dyn( size_t nCapacity )
+                : base_class( nCapacity )
+            {}
+
+            cds::opt::none statistics() const
+            {
+                return cds::opt::none();
+            }
+        };
+
         // BasketQueue
 
         typedef cds::container::BasketQueue< cds::gc::HP , Value > BasketQueue_HP;
@@ -790,6 +814,9 @@ namespace cds_test {
     CDSSTRESS_Queue_F( test_fixture, VyukovMPMCCycleQueue_dyn       ) \
     CDSSTRESS_Queue_F( test_fixture, VyukovMPMCCycleQueue_dyn_ic    )
 
+#define CDSSTRESS_WeakRingBuffer( test_fixture ) \
+    CDSSTRESS_Queue_F( test_fixture, WeakRingBuffer_dyn       )
+
 #define CDSSTRESS_StdQueue( test_fixture ) \
     CDSSTRESS_Queue_F( test_fixture, StdQueue_deque_Spinlock ) \
     CDSSTRESS_Queue_F( test_fixture, StdQueue_list_Spinlock  ) \
index 626ef727e4720c301ab0498437d10b8bd7486237..aea3e81f434c207a928f21847c76558b349202b7 100644 (file)
@@ -15,6 +15,7 @@ set(CDSGTEST_QUEUE_SOURCES
     segmented_queue_hp.cpp
     segmented_queue_dhp.cpp
     vyukov_mpmc_queue.cpp
+    weak_ringbuffer.cpp
     intrusive_basket_queue_hp.cpp
     intrusive_basket_queue_dhp.cpp
     intrusive_fcqueue.cpp
index f41799636c40fa1f525be42a50687aad062cf505..a1657b1b2dc49155c5dbebcea4e3ddd15b841c67 100644 (file)
@@ -57,6 +57,7 @@ namespace cds_test {
             }
             ASSERT_FALSE( q.empty());
             ASSERT_CONTAINER_SIZE( q, nSize );
+            ASSERT_FALSE( q.enqueue( static_cast<value_type>( nSize ) * 2 ) );
 
             for ( size_t i = 0; i < nSize; ++i ) {
                 it = -1;
diff --git a/test/unit/queue/weak_ringbuffer.cpp b/test/unit/queue/weak_ringbuffer.cpp
new file mode 100644 (file)
index 0000000..64d97b6
--- /dev/null
@@ -0,0 +1,255 @@
+/*
+    This file is a part of libcds - Concurrent Data Structures library
+
+    (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
+
+    Source code repo: http://github.com/khizmax/libcds/
+    Download: http://sourceforge.net/projects/libcds/files/
+
+    Redistribution and use in source and binary forms, with or without
+    modification, are permitted provided that the following conditions are met:
+
+    * Redistributions of source code must retain the above copyright notice, this
+    list of conditions and the following disclaimer.
+
+    * Redistributions in binary form must reproduce the above copyright notice,
+    this list of conditions and the following disclaimer in the documentation
+    and/or other materials provided with the distribution.
+
+    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+    AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+    IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+    FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+    DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+    SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+    OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+#include "test_bounded_queue.h"
+
+#include <cds/container/weak_ringbuffer.h>
+
+namespace {
+    namespace cc = cds::container;
+
+    class WeakRingBuffer: public cds_test::bounded_queue
+    {
+    public:
+        template <typename Queue>
+        void test_array( Queue& q )
+        {
+            typedef typename Queue::value_type value_type;
+
+            const size_t nSize = q.capacity();
+            static const size_t nArrSize = 16;
+            const size_t nArrCount = nSize / nArrSize;
+
+            {
+                value_type el[nArrSize];
+
+                // batch push
+                for ( size_t i = 0; i < nSize; i += nArrSize ) {
+                    for ( size_t k = 0; k < nArrSize; ++k )
+                        el[k] = static_cast<value_type>( i + k );
+
+                    if ( i + nArrSize <= nSize ) {
+                        ASSERT_TRUE( q.push( el, nArrSize ) );
+                    }
+                    else {
+                        ASSERT_FALSE( q.push( el, nArrSize ) );
+                    }
+                }
+
+                ASSERT_TRUE( !q.empty() );
+                if ( nSize % nArrSize != 0 ) {
+                    ASSERT_FALSE( q.full() );
+                    ASSERT_CONTAINER_SIZE( q, nArrCount * nArrSize );
+                    for ( size_t i = nArrCount * nArrSize; i < nSize; ++i ) {
+                        ASSERT_TRUE( q.enqueue( static_cast<value_type>( i ) ) );
+                    }
+                }
+                ASSERT_TRUE( q.full() );
+                ASSERT_CONTAINER_SIZE( q, nSize );
+
+                // batch pop
+                value_type expected = 0;
+                while ( q.pop( el, nArrSize ) ) {
+                    for ( size_t i = 0; i < nArrSize; ++i ) {
+                        ASSERT_EQ( el[i], expected );
+                        ++expected;
+                    }
+                }
+
+                if ( nSize % nArrSize == 0 ) {
+                    ASSERT_TRUE( q.empty() );
+                }
+                else {
+                    ASSERT_FALSE( q.empty() );
+                    ASSERT_CONTAINER_SIZE( q, nSize % nArrSize );
+                    q.clear();
+                }
+                ASSERT_TRUE( q.empty() );
+                ASSERT_FALSE( q.full() );
+                ASSERT_CONTAINER_SIZE( q, 0u );
+            }
+
+            {
+                // batch push with functor
+                size_t el[nArrSize];
+
+                auto func_push = []( value_type& dest, size_t src ) { dest = static_cast<value_type>( src * 10 ); };
+                for ( size_t i = 0; i < nSize; i += nArrSize ) {
+                    for ( size_t k = 0; k < nArrSize; ++k )
+                        el[k] = i + k;
+
+                    if ( i + nArrSize <= nSize ) {
+                        ASSERT_TRUE( q.push( el, nArrSize, func_push ) );
+                    }
+                    else {
+                        ASSERT_FALSE( q.push( el, nArrSize, func_push ) );
+                    }
+                }
+
+                ASSERT_TRUE( !q.empty() );
+                if ( nSize % nArrSize != 0 ) {
+                    ASSERT_FALSE( q.full() );
+                    ASSERT_CONTAINER_SIZE( q, nArrCount * nArrSize );
+                    for ( size_t i = nArrCount * nArrSize; i < nSize; ++i ) {
+                        ASSERT_TRUE( q.push( &i, 1, func_push ) );
+                    }
+                }
+                ASSERT_TRUE( q.full() );
+                ASSERT_CONTAINER_SIZE( q, nSize );
+
+                // batch pop with functor
+                auto func_pop = []( size_t& dest, value_type src ) { dest = static_cast<size_t>( src / 10 ); };
+                size_t expected = 0;
+                while ( q.pop( el, nArrSize, func_pop ) ) {
+                    for ( size_t i = 0; i < nArrSize; ++i ) {
+                        ASSERT_EQ( el[i], expected );
+                        ++expected;
+                    }
+                }
+
+                if ( nSize % nArrSize == 0 ) {
+                    ASSERT_TRUE( q.empty() );
+                }
+                else {
+                    ASSERT_FALSE( q.empty() );
+                    ASSERT_CONTAINER_SIZE( q, nSize % nArrSize );
+                    size_t v;
+                    while ( q.pop( &v, 1, func_pop ) ) {
+                        ASSERT_EQ( v, expected );
+                        ++expected;
+                    }
+                }
+                ASSERT_TRUE( q.empty() );
+                ASSERT_FALSE( q.full() );
+                ASSERT_CONTAINER_SIZE( q, 0u );
+
+                // front/pop_front
+                for ( size_t i = 0; i < nSize; i += nArrSize ) {
+                    for ( size_t k = 0; k < nArrSize; ++k )
+                        el[k] = i + k;
+
+                    if ( i + nArrSize <= nSize ) {
+                        ASSERT_TRUE( q.push( el, nArrSize, func_push ) );
+                    }
+                    else {
+                        ASSERT_FALSE( q.push( el, nArrSize, func_push ) );
+                    }
+                }
+
+                ASSERT_TRUE( !q.empty() );
+                if ( nSize % nArrSize != 0 ) {
+                    ASSERT_FALSE( q.full() );
+                    ASSERT_CONTAINER_SIZE( q, nArrCount * nArrSize );
+                    for ( size_t i = nArrCount * nArrSize; i < nSize; ++i ) {
+                        ASSERT_TRUE( q.push( &i, 1, func_push ) );
+                    }
+                }
+                ASSERT_TRUE( q.full() );
+                ASSERT_CONTAINER_SIZE( q, nSize );
+
+                value_type cur = 0;
+                while ( !q.empty() ) {
+                    value_type* front = q.front();
+                    ASSERT_TRUE( front != nullptr );
+                    ASSERT_EQ( cur, *front );
+                    ASSERT_TRUE( q.pop_front() );
+                    cur += 10;
+                }
+
+                ASSERT_TRUE( q.empty() );
+                ASSERT_TRUE( q.front() == nullptr );
+                ASSERT_FALSE( q.pop_front() );
+            }
+        }
+    };
+
+    TEST_F( WeakRingBuffer, defaulted )
+    {
+        typedef cds::container::WeakRingBuffer< int > test_queue;
+
+        test_queue q( 128 );
+        test( q );
+        test_array( q );
+    }
+
+    TEST_F( WeakRingBuffer, static_buffer )
+    {
+        struct traits: public cds::container::weak_ringbuffer::traits
+        {
+            typedef cds::opt::v::uninitialized_static_buffer<int, 128> buffer;
+        };
+        typedef cds::container::WeakRingBuffer< int, traits > test_queue;
+
+        test_queue q;
+        test( q );
+        test_array( q );
+    }
+
+    TEST_F( WeakRingBuffer, dynamic )
+    {
+        struct traits: public cds::container::weak_ringbuffer::traits
+        {
+            typedef cds::opt::v::uninitialized_dynamic_buffer<int> buffer;
+        };
+        typedef cds::container::WeakRingBuffer< int, traits > test_queue;
+
+        test_queue q( 128 );
+        test( q );
+        test_array( q );
+    }
+
+    TEST_F( WeakRingBuffer, dynamic_mod )
+    {
+        struct traits: public cds::container::weak_ringbuffer::traits
+        {
+            typedef cds::opt::v::uninitialized_dynamic_buffer<int, CDS_DEFAULT_ALLOCATOR, false> buffer;
+        };
+        typedef cds::container::WeakRingBuffer< int, traits > test_queue;
+
+        test_queue q( 100 );
+        test( q );
+        test_array( q );
+    }
+
+    TEST_F( WeakRingBuffer, dynamic_padding )
+    {
+        struct traits: public cds::container::weak_ringbuffer::traits
+        {
+            typedef cds::opt::v::uninitialized_dynamic_buffer<int> buffer;
+            enum { padding = 32 };
+        };
+        typedef cds::container::WeakRingBuffer< int, traits > test_queue;
+
+        test_queue q( 128 );
+        test( q );
+        test_array( q );
+    }
+
+} // namespace
\ No newline at end of file