Intrusive MultiLevelHashSet draft done
authorkhizmax <libcds.dev@gmail.com>
Thu, 30 Jul 2015 20:31:12 +0000 (23:31 +0300)
committerkhizmax <libcds.dev@gmail.com>
Thu, 30 Jul 2015 20:31:12 +0000 (23:31 +0300)
cds/intrusive/details/multilevel_hashset_base.h
cds/intrusive/impl/multilevel_hashset.h

index 45fc510269b5f0d192696af6162bc185305d7219..6bb8768869a5720c2f2ce87ea9ca6e0003ee0241 100644 (file)
@@ -64,10 +64,65 @@ namespace cds { namespace intrusive {
         template <typename EventCounter = cds::atomicity::event_counter>
         struct stat {
             typedef EventCounter event_counter ; ///< Event counter type
+
+            event_counter   m_nInsertSuccess;   ///< Number of success \p insert() operations
+            event_counter   m_nInsertFailed;    ///< Number of failed \p insert() operations
+            event_counter   m_nInsertRetry;     ///< Number of attempts to insert new item
+            event_counter   m_nUpdateNew;       ///< Number of new item inserted for \p update()
+            event_counter   m_nUpdateExisting;  ///< Number of existing item updates
+            event_counter   m_nUpdateFailed;    ///< Number of failed \p update() call
+            event_counter   m_nUpdateRetry;     ///< Number of attempts to update the item
+            event_counter   m_nEraseSuccess;    ///< Number of successful \p erase(), \p unlink(), \p extract() operations
+            event_counter   m_nEraseFailed;     ///< Number of failed \p erase(), \p unlink(), \p extract() operations
+            event_counter   m_nEraseRetry;      ///< Number of attempts to \p erase() an item
+            event_counter   m_nFindSuccess;     ///< Number of successful \p find() and \p get() operations
+            event_counter   m_nFindFailed;      ///< Number of failed \p find() and \p get() operations
+
+            event_counter   m_nExpandNodeSuccess; ///< Number of succeeded attempts converting data node to array node
+            event_counter   m_nExpandNodeFailed;  ///< Number of failed attempts converting data node to array node
+            event_counter   m_nSlotChanged;     ///< Number of array node slot changing by other thread during an operation
+            event_counter   m_nSlotConverting;  ///< Number of events when we encounter a slot while it is converting to array node
+
+            void onInsertSuccess()              { ++m_nInsertSuccess;       }
+            void onInserFailed()                { ++m_nInsertFailed;        }
+            void onInsertRetry()                { ++m_nInsertRetry;         }
+            void onUpdateNew()                  { ++m_nUpdateNew;           }
+            void onUpdateExisting()             { ++m_nUpdateExisting;      }
+            void onUpdateFailed()               { ++m_nUpdateFailed;        }
+            void onUpdateRetry()                { ++m_nUpdateRetry;         }
+            void onEraseSuccess()               { ++m_nEraseSuccess;        }
+            void onEraseFailed()                { ++m_nEraseFailed;         }
+            void onEraseRetry()                 { ++m_nEraseRetry;          }
+            void onFindSuccess()                { ++m_nFinSuccess;          }
+            void onFindFailed()                 { ++m_nFindFailed;          }
+
+            void onExpandNodeSuccess()          { ++m_nExpandNodeSuccess;   }
+            void onExpandNodeFailed()           { ++m_nExpandNodeFailed;    }
+            void onSlotChanged()                { ++m_nSlotChanged;         }
+            void onSlotConverting()             { ++m_nSlotConverting;      }
         };
 
         /// \p MultiLevelHashSet empty internal statistics
         struct empty_stat {
+            //@cond
+            void onInsertSuccess()              const {}
+            void onInsertFailed()               const {}
+            void onInsertRetry()                const {}
+            void onUpdateNew()                  const {}
+            void onUpdateExisting()             const {}
+            void onUpdateFailed()               const {}
+            void onUpdateRetry()                const {}
+            void onEraseSuccess()               const {}
+            void onEraseFailed()                const {}
+            void onEraseRetry()                 const {}
+            void onFindSuccess()                const {}
+            void onFindFailed()                 const {}
+
+            void onExpandNodeSuccess()          const {}
+            void onExpandNodeFailed()           const {}
+            void onSlotChanged()                const {}
+            void onSlotConverting()             const {}
+            //@endcond
         };
 
         /// MultiLevelHashSet traits
@@ -87,7 +142,8 @@ namespace cds { namespace intrusive {
                     // ... other fields
                 };
 
-                struct foo_hash_functor {
+                // Hash accessor
+                struct foo_hash_accessor {
                     hash_type const& operator()( foo const& d ) const
                     {
                         return d.hash;
@@ -234,6 +290,7 @@ namespace cds { namespace intrusive {
                 explicit hash_splitter( hash_type const& h )
                     : m_ptr(reinterpret_cast<uint_type const*>( &h ))
                     , m_pos(0)
+                    , m_first( m_ptr )
 #           ifdef _DEBUG
                     , m_last( m_ptr + c_nHashSize )
 #           endif
@@ -297,11 +354,18 @@ namespace cds { namespace intrusive {
                     return nBits ? cut( nBits ) : 0;
                 }
 
+                void reset()
+                {
+                    m_ptr = m_first;
+                    m_pos = 0;
+                }
+
             private:
                 uint_type const* m_ptr;  ///< current position in the hash
                 size_t           m_pos;  ///< current position in bits
+                uint_type const* m_first; ///< first position
 #           ifdef _DEBUG
-                uint_type const* m_last;
+                uint_type const* m_last;  ///< last position
 #           endif
             };
         } // namespace details
index 6a11bbe412966b4fe807b1ec6988a3fc6ad7b29b..f0516ef0c83daef5f0a3bbcc52039d347d866fc5 100644 (file)
@@ -3,6 +3,9 @@
 #ifndef CDSLIB_INTRUSIVE_IMPL_MULTILEVEL_HASHSET_H
 #define CDSLIB_INTRUSIVE_IMPL_MULTILEVEL_HASHSET_H
 
+#include <tuple> // std::tie
+#include <functional> // std::ref
+
 #include <cds/intrusive/details/multilevel_hashset_base.h>
 #include <cds/details/allocator.h>
 
@@ -117,15 +120,20 @@ namespace cds { namespace intrusive {
 
         typedef typename gc::template guarded_ptr< value_type > guarded_ptr; ///< Guarded pointer
 
+        /// Count of hazard pointers required
+        static CDS_CONSTEXPR size_t const c_nHazardPtrCount = 1;
+
+        /// Node marked poiter
+        typedef cds::details::marked_ptr< value_type, 3 > node_ptr;
+        /// Array node element
+        typedef atomics::atomic< node_ptr > atomic_node_ptr;
+
     protected:
         //@cond
-        enum cell_flags {
-            logically_deleted = 1,  ///< the cell (data node) is logically deleted
-            array_converting = 2,   ///< the cell is converting from data node to an array node
-            array_node = 4          ///< the cell is a pointer to an array node
+        enum node_flags {
+            array_converting = 1,   ///< the cell is converting from data node to an array node
+            array_node = 2          ///< the cell is a pointer to an array node
         };
-        typedef cds::details::marked_ptr< value_type, 7 > node_ptr;
-        typedef atomics::atomic< node_ptr * >  atomic_node_ptr;
 
         typedef cds::details::Allocator< atomic_node_ptr, head_node_allocator > cxx_head_node_allocator;
         typedef cds::details::Allocator< atomic_node_ptr, array_node_allocator > cxx_array_node_allocator;
@@ -138,6 +146,7 @@ namespace cds { namespace intrusive {
             size_t  array_node_size;    // power-of-two
             size_t  array_node_size_log;// log2( array_node_size )
         };
+
         //@endcond
 
     private:
@@ -162,14 +171,14 @@ namespace cds { namespace intrusive {
         */
         MultiLevelHashSet( size_t head_bits = 8, size_t array_bits = 4 )
             : m_Metrics(make_metrics( head_bits, array_bits ))
-            , m_Head( cxx_head_node_allocator().NewArray( m_Metrics.head_node_size, nullptr ))
+            , m_Head( cxx_head_node_allocator().NewArray( head_size(), nullptr ))
         {}
 
         /// Destructs the set and frees all data
         ~MultiLevelHashSet()
         {
             destroy_tree();
-            cxx_head_node_allocator().Delete( m_Head, m_Metrics.head_node_size );
+            cxx_array_node_allocator().Delete( m_Head, head_size() );
         }
 
         /// Inserts new node
@@ -181,6 +190,7 @@ namespace cds { namespace intrusive {
         */
         bool insert( value_type& val )
         {
+            return insert( val, [](value_type&) {} );
         }
 
         /// Inserts new node
@@ -190,7 +200,7 @@ namespace cds { namespace intrusive {
             The function allows to split creating of new item into two part:
             - create item with key only
             - insert new item into the set
-            - if inserting is success, calls  \p f functor to initialize \p val.
+            - if inserting is success, calls \p f functor to initialize \p val.
 
             The functor signature is:
             \code
@@ -205,6 +215,73 @@ namespace cds { namespace intrusive {
         template <typename Func>
         bool insert( value_type& val, Func f )
         {
+            hash_type const& hash = hash_accessor()( val );
+            hash_splitter splitter( hash );
+            hash_comparator cmp;
+            gc::Guard guard;
+            back_off bkoff;
+
+            atomic_node_ptr * pArr = m_Head;
+            size_t nSlot = splitter.cut( m_Metrics.head_node_size_log );
+            assert( nSlot < m_Metrics.head_node_size );
+
+            while ( true ) {
+                node_ptr slot = pArr[nSlot].load( memory_model::memory_order_acquire );
+                if ( slot.bits() == array_node ) {
+                    // array node, go down the tree
+                    assert( slot.ptr() != nullptr );
+                    nSlot = splitter.cut( m_Metrics.array_node_size_log );
+                    assert( nSlot < m_Metrics.array_node_size );
+                    pArr = to_array( slot.ptr() );
+                }
+                else if ( slot.bits() == array_converting ) {
+                    // the slot is converting to array node right now
+                    bkoff();
+                    m_Stat.onSlotConverting();
+                }
+                else {
+                    // data node
+                    assert(slot.bits() == 0 );
+
+                    // protect data node by hazard pointer
+                    if ( guard.protect( pArr[nSlot], [](node_ptr p) -> value_type * { return p.ptr(); }) != slot ) {
+                        // slot value has been changed - retry
+                        m_Stat.onSlotChanged();
+                    }
+                    else if ( slot.ptr() ) {
+                        if ( cmp( hash, hash_accessor()( *slot.ptr() )) == 0 ) {
+                            // the item with that hash value already exists
+                            m_Stat.onInsertFailed();
+                            return false;
+                        }
+
+                        // the slot must be expanded
+                        atomic_node_ptr * pNewArr;
+                        size_t nNewSlot;
+                        std::tie( pNewArr, nNewSlot ) = expand_slot( pArr[ nSlot ], slot, splitter );
+                        if ( pNewArr ) {
+                            pArr = pNewArr;
+                            nSlot = nNewSlot;
+                        }
+                    }
+                    else {
+                        // the slot is empty, try to insert data node
+                        node_ptr pNull;
+                        if ( pArr[nSlot].compare_exchange_strong( pNull, node_ptr( &val ), memory_model::memory_order_release, atomics::memory_order_relaxed )
+                        {
+                            // the new data node has been inserted
+                            f( val );
+                            ++m_ItemCounter;
+                            m_Stat.onInsertSuccess();
+                            return true;
+                        }
+
+                        // insert failed - slot has been changed by another thread
+                        // retry inserting
+                        m_Stat.onInsertRetry();
+                    }
+                }
+            } // while
         }
 
         /// Updates the node
@@ -225,6 +302,87 @@ namespace cds { namespace intrusive {
         template <typename Func>
         std::pair<bool, bool> update( value_type& val, bool bInsert = true )
         {
+            hash_type const& hash = hash_accessor()( val );
+            hash_splitter splitter( hash );
+            hash_comparator cmp;
+            gc::Guard guard;
+            back_off bkoff;
+
+            atomic_node_ptr * pArr = m_Head;
+            size_t nSlot = splitter.cut( m_Metrics.head_node_size_log );
+            assert( nSlot < m_Metrics.head_node_size );
+
+            while ( true ) {
+                node_ptr slot = pArr[nSlot].load( memory_model::memory_order_acquire );
+                if ( slot.bits() == array_node ) {
+                    // array node, go down the tree
+                    assert( slot.ptr() != nullptr );
+                    nSlot = splitter.cut( m_Metrics.array_node_size_log );
+                    assert( nSlot < m_Metrics.array_node_size );
+                    pArr = to_array( slot.ptr() );
+                }
+                else if ( slot.bits() == array_converting ) {
+                    // the slot is converting to array node right now
+                    bkoff();
+                    m_Stat.onSlotConverting();
+                }
+                else {
+                    // data node
+                    assert(slot.bits() == 0 );
+
+                    // protect data node by hazard pointer
+                    if ( guard.protect( pArr[nSlot], [](node_ptr p) -> value_type * { return p.ptr(); }) != slot ) {
+                        // slot value has been changed - retry
+                        m_Stat.onSlotChanged();
+                    }
+                    else if ( slot.ptr() ) {
+                        if ( cmp( hash, hash_accessor()( *slot.ptr() )) == 0 ) {
+                            // the item with that hash value already exists
+                            // Replace it with val
+                            if ( pArr[nSlot].compare_exchange_strong( slot, node_ptr( &val ), memory_model::memory_order_release, atomics::memory_order_relaxed ) ) {
+                                // slot can be disposed
+                                gc::template retire<disposer>( slot.ptr() );
+                                m_Stat.onUpdateExisting();
+                                return std::make_pair( true, false );
+                            }
+
+                            m_Stat.onUpdateRetry();
+                            continue;
+                        }
+
+                        // the slot must be expanded
+                        atomic_node_ptr * pNewArr;
+                        size_t nNewSlot;
+                        std::tie( pNewArr, nNewSlot ) = expand_slot( pArr[ nSlot ], slot, splitter );
+                        if ( pNewArr ) {
+                            pArr = pNewArr;
+                            nSlot = nNewSlot;
+                        }
+                    }
+                    else {
+                        // the slot is empty, try to insert data node
+                        if ( bInsert ) {
+                            node_ptr pNull;
+                            if ( pArr[nSlot].compare_exchange_strong( pNull, node_ptr( &val ), memory_model::memory_order_release, atomics::memory_order_relaxed )
+                            {
+                                // the new data node has been inserted
+                                f( val );
+                                ++m_ItemCounter;
+                                m_Stat.onUpdateNew();
+                                return std::make_pair( true, true );
+                            }
+                        }
+                        else {
+                            m_Stat.onUpdateFailed();
+                            return std:make_pair( false, false );
+                        }
+
+                        // insert failed - slot has been changed by another thread
+                        // retry updating
+                        m_Stat.onUpdateRetry();
+                    }
+                }
+            } // while
         }
 
         /// Unlinks the item \p val from the set
@@ -236,6 +394,18 @@ namespace cds { namespace intrusive {
         */
         bool unlink( value_type& val )
         {
+            typename gc::Guard guard;
+            auto pred = [&val](value_type const& item) -> bool { return &item == &val; };
+            value_type * p = do_erase( hash, guard, std::ref( pred ));
+
+            // p is guarded by HP
+            if ( p ) {
+                gc::template retire<disposer>( p );
+                --m_ItemCounter;
+                m_Stat.onEraseSuccess();
+                return true;
+            }
+            return false;
         }
 
         /// Deletes the item from the set
@@ -249,6 +419,7 @@ namespace cds { namespace intrusive {
         */
         bool erase( hash_type const& hash )
         {
+            return erase(hash, [](value_type const&) {} );
         }
 
         /// Deletes the item from the set
@@ -261,7 +432,7 @@ namespace cds { namespace intrusive {
             The \p Func interface is
             \code
             struct functor {
-                void operator()( value_type const& item );
+                void operator()( value_type& item );
             };
             \endcode
 
@@ -270,6 +441,18 @@ namespace cds { namespace intrusive {
         template <typename Func>
         bool erase( hash_type const& hash, Func f )
         {
+            typename gc::Guard guard;
+            value_type * p = do_erase( hash, guard, []( value_type const&) -> bool {return true; } );
+
+            // p is guarded by HP
+            if ( p ) {
+                gc::template retire<disposer>( p );
+                --m_ItemCounter;
+                f( *p );
+                m_Stat.onEraseSuccess();
+                return true;
+            }
+            return false;
         }
 
         /// Extracts the item with specified \p hash
@@ -299,6 +482,17 @@ namespace cds { namespace intrusive {
         */
         guarded_ptr extract( hash_type const& hash )
         {
+            typename gc::Guard guard;
+            value_type * p = do_erase( hash, guard, []( value_type const&) -> bool {return true; } );
+
+            // p is guarded by HP
+            if ( p ) {
+                gc::template retire<disposer>( p );
+                --m_ItemCounter;
+                m_Stat.onEraseSuccess();
+                return guarded_ptr(p);
+            }
+            return guarded_ptr();
         }
 
         /// Finds an item by it's \p hash
@@ -322,6 +516,15 @@ namespace cds { namespace intrusive {
         template <typename Func>
         bool find( hash_type const& hash, Func f )
         {
+            typename gc::Guard guard;
+            value_type * p = search( hash, guard );
+
+            // p is guarded by HP
+            if ( p ) {
+                f( *p );
+                return true;
+            }
+            return false;
         }
 
         /// Checks whether the set contains \p hash
@@ -331,6 +534,7 @@ namespace cds { namespace intrusive {
         */
         bool contains( hash_type const& hash )
         {
+            return find( hash, [](value_type&) {} );
         }
 
         /// Finds an item by it's \p hash and returns the item found
@@ -358,20 +562,26 @@ namespace cds { namespace intrusive {
         */
         guarded_ptr get( hash_type const& hash )
         {
+            typename gc::Guard guard;
+            value_type * p = search( hash, guard );
+
+            // p is guarded by HP
+            if ( p )
+                return guarded_ptr( p );
+            return guarded_ptr();
         }
 
         /// Clears the set (non-atomic)
         /**
-            The function unlink all items from the set.
-            The function is not atomic. It removes all data nodes and then resets the item counter to zero.
-            If there are a thread that performs insertion while \p %clear() is working the result is undefined in general case:
-            \p empty() may return \p true but the set may contain item(s).
-            Therefore, \p %clear() may be used only for debugging purposes.
+            The function unlink all data node from the set.
+            The function is not atomic but is thread-safe.
+            After \p %clear() the set may not be empty because another threads may insert items.
 
             For each item the \p disposer is called after unlinking.
         */
         void clear()
         {
+            clear_array( m_Head, head_size() );
         }
 
         /// Checks if the set is empty
@@ -439,16 +649,218 @@ namespace cds { namespace intrusive {
             return (reinterpret_cast<uintptr_t>(p) & node_ptr::bitmask) == 0;
         }
 
+        atomic_node_ptr * alloc_array_node() const
+        {
+            return cxx_array_node_allocator().NewArray( array_node_size(), nullptr )
+        }
+
+        void free_array_node( atomic_node_ptr * parr ) const
+        {
+            cxx_array_node_allocator().Delete( parr, array_node_size() );
+        }
+
         void destroy_tree()
         {
             // The function is not thread-safe. For use in dtor only
             // Remove data node
             clear();
 
-            //TODO: free all array nodes
+            // Destroy all array nodes
+            destroy_array_nodes( m_Head, head_size());
+        }
+
+        void destroy_array_nodes( atomic_node_ptr * pArr, size_t nSize )
+        {
+            for ( atomic_node_ptr * pLast = pArr + nSize; pArr != pLast; ++pArr ) {
+                node_ptr slot = pArr->load( memory_model::memory_order_acquire );
+                if ( slot.bits() == array_node ) {
+                    destroy_array_nodes(to_array(slot.ptr()), array_node_size());
+                    free_array_node( to_array(slot.ptr()));
+                    pArr->store(node_ptr(), memory_model::memory_order_relaxed );
+                }
+            }
+        }
+
+        void clear_array( atomic_node_ptr * pArr, size_t nSize )
+        {
+            back_off bkoff;
+
+            for ( atomic_node_ptr * pLast = pArr + nSize; pArr != pLast; ++pArr ) {
+                while ( true ) {
+                    node_ptr slot = pArr->load( memory_model::memory_order_acquire );
+                    if ( slot.bits() == array_node ) {
+                        // array node, go down the tree
+                        assert( slot.ptr() != nullptr );
+                        clear_array( to_array( slot.ptr()), array_node_size() );
+                        break;
+                    }
+                    else if ( slot.bits() == array_converting ) {
+                        // the slot is converting to array node right now
+                        while ( (slot = pArr->load(memory_model::memory_order_acquire)).bits() == array_converting ) {
+                            bkoff();
+                            m_Stat.onSlotConverting();
+                        }
+                        bkoff.reset();
+
+                        assert( slot.ptr() != nullptr );
+                        assert(slot.bits() == array_node );
+                        clear_array( to_array( slot.ptr()), array_node_size() );
+                        break;
+                    }
+                    else {
+                        // data node
+                        if ( pArr->compare_exchange_strong( slot, node_ptr(), memory_model::memory_order_acquire, atomics::memory_order_relaxed ) ) {
+                            gc::template retire<disposer>( p );
+                            --m_ItemCounter;
+                            m_Stat.onEraseSuccess();
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+
+        union converter {
+            value_type * pData;
+            atomic_node_ptr * pArr;
+
+            converter( value_type * p )
+                : pData( p )
+            {}
+
+            converter( atomic_node_ptr * p )
+                : pArr( p )
+            {}
+        };
+
+        static atomic_node_ptr * to_array( value_type * p )
+        {
+            return converter( p ).pArr;
+        }
+        static node_ptr * to_node( atomic_node_ptr * p )
+        {
+            return converter( p ).pData;
         }
+
+        std::pair< atomic_node_ptr *, size_t > expand_slot( atomic_node_ptr& slot, node_ptr current, hash_splitter& splitter )
+        {
+            node_ptr cur(current.ptr());
+            if ( !slot.compare_exchange_strong( cur, cur | array_converting, memory_model::memory_order_release, atomics::memory_order_relaxed )) {
+                m_Stat.onExpandNodeFailed();
+                return std::make_pair(static_cast<atomic_node_ptr *>(nullptr), size_t(0));
+            }
+
+            atomic_node_ptr * pArr = alloc_array_node();
+            size_t idx = splitter.cut( m_Metrics.array_node_size_log );
+            pArr[idx].store( current, memory_model::memory_order_release );
+
+            cur = cur | array_converting;
+            CDS_VERIFY( slot.compare_exchange_strong( cur, node_ptr( to_node( pArr ), array_node ), memory_model::memory_order_release, atomics::memory_order_relaxed )));
+
+            return std::make_pair( pArr, idx );
+        }
+
+        value_type * search( hash_type const& hash, typename gc::Guard& guard )
+        {
+            hash_splitter splitter( hash );
+            hash_comparator cmp;
+            back_off bkoff;
+
+            atomic_node_ptr * pArr = m_Head;
+            size_t nSlot = splitter.cut( m_Metrics.head_node_size_log );
+            assert( nSlot < m_Metrics.head_node_size );
+
+            while ( true ) {
+                node_ptr slot = pArr[nSlot].load( memory_model::memory_order_acquire );
+                if ( slot.bits() == array_node ) {
+                    // array node, go down the tree
+                    assert( slot.ptr() != nullptr );
+                    nSlot = splitter.cut( m_Metrics.array_node_size_log );
+                    assert( nSlot < m_Metrics.array_node_size );
+                    pArr = to_array( slot.ptr() );
+                }
+                else if ( slot.bits() == array_converting ) {
+                    // the slot is converting to array node right now
+                    bkoff();
+                    m_Stat.onSlotConverting();
+                }
+                else {
+                    // data node
+                    assert(slot.bits() == 0 );
+
+                    // protect data node by hazard pointer
+                    if ( guard.protect( pArr[nSlot], [](node_ptr p) -> value_type * { return p.ptr(); }) != slot ) {
+                        // slot value has been changed - retry
+                        m_Stat.onSlotChanged();
+                    }
+                    else if ( slot.ptr() && cmp( hash, hash_accessor()( *slot.ptr() )) == 0 ) {
+                        // item found
+                        m_Stat.onFindSucces();
+                        return slot.ptr();
+                    }
+                    m_Stat.onFindFailed();
+                    return nullptr;
+                }
+            } // while
+        }
+
+        template <typename Predicate>
+        value_type * do_erase( hash_type const& hash, typename gc::Guard& guard, Predicate pred )
+        {
+            hash_splitter splitter( hash );
+            hash_comparator cmp;
+            back_off bkoff;
+
+            atomic_node_ptr * pArr = m_Head;
+            size_t nSlot = splitter.cut( m_Metrics.head_node_size_log );
+            assert( nSlot < m_Metrics.head_node_size );
+
+            while ( true ) {
+                node_ptr slot = pArr[nSlot].load( memory_model::memory_order_acquire );
+                if ( slot.bits() == array_node ) {
+                    // array node, go down the tree
+                    assert( slot.ptr() != nullptr );
+                    nSlot = splitter.cut( m_Metrics.array_node_size_log );
+                    assert( nSlot < m_Metrics.array_node_size );
+                    pArr = to_array( slot.ptr() );
+                }
+                else if ( slot.bits() == array_converting ) {
+                    // the slot is converting to array node right now
+                    bkoff();
+                    m_Stat.onSlotConverting();
+                }
+                else {
+                    // data node
+                    assert(slot.bits() == 0 );
+
+                    // protect data node by hazard pointer
+                    if ( guard.protect( pArr[nSlot], [](node_ptr p) -> value_type * { return p.ptr(); }) != slot ) {
+                        // slot value has been changed - retry
+                        m_Stat.onSlotChanged();
+                    }
+                    else if ( slot.ptr() ) {
+                        if ( cmp( hash, hash_accessor()( *slot.ptr() )) == 0 && pred( *slot.ptr() )) {
+                            // item found - replace it with nullptr
+                            if ( pArr[nSlot].compare_exchange_strong(slot, node_ptr(nullptr), memory_model::memory_order_acquire, atomics::memory_order_relaxed))
+                                return slot.ptr();
+                            m_Stat.onEraseRetry();
+                            continue;
+                        }
+                        m_Stat.onEraseFailed();
+                        return nullptr;
+                    }
+                    else {
+                        // the slot is empty
+                        m_Stat.onEraseFailed();
+                        return nullptr;
+                    }
+                }
+            } // while
+        }
+
         //@endcond
     };
 }} // namespace cds::intrusive
 
 #endif // #ifndef CDSLIB_INTRUSIVE_IMPL_MULTILEVEL_HASHSET_H
+1
\ No newline at end of file