Changed internal structure of MultiLevelHashSet node to support thread-safe iterators
authorkhizmax <libcds.dev@gmail.com>
Mon, 10 Aug 2015 18:51:01 +0000 (21:51 +0300)
committerkhizmax <libcds.dev@gmail.com>
Mon, 10 Aug 2015 18:51:01 +0000 (21:51 +0300)
cds/intrusive/details/multilevel_hashset_base.h
cds/intrusive/impl/multilevel_hashset.h

index c47ea81793c7d6efce163576ae7c152c4c88fbc2..ba0f58c97251d5ba0575aba7a62f570f2827e26f 100644 (file)
@@ -34,34 +34,6 @@ namespace cds { namespace intrusive {
             //@endcond
         };
 
-        /// Head node allocator option
-        /**
-            @copydetails traits::head_node_allocator
-        */
-        template <typename Accessor>
-        struct head_node_allocator {
-            //@cond
-            template <typename Base> struct pack: public Base
-            {
-                typedef Accessor head_node_allocator;
-            };
-            //@endcond
-        };
-
-        /// Array node allocator option
-        /**
-            @copydetails traits::array_node_allocator
-        */
-        template <typename Accessor>
-        struct array_node_allocator {
-            //@cond
-            template <typename Base> struct pack: public Base
-            {
-                typedef Accessor array_node_allocator;
-            };
-            //@endcond
-        };
-
         /// \p MultiLevelHashSet internal statistics
         template <typename EventCounter = cds::atomicity::event_counter>
         struct stat {
@@ -188,21 +160,12 @@ namespace cds { namespace intrusive {
             */
             typedef cds::atomicity::item_counter item_counter;
 
-            /// Head node allocator
-            /**
-                Allocator for head node. That allocator uses only in the set's constructor for allocating
-                main head array and in the destructor for destroying the head array.
-                Default is \ref CDS_DEFAULT_ALLOCATOR
-            */
-            typedef CDS_DEFAULT_ALLOCATOR head_node_allocator;
-
             /// Array node allocator
             /**
-                Allocator for array nodes. That allocator is used for creating \p arrayNode when the set grows.
-                The size of each array node is fixed.
+                Allocator for array nodes. That allocator is used for creating \p headNode and \p arrayNode when the set grows.
                 Default is \ref CDS_DEFAULT_ALLOCATOR
             */
-            typedef CDS_DEFAULT_ALLOCATOR array_node_allocator;
+            typedef CDS_DEFAULT_ALLOCATOR node_allocator;
 
             /// C++ memory ordering model
             /**
@@ -233,10 +196,8 @@ namespace cds { namespace intrusive {
             Supported \p Options are:
             - \p multilevel_hashset::hash_accessor - mandatory option, hash accessor functor.
                 @copydetails traits::hash_accessor
-            - \p multilevel_hashset::head_node_allocator - head node allocator.
-                @copydetails traits::head_node_allocator
-            - \p multilevel_hashset::array_node_allocator - array node allocator.
-                @copydetails traits::array_node_allocator
+            - \p opt::node_allocator - array node allocator.
+                @copydetails traits::node_allocator
             - \p opt::compare - hash comparison functor. No default functor is provided.
                 If the option is not specified, the \p opt::less is used.
             - \p opt::less - specifies binary predicate used for hash comparison. 
index 4924713d6755330966b8e9f779d07e7a69527bd8..286872af68a133061ffa3e71a676dabf5a775abb 100644 (file)
@@ -114,12 +114,11 @@ namespace cds { namespace intrusive {
         >::type hash_comparator;
 #   endif
 
-        typedef typename traits::item_counter         item_counter;         ///< Item counter type
-        typedef typename traits::head_node_allocator  head_node_allocator;  ///< Head node allocator
-        typedef typename traits::array_node_allocator array_node_allocator; ///< Array node allocator
-        typedef typename traits::memory_model         memory_model;         ///< Memory model
-        typedef typename traits::back_off             back_off;             ///< Backoff strategy
-        typedef typename traits::stat                 stat;                 ///< Internal statistics type
+        typedef typename traits::item_counter   item_counter;   ///< Item counter type
+        typedef typename traits::node_allocator node_allocator; ///< Array node allocator
+        typedef typename traits::memory_model   memory_model;   ///< Memory model
+        typedef typename traits::back_off       back_off;       ///< Backoff strategy
+        typedef typename traits::stat           stat;           ///< Internal statistics type
 
         typedef typename gc::template guarded_ptr< value_type > guarded_ptr; ///< Guarded pointer
 
@@ -134,12 +133,26 @@ namespace cds { namespace intrusive {
     protected:
         //@cond
         enum node_flags {
-            array_converting = 1,   ///< the cell is converting from data node to an array node
-            array_node = 2          ///< the cell is a pointer to an array node
+            flag_array_converting = 1,   ///< the cell is converting from data node to an array node
+            flag_array_node = 2          ///< the cell is a pointer to an array node
         };
 
-        typedef cds::details::Allocator< atomic_node_ptr, head_node_allocator > cxx_head_node_allocator;
-        typedef cds::details::Allocator< atomic_node_ptr, array_node_allocator > cxx_array_node_allocator;
+        struct array_node {
+            array_node * const  pParent;    ///< parent array node
+            size_t const        idxParent;  ///< index in parent array node
+            atomic_node_ptr     nodes[1];   ///< node array
+
+            array_node( array_node * parent, size_t idx )
+                : pParent( parent )
+                , idxParent( idx )
+            {}
+
+            array_node() = delete;
+            array_node( array_node const&) = delete;
+            array_node( array_node&& ) = delete;
+        };
+
+        typedef cds::details::Allocator< array_node, node_allocator > cxx_array_node_allocator;
 
         typedef multilevel_hashset::details::hash_splitter< hash_type > hash_splitter;
 
@@ -149,13 +162,12 @@ namespace cds { namespace intrusive {
             size_t  array_node_size;    // power-of-two
             size_t  array_node_size_log;// log2( array_node_size )
         };
-
         //@endcond
 
     private:
         //@cond
         metrics const     m_Metrics;     ///< Metrics
-        atomic_node_ptr * m_Head;        ///< Head array
+        array_node *      m_Head;        ///< Head array
         item_counter      m_ItemCounter; ///< Item counter
         stat              m_Stat;        ///< Internal statistics
         //@endcond
@@ -174,14 +186,14 @@ namespace cds { namespace intrusive {
         */
         MultiLevelHashSet( size_t head_bits = 8, size_t array_bits = 4 )
             : m_Metrics(make_metrics( head_bits, array_bits ))
-            , m_Head( cxx_head_node_allocator().NewArray( head_size(), nullptr ))
+            , m_Head( alloc_head_node())
         {}
 
         /// Destructs the set and frees all data
         ~MultiLevelHashSet()
         {
             destroy_tree();
-            cxx_array_node_allocator().Delete( m_Head, head_size() );
+            free_array_node( m_Head );
         }
 
         /// Inserts new node
@@ -225,13 +237,13 @@ namespace cds { namespace intrusive {
             back_off bkoff;
 
             size_t nOffset = m_Metrics.head_node_size_log;
-            atomic_node_ptr * pArr = m_Head;
+            array_node * pArr = m_Head;
             size_t nSlot = splitter.cut( m_Metrics.head_node_size_log );
             assert( nSlot < m_Metrics.head_node_size );
 
             while ( true ) {
-                node_ptr slot = pArr[nSlot].load( memory_model::memory_order_acquire );
-                if ( slot.bits() == array_node ) {
+                node_ptr slot = pArr->nodes[nSlot].load( memory_model::memory_order_acquire );
+                if ( slot.bits() == flag_array_node ) {
                     // array node, go down the tree
                     assert( slot.ptr() != nullptr );
                     nSlot = splitter.cut( m_Metrics.array_node_size_log );
@@ -239,7 +251,7 @@ namespace cds { namespace intrusive {
                     pArr = to_array( slot.ptr() );
                     nOffset += m_Metrics.array_node_size_log;
                 }
-                else if ( slot.bits() == array_converting ) {
+                else if ( slot.bits() == flag_array_converting ) {
                     // the slot is converting to array node right now
                     bkoff();
                     m_Stat.onSlotConverting();
@@ -249,7 +261,7 @@ namespace cds { namespace intrusive {
                     assert(slot.bits() == 0 );
 
                     // protect data node by hazard pointer
-                    if ( guard.protect( pArr[nSlot], [](node_ptr p) -> value_type * { return p.ptr(); }) != slot ) {
+                    if ( guard.protect( pArr->nodes[nSlot], [](node_ptr p) -> value_type * { return p.ptr(); }) != slot ) {
                         // slot value has been changed - retry
                         m_Stat.onSlotChanged();
                     }
@@ -261,12 +273,12 @@ namespace cds { namespace intrusive {
                         }
 
                         // the slot must be expanded
-                        expand_slot( pArr[ nSlot ], slot, nOffset );
+                        expand_slot( pArr, nSlot, slot, nOffset );
                     }
                     else {
                         // the slot is empty, try to insert data node
                         node_ptr pNull;
-                        if ( pArr[nSlot].compare_exchange_strong( pNull, node_ptr( &val ), memory_model::memory_order_release, atomics::memory_order_relaxed ))
+                        if ( pArr->nodes[nSlot].compare_exchange_strong( pNull, node_ptr( &val ), memory_model::memory_order_release, atomics::memory_order_relaxed ))
                         {
                             // the new data node has been inserted
                             f( val );
@@ -306,14 +318,14 @@ namespace cds { namespace intrusive {
             typename gc::Guard guard;
             back_off bkoff;
 
-            atomic_node_ptr * pArr = m_Head;
+            array_node * pArr = m_Head;
             size_t nSlot = splitter.cut( m_Metrics.head_node_size_log );
             assert( nSlot < m_Metrics.head_node_size );
             size_t nOffset = m_Metrics.head_node_size_log;
 
             while ( true ) {
-                node_ptr slot = pArr[nSlot].load( memory_model::memory_order_acquire );
-                if ( slot.bits() == array_node ) {
+                node_ptr slot = pArr->nodes[nSlot].load( memory_model::memory_order_acquire );
+                if ( slot.bits() == flag_array_node ) {
                     // array node, go down the tree
                     assert( slot.ptr() != nullptr );
                     nSlot = splitter.cut( m_Metrics.array_node_size_log );
@@ -321,7 +333,7 @@ namespace cds { namespace intrusive {
                     pArr = to_array( slot.ptr() );
                     nOffset += m_Metrics.array_node_size_log;
                 }
-                else if ( slot.bits() == array_converting ) {
+                else if ( slot.bits() == flag_array_converting ) {
                     // the slot is converting to array node right now
                     bkoff();
                     m_Stat.onSlotConverting();
@@ -331,7 +343,7 @@ namespace cds { namespace intrusive {
                     assert(slot.bits() == 0 );
 
                     // protect data node by hazard pointer
-                    if ( guard.protect( pArr[nSlot], [](node_ptr p) -> value_type * { return p.ptr(); }) != slot ) {
+                    if ( guard.protect( pArr->nodes[nSlot], [](node_ptr p) -> value_type * { return p.ptr(); }) != slot ) {
                         // slot value has been changed - retry
                         m_Stat.onSlotChanged();
                     }
@@ -344,7 +356,7 @@ namespace cds { namespace intrusive {
                                 return std::make_pair( true, false );
                             }
 
-                            if ( pArr[nSlot].compare_exchange_strong( slot, node_ptr( &val ), memory_model::memory_order_release, atomics::memory_order_relaxed )) {
+                            if ( pArr->nodes[nSlot].compare_exchange_strong( slot, node_ptr( &val ), memory_model::memory_order_release, atomics::memory_order_relaxed )) {
                                 // slot can be disposed
                                 gc::template retire<disposer>( slot.ptr() );
                                 m_Stat.onUpdateExisting();
@@ -356,13 +368,13 @@ namespace cds { namespace intrusive {
                         }
 
                         // the slot must be expanded
-                        expand_slot( pArr[ nSlot ], slot, nOffset );
+                        expand_slot( pArr, nSlot, slot, nOffset );
                     }
                     else {
                         // the slot is empty, try to insert data node
                         if ( bInsert ) {
                             node_ptr pNull;
-                            if ( pArr[nSlot].compare_exchange_strong( pNull, node_ptr( &val ), memory_model::memory_order_release, atomics::memory_order_relaxed ))
+                            if ( pArr->nodes[nSlot].compare_exchange_strong( pNull, node_ptr( &val ), memory_model::memory_order_release, atomics::memory_order_relaxed ))
                             {
                                 // the new data node has been inserted
                                 ++m_ItemCounter;
@@ -643,20 +655,27 @@ namespace cds { namespace intrusive {
             return m;
         }
 
-        template <typename Q>
-        static bool check_node_alignment( Q const* p )
+        array_node * alloc_head_node() const
         {
-            return (reinterpret_cast<uintptr_t>(p) & node_ptr::bitmask) == 0;
+            return alloc_array_node( head_size(), nullptr, 0 );
         }
 
-        atomic_node_ptr * alloc_array_node() const
+        array_node * alloc_array_node( array_node * pParent, size_t idxParent ) const
         {
-            return cxx_array_node_allocator().NewArray( array_node_size(), nullptr );
+            return alloc_array_node( array_node_size(), pParent, idxParent );
         }
 
-        void free_array_node( atomic_node_ptr * parr ) const
+        static array_node * alloc_array_node( size_t nSize, array_node * pParent, size_t idxParent )
         {
-            cxx_array_node_allocator().Delete( parr, array_node_size() );
+            array_node * pNode = cxx_array_node_allocator().NewBlock( sizeof(array_node) + sizeof(atomic_node_ptr) * (nSize - 1), pParent, idxParent );
+            for ( atomic_node_ptr * p = pNode->nodes, *pEnd = pNode->nodes + nSize; p < pEnd; ++p )
+                p->store( node_ptr(), memory_model::memory_order_release );
+            return pNode;
+        }
+
+        static void free_array_node( array_node * parr )
+        {
+            cxx_array_node_allocator().Delete( parr );
         }
 
         void destroy_tree()
@@ -669,41 +688,42 @@ namespace cds { namespace intrusive {
             destroy_array_nodes( m_Head, head_size());
         }
 
-        void destroy_array_nodes( atomic_node_ptr * pArr, size_t nSize )
+        void destroy_array_nodes( array_node * pArr, size_t nSize )
         {
-            for ( atomic_node_ptr * pLast = pArr + nSize; pArr != pLast; ++pArr ) {
-                node_ptr slot = pArr->load( memory_model::memory_order_acquire );
-                if ( slot.bits() == array_node ) {
+            for ( atomic_node_ptr * p = pArr->nodes, *pLast = pArr->nodes + nSize; p != pLast; ++p ) {
+                node_ptr slot = p->load( memory_model::memory_order_acquire );
+                if ( slot.bits() == flag_array_node ) {
                     destroy_array_nodes(to_array(slot.ptr()), array_node_size());
                     free_array_node( to_array(slot.ptr()));
-                    pArr->store(node_ptr(), memory_model::memory_order_relaxed );
+                    p->store(node_ptr(), memory_model::memory_order_relaxed );
                 }
             }
         }
 
-        void clear_array( atomic_node_ptr * pArr, size_t nSize )
+        void clear_array( array_node * pArrNode, size_t nSize )
         {
             back_off bkoff;
 
-            for ( atomic_node_ptr * pLast = pArr + nSize; pArr != pLast; ++pArr ) {
+
+            for ( atomic_node_ptr * pArr = pArrNode->nodes, *pLast = pArr + nSize; pArr != pLast; ++pArr ) {
                 while ( true ) {
                     node_ptr slot = pArr->load( memory_model::memory_order_acquire );
-                    if ( slot.bits() == array_node ) {
+                    if ( slot.bits() == flag_array_node ) {
                         // array node, go down the tree
                         assert( slot.ptr() != nullptr );
                         clear_array( to_array( slot.ptr()), array_node_size() );
                         break;
                     }
-                    else if ( slot.bits() == array_converting ) {
+                    else if ( slot.bits() == flag_array_converting ) {
                         // the slot is converting to array node right now
-                        while ( (slot = pArr->load(memory_model::memory_order_acquire)).bits() == array_converting ) {
+                        while ( (slot = pArr->load(memory_model::memory_order_acquire)).bits() == flag_array_converting ) {
                             bkoff();
                             m_Stat.onSlotConverting();
                         }
                         bkoff.reset();
 
                         assert( slot.ptr() != nullptr );
-                        assert(slot.bits() == array_node );
+                        assert(slot.bits() == flag_array_node );
                         clear_array( to_array( slot.ptr()), array_node_size() );
                         break;
                     }
@@ -724,46 +744,48 @@ namespace cds { namespace intrusive {
 
         union converter {
             value_type * pData;
-            atomic_node_ptr * pArr;
+            array_node * pArr;
 
             converter( value_type * p )
                 : pData( p )
             {}
 
-            converter( atomic_node_ptr * p )
+            converter( array_node * p )
                 : pArr( p )
             {}
         };
 
-        static atomic_node_ptr * to_array( value_type * p )
+        static array_node * to_array( value_type * p )
         {
             return converter( p ).pArr;
         }
-        static value_type * to_node( atomic_node_ptr * p )
+        static value_type * to_node( array_node * p )
         {
             return converter( p ).pData;
         }
 
-        bool expand_slot( atomic_node_ptr& slot, node_ptr current, size_t nOffset )
+        bool expand_slot( array_node * pParent, size_t idxParent, node_ptr current, size_t nOffset )
         {
             assert( current.bits() == 0 );
             assert( current.ptr() );
 
             size_t idx = hash_splitter(hash_accessor()(*current.ptr()), nOffset).cut( m_Metrics.array_node_size_log );
-            atomic_node_ptr * pArr = alloc_array_node();
+            array_node * pArr = alloc_array_node( pParent, idxParent );
 
             node_ptr cur(current.ptr());
-            if ( !slot.compare_exchange_strong( cur, cur | array_converting, memory_model::memory_order_release, atomics::memory_order_relaxed )) {
+            atomic_node_ptr& slot = pParent->nodes[idxParent];
+            if ( !slot.compare_exchange_strong( cur, cur | flag_array_converting, memory_model::memory_order_release, atomics::memory_order_relaxed )) 
+            {
                 m_Stat.onExpandNodeFailed();
                 free_array_node( pArr );
                 return false;
             }
 
-            pArr[idx].store( current, memory_model::memory_order_release );
+            pArr->nodes[idx].store( current, memory_model::memory_order_release );
 
-            cur = cur | array_converting;
+            cur = cur | flag_array_converting;
             CDS_VERIFY( 
-                slot.compare_exchange_strong( cur, node_ptr( to_node( pArr ), array_node ), memory_model::memory_order_release, atomics::memory_order_relaxed )
+                slot.compare_exchange_strong( cur, node_ptr( to_node( pArr ), flag_array_node ), memory_model::memory_order_release, atomics::memory_order_relaxed )
             );
 
             m_Stat.onArrayNodeCreated();
@@ -776,20 +798,20 @@ namespace cds { namespace intrusive {
             hash_comparator cmp;
             back_off bkoff;
 
-            atomic_node_ptr * pArr = m_Head;
+            array_node * pArr = m_Head;
             size_t nSlot = splitter.cut( m_Metrics.head_node_size_log );
             assert( nSlot < m_Metrics.head_node_size );
 
             while ( true ) {
-                node_ptr slot = pArr[nSlot].load( memory_model::memory_order_acquire );
-                if ( slot.bits() == array_node ) {
+                node_ptr slot = pArr->nodes[nSlot].load( memory_model::memory_order_acquire );
+                if ( slot.bits() == flag_array_node ) {
                     // array node, go down the tree
                     assert( slot.ptr() != nullptr );
                     nSlot = splitter.cut( m_Metrics.array_node_size_log );
                     assert( nSlot < m_Metrics.array_node_size );
                     pArr = to_array( slot.ptr() );
                 }
-                else if ( slot.bits() == array_converting ) {
+                else if ( slot.bits() == flag_array_converting ) {
                     // the slot is converting to array node right now
                     bkoff();
                     m_Stat.onSlotConverting();
@@ -799,7 +821,7 @@ namespace cds { namespace intrusive {
                     assert(slot.bits() == 0 );
 
                     // protect data node by hazard pointer
-                    if ( guard.protect( pArr[nSlot], [](node_ptr p) -> value_type * { return p.ptr(); }) != slot ) {
+                    if ( guard.protect( pArr->nodes[nSlot], [](node_ptr p) -> value_type * { return p.ptr(); }) != slot ) {
                         // slot value has been changed - retry
                         m_Stat.onSlotChanged();
                     }
@@ -821,20 +843,20 @@ namespace cds { namespace intrusive {
             hash_comparator cmp;
             back_off bkoff;
 
-            atomic_node_ptr * pArr = m_Head;
+            array_node * pArr = m_Head;
             size_t nSlot = splitter.cut( m_Metrics.head_node_size_log );
             assert( nSlot < m_Metrics.head_node_size );
 
             while ( true ) {
-                node_ptr slot = pArr[nSlot].load( memory_model::memory_order_acquire );
-                if ( slot.bits() == array_node ) {
+                node_ptr slot = pArr->nodes[nSlot].load( memory_model::memory_order_acquire );
+                if ( slot.bits() == flag_array_node ) {
                     // array node, go down the tree
                     assert( slot.ptr() != nullptr );
                     nSlot = splitter.cut( m_Metrics.array_node_size_log );
                     assert( nSlot < m_Metrics.array_node_size );
                     pArr = to_array( slot.ptr() );
                 }
-                else if ( slot.bits() == array_converting ) {
+                else if ( slot.bits() == flag_array_converting ) {
                     // the slot is converting to array node right now
                     bkoff();
                     m_Stat.onSlotConverting();
@@ -844,14 +866,14 @@ namespace cds { namespace intrusive {
                     assert(slot.bits() == 0 );
 
                     // protect data node by hazard pointer
-                    if ( guard.protect( pArr[nSlot], [](node_ptr p) -> value_type * { return p.ptr(); }) != slot ) {
+                    if ( guard.protect( pArr->nodes[nSlot], [](node_ptr p) -> value_type * { return p.ptr(); }) != slot ) {
                         // slot value has been changed - retry
                         m_Stat.onSlotChanged();
                     }
                     else if ( slot.ptr() ) {
                         if ( cmp( hash, hash_accessor()( *slot.ptr() )) == 0 && pred( *slot.ptr() )) {
                             // item found - replace it with nullptr
-                            if ( pArr[nSlot].compare_exchange_strong(slot, node_ptr(nullptr), memory_model::memory_order_acquire, atomics::memory_order_relaxed))
+                            if ( pArr->nodes[nSlot].compare_exchange_strong(slot, node_ptr(nullptr), memory_model::memory_order_acquire, atomics::memory_order_relaxed))
                                 return slot.ptr();
                             m_Stat.onEraseRetry();
                             continue;