Merge branch 'flat_combinig_add_stress_and_unint_tests' of https://github.com/mgalimu...
[libcds.git] / cds / intrusive / feldman_hashset_rcu.h
index 6eea7ed4934c87451c214cfaa54ad9fb2aac8089..446582fccaa07de7549fbd9876275eb935788444 100644 (file)
@@ -1,4 +1,32 @@
-//$$CDS-header$$
+/*
+    This file is a part of libcds - Concurrent Data Structures library
+
+    (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
+
+    Source code repo: http://github.com/khizmax/libcds/
+    Download: http://sourceforge.net/projects/libcds/files/
+
+    Redistribution and use in source and binary forms, with or without
+    modification, are permitted provided that the following conditions are met:
+
+    * Redistributions of source code must retain the above copyright notice, this
+      list of conditions and the following disclaimer.
+
+    * Redistributions in binary form must reproduce the above copyright notice,
+      this list of conditions and the following disclaimer in the documentation
+      and/or other materials provided with the distribution.
+
+    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+    AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+    IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+    FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+    DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+    SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+    OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
 
 #ifndef CDSLIB_INTRUSIVE_FELDMAN_HASHSET_RCU_H
 #define CDSLIB_INTRUSIVE_FELDMAN_HASHSET_RCU_H
@@ -45,7 +73,7 @@ namespace cds { namespace intrusive {
         @note Before including <tt><cds/intrusive/feldman_hashset_rcu.h></tt> you should include appropriate RCU header file,
         see \ref cds_urcu_gc "RCU type" for list of existing RCU class and corresponding header files.
 
-        The set supports @ref cds_intrusive_FeldmanHashSet_rcu_iterators "bidirectional thread-safe iterators" 
+        The set supports @ref cds_intrusive_FeldmanHashSet_rcu_iterators "bidirectional thread-safe iterators"
         with some restrictions.
     */
     template <
