2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #ifndef CDSLIB_CONTAINER_WEAK_RINGBUFFER_H
32 #define CDSLIB_CONTAINER_WEAK_RINGBUFFER_H
34 #include <cds/container/details/base.h>
35 #include <cds/opt/buffer.h>
36 #include <cds/opt/value_cleaner.h>
37 #include <cds/algo/atomic.h>
38 #include <cds/details/bounded_container.h>
40 namespace cds { namespace container {
42 /// \p WeakRingBuffer related definitions
43 /** @ingroup cds_nonintrusive_helper
45 namespace weak_ringbuffer {
47 /// \p WeakRingBuffer default traits
49 /// Buffer type for internal array
51 The type of element for the buffer is not important: \p WeakRingBuffer rebind
52 the buffer for required type via \p rebind metafunction.
54 For \p WeakRingBuffer the buffer size should have power-of-2 size.
56 You should use only uninitialized buffer for the ring buffer -
57 \p cds::opt::v::uninitialized_dynamic_buffer (the default),
58 \p cds::opt::v::uninitialized_static_buffer.
60 typedef cds::opt::v::uninitialized_dynamic_buffer< void * > buffer;
62 /// A functor to clean item dequeued.
64 The functor calls the destructor for popped element.
65 After a set of items is dequeued, \p value_cleaner cleans the cells that the items have been occupied.
66 If \p T is a complex type, \p value_cleaner may be useful feature.
67 For POD types \ref opt::v::empty_cleaner is suitable
69 Default value is \ref opt::v::auto_cleaner that calls destructor only if it is not trivial.
71 typedef cds::opt::v::auto_cleaner value_cleaner;
73 /// C++ memory ordering model
75 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
76 or \p opt::v::sequential_consistent (sequentially consistent memory model).
78 typedef opt::v::relaxed_ordering memory_model;
80 /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
81 enum { padding = opt::cache_line_padding };
84 /// Metafunction converting option list to \p weak_ringbuffer::traits
86 Supported \p Options are:
87 - \p opt::buffer - an uninitialized buffer type for internal cyclic array. Possible types are:
88 \p opt::v::uninitialized_dynamic_buffer (the default), \p opt::v::uninitialized_static_buffer. The type of
89 element in the buffer is not important: it will be changed via \p rebind metafunction.
90 - \p opt::value_cleaner - a functor to clean items dequeued.
91 The functor calls the destructor for ring-buffer item.
92 After a set of items is dequeued, \p value_cleaner cleans the cells that the items have been occupied.
93 If \p T is a complex type, \p value_cleaner can be an useful feature.
94 Default value is \ref opt::v::empty_cleaner that is suitable for POD types.
95 - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
96 - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
97 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
99 Example: declare \p %WeakRingBuffer with static iternal buffer of size 1024:
101 typedef cds::container::WeakRingBuffer< Foo,
102 typename cds::container::weak_ringbuffer::make_traits<
103 cds::opt::buffer< cds::opt::v::uninitialized_static_buffer< void *, 1024 >
108 template <typename... Options>
110 # ifdef CDS_DOXYGEN_INVOKED
111 typedef implementation_defined type; ///< Metafunction result
113 typedef typename cds::opt::make_options<
114 typename cds::opt::find_type_traits< traits, Options... >::type
120 } // namespace weak_ringbuffer
122 /// Single-producer single-consumer ring buffer
123 /** @ingroup cds_nonintrusive_queue
124 Source: [2013] Nhat Minh Le, Adrien Guatto, Albert Cohen, Antoniu Pop. Correct and Efficient Bounded
125 FIFO Queues. [Research Report] RR-8365, INRIA. 2013. <hal-00862450>
127 Ring buffer is a bounded queue. Additionally, \p %WeakRingBuffer supports batch operations -
128 you can push/pop an array of elements.
130 template <typename T, typename Traits = weak_ringbuffer::traits>
131 class WeakRingBuffer: public cds::bounded_container
134 typedef T value_type; ///< Value type to be stored in the ring buffer
135 typedef Traits traits; ///< Ring buffer traits
136 typedef typename traits::memory_model memory_model; ///< Memory ordering. See \p cds::opt::memory_model option
137 typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p weak_ringbuffer::traits::value_cleaner
139 /// Rebind template arguments
140 template <typename T2, typename Traits2>
142 typedef WeakRingBuffer< T2, Traits2 > other; ///< Rebinding result
147 typedef size_t item_counter;
152 typedef typename traits::buffer::template rebind< value_type >::other buffer;
157 /// Creates the ring buffer of \p capacity
159 For \p cds::opt::v::uninitialized_static_buffer the \p nCapacity parameter is ignored.
161 If the buffer capacity is a power of two, lightweight binary arithmetics is used
162 instead of modulo arithmetics.
164 WeakRingBuffer( size_t capacity = 0 )
168 , buffer_( capacity )
170 back_.store( 0, memory_model::memory_order_release );
173 /// Destroys the ring buffer
176 value_cleaner cleaner;
177 size_t back = back_.load( memory_model::memory_order_relaxed );
178 for ( size_t front = front_.load( memory_model::memory_order_relaxed ); front != back; ++front )
179 cleaner( buffer_[ buffer_.mod( front ) ] );
182 /// Batch push - push array \p arr of size \p count
184 \p CopyFunc is a per-element copy functor: for each element of \p arr
185 <tt>copy( dest, arr[i] )</tt> is called.
186 The \p CopyFunc signature:
188 void copy_func( value_type& element, Q const& source );
190 Here \p element is uninitialized so you should construct it using placement new
191 if needed; for example, if the element type is \p str::string and \p Q is <tt>char const*</tt>,
192 \p copy functor can be:
194 cds::container::WeakRingBuffer<std::string> ringbuf;
196 ringbuf.push( arr, 10,
197 []( std::string& element, char const* src ) {
198 new( &element ) std::string( src );
201 You may use move semantics if appropriate:
203 cds::container::WeakRingBuffer<std::string> ringbuf;
205 ringbuf.push( arr, 10,
206 []( std::string& element, std:string& src ) {
207 new( &element ) std::string( std::move( src ));
211 Returns \p true if success or \p false if not enought sufficient space in the ring
213 template <typename Q, typename CopyFunc>
214 bool push( Q* arr, size_t count, CopyFunc copy )
216 assert( count < capacity() );
217 size_t back = back_.load( memory_model::memory_order_relaxed );
219 assert( back - pfront_ <= capacity() );
221 if ( pfront_ + capacity() - back < count ) {
222 pfront_ = front_.load( memory_model::memory_order_acquire );
224 if ( pfront_ + capacity() - back < count ) {
231 for ( size_t i = 0; i < count; ++i, ++back )
232 copy( buffer_[buffer_.mod( back )], arr[i] );
234 back_.store( back, memory_model::memory_order_release );
239 /// Batch push - push array \p arr of size \p count with assignment as copy functor
241 This function is equivalent for:
243 push( arr, count, []( value_type& dest, Q const& src ) { dest = src; } );
246 The function is available only if <tt>std::is_constructible<value_type, Q>::value</tt>
249 Returns \p true if success or \p false if not enought sufficient space in the ring
251 template <typename Q>
252 typename std::enable_if< std::is_constructible<value_type, Q>::value, bool>::type
253 push( Q* arr, size_t count )
255 return push( arr, count, []( value_type& dest, Q const& src ) { new( &dest ) value_type( src ); } );
258 /// Push one element created from \p args
260 The function is available only if <tt>std::is_constructible<value_type, Args...>::value</tt>
263 Returns \p false if the ring is full or \p true otherwise.
265 template <typename... Args>
266 typename std::enable_if< std::is_constructible<value_type, Args...>::value, bool>::type
267 emplace( Args&&... args )
269 size_t back = back_.load( memory_model::memory_order_relaxed );
271 assert( back - pfront_ <= capacity() );
273 if ( pfront_ + capacity() - back < 1 ) {
274 pfront_ = front_.load( memory_model::memory_order_acquire );
276 if ( pfront_ + capacity() - back < 1 ) {
282 new( &buffer_[buffer_.mod( back )] ) value_type( std::forward<Args>(args)... );
284 back_.store( back + 1, memory_model::memory_order_release );
289 /// Enqueues data to the ring using a functor
291 \p Func is a functor called to copy a value to the ring element.
292 The functor \p f takes one argument - a reference to a empty cell of type \ref value_type :
294 cds::container::WeakRingBuffer< Foo > myRing;
296 myRing.enqueue_with( [&bar]( Foo& dest ) { dest = std::move(bar); } );
299 template <typename Func>
300 bool enqueue_with( Func f )
302 size_t back = back_.load( memory_model::memory_order_relaxed );
304 assert( back - pfront_ <= capacity() );
306 if ( pfront_ + capacity() - back < 1 ) {
307 pfront_ = front_.load( memory_model::memory_order_acquire );
309 if ( pfront_ + capacity() - back < 1 ) {
315 f( buffer_[buffer_.mod( back )] );
317 back_.store( back + 1, memory_model::memory_order_release );
323 /// Enqueues \p val value into the queue.
325 The new queue item is created by calling placement new in free cell.
326 Returns \p true if success, \p false if the ring is full.
328 bool enqueue( value_type const& val )
330 return emplace( val );
333 /// Enqueues \p val value into the queue, move semantics
334 bool enqueue( value_type&& val )
336 return emplace( std::move( val ));
339 /// Synonym for \p enqueue( value_type const& )
340 bool push( value_type const& val )
342 return enqueue( val );
345 /// Synonym for \p enqueue( value_type&& )
346 bool push( value_type&& val )
348 return enqueue( std::move( val ));
/// Synonym for \p enqueue_with()
template <typename Func>
bool push_with( Func f )
{
    return enqueue_with( f );
}
358 /// Batch pop \p count element from the ring buffer into \p arr
360 \p CopyFunc is a per-element copy functor: for each element of \p arr
361 <tt>copy( arr[i], source )</tt> is called.
362 The \p CopyFunc signature:
364 void copy_func( Q& dest, value_type& elemen );
367 Returns \p true if success or \p false if not enought sufficient space in the ring
369 template <typename Q, typename CopyFunc>
370 bool pop( Q* arr, size_t count, CopyFunc copy )
372 assert( count < capacity() );
374 size_t front = front_.load( memory_model::memory_order_relaxed );
375 assert( cback_ - front < capacity() );
377 if ( cback_ - front < count ) {
378 cback_ = back_.load( memory_model::memory_order_acquire );
379 if ( cback_ - front < count )
384 value_cleaner cleaner;
385 for ( size_t i = 0; i < count; ++i, ++front ) {
386 value_type& val = buffer_[buffer_.mod( front )];
391 front_.store( front, memory_model::memory_order_release );
395 /// Batch pop - push array \p arr of size \p count with assignment as copy functor
397 This function is equivalent for:
399 pop( arr, count, []( Q& dest, value_type& src ) { dest = src; } );
402 The function is available only if <tt>std::is_assignable<Q&, value_type const&>::value</tt>
405 Returns \p true if success or \p false if not enought sufficient space in the ring
407 template <typename Q>
408 typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
409 pop( Q* arr, size_t count )
411 return pop( arr, count, []( Q& dest, value_type& src ) { dest = src; } );
414 /// Dequeues an element from the ring to \p val
416 The function is available only if <tt>std::is_assignable<Q&, value_type const&>::value</tt>
419 Returns \p false if the ring is full or \p true otherwise.
421 template <typename Q>
422 typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
425 return pop( &val, 1 );
428 /// Synonym for \p dequeue( Q& )
429 template <typename Q>
430 typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
433 return dequeue( val );
436 /// Dequeues a value using a functor
438 \p Func is a functor called to copy dequeued value.
439 The functor takes one argument - a reference to removed node:
441 cds:container::WeakRingBuffer< Foo > myRing;
443 myRing.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
446 Returns \p true if the ring is not empty, \p false otherwise.
447 The functor is called only if the ring is not empty.
449 template <typename Func>
450 bool dequeue_with( Func f )
452 size_t front = front_.load( memory_model::memory_order_relaxed );
453 assert( cback_ - front < capacity() );
455 if ( cback_ - front < 1 ) {
456 cback_ = back_.load( memory_model::memory_order_acquire );
457 if ( cback_ - front < 1 )
461 value_type& val = buffer_[buffer_.mod( front )];
463 value_cleaner()( val );
465 front_.store( front + 1, memory_model::memory_order_release );
/// Synonym for \p dequeue_with()
template <typename Func>
bool pop_with( Func f )
{
    return dequeue_with( f );
}
476 /// Gets pointer to first element of ring buffer
478 If the ring buffer is empty, returns \p nullptr
480 The function is thread-safe since there is only one consumer.
481 Recall, \p WeakRingBuffer is single-producer/single consumer container.
485 size_t front = front_.load( memory_model::memory_order_relaxed );
486 assert( cback_ - front < capacity() );
488 if ( cback_ - front < 1 ) {
489 cback_ = back_.load( memory_model::memory_order_acquire );
490 if ( cback_ - front < 1 )
494 return &buffer_[buffer_.mod( front )];
497 /// Removes front element of ring-buffer
499 If the ring-buffer is empty, returns \p false.
500 Otherwise, pops the first element from the ring.
504 size_t front = front_.load( memory_model::memory_order_relaxed );
505 assert( cback_ - front <= capacity() );
507 if ( cback_ - front < 1 ) {
508 cback_ = back_.load( memory_model::memory_order_acquire );
509 if ( cback_ - front < 1 )
514 value_cleaner()( buffer_[buffer_.mod( front )] );
516 front_.store( front + 1, memory_model::memory_order_release );
520 /// Clears the ring buffer
528 /// Checks if the ring-buffer is empty
531 return front_.load( memory_model::memory_order_relaxed ) == back_.load( memory_model::memory_order_relaxed );
534 /// Checks if the ring-buffer is full
537 return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ) >= capacity();
540 /// Returns the current size of ring buffer
543 return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed );
546 /// Returns capacity of the ring buffer
547 size_t capacity() const
549 return buffer_.capacity();
554 atomics::atomic<size_t> front_;
555 typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad1_;
556 atomics::atomic<size_t> back_;
557 typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad2_;
559 typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad3_;
561 typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad4_;
567 }} // namespace cds::container
570 #endif // #ifndef CDSLIB_CONTAINER_WEAK_RINGBUFFER_H