Added MltiLevelHashSet bidirectional thread-safe iterators
[libcds.git] / cds / intrusive / impl / multilevel_hashset.h
1 //$$CDS-header$$
2
3 #ifndef CDSLIB_INTRUSIVE_IMPL_MULTILEVEL_HASHSET_H
4 #define CDSLIB_INTRUSIVE_IMPL_MULTILEVEL_HASHSET_H
5
6 #include <tuple> // std::tie
7 #include <functional> // std::ref
8
9 #include <cds/intrusive/details/multilevel_hashset_base.h>
10 #include <cds/details/allocator.h>
11
12 namespace cds { namespace intrusive {
13     /// Intrusive hash set based on multi-level array
14     /** @ingroup cds_intrusive_map
15         @anchor cds_intrusive_MultilevelHashSet_hp
16
17         Source:
18         - [2013] Steven Feldman, Pierre LaBorde, Damian Dechev "Concurrent Multi-level Arrays:
19                  Wait-free Extensible Hash Maps"
20
21         [From the paper] The hardest problem encountered while developing a parallel hash map is how to perform
22         a global resize, the process of redistributing the elements in a hash map that occurs when adding new
23         buckets. The negative impact of blocking synchronization is multiplied during a global resize, because all
24         threads will be forced to wait on the thread that is performing the involved process of resizing the hash map
25         and redistributing the elements. \p %MultilevelHashSet implementation avoids global resizes through new array
26         allocation. By allowing concurrent expansion this structure is free from the overhead of an explicit resize,
27         which facilitates concurrent operations.
28
29         The presented design includes dynamic hashing, the use of sub-arrays within the hash map data structure;
30         which, in combination with <b>perfect hashing</b>, means that each element has a unique final, as well as current, position.
31         It is important to note that the perfect hash function required by our hash map is trivial to realize as
32         any hash function that permutes the bits of the key is suitable. This is possible because of our approach
33         to the hash function; we require that it produces hash values that are equal in size to that of the key.
34         We know that if we expand the hash map a fixed number of times there can be no collision as duplicate keys
35         are not provided for in the standard semantics of a hash map.
36
37         \p %MultiLevelHashSet is a multi-level array whitch has a structure similar to a tree:
38         @image html multilevel_hashset.png
39         The multi-level array differs from a tree in that each position on the tree could hold an array of nodes or a single node.
40         A position that holds a single node is a \p dataNode which holds the hash value of a key and the value that is associated\r
41         with that key; it is a simple struct holding two variables. A \p dataNode in the multi-level array could be marked.\r
42         A \p markedDataNode refers to a pointer to a \p dataNode that has been bitmarked at the least significant bit (LSB)\r
43         of the pointer to the node. This signifies that this \p dataNode is contended. An expansion must occur at this node;\r
44         any thread that sees this \p markedDataNode will try to replace it with an \p arrayNode; which is a position that holds\r
45         an array of nodes. The pointer to an \p arrayNode is differentiated from that of a pointer to a \p dataNode by a bitmark\r
46         on the second-least significant bit.\r
47 \r
48         \p %MultiLevelHashSet multi-level array is similar to a tree in that we keep a pointer to the root, which is a memory array\r
49         called \p head. The length of the \p head memory array is unique, whereas every other \p arrayNode has a uniform length;\r
50         a normal \p arrayNode has a fixed power-of-two length equal to the binary logarithm of a variable called \p arrayLength.\r
51         The maximum depth of the tree, \p maxDepth, is the maximum number of pointers that must be followed to reach any node.\r
52         We define \p currentDepth as the number of memory arrays that we need to traverse to reach the \p arrayNode on which\r
53         we need to operate; this is initially one, because of \p head.\r
54 \r
55         That approach to the structure of the hash map uses an extensible hashing scheme; <b> the hash value is treated as a bit\r
56         string</b> and rehash incrementally.\r
57 \r
58         @note Two important things you should keep in mind when you're using \p %MultiLevelHashSet:\r
59         - all keys must be fixed-size. It means that you cannot use \p std::string as a key for \p %MultiLevelHashSet.\r
60           Instead, for the strings you should use well-known hashing algorithms like <a href="https://en.wikipedia.org/wiki/Secure_Hash_Algorithm">SHA1, SHA2</a>,\r
61           <a href="https://en.wikipedia.org/wiki/MurmurHash">MurmurHash</a>, <a href="https://en.wikipedia.org/wiki/CityHash">CityHash</a> \r
62           or its successor <a href="https://code.google.com/p/farmhash/">FarmHash</a> and so on, which\r
63           converts variable-length strings to fixed-length bit-strings, and use that hash as a key in \p %MultiLevelHashSet.\r
64         - \p %MultiLevelHashSet uses a perfect hashing. It means that if two different keys, for example, of type \p std::string,\r
65           have identical hash then you cannot insert both that keys in the set. \p %MultiLevelHashSet does not maintain the key,\r
66           it maintains its fixed-size hash value.\r
67 \r
68         The set supports bidirectional thread-safe iterators. You may iterate over the set in multi-threaded environment.\r
69         It is guaranteed that the iterators will be valid even if you or another thread delete the node the iterator points to:\r
70         Hazard Pointer built into the iterator object protects the node from physical reclamation.\r
71 \r
72         Template parameters:\r
73         - \p GC - safe memory reclamation schema. Can be \p gc::HP, \p gc::DHP or one of \ref cds_urcu_type "RCU type"\r
74         - \p T - a value type to be stored in the set\r
75         - \p Traits - type traits, the structure based on \p multilevel_hashset::traits or result of \p multilevel_hashset::make_traits metafunction\r
76 \r
77         There are several specializations of \p %MultiLevelHashSet for each \p GC. You should include:
78         - <tt><cds/intrusive/multilevel_hashset_hp.h></tt> for \p gc::HP garbage collector
79         - <tt><cds/intrusive/multilevel_hashset_dhp.h></tt> for \p gc::DHP garbage collector
80         - <tt><cds/intrusive/multilevel_hashset_rcu.h></tt> for \ref cds_intrusive_MultiLevelHashSet_rcu "RCU type"
81     */
82     template <
83         class GC
84         ,typename T
85 #ifdef CDS_DOXYGEN_INVOKED
86        ,typename Traits = multilevel_hashset::traits
87 #else
88        ,typename Traits
89 #endif
90     >
91     class MultiLevelHashSet
92     {
93     public:
94         typedef GC      gc;         ///< Garbage collector
95         typedef T       value_type; ///< type of value stored in the set
96         typedef Traits  traits;     ///< Traits template parameter, see \p multilevel_hashset::traits
97
98         typedef typename traits::hash_accessor hash_accessor; ///< Hash accessor functor
99         static_assert(!std::is_same< hash_accessor, cds::opt::none >::value, "hash_accessor functor must be specified" );
100
101         /// Hash type defined as \p hash_accessor return type
102         typedef typename std::decay< 
103             typename std::remove_reference<
104                 decltype( hash_accessor()( std::declval<T>()) )
105             >::type
106         >::type hash_type;
107         static_assert( !std::is_pointer<hash_type>::value, "hash_accessor should return a reference to hash value" );
108
109         typedef typename traits::disposer disposer; ///< data node disposer
110
111 #   ifdef CDS_DOXYGEN_INVOKED
112         typedef implementation_defined hash_comparator  ;    ///< hash compare functor based on opt::compare and opt::less option setter
113 #   else
114         typedef typename cds::opt::details::make_comparator_from< 
115             hash_type,
116             traits,
117             multilevel_hashset::bitwise_compare< hash_type >
118         >::type hash_comparator;
119 #   endif
120
121         typedef typename traits::item_counter   item_counter;   ///< Item counter type
122         typedef typename traits::node_allocator node_allocator; ///< Array node allocator
123         typedef typename traits::memory_model   memory_model;   ///< Memory model
124         typedef typename traits::back_off       back_off;       ///< Backoff strategy
125         typedef typename traits::stat           stat;           ///< Internal statistics type
126
127         typedef typename gc::template guarded_ptr< value_type > guarded_ptr; ///< Guarded pointer
128
129         /// Count of hazard pointers required
130         static CDS_CONSTEXPR size_t const c_nHazardPtrCount = 1;
131
132         /// Node marked poiter
133         typedef cds::details::marked_ptr< value_type, 3 > node_ptr;
134         /// Array node element
135         typedef atomics::atomic< node_ptr > atomic_node_ptr;
136
137     protected:
138         //@cond
139         enum node_flags {
140             flag_array_converting = 1,   ///< the cell is converting from data node to an array node
141             flag_array_node = 2          ///< the cell is a pointer to an array node
142         };
143
144         struct array_node {
145             array_node * const  pParent;    ///< parent array node
146             size_t const        idxParent;  ///< index in parent array node
147             atomic_node_ptr     nodes[1];   ///< node array
148
149             array_node( array_node * parent, size_t idx )
150                 : pParent( parent )
151                 , idxParent( idx )
152             {}
153
154             array_node() = delete;
155             array_node( array_node const&) = delete;
156             array_node( array_node&& ) = delete;
157         };
158
159         typedef cds::details::Allocator< array_node, node_allocator > cxx_array_node_allocator;
160
161         typedef multilevel_hashset::details::hash_splitter< hash_type > hash_splitter;
162
163         struct metrics {
164             size_t  head_node_size;     // power-of-two
165             size_t  head_node_size_log; // log2( head_node_size )
166             size_t  array_node_size;    // power-of-two
167             size_t  array_node_size_log;// log2( array_node_size )
168         };
169         //@endcond
170
171     protected:
172         /// Bidirectional iterator class
173         /**
174             Each iterator object owns one Hazard Pointer that is a limited resource.
175
176             The iterator guarantees that while it points to a node
177             that node is guarded by Hazard Pointer and cannot be physically deleted but it can be excluded from the set.
178
179             The iterator object is a thread-private resource and should not be passed to another thread.
180
181             @note The iterator does not support post-increment/post-decrement operators.
182         */
183         template <bool IsConst>
184         class bidirectional_iterator
185         {
186             //@cond
187             friend class MultLevelHashSet;
188
189             array_node *        m_pNode;    ///< current array node
190             size_t              m_idx;      ///< current position in m_pNode
191             typename gc::Guard  m_guard;    ///< HP guard
192             MultiLevelHashSet const*  m_set;    ///< Hash set
193             //@endcond
194
195         public:
196             typedef typename std::conditional< IsConst, value_type const*, value_type*>::type value_ptr;
197             typedef typename std::conditional< IsConst, value_type const&, value_type&>::type value_ref;
198
199         public:
200             /// Default ctor
201             bidirectional_iterator() CDS_NOEXCEPT
202                 : m_pNode( nullptr )
203                 , m_idx( 0 )
204                 , m_set( nullptr )
205             {}
206
207             /// Copy ctor
208             bidirectional_iterator( bidirectional_iterator const& rhs ) CDS_NOEXCEPT
209                 : m_pNode( rhs.m_pNode )
210                 , m_idx( rhs.m_idx )
211                 , m_set( rhs.m_set )
212             {
213                 m_guard.copy( rhs.m_guard );
214             }
215
216             bidirectional_iterator& operator=(bidirectional_iterator const& rhs) CDS_NOEXCEPT
217             {
218                 m_pNode = rhs.m_pNode;
219                 m_idx = rhs.m_idx;
220                 m_set = rhs.m_set;
221                 m_guard.copy( rhs.m_guard );
222                 return *this;
223             }
224
225             /// Pre-increment
226             bidirectional_iterator& operator++()
227             {
228                 forward();
229                 return *this;
230             }
231
232             /// Pre-decrement
233             bidirectional_iterator& operator--()
234             {
235                 backward();
236                 return *this;
237             }
238
239             /// Dereference the iterator
240             value_ptr operator ->() const CDS_NOEXCEPT
241             {
242                 return pointer();
243             }
244
245             /// Dereference the iterator
246             value_ref operator *() const CDS_NOEXCEPT
247             {
248                 value_ptr p = pointer();
249                 assert( p );
250                 return *p;
251             }
252
253             ///
254             void release()
255             {
256                 m_guard.clear();
257             }
258
259             /// Comparing two iterators
260             /**
261                 Iterators are equal iff they point to the same cell of the same array node.
262                 @note for two iterators \p it1 and \p it2, the conditon <tt> it1 == it2 </tt> 
263                 does not entail <tt> &(*it1) == &(*it2) </tt>: welcome to concurrent containers
264                 
265             */
266             template <bool IsConst1, bool IsConst2>
267             friend bool operator ==(bidirectional_iterator<IsConst1> const& lhs, bidirectional_iterator<IsConst2> const& rhs)
268             {
269                 return lhs.m_pNode == rhs.m_pNode && lhs.m_idx == rhs.m_idx && lhs.m_set == rhs.m_set;
270             }
271
272             /// Comparing two iterators
273             template <bool IsConst1, bool IsConst2>
274             friend bool operator !=(bidirectional_iterator<IsConst1> const& lhs, bidirectional_iterator<IsConst2> const& rhs)
275             {
276                 return !( lhs == rhs );
277             }
278
279         private:
280             //@cond
281             bidirectional_iterator( MultiLevelHashSet& set, array_node * pNode, size_t idx, bool bForward )
282                 : m_pNode( pNode )
283                 , m_idx( idx )
284                 , m_set( &set )
285             {
286                 if ( bForward )
287                     forward();
288                 else
289                     backward();
290             }
291
292             value_ptr pointer() const CDS_NOEXCEPT
293             {
294                 return m_guard.template get<value_ptr>();
295             }
296
297             void forward()
298             {
299                 assert( m_set != nullptr );
300                 assert( m_pNode != nullptr );
301
302                 size_t const arrayNodeSize = m_set->array_node_size();
303                 size_t const headSize = m_set->head_size();
304                 array_node * pNode = m_pNode;
305                 size_t idx = m_idx + 1;
306                 size_t nodeSize = m_pNode->pParent? arrayNodeSize : headSize;
307
308                 for ( ;; ) {
309                     if ( idx < nodeSize ) {
310                         node_ptr slot = pNode->nodes[idx].load( memory_model::memory_order_acquire );
311                         if ( slot.bits() == flag_array_node ) {
312                             // array node, go down the tree
313                             assert( slot.ptr() != nullptr );
314                             pNode = to_array( slot.ptr() );
315                             idx = 0;
316                             nodeSize = arrayNodeSize;
317                         }
318                         else if ( slot.bits() == flag_array_converting ) {
319                             // the slot is converting to array node right now - skip the node
320                             ++idx;
321                         }
322                         else if ( slot.ptr() ) {
323                             // data node
324                             if ( m_guard.protect( pNode->nodes[idx], [](node_ptr p) -> value_type * { return p.ptr(); }) == slot ) {
325                                 m_pNode = pNode;
326                                 m_idx = idx;
327                                 return;
328                             }
329                         }
330                     }
331                     else {
332                         // up to parent node
333                         if ( pNode->pParent ) {
334                             idx = pNode->m_idx + 1;
335                             pNode = pNode->pParent;
336                             nodeSize = pNode->pParent ? arrayNodeSize : headSize;
337                         }
338                         else {
339                             // end()
340                             assert( pNode == m_set->m_Head );
341                             assert( idx == headSize );
342                             m_pNode = pNode;
343                             m_idx = idx;
344                             return;
345                         }
346                     }
347                 }
348             }
349
350             void backward()
351             {
352                 assert( m_set != nullptr );
353                 assert( m_pNode != nullptr );
354
355                 size_t const arrayNodeSize = m_set->array_node_size();
356                 size_t const headSize = m_set->head_size();
357                 size_t const endIdx = size_t(0) - 1;
358
359                 array_node * pNode = m_pNode;
360                 size_t idx = m_idx - 1;
361                 size_t nodeSize = m_pNode->pParent? arrayNodeSize : headSize;
362
363                 for ( ;; ) {
364                     if ( idx != endIdx ) {
365                         node_ptr slot = pNode->nodes[idx].load( memory_model::memory_order_acquire );
366                         if ( slot.bits() == flag_array_node ) {
367                             // array node, go down the tree
368                             assert( slot.ptr() != nullptr );
369                             pNode = to_array( slot.ptr() );
370                             nodeSize = arrayNodeSize;
371                             idx = nodeSize - 1;
372                         }
373                         else if ( slot.bits() == flag_array_converting ) {
374                             // the slot is converting to array node right now - skip the node
375                             --idx;
376                         }
377                         else if ( slot.ptr() ) {
378                             // data node
379                             if ( m_guard.protect( pNode->nodes[idx], [](node_ptr p) -> value_type * { return p.ptr(); }) == slot ) {
380                                 m_pNode = pNode;
381                                 m_idx = idx;
382                                 return;
383                             }
384                         }
385                     }
386                     else {
387                         // up to parent node
388                         if ( pNode->pParent ) {
389                             idx = pNode->m_idx - 1;
390                             pNode = pNode->pParent;
391                             nodeSize = pNode->pParent ? arrayNodeSize : headSize;
392                         }
393                         else {
394                             // rend()
395                             assert( pNode == m_set->m_Head );
396                             assert( idx == endIdx );
397                             m_pNode = pNode;
398                             m_idx = idx;
399                             return;
400                         }
401                     }
402                 }
403             }
404             //@endcond
405         };
406
407     public:
408         typedef bidirectional_iterator<false>   iterator;       ///< bidiretional iterator type
409         typedef bidirectional_iterator<true>    const_iterator; ///< bidirectional const iterator type
410
411     private:
412         //@cond
413         metrics const     m_Metrics;     ///< Metrics
414         array_node *      m_Head;        ///< Head array
415         item_counter      m_ItemCounter; ///< Item counter
416         stat              m_Stat;        ///< Internal statistics
417         //@endcond
418
419     public:
420         /// Creates empty set
421         /**
422             @param head_bits: 2<sup>head_bits</sup> specifies the size of head array, minimum is 4.
423             @param array_bits: 2<sup>array_bits</sup> specifies the size of array node, minimum is 2.
424
425             Equation for \p head_bits and \p array_bits:
426             \code
427             sizeof(hash_type) * 8 == head_bits + N * array_bits
428             \endcode
429             where \p N is multi-level array depth.
430         */
431         MultiLevelHashSet( size_t head_bits = 8, size_t array_bits = 4 )
432             : m_Metrics(make_metrics( head_bits, array_bits ))
433             , m_Head( alloc_head_node())
434         {}
435
436         /// Destructs the set and frees all data
437         ~MultiLevelHashSet()
438         {
439             destroy_tree();
440             free_array_node( m_Head );
441         }
442
443         /// Inserts new node
444         /**
445             The function inserts \p val in the set if it does not contain 
446             an item with that hash.
447
448             Returns \p true if \p val is placed into the set, \p false otherwise.
449         */
450         bool insert( value_type& val )
451         {
452             return insert( val, [](value_type&) {} );
453         }
454
455         /// Inserts new node
456         /**
457             This function is intended for derived non-intrusive containers.
458
459             The function allows to split creating of new item into two part:
460             - create item with key only
461             - insert new item into the set
462             - if inserting is success, calls \p f functor to initialize \p val.
463
464             The functor signature is:
465             \code
466                 void func( value_type& val );
467             \endcode
468             where \p val is the item inserted.
469
470             The user-defined functor is called only if the inserting is success.
471
472             @warning See \ref cds_intrusive_item_creating "insert item troubleshooting".
473         */
474         template <typename Func>
475         bool insert( value_type& val, Func f )
476         {
477             hash_type const& hash = hash_accessor()( val );
478             hash_splitter splitter( hash );
479             hash_comparator cmp;
480             typename gc::Guard guard;
481             back_off bkoff;
482
483             size_t nOffset = m_Metrics.head_node_size_log;
484             array_node * pArr = m_Head;
485             size_t nSlot = splitter.cut( m_Metrics.head_node_size_log );
486             assert( nSlot < m_Metrics.head_node_size );
487
488             while ( true ) {
489                 node_ptr slot = pArr->nodes[nSlot].load( memory_model::memory_order_acquire );
490                 if ( slot.bits() == flag_array_node ) {
491                     // array node, go down the tree
492                     assert( slot.ptr() != nullptr );
493                     nSlot = splitter.cut( m_Metrics.array_node_size_log );
494                     assert( nSlot < m_Metrics.array_node_size );
495                     pArr = to_array( slot.ptr() );
496                     nOffset += m_Metrics.array_node_size_log;
497                 }
498                 else if ( slot.bits() == flag_array_converting ) {
499                     // the slot is converting to array node right now
500                     bkoff();
501                     m_Stat.onSlotConverting();
502                 }
503                 else {
504                     // data node
505                     assert(slot.bits() == 0 );
506
507                     // protect data node by hazard pointer
508                     if ( guard.protect( pArr->nodes[nSlot], [](node_ptr p) -> value_type * { return p.ptr(); }) != slot ) {
509                         // slot value has been changed - retry
510                         m_Stat.onSlotChanged();
511                     }
512                     else if ( slot.ptr() ) {
513                         if ( cmp( hash, hash_accessor()( *slot.ptr() )) == 0 ) {
514                             // the item with that hash value already exists
515                             m_Stat.onInsertFailed();
516                             return false;
517                         }
518
519                         // the slot must be expanded
520                         expand_slot( pArr, nSlot, slot, nOffset );
521                     }
522                     else {
523                         // the slot is empty, try to insert data node
524                         node_ptr pNull;
525                         if ( pArr->nodes[nSlot].compare_exchange_strong( pNull, node_ptr( &val ), memory_model::memory_order_release, atomics::memory_order_relaxed ))
526                         {
527                             // the new data node has been inserted
528                             f( val );
529                             ++m_ItemCounter;
530                             m_Stat.onInsertSuccess();
531                             return true;
532                         }
533
534                         // insert failed - slot has been changed by another thread
535                         // retry inserting
536                         m_Stat.onInsertRetry();
537                     }
538                 }
539             } // while
540         }
541
542         /// Updates the node
543         /**
544             Performs inserting or updating the item with hash value equal to \p val.
545             - If hash value is found then existing item is replaced with \p val, old item is disposed
546               with \p Traits::disposer. Note that the disposer is called by \p GC asynchronously.
547               The function returns <tt> std::pair<true, false> </tt>
548             - If hash value is not found and \p bInsert is \p true then \p val is inserted,
549               the function returns <tt> std::pair<true, true> </tt>
550             - If hash value is not found and \p bInsert is \p false then the set is unchanged,
551               the function returns <tt> std::pair<false, false> </tt>
552
553             Returns <tt> std::pair<bool, bool> </tt> where \p first is \p true if operation is successfull
554             (i.e. the item has been inserted or updated),
555             \p second is \p true if new item has been added or \p false if the set contains that hash.
556         */
557         std::pair<bool, bool> update( value_type& val, bool bInsert = true )
558         {
559             hash_type const& hash = hash_accessor()( val );
560             hash_splitter splitter( hash );
561             hash_comparator cmp;
562             typename gc::Guard guard;
563             back_off bkoff;
564
565             array_node * pArr = m_Head;
566             size_t nSlot = splitter.cut( m_Metrics.head_node_size_log );
567             assert( nSlot < m_Metrics.head_node_size );
568             size_t nOffset = m_Metrics.head_node_size_log;
569
570             while ( true ) {
571                 node_ptr slot = pArr->nodes[nSlot].load( memory_model::memory_order_acquire );
572                 if ( slot.bits() == flag_array_node ) {
573                     // array node, go down the tree
574                     assert( slot.ptr() != nullptr );
575                     nSlot = splitter.cut( m_Metrics.array_node_size_log );
576                     assert( nSlot < m_Metrics.array_node_size );
577                     pArr = to_array( slot.ptr() );
578                     nOffset += m_Metrics.array_node_size_log;
579                 }
580                 else if ( slot.bits() == flag_array_converting ) {
581                     // the slot is converting to array node right now
582                     bkoff();
583                     m_Stat.onSlotConverting();
584                 }
585                 else {
586                     // data node
587                     assert(slot.bits() == 0 );
588
589                     // protect data node by hazard pointer
590                     if ( guard.protect( pArr->nodes[nSlot], [](node_ptr p) -> value_type * { return p.ptr(); }) != slot ) {
591                         // slot value has been changed - retry
592                         m_Stat.onSlotChanged();
593                     }
594                     else if ( slot.ptr() ) {
595                         if ( cmp( hash, hash_accessor()( *slot.ptr() )) == 0 ) {
596                             // the item with that hash value already exists
597                             // Replace it with val
598                             if ( slot.ptr() == &val ) {
599                                 m_Stat.onUpdateExisting();
600                                 return std::make_pair( true, false );
601                             }
602
603                             if ( pArr->nodes[nSlot].compare_exchange_strong( slot, node_ptr( &val ), memory_model::memory_order_release, atomics::memory_order_relaxed )) {
604                                 // slot can be disposed
605                                 gc::template retire<disposer>( slot.ptr() );
606                                 m_Stat.onUpdateExisting();
607                                 return std::make_pair( true, false );
608                             }
609
610                             m_Stat.onUpdateRetry();
611                             continue;
612                         }
613
614                         // the slot must be expanded
615                         expand_slot( pArr, nSlot, slot, nOffset );
616                     }
617                     else {
618                         // the slot is empty, try to insert data node
619                         if ( bInsert ) {
620                             node_ptr pNull;
621                             if ( pArr->nodes[nSlot].compare_exchange_strong( pNull, node_ptr( &val ), memory_model::memory_order_release, atomics::memory_order_relaxed ))
622                             {
623                                 // the new data node has been inserted
624                                 ++m_ItemCounter;
625                                 m_Stat.onUpdateNew();
626                                 return std::make_pair( true, true );
627                             }
628                         }
629                         else {
630                             m_Stat.onUpdateFailed();
631                             return std::make_pair( false, false );
632                         }
633
634                         // insert failed - slot has been changed by another thread
635                         // retry updating
636                         m_Stat.onUpdateRetry();
637                     }
638                 }
639             } // while
640         }
641
642         /// Unlinks the item \p val from the set
643         /**
644             The function searches the item \p val in the set and unlink it
645             if it is found and its address is equal to <tt>&val</tt>.
646
647             The function returns \p true if success and \p false otherwise.
648         */
649         bool unlink( value_type const& val )
650         {
651             typename gc::Guard guard;
652             auto pred = [&val](value_type const& item) -> bool { return &item == &val; };
653             value_type * p = do_erase( hash_accessor()( val ), guard, std::ref( pred ));
654
655             // p is guarded by HP
656             if ( p ) {
657                 gc::template retire<disposer>( p );
658                 --m_ItemCounter;
659                 m_Stat.onEraseSuccess();
660                 return true;
661             }
662             return false;
663         }
664
665         /// Deletes the item from the set
666         /** 
667             The function searches \p hash in the set,
668             unlinks the item found, and returns \p true.
669             If that item is not found the function returns \p false.
670
671             The \ref disposer specified in \p Traits is called by garbage collector \p GC asynchronously.
672
673         */
674         bool erase( hash_type const& hash )
675         {
676             return erase(hash, [](value_type const&) {} );
677         }
678
679         /// Deletes the item from the set
680         /**
681             The function searches \p hash in the set,
682             call \p f functor with item found, and unlinks it from the set.
683             The \ref disposer specified in \p Traits is called
684             by garbage collector \p GC asynchronously.
685
686             The \p Func interface is
687             \code
688             struct functor {
689                 void operator()( value_type& item );
690             };
691             \endcode
692
693             If \p hash is not found the function returns \p false.
694         */
695         template <typename Func>
696         bool erase( hash_type const& hash, Func f )
697         {
698             typename gc::Guard guard;
699             value_type * p = do_erase( hash, guard, []( value_type const&) -> bool {return true; } );
700
701             // p is guarded by HP
702             if ( p ) {
703                 gc::template retire<disposer>( p );
704                 --m_ItemCounter;
705                 f( *p );
706                 m_Stat.onEraseSuccess();
707                 return true;
708             }
709             return false;
710         }
711
712         /// Extracts the item with specified \p hash
713         /** 
714             The function searches \p hash in the set,
715             unlinks it from the set, and returns an guarded pointer to the item extracted.
716             If \p hash is not found the function returns an empty guarded pointer.
717
718             The \p disposer specified in \p Traits class' template parameter is called automatically
719             by garbage collector \p GC when returned \ref guarded_ptr object to be destroyed or released.
720             @note Each \p guarded_ptr object uses the GC's guard that can be limited resource.
721
722             Usage:
723             \code
724             typedef cds::intrusive::MultiLevelHashSet< your_template_args > my_set;
725             my_set theSet;
726             // ...
727             {
728                 my_set::guarded_ptr gp( theSet.extract( 5 ));
729                 if ( gp ) {
730                     // Deal with gp
731                     // ...
732                 }
733                 // Destructor of gp releases internal HP guard
734             }
735             \endcode
736         */
737         guarded_ptr extract( hash_type const& hash )
738         {
739             guarded_ptr gp;
740             {
741                 typename gc::Guard guard;
742                 value_type * p = do_erase( hash, guard, []( value_type const&) -> bool {return true; } );
743
744                 // p is guarded by HP
745                 if ( p ) {
746                     gc::template retire<disposer>( p );
747                     --m_ItemCounter;
748                     m_Stat.onEraseSuccess();
749                     gp.reset( p );
750                 }
751             }
752             return gp;
753         }
754
755         /// Finds an item by it's \p hash
756         /**
757             The function searches the item by \p hash and calls the functor \p f for item found.
758             The interface of \p Func functor is:
759             \code
760             struct functor {
761                 void operator()( value_type& item );
762             };
763             \endcode
764             where \p item is the item found.
765
766             The functor may change non-key fields of \p item. Note that the functor is only guarantee
767             that \p item cannot be disposed during the functor is executing.
768             The functor does not serialize simultaneous access to the set's \p item. If such access is
769             possible you must provide your own synchronization schema on item level to prevent unsafe item modifications.
770
771             The function returns \p true if \p hash is found, \p false otherwise.
772         */
773         template <typename Func>
774         bool find( hash_type const& hash, Func f )
775         {
776             typename gc::Guard guard;
777             value_type * p = search( hash, guard );
778
779             // p is guarded by HP
780             if ( p ) {
781                 f( *p );
782                 return true;
783             }
784             return false;
785         }
786
787         /// Checks whether the set contains \p hash
788         /**
789             The function searches the item by its \p hash
790             and returns \p true if it is found, or \p false otherwise.
791         */
792         bool contains( hash_type const& hash )
793         {
794             return find( hash, [](value_type&) {} );
795         }
796
797         /// Finds an item by it's \p hash and returns the item found
798         /**
799             The function searches the item by its \p hash
800             and returns the guarded pointer to the item found.
801             If \p hash is not found the function returns an empty \p guarded_ptr.
802
803             @note Each \p guarded_ptr object uses one GC's guard which can be limited resource.
804
805             Usage:
806             \code
807             typedef cds::intrusive::MultiLevelHashSet< your_template_params >  my_set;
808             my_set theSet;
809             // ...
810             {
811                 my_set::guarded_ptr gp( theSet.get( 5 ));
812                 if ( theSet.get( 5 )) {
813                     // Deal with gp
814                     //...
815                 }
816                 // Destructor of guarded_ptr releases internal HP guard
817             }
818             \endcode
819         */
820         guarded_ptr get( hash_type const& hash )
821         {
822             guarded_ptr gp;
823             {
824                 typename gc::Guard guard;
825                 gp.reset( search( hash, guard ));
826             }
827             return gp;
828         }
829
830         /// Clears the set (non-atomic)
831         /**
832             The function unlink all data node from the set.
833             The function is not atomic but is thread-safe.
834             After \p %clear() the set may not be empty because another threads may insert items.
835
836             For each item the \p disposer is called after unlinking.
837         */
838         void clear()
839         {
840             clear_array( m_Head, head_size() );
841         }
842
843         /// Checks if the set is empty
844         /**
845             Emptiness is checked by item counting: if item count is zero then the set is empty.
846             Thus, the correct item counting feature is an important part of the set implementation.
847         */
848         bool empty() const
849         {
850             return size() == 0;
851         }
852
853         /// Returns item count in the set
854         size_t size() const
855         {
856             return m_ItemCounter;
857         }
858
859         /// Returns const reference to internal statistics
860         stat const& statistics() const
861         {
862             return m_Stat;
863         }
864
865         /// Returns the size of head node
866         size_t head_size() const
867         {
868             return m_Metrics.head_node_size;
869         }
870
871         /// Returns the size of the array node
872         size_t array_node_size() const
873         {
874             return m_Metrics.array_node_size;
875         }
876
877     public:
878         /// Returns an iterator to the beginning of the set
879         iterator begin()
880         {
881             return iterator( *this, m_Head, size_t(0) - 1, true );
882         }
883
884         /// Returns an const iterator to the beginning of the set
885         const_iterator begin() const
886         {
887             return const_iterator( *this, m_Head, size_t(0) - 1, true );
888         }
889
890         /// Returns an const iterator to the beginning of the set
891         const_iterator cbegin()
892         {
893             return const_iterator( *this, m_Head, size_t(0) - 1, true );
894         }
895
896         /// Returns an iterator to the end of the set
897         iterator end()
898         {
899             return iterator( *this, m_Head, head_size() - 1, true );
900         }
901
902         /// Returns an const iterator to the end of the set
903         const_iterator end() const
904         {
905             return const_iterator( *this, m_Head, head_size() - 1, true );
906         }
907
908         /// Returns an const iterator to the end of the set
909         const_iterator cend()
910         {
911             return const_iterator( *this, m_Head, 0, false );
912         }
913
914     private:
915         //@cond
916         static metrics make_metrics( size_t head_bits, size_t array_bits )
917         {
918             size_t const hash_bits = sizeof( hash_type ) * 8;
919
920             if ( array_bits < 2 )
921                 array_bits = 2;
922             if ( head_bits < 4 )
923                 head_bits = 4;
924             if ( head_bits > hash_bits )
925                 head_bits = hash_bits;
926             if ( (hash_bits - head_bits) % array_bits != 0 )
927                 head_bits += (hash_bits - head_bits) % array_bits;
928
929             assert( (hash_bits - head_bits) % array_bits == 0 );
930
931             metrics m;
932             m.head_node_size_log = head_bits;
933             m.head_node_size = size_t(1) << head_bits;
934             m.array_node_size_log = array_bits;
935             m.array_node_size = size_t(1) << array_bits;
936             return m;
937         }
938
939         array_node * alloc_head_node() const
940         {
941             return alloc_array_node( head_size(), nullptr, 0 );
942         }
943
944         array_node * alloc_array_node( array_node * pParent, size_t idxParent ) const
945         {
946             return alloc_array_node( array_node_size(), pParent, idxParent );
947         }
948
949         static array_node * alloc_array_node( size_t nSize, array_node * pParent, size_t idxParent )
950         {
951             array_node * pNode = cxx_array_node_allocator().NewBlock( sizeof(array_node) + sizeof(atomic_node_ptr) * (nSize - 1), pParent, idxParent );
952             for ( atomic_node_ptr * p = pNode->nodes, *pEnd = pNode->nodes + nSize; p < pEnd; ++p )
953                 p->store( node_ptr(), memory_model::memory_order_release );
954             return pNode;
955         }
956
957         static void free_array_node( array_node * parr )
958         {
959             cxx_array_node_allocator().Delete( parr );
960         }
961
962         void destroy_tree()
963         {
964             // The function is not thread-safe. For use in dtor only
965             // Remove data node
966             clear();
967
968             // Destroy all array nodes
969             destroy_array_nodes( m_Head, head_size());
970         }
971
972         void destroy_array_nodes( array_node * pArr, size_t nSize )
973         {
974             for ( atomic_node_ptr * p = pArr->nodes, *pLast = pArr->nodes + nSize; p != pLast; ++p ) {
975                 node_ptr slot = p->load( memory_model::memory_order_acquire );
976                 if ( slot.bits() == flag_array_node ) {
977                     destroy_array_nodes(to_array(slot.ptr()), array_node_size());
978                     free_array_node( to_array(slot.ptr()));
979                     p->store(node_ptr(), memory_model::memory_order_relaxed );
980                 }
981             }
982         }
983
984         void clear_array( array_node * pArrNode, size_t nSize )
985         {
986             back_off bkoff;
987
988
989             for ( atomic_node_ptr * pArr = pArrNode->nodes, *pLast = pArr + nSize; pArr != pLast; ++pArr ) {
990                 while ( true ) {
991                     node_ptr slot = pArr->load( memory_model::memory_order_acquire );
992                     if ( slot.bits() == flag_array_node ) {
993                         // array node, go down the tree
994                         assert( slot.ptr() != nullptr );
995                         clear_array( to_array( slot.ptr()), array_node_size() );
996                         break;
997                     }
998                     else if ( slot.bits() == flag_array_converting ) {
999                         // the slot is converting to array node right now
1000                         while ( (slot = pArr->load(memory_model::memory_order_acquire)).bits() == flag_array_converting ) {
1001                             bkoff();
1002                             m_Stat.onSlotConverting();
1003                         }
1004                         bkoff.reset();
1005
1006                         assert( slot.ptr() != nullptr );
1007                         assert(slot.bits() == flag_array_node );
1008                         clear_array( to_array( slot.ptr()), array_node_size() );
1009                         break;
1010                     }
1011                     else {
1012                         // data node
1013                         if ( pArr->compare_exchange_strong( slot, node_ptr(), memory_model::memory_order_acquire, atomics::memory_order_relaxed )) {
1014                             if ( slot.ptr() ) {
1015                                 gc::template retire<disposer>( slot.ptr() );
1016                                 --m_ItemCounter;
1017                                 m_Stat.onEraseSuccess();
1018                             }
1019                             break;
1020                         }
1021                     }
1022                 }
1023             }
1024         }
1025
1026         union converter {
1027             value_type * pData;
1028             array_node * pArr;
1029
1030             converter( value_type * p )
1031                 : pData( p )
1032             {}
1033
1034             converter( array_node * p )
1035                 : pArr( p )
1036             {}
1037         };
1038
1039         static array_node * to_array( value_type * p )
1040         {
1041             return converter( p ).pArr;
1042         }
1043         static value_type * to_node( array_node * p )
1044         {
1045             return converter( p ).pData;
1046         }
1047
1048         bool expand_slot( array_node * pParent, size_t idxParent, node_ptr current, size_t nOffset )
1049         {
1050             assert( current.bits() == 0 );
1051             assert( current.ptr() );
1052
1053             size_t idx = hash_splitter(hash_accessor()(*current.ptr()), nOffset).cut( m_Metrics.array_node_size_log );
1054             array_node * pArr = alloc_array_node( pParent, idxParent );
1055
1056             node_ptr cur(current.ptr());
1057             atomic_node_ptr& slot = pParent->nodes[idxParent];
1058             if ( !slot.compare_exchange_strong( cur, cur | flag_array_converting, memory_model::memory_order_release, atomics::memory_order_relaxed )) 
1059             {
1060                 m_Stat.onExpandNodeFailed();
1061                 free_array_node( pArr );
1062                 return false;
1063             }
1064
1065             pArr->nodes[idx].store( current, memory_model::memory_order_release );
1066
1067             cur = cur | flag_array_converting;
1068             CDS_VERIFY( 
1069                 slot.compare_exchange_strong( cur, node_ptr( to_node( pArr ), flag_array_node ), memory_model::memory_order_release, atomics::memory_order_relaxed )
1070             );
1071
1072             m_Stat.onArrayNodeCreated();
1073             return true;
1074         }
1075
1076         value_type * search( hash_type const& hash, typename gc::Guard& guard )
1077         {
1078             hash_splitter splitter( hash );
1079             hash_comparator cmp;
1080             back_off bkoff;
1081
1082             array_node * pArr = m_Head;
1083             size_t nSlot = splitter.cut( m_Metrics.head_node_size_log );
1084             assert( nSlot < m_Metrics.head_node_size );
1085
1086             while ( true ) {
1087                 node_ptr slot = pArr->nodes[nSlot].load( memory_model::memory_order_acquire );
1088                 if ( slot.bits() == flag_array_node ) {
1089                     // array node, go down the tree
1090                     assert( slot.ptr() != nullptr );
1091                     nSlot = splitter.cut( m_Metrics.array_node_size_log );
1092                     assert( nSlot < m_Metrics.array_node_size );
1093                     pArr = to_array( slot.ptr() );
1094                 }
1095                 else if ( slot.bits() == flag_array_converting ) {
1096                     // the slot is converting to array node right now
1097                     bkoff();
1098                     m_Stat.onSlotConverting();
1099                 }
1100                 else {
1101                     // data node
1102                     assert(slot.bits() == 0 );
1103
1104                     // protect data node by hazard pointer
1105                     if ( guard.protect( pArr->nodes[nSlot], [](node_ptr p) -> value_type * { return p.ptr(); }) != slot ) {
1106                         // slot value has been changed - retry
1107                         m_Stat.onSlotChanged();
1108                     }
1109                     else if ( slot.ptr() && cmp( hash, hash_accessor()( *slot.ptr() )) == 0 ) {
1110                         // item found
1111                         m_Stat.onFindSuccess();
1112                         return slot.ptr();
1113                     }
1114                     m_Stat.onFindFailed();
1115                     return nullptr;
1116                 }
1117             } // while
1118         }
1119
1120         template <typename Predicate>
1121         value_type * do_erase( hash_type const& hash, typename gc::Guard& guard, Predicate pred )
1122         {
1123             hash_splitter splitter( hash );
1124             hash_comparator cmp;
1125             back_off bkoff;
1126
1127             array_node * pArr = m_Head;
1128             size_t nSlot = splitter.cut( m_Metrics.head_node_size_log );
1129             assert( nSlot < m_Metrics.head_node_size );
1130
1131             while ( true ) {
1132                 node_ptr slot = pArr->nodes[nSlot].load( memory_model::memory_order_acquire );
1133                 if ( slot.bits() == flag_array_node ) {
1134                     // array node, go down the tree
1135                     assert( slot.ptr() != nullptr );
1136                     nSlot = splitter.cut( m_Metrics.array_node_size_log );
1137                     assert( nSlot < m_Metrics.array_node_size );
1138                     pArr = to_array( slot.ptr() );
1139                 }
1140                 else if ( slot.bits() == flag_array_converting ) {
1141                     // the slot is converting to array node right now
1142                     bkoff();
1143                     m_Stat.onSlotConverting();
1144                 }
1145                 else {
1146                     // data node
1147                     assert(slot.bits() == 0 );
1148
1149                     // protect data node by hazard pointer
1150                     if ( guard.protect( pArr->nodes[nSlot], [](node_ptr p) -> value_type * { return p.ptr(); }) != slot ) {
1151                         // slot value has been changed - retry
1152                         m_Stat.onSlotChanged();
1153                     }
1154                     else if ( slot.ptr() ) {
1155                         if ( cmp( hash, hash_accessor()( *slot.ptr() )) == 0 && pred( *slot.ptr() )) {
1156                             // item found - replace it with nullptr
1157                             if ( pArr->nodes[nSlot].compare_exchange_strong(slot, node_ptr(nullptr), memory_model::memory_order_acquire, atomics::memory_order_relaxed))
1158                                 return slot.ptr();
1159                             m_Stat.onEraseRetry();
1160                             continue;
1161                         }
1162                         m_Stat.onEraseFailed();
1163                         return nullptr;
1164                     }
1165                     else {
1166                         // the slot is empty
1167                         m_Stat.onEraseFailed();
1168                         return nullptr;
1169                     }
1170                 }
1171             } // while
1172         }
1173
1174         //@endcond
1175     };
1176 }} // namespace cds::intrusive
1177
1178 #endif // #ifndef CDSLIB_INTRUSIVE_IMPL_MULTILEVEL_HASHSET_H