Merge branch 'dev' of github.com:khizmax/libcds into dev
[libcds.git] / cds / intrusive / feldman_hashset_rcu.h
1 //$$CDS-header$$
2
3 #ifndef CDSLIB_INTRUSIVE_FELDMAN_HASHSET_RCU_H
4 #define CDSLIB_INTRUSIVE_FELDMAN_HASHSET_RCU_H
5
6 #include <functional>   // std::ref
7 #include <iterator>     // std::iterator_traits
8 #include <vector>
9
10 #include <cds/intrusive/details/feldman_hashset_base.h>
11 #include <cds/details/allocator.h>
12 #include <cds/urcu/details/check_deadlock.h>
13 #include <cds/urcu/exempt_ptr.h>
14 #include <cds/intrusive/details/raw_ptr_disposer.h>
15
16
17 namespace cds { namespace intrusive {
18     /// Intrusive hash set based on multi-level array, \ref cds_urcu_desc "RCU" specialization
19     /** @ingroup cds_intrusive_map
20         @anchor cds_intrusive_FeldmanHashSet_rcu
21
22         Source:
23         - [2013] Steven Feldman, Pierre LaBorde, Damian Dechev "Concurrent Multi-level Arrays:
24             Wait-free Extensible Hash Maps"
25
26         See algorithm short description @ref cds_intrusive_FeldmanHashSet_hp "here"
27
28         @note Two important things you should keep in mind when you're using \p %FeldmanHashSet:
29         - all keys must be fixed-size. It means that you cannot use \p std::string as a key for \p %FeldmanHashSet.
30           Instead, for the strings you should use well-known hashing algorithms like <a href="https://en.wikipedia.org/wiki/Secure_Hash_Algorithm">SHA1, SHA2</a>,
31           <a href="https://en.wikipedia.org/wiki/MurmurHash">MurmurHash</a>, <a href="https://en.wikipedia.org/wiki/CityHash">CityHash</a>
32           or its successor <a href="https://code.google.com/p/farmhash/">FarmHash</a> and so on, which
33           converts variable-length strings to fixed-length bit-strings, and use that hash as a key in \p %FeldmanHashSet.
34         - \p %FeldmanHashSet uses a perfect hashing. It means that if two different keys, for example, of type \p std::string,
35           have identical hash then you cannot insert both that keys in the set. \p %FeldmanHashSet does not maintain the key,
36           it maintains its fixed-size hash value.
37
38         Template parameters:
39         - \p RCU - one of \ref cds_urcu_gc "RCU type"
40         - \p T - a value type to be stored in the set
41         - \p Traits - type traits, the structure based on \p feldman_hashset::traits or result of \p feldman_hashset::make_traits metafunction.
42         \p Traits is the mandatory argument because it has one mandatory type - an @ref feldman_hashset::traits::hash_accessor "accessor"
43         to hash value of \p T. The set algorithm does not calculate that hash value.
44
45         @note Before including <tt><cds/intrusive/feldman_hashset_rcu.h></tt> you should include appropriate RCU header file,
46         see \ref cds_urcu_gc "RCU type" for list of existing RCU class and corresponding header files.
47
48         The set supports @ref cds_intrusive_FeldmanHashSet_rcu_iterators "bidirectional thread-safe iterators" 
49         with some restrictions.
50     */
51     template <
52         class RCU,
53         class T,
54 #ifdef CDS_DOXYGEN_INVOKED
55         class Traits = feldman_hashset::traits
56 #else
57         class Traits
58 #endif
59     >
60     class FeldmanHashSet< cds::urcu::gc< RCU >, T, Traits >
61     {
62     public:
63         typedef cds::urcu::gc< RCU > gc; ///< RCU garbage collector
64         typedef T       value_type;      ///< type of value stored in the set
65         typedef Traits  traits;          ///< Traits template parameter
66
67         typedef typename traits::hash_accessor hash_accessor; ///< Hash accessor functor
68         static_assert(!std::is_same< hash_accessor, cds::opt::none >::value, "hash_accessor functor must be specified in Traits");
69
70         /// Hash type deduced from \p hash_accessor return type
71         typedef typename std::decay<
72             typename std::remove_reference<
73             decltype(hash_accessor()(std::declval<T>()))
74             >::type
75         >::type hash_type;
76         //typedef typename std::result_of< hash_accessor( std::declval<T>()) >::type hash_type;
77         static_assert(!std::is_pointer<hash_type>::value, "hash_accessor should return a reference to hash value");
78
79         typedef typename traits::disposer disposer; ///< data node disposer
80
81 #   ifdef CDS_DOXYGEN_INVOKED
82         typedef implementation_defined hash_comparator;    ///< hash compare functor based on opt::compare and opt::less option setter
83 #   else
84         typedef typename cds::opt::details::make_comparator_from<
85             hash_type,
86             traits,
87             feldman_hashset::bitwise_compare< hash_type >
88         >::type hash_comparator;
89 #   endif
90
91         typedef typename traits::item_counter   item_counter;   ///< Item counter type
92         typedef typename traits::node_allocator node_allocator; ///< Array node allocator
93         typedef typename traits::memory_model   memory_model;   ///< Memory model
94         typedef typename traits::back_off       back_off;       ///< Backoff strategy
95         typedef typename traits::stat           stat;           ///< Internal statistics type
96         typedef typename traits::rcu_check_deadlock rcu_check_deadlock; ///< Deadlock checking policy
97         typedef typename gc::scoped_lock       rcu_lock;        ///< RCU scoped lock
98         static CDS_CONSTEXPR const bool c_bExtractLockExternal = false; ///< Group of \p extract_xxx functions does not require external locking
99
100         using exempt_ptr = cds::urcu::exempt_ptr< gc, value_type, value_type, disposer, void >; ///< pointer to extracted node
101
102         /// Node marked poiter
103         typedef cds::details::marked_ptr< value_type, 3 > node_ptr;
104         /// Array node element
105         typedef atomics::atomic< node_ptr > atomic_node_ptr;
106
107     protected:
108         //@cond
109         enum node_flags {
110             flag_array_converting = 1,   ///< the cell is converting from data node to an array node
111             flag_array_node = 2          ///< the cell is a pointer to an array node
112         };
113
114         struct array_node {
115             array_node * const  pParent;    ///< parent array node
116             size_t const        idxParent;  ///< index in parent array node
117             atomic_node_ptr     nodes[1];   ///< node array
118
119             array_node(array_node * parent, size_t idx)
120                 : pParent(parent)
121                 , idxParent(idx)
122             {}
123
124             array_node() = delete;
125             array_node(array_node const&) = delete;
126             array_node(array_node&&) = delete;
127         };
128
129         typedef cds::details::Allocator< array_node, node_allocator > cxx_array_node_allocator;
130         typedef feldman_hashset::details::hash_splitter< hash_type > hash_splitter;
131         typedef cds::urcu::details::check_deadlock_policy< gc, rcu_check_deadlock> check_deadlock_policy;
132
133         //@endcond
134
135     private:
136         //@cond
137         feldman_hashset::details::metrics const m_Metrics;     ///< Metrics
138         array_node *      m_Head;        ///< Head array
139         item_counter      m_ItemCounter; ///< Item counter
140         stat              m_Stat;        ///< Internal statistics
141         //@endcond
142
143     public:
144         /// Creates empty set
145         /**
146             @param head_bits: 2<sup>head_bits</sup> specifies the size of head array, minimum is 4.
147             @param array_bits: 2<sup>array_bits</sup> specifies the size of array node, minimum is 2.
148
149             Equation for \p head_bits and \p array_bits:
150             \code
151             sizeof(hash_type) * 8 == head_bits + N * array_bits
152             \endcode
153             where \p N is multi-level array depth.
154         */
155         FeldmanHashSet(size_t head_bits = 8, size_t array_bits = 4)
156             : m_Metrics(feldman_hashset::details::metrics::make(head_bits, array_bits, sizeof(hash_type)))
157             , m_Head(alloc_head_node())
158         {}
159
160         /// Destructs the set and frees all data
161         ~FeldmanHashSet()
162         {
163             destroy_tree();
164             free_array_node(m_Head);
165         }
166
167         /// Inserts new node
168         /**
169             The function inserts \p val in the set if it does not contain
170             an item with that hash.
171
172             Returns \p true if \p val is placed into the set, \p false otherwise.
173
174             The function locks RCU internally.
175         */
176         bool insert( value_type& val )
177         {
178             return insert( val, [](value_type&) {} );
179         }
180
181         /// Inserts new node
182         /**
183             This function is intended for derived non-intrusive containers.
184
185             The function allows to split creating of new item into two part:
186             - create item with key only
187             - insert new item into the set
188             - if inserting is success, calls \p f functor to initialize \p val.
189
190             The functor signature is:
191             \code
192                 void func( value_type& val );
193             \endcode
194             where \p val is the item inserted.
195
196             The user-defined functor is called only if the inserting is success.
197
198             The function locks RCU internally.
199             @warning See \ref cds_intrusive_item_creating "insert item troubleshooting".
200         */
201         template <typename Func>
202         bool insert( value_type& val, Func f )
203         {
204             hash_type const& hash = hash_accessor()( val );
205             hash_splitter splitter( hash );
206             hash_comparator cmp;
207             back_off bkoff;
208
209             size_t nOffset = m_Metrics.head_node_size_log;
210             array_node * pArr = m_Head;
211             size_t nSlot = splitter.cut( m_Metrics.head_node_size_log );
212             assert( nSlot < m_Metrics.head_node_size );
213             size_t nHeight = 1;
214
215             while ( true ) {
216                 node_ptr slot = pArr->nodes[nSlot].load( memory_model::memory_order_acquire );
217                 if ( slot.bits() == flag_array_node ) {
218                     // array node, go down the tree
219                     assert( slot.ptr() != nullptr );
220                     nSlot = splitter.cut( m_Metrics.array_node_size_log );
221                     assert( nSlot < m_Metrics.array_node_size );
222                     pArr = to_array( slot.ptr() );
223                     nOffset += m_Metrics.array_node_size_log;
224                     ++nHeight;
225                 }
226                 else if ( slot.bits() == flag_array_converting ) {
227                     // the slot is converting to array node right now
228                     bkoff();
229                     m_Stat.onSlotConverting();
230                 }
231                 else {
232                     // data node
233                     assert(slot.bits() == 0 );
234
235                     rcu_lock rcuLock;
236                     if ( pArr->nodes[nSlot].load(memory_model::memory_order_acquire) == slot ) {
237                         if ( slot.ptr() ) {
238                             if ( cmp( hash, hash_accessor()( *slot.ptr() )) == 0 ) {
239                                 // the item with that hash value already exists
240                                 m_Stat.onInsertFailed();
241                                 return false;
242                             }
243
244                             // the slot must be expanded
245                             expand_slot( pArr, nSlot, slot, nOffset );
246                         }
247                         else {
248                             // the slot is empty, try to insert data node
249                             node_ptr pNull;
250                             if ( pArr->nodes[nSlot].compare_exchange_strong( pNull, node_ptr( &val ), memory_model::memory_order_release, atomics::memory_order_relaxed ))
251                             {
252                                 // the new data node has been inserted
253                                 f( val );
254                                 ++m_ItemCounter;
255                                 m_Stat.onInsertSuccess();
256                                 m_Stat.height( nHeight );
257                                 return true;
258                             }
259
260                             // insert failed - slot has been changed by another thread
261                             // retry inserting
262                             m_Stat.onInsertRetry();
263                         }
264                     }
265                     else
266                         m_Stat.onSlotChanged();
267                 }
268             } // while
269         }
270
271         /// Updates the node
272         /**
273             Performs inserting or updating the item with hash value equal to \p val.
274             - If hash value is found then existing item is replaced with \p val, old item is disposed
275               with \p Traits::disposer. Note that the disposer is called by \p GC asynchronously.
276               The function returns <tt> std::pair<true, false> </tt>
277             - If hash value is not found and \p bInsert is \p true then \p val is inserted,
278               the function returns <tt> std::pair<true, true> </tt>
279             - If hash value is not found and \p bInsert is \p false then the set is unchanged,
280               the function returns <tt> std::pair<false, false> </tt>
281
282             Returns <tt> std::pair<bool, bool> </tt> where \p first is \p true if operation is successfull
283             (i.e. the item has been inserted or updated),
284             \p second is \p true if new item has been added or \p false if the set contains that hash.
285
286             The function locks RCU internally.
287         */
288         std::pair<bool, bool> update( value_type& val, bool bInsert = true )
289         {
290             return do_update(val, [](value_type&, value_type *) {}, bInsert );
291         }
292
293         /// Unlinks the item \p val from the set
294         /**
295             The function searches the item \p val in the set and unlink it
296             if it is found and its address is equal to <tt>&val</tt>.
297
298             The function returns \p true if success and \p false otherwise.
299
300             RCU should not be locked. The function locks RCU internally.
301         */
302         bool unlink( value_type const& val )
303         {
304             check_deadlock_policy::check();
305
306             auto pred = [&val](value_type const& item) -> bool { return &item == &val; };
307             value_type * p;
308             {
309                 rcu_lock rcuLock;
310                 p = do_erase( hash_accessor()( val ), std::ref( pred ));
311             }
312             if ( p ) {
313                 gc::template retire_ptr<disposer>( p );
314                 return true;
315             }
316             return false;
317         }
318
319         /// Deletes the item from the set
320         /**
321             The function searches \p hash in the set,
322             unlinks the item found, and returns \p true.
323             If that item is not found the function returns \p false.
324
325             The \ref disposer specified in \p Traits is called by garbage collector \p GC asynchronously.
326
327             RCU should not be locked. The function locks RCU internally.
328         */
329         bool erase( hash_type const& hash )
330         {
331             return erase(hash, [](value_type const&) {} );
332         }
333
334         /// Deletes the item from the set
335         /**
336             The function searches \p hash in the set,
337             call \p f functor with item found, and unlinks it from the set.
338             The \ref disposer specified in \p Traits is called
339             by garbage collector \p GC asynchronously.
340
341             The \p Func interface is
342             \code
343             struct functor {
344                 void operator()( value_type& item );
345             };
346             \endcode
347
348             If \p hash is not found the function returns \p false.
349
350             RCU should not be locked. The function locks RCU internally.
351         */
352         template <typename Func>
353         bool erase( hash_type const& hash, Func f )
354         {
355             check_deadlock_policy::check();
356
357             value_type * p;
358
359             {
360                 rcu_lock rcuLock;
361                 p = do_erase( hash, []( value_type const&) -> bool { return true; } );
362             }
363
364             // p is guarded by HP
365             if ( p ) {
366                 f( *p );
367                 gc::template retire_ptr<disposer>(p);
368                 return true;
369             }
370             return false;
371         }
372
373         /// Extracts the item with specified \p hash
374         /**
375             The function searches \p hash in the set,
376             unlinks it from the set, and returns \ref cds::urcu::exempt_ptr "exempt_ptr" pointer to the item found.
377             If the item with key equal to \p key is not found the function returns an empty \p exempt_ptr.
378
379             RCU \p synchronize method can be called. RCU should NOT be locked.
380             The function does not call the disposer for the item found.
381             The disposer will be implicitly invoked when the returned object is destroyed or when
382             its \p release() member function is called.
383             Example:
384             \code
385             typedef cds::intrusive::FeldmanHashSet< cds::urcu::gc< cds::urcu::general_buffered<> >, foo, my_traits > set_type;
386             set_type theSet;
387             // ...
388
389             typename set_type::exempt_ptr ep( theSet.extract( 5 ));
390             if ( ep ) {
391                 // Deal with ep
392                 //...
393
394                 // Dispose returned item.
395                 ep.release();
396             }
397             \endcode
398         */
399         exempt_ptr extract( hash_type const& hash )
400         {
401             check_deadlock_policy::check();
402
403             value_type * p;
404             {
405                 rcu_lock rcuLock;
406                 p = do_erase( hash, []( value_type const&) -> bool {return true;} );
407             }
408             return exempt_ptr( p );
409         }
410
411         /// Finds an item by it's \p hash
412         /**
413             The function searches the item by \p hash and calls the functor \p f for item found.
414             The interface of \p Func functor is:
415             \code
416             struct functor {
417                 void operator()( value_type& item );
418             };
419             \endcode
420             where \p item is the item found.
421
422             The functor may change non-key fields of \p item. Note that the functor is only guarantee
423             that \p item cannot be disposed during the functor is executing.
424             The functor does not serialize simultaneous access to the set's \p item. If such access is
425             possible you must provide your own synchronization schema on item level to prevent unsafe item modifications.
426
427             The function returns \p true if \p hash is found, \p false otherwise.
428
429             The function applies RCU lock internally.
430         */
431         template <typename Func>
432         bool find( hash_type const& hash, Func f )
433         {
434             rcu_lock rcuLock;
435
436             value_type * p = search( hash );
437             if ( p ) {
438                 f( *p );
439                 return true;
440             }
441             return false;
442         }
443
444         /// Checks whether the set contains \p hash
445         /**
446             The function searches the item by its \p hash
447             and returns \p true if it is found, or \p false otherwise.
448
449             The function applies RCU lock internally.
450         */
451         bool contains( hash_type const& hash )
452         {
453             return find( hash, [](value_type&) {} );
454         }
455
456         /// Finds an item by it's \p hash and returns the item found
457         /**
458             The function searches the item by its \p hash
459             and returns the pointer to the item found.
460             If \p hash is not found the function returns \p nullptr.
461
462             RCU should be locked before the function invocation.
463             Returned pointer is valid only while RCU is locked.
464
465             Usage:
466             \code
467             typedef cds::intrusive::FeldmanHashSet< your_template_params >  my_set;
468             my_set theSet;
469             // ...
470             {
471                 // lock RCU
472                 my_set::rcu_lock;
473
474                 foo * p = theSet.get( 5 );
475                 if ( p ) {
476                     // Deal with p
477                     //...
478                 }
479             }
480             \endcode
481         */
482         value_type * get( hash_type const& hash )
483         {
484             assert( gc::is_locked());
485             return search( hash );
486         }
487
488         /// Clears the set (non-atomic)
489         /**
490             The function unlink all data node from the set.
491             The function is not atomic but is thread-safe.
492             After \p %clear() the set may not be empty because another threads may insert items.
493
494             For each item the \p disposer is called after unlinking.
495         */
496         void clear()
497         {
498             clear_array( m_Head, head_size() );
499         }
500
501         /// Checks if the set is empty
502         /**
503             Emptiness is checked by item counting: if item count is zero then the set is empty.
504             Thus, the correct item counting feature is an important part of the set implementation.
505         */
506         bool empty() const
507         {
508             return size() == 0;
509         }
510
511         /// Returns item count in the set
512         size_t size() const
513         {
514             return m_ItemCounter;
515         }
516
517         /// Returns const reference to internal statistics
518         stat const& statistics() const
519         {
520             return m_Stat;
521         }
522
523         /// Returns the size of head node
524         size_t head_size() const
525         {
526             return m_Metrics.head_node_size;
527         }
528
529         /// Returns the size of the array node
530         size_t array_node_size() const
531         {
532             return m_Metrics.array_node_size;
533         }
534
535         /// Collects tree level statistics into \p stat
536         /** @copydetails cds::intrusive::FeldmanHashSet::get_level_statistics
537         */
538         void get_level_statistics(std::vector<feldman_hashset::level_statistics>& stat) const
539         {
540             stat.clear();
541             gather_level_statistics(stat, 0, m_Head, head_size());
542         }
543
544     protected:
545         //@cond
546         class iterator_base
547         {
548             friend class FeldmanHashSet;
549
550         protected:
551             array_node *        m_pNode;    ///< current array node
552             size_t              m_idx;      ///< current position in m_pNode
553             value_type *        m_pValue;   ///< current value
554             FeldmanHashSet const*  m_set;    ///< Hash set
555
556         public:
557             iterator_base() CDS_NOEXCEPT
558                 : m_pNode(nullptr)
559                 , m_idx(0)
560                 , m_pValue(nullptr)
561                 , m_set(nullptr)
562             {}
563
564             iterator_base(iterator_base const& rhs) CDS_NOEXCEPT
565                 : m_pNode(rhs.m_pNode)
566                 , m_idx(rhs.m_idx)
567                 , m_pValue(rhs.m_pValue)
568                 , m_set(rhs.m_set)
569             {}
570
571             iterator_base& operator=(iterator_base const& rhs) CDS_NOEXCEPT
572             {
573                 m_pNode = rhs.m_pNode;
574                 m_idx = rhs.m_idx;
575                 m_pValue = rhs.m_pValue;
576                 m_set = rhs.m_set;
577                 return *this;
578             }
579
580             iterator_base& operator++()
581             {
582                 forward();
583                 return *this;
584             }
585
586             iterator_base& operator--()
587             {
588                 backward();
589                 return *this;
590             }
591
592             bool operator ==(iterator_base const& rhs) const CDS_NOEXCEPT
593             {
594                 return m_pNode == rhs.m_pNode && m_idx == rhs.m_idx && m_set == rhs.m_set;
595             }
596
597             bool operator !=(iterator_base const& rhs) const CDS_NOEXCEPT
598             {
599                 return !(*this == rhs);
600             }
601
602         protected:
603             iterator_base(FeldmanHashSet const& set, array_node * pNode, size_t idx, bool)
604                 : m_pNode(pNode)
605                 , m_idx(idx)
606                 , m_pValue(nullptr)
607                 , m_set(&set)
608             {}
609
610             iterator_base(FeldmanHashSet const& set, array_node * pNode, size_t idx)
611                 : m_pNode(pNode)
612                 , m_idx(idx)
613                 , m_pValue(nullptr)
614                 , m_set(&set)
615             {
616                 forward();
617             }
618
619             value_type * pointer() const CDS_NOEXCEPT
620             {
621                 assert(gc::is_locked());
622                 return m_pValue;
623             }
624
625             void forward()
626             {
627                 assert( gc::is_locked());
628                 assert(m_set != nullptr);
629                 assert(m_pNode != nullptr);
630
631                 size_t const arrayNodeSize = m_set->array_node_size();
632                 size_t const headSize = m_set->head_size();
633                 array_node * pNode = m_pNode;
634                 size_t idx = m_idx + 1;
635                 size_t nodeSize = m_pNode->pParent ? arrayNodeSize : headSize;
636
637                 for (;;) {
638                     if (idx < nodeSize) {
639                         node_ptr slot = pNode->nodes[idx].load(memory_model::memory_order_acquire);
640                         if (slot.bits() == flag_array_node) {
641                             // array node, go down the tree
642                             assert(slot.ptr() != nullptr);
643                             pNode = to_array(slot.ptr());
644                             idx = 0;
645                             nodeSize = arrayNodeSize;
646                         }
647                         else if (slot.bits() == flag_array_converting) {
648                             // the slot is converting to array node right now - skip the node
649                             ++idx;
650                         }
651                         else {
652                             if (slot.ptr()) {
653                                 // data node
654                                 m_pNode = pNode;
655                                 m_idx = idx;
656                                 m_pValue = slot.ptr();
657                                 return;
658                             }
659                             ++idx;
660                         }
661                     }
662                     else {
663                         // up to parent node
664                         if (pNode->pParent) {
665                             idx = pNode->idxParent + 1;
666                             pNode = pNode->pParent;
667                             nodeSize = pNode->pParent ? arrayNodeSize : headSize;
668                         }
669                         else {
670                             // end()
671                             assert(pNode == m_set->m_Head);
672                             assert(idx == headSize);
673                             m_pNode = pNode;
674                             m_idx = idx;
675                             m_pValue = nullptr;
676                             return;
677                         }
678                     }
679                 }
680             }
681
682             void backward()
683             {
684                 assert(gc::is_locked());
685                 assert(m_set != nullptr);
686                 assert(m_pNode != nullptr);
687
688                 size_t const arrayNodeSize = m_set->array_node_size();
689                 size_t const headSize = m_set->head_size();
690                 size_t const endIdx = size_t(0) - 1;
691
692                 array_node * pNode = m_pNode;
693                 size_t idx = m_idx - 1;
694                 size_t nodeSize = m_pNode->pParent ? arrayNodeSize : headSize;
695
696                 for (;;) {
697                     if (idx != endIdx) {
698                         node_ptr slot = pNode->nodes[idx].load(memory_model::memory_order_acquire);
699                         if (slot.bits() == flag_array_node) {
700                             // array node, go down the tree
701                             assert(slot.ptr() != nullptr);
702                             pNode = to_array(slot.ptr());
703                             nodeSize = arrayNodeSize;
704                             idx = nodeSize - 1;
705                         }
706                         else if (slot.bits() == flag_array_converting) {
707                             // the slot is converting to array node right now - skip the node
708                             --idx;
709                         }
710                         else {
711                             if (slot.ptr()) {
712                                 // data node
713                                 m_pNode = pNode;
714                                 m_idx = idx;
715                                 m_pValue = slot.ptr();
716                                 return;
717                             }
718                             --idx;
719                         }
720                     }
721                     else {
722                         // up to parent node
723                         if (pNode->pParent) {
724                             idx = pNode->idxParent - 1;
725                             pNode = pNode->pParent;
726                             nodeSize = pNode->pParent ? arrayNodeSize : headSize;
727                         }
728                         else {
729                             // rend()
730                             assert(pNode == m_set->m_Head);
731                             assert(idx == endIdx);
732                             m_pNode = pNode;
733                             m_idx = idx;
734                             m_pValue = nullptr;
735                             return;
736                         }
737                     }
738                 }
739             }
740         };
741
742         template <class Iterator>
743         Iterator init_begin() const
744         {
745             return Iterator(*this, m_Head, size_t(0) - 1);
746         }
747
748         template <class Iterator>
749         Iterator init_end() const
750         {
751             return Iterator(*this, m_Head, head_size(), false);
752         }
753
754         template <class Iterator>
755         Iterator init_rbegin() const
756         {
757             return Iterator(*this, m_Head, head_size());
758         }
759
760         template <class Iterator>
761         Iterator init_rend() const
762         {
763             return Iterator(*this, m_Head, size_t(0) - 1, false);
764         }
765
766         /// Bidirectional iterator class
767         template <bool IsConst>
768         class bidirectional_iterator : protected iterator_base
769         {
770             friend class FeldmanHashSet;
771
772         protected:
773             static CDS_CONSTEXPR bool const c_bConstantIterator = IsConst;
774
775         public:
776             typedef typename std::conditional< IsConst, value_type const*, value_type*>::type value_ptr; ///< Value pointer
777             typedef typename std::conditional< IsConst, value_type const&, value_type&>::type value_ref; ///< Value reference
778
779         public:
780             bidirectional_iterator() CDS_NOEXCEPT
781             {}
782
783             bidirectional_iterator(bidirectional_iterator const& rhs) CDS_NOEXCEPT
784                 : iterator_base(rhs)
785             {}
786
787             bidirectional_iterator& operator=(bidirectional_iterator const& rhs) CDS_NOEXCEPT
788             {
789                 iterator_base::operator=(rhs);
790                 return *this;
791             }
792
793             bidirectional_iterator& operator++()
794             {
795                 iterator_base::operator++();
796                 return *this;
797             }
798
799             bidirectional_iterator& operator--()
800             {
801                 iterator_base::operator--();
802                 return *this;
803             }
804
805             value_ptr operator ->() const CDS_NOEXCEPT
806             {
807                 return iterator_base::pointer();
808             }
809
810             value_ref operator *() const CDS_NOEXCEPT
811             {
812                 value_ptr p = iterator_base::pointer();
813                 assert(p);
814                 return *p;
815             }
816
817             template <bool IsConst2>
818             bool operator ==(bidirectional_iterator<IsConst2> const& rhs) const CDS_NOEXCEPT
819             {
820                 return iterator_base::operator==(rhs);
821             }
822
823             template <bool IsConst2>
824             bool operator !=(bidirectional_iterator<IsConst2> const& rhs) const CDS_NOEXCEPT
825             {
826                 return !(*this == rhs);
827             }
828
829         protected:
830             bidirectional_iterator(FeldmanHashSet& set, array_node * pNode, size_t idx, bool)
831                 : iterator_base(set, pNode, idx, false)
832             {}
833
834             bidirectional_iterator(FeldmanHashSet& set, array_node * pNode, size_t idx)
835                 : iterator_base(set, pNode, idx)
836             {}
837         };
838
839         /// Reverse bidirectional iterator
840         template <bool IsConst>
841         class reverse_bidirectional_iterator : public iterator_base
842         {
843             friend class FeldmanHashSet;
844
845         public:
846             typedef typename std::conditional< IsConst, value_type const*, value_type*>::type value_ptr; ///< Value pointer
847             typedef typename std::conditional< IsConst, value_type const&, value_type&>::type value_ref; ///< Value reference
848
849         public:
850             reverse_bidirectional_iterator() CDS_NOEXCEPT
851                 : iterator_base()
852             {}
853
854             reverse_bidirectional_iterator(reverse_bidirectional_iterator const& rhs) CDS_NOEXCEPT
855                 : iterator_base(rhs)
856             {}
857
858             reverse_bidirectional_iterator& operator=(reverse_bidirectional_iterator const& rhs) CDS_NOEXCEPT
859             {
860                 iterator_base::operator=(rhs);
861                 return *this;
862             }
863
864             reverse_bidirectional_iterator& operator++()
865             {
866                 iterator_base::operator--();
867                 return *this;
868             }
869
870             reverse_bidirectional_iterator& operator--()
871             {
872                 iterator_base::operator++();
873                 return *this;
874             }
875
876             value_ptr operator ->() const CDS_NOEXCEPT
877             {
878                 return iterator_base::pointer();
879             }
880
881             value_ref operator *() const CDS_NOEXCEPT
882             {
883                 value_ptr p = iterator_base::pointer();
884                 assert(p);
885                 return *p;
886             }
887
888             template <bool IsConst2>
889             bool operator ==(reverse_bidirectional_iterator<IsConst2> const& rhs) const
890             {
891                 return iterator_base::operator==(rhs);
892             }
893
894             template <bool IsConst2>
895             bool operator !=(reverse_bidirectional_iterator<IsConst2> const& rhs)
896             {
897                 return !(*this == rhs);
898             }
899
900         private:
901             reverse_bidirectional_iterator(FeldmanHashSet& set, array_node * pNode, size_t idx, bool)
902                 : iterator_base(set, pNode, idx, false)
903             {}
904
905             reverse_bidirectional_iterator(FeldmanHashSet& set, array_node * pNode, size_t idx)
906                 : iterator_base(set, pNode, idx, false)
907             {
908                 iterator_base::backward();
909             }
910         };
911         //@endcond
912
913     public:
914 #ifdef CDS_DOXYGEN_INVOKED
915         typedef implementation_defined iterator;            ///< @ref cds_intrusive_FeldmanHashSet_rcu_iterators "bidirectional iterator" type
916         typedef implementation_defined const_iterator;      ///< @ref cds_intrusive_FeldmanHashSet_rcu_iterators "bidirectional const iterator" type
917         typedef implementation_defined reverse_iterator;    ///< @ref cds_intrusive_FeldmanHashSet_rcu_iterators "bidirectional reverse iterator" type
918         typedef implementation_defined const_reverse_iterator; ///< @ref cds_intrusive_FeldmanHashSet_rcu_iterators "bidirectional reverse const iterator" type
919 #else
920         typedef bidirectional_iterator<false>   iterator;
921         typedef bidirectional_iterator<true>    const_iterator;
922         typedef reverse_bidirectional_iterator<false>   reverse_iterator;
923         typedef reverse_bidirectional_iterator<true>    const_reverse_iterator;
924 #endif
925
926         ///@name Thread-safe iterators
927         /** @anchor cds_intrusive_FeldmanHashSet_rcu_iterators
928             The set supports thread-safe iterators: you may iterate over the set in multi-threaded environment
929             under explicit RCU lock.
930             RCU lock requirement means that inserting or searching is allowed but you must not erase the items from the set
931             since erasing under RCU lock can lead to a deadlock. However, another thread can call \p erase() safely
932             while your thread is iterating.
933
934             A typical example is:
935             \code
936             struct foo {
937                 uint32_t    hash;
938                 // ... other fields
939                 uint32_t    payload; // only for example
940             };
941             struct set_traits: cds::intrusive::feldman_hashset::traits
942             {
943                 struct hash_accessor {
944                     uint32_t operator()( foo const& src ) const
945                     {
946                         retur src.hash;
947                     }
948                 };
949             };
950
951             typedef cds::urcu::gc< cds::urcu::general_buffered<>> rcu;
952             typedef cds::intrusive::FeldmanHashSet< rcu, foo, set_traits > set_type;
953
954             set_type s;
955
956             // ...
957
958             // iterate over the set
959             {
960                 // lock the RCU.
961                 typename set_type::rcu_lock l; // scoped RCU lock
962
963                 // traverse the set
964                 for ( auto i = s.begin(); i != s.end(); ++i ) {
965                     // deal with i. Remember, erasing is prohibited here!
966                     i->payload++;
967                 }
968             } // at this point RCU lock is released
969             /endcode
970
971             Each iterator object supports the common interface:
972             - dereference operators:
973                 @code
974                 value_type [const] * operator ->() noexcept
975                 value_type [const] & operator *() noexcept
976                 @endcode
977             - pre-increment and pre-decrement. Post-operators is not supported
978             - equality operators <tt>==</tt> and <tt>!=</tt>.
979                 Iterators are equal iff they point to the same cell of the same array node.
980                 Note that for two iterators \p it1 and \p it2 the condition <tt> it1 == it2 </tt>
981                 does not entail <tt> &(*it1) == &(*it2) </tt>: welcome to concurrent containers
982
983             @note It is possible the item can be iterated more that once, for example, if an iterator points to the item
984             in an array node that is being splitted.
985         */
986         ///@{
987
988         /// Returns an iterator to the beginning of the set
989         iterator begin()
990         {
991             return iterator(*this, m_Head, size_t(0) - 1);
992         }
993
994         /// Returns an const iterator to the beginning of the set
995         const_iterator begin() const
996         {
997             return const_iterator(*this, m_Head, size_t(0) - 1);
998         }
999
1000         /// Returns an const iterator to the beginning of the set
1001         const_iterator cbegin()
1002         {
1003             return const_iterator(*this, m_Head, size_t(0) - 1);
1004         }
1005
1006         /// Returns an iterator to the element following the last element of the set. This element acts as a placeholder; attempting to access it results in undefined behavior.
1007         iterator end()
1008         {
1009             return iterator(*this, m_Head, head_size(), false);
1010         }
1011
1012         /// Returns a const iterator to the element following the last element of the set. This element acts as a placeholder; attempting to access it results in undefined behavior.
1013         const_iterator end() const
1014         {
1015             return const_iterator(*this, m_Head, head_size(), false);
1016         }
1017
1018         /// Returns a const iterator to the element following the last element of the set. This element acts as a placeholder; attempting to access it results in undefined behavior.
1019         const_iterator cend()
1020         {
1021             return const_iterator(*this, m_Head, head_size(), false);
1022         }
1023
1024         /// Returns a reverse iterator to the first element of the reversed set
1025         reverse_iterator rbegin()
1026         {
1027             return reverse_iterator(*this, m_Head, head_size());
1028         }
1029
1030         /// Returns a const reverse iterator to the first element of the reversed set
1031         const_reverse_iterator rbegin() const
1032         {
1033             return const_reverse_iterator(*this, m_Head, head_size());
1034         }
1035
1036         /// Returns a const reverse iterator to the first element of the reversed set
1037         const_reverse_iterator crbegin()
1038         {
1039             return const_reverse_iterator(*this, m_Head, head_size());
1040         }
1041
1042         /// Returns a reverse iterator to the element following the last element of the reversed set
1043         /**
1044         It corresponds to the element preceding the first element of the non-reversed container.
1045         This element acts as a placeholder, attempting to access it results in undefined behavior.
1046         */
1047         reverse_iterator rend()
1048         {
1049             return reverse_iterator(*this, m_Head, size_t(0) - 1, false);
1050         }
1051
1052         /// Returns a const reverse iterator to the element following the last element of the reversed set
1053         /**
1054         It corresponds to the element preceding the first element of the non-reversed container.
1055         This element acts as a placeholder, attempting to access it results in undefined behavior.
1056         */
1057         const_reverse_iterator rend() const
1058         {
1059             return const_reverse_iterator(*this, m_Head, size_t(0) - 1, false);
1060         }
1061
1062         /// Returns a const reverse iterator to the element following the last element of the reversed set
1063         /**
1064         It corresponds to the element preceding the first element of the non-reversed container.
1065         This element acts as a placeholder, attempting to access it results in undefined behavior.
1066         */
1067         const_reverse_iterator crend()
1068         {
1069             return const_reverse_iterator(*this, m_Head, size_t(0) - 1, false);
1070         }
1071         ///@}
1072
1073     protected:
1074         //@cond
1075         template <typename Func>
1076         std::pair<bool, bool> do_update(value_type& val, Func f, bool bInsert = true)
1077         {
1078             hash_type const& hash = hash_accessor()(val);
1079             hash_splitter splitter(hash);
1080             hash_comparator cmp;
1081             back_off bkoff;
1082
1083             array_node * pArr = m_Head;
1084             size_t nSlot = splitter.cut(m_Metrics.head_node_size_log);
1085             assert(nSlot < m_Metrics.head_node_size);
1086             size_t nOffset = m_Metrics.head_node_size_log;
1087             size_t nHeight = 1;
1088
1089             while (true) {
1090                 node_ptr slot = pArr->nodes[nSlot].load(memory_model::memory_order_acquire);
1091                 if (slot.bits() == flag_array_node) {
1092                     // array node, go down the tree
1093                     assert(slot.ptr() != nullptr);
1094                     nSlot = splitter.cut(m_Metrics.array_node_size_log);
1095                     assert(nSlot < m_Metrics.array_node_size);
1096                     pArr = to_array(slot.ptr());
1097                     nOffset += m_Metrics.array_node_size_log;
1098                     ++nHeight;
1099                 }
1100                 else if (slot.bits() == flag_array_converting) {
1101                     // the slot is converting to array node right now
1102                     bkoff();
1103                     m_Stat.onSlotConverting();
1104                 }
1105                 else {
1106                     // data node
1107                     assert(slot.bits() == 0);
1108
1109                     value_type * pOld = nullptr;
1110                     {
1111                         rcu_lock rcuLock;
1112
1113                         if ( pArr->nodes[nSlot].load(memory_model::memory_order_acquire) == slot ) {
1114                             if ( slot.ptr()) {
1115                                 if (cmp(hash, hash_accessor()(*slot.ptr())) == 0) {
1116                                     // the item with that hash value already exists
1117                                     // Replace it with val
1118                                     if (slot.ptr() == &val) {
1119                                         m_Stat.onUpdateExisting();
1120                                         return std::make_pair(true, false);
1121                                     }
1122
1123                                     if (pArr->nodes[nSlot].compare_exchange_strong(slot, node_ptr(&val), memory_model::memory_order_release, atomics::memory_order_relaxed)) {
1124                                         // slot can be disposed
1125                                         f(val, slot.ptr());
1126                                         pOld = slot.ptr();
1127                                         m_Stat.onUpdateExisting();
1128                                         goto update_existing_done;
1129                                     }
1130
1131                                     m_Stat.onUpdateRetry();
1132                                     continue;
1133                                 }
1134
1135                                 // the slot must be expanded
1136                                 expand_slot(pArr, nSlot, slot, nOffset);
1137                             }
1138                             else {
1139                                 // the slot is empty, try to insert data node
1140                                 if (bInsert) {
1141                                     node_ptr pNull;
1142                                     if (pArr->nodes[nSlot].compare_exchange_strong(pNull, node_ptr(&val), memory_model::memory_order_release, atomics::memory_order_relaxed))
1143                                     {
1144                                         // the new data node has been inserted
1145                                         f(val, nullptr);
1146                                         ++m_ItemCounter;
1147                                         m_Stat.onUpdateNew();
1148                                         m_Stat.height(nHeight);
1149                                         return std::make_pair(true, true);
1150                                     }
1151                                 }
1152                                 else {
1153                                     m_Stat.onUpdateFailed();
1154                                     return std::make_pair(false, false);
1155                                 }
1156
1157                                 // insert failed - slot has been changed by another thread
1158                                 // retry updating
1159                                 m_Stat.onUpdateRetry();
1160                             }
1161                         }
1162                         else
1163                             m_Stat.onSlotChanged();
1164                         continue;
1165                     } // rcu_lock
1166
1167                     // update success
1168         update_existing_done:
1169                     if ( pOld )
1170                         gc::template retire_ptr<disposer>( pOld );
1171                     return std::make_pair(true, false);
1172                 } 
1173             } // while
1174         }
1175
1176         template <typename Predicate>
1177         value_type * do_erase( hash_type const& hash, Predicate pred)
1178         {
1179             assert(gc::is_locked());
1180
1181             hash_splitter splitter(hash);
1182             hash_comparator cmp;
1183             back_off bkoff;
1184
1185             array_node * pArr = m_Head;
1186             size_t nSlot = splitter.cut(m_Metrics.head_node_size_log);
1187             assert(nSlot < m_Metrics.head_node_size);
1188
1189             while (true) {
1190                 node_ptr slot = pArr->nodes[nSlot].load(memory_model::memory_order_acquire);
1191                 if (slot.bits() == flag_array_node) {
1192                     // array node, go down the tree
1193                     assert(slot.ptr() != nullptr);
1194                     nSlot = splitter.cut(m_Metrics.array_node_size_log);
1195                     assert(nSlot < m_Metrics.array_node_size);
1196                     pArr = to_array(slot.ptr());
1197                 }
1198                 else if (slot.bits() == flag_array_converting) {
1199                     // the slot is converting to array node right now
1200                     bkoff();
1201                     m_Stat.onSlotConverting();
1202                 }
1203                 else {
1204                     // data node
1205                     assert(slot.bits() == 0);
1206
1207                     if ( pArr->nodes[nSlot].load(memory_model::memory_order_acquire) == slot ) {
1208                         if (slot.ptr()) {
1209                             if (cmp(hash, hash_accessor()(*slot.ptr())) == 0 && pred(*slot.ptr())) {
1210                                 // item found - replace it with nullptr
1211                                 if (pArr->nodes[nSlot].compare_exchange_strong(slot, node_ptr(nullptr), memory_model::memory_order_acquire, atomics::memory_order_relaxed)) {
1212                                     --m_ItemCounter;
1213                                     m_Stat.onEraseSuccess();
1214
1215                                     return slot.ptr();
1216                                 }
1217                                 m_Stat.onEraseRetry();
1218                                 continue;
1219                             }
1220                             m_Stat.onEraseFailed();
1221                             return nullptr;
1222                         }
1223                         else {
1224                             // the slot is empty
1225                             m_Stat.onEraseFailed();
1226                             return nullptr;
1227                         }
1228                     }
1229                     else
1230                         m_Stat.onSlotChanged();
1231                 }
1232             } // while
1233         }
1234
1235         value_type * search(hash_type const& hash )
1236         {
1237             assert( gc::is_locked() );
1238
1239             hash_splitter splitter(hash);
1240             hash_comparator cmp;
1241             back_off bkoff;
1242
1243             array_node * pArr = m_Head;
1244             size_t nSlot = splitter.cut(m_Metrics.head_node_size_log);
1245             assert(nSlot < m_Metrics.head_node_size);
1246
1247             while (true) {
1248                 node_ptr slot = pArr->nodes[nSlot].load(memory_model::memory_order_acquire);
1249                 if (slot.bits() == flag_array_node) {
1250                     // array node, go down the tree
1251                     assert(slot.ptr() != nullptr);
1252                     nSlot = splitter.cut(m_Metrics.array_node_size_log);
1253                     assert(nSlot < m_Metrics.array_node_size);
1254                     pArr = to_array(slot.ptr());
1255                 }
1256                 else if (slot.bits() == flag_array_converting) {
1257                     // the slot is converting to array node right now
1258                     bkoff();
1259                     m_Stat.onSlotConverting();
1260                 }
1261                 else {
1262                     // data node
1263                     assert(slot.bits() == 0);
1264
1265                     // protect data node by hazard pointer
1266                     if ( pArr->nodes[nSlot].load(memory_model::memory_order_acquire) != slot) {
1267                         // slot value has been changed - retry
1268                         m_Stat.onSlotChanged();
1269                     }
1270                     else if (slot.ptr() && cmp(hash, hash_accessor()(*slot.ptr())) == 0) {
1271                         // item found
1272                         m_Stat.onFindSuccess();
1273                         return slot.ptr();
1274                     }
1275                     m_Stat.onFindFailed();
1276                     return nullptr;
1277                 }
1278             } // while
1279         }
1280
1281         //@endcond
1282
1283     private:
1284         //@cond
1285         array_node * alloc_head_node() const
1286         {
1287             return alloc_array_node(head_size(), nullptr, 0);
1288         }
1289
1290         array_node * alloc_array_node(array_node * pParent, size_t idxParent) const
1291         {
1292             return alloc_array_node(array_node_size(), pParent, idxParent);
1293         }
1294
1295         static array_node * alloc_array_node(size_t nSize, array_node * pParent, size_t idxParent)
1296         {
1297             array_node * pNode = cxx_array_node_allocator().NewBlock(sizeof(array_node) + sizeof(atomic_node_ptr) * (nSize - 1), pParent, idxParent);
1298             new (pNode->nodes) atomic_node_ptr[nSize];
1299             return pNode;
1300         }
1301
1302         static void free_array_node(array_node * parr)
1303         {
1304             cxx_array_node_allocator().Delete(parr);
1305         }
1306
1307         void destroy_tree()
1308         {
1309             // The function is not thread-safe. For use in dtor only
1310             // Remove data node
1311             clear();
1312
1313             // Destroy all array nodes
1314             destroy_array_nodes(m_Head, head_size());
1315         }
1316
1317         void destroy_array_nodes(array_node * pArr, size_t nSize)
1318         {
1319             for (atomic_node_ptr * p = pArr->nodes, *pLast = pArr->nodes + nSize; p != pLast; ++p) {
1320                 node_ptr slot = p->load(memory_model::memory_order_relaxed);
1321                 if (slot.bits() == flag_array_node) {
1322                     destroy_array_nodes(to_array(slot.ptr()), array_node_size());
1323                     free_array_node(to_array(slot.ptr()));
1324                     p->store(node_ptr(), memory_model::memory_order_relaxed);
1325                 }
1326             }
1327         }
1328
1329         void gather_level_statistics(std::vector<feldman_hashset::level_statistics>& stat, size_t nLevel, array_node * pArr, size_t nSize) const
1330         {
1331             if (stat.size() <= nLevel) {
1332                 stat.resize(nLevel + 1);
1333                 stat[nLevel].node_capacity = nSize;
1334             }
1335
1336             ++stat[nLevel].array_node_count;
1337             for (atomic_node_ptr * p = pArr->nodes, *pLast = pArr->nodes + nSize; p != pLast; ++p) {
1338                 node_ptr slot = p->load(memory_model::memory_order_relaxed);
1339                 if (slot.bits() == flag_array_node) {
1340                     ++stat[nLevel].array_cell_count;
1341                     gather_level_statistics(stat, nLevel + 1, to_array(slot.ptr()), array_node_size());
1342                 }
1343                 else if (slot.bits() == flag_array_converting)
1344                     ++stat[nLevel].array_cell_count;
1345                 else if (slot.ptr())
1346                     ++stat[nLevel].data_cell_count;
1347                 else
1348                     ++stat[nLevel].empty_cell_count;
1349             }
1350         }
1351
1352         void clear_array(array_node * pArrNode, size_t nSize)
1353         {
1354             back_off bkoff;
1355
1356
1357             for (atomic_node_ptr * pArr = pArrNode->nodes, *pLast = pArr + nSize; pArr != pLast; ++pArr) {
1358                 while (true) {
1359                     node_ptr slot = pArr->load(memory_model::memory_order_acquire);
1360                     if (slot.bits() == flag_array_node) {
1361                         // array node, go down the tree
1362                         assert(slot.ptr() != nullptr);
1363                         clear_array(to_array(slot.ptr()), array_node_size());
1364                         break;
1365                     }
1366                     else if (slot.bits() == flag_array_converting) {
1367                         // the slot is converting to array node right now
1368                         while ((slot = pArr->load(memory_model::memory_order_acquire)).bits() == flag_array_converting) {
1369                             bkoff();
1370                             m_Stat.onSlotConverting();
1371                         }
1372                         bkoff.reset();
1373
1374                         assert(slot.ptr() != nullptr);
1375                         assert(slot.bits() == flag_array_node);
1376                         clear_array(to_array(slot.ptr()), array_node_size());
1377                         break;
1378                     }
1379                     else {
1380                         // data node
1381                         if (pArr->compare_exchange_strong(slot, node_ptr(), memory_model::memory_order_acquire, atomics::memory_order_relaxed)) {
1382                             if (slot.ptr()) {
1383                                 gc::template retire_ptr<disposer>(slot.ptr());
1384                                 --m_ItemCounter;
1385                                 m_Stat.onEraseSuccess();
1386                             }
1387                             break;
1388                         }
1389                     }
1390                 }
1391             }
1392         }
1393
1394         bool expand_slot(array_node * pParent, size_t idxParent, node_ptr current, size_t nOffset)
1395         {
1396             assert(current.bits() == 0);
1397             assert(current.ptr());
1398
1399             size_t idx = hash_splitter(hash_accessor()(*current.ptr()), nOffset).cut(m_Metrics.array_node_size_log);
1400             array_node * pArr = alloc_array_node(pParent, idxParent);
1401
1402             node_ptr cur(current.ptr());
1403             atomic_node_ptr& slot = pParent->nodes[idxParent];
1404             if (!slot.compare_exchange_strong(cur, cur | flag_array_converting, memory_model::memory_order_release, atomics::memory_order_relaxed))
1405             {
1406                 m_Stat.onExpandNodeFailed();
1407                 free_array_node(pArr);
1408                 return false;
1409             }
1410
1411             pArr->nodes[idx].store(current, memory_model::memory_order_release);
1412
1413             cur = cur | flag_array_converting;
1414             CDS_VERIFY(
1415                 slot.compare_exchange_strong(cur, node_ptr(to_node(pArr), flag_array_node), memory_model::memory_order_release, atomics::memory_order_relaxed)
1416                 );
1417
1418             m_Stat.onExpandNodeSuccess();
1419             m_Stat.onArrayNodeCreated();
1420             return true;
1421         }
1422
1423         union converter {
1424             value_type * pData;
1425             array_node * pArr;
1426
1427             converter(value_type * p)
1428                 : pData(p)
1429             {}
1430
1431             converter(array_node * p)
1432                 : pArr(p)
1433             {}
1434         };
1435
1436         static array_node * to_array(value_type * p)
1437         {
1438             return converter(p).pArr;
1439         }
1440         static value_type * to_node(array_node * p)
1441         {
1442             return converter(p).pData;
1443         }
1444         //@endcond
1445     };
1446
1447 }} // namespace cds::intrusive
1448
1449 #endif  // #ifndef CDSLIB_INTRUSIVE_FELDMAN_HASHSET_RCU_H