RWQueue refactoring for eliminating data race (TSan)
authorkhizmax <libcds.dev@gmail.com>
Sat, 11 Jul 2015 11:09:21 +0000 (14:09 +0300)
committerkhizmax <libcds.dev@gmail.com>
Sat, 11 Jul 2015 11:09:21 +0000 (14:09 +0300)
cds/container/rwqueue.h
tests/test-hdr/queue/hdr_queue.h
tests/test-hdr/queue/hdr_rwqueue.cpp
tests/unit/queue/queue_type.h

index c65242170a65a0936ae568f23345eff9494ccd9e..c25d18e42051c37c793b148ae908bde0177ae7be 100644 (file)
@@ -4,8 +4,9 @@
 #define CDSLIB_CONTAINER_RWQUEUE_H
 
 #include <mutex>        // unique_lock
-#include <cds/container/msqueue.h>
 #include <cds/sync/spinlock.h>
+#include <cds/opt/options.h>
+#include <cds/details/allocator.h>
 
 namespace cds { namespace container {
     /// RWQueue related definitions
@@ -24,8 +25,8 @@ namespace cds { namespace container {
             /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
             typedef cds::atomicity::empty_item_counter item_counter;
 
-            /// Alignment of internal queue data. Default is \p opt::cache_line_alignment
-            enum { alignment = opt::cache_line_alignment };
+            /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
+            enum { padding = opt::cache_line_padding };
         };
 
         /// Metafunction converting option list to \p rwqueue::traits
@@ -35,7 +36,7 @@ namespace cds { namespace container {
             - opt::allocator - allocator (like \p std::allocator) used for allocating queue nodes. Default is \ref CDS_DEFAULT_ALLOCATOR
             - opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
                 To enable item counting use \p cds::atomicity::item_counter.
-            - opt::alignment - the alignment for internal queue data. Default is \p opt::cache_line_alignment
+            - \p opt::padding - padding for internal critical data. Default is \p opt::cache_line_padding
 
             Example: declare mutex-based \p %RWQueue with item counting
             \code
@@ -111,8 +112,8 @@ namespace cds { namespace container {
         /// Node type
         struct node_type
         {
-            node_type * volatile    m_pNext ;   ///< Pointer to the next node in the queue
-            value_type              m_value ;   ///< Value stored in the node
+            atomics::atomic< node_type *> m_pNext;  ///< Pointer to the next node in the queue
+            value_type              m_value;        ///< Value stored in the node
 
             node_type( value_type const& v )
                 : m_pNext( nullptr )
@@ -136,16 +137,19 @@ namespace cds { namespace container {
 
     protected:
         //@cond
-        typedef typename opt::details::alignment_setter< lock_type, traits::alignment >::type aligned_lock_type;
         typedef std::unique_lock<lock_type> scoped_lock;
         typedef cds::details::Allocator< node_type, allocator_type >  node_allocator;
 
-        item_counter    m_ItemCounter;
+        struct head_type {
+            mutable lock_type lock;
+            node_type *       ptr;
+        };
 
-        mutable aligned_lock_type   m_HeadLock;
-        node_type * m_pHead;
-        mutable aligned_lock_type   m_TailLock;
-        node_type * m_pTail;
+        head_type m_Head;
+        typename opt::details::apply_padding< head_type, traits::padding >::padding_type pad_;
+        head_type m_Tail;
+
+        item_counter    m_ItemCounter;
         //@endcond
 
     protected:
@@ -175,9 +179,9 @@ namespace cds { namespace container {
         {
             assert( p != nullptr );
             {
-                scoped_lock lock( m_TailLock );
-                m_pTail =
-                    m_pTail->m_pNext = p;
+                scoped_lock lock( m_Tail.lock );
+                m_Tail.ptr->m_pNext.store( p, atomics::memory_order_release );
+                m_Tail.ptr = p;
             }
             ++m_ItemCounter;
             return true;
@@ -197,16 +201,16 @@ namespace cds { namespace container {
         RWQueue()
         {
             node_type * pNode = alloc_node();
-            m_pHead =
-                m_pTail = pNode;
+            m_Head.ptr =
+                m_Tail.ptr = pNode;
         }
 
         /// Destructor clears queue
         ~RWQueue()
         {
             clear();
-            assert( m_pHead == m_pTail );
-            free_node( m_pHead );
+            assert( m_Head.ptr == m_Tail.ptr );
+            free_node( m_Head.ptr );
         }
 
         /// Enqueues \p data. Always return \a true
@@ -293,13 +297,13 @@ namespace cds { namespace container {
         {
             node_type * pNode;
             {
-                scoped_lock lock( m_HeadLock );
-                pNode = m_pHead;
-                node_type * pNewHead = pNode->m_pNext;
+                scoped_lock lock( m_Head.lock );
+                pNode = m_Head.ptr;
+                node_type * pNewHead = pNode->m_pNext.load( atomics::memory_order_acquire );
                 if ( pNewHead == nullptr )
                     return false;
                 f( pNewHead->m_value );
-                m_pHead = pNewHead;
+                m_Head.ptr = pNewHead;
             }    // unlock here
             --m_ItemCounter;
             free_node( pNode );
@@ -322,18 +326,18 @@ namespace cds { namespace container {
         /// Checks if queue is empty
         bool empty() const
         {
-            scoped_lock lock( m_HeadLock );
-            return m_pHead->m_pNext == nullptr;
+            scoped_lock lock( m_Head.lock );
+            return m_Head.ptr->m_pNext.load( atomics::memory_order_relaxed ) == nullptr;
         }
 
         /// Clears queue
         void clear()
         {
-            scoped_lock lockR( m_HeadLock );
-            scoped_lock lockW( m_TailLock );
-            while ( m_pHead->m_pNext != nullptr ) {
-                node_type * pHead = m_pHead;
-                m_pHead = m_pHead->m_pNext;
+            scoped_lock lockR( m_Head.lock );
+            scoped_lock lockW( m_Tail.lock );
+            while ( m_Head.ptr->m_pNext.load( atomics::memory_order_relaxed ) != nullptr ) {
+                node_type * pHead = m_Head.ptr;
+                m_Head.ptr = m_Head.ptr->m_pNext.load( atomics::memory_order_relaxed );
                 free_node( pHead );
             }
         }
@@ -352,10 +356,10 @@ namespace cds { namespace container {
         }
 
         //@cond
-        /// Returns reference to internal statistics
-        cds::container::msqueue::empty_stat statistics() const
+        /// The class has no internal statistics. For test consistency only
+        nullptr_t statistics() const
         {
-            return cds::container::msqueue::empty_stat();
+            return nullptr;
         }
         //@endcond
     };
index c5d19e489f160ee261eb6b1dec7188ccddace7ad..a841082e337cbb87157c72a82bd3d1e064534205 100644 (file)
@@ -339,6 +339,7 @@ namespace queue {
         void RWQueue_mutex();
         void RWQueue_ic();
         void RWQueue_ic_mutex();
+        void RWQueue_padding();
 
         void TsigasCycleQueue_static();
         void TsigasCycleQueue_static_ic();
@@ -452,6 +453,8 @@ namespace queue {
             CPPUNIT_TEST( RWQueue_default)
             CPPUNIT_TEST( RWQueue_mutex )
             CPPUNIT_TEST( RWQueue_ic )
+            CPPUNIT_TEST( RWQueue_ic_mutex )
+            CPPUNIT_TEST( RWQueue_padding )
 
         CPPUNIT_TEST_SUITE_END();
 
index ea72eab5fd3a652ed0e0aca4968b112dd8f055eb..993b24f43560a0081b334ebb57e219204832f8db 100644 (file)
@@ -42,4 +42,14 @@ namespace queue {
         test_ic< cds::container::RWQueue< int, queue_traits > >();
     }
 
+    void HdrTestQueue::RWQueue_padding()
+    {
+        struct queue_traits : public cds::container::rwqueue::traits
+        {
+            enum { padding = 16 };
+        };
+
+        test_ic< cds::container::RWQueue< int, queue_traits > >();
+    }
+
 }
index 6262551641fe619437ab574c0442fa78cd2ce6ec..a866c17ae2e6519808a7913590c7207a69f96a08 100644 (file)
@@ -605,6 +605,11 @@ namespace std {
         return o;
     }
 
+    static inline std::ostream& operator <<( std::ostream& o, nullptr_t /*s*/ )
+    {
+        return o;
+    }
+
     static inline ostream& operator <<( ostream& o, cds::container::fcdeque::empty_stat const& /*s*/ )
     {
         return o;