Docfix
[libcds.git] / cds / container / weak_ringbuffer.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #ifndef CDSLIB_CONTAINER_WEAK_RINGBUFFER_H
32 #define CDSLIB_CONTAINER_WEAK_RINGBUFFER_H
33
34 #include <cds/container/details/base.h>
35 #include <cds/opt/buffer.h>
36 #include <cds/opt/value_cleaner.h>
37 #include <cds/algo/atomic.h>
38 #include <cds/details/bounded_container.h>
39
40 namespace cds { namespace container {
41
42     /// \p WeakRingBuffer related definitions
43     /** @ingroup cds_nonintrusive_helper
44     */
45     namespace weak_ringbuffer {
46
47         /// \p WeakRingBuffer default traits
48         struct traits {
49             /// Buffer type for internal array
50             /*
51                 The type of element for the buffer is not important: \p WeakRingBuffer rebind
52                 the buffer for required type via \p rebind metafunction.
53
54                 For \p WeakRingBuffer the buffer size should have power-of-2 size.
55
56                 You should use only uninitialized buffer for the ring buffer -
57                 \p cds::opt::v::uninitialized_dynamic_buffer (the default),
58                 \p cds::opt::v::uninitialized_static_buffer.
59             */
60             typedef cds::opt::v::uninitialized_dynamic_buffer< void * > buffer;
61
62             /// A functor to clean item dequeued.
63             /**
64                 The functor calls the destructor for popped element.
65                 After a set of items is dequeued, \p value_cleaner cleans the cells that the items have been occupied.
66                 If \p T is a complex type, \p value_cleaner may be useful feature.
67                 For POD types \ref opt::v::empty_cleaner is suitable
68
69                 Default value is \ref opt::v::auto_cleaner that calls destructor only if it is not trivial.
70             */
71             typedef cds::opt::v::auto_cleaner value_cleaner;
72
73             /// C++ memory ordering model
74             /**
75                 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
76                 or \p opt::v::sequential_consistent (sequentially consistent memory model).
77             */
78             typedef opt::v::relaxed_ordering    memory_model;
79
80             /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
81             enum { padding = opt::cache_line_padding };
82         };
83
84         /// Metafunction converting option list to \p weak_ringbuffer::traits
85         /**
86             Supported \p Options are:
87             - \p opt::buffer - an uninitialized buffer type for internal cyclic array. Possible types are:
88                 \p opt::v::uninitialized_dynamic_buffer (the default), \p opt::v::uninitialized_static_buffer. The type of
89                 element in the buffer is not important: it will be changed via \p rebind metafunction.
90             - \p opt::value_cleaner - a functor to clean items dequeued.
91                 The functor calls the destructor for ring-buffer item.
92                 After a set of items is dequeued, \p value_cleaner cleans the cells that the items have been occupied.
93                 If \p T is a complex type, \p value_cleaner can be an useful feature.
94                 Default value is \ref opt::v::empty_cleaner that is suitable for POD types.
95             - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
96             - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
97                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
98
99             Example: declare \p %WeakRingBuffer with static iternal buffer for 1024 objects:
100             \code
101             typedef cds::container::WeakRingBuffer< Foo,
102                 typename cds::container::weak_ringbuffer::make_traits<
103                     cds::opt::buffer< cds::opt::v::uninitialized_static_buffer< void *, 1024 >
104                 >::type
105             > myRing;
106             \endcode
107         */
108         template <typename... Options>
109         struct make_traits {
110 #   ifdef CDS_DOXYGEN_INVOKED
111             typedef implementation_defined type;   ///< Metafunction result
112 #   else
113             typedef typename cds::opt::make_options<
114                 typename cds::opt::find_type_traits< traits, Options... >::type
115                 , Options...
116             >::type type;
117 #   endif
118         };
119
120     } // namespace weak_ringbuffer
121
122     /// Single-producer single-consumer ring buffer
123     /** @ingroup cds_nonintrusive_queue
124         Source: [2013] Nhat Minh Le, Adrien Guatto, Albert Cohen, Antoniu Pop. Correct and Effcient Bounded
125             FIFO Queues. [Research Report] RR-8365, INRIA. 2013. <hal-00862450>
126
127         Ring buffer is a bounded queue. Additionally, \p %WeakRingBuffer supports batch operations -
128         you can push/pop an array of elements.
129
130         There are a specialization \ref cds_nonintrusive_WeakRingBuffer_void "WeakRingBuffer<void, Traits>" 
131         that is not a queue but a "memory pool" between producer and consumer threads. 
132         \p WeakRingBuffer<void> supports variable-sized data.
133
134         @warning: \p %WeakRingBuffer is developed for 64-bit architecture.
135         On 32-bit platform an integer overflow of internal counters is possible.
136     */
137     template <typename T, typename Traits = weak_ringbuffer::traits>
138     class WeakRingBuffer: public cds::bounded_container
139     {
140     public:
141         typedef T value_type;   ///< Value type to be stored in the ring buffer
142         typedef Traits traits;  ///< Ring buffer traits
143         typedef typename traits::memory_model  memory_model;  ///< Memory ordering. See \p cds::opt::memory_model option
144         typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p weak_ringbuffer::traits::value_cleaner
145
146         /// Rebind template arguments
147         template <typename T2, typename Traits2>
148         struct rebind {
149             typedef WeakRingBuffer< T2, Traits2 > other;   ///< Rebinding result
150         };
151
152         //@cond
153         // Only for tests
154         typedef size_t item_counter;
155         //@endcond
156
157     private:
158         //@cond
159         typedef typename traits::buffer::template rebind< value_type >::other buffer;
160         //@endcond
161
162     public:
163
164         /// Creates the ring buffer of \p capacity
165         /**
166             For \p cds::opt::v::uninitialized_static_buffer the \p nCapacity parameter is ignored.
167
168             If the buffer capacity is a power of two, lightweight binary arithmetics is used
169             instead of modulo arithmetics.
170         */
171         WeakRingBuffer( size_t capacity = 0 )
172             : front_( 0 )
173             , pfront_( 0 )
174             , cback_( 0 )
175             , buffer_( capacity )
176         {
177             back_.store( 0, memory_model::memory_order_release );
178         }
179
180         /// Destroys the ring buffer
181         ~WeakRingBuffer()
182         {
183             value_cleaner cleaner;
184             size_t back = back_.load( memory_model::memory_order_relaxed );
185             for ( size_t front = front_.load( memory_model::memory_order_relaxed ); front != back; ++front )
186                 cleaner( buffer_[ buffer_.mod( front ) ] );
187         }
188
189         /// Batch push - push array \p arr of size \p count
190         /**
191             \p CopyFunc is a per-element copy functor: for each element of \p arr
192             <tt>copy( dest, arr[i] )</tt> is called.
193             The \p CopyFunc signature:
194             \code
195                 void copy_func( value_type& element, Q const& source );
196             \endcode
197             Here \p element is uninitialized so you should construct it using placement new
198             if needed; for example, if the element type is \p str::string and \p Q is <tt>char const*</tt>,
199             \p copy functor can be:
200             \code
201             cds::container::WeakRingBuffer<std::string> ringbuf;
202             char const* arr[10];
203             ringbuf.push( arr, 10, 
204                 []( std::string& element, char const* src ) {
205                     new( &element ) std::string( src );
206                 });
207             \endcode
208             You may use move semantics if appropriate:
209             \code
210             cds::container::WeakRingBuffer<std::string> ringbuf;
211             std::string arr[10];
212             ringbuf.push( arr, 10,
213                 []( std::string& element, std:string& src ) {
214                     new( &element ) std::string( std::move( src ));
215                 });
216             \endcode
217
218             Returns \p true if success or \p false if not enough space in the ring
219         */
220         template <typename Q, typename CopyFunc>
221         bool push( Q* arr, size_t count, CopyFunc copy )
222         {
223             assert( count < capacity() );
224             size_t back = back_.load( memory_model::memory_order_relaxed );
225
226             assert( back - pfront_ <= capacity() );
227
228             if ( pfront_ + capacity() - back < count ) {
229                 pfront_ = front_.load( memory_model::memory_order_acquire );
230
231                 if ( pfront_ + capacity() - back < count ) {
232                     // not enough space
233                     return false;
234                 }
235             }
236
237             // copy data
238             for ( size_t i = 0; i < count; ++i, ++back )
239                 copy( buffer_[buffer_.mod( back )], arr[i] );
240
241             back_.store( back, memory_model::memory_order_release );
242
243             return true;
244         }
245
246         /// Batch push - push array \p arr of size \p count with assignment as copy functor
247         /**
248             This function is equivalent for:
249             \code
250             push( arr, count, []( value_type& dest, Q const& src ) { dest = src; } );
251             \endcode
252
253             The function is available only if <tt>std::is_constructible<value_type, Q>::value</tt>
254             is \p true.
255
256             Returns \p true if success or \p false if not enough space in the ring
257         */
258         template <typename Q>
259         typename std::enable_if< std::is_constructible<value_type, Q>::value, bool>::type
260         push( Q* arr, size_t count )
261         {
262             return push( arr, count, []( value_type& dest, Q const& src ) { new( &dest ) value_type( src ); } );
263         }
264
265         /// Push one element created from \p args
266         /**
267             The function is available only if <tt>std::is_constructible<value_type, Args...>::value</tt>
268             is \p true.
269
270             Returns \p false if the ring is full or \p true otherwise.
271         */
272         template <typename... Args>
273         typename std::enable_if< std::is_constructible<value_type, Args...>::value, bool>::type
274         emplace( Args&&... args )
275         {
276             size_t back = back_.load( memory_model::memory_order_relaxed );
277
278             assert( back - pfront_ <= capacity() );
279
280             if ( pfront_ + capacity() - back < 1 ) {
281                 pfront_ = front_.load( memory_model::memory_order_acquire );
282
283                 if ( pfront_ + capacity() - back < 1 ) {
284                     // not enough space
285                     return false;
286                 }
287             }
288
289             new( &buffer_[buffer_.mod( back )] ) value_type( std::forward<Args>(args)... );
290
291             back_.store( back + 1, memory_model::memory_order_release );
292
293             return true;
294         }
295
296         /// Enqueues data to the ring using a functor
297         /**
298             \p Func is a functor called to copy a value to the ring element.
299             The functor \p f takes one argument - a reference to a empty cell of type \ref value_type :
300             \code
301             cds::container::WeakRingBuffer< Foo > myRing;
302             Bar bar;
303             myRing.enqueue_with( [&bar]( Foo& dest ) { dest = std::move(bar); } );
304             \endcode
305         */
306         template <typename Func>
307         bool enqueue_with( Func f )
308         {
309             size_t back = back_.load( memory_model::memory_order_relaxed );
310
311             assert( back - pfront_ <= capacity() );
312
313             if ( pfront_ + capacity() - back < 1 ) {
314                 pfront_ = front_.load( memory_model::memory_order_acquire );
315
316                 if ( pfront_ + capacity() - back < 1 ) {
317                     // not enough space
318                     return false;
319                 }
320             }
321
322             f( buffer_[buffer_.mod( back )] );
323
324             back_.store( back + 1, memory_model::memory_order_release );
325
326             return true;
327
328         }
329
330         /// Enqueues \p val value into the queue.
331         /**
332             The new queue item is created by calling placement new in free cell.
333             Returns \p true if success, \p false if the ring is full.
334         */
335         bool enqueue( value_type const& val )
336         {
337             return emplace( val );
338         }
339
340         /// Enqueues \p val value into the queue, move semantics
341         bool enqueue( value_type&& val )
342         {
343             return emplace( std::move( val ));
344         }
345
346         /// Synonym for \p enqueue( value_type const& )
347         bool push( value_type const& val )
348         {
349             return enqueue( val );
350         }
351
352         /// Synonym for \p enqueue( value_type&& )
353         bool push( value_type&& val )
354         {
355             return enqueue( std::move( val ));
356         }
357
358         /// Synonym for \p enqueue_with()
359         template <typename Func>
360         bool push_with( Func f )
361         {
362             return enqueue_with( f );
363         }
364
365         /// Batch pop \p count element from the ring buffer into \p arr
366         /**
367             \p CopyFunc is a per-element copy functor: for each element of \p arr
368             <tt>copy( arr[i], source )</tt> is called.
369             The \p CopyFunc signature:
370             \code
371             void copy_func( Q& dest, value_type& elemen );
372             \endcode
373
374             Returns \p true if success or \p false if not enough space in the ring
375         */
376         template <typename Q, typename CopyFunc>
377         bool pop( Q* arr, size_t count, CopyFunc copy )
378         {
379             assert( count < capacity() );
380
381             size_t front = front_.load( memory_model::memory_order_relaxed );
382             assert( cback_ - front < capacity() );
383
384             if ( cback_ - front < count ) {
385                 cback_ = back_.load( memory_model::memory_order_acquire );
386                 if ( cback_ - front < count )
387                     return false;
388             }
389
390             // copy data
391             value_cleaner cleaner;
392             for ( size_t i = 0; i < count; ++i, ++front ) {
393                 value_type& val = buffer_[buffer_.mod( front )];
394                 copy( arr[i], val );
395                 cleaner( val );
396             }
397
398             front_.store( front, memory_model::memory_order_release );
399             return true;
400         }
401
402         /// Batch pop - push array \p arr of size \p count with assignment as copy functor
403         /**
404             This function is equivalent for:
405             \code
406             pop( arr, count, []( Q& dest, value_type& src ) { dest = src; } );
407             \endcode
408
409             The function is available only if <tt>std::is_assignable<Q&, value_type const&>::value</tt>
410             is \p true.
411
412             Returns \p true if success or \p false if not enough space in the ring
413         */
414         template <typename Q>
415         typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
416         pop( Q* arr, size_t count )
417         {
418             return pop( arr, count, []( Q& dest, value_type& src ) { dest = src; } );
419         }
420
421         /// Dequeues an element from the ring to \p val
422         /**
423             The function is available only if <tt>std::is_assignable<Q&, value_type const&>::value</tt>
424             is \p true.
425
426             Returns \p false if the ring is full or \p true otherwise.
427         */
428         template <typename Q>
429         typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
430         dequeue( Q& val )
431         {
432             return pop( &val, 1 );
433         }
434
435         /// Synonym for \p dequeue( Q& )
436         template <typename Q>
437         typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
438         pop( Q& val )
439         {
440             return dequeue( val );
441         }
442
443         /// Dequeues a value using a functor
444         /**
445             \p Func is a functor called to copy dequeued value.
446             The functor takes one argument - a reference to removed node:
447             \code
448             cds:container::WeakRingBuffer< Foo > myRing;
449             Bar bar;
450             myRing.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
451             \endcode
452
453             Returns \p true if the ring is not empty, \p false otherwise.
454             The functor is called only if the ring is not empty.
455         */
456         template <typename Func>
457         bool dequeue_with( Func f )
458         {
459             size_t front = front_.load( memory_model::memory_order_relaxed );
460             assert( cback_ - front < capacity() );
461
462             if ( cback_ - front < 1 ) {
463                 cback_ = back_.load( memory_model::memory_order_acquire );
464                 if ( cback_ - front < 1 )
465                     return false;
466             }
467
468             value_type& val = buffer_[buffer_.mod( front )];
469             f( val );
470             value_cleaner()( val );
471
472             front_.store( front + 1, memory_model::memory_order_release );
473             return true;
474         }
475
476         /// Synonym for \p dequeue_with()
477         template <typename Func>
478         bool pop_with( Func f )
479         {
480             return dequeue_with( f );
481         }
482
483         /// Gets pointer to first element of ring buffer
484         /**
485             If the ring buffer is empty, returns \p nullptr
486
487             The function is thread-safe since there is only one consumer.
488             Recall, \p WeakRingBuffer is single-producer/single consumer container.
489         */
490         value_type* front()
491         {
492             size_t front = front_.load( memory_model::memory_order_relaxed );
493             assert( cback_ - front < capacity() );
494
495             if ( cback_ - front < 1 ) {
496                 cback_ = back_.load( memory_model::memory_order_acquire );
497                 if ( cback_ - front < 1 )
498                     return nullptr;
499             }
500
501             return &buffer_[buffer_.mod( front )];
502         }
503
504         /// Removes front element of ring-buffer
505         /**
506             If the ring-buffer is empty, returns \p false.
507             Otherwise, pops the first element from the ring.
508         */
509         bool pop_front()
510         {
511             size_t front = front_.load( memory_model::memory_order_relaxed );
512             assert( cback_ - front <= capacity() );
513
514             if ( cback_ - front < 1 ) {
515                 cback_ = back_.load( memory_model::memory_order_acquire );
516                 if ( cback_ - front < 1 )
517                     return false;
518             }
519
520             // clean cell
521             value_cleaner()( buffer_[buffer_.mod( front )] );
522
523             front_.store( front + 1, memory_model::memory_order_release );
524             return true;
525         }
526
527         /// Clears the ring buffer (only consumer can call this function!)
528         void clear()
529         {
530             value_type v;
531             while ( pop( v ) );
532         }
533
534         /// Checks if the ring-buffer is empty
535         bool empty() const
536         {
537             return front_.load( memory_model::memory_order_relaxed ) == back_.load( memory_model::memory_order_relaxed );
538         }
539
540         /// Checks if the ring-buffer is full
541         bool full() const
542         {
543             return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ) >= capacity();
544         }
545
546         /// Returns the current size of ring buffer
547         size_t size() const
548         {
549             return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed );
550         }
551
552         /// Returns capacity of the ring buffer
553         size_t capacity() const
554         {
555             return buffer_.capacity();
556         }
557
558     private:
559         //@cond
560         atomics::atomic<size_t>     front_;
561         typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad1_;
562         atomics::atomic<size_t>     back_;
563         typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad2_;
564         size_t                      pfront_;
565         typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad3_;
566         size_t                      cback_;
567         typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad4_;
568
569         buffer                      buffer_;
570         //@endcond
571     };
572
573
574     /// Single-producer single-consumer ring buffer for untyped variable-sized data
575     /** @ingroup cds_nonintrusive_queue
576         @anchor cds_nonintrusive_WeakRingBuffer_void
577
578         This SPSC ring-buffer is intended for data of variable size. The producer
579         allocates a buffer from ring, you fill it with data and pushes them back to ring.
580         The consumer thread reads data from front-end and then pops them:
581         \code
582         // allocates 1M ring buffer
583         WeakRingBuffer<void>    theRing( 1024 * 1024 );
584
585         void producer_thread()
586         {
587             // Get data of size N bytes
588             size_t size;
589             void*  data;
590
591             while ( true ) {
592                 // Get external data
593                 std::tie( data, size ) = get_data();
594
595                 if ( data == nullptr )
596                     break;
597
598                 // Allocates a buffer from the ring
599                 void* buf = theRing.back( size );
600                 if ( !buf ) {
601                     std::cout << "The ring is full" << std::endl;
602                     break;
603                 }
604
605                 memcpy( buf, data, size );
606
607                 // Push data into the ring
608                 theRing.push_back();
609             }
610         }
611
612         void consumer_thread()
613         {
614             while ( true ) {
615                 auto buf = theRing.front();
616
617                 if ( buf.first == nullptr ) {
618                     std::cout << "The ring is empty" << std::endl;
619                     break;
620                 }
621
622                 // Process data
623                 process_data( buf.first, buf.second );
624
625                 // Free buffer
626                 theRing.pop_front();
627             }
628         }
629         \endcode
630
631         @warning: \p %WeakRingBuffer is developed for 64-bit architecture.
632         On 32-bit platform an integer overflow of internal counters is possible.
633     */
634 #ifdef CDS_DOXYGEN_INVOKED
635     template <typename Traits = weak_ringbuffer::traits>
636 #else
637     template <typename Traits>
638 #endif
639     class WeakRingBuffer<void, Traits>: public cds::bounded_container
640     {
641     public:
642         typedef Traits      traits;         ///< Ring buffer traits
643         typedef typename    traits::memory_model  memory_model;  ///< Memory ordering. See \p cds::opt::memory_model option
644
645     private:
646         //@cond
647         typedef typename traits::buffer::template rebind< uint8_t >::other buffer;
648         //@endcond
649
650     public:
651         /// Creates the ring buffer of \p capacity bytes
652         /**
653             For \p cds::opt::v::uninitialized_static_buffer the \p nCapacity parameter is ignored.
654
655             If the buffer capacity is a power of two, lightweight binary arithmetics is used
656             instead of modulo arithmetics.
657         */
658         WeakRingBuffer( size_t capacity = 0 )
659             : front_( 0 )
660             , pfront_( 0 )
661             , cback_( 0 )
662             , buffer_( capacity )
663         {
664             back_.store( 0, memory_model::memory_order_release );
665         }
666
667         /// [producer] Reserve \p size bytes
668         /**
669             The function returns a pointer to reserved buffer of \p size bytes. 
670             If no enough space in the ring buffer the function returns \p nullptr.
671
672             After successful \p %back() you should fill the buffer provided and call \p push_back():
673             \code
674             // allocates 1M ring buffer
675             WeakRingBuffer<void>    theRing( 1024 * 1024 );
676
677             void producer_thread()
678             {
679                 // Get data of size N bytes
680                 size_t size;1
681                 void*  data;
682
683                 while ( true ) {
684                     // Get external data
685                     std::tie( data, size ) = get_data();
686
687                     if ( data == nullptr )
688                         break;
689
690                     // Allocates a buffer from the ring
691                     void* buf = theRing.back( size );
692                     if ( !buf ) {
693                         std::cout << "The ring is full" << std::endl;
694                         break;
695                     }
696
697                     memcpy( buf, data, size );
698
699                     // Push data into the ring
700                     theRing.push_back();
701                 }
702             }
703             \endcode
704         */
705         void* back( size_t size )
706         {
707             assert( size > 0 );
708
709             // Any data is rounded to 8-byte boundary
710             size_t real_size = calc_real_size( size );
711
712             // check if we can reserve read_size bytes
713             assert( real_size < capacity() );
714             size_t back = back_.load( memory_model::memory_order_relaxed );
715
716             assert( back - pfront_ <= capacity() );
717
718             if ( pfront_ + capacity() - back < real_size ) {
719                 pfront_ = front_.load( memory_model::memory_order_acquire );
720
721                 if ( pfront_ + capacity() - back < real_size ) {
722                     // not enough space
723                     return nullptr;
724                 }
725             }
726
727             uint8_t* reserved = buffer_.buffer() + buffer_.mod( back );
728
729             // Check if the buffer free space is enough for storing real_size bytes
730             size_t tail_size = capacity() - buffer_.mod( back );
731             if ( tail_size < real_size ) {
732                 // make unused tail
733                 assert( tail_size >= sizeof( size_t ) );
734                 assert( !is_tail( tail_size ) );
735
736                 *reinterpret_cast<size_t*>( reserved ) = make_tail( tail_size - sizeof(size_t));
737                 back += tail_size;
738
739                 // We must be in beginning of buffer
740                 assert( buffer_.mod( back ) == 0 );
741
742                 if ( pfront_ + capacity() - back < real_size ) {
743                     pfront_ = front_.load( memory_model::memory_order_acquire );
744
745                     if ( pfront_ + capacity() - back < real_size ) {
746                         // not enough space
747                         return nullptr;
748                     }
749                 }
750
751                 back_.store( back, memory_model::memory_order_release );
752                 reserved = buffer_.buffer();
753             }
754
755             // reserve and store size
756             *reinterpret_cast<size_t*>( reserved ) = size;
757
758             return reinterpret_cast<void*>( reserved + sizeof( size_t ) );
759         }
760
761         /// [producer] Push reserved bytes into ring
762         /**
763             The function pushes reserved buffer into the ring. Afte this call,
764             the buffer becomes visible by a consumer:
765             \code
766             // allocates 1M ring buffer
767             WeakRingBuffer<void>    theRing( 1024 * 1024 );
768
769             void producer_thread()
770             {
771                 // Get data of size N bytes
772                 size_t size;1
773                 void*  data;
774
775                 while ( true ) {
776                     // Get external data
777                     std::tie( data, size ) = get_data();
778
779                     if ( data == nullptr )
780                         break;
781
782                     // Allocates a buffer from the ring
783                     void* buf = theRing.back( size );
784                     if ( !buf ) {
785                         std::cout << "The ring is full" << std::endl;
786                         break;
787                     }
788
789                     memcpy( buf, data, size );
790
791                     // Push data into the ring
792                     theRing.push_back();
793                 }
794             }
795             \endcode
796         */
797         void push_back()
798         {
799             size_t back = back_.load( memory_model::memory_order_relaxed );
800             uint8_t* reserved = buffer_.buffer() + buffer_.mod( back );
801
802             size_t real_size = calc_real_size( *reinterpret_cast<size_t*>( reserved ) );
803             assert( real_size < capacity() );
804
805             back_.store( back + real_size, memory_model::memory_order_release );
806         }
807
808         /// [producer] Push \p data of \p size bytes into ring
809         /**
810             This function invokes \p back( size ), \p memcpy( buf, data, size )
811             and \p push_back() in one call.
812         */
813         bool push_back( void const* data, size_t size )
814         {
815             void* buf = back( size );
816             if ( buf ) {
817                 memcpy( buf, data, size );
818                 push_back();
819                 return true;
820             }
821             return false;
822         }
823
824         /// [consumer] Get top data from the ring
825         /**
826             If the ring is empty, the function returns \p nullptr in \p std:pair::first.
827         */
828         std::pair<void*, size_t> front()
829         {
830             size_t front = front_.load( memory_model::memory_order_relaxed );
831             assert( cback_ - front < capacity() );
832
833             if ( cback_ - front < sizeof( size_t )) {
834                 cback_ = back_.load( memory_model::memory_order_acquire );
835                 if ( cback_ - front < sizeof( size_t ) )
836                     return std::make_pair( nullptr, 0u );
837             }
838
839             uint8_t * buf = buffer_.buffer() + buffer_.mod( front );
840
841             // check alignment
842             assert( ( reinterpret_cast<uintptr_t>( buf ) & ( sizeof( uintptr_t ) - 1 ) ) == 0 );
843
844             size_t size = *reinterpret_cast<size_t*>( buf );
845             if ( is_tail( size ) ) {
846                 // unused tail, skip
847                 CDS_VERIFY( pop_front() );
848
849                 front = front_.load( memory_model::memory_order_relaxed );
850                 buf = buffer_.buffer() + buffer_.mod( front );
851                 size = *reinterpret_cast<size_t*>( buf );
852
853                 assert( !is_tail( size ) );
854                 assert( buf == buffer_.buffer() );
855             }
856
857 #ifdef _DEBUG
858             size_t real_size = calc_real_size( size );
859             if ( cback_ - front < real_size ) {
860                 cback_ = back_.load( memory_model::memory_order_acquire );
861                 assert( cback_ - front >= real_size );
862             }
863 #endif
864
865             return std::make_pair( reinterpret_cast<void*>( buf + sizeof( size_t )), size );
866         }
867
868         /// [consumer] Pops top data
869         /**
870             Typical consumer workloop:
871             \code
872             // allocates 1M ring buffer
873             WeakRingBuffer<void>    theRing( 1024 * 1024 );
874
875             void consumer_thread()
876             {
877                 while ( true ) {
878                     auto buf = theRing.front();
879
880                     if ( buf.first == nullptr ) {
881                         std::cout << "The ring is empty" << std::endl;
882                         break;
883                     }
884
885                     // Process data
886                     process_data( buf.first, buf.second );
887
888                     // Free buffer
889                     theRing.pop_front();
890                 }
891             }
892             \endcode
893         */
894         bool pop_front()
895         {
896             size_t front = front_.load( memory_model::memory_order_relaxed );
897             assert( cback_ - front <= capacity() );
898
899             if ( cback_ - front < sizeof(size_t) ) {
900                 cback_ = back_.load( memory_model::memory_order_acquire );
901                 if ( cback_ - front < sizeof( size_t ) )
902                     return false;
903             }
904
905             uint8_t * buf = buffer_.buffer() + buffer_.mod( front );
906
907             // check alignment
908             assert( ( reinterpret_cast<uintptr_t>( buf ) & ( sizeof( uintptr_t ) - 1 ) ) == 0 );
909
910             size_t size = *reinterpret_cast<size_t*>( buf );
911             size_t real_size = calc_real_size( untail( size ));
912
913 #ifdef _DEBUG
914             if ( cback_ - front < real_size ) {
915                 cback_ = back_.load( memory_model::memory_order_acquire );
916                 assert( cback_ - front >= real_size );
917             }
918 #endif
919
920             front_.store( front + real_size, memory_model::memory_order_release );
921             return true;
922
923         }
924
925         /// [consumer] Clears the ring buffer
926         void clear()
927         {
928             for ( auto el = front(); el.first; el = front() )
929                 pop_front();
930         }
931
932         /// Checks if the ring-buffer is empty
933         bool empty() const
934         {
935             return front_.load( memory_model::memory_order_relaxed ) == back_.load( memory_model::memory_order_relaxed );
936         }
937
938         /// Checks if the ring-buffer is full
939         bool full() const
940         {
941             return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ) >= capacity();
942         }
943
944         /// Returns the current size of ring buffer
945         size_t size() const
946         {
947             return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed );
948         }
949
950         /// Returns capacity of the ring buffer
951         size_t capacity() const
952         {
953             return buffer_.capacity();
954         }
955
956     private:
957         //@cond
958         static size_t calc_real_size( size_t size )
959         {
960             size_t real_size =  (( size + sizeof( uintptr_t ) - 1 ) & ~( sizeof( uintptr_t ) - 1 )) + sizeof( size_t );
961
962             assert( real_size > size );
963             assert( real_size - size >= sizeof( size_t ) );
964
965             return real_size;
966         }
967
968         static bool is_tail( size_t size )
969         {
970             return ( size & ( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 ))) != 0;
971         }
972
973         static size_t make_tail( size_t size )
974         {
975             return size | ( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 ));
976         }
977
978         static size_t untail( size_t size )
979         {
980             return size & (( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 ) ) - 1);
981         }
982         //@endcond
983
984     private:
985         //@cond
986         atomics::atomic<size_t>     front_;
987         typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad1_;
988         atomics::atomic<size_t>     back_;
989         typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad2_;
990         size_t                      pfront_;
991         typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad3_;
992         size_t                      cback_;
993         typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad4_;
994
995         buffer                      buffer_;
996         //@endcond
997     };
998
999 }} // namespace cds::container
1000
1001
1002 #endif // #ifndef CDSLIB_CONTAINER_WEAK_RINGBUFFER_H