Fixed MichaelList<RCU> removal bug (to be continued)
authorkhizmax <libcds.dev@gmail.com>
Thu, 21 May 2015 05:59:12 +0000 (08:59 +0300)
committerkhizmax <libcds.dev@gmail.com>
Thu, 21 May 2015 05:59:12 +0000 (08:59 +0300)
cds/intrusive/michael_list_rcu.h

index 25f9ce58b699812d432af821cea55bff6f7bd93f..540832299d30037b05aeebeec408c3bd597a0441 100644 (file)
 
 namespace cds { namespace intrusive {
 
+    //@cond
+    namespace michael_list {
+
+        /// Node specialization for uRCU
+        template <class RCU, typename Tag>
+        struct node< cds::urcu::gc< RCU >, Tag >
+        {
+            typedef cds::urcu::gc< RCU > gc;   ///< Garbage collector
+            typedef Tag                  tag;  ///< tag
+
+            typedef cds::details::marked_ptr<node, 1>   marked_ptr         ;   ///< marked pointer
+            typedef typename gc::template atomic_marked_ptr< marked_ptr>     atomic_marked_ptr   ;   ///< atomic marked pointer specific for GC
+
+            atomic_marked_ptr m_pNext ; ///< pointer to the next node in the container
+            node *            m_pDelChain; ///< Deleted node chain (local for a thread)
+
+            CDS_CONSTEXPR node() CDS_NOEXCEPT
+                : m_pNext( nullptr )
+                , m_pDelChain( nullptr )
+            {}
+        };
+    } // namespace michael_list
+    //@endcond
+
     /// Michael's lock-free ordered single-linked list (template specialization for \ref cds_urcu_desc "RCU")
     /** @ingroup cds_intrusive_list
         \anchor cds_intrusive_MichaelList_rcu
@@ -99,6 +123,26 @@ namespace cds { namespace intrusive {
             atomic_node_ptr * pPrev ;   ///< Previous node
             node_type * pCur        ;   ///< Current node
             node_type * pNext       ;   ///< Next node
+
+            atomic_node_ptr& refHead;
+            node_type * pDelChain; ///< Head of deleted node chain
+
+            position( atomic_node_ptr& head )
+                : refHead( head )
+                , pDelChain( nullptr )
+            {}
+
+#       ifdef _DEBUG
+            ~position()
+            {
+                assert( pDelChain == nullptr );
+            }
+#       endif
+        };
+
+        enum {
+            erase_mask   = 1,
+            extract_mask = 3
         };
 
         typedef cds::urcu::details::check_deadlock_policy< gc, rcu_check_deadlock>   check_deadlock_policy;
@@ -106,6 +150,7 @@ namespace cds { namespace intrusive {
         static void clear_links( node_type * pNode )
         {
             pNode->m_pNext.store( marked_node_ptr(), memory_model::memory_order_relaxed );
+            pNode->m_pDelChain = nullptr;
         }
 
         struct clear_and_dispose {
@@ -142,16 +187,47 @@ namespace cds { namespace intrusive {
             return pos.pPrev->compare_exchange_strong( p, marked_node_ptr(pNode), memory_model::memory_order_release, atomics::memory_order_relaxed );
         }
 
-        bool unlink_node( position& pos )
+        static void link_to_remove_chain( position& pos, node_type * pDel )
+        {
+            assert( pDel->m_pDelChain == nullptr );
+
+            pDel->m_pDelChain = pos.pDelChain;
+            pos.pDelChain = pDel;
+        }
+
+        static void free_node_chain( position& pos )
+        {
+            assert( !gc::is_locked() );
+
+            node_type * p = pos.pDelChain;
+            if ( p ) {
+                pos.pDelChain = nullptr;
+                while ( p ) {
+                    node_type * pNext = p->m_pDelChain;
+                    dispose_node( p );
+                    p = pNext;
+                }
+            }
+        }
+
+        bool unlink_node( position& pos, bool bExtract )
         {
-            // Mark the node (logical deleting)
+            // Mark the node (logical deletion)
             marked_node_ptr next(pos.pNext, 0);
-            if ( pos.pCur->m_pNext.compare_exchange_strong( next, marked_node_ptr(pos.pNext, 1), memory_model::memory_order_release, atomics::memory_order_relaxed )) {
+
+            int const nMask = bExtract ? extract_mask : erase_mask;
+            if ( pos.pCur->m_pNext.compare_exchange_strong( next, next | nMask, memory_model::memory_order_acq_rel, atomics::memory_order_relaxed )) {
+
+                // Try physical removal
                 marked_node_ptr cur(pos.pCur);
-                if ( pos.pPrev->compare_exchange_strong( cur, marked_node_ptr( pos.pNext ), memory_model::memory_order_acquire, atomics::memory_order_relaxed ))
-                    return true;
-                next |= 1;
-                CDS_VERIFY( pos.pCur->m_pNext.compare_exchange_strong( next, next ^ 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed ));
+                if ( pos.pPrev->compare_exchange_strong(cur, marked_node_ptr(pos.pNext), memory_model::memory_order_acquire, atomics::memory_order_relaxed) ) {
+                    if ( !bExtract )
+                        link_to_remove_chain( pos, pos.pCur );
+                }
+                else {
+                    search( pos.refHead, *node_traits::to_value_ptr( pos.pCur ), pos, key_comparator() );
+                }
+                return true;
             }
             return false;
         }
@@ -679,8 +755,8 @@ namespace cds { namespace intrusive {
             RCU \p synchronize method can be called.
             Note that depending on RCU type used the \ref disposer invocation can be deferred.
 
-            The function can throw cds::urcu::rcu_deadlock exception if an deadlock is encountered and
-            deadlock checking policy is opt::v::rcu_throw_deadlock.
+            The function can throw \p cds::urcu::rcu_deadlock exception if an deadlock is encountered and
+            deadlock checking policy is \p opt::v::rcu_throw_deadlock.
         */
         void clear()
         {
@@ -691,7 +767,7 @@ namespace cds { namespace intrusive {
                 for (;;) {
                     {
                         rcu_lock l;
-                        pHead = m_pHead.load(memory_model::memory_order_consume);
+                        pHead = m_pHead.load(memory_model::memory_order_acquire);
                         if ( !pHead.ptr() )
                             break;
                         marked_node_ptr pNext( pHead->m_pNext.load(memory_model::memory_order_relaxed) );
@@ -745,12 +821,20 @@ namespace cds { namespace intrusive {
             return insert_at( refHead, *node_traits::to_value_ptr( pNode ) );
         }
 
-        bool insert_at( atomic_node_ptr& refHead, value_type& val, bool bLock = true )
+        bool insert_at( atomic_node_ptr& refHead, value_type& val )
         {
+            rcu_lock l;
+            return insert_at_locked( refHead, val );
+        }
+
+        bool insert_at_locked( atomic_node_ptr& refHead, value_type& val )
+        {
+            // RCU lock should be locked!!!
+            assert( gc::is_locked() );
+
             link_checker::is_empty( node_traits::to_node_ptr( val ) );
-            position pos;
+            position pos( refHead );
 
-            rcu_lock l( bLock );
             while ( true ) {
                 if ( search( refHead, val, pos, key_comparator() ) )
                     return false;
@@ -769,7 +853,7 @@ namespace cds { namespace intrusive {
         bool insert_at( atomic_node_ptr& refHead, value_type& val, Func f )
         {
             link_checker::is_empty( node_traits::to_node_ptr( val ) );
-            position pos;
+            position pos( refHead );
 
             rcu_lock l;
             while ( true ) {
@@ -787,20 +871,29 @@ namespace cds { namespace intrusive {
             }
         }
 
-        iterator insert_at_( atomic_node_ptr& refHead, value_type& val, bool bLock = true )
+        iterator insert_at_( atomic_node_ptr& refHead, value_type& val )
         {
-            rcu_lock l( bLock );
-            if ( insert_at( refHead, val, false ))
+            rcu_lock l;
+            if ( insert_at_locked( refHead, val ))
                 return iterator( node_traits::to_node_ptr( val ));
             return end();
         }
 
         template <typename Func>
-        std::pair<iterator, bool> ensure_at_( atomic_node_ptr& refHead, value_type& val, Func func, bool bLock = true )
+        std::pair<iterator, bool> ensure_at_( atomic_node_ptr& refHead, value_type& val, Func func )
         {
-            position pos;
+            rcu_lock l;
+            return ensure_at_locked( refHead, val, func );
+        }
+
+        template <typename Func>
+        std::pair<iterator, bool> ensure_at_locked( atomic_node_ptr& refHead, value_type& val, Func func )
+        {
+            position pos( refHead );
+
+            // RCU lock should be locked!!!
+            assert( gc::is_locked() );
 
-            rcu_lock l( bLock );
             while ( true ) {
                 if ( search( refHead, val, pos, key_comparator() ) ) {
                     assert( key_comparator()( val, *node_traits::to_value_ptr( *pos.pCur ) ) == 0 );
@@ -824,16 +917,16 @@ namespace cds { namespace intrusive {
         }
 
         template <typename Func>
-        std::pair<bool, bool> ensure_at( atomic_node_ptr& refHead, value_type& val, Func func, bool bLock = true )
+        std::pair<bool, bool> ensure_at( atomic_node_ptr& refHead, value_type& val, Func func )
         {
-            rcu_lock l( bLock );
-            std::pair<iterator, bool> ret = ensure_at_( refHead, val, func, false );
+            rcu_lock l;
+            std::pair<iterator, bool> ret = ensure_at_locked( refHead, val, func );
             return std::make_pair( ret.first != end(), ret.second );
         }
 
         bool unlink_at( atomic_node_ptr& refHead, value_type& val )
         {
-            position pos;
+            position pos( refHead );
             back_off bkoff;
             check_deadlock_policy::check();
 
@@ -842,14 +935,14 @@ namespace cds { namespace intrusive {
                     rcu_lock l;
                     if ( !search( refHead, val, pos, key_comparator() ) || node_traits::to_value_ptr( *pos.pCur ) != &val )
                         return false;
-                    if ( !unlink_node( pos )) {
+                    if ( !unlink_node( pos, false )) {
                         bkoff();
                         continue;
                     }
                 }
 
                 --m_ItemCounter;
-                dispose_node( pos.pCur );
+                free_node_chain( pos );
                 return true;
             }
         }
@@ -865,7 +958,7 @@ namespace cds { namespace intrusive {
                     rcu_lock l;
                     if ( !search( refHead, val, pos, cmp ) )
                         return false;
-                    if ( !unlink_node( pos )) {
+                    if ( !unlink_node( pos, false )) {
                         bkoff();
                         continue;
                     }
@@ -873,7 +966,7 @@ namespace cds { namespace intrusive {
 
                 f( *node_traits::to_value_ptr( *pos.pCur ) );
                 --m_ItemCounter;
-                dispose_node( pos.pCur );
+                free_node_chain( pos );
                 return true;
             }
         }
@@ -881,28 +974,28 @@ namespace cds { namespace intrusive {
         template <typename Q, typename Compare, typename Func>
         bool erase_at( atomic_node_ptr& refHead, Q const& val, Compare cmp, Func f )
         {
-            position pos;
+            position pos( refHead );
             return erase_at( refHead, val, cmp, f, pos );
         }
 
         template <typename Q, typename Compare>
         bool erase_at( atomic_node_ptr& refHead, const Q& val, Compare cmp )
         {
-            position pos;
+            position pos( refHead );
             return erase_at( refHead, val, cmp, [](value_type const&){}, pos );
         }
 
         template <typename Q, typename Compare >
         value_type * extract_at( atomic_node_ptr& refHead, Q const& val, Compare cmp )
         {
-            position pos;
+            position pos( refHead );
             back_off bkoff;
             assert( gc::is_locked() )  ;   // RCU must be locked!!!
 
             for (;;) {
                 if ( !search( refHead, val, pos, cmp ) )
                     return nullptr;
-                if ( !unlink_node( pos )) {
+                if ( !unlink_node( pos, true )) {
                     bkoff();
                     continue;
                 }
@@ -913,11 +1006,11 @@ namespace cds { namespace intrusive {
         }
 
         template <typename Q, typename Compare, typename Func>
-        bool find_at( atomic_node_ptr& refHead, Q& val, Compare cmp, Func f, bool bLock = true ) const
+        bool find_at( atomic_node_ptr& refHead, Q& val, Compare cmp, Func f ) const
         {
-            position pos;
+            position pos( refHead );
 
-            rcu_lock l( bLock );
+            rcu_lock l;
             if ( search( refHead, val, pos, cmp ) ) {
                 assert( pos.pCur != nullptr );
                 f( *node_traits::to_value_ptr( *pos.pCur ), val );
@@ -946,7 +1039,7 @@ namespace cds { namespace intrusive {
         const_iterator find_at_( atomic_node_ptr& refHead, Q const& val, Compare cmp ) const
         {
             assert( gc::is_locked() );
-            position pos;
+            position pos( refHead );
 
             if ( search( refHead, val, pos, cmp ) ) {
                 assert( pos.pCur != nullptr );
@@ -961,7 +1054,7 @@ namespace cds { namespace intrusive {
 
         //@cond
         template <typename Q, typename Compare>
-        bool search( atomic_node_ptr& refHead, const Q& val, position& pos, Compare cmp ) const
+        bool search( atomic_node_ptr& refHead, const Q& val, position& pos, Compare cmp )
         {
             // RCU lock should be locked!!!
             assert( gc::is_locked() );
@@ -988,17 +1081,22 @@ namespace cds { namespace intrusive {
                 pNext = pCur->m_pNext.load(memory_model::memory_order_acquire);
 
                 if ( pPrev->load(memory_model::memory_order_relaxed) != pCur
-                    || pNext != pCur->m_pNext.load(memory_model::memory_order_relaxed)
-                    || pNext.bits() != 0 )  // pNext contains deletion mark for pCur
+                    || pNext != pCur->m_pNext.load(memory_model::memory_order_relaxed))
                 {
-                    // if pCur is marked as deleted (pNext.bits() != 0)
-                    // we wait for physical removal.
-                    // Helping technique is not suitable for RCU since it requires
-                    // that the RCU should be in unlocking state.
                     bkoff();
                     goto try_again;
                 }
 
+                if ( pNext.bits() ) {
+                    // pCur is marked as deleted. Try to unlink it from the list
+                    if ( pPrev->compare_exchange_weak( pCur, marked_node_ptr( pNext.ptr() ), memory_model::memory_order_acquire, atomics::memory_order_relaxed ) ) {
+                        if ( pNext.bits() == erase_mask )
+                            link_to_remove_chain( pos, pCur.ptr() );
+                    }
+
+                    goto try_again;
+                }
+
                 assert( pCur.ptr() != nullptr );
                 int nCmp = cmp( *node_traits::to_value_ptr( pCur.ptr() ), val );
                 if ( nCmp >= 0 ) {