Merge pull request #63 from mgalimullin/cmake-update
[libcds.git] / cds / container / rwqueue.h
index 1ecdcf7a5744f0a71fa77948a762a2da91c8535c..85206857ea0349216dfcb8ffa404d7b2a878e7c2 100644 (file)
@@ -1,11 +1,40 @@
-//$$CDS-header$$
-
-#ifndef __CDS_CONTAINER_RWQUEUE_H
-#define __CDS_CONTAINER_RWQUEUE_H
+/*
+    This file is a part of libcds - Concurrent Data Structures library
+
+    (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
+
+    Source code repo: http://github.com/khizmax/libcds/
+    Download: http://sourceforge.net/projects/libcds/files/
+    
+    Redistribution and use in source and binary forms, with or without
+    modification, are permitted provided that the following conditions are met:
+
+    * Redistributions of source code must retain the above copyright notice, this
+      list of conditions and the following disclaimer.
+
+    * Redistributions in binary form must reproduce the above copyright notice,
+      this list of conditions and the following disclaimer in the documentation
+      and/or other materials provided with the distribution.
+
+    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+    AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+    IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+    FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+    DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+    SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+    OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.     
+*/
+
+#ifndef CDSLIB_CONTAINER_RWQUEUE_H
+#define CDSLIB_CONTAINER_RWQUEUE_H
 
 #include <mutex>        // unique_lock
-#include <cds/container/msqueue.h>
 #include <cds/sync/spinlock.h>
