2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #ifndef CDSLIB_CONTAINER_WEAK_RINGBUFFER_H
32 #define CDSLIB_CONTAINER_WEAK_RINGBUFFER_H
34 #include <cds/container/details/base.h>
35 #include <cds/opt/buffer.h>
36 #include <cds/opt/value_cleaner.h>
37 #include <cds/algo/atomic.h>
38 #include <cds/details/bounded_container.h>
40 namespace cds { namespace container {
42 /// \p WeakRingBuffer related definitions
43 /** @ingroup cds_nonintrusive_helper
45 namespace weak_ringbuffer {
47 /// \p WeakRingBuffer default traits
49 /// Buffer type for internal array
51 The type of element for the buffer is not important: \p WeakRingBuffer rebinds
52 the buffer to the required type via \p rebind metafunction.
54 For \p WeakRingBuffer the buffer size should be a power of two.
56 You should use only uninitialized buffer for the ring buffer -
57 \p cds::opt::v::uninitialized_dynamic_buffer (the default),
58 \p cds::opt::v::uninitialized_static_buffer.
60 typedef cds::opt::v::uninitialized_dynamic_buffer< void * > buffer;
62 /// A functor to clean item dequeued.
64 The functor calls the destructor for popped element.
65 After a set of items is dequeued, \p value_cleaner cleans the cells that the items occupied.
66 If \p T is a complex type, \p value_cleaner may be a useful feature.
67 For POD types \ref opt::v::empty_cleaner is suitable
69 Default value is \ref opt::v::auto_cleaner that calls destructor only if it is not trivial.
71 typedef cds::opt::v::auto_cleaner value_cleaner;
73 /// C++ memory ordering model
75 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
76 or \p opt::v::sequential_consistent (sequentially consistent memory model).
78 typedef opt::v::relaxed_ordering memory_model;
80 /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
81 enum { padding = opt::cache_line_padding };
84 /// Metafunction converting option list to \p weak_ringbuffer::traits
86 Supported \p Options are:
87 - \p opt::buffer - an uninitialized buffer type for internal cyclic array. Possible types are:
88 \p opt::v::uninitialized_dynamic_buffer (the default), \p opt::v::uninitialized_static_buffer. The type of
89 element in the buffer is not important: it will be changed via \p rebind metafunction.
90 - \p opt::value_cleaner - a functor to clean items dequeued.
91 The functor calls the destructor for ring-buffer item.
92 After a set of items is dequeued, \p value_cleaner cleans the cells that the items occupied.
93 If \p T is a complex type, \p value_cleaner can be a useful feature.
94 Default value is \ref opt::v::auto_cleaner that calls destructor only if it is not trivial; \ref opt::v::empty_cleaner is suitable for POD types.
95 - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
96 - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
97 or \p opt::v::sequential_consistent (sequentially consistent memory model).
99 Example: declare \p %WeakRingBuffer with static internal buffer for 1024 objects:
101 typedef cds::container::WeakRingBuffer< Foo,
102 typename cds::container::weak_ringbuffer::make_traits<
103 cds::opt::buffer< cds::opt::v::uninitialized_static_buffer< void *, 1024 >
108 template <typename... Options>
110 # ifdef CDS_DOXYGEN_INVOKED
111 typedef implementation_defined type; ///< Metafunction result
113 typedef typename cds::opt::make_options<
114 typename cds::opt::find_type_traits< traits, Options... >::type
120 } // namespace weak_ringbuffer
122 /// Single-producer single-consumer ring buffer
123 /** @ingroup cds_nonintrusive_queue
124 Source: [2013] Nhat Minh Le, Adrien Guatto, Albert Cohen, Antoniu Pop. Correct and Efficient Bounded
125 FIFO Queues. [Research Report] RR-8365, INRIA. 2013. <hal-00862450>
127 Ring buffer is a bounded queue. Additionally, \p %WeakRingBuffer supports batch operations -
128 you can push/pop an array of elements.
130 There is a specialization \ref cds_nonintrusive_WeakRingBuffer_void "WeakRingBuffer<void, Traits>"
131 that is not a queue but a "memory pool" between producer and consumer threads.
132 \p WeakRingBuffer<void> supports variable-sized data.
134 @warning: \p %WeakRingBuffer is developed for 64-bit architecture.
135 On 32-bit platform an integer overflow of internal counters is possible.
137 template <typename T, typename Traits = weak_ringbuffer::traits>
138 class WeakRingBuffer: public cds::bounded_container
141 typedef T value_type; ///< Value type to be stored in the ring buffer
142 typedef Traits traits; ///< Ring buffer traits
143 typedef typename traits::memory_model memory_model; ///< Memory ordering. See \p cds::opt::memory_model option
144 typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p weak_ringbuffer::traits::value_cleaner
146 /// Rebind template arguments
147 template <typename T2, typename Traits2>
149 typedef WeakRingBuffer< T2, Traits2 > other; ///< Rebinding result
154 typedef size_t item_counter;
159 typedef typename traits::buffer::template rebind< value_type >::other buffer;
164 /// Creates the ring buffer of \p capacity
166 For \p cds::opt::v::uninitialized_static_buffer the \p capacity parameter is ignored.
168 If the buffer capacity is a power of two, lightweight binary arithmetic is used
169 instead of modulo arithmetic.
171 WeakRingBuffer( size_t capacity = 0 )
175 , buffer_( capacity )
177 back_.store( 0, memory_model::memory_order_release );
180 /// Destroys the ring buffer
183 value_cleaner cleaner;
184 size_t back = back_.load( memory_model::memory_order_relaxed );
185 for ( size_t front = front_.load( memory_model::memory_order_relaxed ); front != back; ++front )
186 cleaner( buffer_[ buffer_.mod( front ) ] );
189 /// Batch push - push array \p arr of size \p count
191 \p CopyFunc is a per-element copy functor: for each element of \p arr
192 <tt>copy( dest, arr[i] )</tt> is called.
193 The \p CopyFunc signature:
195 void copy_func( value_type& element, Q const& source );
197 Here \p element is uninitialized so you should construct it using placement new
198 if needed; for example, if the element type is \p std::string and \p Q is <tt>char const*</tt>,
199 \p copy functor can be:
201 cds::container::WeakRingBuffer<std::string> ringbuf;
203 ringbuf.push( arr, 10,
204 []( std::string& element, char const* src ) {
205 new( &element ) std::string( src );
208 You may use move semantics if appropriate:
210 cds::container::WeakRingBuffer<std::string> ringbuf;
212 ringbuf.push( arr, 10,
213 []( std::string& element, std::string& src ) {
214 new( &element ) std::string( std::move( src ));
218 Returns \p true if success or \p false if not enough space in the ring
220 template <typename Q, typename CopyFunc>
221 bool push( Q* arr, size_t count, CopyFunc copy )
223 assert( count < capacity() );
224 size_t back = back_.load( memory_model::memory_order_relaxed );
226 assert( back - pfront_ <= capacity() );
228 if ( pfront_ + capacity() - back < count ) {
229 pfront_ = front_.load( memory_model::memory_order_acquire );
231 if ( pfront_ + capacity() - back < count ) {
238 for ( size_t i = 0; i < count; ++i, ++back )
239 copy( buffer_[buffer_.mod( back )], arr[i] );
241 back_.store( back, memory_model::memory_order_release );
246 /// Batch push - push array \p arr of size \p count with copy construction as copy functor
248 This function is equivalent to:
250 push( arr, count, []( value_type& dest, Q const& src ) { new( &dest ) value_type( src ); } );
253 The function is available only if <tt>std::is_constructible<value_type, Q>::value</tt>
256 Returns \p true if success or \p false if not enough space in the ring
258 template <typename Q>
259 typename std::enable_if< std::is_constructible<value_type, Q>::value, bool>::type
260 push( Q* arr, size_t count )
262 return push( arr, count, []( value_type& dest, Q const& src ) { new( &dest ) value_type( src ); } );
265 /// Push one element created from \p args
267 The function is available only if <tt>std::is_constructible<value_type, Args...>::value</tt>
270 Returns \p false if the ring is full or \p true otherwise.
272 template <typename... Args>
273 typename std::enable_if< std::is_constructible<value_type, Args...>::value, bool>::type
274 emplace( Args&&... args )
276 size_t back = back_.load( memory_model::memory_order_relaxed );
278 assert( back - pfront_ <= capacity() );
280 if ( pfront_ + capacity() - back < 1 ) {
281 pfront_ = front_.load( memory_model::memory_order_acquire );
283 if ( pfront_ + capacity() - back < 1 ) {
289 new( &buffer_[buffer_.mod( back )] ) value_type( std::forward<Args>(args)... );
291 back_.store( back + 1, memory_model::memory_order_release );
296 /// Enqueues data to the ring using a functor
298 \p Func is a functor called to copy a value to the ring element.
299 The functor \p f takes one argument - a reference to an empty cell of type \ref value_type :
301 cds::container::WeakRingBuffer< Foo > myRing;
303 myRing.enqueue_with( [&bar]( Foo& dest ) { dest = std::move(bar); } );
306 template <typename Func>
307 bool enqueue_with( Func f )
309 size_t back = back_.load( memory_model::memory_order_relaxed );
311 assert( back - pfront_ <= capacity() );
313 if ( pfront_ + capacity() - back < 1 ) {
314 pfront_ = front_.load( memory_model::memory_order_acquire );
316 if ( pfront_ + capacity() - back < 1 ) {
322 f( buffer_[buffer_.mod( back )] );
324 back_.store( back + 1, memory_model::memory_order_release );
330 /// Enqueues \p val value into the queue.
332 The new queue item is created by calling placement new in free cell.
333 Returns \p true if success, \p false if the ring is full.
335 bool enqueue( value_type const& val )
337 return emplace( val );
340 /// Enqueues \p val value into the queue, move semantics
341 bool enqueue( value_type&& val )
343 return emplace( std::move( val ));
346 /// Synonym for \p enqueue( value_type const& )
347 bool push( value_type const& val )
349 return enqueue( val );
352 /// Synonym for \p enqueue( value_type&& )
353 bool push( value_type&& val )
355 return enqueue( std::move( val ));
358 /// Synonym for \p enqueue_with()
359 template <typename Func>
360 bool push_with( Func f )
362 return enqueue_with( f );
365 /// Batch pop \p count element from the ring buffer into \p arr
367 \p CopyFunc is a per-element copy functor: for each element of \p arr
368 <tt>copy( arr[i], source )</tt> is called.
369 The \p CopyFunc signature:
371 void copy_func( Q& dest, value_type& element );
374 Returns \p true if success or \p false if not enough elements in the ring
// Batch pop of `count` elements into `arr`; `copy` is the caller-supplied per-element functor.
376 template <typename Q, typename CopyFunc>
377 bool pop( Q* arr, size_t count, CopyFunc copy )
379 assert( count < capacity() );
381 size_t front = front_.load( memory_model::memory_order_relaxed );
// NOTE(review): pop_front() asserts `cback_ - front <= capacity()`, and the producer-side
// asserts allow `back - pfront_ == capacity()`. With a completely full ring and an
// up-to-date cback_, this strict `<` would presumably fire - confirm and consider `<=`.
382 assert( cback_ - front < capacity() );
// cback_ is the consumer's cached copy of back_; the shared counter is re-read
// (acquire) only when the cached value shows fewer than `count` elements.
384 if ( cback_ - front < count ) {
385 cback_ = back_.load( memory_model::memory_order_acquire );
386 if ( cback_ - front < count )
391 value_cleaner cleaner;
// Walks `count` consecutive cells starting at the current front position.
392 for ( size_t i = 0; i < count; ++i, ++front ) {
393 value_type& val = buffer_[buffer_.mod( front )];
// Release store publishes the advanced front position for the producer's acquire load of front_.
398 front_.store( front, memory_model::memory_order_release );
402 /// Batch pop - pop \p count elements into array \p arr with assignment as copy functor
404 This function is equivalent to:
406 pop( arr, count, []( Q& dest, value_type& src ) { dest = src; } );
409 The function is available only if <tt>std::is_assignable<Q&, value_type const&>::value</tt>
412 Returns \p true if success or \p false if not enough elements in the ring
414 template <typename Q>
415 typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
416 pop( Q* arr, size_t count )
418 return pop( arr, count, []( Q& dest, value_type& src ) { dest = src; } );
421 /// Dequeues an element from the ring to \p val
423 The function is available only if <tt>std::is_assignable<Q&, value_type const&>::value</tt>
426 Returns \p false if the ring is empty or \p true otherwise.
428 template <typename Q>
429 typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
432 return pop( &val, 1 );
435 /// Synonym for \p dequeue( Q& )
436 template <typename Q>
437 typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
440 return dequeue( val );
443 /// Dequeues a value using a functor
445 \p Func is a functor called to copy dequeued value.
446 The functor takes one argument - a reference to the removed element:
448 cds::container::WeakRingBuffer< Foo > myRing;
450 myRing.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
453 Returns \p true if the ring is not empty, \p false otherwise.
454 The functor is called only if the ring is not empty.
// Dequeues the front element, handing it to the functor `f` (see the description above).
456 template <typename Func>
457 bool dequeue_with( Func f )
459 size_t front = front_.load( memory_model::memory_order_relaxed );
// NOTE(review): pop_front() asserts `cback_ - front <= capacity()`, and the producer-side
// asserts allow `back - pfront_ == capacity()`. With a completely full ring and an
// up-to-date cback_, this strict `<` would presumably fire - confirm and consider `<=`.
460 assert( cback_ - front < capacity() );
// cback_ is the consumer's cached copy of back_; back_ is re-read (acquire)
// only when the cached value says the ring is empty.
462 if ( cback_ - front < 1 ) {
463 cback_ = back_.load( memory_model::memory_order_acquire );
464 if ( cback_ - front < 1 )
468 value_type& val = buffer_[buffer_.mod( front )];
// The cell is cleaned with value_cleaner before the slot is handed back to the producer.
470 value_cleaner()( val );
// Release store publishes the freed slot for the producer's acquire load of front_.
472 front_.store( front + 1, memory_model::memory_order_release );
476 /// Synonym for \p dequeue_with()
477 template <typename Func>
478 bool pop_with( Func f )
480 return dequeue_with( f );
483 /// Gets pointer to first element of ring buffer
485 If the ring buffer is empty, returns \p nullptr
487 The function is thread-safe since there is only one consumer.
488 Recall, \p WeakRingBuffer is single-producer/single consumer container.
492 size_t front = front_.load( memory_model::memory_order_relaxed );
493 assert( cback_ - front < capacity() );
495 if ( cback_ - front < 1 ) {
496 cback_ = back_.load( memory_model::memory_order_acquire );
497 if ( cback_ - front < 1 )
501 return &buffer_[buffer_.mod( front )];
504 /// Removes front element of ring-buffer
506 If the ring-buffer is empty, returns \p false.
507 Otherwise, pops the first element from the ring.
511 size_t front = front_.load( memory_model::memory_order_relaxed );
512 assert( cback_ - front <= capacity() );
514 if ( cback_ - front < 1 ) {
515 cback_ = back_.load( memory_model::memory_order_acquire );
516 if ( cback_ - front < 1 )
521 value_cleaner()( buffer_[buffer_.mod( front )] );
523 front_.store( front + 1, memory_model::memory_order_release );
527 /// Clears the ring buffer (only consumer can call this function!)
534 /// Checks if the ring-buffer is empty
537 return front_.load( memory_model::memory_order_relaxed ) == back_.load( memory_model::memory_order_relaxed );
540 /// Checks if the ring-buffer is full
543 return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ) >= capacity();
546 /// Returns the current size of ring buffer
549 return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed );
552 /// Returns capacity of the ring buffer
553 size_t capacity() const
555 return buffer_.capacity();
560 atomics::atomic<size_t> front_;
561 typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad1_;
562 atomics::atomic<size_t> back_;
563 typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad2_;
565 typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad3_;
567 typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad4_;
574 /// Single-producer single-consumer ring buffer for untyped variable-sized data
575 /** @ingroup cds_nonintrusive_queue
576 @anchor cds_nonintrusive_WeakRingBuffer_void
578 This SPSC ring-buffer is intended for data of variable size. The producer
579 allocates a buffer from the ring, fills it with data and pushes it back to the ring.
580 The consumer thread reads data from the front end and then pops it:
582 // allocates 1M ring buffer
583 WeakRingBuffer<void> theRing( 1024 * 1024 );
585 void producer_thread()
587 // Get data of size N bytes
593 std::tie( data, size ) = get_data();
595 if ( data == nullptr )
598 // Allocates a buffer from the ring
599 void* buf = theRing.back( size );
601 std::cout << "The ring is full" << std::endl;
605 memcpy( buf, data, size );
607 // Push data into the ring
612 void consumer_thread()
615 auto buf = theRing.front();
617 if ( buf.first == nullptr ) {
618 std::cout << "The ring is empty" << std::endl;
623 process_data( buf.first, buf.second );
631 @warning: \p %WeakRingBuffer is developed for 64-bit architecture.
632 On 32-bit platform an integer overflow of internal counters is possible.
634 #ifdef CDS_DOXYGEN_INVOKED
635 template <typename Traits = weak_ringbuffer::traits>
637 template <typename Traits>
639 class WeakRingBuffer<void, Traits>: public cds::bounded_container
642 typedef Traits traits; ///< Ring buffer traits
643 typedef typename traits::memory_model memory_model; ///< Memory ordering. See \p cds::opt::memory_model option
647 typedef typename traits::buffer::template rebind< uint8_t >::other buffer;
651 /// Creates the ring buffer of \p capacity bytes
653 For \p cds::opt::v::uninitialized_static_buffer the \p capacity parameter is ignored.
655 If the buffer capacity is a power of two, lightweight binary arithmetic is used
656 instead of modulo arithmetic.
658 WeakRingBuffer( size_t capacity = 0 )
662 , buffer_( capacity )
664 back_.store( 0, memory_model::memory_order_release );
667 /// [producer] Reserve \p size bytes
669 The function returns a pointer to reserved buffer of \p size bytes.
670 If there is not enough space in the ring buffer, the function returns \p nullptr.
672 After successful \p %back() you should fill the buffer provided and call \p push_back():
674 // allocates 1M ring buffer
675 WeakRingBuffer<void> theRing( 1024 * 1024 );
677 void producer_thread()
679 // Get data of size N bytes
685 std::tie( data, size ) = get_data();
687 if ( data == nullptr )
690 // Allocates a buffer from the ring
691 void* buf = theRing.back( size );
693 std::cout << "The ring is full" << std::endl;
697 memcpy( buf, data, size );
699 // Push data into the ring
705 void* back( size_t size )
709 // Any data is rounded to 8-byte boundary
710 size_t real_size = calc_real_size( size );
712 // check if we can reserve real_size bytes
713 assert( real_size < capacity() );
714 size_t back = back_.load( memory_model::memory_order_relaxed );
716 assert( back - pfront_ <= capacity() );
718 if ( pfront_ + capacity() - back < real_size ) {
719 pfront_ = front_.load( memory_model::memory_order_acquire );
721 if ( pfront_ + capacity() - back < real_size ) {
727 uint8_t* reserved = buffer_.buffer() + buffer_.mod( back );
729 // Check if the buffer free space is enough for storing real_size bytes
730 size_t tail_size = capacity() - buffer_.mod( back );
731 if ( tail_size < real_size ) {
733 assert( tail_size >= sizeof( size_t ) );
734 assert( !is_tail( tail_size ) );
736 *reinterpret_cast<size_t*>( reserved ) = make_tail( tail_size - sizeof(size_t));
739 // We must be in beginning of buffer
740 assert( buffer_.mod( back ) == 0 );
742 if ( pfront_ + capacity() - back < real_size ) {
743 pfront_ = front_.load( memory_model::memory_order_acquire );
745 if ( pfront_ + capacity() - back < real_size ) {
751 back_.store( back, memory_model::memory_order_release );
752 reserved = buffer_.buffer();
755 // reserve and store size
756 *reinterpret_cast<size_t*>( reserved ) = size;
758 return reinterpret_cast<void*>( reserved + sizeof( size_t ) );
761 /// [producer] Push reserved bytes into ring
763 The function pushes reserved buffer into the ring. After this call,
764 the buffer becomes visible to the consumer:
766 // allocates 1M ring buffer
767 WeakRingBuffer<void> theRing( 1024 * 1024 );
769 void producer_thread()
771 // Get data of size N bytes
777 std::tie( data, size ) = get_data();
779 if ( data == nullptr )
782 // Allocates a buffer from the ring
783 void* buf = theRing.back( size );
785 std::cout << "The ring is full" << std::endl;
789 memcpy( buf, data, size );
791 // Push data into the ring
799 size_t back = back_.load( memory_model::memory_order_relaxed );
800 uint8_t* reserved = buffer_.buffer() + buffer_.mod( back );
802 size_t real_size = calc_real_size( *reinterpret_cast<size_t*>( reserved ) );
803 assert( real_size < capacity() );
805 back_.store( back + real_size, memory_model::memory_order_release );
808 /// [producer] Push \p data of \p size bytes into ring
810 This function invokes \p back( size ), \p memcpy( buf, data, size )
811 and \p push_back() in one call.
813 bool push_back( void const* data, size_t size )
815 void* buf = back( size );
817 memcpy( buf, data, size );
824 /// [consumer] Get top data from the ring
826 If the ring is empty, the function returns \p nullptr in \p std::pair::first.
// [consumer] Returns (pointer to payload, payload size) for the record at the front of the ring,
// or a pair whose first member is nullptr when fewer than sizeof( size_t ) bytes are available.
828 std::pair<void*, size_t> front()
830 size_t front = front_.load( memory_model::memory_order_relaxed );
// NOTE(review): pop_front() asserts `cback_ - front <= capacity()`, and back() asserts
// `back - pfront_ <= capacity()`. If the ring can become completely full, this strict `<`
// would presumably fire - confirm and consider `<=`.
831 assert( cback_ - front < capacity() );
// Every record starts with a size_t length word, so at least that many bytes must be published.
833 if ( cback_ - front < sizeof( size_t )) {
834 cback_ = back_.load( memory_model::memory_order_acquire );
835 if ( cback_ - front < sizeof( size_t ) )
836 return std::make_pair( nullptr, 0u );
839 uint8_t * buf = buffer_.buffer() + buffer_.mod( front );
// The record start is expected to be aligned to sizeof( uintptr_t ).
842 assert( ( reinterpret_cast<uintptr_t>( buf ) & ( sizeof( uintptr_t ) - 1 ) ) == 0 );
844 size_t size = *reinterpret_cast<size_t*>( buf );
// A length word carrying the tail flag marks unused space at the end of the buffer:
// it is popped, and the real record is then read from the new front position,
// which is asserted to be the beginning of the buffer.
845 if ( is_tail( size ) ) {
847 CDS_VERIFY( pop_front() );
849 front = front_.load( memory_model::memory_order_relaxed );
850 buf = buffer_.buffer() + buffer_.mod( front );
851 size = *reinterpret_cast<size_t*>( buf );
853 assert( !is_tail( size ) );
854 assert( buf == buffer_.buffer() );
// Refresh the cached back position if it does not yet cover the whole record
// (length word plus rounded payload).
858 size_t real_size = calc_real_size( size );
859 if ( cback_ - front < real_size ) {
860 cback_ = back_.load( memory_model::memory_order_acquire );
861 assert( cback_ - front >= real_size );
// The payload starts right after the size_t length word.
865 return std::make_pair( reinterpret_cast<void*>( buf + sizeof( size_t )), size );
868 /// [consumer] Pops top data
870 Typical consumer workloop:
872 // allocates 1M ring buffer
873 WeakRingBuffer<void> theRing( 1024 * 1024 );
875 void consumer_thread()
878 auto buf = theRing.front();
880 if ( buf.first == nullptr ) {
881 std::cout << "The ring is empty" << std::endl;
886 process_data( buf.first, buf.second );
896 size_t front = front_.load( memory_model::memory_order_relaxed );
897 assert( cback_ - front <= capacity() );
899 if ( cback_ - front < sizeof(size_t) ) {
900 cback_ = back_.load( memory_model::memory_order_acquire );
901 if ( cback_ - front < sizeof( size_t ) )
905 uint8_t * buf = buffer_.buffer() + buffer_.mod( front );
908 assert( ( reinterpret_cast<uintptr_t>( buf ) & ( sizeof( uintptr_t ) - 1 ) ) == 0 );
910 size_t size = *reinterpret_cast<size_t*>( buf );
911 size_t real_size = calc_real_size( untail( size ));
914 if ( cback_ - front < real_size ) {
915 cback_ = back_.load( memory_model::memory_order_acquire );
916 assert( cback_ - front >= real_size );
920 front_.store( front + real_size, memory_model::memory_order_release );
925 /// [consumer] Clears the ring buffer
928 for ( auto el = front(); el.first; el = front() )
932 /// Checks if the ring-buffer is empty
935 return front_.load( memory_model::memory_order_relaxed ) == back_.load( memory_model::memory_order_relaxed );
938 /// Checks if the ring-buffer is full
941 return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ) >= capacity();
944 /// Returns the current size of ring buffer
947 return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed );
950 /// Returns capacity of the ring buffer
951 size_t capacity() const
953 return buffer_.capacity();
/// Computes how many ring bytes a record with \p size payload bytes occupies
/**
    The payload size is rounded up to a multiple of <tt>sizeof( uintptr_t )</tt>,
    then <tt>sizeof( size_t )</tt> is added for the length word stored in front
    of the payload.

    Returns the total record size in bytes.
*/
static size_t calc_real_size( size_t size )
{
    size_t real_size = (( size + sizeof( uintptr_t ) - 1 ) & ~( sizeof( uintptr_t ) - 1 )) + sizeof( size_t );

    // Both checks fail if the computation wrapped around for a huge \p size
    assert( real_size > size );
    assert( real_size - size >= sizeof( size_t ) );

    return real_size;
}
/// Checks whether the length word \p size carries the tail flag
/**
    The flag is the most significant bit of \p size_t.
    Returns \p true if the bit is set, \p false otherwise.
*/
static bool is_tail( size_t size )
{
    return ( size & ( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 ))) != 0;
}
/// Returns \p size with the tail flag (the most significant bit of \p size_t) set
static size_t make_tail( size_t size )
{
    return size | ( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 ));
}
/// Returns \p size with the tail flag (the most significant bit of \p size_t) cleared
static size_t untail( size_t size )
{
    return size & (( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 ) ) - 1);
}
986 atomics::atomic<size_t> front_;
987 typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad1_;
988 atomics::atomic<size_t> back_;
989 typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad2_;
991 typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad3_;
993 typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad4_;
999 }} // namespace cds::container
1002 #endif // #ifndef CDSLIB_CONTAINER_WEAK_RINGBUFFER_H