On dev: bronson-unit-test
[libcds.git] / cds / container / impl / bronson_avltree_map_rcu.h
1 //$$CDS-header$$
2
3 #ifndef CDSLIB_CONTAINER_IMPL_BRONSON_AVLTREE_MAP_RCU_H
4 #define CDSLIB_CONTAINER_IMPL_BRONSON_AVLTREE_MAP_RCU_H
5
6 #include <type_traits> // is_base_of
7 #include <cds/container/details/bronson_avltree_base.h>
8 #include <cds/urcu/details/check_deadlock.h>
9 #include <cds/urcu/exempt_ptr.h>
10
11 namespace cds { namespace container {
12
13     /// Bronson et al AVL-tree (RCU specialization for storing pointer to values)
14     /** @ingroup cds_nonintrusive_map
15         @ingroup cds_nonintrusive_tree
16         @headerfile cds/container/bronson_avltree_map_rcu.h
17
18         This is the specialization of \ref cds_container_BronsonAVLTreeMap_rcu "RCU-based Bronson et al AVL-tree"
19         for "key -> value pointer" map. This specialization stores the pointer to user-allocated values instead of the copy
20         of the value. When a tree node is removed, the algorithm does not free the value pointer directly, instead, it call
21         the disposer functor provided by \p Traits template parameter.
22
23         <b>Template arguments</b>:
24         - \p RCU - one of \ref cds_urcu_gc "RCU type"
25         - \p Key - key type
26         - \p T - value type to be stored in tree's nodes. Note, the specialization stores the pointer to user-allocated
27             value, not the copy.
28         - \p Traits - tree traits, default is \p bronson_avltree::traits
29             It is possible to declare option-based tree with \p bronson_avltree::make_traits metafunction
30             instead of \p Traits template argument.
31
32         @note Before including <tt><cds/container/bronson_avltree_map_rcu.h></tt> you should include appropriate RCU header file,
33         see \ref cds_urcu_gc "RCU type" for list of existing RCU class and corresponding header files.
34     */
35     template <
36         typename RCU,
37         typename Key,
38         typename T,
39 #   ifdef CDS_DOXYGEN_INVOKED
40         typename Traits = bronson_avltree::traits
41 #else
42         typename Traits
43 #endif
44     >
45     class BronsonAVLTreeMap< cds::urcu::gc<RCU>, Key, T*, Traits >
46     {
47     public:
48         typedef cds::urcu::gc<RCU>  gc;   ///< RCU Garbage collector
49         typedef Key     key_type;    ///< type of a key stored in the map
50         typedef T *     mapped_type; ///< type of value stored in the map
51         typedef Traits  traits;      ///< Traits template parameter
52
53 #   ifdef CDS_DOXYGEN_INVOKED
54         typedef implementation_defined key_comparator;    ///< key compare functor based on \p Traits::compare and \p Traits::less
55 #   else
56         typedef typename opt::details::make_comparator< key_type, traits >::type key_comparator;
57 #endif
58         typedef typename traits::item_counter           item_counter;       ///< Item counting policy
59         typedef typename traits::memory_model           memory_model;       ///< Memory ordering, see \p cds::opt::memory_model option
60         typedef typename traits::node_allocator         node_allocator_type; ///< allocator for maintaining internal nodes
61         typedef typename traits::stat                   stat;               ///< internal statistics
62         typedef typename traits::rcu_check_deadlock     rcu_check_deadlock; ///< Deadlock checking policy
63         typedef typename traits::back_off               back_off;           ///< Back-off strategy
64         typedef typename traits::disposer               disposer;           ///< Value disposer
65         typedef typename traits::sync_monitor           sync_monitor;       ///< @ref cds_sync_monitor "Synchronization monitor" type for node-level locking
66
67         /// Enabled or disabled @ref bronson_avltree::relaxed_insert "relaxed insertion"
68         static CDS_CONSTEXPR bool const c_bRelaxedInsert = traits::relaxed_insert;
69
70         /// Group of \p extract_xxx functions does not require external locking
71         static CDS_CONSTEXPR const bool c_bExtractLockExternal = false;
72
73 #   ifdef CDS_DOXYGEN_INVOKED
74         /// Returned pointer to \p mapped_type of extracted node
75         typedef cds::urcu::exempt_ptr< gc, T, T, disposer, void > exempt_ptr;
76 #   else
77         typedef cds::urcu::exempt_ptr< gc,
78             typename std::remove_pointer<mapped_type>::type,
79             typename std::remove_pointer<mapped_type>::type,
80             disposer,
81             void
82         > exempt_ptr;
83 #   endif
84
85         typedef typename gc::scoped_lock    rcu_lock;  ///< RCU scoped lock
86
87     protected:
88         //@cond
89         typedef bronson_avltree::node< key_type, mapped_type, sync_monitor > node_type;
90         typedef typename node_type::version_type version_type;
91
92         typedef cds::details::Allocator< node_type, node_allocator_type > cxx_allocator;
93         typedef cds::urcu::details::check_deadlock_policy< gc, rcu_check_deadlock >   check_deadlock_policy;
94
95         enum class find_result
96         {
97             not_found,
98             found,
99             retry
100         };
101
102         struct update_flags
103         {
104             enum {
105                 allow_insert = 1,
106                 allow_update = 2,
107                 //allow_remove = 4,
108
109                 retry = 1024,
110
111                 failed = 0,
112                 result_inserted = allow_insert,
113                 result_updated = allow_update,
114                 result_removed = 4
115             };
116         };
117
118         enum node_condition
119         {
120             nothing_required = -3,
121             rebalance_required = -2,
122             unlink_required = -1
123         };
124
125         enum direction {
126             left_child = -1,
127             right_child = 1
128         };
129
130         typedef typename sync_monitor::template scoped_lock<node_type> node_scoped_lock;
131         //@endcond
132
133     protected:
134         //@cond
135         template <typename K>
136         static node_type * alloc_node( K&& key, int nHeight, version_type version, node_type * pParent, node_type * pLeft, node_type * pRight )
137         {
138             return cxx_allocator().New( std::forward<K>( key ), nHeight, version, pParent, pLeft, pRight );
139         }
140
141         static void free_node( node_type * pNode )
142         {
143             // Free node without disposer
144             cxx_allocator().Delete( pNode );
145         }
146
147         static void free_value( mapped_type pVal )
148         {
149             disposer()(pVal);
150         }
151
152         static node_type * child( node_type * pNode, int nDir, atomics::memory_order order )
153         {
154             return pNode->child( nDir ).load( order );
155         }
156
157         static node_type * parent( node_type * pNode, atomics::memory_order order )
158         {
159             return pNode->m_pParent.load( order );
160         }
161
162         // RCU safe disposer 
163         class rcu_disposer
164         {
165             node_type *     m_pRetiredList;     ///< head of retired node list
166             mapped_type     m_pRetiredValue;    ///< value retired
167
168         public:
169             rcu_disposer()
170                 : m_pRetiredList( nullptr )
171                 , m_pRetiredValue( nullptr )
172             {}
173
174             ~rcu_disposer()
175             {
176                 clean();
177             }
178
179             void dispose( node_type * pNode )
180             {
181                 assert( !pNode->is_valued( memory_model::memory_order_relaxed ));
182                 pNode->m_pNextRemoved = m_pRetiredList;
183                 m_pRetiredList = pNode;
184             }
185             
186             void dispose_value( mapped_type pVal )
187             {
188                 assert( m_pRetiredValue == nullptr );
189                 m_pRetiredValue = pVal;
190             }
191             
192         private:
193             struct internal_disposer
194             {
195                 void operator()( node_type * p ) const
196                 {
197                     free_node( p );
198                 }
199             };
200
201             void clean()
202             {
203                 assert( !gc::is_locked() );
204                 
205                 // TODO: use RCU::batch_retire
206
207                 // Dispose nodes
208                 for ( node_type * p = m_pRetiredList; p; ) {
209                     node_type * pNext = static_cast<node_type *>( p->m_pNextRemoved );
210                     // Value already disposed
211                     gc::template retire_ptr<internal_disposer>( p );
212                     p = pNext;
213                 }
214
215                 // Dispose value
216                 if ( m_pRetiredValue  )
217                     gc::template retire_ptr<disposer>( m_pRetiredValue );
218             }
219         };
220
221         //@endcond
222
223     protected:
224         //@cond
225         typename node_type::base_class m_Root;
226         node_type *             m_pRoot;
227         item_counter            m_ItemCounter;
228         mutable sync_monitor    m_Monitor;
229         mutable stat            m_stat;
230         //@endcond
231
232     public:
233         /// Creates empty map
234         BronsonAVLTreeMap()
235             : m_pRoot( static_cast<node_type *>( &m_Root ))
236         {}
237
238         /// Destroys the map
239         ~BronsonAVLTreeMap()
240         {
241             unsafe_clear();
242         }
243
244         /// Inserts new node
245         /**
246             The \p key_type should be constructible from a value of type \p K.
247
248             RCU \p synchronize() can be called. RCU should not be locked.
249
250             Returns \p true if inserting successful, \p false otherwise.
251         */
252         template <typename K>
253         bool insert( K const& key, mapped_type pVal )
254         {
255             return do_update(key, key_comparator(),
256                 [pVal]( node_type * pNode ) -> mapped_type
257                 {
258                     assert( pNode->m_pValue.load( memory_model::memory_order_relaxed ) == nullptr );
259                     return pVal;
260                 }, 
261                 update_flags::allow_insert
262             ) == update_flags::result_inserted;
263         }
264
265         /// Updates the value for \p key
266         /**
267             The operation performs inserting or updating the value for \p key with lock-free manner.
268             If \p bInsert is \p false, only updating of existing node is possible.
269
270             If \p key is not found and inserting is allowed (i.e. \p bInsert is \p true),
271             then the new node created from \p key will be inserted into the map; note that in this case the \ref key_type should be
272             constructible from type \p K.
273             Otherwise, the value for \p key will be changed to \p pVal.
274
275             RCU \p synchronize() method can be called. RCU should not be locked.
276
277             Returns <tt> std::pair<bool, bool> </tt> where \p first is \p true if operation is successfull,
278             \p second is \p true if new node has been added or \p false if the node with \p key
279             already exists.
280         */
281         template <typename K>
282         std::pair<bool, bool> update( K const& key, mapped_type pVal, bool bInsert = true )
283         {
284             int result = do_update( key, key_comparator(),
285                 [pVal]( node_type * ) -> mapped_type 
286                 {
287                     return pVal;
288                 },
289                 update_flags::allow_update | (bInsert ? update_flags::allow_insert : 0) 
290             );
291             return std::make_pair( result != 0, (result & update_flags::result_inserted) != 0 );
292         }
293
294         //@cond
295         template <typename K>
296         std::pair<bool, bool> ensure( K const& key, mapped_type pVal )
297         {
298             return update( key, pVal, true );
299         }
300
301         //@endcond
302
303         /// Delete \p key from the map
304         /**
305             RCU \p synchronize() method can be called. RCU should not be locked.
306
307             Return \p true if \p key is found and deleted, \p false otherwise
308         */
309         template <typename K>
310         bool erase( K const& key )
311         {
312             return do_remove(
313                 key,
314                 key_comparator(),
315                 []( key_type const&, mapped_type pVal, rcu_disposer& disp ) -> bool { disp.dispose_value( pVal ); return true; }
316             );
317         }
318
319         /// Deletes the item from the map using \p pred predicate for searching
320         /**
321             The function is an analog of \p erase(K const&)
322             but \p pred is used for key comparing.
323             \p Less functor has the interface like \p std::less.
324             \p Less must imply the same element order as the comparator used for building the map.
325         */
326         template <typename K, typename Less>
327         bool erase_with( K const& key, Less pred )
328         {
329             CDS_UNUSED( pred );
330             return do_remove( 
331                 key, 
332                 cds::opt::details::make_comparator_from_less<Less>(),
333                 []( key_type const&, mapped_type pVal, rcu_disposer& disp ) -> bool { disp.dispose_value( pVal ); return true;  }
334             );
335         }
336
337         /// Delete \p key from the map
338         /**
339             The function searches an item with key \p key, calls \p f functor
340             and deletes the item. If \p key is not found, the functor is not called.
341
342             The functor \p Func interface:
343             \code
344             struct extractor {
345                 void operator()(mapped_type& item) { ... }
346             };
347             \endcode
348
349             RCU \p synchronize method can be called. RCU should not be locked.
350
351             Return \p true if key is found and deleted, \p false otherwise
352         */
353         template <typename K, typename Func>
354         bool erase( K const& key, Func f )
355         {
356             return do_remove( 
357                 key, 
358                 key_comparator(), 
359                 [&f]( key_type const&, mapped_type pVal, rcu_disposer& disp ) -> bool { 
360                     assert( pVal );
361                     f( *pVal ); 
362                     disp.dispose_value(pVal); 
363                     return true;
364                 }
365             );
366         }
367
368         /// Deletes the item from the map using \p pred predicate for searching
369         /**
370             The function is an analog of \p erase(K const&, Func)
371             but \p pred is used for key comparing.
372             \p Less functor has the interface like \p std::less.
373             \p Less must imply the same element order as the comparator used for building the map.
374         */
375         template <typename K, typename Less, typename Func>
376         bool erase_with( K const& key, Less pred, Func f )
377         {
378             CDS_UNUSED( pred );
379             return do_remove( 
380                 key, 
381                 cds::opt::details::make_comparator_from_less<Less>(),
382                 [&f]( key_type const&, mapped_type pVal, rcu_disposer& disp ) -> bool { 
383                     assert( pVal );
384                     f( *pVal ); 
385                     disp.dispose_value(pVal); 
386                     return true;
387                 }
388             );
389         }
390
391         /// Extracts a value with minimal key from the map
392         /**
393             Returns \p exempt_ptr to the leftmost item.
394             If the tree is empty, returns empty \p exempt_ptr.
395
396             Note that the function returns only the value for minimal key.
397             To retrieve its key use \p extract_min( Func ) member function.
398
399             @note Due the concurrent nature of the map, the function extracts <i>nearly</i> minimum key.
400             It means that the function gets leftmost leaf of the tree and tries to unlink it.
401             During unlinking, a concurrent thread may insert an item with key less than leftmost item's key.
402             So, the function returns the item with minimum key at the moment of tree traversing.
403
404             RCU \p synchronize method can be called. RCU should NOT be locked.
405             The function does not free the item.
406             The deallocator will be implicitly invoked when the returned object is destroyed or when
407             its \p release() member function is called.
408         */
409         exempt_ptr extract_min()
410         {
411             return exempt_ptr(do_extract_min( []( key_type const& ) {}));
412         }
413
414         /// Extracts minimal key key and corresponding value
415         /**
416             Returns \p exempt_ptr to the leftmost item.
417             If the tree is empty, returns empty \p exempt_ptr.
418
419             \p Func functor is used to store minimal key.
420             \p Func has the following signature:
421             \code
422             struct functor {
423                 void operator()( key_type const& key );
424             };
425             \endcode
426             If the tree is empty, \p f is not called.
427             Otherwise, is it called with minimal key, the pointer to corresponding value is returned
428             as \p exempt_ptr.
429
430             @note Due the concurrent nature of the map, the function extracts <i>nearly</i> minimum key.
431             It means that the function gets leftmost leaf of the tree and tries to unlink it.
432             During unlinking, a concurrent thread may insert an item with key less than leftmost item's key.
433             So, the function returns the item with minimum key at the moment of tree traversing.
434
435             RCU \p synchronize method can be called. RCU should NOT be locked.
436             The function does not free the item.
437             The deallocator will be implicitly invoked when the returned object is destroyed or when
438             its \p release() member function is called.
439         */
440         template <typename Func>
441         exempt_ptr extract_min( Func f )
442         {
443             return exempt_ptr(do_extract_min( [&f]( key_type const& key ) { f(key); }));
444         }
445
446         /// Extracts minimal key key and corresponding value
447         /**
448             This function is a shortcut for the following call:
449             \code
450             key_type key;
451             exempt_ptr xp = theTree.extract_min( [&key]( key_type const& k ) { key = k; } );
452             \endode
453             \p key_type should be copy-assignable. The copy of minimal key
454             is returned in \p min_key argument.
455         */
456         typename std::enable_if< std::is_copy_assignable<key_type>::value, exempt_ptr >::type
457         extract_min_key( key_type& min_key )
458         {
459             return exempt_ptr(do_extract_min( [&min_key]( key_type const& key ) { min_key = key; }));
460         }
461
462         /// Extracts a value with maximal key from the tree
463         /**
464             Returns \p exempt_ptr pointer to the rightmost item.
465             If the set is empty, returns empty \p exempt_ptr.
466
467             Note that the function returns only the value for maximal key.
468             To retrieve its key use \p extract_max( Func ) member function.
469
470             @note Due the concurrent nature of the map, the function extracts <i>nearly</i> maximal key.
471             It means that the function gets rightmost leaf of the tree and tries to unlink it.
472             During unlinking, a concurrent thread may insert an item with key great than leftmost item's key.
473             So, the function returns the item with maximum key at the moment of tree traversing.
474
475             RCU \p synchronize method can be called. RCU should NOT be locked.
476             The function does not free the item.
477             The deallocator will be implicitly invoked when the returned object is destroyed or when
478             its \p release() is called.
479         */
480         exempt_ptr extract_max()
481         {
482             return exempt_ptr(do_extract_max( []( key_type const& ) {}));
483         }
484
485         /// Extracts the maximal key and corresponding value
486         /**
487             Returns \p exempt_ptr pointer to the rightmost item.
488             If the set is empty, returns empty \p exempt_ptr.
489
490             \p Func functor is used to store maximal key.
491             \p Func has the following signature:
492             \code
493                 struct functor {
494                     void operator()( key_type const& key );
495                 };
496             \endcode
497             If the tree is empty, \p f is not called.
498             Otherwise, is it called with maximal key, the pointer to corresponding value is returned
499             as \p exempt_ptr.
500
501             @note Due the concurrent nature of the map, the function extracts <i>nearly</i> maximal key.
502             It means that the function gets rightmost leaf of the tree and tries to unlink it.
503             During unlinking, a concurrent thread may insert an item with key great than leftmost item's key.
504             So, the function returns the item with maximum key at the moment of tree traversing.
505
506             RCU \p synchronize method can be called. RCU should NOT be locked.
507             The function does not free the item.
508             The deallocator will be implicitly invoked when the returned object is destroyed or when
509             its \p release() is called.
510         */
511         template <typename Func>
512         exempt_ptr extract_max( Func f )
513         {
514             return exempt_ptr(do_extract_max( [&f]( key_type const& key ) { f(key); }));
515         }
516
517         /// Extracts the maximal key and corresponding value
518         /**
519             This function is a shortcut for the following call:
520             \code
521                 key_type key;
522                 exempt_ptr xp = theTree.extract_max( [&key]( key_type const& k ) { key = k; } );
523             \endode
524             \p key_type should be copy-assignable. The copy of maximal key
525             is returned in \p max_key argument.
526         */
527         typename std::enable_if< std::is_copy_assignable<key_type>::value, exempt_ptr >::type
528         extract_max_key( key_type& max_key )
529         {
530             return exempt_ptr(do_extract_max( [&max_key]( key_type const& key ) { max_key = key; }));
531         }
532
533         /// Extracts an item from the map
534         /**
535             The function searches an item with key equal to \p key in the tree,
536             unlinks it, and returns \p exempt_ptr pointer to a value found.
537             If \p key is not found the function returns an empty \p exempt_ptr.
538
539             RCU \p synchronize method can be called. RCU should NOT be locked.
540             The function does not destroy the value found.
541             The disposer will be implicitly invoked when the returned object is destroyed or when
542             its \p release() member function is called.
543         */
544         template <typename Q>
545         exempt_ptr extract( Q const& key )
546         {
547             return exempt_ptr(do_extract( key ));
548         }
549
550
551         /// Extracts an item from the map using \p pred for searching
552         /**
553             The function is an analog of \p extract(Q const&)
554             but \p pred is used for key compare.
555             \p Less has the interface like \p std::less.
556             \p pred must imply the same element order as the comparator used for building the tree.
557         */
558         template <typename Q, typename Less>
559         exempt_ptr extract_with( Q const& key, Less pred )
560         {
561             return exempt_ptr(do_extract_with( key, pred ));
562         }
563
564         /// Find the key \p key
565         /**
566             The function searches the item with key equal to \p key and calls the functor \p f for item found.
567             The interface of \p Func functor is:
568             \code
569             struct functor {
570                 void operator()( key_type const& key, mapped_type& item );
571             };
572             \endcode
573             where \p item is the item found.
574             The functor is called under node-level lock.
575
576             The function applies RCU lock internally.
577
578             The function returns \p true if \p key is found, \p false otherwise.
579         */
580         template <typename K, typename Func>
581         bool find( K const& key, Func f )
582         {
583             return do_find( key, key_comparator(), 
584                 [&f]( node_type * pNode ) -> bool {
585                     assert( pNode != nullptr );
586                     mapped_type pVal = pNode->m_pValue.load( memory_model::memory_order_relaxed );
587                     if ( pVal ) {
588                         f( pNode->m_key, *pVal );
589                         return true;
590                     }
591                     return false;
592                 }
593             );
594         }
595
596         /// Finds the key \p val using \p pred predicate for searching
597         /**
598             The function is an analog of \p find(K const&, Func)
599             but \p pred is used for key comparing.
600             \p Less functor has the interface like \p std::less.
601             \p Less must imply the same element order as the comparator used for building the map.
602         */
603         template <typename K, typename Less, typename Func>
604         bool find_with( K const& key, Less pred, Func f )
605         {
606             CDS_UNUSED( pred );
607             return do_find( key, cds::opt::details::make_comparator_from_less<Less>(), 
608                 [&f]( node_type * pNode ) -> bool {
609                     assert( pNode != nullptr );
610                     mapped_type pVal = pNode->m_pValue.load( memory_model::memory_order_relaxed );
611                     if ( pVal ) {
612                         f( pNode->m_key, *pVal );
613                         return true;
614                     }
615                     return false;
616                 } 
617             );
618         }
619
620         /// Find the key \p key
621         /**
622             The function searches the item with key equal to \p key
623             and returns \p true if it is found, and \p false otherwise.
624
625             The function applies RCU lock internally.
626         */
627         template <typename K>
628         bool find( K const& key )
629         {
630             return do_find( key, key_comparator(), []( node_type * ) -> bool { return true; });
631         }
632
633         /// Finds the key \p val using \p pred predicate for searching
634         /**
635             The function is an analog of \p find(K const&)
636             but \p pred is used for key comparing.
637             \p Less functor has the interface like \p std::less.
638             \p Less must imply the same element order as the comparator used for building the map.
639         */
640         template <typename K, typename Less>
641         bool find_with( K const& key, Less pred )
642         {
643             CDS_UNUSED( pred );
644             return do_find( key, cds::opt::details::make_comparator_from_less<Less>(), []( node_type * ) -> bool { return true; } );
645         }
646
647         /// Clears the tree (thread safe, not atomic)
648         /**
649             The function unlink all items from the tree.
650             The function is thread safe but not atomic: in multi-threaded environment with parallel insertions
651             this sequence
652             \code
653             set.clear();
654             assert( set.empty() );
655             \endcode
656             the assertion could be raised.
657
658             For each node the \ref disposer will be called after unlinking.
659
660             RCU \p synchronize method can be called. RCU should not be locked.
661         */
662         void clear()
663         {
664             while ( extract_min() );
665         }
666
667         /// Clears the tree (not thread safe)
668         /**
669             This function is not thread safe and may be called only when no other thread deals with the tree.
670             The function is used in the tree destructor.
671         */
672         void unsafe_clear()
673         {
674             clear(); // temp solution
675             //TODO
676         }
677
678         /// Checks if the map is empty
679         bool empty() const
680         {
681             return m_Root.m_pRight.load( memory_model::memory_order_relaxed ) == nullptr;
682         }
683
684         /// Returns item count in the map
685         /**
686             Only leaf nodes containing user data are counted.
687
688             The value returned depends on item counter type provided by \p Traits template parameter.
689             If it is \p atomicity::empty_item_counter this function always returns 0.
690
691             The function is not suitable for checking the tree emptiness, use \p empty()
692             member function for this purpose.
693         */
694         size_t size() const
695         {
696             return m_ItemCounter;
697         }
698
699         /// Returns const reference to internal statistics
700         stat const& statistics() const
701         {
702             return m_stat;
703         }
704
705         /// Checks internal consistency (not atomic, not thread-safe)
706         /**
707             The debugging function to check internal consistency of the tree.
708         */
709         bool check_consistency() const
710         {
711             return check_consistency([]( size_t /*nLevel*/, size_t /*hLeft*/, size_t /*hRight*/ ){} );
712         }
713
714         /// Checks internal consistency (not atomic, not thread-safe)
715         /**
716             The debugging function to check internal consistency of the tree.
717             The functor \p Func is called if a violation of internal tree structure
718             is found:
719             \code
720             struct functor {
721                 void operator()( size_t nLevel, size_t hLeft, size_t hRight );
722             };
723             \endcode
724             where 
725             - \p nLevel - the level where the violation is found
726             - \p hLeft - the height of left subtree
727             - \p hRight - the height of right subtree
728
729             The functor is called for each violation found.
730         */
731         template <typename Func>
732         bool check_consistency( Func f ) const
733         {
734             node_type * pChild = child( m_pRoot, right_child, memory_model::memory_order_relaxed );
735             if ( pChild ) {
736                 size_t nErrors = 0;
737                 do_check_consistency( pChild, 1, f, nErrors );
738                 return nErrors == 0;
739             }
740             return true;
741         }
742
743     protected:
744         //@cond
745         template <typename Func>
746         size_t do_check_consistency( node_type * pNode, size_t nLevel, Func f, size_t& nErrors ) const
747         {
748             if ( pNode ) {
749                 size_t hLeft = do_check_consistency( child( pNode, left_child, memory_model::memory_order_relaxed ), nLevel + 1, f, nErrors );
750                 size_t hRight = do_check_consistency( child( pNode, right_child, memory_model::memory_order_relaxed ), nLevel + 1, f, nErrors );
751
752                 if ( hLeft >= hRight ) {
753                     if ( hLeft - hRight > 1 ) {
754                         f( nLevel, hLeft, hRight );
755                         ++nErrors;
756                     }
757                     return hLeft;
758                 }
759                 else {
760                     if ( hRight - hLeft > 1 ) {
761                         f( nLevel, hLeft, hRight );
762                         ++nErrors;
763                     }
764                     return hRight;
765                 }
766             }
767             return 0;
768         }
769
770         template <typename Q, typename Compare, typename Func>
771         bool do_find( Q& key, Compare cmp, Func f ) const
772         {
773             find_result result;
774             {
775                 rcu_lock l;
776                 result = try_find( key, cmp, f, m_pRoot, 1, 0 );
777             }
778             assert( result != find_result::retry );
779             return result == find_result::found;
780         }
781
782         template <typename K, typename Compare, typename Func>
783         int do_update( K const& key, Compare cmp, Func funcUpdate, int nFlags )
784         {
785             check_deadlock_policy::check();
786
787             rcu_disposer removed_list;
788             {
789                 rcu_lock l;
790                 return try_update_root( key, cmp, nFlags, funcUpdate, removed_list );
791             }
792         }
793
794         template <typename K, typename Compare, typename Func>
795         bool do_remove( K const& key, Compare cmp, Func func )
796         {
797             // Func must return true if the value was disposed
798             //              or false if the value was extracted
799
800             check_deadlock_policy::check();
801
802             rcu_disposer removed_list;
803             {
804                 rcu_lock l;
805                 return try_remove_root( key, cmp, func, removed_list );
806             }
807         }
808
809         template <typename Func>
810         mapped_type do_extract_min( Func f )
811         {
812             mapped_type pExtracted = nullptr;
813             do_extract_minmax(
814                 left_child,
815                 [&pExtracted, &f]( key_type const& key, mapped_type pVal, rcu_disposer& ) -> bool { f( key ); pExtracted = pVal; return false; }
816             );
817             return pExtracted;
818         }
819
820         template <typename Func>
821         mapped_type do_extract_max( Func f )
822         {
823             mapped_type pExtracted = nullptr;
824             do_extract_minmax(
825                 right_child,
826                 [&pExtracted, &f]( key_type const& key, mapped_type pVal, rcu_disposer& ) -> bool { f( key ); pExtracted = pVal; return false; }
827             );
828             return pExtracted;
829         }
830
831         template <typename Func>
832         void do_extract_minmax( int nDir, Func func )
833         {
834             check_deadlock_policy::check();
835
836             rcu_disposer removed_list;
837             {
838                 rcu_lock l;
839
840                 int result = update_flags::failed;
841                 do {
842                     // get right child of root
843                     node_type * pChild = child( m_pRoot, right_child, memory_model::memory_order_acquire );
844                     if ( pChild ) {
845                         version_type nChildVersion = pChild->version( memory_model::memory_order_relaxed );
846                         if ( nChildVersion & node_type::shrinking ) {
847                             m_stat.onRemoveRootWaitShrinking();
848                             pChild->template wait_until_shrink_completed<back_off>( memory_model::memory_order_relaxed );
849                             result = update_flags::retry;
850                         }
851                         else if ( pChild == child( m_pRoot, right_child, memory_model::memory_order_acquire )) {
852                             result = try_extract_minmax( nDir, func, m_pRoot, pChild, nChildVersion, removed_list );
853                         }
854                     }
855                 } while ( result == update_flags::retry );
856             }
857         }
858
859         template <typename Q>
860         mapped_type do_extract( Q const& key )
861         {
862             mapped_type pExtracted = nullptr;
863             do_remove(
864                 key,
865                 key_comparator(),
866                 [&pExtracted]( key_type const&, mapped_type pVal, rcu_disposer& ) -> bool { pExtracted = pVal; return false; }
867             );
868             return pExtracted;
869         }
870
871         template <typename Q, typename Less>
872         mapped_type do_extract_with( Q const& key, Less pred )
873         {
874             CDS_UNUSED( pred );
875             mapped_type pExtracted = nullptr;
876             do_remove(
877                 key,
878                 cds::opt::details::make_comparator_from_less<Less>(),
879                 [&pExtracted]( key_type const&, mapped_type pVal, rcu_disposer& ) -> bool { pExtracted = pVal; return false; }
880             );
881             return pExtracted;
882         }
883
884         //@endcond
885
886     private:
887         //@cond
888         template <typename Q, typename Compare, typename Func>
889         find_result try_find( Q const& key, Compare cmp, Func f, node_type * pNode, int nDir, version_type nVersion ) const
890         {
891             assert( gc::is_locked() );
892             assert( pNode );
893
894             while ( true ) {
895                 node_type * pChild = child( pNode, nDir, memory_model::memory_order_relaxed );
896                 if ( !pChild ) {
897                     if ( pNode->version( memory_model::memory_order_acquire ) != nVersion ) {
898                         m_stat.onFindRetry();
899                         return find_result::retry;
900                     }
901
902                     m_stat.onFindFailed();
903                     return find_result::not_found;
904                 }
905
906                 int nCmp = cmp( key, pChild->m_key );
907                 if ( nCmp == 0 ) {
908                     if ( pChild->is_valued( memory_model::memory_order_relaxed ) ) {
909                         // key found
910                         node_scoped_lock l( m_Monitor, *pChild );
911                         if ( pChild->is_valued( memory_model::memory_order_relaxed )) {
912                             if ( f( pChild ) ) {
913                                 m_stat.onFindSuccess();
914                                 return find_result::found;
915                             }
916                         }
917                     }
918
919                     m_stat.onFindFailed();
920                     return find_result::not_found;
921                 }
922
923                 version_type nChildVersion = pChild->version( memory_model::memory_order_acquire );
924                 if ( nChildVersion & node_type::shrinking ) {
925                     m_stat.onFindWaitShrinking();
926                     pChild->template wait_until_shrink_completed<back_off>( memory_model::memory_order_relaxed );
927
928                     if ( pNode->version( memory_model::memory_order_acquire ) != nVersion ) {
929                         m_stat.onFindRetry();
930                         return find_result::retry;
931                     }
932                 }
933                 else if ( nChildVersion != node_type::unlinked ) {
934
935                     if ( pNode->version( memory_model::memory_order_acquire ) != nVersion ) {
936                         m_stat.onFindRetry();
937                         return find_result::retry;
938                     }
939
940                     find_result found = try_find( key, cmp, f, pChild, nCmp, nChildVersion );
941                     if ( found != find_result::retry )
942                         return found;
943                 }
944             }
945         }
946
947         template <typename K, typename Compare, typename Func>
948         int try_update_root( K const& key, Compare cmp, int nFlags, Func funcUpdate, rcu_disposer& disp )
949         {
950             assert( gc::is_locked() );
951
952             int result;
953             do {
954                 // get right child of root
955                 node_type * pChild = child( m_pRoot, right_child, memory_model::memory_order_acquire );
956                 if ( pChild ) {
957                     version_type nChildVersion = pChild->version( memory_model::memory_order_relaxed );
958                     if ( nChildVersion & node_type::shrinking ) {
959                         m_stat.onUpdateRootWaitShrinking();
960                         pChild->template wait_until_shrink_completed<back_off>( memory_model::memory_order_relaxed );
961                         result = update_flags::retry;
962                     }
963                     else if ( pChild == child( m_pRoot, right_child, memory_model::memory_order_acquire )) {
964                         result = try_update( key, cmp, nFlags, funcUpdate, m_pRoot, pChild, nChildVersion, disp );
965                     }
966                 } 
967                 else {
968                     // the tree is empty
969                     if ( nFlags & update_flags::allow_insert ) {
970                         // insert into tree as right child of the root
971                         {
972                             node_scoped_lock l( m_Monitor, *m_pRoot );
973                             if ( child( m_pRoot, right_child, memory_model::memory_order_acquire ) != nullptr ) {
974                                 result = result == update_flags::retry;
975                                 continue;
976                             }
977
978                             node_type * pNew = alloc_node( key, 1, 0, m_pRoot, nullptr, nullptr );
979                             mapped_type pVal = funcUpdate( pNew );
980                             assert( pVal != nullptr );
981                             pNew->m_pValue.store( pVal, memory_model::memory_order_release );
982
983                             m_pRoot->child( pNew, right_child, memory_model::memory_order_relaxed );
984                             m_pRoot->height( 2, memory_model::memory_order_relaxed );
985                         }
986
987                         ++m_ItemCounter;
988                         m_stat.onInsertSuccess();
989                         return update_flags::result_inserted;
990                     }
991
992                     return update_flags::failed;
993                 }
994             } while ( result == update_flags::retry );
995             return result;
996         }
997
998         template <typename K, typename Compare, typename Func>
999         bool try_remove_root( K const& key, Compare cmp, Func func, rcu_disposer& disp )
1000         {
1001             assert( gc::is_locked() );
1002
1003             int result;
1004             do {
1005                 // get right child of root
1006                 node_type * pChild = child( m_pRoot, right_child, memory_model::memory_order_acquire );
1007                 if ( pChild ) {
1008                     version_type nChildVersion = pChild->version( memory_model::memory_order_relaxed );
1009                     if ( nChildVersion & node_type::shrinking ) {
1010                         m_stat.onRemoveRootWaitShrinking();
1011                         pChild->template wait_until_shrink_completed<back_off>( memory_model::memory_order_relaxed );
1012                         result = update_flags::retry;
1013                     }
1014                     else if ( pChild == child( m_pRoot, right_child, memory_model::memory_order_acquire )) {
1015                         result = try_remove( key, cmp, func, m_pRoot, pChild, nChildVersion, disp );
1016                     }
1017                 }
1018                 else
1019                     return false;
1020             } while ( result == update_flags::retry );
1021
1022             return result == update_flags::result_removed;
1023         }
1024
1025         template <typename K, typename Compare, typename Func>
1026         int try_update( K const& key, Compare cmp, int nFlags, Func funcUpdate, node_type * pParent, node_type * pNode, version_type nVersion, rcu_disposer& disp )
1027         {
1028             assert( gc::is_locked() );
1029             assert( nVersion != node_type::unlinked );
1030             CDS_UNUSED( pParent );
1031
1032             int nCmp = cmp( key, pNode->m_key );
1033             if ( nCmp == 0 ) {
1034                 if ( nFlags & update_flags::allow_update ) {
1035                     return try_update_node( funcUpdate, pNode, disp );
1036                 }
1037                 return update_flags::failed;
1038             }
1039
1040             int result;
1041             do {
1042                 node_type * pChild = child( pNode, nCmp, memory_model::memory_order_relaxed );
1043                 if ( pNode->version(memory_model::memory_order_acquire) != nVersion ) {
1044                     m_stat.onUpdateRetry();
1045                     return update_flags::retry;
1046                 }
1047
1048                 if ( pChild == nullptr ) {
1049                     // insert new node
1050                     if ( nFlags & update_flags::allow_insert )
1051                         result = try_insert_node( key, funcUpdate, pNode, nCmp, nVersion, disp );
1052                     else
1053                         result = update_flags::failed;
1054                 }
1055                 else {
1056                     // update child
1057                     result = update_flags::retry;
1058                     version_type nChildVersion = pChild->version( memory_model::memory_order_acquire );
1059                     if ( nChildVersion & node_type::shrinking ) {
1060                         m_stat.onUpdateWaitShrinking();
1061                         pChild->template wait_until_shrink_completed<back_off>( memory_model::memory_order_relaxed );
1062                         // retry
1063                     }
1064                     else if ( pChild == child( pNode, nCmp, memory_model::memory_order_relaxed )) {
1065                         // this second read is important, because it is protected by nChildVersion
1066
1067                         // validate the read that our caller took to get to node
1068                         if ( pNode->version( memory_model::memory_order_relaxed ) != nVersion ) {
1069                             m_stat.onUpdateRetry();
1070                             return update_flags::retry;
1071                         }
1072
1073                         // At this point we know that the traversal our parent took to get to node is still valid.
1074                         // The recursive implementation will validate the traversal from node to
1075                         // child, so just prior to the node nVersion validation both traversals were definitely okay.
1076                         // This means that we are no longer vulnerable to node shrinks, and we don't need
1077                         // to validate node version any more.
1078                         result = try_update( key, cmp, nFlags, funcUpdate, pNode, pChild, nChildVersion, disp );
1079                     }
1080                 }
1081             } while ( result == update_flags::retry );
1082             return result;
1083         }
1084
1085         template <typename K, typename Compare, typename Func>
1086         int try_remove( K const& key, Compare cmp, Func func, node_type * pParent, node_type * pNode, version_type nVersion, rcu_disposer& disp )
1087         {
1088             assert( gc::is_locked() );
1089             assert( nVersion != node_type::unlinked );
1090
1091             int nCmp = cmp( key, pNode->m_key );
1092             if ( nCmp == 0 )
1093                 return try_remove_node( pParent, pNode, nVersion, func, disp );
1094
1095             int result;
1096             do {
1097                 node_type * pChild = child( pNode, nCmp, memory_model::memory_order_relaxed );
1098                 if ( pNode->version(memory_model::memory_order_acquire) != nVersion ) {
1099                     m_stat.onRemoveRetry();
1100                     return update_flags::retry;
1101                 }
1102
1103                 if ( pChild == nullptr ) {
1104                     return update_flags::failed;
1105                 }
1106                 else {
1107                     // update child
1108                     result = update_flags::retry;
1109                     version_type nChildVersion = pChild->version( memory_model::memory_order_acquire );
1110                     if ( nChildVersion & node_type::shrinking ) {
1111                         m_stat.onRemoveWaitShrinking();
1112                         pChild->template wait_until_shrink_completed<back_off>( memory_model::memory_order_relaxed );
1113                         // retry
1114                     }
1115                     else if ( pChild == child( pNode, nCmp, memory_model::memory_order_relaxed )) {
1116                         // this second read is important, because it is protected by nChildVersion
1117
1118                         // validate the read that our caller took to get to node
1119                         if ( pNode->version( memory_model::memory_order_relaxed ) != nVersion ) {
1120                             m_stat.onRemoveRetry();
1121                             return update_flags::retry;
1122                         }
1123
1124                         // At this point we know that the traversal our parent took to get to node is still valid.
1125                         // The recursive implementation will validate the traversal from node to
1126                         // child, so just prior to the node nVersion validation both traversals were definitely okay.
1127                         // This means that we are no longer vulnerable to node shrinks, and we don't need
1128                         // to validate node version any more.
1129                         result = try_remove( key, cmp, func, pNode, pChild, nChildVersion, disp );
1130                     }
1131                 }
1132             } while ( result == update_flags::retry );
1133             return result;
1134         }
1135
1136         template <typename Func>
1137         int try_extract_minmax( int nDir, Func func, node_type * pParent, node_type * pNode, version_type nVersion, rcu_disposer& disp )
1138         {
1139             assert( gc::is_locked() );
1140             assert( nVersion != node_type::unlinked );
1141
1142             int result;
1143             do {
1144                 node_type * pChild = child( pNode, nDir, memory_model::memory_order_relaxed );
1145                 if ( pNode->version(memory_model::memory_order_acquire) != nVersion ) {
1146                     m_stat.onRemoveRetry();
1147                     return update_flags::retry;
1148                 }
1149
1150                 if ( pChild == nullptr ) {
1151                     // Found min/max
1152                     return try_remove_node( pParent, pNode, nVersion, func, disp );
1153                 }
1154                 else {
1155                     result = update_flags::retry;
1156                     version_type nChildVersion = pChild->version( memory_model::memory_order_acquire );
1157                     if ( nChildVersion & node_type::shrinking ) {
1158                         m_stat.onRemoveWaitShrinking();
1159                         pChild->template wait_until_shrink_completed<back_off>( memory_model::memory_order_relaxed );
1160                         // retry
1161                     }
1162                     else if ( pChild == child( pNode, nDir, memory_model::memory_order_relaxed )) {
1163                         // this second read is important, because it is protected by nChildVersion
1164
1165                         // validate the read that our caller took to get to node
1166                         if ( pNode->version( memory_model::memory_order_relaxed ) != nVersion ) {
1167                             m_stat.onRemoveRetry();
1168                             return update_flags::retry;
1169                         }
1170
1171                         // At this point we know that the traversal our parent took to get to node is still valid.
1172                         // The recursive implementation will validate the traversal from node to
1173                         // child, so just prior to the node nVersion validation both traversals were definitely okay.
1174                         // This means that we are no longer vulnerable to node shrinks, and we don't need
1175                         // to validate node version any more.
1176                         result = try_extract_minmax( nDir, func, pNode, pChild, nChildVersion, disp );
1177                     }
1178                 }
1179             } while ( result == update_flags::retry );
1180             return result;
1181         }
1182
1183         template <typename K, typename Func>
1184         int try_insert_node( K const& key, Func funcUpdate, node_type * pNode, int nDir, version_type nVersion, rcu_disposer& disp )
1185         {
1186             node_type * pNew;
1187
1188             auto fnCreateNode = [&funcUpdate]( node_type * pNew ) {
1189                 mapped_type pVal = funcUpdate( pNew );
1190                 assert( pVal != nullptr );
1191                 pNew->m_pValue.store( pVal, memory_model::memory_order_relaxed );
1192             };
1193
1194             if ( c_bRelaxedInsert ) {
1195                 if ( pNode->version( memory_model::memory_order_acquire ) != nVersion
1196                      || child( pNode, nDir, memory_model::memory_order_relaxed ) != nullptr ) 
1197                 {
1198                     m_stat.onInsertRetry();
1199                     return update_flags::retry;
1200                 }
1201
1202                 fnCreateNode( pNew = alloc_node( key, 1, 0, pNode, nullptr, nullptr ));
1203             }
1204
1205             node_type * pDamaged;
1206             {
1207                 assert( pNode != nullptr );
1208                 node_scoped_lock l( m_Monitor, *pNode );
1209
1210                 if ( pNode->version( memory_model::memory_order_relaxed ) != nVersion
1211                      || child( pNode, nDir, memory_model::memory_order_relaxed ) != nullptr ) 
1212                 {
1213                     if ( c_bRelaxedInsert ) {
1214                         free_node( pNew );
1215                         m_stat.onRelaxedInsertFailed();
1216                     }
1217
1218                     m_stat.onInsertRetry();
1219                     return update_flags::retry;
1220                 }
1221
1222                 if ( !c_bRelaxedInsert )
1223                     fnCreateNode( pNew = alloc_node( key, 1, 0, pNode, nullptr, nullptr ));
1224
1225                 pNode->child( pNew, nDir, memory_model::memory_order_relaxed );
1226                 pDamaged = fix_height_locked( pNode );
1227             }
1228
1229             ++m_ItemCounter;
1230             m_stat.onInsertSuccess();
1231
1232             fix_height_and_rebalance( pDamaged, disp );
1233
1234             return update_flags::result_inserted;
1235         }
1236
1237         template <typename Func>
1238         int try_update_node( Func funcUpdate, node_type * pNode, rcu_disposer& disp )
1239         {
1240             mapped_type pOld;
1241             assert( pNode != nullptr );
1242             {
1243                 node_scoped_lock l( m_Monitor, *pNode );
1244
1245                 if ( pNode->is_unlinked( memory_model::memory_order_relaxed )) {
1246                     m_stat.onUpdateUnlinked();
1247                     return update_flags::retry;
1248                 }
1249
1250                 pOld = pNode->value( memory_model::memory_order_relaxed );
1251                 mapped_type pVal = funcUpdate( pNode );
1252                 if ( pVal == pOld )
1253                     pOld = nullptr;
1254                 else {
1255                     assert( pVal != nullptr );
1256                     pNode->m_pValue.store( pVal, memory_model::memory_order_relaxed );
1257                 }
1258             }
1259
1260             if ( pOld ) {
1261                 disp.dispose_value(pOld);
1262                 m_stat.onDisposeValue();
1263             }
1264
1265             m_stat.onUpdateSuccess();
1266             return update_flags::result_updated;
1267         }
1268
1269         template <typename Func>
1270         int try_remove_node( node_type * pParent, node_type * pNode, version_type nVersion, Func func, rcu_disposer& disp )
1271         {
1272             assert( pParent != nullptr );
1273             assert( pNode != nullptr );
1274
1275             if ( !pNode->is_valued( atomics::memory_order_relaxed ) )
1276                 return update_flags::failed;
1277
1278             if ( child( pNode, left_child, memory_model::memory_order_relaxed ) == nullptr 
1279               || child( pNode, right_child, memory_model::memory_order_relaxed ) == nullptr )
1280             { 
1281                 node_type * pDamaged;
1282                 mapped_type pOld;
1283                 {
1284                     node_scoped_lock lp( m_Monitor, *pParent );
1285                     if ( pParent->is_unlinked( atomics::memory_order_relaxed ) || parent( pNode, memory_model::memory_order_relaxed ) != pParent )
1286                         return update_flags::retry;
1287
1288                     {
1289                         node_scoped_lock ln( m_Monitor, *pNode );
1290                         pOld = pNode->value( memory_model::memory_order_relaxed );
1291                         if ( !( pNode->version( memory_model::memory_order_relaxed ) == nVersion
1292                           && pOld 
1293                           && try_unlink_locked( pParent, pNode, disp )))
1294                         {
1295                             return update_flags::retry;
1296                         }
1297                     }
1298                     pDamaged = fix_height_locked( pParent );
1299                 }
1300
1301                 --m_ItemCounter;
1302                 if ( func( pNode->m_key, pOld, disp ))   // calls pOld disposer inside
1303                     m_stat.onDisposeValue();
1304                 else
1305                     m_stat.onExtractValue();
1306
1307                 fix_height_and_rebalance( pDamaged, disp );
1308                 return update_flags::result_removed;
1309             }
1310             else {
1311                 int result = update_flags::retry;
1312                 mapped_type pOld;
1313                 {
1314                     node_scoped_lock ln( m_Monitor, *pNode );
1315                     pOld = pNode->value( atomics::memory_order_relaxed );
1316                     if ( pNode->version( atomics::memory_order_relaxed ) == nVersion && pOld ) {
1317                         pNode->m_pValue.store( nullptr, atomics::memory_order_relaxed );
1318                         result = update_flags::result_removed;
1319                     }
1320                 }
1321
1322                 if ( result == update_flags::result_removed ) {
1323                     --m_ItemCounter;
1324                     if ( func( pNode->m_key, pOld, disp ))  // calls pOld disposer inside
1325                         m_stat.onDisposeValue();
1326                     else
1327                         m_stat.onExtractValue();
1328                 }
1329
1330                 return result;
1331             }
1332         }
1333
1334         bool try_unlink_locked( node_type * pParent, node_type * pNode, rcu_disposer& disp )
1335         {
1336             // pParent and pNode must be locked
1337             assert( !pParent->is_unlinked(memory_model::memory_order_relaxed) );
1338
1339             node_type * pParentLeft = child( pParent, left_child, memory_model::memory_order_relaxed );
1340             node_type * pParentRight = child( pParent, right_child, memory_model::memory_order_relaxed );
1341             if ( pNode != pParentLeft && pNode != pParentRight ) {
1342                 // node is no longer a child of parent
1343                 return false;
1344             }
1345
1346             assert( !pNode->is_unlinked( memory_model::memory_order_relaxed ) );
1347             assert( pParent == parent( pNode, memory_model::memory_order_relaxed));
1348
1349             node_type * pLeft = child( pNode, left_child, memory_model::memory_order_relaxed );
1350             node_type * pRight = child( pNode, right_child, memory_model::memory_order_relaxed );
1351             if ( pLeft != nullptr && pRight != nullptr ) {
1352                 // splicing is no longer possible
1353                 return false;
1354             }
1355             node_type * pSplice = pLeft ? pLeft : pRight;
1356
1357             if ( pParentLeft == pNode )
1358                 pParent->m_pLeft.store( pSplice, memory_model::memory_order_relaxed );
1359             else
1360                 pParent->m_pRight.store( pSplice, memory_model::memory_order_relaxed );
1361
1362             if ( pSplice )
1363                 pSplice->m_pParent.store( pParent, memory_model::memory_order_release );
1364
1365             // Mark the node as unlinked
1366             pNode->version( node_type::unlinked, memory_model::memory_order_release );
1367
1368             // The value will be disposed by calling function
1369             pNode->m_pValue.store( nullptr, memory_model::memory_order_relaxed );
1370
1371             disp.dispose( pNode );
1372             m_stat.onDisposeNode();
1373
1374             return true;
1375         }
1376
1377         //@endcond
1378
1379     private: // rotations
1380         //@cond
1381         int estimate_node_condition( node_type * pNode )
1382         {
1383             node_type * pLeft = child( pNode, left_child, memory_model::memory_order_relaxed );
1384             node_type * pRight = child( pNode, right_child, memory_model::memory_order_relaxed );
1385
1386             if ( (pLeft == nullptr || pRight == nullptr) && !pNode->is_valued( memory_model::memory_order_relaxed ))
1387                 return unlink_required;
1388
1389             int h = pNode->height( memory_model::memory_order_relaxed );
1390             int hL = pLeft ? pLeft->height( memory_model::memory_order_relaxed ) : 0;
1391             int hR = pRight ? pRight->height( memory_model::memory_order_relaxed ) : 0;
1392
1393             int hNew = 1 + std::max( hL, hR );
1394             int nBalance = hL - hR;
1395
1396             if ( nBalance < -1 || nBalance > 1 )
1397                 return rebalance_required;
1398
1399             return h != hNew ? hNew : nothing_required;
1400         }
1401
1402         node_type * fix_height( node_type * pNode )
1403         {
1404             assert( pNode != nullptr );
1405             node_scoped_lock l( m_Monitor, *pNode );
1406             return fix_height_locked( pNode );
1407         }
1408
1409         node_type * fix_height_locked( node_type * pNode )
1410         {
1411             // pNode must be locked!!!
1412             int h = estimate_node_condition( pNode );
1413             switch ( h ) {
1414                 case rebalance_required:
1415                 case unlink_required:
1416                     return pNode;
1417                 case nothing_required:
1418                     return nullptr;
1419                 default:
1420                     pNode->height( h, memory_model::memory_order_relaxed );
1421                     return parent( pNode, memory_model::memory_order_relaxed );
1422             }
1423         }
1424
1425         void fix_height_and_rebalance( node_type * pNode, rcu_disposer& disp )
1426         {
1427             while ( pNode && parent( pNode, memory_model::memory_order_relaxed )) {
1428                 int nCond = estimate_node_condition( pNode );
1429                 if ( nCond == nothing_required || pNode->is_unlinked( memory_model::memory_order_relaxed ) )
1430                     return;
1431
1432                 if ( nCond != unlink_required && nCond != rebalance_required )
1433                     pNode = fix_height( pNode );
1434                 else {
1435                     node_type * pParent = parent( pNode, memory_model::memory_order_relaxed );
1436                     assert( pParent != nullptr );
1437                     {
1438                         node_scoped_lock lp( m_Monitor, *pParent );
1439                         if ( !pParent->is_unlinked( memory_model::memory_order_relaxed )
1440                              && parent( pNode, memory_model::memory_order_relaxed ) == pParent ) 
1441                         {
1442                             node_scoped_lock ln( m_Monitor, *pNode );
1443                             pNode = rebalance_locked( pParent, pNode, disp );
1444                         }
1445                     }
1446                 }
1447             }
1448         }
1449
1450         node_type * rebalance_locked( node_type * pParent, node_type * pNode, rcu_disposer& disp )
1451         {
1452             // pParent and pNode should be locked.
1453             // Returns a damaged node, or nullptr if no more rebalancing is necessary
1454             assert( parent( pNode, memory_model::memory_order_relaxed ) == pParent );
1455             assert( child( pParent, left_child, memory_model::memory_order_relaxed ) == pNode
1456                  || child( pParent, right_child, memory_model::memory_order_relaxed ) == pNode );
1457
1458             node_type * pLeft = child( pNode, left_child, memory_model::memory_order_relaxed );
1459             node_type * pRight = child( pNode, right_child, memory_model::memory_order_relaxed );
1460
1461             if ( (pLeft == nullptr || pRight == nullptr) && !pNode->is_valued( memory_model::memory_order_relaxed )) {
1462                 if ( try_unlink_locked( pParent, pNode, disp ))
1463                     return fix_height_locked( pParent );
1464                 else {
1465                     // retry needed for pNode
1466                     return pNode;
1467                 }
1468             }
1469
1470             int h = pNode->height( memory_model::memory_order_relaxed );
1471             int hL = pLeft ? pLeft->height( memory_model::memory_order_relaxed ) : 0;
1472             int hR = pRight ? pRight->height( memory_model::memory_order_relaxed ) : 0;
1473             int hNew = 1 + std::max( hL, hR );
1474             int balance = hL - hR;
1475
1476             if ( balance > 1 )
1477                 return rebalance_to_right_locked( pParent, pNode, pLeft, hR );
1478             else if ( balance < -1 )
1479                 return rebalance_to_left_locked( pParent, pNode, pRight, hL );
1480             else if ( hNew != h ) {
1481                 pNode->height( hNew, memory_model::memory_order_relaxed );
1482
1483                 // pParent is already locked
1484                 return fix_height_locked( pParent );
1485             }
1486             else
1487                 return nullptr;
1488         }
1489
1490         node_type * rebalance_to_right_locked( node_type * pParent, node_type * pNode, node_type * pLeft, int hR )
1491         {
1492             assert( parent( pNode, memory_model::memory_order_relaxed ) == pParent );
1493             assert( child( pParent, left_child, memory_model::memory_order_relaxed ) == pNode
1494                  || child( pParent, right_child, memory_model::memory_order_relaxed ) == pNode );
1495
1496             // pParent and pNode is locked yet
1497             // pNode->pLeft is too large, we will rotate-right.
1498             // If pLeft->pRight is taller than pLeft->pLeft, then we will first rotate-left pLeft.
1499
1500             {
1501                 assert( pLeft != nullptr );
1502                 node_scoped_lock l( m_Monitor, *pLeft );
1503                 if ( pNode->m_pLeft.load( memory_model::memory_order_relaxed ) != pLeft )
1504                     return pNode; // retry for pNode
1505
1506                 int hL = pLeft->height( memory_model::memory_order_relaxed );
1507                 if ( hL - hR <= 1 )
1508                     return pNode; // retry
1509
1510                 node_type * pLRight = child( pLeft, right_child, memory_model::memory_order_relaxed );
1511                 int hLR = pLRight ? pLRight->height( memory_model::memory_order_relaxed ) : 0;
1512                 node_type * pLLeft = child( pLeft, left_child, memory_model::memory_order_relaxed );
1513                 int hLL = pLLeft ? pLLeft->height( memory_model::memory_order_relaxed ) : 0;
1514
1515                 if ( hLL > hLR ) {
1516                     // rotate right
1517                     return rotate_right_locked( pParent, pNode, pLeft, hR, hLL, pLRight, hLR );
1518                 }
1519                 else {
1520                     assert( pLRight != nullptr );
1521                     {
1522                         node_scoped_lock lr( m_Monitor, *pLRight );
1523                         if ( pLeft->m_pRight.load( memory_model::memory_order_relaxed ) != pLRight )
1524                             return pNode; // retry
1525
1526                         hLR = pLRight->height( memory_model::memory_order_relaxed );
1527                         if ( hLL > hLR )
1528                             return rotate_right_locked( pParent, pNode, pLeft, hR, hLL, pLRight, hLR );
1529
1530                         node_type * pLRLeft = child( pLRight, left_child, memory_model::memory_order_relaxed );
1531                         int hLRL = pLRLeft ? pLRLeft->height( memory_model::memory_order_relaxed  ) : 0;
1532                         int balance = hLL - hLRL;
1533                         if ( balance >= -1 && balance <= 1 && !((hLL == 0 || hLRL == 0) && !pLeft->is_valued(memory_model::memory_order_relaxed))) {
1534                             // nParent.child.left won't be damaged after a double rotation
1535                             return rotate_right_over_left_locked( pParent, pNode, pLeft, hR, hLL, pLRight, hLRL );
1536                         }
1537                     }
1538
1539                     // focus on pLeft, if necessary pNode will be balanced later
1540                     return rebalance_to_left_locked( pNode, pLeft, pLRight, hLL );
1541                 }
1542             }
1543         }
1544
1545         node_type * rebalance_to_left_locked( node_type * pParent, node_type * pNode, node_type * pRight, int hL )
1546         {
1547             assert( parent( pNode, memory_model::memory_order_relaxed ) == pParent );
1548             assert( child( pParent, left_child, memory_model::memory_order_relaxed ) == pNode
1549                  || child( pParent, right_child, memory_model::memory_order_relaxed ) == pNode );
1550
1551             // pParent and pNode is locked yet
1552             {
1553                 assert( pRight != nullptr );
1554                 node_scoped_lock l( m_Monitor, *pRight );
1555                 if ( pNode->m_pRight.load( memory_model::memory_order_relaxed ) != pRight )
1556                     return pNode; // retry for pNode
1557
1558                 int hR = pRight->height( memory_model::memory_order_relaxed );
1559                 if ( hL - hR >= -1 )
1560                     return pNode; // retry
1561
1562                 node_type * pRLeft = child( pRight, left_child, memory_model::memory_order_relaxed );
1563                 int hRL = pRLeft ? pRLeft->height( memory_model::memory_order_relaxed ) : 0;
1564                 node_type * pRRight = child( pRight, right_child, memory_model::memory_order_relaxed );
1565                 int hRR = pRRight ? pRRight->height( memory_model::memory_order_relaxed ) : 0;
1566                 if ( hRR > hRL )
1567                     return rotate_left_locked( pParent, pNode, hL, pRight, pRLeft, hRL, hRR );
1568
1569                 {
1570                     assert( pRLeft != nullptr );
1571                     node_scoped_lock lrl( m_Monitor, *pRLeft );
1572                     if ( pRight->m_pLeft.load( memory_model::memory_order_relaxed ) != pRLeft )
1573                         return pNode; // retry
1574
1575                     hRL = pRLeft->height( memory_model::memory_order_relaxed );
1576                     if ( hRR >= hRL )
1577                         return rotate_left_locked( pParent, pNode, hL, pRight, pRLeft, hRL, hRR );
1578
1579                     node_type * pRLRight = child( pRLeft, right_child, memory_model::memory_order_relaxed );
1580                     int hRLR = pRLRight ? pRLRight->height( memory_model::memory_order_relaxed ) : 0;
1581                     int balance = hRR - hRLR;
1582                     if ( balance >= -1 && balance <= 1 && !((hRR == 0 || hRLR == 0) && !pRight->is_valued( memory_model::memory_order_relaxed )))
1583                          return rotate_left_over_right_locked( pParent, pNode, hL, pRight, pRLeft, hRR, hRLR );
1584                 }
1585                 return rebalance_to_right_locked( pNode, pRight, pRLeft, hRR );
1586             }
1587         }
1588
1589         static void begin_change( node_type * pNode, version_type version )
1590         {
1591             pNode->version( version | node_type::shrinking, memory_model::memory_order_release );
1592         }
1593         static void end_change( node_type * pNode, version_type version )
1594         {
1595             // Clear shrinking and unlinked flags and increment version
1596             pNode->version( (version | node_type::version_flags) + 1, memory_model::memory_order_release );
1597         }
1598
1599         node_type * rotate_right_locked( node_type * pParent, node_type * pNode, node_type * pLeft, int hR, int hLL, node_type * pLRight, int hLR )
1600         {
1601             version_type nodeVersion = pNode->version( memory_model::memory_order_relaxed );
1602             node_type * pParentLeft = child( pParent, left_child, memory_model::memory_order_relaxed );
1603
1604             begin_change( pNode, nodeVersion );
1605
1606             pNode->m_pLeft.store( pLRight, memory_model::memory_order_relaxed );
1607             if ( pLRight != nullptr )
1608                 pLRight->m_pParent.store( pNode, memory_model::memory_order_relaxed  );
1609
1610             pLeft->m_pRight.store( pNode, memory_model::memory_order_relaxed );
1611             pNode->m_pParent.store( pLeft, memory_model::memory_order_relaxed );
1612
1613             if ( pParentLeft == pNode )
1614                 pParent->m_pLeft.store( pLeft, memory_model::memory_order_relaxed );
1615             else {
1616                 assert( pParent->m_pRight.load( memory_model::memory_order_relaxed ) == pNode );
1617                 pParent->m_pRight.store( pLeft, memory_model::memory_order_relaxed );
1618             }
1619             pLeft->m_pParent.store( pParent, memory_model::memory_order_relaxed );
1620
1621             // fix up heights links
1622             int hNode = 1 + std::max( hLR, hR );
1623             pNode->height( hNode, memory_model::memory_order_relaxed );
1624             pLeft->height( 1 + std::max( hLL, hNode ), memory_model::memory_order_relaxed );
1625
1626             end_change( pNode, nodeVersion );
1627             m_stat.onRotateRight();
1628
1629             // We have damaged pParent, pNode (now parent.child.right), and pLeft (now
1630             // parent.child).  pNode is the deepest.  Perform as many fixes as we can
1631             // with the locks we've got.
1632
1633             // We've already fixed the height for pNode, but it might still be outside
1634             // our allowable balance range.  In that case a simple fix_height_locked()
1635             // won't help.
1636             int nodeBalance = hLR - hR;
1637             if ( nodeBalance < -1 || nodeBalance > 1 ) {
1638                 // we need another rotation at pNode
1639                 return pNode;
1640             }
1641
1642             // we've fixed balance and height damage for pNode, now handle
1643             // extra-routing node damage
1644             if ( (pLRight == nullptr || hR == 0) && !pNode->is_valued(memory_model::memory_order_relaxed)) {
1645                 // we need to remove pNode and then repair
1646                 return pNode;
1647             }
1648
1649             // we've already fixed the height at pLeft, do we need a rotation here?
1650             int leftBalance = hLL - hNode;
1651             if ( leftBalance < -1 || leftBalance > 1 )
1652                 return pLeft;
1653
1654             // pLeft might also have routing node damage (if pLeft.left was null)
1655             if ( hLL == 0 && !pLeft->is_valued( memory_model::memory_order_relaxed ))
1656                 return pLeft;
1657
1658             // try to fix the parent height while we've still got the lock
1659             return fix_height_locked( pParent );
1660         }
1661
1662         node_type * rotate_left_locked( node_type * pParent, node_type * pNode, int hL, node_type * pRight, node_type * pRLeft, int hRL, int hRR )
1663         {
1664             version_type nodeVersion = pNode->version( memory_model::memory_order_relaxed );
1665             node_type * pParentLeft = child( pParent, left_child, memory_model::memory_order_relaxed );
1666
1667             begin_change( pNode, nodeVersion );
1668
1669             // fix up pNode links, careful to be compatible with concurrent traversal for all but pNode
1670             pNode->m_pRight.store( pRLeft, memory_model::memory_order_relaxed );
1671             if ( pRLeft != nullptr )
1672                 pRLeft->m_pParent.store( pNode, memory_model::memory_order_relaxed );
1673
1674             pRight->m_pLeft.store( pNode, memory_model::memory_order_relaxed );
1675             pNode->m_pParent.store( pRight, memory_model::memory_order_relaxed );
1676
1677             if ( pParentLeft == pNode )
1678                 pParent->m_pLeft.store( pRight, memory_model::memory_order_relaxed );
1679             else {
1680                 assert( pParent->m_pRight.load( memory_model::memory_order_relaxed ) == pNode );
1681                 pParent->m_pRight.store( pRight, memory_model::memory_order_relaxed );
1682             }
1683             pRight->m_pParent.store( pParent, memory_model::memory_order_relaxed );
1684
1685             // fix up heights
1686             int hNode = 1 + std::max( hL, hRL );
1687             pNode->height( hNode, memory_model::memory_order_relaxed );
1688             pRight->height( 1 + std::max( hNode, hRR ), memory_model::memory_order_relaxed );
1689
1690             end_change( pNode, nodeVersion );
1691             m_stat.onRotateLeft();
1692
1693             int nodeBalance = hRL - hL;
1694             if ( nodeBalance < -1 || nodeBalance > 1 )
1695                 return pNode;
1696
1697             if ( (pRLeft == nullptr || hL == 0) && !pNode->is_valued( memory_model::memory_order_relaxed ))
1698                 return pNode;
1699
1700             int rightBalance = hRR - hNode;
1701             if ( rightBalance < -1 || rightBalance > 1 )
1702                 return pRight;
1703
1704             if ( hRR == 0 && !pRight->is_valued( memory_model::memory_order_relaxed ))
1705                 return pRight;
1706
1707             return fix_height_locked( pParent );
1708         }
1709
1710         node_type * rotate_right_over_left_locked( node_type * pParent, node_type * pNode, node_type * pLeft, int hR, int hLL, node_type * pLRight, int hLRL )
1711         {
1712             version_type nodeVersion = pNode->version( memory_model::memory_order_relaxed );
1713             version_type leftVersion = pLeft->version( memory_model::memory_order_relaxed );
1714
1715             node_type * pPL = child( pParent, left_child, memory_model::memory_order_relaxed );
1716             node_type * pLRL = child( pLRight, left_child, memory_model::memory_order_relaxed );
1717             node_type * pLRR = child( pLRight, right_child, memory_model::memory_order_relaxed );
1718             int hLRR = pLRR ? pLRR->height( memory_model::memory_order_relaxed ) : 0;
1719
1720             begin_change( pNode, nodeVersion );
1721             begin_change( pLeft, leftVersion );
1722
1723             // fix up pNode links, careful about the order!
1724             pNode->m_pLeft.store( pLRR, memory_model::memory_order_relaxed );
1725             if ( pLRR != nullptr )
1726                 pLRR->m_pParent.store( pNode, memory_model::memory_order_relaxed );
1727
1728             pLeft->m_pRight.store( pLRL, memory_model::memory_order_relaxed );
1729             if ( pLRL != nullptr )
1730                 pLRL->m_pParent.store( pLeft, memory_model::memory_order_relaxed );
1731
1732             pLRight->m_pLeft.store( pLeft, memory_model::memory_order_relaxed );
1733             pLeft->m_pParent.store( pLRight, memory_model::memory_order_relaxed );
1734             pLRight->m_pRight.store( pNode, memory_model::memory_order_relaxed );
1735             pNode->m_pParent.store( pLRight, memory_model::memory_order_relaxed );
1736
1737             if ( pPL == pNode )
1738                 pParent->m_pLeft.store( pLRight, memory_model::memory_order_relaxed );
1739             else {
1740                 assert( child( pParent, right_child, memory_model::memory_order_relaxed ) == pNode );
1741                 pParent->m_pRight.store( pLRight, memory_model::memory_order_relaxed );
1742             }
1743             pLRight->m_pParent.store( pParent, memory_model::memory_order_relaxed );
1744
1745             // fix up heights
1746             int hNode = 1 + std::max( hLRR, hR );
1747             pNode->height( hNode, memory_model::memory_order_relaxed );
1748             int hLeft = 1 + std::max( hLL, hLRL );
1749             pLeft->height( hLeft, memory_model::memory_order_relaxed );
1750             pLRight->height( 1 + std::max( hLeft, hNode ), memory_model::memory_order_relaxed );
1751
1752             end_change( pNode, nodeVersion );
1753             end_change( pLeft, leftVersion );
1754             m_stat.onRotateRightOverLeft();
1755
1756             // caller should have performed only a single rotation if pLeft was going
1757             // to end up damaged
1758             assert( hLL - hLRL <= 1 && hLRL - hLL <= 1 );
1759             assert( !((hLL == 0 || pLRL == nullptr) && !pLeft->is_valued( memory_model::memory_order_relaxed )));
1760
1761             // We have damaged pParent, pLR (now parent.child), and pNode (now
1762             // parent.child.right).  pNode is the deepest.  Perform as many fixes as we
1763             // can with the locks we've got.
1764
1765             // We've already fixed the height for pNode, but it might still be outside
1766             // our allowable balance range.  In that case a simple fix_height_locked()
1767             // won't help.
1768             int nodeBalance = hLRR - hR;
1769             if ( nodeBalance < -1 || nodeBalance > 1 ) {
1770                 // we need another rotation at pNode
1771                 return pNode;
1772             }
1773
1774             // pNode might also be damaged by being an unnecessary routing node
1775             if ( (pLRR == nullptr || hR == 0) && !pNode->is_valued( memory_model::memory_order_relaxed )) {
1776                 // repair involves splicing out pNode and maybe more rotations
1777                 return pNode;
1778             }
1779
1780             // we've already fixed the height at pLRight, do we need a rotation here?
1781             int balanceLR = hLeft - hNode;
1782             if ( balanceLR < -1 || balanceLR > 1 )
1783                 return pLRight;
1784
1785             // try to fix the parent height while we've still got the lock
1786             return fix_height_locked( pParent );
1787         }
1788
1789         node_type * rotate_left_over_right_locked( node_type * pParent, node_type * pNode, int hL, node_type * pRight, node_type * pRLeft, int hRR, int hRLR )
1790         {
1791             version_type nodeVersion = pNode->version( memory_model::memory_order_relaxed );
1792             version_type rightVersion = pRight->version( memory_model::memory_order_relaxed );
1793
1794             node_type * pPL = child( pParent, left_child, memory_model::memory_order_relaxed );
1795             node_type * pRLL = child( pRLeft, left_child, memory_model::memory_order_relaxed );
1796             node_type * pRLR = child( pRLeft, right_child, memory_model::memory_order_relaxed );
1797             int hRLL = pRLL ? pRLL->height( memory_model::memory_order_relaxed ) : 0;
1798
1799             begin_change( pNode, nodeVersion );
1800             begin_change( pRight, rightVersion );
1801
1802             // fix up pNode links, careful about the order!
1803             pNode->m_pRight.store( pRLL, memory_model::memory_order_relaxed );
1804             if ( pRLL != nullptr )
1805                 pRLL->m_pParent.store( pNode, memory_model::memory_order_relaxed );
1806
1807             pRight->m_pLeft.store( pRLR, memory_model::memory_order_relaxed );
1808             if ( pRLR != nullptr )
1809                 pRLR->m_pParent.store( pRight, memory_model::memory_order_relaxed );
1810
1811             pRLeft->m_pRight.store( pRight, memory_model::memory_order_relaxed );
1812             pRight->m_pParent.store( pRLeft, memory_model::memory_order_relaxed );
1813             pRLeft->m_pLeft.store( pNode, memory_model::memory_order_relaxed );
1814             pNode->m_pParent.store( pRLeft, memory_model::memory_order_relaxed );
1815
1816             if ( pPL == pNode )
1817                 pParent->m_pLeft.store( pRLeft, memory_model::memory_order_relaxed );
1818             else {
1819                 assert( pParent->m_pRight.load( memory_model::memory_order_relaxed ) == pNode );
1820                 pParent->m_pRight.store( pRLeft, memory_model::memory_order_relaxed );
1821             }
1822             pRLeft->m_pParent.store( pParent, memory_model::memory_order_relaxed );
1823
1824             // fix up heights
1825             int hNode = 1 + std::max( hL, hRLL );
1826             pNode->height( hNode, memory_model::memory_order_relaxed );
1827             int hRight = 1 + std::max( hRLR, hRR );
1828             pRight->height( hRight, memory_model::memory_order_relaxed );
1829             pRLeft->height( 1 + std::max( hNode, hRight ), memory_model::memory_order_relaxed );
1830
1831             end_change( pNode, nodeVersion );
1832             end_change( pRight, rightVersion );
1833             m_stat.onRotateLeftOverRight();
1834
1835             assert( hRR - hRLR <= 1 && hRLR - hRR <= 1 );
1836
1837             int nodeBalance = hRLL - hL;
1838             if ( nodeBalance < -1 || nodeBalance > 1 )
1839                 return pNode;
1840             if ( (pRLL == nullptr || hL == 0) && !pNode->is_valued( memory_model::memory_order_relaxed ))
1841                 return pNode;
1842
1843             int balRL = hRight - hNode;
1844             if ( balRL < -1 || balRL > 1 )
1845                 return pRLeft;
1846
1847             return fix_height_locked( pParent );
1848         }
1849
1850         //@endcond
1851     };
1852 }} // namespace cds::container
1853
1854 #endif // #ifndef CDSLIB_CONTAINER_IMPL_BRONSON_AVLTREE_MAP_RCU_H