This file is a part of libcds - Concurrent Data Structures library
(C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
Source code repo: http://github.com/khizmax/libcds/
Download: http://sourceforge.net/projects/libcds/files/
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#ifndef CDSLIB_CONTAINER_WEAK_RINGBUFFER_H
#define CDSLIB_CONTAINER_WEAK_RINGBUFFER_H
#include <cds/container/details/base.h>
#include <cds/opt/buffer.h>
#include <cds/opt/value_cleaner.h>
#include <cds/algo/atomic.h>
#include <cds/details/bounded_container.h>
namespace cds { namespace container {
/// \p WeakRingBuffer related definitions
/** @ingroup cds_nonintrusive_helper
namespace weak_ringbuffer {
/// \p WeakRingBuffer default traits
/// Buffer type for internal array
The type of the element in the buffer is not important: \p WeakRingBuffer rebinds
the buffer to the required type via the \p rebind metafunction.
For \p WeakRingBuffer the buffer size should be a power of two.
Only an uninitialized buffer should be used for the ring buffer -
\p cds::opt::v::uninitialized_dynamic_buffer (the default),
\p cds::opt::v::uninitialized_static_buffer.
typedef cds::opt::v::uninitialized_dynamic_buffer< void * > buffer;
/// A functor to clean up a dequeued item
The functor calls the destructor of the popped element.
After a set of items is dequeued, \p value_cleaner cleans the cells that the items occupied.
If \p T is a complex type, \p value_cleaner may be a useful feature.
For POD types \ref opt::v::empty_cleaner is suitable.
The default is \ref opt::v::auto_cleaner, which calls the destructor only if it is not trivial.
typedef cds::opt::v::auto_cleaner value_cleaner;
/// C++ memory ordering model
Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
or \p opt::v::sequential_consistent (sequentially consistent memory model).
typedef opt::v::relaxed_ordering memory_model;
/// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
enum { padding = opt::cache_line_padding };
/// Metafunction converting option list to \p weak_ringbuffer::traits
Supported \p Options are:
- \p opt::buffer - an uninitialized buffer type for the internal cyclic array. Possible types are:
\p opt::v::uninitialized_dynamic_buffer (the default), \p opt::v::uninitialized_static_buffer. The type of
element in the buffer is not important: it will be changed via the \p rebind metafunction.
- \p opt::value_cleaner - a functor to clean up dequeued items.
The functor calls the destructor of a ring-buffer item.
After a set of items is dequeued, \p value_cleaner cleans the cells that the items occupied.
If \p T is a complex type, \p value_cleaner can be a useful feature.
The default is \ref opt::v::auto_cleaner, which calls the destructor only if it is not trivial; for POD types \ref opt::v::empty_cleaner is suitable.
- \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
- \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
or \p opt::v::sequential_consistent (sequentially consistent memory model).
Example: declare \p %WeakRingBuffer with a static internal buffer of size 1024:
typedef cds::container::WeakRingBuffer< Foo,
    typename cds::container::weak_ringbuffer::make_traits<
        cds::opt::buffer< cds::opt::v::uninitialized_static_buffer< void *, 1024 >>
    >::type
> myRing;
template <typename... Options>
# ifdef CDS_DOXYGEN_INVOKED
typedef implementation_defined type; ///< Metafunction result
typedef typename cds::opt::make_options<
typename cds::opt::find_type_traits< traits, Options... >::type
/// Single-producer single-consumer ring buffer
/** @ingroup cds_nonintrusive_queue
Source: [2013] Nhat Minh Le, Adrien Guatto, Albert Cohen, Antoniu Pop. Correct and Efficient Bounded
FIFO Queues. [Research Report] RR-8365, INRIA. 2013. <hal-00862450>
The ring buffer is a bounded queue. Additionally, \p %WeakRingBuffer supports batch operations -
you can push/pop an array of elements.
There is a specialization \ref cds_nonintrusive_WeakRingBuffer_void "WeakRingBuffer<void, Traits>"
that is not a queue but a "memory pool" between producer and consumer threads.
\p WeakRingBuffer<void> supports data of different sizes.
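A minimal usage sketch (here \p Foo and \p process() are hypothetical; the producer and
consumer threads must be managed by the caller):
\code
cds::container::WeakRingBuffer< Foo > ring( 1024 );

// Producer thread: push() returns false if the ring is full
Foo item;
ring.push( item );

// Consumer thread: pop() returns false if the ring is empty
Foo out;
if ( ring.pop( out ))
    process( out );
\endcode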
template <typename T, typename Traits = weak_ringbuffer::traits>
class WeakRingBuffer: public cds::bounded_container
typedef T value_type; ///< Value type to be stored in the ring buffer
typedef Traits traits; ///< Ring buffer traits
typedef typename traits::memory_model memory_model; ///< Memory ordering. See \p cds::opt::memory_model option
typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p weak_ringbuffer::traits::value_cleaner
/// Rebind template arguments
template <typename T2, typename Traits2>
typedef WeakRingBuffer< T2, Traits2 > other; ///< Rebinding result
typedef size_t item_counter;
typedef typename traits::buffer::template rebind< value_type >::other buffer;
/// Creates the ring buffer of \p capacity
For \p cds::opt::v::uninitialized_static_buffer the \p capacity parameter is ignored.
If the buffer capacity is a power of two, lightweight binary (mask-based) arithmetic is used
instead of modulo arithmetic.
WeakRingBuffer( size_t capacity = 0 )
, buffer_( capacity )
back_.store( 0, memory_model::memory_order_release );
/// Destroys the ring buffer
value_cleaner cleaner;
size_t back = back_.load( memory_model::memory_order_relaxed );
for ( size_t front = front_.load( memory_model::memory_order_relaxed ); front != back; ++front )
cleaner( buffer_[ buffer_.mod( front ) ] );
/// Batch push - push array \p arr of size \p count
\p CopyFunc is a per-element copy functor: for each element of \p arr
<tt>copy( dest, arr[i] )</tt> is called.
The \p CopyFunc signature:
void copy_func( value_type& element, Q const& source );
Here \p element is uninitialized, so you should construct it using placement new
if needed; for example, if the element type is \p std::string and \p Q is <tt>char const*</tt>,
the \p copy functor can be:
cds::container::WeakRingBuffer<std::string> ringbuf;
ringbuf.push( arr, 10,
[]( std::string& element, char const* src ) {
new( &element ) std::string( src );
You may use move semantics if appropriate:
cds::container::WeakRingBuffer<std::string> ringbuf;
ringbuf.push( arr, 10,
[]( std::string& element, std::string& src ) {
new( &element ) std::string( std::move( src ));
Returns \p true on success or \p false if there is not enough space in the ring
template <typename Q, typename CopyFunc>
bool push( Q* arr, size_t count, CopyFunc copy )
assert( count < capacity() );
size_t back = back_.load( memory_model::memory_order_relaxed );
assert( back - pfront_ <= capacity() );
if ( pfront_ + capacity() - back < count ) {
pfront_ = front_.load( memory_model::memory_order_acquire );
if ( pfront_ + capacity() - back < count ) {
for ( size_t i = 0; i < count; ++i, ++back )
copy( buffer_[buffer_.mod( back )], arr[i] );
back_.store( back, memory_model::memory_order_release );
/// Batch push - push array \p arr of size \p count, constructing each ring element from the corresponding source element
This function is equivalent to:
push( arr, count, []( value_type& dest, Q const& src ) { new( &dest ) value_type( src ); } );
The function is available only if <tt>std::is_constructible<value_type, Q>::value</tt> is \p true.
Returns \p true on success or \p false if there is not enough space in the ring
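For example, a minimal sketch pushing a small batch of integers:
\code
cds::container::WeakRingBuffer< int > ring( 1024 );
int data[16];
// ... fill data ...
if ( !ring.push( data, 16 )) {
    // not enough free space for the whole batch - nothing is pushed
}
\endcode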
template <typename Q>
typename std::enable_if< std::is_constructible<value_type, Q>::value, bool>::type
push( Q* arr, size_t count )
return push( arr, count, []( value_type& dest, Q const& src ) { new( &dest ) value_type( src ); } );
/// Push one element created from \p args
The function is available only if <tt>std::is_constructible<value_type, Args...>::value</tt> is \p true.
Returns \p false if the ring is full or \p true otherwise.
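For example (assuming \p value_type is \p std::string):
\code
cds::container::WeakRingBuffer< std::string > ring( 128 );
ring.emplace( 5, '*' );     // constructs std::string( 5, '*' ) in place
\endcode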
template <typename... Args>
typename std::enable_if< std::is_constructible<value_type, Args...>::value, bool>::type
emplace( Args&&... args )
size_t back = back_.load( memory_model::memory_order_relaxed );
assert( back - pfront_ <= capacity() );
if ( pfront_ + capacity() - back < 1 ) {
pfront_ = front_.load( memory_model::memory_order_acquire );
if ( pfront_ + capacity() - back < 1 ) {
new( &buffer_[buffer_.mod( back )] ) value_type( std::forward<Args>(args)... );
back_.store( back + 1, memory_model::memory_order_release );
/// Enqueues data to the ring using a functor
\p Func is a functor called to copy a value to the ring element.
The functor \p f takes one argument - a reference to an empty cell of type \ref value_type :
cds::container::WeakRingBuffer< Foo > myRing;
myRing.enqueue_with( [&bar]( Foo& dest ) { dest = std::move(bar); } );
template <typename Func>
bool enqueue_with( Func f )
size_t back = back_.load( memory_model::memory_order_relaxed );
assert( back - pfront_ <= capacity() );
if ( pfront_ + capacity() - back < 1 ) {
pfront_ = front_.load( memory_model::memory_order_acquire );
if ( pfront_ + capacity() - back < 1 ) {
f( buffer_[buffer_.mod( back )] );
back_.store( back + 1, memory_model::memory_order_release );
/// Enqueues \p val value into the queue.
The new queue item is created by calling placement new in a free cell.
Returns \p true on success, \p false if the ring is full.
bool enqueue( value_type const& val )
return emplace( val );
/// Enqueues \p val value into the queue, move semantics
bool enqueue( value_type&& val )
return emplace( std::move( val ));
/// Synonym for \p enqueue( value_type const& )
bool push( value_type const& val )
return enqueue( val );
/// Synonym for \p enqueue( value_type&& )
bool push( value_type&& val )
return enqueue( std::move( val ));
/// Synonym for \p enqueue_with()
template <typename Func>
bool push_with( Func f )
return enqueue_with( f );
/// Batch pop - pop \p count elements from the ring buffer into \p arr
\p CopyFunc is a per-element copy functor: for each element of \p arr
<tt>copy( arr[i], source )</tt> is called.
The \p CopyFunc signature:
void copy_func( Q& dest, value_type& element );
Returns \p true on success or \p false if the ring contains fewer than \p count elements
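For example, a sketch that pops a batch of 10 strings using move assignment
(the call fails and pops nothing if fewer than 10 elements are queued):
\code
cds::container::WeakRingBuffer< std::string > ring( 1024 );
// ...
std::string out[10];
if ( ring.pop( out, 10,
    []( std::string& dest, std::string& src ) { dest = std::move( src ); } ))
{
    // all 10 elements have been moved into out[]
}
\endcode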
template <typename Q, typename CopyFunc>
bool pop( Q* arr, size_t count, CopyFunc copy )
assert( count < capacity() );
size_t front = front_.load( memory_model::memory_order_relaxed );
assert( cback_ - front < capacity() );
if ( cback_ - front < count ) {
cback_ = back_.load( memory_model::memory_order_acquire );
if ( cback_ - front < count )
value_cleaner cleaner;
for ( size_t i = 0; i < count; ++i, ++front ) {
value_type& val = buffer_[buffer_.mod( front )];
front_.store( front, memory_model::memory_order_release );
/// Batch pop - pop \p count elements into array \p arr with assignment as copy functor
This function is equivalent to:
pop( arr, count, []( Q& dest, value_type& src ) { dest = src; } );
The function is available only if <tt>std::is_assignable<Q&, value_type const&>::value</tt> is \p true.
Returns \p true on success or \p false if the ring contains fewer than \p count elements
template <typename Q>
typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
pop( Q* arr, size_t count )
return pop( arr, count, []( Q& dest, value_type& src ) { dest = src; } );
/// Dequeues an element from the ring to \p val
The function is available only if <tt>std::is_assignable<Q&, value_type const&>::value</tt> is \p true.
Returns \p false if the ring is empty or \p true otherwise.
template <typename Q>
typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
return pop( &val, 1 );
/// Synonym for \p dequeue( Q& )
template <typename Q>
typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
return dequeue( val );
/// Dequeues a value using a functor
\p Func is a functor called to copy the dequeued value.
The functor takes one argument - a reference to the dequeued element:
cds::container::WeakRingBuffer< Foo > myRing;
myRing.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
Returns \p true if the ring is not empty, \p false otherwise.
The functor is called only if the ring is not empty.
template <typename Func>
bool dequeue_with( Func f )
size_t front = front_.load( memory_model::memory_order_relaxed );
assert( cback_ - front < capacity() );
if ( cback_ - front < 1 ) {
cback_ = back_.load( memory_model::memory_order_acquire );
if ( cback_ - front < 1 )
value_type& val = buffer_[buffer_.mod( front )];
value_cleaner()( val );
front_.store( front + 1, memory_model::memory_order_release );
/// Synonym for \p dequeue_with()
template <typename Func>
bool pop_with( Func f )
return dequeue_with( f );
/// Gets a pointer to the first element of the ring buffer
If the ring buffer is empty, returns \p nullptr
The function is thread-safe since there is only one consumer.
Recall, \p WeakRingBuffer is a single-producer/single-consumer container.
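A typical consumer-side loop (a sketch; \p process() is a hypothetical handler):
\code
cds::container::WeakRingBuffer< Foo > ring( 1024 );
// ...
while ( Foo* p = ring.front()) {
    process( *p );
    ring.pop_front();
}
\endcode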
size_t front = front_.load( memory_model::memory_order_relaxed );
assert( cback_ - front < capacity() );
if ( cback_ - front < 1 ) {
cback_ = back_.load( memory_model::memory_order_acquire );
if ( cback_ - front < 1 )
return &buffer_[buffer_.mod( front )];
/// Removes the front element of the ring buffer
If the ring buffer is empty, returns \p false.
Otherwise, pops the first element from the ring.
size_t front = front_.load( memory_model::memory_order_relaxed );
assert( cback_ - front <= capacity() );
if ( cback_ - front < 1 ) {
cback_ = back_.load( memory_model::memory_order_acquire );
if ( cback_ - front < 1 )
value_cleaner()( buffer_[buffer_.mod( front )] );
front_.store( front + 1, memory_model::memory_order_release );
/// Clears the ring buffer (only the consumer can call this function!)
/// Checks if the ring-buffer is empty
return front_.load( memory_model::memory_order_relaxed ) == back_.load( memory_model::memory_order_relaxed );
/// Checks if the ring-buffer is full
return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ) >= capacity();
/// Returns the current size of the ring buffer
return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed );
/// Returns the capacity of the ring buffer
size_t capacity() const
return buffer_.capacity();
atomics::atomic<size_t> front_;
typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad1_;
atomics::atomic<size_t> back_;
typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad2_;
typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad3_;
typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad4_;
/// Single-producer single-consumer ring buffer for untyped variable-sized data
/** @ingroup cds_nonintrusive_queue
@anchor cds_nonintrusive_WeakRingBuffer_void
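This specialization is a "memory pool" of variable-sized untyped messages passed from a single
producer to a single consumer. A minimal usage sketch (here \p msg, \p msg_size and \p process()
are hypothetical):
\code
cds::container::WeakRingBuffer< void > ring( 1024 * 1024 );

// Producer: reserve a buffer, fill it, then commit it
if ( void* buf = ring.back( msg_size )) {
    memcpy( buf, msg, msg_size );
    ring.push_back();
}
// or, equivalently: ring.push_back( msg, msg_size );

// Consumer: inspect the front data, then release it
std::pair<void*, size_t> data = ring.front();
if ( data.first ) {
    process( data.first, data.second );
    ring.pop_front();
}
\endcode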
#ifdef CDS_DOXYGEN_INVOKED
template <typename Traits = weak_ringbuffer::traits>
template <typename Traits>
class WeakRingBuffer<void, Traits>: public cds::bounded_container
typedef Traits traits; ///< Ring buffer traits
typedef typename traits::memory_model memory_model; ///< Memory ordering. See \p cds::opt::memory_model option
typedef typename traits::buffer::template rebind< uint8_t >::other buffer;
/// Creates the ring buffer of \p capacity bytes
For \p cds::opt::v::uninitialized_static_buffer the \p capacity parameter is ignored.
If the buffer capacity is a power of two, lightweight binary (mask-based) arithmetic is used
instead of modulo arithmetic.
WeakRingBuffer( size_t capacity = 0 )
, buffer_( capacity )
back_.store( 0, memory_model::memory_order_release );
/// [producer] Reserves \p size bytes; returns \p nullptr if there is not enough free space
void* back( size_t size )
// Any data is rounded up to a pointer-size ( sizeof( uintptr_t )) boundary
size_t real_size = calc_real_size( size );
// check if we can reserve real_size bytes
assert( real_size < capacity() );
size_t back = back_.load( memory_model::memory_order_relaxed );
assert( back - pfront_ <= capacity() );
if ( pfront_ + capacity() - back < real_size ) {
pfront_ = front_.load( memory_model::memory_order_acquire );
if ( pfront_ + capacity() - back < real_size ) {
uint8_t* reserved = buffer_.buffer() + buffer_.mod( back );
// Check whether the contiguous space at the end of the buffer is enough to store real_size bytes
size_t tail_size = capacity() - buffer_.mod( back );
if ( tail_size < real_size ) {
assert( tail_size >= sizeof( size_t ) );
assert( !is_tail( tail_size ) );
*reinterpret_cast<size_t*>( reserved ) = make_tail( tail_size - sizeof(size_t));
// We must be at the beginning of the buffer
assert( buffer_.mod( back ) == 0 );
if ( pfront_ + capacity() - back < real_size ) {
pfront_ = front_.load( memory_model::memory_order_acquire );
if ( pfront_ + capacity() - back < real_size ) {
reserved = buffer_.buffer();
// reserve and store size
*reinterpret_cast<size_t*>( reserved ) = size;
return reinterpret_cast<void*>( reserved + sizeof( size_t ) );
/// [producer] Pushes (commits) the bytes previously reserved by \p back() into the ring
size_t back = back_.load( memory_model::memory_order_relaxed );
uint8_t* reserved = buffer_.buffer() + buffer_.mod( back );
size_t real_size = calc_real_size( *reinterpret_cast<size_t*>( reserved ) );
assert( real_size < capacity() );
back_.store( back + real_size, memory_model::memory_order_release );
/// [producer] Push \p data of \p size bytes into ring
bool push_back( void const* data, size_t size )
void* buf = back( size );
memcpy( buf, data, size );
/// [consumer] Get top data from the ring
std::pair<void*, size_t> front()
size_t front = front_.load( memory_model::memory_order_relaxed );
assert( cback_ - front < capacity() );
if ( cback_ - front < sizeof( size_t )) {
cback_ = back_.load( memory_model::memory_order_acquire );
if ( cback_ - front < sizeof( size_t ) )
return std::make_pair( nullptr, 0u );
uint8_t * buf = buffer_.buffer() + buffer_.mod( front );
assert( ( reinterpret_cast<uintptr_t>( buf ) & ( sizeof( uintptr_t ) - 1 ) ) == 0 );
size_t size = *reinterpret_cast<size_t*>( buf );
if ( is_tail( size ) ) {
CDS_VERIFY( pop_front() );
front = front_.load( memory_model::memory_order_relaxed );
buf = buffer_.buffer() + buffer_.mod( front );
size = *reinterpret_cast<size_t*>( buf );
assert( !is_tail( size ) );
size_t real_size = calc_real_size( size );
if ( cback_ - front < real_size ) {
cback_ = back_.load( memory_model::memory_order_acquire );
assert( cback_ - front >= real_size );
return std::make_pair( reinterpret_cast<void*>( buf + sizeof( size_t ) ), size );
/// [consumer] Pops top data
size_t front = front_.load( memory_model::memory_order_relaxed );
assert( cback_ - front <= capacity() );
if ( cback_ - front < sizeof(size_t) ) {
cback_ = back_.load( memory_model::memory_order_acquire );
if ( cback_ - front < sizeof( size_t ) )
uint8_t * buf = buffer_.buffer() + buffer_.mod( front );
assert( ( reinterpret_cast<uintptr_t>( buf ) & ( sizeof( uintptr_t ) - 1 ) ) == 0 );
size_t size = *reinterpret_cast<size_t*>( buf );
assert( !is_tail( size ) );
size_t real_size = calc_real_size( size );
if ( cback_ - front < real_size ) {
cback_ = back_.load( memory_model::memory_order_acquire );
assert( cback_ - front >= real_size );
front_.store( front + real_size, memory_model::memory_order_release );
/// [consumer] Clears the ring buffer
for ( auto el = front(); el.first; el = front() )
/// Checks if the ring-buffer is empty
return front_.load( memory_model::memory_order_relaxed ) == back_.load( memory_model::memory_order_relaxed );
/// Checks if the ring-buffer is full
return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ) >= capacity();
/// Returns the current size of ring buffer
return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed );
/// Returns capacity of the ring buffer
size_t capacity() const
return buffer_.capacity();
static size_t calc_real_size( size_t size )
size_t real_size = (( size + sizeof( uintptr_t ) - 1 ) & ~( sizeof( uintptr_t ) - 1 )) + sizeof( size_t );
assert( real_size > size );
assert( real_size - size >= sizeof( size_t ) );
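// For example, on a typical 64-bit platform a 5-byte payload gives
// real_size == (( 5 + 7 ) & ~7 ) + 8 == 16: the payload rounded up to 8 bytes plus an 8-byte size header.

// The most significant bit of a stored size value marks a "tail" - an unused area at the
// end of the buffer that is skipped when a reserved block would not fit contiguously.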
static bool is_tail( size_t size )
return ( size & ( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 ))) != 0;
static size_t make_tail( size_t size )
return size | ( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 ));
atomics::atomic<size_t> front_;
typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad1_;
atomics::atomic<size_t> back_;
typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad2_;
typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad3_;
typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad4_;
}} // namespace cds::container
#endif // #ifndef CDSLIB_CONTAINER_WEAK_RINGBUFFER_H