Fixed spplit-list bucket allocation function
[libcds.git] / cds / intrusive / split_list_rcu.h
index 73f05a26fd3a0d9058de5c727e5cccd82337c117..68949e600798350209a8745d68acd3d62a24899a 100644 (file)
@@ -901,7 +901,7 @@ namespace cds { namespace intrusive {
             return nBucket & ~( 1 << bitop::MSBnz( nBucket ) );
         }
 
-        aux_node_type * init_bucket( size_t nBucket )
+        aux_node_type * init_bucket( size_t const nBucket )
         {
             assert( nBucket > 0 );
             size_t nParent = parent_bucket( nBucket );
@@ -914,32 +914,41 @@ namespace cds { namespace intrusive {
 
             assert( pParentBucket != nullptr );
 
-            // Allocate a dummy node for new bucket
-            {
-                aux_node_type * pBucket = alloc_aux_node( split_list::dummy_hash( nBucket ) );
-                if ( m_List.insert_aux_node( pParentBucket, pBucket ) ) {
-                    m_Buckets.bucket( nBucket, pBucket );
-                    m_Stat.onNewBucket();
+            // Allocate an aux node for new bucket
+            aux_node_type * pBucket = m_Buckets.bucket( nBucket );
+
+            back_off bkoff;
+            for ( ;; pBucket = m_Buckets.bucket( nBucket ) ) {
+                if ( pBucket )
                     return pBucket;
+
+                pBucket = alloc_aux_node( split_list::dummy_hash( nBucket ) );
+                if ( pBucket ) {
+                    if ( m_List.insert_aux_node( pParentBucket, pBucket ) ) {
+                        m_Buckets.bucket( nBucket, pBucket );
+                        m_Stat.onNewBucket();
+                        return pBucket;
+                    }
+
+                    // Another thread set the bucket. Wait while it done
+                    free_aux_node( pBucket );
+                    m_Stat.onBucketInitContenton();
+                    break;
                 }
-                free_aux_node( pBucket );
+
+                // There are no free buckets. It means that the bucket table is full
+                // Wait while another thread set the bucket or a free bucket will be available
+                m_Stat.onBucketsExhausted();
+                bkoff();
             }
 
             // Another thread set the bucket. Wait while it done
-
-            // In this point, we must wait while nBucket is empty.
-            // The compiler can decide that waiting loop can be "optimized" (stripped)
-            // To prevent this situation, we use waiting on volatile bucket_head_ptr pointer.
-            //
-            m_Stat.onBucketInitContenton();
-            back_off bkoff;
-            while ( true ) {
-                aux_node_type volatile * p = m_Buckets.bucket( nBucket );
-                if ( p != nullptr )
-                    return const_cast<aux_node_type *>( p );
+            for ( pBucket = m_Buckets.bucket( nBucket ); pBucket == nullptr; pBucket = m_Buckets.bucket( nBucket ) ) {
                 bkoff();
                 m_Stat.onBusyWaitBucketInit();
             }
+
+            return pBucket;
         }
 
         aux_node_type * get_bucket( size_t nHash )