Added prototype for untyped variable-sized WeakRingBuffer<void> (not tested yet,...
[libcds.git] / cds / container / weak_ringbuffer.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #ifndef CDSLIB_CONTAINER_WEAK_RINGBUFFER_H
32 #define CDSLIB_CONTAINER_WEAK_RINGBUFFER_H
33
34 #include <cds/container/details/base.h>
35 #include <cds/opt/buffer.h>
36 #include <cds/opt/value_cleaner.h>
37 #include <cds/algo/atomic.h>
38 #include <cds/details/bounded_container.h>
39
40 namespace cds { namespace container {
41
42     /// \p WeakRingBuffer related definitions
43     /** @ingroup cds_nonintrusive_helper
44     */
45     namespace weak_ringbuffer {
46
47         /// \p WeakRingBuffer default traits
48         struct traits {
49             /// Buffer type for internal array
50             /*
51                 The type of element for the buffer is not important: \p WeakRingBuffer rebind
52                 the buffer for required type via \p rebind metafunction.
53
54                 For \p WeakRingBuffer the buffer size should have power-of-2 size.
55
56                 You should use only uninitialized buffer for the ring buffer -
57                 \p cds::opt::v::uninitialized_dynamic_buffer (the default),
58                 \p cds::opt::v::uninitialized_static_buffer.
59             */
60             typedef cds::opt::v::uninitialized_dynamic_buffer< void * > buffer;
61
62             /// A functor to clean item dequeued.
63             /**
64                 The functor calls the destructor for popped element.
65                 After a set of items is dequeued, \p value_cleaner cleans the cells that the items have been occupied.
66                 If \p T is a complex type, \p value_cleaner may be useful feature.
67                 For POD types \ref opt::v::empty_cleaner is suitable
68
69                 Default value is \ref opt::v::auto_cleaner that calls destructor only if it is not trivial.
70             */
71             typedef cds::opt::v::auto_cleaner value_cleaner;
72
73             /// C++ memory ordering model
74             /**
75                 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
76                 or \p opt::v::sequential_consistent (sequentially consistent memory model).
77             */
78             typedef opt::v::relaxed_ordering    memory_model;
79
80             /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
81             enum { padding = opt::cache_line_padding };
82         };
83
84         /// Metafunction converting option list to \p weak_ringbuffer::traits
85         /**
86             Supported \p Options are:
87             - \p opt::buffer - an uninitialized buffer type for internal cyclic array. Possible types are:
88                 \p opt::v::uninitialized_dynamic_buffer (the default), \p opt::v::uninitialized_static_buffer. The type of
89                 element in the buffer is not important: it will be changed via \p rebind metafunction.
90             - \p opt::value_cleaner - a functor to clean items dequeued.
91                 The functor calls the destructor for ring-buffer item.
92                 After a set of items is dequeued, \p value_cleaner cleans the cells that the items have been occupied.
93                 If \p T is a complex type, \p value_cleaner can be an useful feature.
94                 Default value is \ref opt::v::empty_cleaner that is suitable for POD types.
95             - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
96             - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
97                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
98
99             Example: declare \p %WeakRingBuffer with static iternal buffer of size 1024:
100             \code
101             typedef cds::container::WeakRingBuffer< Foo,
102                 typename cds::container::weak_ringbuffer::make_traits<
103                     cds::opt::buffer< cds::opt::v::uninitialized_static_buffer< void *, 1024 >
104                 >::type
105             > myRing;
106             \endcode
107         */
108         template <typename... Options>
109         struct make_traits {
110 #   ifdef CDS_DOXYGEN_INVOKED
111             typedef implementation_defined type;   ///< Metafunction result
112 #   else
113             typedef typename cds::opt::make_options<
114                 typename cds::opt::find_type_traits< traits, Options... >::type
115                 , Options...
116             >::type type;
117 #   endif
118         };
119
120     } // namespace weak_ringbuffer
121
122     /// Single-producer single-consumer ring buffer
123     /** @ingroup cds_nonintrusive_queue
124         Source: [2013] Nhat Minh Le, Adrien Guatto, Albert Cohen, Antoniu Pop. Correct and Effcient Bounded
125             FIFO Queues. [Research Report] RR-8365, INRIA. 2013. <hal-00862450>
126
127         Ring buffer is a bounded queue. Additionally, \p %WeakRingBuffer supports batch operations -
128         you can push/pop an array of elements.
129
130         There are a specialization \ref cds_nonintrusive_WeakRingBuffer_void "WeakRingBuffer<void, Traits>" 
131         that is not a queue but a "memory pool" between producer and consumer threads. 
132         \p WeakRingBuffer<void> supports data of different size.
133     */
134     template <typename T, typename Traits = weak_ringbuffer::traits>
135     class WeakRingBuffer: public cds::bounded_container
136     {
137     public:
138         typedef T value_type;   ///< Value type to be stored in the ring buffer
139         typedef Traits traits;  ///< Ring buffer traits
140         typedef typename traits::memory_model  memory_model;  ///< Memory ordering. See \p cds::opt::memory_model option
141         typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p weak_ringbuffer::traits::value_cleaner
142
143         /// Rebind template arguments
144         template <typename T2, typename Traits2>
145         struct rebind {
146             typedef WeakRingBuffer< T2, Traits2 > other;   ///< Rebinding result
147         };
148
149         //@cond
150         // Only for tests
151         typedef size_t item_counter;
152         //@endcond
153
154     private:
155         //@cond
156         typedef typename traits::buffer::template rebind< value_type >::other buffer;
157         //@endcond
158
159     public:
160
161         /// Creates the ring buffer of \p capacity
162         /**
163             For \p cds::opt::v::uninitialized_static_buffer the \p nCapacity parameter is ignored.
164
165             If the buffer capacity is a power of two, lightweight binary arithmetics is used
166             instead of modulo arithmetics.
167         */
168         WeakRingBuffer( size_t capacity = 0 )
169             : front_( 0 )
170             , pfront_( 0 )
171             , cback_( 0 )
172             , buffer_( capacity )
173         {
174             back_.store( 0, memory_model::memory_order_release );
175         }
176
177         /// Destroys the ring buffer
178         ~WeakRingBuffer()
179         {
180             value_cleaner cleaner;
181             size_t back = back_.load( memory_model::memory_order_relaxed );
182             for ( size_t front = front_.load( memory_model::memory_order_relaxed ); front != back; ++front )
183                 cleaner( buffer_[ buffer_.mod( front ) ] );
184         }
185
186         /// Batch push - push array \p arr of size \p count
187         /**
188             \p CopyFunc is a per-element copy functor: for each element of \p arr
189             <tt>copy( dest, arr[i] )</tt> is called.
190             The \p CopyFunc signature:
191             \code
192                 void copy_func( value_type& element, Q const& source );
193             \endcode
194             Here \p element is uninitialized so you should construct it using placement new
195             if needed; for example, if the element type is \p str::string and \p Q is <tt>char const*</tt>,
196             \p copy functor can be:
197             \code
198             cds::container::WeakRingBuffer<std::string> ringbuf;
199             char const* arr[10];
200             ringbuf.push( arr, 10, 
201                 []( std::string& element, char const* src ) {
202                     new( &element ) std::string( src );
203                 });
204             \endcode
205             You may use move semantics if appropriate:
206             \code
207             cds::container::WeakRingBuffer<std::string> ringbuf;
208             std::string arr[10];
209             ringbuf.push( arr, 10,
210                 []( std::string& element, std:string& src ) {
211                     new( &element ) std::string( std::move( src ));
212                 });
213             \endcode
214
215             Returns \p true if success or \p false if not enough space in the ring
216         */
217         template <typename Q, typename CopyFunc>
218         bool push( Q* arr, size_t count, CopyFunc copy )
219         {
220             assert( count < capacity() );
221             size_t back = back_.load( memory_model::memory_order_relaxed );
222
223             assert( back - pfront_ <= capacity() );
224
225             if ( pfront_ + capacity() - back < count ) {
226                 pfront_ = front_.load( memory_model::memory_order_acquire );
227
228                 if ( pfront_ + capacity() - back < count ) {
229                     // not enough space
230                     return false;
231                 }
232             }
233
234             // copy data
235             for ( size_t i = 0; i < count; ++i, ++back )
236                 copy( buffer_[buffer_.mod( back )], arr[i] );
237
238             back_.store( back, memory_model::memory_order_release );
239
240             return true;
241         }
242
243         /// Batch push - push array \p arr of size \p count with assignment as copy functor
244         /**
245             This function is equivalent for:
246             \code
247             push( arr, count, []( value_type& dest, Q const& src ) { dest = src; } );
248             \endcode
249
250             The function is available only if <tt>std::is_constructible<value_type, Q>::value</tt>
251             is \p true.
252
253             Returns \p true if success or \p false if not enough space in the ring
254         */
255         template <typename Q>
256         typename std::enable_if< std::is_constructible<value_type, Q>::value, bool>::type
257         push( Q* arr, size_t count )
258         {
259             return push( arr, count, []( value_type& dest, Q const& src ) { new( &dest ) value_type( src ); } );
260         }
261
262         /// Push one element created from \p args
263         /**
264             The function is available only if <tt>std::is_constructible<value_type, Args...>::value</tt>
265             is \p true.
266
267             Returns \p false if the ring is full or \p true otherwise.
268         */
269         template <typename... Args>
270         typename std::enable_if< std::is_constructible<value_type, Args...>::value, bool>::type
271         emplace( Args&&... args )
272         {
273             size_t back = back_.load( memory_model::memory_order_relaxed );
274
275             assert( back - pfront_ <= capacity() );
276
277             if ( pfront_ + capacity() - back < 1 ) {
278                 pfront_ = front_.load( memory_model::memory_order_acquire );
279
280                 if ( pfront_ + capacity() - back < 1 ) {
281                     // not enough space
282                     return false;
283                 }
284             }
285
286             new( &buffer_[buffer_.mod( back )] ) value_type( std::forward<Args>(args)... );
287
288             back_.store( back + 1, memory_model::memory_order_release );
289
290             return true;
291         }
292
293         /// Enqueues data to the ring using a functor
294         /**
295             \p Func is a functor called to copy a value to the ring element.
296             The functor \p f takes one argument - a reference to a empty cell of type \ref value_type :
297             \code
298             cds::container::WeakRingBuffer< Foo > myRing;
299             Bar bar;
300             myRing.enqueue_with( [&bar]( Foo& dest ) { dest = std::move(bar); } );
301             \endcode
302         */
303         template <typename Func>
304         bool enqueue_with( Func f )
305         {
306             size_t back = back_.load( memory_model::memory_order_relaxed );
307
308             assert( back - pfront_ <= capacity() );
309
310             if ( pfront_ + capacity() - back < 1 ) {
311                 pfront_ = front_.load( memory_model::memory_order_acquire );
312
313                 if ( pfront_ + capacity() - back < 1 ) {
314                     // not enough space
315                     return false;
316                 }
317             }
318
319             f( buffer_[buffer_.mod( back )] );
320
321             back_.store( back + 1, memory_model::memory_order_release );
322
323             return true;
324
325         }
326
327         /// Enqueues \p val value into the queue.
328         /**
329             The new queue item is created by calling placement new in free cell.
330             Returns \p true if success, \p false if the ring is full.
331         */
332         bool enqueue( value_type const& val )
333         {
334             return emplace( val );
335         }
336
337         /// Enqueues \p val value into the queue, move semantics
338         bool enqueue( value_type&& val )
339         {
340             return emplace( std::move( val ));
341         }
342
343         /// Synonym for \p enqueue( value_type const& )
344         bool push( value_type const& val )
345         {
346             return enqueue( val );
347         }
348
349         /// Synonym for \p enqueue( value_type&& )
350         bool push( value_type&& val )
351         {
352             return enqueue( std::move( val ));
353         }
354
355         /// Synonym for \p enqueue_with()
356         template <typename Func>
357         bool push_with( Func f )
358         {
359             return enqueue_with( f );
360         }
361
362         /// Batch pop \p count element from the ring buffer into \p arr
363         /**
364             \p CopyFunc is a per-element copy functor: for each element of \p arr
365             <tt>copy( arr[i], source )</tt> is called.
366             The \p CopyFunc signature:
367             \code
368             void copy_func( Q& dest, value_type& elemen );
369             \endcode
370
371             Returns \p true if success or \p false if not enough space in the ring
372         */
373         template <typename Q, typename CopyFunc>
374         bool pop( Q* arr, size_t count, CopyFunc copy )
375         {
376             assert( count < capacity() );
377
378             size_t front = front_.load( memory_model::memory_order_relaxed );
379             assert( cback_ - front < capacity() );
380
381             if ( cback_ - front < count ) {
382                 cback_ = back_.load( memory_model::memory_order_acquire );
383                 if ( cback_ - front < count )
384                     return false;
385             }
386
387             // copy data
388             value_cleaner cleaner;
389             for ( size_t i = 0; i < count; ++i, ++front ) {
390                 value_type& val = buffer_[buffer_.mod( front )];
391                 copy( arr[i], val );
392                 cleaner( val );
393             }
394
395             front_.store( front, memory_model::memory_order_release );
396             return true;
397         }
398
399         /// Batch pop - push array \p arr of size \p count with assignment as copy functor
400         /**
401             This function is equivalent for:
402             \code
403             pop( arr, count, []( Q& dest, value_type& src ) { dest = src; } );
404             \endcode
405
406             The function is available only if <tt>std::is_assignable<Q&, value_type const&>::value</tt>
407             is \p true.
408
409             Returns \p true if success or \p false if not enough space in the ring
410         */
411         template <typename Q>
412         typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
413         pop( Q* arr, size_t count )
414         {
415             return pop( arr, count, []( Q& dest, value_type& src ) { dest = src; } );
416         }
417
418         /// Dequeues an element from the ring to \p val
419         /**
420             The function is available only if <tt>std::is_assignable<Q&, value_type const&>::value</tt>
421             is \p true.
422
423             Returns \p false if the ring is full or \p true otherwise.
424         */
425         template <typename Q>
426         typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
427         dequeue( Q& val )
428         {
429             return pop( &val, 1 );
430         }
431
432         /// Synonym for \p dequeue( Q& )
433         template <typename Q>
434         typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
435         pop( Q& val )
436         {
437             return dequeue( val );
438         }
439
440         /// Dequeues a value using a functor
441         /**
442             \p Func is a functor called to copy dequeued value.
443             The functor takes one argument - a reference to removed node:
444             \code
445             cds:container::WeakRingBuffer< Foo > myRing;
446             Bar bar;
447             myRing.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
448             \endcode
449
450             Returns \p true if the ring is not empty, \p false otherwise.
451             The functor is called only if the ring is not empty.
452         */
453         template <typename Func>
454         bool dequeue_with( Func f )
455         {
456             size_t front = front_.load( memory_model::memory_order_relaxed );
457             assert( cback_ - front < capacity() );
458
459             if ( cback_ - front < 1 ) {
460                 cback_ = back_.load( memory_model::memory_order_acquire );
461                 if ( cback_ - front < 1 )
462                     return false;
463             }
464
465             value_type& val = buffer_[buffer_.mod( front )];
466             f( val );
467             value_cleaner()( val );
468
469             front_.store( front + 1, memory_model::memory_order_release );
470             return true;
471         }
472
473         /// Synonym for \p dequeue_with()
474         template <typename Func>
475         bool pop_with( Func f )
476         {
477             return dequeue_with( f );
478         }
479
480         /// Gets pointer to first element of ring buffer
481         /**
482             If the ring buffer is empty, returns \p nullptr
483
484             The function is thread-safe since there is only one consumer.
485             Recall, \p WeakRingBuffer is single-producer/single consumer container.
486         */
487         value_type* front()
488         {
489             size_t front = front_.load( memory_model::memory_order_relaxed );
490             assert( cback_ - front < capacity() );
491
492             if ( cback_ - front < 1 ) {
493                 cback_ = back_.load( memory_model::memory_order_acquire );
494                 if ( cback_ - front < 1 )
495                     return nullptr;
496             }
497
498             return &buffer_[buffer_.mod( front )];
499         }
500
501         /// Removes front element of ring-buffer
502         /**
503             If the ring-buffer is empty, returns \p false.
504             Otherwise, pops the first element from the ring.
505         */
506         bool pop_front()
507         {
508             size_t front = front_.load( memory_model::memory_order_relaxed );
509             assert( cback_ - front <= capacity() );
510
511             if ( cback_ - front < 1 ) {
512                 cback_ = back_.load( memory_model::memory_order_acquire );
513                 if ( cback_ - front < 1 )
514                     return false;
515             }
516
517             // clean cell
518             value_cleaner()( buffer_[buffer_.mod( front )] );
519
520             front_.store( front + 1, memory_model::memory_order_release );
521             return true;
522         }
523
524         /// Clears the ring buffer (only consumer can call this function!)
525         void clear()
526         {
527             value_type v;
528             while ( pop( v ) );
529         }
530
531         /// Checks if the ring-buffer is empty
532         bool empty() const
533         {
534             return front_.load( memory_model::memory_order_relaxed ) == back_.load( memory_model::memory_order_relaxed );
535         }
536
537         /// Checks if the ring-buffer is full
538         bool full() const
539         {
540             return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ) >= capacity();
541         }
542
543         /// Returns the current size of ring buffer
544         size_t size() const
545         {
546             return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed );
547         }
548
549         /// Returns capacity of the ring buffer
550         size_t capacity() const
551         {
552             return buffer_.capacity();
553         }
554
555     private:
556         //@cond
557         atomics::atomic<size_t>     front_;
558         typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad1_;
559         atomics::atomic<size_t>     back_;
560         typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad2_;
561         size_t                      pfront_;
562         typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad3_;
563         size_t                      cback_;
564         typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad4_;
565
566         buffer                      buffer_;
567         //@endcond
568     };
569
570
571     /// Single-producer single-consumer ring buffer for untyped variable-sized data
572     /** @ingroup cds_nonintrusive_queue
573         @anchor cds_nonintrusive_WeakRingBuffer_void
574     */
575     template <typename Traits = weak_ringbuffer::traits>
576     class WeakRingBuffer<void, Traits>: public cds::bounded_container
577     {
578     public:
579         typedef Traits      traits;         ///< Ring buffer traits
580         typedef typename    traits::memory_model  memory_model;  ///< Memory ordering. See \p cds::opt::memory_model option
581
582     private:
583         //@cond
584         typedef typename traits::buffer::template rebind< uint8_t >::other buffer;
585         //@endcond
586
587     public:
588         /// Creates the ring buffer of \p capacity bytes
589         /**
590             For \p cds::opt::v::uninitialized_static_buffer the \p nCapacity parameter is ignored.
591
592             If the buffer capacity is a power of two, lightweight binary arithmetics is used
593             instead of modulo arithmetics.
594         */
595         WeakRingBuffer( size_t capacity = 0 )
596             : front_( 0 )
597             , pfront_( 0 )
598             , cback_( 0 )
599             , buffer_( capacity )
600         {
601             back_.store( 0, memory_model::memory_order_release );
602         }
603
604         /// [producer] Reserve \p size bytes
605         void* back( size_t size )
606         {
607             // Any data is rounded to 8-byte boundary
608             size_t real_size = calc_real_size( size );
609
610             // check if we can reserve read_size bytes
611             assert( real_size < capacity() );
612             size_t back = back_.load( memory_model::memory_order_relaxed );
613
614             assert( back - pfront_ <= capacity() );
615
616             if ( pfront_ + capacity() - back < real_size ) {
617                 pfront_ = front_.load( memory_model::memory_order_acquire );
618
619                 if ( pfront_ + capacity() - back < real_size ) {
620                     // not enough space
621                     return nullptr;
622                 }
623             }
624
625             uint8_t* reserved = buffer_.buffer() + buffer_.mod( back );
626
627             // Check if the buffer free space is enough for storing real_size bytes
628             size_t tail_size = capacity() - buffer_.mod( back );
629             if ( tail_size < real_size ) {
630                 // make unused tail
631                 assert( tail_size >= sizeof( size_t ) );
632                 assert( !is_tail( tail_size ) );
633
634                 *reinterpret_cast<size_t*>( reserved ) = make_tail( tail_size - sizeof(size_t));
635                 back += tail_size;
636
637                 // We must be in beginning of buffer
638                 assert( buffer_.mod( back ) == 0 );
639
640                 if ( pfront_ + capacity() - back < real_size ) {
641                     pfront_ = front_.load( memory_model::memory_order_acquire );
642
643                     if ( pfront_ + capacity() - back < real_size ) {
644                         // not enough space
645                         return nullptr;
646                     }
647                 }
648
649                 reserved = buffer_.buffer();
650             }
651
652             // reserve and store size
653             uint8_t* reserved = buffer_.buffer() + buffer_.mod( back );
654             *reinterpret_cast<size_t*>( reserved ) = size;
655
656             return reinterpret_cast<void*>( reserved + sizeof( size_t ) );
657         }
658
659         /// [producer] Push reserved bytes into ring
660         void push_back()
661         {
662             size_t back = back_.load( memory_model::memory_order_relaxed );
663             uint8_t* reserved = buffer_.buffer() + buffer_.mod( back );
664
665             size_t real_size = calc_real_size( *reinterpret_cast<size_t*>( reserved ) );
666             assert( real_size < capacity() );
667
668             back_.store( back + real_size, memory_model::memory_order_release );
669         }
670
671         /// [producer] Push \p data of \p size bytes into ring
672         bool push_back( void const* data, size_t size )
673         {
674             void* buf = back( size );
675             if ( buf ) {
676                 memcpy( buf, data, size );
677                 push_back();
678                 return true;
679             }
680             return false;
681         }
682
683         /// [consumer] Get top data from the ring
684         std::pair<void*, size_t> front()
685         {
686             size_t front = front_.load( memory_model::memory_order_relaxed );
687             assert( cback_ - front < capacity() );
688
689             if ( cback_ - front < sizeof( size_t )) {
690                 cback_ = back_.load( memory_model::memory_order_acquire );
691                 if ( cback_ - front < sizeof( size_t ) )
692                     return std::make_pair( nullptr, 0u );
693             }
694
695             uint8_t * buf = buffer_.buffer() + buffer_.mod( front );
696
697             // check alignment
698             assert( ( reinterpret_cast<uintptr_t>( buf ) & ( sizeof( uintptr_t ) - 1 ) ) == 0 );
699
700             size_t size = *reinterpret_cast<size_t*>( buf );
701             if ( is_tail( size ) ) {
702                 // unused tail, skip
703                 CDS_VERIFY( pop_front() );
704
705                 front = front_.load( memory_model::memory_order_relaxed );
706                 buf = buffer_.buffer() + buffer_.mod( front );
707                 size = *reinterpret_cast<size_t*>( buf );
708
709                 assert( !is_tail( size ) );
710             }
711
712 #ifdef _DEBUG
713             size_t real_size = calc_real_size( size );
714             if ( cback_ - front < real_size ) {
715                 cback_ = back_.load( memory_model::memory_order_acquire );
716                 assert( cback_ - front >= real_size );
717             }
718 #endif
719
720             return std::make_pair( reinterpret_cast<void*>( buf + sizeof( size_t ) ), size );
721         }
722
723         /// [consumer] Pops top data
724         bool pop_front()
725         {
726             size_t front = front_.load( memory_model::memory_order_relaxed );
727             assert( cback_ - front <= capacity() );
728
729             if ( cback_ - front < sizeof(size_t) ) {
730                 cback_ = back_.load( memory_model::memory_order_acquire );
731                 if ( cback_ - front < sizeof( size_t ) )
732                     return false;
733             }
734
735             uint8_t * buf = buffer_.buffer() + buffer_.mod( front );
736
737             // check alignment
738             assert( ( reinterpret_cast<uintptr_t>( buf ) & ( sizeof( uintptr_t ) - 1 ) ) == 0 );
739
740             size_t size = *reinterpret_cast<size_t*>( buf );
741             assert( !is_tail( size ) );
742
743             size_t real_size = calc_real_size( size );
744
745 #ifdef _DEBUG
746             if ( cback_ - front < real_size ) {
747                 cback_ = back_.load( memory_model::memory_order_acquire );
748                 assert( cback_ - front >= real_size );
749             }
750 #endif
751
752             front_.store( front + real_size, memory_model::memory_order_release );
753             return true;
754
755         }
756
757         /// [consumer] Clears the ring buffer
758         void clear()
759         {
760             for ( auto el = front(); el.first; el = front() )
761                 pop_front();
762         }
763
764         /// Checks if the ring-buffer is empty
765         bool empty() const
766         {
767             return front_.load( memory_model::memory_order_relaxed ) == back_.load( memory_model::memory_order_relaxed );
768         }
769
770         /// Checks if the ring-buffer is full
771         bool full() const
772         {
773             return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ) >= capacity();
774         }
775
776         /// Returns the current size of ring buffer
777         size_t size() const
778         {
779             return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed );
780         }
781
782         /// Returns capacity of the ring buffer
783         size_t capacity() const
784         {
785             return buffer_.capacity();
786         }
787
788     private:
789         static size_t calc_real_size( size_t size )
790         {
791             size_t real_size =  (( size + sizeof( uintptr_t ) - 1 ) & ~( sizeof( uintptr_t ) - 1 )) + sizeof( size_t );
792
793             assert( real_size > size );
794             assert( real_size - size >= sizeof( size_t ) );
795
796             return real_size;
797         }
798
799         static bool is_tail( size_t size )
800         {
801             return ( size & ( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 ))) != 0;
802         }
803
804         static size_t make_tail( size_t size )
805         {
806             return size | ( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 ));
807         }
808
809     private:
810         //@cond
811         atomics::atomic<size_t>     front_;
812         typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad1_;
813         atomics::atomic<size_t>     back_;
814         typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad2_;
815         size_t                      pfront_;
816         typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad3_;
817         size_t                      cback_;
818         typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad4_;
819
820         buffer                      buffer_;
821         //@endcond
822     };
823
824 }} // namespace cds::container
825
826
827 #endif // #ifndef CDSLIB_CONTAINER_WEAK_RINGBUFFER_H