cf021320fba48f7aaacd7a88a677644f52054878
[libcds.git] / cds / container / weak_ringbuffer.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #ifndef CDSLIB_CONTAINER_WEAK_RINGBUFFER_H
32 #define CDSLIB_CONTAINER_WEAK_RINGBUFFER_H
33
34 #include <cds/container/details/base.h>
35 #include <cds/opt/buffer.h>
36 #include <cds/opt/value_cleaner.h>
37 #include <cds/algo/atomic.h>
38 #include <cds/details/bounded_container.h>
39
40 namespace cds { namespace container {
41
42     /// \p WeakRingBuffer related definitions
43     /** @ingroup cds_nonintrusive_helper
44     */
45     namespace weak_ringbuffer {
46
47         /// \p WeakRingBuffer default traits
48         struct traits {
49             /// Buffer type for internal array
50             /*
51                 The type of element for the buffer is not important: \p WeakRingBuffer rebind
52                 the buffer for required type via \p rebind metafunction.
53
54                 For \p WeakRingBuffer the buffer size should have power-of-2 size.
55
56                 You should use only uninitialized buffer for the ring buffer -
57                 \p cds::opt::v::uninitialized_dynamic_buffer (the default),
58                 \p cds::opt::v::uninitialized_static_buffer.
59             */
60             typedef cds::opt::v::uninitialized_dynamic_buffer< void * > buffer;
61
62             /// A functor to clean item dequeued.
63             /**
64                 The functor calls the destructor for popped element.
65                 After a set of items is dequeued, \p value_cleaner cleans the cells that the items have been occupied.
66                 If \p T is a complex type, \p value_cleaner may be useful feature.
67                 For POD types \ref opt::v::empty_cleaner is suitable
68
69                 Default value is \ref opt::v::auto_cleaner that calls destructor only if it is not trivial.
70             */
71             typedef cds::opt::v::auto_cleaner value_cleaner;
72
73             /// C++ memory ordering model
74             /**
75                 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
76                 or \p opt::v::sequential_consistent (sequentially consistent memory model).
77             */
78             typedef opt::v::relaxed_ordering    memory_model;
79
80             /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
81             enum { padding = opt::cache_line_padding };
82         };
83
84         /// Metafunction converting option list to \p weak_ringbuffer::traits
85         /**
86             Supported \p Options are:
87             - \p opt::buffer - an uninitialized buffer type for internal cyclic array. Possible types are:
88                 \p opt::v::uninitialized_dynamic_buffer (the default), \p opt::v::uninitialized_static_buffer. The type of
89                 element in the buffer is not important: it will be changed via \p rebind metafunction.
90             - \p opt::value_cleaner - a functor to clean items dequeued.
91                 The functor calls the destructor for ring-buffer item.
92                 After a set of items is dequeued, \p value_cleaner cleans the cells that the items have been occupied.
93                 If \p T is a complex type, \p value_cleaner can be an useful feature.
94                 Default value is \ref opt::v::empty_cleaner that is suitable for POD types.
95             - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
96             - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
97                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
98
99             Example: declare \p %WeakRingBuffer with static iternal buffer for 1024 objects:
100             \code
101             typedef cds::container::WeakRingBuffer< Foo,
102                 typename cds::container::weak_ringbuffer::make_traits<
103                     cds::opt::buffer< cds::opt::v::uninitialized_static_buffer< void *, 1024 >
104                 >::type
105             > myRing;
106             \endcode
107         */
108         template <typename... Options>
109         struct make_traits {
110 #   ifdef CDS_DOXYGEN_INVOKED
111             typedef implementation_defined type;   ///< Metafunction result
112 #   else
113             typedef typename cds::opt::make_options<
114                 typename cds::opt::find_type_traits< traits, Options... >::type
115                 , Options...
116             >::type type;
117 #   endif
118         };
119
120     } // namespace weak_ringbuffer
121
122     /// Single-producer single-consumer ring buffer
123     /** @ingroup cds_nonintrusive_queue
124         Source: [2013] Nhat Minh Le, Adrien Guatto, Albert Cohen, Antoniu Pop. Correct and Effcient Bounded
125             FIFO Queues. [Research Report] RR-8365, INRIA. 2013. <hal-00862450>
126
127         Ring buffer is a bounded queue. Additionally, \p %WeakRingBuffer supports batch operations -
128         you can push/pop an array of elements.
129
130         There are a specialization \ref cds_nonintrusive_WeakRingBuffer_void "WeakRingBuffer<void, Traits>" 
131         that is not a queue but a "memory pool" between producer and consumer threads. 
132         \p WeakRingBuffer<void> supports variable-sized data.
133
134         @warning: \p %WeakRingBuffer is developed for 64-bit architecture.
135         32-bit platform must provide support for 64-bit atomics.
136     */
137     template <typename T, typename Traits = weak_ringbuffer::traits>
138     class WeakRingBuffer: public cds::bounded_container
139     {
140     public:
141         typedef T value_type;   ///< Value type to be stored in the ring buffer
142         typedef Traits traits;  ///< Ring buffer traits
143         typedef typename traits::memory_model  memory_model;  ///< Memory ordering. See \p cds::opt::memory_model option
144         typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p weak_ringbuffer::traits::value_cleaner
145
146         /// Rebind template arguments
147         template <typename T2, typename Traits2>
148         struct rebind {
149             typedef WeakRingBuffer< T2, Traits2 > other;   ///< Rebinding result
150         };
151
152         //@cond
153         // Only for tests
154         typedef size_t item_counter;
155         //@endcond
156
157     private:
158         //@cond
159         typedef typename traits::buffer::template rebind< value_type >::other buffer;
160         typedef uint64_t    counter_type;
161         //@endcond
162
163     public:
164
165         /// Creates the ring buffer of \p capacity
166         /**
167             For \p cds::opt::v::uninitialized_static_buffer the \p nCapacity parameter is ignored.
168
169             If the buffer capacity is a power of two, lightweight binary arithmetics is used
170             instead of modulo arithmetics.
171         */
172         WeakRingBuffer( size_t capacity = 0 )
173             : front_( 0 )
174             , pfront_( 0 )
175             , cback_( 0 )
176             , buffer_( capacity )
177         {
178             back_.store( 0, memory_model::memory_order_release );
179         }
180
181         /// Destroys the ring buffer
182         ~WeakRingBuffer()
183         {
184             value_cleaner cleaner;
185             counter_type back = back_.load( memory_model::memory_order_relaxed );
186             for ( counter_type front = front_.load( memory_model::memory_order_relaxed ); front != back; ++front )
187                 cleaner( buffer_[ buffer_.mod( front ) ] );
188         }
189
190         /// Batch push - push array \p arr of size \p count
191         /**
192             \p CopyFunc is a per-element copy functor: for each element of \p arr
193             <tt>copy( dest, arr[i] )</tt> is called.
194             The \p CopyFunc signature:
195             \code
196                 void copy_func( value_type& element, Q const& source );
197             \endcode
198             Here \p element is uninitialized so you should construct it using placement new
199             if needed; for example, if the element type is \p str::string and \p Q is <tt>char const*</tt>,
200             \p copy functor can be:
201             \code
202             cds::container::WeakRingBuffer<std::string> ringbuf;
203             char const* arr[10];
204             ringbuf.push( arr, 10, 
205                 []( std::string& element, char const* src ) {
206                     new( &element ) std::string( src );
207                 });
208             \endcode
209             You may use move semantics if appropriate:
210             \code
211             cds::container::WeakRingBuffer<std::string> ringbuf;
212             std::string arr[10];
213             ringbuf.push( arr, 10,
214                 []( std::string& element, std:string& src ) {
215                     new( &element ) std::string( std::move( src ));
216                 });
217             \endcode
218
219             Returns \p true if success or \p false if not enough space in the ring
220         */
221         template <typename Q, typename CopyFunc>
222         bool push( Q* arr, size_t count, CopyFunc copy )
223         {
224             assert( count < capacity() );
225             counter_type back = back_.load( memory_model::memory_order_relaxed );
226
227             assert( static_cast<size_t>( back - pfront_ ) <= capacity() );
228
229             if ( static_cast<size_t>( pfront_ + capacity() - back ) < count ) {
230                 pfront_ = front_.load( memory_model::memory_order_acquire );
231
232                 if ( static_cast<size_t>( pfront_ + capacity() - back ) < count ) {
233                     // not enough space
234                     return false;
235                 }
236             }
237
238             // copy data
239             for ( size_t i = 0; i < count; ++i, ++back )
240                 copy( buffer_[buffer_.mod( back )], arr[i] );
241
242             back_.store( back, memory_model::memory_order_release );
243
244             return true;
245         }
246
247         /// Batch push - push array \p arr of size \p count with assignment as copy functor
248         /**
249             This function is equivalent for:
250             \code
251             push( arr, count, []( value_type& dest, Q const& src ) { dest = src; } );
252             \endcode
253
254             The function is available only if <tt>std::is_constructible<value_type, Q>::value</tt>
255             is \p true.
256
257             Returns \p true if success or \p false if not enough space in the ring
258         */
259         template <typename Q>
260         typename std::enable_if< std::is_constructible<value_type, Q>::value, bool>::type
261         push( Q* arr, size_t count )
262         {
263             return push( arr, count, []( value_type& dest, Q const& src ) { new( &dest ) value_type( src ); } );
264         }
265
266         /// Push one element created from \p args
267         /**
268             The function is available only if <tt>std::is_constructible<value_type, Args...>::value</tt>
269             is \p true.
270
271             Returns \p false if the ring is full or \p true otherwise.
272         */
273         template <typename... Args>
274         typename std::enable_if< std::is_constructible<value_type, Args...>::value, bool>::type
275         emplace( Args&&... args )
276         {
277             counter_type back = back_.load( memory_model::memory_order_relaxed );
278
279             assert( static_cast<size_t>( back - pfront_ ) <= capacity() );
280
281             if ( pfront_ + capacity() - back < 1 ) {
282                 pfront_ = front_.load( memory_model::memory_order_acquire );
283
284                 if ( pfront_ + capacity() - back < 1 ) {
285                     // not enough space
286                     return false;
287                 }
288             }
289
290             new( &buffer_[buffer_.mod( back )] ) value_type( std::forward<Args>(args)... );
291
292             back_.store( back + 1, memory_model::memory_order_release );
293
294             return true;
295         }
296
297         /// Enqueues data to the ring using a functor
298         /**
299             \p Func is a functor called to copy a value to the ring element.
300             The functor \p f takes one argument - a reference to a empty cell of type \ref value_type :
301             \code
302             cds::container::WeakRingBuffer< Foo > myRing;
303             Bar bar;
304             myRing.enqueue_with( [&bar]( Foo& dest ) { dest = std::move(bar); } );
305             \endcode
306         */
307         template <typename Func>
308         bool enqueue_with( Func f )
309         {
310             counter_type back = back_.load( memory_model::memory_order_relaxed );
311
312             assert( static_cast<size_t>( back - pfront_ ) <= capacity() );
313
314             if ( pfront_ + capacity() - back < 1 ) {
315                 pfront_ = front_.load( memory_model::memory_order_acquire );
316
317                 if ( pfront_ + capacity() - back < 1 ) {
318                     // not enough space
319                     return false;
320                 }
321             }
322
323             f( buffer_[buffer_.mod( back )] );
324
325             back_.store( back + 1, memory_model::memory_order_release );
326
327             return true;
328
329         }
330
331         /// Enqueues \p val value into the queue.
332         /**
333             The new queue item is created by calling placement new in free cell.
334             Returns \p true if success, \p false if the ring is full.
335         */
336         bool enqueue( value_type const& val )
337         {
338             return emplace( val );
339         }
340
341         /// Enqueues \p val value into the queue, move semantics
342         bool enqueue( value_type&& val )
343         {
344             return emplace( std::move( val ));
345         }
346
347         /// Synonym for \p enqueue( value_type const& )
348         bool push( value_type const& val )
349         {
350             return enqueue( val );
351         }
352
353         /// Synonym for \p enqueue( value_type&& )
354         bool push( value_type&& val )
355         {
356             return enqueue( std::move( val ));
357         }
358
359         /// Synonym for \p enqueue_with()
360         template <typename Func>
361         bool push_with( Func f )
362         {
363             return enqueue_with( f );
364         }
365
366         /// Batch pop \p count element from the ring buffer into \p arr
367         /**
368             \p CopyFunc is a per-element copy functor: for each element of \p arr
369             <tt>copy( arr[i], source )</tt> is called.
370             The \p CopyFunc signature:
371             \code
372             void copy_func( Q& dest, value_type& elemen );
373             \endcode
374
375             Returns \p true if success or \p false if not enough space in the ring
376         */
377         template <typename Q, typename CopyFunc>
378         bool pop( Q* arr, size_t count, CopyFunc copy )
379         {
380             assert( count < capacity() );
381
382             counter_type front = front_.load( memory_model::memory_order_relaxed );
383             assert( static_cast<size_t>( cback_ - front ) < capacity() );
384
385             if ( static_cast<size_t>( cback_ - front ) < count ) {
386                 cback_ = back_.load( memory_model::memory_order_acquire );
387                 if ( static_cast<size_t>( cback_ - front ) < count )
388                     return false;
389             }
390
391             // copy data
392             value_cleaner cleaner;
393             for ( size_t i = 0; i < count; ++i, ++front ) {
394                 value_type& val = buffer_[buffer_.mod( front )];
395                 copy( arr[i], val );
396                 cleaner( val );
397             }
398
399             front_.store( front, memory_model::memory_order_release );
400             return true;
401         }
402
403         /// Batch pop - push array \p arr of size \p count with assignment as copy functor
404         /**
405             This function is equivalent for:
406             \code
407             pop( arr, count, []( Q& dest, value_type& src ) { dest = src; } );
408             \endcode
409
410             The function is available only if <tt>std::is_assignable<Q&, value_type const&>::value</tt>
411             is \p true.
412
413             Returns \p true if success or \p false if not enough space in the ring
414         */
415         template <typename Q>
416         typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
417         pop( Q* arr, size_t count )
418         {
419             return pop( arr, count, []( Q& dest, value_type& src ) { dest = src; } );
420         }
421
422         /// Dequeues an element from the ring to \p val
423         /**
424             The function is available only if <tt>std::is_assignable<Q&, value_type const&>::value</tt>
425             is \p true.
426
427             Returns \p false if the ring is full or \p true otherwise.
428         */
429         template <typename Q>
430         typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
431         dequeue( Q& val )
432         {
433             return pop( &val, 1 );
434         }
435
436         /// Synonym for \p dequeue( Q& )
437         template <typename Q>
438         typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
439         pop( Q& val )
440         {
441             return dequeue( val );
442         }
443
444         /// Dequeues a value using a functor
445         /**
446             \p Func is a functor called to copy dequeued value.
447             The functor takes one argument - a reference to removed node:
448             \code
449             cds:container::WeakRingBuffer< Foo > myRing;
450             Bar bar;
451             myRing.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
452             \endcode
453
454             Returns \p true if the ring is not empty, \p false otherwise.
455             The functor is called only if the ring is not empty.
456         */
457         template <typename Func>
458         bool dequeue_with( Func f )
459         {
460             counter_type front = front_.load( memory_model::memory_order_relaxed );
461             assert( static_cast<size_t>( cback_ - front ) < capacity() );
462
463             if ( cback_ - front < 1 ) {
464                 cback_ = back_.load( memory_model::memory_order_acquire );
465                 if ( cback_ - front < 1 )
466                     return false;
467             }
468
469             value_type& val = buffer_[buffer_.mod( front )];
470             f( val );
471             value_cleaner()( val );
472
473             front_.store( front + 1, memory_model::memory_order_release );
474             return true;
475         }
476
477         /// Synonym for \p dequeue_with()
478         template <typename Func>
479         bool pop_with( Func f )
480         {
481             return dequeue_with( f );
482         }
483
484         /// Gets pointer to first element of ring buffer
485         /**
486             If the ring buffer is empty, returns \p nullptr
487
488             The function is thread-safe since there is only one consumer.
489             Recall, \p WeakRingBuffer is single-producer/single consumer container.
490         */
491         value_type* front()
492         {
493             counter_type front = front_.load( memory_model::memory_order_relaxed );
494             assert( static_cast<size_t>( cback_ - front ) < capacity() );
495
496             if ( cback_ - front < 1 ) {
497                 cback_ = back_.load( memory_model::memory_order_acquire );
498                 if ( cback_ - front < 1 )
499                     return nullptr;
500             }
501
502             return &buffer_[buffer_.mod( front )];
503         }
504
505         /// Removes front element of ring-buffer
506         /**
507             If the ring-buffer is empty, returns \p false.
508             Otherwise, pops the first element from the ring.
509         */
510         bool pop_front()
511         {
512             counter_type front = front_.load( memory_model::memory_order_relaxed );
513             assert( static_cast<size_t>( cback_ - front ) <= capacity() );
514
515             if ( cback_ - front < 1 ) {
516                 cback_ = back_.load( memory_model::memory_order_acquire );
517                 if ( cback_ - front < 1 )
518                     return false;
519             }
520
521             // clean cell
522             value_cleaner()( buffer_[buffer_.mod( front )] );
523
524             front_.store( front + 1, memory_model::memory_order_release );
525             return true;
526         }
527
528         /// Clears the ring buffer (only consumer can call this function!)
529         void clear()
530         {
531             value_type v;
532             while ( pop( v ) );
533         }
534
535         /// Checks if the ring-buffer is empty
536         bool empty() const
537         {
538             return front_.load( memory_model::memory_order_relaxed ) == back_.load( memory_model::memory_order_relaxed );
539         }
540
541         /// Checks if the ring-buffer is full
542         bool full() const
543         {
544             return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ) >= capacity();
545         }
546
547         /// Returns the current size of ring buffer
548         size_t size() const
549         {
550             return static_cast<size_t>( back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ));
551         }
552
553         /// Returns capacity of the ring buffer
554         size_t capacity() const
555         {
556             return buffer_.capacity();
557         }
558
559     private:
560         //@cond
561         atomics::atomic<counter_type>   front_;
562         typename opt::details::apply_padding< atomics::atomic<counter_type>, traits::padding >::padding_type pad1_;
563         atomics::atomic<counter_type>   back_;
564         typename opt::details::apply_padding< atomics::atomic<counter_type>, traits::padding >::padding_type pad2_;
565         counter_type                    pfront_;
566         typename opt::details::apply_padding< counter_type, traits::padding >::padding_type pad3_;
567         counter_type                    cback_;
568         typename opt::details::apply_padding< counter_type, traits::padding >::padding_type pad4_;
569
570         buffer                      buffer_;
571         //@endcond
572     };
573
574
575     /// Single-producer single-consumer ring buffer for untyped variable-sized data
576     /** @ingroup cds_nonintrusive_queue
577         @anchor cds_nonintrusive_WeakRingBuffer_void
578
579         This SPSC ring-buffer is intended for data of variable size. The producer
580         allocates a buffer from ring, you fill it with data and pushes them back to ring.
581         The consumer thread reads data from front-end and then pops them:
582         \code
583         // allocates 1M ring buffer
584         WeakRingBuffer<void>    theRing( 1024 * 1024 );
585
586         void producer_thread()
587         {
588             // Get data of size N bytes
589             size_t size;
590             void*  data;
591
592             while ( true ) {
593                 // Get external data
594                 std::tie( data, size ) = get_data();
595
596                 if ( data == nullptr )
597                     break;
598
599                 // Allocates a buffer from the ring
600                 void* buf = theRing.back( size );
601                 if ( !buf ) {
602                     std::cout << "The ring is full" << std::endl;
603                     break;
604                 }
605
606                 memcpy( buf, data, size );
607
608                 // Push data into the ring
609                 theRing.push_back();
610             }
611         }
612
613         void consumer_thread()
614         {
615             while ( true ) {
616                 auto buf = theRing.front();
617
618                 if ( buf.first == nullptr ) {
619                     std::cout << "The ring is empty" << std::endl;
620                     break;
621                 }
622
623                 // Process data
624                 process_data( buf.first, buf.second );
625
626                 // Free buffer
627                 theRing.pop_front();
628             }
629         }
630         \endcode
631
632         @warning: \p %WeakRingBuffer is developed for 64-bit architecture.
633         32-bit platform must provide support for 64-bit atomics.
634     */
635 #ifdef CDS_DOXYGEN_INVOKED
636     template <typename Traits = weak_ringbuffer::traits>
637 #else
638     template <typename Traits>
639 #endif
640     class WeakRingBuffer<void, Traits>: public cds::bounded_container
641     {
642     public:
643         typedef Traits      traits;         ///< Ring buffer traits
644         typedef typename    traits::memory_model  memory_model;  ///< Memory ordering. See \p cds::opt::memory_model option
645
646     private:
647         //@cond
648         typedef typename traits::buffer::template rebind< uint8_t >::other buffer;
649         typedef uint64_t    counter_type;
650         //@endcond
651
652     public:
653         /// Creates the ring buffer of \p capacity bytes
654         /**
655             For \p cds::opt::v::uninitialized_static_buffer the \p nCapacity parameter is ignored.
656
657             If the buffer capacity is a power of two, lightweight binary arithmetics is used
658             instead of modulo arithmetics.
659         */
660         WeakRingBuffer( size_t capacity = 0 )
661             : front_( 0 )
662             , pfront_( 0 )
663             , cback_( 0 )
664             , buffer_( capacity )
665         {
666             back_.store( 0, memory_model::memory_order_release );
667         }
668
669         /// [producer] Reserve \p size bytes
670         /**
671             The function returns a pointer to reserved buffer of \p size bytes. 
672             If no enough space in the ring buffer the function returns \p nullptr.
673
674             After successful \p %back() you should fill the buffer provided and call \p push_back():
675             \code
676             // allocates 1M ring buffer
677             WeakRingBuffer<void>    theRing( 1024 * 1024 );
678
679             void producer_thread()
680             {
681                 // Get data of size N bytes
682                 size_t size;1
683                 void*  data;
684
685                 while ( true ) {
686                     // Get external data
687                     std::tie( data, size ) = get_data();
688
689                     if ( data == nullptr )
690                         break;
691
692                     // Allocates a buffer from the ring
693                     void* buf = theRing.back( size );
694                     if ( !buf ) {
695                         std::cout << "The ring is full" << std::endl;
696                         break;
697                     }
698
699                     memcpy( buf, data, size );
700
701                     // Push data into the ring
702                     theRing.push_back();
703                 }
704             }
705             \endcode
706         */
707         void* back( size_t size )
708         {
709             assert( size > 0 );
710
711             // Any data is rounded to 8-byte boundary
712             size_t real_size = calc_real_size( size );
713
714             // check if we can reserve read_size bytes
715             assert( real_size < capacity() );
716             counter_type back = back_.load( memory_model::memory_order_relaxed );
717
718             assert( static_cast<size_t>( back - pfront_ ) <= capacity() );
719
720             if ( static_cast<size_t>( pfront_ + capacity() - back ) < real_size ) {
721                 pfront_ = front_.load( memory_model::memory_order_acquire );
722
723                 if ( static_cast<size_t>( pfront_ + capacity() - back ) < real_size ) {
724                     // not enough space
725                     return nullptr;
726                 }
727             }
728
729             uint8_t* reserved = buffer_.buffer() + buffer_.mod( back );
730
731             // Check if the buffer free space is enough for storing real_size bytes
732             size_t tail_size = capacity() - static_cast<size_t>( buffer_.mod( back ));
733             if ( tail_size < real_size ) {
734                 // make unused tail
735                 assert( tail_size >= sizeof( size_t ) );
736                 assert( !is_tail( tail_size ) );
737
738                 *reinterpret_cast<size_t*>( reserved ) = make_tail( tail_size - sizeof(size_t));
739                 back += tail_size;
740
741                 // We must be in beginning of buffer
742                 assert( buffer_.mod( back ) == 0 );
743
744                 if ( static_cast<size_t>( pfront_ + capacity() - back ) < real_size ) {
745                     pfront_ = front_.load( memory_model::memory_order_acquire );
746
747                     if ( static_cast<size_t>( pfront_ + capacity() - back ) < real_size ) {
748                         // not enough space
749                         return nullptr;
750                     }
751                 }
752
753                 back_.store( back, memory_model::memory_order_release );
754                 reserved = buffer_.buffer();
755             }
756
757             // reserve and store size
758             *reinterpret_cast<size_t*>( reserved ) = size;
759
760             return reinterpret_cast<void*>( reserved + sizeof( size_t ) );
761         }
762
763         /// [producer] Push reserved bytes into ring
764         /**
765             The function pushes reserved buffer into the ring. Afte this call,
766             the buffer becomes visible by a consumer:
767             \code
768             // allocates 1M ring buffer
769             WeakRingBuffer<void>    theRing( 1024 * 1024 );
770
771             void producer_thread()
772             {
773                 // Get data of size N bytes
774                 size_t size;1
775                 void*  data;
776
777                 while ( true ) {
778                     // Get external data
779                     std::tie( data, size ) = get_data();
780
781                     if ( data == nullptr )
782                         break;
783
784                     // Allocates a buffer from the ring
785                     void* buf = theRing.back( size );
786                     if ( !buf ) {
787                         std::cout << "The ring is full" << std::endl;
788                         break;
789                     }
790
791                     memcpy( buf, data, size );
792
793                     // Push data into the ring
794                     theRing.push_back();
795                 }
796             }
797             \endcode
798         */
799         void push_back()
800         {
801             counter_type back = back_.load( memory_model::memory_order_relaxed );
802             uint8_t* reserved = buffer_.buffer() + buffer_.mod( back );
803
804             size_t real_size = calc_real_size( *reinterpret_cast<size_t*>( reserved ) );
805             assert( real_size < capacity() );
806
807             back_.store( back + real_size, memory_model::memory_order_release );
808         }
809
810         /// [producer] Push \p data of \p size bytes into ring
811         /**
812             This function invokes \p back( size ), \p memcpy( buf, data, size )
813             and \p push_back() in one call.
814         */
815         bool push_back( void const* data, size_t size )
816         {
817             void* buf = back( size );
818             if ( buf ) {
819                 memcpy( buf, data, size );
820                 push_back();
821                 return true;
822             }
823             return false;
824         }
825
826         /// [consumer] Get top data from the ring
827         /**
828             If the ring is empty, the function returns \p nullptr in \p std:pair::first.
829         */
830         std::pair<void*, size_t> front()
831         {
832             counter_type front = front_.load( memory_model::memory_order_relaxed );
833             assert( static_cast<size_t>( cback_ - front ) < capacity() );
834
835             if ( cback_ - front < sizeof( size_t )) {
836                 cback_ = back_.load( memory_model::memory_order_acquire );
837                 if ( cback_ - front < sizeof( size_t ) )
838                     return std::make_pair( nullptr, 0u );
839             }
840
841             uint8_t * buf = buffer_.buffer() + buffer_.mod( front );
842
843             // check alignment
844             assert( ( reinterpret_cast<uintptr_t>( buf ) & ( sizeof( uintptr_t ) - 1 ) ) == 0 );
845
846             size_t size = *reinterpret_cast<size_t*>( buf );
847             if ( is_tail( size ) ) {
848                 // unused tail, skip
849                 CDS_VERIFY( pop_front() );
850
851                 front = front_.load( memory_model::memory_order_relaxed );
852                 buf = buffer_.buffer() + buffer_.mod( front );
853                 size = *reinterpret_cast<size_t*>( buf );
854
855                 assert( !is_tail( size ) );
856                 assert( buf == buffer_.buffer() );
857             }
858
859 #ifdef _DEBUG
860             size_t real_size = calc_real_size( size );
861             if ( static_cast<size_t>( cback_ - front ) < real_size ) {
862                 cback_ = back_.load( memory_model::memory_order_acquire );
863                 assert( static_cast<size_t>( cback_ - front ) >= real_size );
864             }
865 #endif
866
867             return std::make_pair( reinterpret_cast<void*>( buf + sizeof( size_t )), size );
868         }
869
870         /// [consumer] Pops top data
871         /**
872             Typical consumer workloop:
873             \code
874             // allocates 1M ring buffer
875             WeakRingBuffer<void>    theRing( 1024 * 1024 );
876
877             void consumer_thread()
878             {
879                 while ( true ) {
880                     auto buf = theRing.front();
881
882                     if ( buf.first == nullptr ) {
883                         std::cout << "The ring is empty" << std::endl;
884                         break;
885                     }
886
887                     // Process data
888                     process_data( buf.first, buf.second );
889
890                     // Free buffer
891                     theRing.pop_front();
892                 }
893             }
894             \endcode
895         */
896         bool pop_front()
897         {
898             counter_type front = front_.load( memory_model::memory_order_relaxed );
899             assert( static_cast<size_t>( cback_ - front ) <= capacity() );
900
901             if ( cback_ - front < sizeof(size_t) ) {
902                 cback_ = back_.load( memory_model::memory_order_acquire );
903                 if ( cback_ - front < sizeof( size_t ) )
904                     return false;
905             }
906
907             uint8_t * buf = buffer_.buffer() + buffer_.mod( front );
908
909             // check alignment
910             assert( ( reinterpret_cast<uintptr_t>( buf ) & ( sizeof( uintptr_t ) - 1 ) ) == 0 );
911
912             size_t size = *reinterpret_cast<size_t*>( buf );
913             size_t real_size = calc_real_size( untail( size ));
914
915 #ifdef _DEBUG
916             if ( static_cast<size_t>( cback_ - front ) < real_size ) {
917                 cback_ = back_.load( memory_model::memory_order_acquire );
918                 assert( static_cast<size_t>( cback_ - front ) >= real_size );
919             }
920 #endif
921
922             front_.store( front + real_size, memory_model::memory_order_release );
923             return true;
924
925         }
926
927         /// [consumer] Clears the ring buffer
928         void clear()
929         {
930             for ( auto el = front(); el.first; el = front() )
931                 pop_front();
932         }
933
934         /// Checks if the ring-buffer is empty
935         bool empty() const
936         {
937             return front_.load( memory_model::memory_order_relaxed ) == back_.load( memory_model::memory_order_relaxed );
938         }
939
940         /// Checks if the ring-buffer is full
941         bool full() const
942         {
943             return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ) >= capacity();
944         }
945
946         /// Returns the current size of ring buffer
947         size_t size() const
948         {
949             return static_cast<size_t>( back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ));
950         }
951
952         /// Returns capacity of the ring buffer
953         size_t capacity() const
954         {
955             return buffer_.capacity();
956         }
957
958     private:
959         //@cond
960         static size_t calc_real_size( size_t size )
961         {
962             size_t real_size =  (( size + sizeof( uintptr_t ) - 1 ) & ~( sizeof( uintptr_t ) - 1 )) + sizeof( size_t );
963
964             assert( real_size > size );
965             assert( real_size - size >= sizeof( size_t ) );
966
967             return real_size;
968         }
969
970         static bool is_tail( size_t size )
971         {
972             return ( size & ( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 ))) != 0;
973         }
974
975         static size_t make_tail( size_t size )
976         {
977             return size | ( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 ));
978         }
979
980         static size_t untail( size_t size )
981         {
982             return size & (( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 ) ) - 1);
983         }
984         //@endcond
985
986     private:
987         //@cond
988         atomics::atomic<counter_type>     front_;
989         typename opt::details::apply_padding< atomics::atomic<counter_type>, traits::padding >::padding_type pad1_;
990         atomics::atomic<counter_type>     back_;
991         typename opt::details::apply_padding< atomics::atomic<counter_type>, traits::padding >::padding_type pad2_;
992         counter_type                      pfront_;
993         typename opt::details::apply_padding< counter_type, traits::padding >::padding_type pad3_;
994         counter_type                      cback_;
995         typename opt::details::apply_padding< counter_type, traits::padding >::padding_type pad4_;
996
997         buffer                      buffer_;
998         //@endcond
999     };
1000
1001 }} // namespace cds::container
1002
1003
1004 #endif // #ifndef CDSLIB_CONTAINER_WEAK_RINGBUFFER_H