[TSan] Fixed data races in HP/DHP
authorkhizmax <libcds.dev@gmail.com>
Sat, 1 Apr 2017 16:35:04 +0000 (19:35 +0300)
committerkhizmax <libcds.dev@gmail.com>
Sat, 1 Apr 2017 16:35:04 +0000 (19:35 +0300)
cds/gc/dhp.h
cds/gc/hp.h
src/dhp.cpp
src/hp.cpp

index 23d876b0fc1d4b54515a3336f6335ecf0df9e568..d9dbf11fee139da60bb1dba937ad9247a4d00178 100644 (file)
@@ -61,10 +61,10 @@ namespace cds { namespace gc {
         //@cond
         struct guard_block: public cds::intrusive::FreeListImpl::node
         {
-            guard_block*    next_;  // next block in the thread list
+            atomics::atomic<guard_block*>  next_block_;  // next block in the thread list
 
             guard_block()
-                : next_( nullptr )
+                : next_block_( nullptr )
             {}
 
             guard* first()
@@ -113,7 +113,6 @@ namespace cds { namespace gc {
         public:
             thread_hp_storage( guard* arr, size_t nSize ) CDS_NOEXCEPT
                 : free_head_( arr )
-                , extended_list_( nullptr )
                 , array_( arr )
                 , initial_capacity_( nSize )
 #       ifdef CDS_ENABLE_HPSTAT
@@ -124,6 +123,7 @@ namespace cds { namespace gc {
             {
                 // Initialize guards
                 new( arr ) guard[nSize];
+                extended_list_.store( nullptr, atomics::memory_order_release );
             }
 
             thread_hp_storage() = delete;
@@ -195,18 +195,18 @@ namespace cds { namespace gc {
 
                 // free all extended blocks
                 hp_allocator& a = hp_allocator::instance();
-                for ( guard_block* p = extended_list_; p; ) {
-                    guard_block* next = p->next_;
+                for ( guard_block* p = extended_list_.load( atomics::memory_order_relaxed ); p; ) {
+                    guard_block* next = p->next_block_.load( atomics::memory_order_relaxed );
                     a.free( p );
                     p = next;
                 }
 
-                extended_list_ = nullptr;
+                extended_list_.store( nullptr, atomics::memory_order_release );
             }
 
             void init()
             {
-                assert( extended_list_ == nullptr );
+                assert( extended_list_.load(atomics::memory_order_relaxed) == nullptr );
 
                 guard* p = array_;
                 for ( guard* pEnd = p + initial_capacity_ - 1; p != pEnd; ++p )
@@ -221,15 +221,15 @@ namespace cds { namespace gc {
                 assert( free_head_ == nullptr );
 
                 guard_block* block = hp_allocator::instance().alloc();
-                block->next_ = extended_list_;
-                extended_list_ = block;
+                block->next_block_.store( extended_list_.load( atomics::memory_order_relaxed ), atomics::memory_order_release );
+                extended_list_.store( block, atomics::memory_order_release );
                 free_head_ = block->first();
                 CDS_HPSTAT( ++extend_call_count_ );
             }
 
         private:
             guard*          free_head_;        ///< Head of free guard list
-            guard_block*    extended_list_;    ///< Head of extended guard blocks allocated for the thread
+            atomics::atomic<guard_block*> extended_list_;    ///< Head of extended guard blocks allocated for the thread
             guard* const    array_;            ///< initial HP array
             size_t const    initial_capacity_; ///< Capacity of \p array_
 #       ifdef CDS_ENABLE_HPSTAT
index 09ba805945ece08dc3ccd34e03daa71e76726239..ba57dca6fef2a189b1df71a5d3739bda4ca9521e 100644 (file)
@@ -267,14 +267,16 @@ namespace cds { namespace gc {
 
             size_t size() const CDS_NOEXCEPT
             {
-                return current_ - retired_;
+                return current_.load(atomics::memory_order_relaxed) - retired_;
             }
 
             bool push( retired_ptr&& p ) CDS_NOEXCEPT
             {
-                *current_ = p;
+                retired_ptr* cur = current_.load( atomics::memory_order_relaxed );
+                *cur = p;
                 CDS_HPSTAT( ++retire_call_count_ );
-                return ++current_ < last_;
+                current_.store( cur + 1, atomics::memory_order_relaxed );
+                return cur + 1 < last_;
             }
 
             retired_ptr* first() const CDS_NOEXCEPT
@@ -284,17 +286,22 @@ namespace cds { namespace gc {
 
             retired_ptr* last() const CDS_NOEXCEPT
             {
-                return current_;
+                return current_.load( atomics::memory_order_relaxed );
             }
 
             void reset( size_t nSize ) CDS_NOEXCEPT
             {
-                current_ = first() + nSize;
+                current_.store( first() + nSize, atomics::memory_order_relaxed );
+            }
+
+            void interthread_clear()
+            {
+                current_.exchange( first(), atomics::memory_order_acq_rel );
             }
 
             bool full() const CDS_NOEXCEPT
             {
-                return current_ == last_;
+                return current_.load( atomics::memory_order_relaxed ) == last_;
             }
 
             static size_t calc_array_size( size_t capacity )
@@ -303,9 +310,9 @@ namespace cds { namespace gc {
             }
 
         private:
-            retired_ptr*            current_;
-            retired_ptr* const      last_;
-            retired_ptr* const      retired_;
+            atomics::atomic<retired_ptr*> current_;
+            retired_ptr* const            last_;
+            retired_ptr* const            retired_;
 #       ifdef CDS_ENABLE_HPSTAT
         public:
             size_t  retire_call_count_;
index 341cc3610a999e4cf640ce4796bc83ffae4a4a8e..ac8ec62f56f3de94fb91a51fde63b31bae7c2527 100644 (file)
@@ -149,6 +149,8 @@ namespace cds { namespace gc { namespace dhp {
 
         thread_record( guard* guards, size_t guard_count )
             : thread_data( guards, guard_count )
+            , m_pNextNode( nullptr )
+            , m_idOwner( cds::OS::c_NullThreadId )
             , m_bFree( false )
         {}
     };
@@ -191,10 +193,11 @@ namespace cds { namespace gc { namespace dhp {
     }
 
     CDS_EXPORT_API smr::smr( size_t nInitialHazardPtrCount )
-        : thread_list_( nullptr )
-        , initial_hazard_count_( nInitialHazardPtrCount < 4 ? 16 : nInitialHazardPtrCount )
+        : initial_hazard_count_( nInitialHazardPtrCount < 4 ? 16 : nInitialHazardPtrCount )
         , last_plist_size_( initial_hazard_count_ * 64 )
-    {}
+    {
+        thread_list_.store( nullptr, atomics::memory_order_release );
+    }
 
     CDS_EXPORT_API smr::~smr()
     {
@@ -204,7 +207,7 @@ namespace cds { namespace gc { namespace dhp {
         CDS_HPSTAT( statistics( s_postmortem_stat ) );
 
         thread_record* pHead = thread_list_.load( atomics::memory_order_relaxed );
-        thread_list_.store( nullptr, atomics::memory_order_relaxed );
+        thread_list_.store( nullptr, atomics::memory_order_release );
 
         thread_record* pNext = nullptr;
         for ( thread_record* hprec = pHead; hprec; hprec = pNext )
@@ -304,7 +307,7 @@ namespace cds { namespace gc { namespace dhp {
         const cds::OS::ThreadId curThreadId = cds::OS::get_current_thread_id();
 
         // First try to reuse a free (non-active) DHP record
-        for ( hprec = thread_list_.load( atomics::memory_order_acquire ); hprec; hprec = hprec->m_pNextNode.load( atomics::memory_order_relaxed ) ) {
+        for ( hprec = thread_list_.load( atomics::memory_order_acquire ); hprec; hprec = hprec->m_pNextNode.load( atomics::memory_order_acquire ) ) {
             cds::OS::ThreadId thId = nullThreadId;
             if ( !hprec->m_idOwner.compare_exchange_strong( thId, curThreadId, atomics::memory_order_relaxed, atomics::memory_order_relaxed ) )
                 continue;
@@ -318,9 +321,9 @@ namespace cds { namespace gc { namespace dhp {
             hprec = create_thread_data();
             hprec->m_idOwner.store( curThreadId, atomics::memory_order_relaxed );
 
-            thread_record* pOldHead = thread_list_.load( atomics::memory_order_relaxed );
+            thread_record* pOldHead = thread_list_.load( atomics::memory_order_acquire );
             do {
-                hprec->m_pNextNode.store( pOldHead, atomics::memory_order_relaxed );
+                hprec->m_pNextNode.store( pOldHead, atomics::memory_order_release );
             } while ( !thread_list_.compare_exchange_weak( pOldHead, hprec, atomics::memory_order_release, atomics::memory_order_acquire ) );
         }
 
@@ -408,8 +411,12 @@ namespace cds { namespace gc { namespace dhp {
             if ( pNode->m_idOwner.load( std::memory_order_relaxed ) != cds::OS::c_NullThreadId ) {
                 copy_hazards( plist, pNode->hazards_.array_, pNode->hazards_.initial_capacity_ );
 
-                for ( guard_block* block = pNode->hazards_.extended_list_; block; block = block->next_ )
+                for ( guard_block* block = pNode->hazards_.extended_list_.load( atomics::memory_order_acquire ); 
+                    block;
+                    block = block->next_block_.load( atomics::memory_order_acquire ) )
+                {
                     copy_hazards( plist, block->first(), defaults::c_extended_guard_block_size );
+                }
             }
 
             pNode = pNode->m_pNextNode.load( atomics::memory_order_relaxed );
@@ -462,7 +469,9 @@ namespace cds { namespace gc { namespace dhp {
 
             // If m_bFree == true then hprec->retired_ is empty - we don't need to see it
             if ( hprec->m_bFree.load( atomics::memory_order_acquire ) ) {
+                CDS_TSAN_ANNOTATE_IGNORE_READS_BEGIN;
                 assert( hprec->retired_.empty() );
+                CDS_TSAN_ANNOTATE_IGNORE_READS_END;
                 continue;
             }
 
@@ -508,6 +517,7 @@ namespace cds { namespace gc { namespace dhp {
 #   ifdef CDS_ENABLE_HPSTAT
         for ( thread_record* hprec = thread_list_.load( atomics::memory_order_acquire ); hprec; hprec = hprec->m_pNextNode.load( atomics::memory_order_relaxed ) )
         {
+            CDS_TSAN_ANNOTATE_IGNORE_READS_BEGIN;
             ++st.thread_rec_count;
             st.guard_allocated      += hprec->hazards_.alloc_guard_count_;
             st.guard_freed          += hprec->hazards_.free_guard_count_;
@@ -517,10 +527,13 @@ namespace cds { namespace gc { namespace dhp {
             st.free_count           += hprec->free_call_count_;
             st.scan_count           += hprec->scan_call_count_;
             st.help_scan_count      += hprec->help_scan_call_count_;
+            CDS_TSAN_ANNOTATE_IGNORE_READS_END;
         }
 
+        CDS_TSAN_ANNOTATE_IGNORE_READS_BEGIN;
         st.hp_block_count = hp_allocator_.block_allocated_.load( atomics::memory_order_relaxed );
         st.retired_block_count = retired_allocator_.block_allocated_.load( atomics::memory_order_relaxed );
+        CDS_TSAN_ANNOTATE_IGNORE_READS_END;
 #   endif
     }
 
index d3b16b5854ffd34de7707772e484d72241dfff36..afa05a3775dde0b3e81f0ed715f97c1b5769972b 100644 (file)
@@ -103,6 +103,8 @@ namespace cds { namespace gc { namespace hp {
 
         thread_record( guard* guards, size_t guard_count, retired_ptr* retired_arr, size_t retired_capacity )
             : thread_data( guards, guard_count, retired_arr, retired_capacity )
+            , m_pNextNode( nullptr )
+            , m_idOwner( cds::OS::c_NullThreadId )
             , m_bFree( false )
         {}
     };
@@ -140,23 +142,24 @@ namespace cds { namespace gc { namespace hp {
     }
 
     CDS_EXPORT_API smr::smr( size_t nHazardPtrCount, size_t nMaxThreadCount, size_t nMaxRetiredPtrCount, scan_type nScanType )
-        : thread_list_( nullptr )
-        , hazard_ptr_count_( nHazardPtrCount == 0 ? defaults::c_nHazardPointerPerThread : nHazardPtrCount )
+        : hazard_ptr_count_( nHazardPtrCount == 0 ? defaults::c_nHazardPointerPerThread : nHazardPtrCount )
         , max_thread_count_( nMaxThreadCount == 0 ? defaults::c_nMaxThreadCount : nMaxThreadCount )
         , max_retired_ptr_count_( calc_retired_size( nMaxRetiredPtrCount, hazard_ptr_count_, max_thread_count_ ))
         , scan_type_( nScanType )
         , scan_func_( nScanType == classic ? &smr::classic_scan : &smr::inplace_scan )
-    {}
+    {
+        thread_list_.store( nullptr, atomics::memory_order_release );
+    }
 
     CDS_EXPORT_API smr::~smr()
     {
         CDS_DEBUG_ONLY( const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId; )
         CDS_DEBUG_ONLY( const cds::OS::ThreadId mainThreadId = cds::OS::get_current_thread_id();)
 
-        CDS_HPSTAT( statistics( s_postmortem_stat ));
+        CDS_HPSTAT( statistics( s_postmortem_stat ) );
 
         thread_record* pHead = thread_list_.load( atomics::memory_order_relaxed );
-        thread_list_.store( nullptr, atomics::memory_order_relaxed );
+        thread_list_.store( nullptr, atomics::memory_order_release );
 
         thread_record* pNext = nullptr;
         for ( thread_record* hprec = pHead; hprec; hprec = pNext )
@@ -204,10 +207,13 @@ namespace cds { namespace gc { namespace hp {
             +--------------------------+
         */
 
-        char* mem = reinterpret_cast<char*>( s_alloc_memory( nSize ));
-        return new( mem ) thread_record( 
-            reinterpret_cast<guard*>( mem + sizeof( thread_record )), get_hazard_ptr_count(),
-            reinterpret_cast<retired_ptr*>( mem + sizeof( thread_record ) + guard_array_size ), get_max_retired_ptr_count()
+        uint8_t* mem = reinterpret_cast<uint8_t*>( s_alloc_memory( nSize ));
+
+        return new( mem ) thread_record(
+            reinterpret_cast<guard*>( mem + sizeof( thread_record )),
+            get_hazard_ptr_count(),
+            reinterpret_cast<retired_ptr*>( mem + sizeof( thread_record ) + guard_array_size ),
+            get_max_retired_ptr_count()
         );
     }
 
@@ -223,16 +229,14 @@ namespace cds { namespace gc { namespace hp {
 
     CDS_EXPORT_API smr::thread_record* smr::alloc_thread_data()
     {
-        //CDS_HAZARDPTR_STATISTIC( ++m_Stat.m_AllocHPRec )
-
         thread_record * hprec;
         const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId;
         const cds::OS::ThreadId curThreadId = cds::OS::get_current_thread_id();
 
         // First try to reuse a free (non-active) HP record
-        for ( hprec = thread_list_.load( atomics::memory_order_acquire ); hprec; hprec = hprec->m_pNextNode.load( atomics::memory_order_relaxed ) ) {
+        for ( hprec = thread_list_.load( atomics::memory_order_acquire ); hprec; hprec = hprec->m_pNextNode.load( atomics::memory_order_acquire )) {
             cds::OS::ThreadId thId = nullThreadId;
-            if ( !hprec->m_idOwner.compare_exchange_strong( thId, curThreadId, atomics::memory_order_relaxed, atomics::memory_order_relaxed ) )
+            if ( !hprec->m_idOwner.compare_exchange_strong( thId, curThreadId, atomics::memory_order_relaxed, atomics::memory_order_relaxed ))
                 continue;
             hprec->m_bFree.store( false, atomics::memory_order_release );
             return hprec;
@@ -243,9 +247,9 @@ namespace cds { namespace gc { namespace hp {
         hprec = create_thread_data();
         hprec->m_idOwner.store( curThreadId, atomics::memory_order_relaxed );
 
-        thread_record* pOldHead = thread_list_.load( atomics::memory_order_relaxed );
+        thread_record* pOldHead = thread_list_.load( atomics::memory_order_acquire );
         do {
-            hprec->m_pNextNode.store( pOldHead, atomics::memory_order_relaxed );
+            hprec->m_pNextNode.store( pOldHead, atomics::memory_order_release );
         } while ( !thread_list_.compare_exchange_weak( pOldHead, hprec, atomics::memory_order_release, atomics::memory_order_acquire ) );
 
         return hprec;
@@ -315,7 +319,7 @@ namespace cds { namespace gc { namespace hp {
             }
         }
 
-        CDS_HPSTAT( ++pRec->scan_count_ );
+        CDS_HPSTAT( ++pThreadRec->scan_count_ );
 
         // Sort retired pointer array
         std::sort( first_retired, last_retired, retired_ptr::less );
@@ -383,7 +387,7 @@ namespace cds { namespace gc { namespace hp {
     {
         thread_record* pRec = static_cast<thread_record*>( pThreadRec );
 
-        CDS_HPSTAT( ++pRec->scan_count_ );
+        CDS_HPSTAT( ++pThreadRec->scan_count_ );
 
         std::vector< void*, allocator<void*>>   plist;
         plist.reserve( get_max_thread_count() * get_hazard_ptr_count());
@@ -477,9 +481,8 @@ namespace cds { namespace gc { namespace hp {
                     scan( pThis );
             }
 
-            src.reset( 0 );
-
-            hprec->m_bFree.store( true, atomics::memory_order_relaxed );
+            src.interthread_clear();
+            hprec->m_bFree.store( true, atomics::memory_order_release );
             hprec->m_idOwner.store( nullThreadId, atomics::memory_order_release );
 
             scan( pThis );
@@ -492,6 +495,7 @@ namespace cds { namespace gc { namespace hp {
 #   ifdef CDS_ENABLE_HPSTAT
         for ( thread_record* hprec = thread_list_.load( atomics::memory_order_acquire ); hprec; hprec = hprec->m_pNextNode.load( atomics::memory_order_relaxed ) )
         {
+            CDS_TSAN_ANNOTATE_IGNORE_READS_BEGIN;
             ++st.thread_rec_count;
             st.guard_allocated += hprec->hazards_.alloc_guard_count_;
             st.guard_freed     += hprec->hazards_.free_guard_count_;
@@ -499,6 +503,7 @@ namespace cds { namespace gc { namespace hp {
             st.free_count      += hprec->free_count_;
             st.scan_count      += hprec->scan_count_;
             st.help_scan_count += hprec->help_scan_count_;
+            CDS_TSAN_ANNOTATE_IGNORE_READS_END;
         }
 #   endif
     }