Added WeakRingBuffer - a single-producer/single-consumer queue based on ring buffer
[libcds.git] / cds / container / weak_ringbuffer.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #ifndef CDSLIB_CONTAINER_WEAK_RINGBUFFER_H
32 #define CDSLIB_CONTAINER_WEAK_RINGBUFFER_H
33
34 #include <cds/container/details/base.h>
35 #include <cds/opt/buffer.h>
36 #include <cds/opt/value_cleaner.h>
37 #include <cds/algo/atomic.h>
38 #include <cds/details/bounded_container.h>
39
40 namespace cds { namespace container {
41
42     /// \p WeakRingBuffer related definitions
43     /** @ingroup cds_nonintrusive_helper
44     */
45     namespace weak_ringbuffer {
46
47         /// \p WeakRingBuffer default traits
48         struct traits {
49             /// Buffer type for internal array
50             /*
51                 The type of element for the buffer is not important: \p WeakRingBuffer rebind
52                 the buffer for required type via \p rebind metafunction.
53
54                 For \p WeakRingBuffer the buffer size should have power-of-2 size.
55
56                 You should use only uninitialized buffer for the ring buffer -
57                 \p cds::opt::v::uninitialized_dynamic_buffer (the default),
58                 \p cds::opt::v::uninitialized_static_buffer.
59             */
60             typedef cds::opt::v::uninitialized_dynamic_buffer< void * > buffer;
61
62             /// A functor to clean item dequeued.
63             /**
64                 The functor calls the destructor for popped element.
65                 After a set of items is dequeued, \p value_cleaner cleans the cells that the items have been occupied.
66                 If \p T is a complex type, \p value_cleaner may be useful feature.
67                 For POD types \ref opt::v::empty_cleaner is suitable
68
69                 Default value is \ref opt::v::auto_cleaner that calls destructor only if it is not trivial.
70             */
71             typedef cds::opt::v::auto_cleaner value_cleaner;
72
73             /// C++ memory ordering model
74             /**
75                 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
76                 or \p opt::v::sequential_consistent (sequentially consistent memory model).
77             */
78             typedef opt::v::relaxed_ordering    memory_model;
79
80             /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
81             enum { padding = opt::cache_line_padding };
82         };
83
84         /// Metafunction converting option list to \p weak_ringbuffer::traits
85         /**
86             Supported \p Options are:
87             - \p opt::buffer - an uninitialized buffer type for internal cyclic array. Possible types are:
88                 \p opt::v::uninitialized_dynamic_buffer (the default), \p opt::v::uninitialized_static_buffer. The type of
89                 element in the buffer is not important: it will be changed via \p rebind metafunction.
90             - \p opt::value_cleaner - a functor to clean items dequeued.
91                 The functor calls the destructor for ring-buffer item.
92                 After a set of items is dequeued, \p value_cleaner cleans the cells that the items have been occupied.
93                 If \p T is a complex type, \p value_cleaner can be an useful feature.
94                 Default value is \ref opt::v::empty_cleaner that is suitable for POD types.
95             - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
96             - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
97                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
98
99             Example: declare \p %WeakRingBuffer with static iternal buffer of size 1024:
100             \code
101             typedef cds::container::WeakRingBuffer< Foo,
102                 typename cds::container::weak_ringbuffer::make_traits<
103                     cds::opt::buffer< cds::opt::v::uninitialized_static_buffer< void *, 1024 >
104                 >::type
105             > myRing;
106             \endcode
107         */
108         template <typename... Options>
109         struct make_traits {
110 #   ifdef CDS_DOXYGEN_INVOKED
111             typedef implementation_defined type;   ///< Metafunction result
112 #   else
113             typedef typename cds::opt::make_options<
114                 typename cds::opt::find_type_traits< traits, Options... >::type
115                 , Options...
116             >::type type;
117 #   endif
118         };
119
120     } // namespace weak_ringbuffer
121
122     /// Single-producer single-consumer ring buffer
123     /** @ingroup cds_nonintrusive_queue
124         Source: [2013] Nhat Minh Le, Adrien Guatto, Albert Cohen, Antoniu Pop. Correct and Effcient Bounded
125             FIFO Queues. [Research Report] RR-8365, INRIA. 2013. <hal-00862450>
126
127         Ring buffer is a bounded queue. Additionally, \p %WeakRingBuffer supports batch operations -
128         you can push/pop an array of elements.
129     */
130     template <typename T, typename Traits = weak_ringbuffer::traits>
131     class WeakRingBuffer: public cds::bounded_container
132     {
133     public:
134         typedef T value_type;   ///< Value type to be stored in the ring buffer
135         typedef Traits traits;  ///< Ring buffer traits
136         typedef typename traits::memory_model  memory_model;  ///< Memory ordering. See \p cds::opt::memory_model option
137         typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p weak_ringbuffer::traits::value_cleaner
138
139         /// Rebind template arguments
140         template <typename T2, typename Traits2>
141         struct rebind {
142             typedef WeakRingBuffer< T2, Traits2 > other;   ///< Rebinding result
143         };
144
145         //@cond
146         // Only for tests
147         typedef size_t item_counter;
148         //@endcond
149
150     private:
151         //@cond
152         typedef typename traits::buffer::template rebind< value_type >::other buffer;
153         //@endcond
154
155     public:
156
157         /// Creates the ring buffer of \p capacity
158         /**
159             For \p cds::opt::v::uninitialized_static_buffer the \p nCapacity parameter is ignored.
160
161             If the buffer capacity is a power of two, lightweight binary arithmetics is used
162             instead of modulo arithmetics.
163         */
164         WeakRingBuffer( size_t capacity = 0 )
165             : front_( 0 )
166             , pfront_( 0 )
167             , cback_( 0 )
168             , buffer_( capacity )
169         {
170             back_.store( 0, memory_model::memory_order_release );
171         }
172
173         /// Destroys the ring buffer
174         ~WeakRingBuffer()
175         {
176             value_cleaner cleaner;
177             size_t back = back_.load( memory_model::memory_order_relaxed );
178             for ( size_t front = front_.load( memory_model::memory_order_relaxed ); front != back; ++front )
179                 cleaner( buffer_[ buffer_.mod( front ) ] );
180         }
181
182         /// Batch push - push array \p arr of size \p count
183         /**
184             \p CopyFunc is a per-element copy functor: for each element of \p arr
185             <tt>copy( dest, arr[i] )</tt> is called.
186             The \p CopyFunc signature:
187             \code
188                 void copy_func( value_type& element, Q const& source );
189             \endcode
190             Here \p element is uninitialized so you should construct it using placement new
191             if needed; for example, if the element type is \p str::string and \p Q is <tt>char const*</tt>,
192             \p copy functor can be:
193             \code
194             cds::container::WeakRingBuffer<std::string> ringbuf;
195             char const* arr[10];
196             ringbuf.push( arr, 10, 
197                 []( std::string& element, char const* src ) {
198                     new( &element ) std::string( src );
199                 });
200             \endcode
201             You may use move semantics if appropriate:
202             \code
203             cds::container::WeakRingBuffer<std::string> ringbuf;
204             std::string arr[10];
205             ringbuf.push( arr, 10,
206                 []( std::string& element, std:string& src ) {
207                     new( &element ) std::string( std::move( src ));
208                 });
209             \endcode
210
211             Returns \p true if success or \p false if not enought sufficient space in the ring
212         */
213         template <typename Q, typename CopyFunc>
214         bool push( Q* arr, size_t count, CopyFunc copy )
215         {
216             assert( count < capacity() );
217             size_t back = back_.load( memory_model::memory_order_relaxed );
218
219             assert( back - pfront_ <= capacity() );
220
221             if ( pfront_ + capacity() - back < count ) {
222                 pfront_ = front_.load( memory_model::memory_order_acquire );
223
224                 if ( pfront_ + capacity() - back < count ) {
225                     // not enought space
226                     return false;
227                 }
228             }
229
230             // copy data
231             for ( size_t i = 0; i < count; ++i, ++back )
232                 copy( buffer_[buffer_.mod( back )], arr[i] );
233
234             back_.store( back, memory_model::memory_order_release );
235
236             return true;
237         }
238
239         /// Batch push - push array \p arr of size \p count with assignment as copy functor
240         /**
241             This function is equivalent for:
242             \code
243             push( arr, count, []( value_type& dest, Q const& src ) { dest = src; } );
244             \endcode
245
246             The function is available only if <tt>std::is_constructible<value_type, Q>::value</tt>
247             is \p true.
248
249             Returns \p true if success or \p false if not enought sufficient space in the ring
250         */
251         template <typename Q>
252         typename std::enable_if< std::is_constructible<value_type, Q>::value, bool>::type
253         push( Q* arr, size_t count )
254         {
255             return push( arr, count, []( value_type& dest, Q const& src ) { new( &dest ) value_type( src ); } );
256         }
257
258         /// Push one element created from \p args
259         /**
260             The function is available only if <tt>std::is_constructible<value_type, Args...>::value</tt>
261             is \p true.
262
263             Returns \p false if the ring is full or \p true otherwise.
264         */
265         template <typename... Args>
266         typename std::enable_if< std::is_constructible<value_type, Args...>::value, bool>::type
267         emplace( Args&&... args )
268         {
269             size_t back = back_.load( memory_model::memory_order_relaxed );
270
271             assert( back - pfront_ <= capacity() );
272
273             if ( pfront_ + capacity() - back < 1 ) {
274                 pfront_ = front_.load( memory_model::memory_order_acquire );
275
276                 if ( pfront_ + capacity() - back < 1 ) {
277                     // not enought space
278                     return false;
279                 }
280             }
281
282             new( &buffer_[buffer_.mod( back )] ) value_type( std::forward<Args>(args)... );
283
284             back_.store( back + 1, memory_model::memory_order_release );
285
286             return true;
287         }
288
289         /// Enqueues data to the ring using a functor
290         /**
291             \p Func is a functor called to copy a value to the ring element.
292             The functor \p f takes one argument - a reference to a empty cell of type \ref value_type :
293             \code
294             cds::container::WeakRingBuffer< Foo > myRing;
295             Bar bar;
296             myRing.enqueue_with( [&bar]( Foo& dest ) { dest = std::move(bar); } );
297             \endcode
298         */
299         template <typename Func>
300         bool enqueue_with( Func f )
301         {
302             size_t back = back_.load( memory_model::memory_order_relaxed );
303
304             assert( back - pfront_ <= capacity() );
305
306             if ( pfront_ + capacity() - back < 1 ) {
307                 pfront_ = front_.load( memory_model::memory_order_acquire );
308
309                 if ( pfront_ + capacity() - back < 1 ) {
310                     // not enought space
311                     return false;
312                 }
313             }
314
315             f( buffer_[buffer_.mod( back )] );
316
317             back_.store( back + 1, memory_model::memory_order_release );
318
319             return true;
320
321         }
322
323         /// Enqueues \p val value into the queue.
324         /**
325             The new queue item is created by calling placement new in free cell.
326             Returns \p true if success, \p false if the ring is full.
327         */
328         bool enqueue( value_type const& val )
329         {
330             return emplace( val );
331         }
332
333         /// Enqueues \p val value into the queue, move semantics
334         bool enqueue( value_type&& val )
335         {
336             return emplace( std::move( val ));
337         }
338
339         /// Synonym for \p enqueue( value_type const& )
340         bool push( value_type const& val )
341         {
342             return enqueue( val );
343         }
344
345         /// Synonym for \p enqueue( value_type&& )
346         bool push( value_type&& val )
347         {
348             return enqueue( std::move( val ));
349         }
350
351         /// Synonym for \p enqueue_with()
352         template <typename Func>
353         bool push_with( Func f )
354         {
355             return enqueue_with( f );
356         }
357
358         /// Batch pop \p count element from the ring buffer into \p arr
359         /**
360             \p CopyFunc is a per-element copy functor: for each element of \p arr
361             <tt>copy( arr[i], source )</tt> is called.
362             The \p CopyFunc signature:
363             \code
364             void copy_func( Q& dest, value_type& elemen );
365             \endcode
366
367             Returns \p true if success or \p false if not enought sufficient space in the ring
368         */
369         template <typename Q, typename CopyFunc>
370         bool pop( Q* arr, size_t count, CopyFunc copy )
371         {
372             assert( count < capacity() );
373
374             size_t front = front_.load( memory_model::memory_order_relaxed );
375             assert( cback_ - front < capacity() );
376
377             if ( cback_ - front < count ) {
378                 cback_ = back_.load( memory_model::memory_order_acquire );
379                 if ( cback_ - front < count )
380                     return false;
381             }
382
383             // copy data
384             value_cleaner cleaner;
385             for ( size_t i = 0; i < count; ++i, ++front ) {
386                 value_type& val = buffer_[buffer_.mod( front )];
387                 copy( arr[i], val );
388                 cleaner( val );
389             }
390
391             front_.store( front, memory_model::memory_order_release );
392             return true;
393         }
394
395         /// Batch pop - push array \p arr of size \p count with assignment as copy functor
396         /**
397             This function is equivalent for:
398             \code
399             pop( arr, count, []( Q& dest, value_type& src ) { dest = src; } );
400             \endcode
401
402             The function is available only if <tt>std::is_assignable<Q&, value_type const&>::value</tt>
403             is \p true.
404
405             Returns \p true if success or \p false if not enought sufficient space in the ring
406         */
407         template <typename Q>
408         typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
409         pop( Q* arr, size_t count )
410         {
411             return pop( arr, count, []( Q& dest, value_type& src ) { dest = src; } );
412         }
413
414         /// Dequeues an element from the ring to \p val
415         /**
416             The function is available only if <tt>std::is_assignable<Q&, value_type const&>::value</tt>
417             is \p true.
418
419             Returns \p false if the ring is full or \p true otherwise.
420         */
421         template <typename Q>
422         typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
423         dequeue( Q& val )
424         {
425             return pop( &val, 1 );
426         }
427
428         /// Synonym for \p dequeue( Q& )
429         template <typename Q>
430         typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
431         pop( Q& val )
432         {
433             return dequeue( val );
434         }
435
436         /// Dequeues a value using a functor
437         /**
438             \p Func is a functor called to copy dequeued value.
439             The functor takes one argument - a reference to removed node:
440             \code
441             cds:container::WeakRingBuffer< Foo > myRing;
442             Bar bar;
443             myRing.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
444             \endcode
445
446             Returns \p true if the ring is not empty, \p false otherwise.
447             The functor is called only if the ring is not empty.
448         */
449         template <typename Func>
450         bool dequeue_with( Func f )
451         {
452             size_t front = front_.load( memory_model::memory_order_relaxed );
453             assert( cback_ - front < capacity() );
454
455             if ( cback_ - front < 1 ) {
456                 cback_ = back_.load( memory_model::memory_order_acquire );
457                 if ( cback_ - front < 1 )
458                     return false;
459             }
460
461             value_type& val = buffer_[buffer_.mod( front )];
462             f( val );
463             value_cleaner()( val );
464
465             front_.store( front + 1, memory_model::memory_order_release );
466             return true;
467         }
468
469         /// Synonym for \p dequeue_with()
470         template <typename Func>
471         bool pop_with( Func f )
472         {
473             return dequeue_with( f );
474         }
475
476         /// Gets pointer to first element of ring buffer
477         /**
478             If the ring buffer is empty, returns \p nullptr
479
480             The function is thread-safe since there is only one consumer.
481             Recall, \p WeakRingBuffer is single-producer/single consumer container.
482         */
483         value_type* front()
484         {
485             size_t front = front_.load( memory_model::memory_order_relaxed );
486             assert( cback_ - front < capacity() );
487
488             if ( cback_ - front < 1 ) {
489                 cback_ = back_.load( memory_model::memory_order_acquire );
490                 if ( cback_ - front < 1 )
491                     return nullptr;
492             }
493
494             return &buffer_[buffer_.mod( front )];
495         }
496
497         /// Removes front element of ring-buffer
498         /**
499             If the ring-buffer is empty, returns \p false.
500             Otherwise, pops the first element from the ring.
501         */
502         bool pop_front()
503         {
504             size_t front = front_.load( memory_model::memory_order_relaxed );
505             assert( cback_ - front <= capacity() );
506
507             if ( cback_ - front < 1 ) {
508                 cback_ = back_.load( memory_model::memory_order_acquire );
509                 if ( cback_ - front < 1 )
510                     return false;
511             }
512
513             // clean cell
514             value_cleaner()( buffer_[buffer_.mod( front )] );
515
516             front_.store( front + 1, memory_model::memory_order_release );
517             return true;
518         }
519
520         /// Clears the ring buffer
521         void clear()
522         {
523             value_type v;
524             while ( pop( v ) );
525         }
526
527
528         /// Checks if the ring-buffer is empty
529         bool empty() const
530         {
531             return front_.load( memory_model::memory_order_relaxed ) == back_.load( memory_model::memory_order_relaxed );
532         }
533
534         /// Checks if the ring-buffer is full
535         bool full() const
536         {
537             return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ) >= capacity();
538         }
539
540         /// Returns the current size of ring buffer
541         size_t size() const
542         {
543             return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed );
544         }
545
546         /// Returns capacity of the ring buffer
547         size_t capacity() const
548         {
549             return buffer_.capacity();
550         }
551
552     private:
553         //@cond
554         atomics::atomic<size_t>     front_;
555         typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad1_;
556         atomics::atomic<size_t>     back_;
557         typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad2_;
558         size_t                      pfront_;
559         typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad3_;
560         size_t                      cback_;
561         typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad4_;
562
563         buffer                      buffer_;
564         //@endcond
565     };
566
567 }} // namespace cds::container
568
569
570 #endif // #ifndef CDSLIB_CONTAINER_WEAK_RINGBUFFER_H