From: khizmax
Date: Tue, 9 May 2017 08:04:54 +0000 (+0300)
Subject: Added WeakRingBuffer - a single-producer/single-consumer queue based on ring buffer
X-Git-Tag: v2.3.0~40
X-Git-Url: http://plrg.eecs.uci.edu/git/?p=libcds.git;a=commitdiff_plain;h=d15c6730bbd5f63f903496277939b7fe18ac9853

Added WeakRingBuffer - a single-producer/single-consumer queue based on ring buffer
---

diff --git a/cds/algo/atomic.h b/cds/algo/atomic.h
index c27f7033..56b54ef7 100644
--- a/cds/algo/atomic.h
+++ b/cds/algo/atomic.h
@@ -199,8 +199,8 @@ namespace cds {
     class item_counter
     {
     public:
-        typedef atomics::atomic_size_t atomic_type ;   ///< atomic type used
-        typedef size_t counter_type    ;               ///< Integral item counter type (size_t)
+        typedef atomics::atomic_size_t atomic_type;    ///< atomic type used
+        typedef size_t counter_type;                   ///< Integral item counter type (size_t)

     private:
         //@cond
@@ -243,12 +243,24 @@ namespace cds {
             return m_Counter.fetch_add( 1, order );
         }

+        /// Increments the counter by \p count. Semantics: postincrement
+        counter_type inc( counter_type count, atomics::memory_order order = atomics::memory_order_relaxed )
+        {
+            return m_Counter.fetch_add( count, order );
+        }
+
         /// Decrements the counter. Semantics: postdecrement
         counter_type dec(atomics::memory_order order = atomics::memory_order_relaxed)
         {
             return m_Counter.fetch_sub( 1, order );
         }

+        /// Decrements the counter by \p count. Semantics: postdecrement
+        counter_type dec( counter_type count, atomics::memory_order order = atomics::memory_order_relaxed )
+        {
+            return m_Counter.fetch_sub( count, order );
+        }
+
         /// Preincrement
         counter_type operator ++()
         {
@@ -271,6 +283,18 @@ namespace cds {
             return dec();
         }

+        /// Increment by \p count
+        counter_type operator +=( counter_type count )
+        {
+            return inc( count ) + count;
+        }
+
+        /// Decrement by \p count
+        counter_type operator -=( counter_type count )
+        {
+            return dec( count ) - count;
+        }
+
         /// Resets count to 0
         void reset(atomics::memory_order order = atomics::memory_order_relaxed)
         {
@@ -303,36 +327,62 @@ namespace cds {
         }

         /// Dummy increment. Always returns 0
-        static size_t inc(atomics::memory_order /*order*/ = atomics::memory_order_relaxed)
+        static counter_type inc(atomics::memory_order /*order*/ = atomics::memory_order_relaxed)
+        {
+            return 0;
+        }
+
+        /// Dummy increment. Always returns 0
+        static counter_type inc( counter_type /*count*/, atomics::memory_order /*order*/ = atomics::memory_order_relaxed )
+        {
+            return 0;
+        }
+
+        /// Dummy decrement. Always returns 0
+        static counter_type dec(atomics::memory_order /*order*/ = atomics::memory_order_relaxed)
         {
             return 0;
         }

         /// Dummy increment. Always returns 0
-        static size_t dec(atomics::memory_order /*order*/ = atomics::memory_order_relaxed)
+        static counter_type dec( counter_type /*count*/, atomics::memory_order /*order*/ = atomics::memory_order_relaxed )
         {
             return 0;
         }

         /// Dummy pre-increment. Always returns 0
-        size_t operator ++() const
+        counter_type operator ++() const
         {
             return 0;
         }

         /// Dummy post-increment. Always returns 0
-        size_t operator ++(int) const
+        counter_type operator ++(int) const
         {
             return 0;
         }

         /// Dummy pre-decrement. Always returns 0
-        size_t operator --() const
+        counter_type operator --() const
         {
             return 0;
         }

         /// Dummy post-decrement. Always returns 0
-        size_t operator --(int) const
+        counter_type operator --(int) const
+        {
+            return 0;
+        }
+
+        /// Dummy increment by \p count, always returns 0
+        counter_type operator +=( counter_type count )
+        {
+            CDS_UNUSED( count );
+            return 0;
+        }
+
+        /// Dummy decrement by \p count, always returns 0
+        counter_type operator -=( counter_type count )
+        {
+            CDS_UNUSED( count );
+            return 0;
+        }
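For orientation (this aside is not part of the patch): a minimal sketch of the new bulk counter operations. The main() harness and the checked values are illustrative assumptions; item_counter and empty_item_counter are the libcds types touched above.

    #include <cds/algo/atomic.h>

    int main()
    {
        cds::atomicity::item_counter count;

        count += 16;    // new bulk increment: calls inc( 16 ), returns the updated value (16)
        count -= 4;     // new bulk decrement: calls dec( 4 ), returns the updated value (12)

        // empty_item_counter keeps the same interface, but every member is a
        // no-op returning 0, so item counting can be switched off per container
        // without touching call sites.
        cds::atomicity::empty_item_counter dummy;
        dummy += 100;   // compiles, does nothing, returns 0

        return count.value() == 12 ? 0 : 1;
    }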
diff --git a/cds/compiler/defs.h b/cds/compiler/defs.h
index 69f01670..a70ee5b9 100644
--- a/cds/compiler/defs.h
+++ b/cds/compiler/defs.h
@@ -66,6 +66,10 @@
 #   define cds_unlikely( expr ) expr
 #endif

+#ifndef static_if
+#   define static_if if
+#endif
+
 // Features
 #include

diff --git a/cds/container/vyukov_mpmc_cycle_queue.h b/cds/container/vyukov_mpmc_cycle_queue.h
index 4a06618a..c452e1b5 100644
--- a/cds/container/vyukov_mpmc_cycle_queue.h
+++ b/cds/container/vyukov_mpmc_cycle_queue.h
@@ -65,9 +65,9 @@ namespace cds { namespace container {
             After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied.
             If \p T is a complex type, \p value_cleaner may be useful feature.
-            Default value is \ref opt::v::destruct_cleaner
+            Default value is \ref opt::v::auto_cleaner
         */
-        typedef cds::opt::v::destruct_cleaner value_cleaner;
+        typedef cds::opt::v::auto_cleaner value_cleaner;

         /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
         typedef cds::atomicity::empty_item_counter item_counter;
@@ -105,7 +105,7 @@ namespace cds { namespace container {
             The functor calls the destructor for queue item.
             After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied.
             If \p T is a complex type, \p value_cleaner can be an useful feature.
-            Default value is \ref opt::v::destruct_cleaner
+            Default value is \ref opt::v::auto_cleaner
        - \p opt::back_off - back-off strategy used. If the option is not specified, the \p cds::backoff::Default is used.
        - \p opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
            To enable item counting use \p cds::atomicity::item_counter
@@ -315,7 +315,7 @@ namespace cds { namespace container {
             return enqueue_with( [&val]( value_type& dest ) { new (&dest) value_type( std::move( val ));});
         }

-        /// Synonym for \p enqueue( valuetype const& )
+        /// Synonym for \p enqueue( value_type const& )
         bool push( value_type const& data )
         {
             return enqueue( data );
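The hunk above changes the queue's default cleaner from destruct_cleaner to the new auto_cleaner. A hedged sketch of what that means in practice; Payload and PodPayload are invented types:

    #include <cds/container/vyukov_mpmc_cycle_queue.h>
    #include <string>

    struct Payload {
        std::string name;   // non-trivial destructor
    };

    // With the new default (opt::v::auto_cleaner) the Payload destructor is
    // run on dequeue, because Payload is not trivially destructible.
    typedef cds::container::VyukovMPMCCycleQueue< Payload > payload_queue;

    // For a POD payload auto_cleaner collapses to a no-op; spelling out
    // empty_cleaner gives the same effect and documents the intent.
    struct PodPayload { int a; int b; };
    typedef cds::container::VyukovMPMCCycleQueue< PodPayload,
        cds::container::vyukov_queue::make_traits<
            cds::opt::value_cleaner< cds::opt::v::empty_cleaner >
        >::type
    > pod_queue;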
diff --git a/cds/container/weak_ringbuffer.h b/cds/container/weak_ringbuffer.h
new file mode 100644
index 00000000..17b249b0
--- /dev/null
+++ b/cds/container/weak_ringbuffer.h
@@ -0,0 +1,570 @@
+/*
+    This file is a part of libcds - Concurrent Data Structures library
+
+    (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
+
+    Source code repo: http://github.com/khizmax/libcds/
+    Download: http://sourceforge.net/projects/libcds/files/
+
+    Redistribution and use in source and binary forms, with or without
+    modification, are permitted provided that the following conditions are met:
+
+    * Redistributions of source code must retain the above copyright notice, this
+      list of conditions and the following disclaimer.
+
+    * Redistributions in binary form must reproduce the above copyright notice,
+      this list of conditions and the following disclaimer in the documentation
+      and/or other materials provided with the distribution.
+
+    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+    AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+    IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+    FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+    DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+    SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+    OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+#ifndef CDSLIB_CONTAINER_WEAK_RINGBUFFER_H
+#define CDSLIB_CONTAINER_WEAK_RINGBUFFER_H
+
+#include <cds/container/details/base.h>
+#include <cds/opt/buffer.h>
+#include <cds/opt/value_cleaner.h>
+#include <cds/algo/atomic.h>
+#include <cds/details/bounded_container.h>
+
+namespace cds { namespace container {
+
+    /// \p WeakRingBuffer related definitions
+    /** @ingroup cds_nonintrusive_helper
+    */
+    namespace weak_ringbuffer {
+
+        /// \p WeakRingBuffer default traits
+        struct traits {
+            /// Buffer type for internal array
+            /*
+                The type of element for the buffer is not important: \p WeakRingBuffer rebinds
+                the buffer to the required type via the \p rebind metafunction.
+
+                For \p WeakRingBuffer, a power-of-two buffer size is recommended: it allows
+                lightweight binary arithmetic to be used instead of modulo arithmetic when
+                indexing the ring.
+
+                You should use only an uninitialized buffer for the ring buffer -
+                \p cds::opt::v::uninitialized_dynamic_buffer (the default),
+                \p cds::opt::v::uninitialized_static_buffer.
+            */
+            typedef cds::opt::v::uninitialized_dynamic_buffer< void * > buffer;
+
+            /// A functor to clean dequeued items
+            /**
+                The functor calls the destructor of the popped element.
+                After a set of items is dequeued, \p value_cleaner cleans the cells that the items occupied.
+                If \p T is a complex type, \p value_cleaner may be a useful feature.
+                For POD types \ref opt::v::empty_cleaner is suitable.
+
+                Default value is \ref opt::v::auto_cleaner that calls the destructor only if it is not trivial.
+            */
+            typedef cds::opt::v::auto_cleaner value_cleaner;
+
+            /// C++ memory ordering model
+            /**
+                Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
+                or \p opt::v::sequential_consistent (sequentially consistent memory model).
+            */
+            typedef opt::v::relaxed_ordering memory_model;
+
+            /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
+            enum { padding = opt::cache_line_padding };
+        };
+
+        /// Metafunction converting option list to \p weak_ringbuffer::traits
+        /**
+            Supported \p Options are:
+            - \p opt::buffer - an uninitialized buffer type for the internal cyclic array. Possible types are:
+                \p opt::v::uninitialized_dynamic_buffer (the default), \p opt::v::uninitialized_static_buffer. The type of
+                element in the buffer is not important: it will be changed via the \p rebind metafunction.
+            - \p opt::value_cleaner - a functor to clean dequeued items.
+                The functor calls the destructor of the ring-buffer item.
+                After a set of items is dequeued, \p value_cleaner cleans the cells that the items occupied.
+                If \p T is a complex type, \p value_cleaner can be a useful feature.
+                Default value is \ref opt::v::auto_cleaner that calls the destructor only if it is not trivial.
+            - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
+            - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
+                or \p opt::v::sequential_consistent (sequentially consistent memory model).
+
+            Example: declare \p %WeakRingBuffer with a static internal buffer of size 1024:
+            \code
+            typedef cds::container::WeakRingBuffer< Foo,
+                typename cds::container::weak_ringbuffer::make_traits<
+                    cds::opt::buffer< cds::opt::v::uninitialized_static_buffer< void *, 1024 >>
+                >::type
+            > myRing;
+            \endcode
+        */
+        template <typename... Options>
+        struct make_traits {
+#   ifdef CDS_DOXYGEN_INVOKED
+            typedef implementation_defined type;   ///< Metafunction result
+#   else
+            typedef typename cds::opt::make_options<
+                typename cds::opt::find_type_traits< traits, Options... >::type
+                , Options...
+            >::type type;
+#   endif
+        };
+
+    } // namespace weak_ringbuffer
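Besides make_traits, traits can be customized by deriving from weak_ringbuffer::traits - the pattern the unit tests at the end of this patch use. Foo is a placeholder type:

    struct Foo { int payload; };

    // Same effect as the make_traits example above, with traits spelled out:
    struct static_ring_traits: public cds::container::weak_ringbuffer::traits
    {
        typedef cds::opt::v::uninitialized_static_buffer< void *, 1024 > buffer;
    };
    typedef cds::container::WeakRingBuffer< Foo, static_ring_traits > ring_type;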
+    /// Single-producer single-consumer ring buffer
+    /** @ingroup cds_nonintrusive_queue
+        Source: [2013] Nhat Minh Le, Adrien Guatto, Albert Cohen, Antoniu Pop. Correct and Efficient Bounded
+            FIFO Queues. [Research Report] RR-8365, INRIA. 2013.
+
+        A ring buffer is a bounded queue. Additionally, \p %WeakRingBuffer supports batch operations -
+        you can push/pop an array of elements.
+    */
+    template <typename T, typename Traits = weak_ringbuffer::traits>
+    class WeakRingBuffer: public cds::bounded_container
+    {
+    public:
+        typedef T value_type;   ///< Value type to be stored in the ring buffer
+        typedef Traits traits;  ///< Ring buffer traits
+        typedef typename traits::memory_model memory_model;   ///< Memory ordering. See \p cds::opt::memory_model option
+        typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p weak_ringbuffer::traits::value_cleaner
+
+        /// Rebind template arguments
+        template <typename T2, typename Traits2>
+        struct rebind {
+            typedef WeakRingBuffer< T2, Traits2 > other;   ///< Rebinding result
+        };
+
+        //@cond
+        // Only for tests
+        typedef size_t item_counter;
+        //@endcond
+
+    private:
+        //@cond
+        typedef typename traits::buffer::template rebind< value_type >::other buffer;
+        //@endcond
+
+    public:
+
+        /// Creates the ring buffer of \p capacity
+        /**
+            For \p cds::opt::v::uninitialized_static_buffer the \p capacity parameter is ignored.
+
+            If the buffer capacity is a power of two, lightweight binary arithmetic is used
+            instead of modulo arithmetic.
+        */
+        WeakRingBuffer( size_t capacity = 0 )
+            : front_( 0 )
+            , pfront_( 0 )
+            , cback_( 0 )
+            , buffer_( capacity )
+        {
+            back_.store( 0, memory_model::memory_order_release );
+        }
+
+        /// Destroys the ring buffer
+        ~WeakRingBuffer()
+        {
+            value_cleaner cleaner;
+            size_t back = back_.load( memory_model::memory_order_relaxed );
+            for ( size_t front = front_.load( memory_model::memory_order_relaxed ); front != back; ++front )
+                cleaner( buffer_[ buffer_.mod( front ) ] );
+        }
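A minimal single-producer/single-consumer sketch (not from the patch; the thread bodies and element counts are assumptions). Exactly one thread may push and one thread may pop; any other combination is outside the container's contract:

    #include <cds/container/weak_ringbuffer.h>
    #include <thread>

    int main()
    {
        cds::container::WeakRingBuffer< int > ring( 1024 );

        std::thread producer( [&ring]() {
            for ( int i = 0; i < 100000; ++i )
                while ( !ring.push( i ))
                    ;   // ring is full - spin (a real application would back off)
        });

        std::thread consumer( [&ring]() {
            for ( int expected = 0; expected < 100000; ) {
                int v;
                if ( ring.pop( v )) {
                    // FIFO order between the two threads is guaranteed
                    if ( v == expected )
                        ++expected;
                }
            }
        });

        producer.join();
        consumer.join();
    }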
+        /// Batch push - push array \p arr of size \p count
+        /**
+            \p CopyFunc is a per-element copy functor: for each element of \p arr
+            copy( dest, arr[i] ) is called.
+            The \p CopyFunc signature:
+            \code
+                void copy_func( value_type& element, Q const& source );
+            \endcode
+            Here \p element is uninitialized so you should construct it using placement new
+            if needed; for example, if the element type is \p std::string and \p Q is char const*,
+            the \p copy functor can be:
+            \code
+            cds::container::WeakRingBuffer<std::string> ringbuf;
+            char const* arr[10];
+            ringbuf.push( arr, 10,
+                []( std::string& element, char const* src ) {
+                    new( &element ) std::string( src );
+                });
+            \endcode
+            You may use move semantics if appropriate:
+            \code
+            cds::container::WeakRingBuffer<std::string> ringbuf;
+            std::string arr[10];
+            ringbuf.push( arr, 10,
+                []( std::string& element, std::string& src ) {
+                    new( &element ) std::string( std::move( src ));
+                });
+            \endcode
+
+            Returns \p true on success, or \p false if there is not enough space in the ring
+        */
+        template <typename Q, typename CopyFunc>
+        bool push( Q* arr, size_t count, CopyFunc copy )
+        {
+            assert( count < capacity() );
+            size_t back = back_.load( memory_model::memory_order_relaxed );
+
+            assert( back - pfront_ <= capacity() );
+
+            if ( pfront_ + capacity() - back < count ) {
+                pfront_ = front_.load( memory_model::memory_order_acquire );
+
+                if ( pfront_ + capacity() - back < count ) {
+                    // not enough space
+                    return false;
+                }
+            }
+
+            // copy data
+            for ( size_t i = 0; i < count; ++i, ++back )
+                copy( buffer_[buffer_.mod( back )], arr[i] );
+
+            back_.store( back, memory_model::memory_order_release );
+
+            return true;
+        }
+
+        /// Batch push - push array \p arr of size \p count with copy construction as the copy functor
+        /**
+            This function is equivalent to:
+            \code
+                push( arr, count, []( value_type& dest, Q const& src ) { new( &dest ) value_type( src ); } );
+            \endcode
+
+            The function is available only if std::is_constructible<value_type, Q>::value
+            is \p true.
+
+            Returns \p true on success, or \p false if there is not enough space in the ring
+        */
+        template <typename Q>
+        typename std::enable_if< std::is_constructible<value_type, Q>::value, bool>::type
+        push( Q* arr, size_t count )
+        {
+            return push( arr, count, []( value_type& dest, Q const& src ) { new( &dest ) value_type( src ); } );
+        }
+
+        /// Push one element created from \p args
+        /**
+            The function is available only if std::is_constructible<value_type, Args...>::value
+            is \p true.
+
+            Returns \p false if the ring is full or \p true otherwise.
+        */
+        template <typename... Args>
+        typename std::enable_if< std::is_constructible<value_type, Args...>::value, bool>::type
+        emplace( Args&&... args )
+        {
+            size_t back = back_.load( memory_model::memory_order_relaxed );
+
+            assert( back - pfront_ <= capacity() );
+
+            if ( pfront_ + capacity() - back < 1 ) {
+                pfront_ = front_.load( memory_model::memory_order_acquire );
+
+                if ( pfront_ + capacity() - back < 1 ) {
+                    // not enough space
+                    return false;
+                }
+            }
+
+            new( &buffer_[buffer_.mod( back )] ) value_type( std::forward<Args>(args)... );
+
+            back_.store( back + 1, memory_model::memory_order_release );
+
+            return true;
+        }
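A short sketch of emplace() with a multi-argument constructor; Message is an invented type:

    #include <cds/container/weak_ringbuffer.h>
    #include <string>

    struct Message {
        int         id;
        std::string text;
        Message( int i, std::string const& t ): id( i ), text( t ) {}
    };

    int main()
    {
        cds::container::WeakRingBuffer< Message > ring( 256 );

        // The element is constructed in place in a ring cell from the
        // emplace() arguments - no temporary Message object is created.
        return ring.emplace( 42, "hello" ) ? 0 : 1;
    }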
+        /// Enqueues data to the ring using a functor
+        /**
+            \p Func is a functor called to copy a value to the ring element.
+            The functor \p f takes one argument - a reference to an empty cell of type \ref value_type :
+            \code
+            cds::container::WeakRingBuffer< Foo > myRing;
+            Bar bar;
+            myRing.enqueue_with( [&bar]( Foo& dest ) { dest = std::move(bar); } );
+            \endcode
+        */
+        template <typename Func>
+        bool enqueue_with( Func f )
+        {
+            size_t back = back_.load( memory_model::memory_order_relaxed );
+
+            assert( back - pfront_ <= capacity() );
+
+            if ( pfront_ + capacity() - back < 1 ) {
+                pfront_ = front_.load( memory_model::memory_order_acquire );
+
+                if ( pfront_ + capacity() - back < 1 ) {
+                    // not enough space
+                    return false;
+                }
+            }
+
+            f( buffer_[buffer_.mod( back )] );
+
+            back_.store( back + 1, memory_model::memory_order_release );
+
+            return true;
+        }
+
+        /// Enqueues \p val value into the queue.
+        /**
+            The new queue item is created by calling placement new in a free cell.
+            Returns \p true if success, \p false if the ring is full.
+        */
+        bool enqueue( value_type const& val )
+        {
+            return emplace( val );
+        }
+
+        /// Enqueues \p val value into the queue, move semantics
+        bool enqueue( value_type&& val )
+        {
+            return emplace( std::move( val ));
+        }
+
+        /// Synonym for \p enqueue( value_type const& )
+        bool push( value_type const& val )
+        {
+            return enqueue( val );
+        }
+
+        /// Synonym for \p enqueue( value_type&& )
+        bool push( value_type&& val )
+        {
+            return enqueue( std::move( val ));
+        }
+
+        /// Synonym for \p enqueue_with()
+        template <typename Func>
+        bool push_with( Func f )
+        {
+            return enqueue_with( f );
+        }
+
+        /// Batch pop - pop \p count elements from the ring buffer into \p arr
+        /**
+            \p CopyFunc is a per-element copy functor: for each element of \p arr
+            copy( arr[i], source ) is called.
+            The \p CopyFunc signature:
+            \code
+            void copy_func( Q& dest, value_type& element );
+            \endcode
+
+            Returns \p true on success, or \p false if there are not enough elements in the ring
+        */
+        template <typename Q, typename CopyFunc>
+        bool pop( Q* arr, size_t count, CopyFunc copy )
+        {
+            assert( count < capacity() );
+
+            size_t front = front_.load( memory_model::memory_order_relaxed );
+            assert( cback_ - front < capacity() );
+
+            if ( cback_ - front < count ) {
+                cback_ = back_.load( memory_model::memory_order_acquire );
+                if ( cback_ - front < count )
+                    return false;
+            }
+
+            // copy data
+            value_cleaner cleaner;
+            for ( size_t i = 0; i < count; ++i, ++front ) {
+                value_type& val = buffer_[buffer_.mod( front )];
+                copy( arr[i], val );
+                cleaner( val );
+            }
+
+            front_.store( front, memory_model::memory_order_release );
+            return true;
+        }
+
+        /// Batch pop - pop array \p arr of size \p count with assignment as the copy functor
+        /**
+            This function is equivalent to:
+            \code
+                pop( arr, count, []( Q& dest, value_type& src ) { dest = src; } );
+            \endcode
+
+            The function is available only if std::is_assignable<Q&, value_type>::value
+            is \p true.
+
+            Returns \p true on success, or \p false if there are not enough elements in the ring
+        */
+        template <typename Q>
+        typename std::enable_if< std::is_assignable<Q&, value_type>::value, bool>::type
+        pop( Q* arr, size_t count )
+        {
+            return pop( arr, count, []( Q& dest, value_type& src ) { dest = src; } );
+        }
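A usage sketch for the batch pop overloads, mirroring the push examples above (the values and the int/long pairing are illustrative):

    #include <cds/container/weak_ringbuffer.h>

    int main()
    {
        cds::container::WeakRingBuffer< int > ring( 128 );

        int src[8] = { 0, 1, 2, 3, 4, 5, 6, 7 };
        ring.push( src, 8 );    // batch push with copy construction

        // Batch pop into a destination array; each cell is copied out and then
        // cleaned by value_cleaner (a no-op for int under auto_cleaner).
        long dst[8];
        bool ok = ring.pop( dst, 8,
            []( long& dest, int& src_val ) { dest = src_val; });
        return ok ? 0 : 1;
    }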
+        /// Dequeues an element from the ring to \p val
+        /**
+            The function is available only if std::is_assignable<Q&, value_type>::value
+            is \p true.
+
+            Returns \p false if the ring is empty or \p true otherwise.
+        */
+        template <typename Q>
+        typename std::enable_if< std::is_assignable<Q&, value_type>::value, bool>::type
+        dequeue( Q& val )
+        {
+            return pop( &val, 1 );
+        }
+
+        /// Synonym for \p dequeue( Q& )
+        template <typename Q>
+        typename std::enable_if< std::is_assignable<Q&, value_type>::value, bool>::type
+        pop( Q& val )
+        {
+            return dequeue( val );
+        }
+
+        /// Dequeues a value using a functor
+        /**
+            \p Func is a functor called to copy the dequeued value.
+            The functor takes one argument - a reference to the removed node:
+            \code
+            cds::container::WeakRingBuffer< Foo > myRing;
+            Bar bar;
+            myRing.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
+            \endcode
+
+            Returns \p true if the ring is not empty, \p false otherwise.
+            The functor is called only if the ring is not empty.
+        */
+        template <typename Func>
+        bool dequeue_with( Func f )
+        {
+            size_t front = front_.load( memory_model::memory_order_relaxed );
+            assert( cback_ - front < capacity() );
+
+            if ( cback_ - front < 1 ) {
+                cback_ = back_.load( memory_model::memory_order_acquire );
+                if ( cback_ - front < 1 )
+                    return false;
+            }
+
+            value_type& val = buffer_[buffer_.mod( front )];
+            f( val );
+            value_cleaner()( val );
+
+            front_.store( front + 1, memory_model::memory_order_release );
+            return true;
+        }
+
+        /// Synonym for \p dequeue_with()
+        template <typename Func>
+        bool pop_with( Func f )
+        {
+            return dequeue_with( f );
+        }
+
+        /// Gets pointer to the first element of the ring buffer
+        /**
+            If the ring buffer is empty, returns \p nullptr.
+
+            The function is thread-safe since there is only one consumer.
+            Recall, \p WeakRingBuffer is a single-producer/single-consumer container.
+        */
+        value_type* front()
+        {
+            size_t front = front_.load( memory_model::memory_order_relaxed );
+            assert( cback_ - front < capacity() );
+
+            if ( cback_ - front < 1 ) {
+                cback_ = back_.load( memory_model::memory_order_acquire );
+                if ( cback_ - front < 1 )
+                    return nullptr;
+            }
+
+            return &buffer_[buffer_.mod( front )];
+        }
+
+        /// Removes the front element of the ring buffer
+        /**
+            If the ring buffer is empty, returns \p false.
+            Otherwise, pops the first element from the ring.
+        */
+        bool pop_front()
+        {
+            size_t front = front_.load( memory_model::memory_order_relaxed );
+            assert( cback_ - front <= capacity() );
+
+            if ( cback_ - front < 1 ) {
+                cback_ = back_.load( memory_model::memory_order_acquire );
+                if ( cback_ - front < 1 )
+                    return false;
+            }
+
+            // clean cell
+            value_cleaner()( buffer_[buffer_.mod( front )] );
+
+            front_.store( front + 1, memory_model::memory_order_release );
+            return true;
+        }
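front()/pop_front() let the consumer process elements in place, without copying them out. A sketch; process() stands in for any consumer routine:

    #include <cds/container/weak_ringbuffer.h>
    #include <cstdio>

    static void process( int const& v ) { std::printf( "%d\n", v ); }

    int main()
    {
        cds::container::WeakRingBuffer< int > ring( 16 );
        for ( int i = 0; i < 10; ++i )
            ring.push( i );

        // front() exposes the head cell; pop_front() releases it,
        // running value_cleaner on the cell.
        while ( int* p = ring.front()) {
            process( *p );
            ring.pop_front();
        }
    }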
+        /// Clears the ring buffer
+        void clear()
+        {
+            value_type v;
+            while ( pop( v ) );
+        }
+
+        /// Checks if the ring buffer is empty
+        bool empty() const
+        {
+            return front_.load( memory_model::memory_order_relaxed ) == back_.load( memory_model::memory_order_relaxed );
+        }
+
+        /// Checks if the ring buffer is full
+        bool full() const
+        {
+            return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ) >= capacity();
+        }
+
+        /// Returns the current size of the ring buffer
+        size_t size() const
+        {
+            return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed );
+        }
+
+        /// Returns capacity of the ring buffer
+        size_t capacity() const
+        {
+            return buffer_.capacity();
+        }
+
+    private:
+        //@cond
+        atomics::atomic<size_t> front_;
+        typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad1_;
+        atomics::atomic<size_t> back_;
+        typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad2_;
+        size_t pfront_;
+        typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad3_;
+        size_t cback_;
+        typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad4_;
+
+        buffer buffer_;
+        //@endcond
+    };
+
+}} // namespace cds::container
+
+
+#endif // #ifndef CDSLIB_CONTAINER_WEAK_RINGBUFFER_H

diff --git a/cds/opt/buffer.h b/cds/opt/buffer.h
index ca3cb874..90d79478 100644
--- a/cds/opt/buffer.h
+++ b/cds/opt/buffer.h
@@ -173,6 +173,19 @@ namespace cds { namespace opt {
         {
             return &( m_buffer[0].v );
         }
+
+        /// Returns idx % capacity()
+        /**
+            If the buffer size is a power of two, binary arithmetic is used
+            instead of modulo arithmetic
+        */
+        size_t mod( size_t idx )
+        {
+            static_if( c_bExp2 )
+                return idx & ( capacity() - 1 );
+            else
+                return idx % capacity();
+        }
     };

     /// Static initialized buffer
@@ -265,6 +278,19 @@ namespace cds { namespace opt {
         {
             return m_buffer;
         }
+
+        /// Returns idx % capacity()
+        /**
+            If the buffer size is a power of two, binary arithmetic is used
+            instead of modulo arithmetic
+        */
+        size_t mod( size_t idx )
+        {
+            static_if( c_bExp2 )
+                return idx & ( capacity() - 1 );
+            else
+                return idx % capacity();
+        }
     };

     /// Dynamically allocated uninitialized buffer
@@ -367,6 +393,19 @@ namespace cds { namespace opt {
         {
             return m_buffer;
         }
+
+        /// Returns idx % capacity()
+        /**
+            If the buffer size is a power of two, binary arithmetic is used
+            instead of modulo arithmetic
+        */
+        size_t mod( size_t idx )
+        {
+            static_if ( c_bExp2 )
+                return idx & ( capacity() - 1 );
+            else
+                return idx % capacity();
+        }
     };

@@ -471,6 +510,19 @@ namespace cds { namespace opt {
         {
             return m_buffer;
         }
+
+        /// Returns idx % capacity()
+        /**
+            If the buffer size is a power of two, binary arithmetic is used
+            instead of modulo arithmetic
+        */
+        size_t mod( size_t idx )
+        {
+            static_if( c_bExp2 )
+                return idx & ( capacity() - 1 );
+            else
+                return idx % capacity();
+        }
     };
 } // namespace v
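Why mod() can replace % with a mask: for a power-of-two capacity c, the mask c - 1 keeps exactly the low bits that survive reduction modulo c. Since static_if expands to a plain if (see the defs.h hunk above), the compiler folds the constant branch away. A self-contained check:

    #include <cassert>
    #include <cstddef>

    int main()
    {
        const size_t cap = 8;   // power of two: the mask is 0b0111
        for ( size_t idx = 0; idx < 1000; ++idx )
            assert(( idx & ( cap - 1 )) == idx % cap );

        // For a non-power-of-two capacity the identity fails, which is why
        // mod() falls back to the % operator when c_bExp2 is false.
        const size_t cap2 = 100;
        assert((( 150 & ( cap2 - 1 )) == 150 % cap2 ) == false );
        return 0;
    }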
diff --git a/cds/opt/value_cleaner.h b/cds/opt/value_cleaner.h
index b2a7112e..81e79b16 100644
--- a/cds/opt/value_cleaner.h
+++ b/cds/opt/value_cleaner.h
@@ -48,13 +48,15 @@ namespace cds { namespace opt {
             void operator ()( value_type& val )
             {
                 ...
-                // code to cleanup \p val
+                // code to cleanup val
             }
         }
         \endcode

         Predefined option types:
             \li opt::v::empty_cleaner
+            \li opt::v::destruct_cleaner
+            \li opt::v::auto_cleaner
     */
     template <typename Type>
     struct value_cleaner {
@@ -92,7 +94,32 @@ namespace cds { namespace opt {
             template <typename T>
             void operator()( T& val )
             {
-                (&val)->T::~T();
+                ( &val )->~T();
             }
             //@endcond
         };
+
+        /// Cleaner that calls \p T destructor if it is not trivial
+        /**
+            If \p T has a non-trivial destructor (std::is_trivially_destructible<T>::value is \p false),
+            the cleaner calls the \p T destructor.
+
+            If std::is_trivially_destructible<T>::value is \p true, the cleaner is empty - no
+            destructor of \p T is called.
+        */
+        struct auto_cleaner
+        {
+            //@cond
+            template <typename T>
+            typename std::enable_if< std::is_trivially_destructible<T>::value >::type
+            operator()( T& /*val*/ )
+            {}
+
+            template <typename T>
+            typename std::enable_if< !std::is_trivially_destructible<T>::value >::type
+            operator()( T& val )
+            {
+                ( &val )->~T();
+            }
+            //@endcond
+        };
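A sketch of the new auto_cleaner dispatch; the raw-storage dance exists only to make the destructor call observable and is not how the containers use the cleaner:

    #include <cds/opt/value_cleaner.h>
    #include <cstdlib>
    #include <new>
    #include <string>

    int main()
    {
        cds::opt::v::auto_cleaner cleaner;

        int n = 42;
        cleaner( n );   // int is trivially destructible: the no-op overload is chosen

        void* raw = std::malloc( sizeof( std::string ));
        std::string* s = new( raw ) std::string( "abc" );
        cleaner( *s );  // non-trivial destructor: ~basic_string() is invoked
        std::free( raw );   // frees storage only - the object is already destroyed
        return 0;
    }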
diff --git a/projects/Win/vc141/cds.vcxproj b/projects/Win/vc141/cds.vcxproj
index 18ff9405..a74adfc6 100644
--- a/projects/Win/vc141/cds.vcxproj
+++ b/projects/Win/vc141/cds.vcxproj
@@ -506,6 +506,7 @@
+    <ClInclude Include="..\..\..\cds\container\weak_ringbuffer.h" />

diff --git a/projects/Win/vc141/cds.vcxproj.filters b/projects/Win/vc141/cds.vcxproj.filters
index 305825bf..137444c8 100644
--- a/projects/Win/vc141/cds.vcxproj.filters
+++ b/projects/Win/vc141/cds.vcxproj.filters
@@ -1255,5 +1255,8 @@
       <Filter>Header Files\cds\algo</Filter>
     </ClInclude>
+    <ClInclude Include="..\..\..\cds\container\weak_ringbuffer.h">
+      <Filter>Header Files\cds\container</Filter>
+    </ClInclude>
   </ItemGroup>
 </Project>
\ No newline at end of file

diff --git a/projects/Win/vc141/gtest-queue.vcxproj b/projects/Win/vc141/gtest-queue.vcxproj
index eeaec107..6a1efb9f 100644
--- a/projects/Win/vc141/gtest-queue.vcxproj
+++ b/projects/Win/vc141/gtest-queue.vcxproj
@@ -55,6 +55,7 @@
+    <ClCompile Include="..\..\..\test\unit\queue\weak_ringbuffer.cpp" />

diff --git a/projects/Win/vc141/gtest-queue.vcxproj.filters b/projects/Win/vc141/gtest-queue.vcxproj.filters
index 82aefaa4..692356f3 100644
--- a/projects/Win/vc141/gtest-queue.vcxproj.filters
+++ b/projects/Win/vc141/gtest-queue.vcxproj.filters
@@ -89,6 +89,9 @@
       <Filter>Source Files</Filter>
     </ClCompile>
+    <ClCompile Include="..\..\..\test\unit\queue\weak_ringbuffer.cpp">
+      <Filter>Source Files</Filter>
+    </ClCompile>

diff --git a/test/stress/queue/queue_type.h b/test/stress/queue/queue_type.h
index 12fd8cf8..2e02f218 100644
--- a/test/stress/queue/queue_type.h
+++ b/test/stress/queue/queue_type.h
@@ -40,6 +40,7 @@
 #include
 #include
 #include
+#include <cds/container/weak_ringbuffer.h>
 #include
 #include
@@ -293,6 +294,29 @@ namespace fc_details{
     };

+    // WeakRingBuffer
+    struct traits_WeakRingBuffer_dyn: public cds::container::weak_ringbuffer::traits
+    {
+        typedef cds::opt::v::uninitialized_dynamic_buffer< int > buffer;
+    };
+    class WeakRingBuffer_dyn
+        : public cds::container::WeakRingBuffer< Value, traits_WeakRingBuffer_dyn >
+    {
+        typedef cds::container::WeakRingBuffer< Value, traits_WeakRingBuffer_dyn > base_class;
+    public:
+        WeakRingBuffer_dyn()
+            : base_class( 1024 * 64 )
+        {}
+        WeakRingBuffer_dyn( size_t nCapacity )
+            : base_class( nCapacity )
+        {}
+
+        cds::opt::none statistics() const
+        {
+            return cds::opt::none();
+        }
+    };
+
     // BasketQueue
     typedef cds::container::BasketQueue< cds::gc::HP , Value > BasketQueue_HP;
@@ -790,6 +814,9 @@ namespace cds_test {
     CDSSTRESS_Queue_F( test_fixture, VyukovMPMCCycleQueue_dyn ) \
     CDSSTRESS_Queue_F( test_fixture, VyukovMPMCCycleQueue_dyn_ic )

+#define CDSSTRESS_WeakRingBuffer( test_fixture ) \
+    CDSSTRESS_Queue_F( test_fixture, WeakRingBuffer_dyn )
+
 #define CDSSTRESS_StdQueue( test_fixture ) \
     CDSSTRESS_Queue_F( test_fixture, StdQueue_deque_Spinlock ) \
     CDSSTRESS_Queue_F( test_fixture, StdQueue_list_Spinlock ) \

diff --git a/test/unit/queue/CMakeLists.txt b/test/unit/queue/CMakeLists.txt
index 626ef727..aea3e81f 100644
--- a/test/unit/queue/CMakeLists.txt
+++ b/test/unit/queue/CMakeLists.txt
@@ -15,6 +15,7 @@ set(CDSGTEST_QUEUE_SOURCES
     segmented_queue_hp.cpp
     segmented_queue_dhp.cpp
     vyukov_mpmc_queue.cpp
+    weak_ringbuffer.cpp
     intrusive_basket_queue_hp.cpp
     intrusive_basket_queue_dhp.cpp
     intrusive_fcqueue.cpp

diff --git a/test/unit/queue/test_bounded_queue.h b/test/unit/queue/test_bounded_queue.h
index f4179963..a1657b1b 100644
--- a/test/unit/queue/test_bounded_queue.h
+++ b/test/unit/queue/test_bounded_queue.h
@@ -57,6 +57,7 @@ namespace cds_test {
             }
             ASSERT_FALSE( q.empty());
             ASSERT_CONTAINER_SIZE( q, nSize );
+            ASSERT_FALSE( q.enqueue( static_cast<value_type>( nSize ) * 2 ));

             for ( size_t i = 0; i < nSize; ++i ) {
                 it = -1;
diff --git a/test/unit/queue/weak_ringbuffer.cpp b/test/unit/queue/weak_ringbuffer.cpp
new file mode 100644
index 00000000..64d97b6c
--- /dev/null
+++ b/test/unit/queue/weak_ringbuffer.cpp
@@ -0,0 +1,255 @@
+/*
+    This file is a part of libcds - Concurrent Data Structures library
+
+    (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
+
+    Source code repo: http://github.com/khizmax/libcds/
+    Download: http://sourceforge.net/projects/libcds/files/
+
+    Redistribution and use in source and binary forms, with or without
+    modification, are permitted provided that the following conditions are met:
+
+    * Redistributions of source code must retain the above copyright notice, this
+      list of conditions and the following disclaimer.
+
+    * Redistributions in binary form must reproduce the above copyright notice,
+      this list of conditions and the following disclaimer in the documentation
+      and/or other materials provided with the distribution.
+
+    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+    AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+    IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+    FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+    DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+    SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+    OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+#include "test_bounded_queue.h"
+
+#include <cds/container/weak_ringbuffer.h>
+
+namespace {
+    namespace cc = cds::container;
+
+    class WeakRingBuffer: public cds_test::bounded_queue
+    {
+    public:
+        template <typename Queue>
+        void test_array( Queue& q )
+        {
+            typedef typename Queue::value_type value_type;
+
+            const size_t nSize = q.capacity();
+            static const size_t nArrSize = 16;
+            const size_t nArrCount = nSize / nArrSize;
+
+            {
+                value_type el[nArrSize];
+
+                // batch push
+                for ( size_t i = 0; i < nSize; i += nArrSize ) {
+                    for ( size_t k = 0; k < nArrSize; ++k )
+                        el[k] = static_cast<value_type>( i + k );
+
+                    if ( i + nArrSize <= nSize ) {
+                        ASSERT_TRUE( q.push( el, nArrSize ));
+                    }
+                    else {
+                        ASSERT_FALSE( q.push( el, nArrSize ));
+                    }
+                }
+
+                ASSERT_TRUE( !q.empty());
+                if ( nSize % nArrSize != 0 ) {
+                    ASSERT_FALSE( q.full());
+                    ASSERT_CONTAINER_SIZE( q, nArrCount * nArrSize );
+                    for ( size_t i = nArrCount * nArrSize; i < nSize; ++i ) {
+                        ASSERT_TRUE( q.enqueue( static_cast<value_type>( i )));
+                    }
+                }
+                ASSERT_TRUE( q.full());
+                ASSERT_CONTAINER_SIZE( q, nSize );
+
+                // batch pop
+                value_type expected = 0;
+                while ( q.pop( el, nArrSize )) {
+                    for ( size_t i = 0; i < nArrSize; ++i ) {
+                        ASSERT_EQ( el[i], expected );
+                        ++expected;
+                    }
+                }
+
+                if ( nSize % nArrSize == 0 ) {
+                    ASSERT_TRUE( q.empty());
+                }
+                else {
+                    ASSERT_FALSE( q.empty());
+                    ASSERT_CONTAINER_SIZE( q, nSize % nArrSize );
+                    q.clear();
+                }
+                ASSERT_TRUE( q.empty());
+                ASSERT_FALSE( q.full());
+                ASSERT_CONTAINER_SIZE( q, 0u );
+            }
+
+            {
+                // batch push with functor
+                size_t el[nArrSize];
+
+                auto func_push = []( value_type& dest, size_t src ) { dest = static_cast<value_type>( src * 10 ); };
+                for ( size_t i = 0; i < nSize; i += nArrSize ) {
+                    for ( size_t k = 0; k < nArrSize; ++k )
+                        el[k] = i + k;
+
+                    if ( i + nArrSize <= nSize ) {
+                        ASSERT_TRUE( q.push( el, nArrSize, func_push ));
+                    }
+                    else {
+                        ASSERT_FALSE( q.push( el, nArrSize, func_push ));
+                    }
+                }
+
+                ASSERT_TRUE( !q.empty());
+                if ( nSize % nArrSize != 0 ) {
+                    ASSERT_FALSE( q.full());
+                    ASSERT_CONTAINER_SIZE( q, nArrCount * nArrSize );
+                    for ( size_t i = nArrCount * nArrSize; i < nSize; ++i ) {
+                        ASSERT_TRUE( q.push( &i, 1, func_push ));
+                    }
+                }
+                ASSERT_TRUE( q.full());
+                ASSERT_CONTAINER_SIZE( q, nSize );
+
+                // batch pop with functor
+                auto func_pop = []( size_t& dest, value_type src ) { dest = static_cast<size_t>( src / 10 ); };
+                size_t expected = 0;
+                while ( q.pop( el, nArrSize, func_pop )) {
+                    for ( size_t i = 0; i < nArrSize; ++i ) {
+                        ASSERT_EQ( el[i], expected );
+                        ++expected;
+                    }
+                }
+
+                if ( nSize % nArrSize == 0 ) {
+                    ASSERT_TRUE( q.empty());
+                }
+                else {
+                    ASSERT_FALSE( q.empty());
+                    ASSERT_CONTAINER_SIZE( q, nSize % nArrSize );
+                    size_t v;
+                    while ( q.pop( &v, 1, func_pop )) {
+                        ASSERT_EQ( v, expected );
+                        ++expected;
+                    }
+                }
+                ASSERT_TRUE( q.empty());
+                ASSERT_FALSE( q.full());
+                ASSERT_CONTAINER_SIZE( q, 0u );
+
+                // front/pop_front
+                for ( size_t i = 0; i < nSize; i += nArrSize ) {
+                    for ( size_t k = 0; k < nArrSize; ++k )
+                        el[k] = i + k;
+
+                    if ( i + nArrSize <= nSize ) {
+                        ASSERT_TRUE( q.push( el, nArrSize, func_push ));
+                    }
+                    else {
+                        ASSERT_FALSE( q.push( el, nArrSize, func_push ));
+                    }
+                }
+
+                ASSERT_TRUE( !q.empty());
+                if ( nSize % nArrSize != 0 ) {
+                    ASSERT_FALSE( q.full());
+                    ASSERT_CONTAINER_SIZE( q, nArrCount * nArrSize );
+                    for ( size_t i = nArrCount * nArrSize; i < nSize; ++i ) {
+                        ASSERT_TRUE( q.push( &i, 1, func_push ));
+                    }
+                }
+                ASSERT_TRUE( q.full());
+                ASSERT_CONTAINER_SIZE( q, nSize );
+
+                value_type cur = 0;
+                while ( !q.empty()) {
+                    value_type* front = q.front();
+                    ASSERT_TRUE( front != nullptr );
+                    ASSERT_EQ( cur, *front );
+                    ASSERT_TRUE( q.pop_front());
+                    cur += 10;
+                }
+
+                ASSERT_TRUE( q.empty());
+                ASSERT_TRUE( q.front() == nullptr );
+                ASSERT_FALSE( q.pop_front());
+            }
+        }
+    };
+
+    TEST_F( WeakRingBuffer, defaulted )
+    {
+        typedef cds::container::WeakRingBuffer< int > test_queue;
+
+        test_queue q( 128 );
+        test( q );
+        test_array( q );
+    }
+
+    TEST_F( WeakRingBuffer, stat )
+    {
+        struct traits: public cds::container::weak_ringbuffer::traits
+        {
+            typedef cds::opt::v::uninitialized_static_buffer< int, 128 > buffer;
+        };
+        typedef cds::container::WeakRingBuffer< int, traits > test_queue;
+
+        test_queue q;
+        test( q );
+        test_array( q );
+    }
+
+    TEST_F( WeakRingBuffer, dynamic )
+    {
+        struct traits: public cds::container::weak_ringbuffer::traits
+        {
+            typedef cds::opt::v::uninitialized_dynamic_buffer< int > buffer;
+        };
+        typedef cds::container::WeakRingBuffer< int, traits > test_queue;
+
+        test_queue q( 128 );
+        test( q );
+        test_array( q );
+    }
+
+    TEST_F( WeakRingBuffer, dynamic_mod )
+    {
+        struct traits: public cds::container::weak_ringbuffer::traits
+        {
+            typedef cds::opt::v::uninitialized_dynamic_buffer< int, CDS_DEFAULT_ALLOCATOR, false > buffer;
+        };
+        typedef cds::container::WeakRingBuffer< int, traits > test_queue;
+
+        test_queue q( 100 );
+        test( q );
+        test_array( q );
+    }
+
+    TEST_F( WeakRingBuffer, dynamic_padding )
+    {
+        struct traits: public cds::container::weak_ringbuffer::traits
+        {
+            typedef cds::opt::v::uninitialized_dynamic_buffer< int > buffer;
+            enum { padding = 32 };
+        };
+        typedef cds::container::WeakRingBuffer< int, traits > test_queue;
+
+        test_queue q( 128 );
+        test( q );
+        test_array( q );
+    }
+
+} // namespace
\ No newline at end of file