Added intrusive::MultiLevelHashSet<RCU> implementation
[libcds.git] / cds / intrusive / multilevel_hashset_rcu.h
1 //$$CDS-header$$
2
3 #ifndef CDSLIB_INTRUSIVE_MULTILEVEL_HASHSET_RCU_H
4 #define CDSLIB_INTRUSIVE_MULTILEVEL_HASHSET_RCU_H
5
6 #include <functional>   // std::ref
7 #include <iterator>     // std::iterator_traits
8
9 #include <cds/intrusive/details/multilevel_hashset_base.h>
10 #include <cds/details/allocator.h>
11 #include <cds/urcu/details/check_deadlock.h>
12 #include <cds/urcu/exempt_ptr.h>
13 #include <cds/intrusive/details/raw_ptr_disposer.h>
14
15
16 namespace cds { namespace intrusive {
17     /// Intrusive hash set based on multi-level array, \ref cds_urcu_desc "RCU" specialization
18     /** @ingroup cds_intrusive_map
19         @anchor cds_intrusive_MultilevelHashSet_rcu
20
21         Source:
22         - [2013] Steven Feldman, Pierre LaBorde, Damian Dechev "Concurrent Multi-level Arrays:
23             Wait-free Extensible Hash Maps"
24
25         See algorithm short description @ref cds_intrusive_MultilevelHashSet_hp "here"
26
27         Template parameters:
28         - \p RCU - one of \ref cds_urcu_gc "RCU type"
29         - \p T - a value type to be stored in the set
30         - \p Traits - type traits, the structure based on \p multilevel_hashset::traits or result of \p multilevel_hashset::make_traits metafunction.
31         \p Traits is the mandatory argument because it has one mandatory type - an @ref multilevel_hashset::traits::hash_accessor "accessor"
32         to hash value of \p T. The set algorithm does not calculate that hash value.
33
34         @note Before including <tt><cds/intrusive/multilevel_hashset_rcu.h></tt> you should include appropriate RCU header file,
35         see \ref cds_urcu_gc "RCU type" for list of existing RCU class and corresponding header files.
36
37         The set supports @ref cds_intrusive_MultilevelHashSet_rcu_iterators "bidirectional thread-safe iterators" 
38         with some restrictions.
39     */
40     template <
41         class RCU,
42         class T,
43 #ifdef CDS_DOXYGEN_INVOKED
44         class Traits = multilevel_hashset::traits
45 #else
46         class Traits
47 #endif
48     >
49     class MultiLevelHashSet< cds::urcu::gc< RCU >, T, Traits >
50     {
51     public:
52         typedef cds::urcu::gc< RCU > gc; ///< RCU garbage collector
53         typedef T       value_type;      ///< type of value stored in the set
54         typedef Traits  traits;          ///< Traits template parameter
55
56         typedef typename traits::hash_accessor hash_accessor; ///< Hash accessor functor
57         static_assert(!std::is_same< hash_accessor, cds::opt::none >::value, "hash_accessor functor must be specified in Traits");
58
59         /// Hash type deduced from \p hash_accessor return type
60         typedef typename std::decay<
61             typename std::remove_reference<
62             decltype(hash_accessor()(std::declval<T>()))
63             >::type
64         >::type hash_type;
65         //typedef typename std::result_of< hash_accessor( std::declval<T>()) >::type hash_type;
66         static_assert(!std::is_pointer<hash_type>::value, "hash_accessor should return a reference to hash value");
67
68         typedef typename traits::disposer disposer; ///< data node disposer
69
70 #   ifdef CDS_DOXYGEN_INVOKED
71         typedef implementation_defined hash_comparator;    ///< hash compare functor based on opt::compare and opt::less option setter
72 #   else
73         typedef typename cds::opt::details::make_comparator_from<
74             hash_type,
75             traits,
76             multilevel_hashset::bitwise_compare< hash_type >
77         >::type hash_comparator;
78 #   endif
79
80         typedef typename traits::item_counter   item_counter;   ///< Item counter type
81         typedef typename traits::node_allocator node_allocator; ///< Array node allocator
82         typedef typename traits::memory_model   memory_model;   ///< Memory model
83         typedef typename traits::back_off       back_off;       ///< Backoff strategy
84         typedef typename traits::stat           stat;           ///< Internal statistics type
85         typedef typename traits::rcu_check_deadlock rcu_check_deadlock; ///< Deadlock checking policy
86         typedef typename gc::scoped_lock       rcu_lock;        ///< RCU scoped lock
87         static CDS_CONSTEXPR const bool c_bExtractLockExternal = false; ///< Group of \p extract_xxx functions does not require external locking
88
89         using exempt_ptr = cds::urcu::exempt_ptr< gc, value_type, value_type, disposer, void >; ///< pointer to extracted node
90
91         /// Node marked poiter
92         typedef cds::details::marked_ptr< value_type, 3 > node_ptr;
93         /// Array node element
94         typedef atomics::atomic< node_ptr > atomic_node_ptr;
95
96     protected:
97         //@cond
98         enum node_flags {
99             flag_array_converting = 1,   ///< the cell is converting from data node to an array node
100             flag_array_node = 2          ///< the cell is a pointer to an array node
101         };
102
103         struct array_node {
104             array_node * const  pParent;    ///< parent array node
105             size_t const        idxParent;  ///< index in parent array node
106             atomic_node_ptr     nodes[1];   ///< node array
107
108             array_node(array_node * parent, size_t idx)
109                 : pParent(parent)
110                 , idxParent(idx)
111             {}
112
113             array_node() = delete;
114             array_node(array_node const&) = delete;
115             array_node(array_node&&) = delete;
116         };
117
118         typedef cds::details::Allocator< array_node, node_allocator > cxx_array_node_allocator;
119         typedef multilevel_hashset::details::hash_splitter< hash_type > hash_splitter;
120         typedef cds::urcu::details::check_deadlock_policy< gc, rcu_check_deadlock> check_deadlock_policy;
121
122         //@endcond
123
124     private:
125         //@cond
126         multilevel_hashset::details::metrics const m_Metrics;     ///< Metrics
127         array_node *      m_Head;        ///< Head array
128         item_counter      m_ItemCounter; ///< Item counter
129         stat              m_Stat;        ///< Internal statistics
130         //@endcond
131
132     public:
133         /// Creates empty set
134         /**
135             @param head_bits: 2<sup>head_bits</sup> specifies the size of head array, minimum is 4.
136             @param array_bits: 2<sup>array_bits</sup> specifies the size of array node, minimum is 2.
137
138             Equation for \p head_bits and \p array_bits:
139             \code
140             sizeof(hash_type) * 8 == head_bits + N * array_bits
141             \endcode
142             where \p N is multi-level array depth.
143         */
144         MultiLevelHashSet(size_t head_bits = 8, size_t array_bits = 4)
145             : m_Metrics(multilevel_hashset::details::metrics::make(head_bits, array_bits, sizeof(hash_type)))
146             , m_Head(alloc_head_node())
147         {}
148
149         /// Destructs the set and frees all data
150         ~MultiLevelHashSet()
151         {
152             destroy_tree();
153             free_array_node(m_Head);
154         }
155
156         /// Inserts new node
157         /**
158             The function inserts \p val in the set if it does not contain
159             an item with that hash.
160
161             Returns \p true if \p val is placed into the set, \p false otherwise.
162
163             The function locks RCU internally.
164         */
165         bool insert( value_type& val )
166         {
167             return insert( val, [](value_type&) {} );
168         }
169
170         /// Inserts new node
171         /**
172             This function is intended for derived non-intrusive containers.
173
174             The function allows to split creating of new item into two part:
175             - create item with key only
176             - insert new item into the set
177             - if inserting is success, calls \p f functor to initialize \p val.
178
179             The functor signature is:
180             \code
181                 void func( value_type& val );
182             \endcode
183             where \p val is the item inserted.
184
185             The user-defined functor is called only if the inserting is success.
186
187             The function locks RCU internally.
188             @warning See \ref cds_intrusive_item_creating "insert item troubleshooting".
189         */
190         template <typename Func>
191         bool insert( value_type& val, Func f )
192         {
193             hash_type const& hash = hash_accessor()( val );
194             hash_splitter splitter( hash );
195             hash_comparator cmp;
196             back_off bkoff;
197
198             size_t nOffset = m_Metrics.head_node_size_log;
199             array_node * pArr = m_Head;
200             size_t nSlot = splitter.cut( m_Metrics.head_node_size_log );
201             assert( nSlot < m_Metrics.head_node_size );
202             size_t nHeight = 1;
203
204             while ( true ) {
205                 node_ptr slot = pArr->nodes[nSlot].load( memory_model::memory_order_acquire );
206                 if ( slot.bits() == flag_array_node ) {
207                     // array node, go down the tree
208                     assert( slot.ptr() != nullptr );
209                     nSlot = splitter.cut( m_Metrics.array_node_size_log );
210                     assert( nSlot < m_Metrics.array_node_size );
211                     pArr = to_array( slot.ptr() );
212                     nOffset += m_Metrics.array_node_size_log;
213                     ++nHeight;
214                 }
215                 else if ( slot.bits() == flag_array_converting ) {
216                     // the slot is converting to array node right now
217                     bkoff();
218                     m_Stat.onSlotConverting();
219                 }
220                 else {
221                     // data node
222                     assert(slot.bits() == 0 );
223
224                     rcu_lock rcuLock;
225                     if ( pArr->nodes[nSlot].load(memory_model::memory_order_acquire) == slot ) {
226                         if ( slot.ptr() ) {
227                             if ( cmp( hash, hash_accessor()( *slot.ptr() )) == 0 ) {
228                                 // the item with that hash value already exists
229                                 m_Stat.onInsertFailed();
230                                 return false;
231                             }
232
233                             // the slot must be expanded
234                             expand_slot( pArr, nSlot, slot, nOffset );
235                         }
236                         else {
237                             // the slot is empty, try to insert data node
238                             node_ptr pNull;
239                             if ( pArr->nodes[nSlot].compare_exchange_strong( pNull, node_ptr( &val ), memory_model::memory_order_release, atomics::memory_order_relaxed ))
240                             {
241                                 // the new data node has been inserted
242                                 f( val );
243                                 ++m_ItemCounter;
244                                 m_Stat.onInsertSuccess();
245                                 m_Stat.height( nHeight );
246                                 return true;
247                             }
248
249                             // insert failed - slot has been changed by another thread
250                             // retry inserting
251                             m_Stat.onInsertRetry();
252                         }
253                     }
254                     else
255                         m_Stat.onSlotChanged();
256                 }
257             } // while
258         }
259
260         /// Updates the node
261         /**
262             Performs inserting or updating the item with hash value equal to \p val.
263             - If hash value is found then existing item is replaced with \p val, old item is disposed
264               with \p Traits::disposer. Note that the disposer is called by \p GC asynchronously.
265               The function returns <tt> std::pair<true, false> </tt>
266             - If hash value is not found and \p bInsert is \p true then \p val is inserted,
267               the function returns <tt> std::pair<true, true> </tt>
268             - If hash value is not found and \p bInsert is \p false then the set is unchanged,
269               the function returns <tt> std::pair<false, false> </tt>
270
271             Returns <tt> std::pair<bool, bool> </tt> where \p first is \p true if operation is successfull
272             (i.e. the item has been inserted or updated),
273             \p second is \p true if new item has been added or \p false if the set contains that hash.
274
275             The function locks RCU internally.
276         */
277         std::pair<bool, bool> update( value_type& val, bool bInsert = true )
278         {
279             return do_update(val, [](value_type&, value_type *) {}, bInsert );
280         }
281
282         /// Unlinks the item \p val from the set
283         /**
284             The function searches the item \p val in the set and unlink it
285             if it is found and its address is equal to <tt>&val</tt>.
286
287             The function returns \p true if success and \p false otherwise.
288
289             RCU should not be locked. The function locks RCU internally.
290         */
291         bool unlink( value_type const& val )
292         {
293             check_deadlock_policy::check();
294
295             auto pred = [&val](value_type const& item) -> bool { return &item == &val; };
296             value_type * p;
297             {
298                 rcu_lock rcuLock;
299                 p = do_erase( hash_accessor()( val ), std::ref( pred ));
300             }
301             if ( p ) {
302                 gc::template retire_ptr<disposer>( p );
303                 return true;
304             }
305             return false;
306         }
307
308         /// Deletes the item from the set
309         /**
310             The function searches \p hash in the set,
311             unlinks the item found, and returns \p true.
312             If that item is not found the function returns \p false.
313
314             The \ref disposer specified in \p Traits is called by garbage collector \p GC asynchronously.
315
316             RCU should not be locked. The function locks RCU internally.
317         */
318         bool erase( hash_type const& hash )
319         {
320             return erase(hash, [](value_type const&) {} );
321         }
322
323         /// Deletes the item from the set
324         /**
325             The function searches \p hash in the set,
326             call \p f functor with item found, and unlinks it from the set.
327             The \ref disposer specified in \p Traits is called
328             by garbage collector \p GC asynchronously.
329
330             The \p Func interface is
331             \code
332             struct functor {
333                 void operator()( value_type& item );
334             };
335             \endcode
336
337             If \p hash is not found the function returns \p false.
338
339             RCU should not be locked. The function locks RCU internally.
340         */
341         template <typename Func>
342         bool erase( hash_type const& hash, Func f )
343         {
344             check_deadlock_policy::check();
345
346             value_type * p;
347
348             {
349                 rcu_lock rcuLock;
350                 p = do_erase( hash, []( value_type const&) -> bool { return true; } );
351             }
352
353             // p is guarded by HP
354             if ( p ) {
355                 f( *p );
356                 gc::template retire_ptr<disposer>(p);
357                 return true;
358             }
359             return false;
360         }
361
362         /// Extracts the item with specified \p hash
363         /**
364             The function searches \p hash in the set,
365             unlinks it from the set, and returns \ref cds::urcu::exempt_ptr "exempt_ptr" pointer to the item found.
366             If the item with key equal to \p key is not found the function returns an empty \p exempt_ptr.
367
368             RCU \p synchronize method can be called. RCU should NOT be locked.
369             The function does not call the disposer for the item found.
370             The disposer will be implicitly invoked when the returned object is destroyed or when
371             its \p release() member function is called.
372             Example:
373             \code
374             typedef cds::intrusive::MultiLevelHashSet< cds::urcu::gc< cds::urcu::general_buffered<> >, foo, my_traits > set_type;
375             set_type theSet;
376             // ...
377
378             typename set_type::exempt_ptr ep( theSet.extract( 5 ));
379             if ( ep ) {
380                 // Deal with ep
381                 //...
382
383                 // Dispose returned item.
384                 ep.release();
385             }
386             \endcode
387         */
388         exempt_ptr extract( hash_type const& hash )
389         {
390             check_deadlock_policy::check();
391
392             value_type * p;
393             {
394                 rcu_lock rcuLock;
395                 p = do_erase( hash, []( value_type const&) -> bool {return true;} );
396             }
397             return exempt_ptr( p );
398         }
399
400         /// Finds an item by it's \p hash
401         /**
402             The function searches the item by \p hash and calls the functor \p f for item found.
403             The interface of \p Func functor is:
404             \code
405             struct functor {
406                 void operator()( value_type& item );
407             };
408             \endcode
409             where \p item is the item found.
410
411             The functor may change non-key fields of \p item. Note that the functor is only guarantee
412             that \p item cannot be disposed during the functor is executing.
413             The functor does not serialize simultaneous access to the set's \p item. If such access is
414             possible you must provide your own synchronization schema on item level to prevent unsafe item modifications.
415
416             The function returns \p true if \p hash is found, \p false otherwise.
417
418             The function applies RCU lock internally.
419         */
420         template <typename Func>
421         bool find( hash_type const& hash, Func f )
422         {
423             rcu_lock rcuLock;
424
425             value_type * p = search( hash );
426             if ( p ) {
427                 f( *p );
428                 return true;
429             }
430             return false;
431         }
432
433         /// Checks whether the set contains \p hash
434         /**
435             The function searches the item by its \p hash
436             and returns \p true if it is found, or \p false otherwise.
437
438             The function applies RCU lock internally.
439         */
440         bool contains( hash_type const& hash )
441         {
442             return find( hash, [](value_type&) {} );
443         }
444
445         /// Finds an item by it's \p hash and returns the item found
446         /**
447             The function searches the item by its \p hash
448             and returns the pointer to the item found.
449             If \p hash is not found the function returns \p nullptr.
450
451             RCU should be locked before the function invocation.
452             Returned pointer is valid only while RCU is locked.
453
454             Usage:
455             \code
456             typedef cds::intrusive::MultiLevelHashSet< your_template_params >  my_set;
457             my_set theSet;
458             // ...
459             {
460                 // lock RCU
461                 my_set::rcu_lock
462
463                 foo * p = theSet.get( 5 );
464                 if ( p ) {
465                     // Deal with p
466                     //...
467                 }
468             }
469             \endcode
470         */
471         value_type * get( hash_type const& hash )
472         {
473             assert( gc::is_locked());
474             return search( hash );
475         }
476
477         /// Clears the set (non-atomic)
478         /**
479             The function unlink all data node from the set.
480             The function is not atomic but is thread-safe.
481             After \p %clear() the set may not be empty because another threads may insert items.
482
483             For each item the \p disposer is called after unlinking.
484         */
485         void clear()
486         {
487             clear_array( m_Head, head_size() );
488         }
489
490         /// Checks if the set is empty
491         /**
492             Emptiness is checked by item counting: if item count is zero then the set is empty.
493             Thus, the correct item counting feature is an important part of the set implementation.
494         */
495         bool empty() const
496         {
497             return size() == 0;
498         }
499
500         /// Returns item count in the set
501         size_t size() const
502         {
503             return m_ItemCounter;
504         }
505
506         /// Returns const reference to internal statistics
507         stat const& statistics() const
508         {
509             return m_Stat;
510         }
511
512         /// Returns the size of head node
513         size_t head_size() const
514         {
515             return m_Metrics.head_node_size;
516         }
517
518         /// Returns the size of the array node
519         size_t array_node_size() const
520         {
521             return m_Metrics.array_node_size;
522         }
523
524     protected:
525         //@cond
526         class iterator_base
527         {
528             friend class MultiLevelHashSet;
529
530         protected:
531             array_node *        m_pNode;    ///< current array node
532             size_t              m_idx;      ///< current position in m_pNode
533             value_type *        m_pValue;   ///< current value
534             MultiLevelHashSet const*  m_set;    ///< Hash set
535
536         public:
537             iterator_base() CDS_NOEXCEPT
538                 : m_pNode(nullptr)
539                 , m_idx(0)
540                 , m_pValue(nullptr)
541                 , m_set(nullptr)
542             {}
543
544             iterator_base(iterator_base const& rhs) CDS_NOEXCEPT
545                 : m_pNode(rhs.m_pNode)
546                 , m_idx(rhs.m_idx)
547                 , m_pValue(rhs.m_pValue)
548                 , m_set(rhs.m_set)
549             {}
550
551             iterator_base& operator=(iterator_base const& rhs) CDS_NOEXCEPT
552             {
553                 m_pNode = rhs.m_pNode;
554                 m_idx = rhs.m_idx;
555                 m_pValue = rhs.m_pValue;
556                 m_set = rhs.m_set;
557                 return *this;
558             }
559
560             iterator_base& operator++()
561             {
562                 forward();
563                 return *this;
564             }
565
566             iterator_base& operator--()
567             {
568                 backward();
569                 return *this;
570             }
571
572             bool operator ==(iterator_base const& rhs) const CDS_NOEXCEPT
573             {
574                 return m_pNode == rhs.m_pNode && m_idx == rhs.m_idx && m_set == rhs.m_set;
575             }
576
577             bool operator !=(iterator_base const& rhs) const CDS_NOEXCEPT
578             {
579                 return !(*this == rhs);
580             }
581
582         protected:
583             iterator_base(MultiLevelHashSet const& set, array_node * pNode, size_t idx, bool)
584                 : m_pNode(pNode)
585                 , m_idx(idx)
586                 , m_pValue(nullptr)
587                 , m_set(&set)
588             {}
589
590             iterator_base(MultiLevelHashSet const& set, array_node * pNode, size_t idx)
591                 : m_pNode(pNode)
592                 , m_idx(idx)
593                 , m_pValue(nullptr)
594                 , m_set(&set)
595             {
596                 forward();
597             }
598
599             value_type * pointer() const CDS_NOEXCEPT
600             {
601                 assert(gc::is_locked());
602                 return m_pValue;
603             }
604
605             void forward()
606             {
607                 assert( gc::is_locked());
608                 assert(m_set != nullptr);
609                 assert(m_pNode != nullptr);
610
611                 size_t const arrayNodeSize = m_set->array_node_size();
612                 size_t const headSize = m_set->head_size();
613                 array_node * pNode = m_pNode;
614                 size_t idx = m_idx + 1;
615                 size_t nodeSize = m_pNode->pParent ? arrayNodeSize : headSize;
616
617                 for (;;) {
618                     if (idx < nodeSize) {
619                         node_ptr slot = pNode->nodes[idx].load(memory_model::memory_order_acquire);
620                         if (slot.bits() == flag_array_node) {
621                             // array node, go down the tree
622                             assert(slot.ptr() != nullptr);
623                             pNode = to_array(slot.ptr());
624                             idx = 0;
625                             nodeSize = arrayNodeSize;
626                         }
627                         else if (slot.bits() == flag_array_converting) {
628                             // the slot is converting to array node right now - skip the node
629                             ++idx;
630                         }
631                         else {
632                             if (slot.ptr()) {
633                                 // data node
634                                 m_pNode = pNode;
635                                 m_idx = idx;
636                                 m_pValue = slot.ptr();
637                                 return;
638                             }
639                             ++idx;
640                         }
641                     }
642                     else {
643                         // up to parent node
644                         if (pNode->pParent) {
645                             idx = pNode->idxParent + 1;
646                             pNode = pNode->pParent;
647                             nodeSize = pNode->pParent ? arrayNodeSize : headSize;
648                         }
649                         else {
650                             // end()
651                             assert(pNode == m_set->m_Head);
652                             assert(idx == headSize);
653                             m_pNode = pNode;
654                             m_idx = idx;
655                             m_pValue = nullptr;
656                             return;
657                         }
658                     }
659                 }
660             }
661
662             void backward()
663             {
664                 assert(gc::is_locked());
665                 assert(m_set != nullptr);
666                 assert(m_pNode != nullptr);
667
668                 size_t const arrayNodeSize = m_set->array_node_size();
669                 size_t const headSize = m_set->head_size();
670                 size_t const endIdx = size_t(0) - 1;
671
672                 array_node * pNode = m_pNode;
673                 size_t idx = m_idx - 1;
674                 size_t nodeSize = m_pNode->pParent ? arrayNodeSize : headSize;
675
676                 for (;;) {
677                     if (idx != endIdx) {
678                         node_ptr slot = pNode->nodes[idx].load(memory_model::memory_order_acquire);
679                         if (slot.bits() == flag_array_node) {
680                             // array node, go down the tree
681                             assert(slot.ptr() != nullptr);
682                             pNode = to_array(slot.ptr());
683                             nodeSize = arrayNodeSize;
684                             idx = nodeSize - 1;
685                         }
686                         else if (slot.bits() == flag_array_converting) {
687                             // the slot is converting to array node right now - skip the node
688                             --idx;
689                         }
690                         else {
691                             if (slot.ptr()) {
692                                 // data node
693                                 m_pNode = pNode;
694                                 m_idx = idx;
695                                 m_pValue = slot.ptr();
696                                 return;
697                             }
698                             --idx;
699                         }
700                     }
701                     else {
702                         // up to parent node
703                         if (pNode->pParent) {
704                             idx = pNode->idxParent - 1;
705                             pNode = pNode->pParent;
706                             nodeSize = pNode->pParent ? arrayNodeSize : headSize;
707                         }
708                         else {
709                             // rend()
710                             assert(pNode == m_set->m_Head);
711                             assert(idx == endIdx);
712                             m_pNode = pNode;
713                             m_idx = idx;
714                             m_pValue = nullptr;
715                             return;
716                         }
717                     }
718                 }
719             }
720         };
721
722         template <class Iterator>
723         Iterator init_begin() const
724         {
725             return Iterator(*this, m_Head, size_t(0) - 1);
726         }
727
728         template <class Iterator>
729         Iterator init_end() const
730         {
731             return Iterator(*this, m_Head, head_size(), false);
732         }
733
734         template <class Iterator>
735         Iterator init_rbegin() const
736         {
737             return Iterator(*this, m_Head, head_size());
738         }
739
740         template <class Iterator>
741         Iterator init_rend() const
742         {
743             return Iterator(*this, m_Head, size_t(0) - 1, false);
744         }
745
746         /// Bidirectional iterator class
747         template <bool IsConst>
748         class bidirectional_iterator : protected iterator_base
749         {
750             friend class MultiLevelHashSet;
751
752         protected:
753             static CDS_CONSTEXPR bool const c_bConstantIterator = IsConst;
754
755         public:
756             typedef typename std::conditional< IsConst, value_type const*, value_type*>::type value_ptr; ///< Value pointer
757             typedef typename std::conditional< IsConst, value_type const&, value_type&>::type value_ref; ///< Value reference
758
759         public:
760             bidirectional_iterator() CDS_NOEXCEPT
761             {}
762
763             bidirectional_iterator(bidirectional_iterator const& rhs) CDS_NOEXCEPT
764                 : iterator_base(rhs)
765             {}
766
767             bidirectional_iterator& operator=(bidirectional_iterator const& rhs) CDS_NOEXCEPT
768             {
769                 iterator_base::operator=(rhs);
770                 return *this;
771             }
772
773             bidirectional_iterator& operator++()
774             {
775                 iterator_base::operator++();
776                 return *this;
777             }
778
779             bidirectional_iterator& operator--()
780             {
781                 iterator_base::operator--();
782                 return *this;
783             }
784
785             value_ptr operator ->() const CDS_NOEXCEPT
786             {
787                 return iterator_base::pointer();
788             }
789
790             value_ref operator *() const CDS_NOEXCEPT
791             {
792                 value_ptr p = iterator_base::pointer();
793                 assert(p);
794                 return *p;
795             }
796
797             template <bool IsConst2>
798             bool operator ==(bidirectional_iterator<IsConst2> const& rhs) const CDS_NOEXCEPT
799             {
800                 return iterator_base::operator==(rhs);
801             }
802
803             template <bool IsConst2>
804             bool operator !=(bidirectional_iterator<IsConst2> const& rhs) const CDS_NOEXCEPT
805             {
806                 return !(*this == rhs);
807             }
808
809         protected:
810             bidirectional_iterator(MultiLevelHashSet& set, array_node * pNode, size_t idx, bool)
811                 : iterator_base(set, pNode, idx, false)
812             {}
813
814             bidirectional_iterator(MultiLevelHashSet& set, array_node * pNode, size_t idx)
815                 : iterator_base(set, pNode, idx)
816             {}
817         };
818
819         /// Reverse bidirectional iterator
820         template <bool IsConst>
821         class reverse_bidirectional_iterator : public iterator_base
822         {
823             friend class MultiLevelHashSet;
824
825         public:
826             typedef typename std::conditional< IsConst, value_type const*, value_type*>::type value_ptr; ///< Value pointer
827             typedef typename std::conditional< IsConst, value_type const&, value_type&>::type value_ref; ///< Value reference
828
829         public:
830             reverse_bidirectional_iterator() CDS_NOEXCEPT
831                 : iterator_base()
832             {}
833
834             reverse_bidirectional_iterator(reverse_bidirectional_iterator const& rhs) CDS_NOEXCEPT
835                 : iterator_base(rhs)
836             {}
837
838             reverse_bidirectional_iterator& operator=(reverse_bidirectional_iterator const& rhs) CDS_NOEXCEPT
839             {
840                 iterator_base::operator=(rhs);
841                 return *this;
842             }
843
844             reverse_bidirectional_iterator& operator++()
845             {
846                 iterator_base::operator--();
847                 return *this;
848             }
849
850             reverse_bidirectional_iterator& operator--()
851             {
852                 iterator_base::operator++();
853                 return *this;
854             }
855
856             value_ptr operator ->() const CDS_NOEXCEPT
857             {
858                 return iterator_base::pointer();
859             }
860
861             value_ref operator *() const CDS_NOEXCEPT
862             {
863                 value_ptr p = iterator_base::pointer();
864                 assert(p);
865                 return *p;
866             }
867
868             template <bool IsConst2>
869             bool operator ==(reverse_bidirectional_iterator<IsConst2> const& rhs) const
870             {
871                 return iterator_base::operator==(rhs);
872             }
873
874             template <bool IsConst2>
875             bool operator !=(reverse_bidirectional_iterator<IsConst2> const& rhs)
876             {
877                 return !(*this == rhs);
878             }
879
880         private:
881             reverse_bidirectional_iterator(MultiLevelHashSet& set, array_node * pNode, size_t idx, bool)
882                 : iterator_base(set, pNode, idx, false)
883             {}
884
885             reverse_bidirectional_iterator(MultiLevelHashSet& set, array_node * pNode, size_t idx)
886                 : iterator_base(set, pNode, idx, false)
887             {
888                 iterator_base::backward();
889             }
890         };
891         //@endcond
892
893     public:
894 #ifdef CDS_DOXYGEN_INVOKED
895         typedef implementation_defined iterator;            ///< @ref cds_intrusive_MultilevelHashSet_rcu_iterators "bidirectional iterator" type
896         typedef implementation_defined const_iterator;      ///< @ref cds_intrusive_MultilevelHashSet_rcu_iterators "bidirectional const iterator" type
897         typedef implementation_defined reverse_iterator;    ///< @ref cds_intrusive_MultilevelHashSet_rcu_iterators "bidirectional reverse iterator" type
898         typedef implementation_defined const_reverse_iterator; ///< @ref cds_intrusive_MultilevelHashSet_rcu_iterators "bidirectional reverse const iterator" type
899 #else
900         typedef bidirectional_iterator<false>   iterator;
901         typedef bidirectional_iterator<true>    const_iterator;
902         typedef reverse_bidirectional_iterator<false>   reverse_iterator;
903         typedef reverse_bidirectional_iterator<true>    const_reverse_iterator;
904 #endif
905
906         ///@name Thread-safe iterators
907         /** @anchor cds_intrusive_MultilevelHashSet_rcu_iterators
908             The set supports thread-safe iterators: you may iterate over the set in multi-threaded environment
909             under explicit RCU lock.
910             RCU lock requirement means that inserting or searching is allowed but you must not erase the items from the set
911             since erasing under RCU lock can lead to a deadlock. However, another thread can call \p erase() safely
912             while your thread is iterating.
913
914             A typical example is:
915             \code
916             struct foo {
917                 uint32_t    hash;
918                 // ... other fields
919                 uint32_t    payload; // only for example
920             };
921             struct set_traits: cds::intrusive::multilevel_hashset::traits
922             {
923                 struct hash_accessor {
924                     uint32_t operator()( foo const& src ) const
925                     {
926                         retur src.hash;
927                     }
928                 };
929             };
930
931             typedef cds::urcu::gc< cds::urcu::general_buffered<>> rcu;
932             typedef cds::intrusive::MultiLevelHashSet< rcu, foo, set_traits > set_type;
933
934             set_type s;
935
936             // ...
937
938             // iterate over the set
939             {
940                 // lock the RCU.
941                 typename set_type::rcu_lock l; // scoped RCU lock
942
943                 // traverse the set
944                 for ( auto i = s.begin(); i != s.end(); ++i ) {
945                     // deal with i. Remember, erasing is prohibited here!
946                     i->payload++;
947                 }
948             } // at this point RCU lock is released
949             /endcode
950
951             Each iterator object supports the common interface:
952             - dereference operators:
953                 @code
954                 value_type [const] * operator ->() noexcept
955                 value_type [const] & operator *() noexcept
956                 @endcode
957             - pre-increment and pre-decrement. Post-operators is not supported
958             - equality operators <tt>==</tt> and <tt>!=</tt>.
959                 Iterators are equal iff they point to the same cell of the same array node.
960                 Note that for two iterators \p it1 and \p it2 the condition <tt> it1 == it2 </tt>
961                 does not entail <tt> &(*it1) == &(*it2) </tt>: welcome to concurrent containers
962
963             @note It is possible the item can be iterated more that once, for example, if an iterator points to the item
964             in an array node that is being splitted.
965         */
966         ///@{
967
968         /// Returns an iterator to the beginning of the set
969         iterator begin()
970         {
971             return iterator(*this, m_Head, size_t(0) - 1);
972         }
973
974         /// Returns an const iterator to the beginning of the set
975         const_iterator begin() const
976         {
977             return const_iterator(*this, m_Head, size_t(0) - 1);
978         }
979
980         /// Returns an const iterator to the beginning of the set
981         const_iterator cbegin()
982         {
983             return const_iterator(*this, m_Head, size_t(0) - 1);
984         }
985
986         /// Returns an iterator to the element following the last element of the set. This element acts as a placeholder; attempting to access it results in undefined behavior.
987         iterator end()
988         {
989             return iterator(*this, m_Head, head_size(), false);
990         }
991
992         /// Returns a const iterator to the element following the last element of the set. This element acts as a placeholder; attempting to access it results in undefined behavior.
993         const_iterator end() const
994         {
995             return const_iterator(*this, m_Head, head_size(), false);
996         }
997
998         /// Returns a const iterator to the element following the last element of the set. This element acts as a placeholder; attempting to access it results in undefined behavior.
999         const_iterator cend()
1000         {
1001             return const_iterator(*this, m_Head, head_size(), false);
1002         }
1003
1004         /// Returns a reverse iterator to the first element of the reversed set
1005         reverse_iterator rbegin()
1006         {
1007             return reverse_iterator(*this, m_Head, head_size());
1008         }
1009
1010         /// Returns a const reverse iterator to the first element of the reversed set
1011         const_reverse_iterator rbegin() const
1012         {
1013             return const_reverse_iterator(*this, m_Head, head_size());
1014         }
1015
1016         /// Returns a const reverse iterator to the first element of the reversed set
1017         const_reverse_iterator crbegin()
1018         {
1019             return const_reverse_iterator(*this, m_Head, head_size());
1020         }
1021
1022         /// Returns a reverse iterator to the element following the last element of the reversed set
1023         /**
1024         It corresponds to the element preceding the first element of the non-reversed container.
1025         This element acts as a placeholder, attempting to access it results in undefined behavior.
1026         */
1027         reverse_iterator rend()
1028         {
1029             return reverse_iterator(*this, m_Head, size_t(0) - 1, false);
1030         }
1031
1032         /// Returns a const reverse iterator to the element following the last element of the reversed set
1033         /**
1034         It corresponds to the element preceding the first element of the non-reversed container.
1035         This element acts as a placeholder, attempting to access it results in undefined behavior.
1036         */
1037         const_reverse_iterator rend() const
1038         {
1039             return const_reverse_iterator(*this, m_Head, size_t(0) - 1, false);
1040         }
1041
1042         /// Returns a const reverse iterator to the element following the last element of the reversed set
1043         /**
1044         It corresponds to the element preceding the first element of the non-reversed container.
1045         This element acts as a placeholder, attempting to access it results in undefined behavior.
1046         */
1047         const_reverse_iterator crend()
1048         {
1049             return const_reverse_iterator(*this, m_Head, size_t(0) - 1, false);
1050         }
1051         ///@}
1052
1053     protected:
1054         //@cond
1055         template <typename Func>
1056         std::pair<bool, bool> do_update(value_type& val, Func f, bool bInsert = true)
1057         {
1058             hash_type const& hash = hash_accessor()(val);
1059             hash_splitter splitter(hash);
1060             hash_comparator cmp;
1061             back_off bkoff;
1062
1063             array_node * pArr = m_Head;
1064             size_t nSlot = splitter.cut(m_Metrics.head_node_size_log);
1065             assert(nSlot < m_Metrics.head_node_size);
1066             size_t nOffset = m_Metrics.head_node_size_log;
1067             size_t nHeight = 1;
1068
1069             while (true) {
1070                 node_ptr slot = pArr->nodes[nSlot].load(memory_model::memory_order_acquire);
1071                 if (slot.bits() == flag_array_node) {
1072                     // array node, go down the tree
1073                     assert(slot.ptr() != nullptr);
1074                     nSlot = splitter.cut(m_Metrics.array_node_size_log);
1075                     assert(nSlot < m_Metrics.array_node_size);
1076                     pArr = to_array(slot.ptr());
1077                     nOffset += m_Metrics.array_node_size_log;
1078                     ++nHeight;
1079                 }
1080                 else if (slot.bits() == flag_array_converting) {
1081                     // the slot is converting to array node right now
1082                     bkoff();
1083                     m_Stat.onSlotConverting();
1084                 }
1085                 else {
1086                     // data node
1087                     assert(slot.bits() == 0);
1088
1089                     value_type * pOld = nullptr;
1090                     {
1091                         rcu_lock rcuLock;
1092
1093                         if ( pArr->nodes[nSlot].load(memory_model::memory_order_acquire) == slot ) {
1094                             if ( slot.ptr()) {
1095                                 if (cmp(hash, hash_accessor()(*slot.ptr())) == 0) {
1096                                     // the item with that hash value already exists
1097                                     // Replace it with val
1098                                     if (slot.ptr() == &val) {
1099                                         m_Stat.onUpdateExisting();
1100                                         return std::make_pair(true, false);
1101                                     }
1102
1103                                     if (pArr->nodes[nSlot].compare_exchange_strong(slot, node_ptr(&val), memory_model::memory_order_release, atomics::memory_order_relaxed)) {
1104                                         // slot can be disposed
1105                                         f(val, slot.ptr());
1106                                         pOld = slot.ptr();
1107                                         m_Stat.onUpdateExisting();
1108                                         goto update_existing_done;
1109                                     }
1110
1111                                     m_Stat.onUpdateRetry();
1112                                     continue;
1113                                 }
1114
1115                                 // the slot must be expanded
1116                                 expand_slot(pArr, nSlot, slot, nOffset);
1117                             }
1118                             else {
1119                                 // the slot is empty, try to insert data node
1120                                 if (bInsert) {
1121                                     node_ptr pNull;
1122                                     if (pArr->nodes[nSlot].compare_exchange_strong(pNull, node_ptr(&val), memory_model::memory_order_release, atomics::memory_order_relaxed))
1123                                     {
1124                                         // the new data node has been inserted
1125                                         f(val, nullptr);
1126                                         ++m_ItemCounter;
1127                                         m_Stat.onUpdateNew();
1128                                         m_Stat.height(nHeight);
1129                                         return std::make_pair(true, true);
1130                                     }
1131                                 }
1132                                 else {
1133                                     m_Stat.onUpdateFailed();
1134                                     return std::make_pair(false, false);
1135                                 }
1136
1137                                 // insert failed - slot has been changed by another thread
1138                                 // retry updating
1139                                 m_Stat.onUpdateRetry();
1140                             }
1141                         }
1142                         else
1143                             m_Stat.onSlotChanged();
1144                         continue;
1145                     } // rcu_lock
1146
1147                     // update success
1148         update_existing_done:
1149                     if ( pOld )
1150                         gc::template retire_ptr<disposer>( pOld );
1151                     return std::make_pair(true, false);
1152                 } 
1153             } // while
1154         }
1155
1156         template <typename Predicate>
1157         value_type * do_erase( hash_type const& hash, Predicate pred)
1158         {
1159             assert(gc::is_locked());
1160
1161             hash_splitter splitter(hash);
1162             hash_comparator cmp;
1163             back_off bkoff;
1164
1165             array_node * pArr = m_Head;
1166             size_t nSlot = splitter.cut(m_Metrics.head_node_size_log);
1167             assert(nSlot < m_Metrics.head_node_size);
1168
1169             while (true) {
1170                 node_ptr slot = pArr->nodes[nSlot].load(memory_model::memory_order_acquire);
1171                 if (slot.bits() == flag_array_node) {
1172                     // array node, go down the tree
1173                     assert(slot.ptr() != nullptr);
1174                     nSlot = splitter.cut(m_Metrics.array_node_size_log);
1175                     assert(nSlot < m_Metrics.array_node_size);
1176                     pArr = to_array(slot.ptr());
1177                 }
1178                 else if (slot.bits() == flag_array_converting) {
1179                     // the slot is converting to array node right now
1180                     bkoff();
1181                     m_Stat.onSlotConverting();
1182                 }
1183                 else {
1184                     // data node
1185                     assert(slot.bits() == 0);
1186
1187                     if ( pArr->nodes[nSlot].load(memory_model::memory_order_acquire) == slot ) {
1188                         if (slot.ptr()) {
1189                             if (cmp(hash, hash_accessor()(*slot.ptr())) == 0 && pred(*slot.ptr())) {
1190                                 // item found - replace it with nullptr
1191                                 if (pArr->nodes[nSlot].compare_exchange_strong(slot, node_ptr(nullptr), memory_model::memory_order_acquire, atomics::memory_order_relaxed)) {
1192                                     --m_ItemCounter;
1193                                     m_Stat.onEraseSuccess();
1194
1195                                     return slot.ptr();
1196                                 }
1197                                 m_Stat.onEraseRetry();
1198                                 continue;
1199                             }
1200                             m_Stat.onEraseFailed();
1201                             return nullptr;
1202                         }
1203                         else {
1204                             // the slot is empty
1205                             m_Stat.onEraseFailed();
1206                             return nullptr;
1207                         }
1208                     }
1209                     else
1210                         m_Stat.onSlotChanged();
1211                 }
1212             } // while
1213         }
1214
1215         value_type * search(hash_type const& hash )
1216         {
1217             assert( gc::is_locked() );
1218
1219             hash_splitter splitter(hash);
1220             hash_comparator cmp;
1221             back_off bkoff;
1222
1223             array_node * pArr = m_Head;
1224             size_t nSlot = splitter.cut(m_Metrics.head_node_size_log);
1225             assert(nSlot < m_Metrics.head_node_size);
1226
1227             while (true) {
1228                 node_ptr slot = pArr->nodes[nSlot].load(memory_model::memory_order_acquire);
1229                 if (slot.bits() == flag_array_node) {
1230                     // array node, go down the tree
1231                     assert(slot.ptr() != nullptr);
1232                     nSlot = splitter.cut(m_Metrics.array_node_size_log);
1233                     assert(nSlot < m_Metrics.array_node_size);
1234                     pArr = to_array(slot.ptr());
1235                 }
1236                 else if (slot.bits() == flag_array_converting) {
1237                     // the slot is converting to array node right now
1238                     bkoff();
1239                     m_Stat.onSlotConverting();
1240                 }
1241                 else {
1242                     // data node
1243                     assert(slot.bits() == 0);
1244
1245                     // protect data node by hazard pointer
1246                     if ( pArr->nodes[nSlot].load(memory_model::memory_order_acquire) != slot) {
1247                         // slot value has been changed - retry
1248                         m_Stat.onSlotChanged();
1249                     }
1250                     else if (slot.ptr() && cmp(hash, hash_accessor()(*slot.ptr())) == 0) {
1251                         // item found
1252                         m_Stat.onFindSuccess();
1253                         return slot.ptr();
1254                     }
1255                     m_Stat.onFindFailed();
1256                     return nullptr;
1257                 }
1258             } // while
1259         }
1260
1261         //@endcond
1262
1263     private:
1264         //@cond
1265         array_node * alloc_head_node() const
1266         {
1267             return alloc_array_node(head_size(), nullptr, 0);
1268         }
1269
1270         array_node * alloc_array_node(array_node * pParent, size_t idxParent) const
1271         {
1272             return alloc_array_node(array_node_size(), pParent, idxParent);
1273         }
1274
1275         static array_node * alloc_array_node(size_t nSize, array_node * pParent, size_t idxParent)
1276         {
1277             array_node * pNode = cxx_array_node_allocator().NewBlock(sizeof(array_node) + sizeof(atomic_node_ptr) * (nSize - 1), pParent, idxParent);
1278             for (atomic_node_ptr * p = pNode->nodes, *pEnd = pNode->nodes + nSize; p < pEnd; ++p)
1279                 p->store(node_ptr(), memory_model::memory_order_release);
1280             return pNode;
1281         }
1282
1283         static void free_array_node(array_node * parr)
1284         {
1285             cxx_array_node_allocator().Delete(parr);
1286         }
1287
1288         void destroy_tree()
1289         {
1290             // The function is not thread-safe. For use in dtor only
1291             // Remove data node
1292             clear();
1293
1294             // Destroy all array nodes
1295             destroy_array_nodes(m_Head, head_size());
1296         }
1297
1298         void destroy_array_nodes(array_node * pArr, size_t nSize)
1299         {
1300             for (atomic_node_ptr * p = pArr->nodes, *pLast = pArr->nodes + nSize; p != pLast; ++p) {
1301                 node_ptr slot = p->load(memory_model::memory_order_acquire);
1302                 if (slot.bits() == flag_array_node) {
1303                     destroy_array_nodes(to_array(slot.ptr()), array_node_size());
1304                     free_array_node(to_array(slot.ptr()));
1305                     p->store(node_ptr(), memory_model::memory_order_relaxed);
1306                 }
1307             }
1308         }
1309
1310         void clear_array(array_node * pArrNode, size_t nSize)
1311         {
1312             back_off bkoff;
1313
1314
1315             for (atomic_node_ptr * pArr = pArrNode->nodes, *pLast = pArr + nSize; pArr != pLast; ++pArr) {
1316                 while (true) {
1317                     node_ptr slot = pArr->load(memory_model::memory_order_acquire);
1318                     if (slot.bits() == flag_array_node) {
1319                         // array node, go down the tree
1320                         assert(slot.ptr() != nullptr);
1321                         clear_array(to_array(slot.ptr()), array_node_size());
1322                         break;
1323                     }
1324                     else if (slot.bits() == flag_array_converting) {
1325                         // the slot is converting to array node right now
1326                         while ((slot = pArr->load(memory_model::memory_order_acquire)).bits() == flag_array_converting) {
1327                             bkoff();
1328                             m_Stat.onSlotConverting();
1329                         }
1330                         bkoff.reset();
1331
1332                         assert(slot.ptr() != nullptr);
1333                         assert(slot.bits() == flag_array_node);
1334                         clear_array(to_array(slot.ptr()), array_node_size());
1335                         break;
1336                     }
1337                     else {
1338                         // data node
1339                         if (pArr->compare_exchange_strong(slot, node_ptr(), memory_model::memory_order_acquire, atomics::memory_order_relaxed)) {
1340                             if (slot.ptr()) {
1341                                 gc::template retire_ptr<disposer>(slot.ptr());
1342                                 --m_ItemCounter;
1343                                 m_Stat.onEraseSuccess();
1344                             }
1345                             break;
1346                         }
1347                     }
1348                 }
1349             }
1350         }
1351
1352         bool expand_slot(array_node * pParent, size_t idxParent, node_ptr current, size_t nOffset)
1353         {
1354             assert(current.bits() == 0);
1355             assert(current.ptr());
1356
1357             size_t idx = hash_splitter(hash_accessor()(*current.ptr()), nOffset).cut(m_Metrics.array_node_size_log);
1358             array_node * pArr = alloc_array_node(pParent, idxParent);
1359
1360             node_ptr cur(current.ptr());
1361             atomic_node_ptr& slot = pParent->nodes[idxParent];
1362             if (!slot.compare_exchange_strong(cur, cur | flag_array_converting, memory_model::memory_order_release, atomics::memory_order_relaxed))
1363             {
1364                 m_Stat.onExpandNodeFailed();
1365                 free_array_node(pArr);
1366                 return false;
1367             }
1368
1369             pArr->nodes[idx].store(current, memory_model::memory_order_release);
1370
1371             cur = cur | flag_array_converting;
1372             CDS_VERIFY(
1373                 slot.compare_exchange_strong(cur, node_ptr(to_node(pArr), flag_array_node), memory_model::memory_order_release, atomics::memory_order_relaxed)
1374                 );
1375
1376             m_Stat.onExpandNodeSuccess();
1377             m_Stat.onArrayNodeCreated();
1378             return true;
1379         }
1380
1381         union converter {
1382             value_type * pData;
1383             array_node * pArr;
1384
1385             converter(value_type * p)
1386                 : pData(p)
1387             {}
1388
1389             converter(array_node * p)
1390                 : pArr(p)
1391             {}
1392         };
1393
1394         static array_node * to_array(value_type * p)
1395         {
1396             return converter(p).pArr;
1397         }
1398         static value_type * to_node(array_node * p)
1399         {
1400             return converter(p).pData;
1401         }
1402         //@endcond
1403     };
1404
1405 }} // namespace cds::intrusive
1406
1407 #endif  // #ifndef CDSLIB_INTRUSIVE_MULTILEVEL_HASHSET_RCU_H