@@ -57,36 +86,21 @@ namespace cds { namespace intrusive {
         class Traits
 #endif
     >
-    class FeldmanHashSet< cds::urcu::gc< RCU >, T, Traits >
+    class FeldmanHashSet< cds::urcu::gc< RCU >, T, Traits >: protected feldman_hashset::multilevel_array<T, Traits>
     {
+        //@cond
+        typedef feldman_hashset::multilevel_array<T, Traits> base_class;
+        //@endcond
+
     public:
         typedef cds::urcu::gc< RCU > gc; ///< RCU garbage collector
         typedef T       value_type;      ///< type of value stored in the set
         typedef Traits  traits;          ///< Traits template parameter
 
         typedef typename traits::hash_accessor hash_accessor; ///< Hash accessor functor
-        static_assert(!std::is_same< hash_accessor, cds::opt::none >::value, "hash_accessor functor must be specified in Traits");
-
-        /// Hash type deduced from \p hash_accessor return type
-        typedef typename std::decay<
-            typename std::remove_reference<
-            decltype(hash_accessor()(std::declval<T>()))
-            >::type
-        >::type hash_type;
-        //typedef typename std::result_of< hash_accessor( std::declval<T>()) >::type hash_type;
-        static_assert(!std::is_pointer<hash_type>::value, "hash_accessor should return a reference to hash value");
-
-        typedef typename traits::disposer disposer; ///< data node disposer
-
-#   ifdef CDS_DOXYGEN_INVOKED
-        typedef implementation_defined hash_comparator;    ///< hash compare functor based on opt::compare and opt::less option setter
-#   else
-        typedef typename cds::opt::details::make_comparator_from<
-            hash_type,
-            traits,
-            feldman_hashset::bitwise_compare< hash_type >
-        >::type hash_comparator;
-#   endif
+        typedef typename base_class::hash_type hash_type;     ///< Hash type deduced from \p hash_accessor return type
+        typedef typename traits::disposer disposer;           ///< data node disposer
+        typedef typename base_class::hash_comparator hash_comparator; ///< hash compare functor based on \p traits::compare and \p traits::less options
 
         typedef typename traits::item_counter   item_counter;   ///< Item counter type
         typedef typename traits::node_allocator node_allocator; ///< Array node allocator
@@ -94,50 +108,37 @@ namespace cds { namespace intrusive {
         typedef typename traits::back_off       back_off;       ///< Backoff strategy
         typedef typename traits::stat           stat;           ///< Internal statistics type
         typedef typename traits::rcu_check_deadlock rcu_check_deadlock; ///< Deadlock checking policy
-        typedef typename gc::scoped_lock       rcu_lock;        ///< RCU scoped lock
+        typedef typename gc::scoped_lock        rcu_lock;       ///< RCU scoped lock
         static CDS_CONSTEXPR const bool c_bExtractLockExternal = false; ///< Group of \p extract_xxx functions does not require external locking
 
         using exempt_ptr = cds::urcu::exempt_ptr< gc, value_type, value_type, disposer, void >; ///< pointer to extracted node
 
-        /// Node marked poiter
-        typedef cds::details::marked_ptr< value_type, 3 > node_ptr;
-        /// Array node element
-        typedef atomics::atomic< node_ptr > atomic_node_ptr;
+        /// The size of hash_type in bytes, see \p feldman_hashset::traits::hash_size for explanation
+        static CDS_CONSTEXPR size_t const c_hash_size = base_class::c_hash_size;
 
-    protected:
         //@cond
-        enum node_flags {
-            flag_array_converting = 1,   ///< the cell is converting from data node to an array node
-            flag_array_node = 2          ///< the cell is a pointer to an array node
-        };
-
-        struct array_node {
-            array_node * const  pParent;    ///< parent array node
-            size_t const        idxParent;  ///< index in parent array node
-            atomic_node_ptr     nodes[1];   ///< node array
+        typedef feldman_hashset::level_statistics level_statistics;
+        //@endcond
 
-            array_node(array_node * parent, size_t idx)
-                : pParent(parent)
-                , idxParent(idx)
-            {}
+    protected:
+        //@cond
+        typedef typename base_class::node_ptr node_ptr;
+        typedef typename base_class::atomic_node_ptr atomic_node_ptr;
+        typedef typename base_class::array_node array_node;
+        typedef typename base_class::traverse_data traverse_data;
 
-            array_node() = delete;
-            array_node(array_node const&) = delete;
-            array_node(array_node&&) = delete;
-        };
+        using base_class::to_array;
+        using base_class::to_node;
+        using base_class::stats;
+        using base_class::head;
+        using base_class::metrics;
 
-        typedef cds::details::Allocator< array_node, node_allocator > cxx_array_node_allocator;
-        typedef feldman_hashset::details::hash_splitter< hash_type > hash_splitter;
         typedef cds::urcu::details::check_deadlock_policy< gc, rcu_check_deadlock> check_deadlock_policy;
-
         //@endcond
 
     private:
         //@cond
-        feldman_hashset::details::metrics const m_Metrics;     ///< Metrics
-        array_node *      m_Head;        ///< Head array
         item_counter      m_ItemCounter; ///< Item counter
-        stat              m_Stat;        ///< Internal statistics
         //@endcond
 
     public:
@@ -153,15 +154,13 @@ namespace cds { namespace intrusive {
             where \p N is multi-level array depth.
         */
         FeldmanHashSet(size_t head_bits = 8, size_t array_bits = 4)
-            : m_Metrics(feldman_hashset::details::metrics::make(head_bits, array_bits, sizeof(hash_type)))
-            , m_Head(alloc_head_node())
+            : base_class(head_bits, array_bits)
         {}
 
         /// Destructs the set and frees all data
         ~FeldmanHashSet()
         {
-            destroy_tree();
-            free_array_node(m_Head);
+            clear();
         }
 
         /// Inserts new node
@@ -202,70 +201,47 @@ namespace cds { namespace intrusive {
         bool insert( value_type& val, Func f )
         {
             hash_type const& hash = hash_accessor()( val );
-            hash_splitter splitter( hash );
+            traverse_data pos( hash, *this );
             hash_comparator cmp;
-            back_off bkoff;
 
-            size_t nOffset = m_Metrics.head_node_size_log;
-            array_node * pArr = m_Head;
-            size_t nSlot = splitter.cut( m_Metrics.head_node_size_log );
-            assert( nSlot < m_Metrics.head_node_size );
-            size_t nHeight = 1;
+            while (true) {
+                rcu_lock rcuLock;
 
-            while ( true ) {
-                node_ptr slot = pArr->nodes[nSlot].load( memory_model::memory_order_acquire );
-                if ( slot.bits() == flag_array_node ) {
-                    // array node, go down the tree
-                    assert( slot.ptr() != nullptr );
-                    nSlot = splitter.cut( m_Metrics.array_node_size_log );
-                    assert( nSlot < m_Metrics.array_node_size );
-                    pArr = to_array( slot.ptr() );
-                    nOffset += m_Metrics.array_node_size_log;
-                    ++nHeight;
-                }
-                else if ( slot.bits() == flag_array_converting ) {
-                    // the slot is converting to array node right now
-                    bkoff();
-                    m_Stat.onSlotConverting();
-                }
-                else {
-                    // data node
-                    assert(slot.bits() == 0 );
-
-                    rcu_lock rcuLock;
-                    if ( pArr->nodes[nSlot].load(memory_model::memory_order_acquire) == slot ) {
-                        if ( slot.ptr() ) {
-                            if ( cmp( hash, hash_accessor()( *slot.ptr() )) == 0 ) {
-                                // the item with that hash value already exists
-                                m_Stat.onInsertFailed();
-                                return false;
-                            }
+                node_ptr slot = base_class::traverse( pos );
+                assert(slot.bits() == 0);
 
-                            // the slot must be expanded
-                            expand_slot( pArr, nSlot, slot, nOffset );
+                if ( pos.pArr->nodes[pos.nSlot].load(memory_model::memory_order_acquire) == slot) {
+                    if (slot.ptr()) {
+                        if ( cmp( hash, hash_accessor()(*slot.ptr())) == 0 ) {
+                            // the item with that hash value already exists
+                            stats().onInsertFailed();
+                            return false;
                         }
-                        else {
-                            // the slot is empty, try to insert data node
-                            node_ptr pNull;
-                            if ( pArr->nodes[nSlot].compare_exchange_strong( pNull, node_ptr( &val ), memory_model::memory_order_release, atomics::memory_order_relaxed ))
-                            {
-                                // the new data node has been inserted
-                                f( val );
-                                ++m_ItemCounter;
-                                m_Stat.onInsertSuccess();
-                                m_Stat.height( nHeight );
-                                return true;
-                            }
 
-                            // insert failed - slot has been changed by another thread
-                            // retry inserting
-                            m_Stat.onInsertRetry();
+                        // the slot must be expanded
+                        base_class::expand_slot( pos, slot );
+                    }
+                    else {
+                        // the slot is empty, try to insert data node
+                        node_ptr pNull;
+                        if ( pos.pArr->nodes[pos.nSlot].compare_exchange_strong(pNull, node_ptr(&val), memory_model::memory_order_release, atomics::memory_order_relaxed))
+                        {
+                            // the new data node has been inserted
+                            f(val);
+                            ++m_ItemCounter;
+                            stats().onInsertSuccess();
+                            stats().height( pos.nHeight );
+                            return true;
                         }
+
+                        // insert failed - slot has been changed by another thread
+                        // retry inserting
+                        stats().onInsertRetry();
                     }
-                    else
-                        m_Stat.onSlotChanged();
                 }
-            } // while
+                else
+                    stats().onSlotChanged();
+            }
         }
 
         /// Updates the node
@@ -279,7 +255,7 @@ namespace cds { namespace intrusive {
             - If hash value is not found and \p bInsert is \p false then the set is unchanged,
               the function returns <tt> std::pair<false, false> </tt>
 
-            Returns <tt> std::pair<bool, bool> </tt> where \p first is \p true if operation is successfull
+            Returns <tt> std::pair<bool, bool> </tt> where \p first is \p true if operation is successful
             (i.e. the item has been inserted or updated),
             \p second is \p true if new item has been added or \p false if the set contains that hash.
 
@@ -495,7 +471,7 @@ namespace cds { namespace intrusive {
         */
         void clear()
         {
-            clear_array( m_Head, head_size() );
+            clear_array( head(), head_size());
         }
 
         /// Checks if the set is empty
@@ -517,28 +493,27 @@ namespace cds { namespace intrusive {
         /// Returns const reference to internal statistics
         stat const& statistics() const
         {
-            return m_Stat;
+            return stats();
         }
 
         /// Returns the size of head node
-        size_t head_size() const
-        {
-            return m_Metrics.head_node_size;
-        }
+        using base_class::head_size;
 
         /// Returns the size of the array node
-        size_t array_node_size() const
-        {
-            return m_Metrics.array_node_size;
-        }
+        using base_class::array_node_size;
 
         /// Collects tree level statistics into \p stat
-        /** @copydetails cds::intrusive::FeldmanHashSet::get_level_statistics
+        /**
+            The function traverses the set and collects statistics for each level of the tree
+            into \p feldman_hashset::level_statistics struct. The element of \p stat[i]
+            represents statistics for level \p i, level 0 is head array.
+            The function is thread-safe and may be called in multi-threaded environment.
+
+            The result can be useful for estimating the efficiency of the hash functor you use.
         */
         void get_level_statistics(std::vector<feldman_hashset::level_statistics>& stat) const
         {
-            stat.clear();
-            gather_level_statistics(stat, 0, m_Head, head_size());
+            base_class::get_level_statistics(stat);
         }
 
     protected:
@@ -618,13 +593,11 @@ namespace cds { namespace intrusive {
 
             value_type * pointer() const CDS_NOEXCEPT
             {
-                assert(gc::is_locked());
                 return m_pValue;
             }
 
             void forward()
             {
-                assert( gc::is_locked());
                 assert(m_set != nullptr);
                 assert(m_pNode != nullptr);
 
@@ -637,14 +610,14 @@ namespace cds { namespace intrusive {
                 for (;;) {
                     if (idx < nodeSize) {
                         node_ptr slot = pNode->nodes[idx].load(memory_model::memory_order_acquire);
-                        if (slot.bits() == flag_array_node) {
+                        if (slot.bits() == base_class::flag_array_node ) {
                             // array node, go down the tree
                             assert(slot.ptr() != nullptr);
                             pNode = to_array(slot.ptr());
                             idx = 0;
                             nodeSize = arrayNodeSize;
                         }
-                        else if (slot.bits() == flag_array_converting) {
+                        else if (slot.bits() == base_class::flag_array_converting ) {
                             // the slot is converting to array node right now - skip the node
                             ++idx;
                         }
@@ -668,7 +641,7 @@ namespace cds { namespace intrusive {
                         }
                         else {
                             // end()
-                            assert(pNode == m_set->m_Head);
+                            assert(pNode == m_set->head());
                             assert(idx == headSize);
                             m_pNode = pNode;
                             m_idx = idx;
@@ -681,7 +654,6 @@ namespace cds { namespace intrusive {
 
             void backward()
             {
-                assert(gc::is_locked());
                 assert(m_set != nullptr);
                 assert(m_pNode != nullptr);
 
@@ -696,14 +668,14 @@ namespace cds { namespace intrusive {
                 for (;;) {
                     if (idx != endIdx) {
                         node_ptr slot = pNode->nodes[idx].load(memory_model::memory_order_acquire);
-                        if (slot.bits() == flag_array_node) {
+                        if (slot.bits() == base_class::flag_array_node ) {
                             // array node, go down the tree
                             assert(slot.ptr() != nullptr);
                             pNode = to_array(slot.ptr());
                             nodeSize = arrayNodeSize;
                             idx = nodeSize - 1;
                         }
-                        else if (slot.bits() == flag_array_converting) {
+                        else if (slot.bits() == base_class::flag_array_converting ) {
                             // the slot is converting to array node right now - skip the node
                             --idx;
                         }
@@ -727,7 +699,7 @@ namespace cds { namespace intrusive {
                         }
                         else {
                             // rend()
-                            assert(pNode == m_set->m_Head);
+                            assert(pNode == m_set->head());
                             assert(idx == endIdx);
                             m_pNode = pNode;
                             m_idx = idx;
@@ -742,25 +714,25 @@ namespace cds { namespace intrusive {
         template <class Iterator>
         Iterator init_begin() const
         {
-            return Iterator(*this, m_Head, size_t(0) - 1);
+            return Iterator(*this, head(), size_t(0) - 1);
         }
 
         template <class Iterator>
         Iterator init_end() const
         {
-            return Iterator(*this, m_Head, head_size(), false);
+            return Iterator(*this, head(), head_size(), false);
         }
 
         template <class Iterator>
         Iterator init_rbegin() const
         {
-            return Iterator(*this, m_Head, head_size());
+            return Iterator(*this, head(), head_size());
         }
 
         template <class Iterator>
         Iterator init_rend() const
         {
-            return Iterator(*this, m_Head, size_t(0) - 1, false);
+            return Iterator(*this, head(), size_t(0) - 1, false);
         }
 
         /// Bidirectional iterator class
@@ -927,9 +899,10 @@ namespace cds { namespace intrusive {
         /** @anchor cds_intrusive_FeldmanHashSet_rcu_iterators
             The set supports thread-safe iterators: you may iterate over the set in multi-threaded environment
             under explicit RCU lock.
-            RCU lock requirement means that inserting or searching is allowed but you must not erase the items from the set
-            since erasing under RCU lock can lead to a deadlock. However, another thread can call \p erase() safely
-            while your thread is iterating.
+
+            RCU lock requirement means that inserting or searching is allowed for the iterating thread
+            but you must not erase the items from the set because erasing under RCU lock can lead
+            to a deadlock. However, another thread can call \p erase() safely while your thread is iterating.
 
             A typical example is:
             \code
@@ -966,7 +939,7 @@ namespace cds { namespace intrusive {
                     i->payload++;
                 }
             } // at this point RCU lock is released
-            /endcode
+            \endcode
 
             Each iterator object supports the common interface:
             - dereference operators:
@@ -988,55 +961,55 @@ namespace cds { namespace intrusive {
         /// Returns an iterator to the beginning of the set
         iterator begin()
         {
-            return iterator(*this, m_Head, size_t(0) - 1);
+            return iterator(*this, head(), size_t(0) - 1);
         }
 
         /// Returns an const iterator to the beginning of the set
         const_iterator begin() const
         {
-            return const_iterator(*this, m_Head, size_t(0) - 1);
+            return const_iterator(*this, head(), size_t(0) - 1);
         }
 
         /// Returns an const iterator to the beginning of the set
         const_iterator cbegin()
         {
-            return const_iterator(*this, m_Head, size_t(0) - 1);
+            return const_iterator(*this, head(), size_t(0) - 1);
         }
 
         /// Returns an iterator to the element following the last element of the set. This element acts as a placeholder; attempting to access it results in undefined behavior.
         iterator end()
         {
-            return iterator(*this, m_Head, head_size(), false);
+            return iterator(*this, head(), head_size(), false);
         }
 
         /// Returns a const iterator to the element following the last element of the set. This element acts as a placeholder; attempting to access it results in undefined behavior.
         const_iterator end() const
         {
-            return const_iterator(*this, m_Head, head_size(), false);
+            return const_iterator(*this, head(), head_size(), false);
         }
 
         /// Returns a const iterator to the element following the last element of the set. This element acts as a placeholder; attempting to access it results in undefined behavior.
         const_iterator cend()
         {
-            return const_iterator(*this, m_Head, head_size(), false);
+            return const_iterator(*this, head(), head_size(), false);
         }
 
         /// Returns a reverse iterator to the first element of the reversed set
         reverse_iterator rbegin()
         {
-            return reverse_iterator(*this, m_Head, head_size());
+            return reverse_iterator(*this, head(), head_size());
         }
 
         /// Returns a const reverse iterator to the first element of the reversed set
         const_reverse_iterator rbegin() const
         {
-            return const_reverse_iterator(*this, m_Head, head_size());
+            return const_reverse_iterator(*this, head(), head_size());
         }
 
         /// Returns a const reverse iterator to the first element of the reversed set
         const_reverse_iterator crbegin()
         {
-            return const_reverse_iterator(*this, m_Head, head_size());
+            return const_reverse_iterator(*this, head(), head_size());
         }
 
         /// Returns a reverse iterator to the element following the last element of the reversed set
@@ -1046,7 +1019,7 @@ namespace cds { namespace intrusive {
         */
         reverse_iterator rend()
         {
-            return reverse_iterator(*this, m_Head, size_t(0) - 1, false);
+            return reverse_iterator(*this, head(), size_t(0) - 1, false);
         }
 
         /// Returns a const reverse iterator to the element following the last element of the reversed set
@@ -1056,7 +1029,7 @@ namespace cds { namespace intrusive {
         */
         const_reverse_iterator rend() const
         {
-            return const_reverse_iterator(*this, m_Head, size_t(0) - 1, false);
+            return const_reverse_iterator(*this, head(), size_t(0) - 1, false);
         }
 
         /// Returns a const reverse iterator to the element following the last element of the reversed set
@@ -1066,7 +1039,7 @@ namespace cds { namespace intrusive {
         */
         const_reverse_iterator crend()
         {
-            return const_reverse_iterator(*this, m_Head, size_t(0) - 1, false);
+            return const_reverse_iterator(*this, head(), size_t(0) - 1, false);
         }
         ///@}
 
@@ -1076,279 +1049,151 @@ namespace cds { namespace intrusive {
        /// Inserts or updates node \p val in the multi-level array (internal implementation)
        /**
            The target slot is located by the hash value that \p hash_accessor returns for \p val.

            - If a data node with an equal hash is found, it is replaced with \p val,
              the functor is called as <tt>f( val, pOldItem )</tt>, and the displaced node
              is retired via \p disposer. \p retire_ptr is called only after leaving
              the RCU read-side section (the \p goto below exits the \p rcu_lock scope).
            - If the slot is empty and \p bInsert is \p true, \p val is linked into the slot
              and the functor is called as <tt>f( val, nullptr )</tt>.
            - If \p bInsert is \p false and no item with an equal hash exists, the call fails.

            Returns <tt>std::pair<bool, bool></tt>: \p first is \p true if the operation
            succeeded, \p second is \p true if a new item has been inserted
            (\p false means an existing item has been updated/replaced).
        */
        template <typename Func>
        std::pair<bool, bool> do_update(value_type& val, Func f, bool bInsert = true)
        {
            hash_type const& hash = hash_accessor()(val);
            traverse_data pos( hash, *this );
            hash_comparator cmp;
            value_type * pOld;  // displaced item; declared outside the loop because the goto target below uses it

            while ( true ) {
                // RCU read-side critical section for one traversal attempt;
                // released automatically on return or on the goto (scope exit)
                rcu_lock rcuLock;

                node_ptr slot = base_class::traverse( pos );
                assert(slot.bits() == 0);  // traverse() must stop at a plain data slot, not an array/converting node

                pOld = nullptr;
                // Re-read the slot to make sure it still holds the value traverse() saw
                if ( pos.pArr->nodes[pos.nSlot].load(memory_model::memory_order_acquire) == slot) {
                    if ( slot.ptr()) {
                        if ( cmp( hash, hash_accessor()(*slot.ptr())) == 0 ) {
                            // the item with that hash value already exists
                            // Replace it with val
                            if ( slot.ptr() == &val ) {
                                // val is already linked in this slot - nothing to do
                                stats().onUpdateExisting();
                                return std::make_pair(true, false);
                            }

                            if ( pos.pArr->nodes[pos.nSlot].compare_exchange_strong(slot, node_ptr(&val), memory_model::memory_order_release, atomics::memory_order_relaxed)) {
                                // slot can be disposed
                                f( val, slot.ptr());
                                pOld = slot.ptr();
                                stats().onUpdateExisting();
                                goto update_existing_done;
                            }

                            // CAS lost the race - retry the whole traversal
                            stats().onUpdateRetry();
                        }
                        else {
                            if ( bInsert ) {
                                // the slot must be expanded
                                base_class::expand_slot( pos, slot );
                            }
                            else {
                                stats().onUpdateFailed();
                                return std::make_pair(false, false);
                            }
                        }
                    }
                    else {
                        // the slot is empty, try to insert data node
                        if (bInsert) {
                            node_ptr pNull;
                            if ( pos.pArr->nodes[pos.nSlot].compare_exchange_strong(pNull, node_ptr(&val), memory_model::memory_order_release, atomics::memory_order_relaxed))
                            {
                                // the new data node has been inserted
                                f(val, nullptr);
                                ++m_ItemCounter;
                                stats().onUpdateNew();
                                stats().height( pos.nHeight );
                                return std::make_pair(true, true);
                            }
                        }
                        else {
                            stats().onUpdateFailed();
                            return std::make_pair(false, false);
                        }

                        // insert failed - slot has been changed by another thread
                        // retry updating
                        stats().onUpdateRetry();
                    }
                }
                else
                    stats().onSlotChanged();  // slot changed concurrently - retry the traversal
            } // while

            // update success
            // retire_ptr must be called only outside of RCU lock
        update_existing_done:
            if (pOld)
                gc::template retire_ptr<disposer>(pOld);
            return std::make_pair(true, false);
        }
 
        /// Unlinks the item with the given \p hash satisfying \p pred (internal implementation)
        /**
            The caller must be inside an RCU read-side critical section -
            this is checked by the <tt>gc::is_locked()</tt> assertion.

            Returns a pointer to the unlinked item, or \p nullptr if no item with
            an equal hash satisfying \p pred is found. The item is NOT retired here;
            disposing the returned pointer is the caller's responsibility.
        */
        template <typename Predicate>
        value_type * do_erase( hash_type const& hash, Predicate pred)
        {
            assert(gc::is_locked());
            traverse_data pos( hash, *this );
            hash_comparator cmp;

            while ( true ) {
                node_ptr slot = base_class::traverse( pos );
                assert( slot.bits() == 0 );  // traverse() must stop at a plain data slot

                // Make sure the slot still holds the value traverse() saw
                if ( pos.pArr->nodes[pos.nSlot].load( memory_model::memory_order_acquire ) == slot ) {
                    if ( slot.ptr()) {
                        if ( cmp( hash, hash_accessor()(*slot.ptr())) == 0 && pred( *slot.ptr())) {
                            // item found - replace it with nullptr
                            if ( pos.pArr->nodes[pos.nSlot].compare_exchange_strong( slot, node_ptr( nullptr ), memory_model::memory_order_acquire, atomics::memory_order_relaxed )) {
                                --m_ItemCounter;
                                stats().onEraseSuccess();

                                return slot.ptr();
                            }
                            // CAS lost the race - retry
                            stats().onEraseRetry();
                            continue;
                        }
                        // hash or predicate mismatch - nothing to erase
                        stats().onEraseFailed();
                        return nullptr;
                    }
                    else {
                        // the slot is empty
                        stats().onEraseFailed();
                        return nullptr;
                    }
                }
                else
                    stats().onSlotChanged();  // slot changed concurrently - retry the traversal
            }
        }
 
        /// Looks up the item with the given \p hash (internal implementation)
        /**
            The caller must be inside an RCU read-side critical section -
            this is checked by the <tt>gc::is_locked()</tt> assertion.

            Returns a pointer to the item whose hash compares equal to \p hash,
            or \p nullptr if no such item is present.
        */
        value_type * search(hash_type const& hash )
        {
            assert( gc::is_locked());
            traverse_data pos( hash, *this );
            hash_comparator cmp;

            while ( true ) {
                node_ptr slot = base_class::traverse( pos );
                assert( slot.bits() == 0 );  // traverse() must stop at a plain data slot

                if ( pos.pArr->nodes[pos.nSlot].load( memory_model::memory_order_acquire ) != slot ) {
                    // slot value has been changed - retry
                    stats().onSlotChanged();
                    continue;
                }
                else if ( slot.ptr() && cmp( hash, hash_accessor()(*slot.ptr())) == 0 ) {
                    // item found
                    stats().onFindSuccess();
                    return slot.ptr();
                }
                // slot is empty or holds an item with a different hash
                stats().onFindFailed();
                return nullptr;
            }
        }
 
         //@endcond
 
     private:
         //@cond
-        array_node * alloc_head_node() const
-        {
-            return alloc_array_node(head_size(), nullptr, 0);
-        }
-
-        array_node * alloc_array_node(array_node * pParent, size_t idxParent) const
-        {
-            return alloc_array_node(array_node_size(), pParent, idxParent);
-        }
-
-        static array_node * alloc_array_node(size_t nSize, array_node * pParent, size_t idxParent)
-        {
-            array_node * pNode = cxx_array_node_allocator().NewBlock(sizeof(array_node) + sizeof(atomic_node_ptr) * (nSize - 1), pParent, idxParent);
-            new (pNode->nodes) atomic_node_ptr[nSize];
-            return pNode;
-        }
-
-        static void free_array_node(array_node * parr)
-        {
-            cxx_array_node_allocator().Delete(parr);
-        }
-
-        void destroy_tree()
-        {
-            // The function is not thread-safe. For use in dtor only
-            // Remove data node
-            clear();
-
-            // Destroy all array nodes
-            destroy_array_nodes(m_Head, head_size());
-        }
-
-        void destroy_array_nodes(array_node * pArr, size_t nSize)
-        {
-            for (atomic_node_ptr * p = pArr->nodes, *pLast = pArr->nodes + nSize; p != pLast; ++p) {
-                node_ptr slot = p->load(memory_model::memory_order_relaxed);
-                if (slot.bits() == flag_array_node) {
-                    destroy_array_nodes(to_array(slot.ptr()), array_node_size());
-                    free_array_node(to_array(slot.ptr()));
-                    p->store(node_ptr(), memory_model::memory_order_relaxed);
-                }
-            }
-        }
-
-        void gather_level_statistics(std::vector<feldman_hashset::level_statistics>& stat, size_t nLevel, array_node * pArr, size_t nSize) const
-        {
-            if (stat.size() <= nLevel) {
-                stat.resize(nLevel + 1);
-                stat[nLevel].node_capacity = nSize;
-            }
-
-            ++stat[nLevel].array_node_count;
-            for (atomic_node_ptr * p = pArr->nodes, *pLast = pArr->nodes + nSize; p != pLast; ++p) {
-                node_ptr slot = p->load(memory_model::memory_order_relaxed);
-                if (slot.bits() == flag_array_node) {
-                    ++stat[nLevel].array_cell_count;
-                    gather_level_statistics(stat, nLevel + 1, to_array(slot.ptr()), array_node_size());
-                }
-                else if (slot.bits() == flag_array_converting)
-                    ++stat[nLevel].array_cell_count;
-                else if (slot.ptr())
-                    ++stat[nLevel].data_cell_count;
-                else
-                    ++stat[nLevel].empty_cell_count;
-            }
-        }
-
         void clear_array(array_node * pArrNode, size_t nSize)
         {
             back_off bkoff;
@@ -1357,22 +1202,22 @@ namespace cds { namespace intrusive {
             for (atomic_node_ptr * pArr = pArrNode->nodes, *pLast = pArr + nSize; pArr != pLast; ++pArr) {
                 while (true) {
                     node_ptr slot = pArr->load(memory_model::memory_order_acquire);
-                    if (slot.bits() == flag_array_node) {
+                    if (slot.bits() == base_class::flag_array_node ) {
                         // array node, go down the tree
                         assert(slot.ptr() != nullptr);
                         clear_array(to_array(slot.ptr()), array_node_size());
                         break;
                     }
-                    else if (slot.bits() == flag_array_converting) {
+                    else if (slot.bits() == base_class::flag_array_converting ) {
                         // the slot is converting to array node right now
-                        while ((slot = pArr->load(memory_model::memory_order_acquire)).bits() == flag_array_converting) {
+                        while ((slot = pArr->load(memory_model::memory_order_acquire)).bits() == base_class::flag_array_converting ) {
                             bkoff();
-                            m_Stat.onSlotConverting();
+                            stats().onSlotConverting();
                         }
                         bkoff.reset();
 
                         assert(slot.ptr() != nullptr);
-                        assert(slot.bits() == flag_array_node);
+                        assert(slot.bits() == base_class::flag_array_node );
                         clear_array(to_array(slot.ptr()), array_node_size());
                         break;
                     }
@@ -1382,7 +1227,7 @@ namespace cds { namespace intrusive {
                             if (slot.ptr()) {
                                 gc::template retire_ptr<disposer>(slot.ptr());
                                 --m_ItemCounter;
-                                m_Stat.onEraseSuccess();
+                                stats().onEraseSuccess();
                             }
                             break;
                         }
@@ -1390,57 +1235,6 @@ namespace cds { namespace intrusive {
                 }
             }
         }
-
-        bool expand_slot(array_node * pParent, size_t idxParent, node_ptr current, size_t nOffset)
-        {
-            assert(current.bits() == 0);
-            assert(current.ptr());
-
-            size_t idx = hash_splitter(hash_accessor()(*current.ptr()), nOffset).cut(m_Metrics.array_node_size_log);
-            array_node * pArr = alloc_array_node(pParent, idxParent);
-
-            node_ptr cur(current.ptr());
-            atomic_node_ptr& slot = pParent->nodes[idxParent];
-            if (!slot.compare_exchange_strong(cur, cur | flag_array_converting, memory_model::memory_order_release, atomics::memory_order_relaxed))
-            {
-                m_Stat.onExpandNodeFailed();
-                free_array_node(pArr);
-                return false;
-            }
-
-            pArr->nodes[idx].store(current, memory_model::memory_order_release);
-
-            cur = cur | flag_array_converting;
-            CDS_VERIFY(
-                slot.compare_exchange_strong(cur, node_ptr(to_node(pArr), flag_array_node), memory_model::memory_order_release, atomics::memory_order_relaxed)
-                );
-
-            m_Stat.onExpandNodeSuccess();
-            m_Stat.onArrayNodeCreated();
-            return true;
-        }
-
-        union converter {
-            value_type * pData;
-            array_node * pArr;
-
-            converter(value_type * p)
-                : pData(p)
-            {}
-
-            converter(array_node * p)
-                : pArr(p)
-            {}
-        };
-
-        static array_node * to_array(value_type * p)
-        {
-            return converter(p).pArr;
-        }
-        static value_type * to_node(array_node * p)
-        {
-            return converter(p).pData;
-        }
         //@endcond
     };