Fixed MichaelList<RCU> removal bug (TBC: get() is still not working properly)
authorkhizmax <khizmax@gmail.com>
Thu, 21 May 2015 09:38:44 +0000 (12:38 +0300)
committerkhizmax <khizmax@gmail.com>
Thu, 21 May 2015 09:38:44 +0000 (12:38 +0300)
cds/container/michael_kvlist_rcu.h
cds/container/michael_list_rcu.h
cds/intrusive/michael_list_rcu.h

index d113430013d1ba9bb3c03dfc752bb0f88e7a2cdf..b3d53e1ab0a476a137c0e0a32411961b62a29ed6 100644 (file)
@@ -113,7 +113,7 @@ namespace cds { namespace container {
         typedef typename base_class::rcu_check_deadlock rcu_check_deadlock ; ///< RCU deadlock checking policy
 
         typedef typename gc::scoped_lock    rcu_lock ;  ///< RCU scoped lock
-        static CDS_CONSTEXPR const bool c_bExtractLockExternal = base_class::c_bExtractLockExternal; ///< Group of \p extract_xxx functions require external locking
+        static CDS_CONSTEXPR const bool c_bExtractLockExternal = base_class::c_bExtractLockExternal; ///< Group of \p extract_xxx functions do not require external locking
 
     protected:
         //@cond
@@ -477,7 +477,7 @@ namespace cds { namespace container {
 
             The functor \p Func interface:
             \code
-            struct extractor {
+            struct functor {
                 void operator()(value_type& val) { ... }
             };
             \endcode
@@ -515,10 +515,9 @@ namespace cds { namespace container {
             unlinks it from the list, and returns \ref cds::urcu::exempt_ptr "exempt_ptr" pointer to the item found.
             If \p key is not found the function returns an empty \p exempt_ptr.
 
-            @note The function does NOT call RCU read-side lock or synchronization,
-            and does NOT dispose the item found. It just excludes the item from the list
-            and returns a pointer to item found.
-            You should lock RCU before calling this function.
+            @note The function does NOT dispose the item found. 
+            It just excludes the item from the list and returns a pointer to item found.
+            You shouldn't lock RCU before calling this function.
 
             \code
             #include <cds/urcu/general_buffered.h>
@@ -531,19 +530,18 @@ namespace cds { namespace container {
             // ...
 
             rcu_michael_list::exempt_ptr p;
-            {
-                // first, we should lock RCU
-                rcu_michael_list::rcu_lock sl;
-
-                // Now, you can apply extract function
-                // Note that you must not delete the item found inside the RCU lock
-                p = theList.extract( 10 );
-                if ( p ) {
-                    // do something with p
-                    ...
-                }
+
+            // The RCU should NOT be locked when extract() is called!
+            assert( !rcu::is_locked() );
+
+            // extract() call
+            p = theList.extract( 10 );
+            if ( p ) {
+                // do something with p
+                ...
             }
-            // Outside RCU lock section we may safely release extracted pointer.
+
+            // we may safely release extracted pointer here.
             // release() passes the pointer to RCU reclamation cycle.
             p.release();
             \endcode
@@ -577,7 +575,7 @@ namespace cds { namespace container {
             The function makes RCU lock internally.
         */
         template <typename Q>
-        bool find( Q const& key ) const
+        bool find( Q const& key )
         {
             return find_at( head(), key, intrusive_key_comparator() );
         }
@@ -590,7 +588,7 @@ namespace cds { namespace container {
             \p pred must imply the same element order as the comparator used for building the list.
         */
         template <typename Q, typename Less>
-        bool find_with( Q const& key, Less pred ) const
+        bool find_with( Q const& key, Less pred )
         {
             CDS_UNUSED( pred );
             return find_at( head(), key, typename maker::template less_wrapper<Less>::type() );
@@ -617,7 +615,7 @@ namespace cds { namespace container {
             The function returns \p true if \p key is found, \p false otherwise.
         */
         template <typename Q, typename Func>
-        bool find( Q const& key, Func f ) const
+        bool find( Q const& key, Func f )
         {
             return find_at( head(), key, intrusive_key_comparator(), f );
         }
@@ -630,7 +628,7 @@ namespace cds { namespace container {
             \p pred must imply the same element order as the comparator used for building the list.
         */
         template <typename Q, typename Less, typename Func>
-        bool find_with( Q const& key, Less pred, Func f ) const
+        bool find_with( Q const& key, Less pred, Func f )
         {
             CDS_UNUSED( pred );
             return find_at( head(), key, typename maker::template less_wrapper<Less>::type(), f );
@@ -664,7 +662,7 @@ namespace cds { namespace container {
             \endcode
         */
         template <typename K>
-        value_type * get( K const& key ) const
+        value_type * get( K const& key )
         {
             return get_at( head(), key, intrusive_key_comparator());
         }
@@ -679,7 +677,7 @@ namespace cds { namespace container {
             \p pred must imply the same element order as the comparator used for building the list.
         */
         template <typename K, typename Less>
-        value_type * get_with( K const& key, Less pred ) const
+        value_type * get_with( K const& key, Less pred )
         {
             CDS_UNUSED( pred );
             return get_at( head(), key, typename maker::template less_wrapper<Less>::type() );
@@ -788,19 +786,19 @@ namespace cds { namespace container {
         }
 
         template <typename K, typename Compare>
-        bool find_at( head_type& refHead, K const& key, Compare cmp ) const
+        bool find_at( head_type& refHead, K const& key, Compare cmp )
         {
             return base_class::find_at( refHead, key, cmp, [](node_type&, K const&) {} );
         }
 
         template <typename K, typename Compare, typename Func>
-        bool find_at( head_type& refHead, K& key, Compare cmp, Func f ) const
+        bool find_at( head_type& refHead, K& key, Compare cmp, Func f )
         {
             return base_class::find_at( refHead, key, cmp, [&f](node_type& node, K const&){ f( node.m_Data ); });
         }
 
         template <typename K, typename Compare>
-        value_type * get_at( head_type& refHead, K const& val, Compare cmp ) const
+        value_type * get_at( head_type& refHead, K const& val, Compare cmp )
         {
             node_type * pNode = base_class::get_at( refHead, val, cmp );
             return pNode ? &pNode->m_Data : nullptr;
index cc53b77351ed8b7f39d284448966b14184caff41..ee2904a8b72768813a205a388a34c76704c8f755 100644 (file)
@@ -123,7 +123,7 @@ namespace cds { namespace container {
         typedef typename base_class::rcu_check_deadlock rcu_check_deadlock ; ///< RCU deadlock checking policy
 
         typedef typename gc::scoped_lock    rcu_lock ;  ///< RCU scoped lock
-        static CDS_CONSTEXPR const bool c_bExtractLockExternal = base_class::c_bExtractLockExternal; ///< Group of \p extract_xxx functions require external locking
+        static CDS_CONSTEXPR const bool c_bExtractLockExternal = base_class::c_bExtractLockExternal; ///< Group of \p extract_xxx functions do not require external locking
 
     protected:
         //@cond
@@ -452,7 +452,7 @@ namespace cds { namespace container {
 
             The functor \p Func interface:
             \code
-            struct extractor {
+            struct functor {
                 void operator()(const value_type& val) { ... }
             };
             \endcode
@@ -493,10 +493,9 @@ namespace cds { namespace container {
             unlinks it from the list, and returns \ref cds::urcu::exempt_ptr "exempt_ptr" pointer to the item found.
             If the item with the key equal to \p key is not found the function returns an empty \p exempt_ptr.
 
-            @note The function does NOT call RCU read-side lock or synchronization,
-            and does NOT dispose the item found. It just excludes the item from the list
+            @note The function does NOT dispose the item found. It just excludes the item from the list
             and returns a pointer to item found.
-            You should lock RCU before calling this function.
+            You shouldn't lock RCU before calling this function.
 
             \code
             #include <cds/urcu/general_buffered.h>
@@ -509,19 +508,18 @@ namespace cds { namespace container {
             // ...
 
             rcu_michael_list::exempt_ptr p;
-            {
-                // first, we should lock RCU
-                rcu::scoped_lock sl;
-
-                // Now, you can apply extract function
-                // Note that you must not delete the item found inside the RCU lock
-                p = theList.extract( 10 )
-                if ( p ) {
-                    // do something with p
-                    ...
-                }
+
+            // The RCU should NOT be locked when extract() is called!
+            assert( !rcu::is_locked() );
+                
+            // extract() call
+            p = theList.extract( 10 )
+            if ( p ) {
+                // do something with p
+                ...
             }
-            // Outside RCU lock section we may safely release extracted pointer.
+
+            // we may safely release extracted pointer here.
             // release() passes the pointer to RCU reclamation cycle.
             p.release();
             \endcode
@@ -555,7 +553,7 @@ namespace cds { namespace container {
             The function makes RCU lock internally.
         */
         template <typename Q>
-        bool find( Q const& key ) const
+        bool find( Q const& key )
         {
             return find_at( head(), key, intrusive_key_comparator() );
         }
@@ -568,7 +566,7 @@ namespace cds { namespace container {
             \p pred must imply the same element order as the comparator used for building the list.
         */
         template <typename Q, typename Less>
-        bool find_with( Q const& key, Less pred ) const
+        bool find_with( Q const& key, Less pred )
         {
             CDS_UNUSED( pred );
             return find_at( head(), key, typename maker::template less_wrapper<Less>::type() );
@@ -595,13 +593,13 @@ namespace cds { namespace container {
             The function returns \p true if \p val is found, \p false otherwise.
         */
         template <typename Q, typename Func>
-        bool find( Q& key, Func f ) const
+        bool find( Q& key, Func f )
         {
             return find_at( head(), key, intrusive_key_comparator(), f );
         }
         //@cond
         template <typename Q, typename Func>
-        bool find( Q const& key, Func f ) const
+        bool find( Q const& key, Func f )
         {
             return find_at( head(), key, intrusive_key_comparator(), f );
         }
@@ -615,14 +613,14 @@ namespace cds { namespace container {
             \p pred must imply the same element order as the comparator used for building the list.
         */
         template <typename Q, typename Less, typename Func>
-        bool find_with( Q& key, Less pred, Func f ) const
+        bool find_with( Q& key, Less pred, Func f )
         {
             CDS_UNUSED( pred );
             return find_at( head(), key, typename maker::template less_wrapper<Less>::type(), f );
         }
         //@cond
         template <typename Q, typename Less, typename Func>
-        bool find_with( Q const& key, Less pred, Func f ) const
+        bool find_with( Q const& key, Less pred, Func f )
         {
             CDS_UNUSED( pred );
             return find_at( head(), key, typename maker::template less_wrapper<Less>::type(), f );
@@ -657,7 +655,7 @@ namespace cds { namespace container {
             \endcode
         */
         template <typename Q>
-        value_type * get( Q const& key ) const
+        value_type * get( Q const& key )
         {
             return get_at( head(), key, intrusive_key_comparator());
         }
@@ -672,7 +670,7 @@ namespace cds { namespace container {
             \p pred must imply the same element order as the comparator used for building the list.
         */
         template <typename Q, typename Less>
-        value_type * get_with( Q const& key, Less pred ) const
+        value_type * get_with( Q const& key, Less pred )
         {
             CDS_UNUSED( pred );
             return get_at( head(), key, typename maker::template less_wrapper<Less>::type());
@@ -767,19 +765,19 @@ namespace cds { namespace container {
         }
 
         template <typename Q, typename Compare>
-        bool find_at( head_type& refHead, Q const& key, Compare cmp ) const
+        bool find_at( head_type& refHead, Q const& key, Compare cmp )
         {
             return base_class::find_at( refHead, key, cmp, [](node_type&, Q const &) {} );
         }
 
         template <typename Q, typename Compare, typename Func>
-        bool find_at( head_type& refHead, Q& val, Compare cmp, Func f ) const
+        bool find_at( head_type& refHead, Q& val, Compare cmp, Func f )
         {
             return base_class::find_at( refHead, val, cmp, [&f](node_type& node, Q& v){ f( node_to_value(node), v ); });
         }
 
         template <typename Q, typename Compare>
-        value_type * get_at( head_type& refHead, Q const& val, Compare cmp ) const
+        value_type * get_at( head_type& refHead, Q const& val, Compare cmp )
         {
             node_type * pNode = base_class::get_at( refHead, val, cmp );
             return pNode ? &pNode->m_Value : nullptr;
index 540832299d30037b05aeebeec408c3bd597a0441..8a69ae6c5bf2d9056c24168e87383f550a318290 100644 (file)
@@ -95,7 +95,7 @@ namespace cds { namespace intrusive {
         typedef typename traits::rcu_check_deadlock    rcu_check_deadlock; ///< Deadlock checking policy
 
         typedef typename gc::scoped_lock    rcu_lock ;  ///< RCU scoped lock
-        static CDS_CONSTEXPR const bool c_bExtractLockExternal = true; ///< Group of \p extract_xxx functions require external locking
+        static CDS_CONSTEXPR const bool c_bExtractLockExternal = false; ///< Group of \p extract_xxx functions do not require external locking
 
         //@cond
         // Rebind traits (split-list support)
@@ -117,30 +117,10 @@ namespace cds { namespace intrusive {
         atomic_node_ptr     m_pHead         ;   ///< Head pointer
         item_counter        m_ItemCounter   ;   ///< Item counter
 
+    protected:
         //@cond
-        /// Position pointer for item search
-        struct position {
-            atomic_node_ptr * pPrev ;   ///< Previous node
-            node_type * pCur        ;   ///< Current node
-            node_type * pNext       ;   ///< Next node
-
-            atomic_node_ptr& refHead;
-            node_type * pDelChain; ///< Head of deleted node chain
-
-            position( atomic_node_ptr& head )
-                : refHead( head )
-                , pDelChain( nullptr )
-            {}
-
-#       ifdef _DEBUG
-            ~position()
-            {
-                assert( pDelChain == nullptr );
-            }
-#       endif
-        };
-
-        enum {
+        enum erase_node_mask
+        {
             erase_mask   = 1,
             extract_mask = 3
         };
@@ -161,13 +141,6 @@ namespace cds { namespace intrusive {
                 disposer()( p );
             }
         };
-        //@endcond
-
-    public:
-        using exempt_ptr = cds::urcu::exempt_ptr< gc, value_type, value_type, clear_and_dispose, void >; ///< pointer to extracted node
-
-    protected:
-        //@cond
 
         static void dispose_node( node_type * pNode )
         {
@@ -177,6 +150,43 @@ namespace cds { namespace intrusive {
             gc::template retire_ptr<clear_and_dispose>( node_traits::to_value_ptr( *pNode ) );
         }
 
+        /// Position pointer for item search
+        struct position {
+            atomic_node_ptr * pPrev ;   ///< Previous node
+            node_type * pCur        ;   ///< Current node
+            node_type * pNext       ;   ///< Next node
+
+            atomic_node_ptr& refHead;
+            node_type * pDelChain; ///< Head of deleted node chain
+
+            position( atomic_node_ptr& head )
+                : refHead( head )
+                , pDelChain( nullptr )
+            {}
+
+            ~position()
+            {
+                assert( !gc::is_locked() );
+
+                node_type * p = pDelChain;
+                if ( p ) {
+                    while ( p ) {
+                        node_type * pNext = p->m_pDelChain;
+                        dispose_node( p );
+                        p = pNext;
+                    }
+                }
+            }
+        };
+
+        //@endcond
+
+    public:
+        using exempt_ptr = cds::urcu::exempt_ptr< gc, value_type, value_type, clear_and_dispose, void >; ///< pointer to extracted node
+
+    protected:
+        //@cond
+
         bool link_node( node_type * pNode, position& pos )
         {
             assert( pNode != nullptr );
@@ -195,36 +205,21 @@ namespace cds { namespace intrusive {
             pos.pDelChain = pDel;
         }
 
-        static void free_node_chain( position& pos )
-        {
-            assert( !gc::is_locked() );
-
-            node_type * p = pos.pDelChain;
-            if ( p ) {
-                pos.pDelChain = nullptr;
-                while ( p ) {
-                    node_type * pNext = p->m_pDelChain;
-                    dispose_node( p );
-                    p = pNext;
-                }
-            }
-        }
-
-        bool unlink_node( position& pos, bool bExtract )
+        bool unlink_node( position& pos, erase_node_mask nMask )
         {
             // Mark the node (logical deletion)
             marked_node_ptr next(pos.pNext, 0);
 
-            int const nMask = bExtract ? extract_mask : erase_mask;
             if ( pos.pCur->m_pNext.compare_exchange_strong( next, next | nMask, memory_model::memory_order_acq_rel, atomics::memory_order_relaxed )) {
 
-                // Try physical removal
+                // Try physical removal - fast path
                 marked_node_ptr cur(pos.pCur);
                 if ( pos.pPrev->compare_exchange_strong(cur, marked_node_ptr(pos.pNext), memory_model::memory_order_acquire, atomics::memory_order_relaxed) ) {
-                    if ( !bExtract )
+                    if ( nMask == erase_mask )
                         link_to_remove_chain( pos, pos.pCur );
                 }
                 else {
+                    // Slow path
                     search( pos.refHead, *node_traits::to_value_ptr( pos.pCur ), pos, key_comparator() );
                 }
                 return true;
@@ -562,10 +557,9 @@ namespace cds { namespace intrusive {
             unlinks it from the list, and returns \ref cds::urcu::exempt_ptr "exempt_ptr" pointer to the item found.
             If \p key is not found the function returns an empty \p exempt_ptr.
 
-            @note The function does NOT call RCU read-side lock or synchronization,
-            and does NOT dispose the item found. It just unlinks the item from the list
+            @note The function does NOT dispose the item found. It just unlinks the item from the list
             and returns a pointer to item found.
-            You should lock RCU before calling this function, and you should manually release
+            You shouldn't lock RCU before calling this function, and you should manually release
             \p dest exempt pointer outside the RCU lock before reusing the pointer.
 
             \code
@@ -579,17 +573,15 @@ namespace cds { namespace intrusive {
             // ...
 
             rcu_michael_list::exempt_ptr p1;
-            {
-                // first, we should lock RCU
-                rcu::scoped_lock sl;
-
-                // Now, you can apply extract function
-                // Note that you must not delete the item found inside the RCU lock
-                p1 = theList.extract( 10 )
-                if ( p1 ) {
-                    // do something with p1
-                    ...
-                }
+
+            // The RCU should NOT be locked when extract() is called!
+            assert( !rcu::is_locked() );
+
+            // You can call extract() function
+            p1 = theList.extract( 10 );
+            if ( p1 ) {
+                // do something with p1
+                ...
             }
 
             // We may safely release p1 here
@@ -640,15 +632,15 @@ namespace cds { namespace intrusive {
             The function returns \p true if \p val is found, \p false otherwise.
         */
         template <typename Q, typename Func>
-        bool find( Q& key, Func f ) const
+        bool find( Q& key, Func f )
         {
-            return find_at( const_cast<atomic_node_ptr&>(m_pHead), key, key_comparator(), f );
+            return find_at( m_pHead, key, key_comparator(), f );
         }
         //@cond
         template <typename Q, typename Func>
-        bool find( Q const& key, Func f ) const
+        bool find( Q const& key, Func f )
         {
-            return find_at( const_cast<atomic_node_ptr&>(m_pHead), key, key_comparator(), f );
+            return find_at( m_pHead, key, key_comparator(), f );
         }
         //@endcond
 
@@ -660,17 +652,17 @@ namespace cds { namespace intrusive {
             \p pred must imply the same element order as the comparator used for building the list.
         */
         template <typename Q, typename Less, typename Func>
-        bool find_with( Q& key, Less pred, Func f ) const
+        bool find_with( Q& key, Less pred, Func f )
         {
             CDS_UNUSED( pred );
-            return find_at( const_cast<atomic_node_ptr&>( m_pHead ), key, cds::opt::details::make_comparator_from_less<Less>(), f );
+            return find_at( m_pHead, key, cds::opt::details::make_comparator_from_less<Less>(), f );
         }
         //@cond
         template <typename Q, typename Less, typename Func>
-        bool find_with( Q const& key, Less pred, Func f ) const
+        bool find_with( Q const& key, Less pred, Func f )
         {
             CDS_UNUSED( pred );
-            return find_at( const_cast<atomic_node_ptr&>(m_pHead), key, cds::opt::details::make_comparator_from_less<Less>(), f );
+            return find_at( m_pHead, key, cds::opt::details::make_comparator_from_less<Less>(), f );
         }
         //@endcond
 
@@ -680,9 +672,9 @@ namespace cds { namespace intrusive {
             and returns \p true if \p val found or \p false otherwise.
         */
         template <typename Q>
-        bool find( Q const& key ) const
+        bool find( Q const& key )
         {
-            return find_at( const_cast<atomic_node_ptr&>( m_pHead ), key, key_comparator() );
+            return find_at( m_pHead, key, key_comparator() );
         }
 
         /// Finds \p key using \p pred predicate for searching
@@ -693,10 +685,10 @@ namespace cds { namespace intrusive {
             \p pred must imply the same element order as the comparator used for building the list.
         */
         template <typename Q, typename Less>
-        bool find_with( Q const& key, Less pred ) const
+        bool find_with( Q const& key, Less pred )
         {
             CDS_UNUSED( pred );
-            return find_at( const_cast<atomic_node_ptr&>( m_pHead ), key, cds::opt::details::make_comparator_from_less<Less>() );
+            return find_at( m_pHead, key, cds::opt::details::make_comparator_from_less<Less>() );
         }
 
         /// Finds \p key and return the item found
@@ -727,7 +719,7 @@ namespace cds { namespace intrusive {
             \endcode
         */
         template <typename Q>
-        value_type * get( Q const& key ) const
+        value_type * get( Q const& key )
         {
             return get_at( const_cast<atomic_node_ptr&>( m_pHead ), key, key_comparator());
         }
@@ -742,7 +734,7 @@ namespace cds { namespace intrusive {
             \p pred must imply the same element order as the comparator used for building the list.
         */
         template <typename Q, typename Less>
-        value_type * get_with( Q const& key, Less pred ) const
+        value_type * get_with( Q const& key, Less pred )
         {
             CDS_UNUSED( pred );
             return get_at( const_cast<atomic_node_ptr&>( m_pHead ), key, cds::opt::details::make_comparator_from_less<Less>());
@@ -823,29 +815,10 @@ namespace cds { namespace intrusive {
 
         bool insert_at( atomic_node_ptr& refHead, value_type& val )
         {
-            rcu_lock l;
-            return insert_at_locked( refHead, val );
-        }
-
-        bool insert_at_locked( atomic_node_ptr& refHead, value_type& val )
-        {
-            // RCU lock should be locked!!!
-            assert( gc::is_locked() );
-
-            link_checker::is_empty( node_traits::to_node_ptr( val ) );
             position pos( refHead );
-
-            while ( true ) {
-                if ( search( refHead, val, pos, key_comparator() ) )
-                    return false;
-
-                if ( link_node( node_traits::to_node_ptr( val ), pos ) ) {
-                    ++m_ItemCounter;
-                    return true;
-                }
-
-                // clear next field
-                node_traits::to_node_ptr( val )->m_pNext.store( marked_node_ptr(), memory_model::memory_order_relaxed );
+            {
+                rcu_lock l;
+                return insert_at_locked( pos, val );
             }
         }
 
@@ -855,20 +828,23 @@ namespace cds { namespace intrusive {
             link_checker::is_empty( node_traits::to_node_ptr( val ) );
             position pos( refHead );
 
-            rcu_lock l;
-            while ( true ) {
-                if ( search( refHead, val, pos, key_comparator() ) )
-                    return false;
+            {
+                rcu_lock l;
+                while ( true ) {
+                    if ( search( refHead, val, pos, key_comparator()))
+                        return false;
 
-                if ( link_node( node_traits::to_node_ptr( val ), pos ) ) {
-                    f( val );
-                    ++m_ItemCounter;
-                    return true;
-                }
+                    if ( link_node( node_traits::to_node_ptr( val ), pos ) ) {
+                        f( val );
+                        ++m_ItemCounter;
+                        return true;
+                    }
 
-                // clear next field
-                node_traits::to_node_ptr( val )->m_pNext.store( marked_node_ptr(), memory_model::memory_order_relaxed );
+                    // clear next field
+                    node_traits::to_node_ptr( val )->m_pNext.store( marked_node_ptr(), memory_model::memory_order_relaxed );
+                }
             }
+
         }
 
         iterator insert_at_( atomic_node_ptr& refHead, value_type& val )
@@ -881,47 +857,23 @@ namespace cds { namespace intrusive {
 
         template <typename Func>
         std::pair<iterator, bool> ensure_at_( atomic_node_ptr& refHead, value_type& val, Func func )
-        {
-            rcu_lock l;
-            return ensure_at_locked( refHead, val, func );
-        }
-
-        template <typename Func>
-        std::pair<iterator, bool> ensure_at_locked( atomic_node_ptr& refHead, value_type& val, Func func )
         {
             position pos( refHead );
-
-            // RCU lock should be locked!!!
-            assert( gc::is_locked() );
-
-            while ( true ) {
-                if ( search( refHead, val, pos, key_comparator() ) ) {
-                    assert( key_comparator()( val, *node_traits::to_value_ptr( *pos.pCur ) ) == 0 );
-
-                    func( false, *node_traits::to_value_ptr( *pos.pCur ), val );
-                    return std::make_pair( iterator( pos.pCur ), false );
-                }
-                else {
-                    link_checker::is_empty( node_traits::to_node_ptr( val ) );
-
-                    if ( link_node( node_traits::to_node_ptr( val ), pos ) ) {
-                        ++m_ItemCounter;
-                        func( true, val , val );
-                        return std::make_pair( iterator( node_traits::to_node_ptr( val )), true );
-                    }
-
-                    // clear the next field
-                    node_traits::to_node_ptr( val )->m_pNext.store( marked_node_ptr(), memory_model::memory_order_relaxed );
-                }
+            {
+                rcu_lock l;
+                return ensure_at_locked( pos, val, func );
             }
         }
 
         template <typename Func>
         std::pair<bool, bool> ensure_at( atomic_node_ptr& refHead, value_type& val, Func func )
         {
-            rcu_lock l;
-            std::pair<iterator, bool> ret = ensure_at_locked( refHead, val, func );
-            return std::make_pair( ret.first != end(), ret.second );
+            position pos( refHead );
+            {
+                rcu_lock l;
+                std::pair<iterator, bool> ret = ensure_at_locked( pos, val, func );
+                return std::make_pair( ret.first != end(), ret.second );
+            }
         }
 
         bool unlink_at( atomic_node_ptr& refHead, value_type& val )
@@ -935,20 +887,19 @@ namespace cds { namespace intrusive {
                     rcu_lock l;
                     if ( !search( refHead, val, pos, key_comparator() ) || node_traits::to_value_ptr( *pos.pCur ) != &val )
                         return false;
-                    if ( !unlink_node( pos, false )) {
+                    if ( !unlink_node( pos, erase_mask )) {
                         bkoff();
                         continue;
                     }
                 }
 
                 --m_ItemCounter;
-                free_node_chain( pos );
                 return true;
             }
         }
 
         template <typename Q, typename Compare, typename Func>
-        bool erase_at( atomic_node_ptr& refHead, Q const& val, Compare cmp, Func f, position& pos )
+        bool erase_at( position& pos, Q const& val, Compare cmp, Func f )
         {
             back_off bkoff;
             check_deadlock_policy::check();
@@ -956,9 +907,9 @@ namespace cds { namespace intrusive {
             for (;;) {
                 {
                     rcu_lock l;
-                    if ( !search( refHead, val, pos, cmp ) )
+                    if ( !search( pos.refHead, val, pos, cmp ) )
                         return false;
-                    if ( !unlink_node( pos, false )) {
+                    if ( !unlink_node( pos, erase_mask )) {
                         bkoff();
                         continue;
                     }
@@ -966,7 +917,6 @@ namespace cds { namespace intrusive {
 
                 f( *node_traits::to_value_ptr( *pos.pCur ) );
                 --m_ItemCounter;
-                free_node_chain( pos );
                 return true;
             }
         }
@@ -975,14 +925,14 @@ namespace cds { namespace intrusive {
         bool erase_at( atomic_node_ptr& refHead, Q const& val, Compare cmp, Func f )
         {
             position pos( refHead );
-            return erase_at( refHead, val, cmp, f, pos );
+            return erase_at( pos, val, cmp, f );
         }
 
         template <typename Q, typename Compare>
         bool erase_at( atomic_node_ptr& refHead, const Q& val, Compare cmp )
         {
             position pos( refHead );
-            return erase_at( refHead, val, cmp, [](value_type const&){}, pos );
+            return erase_at( pos, val, cmp, [](value_type const&){} );
         }
 
         template <typename Q, typename Compare >
@@ -990,64 +940,58 @@ namespace cds { namespace intrusive {
         {
             position pos( refHead );
             back_off bkoff;
-            assert( gc::is_locked() )  ;   // RCU must be locked!!!
+            assert( !gc::is_locked() )  ;   // RCU must not be locked!!!
 
-            for (;;) {
-                if ( !search( refHead, val, pos, cmp ) )
-                    return nullptr;
-                if ( !unlink_node( pos, true )) {
-                    bkoff();
-                    continue;
-                }
+            {
+                rcu_lock l;
+                for (;;) {
+                    if ( !search( refHead, val, pos, cmp ) )
+                        return nullptr;
+                    if ( !unlink_node( pos, extract_mask )) {
+                        bkoff();
+                        continue;
+                    }
 
-                --m_ItemCounter;
-                return node_traits::to_value_ptr( pos.pCur );
+                    --m_ItemCounter;
+                    return node_traits::to_value_ptr( pos.pCur );
+                }
             }
         }
 
         template <typename Q, typename Compare, typename Func>
-        bool find_at( atomic_node_ptr& refHead, Q& val, Compare cmp, Func f ) const
+        bool find_at( atomic_node_ptr& refHead, Q& val, Compare cmp, Func f )
         {
             position pos( refHead );
 
-            rcu_lock l;
-            if ( search( refHead, val, pos, cmp ) ) {
-                assert( pos.pCur != nullptr );
-                f( *node_traits::to_value_ptr( *pos.pCur ), val );
-                return true;
+            {
+                rcu_lock l;
+                if ( search( refHead, val, pos, cmp ) ) {
+                    assert( pos.pCur != nullptr );
+                    f( *node_traits::to_value_ptr( *pos.pCur ), val );
+                    return true;
+                }
+                return false;
             }
-            return false;
         }
 
         template <typename Q, typename Compare>
-        bool find_at( atomic_node_ptr& refHead, Q const& val, Compare cmp ) const
+        bool find_at( atomic_node_ptr& refHead, Q const& val, Compare cmp )
         {
-            rcu_lock l;
-            return find_at_( refHead, val, cmp ) != end();
+            position pos( refHead );
+            {
+                rcu_lock l;
+                return find_at_locked( pos, val, cmp ) != cend();
+            }
         }
 
         template <typename Q, typename Compare>
-        value_type * get_at( atomic_node_ptr& refHead, Q const& val, Compare cmp ) const
+        value_type * get_at( atomic_node_ptr& refHead, Q const& val, Compare cmp )
         {
             value_type * pFound = nullptr;
             return find_at( refHead, val, cmp,
                 [&pFound](value_type& found, Q const& ) { pFound = &found; } )
                 ? pFound : nullptr;
         }
-
-        template <typename Q, typename Compare>
-        const_iterator find_at_( atomic_node_ptr& refHead, Q const& val, Compare cmp ) const
-        {
-            assert( gc::is_locked() );
-            position pos( refHead );
-
-            if ( search( refHead, val, pos, cmp ) ) {
-                assert( pos.pCur != nullptr );
-                return const_iterator( pos.pCur );
-            }
-            return end();
-        }
-
         //@endcond
 
     protected:
@@ -1110,6 +1054,69 @@ namespace cds { namespace intrusive {
             }
         }
         //@endcond
+
+    private:
+        //@cond
+        bool insert_at_locked( position& pos, value_type& val )
+        {
+            // RCU lock should be locked!!!
+            assert( gc::is_locked() );
+            link_checker::is_empty( node_traits::to_node_ptr( val ) );
+
+            while ( true ) {
+                if ( search( pos.refHead, val, pos, key_comparator() ) )
+                    return false;
+
+                if ( link_node( node_traits::to_node_ptr( val ), pos ) ) {
+                    ++m_ItemCounter;
+                    return true;
+                }
+
+                // clear next field
+                node_traits::to_node_ptr( val )->m_pNext.store( marked_node_ptr(), memory_model::memory_order_relaxed );
+            }
+        }
+
+        template <typename Func>
+        std::pair<iterator, bool> ensure_at_locked( position& pos, value_type& val, Func func )
+        {
+            // RCU lock should be locked!!!
+            assert( gc::is_locked() );
+
+            while ( true ) {
+                if ( search( pos.refHead, val, pos, key_comparator() ) ) {
+                    assert( key_comparator()( val, *node_traits::to_value_ptr( *pos.pCur ) ) == 0 );
+
+                    func( false, *node_traits::to_value_ptr( *pos.pCur ), val );
+                    return std::make_pair( iterator( pos.pCur ), false );
+                }
+                else {
+                    link_checker::is_empty( node_traits::to_node_ptr( val ) );
+
+                    if ( link_node( node_traits::to_node_ptr( val ), pos ) ) {
+                        ++m_ItemCounter;
+                        func( true, val , val );
+                        return std::make_pair( iterator( node_traits::to_node_ptr( val )), true );
+                    }
+
+                    // clear the next field
+                    node_traits::to_node_ptr( val )->m_pNext.store( marked_node_ptr(), memory_model::memory_order_relaxed );
+                }
+            }
+        }
+
+        template <typename Q, typename Compare>
+        const_iterator find_at_locked( position& pos, Q const& val, Compare cmp )
+        {
+            assert( gc::is_locked() );
+
+            if ( search( pos.refHead, val, pos, cmp ) ) {
+                assert( pos.pCur != nullptr );
+                return const_iterator( pos.pCur );
+            }
+            return cend();
+        }
+        //@endcond
     };
 
 }}  // namespace cds::intrusive