Docfix, minor changes
[libcds.git] / cds / container / vyukov_mpmc_cycle_queue.h
index 6bde66419f41b6cb6021c1615590e43ff1abe9ad..50b787c819d90879e5cc93488d020e966f7adc6e 100644 (file)
@@ -1,4 +1,32 @@
-//$$CDS-header$$
+/*
+    This file is a part of libcds - Concurrent Data Structures library
+
+    (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
+
+    Source code repo: http://github.com/khizmax/libcds/
+    Download: http://sourceforge.net/projects/libcds/files/
+    
+    Redistribution and use in source and binary forms, with or without
+    modification, are permitted provided that the following conditions are met:
+
+    * Redistributions of source code must retain the above copyright notice, this
+      list of conditions and the following disclaimer.
+
+    * Redistributions in binary form must reproduce the above copyright notice,
+      this list of conditions and the following disclaimer in the documentation
+      and/or other materials provided with the distribution.
+
+    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+    AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+    IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+    FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+    DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+    SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+    OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.     
+*/
 
 #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
 #define CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
@@ -21,17 +49,21 @@ namespace cds { namespace container {
             /// Buffer type for internal array
             /*
                 The type of element for the buffer is not important: the queue rebinds
-                buffer for required type via \p rebind metafunction.
+                the buffer for required type via \p rebind metafunction.
 
                 For \p VyukovMPMCCycleQueue queue the buffer size should have power-of-2 size.
+
+                You should use only uninitialized buffer for the queue -
+                \p cds::opt::v::uninitialized_dynamic_buffer (the default),
+                \p cds::opt::v::uninitialized_static_buffer.
             */
-            typedef cds::opt::v::dynamic_buffer< void * > buffer;
+            typedef cds::opt::v::uninitialized_dynamic_buffer< void * > buffer;
 
             /// A functor to clean item dequeued.
             /**
-                The functor  calls the destructor for queue item.
+                The functor calls the destructor for queue item.
                 After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied.
-                If \p T is a complex type, \p value_cleaner may be the useful feature.
+                If \p T is a complex type, \p value_cleaner may be useful feature.
 
                 Default value is \ref opt::v::destruct_cleaner
             */
@@ -43,25 +75,38 @@ namespace cds { namespace container {
             /// C++ memory ordering model
             /**
                 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
-                or \p opt::v::sequential_consistent (sequentially consisnent memory model).
+                or \p opt::v::sequential_consistent (sequentially consistent memory model).
             */
             typedef opt::v::relaxed_ordering    memory_model;
 
             /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
             enum { padding = opt::cache_line_padding };
+
+            /// Back-off strategy
+            typedef cds::backoff::Default           back_off;
+
+            /// Single-consumer version
+            /**
+                For single-consumer version of algorithm some additional functions
+                (\p front(), \p pop_front()) is available.
+
+                Default is \p false
+            */
+            static CDS_CONSTEXPR bool const single_consumer = false;
         };
 
         /// Metafunction converting option list to \p vyukov_queue::traits
         /**
             Supported \p Options are:
-            - \p opt::buffer - the buffer type for internal cyclic array. Possible types are:
-                \p opt::v::dynamic_buffer (the default), \p opt::v::static_buffer. The type of
+            - \p opt::buffer - an uninitialized buffer type for internal cyclic array. Possible types are:
+                \p opt::v::uninitialized_dynamic_buffer (the default), \p opt::v::uninitialized_static_buffer. The type of
                 element in the buffer is not important: it will be changed via \p rebind metafunction.
             - \p opt::value_cleaner - a functor to clean item dequeued.
                 The functor calls the destructor for queue item.
                 After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied.
                 If \p T is a complex type, \p value_cleaner can be an useful feature.
                 Default value is \ref opt::v::destruct_cleaner
+            - \p opt::back_off - back-off strategy used. If the option is not specified, the \p cds::backoff::Default is used.
             - \p opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
                 To enable item counting use \p cds::atomicity::item_counter
             - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
@@ -72,7 +117,7 @@ namespace cds { namespace container {
             \code
             typedef cds::container::VyukovMPMCCycleQueue< Foo,
                 typename cds::container::vyukov_queue::make_traits<
-                    cds::opt::buffer< cds::opt::v::static_buffer< void *, 1024 >,
+                    cds::opt::buffer< cds::opt::v::uninitialized_static_buffer< void *, 1024 >,
                     cds::opt::item_counte< cds::atomicity::item_counter >
                 >::type
             > myQueue;
@@ -103,22 +148,25 @@ namespace cds { namespace container {
         No dynamic memory allocation/management during operation. Producers and consumers are separated from each other (as in the two-lock queue),
         i.e. do not touch the same data while queue is not empty.
 
+        There is multiple producer/single consumer version \p cds::container::VyukovMPSCCycleQueue
+        that supports \p front() and \p pop_front() functions.
+
         Source:
             - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
 
         Template parameters
         - \p T - type stored in queue.
-        - \p Traits - queue traits, default is \p vykov_queue::traits. You can use \p vykov_queue::make_traits
-            metafunction to make your traits or just derive your traits from \p %vykov_queue::traits:
+        - \p Traits - queue traits, default is \p vyukov_queue::traits. You can use \p vyukov_queue::make_traits
+            metafunction to make your traits or just derive your traits from \p %vyukov_queue::traits:
             \code
-            struct myTraits: public cds::container::vykov_queue::traits {
+            struct myTraits: public cds::container::vyukov_queue::traits {
                 typedef cds::atomicity::item_counter    item_counter;
             };
             typedef cds::container::VyukovMPMCCycleQueue< Foo, myTraits > myQueue;
 
             // Equivalent make_traits example:
             typedef cds::container::VyukovMPMCCycleQueue< cds::gc::HP, Foo,
-                typename cds::container::vykov_queue::make_traits<
+                typename cds::container::vyukov_queue::make_traits<
                     cds::opt::item_counter< cds::atomicity::item_counter >
                 >::type
             > myQueue;
@@ -134,8 +182,12 @@ namespace cds { namespace container {
         typedef T value_type;   ///< Value type to be stored in the queue
         typedef Traits traits;  ///< Queue traits
         typedef typename traits::item_counter  item_counter;  ///< Item counter type
-        typedef typename traits::memory_model  memory_model;  ///< Memory ordering. See cds::opt::memory_model option
+        typedef typename traits::memory_model  memory_model;  ///< Memory ordering. See \p cds::opt::memory_model option
         typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p vyukov_queue::traits::value_cleaner
+        typedef typename traits::back_off  back_off;          ///< back-off strategy
+
+        /// \p true for single-consumer version, \p false otherwise
+        static CDS_CONSTEXPR bool const c_single_consumer = traits::single_consumer;
 
         /// Rebind template arguments
         template <typename T2, typename Traits2>
@@ -161,8 +213,8 @@ namespace cds { namespace container {
     protected:
         //@cond
         buffer          m_buffer;
-        typename opt::details::apply_padding< buffer, traits::padding >::padding_type pad1_;
         size_t const    m_nBufferMask;
+        typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad1_;
         sequence_type   m_posEnqueue;
         typename opt::details::apply_padding< sequence_type, traits::padding >::padding_type pad2_;
         sequence_type   m_posDequeue;
@@ -173,7 +225,7 @@ namespace cds { namespace container {
     public:
         /// Constructs the queue of capacity \p nCapacity
         /**
-            For \p cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
+            For \p cds::opt::v::uninitialized_static_buffer the \p nCapacity parameter is ignored.
 
             The buffer capacity must be the power of two.
         */
@@ -214,8 +266,9 @@ namespace cds { namespace container {
         bool enqueue_with(Func f)
         {
             cell_type* cell;
-            size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
+            back_off bkoff;
 
+            size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
             for (;;)
             {
                 cell = &m_buffer[pos & m_nBufferMask];
@@ -227,8 +280,13 @@ namespace cds { namespace container {
                     if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed ))
                         break;
                 }
-                else if (dif < 0)
-                    return false;
+                else if (dif < 0) {
+                    // Queue full?
+                    if ( pos - m_posDequeue.load( memory_model::memory_order_relaxed ) == capacity() )
+                        return false;   // queue full
+                    bkoff();
+                    pos = m_posEnqueue.load( memory_model::memory_order_relaxed );
+                }
                 else
                     pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
             }
@@ -251,12 +309,24 @@ namespace cds { namespace container {
             return enqueue_with( [&val]( value_type& dest ){ new ( &dest ) value_type( val ); });
         }
 
-        /// Synonym for \p enqueue()
+        /// Enqueues \p val value into the queue, move semantics
+        bool enqueue( value_type&& val )
+        {
+            return enqueue_with( [&val]( value_type& dest ) { new (&dest) value_type( std::move( val ));});
+        }
+
+        /// Synonym for \p enqueue( valuetype const& )
         bool push( value_type const& data )
         {
             return enqueue( data );
         }
 
+        /// Synonym for \p enqueue( value_type&& )
+        bool push( value_type&& data )
+        {
+            return enqueue( std::move( data ));
+        }
+
         /// Synonym for \p enqueue_with()
         template <typename Func>
         bool push_with( Func f )
@@ -267,13 +337,14 @@ namespace cds { namespace container {
         /// Enqueues data of type \ref value_type constructed with <tt>std::forward<Args>(args)...</tt>
         template <typename... Args>
         bool emplace( Args&&... args )
-        {   
+        {
 #if (CDS_COMPILER == CDS_COMPILER_GCC) && (CDS_COMPILER_VERSION < 40900)
-            //work around unsupported feature in g++ 4.8 for forwarding parameter packs to lambda. 
-            return enqueue_with ( std::bind([]( value_type& dest,Args ... args ){ new ( &dest ) value_type( std::forward<Args>(args)... );}, std::placeholders::_1 ,args...));        
-#else            
-            return enqueue_with( [&args ...]( value_type& dest ){ new ( &dest ) value_type( std::forward<Args>(args)... ); });            
-#endif        
+            //work around unsupported feature in g++ 4.8 for forwarding parameter packs to lambda.
+            value_type val( std::forward<Args>(args)... );
+            return enqueue_with( [&val]( value_type& dest ){ new ( &dest ) value_type( std::move( val )); });
+#else
+            return enqueue_with( [&args ...]( value_type& dest ){ new ( &dest ) value_type( std::forward<Args>( args )... ); });
+#endif
         }
 
         /// Dequeues a value using a functor
@@ -291,8 +362,9 @@ namespace cds { namespace container {
         bool dequeue_with( Func f )
         {
             cell_type * cell;
-            size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
+            back_off bkoff;
 
+            size_t pos = m_posDequeue.load( memory_model::memory_order_relaxed );
             for (;;)
             {
                 cell = &m_buffer[pos & m_nBufferMask];
@@ -303,8 +375,13 @@ namespace cds { namespace container {
                     if ( m_posDequeue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed))
                         break;
                 }
-                else if (dif < 0)
-                    return false;
+                else if (dif < 0) {
+                    // Queue empty?
+                    if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
+                        return false;   // queue empty
+                    bkoff();
+                    pos = m_posDequeue.load( memory_model::memory_order_relaxed );
+                }
                 else
                     pos = m_posDequeue.load(memory_model::memory_order_relaxed);
             }
@@ -319,13 +396,13 @@ namespace cds { namespace container {
 
         /// Dequeues a value from the queue
         /**
-            If queue is not empty, the function returns \p true, \p dest contains copy of
+            If queue is not empty, the function returns \p true, \p dest contains copy of
             dequeued value. The assignment operator for type \ref value_type is invoked.
             If queue is empty, the function returns \p false, \p dest is unchanged.
         */
-        bool dequeue(value_type & dest )
+        bool dequeue(value_type& dest )
         {
-            return dequeue_with( [&dest]( value_type& src ){ dest = src; } );
+            return dequeue_with( [&dest]( value_type& src ){ dest = std::move( src );});
         }
 
         /// Synonym for \p dequeue()
@@ -341,12 +418,50 @@ namespace cds { namespace container {
             return dequeue_with( f );
         }
 
+        /// Returns a pointer to top element of the queue or \p nullptr if queue is empty (only for single-consumer version)
+        template <bool SC = c_single_consumer >
+        typename std::enable_if<SC, value_type *>::type front()
+        {
+            static_assert( c_single_consumer, "front() is enabled only if traits::single_consumer is true");
+
+            cell_type * cell;
+            back_off bkoff;
+
+            size_t pos = m_posDequeue.load( memory_model::memory_order_relaxed );
+            for ( ;;)
+            {
+                cell = &m_buffer[pos & m_nBufferMask];
+                size_t seq = cell->sequence.load( memory_model::memory_order_acquire );
+                intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
+
+                if ( dif == 0 )
+                    return &cell->data;
+                else if ( dif < 0 ) {
+                    // Queue empty?
+                    if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
+                        return nullptr;   // queue empty
+                    bkoff();
+                    pos = m_posDequeue.load( memory_model::memory_order_relaxed );
+                }
+                else
+                    pos = m_posDequeue.load( memory_model::memory_order_relaxed );
+            }
+        }
+
+        /// Pops top element; returns \p true if queue is not empty, \p false otherwise (only for single-consumer version)
+        template <bool SC = c_single_consumer >
+        typename std::enable_if<SC, bool>::type pop_front()
+        {
+            return dequeue_with( []( value_type& ) {} );
+        }
+
         /// Checks if the queue is empty
         bool empty() const
         {
             const cell_type * cell;
-            size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
+            back_off bkoff;
 
+            size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
             for (;;)
             {
                 cell = &m_buffer[pos & m_nBufferMask];
@@ -355,10 +470,12 @@ namespace cds { namespace container {
 
                 if (dif == 0)
                     return false;
-                else if (dif < 0)
-                    return true;
-                else
-                    pos = m_posDequeue.load(memory_model::memory_order_relaxed);
+                else if (dif < 0) {
+                    if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
+                        return true;
+                }
+                bkoff();
+                pos = m_posDequeue.load(memory_model::memory_order_relaxed);
             }
         }
 
@@ -385,6 +502,22 @@ namespace cds { namespace container {
             return m_buffer.capacity();
         }
     };
+
+    //@cond
+    namespace vyukov_queue {
+
+        template <typename Traits>
+        struct single_consumer_traits : public Traits
+        {
+            static CDS_CONSTEXPR bool const single_consumer = true;
+        };
+    } // namespace vyukov_queue
+    //@endcond
+
+    /// Vyukov's queue multiple producer - single consumer version
+    template <typename T, typename Traits = vyukov_queue::traits >
+    using VyukovMPSCCycleQueue = VyukovMPMCCycleQueue< T, vyukov_queue::single_consumer_traits<Traits> >;
+
 }}  // namespace cds::container
 
 #endif // #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H