Source code repo: http://github.com/khizmax/libcds/
Download: http://sourceforge.net/projects/libcds/files/
-
+
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
void onInvokeExclusive() { ++m_nInvokeExclusive; } // stat event: counts invoke_exclusive() calls
void onWakeupByNotifying() { ++m_nWakeupByNotifying; } // stat event: thread woken by explicit notify (vs. timeout/spin)
void onPassiveToCombiner() { ++m_nPassiveToCombiner; } // stat event: passive (waiting) thread promoted to combiner role
-
+
//@endcond
};
/// Destroys the objects and mark all publication records as inactive
// NOTE(review): this span is an unresolved diff hunk ('-' = removed line,
// '+' = added line). Old behavior: walk the list, clear each record's
// pOwner to mark it detached, and free only records already in 'removed'
// state. New behavior: reset the TLS record pointer first (which runs
// tls_cleanup()), then unconditionally free every record in the
// publication list and count each deletion in the statistics — owner
// tracking via pOwner is gone (see the companion hunks that delete the
// pOwner assignments).
~kernel()
{
- // mark all publication record as detached
- for ( publication_record* p = m_pHead; p; ) {
- p->pOwner = nullptr;
+ m_pThreadRec.reset(); // calls tls_cleanup()
+ // delete all publication records
+ for ( publication_record* p = m_pHead; p; ) {
publication_record * pRec = p;
// Advance before freeing: pNext is read out of pRec first.
p = p->pNext.load( memory_model::memory_order_relaxed );
- if ( pRec->nState.load( memory_model::memory_order_acquire ) == removed )
- free_publication_record( static_cast<publication_record_type *>( pRec ));
+ free_publication_record( static_cast<publication_record_type *>( pRec ));
+ m_Stat.onDeletePubRecord();
}
}
if ( !pRec ) {
// Allocate new publication record
pRec = cxx11_allocator().New();
- pRec->pOwner = reinterpret_cast<void *>( this );
m_pThreadRec.reset( pRec );
m_Stat.onCreatePubRecord();
}
/// Marks publication record for the current thread as empty
// NOTE(review): diff hunk — only whitespace inside the assert changed
// (house style: no space before a closing ')').
void release_record( publication_record_type * pRec )
{
- assert( pRec->is_done() );
+ assert( pRec->is_done());
// Record must have completed its pending request before being recycled;
// the release store publishes the empty state to the combiner.
pRec->nRequest.store( req_EmptyRecord, memory_model::memory_order_release );
}
Some operation in flat combining containers should be called in exclusive mode
i.e the current thread should become the combiner to process the operation.
The typical example is \p empty() function.
-
+
\p %invoke_exclusive() allows do that: the current thread becomes the combiner,
invokes \p f exclusively but unlike a typical usage the thread does not process any pending request.
Instead, after end of \p f call the current thread wakes up a pending thread if any.
/// Marks \p rec as executed
/**
This function should be called by container if \p batch_combine mode is used.
- For usual combining (see \p combine() ) this function is excess.
+ For usual combining (see \p combine()) this function is excess.
*/
void operation_done( publication_record& rec )
{
{
// Thread done
// pRec that is TLS data should be excluded from publication list
- if ( pRec ) {
- if ( pRec->pOwner ) {
- // kernel is alive
- pRec->nState.store( removed, memory_model::memory_order_release );
- }
- else {
- // kernel already deleted
- free_publication_record( pRec );
- }
- }
+ pRec->nState.store( removed, memory_model::memory_order_release );
}
static void free_publication_record( publication_record_type* pRec )
assert( m_pThreadRec.get() == nullptr );
publication_record_type* pRec = cxx11_allocator().New();
m_pHead = pRec;
- pRec->pOwner = this;
m_pThreadRec.reset( pRec );
m_Stat.onCreatePubRecord();
}
pRec->nState.store( active, memory_model::memory_order_release );
// Insert record to publication list
- if ( m_pHead != static_cast<publication_record *>(pRec) ) {
+ if ( m_pHead != static_cast<publication_record *>(pRec)) {
publication_record * p = m_pHead->pNext.load(memory_model::memory_order_relaxed);
if ( p != static_cast<publication_record *>( pRec )) {
do {
- pRec->pNext = p;
+ pRec->pNext.store( p, memory_model::memory_order_relaxed );
// Failed CAS changes p
} while ( !m_pHead->pNext.compare_exchange_weak( p, static_cast<publication_record *>(pRec),
- memory_model::memory_order_release, atomics::memory_order_relaxed ));
+ memory_model::memory_order_release, atomics::memory_order_acquire ));
m_Stat.onActivatePubRecord();
}
}
template <class Container>
void try_combining( Container& owner, publication_record_type* pRec )
{
- if ( m_Mutex.try_lock() ) {
+ if ( m_Mutex.try_lock()) {
// The thread becomes a combiner
- lock_guard l( m_Mutex, std::adopt_lock_t() );
+ lock_guard l( m_Mutex, std::adopt_lock_t());
// The record pRec can be excluded from publication list. Re-publish it
republish( pRec );
}
else {
// There is another combiner, wait while it executes our request
- if ( !wait_for_combining( pRec ) ) {
+ if ( !wait_for_combining( pRec )) {
// The thread becomes a combiner
- lock_guard l( m_Mutex, std::adopt_lock_t() );
+ lock_guard l( m_Mutex, std::adopt_lock_t());
// The record pRec can be excluded from publication list. Re-publish it
republish( pRec );
template <class Container>
void try_batch_combining( Container& owner, publication_record_type * pRec )
{
- if ( m_Mutex.try_lock() ) {
+ if ( m_Mutex.try_lock()) {
// The thread becomes a combiner
- lock_guard l( m_Mutex, std::adopt_lock_t() );
+ lock_guard l( m_Mutex, std::adopt_lock_t());
// The record pRec can be excluded from publication list. Re-publish it
republish( pRec );
}
else {
// There is another combiner, wait while it executes our request
- if ( !wait_for_combining( pRec ) ) {
+ if ( !wait_for_combining( pRec )) {
// The thread becomes a combiner
- lock_guard l( m_Mutex, std::adopt_lock_t() );
+ lock_guard l( m_Mutex, std::adopt_lock_t());
// The record pRec can be excluded from publication list. Re-publish it
republish( pRec );
void combining( Container& owner )
{
// The thread is a combiner
- assert( !m_Mutex.try_lock() );
+ assert( !m_Mutex.try_lock());
unsigned int const nCurAge = m_nCount.fetch_add( 1, memory_model::memory_order_relaxed ) + 1;
case active:
if ( p->op() >= req_Operation ) {
p->nAge.store( nCurAge, memory_model::memory_order_release );
- owner.fc_apply( static_cast<publication_record_type*>(p) );
+ owner.fc_apply( static_cast<publication_record_type*>(p));
operation_done( *p );
bOpDone = true;
}
assert( p == m_pHead );
break;
case removed:
- // The record should be removed
- p = unlink_and_delete_record( pPrev, p );
- continue;
+ // The record should be removed (except m_pHead)
+ if ( pPrev ) {
+ p = unlink_and_delete_record( pPrev, p );
+ continue;
+ }
+ break;
default:
/// ??? That is impossible
assert(false);
void batch_combining( Container& owner )
{
// The thread is a combiner
- assert( !m_Mutex.try_lock() );
+ assert( !m_Mutex.try_lock());
unsigned int const nCurAge = m_nCount.fetch_add( 1, memory_model::memory_order_relaxed ) + 1;
for ( unsigned int nPass = 0; nPass < m_nCombinePassCount; ++nPass )
- owner.fc_process( begin(), end() );
+ owner.fc_process( begin(), end());
combining_pass( owner, nCurAge );
m_Stat.onCombining();
if ( m_waitStrategy.wait( *this, *pRec ))
m_Stat.onWakeupByNotifying();
- if ( m_Mutex.try_lock() ) {
+ if ( m_Mutex.try_lock()) {
if ( pRec->op( memory_model::memory_order_acquire ) == req_Response ) {
// Operation is done
m_Mutex.unlock();
// Unlinks record \p p (successor of \p pPrev) from the publication list
// and frees it; returns the next record to continue iteration with.
// NOTE(review): this span is an unresolved diff hunk ('-' = removed,
// '+' = added). Old code also handled pPrev == nullptr by replacing
// m_pHead; new code asserts pPrev != nullptr and p != m_pHead because
// m_pHead has become a persistent node that is never removed (see the
// 'case removed:' hunk that now skips unlinking when pPrev is null).
// NOTE(review): the CAS success order changed release -> acquire in the
// '+' lines — presumably intentional upstream; verify against the
// containing revision before resolving this hunk.
publication_record * unlink_and_delete_record( publication_record * pPrev, publication_record * p )
{
- if ( pPrev ) {
- publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
- if ( pPrev->pNext.compare_exchange_strong( p, pNext,
- memory_model::memory_order_release, atomics::memory_order_relaxed ))
- {
- free_publication_record( static_cast<publication_record_type *>( p ));
- m_Stat.onDeletePubRecord();
- }
- return pNext;
- }
- else {
- m_pHead = static_cast<publication_record_type *>( p->pNext.load( memory_model::memory_order_acquire ));
+ // m_pHead is persistent node and cannot be deleted
+ assert( pPrev != nullptr );
+ assert( p != m_pHead );
+
+ publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
+ if ( pPrev->pNext.compare_exchange_strong( p, pNext,
+ memory_model::memory_order_acquire, atomics::memory_order_relaxed ))
+ {
// Only the winner of the CAS frees the record (exactly-once delete).
free_publication_record( static_cast<publication_record_type *>( p ));
m_Stat.onDeletePubRecord();
- return m_pHead;
}
+ return pNext;
}
//@endcond
};