Fixed a bug
[libcds.git] / cds / container / weak_ringbuffer.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #ifndef CDSLIB_CONTAINER_WEAK_RINGBUFFER_H
32 #define CDSLIB_CONTAINER_WEAK_RINGBUFFER_H
33
34 #include <cds/container/details/base.h>
35 #include <cds/opt/buffer.h>
36 #include <cds/opt/value_cleaner.h>
37 #include <cds/algo/atomic.h>
38 #include <cds/details/bounded_container.h>
39
40 namespace cds { namespace container {
41
42     /// \p WeakRingBuffer related definitions
43     /** @ingroup cds_nonintrusive_helper
44     */
45     namespace weak_ringbuffer {
46
47         /// \p WeakRingBuffer default traits
48         struct traits {
49             /// Buffer type for internal array
50             /*
51                 The type of element for the buffer is not important: \p WeakRingBuffer rebind
52                 the buffer for required type via \p rebind metafunction.
53
54                 For \p WeakRingBuffer the buffer size should have power-of-2 size.
55
56                 You should use only uninitialized buffer for the ring buffer -
57                 \p cds::opt::v::uninitialized_dynamic_buffer (the default),
58                 \p cds::opt::v::uninitialized_static_buffer.
59             */
60             typedef cds::opt::v::uninitialized_dynamic_buffer< void * > buffer;
61
62             /// A functor to clean item dequeued.
63             /**
64                 The functor calls the destructor for popped element.
65                 After a set of items is dequeued, \p value_cleaner cleans the cells that the items have been occupied.
66                 If \p T is a complex type, \p value_cleaner may be useful feature.
67                 For POD types \ref opt::v::empty_cleaner is suitable
68
69                 Default value is \ref opt::v::auto_cleaner that calls destructor only if it is not trivial.
70             */
71             typedef cds::opt::v::auto_cleaner value_cleaner;
72
73             /// C++ memory ordering model
74             /**
75                 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
76                 or \p opt::v::sequential_consistent (sequentially consistent memory model).
77             */
78             typedef opt::v::relaxed_ordering    memory_model;
79
80             /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
81             enum { padding = opt::cache_line_padding };
82         };
83
84         /// Metafunction converting option list to \p weak_ringbuffer::traits
85         /**
86             Supported \p Options are:
87             - \p opt::buffer - an uninitialized buffer type for internal cyclic array. Possible types are:
88                 \p opt::v::uninitialized_dynamic_buffer (the default), \p opt::v::uninitialized_static_buffer. The type of
89                 element in the buffer is not important: it will be changed via \p rebind metafunction.
90             - \p opt::value_cleaner - a functor to clean items dequeued.
91                 The functor calls the destructor for ring-buffer item.
92                 After a set of items is dequeued, \p value_cleaner cleans the cells that the items have been occupied.
93                 If \p T is a complex type, \p value_cleaner can be an useful feature.
94                 Default value is \ref opt::v::empty_cleaner that is suitable for POD types.
95             - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
96             - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
97                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
98
99             Example: declare \p %WeakRingBuffer with static iternal buffer of size 1024:
100             \code
101             typedef cds::container::WeakRingBuffer< Foo,
102                 typename cds::container::weak_ringbuffer::make_traits<
103                     cds::opt::buffer< cds::opt::v::uninitialized_static_buffer< void *, 1024 >
104                 >::type
105             > myRing;
106             \endcode
107         */
108         template <typename... Options>
109         struct make_traits {
110 #   ifdef CDS_DOXYGEN_INVOKED
111             typedef implementation_defined type;   ///< Metafunction result
112 #   else
113             typedef typename cds::opt::make_options<
114                 typename cds::opt::find_type_traits< traits, Options... >::type
115                 , Options...
116             >::type type;
117 #   endif
118         };
119
120     } // namespace weak_ringbuffer
121
122     /// Single-producer single-consumer ring buffer
123     /** @ingroup cds_nonintrusive_queue
124         Source: [2013] Nhat Minh Le, Adrien Guatto, Albert Cohen, Antoniu Pop. Correct and Effcient Bounded
125             FIFO Queues. [Research Report] RR-8365, INRIA. 2013. <hal-00862450>
126
127         Ring buffer is a bounded queue. Additionally, \p %WeakRingBuffer supports batch operations -
128         you can push/pop an array of elements.
129
130         There are a specialization \ref cds_nonintrusive_WeakRingBuffer_void "WeakRingBuffer<void, Traits>" 
131         that is not a queue but a "memory pool" between producer and consumer threads. 
132         \p WeakRingBuffer<void> supports data of different size.
133     */
134     template <typename T, typename Traits = weak_ringbuffer::traits>
135     class WeakRingBuffer: public cds::bounded_container
136     {
137     public:
138         typedef T value_type;   ///< Value type to be stored in the ring buffer
139         typedef Traits traits;  ///< Ring buffer traits
140         typedef typename traits::memory_model  memory_model;  ///< Memory ordering. See \p cds::opt::memory_model option
141         typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p weak_ringbuffer::traits::value_cleaner
142
143         /// Rebind template arguments
144         template <typename T2, typename Traits2>
145         struct rebind {
146             typedef WeakRingBuffer< T2, Traits2 > other;   ///< Rebinding result
147         };
148
149         //@cond
150         // Only for tests
151         typedef size_t item_counter;
152         //@endcond
153
154     private:
155         //@cond
156         typedef typename traits::buffer::template rebind< value_type >::other buffer;
157         //@endcond
158
159     public:
160
161         /// Creates the ring buffer of \p capacity
162         /**
163             For \p cds::opt::v::uninitialized_static_buffer the \p nCapacity parameter is ignored.
164
165             If the buffer capacity is a power of two, lightweight binary arithmetics is used
166             instead of modulo arithmetics.
167         */
168         WeakRingBuffer( size_t capacity = 0 )
169             : front_( 0 )
170             , pfront_( 0 )
171             , cback_( 0 )
172             , buffer_( capacity )
173         {
174             back_.store( 0, memory_model::memory_order_release );
175         }
176
177         /// Destroys the ring buffer
178         ~WeakRingBuffer()
179         {
180             value_cleaner cleaner;
181             size_t back = back_.load( memory_model::memory_order_relaxed );
182             for ( size_t front = front_.load( memory_model::memory_order_relaxed ); front != back; ++front )
183                 cleaner( buffer_[ buffer_.mod( front ) ] );
184         }
185
186         /// Batch push - push array \p arr of size \p count
187         /**
188             \p CopyFunc is a per-element copy functor: for each element of \p arr
189             <tt>copy( dest, arr[i] )</tt> is called.
190             The \p CopyFunc signature:
191             \code
192                 void copy_func( value_type& element, Q const& source );
193             \endcode
194             Here \p element is uninitialized so you should construct it using placement new
195             if needed; for example, if the element type is \p str::string and \p Q is <tt>char const*</tt>,
196             \p copy functor can be:
197             \code
198             cds::container::WeakRingBuffer<std::string> ringbuf;
199             char const* arr[10];
200             ringbuf.push( arr, 10, 
201                 []( std::string& element, char const* src ) {
202                     new( &element ) std::string( src );
203                 });
204             \endcode
205             You may use move semantics if appropriate:
206             \code
207             cds::container::WeakRingBuffer<std::string> ringbuf;
208             std::string arr[10];
209             ringbuf.push( arr, 10,
210                 []( std::string& element, std:string& src ) {
211                     new( &element ) std::string( std::move( src ));
212                 });
213             \endcode
214
215             Returns \p true if success or \p false if not enough space in the ring
216         */
217         template <typename Q, typename CopyFunc>
218         bool push( Q* arr, size_t count, CopyFunc copy )
219         {
220             assert( count < capacity() );
221             size_t back = back_.load( memory_model::memory_order_relaxed );
222
223             assert( back - pfront_ <= capacity() );
224
225             if ( pfront_ + capacity() - back < count ) {
226                 pfront_ = front_.load( memory_model::memory_order_acquire );
227
228                 if ( pfront_ + capacity() - back < count ) {
229                     // not enough space
230                     return false;
231                 }
232             }
233
234             // copy data
235             for ( size_t i = 0; i < count; ++i, ++back )
236                 copy( buffer_[buffer_.mod( back )], arr[i] );
237
238             back_.store( back, memory_model::memory_order_release );
239
240             return true;
241         }
242
243         /// Batch push - push array \p arr of size \p count with assignment as copy functor
244         /**
245             This function is equivalent for:
246             \code
247             push( arr, count, []( value_type& dest, Q const& src ) { dest = src; } );
248             \endcode
249
250             The function is available only if <tt>std::is_constructible<value_type, Q>::value</tt>
251             is \p true.
252
253             Returns \p true if success or \p false if not enough space in the ring
254         */
255         template <typename Q>
256         typename std::enable_if< std::is_constructible<value_type, Q>::value, bool>::type
257         push( Q* arr, size_t count )
258         {
259             return push( arr, count, []( value_type& dest, Q const& src ) { new( &dest ) value_type( src ); } );
260         }
261
262         /// Push one element created from \p args
263         /**
264             The function is available only if <tt>std::is_constructible<value_type, Args...>::value</tt>
265             is \p true.
266
267             Returns \p false if the ring is full or \p true otherwise.
268         */
269         template <typename... Args>
270         typename std::enable_if< std::is_constructible<value_type, Args...>::value, bool>::type
271         emplace( Args&&... args )
272         {
273             size_t back = back_.load( memory_model::memory_order_relaxed );
274
275             assert( back - pfront_ <= capacity() );
276
277             if ( pfront_ + capacity() - back < 1 ) {
278                 pfront_ = front_.load( memory_model::memory_order_acquire );
279
280                 if ( pfront_ + capacity() - back < 1 ) {
281                     // not enough space
282                     return false;
283                 }
284             }
285
286             new( &buffer_[buffer_.mod( back )] ) value_type( std::forward<Args>(args)... );
287
288             back_.store( back + 1, memory_model::memory_order_release );
289
290             return true;
291         }
292
293         /// Enqueues data to the ring using a functor
294         /**
295             \p Func is a functor called to copy a value to the ring element.
296             The functor \p f takes one argument - a reference to a empty cell of type \ref value_type :
297             \code
298             cds::container::WeakRingBuffer< Foo > myRing;
299             Bar bar;
300             myRing.enqueue_with( [&bar]( Foo& dest ) { dest = std::move(bar); } );
301             \endcode
302         */
303         template <typename Func>
304         bool enqueue_with( Func f )
305         {
306             size_t back = back_.load( memory_model::memory_order_relaxed );
307
308             assert( back - pfront_ <= capacity() );
309
310             if ( pfront_ + capacity() - back < 1 ) {
311                 pfront_ = front_.load( memory_model::memory_order_acquire );
312
313                 if ( pfront_ + capacity() - back < 1 ) {
314                     // not enough space
315                     return false;
316                 }
317             }
318
319             f( buffer_[buffer_.mod( back )] );
320
321             back_.store( back + 1, memory_model::memory_order_release );
322
323             return true;
324
325         }
326
327         /// Enqueues \p val value into the queue.
328         /**
329             The new queue item is created by calling placement new in free cell.
330             Returns \p true if success, \p false if the ring is full.
331         */
332         bool enqueue( value_type const& val )
333         {
334             return emplace( val );
335         }
336
337         /// Enqueues \p val value into the queue, move semantics
338         bool enqueue( value_type&& val )
339         {
340             return emplace( std::move( val ));
341         }
342
343         /// Synonym for \p enqueue( value_type const& )
344         bool push( value_type const& val )
345         {
346             return enqueue( val );
347         }
348
349         /// Synonym for \p enqueue( value_type&& )
350         bool push( value_type&& val )
351         {
352             return enqueue( std::move( val ));
353         }
354
355         /// Synonym for \p enqueue_with()
356         template <typename Func>
357         bool push_with( Func f )
358         {
359             return enqueue_with( f );
360         }
361
362         /// Batch pop \p count element from the ring buffer into \p arr
363         /**
364             \p CopyFunc is a per-element copy functor: for each element of \p arr
365             <tt>copy( arr[i], source )</tt> is called.
366             The \p CopyFunc signature:
367             \code
368             void copy_func( Q& dest, value_type& elemen );
369             \endcode
370
371             Returns \p true if success or \p false if not enough space in the ring
372         */
373         template <typename Q, typename CopyFunc>
374         bool pop( Q* arr, size_t count, CopyFunc copy )
375         {
376             assert( count < capacity() );
377
378             size_t front = front_.load( memory_model::memory_order_relaxed );
379             assert( cback_ - front < capacity() );
380
381             if ( cback_ - front < count ) {
382                 cback_ = back_.load( memory_model::memory_order_acquire );
383                 if ( cback_ - front < count )
384                     return false;
385             }
386
387             // copy data
388             value_cleaner cleaner;
389             for ( size_t i = 0; i < count; ++i, ++front ) {
390                 value_type& val = buffer_[buffer_.mod( front )];
391                 copy( arr[i], val );
392                 cleaner( val );
393             }
394
395             front_.store( front, memory_model::memory_order_release );
396             return true;
397         }
398
399         /// Batch pop - push array \p arr of size \p count with assignment as copy functor
400         /**
401             This function is equivalent for:
402             \code
403             pop( arr, count, []( Q& dest, value_type& src ) { dest = src; } );
404             \endcode
405
406             The function is available only if <tt>std::is_assignable<Q&, value_type const&>::value</tt>
407             is \p true.
408
409             Returns \p true if success or \p false if not enough space in the ring
410         */
411         template <typename Q>
412         typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
413         pop( Q* arr, size_t count )
414         {
415             return pop( arr, count, []( Q& dest, value_type& src ) { dest = src; } );
416         }
417
418         /// Dequeues an element from the ring to \p val
419         /**
420             The function is available only if <tt>std::is_assignable<Q&, value_type const&>::value</tt>
421             is \p true.
422
423             Returns \p false if the ring is full or \p true otherwise.
424         */
425         template <typename Q>
426         typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
427         dequeue( Q& val )
428         {
429             return pop( &val, 1 );
430         }
431
432         /// Synonym for \p dequeue( Q& )
433         template <typename Q>
434         typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
435         pop( Q& val )
436         {
437             return dequeue( val );
438         }
439
440         /// Dequeues a value using a functor
441         /**
442             \p Func is a functor called to copy dequeued value.
443             The functor takes one argument - a reference to removed node:
444             \code
445             cds:container::WeakRingBuffer< Foo > myRing;
446             Bar bar;
447             myRing.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
448             \endcode
449
450             Returns \p true if the ring is not empty, \p false otherwise.
451             The functor is called only if the ring is not empty.
452         */
453         template <typename Func>
454         bool dequeue_with( Func f )
455         {
456             size_t front = front_.load( memory_model::memory_order_relaxed );
457             assert( cback_ - front < capacity() );
458
459             if ( cback_ - front < 1 ) {
460                 cback_ = back_.load( memory_model::memory_order_acquire );
461                 if ( cback_ - front < 1 )
462                     return false;
463             }
464
465             value_type& val = buffer_[buffer_.mod( front )];
466             f( val );
467             value_cleaner()( val );
468
469             front_.store( front + 1, memory_model::memory_order_release );
470             return true;
471         }
472
473         /// Synonym for \p dequeue_with()
474         template <typename Func>
475         bool pop_with( Func f )
476         {
477             return dequeue_with( f );
478         }
479
480         /// Gets pointer to first element of ring buffer
481         /**
482             If the ring buffer is empty, returns \p nullptr
483
484             The function is thread-safe since there is only one consumer.
485             Recall, \p WeakRingBuffer is single-producer/single consumer container.
486         */
487         value_type* front()
488         {
489             size_t front = front_.load( memory_model::memory_order_relaxed );
490             assert( cback_ - front < capacity() );
491
492             if ( cback_ - front < 1 ) {
493                 cback_ = back_.load( memory_model::memory_order_acquire );
494                 if ( cback_ - front < 1 )
495                     return nullptr;
496             }
497
498             return &buffer_[buffer_.mod( front )];
499         }
500
501         /// Removes front element of ring-buffer
502         /**
503             If the ring-buffer is empty, returns \p false.
504             Otherwise, pops the first element from the ring.
505         */
506         bool pop_front()
507         {
508             size_t front = front_.load( memory_model::memory_order_relaxed );
509             assert( cback_ - front <= capacity() );
510
511             if ( cback_ - front < 1 ) {
512                 cback_ = back_.load( memory_model::memory_order_acquire );
513                 if ( cback_ - front < 1 )
514                     return false;
515             }
516
517             // clean cell
518             value_cleaner()( buffer_[buffer_.mod( front )] );
519
520             front_.store( front + 1, memory_model::memory_order_release );
521             return true;
522         }
523
524         /// Clears the ring buffer (only consumer can call this function!)
525         void clear()
526         {
527             value_type v;
528             while ( pop( v ) );
529         }
530
531         /// Checks if the ring-buffer is empty
532         bool empty() const
533         {
534             return front_.load( memory_model::memory_order_relaxed ) == back_.load( memory_model::memory_order_relaxed );
535         }
536
537         /// Checks if the ring-buffer is full
538         bool full() const
539         {
540             return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ) >= capacity();
541         }
542
543         /// Returns the current size of ring buffer
544         size_t size() const
545         {
546             return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed );
547         }
548
549         /// Returns capacity of the ring buffer
550         size_t capacity() const
551         {
552             return buffer_.capacity();
553         }
554
555     private:
556         //@cond
557         atomics::atomic<size_t>     front_;
558         typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad1_;
559         atomics::atomic<size_t>     back_;
560         typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad2_;
561         size_t                      pfront_;
562         typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad3_;
563         size_t                      cback_;
564         typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad4_;
565
566         buffer                      buffer_;
567         //@endcond
568     };
569
570
571     /// Single-producer single-consumer ring buffer for untyped variable-sized data
572     /** @ingroup cds_nonintrusive_queue
573         @anchor cds_nonintrusive_WeakRingBuffer_void
574     */
575 #ifdef CDS_DOXYGEN_INVOKED
576     template <typename Traits = weak_ringbuffer::traits>
577 #else
578     template <typename Traits>
579 #endif
580     class WeakRingBuffer<void, Traits>: public cds::bounded_container
581     {
582     public:
583         typedef Traits      traits;         ///< Ring buffer traits
584         typedef typename    traits::memory_model  memory_model;  ///< Memory ordering. See \p cds::opt::memory_model option
585
586     private:
587         //@cond
588         typedef typename traits::buffer::template rebind< uint8_t >::other buffer;
589         //@endcond
590
591     public:
592         /// Creates the ring buffer of \p capacity bytes
593         /**
594             For \p cds::opt::v::uninitialized_static_buffer the \p nCapacity parameter is ignored.
595
596             If the buffer capacity is a power of two, lightweight binary arithmetics is used
597             instead of modulo arithmetics.
598         */
599         WeakRingBuffer( size_t capacity = 0 )
600             : front_( 0 )
601             , pfront_( 0 )
602             , cback_( 0 )
603             , buffer_( capacity )
604         {
605             back_.store( 0, memory_model::memory_order_release );
606         }
607
608         /// [producer] Reserve \p size bytes
609         void* back( size_t size )
610         {
611             // Any data is rounded to 8-byte boundary
612             size_t real_size = calc_real_size( size );
613
614             // check if we can reserve read_size bytes
615             assert( real_size < capacity() );
616             size_t back = back_.load( memory_model::memory_order_relaxed );
617
618             assert( back - pfront_ <= capacity() );
619
620             if ( pfront_ + capacity() - back < real_size ) {
621                 pfront_ = front_.load( memory_model::memory_order_acquire );
622
623                 if ( pfront_ + capacity() - back < real_size ) {
624                     // not enough space
625                     return nullptr;
626                 }
627             }
628
629             uint8_t* reserved = buffer_.buffer() + buffer_.mod( back );
630
631             // Check if the buffer free space is enough for storing real_size bytes
632             size_t tail_size = capacity() - buffer_.mod( back );
633             if ( tail_size < real_size ) {
634                 // make unused tail
635                 assert( tail_size >= sizeof( size_t ) );
636                 assert( !is_tail( tail_size ) );
637
638                 *reinterpret_cast<size_t*>( reserved ) = make_tail( tail_size - sizeof(size_t));
639                 back += tail_size;
640
641                 // We must be in beginning of buffer
642                 assert( buffer_.mod( back ) == 0 );
643
644                 if ( pfront_ + capacity() - back < real_size ) {
645                     pfront_ = front_.load( memory_model::memory_order_acquire );
646
647                     if ( pfront_ + capacity() - back < real_size ) {
648                         // not enough space
649                         return nullptr;
650                     }
651                 }
652
653                 reserved = buffer_.buffer();
654             }
655
656             // reserve and store size
657             *reinterpret_cast<size_t*>( reserved ) = size;
658
659             return reinterpret_cast<void*>( reserved + sizeof( size_t ) );
660         }
661
662         /// [producer] Push reserved bytes into ring
663         void push_back()
664         {
665             size_t back = back_.load( memory_model::memory_order_relaxed );
666             uint8_t* reserved = buffer_.buffer() + buffer_.mod( back );
667
668             size_t real_size = calc_real_size( *reinterpret_cast<size_t*>( reserved ) );
669             assert( real_size < capacity() );
670
671             back_.store( back + real_size, memory_model::memory_order_release );
672         }
673
674         /// [producer] Push \p data of \p size bytes into ring
675         bool push_back( void const* data, size_t size )
676         {
677             void* buf = back( size );
678             if ( buf ) {
679                 memcpy( buf, data, size );
680                 push_back();
681                 return true;
682             }
683             return false;
684         }
685
686         /// [consumer] Get top data from the ring
687         std::pair<void*, size_t> front()
688         {
689             size_t front = front_.load( memory_model::memory_order_relaxed );
690             assert( cback_ - front < capacity() );
691
692             if ( cback_ - front < sizeof( size_t )) {
693                 cback_ = back_.load( memory_model::memory_order_acquire );
694                 if ( cback_ - front < sizeof( size_t ) )
695                     return std::make_pair( nullptr, 0u );
696             }
697
698             uint8_t * buf = buffer_.buffer() + buffer_.mod( front );
699
700             // check alignment
701             assert( ( reinterpret_cast<uintptr_t>( buf ) & ( sizeof( uintptr_t ) - 1 ) ) == 0 );
702
703             size_t size = *reinterpret_cast<size_t*>( buf );
704             if ( is_tail( size ) ) {
705                 // unused tail, skip
706                 CDS_VERIFY( pop_front() );
707
708                 front = front_.load( memory_model::memory_order_relaxed );
709                 buf = buffer_.buffer() + buffer_.mod( front );
710                 size = *reinterpret_cast<size_t*>( buf );
711
712                 assert( !is_tail( size ) );
713             }
714
715 #ifdef _DEBUG
716             size_t real_size = calc_real_size( size );
717             if ( cback_ - front < real_size ) {
718                 cback_ = back_.load( memory_model::memory_order_acquire );
719                 assert( cback_ - front >= real_size );
720             }
721 #endif
722
723             return std::make_pair( reinterpret_cast<void*>( buf + sizeof( size_t ) ), size );
724         }
725
726         /// [consumer] Pops top data
727         bool pop_front()
728         {
729             size_t front = front_.load( memory_model::memory_order_relaxed );
730             assert( cback_ - front <= capacity() );
731
732             if ( cback_ - front < sizeof(size_t) ) {
733                 cback_ = back_.load( memory_model::memory_order_acquire );
734                 if ( cback_ - front < sizeof( size_t ) )
735                     return false;
736             }
737
738             uint8_t * buf = buffer_.buffer() + buffer_.mod( front );
739
740             // check alignment
741             assert( ( reinterpret_cast<uintptr_t>( buf ) & ( sizeof( uintptr_t ) - 1 ) ) == 0 );
742
743             size_t size = *reinterpret_cast<size_t*>( buf );
744             assert( !is_tail( size ) );
745
746             size_t real_size = calc_real_size( size );
747
748 #ifdef _DEBUG
749             if ( cback_ - front < real_size ) {
750                 cback_ = back_.load( memory_model::memory_order_acquire );
751                 assert( cback_ - front >= real_size );
752             }
753 #endif
754
755             front_.store( front + real_size, memory_model::memory_order_release );
756             return true;
757
758         }
759
760         /// [consumer] Clears the ring buffer
761         void clear()
762         {
763             for ( auto el = front(); el.first; el = front() )
764                 pop_front();
765         }
766
767         /// Checks if the ring-buffer is empty
768         bool empty() const
769         {
770             return front_.load( memory_model::memory_order_relaxed ) == back_.load( memory_model::memory_order_relaxed );
771         }
772
773         /// Checks if the ring-buffer is full
774         bool full() const
775         {
776             return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ) >= capacity();
777         }
778
779         /// Returns the current size of ring buffer
780         size_t size() const
781         {
782             return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed );
783         }
784
785         /// Returns capacity of the ring buffer
786         size_t capacity() const
787         {
788             return buffer_.capacity();
789         }
790
791     private:
792         static size_t calc_real_size( size_t size )
793         {
794             size_t real_size =  (( size + sizeof( uintptr_t ) - 1 ) & ~( sizeof( uintptr_t ) - 1 )) + sizeof( size_t );
795
796             assert( real_size > size );
797             assert( real_size - size >= sizeof( size_t ) );
798
799             return real_size;
800         }
801
802         static bool is_tail( size_t size )
803         {
804             return ( size & ( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 ))) != 0;
805         }
806
807         static size_t make_tail( size_t size )
808         {
809             return size | ( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 ));
810         }
811
812     private:
813         //@cond
814         atomics::atomic<size_t>     front_;
815         typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad1_;
816         atomics::atomic<size_t>     back_;
817         typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad2_;
818         size_t                      pfront_;
819         typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad3_;
820         size_t                      cback_;
821         typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad4_;
822
823         buffer                      buffer_;
824         //@endcond
825     };
826
827 }} // namespace cds::container
828
829
830 #endif // #ifndef CDSLIB_CONTAINER_WEAK_RINGBUFFER_H