//@cond
struct guard_block: public cds::intrusive::FreeListImpl::node
{
- guard_block* next_; // next block in the thread list
+ atomics::atomic<guard_block*> next_block_; // next block in the thread list
guard_block()
- : next_( nullptr )
+ : next_block_( nullptr )
{}
guard* first()
public:
// Constructs per-thread HP storage over a caller-supplied guard array.
thread_hp_storage( guard* arr, size_t nSize ) CDS_NOEXCEPT
    : free_head_( arr )
    , array_( arr )
    , initial_capacity_( nSize )
# ifdef CDS_ENABLE_HPSTAT
{
    // Initialize guards
    new( arr ) guard[nSize];
    // Publish an empty extended list; release pairs with acquire loads in scanners
    extended_list_.store( nullptr, atomics::memory_order_release );
}
thread_hp_storage() = delete;
// free all extended blocks
hp_allocator& a = hp_allocator::instance();
- for ( guard_block* p = extended_list_; p; ) {
- guard_block* next = p->next_;
+ for ( guard_block* p = extended_list_.load( atomics::memory_order_relaxed ); p; ) {
+ guard_block* next = p->next_block_.load( atomics::memory_order_relaxed );
a.free( p );
p = next;
}
- extended_list_ = nullptr;
+ extended_list_.store( nullptr, atomics::memory_order_release );
}
void init()
{
    assert( extended_list_.load( atomics::memory_order_relaxed ) == nullptr );
    guard* p = array_;
    for ( guard* pEnd = p + initial_capacity_ - 1; p != pEnd; ++p )
    assert( free_head_ == nullptr );
    guard_block* block = hp_allocator::instance().alloc();
    // Set the new block's next pointer before publishing the block as the new list head
    block->next_block_.store( extended_list_.load( atomics::memory_order_relaxed ), atomics::memory_order_release );
    extended_list_.store( block, atomics::memory_order_release );
    free_head_ = block->first();
    CDS_HPSTAT( ++extend_call_count_ );
}
private:
guard* free_head_; ///< Head of free guard list
- guard_block* extended_list_; ///< Head of extended guard blocks allocated for the thread
+ atomics::atomic<guard_block*> extended_list_; ///< Head of extended guard blocks allocated for the thread
guard* const array_; ///< initial HP array
size_t const initial_capacity_; ///< Capacity of \p array_
# ifdef CDS_ENABLE_HPSTAT
// Number of retired pointers currently stored (current position minus array start).
size_t size() const CDS_NOEXCEPT
{
    return current_.load( atomics::memory_order_relaxed ) - retired_;
}
// Appends a retired pointer; returns false when the array becomes full.
bool push( retired_ptr&& p ) CDS_NOEXCEPT
{
    retired_ptr* cur = current_.load( atomics::memory_order_relaxed );
    *cur = p;
    CDS_HPSTAT( ++retire_call_count_ );
    current_.store( cur + 1, atomics::memory_order_relaxed );
    return cur + 1 < last_;
}
retired_ptr* first() const CDS_NOEXCEPT
// Current end of the used portion of the retired array.
retired_ptr* last() const CDS_NOEXCEPT
{
    return current_.load( atomics::memory_order_relaxed );
}
void reset( size_t nSize ) CDS_NOEXCEPT
{
- current_ = first() + nSize;
+ current_.store( first() + nSize, atomics::memory_order_relaxed );
+ }
+
+ void interthread_clear()
+ {
+ current_.exchange( first(), atomics::memory_order_acq_rel );
}
bool full() const CDS_NOEXCEPT
{
- return current_ == last_;
+ return current_.load( atomics::memory_order_relaxed ) == last_;
}
static size_t calc_array_size( size_t capacity )
}
private:
- retired_ptr* current_;
- retired_ptr* const last_;
- retired_ptr* const retired_;
+ atomics::atomic<retired_ptr*> current_;
+ retired_ptr* const last_;
+ retired_ptr* const retired_;
# ifdef CDS_ENABLE_HPSTAT
public:
size_t retire_call_count_;
thread_record( guard* guards, size_t guard_count )
    : thread_data( guards, guard_count )
    , m_pNextNode( nullptr )                 // explicitly init: record is published to a shared list
    , m_idOwner( cds::OS::c_NullThreadId )   // no owner thread yet
    , m_bFree( false )
{}
};
}
CDS_EXPORT_API smr::smr( size_t nInitialHazardPtrCount )
    : initial_hazard_count_( nInitialHazardPtrCount < 4 ? 16 : nInitialHazardPtrCount )
    , last_plist_size_( initial_hazard_count_ * 64 )
{
    // Publish the empty thread list with release so concurrent acquire-loads see it
    thread_list_.store( nullptr, atomics::memory_order_release );
}
CDS_EXPORT_API smr::~smr()
{
    CDS_HPSTAT( statistics( s_postmortem_stat ) );
    // Detach the whole thread list, then free each record below
    thread_record* pHead = thread_list_.load( atomics::memory_order_relaxed );
    thread_list_.store( nullptr, atomics::memory_order_release );
    thread_record* pNext = nullptr;
for ( thread_record* hprec = pHead; hprec; hprec = pNext )
const cds::OS::ThreadId curThreadId = cds::OS::get_current_thread_id();
// First try to reuse a free (non-active) DHP record
- for ( hprec = thread_list_.load( atomics::memory_order_acquire ); hprec; hprec = hprec->m_pNextNode.load( atomics::memory_order_relaxed ) ) {
+ for ( hprec = thread_list_.load( atomics::memory_order_acquire ); hprec; hprec = hprec->m_pNextNode.load( atomics::memory_order_acquire ) ) {
cds::OS::ThreadId thId = nullThreadId;
if ( !hprec->m_idOwner.compare_exchange_strong( thId, curThreadId, atomics::memory_order_relaxed, atomics::memory_order_relaxed ) )
continue;
hprec = create_thread_data();
hprec->m_idOwner.store( curThreadId, atomics::memory_order_relaxed );
- thread_record* pOldHead = thread_list_.load( atomics::memory_order_relaxed );
+ thread_record* pOldHead = thread_list_.load( atomics::memory_order_acquire );
do {
- hprec->m_pNextNode.store( pOldHead, atomics::memory_order_relaxed );
+ hprec->m_pNextNode.store( pOldHead, atomics::memory_order_release );
} while ( !thread_list_.compare_exchange_weak( pOldHead, hprec, atomics::memory_order_release, atomics::memory_order_acquire ) );
}
if ( pNode->m_idOwner.load( std::memory_order_relaxed ) != cds::OS::c_NullThreadId ) {
    copy_hazards( plist, pNode->hazards_.array_, pNode->hazards_.initial_capacity_ );
    // Also walk the extended guard blocks; acquire loads pair with the
    // release stores made by the owning thread when it links a new block
    for ( guard_block* block = pNode->hazards_.extended_list_.load( atomics::memory_order_acquire );
          block;
          block = block->next_block_.load( atomics::memory_order_acquire ) )
    {
        copy_hazards( plist, block->first(), defaults::c_extended_guard_block_size );
    }
}
pNode = pNode->m_pNextNode.load( atomics::memory_order_relaxed );
// If m_bFree == true then hprec->retired_ is empty - we don't need to see it
if ( hprec->m_bFree.load( atomics::memory_order_acquire ) ) {
    // Debug-only read of another thread's data; silence TSan for the assert
    CDS_TSAN_ANNOTATE_IGNORE_READS_BEGIN;
    assert( hprec->retired_.empty() );
    CDS_TSAN_ANNOTATE_IGNORE_READS_END;
    continue;
}
# ifdef CDS_ENABLE_HPSTAT
for ( thread_record* hprec = thread_list_.load( atomics::memory_order_acquire ); hprec; hprec = hprec->m_pNextNode.load( atomics::memory_order_relaxed ) )
{
    // Statistic counters are read without synchronization (best-effort stats);
    // tell TSan these racy reads are intentional
    CDS_TSAN_ANNOTATE_IGNORE_READS_BEGIN;
    ++st.thread_rec_count;
    st.guard_allocated += hprec->hazards_.alloc_guard_count_;
    st.guard_freed += hprec->hazards_.free_guard_count_;
    st.free_count += hprec->free_call_count_;
    st.scan_count += hprec->scan_call_count_;
    st.help_scan_count += hprec->help_scan_call_count_;
    CDS_TSAN_ANNOTATE_IGNORE_READS_END;
}
CDS_TSAN_ANNOTATE_IGNORE_READS_BEGIN;
st.hp_block_count = hp_allocator_.block_allocated_.load( atomics::memory_order_relaxed );
st.retired_block_count = retired_allocator_.block_allocated_.load( atomics::memory_order_relaxed );
CDS_TSAN_ANNOTATE_IGNORE_READS_END;
# endif
}
thread_record( guard* guards, size_t guard_count, retired_ptr* retired_arr, size_t retired_capacity )
    : thread_data( guards, guard_count, retired_arr, retired_capacity )
    , m_pNextNode( nullptr )                 // explicitly init: record is published to a shared list
    , m_idOwner( cds::OS::c_NullThreadId )   // no owner thread yet
    , m_bFree( false )
{}
};
}
CDS_EXPORT_API smr::smr( size_t nHazardPtrCount, size_t nMaxThreadCount, size_t nMaxRetiredPtrCount, scan_type nScanType )
    : hazard_ptr_count_( nHazardPtrCount == 0 ? defaults::c_nHazardPointerPerThread : nHazardPtrCount )
    , max_thread_count_( nMaxThreadCount == 0 ? defaults::c_nMaxThreadCount : nMaxThreadCount )
    , max_retired_ptr_count_( calc_retired_size( nMaxRetiredPtrCount, hazard_ptr_count_, max_thread_count_ ))
    , scan_type_( nScanType )
    , scan_func_( nScanType == classic ? &smr::classic_scan : &smr::inplace_scan )
{
    // Publish the empty thread list with release so concurrent acquire-loads see it
    thread_list_.store( nullptr, atomics::memory_order_release );
}
CDS_EXPORT_API smr::~smr()
{
    CDS_DEBUG_ONLY( const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId; )
    CDS_DEBUG_ONLY( const cds::OS::ThreadId mainThreadId = cds::OS::get_current_thread_id();)
    CDS_HPSTAT( statistics( s_postmortem_stat ) );
    // Detach the whole thread list, then free each record below
    thread_record* pHead = thread_list_.load( atomics::memory_order_relaxed );
    thread_list_.store( nullptr, atomics::memory_order_release );
    thread_record* pNext = nullptr;
    for ( thread_record* hprec = pHead; hprec; hprec = pNext )
+--------------------------+
*/
- char* mem = reinterpret_cast<char*>( s_alloc_memory( nSize ));
- return new( mem ) thread_record(
- reinterpret_cast<guard*>( mem + sizeof( thread_record )), get_hazard_ptr_count(),
- reinterpret_cast<retired_ptr*>( mem + sizeof( thread_record ) + guard_array_size ), get_max_retired_ptr_count()
+ uint8_t* mem = reinterpret_cast<uint8_t*>( s_alloc_memory( nSize ));
+
+ return new( mem ) thread_record(
+ reinterpret_cast<guard*>( mem + sizeof( thread_record )),
+ get_hazard_ptr_count(),
+ reinterpret_cast<retired_ptr*>( mem + sizeof( thread_record ) + guard_array_size ),
+ get_max_retired_ptr_count()
);
}
CDS_EXPORT_API smr::thread_record* smr::alloc_thread_data()
{
    thread_record * hprec;
    const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId;
    const cds::OS::ThreadId curThreadId = cds::OS::get_current_thread_id();
    // First try to reuse a free (non-active) HP record
    for ( hprec = thread_list_.load( atomics::memory_order_acquire ); hprec; hprec = hprec->m_pNextNode.load( atomics::memory_order_acquire )) {
        cds::OS::ThreadId thId = nullThreadId;
        if ( !hprec->m_idOwner.compare_exchange_strong( thId, curThreadId, atomics::memory_order_relaxed, atomics::memory_order_relaxed ))
            continue;
        hprec->m_bFree.store( false, atomics::memory_order_release );
        return hprec;
    hprec = create_thread_data();
    hprec->m_idOwner.store( curThreadId, atomics::memory_order_relaxed );
    // Push the new record onto the lock-free thread list (CAS loop)
    thread_record* pOldHead = thread_list_.load( atomics::memory_order_acquire );
    do {
        hprec->m_pNextNode.store( pOldHead, atomics::memory_order_release );
    } while ( !thread_list_.compare_exchange_weak( pOldHead, hprec, atomics::memory_order_release, atomics::memory_order_acquire ) );
    return hprec;
}
}
CDS_HPSTAT( ++pThreadRec->scan_count_ );
// Sort retired pointer array
std::sort( first_retired, last_retired, retired_ptr::less );
{
thread_record* pRec = static_cast<thread_record*>( pThreadRec );
CDS_HPSTAT( ++pThreadRec->scan_count_ );
std::vector< void*, allocator<void*>> plist;
plist.reserve( get_max_thread_count() * get_hazard_ptr_count());
scan( pThis );
}
- src.reset( 0 );
-
- hprec->m_bFree.store( true, atomics::memory_order_relaxed );
+ src.interthread_clear();
+ hprec->m_bFree.store( true, atomics::memory_order_release );
hprec->m_idOwner.store( nullThreadId, atomics::memory_order_release );
scan( pThis );
# ifdef CDS_ENABLE_HPSTAT
for ( thread_record* hprec = thread_list_.load( atomics::memory_order_acquire ); hprec; hprec = hprec->m_pNextNode.load( atomics::memory_order_relaxed ) )
{
    // Statistic counters are read without synchronization (best-effort stats);
    // tell TSan these racy reads are intentional
    CDS_TSAN_ANNOTATE_IGNORE_READS_BEGIN;
    ++st.thread_rec_count;
    st.guard_allocated += hprec->hazards_.alloc_guard_count_;
    st.guard_freed += hprec->hazards_.free_guard_count_;
    st.free_count += hprec->free_count_;
    st.scan_count += hprec->scan_count_;
    st.help_scan_count += hprec->help_scan_count_;
    CDS_TSAN_ANNOTATE_IGNORE_READS_END;
}
# endif
}