FlatCombining: fixed a race
[libcds.git] / cds / algo / flat_combining / kernel.h
index 1f24cc21f43e22dba797482184e326a9e3c32f79..654992d6adc43da066d43428c75b9a89f1121ae6 100644 (file)
@@ -5,7 +5,7 @@
 
     Source code repo: http://github.com/khizmax/libcds/
     Download: http://sourceforge.net/projects/libcds/files/
-    
+
     Redistribution and use in source and binary forms, with or without
     modification, are permitted provided that the following conditions are met:
 
@@ -143,7 +143,7 @@ namespace cds { namespace algo {
             void    onInvokeExclusive()         { ++m_nInvokeExclusive;         }
             void    onWakeupByNotifying()       { ++m_nWakeupByNotifying;       }
             void    onPassiveToCombiner()       { ++m_nPassiveToCombiner;       }
-            
+
             //@endcond
         };
 
@@ -291,14 +291,14 @@ namespace cds { namespace algo {
             /// Destroys the objects and mark all publication records as inactive
             ~kernel()
             {
-                // mark all publication record as detached
-                for ( publication_record* p = m_pHead; p; ) {
-                    p->pOwner = nullptr;
+                m_pThreadRec.reset();   // calls tls_cleanup()
 
+                // delete all publication records
+                for ( publication_record* p = m_pHead; p; ) {
                     publication_record * pRec = p;
                     p = p->pNext.load( memory_model::memory_order_relaxed );
-                    if ( pRec->nState.load( memory_model::memory_order_acquire ) == removed )
-                        free_publication_record( static_cast<publication_record_type *>( pRec ));
+                    free_publication_record( static_cast<publication_record_type *>( pRec ));
+                    m_Stat.onDeletePubRecord();
                 }
             }
 
@@ -313,7 +313,6 @@ namespace cds { namespace algo {
                 if ( !pRec ) {
                     // Allocate new publication record
                     pRec = cxx11_allocator().New();
-                    pRec->pOwner = reinterpret_cast<void *>( this );
                     m_pThreadRec.reset( pRec );
                     m_Stat.onCreatePubRecord();
                 }
@@ -329,7 +328,7 @@ namespace cds { namespace algo {
             /// Marks publication record for the current thread as empty
             void release_record( publication_record_type * pRec )
             {
-                assert( pRec->is_done() );
+                assert( pRec->is_done());
                 pRec->nRequest.store( req_EmptyRecord, memory_model::memory_order_release );
             }
 
@@ -391,7 +390,7 @@ namespace cds { namespace algo {
                 Some operation in flat combining containers should be called in exclusive mode
                 i.e the current thread should become the combiner to process the operation.
                 The typical example is \p empty() function.
-                
+
                 \p %invoke_exclusive() allows do that: the current thread becomes the combiner,
                 invokes \p f exclusively but unlike a typical usage the thread does not process any pending request.
                 Instead, after end of \p f call the current thread wakes up a pending thread if any.
@@ -410,7 +409,7 @@ namespace cds { namespace algo {
             /// Marks \p rec as executed
             /**
                 This function should be called by container if \p batch_combine mode is used.
-                For usual combining (see \p combine() ) this function is excess.
+                For usual combining (see \p combine()) calling this function is unnecessary.
             */
             void operation_done( publication_record& rec )
             {
@@ -571,16 +570,7 @@ namespace cds { namespace algo {
             {
                 // Thread done
                 // pRec that is TLS data should be excluded from publication list
-                if ( pRec ) {
-                    if ( pRec->pOwner ) {
-                        // kernel is alive
-                        pRec->nState.store( removed, memory_model::memory_order_release );
-                    }
-                    else {
-                        // kernel already deleted
-                        free_publication_record( pRec );
-                    }
-                }
+                pRec->nState.store( removed, memory_model::memory_order_release );
             }
 
             static void free_publication_record( publication_record_type* pRec )
@@ -593,7 +583,6 @@ namespace cds { namespace algo {
                 assert( m_pThreadRec.get() == nullptr );
                 publication_record_type* pRec = cxx11_allocator().New();
                 m_pHead = pRec;
-                pRec->pOwner = this;
                 m_pThreadRec.reset( pRec );
                 m_Stat.onCreatePubRecord();
             }
@@ -606,14 +595,14 @@ namespace cds { namespace algo {
                 pRec->nState.store( active, memory_model::memory_order_release );
 
                 // Insert record to publication list
-                if ( m_pHead != static_cast<publication_record *>(pRec) ) {
+                if ( m_pHead != static_cast<publication_record *>(pRec)) {
                     publication_record * p = m_pHead->pNext.load(memory_model::memory_order_relaxed);
                     if ( p != static_cast<publication_record *>( pRec )) {
                         do {
-                            pRec->pNext = p;
+                            pRec->pNext.store( p, memory_model::memory_order_relaxed );
                             // Failed CAS changes p
                         } while ( !m_pHead->pNext.compare_exchange_weak( p, static_cast<publication_record *>(pRec),
-                            memory_model::memory_order_release, atomics::memory_order_relaxed ));
+                            memory_model::memory_order_release, atomics::memory_order_acquire ));
                         m_Stat.onActivatePubRecord();
                     }
                 }
@@ -630,9 +619,9 @@ namespace cds { namespace algo {
             template <class Container>
             void try_combining( Container& owner, publication_record_type* pRec )
             {
-                if ( m_Mutex.try_lock() ) {
+                if ( m_Mutex.try_lock()) {
                     // The thread becomes a combiner
-                    lock_guard l( m_Mutex, std::adopt_lock_t() );
+                    lock_guard l( m_Mutex, std::adopt_lock_t());
 
                     // The record pRec can be excluded from publication list. Re-publish it
                     republish( pRec );
@@ -642,9 +631,9 @@ namespace cds { namespace algo {
                 }
                 else {
                     // There is another combiner, wait while it executes our request
-                    if ( !wait_for_combining( pRec ) ) {
+                    if ( !wait_for_combining( pRec )) {
                         // The thread becomes a combiner
-                        lock_guard l( m_Mutex, std::adopt_lock_t() );
+                        lock_guard l( m_Mutex, std::adopt_lock_t());
 
                         // The record pRec can be excluded from publication list. Re-publish it
                         republish( pRec );
@@ -658,9 +647,9 @@ namespace cds { namespace algo {
             template <class Container>
             void try_batch_combining( Container& owner, publication_record_type * pRec )
             {
-                if ( m_Mutex.try_lock() ) {
+                if ( m_Mutex.try_lock()) {
                     // The thread becomes a combiner
-                    lock_guard l( m_Mutex, std::adopt_lock_t() );
+                    lock_guard l( m_Mutex, std::adopt_lock_t());
 
                     // The record pRec can be excluded from publication list. Re-publish it
                     republish( pRec );
@@ -670,9 +659,9 @@ namespace cds { namespace algo {
                 }
                 else {
                     // There is another combiner, wait while it executes our request
-                    if ( !wait_for_combining( pRec ) ) {
+                    if ( !wait_for_combining( pRec )) {
                         // The thread becomes a combiner
-                        lock_guard l( m_Mutex, std::adopt_lock_t() );
+                        lock_guard l( m_Mutex, std::adopt_lock_t());
 
                         // The record pRec can be excluded from publication list. Re-publish it
                         republish( pRec );
@@ -687,7 +676,7 @@ namespace cds { namespace algo {
             void combining( Container& owner )
             {
                 // The thread is a combiner
-                assert( !m_Mutex.try_lock() );
+                assert( !m_Mutex.try_lock());
 
                 unsigned int const nCurAge = m_nCount.fetch_add( 1, memory_model::memory_order_relaxed ) + 1;
 
@@ -716,7 +705,7 @@ namespace cds { namespace algo {
                         case active:
                             if ( p->op() >= req_Operation ) {
                                 p->nAge.store( nCurAge, memory_model::memory_order_release );
-                                owner.fc_apply( static_cast<publication_record_type*>(p) );
+                                owner.fc_apply( static_cast<publication_record_type*>(p));
                                 operation_done( *p );
                                 bOpDone = true;
                             }
@@ -726,9 +715,12 @@ namespace cds { namespace algo {
                             assert( p == m_pHead );
                             break;
                         case removed:
-                            // The record should be removed
-                            p = unlink_and_delete_record( pPrev, p );
-                            continue;
+                            // A removed record is unlinked and deleted, except m_pHead, which is never deleted
+                            if ( pPrev ) {
+                                p = unlink_and_delete_record( pPrev, p );
+                                continue;
+                            }
+                            break;
                         default:
                             /// ??? That is impossible
                             assert(false);
@@ -743,12 +735,12 @@ namespace cds { namespace algo {
             void batch_combining( Container& owner )
             {
                 // The thread is a combiner
-                assert( !m_Mutex.try_lock() );
+                assert( !m_Mutex.try_lock());
 
                 unsigned int const nCurAge = m_nCount.fetch_add( 1, memory_model::memory_order_relaxed ) + 1;
 
                 for ( unsigned int nPass = 0; nPass < m_nCombinePassCount; ++nPass )
-                    owner.fc_process( begin(), end() );
+                    owner.fc_process( begin(), end());
 
                 combining_pass( owner, nCurAge );
                 m_Stat.onCombining();
@@ -771,7 +763,7 @@ namespace cds { namespace algo {
                     if ( m_waitStrategy.wait( *this, *pRec ))
                         m_Stat.onWakeupByNotifying();
 
-                    if ( m_Mutex.try_lock() ) {
+                    if ( m_Mutex.try_lock()) {
                         if ( pRec->op( memory_model::memory_order_acquire ) == req_Response ) {
                             // Operation is done
                             m_Mutex.unlock();
@@ -819,22 +811,18 @@ namespace cds { namespace algo {
 
             publication_record * unlink_and_delete_record( publication_record * pPrev, publication_record * p )
             {
-                if ( pPrev ) {
-                    publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
-                    if ( pPrev->pNext.compare_exchange_strong( p, pNext,
-                        memory_model::memory_order_release, atomics::memory_order_relaxed ))
-                    {
-                        free_publication_record( static_cast<publication_record_type *>( p ));
-                        m_Stat.onDeletePubRecord();
-                    }
-                    return pNext;
-                }
-                else {
-                    m_pHead = static_cast<publication_record_type *>( p->pNext.load( memory_model::memory_order_acquire ));
+                // m_pHead is a persistent node and cannot be deleted
+                assert( pPrev != nullptr );
+                assert( p != m_pHead );
+
+                publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
+                if ( pPrev->pNext.compare_exchange_strong( p, pNext,
+                    memory_model::memory_order_acquire, atomics::memory_order_relaxed ))
+                {
                     free_publication_record( static_cast<publication_record_type *>( p ));
                     m_Stat.onDeletePubRecord();
-                    return m_pHead;
                 }
+                return pNext;
             }
             //@endcond
         };