intrusive MultiLevelHashSet: added reverse iterator support
[libcds.git] / cds / intrusive / impl / multilevel_hashset.h
1 //$$CDS-header$$
2
3 #ifndef CDSLIB_INTRUSIVE_IMPL_MULTILEVEL_HASHSET_H
4 #define CDSLIB_INTRUSIVE_IMPL_MULTILEVEL_HASHSET_H
5
6 #include <tuple> // std::tie
7 #include <functional> // std::ref
8
9 #include <cds/intrusive/details/multilevel_hashset_base.h>
10 #include <cds/details/allocator.h>
11
12 namespace cds { namespace intrusive {
13     /// Intrusive hash set based on multi-level array
14     /** @ingroup cds_intrusive_map
15         @anchor cds_intrusive_MultilevelHashSet_hp
16
17         Source:
18         - [2013] Steven Feldman, Pierre LaBorde, Damian Dechev "Concurrent Multi-level Arrays:
19                  Wait-free Extensible Hash Maps"
20
21         [From the paper] The hardest problem encountered while developing a parallel hash map is how to perform
22         a global resize, the process of redistributing the elements in a hash map that occurs when adding new
23         buckets. The negative impact of blocking synchronization is multiplied during a global resize, because all
24         threads will be forced to wait on the thread that is performing the involved process of resizing the hash map
25         and redistributing the elements. \p %MultilevelHashSet implementation avoids global resizes through new array
26         allocation. By allowing concurrent expansion this structure is free from the overhead of an explicit resize,
27         which facilitates concurrent operations.
28
29         The presented design includes dynamic hashing, the use of sub-arrays within the hash map data structure;
30         which, in combination with <b>perfect hashing</b>, means that each element has a unique final, as well as current, position.
31         It is important to note that the perfect hash function required by our hash map is trivial to realize as
32         any hash function that permutes the bits of the key is suitable. This is possible because of our approach
33         to the hash function; we require that it produces hash values that are equal in size to that of the key.
34         We know that if we expand the hash map a fixed number of times there can be no collision as duplicate keys
35         are not provided for in the standard semantics of a hash map.
36
37         \p %MultiLevelHashSet is a multi-level array whitch has a structure similar to a tree:
38         @image html multilevel_hashset.png
39         The multi-level array differs from a tree in that each position on the tree could hold an array of nodes or a single node.
40         A position that holds a single node is a \p dataNode which holds the hash value of a key and the value that is associated\r
41         with that key; it is a simple struct holding two variables. A \p dataNode in the multi-level array could be marked.\r
42         A \p markedDataNode refers to a pointer to a \p dataNode that has been bitmarked at the least significant bit (LSB)\r
43         of the pointer to the node. This signifies that this \p dataNode is contended. An expansion must occur at this node;\r
44         any thread that sees this \p markedDataNode will try to replace it with an \p arrayNode; which is a position that holds\r
45         an array of nodes. The pointer to an \p arrayNode is differentiated from that of a pointer to a \p dataNode by a bitmark\r
46         on the second-least significant bit.\r
47 \r
48         \p %MultiLevelHashSet multi-level array is similar to a tree in that we keep a pointer to the root, which is a memory array\r
49         called \p head. The length of the \p head memory array is unique, whereas every other \p arrayNode has a uniform length;\r
50         a normal \p arrayNode has a fixed power-of-two length equal to the binary logarithm of a variable called \p arrayLength.\r
51         The maximum depth of the tree, \p maxDepth, is the maximum number of pointers that must be followed to reach any node.\r
52         We define \p currentDepth as the number of memory arrays that we need to traverse to reach the \p arrayNode on which\r
53         we need to operate; this is initially one, because of \p head.\r
54 \r
55         That approach to the structure of the hash map uses an extensible hashing scheme; <b> the hash value is treated as a bit\r
56         string</b> and rehash incrementally.\r
57 \r
58         @note Two important things you should keep in mind when you're using \p %MultiLevelHashSet:\r
59         - all keys must be fixed-size. It means that you cannot use \p std::string as a key for \p %MultiLevelHashSet.\r
60           Instead, for the strings you should use well-known hashing algorithms like <a href="https://en.wikipedia.org/wiki/Secure_Hash_Algorithm">SHA1, SHA2</a>,\r
61           <a href="https://en.wikipedia.org/wiki/MurmurHash">MurmurHash</a>, <a href="https://en.wikipedia.org/wiki/CityHash">CityHash</a> \r
62           or its successor <a href="https://code.google.com/p/farmhash/">FarmHash</a> and so on, which\r
63           converts variable-length strings to fixed-length bit-strings, and use that hash as a key in \p %MultiLevelHashSet.\r
64         - \p %MultiLevelHashSet uses a perfect hashing. It means that if two different keys, for example, of type \p std::string,\r
65           have identical hash then you cannot insert both that keys in the set. \p %MultiLevelHashSet does not maintain the key,\r
66           it maintains its fixed-size hash value.\r
67 \r
68         The set supports @ref cds_intrusive_MultilevelHashSet_iterators "bidirectional thread-safe iterators".\r
69 \r
70         Template parameters:\r
71         - \p GC - safe memory reclamation schema. Can be \p gc::HP, \p gc::DHP or one of \ref cds_urcu_type "RCU type"\r
72         - \p T - a value type to be stored in the set\r
73         - \p Traits - type traits, the structure based on \p multilevel_hashset::traits or result of \p multilevel_hashset::make_traits metafunction\r
74 \r
75         There are several specializations of \p %MultiLevelHashSet for each \p GC. You should include:
76         - <tt><cds/intrusive/multilevel_hashset_hp.h></tt> for \p gc::HP garbage collector
77         - <tt><cds/intrusive/multilevel_hashset_dhp.h></tt> for \p gc::DHP garbage collector
78         - <tt><cds/intrusive/multilevel_hashset_rcu.h></tt> for \ref cds_intrusive_MultiLevelHashSet_rcu "RCU type"
79     */
80     template <
81         class GC
82         ,typename T
83 #ifdef CDS_DOXYGEN_INVOKED
84        ,typename Traits = multilevel_hashset::traits
85 #else
86        ,typename Traits
87 #endif
88     >
89     class MultiLevelHashSet
90     {
91     public:
92         typedef GC      gc;         ///< Garbage collector
93         typedef T       value_type; ///< type of value stored in the set
94         typedef Traits  traits;     ///< Traits template parameter, see \p multilevel_hashset::traits
95
96         typedef typename traits::hash_accessor hash_accessor; ///< Hash accessor functor
97         static_assert(!std::is_same< hash_accessor, cds::opt::none >::value, "hash_accessor functor must be specified" );
98
99         /// Hash type defined as \p hash_accessor return type
100         typedef typename std::decay< 
101             typename std::remove_reference<
102                 decltype( hash_accessor()( std::declval<T>()) )
103             >::type
104         >::type hash_type;
105         static_assert( !std::is_pointer<hash_type>::value, "hash_accessor should return a reference to hash value" );
106
107         typedef typename traits::disposer disposer; ///< data node disposer
108
109 #   ifdef CDS_DOXYGEN_INVOKED
110         typedef implementation_defined hash_comparator  ;    ///< hash compare functor based on opt::compare and opt::less option setter
111 #   else
112         typedef typename cds::opt::details::make_comparator_from< 
113             hash_type,
114             traits,
115             multilevel_hashset::bitwise_compare< hash_type >
116         >::type hash_comparator;
117 #   endif
118
119         typedef typename traits::item_counter   item_counter;   ///< Item counter type
120         typedef typename traits::node_allocator node_allocator; ///< Array node allocator
121         typedef typename traits::memory_model   memory_model;   ///< Memory model
122         typedef typename traits::back_off       back_off;       ///< Backoff strategy
123         typedef typename traits::stat           stat;           ///< Internal statistics type
124
125         typedef typename gc::template guarded_ptr< value_type > guarded_ptr; ///< Guarded pointer
126
127         /// Count of hazard pointers required
128         static CDS_CONSTEXPR size_t const c_nHazardPtrCount = 1;
129
130         /// Node marked poiter
131         typedef cds::details::marked_ptr< value_type, 3 > node_ptr;
132         /// Array node element
133         typedef atomics::atomic< node_ptr > atomic_node_ptr;
134
135     protected:
136         //@cond
137         enum node_flags {
138             flag_array_converting = 1,   ///< the cell is converting from data node to an array node
139             flag_array_node = 2          ///< the cell is a pointer to an array node
140         };
141
142         struct array_node {
143             array_node * const  pParent;    ///< parent array node
144             size_t const        idxParent;  ///< index in parent array node
145             atomic_node_ptr     nodes[1];   ///< node array
146
147             array_node( array_node * parent, size_t idx )
148                 : pParent( parent )
149                 , idxParent( idx )
150             {}
151
152             array_node() = delete;
153             array_node( array_node const&) = delete;
154             array_node( array_node&& ) = delete;
155         };
156
157         typedef cds::details::Allocator< array_node, node_allocator > cxx_array_node_allocator;
158
159         typedef multilevel_hashset::details::hash_splitter< hash_type > hash_splitter;
160
161         struct metrics {
162             size_t  head_node_size;     // power-of-two
163             size_t  head_node_size_log; // log2( head_node_size )
164             size_t  array_node_size;    // power-of-two
165             size_t  array_node_size_log;// log2( array_node_size )
166         };
167         //@endcond
168
169     protected:
170         //@cond
171         /// Bidirectional iterator class
172         template <bool IsConst>
173         class bidirectional_iterator
174         {
175             friend class MultLevelHashSet;
176
177             array_node *        m_pNode;    ///< current array node
178             size_t              m_idx;      ///< current position in m_pNode
179             typename gc::Guard  m_guard;    ///< HP guard
180             MultiLevelHashSet const*  m_set;    ///< Hash set
181
182         public:
183             typedef typename std::conditional< IsConst, value_type const*, value_type*>::type value_ptr; ///< Value pointer
184             typedef typename std::conditional< IsConst, value_type const&, value_type&>::type value_ref; ///< Value reference
185
186         public:
187             bidirectional_iterator() CDS_NOEXCEPT
188                 : m_pNode( nullptr )
189                 , m_idx( 0 )
190                 , m_set( nullptr )
191             {}
192
193             bidirectional_iterator( bidirectional_iterator const& rhs ) CDS_NOEXCEPT
194                 : m_pNode( rhs.m_pNode )
195                 , m_idx( rhs.m_idx )
196                 , m_set( rhs.m_set )
197             {
198                 m_guard.copy( rhs.m_guard );
199             }
200
201             bidirectional_iterator& operator=(bidirectional_iterator const& rhs) CDS_NOEXCEPT
202             {
203                 m_pNode = rhs.m_pNode;
204                 m_idx = rhs.m_idx;
205                 m_set = rhs.m_set;
206                 m_guard.copy( rhs.m_guard );
207                 return *this;
208             }
209
210             bidirectional_iterator& operator++()
211             {
212                 forward();
213                 return *this;
214             }
215
216             bidirectional_iterator& operator--()
217             {
218                 backward();
219                 return *this;
220             }
221
222             value_ptr operator ->() const CDS_NOEXCEPT
223             {
224                 return pointer();
225             }
226
227             value_ref operator *() const CDS_NOEXCEPT
228             {
229                 value_ptr p = pointer();
230                 assert( p );
231                 return *p;
232             }
233
234             void release()
235             {
236                 m_guard.clear();
237             }
238
239             template <bool IsConst1, bool IsConst2>
240             friend bool operator ==(bidirectional_iterator<IsConst1> const& lhs, bidirectional_iterator<IsConst2> const& rhs)
241             {
242                 return lhs.m_pNode == rhs.m_pNode && lhs.m_idx == rhs.m_idx && lhs.m_set == rhs.m_set;
243             }
244
245             template <bool IsConst1, bool IsConst2>
246             friend bool operator !=(bidirectional_iterator<IsConst1> const& lhs, bidirectional_iterator<IsConst2> const& rhs)
247             {
248                 return !( lhs == rhs );
249             }
250
251         protected:
252             bidirectional_iterator( MultiLevelHashSet& set, array_node * pNode, size_t idx, bool )
253                 : m_pNode( pNode )
254                 , m_idx( idx )
255                 , m_set( &set )
256             {}
257
258             bidirectional_iterator( MultiLevelHashSet& set, array_node * pNode, size_t idx )
259                 : m_pNode( pNode )
260                 , m_idx( idx )
261                 , m_set( &set )
262             {
263                 forward();
264             }
265
266             value_ptr pointer() const CDS_NOEXCEPT
267             {
268                 return m_guard.template get<value_ptr>();
269             }
270
271             void forward()
272             {
273                 assert( m_set != nullptr );
274                 assert( m_pNode != nullptr );
275
276                 size_t const arrayNodeSize = m_set->array_node_size();
277                 size_t const headSize = m_set->head_size();
278                 array_node * pNode = m_pNode;
279                 size_t idx = m_idx + 1;
280                 size_t nodeSize = m_pNode->pParent? arrayNodeSize : headSize;
281
282                 for ( ;; ) {
283                     if ( idx < nodeSize ) {
284                         node_ptr slot = pNode->nodes[idx].load( memory_model::memory_order_acquire );
285                         if ( slot.bits() == flag_array_node ) {
286                             // array node, go down the tree
287                             assert( slot.ptr() != nullptr );
288                             pNode = to_array( slot.ptr() );
289                             idx = 0;
290                             nodeSize = arrayNodeSize;
291                         }
292                         else if ( slot.bits() == flag_array_converting ) {
293                             // the slot is converting to array node right now - skip the node
294                             ++idx;
295                         }
296                         else if ( slot.ptr() ) {
297                             // data node
298                             if ( m_guard.protect( pNode->nodes[idx], [](node_ptr p) -> value_type * { return p.ptr(); }) == slot ) {
299                                 m_pNode = pNode;
300                                 m_idx = idx;
301                                 return;
302                             }
303                         }
304                     }
305                     else {
306                         // up to parent node
307                         if ( pNode->pParent ) {
308                             idx = pNode->m_idx + 1;
309                             pNode = pNode->pParent;
310                             nodeSize = pNode->pParent ? arrayNodeSize : headSize;
311                         }
312                         else {
313                             // end()
314                             assert( pNode == m_set->m_Head );
315                             assert( idx == headSize );
316                             m_pNode = pNode;
317                             m_idx = idx;
318                             return;
319                         }
320                     }
321                 }
322             }
323
324             void backward()
325             {
326                 assert( m_set != nullptr );
327                 assert( m_pNode != nullptr );
328
329                 size_t const arrayNodeSize = m_set->array_node_size();
330                 size_t const headSize = m_set->head_size();
331                 size_t const endIdx = size_t(0) - 1;
332
333                 array_node * pNode = m_pNode;
334                 size_t idx = m_idx - 1;
335                 size_t nodeSize = m_pNode->pParent? arrayNodeSize : headSize;
336
337                 for ( ;; ) {
338                     if ( idx != endIdx ) {
339                         node_ptr slot = pNode->nodes[idx].load( memory_model::memory_order_acquire );
340                         if ( slot.bits() == flag_array_node ) {
341                             // array node, go down the tree
342                             assert( slot.ptr() != nullptr );
343                             pNode = to_array( slot.ptr() );
344                             nodeSize = arrayNodeSize;
345                             idx = nodeSize - 1;
346                         }
347                         else if ( slot.bits() == flag_array_converting ) {
348                             // the slot is converting to array node right now - skip the node
349                             --idx;
350                         }
351                         else if ( slot.ptr() ) {
352                             // data node
353                             if ( m_guard.protect( pNode->nodes[idx], [](node_ptr p) -> value_type * { return p.ptr(); }) == slot ) {
354                                 m_pNode = pNode;
355                                 m_idx = idx;
356                                 return;
357                             }
358                         }
359                     }
360                     else {
361                         // up to parent node
362                         if ( pNode->pParent ) {
363                             idx = pNode->m_idx - 1;
364                             pNode = pNode->pParent;
365                             nodeSize = pNode->pParent ? arrayNodeSize : headSize;
366                         }
367                         else {
368                             // rend()
369                             assert( pNode == m_set->m_Head );
370                             assert( idx == endIdx );
371                             m_pNode = pNode;
372                             m_idx = idx;
373                             return;
374                         }
375                     }
376                 }
377             }
378         };
379
380         /// Reverse bidirectional iterator
381         template <bool IsConst>
382         class reverse_bidirectional_iterator : protected bidirectional_iterator<IsConst>
383         {
384             typedef bidirectional_iterator<IsConst> base_class;
385             friend class MultLevelHashSet;
386
387         public:
388             typedef typename base_class::value_ptr value_ptr;
389             typedef typename base_class::value_ref value_ref;
390
391         public:
392             reverse_bidirectional_iterator() CDS_NOEXCEPT
393                 : base_class()
394             {}
395
396             reverse_bidirectional_iterator( reverse_bidirectional_iterator const& rhs ) CDS_NOEXCEPT
397                 : base_class( rhs )
398             {}
399
400             reverse_bidirectional_iterator& operator=( reverse_bidirectional_iterator const& rhs) CDS_NOEXCEPT
401             {
402                 base_class::operator=( rhs );
403                 return *this;
404             }
405
406             bidirectional_iterator& operator++()
407             {
408                 base_class::backward();
409                 return *this;
410             }
411
412             bidirectional_iterator& operator--()
413             {
414                 base_class::forward();
415                 return *this;
416             }
417
418             value_ptr operator ->() const CDS_NOEXCEPT
419             {
420                 return base_class::operator->();
421             }
422
423             value_ref operator *() const CDS_NOEXCEPT
424             {
425                 return base_class::operator*();
426             }
427
428             void release()
429             {
430                 base_class::release();
431             }
432
433             template <bool IsConst1, bool IsConst2>
434             friend bool operator ==(reverse_bidirectional_iterator<IsConst1> const& lhs, reverse_bidirectional_iterator<IsConst2> const& rhs)
435             {
436                 return lhs.m_pNode == rhs.m_pNode && lhs.m_idx == rhs.m_idx && lhs.m_set == rhs.m_set;
437             }
438
439             template <bool IsConst1, bool IsConst2>
440             friend bool operator !=(reverse_bidirectional_iterator<IsConst1> const& lhs, reverse_bidirectional_iterator<IsConst2> const& rhs)
441             {
442                 return !( lhs == rhs );
443             }
444
445         private:
446             reverse_bidirectional_iterator( MultiLevelHashSet& set, array_node * pNode, size_t idx )
447                 : base_class( set, pNode, idx, false )
448             {
449                 base_class::backward();
450             }
451         };
452         //@endcond
453
454     public:
455         typedef bidirectional_iterator<false>   iterator;       ///< @ref cds_intrusive_MultilevelHashSet_iterators "bidirectional iterator" type
456         typedef bidirectional_iterator<true>    const_iterator; ///< @ref cds_intrusive_MultilevelHashSet_iterators "bidirectional const iterator" type
457         typedef reverse_bidirectional_iterator<false>   reverse_iterator;       ///< @ref cds_intrusive_MultilevelHashSet_iterators "bidirectional reverse iterator" type
458         typedef reverse_bidirectional_iterator<true>    const_reverse_iterator; ///< @ref cds_intrusive_MultilevelHashSet_iterators "bidirectional reverse const iterator" type
459
460     private:
461         //@cond
462         metrics const     m_Metrics;     ///< Metrics
463         array_node *      m_Head;        ///< Head array
464         item_counter      m_ItemCounter; ///< Item counter
465         stat              m_Stat;        ///< Internal statistics
466         //@endcond
467
468     public:
469         /// Creates empty set
470         /**
471             @param head_bits: 2<sup>head_bits</sup> specifies the size of head array, minimum is 4.
472             @param array_bits: 2<sup>array_bits</sup> specifies the size of array node, minimum is 2.
473
474             Equation for \p head_bits and \p array_bits:
475             \code
476             sizeof(hash_type) * 8 == head_bits + N * array_bits
477             \endcode
478             where \p N is multi-level array depth.
479         */
480         MultiLevelHashSet( size_t head_bits = 8, size_t array_bits = 4 )
481             : m_Metrics(make_metrics( head_bits, array_bits ))
482             , m_Head( alloc_head_node())
483         {}
484
485         /// Destructs the set and frees all data
486         ~MultiLevelHashSet()
487         {
488             destroy_tree();
489             free_array_node( m_Head );
490         }
491
492         /// Inserts new node
493         /**
494             The function inserts \p val in the set if it does not contain 
495             an item with that hash.
496
497             Returns \p true if \p val is placed into the set, \p false otherwise.
498         */
499         bool insert( value_type& val )
500         {
501             return insert( val, [](value_type&) {} );
502         }
503
504         /// Inserts new node
505         /**
506             This function is intended for derived non-intrusive containers.
507
508             The function allows to split creating of new item into two part:
509             - create item with key only
510             - insert new item into the set
511             - if inserting is success, calls \p f functor to initialize \p val.
512
513             The functor signature is:
514             \code
515                 void func( value_type& val );
516             \endcode
517             where \p val is the item inserted.
518
519             The user-defined functor is called only if the inserting is success.
520
521             @warning See \ref cds_intrusive_item_creating "insert item troubleshooting".
522         */
523         template <typename Func>
524         bool insert( value_type& val, Func f )
525         {
526             hash_type const& hash = hash_accessor()( val );
527             hash_splitter splitter( hash );
528             hash_comparator cmp;
529             typename gc::Guard guard;
530             back_off bkoff;
531
532             size_t nOffset = m_Metrics.head_node_size_log;
533             array_node * pArr = m_Head;
534             size_t nSlot = splitter.cut( m_Metrics.head_node_size_log );
535             assert( nSlot < m_Metrics.head_node_size );
536
537             while ( true ) {
538                 node_ptr slot = pArr->nodes[nSlot].load( memory_model::memory_order_acquire );
539                 if ( slot.bits() == flag_array_node ) {
540                     // array node, go down the tree
541                     assert( slot.ptr() != nullptr );
542                     nSlot = splitter.cut( m_Metrics.array_node_size_log );
543                     assert( nSlot < m_Metrics.array_node_size );
544                     pArr = to_array( slot.ptr() );
545                     nOffset += m_Metrics.array_node_size_log;
546                 }
547                 else if ( slot.bits() == flag_array_converting ) {
548                     // the slot is converting to array node right now
549                     bkoff();
550                     m_Stat.onSlotConverting();
551                 }
552                 else {
553                     // data node
554                     assert(slot.bits() == 0 );
555
556                     // protect data node by hazard pointer
557                     if ( guard.protect( pArr->nodes[nSlot], [](node_ptr p) -> value_type * { return p.ptr(); }) != slot ) {
558                         // slot value has been changed - retry
559                         m_Stat.onSlotChanged();
560                     }
561                     else if ( slot.ptr() ) {
562                         if ( cmp( hash, hash_accessor()( *slot.ptr() )) == 0 ) {
563                             // the item with that hash value already exists
564                             m_Stat.onInsertFailed();
565                             return false;
566                         }
567
568                         // the slot must be expanded
569                         expand_slot( pArr, nSlot, slot, nOffset );
570                     }
571                     else {
572                         // the slot is empty, try to insert data node
573                         node_ptr pNull;
574                         if ( pArr->nodes[nSlot].compare_exchange_strong( pNull, node_ptr( &val ), memory_model::memory_order_release, atomics::memory_order_relaxed ))
575                         {
576                             // the new data node has been inserted
577                             f( val );
578                             ++m_ItemCounter;
579                             m_Stat.onInsertSuccess();
580                             return true;
581                         }
582
583                         // insert failed - slot has been changed by another thread
584                         // retry inserting
585                         m_Stat.onInsertRetry();
586                     }
587                 }
588             } // while
589         }
590
591         /// Updates the node
592         /**
593             Performs inserting or updating the item with hash value equal to \p val.
594             - If hash value is found then existing item is replaced with \p val, old item is disposed
595               with \p Traits::disposer. Note that the disposer is called by \p GC asynchronously.
596               The function returns <tt> std::pair<true, false> </tt>
597             - If hash value is not found and \p bInsert is \p true then \p val is inserted,
598               the function returns <tt> std::pair<true, true> </tt>
599             - If hash value is not found and \p bInsert is \p false then the set is unchanged,
600               the function returns <tt> std::pair<false, false> </tt>
601
602             Returns <tt> std::pair<bool, bool> </tt> where \p first is \p true if operation is successfull
603             (i.e. the item has been inserted or updated),
604             \p second is \p true if new item has been added or \p false if the set contains that hash.
605         */
606         std::pair<bool, bool> update( value_type& val, bool bInsert = true )
607         {
608             hash_type const& hash = hash_accessor()( val );
609             hash_splitter splitter( hash );
610             hash_comparator cmp;
611             typename gc::Guard guard;
612             back_off bkoff;
613
614             array_node * pArr = m_Head;
615             size_t nSlot = splitter.cut( m_Metrics.head_node_size_log );
616             assert( nSlot < m_Metrics.head_node_size );
617             size_t nOffset = m_Metrics.head_node_size_log;
618
619             while ( true ) {
620                 node_ptr slot = pArr->nodes[nSlot].load( memory_model::memory_order_acquire );
621                 if ( slot.bits() == flag_array_node ) {
622                     // array node, go down the tree
623                     assert( slot.ptr() != nullptr );
624                     nSlot = splitter.cut( m_Metrics.array_node_size_log );
625                     assert( nSlot < m_Metrics.array_node_size );
626                     pArr = to_array( slot.ptr() );
627                     nOffset += m_Metrics.array_node_size_log;
628                 }
629                 else if ( slot.bits() == flag_array_converting ) {
630                     // the slot is converting to array node right now
631                     bkoff();
632                     m_Stat.onSlotConverting();
633                 }
634                 else {
635                     // data node
636                     assert(slot.bits() == 0 );
637
638                     // protect data node by hazard pointer
639                     if ( guard.protect( pArr->nodes[nSlot], [](node_ptr p) -> value_type * { return p.ptr(); }) != slot ) {
640                         // slot value has been changed - retry
641                         m_Stat.onSlotChanged();
642                     }
643                     else if ( slot.ptr() ) {
644                         if ( cmp( hash, hash_accessor()( *slot.ptr() )) == 0 ) {
645                             // the item with that hash value already exists
646                             // Replace it with val
647                             if ( slot.ptr() == &val ) {
648                                 m_Stat.onUpdateExisting();
649                                 return std::make_pair( true, false );
650                             }
651
652                             if ( pArr->nodes[nSlot].compare_exchange_strong( slot, node_ptr( &val ), memory_model::memory_order_release, atomics::memory_order_relaxed )) {
653                                 // slot can be disposed
654                                 gc::template retire<disposer>( slot.ptr() );
655                                 m_Stat.onUpdateExisting();
656                                 return std::make_pair( true, false );
657                             }
658
659                             m_Stat.onUpdateRetry();
660                             continue;
661                         }
662
663                         // the slot must be expanded
664                         expand_slot( pArr, nSlot, slot, nOffset );
665                     }
666                     else {
667                         // the slot is empty, try to insert data node
668                         if ( bInsert ) {
669                             node_ptr pNull;
670                             if ( pArr->nodes[nSlot].compare_exchange_strong( pNull, node_ptr( &val ), memory_model::memory_order_release, atomics::memory_order_relaxed ))
671                             {
672                                 // the new data node has been inserted
673                                 ++m_ItemCounter;
674                                 m_Stat.onUpdateNew();
675                                 return std::make_pair( true, true );
676                             }
677                         }
678                         else {
679                             m_Stat.onUpdateFailed();
680                             return std::make_pair( false, false );
681                         }
682
683                         // insert failed - slot has been changed by another thread
684                         // retry updating
685                         m_Stat.onUpdateRetry();
686                     }
687                 }
688             } // while
689         }
690
691         /// Unlinks the item \p val from the set
692         /**
693             The function searches the item \p val in the set and unlink it
694             if it is found and its address is equal to <tt>&val</tt>.
695
696             The function returns \p true if success and \p false otherwise.
697         */
698         bool unlink( value_type const& val )
699         {
700             typename gc::Guard guard;
701             auto pred = [&val](value_type const& item) -> bool { return &item == &val; };
702             value_type * p = do_erase( hash_accessor()( val ), guard, std::ref( pred ));
703
704             // p is guarded by HP
705             if ( p ) {
706                 gc::template retire<disposer>( p );
707                 --m_ItemCounter;
708                 m_Stat.onEraseSuccess();
709                 return true;
710             }
711             return false;
712         }
713
714         /// Deletes the item from the set
715         /** 
716             The function searches \p hash in the set,
717             unlinks the item found, and returns \p true.
718             If that item is not found the function returns \p false.
719
720             The \ref disposer specified in \p Traits is called by garbage collector \p GC asynchronously.
721
722         */
723         bool erase( hash_type const& hash )
724         {
725             return erase(hash, [](value_type const&) {} );
726         }
727
728         /// Deletes the item from the set
729         /**
730             The function searches \p hash in the set,
731             call \p f functor with item found, and unlinks it from the set.
732             The \ref disposer specified in \p Traits is called
733             by garbage collector \p GC asynchronously.
734
735             The \p Func interface is
736             \code
737             struct functor {
738                 void operator()( value_type& item );
739             };
740             \endcode
741
742             If \p hash is not found the function returns \p false.
743         */
744         template <typename Func>
745         bool erase( hash_type const& hash, Func f )
746         {
747             typename gc::Guard guard;
748             value_type * p = do_erase( hash, guard, []( value_type const&) -> bool {return true; } );
749
750             // p is guarded by HP
751             if ( p ) {
752                 gc::template retire<disposer>( p );
753                 --m_ItemCounter;
754                 f( *p );
755                 m_Stat.onEraseSuccess();
756                 return true;
757             }
758             return false;
759         }
760
761         /// Extracts the item with specified \p hash
762         /** 
763             The function searches \p hash in the set,
764             unlinks it from the set, and returns an guarded pointer to the item extracted.
765             If \p hash is not found the function returns an empty guarded pointer.
766
767             The \p disposer specified in \p Traits class' template parameter is called automatically
768             by garbage collector \p GC when returned \ref guarded_ptr object to be destroyed or released.
769             @note Each \p guarded_ptr object uses the GC's guard that can be limited resource.
770
771             Usage:
772             \code
773             typedef cds::intrusive::MultiLevelHashSet< your_template_args > my_set;
774             my_set theSet;
775             // ...
776             {
777                 my_set::guarded_ptr gp( theSet.extract( 5 ));
778                 if ( gp ) {
779                     // Deal with gp
780                     // ...
781                 }
782                 // Destructor of gp releases internal HP guard
783             }
784             \endcode
785         */
786         guarded_ptr extract( hash_type const& hash )
787         {
788             guarded_ptr gp;
789             {
790                 typename gc::Guard guard;
791                 value_type * p = do_erase( hash, guard, []( value_type const&) -> bool {return true; } );
792
793                 // p is guarded by HP
794                 if ( p ) {
795                     gc::template retire<disposer>( p );
796                     --m_ItemCounter;
797                     m_Stat.onEraseSuccess();
798                     gp.reset( p );
799                 }
800             }
801             return gp;
802         }
803
804         /// Finds an item by it's \p hash
805         /**
806             The function searches the item by \p hash and calls the functor \p f for item found.
807             The interface of \p Func functor is:
808             \code
809             struct functor {
810                 void operator()( value_type& item );
811             };
812             \endcode
813             where \p item is the item found.
814
815             The functor may change non-key fields of \p item. Note that the functor is only guarantee
816             that \p item cannot be disposed during the functor is executing.
817             The functor does not serialize simultaneous access to the set's \p item. If such access is
818             possible you must provide your own synchronization schema on item level to prevent unsafe item modifications.
819
820             The function returns \p true if \p hash is found, \p false otherwise.
821         */
822         template <typename Func>
823         bool find( hash_type const& hash, Func f )
824         {
825             typename gc::Guard guard;
826             value_type * p = search( hash, guard );
827
828             // p is guarded by HP
829             if ( p ) {
830                 f( *p );
831                 return true;
832             }
833             return false;
834         }
835
836         /// Checks whether the set contains \p hash
837         /**
838             The function searches the item by its \p hash
839             and returns \p true if it is found, or \p false otherwise.
840         */
841         bool contains( hash_type const& hash )
842         {
843             return find( hash, [](value_type&) {} );
844         }
845
846         /// Finds an item by it's \p hash and returns the item found
847         /**
848             The function searches the item by its \p hash
849             and returns the guarded pointer to the item found.
850             If \p hash is not found the function returns an empty \p guarded_ptr.
851
852             @note Each \p guarded_ptr object uses one GC's guard which can be limited resource.
853
854             Usage:
855             \code
856             typedef cds::intrusive::MultiLevelHashSet< your_template_params >  my_set;
857             my_set theSet;
858             // ...
859             {
860                 my_set::guarded_ptr gp( theSet.get( 5 ));
861                 if ( theSet.get( 5 )) {
862                     // Deal with gp
863                     //...
864                 }
865                 // Destructor of guarded_ptr releases internal HP guard
866             }
867             \endcode
868         */
869         guarded_ptr get( hash_type const& hash )
870         {
871             guarded_ptr gp;
872             {
873                 typename gc::Guard guard;
874                 gp.reset( search( hash, guard ));
875             }
876             return gp;
877         }
878
879         /// Clears the set (non-atomic)
880         /**
881             The function unlink all data node from the set.
882             The function is not atomic but is thread-safe.
883             After \p %clear() the set may not be empty because another threads may insert items.
884
885             For each item the \p disposer is called after unlinking.
886         */
887         void clear()
888         {
889             clear_array( m_Head, head_size() );
890         }
891
892         /// Checks if the set is empty
893         /**
894             Emptiness is checked by item counting: if item count is zero then the set is empty.
895             Thus, the correct item counting feature is an important part of the set implementation.
896         */
897         bool empty() const
898         {
899             return size() == 0;
900         }
901
902         /// Returns item count in the set
903         size_t size() const
904         {
905             return m_ItemCounter;
906         }
907
908         /// Returns const reference to internal statistics
909         stat const& statistics() const
910         {
911             return m_Stat;
912         }
913
914         /// Returns the size of head node
915         size_t head_size() const
916         {
917             return m_Metrics.head_node_size;
918         }
919
920         /// Returns the size of the array node
921         size_t array_node_size() const
922         {
923             return m_Metrics.array_node_size;
924         }
925
926     public:
927     ///@name Thread-safe iterators
928         /** @anchor cds_intrusive_MultilevelHashSet_iterators
929             The set supports thread-safe iterators: you may iterate over the set in multi-threaded environment.\r
930             It is guaranteed that the iterators will remain valid even if another thread deletes the node the iterator points to:\r
931             Hazard Pointer embedded into the iterator object protects the node from physical reclamation.\r
932 \r
933             @note Since the iterator object contains hazard pointer that is a thread-local resource,\r
934             the iterator should not be passed to another thread.\r
935 \r
936             Each iterator object supports the common interface:\r
937             - dereference operators:\r
938                 @code\r
939                 value_type [const] * operator ->() noexcept\r
940                 value_type [const] & operator *() noexcept\r
941                 @endcode\r
942             - pre-increment and pre-decrement. Post-operators is not supported\r
943             - equality operators <tt>==</tt> and <tt>!=</tt>.\r
944                 Iterators are equal iff they point to the same cell of the same array node.
945                 Note that for two iterators \p it1 and \p it2, the conditon <tt> it1 == it2 </tt> 
946                 does not entail <tt> &(*it1) == &(*it2) </tt>: welcome to concurrent containers
947             - helper member function \p release() that clears internal hazard pointer.\r
948                 After \p release() call the iterator points to \p nullptr but it still remain valid: further iterating is possible.
949         */
950     ///@{
951
952         /// Returns an iterator to the beginning of the set
953         iterator begin()
954         {
955             return iterator( *this, m_Head, size_t(0) - 1 );
956         }
957
958         /// Returns an const iterator to the beginning of the set
959         const_iterator begin() const
960         {
961             return const_iterator( *this, m_Head, size_t(0) - 1 );
962         }
963
964         /// Returns an const iterator to the beginning of the set
965         const_iterator cbegin()
966         {
967             return const_iterator( *this, m_Head, size_t(0) - 1 );
968         }
969
970         /// Returns an iterator to the element following the last element of the set. This element acts as a placeholder; attempting to access it results in undefined behavior. 
971         iterator end()
972         {
973             return iterator( *this, m_Head, head_size() - 1 );
974         }
975
976         /// Returns a const iterator to the element following the last element of the set. This element acts as a placeholder; attempting to access it results in undefined behavior. 
977         const_iterator end() const
978         {
979             return const_iterator( *this, m_Head, head_size() - 1 );
980         }
981
982         /// Returns a const iterator to the element following the last element of the set. This element acts as a placeholder; attempting to access it results in undefined behavior. 
983         const_iterator cend()
984         {
985             return const_iterator( *this, m_Head, head_size() - 1 );
986         }
987
988         /// Returns a reverse iterator to the first element of the reversed set
989         reverse_iterator rbegin()
990         {
991             return reverse_iterator( *this, m_Head, head_size() );
992         }
993
994         /// Returns a const reverse iterator to the first element of the reversed set
995         const_reverse_iterator rbegin() const
996         {
997             return const_reverse_iterator( *this, m_Head, head_size() );
998         }
999
1000         /// Returns a const reverse iterator to the first element of the reversed set
1001         const_reverse_iterator crbegin()
1002         {
1003             return const_reverse_iterator( *this, m_Head, head_size() );
1004         }
1005
1006         /// Returns a reverse iterator to the element following the last element of the reversed set
1007         /**
1008             It corresponds to the element preceding the first element of the non-reversed container. 
1009             This element acts as a placeholder, attempting to access it results in undefined behavior. 
1010         */
1011         reverse_iterator rend()
1012         {
1013             return reverse_iterator( *this, m_Head, 0 );
1014         }
1015
1016         /// Returns a const reverse iterator to the element following the last element of the reversed set
1017         /**
1018             It corresponds to the element preceding the first element of the non-reversed container. 
1019             This element acts as a placeholder, attempting to access it results in undefined behavior. 
1020         */
1021         const_reverse_iterator rend() const
1022         {
1023             return const_reverse_iterator( *this, m_Head, 0 );
1024         }
1025
1026         /// Returns a const reverse iterator to the element following the last element of the reversed set
1027         /**
1028             It corresponds to the element preceding the first element of the non-reversed container. 
1029             This element acts as a placeholder, attempting to access it results in undefined behavior. 
1030         */
1031         const_reverse_iterator crend()
1032         {
1033             return const_reverse_iterator( *this, m_Head, 0 );
1034         }
1035     ///@}
1036
1037     private:
1038         //@cond
1039         static metrics make_metrics( size_t head_bits, size_t array_bits )
1040         {
1041             size_t const hash_bits = sizeof( hash_type ) * 8;
1042
1043             if ( array_bits < 2 )
1044                 array_bits = 2;
1045             if ( head_bits < 4 )
1046                 head_bits = 4;
1047             if ( head_bits > hash_bits )
1048                 head_bits = hash_bits;
1049             if ( (hash_bits - head_bits) % array_bits != 0 )
1050                 head_bits += (hash_bits - head_bits) % array_bits;
1051
1052             assert( (hash_bits - head_bits) % array_bits == 0 );
1053
1054             metrics m;
1055             m.head_node_size_log = head_bits;
1056             m.head_node_size = size_t(1) << head_bits;
1057             m.array_node_size_log = array_bits;
1058             m.array_node_size = size_t(1) << array_bits;
1059             return m;
1060         }
1061
1062         array_node * alloc_head_node() const
1063         {
1064             return alloc_array_node( head_size(), nullptr, 0 );
1065         }
1066
1067         array_node * alloc_array_node( array_node * pParent, size_t idxParent ) const
1068         {
1069             return alloc_array_node( array_node_size(), pParent, idxParent );
1070         }
1071
1072         static array_node * alloc_array_node( size_t nSize, array_node * pParent, size_t idxParent )
1073         {
1074             array_node * pNode = cxx_array_node_allocator().NewBlock( sizeof(array_node) + sizeof(atomic_node_ptr) * (nSize - 1), pParent, idxParent );
1075             for ( atomic_node_ptr * p = pNode->nodes, *pEnd = pNode->nodes + nSize; p < pEnd; ++p )
1076                 p->store( node_ptr(), memory_model::memory_order_release );
1077             return pNode;
1078         }
1079
1080         static void free_array_node( array_node * parr )
1081         {
1082             cxx_array_node_allocator().Delete( parr );
1083         }
1084
1085         void destroy_tree()
1086         {
1087             // The function is not thread-safe. For use in dtor only
1088             // Remove data node
1089             clear();
1090
1091             // Destroy all array nodes
1092             destroy_array_nodes( m_Head, head_size());
1093         }
1094
1095         void destroy_array_nodes( array_node * pArr, size_t nSize )
1096         {
1097             for ( atomic_node_ptr * p = pArr->nodes, *pLast = pArr->nodes + nSize; p != pLast; ++p ) {
1098                 node_ptr slot = p->load( memory_model::memory_order_acquire );
1099                 if ( slot.bits() == flag_array_node ) {
1100                     destroy_array_nodes(to_array(slot.ptr()), array_node_size());
1101                     free_array_node( to_array(slot.ptr()));
1102                     p->store(node_ptr(), memory_model::memory_order_relaxed );
1103                 }
1104             }
1105         }
1106
1107         void clear_array( array_node * pArrNode, size_t nSize )
1108         {
1109             back_off bkoff;
1110
1111
1112             for ( atomic_node_ptr * pArr = pArrNode->nodes, *pLast = pArr + nSize; pArr != pLast; ++pArr ) {
1113                 while ( true ) {
1114                     node_ptr slot = pArr->load( memory_model::memory_order_acquire );
1115                     if ( slot.bits() == flag_array_node ) {
1116                         // array node, go down the tree
1117                         assert( slot.ptr() != nullptr );
1118                         clear_array( to_array( slot.ptr()), array_node_size() );
1119                         break;
1120                     }
1121                     else if ( slot.bits() == flag_array_converting ) {
1122                         // the slot is converting to array node right now
1123                         while ( (slot = pArr->load(memory_model::memory_order_acquire)).bits() == flag_array_converting ) {
1124                             bkoff();
1125                             m_Stat.onSlotConverting();
1126                         }
1127                         bkoff.reset();
1128
1129                         assert( slot.ptr() != nullptr );
1130                         assert(slot.bits() == flag_array_node );
1131                         clear_array( to_array( slot.ptr()), array_node_size() );
1132                         break;
1133                     }
1134                     else {
1135                         // data node
1136                         if ( pArr->compare_exchange_strong( slot, node_ptr(), memory_model::memory_order_acquire, atomics::memory_order_relaxed )) {
1137                             if ( slot.ptr() ) {
1138                                 gc::template retire<disposer>( slot.ptr() );
1139                                 --m_ItemCounter;
1140                                 m_Stat.onEraseSuccess();
1141                             }
1142                             break;
1143                         }
1144                     }
1145                 }
1146             }
1147         }
1148
1149         union converter {
1150             value_type * pData;
1151             array_node * pArr;
1152
1153             converter( value_type * p )
1154                 : pData( p )
1155             {}
1156
1157             converter( array_node * p )
1158                 : pArr( p )
1159             {}
1160         };
1161
1162         static array_node * to_array( value_type * p )
1163         {
1164             return converter( p ).pArr;
1165         }
1166         static value_type * to_node( array_node * p )
1167         {
1168             return converter( p ).pData;
1169         }
1170
1171         bool expand_slot( array_node * pParent, size_t idxParent, node_ptr current, size_t nOffset )
1172         {
1173             assert( current.bits() == 0 );
1174             assert( current.ptr() );
1175
1176             size_t idx = hash_splitter(hash_accessor()(*current.ptr()), nOffset).cut( m_Metrics.array_node_size_log );
1177             array_node * pArr = alloc_array_node( pParent, idxParent );
1178
1179             node_ptr cur(current.ptr());
1180             atomic_node_ptr& slot = pParent->nodes[idxParent];
1181             if ( !slot.compare_exchange_strong( cur, cur | flag_array_converting, memory_model::memory_order_release, atomics::memory_order_relaxed )) 
1182             {
1183                 m_Stat.onExpandNodeFailed();
1184                 free_array_node( pArr );
1185                 return false;
1186             }
1187
1188             pArr->nodes[idx].store( current, memory_model::memory_order_release );
1189
1190             cur = cur | flag_array_converting;
1191             CDS_VERIFY( 
1192                 slot.compare_exchange_strong( cur, node_ptr( to_node( pArr ), flag_array_node ), memory_model::memory_order_release, atomics::memory_order_relaxed )
1193             );
1194
1195             m_Stat.onArrayNodeCreated();
1196             return true;
1197         }
1198
1199         value_type * search( hash_type const& hash, typename gc::Guard& guard )
1200         {
1201             hash_splitter splitter( hash );
1202             hash_comparator cmp;
1203             back_off bkoff;
1204
1205             array_node * pArr = m_Head;
1206             size_t nSlot = splitter.cut( m_Metrics.head_node_size_log );
1207             assert( nSlot < m_Metrics.head_node_size );
1208
1209             while ( true ) {
1210                 node_ptr slot = pArr->nodes[nSlot].load( memory_model::memory_order_acquire );
1211                 if ( slot.bits() == flag_array_node ) {
1212                     // array node, go down the tree
1213                     assert( slot.ptr() != nullptr );
1214                     nSlot = splitter.cut( m_Metrics.array_node_size_log );
1215                     assert( nSlot < m_Metrics.array_node_size );
1216                     pArr = to_array( slot.ptr() );
1217                 }
1218                 else if ( slot.bits() == flag_array_converting ) {
1219                     // the slot is converting to array node right now
1220                     bkoff();
1221                     m_Stat.onSlotConverting();
1222                 }
1223                 else {
1224                     // data node
1225                     assert(slot.bits() == 0 );
1226
1227                     // protect data node by hazard pointer
1228                     if ( guard.protect( pArr->nodes[nSlot], [](node_ptr p) -> value_type * { return p.ptr(); }) != slot ) {
1229                         // slot value has been changed - retry
1230                         m_Stat.onSlotChanged();
1231                     }
1232                     else if ( slot.ptr() && cmp( hash, hash_accessor()( *slot.ptr() )) == 0 ) {
1233                         // item found
1234                         m_Stat.onFindSuccess();
1235                         return slot.ptr();
1236                     }
1237                     m_Stat.onFindFailed();
1238                     return nullptr;
1239                 }
1240             } // while
1241         }
1242
1243         template <typename Predicate>
1244         value_type * do_erase( hash_type const& hash, typename gc::Guard& guard, Predicate pred )
1245         {
1246             hash_splitter splitter( hash );
1247             hash_comparator cmp;
1248             back_off bkoff;
1249
1250             array_node * pArr = m_Head;
1251             size_t nSlot = splitter.cut( m_Metrics.head_node_size_log );
1252             assert( nSlot < m_Metrics.head_node_size );
1253
1254             while ( true ) {
1255                 node_ptr slot = pArr->nodes[nSlot].load( memory_model::memory_order_acquire );
1256                 if ( slot.bits() == flag_array_node ) {
1257                     // array node, go down the tree
1258                     assert( slot.ptr() != nullptr );
1259                     nSlot = splitter.cut( m_Metrics.array_node_size_log );
1260                     assert( nSlot < m_Metrics.array_node_size );
1261                     pArr = to_array( slot.ptr() );
1262                 }
1263                 else if ( slot.bits() == flag_array_converting ) {
1264                     // the slot is converting to array node right now
1265                     bkoff();
1266                     m_Stat.onSlotConverting();
1267                 }
1268                 else {
1269                     // data node
1270                     assert(slot.bits() == 0 );
1271
1272                     // protect data node by hazard pointer
1273                     if ( guard.protect( pArr->nodes[nSlot], [](node_ptr p) -> value_type * { return p.ptr(); }) != slot ) {
1274                         // slot value has been changed - retry
1275                         m_Stat.onSlotChanged();
1276                     }
1277                     else if ( slot.ptr() ) {
1278                         if ( cmp( hash, hash_accessor()( *slot.ptr() )) == 0 && pred( *slot.ptr() )) {
1279                             // item found - replace it with nullptr
1280                             if ( pArr->nodes[nSlot].compare_exchange_strong(slot, node_ptr(nullptr), memory_model::memory_order_acquire, atomics::memory_order_relaxed))
1281                                 return slot.ptr();
1282                             m_Stat.onEraseRetry();
1283                             continue;
1284                         }
1285                         m_Stat.onEraseFailed();
1286                         return nullptr;
1287                     }
1288                     else {
1289                         // the slot is empty
1290                         m_Stat.onEraseFailed();
1291                         return nullptr;
1292                     }
1293                 }
1294             } // while
1295         }
1296
1297         //@endcond
1298     };
1299 }} // namespace cds::intrusive
1300
1301 #endif // #ifndef CDSLIB_INTRUSIVE_IMPL_MULTILEVEL_HASHSET_H