+#include <cds/opt/options.h>
+#include <cds/details/allocator.h>
 
 namespace cds { namespace container {
     /// RWQueue related definitions
@@ -24,8 +53,8 @@ namespace cds { namespace container {
             /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
             typedef cds::atomicity::empty_item_counter item_counter;
 
-            /// Alignment of internal queue data. Default is \p opt::cache_line_alignment
-            enum { alignment = opt::cache_line_alignment };
+            /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
+            enum { padding = opt::cache_line_padding };
         };
 
         /// Metafunction converting option list to \p rwqueue::traits
@@ -35,7 +64,7 @@ namespace cds { namespace container {
             - opt::allocator - allocator (like \p std::allocator) used for allocating queue nodes. Default is \ref CDS_DEFAULT_ALLOCATOR
             - opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
                 To enable item counting use \p cds::atomicity::item_counter.
-            - opt::alignment - the alignment for internal queue data. Default is \p opt::cache_line_alignment
+            - \p opt::padding - padding for internal critical data. Default is \p opt::cache_line_padding
 
             Example: declare mutex-based \p %RWQueue with item counting
             \code
@@ -103,7 +132,7 @@ namespace cds { namespace container {
         typedef T       value_type; ///< Type of value to be stored in the queue
         typedef Traits  traits;     ///< Queue traits
 
-        typedef typename traits::lock_type  lock_type;      ///< Locking primitive
+        typedef typename traits::lock_type    lock_type;    ///< Locking primitive
         typedef typename traits::item_counter item_counter; ///< Item counting policy used
 
     protected:
@@ -111,8 +140,8 @@ namespace cds { namespace container {
         /// Node type
         struct node_type
         {
-            node_type * volatile    m_pNext ;   ///< Pointer to the next node in the queue
-            value_type              m_value ;   ///< Value stored in the node
+            atomics::atomic< node_type *> m_pNext;  ///< Pointer to the next node in the queue
+            value_type              m_value;        ///< Value stored in the node
 
             node_type( value_type const& v )
                 : m_pNext( nullptr )
@@ -136,16 +165,19 @@ namespace cds { namespace container {
 
     protected:
         //@cond
-        typedef typename opt::details::alignment_setter< lock_type, traits::alignment >::type aligned_lock_type;
         typedef std::unique_lock<lock_type> scoped_lock;
         typedef cds::details::Allocator< node_type, allocator_type >  node_allocator;
 
-        item_counter    m_ItemCounter;
+        struct head_type {
+            mutable lock_type lock;
+            node_type *       ptr;
+        };
+
+        head_type m_Head;
+        typename opt::details::apply_padding< head_type, traits::padding >::padding_type pad_;
+        head_type m_Tail;
 
-        mutable aligned_lock_type   m_HeadLock;
-        node_type * m_pHead;
-        mutable aligned_lock_type   m_TailLock;
-        node_type * m_pTail;
+        item_counter    m_ItemCounter;
         //@endcond
 
     protected:
@@ -175,9 +207,9 @@ namespace cds { namespace container {
         {
             assert( p != nullptr );
             {
-                scoped_lock lock( m_TailLock );
-                m_pTail =
-                    m_pTail->m_pNext = p;
+                scoped_lock lock( m_Tail.lock );
+                m_Tail.ptr->m_pNext.store( p, atomics::memory_order_release );
+                m_Tail.ptr = p;
             }
             ++m_ItemCounter;
             return true;
@@ -197,16 +229,16 @@ namespace cds { namespace container {
         RWQueue()
         {
             node_type * pNode = alloc_node();
-            m_pHead =
-                m_pTail = pNode;
+            m_Head.ptr =
+                m_Tail.ptr = pNode;
         }
 
         /// Destructor clears queue
         ~RWQueue()
         {
             clear();
-            assert( m_pHead == m_pTail );
-            free_node( m_pHead );
+            assert( m_Head.ptr == m_Tail.ptr );
+            free_node( m_Head.ptr );
         }
 
         /// Enqueues \p data. Always return \a true
@@ -220,6 +252,17 @@ namespace cds { namespace container {
             return false;
         }
 
+        /// Enqueues \p data, move semantics
+        bool enqueue( value_type&& data )
+        {
+            scoped_node_ptr p( alloc_node_move( std::move( data )));
+            if ( enqueue_node( p.get() ) ) {
+                p.release();
+                return true;
+            }
+            return false;
+        }
+
         /// Enqueues \p data to the queue using a functor
         /**
             \p Func is a functor called to create node.
@@ -254,12 +297,18 @@ namespace cds { namespace container {
             return false;
         }
 
-        /// Synonym for \p enqueue() function
+        /// Synonym for \p enqueue( value_type const& ) function
         bool push( value_type const& val )
         {
             return enqueue( val );
         }
 
+        /// Synonym for \p enqueue( value_type&& ) function
+        bool push( value_type&& val )
+        {
+            return enqueue( std::move( val ));
+        }
+
         /// Synonym for \p enqueue_with() function
         template <typename Func>
         bool push_with( Func f )
@@ -274,7 +323,7 @@ namespace cds { namespace container {
         */
         bool dequeue( value_type& dest )
         {
-            return dequeue_with( [&dest]( value_type& src ) { dest = src; } );
+            return dequeue_with( [&dest]( value_type& src ) { dest = std::move( src ); });
         }
 
         /// Dequeues a value using a functor
@@ -293,13 +342,13 @@ namespace cds { namespace container {
         {
             node_type * pNode;
             {
-                scoped_lock lock( m_HeadLock );
-                pNode = m_pHead;
-                node_type * pNewHead = pNode->m_pNext;
+                scoped_lock lock( m_Head.lock );
+                pNode = m_Head.ptr;
+                node_type * pNewHead = pNode->m_pNext.load( atomics::memory_order_acquire );
                 if ( pNewHead == nullptr )
                     return false;
                 f( pNewHead->m_value );
-                m_pHead = pNewHead;
+                m_Head.ptr = pNewHead;
             }    // unlock here
             --m_ItemCounter;
             free_node( pNode );
@@ -322,20 +371,21 @@ namespace cds { namespace container {
         /// Checks if queue is empty
         bool empty() const
         {
-            scoped_lock lock( m_HeadLock );
-            return m_pHead->m_pNext == nullptr;
+            scoped_lock lock( m_Head.lock );
+            return m_Head.ptr->m_pNext.load( atomics::memory_order_relaxed ) == nullptr;
         }
 
         /// Clears queue
         void clear()
         {
-            scoped_lock lockR( m_HeadLock );
-            scoped_lock lockW( m_TailLock );
-            while ( m_pHead->m_pNext != nullptr ) {
-                node_type * pHead = m_pHead;
-                m_pHead = m_pHead->m_pNext;
+            scoped_lock lockR( m_Head.lock );
+            scoped_lock lockW( m_Tail.lock );
+            while ( m_Head.ptr->m_pNext.load( atomics::memory_order_relaxed ) != nullptr ) {
+                node_type * pHead = m_Head.ptr;
+                m_Head.ptr = m_Head.ptr->m_pNext.load( atomics::memory_order_relaxed );
                 free_node( pHead );
             }
+            m_ItemCounter.reset();
         }
 
         /// Returns queue's item count
@@ -352,14 +402,14 @@ namespace cds { namespace container {
         }
 
         //@cond
-        /// Returns reference to internal statistics
-        cds::container::msqueue::empty_stat statistics() const
+        /// The class has no internal statistics. For test consistency only
+        std::nullptr_t statistics() const
         {
-            return cds::container::msqueue::empty_stat();
+            return nullptr;
         }
         //@endcond
     };
 
 }}  // namespace cds::container
 
-#endif // #ifndef __CDS_CONTAINER_RWQUEUE_H
+#endif // #ifndef CDSLIB_CONTAINER_RWQUEUE_H