SkipList: fixed erase() and find_fastpath() bugs
[libcds.git] / cds / intrusive / impl / skip_list.h
index 80c43ee5b262676edcbb3a9d1d3391cc3904391f..fb29f20a1ec231b24599c38fc2594ab20978652d 100644 (file)
@@ -1153,7 +1153,7 @@ namespace cds { namespace intrusive {
             if ( pPred->next( nLevel ).compare_exchange_strong( p, marked_node_ptr( pSucc.ptr()),
                 memory_model::memory_order_acquire, atomics::memory_order_relaxed ) )
             {
-                if ( nLevel == 0 ) {
+                if ( pCur->level_unlinked() ) {
                     gc::retire( node_traits::to_value_ptr( pCur.ptr() ), dispose_node );
                     m_Stat.onEraseWhileFind();
                 }
@@ -1197,7 +1197,7 @@ namespace cds { namespace intrusive {
 
                     if ( pSucc.bits() ) {
                         // pCur is marked, i.e. logically deleted
-                        // try to help deleting pCur if pSucc is not being deleted
+                        // try to help deleting pCur
                         help_remove( nLevel, pPred, pCur, pSucc );
                         goto retry;
                     }
@@ -1258,7 +1258,7 @@ namespace cds { namespace intrusive {
 
                     if ( pSucc.bits() ) {
                         // pCur is marked, i.e. logically deleted.
-                        // try to help deleting pCur if pSucc is not being deleted
+                        // try to help deleting pCur
                         help_remove( nLevel, pPred, pCur, pSucc );
                         goto retry;
                     }
@@ -1307,7 +1307,7 @@ namespace cds { namespace intrusive {
 
                     if ( pSucc.bits() ) {
                         // pCur is marked, i.e. logically deleted.
-                        // try to help deleting pCur if pSucc is not being deleted
+                        // try to help deleting pCur
                         help_remove( nLevel, pPred, pCur, pSucc );
                         goto retry;
                     }
@@ -1331,17 +1331,15 @@ namespace cds { namespace intrusive {
         template <typename Func>
         bool insert_at_position( value_type& val, node_type * pNode, position& pos, Func f )
         {
-            unsigned int nHeight = pNode->height();
+            unsigned int const nHeight = pNode->height();
 
             for ( unsigned int nLevel = 1; nLevel < nHeight; ++nLevel )
                 pNode->next( nLevel ).store( marked_node_ptr(), memory_model::memory_order_relaxed );
 
             // Insert at level 0
             {
-                node_type* succ = pos.pSucc[0];
-
-                marked_node_ptr p( succ );
-                pNode->next( 0 ).store( p, memory_model::memory_order_release );
+                marked_node_ptr p( pos.pSucc[0] );
+                pNode->next( 0 ).store( p, memory_model::memory_order_relaxed );
                 if ( !pos.pPrev[0]->next( 0 ).compare_exchange_strong( p, marked_node_ptr( pNode ), memory_model::memory_order_release, atomics::memory_order_relaxed ))
                     return false;
 
@@ -1352,25 +1350,38 @@ namespace cds { namespace intrusive {
             for ( unsigned int nLevel = 1; nLevel < nHeight; ++nLevel ) {
                 marked_node_ptr p;
                 while ( true ) {
-                    marked_node_ptr q( pos.pSucc[nLevel] );
+                    marked_node_ptr pSucc( pos.pSucc[nLevel] );
 
-                    if ( !pNode->next( nLevel ).compare_exchange_strong( p, q, memory_model::memory_order_release, atomics::memory_order_relaxed )) {
+                    // Set pNode->next
+                    // pNode->next must be null but can have a "logical deleted" flag if another thread is removing pNode right now
+                    if ( !pNode->next( nLevel ).compare_exchange_strong( p, pSucc,
+                        memory_model::memory_order_acq_rel, atomics::memory_order_acquire )) 
+                    {
                         // pNode has been marked as removed while we are inserting it
                         // Stop inserting
-                        assert( p.bits() );
-                        m_Stat.onLogicDeleteWhileInsert();
+                        assert( p.bits() != 0 );
+
+                        if ( pNode->level_unlinked( nHeight - nLevel )) {
+                            gc::retire( node_traits::to_value_ptr( pNode ), dispose_node );
+                            m_Stat.onEraseWhileInsert();
+                        }
+                        else
+                            m_Stat.onLogicDeleteWhileInsert();
                         return true;
                     }
+                    p = pSucc;
 
-                    p = q;
-                    bool const result = pos.pPrev[nLevel]->next( nLevel ).compare_exchange_strong( q, marked_node_ptr( pNode ),
-                        memory_model::memory_order_release, atomics::memory_order_relaxed );
-                    if ( result )
+                    // Link pNode into the list at nLevel
+                    if ( pos.pPrev[nLevel]->next( nLevel ).compare_exchange_strong( pSucc, marked_node_ptr( pNode ),
+                        memory_model::memory_order_release, atomics::memory_order_relaxed ))
+                    {
+                        // go to next level
                         break;
+                    }
 
                     // Renew insert position
                     m_Stat.onRenewInsertPosition();
-                    if ( !find_position( val, pos, key_comparator(), false ) ) {
+                    if ( !find_position( val, pos, key_comparator(), false )) {
                         // The node has been deleted while we are inserting it
                         m_Stat.onNotFoundWhileInsert();
                         return true;
@@ -1386,38 +1397,55 @@ namespace cds { namespace intrusive {
             assert( pDel != nullptr );
 
             marked_node_ptr pSucc;
+            back_off bkoff;
 
             // logical deletion (marking)
             for ( unsigned int nLevel = pDel->height() - 1; nLevel > 0; --nLevel ) {
-                while ( true ) {
-                    pSucc = pDel->next( nLevel );
-                    if ( pSucc.bits() || pDel->next( nLevel ).compare_exchange_weak( pSucc, pSucc | 1,
-                        memory_model::memory_order_release, atomics::memory_order_relaxed ) )
+                pSucc = pDel->next( nLevel ).load( memory_model::memory_order_relaxed );
+                if ( pSucc.bits() == 0 ) {
+                    bkoff.reset();
+                    while ( !( pDel->next( nLevel ).compare_exchange_weak( pSucc, pSucc | 1,
+                        memory_model::memory_order_release, atomics::memory_order_acquire ) 
+                        || pSucc.bits() != 0 ))
                     {
-                        break;
+                        bkoff();
+                        m_Stat.onMarkFailed();
                     }
                 }
             }
 
+            marked_node_ptr p( pDel->next( 0 ).load( memory_model::memory_order_relaxed ).ptr());
             while ( true ) {
-                marked_node_ptr p( pDel->next( 0 ).load( memory_model::memory_order_relaxed ).ptr() );
-                if ( pDel->next( 0 ).compare_exchange_strong( p, p | 1, memory_model::memory_order_release, atomics::memory_order_relaxed ) )
+                if ( pDel->next( 0 ).compare_exchange_strong( p, p | 1, memory_model::memory_order_release, atomics::memory_order_acquire ))
                 {
                     f( *node_traits::to_value_ptr( pDel ) );
 
                     // Physical deletion
                     // try fast erase
                     p = pDel;
+                    unsigned nCount = 0;
+
                     for ( int nLevel = static_cast<int>( pDel->height() - 1 ); nLevel >= 0; --nLevel ) {
+
                         pSucc = pDel->next( nLevel ).load( memory_model::memory_order_relaxed );
-                        if ( !pos.pPrev[nLevel]->next( nLevel ).compare_exchange_strong( p, marked_node_ptr( pSucc.ptr() ),
-                            memory_model::memory_order_acquire, atomics::memory_order_relaxed ) )
+                        if ( !pos.pPrev[nLevel]->next( nLevel ).compare_exchange_strong( p, marked_node_ptr( pSucc.ptr()),
+                            memory_model::memory_order_acquire, atomics::memory_order_relaxed ))
                         {
+                            // Maybe, another threads already helped us to delete the node?..
+                            if ( nCount ) {
+                                if ( pDel->level_unlinked( nCount )) {
+                                    gc::retire( node_traits::to_value_ptr( pDel ), dispose_node );
+                                    m_Stat.onFastEraseHelped();
+                                    return true;
+                                }
+                            }
+
                             // Make slow erase
                             find_position( *node_traits::to_value_ptr( pDel ), pos, key_comparator(), false );
                             m_Stat.onSlowErase();
                             return true;
                         }
+                        ++nCount;
                     }
 
                     // Fast erasing success
@@ -1425,13 +1453,12 @@ namespace cds { namespace intrusive {
                     m_Stat.onFastErase();
                     return true;
                 }
-                else {
-                    if ( p.bits() ) {
-                        // Another thread is deleting pDel right now
-                        return false;
-                    }
+                else if ( p.bits() ) {
+                    // Another thread is deleting pDel right now
+                    return false;
                 }
                 m_Stat.onEraseRetry();
+                bkoff();
             }
         }
 
@@ -1444,12 +1471,17 @@ namespace cds { namespace intrusive {
         finsd_fastpath_result find_fastpath( Q& val, Compare cmp, Func f )
         {
             node_type * pPred;
-            typename gc::template GuardArray<2>  guards;
             marked_node_ptr pCur;
             marked_node_ptr pNull;
 
+            // guard array:
+            // 0 - pPred on level N
+            // 1 - pCur on level N
+            typename gc::template GuardArray<2> guards;
             back_off bkoff;
+            unsigned attempt = 0;
 
+        try_again:
             pPred = m_Head.head();
             for ( int nLevel = static_cast<int>( m_nHeight.load( memory_model::memory_order_relaxed ) - 1 ); nLevel >= 0; --nLevel ) {
                 pCur = guards.protect( 1, pPred->next( nLevel ), gc_protect );
@@ -1458,22 +1490,17 @@ namespace cds { namespace intrusive {
 
                 while ( pCur != pNull ) {
                     if ( pCur.bits() ) {
-                        unsigned int nAttempt = 0;
-                        bkoff.reset();
-                        while ( pCur.bits() && nAttempt++ < 16 ) {
+                        // pPred is being removed
+                        if ( ++attempt < 4 ) {
                             bkoff();
-                            pCur = guards.protect( 1, pPred->next( nLevel ), gc_protect );
+                            goto try_again;
                         }
 
-                        if ( pCur.bits() ) {
-                            // Maybe, we are on deleted node sequence
-                            // Abort searching, try slow-path
-                            return find_fastpath_abort;
-                        }
+                        return find_fastpath_abort;
                     }
 
                     if ( pCur.ptr() ) {
-                        int nCmp = cmp( *node_traits::to_value_ptr( pCur.ptr() ), val );
+                        int nCmp = cmp( *node_traits::to_value_ptr( pCur.ptr()), val );
                         if ( nCmp < 0 ) {
                             guards.copy( 0, 1 );
                             pPred = pCur.ptr();
@@ -1484,8 +1511,10 @@ namespace cds { namespace intrusive {
                             f( *node_traits::to_value_ptr( pCur.ptr() ), val );
                             return find_fastpath_found;
                         }
-                        else // pCur > val - go down
+                        else {
+                            // pCur > val - go down
                             break;
+                        }
                     }
                 }
             }