52e79b2d9b20f07d9a1014b2c37c72153b58bdd3
[libcds.git] / cds / intrusive / feldman_hashset_rcu.h
1 //$$CDS-header$$
2
3 #ifndef CDSLIB_INTRUSIVE_FELDMAN_HASHSET_RCU_H
4 #define CDSLIB_INTRUSIVE_FELDMAN_HASHSET_RCU_H
5
6 #include <functional>   // std::ref
7 #include <iterator>     // std::iterator_traits
8
9 #include <cds/intrusive/details/feldman_hashset_base.h>
10 #include <cds/details/allocator.h>
11 #include <cds/urcu/details/check_deadlock.h>
12 #include <cds/urcu/exempt_ptr.h>
13 #include <cds/intrusive/details/raw_ptr_disposer.h>
14
15
16 namespace cds { namespace intrusive {
17     /// Intrusive hash set based on multi-level array, \ref cds_urcu_desc "RCU" specialization
18     /** @ingroup cds_intrusive_map
19         @anchor cds_intrusive_FeldmanHashSet_rcu
20
21         Source:
22         - [2013] Steven Feldman, Pierre LaBorde, Damian Dechev "Concurrent Multi-level Arrays:
23             Wait-free Extensible Hash Maps"
24
25         See algorithm short description @ref cds_intrusive_FeldmanHashSet_hp "here"
26
27         @note Two important things you should keep in mind when you're using \p %FeldmanHashSet:
28         - all keys must be fixed-size. It means that you cannot use \p std::string as a key for \p %FeldmanHashSet.
29           Instead, for the strings you should use well-known hashing algorithms like <a href="https://en.wikipedia.org/wiki/Secure_Hash_Algorithm">SHA1, SHA2</a>,
30           <a href="https://en.wikipedia.org/wiki/MurmurHash">MurmurHash</a>, <a href="https://en.wikipedia.org/wiki/CityHash">CityHash</a>
31           or its successor <a href="https://code.google.com/p/farmhash/">FarmHash</a> and so on, which
32           converts variable-length strings to fixed-length bit-strings, and use that hash as a key in \p %FeldmanHashSet.
33         - \p %FeldmanHashSet uses a perfect hashing. It means that if two different keys, for example, of type \p std::string,
34           have identical hash then you cannot insert both that keys in the set. \p %FeldmanHashSet does not maintain the key,
35           it maintains its fixed-size hash value.
36
37         Template parameters:
38         - \p RCU - one of \ref cds_urcu_gc "RCU type"
39         - \p T - a value type to be stored in the set
40         - \p Traits - type traits, the structure based on \p feldman_hashset::traits or result of \p feldman_hashset::make_traits metafunction.
41         \p Traits is the mandatory argument because it has one mandatory type - an @ref feldman_hashset::traits::hash_accessor "accessor"
42         to hash value of \p T. The set algorithm does not calculate that hash value.
43
44         @note Before including <tt><cds/intrusive/feldman_hashset_rcu.h></tt> you should include appropriate RCU header file,
45         see \ref cds_urcu_gc "RCU type" for list of existing RCU class and corresponding header files.
46
47         The set supports @ref cds_intrusive_FeldmanHashSet_rcu_iterators "bidirectional thread-safe iterators" 
48         with some restrictions.
49     */
50     template <
51         class RCU,
52         class T,
53 #ifdef CDS_DOXYGEN_INVOKED
54         class Traits = feldman_hashset::traits
55 #else
56         class Traits
57 #endif
58     >
59     class FeldmanHashSet< cds::urcu::gc< RCU >, T, Traits >
60     {
61     public:
62         typedef cds::urcu::gc< RCU > gc; ///< RCU garbage collector
63         typedef T       value_type;      ///< type of value stored in the set
64         typedef Traits  traits;          ///< Traits template parameter
65
66         typedef typename traits::hash_accessor hash_accessor; ///< Hash accessor functor
67         static_assert(!std::is_same< hash_accessor, cds::opt::none >::value, "hash_accessor functor must be specified in Traits");
68
69         /// Hash type deduced from \p hash_accessor return type
70         typedef typename std::decay<
71             typename std::remove_reference<
72             decltype(hash_accessor()(std::declval<T>()))
73             >::type
74         >::type hash_type;
75         //typedef typename std::result_of< hash_accessor( std::declval<T>()) >::type hash_type;
76         static_assert(!std::is_pointer<hash_type>::value, "hash_accessor should return a reference to hash value");
77
78         typedef typename traits::disposer disposer; ///< data node disposer
79
80 #   ifdef CDS_DOXYGEN_INVOKED
81         typedef implementation_defined hash_comparator;    ///< hash compare functor based on opt::compare and opt::less option setter
82 #   else
83         typedef typename cds::opt::details::make_comparator_from<
84             hash_type,
85             traits,
86             feldman_hashset::bitwise_compare< hash_type >
87         >::type hash_comparator;
88 #   endif
89
90         typedef typename traits::item_counter   item_counter;   ///< Item counter type
91         typedef typename traits::node_allocator node_allocator; ///< Array node allocator
92         typedef typename traits::memory_model   memory_model;   ///< Memory model
93         typedef typename traits::back_off       back_off;       ///< Backoff strategy
94         typedef typename traits::stat           stat;           ///< Internal statistics type
95         typedef typename traits::rcu_check_deadlock rcu_check_deadlock; ///< Deadlock checking policy
96         typedef typename gc::scoped_lock       rcu_lock;        ///< RCU scoped lock
97         static CDS_CONSTEXPR const bool c_bExtractLockExternal = false; ///< Group of \p extract_xxx functions does not require external locking
98
99         using exempt_ptr = cds::urcu::exempt_ptr< gc, value_type, value_type, disposer, void >; ///< pointer to extracted node
100
101         /// Node marked poiter
102         typedef cds::details::marked_ptr< value_type, 3 > node_ptr;
103         /// Array node element
104         typedef atomics::atomic< node_ptr > atomic_node_ptr;
105
106     protected:
107         //@cond
108         enum node_flags {
109             flag_array_converting = 1,   ///< the cell is converting from data node to an array node
110             flag_array_node = 2          ///< the cell is a pointer to an array node
111         };
112
113         struct array_node {
114             array_node * const  pParent;    ///< parent array node
115             size_t const        idxParent;  ///< index in parent array node
116             atomic_node_ptr     nodes[1];   ///< node array
117
118             array_node(array_node * parent, size_t idx)
119                 : pParent(parent)
120                 , idxParent(idx)
121             {}
122
123             array_node() = delete;
124             array_node(array_node const&) = delete;
125             array_node(array_node&&) = delete;
126         };
127
128         typedef cds::details::Allocator< array_node, node_allocator > cxx_array_node_allocator;
129         typedef feldman_hashset::details::hash_splitter< hash_type > hash_splitter;
130         typedef cds::urcu::details::check_deadlock_policy< gc, rcu_check_deadlock> check_deadlock_policy;
131
132         //@endcond
133
134     private:
135         //@cond
136         feldman_hashset::details::metrics const m_Metrics;     ///< Metrics
137         array_node *      m_Head;        ///< Head array
138         item_counter      m_ItemCounter; ///< Item counter
139         stat              m_Stat;        ///< Internal statistics
140         //@endcond
141
142     public:
143         /// Creates empty set
144         /**
145             @param head_bits: 2<sup>head_bits</sup> specifies the size of head array, minimum is 4.
146             @param array_bits: 2<sup>array_bits</sup> specifies the size of array node, minimum is 2.
147
148             Equation for \p head_bits and \p array_bits:
149             \code
150             sizeof(hash_type) * 8 == head_bits + N * array_bits
151             \endcode
152             where \p N is multi-level array depth.
153         */
154         FeldmanHashSet(size_t head_bits = 8, size_t array_bits = 4)
155             : m_Metrics(feldman_hashset::details::metrics::make(head_bits, array_bits, sizeof(hash_type)))
156             , m_Head(alloc_head_node())
157         {}
158
159         /// Destructs the set and frees all data
160         ~FeldmanHashSet()
161         {
162             destroy_tree();
163             free_array_node(m_Head);
164         }
165
166         /// Inserts new node
167         /**
168             The function inserts \p val in the set if it does not contain
169             an item with that hash.
170
171             Returns \p true if \p val is placed into the set, \p false otherwise.
172
173             The function locks RCU internally.
174         */
175         bool insert( value_type& val )
176         {
177             return insert( val, [](value_type&) {} );
178         }
179
180         /// Inserts new node
181         /**
182             This function is intended for derived non-intrusive containers.
183
184             The function allows to split creating of new item into two part:
185             - create item with key only
186             - insert new item into the set
187             - if inserting is success, calls \p f functor to initialize \p val.
188
189             The functor signature is:
190             \code
191                 void func( value_type& val );
192             \endcode
193             where \p val is the item inserted.
194
195             The user-defined functor is called only if the inserting is success.
196
197             The function locks RCU internally.
198             @warning See \ref cds_intrusive_item_creating "insert item troubleshooting".
199         */
200         template <typename Func>
201         bool insert( value_type& val, Func f )
202         {
203             hash_type const& hash = hash_accessor()( val );
204             hash_splitter splitter( hash );
205             hash_comparator cmp;
206             back_off bkoff;
207
208             size_t nOffset = m_Metrics.head_node_size_log;
209             array_node * pArr = m_Head;
210             size_t nSlot = splitter.cut( m_Metrics.head_node_size_log );
211             assert( nSlot < m_Metrics.head_node_size );
212             size_t nHeight = 1;
213
214             while ( true ) {
215                 node_ptr slot = pArr->nodes[nSlot].load( memory_model::memory_order_acquire );
216                 if ( slot.bits() == flag_array_node ) {
217                     // array node, go down the tree
218                     assert( slot.ptr() != nullptr );
219                     nSlot = splitter.cut( m_Metrics.array_node_size_log );
220                     assert( nSlot < m_Metrics.array_node_size );
221                     pArr = to_array( slot.ptr() );
222                     nOffset += m_Metrics.array_node_size_log;
223                     ++nHeight;
224                 }
225                 else if ( slot.bits() == flag_array_converting ) {
226                     // the slot is converting to array node right now
227                     bkoff();
228                     m_Stat.onSlotConverting();
229                 }
230                 else {
231                     // data node
232                     assert(slot.bits() == 0 );
233
234                     rcu_lock rcuLock;
235                     if ( pArr->nodes[nSlot].load(memory_model::memory_order_acquire) == slot ) {
236                         if ( slot.ptr() ) {
237                             if ( cmp( hash, hash_accessor()( *slot.ptr() )) == 0 ) {
238                                 // the item with that hash value already exists
239                                 m_Stat.onInsertFailed();
240                                 return false;
241                             }
242
243                             // the slot must be expanded
244                             expand_slot( pArr, nSlot, slot, nOffset );
245                         }
246                         else {
247                             // the slot is empty, try to insert data node
248                             node_ptr pNull;
249                             if ( pArr->nodes[nSlot].compare_exchange_strong( pNull, node_ptr( &val ), memory_model::memory_order_release, atomics::memory_order_relaxed ))
250                             {
251                                 // the new data node has been inserted
252                                 f( val );
253                                 ++m_ItemCounter;
254                                 m_Stat.onInsertSuccess();
255                                 m_Stat.height( nHeight );
256                                 return true;
257                             }
258
259                             // insert failed - slot has been changed by another thread
260                             // retry inserting
261                             m_Stat.onInsertRetry();
262                         }
263                     }
264                     else
265                         m_Stat.onSlotChanged();
266                 }
267             } // while
268         }
269
270         /// Updates the node
271         /**
272             Performs inserting or updating the item with hash value equal to \p val.
273             - If hash value is found then existing item is replaced with \p val, old item is disposed
274               with \p Traits::disposer. Note that the disposer is called by \p GC asynchronously.
275               The function returns <tt> std::pair<true, false> </tt>
276             - If hash value is not found and \p bInsert is \p true then \p val is inserted,
277               the function returns <tt> std::pair<true, true> </tt>
278             - If hash value is not found and \p bInsert is \p false then the set is unchanged,
279               the function returns <tt> std::pair<false, false> </tt>
280
281             Returns <tt> std::pair<bool, bool> </tt> where \p first is \p true if operation is successfull
282             (i.e. the item has been inserted or updated),
283             \p second is \p true if new item has been added or \p false if the set contains that hash.
284
285             The function locks RCU internally.
286         */
287         std::pair<bool, bool> update( value_type& val, bool bInsert = true )
288         {
289             return do_update(val, [](value_type&, value_type *) {}, bInsert );
290         }
291
292         /// Unlinks the item \p val from the set
293         /**
294             The function searches the item \p val in the set and unlink it
295             if it is found and its address is equal to <tt>&val</tt>.
296
297             The function returns \p true if success and \p false otherwise.
298
299             RCU should not be locked. The function locks RCU internally.
300         */
301         bool unlink( value_type const& val )
302         {
303             check_deadlock_policy::check();
304
305             auto pred = [&val](value_type const& item) -> bool { return &item == &val; };
306             value_type * p;
307             {
308                 rcu_lock rcuLock;
309                 p = do_erase( hash_accessor()( val ), std::ref( pred ));
310             }
311             if ( p ) {
312                 gc::template retire_ptr<disposer>( p );
313                 return true;
314             }
315             return false;
316         }
317
318         /// Deletes the item from the set
319         /**
320             The function searches \p hash in the set,
321             unlinks the item found, and returns \p true.
322             If that item is not found the function returns \p false.
323
324             The \ref disposer specified in \p Traits is called by garbage collector \p GC asynchronously.
325
326             RCU should not be locked. The function locks RCU internally.
327         */
328         bool erase( hash_type const& hash )
329         {
330             return erase(hash, [](value_type const&) {} );
331         }
332
333         /// Deletes the item from the set
334         /**
335             The function searches \p hash in the set,
336             call \p f functor with item found, and unlinks it from the set.
337             The \ref disposer specified in \p Traits is called
338             by garbage collector \p GC asynchronously.
339
340             The \p Func interface is
341             \code
342             struct functor {
343                 void operator()( value_type& item );
344             };
345             \endcode
346
347             If \p hash is not found the function returns \p false.
348
349             RCU should not be locked. The function locks RCU internally.
350         */
351         template <typename Func>
352         bool erase( hash_type const& hash, Func f )
353         {
354             check_deadlock_policy::check();
355
356             value_type * p;
357
358             {
359                 rcu_lock rcuLock;
360                 p = do_erase( hash, []( value_type const&) -> bool { return true; } );
361             }
362
363             // p is guarded by HP
364             if ( p ) {
365                 f( *p );
366                 gc::template retire_ptr<disposer>(p);
367                 return true;
368             }
369             return false;
370         }
371
372         /// Extracts the item with specified \p hash
373         /**
374             The function searches \p hash in the set,
375             unlinks it from the set, and returns \ref cds::urcu::exempt_ptr "exempt_ptr" pointer to the item found.
376             If the item with key equal to \p key is not found the function returns an empty \p exempt_ptr.
377
378             RCU \p synchronize method can be called. RCU should NOT be locked.
379             The function does not call the disposer for the item found.
380             The disposer will be implicitly invoked when the returned object is destroyed or when
381             its \p release() member function is called.
382             Example:
383             \code
384             typedef cds::intrusive::FeldmanHashSet< cds::urcu::gc< cds::urcu::general_buffered<> >, foo, my_traits > set_type;
385             set_type theSet;
386             // ...
387
388             typename set_type::exempt_ptr ep( theSet.extract( 5 ));
389             if ( ep ) {
390                 // Deal with ep
391                 //...
392
393                 // Dispose returned item.
394                 ep.release();
395             }
396             \endcode
397         */
398         exempt_ptr extract( hash_type const& hash )
399         {
400             check_deadlock_policy::check();
401
402             value_type * p;
403             {
404                 rcu_lock rcuLock;
405                 p = do_erase( hash, []( value_type const&) -> bool {return true;} );
406             }
407             return exempt_ptr( p );
408         }
409
410         /// Finds an item by it's \p hash
411         /**
412             The function searches the item by \p hash and calls the functor \p f for item found.
413             The interface of \p Func functor is:
414             \code
415             struct functor {
416                 void operator()( value_type& item );
417             };
418             \endcode
419             where \p item is the item found.
420
421             The functor may change non-key fields of \p item. Note that the functor is only guarantee
422             that \p item cannot be disposed during the functor is executing.
423             The functor does not serialize simultaneous access to the set's \p item. If such access is
424             possible you must provide your own synchronization schema on item level to prevent unsafe item modifications.
425
426             The function returns \p true if \p hash is found, \p false otherwise.
427
428             The function applies RCU lock internally.
429         */
430         template <typename Func>
431         bool find( hash_type const& hash, Func f )
432         {
433             rcu_lock rcuLock;
434
435             value_type * p = search( hash );
436             if ( p ) {
437                 f( *p );
438                 return true;
439             }
440             return false;
441         }
442
443         /// Checks whether the set contains \p hash
444         /**
445             The function searches the item by its \p hash
446             and returns \p true if it is found, or \p false otherwise.
447
448             The function applies RCU lock internally.
449         */
450         bool contains( hash_type const& hash )
451         {
452             return find( hash, [](value_type&) {} );
453         }
454
455         /// Finds an item by it's \p hash and returns the item found
456         /**
457             The function searches the item by its \p hash
458             and returns the pointer to the item found.
459             If \p hash is not found the function returns \p nullptr.
460
461             RCU should be locked before the function invocation.
462             Returned pointer is valid only while RCU is locked.
463
464             Usage:
465             \code
466             typedef cds::intrusive::FeldmanHashSet< your_template_params >  my_set;
467             my_set theSet;
468             // ...
469             {
470                 // lock RCU
471                 my_set::rcu_lock;
472
473                 foo * p = theSet.get( 5 );
474                 if ( p ) {
475                     // Deal with p
476                     //...
477                 }
478             }
479             \endcode
480         */
481         value_type * get( hash_type const& hash )
482         {
483             assert( gc::is_locked());
484             return search( hash );
485         }
486
487         /// Clears the set (non-atomic)
488         /**
489             The function unlink all data node from the set.
490             The function is not atomic but is thread-safe.
491             After \p %clear() the set may not be empty because another threads may insert items.
492
493             For each item the \p disposer is called after unlinking.
494         */
495         void clear()
496         {
497             clear_array( m_Head, head_size() );
498         }
499
500         /// Checks if the set is empty
501         /**
502             Emptiness is checked by item counting: if item count is zero then the set is empty.
503             Thus, the correct item counting feature is an important part of the set implementation.
504         */
505         bool empty() const
506         {
507             return size() == 0;
508         }
509
510         /// Returns item count in the set
511         size_t size() const
512         {
513             return m_ItemCounter;
514         }
515
516         /// Returns const reference to internal statistics
517         stat const& statistics() const
518         {
519             return m_Stat;
520         }
521
522         /// Returns the size of head node
523         size_t head_size() const
524         {
525             return m_Metrics.head_node_size;
526         }
527
528         /// Returns the size of the array node
529         size_t array_node_size() const
530         {
531             return m_Metrics.array_node_size;
532         }
533
534     protected:
535         //@cond
536         class iterator_base
537         {
538             friend class FeldmanHashSet;
539
540         protected:
541             array_node *        m_pNode;    ///< current array node
542             size_t              m_idx;      ///< current position in m_pNode
543             value_type *        m_pValue;   ///< current value
544             FeldmanHashSet const*  m_set;    ///< Hash set
545
546         public:
547             iterator_base() CDS_NOEXCEPT
548                 : m_pNode(nullptr)
549                 , m_idx(0)
550                 , m_pValue(nullptr)
551                 , m_set(nullptr)
552             {}
553
554             iterator_base(iterator_base const& rhs) CDS_NOEXCEPT
555                 : m_pNode(rhs.m_pNode)
556                 , m_idx(rhs.m_idx)
557                 , m_pValue(rhs.m_pValue)
558                 , m_set(rhs.m_set)
559             {}
560
561             iterator_base& operator=(iterator_base const& rhs) CDS_NOEXCEPT
562             {
563                 m_pNode = rhs.m_pNode;
564                 m_idx = rhs.m_idx;
565                 m_pValue = rhs.m_pValue;
566                 m_set = rhs.m_set;
567                 return *this;
568             }
569
570             iterator_base& operator++()
571             {
572                 forward();
573                 return *this;
574             }
575
576             iterator_base& operator--()
577             {
578                 backward();
579                 return *this;
580             }
581
582             bool operator ==(iterator_base const& rhs) const CDS_NOEXCEPT
583             {
584                 return m_pNode == rhs.m_pNode && m_idx == rhs.m_idx && m_set == rhs.m_set;
585             }
586
587             bool operator !=(iterator_base const& rhs) const CDS_NOEXCEPT
588             {
589                 return !(*this == rhs);
590             }
591
592         protected:
593             iterator_base(FeldmanHashSet const& set, array_node * pNode, size_t idx, bool)
594                 : m_pNode(pNode)
595                 , m_idx(idx)
596                 , m_pValue(nullptr)
597                 , m_set(&set)
598             {}
599
600             iterator_base(FeldmanHashSet const& set, array_node * pNode, size_t idx)
601                 : m_pNode(pNode)
602                 , m_idx(idx)
603                 , m_pValue(nullptr)
604                 , m_set(&set)
605             {
606                 forward();
607             }
608
609             value_type * pointer() const CDS_NOEXCEPT
610             {
611                 assert(gc::is_locked());
612                 return m_pValue;
613             }
614
615             void forward()
616             {
617                 assert( gc::is_locked());
618                 assert(m_set != nullptr);
619                 assert(m_pNode != nullptr);
620
621                 size_t const arrayNodeSize = m_set->array_node_size();
622                 size_t const headSize = m_set->head_size();
623                 array_node * pNode = m_pNode;
624                 size_t idx = m_idx + 1;
625                 size_t nodeSize = m_pNode->pParent ? arrayNodeSize : headSize;
626
627                 for (;;) {
628                     if (idx < nodeSize) {
629                         node_ptr slot = pNode->nodes[idx].load(memory_model::memory_order_acquire);
630                         if (slot.bits() == flag_array_node) {
631                             // array node, go down the tree
632                             assert(slot.ptr() != nullptr);
633                             pNode = to_array(slot.ptr());
634                             idx = 0;
635                             nodeSize = arrayNodeSize;
636                         }
637                         else if (slot.bits() == flag_array_converting) {
638                             // the slot is converting to array node right now - skip the node
639                             ++idx;
640                         }
641                         else {
642                             if (slot.ptr()) {
643                                 // data node
644                                 m_pNode = pNode;
645                                 m_idx = idx;
646                                 m_pValue = slot.ptr();
647                                 return;
648                             }
649                             ++idx;
650                         }
651                     }
652                     else {
653                         // up to parent node
654                         if (pNode->pParent) {
655                             idx = pNode->idxParent + 1;
656                             pNode = pNode->pParent;
657                             nodeSize = pNode->pParent ? arrayNodeSize : headSize;
658                         }
659                         else {
660                             // end()
661                             assert(pNode == m_set->m_Head);
662                             assert(idx == headSize);
663                             m_pNode = pNode;
664                             m_idx = idx;
665                             m_pValue = nullptr;
666                             return;
667                         }
668                     }
669                 }
670             }
671
672             void backward()
673             {
674                 assert(gc::is_locked());
675                 assert(m_set != nullptr);
676                 assert(m_pNode != nullptr);
677
678                 size_t const arrayNodeSize = m_set->array_node_size();
679                 size_t const headSize = m_set->head_size();
680                 size_t const endIdx = size_t(0) - 1;
681
682                 array_node * pNode = m_pNode;
683                 size_t idx = m_idx - 1;
684                 size_t nodeSize = m_pNode->pParent ? arrayNodeSize : headSize;
685
686                 for (;;) {
687                     if (idx != endIdx) {
688                         node_ptr slot = pNode->nodes[idx].load(memory_model::memory_order_acquire);
689                         if (slot.bits() == flag_array_node) {
690                             // array node, go down the tree
691                             assert(slot.ptr() != nullptr);
692                             pNode = to_array(slot.ptr());
693                             nodeSize = arrayNodeSize;
694                             idx = nodeSize - 1;
695                         }
696                         else if (slot.bits() == flag_array_converting) {
697                             // the slot is converting to array node right now - skip the node
698                             --idx;
699                         }
700                         else {
701                             if (slot.ptr()) {
702                                 // data node
703                                 m_pNode = pNode;
704                                 m_idx = idx;
705                                 m_pValue = slot.ptr();
706                                 return;
707                             }
708                             --idx;
709                         }
710                     }
711                     else {
712                         // up to parent node
713                         if (pNode->pParent) {
714                             idx = pNode->idxParent - 1;
715                             pNode = pNode->pParent;
716                             nodeSize = pNode->pParent ? arrayNodeSize : headSize;
717                         }
718                         else {
719                             // rend()
720                             assert(pNode == m_set->m_Head);
721                             assert(idx == endIdx);
722                             m_pNode = pNode;
723                             m_idx = idx;
724                             m_pValue = nullptr;
725                             return;
726                         }
727                     }
728                 }
729             }
730         };
731
732         template <class Iterator>
733         Iterator init_begin() const
734         {
735             return Iterator(*this, m_Head, size_t(0) - 1);
736         }
737
738         template <class Iterator>
739         Iterator init_end() const
740         {
741             return Iterator(*this, m_Head, head_size(), false);
742         }
743
744         template <class Iterator>
745         Iterator init_rbegin() const
746         {
747             return Iterator(*this, m_Head, head_size());
748         }
749
750         template <class Iterator>
751         Iterator init_rend() const
752         {
753             return Iterator(*this, m_Head, size_t(0) - 1, false);
754         }
755
756         /// Bidirectional iterator class
757         template <bool IsConst>
758         class bidirectional_iterator : protected iterator_base
759         {
760             friend class FeldmanHashSet;
761
762         protected:
763             static CDS_CONSTEXPR bool const c_bConstantIterator = IsConst;
764
765         public:
766             typedef typename std::conditional< IsConst, value_type const*, value_type*>::type value_ptr; ///< Value pointer
767             typedef typename std::conditional< IsConst, value_type const&, value_type&>::type value_ref; ///< Value reference
768
769         public:
770             bidirectional_iterator() CDS_NOEXCEPT
771             {}
772
773             bidirectional_iterator(bidirectional_iterator const& rhs) CDS_NOEXCEPT
774                 : iterator_base(rhs)
775             {}
776
777             bidirectional_iterator& operator=(bidirectional_iterator const& rhs) CDS_NOEXCEPT
778             {
779                 iterator_base::operator=(rhs);
780                 return *this;
781             }
782
783             bidirectional_iterator& operator++()
784             {
785                 iterator_base::operator++();
786                 return *this;
787             }
788
789             bidirectional_iterator& operator--()
790             {
791                 iterator_base::operator--();
792                 return *this;
793             }
794
795             value_ptr operator ->() const CDS_NOEXCEPT
796             {
797                 return iterator_base::pointer();
798             }
799
800             value_ref operator *() const CDS_NOEXCEPT
801             {
802                 value_ptr p = iterator_base::pointer();
803                 assert(p);
804                 return *p;
805             }
806
807             template <bool IsConst2>
808             bool operator ==(bidirectional_iterator<IsConst2> const& rhs) const CDS_NOEXCEPT
809             {
810                 return iterator_base::operator==(rhs);
811             }
812
813             template <bool IsConst2>
814             bool operator !=(bidirectional_iterator<IsConst2> const& rhs) const CDS_NOEXCEPT
815             {
816                 return !(*this == rhs);
817             }
818
819         protected:
820             bidirectional_iterator(FeldmanHashSet& set, array_node * pNode, size_t idx, bool)
821                 : iterator_base(set, pNode, idx, false)
822             {}
823
824             bidirectional_iterator(FeldmanHashSet& set, array_node * pNode, size_t idx)
825                 : iterator_base(set, pNode, idx)
826             {}
827         };
828
829         /// Reverse bidirectional iterator
830         template <bool IsConst>
831         class reverse_bidirectional_iterator : public iterator_base
832         {
833             friend class FeldmanHashSet;
834
835         public:
836             typedef typename std::conditional< IsConst, value_type const*, value_type*>::type value_ptr; ///< Value pointer
837             typedef typename std::conditional< IsConst, value_type const&, value_type&>::type value_ref; ///< Value reference
838
839         public:
840             reverse_bidirectional_iterator() CDS_NOEXCEPT
841                 : iterator_base()
842             {}
843
844             reverse_bidirectional_iterator(reverse_bidirectional_iterator const& rhs) CDS_NOEXCEPT
845                 : iterator_base(rhs)
846             {}
847
848             reverse_bidirectional_iterator& operator=(reverse_bidirectional_iterator const& rhs) CDS_NOEXCEPT
849             {
850                 iterator_base::operator=(rhs);
851                 return *this;
852             }
853
854             reverse_bidirectional_iterator& operator++()
855             {
856                 iterator_base::operator--();
857                 return *this;
858             }
859
860             reverse_bidirectional_iterator& operator--()
861             {
862                 iterator_base::operator++();
863                 return *this;
864             }
865
866             value_ptr operator ->() const CDS_NOEXCEPT
867             {
868                 return iterator_base::pointer();
869             }
870
871             value_ref operator *() const CDS_NOEXCEPT
872             {
873                 value_ptr p = iterator_base::pointer();
874                 assert(p);
875                 return *p;
876             }
877
878             template <bool IsConst2>
879             bool operator ==(reverse_bidirectional_iterator<IsConst2> const& rhs) const
880             {
881                 return iterator_base::operator==(rhs);
882             }
883
884             template <bool IsConst2>
885             bool operator !=(reverse_bidirectional_iterator<IsConst2> const& rhs)
886             {
887                 return !(*this == rhs);
888             }
889
890         private:
891             reverse_bidirectional_iterator(FeldmanHashSet& set, array_node * pNode, size_t idx, bool)
892                 : iterator_base(set, pNode, idx, false)
893             {}
894
895             reverse_bidirectional_iterator(FeldmanHashSet& set, array_node * pNode, size_t idx)
896                 : iterator_base(set, pNode, idx, false)
897             {
898                 iterator_base::backward();
899             }
900         };
901         //@endcond
902
903     public:
904 #ifdef CDS_DOXYGEN_INVOKED
905         typedef implementation_defined iterator;            ///< @ref cds_intrusive_FeldmanHashSet_rcu_iterators "bidirectional iterator" type
906         typedef implementation_defined const_iterator;      ///< @ref cds_intrusive_FeldmanHashSet_rcu_iterators "bidirectional const iterator" type
907         typedef implementation_defined reverse_iterator;    ///< @ref cds_intrusive_FeldmanHashSet_rcu_iterators "bidirectional reverse iterator" type
908         typedef implementation_defined const_reverse_iterator; ///< @ref cds_intrusive_FeldmanHashSet_rcu_iterators "bidirectional reverse const iterator" type
909 #else
910         typedef bidirectional_iterator<false>   iterator;
911         typedef bidirectional_iterator<true>    const_iterator;
912         typedef reverse_bidirectional_iterator<false>   reverse_iterator;
913         typedef reverse_bidirectional_iterator<true>    const_reverse_iterator;
914 #endif
915
916         ///@name Thread-safe iterators
917         /** @anchor cds_intrusive_FeldmanHashSet_rcu_iterators
918             The set supports thread-safe iterators: you may iterate over the set in multi-threaded environment
919             under explicit RCU lock.
920             RCU lock requirement means that inserting or searching is allowed but you must not erase the items from the set
921             since erasing under RCU lock can lead to a deadlock. However, another thread can call \p erase() safely
922             while your thread is iterating.
923
924             A typical example is:
925             \code
926             struct foo {
927                 uint32_t    hash;
928                 // ... other fields
929                 uint32_t    payload; // only for example
930             };
931             struct set_traits: cds::intrusive::feldman_hashset::traits
932             {
933                 struct hash_accessor {
934                     uint32_t operator()( foo const& src ) const
935                     {
936                         retur src.hash;
937                     }
938                 };
939             };
940
941             typedef cds::urcu::gc< cds::urcu::general_buffered<>> rcu;
942             typedef cds::intrusive::FeldmanHashSet< rcu, foo, set_traits > set_type;
943
944             set_type s;
945
946             // ...
947
948             // iterate over the set
949             {
950                 // lock the RCU.
951                 typename set_type::rcu_lock l; // scoped RCU lock
952
953                 // traverse the set
954                 for ( auto i = s.begin(); i != s.end(); ++i ) {
955                     // deal with i. Remember, erasing is prohibited here!
956                     i->payload++;
957                 }
958             } // at this point RCU lock is released
959             /endcode
960
961             Each iterator object supports the common interface:
962             - dereference operators:
963                 @code
964                 value_type [const] * operator ->() noexcept
965                 value_type [const] & operator *() noexcept
966                 @endcode
967             - pre-increment and pre-decrement. Post-operators is not supported
968             - equality operators <tt>==</tt> and <tt>!=</tt>.
969                 Iterators are equal iff they point to the same cell of the same array node.
970                 Note that for two iterators \p it1 and \p it2 the condition <tt> it1 == it2 </tt>
971                 does not entail <tt> &(*it1) == &(*it2) </tt>: welcome to concurrent containers
972
973             @note It is possible the item can be iterated more that once, for example, if an iterator points to the item
974             in an array node that is being splitted.
975         */
976         ///@{
977
978         /// Returns an iterator to the beginning of the set
979         iterator begin()
980         {
981             return iterator(*this, m_Head, size_t(0) - 1);
982         }
983
984         /// Returns an const iterator to the beginning of the set
985         const_iterator begin() const
986         {
987             return const_iterator(*this, m_Head, size_t(0) - 1);
988         }
989
990         /// Returns an const iterator to the beginning of the set
991         const_iterator cbegin()
992         {
993             return const_iterator(*this, m_Head, size_t(0) - 1);
994         }
995
996         /// Returns an iterator to the element following the last element of the set. This element acts as a placeholder; attempting to access it results in undefined behavior.
997         iterator end()
998         {
999             return iterator(*this, m_Head, head_size(), false);
1000         }
1001
1002         /// Returns a const iterator to the element following the last element of the set. This element acts as a placeholder; attempting to access it results in undefined behavior.
1003         const_iterator end() const
1004         {
1005             return const_iterator(*this, m_Head, head_size(), false);
1006         }
1007
1008         /// Returns a const iterator to the element following the last element of the set. This element acts as a placeholder; attempting to access it results in undefined behavior.
1009         const_iterator cend()
1010         {
1011             return const_iterator(*this, m_Head, head_size(), false);
1012         }
1013
1014         /// Returns a reverse iterator to the first element of the reversed set
1015         reverse_iterator rbegin()
1016         {
1017             return reverse_iterator(*this, m_Head, head_size());
1018         }
1019
1020         /// Returns a const reverse iterator to the first element of the reversed set
1021         const_reverse_iterator rbegin() const
1022         {
1023             return const_reverse_iterator(*this, m_Head, head_size());
1024         }
1025
1026         /// Returns a const reverse iterator to the first element of the reversed set
1027         const_reverse_iterator crbegin()
1028         {
1029             return const_reverse_iterator(*this, m_Head, head_size());
1030         }
1031
1032         /// Returns a reverse iterator to the element following the last element of the reversed set
1033         /**
1034         It corresponds to the element preceding the first element of the non-reversed container.
1035         This element acts as a placeholder, attempting to access it results in undefined behavior.
1036         */
1037         reverse_iterator rend()
1038         {
1039             return reverse_iterator(*this, m_Head, size_t(0) - 1, false);
1040         }
1041
1042         /// Returns a const reverse iterator to the element following the last element of the reversed set
1043         /**
1044         It corresponds to the element preceding the first element of the non-reversed container.
1045         This element acts as a placeholder, attempting to access it results in undefined behavior.
1046         */
1047         const_reverse_iterator rend() const
1048         {
1049             return const_reverse_iterator(*this, m_Head, size_t(0) - 1, false);
1050         }
1051
1052         /// Returns a const reverse iterator to the element following the last element of the reversed set
1053         /**
1054         It corresponds to the element preceding the first element of the non-reversed container.
1055         This element acts as a placeholder, attempting to access it results in undefined behavior.
1056         */
1057         const_reverse_iterator crend()
1058         {
1059             return const_reverse_iterator(*this, m_Head, size_t(0) - 1, false);
1060         }
1061         ///@}
1062
1063     protected:
1064         //@cond
1065         template <typename Func>
1066         std::pair<bool, bool> do_update(value_type& val, Func f, bool bInsert = true)
1067         {
1068             hash_type const& hash = hash_accessor()(val);
1069             hash_splitter splitter(hash);
1070             hash_comparator cmp;
1071             back_off bkoff;
1072
1073             array_node * pArr = m_Head;
1074             size_t nSlot = splitter.cut(m_Metrics.head_node_size_log);
1075             assert(nSlot < m_Metrics.head_node_size);
1076             size_t nOffset = m_Metrics.head_node_size_log;
1077             size_t nHeight = 1;
1078
1079             while (true) {
1080                 node_ptr slot = pArr->nodes[nSlot].load(memory_model::memory_order_acquire);
1081                 if (slot.bits() == flag_array_node) {
1082                     // array node, go down the tree
1083                     assert(slot.ptr() != nullptr);
1084                     nSlot = splitter.cut(m_Metrics.array_node_size_log);
1085                     assert(nSlot < m_Metrics.array_node_size);
1086                     pArr = to_array(slot.ptr());
1087                     nOffset += m_Metrics.array_node_size_log;
1088                     ++nHeight;
1089                 }
1090                 else if (slot.bits() == flag_array_converting) {
1091                     // the slot is converting to array node right now
1092                     bkoff();
1093                     m_Stat.onSlotConverting();
1094                 }
1095                 else {
1096                     // data node
1097                     assert(slot.bits() == 0);
1098
1099                     value_type * pOld = nullptr;
1100                     {
1101                         rcu_lock rcuLock;
1102
1103                         if ( pArr->nodes[nSlot].load(memory_model::memory_order_acquire) == slot ) {
1104                             if ( slot.ptr()) {
1105                                 if (cmp(hash, hash_accessor()(*slot.ptr())) == 0) {
1106                                     // the item with that hash value already exists
1107                                     // Replace it with val
1108                                     if (slot.ptr() == &val) {
1109                                         m_Stat.onUpdateExisting();
1110                                         return std::make_pair(true, false);
1111                                     }
1112
1113                                     if (pArr->nodes[nSlot].compare_exchange_strong(slot, node_ptr(&val), memory_model::memory_order_release, atomics::memory_order_relaxed)) {
1114                                         // slot can be disposed
1115                                         f(val, slot.ptr());
1116                                         pOld = slot.ptr();
1117                                         m_Stat.onUpdateExisting();
1118                                         goto update_existing_done;
1119                                     }
1120
1121                                     m_Stat.onUpdateRetry();
1122                                     continue;
1123                                 }
1124
1125                                 // the slot must be expanded
1126                                 expand_slot(pArr, nSlot, slot, nOffset);
1127                             }
1128                             else {
1129                                 // the slot is empty, try to insert data node
1130                                 if (bInsert) {
1131                                     node_ptr pNull;
1132                                     if (pArr->nodes[nSlot].compare_exchange_strong(pNull, node_ptr(&val), memory_model::memory_order_release, atomics::memory_order_relaxed))
1133                                     {
1134                                         // the new data node has been inserted
1135                                         f(val, nullptr);
1136                                         ++m_ItemCounter;
1137                                         m_Stat.onUpdateNew();
1138                                         m_Stat.height(nHeight);
1139                                         return std::make_pair(true, true);
1140                                     }
1141                                 }
1142                                 else {
1143                                     m_Stat.onUpdateFailed();
1144                                     return std::make_pair(false, false);
1145                                 }
1146
1147                                 // insert failed - slot has been changed by another thread
1148                                 // retry updating
1149                                 m_Stat.onUpdateRetry();
1150                             }
1151                         }
1152                         else
1153                             m_Stat.onSlotChanged();
1154                         continue;
1155                     } // rcu_lock
1156
1157                     // update success
1158         update_existing_done:
1159                     if ( pOld )
1160                         gc::template retire_ptr<disposer>( pOld );
1161                     return std::make_pair(true, false);
1162                 } 
1163             } // while
1164         }
1165
1166         template <typename Predicate>
1167         value_type * do_erase( hash_type const& hash, Predicate pred)
1168         {
1169             assert(gc::is_locked());
1170
1171             hash_splitter splitter(hash);
1172             hash_comparator cmp;
1173             back_off bkoff;
1174
1175             array_node * pArr = m_Head;
1176             size_t nSlot = splitter.cut(m_Metrics.head_node_size_log);
1177             assert(nSlot < m_Metrics.head_node_size);
1178
1179             while (true) {
1180                 node_ptr slot = pArr->nodes[nSlot].load(memory_model::memory_order_acquire);
1181                 if (slot.bits() == flag_array_node) {
1182                     // array node, go down the tree
1183                     assert(slot.ptr() != nullptr);
1184                     nSlot = splitter.cut(m_Metrics.array_node_size_log);
1185                     assert(nSlot < m_Metrics.array_node_size);
1186                     pArr = to_array(slot.ptr());
1187                 }
1188                 else if (slot.bits() == flag_array_converting) {
1189                     // the slot is converting to array node right now
1190                     bkoff();
1191                     m_Stat.onSlotConverting();
1192                 }
1193                 else {
1194                     // data node
1195                     assert(slot.bits() == 0);
1196
1197                     if ( pArr->nodes[nSlot].load(memory_model::memory_order_acquire) == slot ) {
1198                         if (slot.ptr()) {
1199                             if (cmp(hash, hash_accessor()(*slot.ptr())) == 0 && pred(*slot.ptr())) {
1200                                 // item found - replace it with nullptr
1201                                 if (pArr->nodes[nSlot].compare_exchange_strong(slot, node_ptr(nullptr), memory_model::memory_order_acquire, atomics::memory_order_relaxed)) {
1202                                     --m_ItemCounter;
1203                                     m_Stat.onEraseSuccess();
1204
1205                                     return slot.ptr();
1206                                 }
1207                                 m_Stat.onEraseRetry();
1208                                 continue;
1209                             }
1210                             m_Stat.onEraseFailed();
1211                             return nullptr;
1212                         }
1213                         else {
1214                             // the slot is empty
1215                             m_Stat.onEraseFailed();
1216                             return nullptr;
1217                         }
1218                     }
1219                     else
1220                         m_Stat.onSlotChanged();
1221                 }
1222             } // while
1223         }
1224
1225         value_type * search(hash_type const& hash )
1226         {
1227             assert( gc::is_locked() );
1228
1229             hash_splitter splitter(hash);
1230             hash_comparator cmp;
1231             back_off bkoff;
1232
1233             array_node * pArr = m_Head;
1234             size_t nSlot = splitter.cut(m_Metrics.head_node_size_log);
1235             assert(nSlot < m_Metrics.head_node_size);
1236
1237             while (true) {
1238                 node_ptr slot = pArr->nodes[nSlot].load(memory_model::memory_order_acquire);
1239                 if (slot.bits() == flag_array_node) {
1240                     // array node, go down the tree
1241                     assert(slot.ptr() != nullptr);
1242                     nSlot = splitter.cut(m_Metrics.array_node_size_log);
1243                     assert(nSlot < m_Metrics.array_node_size);
1244                     pArr = to_array(slot.ptr());
1245                 }
1246                 else if (slot.bits() == flag_array_converting) {
1247                     // the slot is converting to array node right now
1248                     bkoff();
1249                     m_Stat.onSlotConverting();
1250                 }
1251                 else {
1252                     // data node
1253                     assert(slot.bits() == 0);
1254
1255                     // protect data node by hazard pointer
1256                     if ( pArr->nodes[nSlot].load(memory_model::memory_order_acquire) != slot) {
1257                         // slot value has been changed - retry
1258                         m_Stat.onSlotChanged();
1259                     }
1260                     else if (slot.ptr() && cmp(hash, hash_accessor()(*slot.ptr())) == 0) {
1261                         // item found
1262                         m_Stat.onFindSuccess();
1263                         return slot.ptr();
1264                     }
1265                     m_Stat.onFindFailed();
1266                     return nullptr;
1267                 }
1268             } // while
1269         }
1270
1271         //@endcond
1272
1273     private:
1274         //@cond
1275         array_node * alloc_head_node() const
1276         {
1277             return alloc_array_node(head_size(), nullptr, 0);
1278         }
1279
1280         array_node * alloc_array_node(array_node * pParent, size_t idxParent) const
1281         {
1282             return alloc_array_node(array_node_size(), pParent, idxParent);
1283         }
1284
1285         static array_node * alloc_array_node(size_t nSize, array_node * pParent, size_t idxParent)
1286         {
1287             array_node * pNode = cxx_array_node_allocator().NewBlock(sizeof(array_node) + sizeof(atomic_node_ptr) * (nSize - 1), pParent, idxParent);
1288             new (pNode->nodes) atomic_node_ptr[nSize];
1289             return pNode;
1290         }
1291
1292         static void free_array_node(array_node * parr)
1293         {
1294             cxx_array_node_allocator().Delete(parr);
1295         }
1296
1297         void destroy_tree()
1298         {
1299             // The function is not thread-safe. For use in dtor only
1300             // Remove data node
1301             clear();
1302
1303             // Destroy all array nodes
1304             destroy_array_nodes(m_Head, head_size());
1305         }
1306
1307         void destroy_array_nodes(array_node * pArr, size_t nSize)
1308         {
1309             for (atomic_node_ptr * p = pArr->nodes, *pLast = pArr->nodes + nSize; p != pLast; ++p) {
1310                 node_ptr slot = p->load(memory_model::memory_order_acquire);
1311                 if (slot.bits() == flag_array_node) {
1312                     destroy_array_nodes(to_array(slot.ptr()), array_node_size());
1313                     free_array_node(to_array(slot.ptr()));
1314                     p->store(node_ptr(), memory_model::memory_order_relaxed);
1315                 }
1316             }
1317         }
1318
1319         void clear_array(array_node * pArrNode, size_t nSize)
1320         {
1321             back_off bkoff;
1322
1323
1324             for (atomic_node_ptr * pArr = pArrNode->nodes, *pLast = pArr + nSize; pArr != pLast; ++pArr) {
1325                 while (true) {
1326                     node_ptr slot = pArr->load(memory_model::memory_order_acquire);
1327                     if (slot.bits() == flag_array_node) {
1328                         // array node, go down the tree
1329                         assert(slot.ptr() != nullptr);
1330                         clear_array(to_array(slot.ptr()), array_node_size());
1331                         break;
1332                     }
1333                     else if (slot.bits() == flag_array_converting) {
1334                         // the slot is converting to array node right now
1335                         while ((slot = pArr->load(memory_model::memory_order_acquire)).bits() == flag_array_converting) {
1336                             bkoff();
1337                             m_Stat.onSlotConverting();
1338                         }
1339                         bkoff.reset();
1340
1341                         assert(slot.ptr() != nullptr);
1342                         assert(slot.bits() == flag_array_node);
1343                         clear_array(to_array(slot.ptr()), array_node_size());
1344                         break;
1345                     }
1346                     else {
1347                         // data node
1348                         if (pArr->compare_exchange_strong(slot, node_ptr(), memory_model::memory_order_acquire, atomics::memory_order_relaxed)) {
1349                             if (slot.ptr()) {
1350                                 gc::template retire_ptr<disposer>(slot.ptr());
1351                                 --m_ItemCounter;
1352                                 m_Stat.onEraseSuccess();
1353                             }
1354                             break;
1355                         }
1356                     }
1357                 }
1358             }
1359         }
1360
1361         bool expand_slot(array_node * pParent, size_t idxParent, node_ptr current, size_t nOffset)
1362         {
1363             assert(current.bits() == 0);
1364             assert(current.ptr());
1365
1366             size_t idx = hash_splitter(hash_accessor()(*current.ptr()), nOffset).cut(m_Metrics.array_node_size_log);
1367             array_node * pArr = alloc_array_node(pParent, idxParent);
1368
1369             node_ptr cur(current.ptr());
1370             atomic_node_ptr& slot = pParent->nodes[idxParent];
1371             if (!slot.compare_exchange_strong(cur, cur | flag_array_converting, memory_model::memory_order_release, atomics::memory_order_relaxed))
1372             {
1373                 m_Stat.onExpandNodeFailed();
1374                 free_array_node(pArr);
1375                 return false;
1376             }
1377
1378             pArr->nodes[idx].store(current, memory_model::memory_order_release);
1379
1380             cur = cur | flag_array_converting;
1381             CDS_VERIFY(
1382                 slot.compare_exchange_strong(cur, node_ptr(to_node(pArr), flag_array_node), memory_model::memory_order_release, atomics::memory_order_relaxed)
1383                 );
1384
1385             m_Stat.onExpandNodeSuccess();
1386             m_Stat.onArrayNodeCreated();
1387             return true;
1388         }
1389
1390         union converter {
1391             value_type * pData;
1392             array_node * pArr;
1393
1394             converter(value_type * p)
1395                 : pData(p)
1396             {}
1397
1398             converter(array_node * p)
1399                 : pArr(p)
1400             {}
1401         };
1402
1403         static array_node * to_array(value_type * p)
1404         {
1405             return converter(p).pArr;
1406         }
1407         static value_type * to_node(array_node * p)
1408         {
1409             return converter(p).pData;
1410         }
1411         //@endcond
1412     };
1413
1414 }} // namespace cds::intrusive
1415
1416 #endif  // #ifndef CDSLIB_INTRUSIVE_FELDMAN_HASHSET_RCU_H