Fixed a memory leak in flat-combining algorithm
author khizmax <libcds.dev@gmail.com>
Tue, 27 Dec 2016 21:29:34 +0000 (00:29 +0300)
committer khizmax <libcds.dev@gmail.com>
Tue, 27 Dec 2016 21:29:34 +0000 (00:29 +0300)
cds/algo/flat_combining/defs.h
cds/algo/flat_combining/kernel.h

index a959e8e..23470d6 100644 (file)
@@ -60,7 +60,8 @@ namespace cds { namespace algo { namespace flat_combining {
         atomics::atomic<unsigned int>           nRequest;   ///< Request field (depends on data structure)
         atomics::atomic<unsigned int>           nState;     ///< Record state: inactive, active, removed
         atomics::atomic<unsigned int>           nAge;       ///< Age of the record
-        atomics::atomic<publication_record *>   pNext;      ///< Next record in publication list
+        atomics::atomic<publication_record *>   pNext;      ///< Next record in active publication list
+        atomics::atomic<publication_record *>   pNextAllocated; ///< Next record in allocated publication list
 
         /// Initializes publication record
         publication_record()
@@ -68,6 +69,7 @@ namespace cds { namespace algo { namespace flat_combining {
             , nState( inactive )
             , nAge( 0 )
             , pNext( nullptr )
+            , pNextAllocated( nullptr )
         {}
 
         /// Returns the value of \p nRequest field
index 654992d..a8d3ce9 100644 (file)
@@ -254,11 +254,12 @@ namespace cds { namespace algo {
             //@endcond
 
         protected:
-            atomics::atomic<unsigned int>  m_nCount;   ///< Total count of combining passes. Used as an age.
-            publication_record_type *   m_pHead;    ///< Head of publication list
+            atomics::atomic<unsigned int>  m_nCount;    ///< Total count of combining passes. Used as an age.
+            publication_record_type*    m_pHead;        ///< Head of active publication list
+            publication_record_type*    m_pAllocatedHead; ///< Head of allocated publication list
             boost::thread_specific_ptr< publication_record_type > m_pThreadRec;   ///< Thread-local publication record
-            mutable global_lock_type    m_Mutex;    ///< Global mutex
-            mutable stat                m_Stat;     ///< Internal statistics
+            mutable global_lock_type    m_Mutex;        ///< Global mutex
+            mutable stat                m_Stat;         ///< Internal statistics
             unsigned int const          m_nCompactFactor;    ///< Publication list compacting factor (the list will be compacted through \p %m_nCompactFactor combining passes)
             unsigned int const          m_nCombinePassCount; ///< Number of combining passes
             wait_strategy               m_waitStrategy;      ///< Wait strategy
@@ -281,24 +282,29 @@ namespace cds { namespace algo {
                 )
                 : m_nCount(0)
                 , m_pHead( nullptr )
+                , m_pAllocatedHead( nullptr )
                 , m_pThreadRec( tls_cleanup )
                 , m_nCompactFactor( (unsigned int)( cds::beans::ceil2( nCompactFactor ) - 1 ))   // binary mask
                 , m_nCombinePassCount( nCombinePassCount )
             {
-                init();
+                assert( m_pThreadRec.get() == nullptr );
+                publication_record_type* pRec = cxx11_allocator().New();
+                m_pAllocatedHead = 
+                    m_pHead = pRec;
+                m_pThreadRec.reset( pRec );
+                m_Stat.onCreatePubRecord();
             }
 
-            /// Destroys the objects and mark all publication records as inactive
+            /// Destroys the object and all publication records
             ~kernel()
             {
                 m_pThreadRec.reset();   // calls tls_cleanup()
 
                 // delete all publication records
-                for ( publication_record* p = m_pHead; p; ) {
+                for ( publication_record* p = m_pAllocatedHead; p; ) {
                     publication_record * pRec = p;
-                    p = p->pNext.load( memory_model::memory_order_relaxed );
+                    p = p->pNextAllocated.load( memory_model::memory_order_relaxed );
                     free_publication_record( static_cast<publication_record_type *>( pRec ));
-                    m_Stat.onDeletePubRecord();
                 }
             }
 
@@ -315,9 +321,17 @@ namespace cds { namespace algo {
                     pRec = cxx11_allocator().New();
                     m_pThreadRec.reset( pRec );
                     m_Stat.onCreatePubRecord();
-                }
 
-                if ( pRec->nState.load( memory_model::memory_order_acquire ) != active )
+                    // Insert in allocated list
+                    assert( m_pAllocatedHead != nullptr );
+                    publication_record* p = m_pAllocatedHead->pNextAllocated.load( memory_model::memory_order_acquire );
+                    do {
+                        pRec->pNextAllocated.store( p, memory_model::memory_order_relaxed );
+                    } while ( !m_pAllocatedHead->pNextAllocated.compare_exchange_weak( p, pRec, memory_model::memory_order_release, atomics::memory_order_acquire ));
+
+                    publish( pRec );
+                }
+                else if ( pRec->nState.load( memory_model::memory_order_acquire ) != active )
                     publish( pRec );
 
                 assert( pRec->op() == req_EmptyRecord );
@@ -573,26 +587,18 @@ namespace cds { namespace algo {
                 pRec->nState.store( removed, memory_model::memory_order_release );
             }
 
-            static void free_publication_record( publication_record_type* pRec )
+            void free_publication_record( publication_record_type* pRec )
             {
                 cxx11_allocator().Delete( pRec );
-            }
-
-            void init()
-            {
-                assert( m_pThreadRec.get() == nullptr );
-                publication_record_type* pRec = cxx11_allocator().New();
-                m_pHead = pRec;
-                m_pThreadRec.reset( pRec );
-                m_Stat.onCreatePubRecord();
+                m_Stat.onDeletePubRecord();
             }
 
             void publish( publication_record_type* pRec )
             {
                 assert( pRec->nState.load( memory_model::memory_order_relaxed ) == inactive );
 
-                pRec->nAge.store( m_nCount.load(memory_model::memory_order_relaxed), memory_model::memory_order_release );
-                pRec->nState.store( active, memory_model::memory_order_release );
+                pRec->nAge.store( m_nCount.load(memory_model::memory_order_relaxed), memory_model::memory_order_relaxed );
+                pRec->nState.store( active, memory_model::memory_order_relaxed );
 
                 // Insert record to publication list
                 if ( m_pHead != static_cast<publication_record *>(pRec)) {
@@ -690,42 +696,36 @@ namespace cds { namespace algo {
                 }
 
                 m_Stat.onCombining();
-                if ( (nCurAge & m_nCompactFactor) == 0 )
+                if ( ( nCurAge & m_nCompactFactor ) == 0 )
                     compact_list( nCurAge );
             }
 
             template <class Container>
             bool combining_pass( Container& owner, unsigned int nCurAge )
             {
-                publication_record* pPrev = nullptr;
                 publication_record* p = m_pHead;
                 bool bOpDone = false;
                 while ( p ) {
-                    switch ( p->nState.load( memory_model::memory_order_acquire )) {
-                        case active:
-                            if ( p->op() >= req_Operation ) {
-                                p->nAge.store( nCurAge, memory_model::memory_order_release );
-                                owner.fc_apply( static_cast<publication_record_type*>(p));
-                                operation_done( *p );
-                                bOpDone = true;
-                            }
-                            break;
-                        case inactive:
-                            // Only m_pHead can be inactive in the publication list
-                            assert( p == m_pHead );
-                            break;
-                        case removed:
-                            // The record should be removed (except m_pHead)
-                            if ( pPrev ) {
-                                p = unlink_and_delete_record( pPrev, p );
-                                continue;
-                            }
-                            break;
-                        default:
-                            /// ??? That is impossible
-                            assert(false);
+                    switch ( p->nState.load( memory_model::memory_order_acquire ) ) {
+                    case active:
+                        if ( p->op() >= req_Operation ) {
+                            p->nAge.store( nCurAge, memory_model::memory_order_relaxed );
+                            owner.fc_apply( static_cast<publication_record_type*>( p ) );
+                            operation_done( *p );
+                            bOpDone = true;
+                        }
+                        break;
+                    case inactive:
+                        // Only m_pHead can be inactive in the publication list
+                        assert( p == m_pHead );
+                        break;
+                    case removed:
+                        // Such record will be removed on compacting phase
+                        break;
+                    default:
+                        /// ??? That is impossible
+                        assert( false );
                     }
-                    pPrev = p;
                     p = p->pNext.load( memory_model::memory_order_acquire );
                 }
                 return bOpDone;
@@ -735,16 +735,16 @@ namespace cds { namespace algo {
             void batch_combining( Container& owner )
             {
                 // The thread is a combiner
-                assert( !m_Mutex.try_lock());
+                assert( !m_Mutex.try_lock() );
 
                 unsigned int const nCurAge = m_nCount.fetch_add( 1, memory_model::memory_order_relaxed ) + 1;
 
                 for ( unsigned int nPass = 0; nPass < m_nCombinePassCount; ++nPass )
-                    owner.fc_process( begin(), end());
+                    owner.fc_process( begin(), end() );
 
                 combining_pass( owner, nCurAge );
                 m_Stat.onCombining();
-                if ( (nCurAge & m_nCompactFactor) == 0 )
+                if ( ( nCurAge & m_nCompactFactor ) == 0 )
                     compact_list( nCurAge );
             }
 
@@ -760,10 +760,10 @@ namespace cds { namespace algo {
                     m_Stat.onPassiveWaitIteration();
 
                     // Wait while operation processing
-                    if ( m_waitStrategy.wait( *this, *pRec ))
+                    if ( m_waitStrategy.wait( *this, *pRec ) )
                         m_Stat.onWakeupByNotifying();
 
-                    if ( m_Mutex.try_lock()) {
+                    if ( m_Mutex.try_lock() ) {
                         if ( pRec->op( memory_model::memory_order_acquire ) == req_Response ) {
                             // Operation is done
                             m_Mutex.unlock();
@@ -782,18 +782,21 @@ namespace cds { namespace algo {
                 return true;
             }
 
-            void compact_list( unsigned int const nCurAge )
+            void compact_list( unsigned int nCurAge )
             {
-                // Thinning publication list
-                publication_record * pPrev = nullptr;
-                for ( publication_record * p = m_pHead; p; ) {
-                    if ( p->nState.load( memory_model::memory_order_acquire ) == active
-                      && p->nAge.load( memory_model::memory_order_acquire ) + m_nCompactFactor < nCurAge )
-                    {
-                        if ( pPrev ) {
-                            publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
+                // Compacts publication list
+                // This function is called only by combiner thread
+
+            try_again:
+                publication_record * pPrev = m_pHead;
+                for ( publication_record * p = pPrev->pNext.load( memory_model::memory_order_acquire ); p; ) {
+                    switch ( p->nState.load( memory_model::memory_order_relaxed ) ) {
+                    case active:
+                        if ( p->nAge.load( memory_model::memory_order_relaxed ) + m_nCompactFactor < nCurAge )
+                        {
+                            publication_record * pNext = p->pNext.load( memory_model::memory_order_relaxed );
                             if ( pPrev->pNext.compare_exchange_strong( p, pNext,
-                                memory_model::memory_order_release, atomics::memory_order_relaxed ))
+                                memory_model::memory_order_acquire, atomics::memory_order_relaxed ) )
                             {
                                 p->nState.store( inactive, memory_model::memory_order_release );
                                 p = pNext;
@@ -801,28 +804,41 @@ namespace cds { namespace algo {
                                 continue;
                             }
                         }
+                        break;
+
+                    case removed:
+                        publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
+                        if ( cds_likely( pPrev->pNext.compare_exchange_strong( p, pNext, memory_model::memory_order_acquire, atomics::memory_order_relaxed ))) {
+                            p = pNext;
+                            continue;
+                        }
+                        else {
+                            // CAS can be failed only in beginning of list
+                            assert( pPrev == m_pHead );
+                            goto try_again;
+                        }
                     }
                     pPrev = p;
                     p = p->pNext.load( memory_model::memory_order_acquire );
                 }
 
-                m_Stat.onCompactPublicationList();
-            }
-
-            publication_record * unlink_and_delete_record( publication_record * pPrev, publication_record * p )
-            {
-                // m_pHead is persistent node and cannot be deleted
-                assert( pPrev != nullptr );
-                assert( p != m_pHead );
+                // Iterate over allocated list to find removed records
+                pPrev = m_pAllocatedHead;
+                for ( publication_record * p = pPrev->pNextAllocated.load( memory_model::memory_order_acquire ); p; ) {
+                    if ( p->nState.load( memory_model::memory_order_relaxed ) == removed ) {
+                        publication_record * pNext = p->pNextAllocated.load( memory_model::memory_order_relaxed );
+                        if ( pPrev->pNextAllocated.compare_exchange_strong( p, pNext, memory_model::memory_order_acquire, atomics::memory_order_relaxed )) {
+                            free_publication_record( static_cast<publication_record_type *>( p ));
+                            p = pNext;
+                            continue;
+                        }
+                    }
 
-                publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
-                if ( pPrev->pNext.compare_exchange_strong( p, pNext,
-                    memory_model::memory_order_acquire, atomics::memory_order_relaxed ))
-                {
-                    free_publication_record( static_cast<publication_record_type *>( p ));
-                    m_Stat.onDeletePubRecord();
+                    pPrev = p;
+                    p = p->pNextAllocated.load( memory_model::memory_order_relaxed );
                 }
-                return pNext;
+
+                m_Stat.onCompactPublicationList();
             }
             //@endcond
         };