Added sync_monitor option
[libcds.git] / cds / container / impl / bronson_avltree_map_rcu.h
1 //$$CDS-header$$
2
3 #ifndef CDSLIB_CONTAINER_IMPL_BRONSON_AVLTREE_MAP_RCU_H
4 #define CDSLIB_CONTAINER_IMPL_BRONSON_AVLTREE_MAP_RCU_H
5
6 #include <cds/container/details/bronson_avltree_base.h>
7 #include <cds/urcu/details/check_deadlock.h>
8
9 namespace cds { namespace container {
10
11     /// Bronson et al AVL-tree (RCU specialization for storing pointer to values)
12     /** @ingroup cds_nonintrusive_map
13         @ingroup cds_nonintrusive_tree
14         @headerfile cds/container/bronson_avltree_map_rcu.h
15
16         This is the specialization of \ref cds_container_BronsonAVLTreeMap_rcu "RCU-based Bronson et al AVL-tree"
17         for "key -> value pointer" map. This specialization stores the pointer to user-allocated values instead of the copy
18         of the value. When a tree node is removed, the algorithm does not free the value pointer directly, instead, it call
19         the disposer functor provided by \p Traits template parameter.
20
21         <b>Template arguments</b>:
22         - \p RCU - one of \ref cds_urcu_gc "RCU type"
23         - \p Key - key type
24         - \p T - value type to be stored in tree's nodes. Note, the specialization stores the pointer to user-allocated
25             value, not the copy.
26         - \p Traits - tree traits, default is \p bronson_avltree::traits
27             It is possible to declare option-based tree with \p bronson_avltree::make_traits metafunction
28             instead of \p Traits template argument.
29
30         @note Before including <tt><cds/container/bronson_avltree_map_rcu.h></tt> you should include appropriate RCU header file,
31         see \ref cds_urcu_gc "RCU type" for list of existing RCU class and corresponding header files.
32     */
33     template <
34         typename RCU,
35         typename Key,
36         typename T,
37 #   ifdef CDS_DOXYGEN_INVOKED
38         typename Traits = bronson_avltree::traits
39 #else
40         typename Traits
41 #endif
42     >
43     class BronsonAVLTreeMap< cds::urcu::gc<RCU>, Key, T*, Traits >
44     {
45     public:
46         typedef cds::urcu::gc<RCU>  gc;   ///< RCU Garbage collector
47         typedef Key     key_type;    ///< type of a key stored in the map
48         typedef T *     mapped_type; ///< type of value stored in the map
49         typedef Traits  traits;      ///< Traits template parameter
50
51 #   ifdef CDS_DOXYGEN_INVOKED
52         typedef implementation_defined key_comparator;    ///< key compare functor based on \p Traits::compare and \p Traits::less
53 #   else
54         typedef typename opt::details::make_comparator< key_type, traits >::type key_comparator;
55 #endif
56         typedef typename traits::item_counter           item_counter;       ///< Item counting policy
57         typedef typename traits::memory_model           memory_model;       ///< Memory ordering, see \p cds::opt::memory_model option
58         typedef typename traits::node_allocator         node_allocator_type; ///< allocator for maintaining internal nodes
59         typedef typename traits::stat                   stat;               ///< internal statistics
60         typedef typename traits::rcu_check_deadlock     rcu_check_deadlock; ///< Deadlock checking policy
61         typedef typename traits::back_off               back_off;           ///< Back-off strategy
62         typedef typename traits::disposer               disposer;           ///< Value disposer
63         typedef typename traits::sync_monitor           sync_monitor;       ///< @ref cds_sync_monitor "Synchronization monitor" type for node-level locking
64
65         /// Enabled or disabled @ref bronson_avltree::relaxed_insert "relaxed insertion"
66         static bool const c_bRelaxedInsert = traits::relaxed_insert;
67
68     protected:
69         //@cond
70         typedef typename sync_monitor::template node_injection< bronson_avltree::node< key_type, mapped_type >> node_type;
71         typedef typename node_type::version_type version_type;
72
73         typedef cds::details::Allocator< node_type, node_allocator_type > cxx_allocator;
74         typedef cds::urcu::details::check_deadlock_policy< gc, rcu_check_deadlock >   check_deadlock_policy;
75
76         enum class find_result
77         {
78             not_found,
79             found,
80             retry
81         };
82
83         enum class update_flags
84         {
85             allow_insert = 1,
86             allow_update = 2,
87             retry = 4,
88
89             failed = 0,
90             result_insert = allow_insert,
91             result_update = allow_update
92         };
93
94         enum node_condition
95         {
96             nothing_required = -3,
97             rebalance_required = -2,
98             unlink_required = -1
99         };
100
101         typedef typename sync_monitor::template scoped_lock<node_type> node_scoped_lock;
102         //@endcond
103
104     protected:
105         //@cond
106         template <typename K>
107         static node_type * alloc_node( K&& key, int nHeight, version_type version, node_type * pParent, node_type * pLeft, node_type * pRight )
108         {
109             return cxx_allocator().New( std::forward<K>( key ), nHeight, version, pParent, pLeft, pRight );
110         }
111
112         static void free_node( node_type * pNode )
113         {
114             // Free node without disposer
115             cxx_allocator().Delete( pNode );
116         }
117
118         // RCU safe disposer
119         class rcu_disposer
120         {
121             node_type *     m_pRetiredList; ///< head of retired list
122
123         public:
124             rcu_disposer()
125                 : m_pRetiredList( nullptr )
126             {}
127
128             ~rcu_disposer()
129             {
130                 clean();
131             }
132
133             void dispose( node_type * pNode )
134             {
135                 mapped_type * pVal = pNode->value( memory_model::memory_order_relaxed );
136                 if ( pVal ) {
137                     pNode->m_pValue.store( nullptr, memory_model::memory_order_relaxed );
138                     disposer()(pVal);
139                 }
140
141                 pNode->m_pNextRemoved = m_pRetiredList;
142                 m_pRetiredList = pNode;
143             }
144
145         private:
146             struct internal_disposer
147             {
148                 void operator()( node_type * p ) const
149                 {
150                     free_node( p );
151                 }
152             };
153
154             void clean()
155             {
156                 assert( !gc::is_locked() );
157
158                 for ( node_type * p = m_pRetiredList; p; ) {
159                     node_type * pNext = p->m_pNextRemoved;
160                     // Value already disposed
161                     gc::template retire_ptr<internal_disposer>( p );
162                     p = pNext;
163                 }
164             }
165         };
166
167         //@endcond
168
169     protected:
170         //@cond
171         typename node_type::base_class      m_Root;
172         node_type *                         m_pRoot;
173         item_counter                        m_ItemCounter;
174         mutable sync_monitor                m_Monitor;
175         mutable stat                        m_stat;
176         //@endcond
177
178     public:
179         /// Creates empty map
180         BronsonAVLTreeMap()
181             : m_pRoot( static_cast<node_type *>(&m_Root) )
182         {}
183
184         /// Destroys the map
185         ~BronsonAVLTreeMap()
186         {
187             unsafe_clear();
188         }
189
190         /// Inserts new node
191         /**
192             The \p key_type should be constructible from a value of type \p K.
193
194             RCU \p synchronize() can be called. RCU should not be locked.
195
196             Returns \p true if inserting successful, \p false otherwise.
197         */
198         template <typename K>
199         bool insert( K const& key, mapped_type * pVal )
200         {
201             return do_update(key, key_comparator(),
202                 [pVal]( node_type * pNode ) -> mapped_type* 
203                 {
204                     assert( pNode->m_pValue.load( memory_model::memory_order_relaxed ) == nullptr );
205                     return pVal;
206                 }, 
207                 update_flags::allow_insert 
208             ) == update_flags::result_insert;
209         }
210
211         /// Updates the value for \p key
212         /**
213             The operation performs inserting or updating the value for \p key with lock-free manner.
214             If \p bInsert is \p false, only updating of existing node is possible.
215
216             If \p key is not found and inserting is allowed (i.e. \p bInsert is \p true),
217             then the new node created from \p key is inserted into the map; note that in this case the \ref key_type should be
218             constructible from type \p K.
219             Otherwise, the value is changed to \p pVal.
220
221             RCU \p synchronize() method can be called. RCU should not be locked.
222
223             Returns <tt> std::pair<bool, bool> </tt> where \p first is \p true if operation is successfull,
224             \p second is \p true if new node has been added or \p false if the node with \p key
225             already is in the tree.
226         */
227         template <typename K, typename Func>
228         std::pair<bool, bool> update( K const& key, mapped_type * pVal, bool bInsert = true )
229         {
230             int result = do_update( key, key_comparator(),
231                 [pVal]( node_type * ) -> mapped_type* 
232                 {
233                     return pVal;
234                 },
235                 update_flags::allow_update | (bInsert ? update_flags::allow_insert : 0) 
236             );
237             return std::make_pair( result != 0, (result & update_flags::result_insert) != 0 );
238         }
239
240         /// Delete \p key from the map
241         /**
242             RCU \p synchronize() method can be called. RCU should not be locked.
243
244             Return \p true if \p key is found and deleted, \p false otherwise
245         */
246         template <typename K>
247         bool erase( K const& key )
248         {
249             //TODO
250         }
251
252         /// Deletes the item from the map using \p pred predicate for searching
253         /**
254             The function is an analog of \p erase(K const&)
255             but \p pred is used for key comparing.
256             \p Less functor has the interface like \p std::less.
257             \p Less must imply the same element order as the comparator used for building the map.
258         */
259         template <typename K, typename Less>
260         bool erase_with( K const& key, Less pred )
261         {
262             CDS_UNUSED( pred );
263             //TODO
264         }
265
266         /// Delete \p key from the map
267         /**
268             The function searches an item with key \p key, calls \p f functor
269             and deletes the item. If \p key is not found, the functor is not called.
270
271             The functor \p Func interface:
272             \code
273             struct extractor {
274                 void operator()(mapped_type& item) { ... }
275             };
276             \endcode
277
278             RCU \p synchronize method can be called. RCU should not be locked.
279
280             Return \p true if key is found and deleted, \p false otherwise
281         */
282         template <typename K, typename Func>
283         bool erase( K const& key, Func f )
284         {
285             //TODO
286         }
287
288         /// Deletes the item from the map using \p pred predicate for searching
289         /**
290             The function is an analog of \p erase(K const&, Func)
291             but \p pred is used for key comparing.
292             \p Less functor has the interface like \p std::less.
293             \p Less must imply the same element order as the comparator used for building the map.
294         */
295         template <typename K, typename Less, typename Func>
296         bool erase_with( K const& key, Less pred, Func f )
297         {
298             CDS_UNUSED( pred );
299             //TODO
300         }
301
302         /// Extracts an item with minimal key from the map
303         /**
304             Returns \ref cds::urcu::exempt_ptr "exempt_ptr" pointer to the leftmost item.
305             If the set is empty, returns empty \p exempt_ptr.
306
307             @note Due the concurrent nature of the map, the function extracts <i>nearly</i> minimum key.
308             It means that the function gets leftmost leaf of the tree and tries to unlink it.
309             During unlinking, a concurrent thread may insert an item with key less than leftmost item's key.
310             So, the function returns the item with minimum key at the moment of tree traversing.
311
312             RCU \p synchronize method can be called. RCU should NOT be locked.
313             The function does not free the item.
314             The deallocator will be implicitly invoked when the returned object is destroyed or when
315             its \p release() member function is called.
316         */
317         exempt_ptr extract_min()
318         {
319             //TODO
320         }
321
322         /// Extracts an item with maximal key from the map
323         /**
324             Returns \ref cds::urcu::exempt_ptr "exempt_ptr" pointer to the rightmost item.
325             If the set is empty, returns empty \p exempt_ptr.
326
327             @note Due the concurrent nature of the map, the function extracts <i>nearly</i> maximal key.
328             It means that the function gets rightmost leaf of the tree and tries to unlink it.
329             During unlinking, a concurrent thread may insert an item with key great than leftmost item's key.
330             So, the function returns the item with maximum key at the moment of tree traversing.
331
332             RCU \p synchronize method can be called. RCU should NOT be locked.
333             The function does not free the item.
334             The deallocator will be implicitly invoked when the returned object is destroyed or when
335             its \p release() is called.
336             @note Before reusing \p result object you should call its \p release() method.
337         */
338         exempt_ptr extract_max()
339         {
340             //TODO
341         }
342
343         /// Extracts an item from the map
344         /**
345             The function searches an item with key equal to \p key in the tree,
346             unlinks it, and returns \ref cds::urcu::exempt_ptr "exempt_ptr" pointer to an item found.
347             If \p key is not found the function returns an empty \p exempt_ptr.
348
349             RCU \p synchronize method can be called. RCU should NOT be locked.
350             The function does not destroy the item found.
351             The dealloctor will be implicitly invoked when the returned object is destroyed or when
352             its \p release() member function is called.
353         */
354         template <typename Q>
355         exempt_ptr extract( Q const& key )
356         {
357             //TODO
358         }
359
360         /// Extracts an item from the map using \p pred for searching
361         /**
362             The function is an analog of \p extract(exempt_ptr&, Q const&)
363             but \p pred is used for key compare.
364             \p Less has the interface like \p std::less and should meet \ref cds_container_EllenBinTreeSet_rcu_less
365             "predicate requirements".
366             \p pred must imply the same element order as the comparator used for building the map.
367         */
368         template <typename Q, typename Less>
369         exempt_ptr extract_with( Q const& val, Less pred )
370         {
371             CDS_UNUSED( pred );
372             //TODO
373         }
374
375         /// Find the key \p key
376         /**
377             The function searches the item with key equal to \p key and calls the functor \p f for item found.
378             The interface of \p Func functor is:
379             \code
380             struct functor {
381                 void operator()( mapped_type& item );
382             };
383             \endcode
384             where \p item is the item found.
385             The functor is called under node-level lock.
386
387             The function applies RCU lock internally.
388
389             The function returns \p true if \p key is found, \p false otherwise.
390         */
391         template <typename K, typename Func>
392         bool find( K const& key, Func f )
393         {
394             return do_find( key, key_comparator(), [&f]( sync_monitor& monitor, node_type * pNode ) -> bool {
395                 assert( pNode != nullptr );
396                 node_scoped_lock l( monitor, *pNode );
397                 mapped_type * pVal = pNode->m_pValue.load( memory_model::memory_order_relaxed );
398                 if ( pVal ) {
399                     f( *pVal );
400                     return true;
401                 }
402                 return false;
403             });
404         }
405
406         /// Finds the key \p val using \p pred predicate for searching
407         /**
408             The function is an analog of \p find(K const&, Func)
409             but \p pred is used for key comparing.
410             \p Less functor has the interface like \p std::less.
411             \p Less must imply the same element order as the comparator used for building the map.
412         */
413         template <typename K, typename Less, typename Func>
414         bool find_with( K const& key, Less pred, Func f )
415         {
416             CDS_UNUSED( pred );
417             return do_find( key, opt::details::make_comparator_from_less<Less>(), 
418                 [&f]( sync_monitor& monitor, node_type * pNode ) -> bool {
419                     assert( pNode != nullptr );
420                     node_scoped_lock l( monitor, *pNode );
421                     mapped_type * pVal = pNode->m_pValue.load( memory_model::memory_order_relaxed );
422                     if ( pVal ) {
423                         f( *pVal );
424                         return true;
425                     }
426                     return false;
427                 } 
428             );
429         }
430
431         /// Find the key \p key
432         /**
433             The function searches the item with key equal to \p key
434             and returns \p true if it is found, and \p false otherwise.
435
436             The function applies RCU lock internally.
437         */
438         template <typename K>
439         bool find( K const& key )
440         {
441             return do_find( key, key_comparator(), []( node_type * ) -> bool { return true; });
442         }
443
444         /// Finds the key \p val using \p pred predicate for searching
445         /**
446             The function is an analog of \p find(K const&)
447             but \p pred is used for key comparing.
448             \p Less functor has the interface like \p std::less.
449             \p Less must imply the same element order as the comparator used for building the map.
450         */
451         template <typename K, typename Less>
452         bool find_with( K const& key, Less pred )
453         {
454             CDS_UNUSED( pred );
455             return do_find( key, opt::details::make_comparator_from_less<Less>(), []( node_type * ) -> bool { return true; } );
456         }
457
458         /// Finds \p key and return the item found
459         /**
460             The function searches the item with key equal to \p key and returns the pointer to item found.
461             If \p key is not found it returns \p nullptr.
462
463             RCU should be locked before call the function.
464             Returned pointer is valid while RCU is locked.
465         */
466         template <typename Q>
467         mapped_type * get( Q const& key ) const
468         {
469             //TODO
470         }
471
472         /// Finds \p key with \p pred predicate and return the item found
473         /**
474             The function is an analog of \p get(Q const&)
475             but \p pred is used for comparing the keys.
476
477             \p Less functor has the semantics like \p std::less but should take arguments of type \p key_type
478             and \p Q in any order.
479             \p pred must imply the same element order as the comparator used for building the map.
480         */
481         template <typename Q, typename Less>
482         mapped_type * get_with( Q const& key, Less pred ) const
483         {
484             CDS_UNUSED( pred );
485             //TODO
486         }
487
488         /// Clears the tree (thread safe, not atomic)
489         /**
490             The function unlink all items from the tree.
491             The function is thread safe but not atomic: in multi-threaded environment with parallel insertions
492             this sequence
493             \code
494             set.clear();
495             assert( set.empty() );
496             \endcode
497             the assertion could be raised.
498
499             For each node the \ref disposer will be called after unlinking.
500
501             RCU \p synchronize method can be called. RCU should not be locked.
502         */
503         void clear()
504         {
505             for ( exempt_ptr ep = extract_min(); !ep.empty(); ep = extract_min() )
506                 ep.release();
507         }
508
509         /// Clears the tree (not thread safe)
510         /**
511             This function is not thread safe and may be called only when no other thread deals with the tree.
512             The function is used in the tree destructor.
513         */
514         void unsafe_clear()
515         {
516             //TODO
517         }
518
519         /// Checks if the map is empty
520         bool empty() const
521         {
522             return m_Root.m_pRight.load( memory_model::memory_order_relaxed ) == nullptr;
523         }
524
525         /// Returns item count in the map
526         /**
527             Only leaf nodes containing user data are counted.
528
529             The value returned depends on item counter type provided by \p Traits template parameter.
530             If it is \p atomicity::empty_item_counter this function always returns 0.
531
532             The function is not suitable for checking the tree emptiness, use \p empty()
533             member function for this purpose.
534         */
535         size_t size() const
536         {
537             return m_ItemCounter;
538         }
539
540         /// Returns const reference to internal statistics
541         stat const& statistics() const
542         {
543             return m_stat;
544         }
545
546         /// Checks internal consistency (not atomic, not thread-safe)
547         /**
548             The debugging function to check internal consistency of the tree.
549         */
550         bool check_consistency() const
551         {
552             //TODO
553         }
554
555     protected:
556         //@cond
557         template <typename Q, typename Compare, typename Func>
558         bool do_find( Q& key, Compare cmp, Func f ) const
559         {
560             find_result result;
561             {
562                 rcu_lock l;
563                 result = try_find( key, cmp, f, m_pRoot, 1, 0 );
564             }
565             assert( result != find_result::retry );
566             return result == find_result::found;
567         }
568
569         template <typename K, typename Compare, typename Func>
570         int do_update( K const& key, Compare cmp, Func funcUpdate, int nFlags )
571         {
572             check_deadlock_policy::check();
573
574             rcu_disposer removed_list;
575             {
576                 rcu_lock l;
577                 return try_udate_root( key, cmp, nFlags, funcUpdate, removed_list );
578             }
579         }
580
581         //@endcond
582
583     private:
584         //@cond
585         template <typename Q, typename Compare, typename Func>
586         find_result try_find( Q const& key, Compare cmp, Func f, node_type * pNode, int nDir, version_type nVersion ) const
587         {
588             assert( gc::is_locked() );
589             assert( pNode );
590
591             while ( true ) {
592                 node_type * pChild = pNode->child( nDir ).load( memory_model::memory_order_relaxed );
593                 if ( !pChild ) {
594                     if ( pNode->version( memory_model::memory_order_acquire ) != nVersion ) {
595                         m_stat.onFindRetry();
596                         return find_result::retry;
597                     }
598
599                     m_stat.onFindFailed();
600                     return find_result::not_found;
601                 }
602
603                 int nCmp = cmp( key, pChild->m_key );
604                 if ( nCmp == 0 ) {
605                     if ( pChild->is_valued( memory_model::memory_order_relaxed ) ) {
606                         // key found
607                         if ( f( m_Monitor, pChild ) ) {
608                             m_stat.onFindSuccess();
609                             return find_result::found;
610                         }
611                     }
612
613                     m_stat.onFindFailed();
614                     return find_result::not_found;
615                 }
616
617                 version_type nChildVersion = pChild->version( memory_model::memory_order_acquire );
618                 if ( nChildVersion & node_type::shrinking ) {
619                     m_stat.onFindWaitShrinking();
620                     pChild->template wait_until_shrink_completed<back_off>( memory_model::memory_order_relaxed );
621
622                     if ( pNode->version( memory_model::memory_order_acquire ) != nVersion ) {
623                         m_stat.onFindRetry();
624                         return find_result::retry;
625                     }
626                 }
627                 else if ( nChildVersion != node_type::unlinked ) {
628
629                     if ( pNode->version( memory_model::memory_order_acquire ) != nVersion ) {
630                         m_stat.onFindRetry();
631                         return find_result::retry;
632                     }
633
634                     find_result found = try_find( key, cmp, f, pChild, nCmp, nChildVersion );
635                     if ( found != find_result::retry )
636                         return found;
637                 }
638             }
639         }
640
641         template <typename K, typename Compare, typename Func>
642         int try_update_root( K const* key, Compare cmp, int nFlags, Func funcUpdate, rcu_disposer& disp )
643         {
644             assert( gc::is_locked() );
645
646             int result;
647             do {
648                 node_type * pChild = m_Root.child( 1, memory_model::memory_order_acquire );
649                 if ( pChild ) {
650                     version_type nChildVersion = pChild->version( memory_model::memory_order_relaxed );
651                     if ( nChildVersion & node_type::shrinking ) {
652                         m_stat.onUpdateRootWaitShrinking();
653                         pChild->template wait_until_shrink_completed<back_off>( memory_model::memory_order_relaxed );
654                         // retry
655                     }
656                     else if ( pChild == m_Root.child( 1, memory_model::memory_order_acquire ) ) {
657                         result = try_update( key, cmp, nFlags, funcUpdate, m_pRoot, pChild, nChildVersion, disp );
658                     }
659                 } 
660                 else {
661                     // the tree is empty
662                     if ( nFlags & update_flags::allow_insert ) {
663                         // insert into tree as right child of the root
664                         {
665                             node_scoped_lock l( m_Monitor, *m_pRoot );
666
667                             node_type * pNew = alloc_node( key, 1, 0, m_pRoot, nullptr, nullptr );
668                             mapped_type * pVal = funcUpdate( pNew );
669                             assert( pVal != nullptr );
670                             pNew->m_pValue.store( pVal, memory_model::memory_order_release );
671
672                             m_pRoot->child( pNew, 1, memory_model::memory_order_relaxed );
673                             m_pRoot->height( 2, memory_model::memory_order_relaxed );
674                         }
675
676                         ++m_ItemCounter;
677                         m_stat.onInsertSuccess();
678                         return update_flags::result_insert;
679                     }
680
681                     return update_flags::failed;
682                 }
683             } while ( result != update_flags::retry );
684             return result;
685         }
686
687         template <typename K, typename Compare, typename Func>
688         int try_update( K const& key, Compare cmp, int nFlags, Func funcUpdate, node_type * pParent, node_type * pNode, version_type nVersion, rcu_disposer& disp )
689         {
690             assert( gc::is_locked() );
691             assert( nVersion != node_type::unlinked );
692
693             int nCmp = cmp( key, pNode->m_key );
694             if ( nCmp == 0 ) {
695                 if ( nFlags & update_flags::allow_update ) {
696                     return try_update_node( funcUpdate, pNode );
697                 }
698                 return update_flags::failed;
699             }
700
701             int result;
702             do {
703                 node_type * pChild = pNode->child( nCmp ).load( memory_model::memory_order_relaxed );
704                 if ( pNode->version( memory_model::memory_order_acquire ) != nVersion )
705                     return update_flags::retry;
706
707                 if ( pChild == nullptr ) {
708                     // insert new node
709                     if ( nFlags & update_flags::allow_insert )
710                         result = try_insert_node( key, funcUpdate, pNode, nCmp, nVersion, disp );
711                     else
712                         result = update_flags::failed;
713                 }
714                 else {
715                     // update child
716                     result = update_flags::retry;
717                     version_type nChildVersion = pChild->version( memory_model::memory_order_acquire );
718                     if ( nChildVersion & node_type::shrinking ) {
719                         m_stat.onUpdateWaitShrinking();
720                         pChild->template wait_until_shrink_completed<back_off>( memory_model::memory_order_relaxed );
721                         // retry
722                     }
723                     else if ( pChild == pNode->child( nCmp ).load( memory_model::memory_order_relaxed ) ) {
724                         // this second read is important, because it is protected by nChildVersion
725
726                         // validate the read that our caller took to get to node
727                         if ( pNode->version( memory_model::memory_order_relaxed ) != nVersion ) {
728                             m_stat.onUpdateRetry();
729                             return update_flags::retry;
730                         }
731
732                         // At this point we know that the traversal our parent took to get to node is still valid.
733                         // The recursive implementation will validate the traversal from node to
734                         // child, so just prior to the node nVersion validation both traversals were definitely okay.
735                         // This means that we are no longer vulnerable to node shrinks, and we don't need
736                         // to validate node version any more.
737                         result = try_update( key, cmp, nFlags, funcUpdate, pNode, pChild, nChildVersion, disp );
738                     }
739                 }
740             } while ( result == update_flags::retry );
741             return result;
742         }
743
744         template <typename K, typename Func>
745         int try_insert_node( K const& key, Func funcUpdate, node_type * pNode, int nDir, version_type nVersion, rcu_disposer& disp )
746         {
747             node_type * pNew;
748
749             auto fnCreateNode = [&funcUpdate]( node_type * pNew ) {
750                 mapped_type * pVal = funcUpdate( pNew );
751                 assert( pVal != nullptr );
752                 pNew->m_pValue.store( pVal, memory_model::memory_order_relaxed );
753             };
754
755             if ( c_bRelaxedInsert ) {
756                 if ( pNode->version( memory_model::memory_order_acquire ) != nVersion
757                      || pNode->child( nDir ).load( memory_model::memory_order_relaxed ) != nullptr ) 
758                 {
759                     m_stat.onInsertRetry();
760                     return update_flags::retry;
761                 }
762
763                 fnCreateNode( pNew = alloc_node( key, 1, 0, pNode, nullptr, nullptr ));
764             }
765
766             node_type * pDamaged;
767             {
768                 assert( pNode != nullptr );
769                 node_scoped_lock l( m_Monitor, *pNode );
770
771                 if ( pNode->version( memory_model::memory_order_relaxed ) != nVersion
772                      || pNode->child( nDir ).load( memory_model::memory_order_relaxed ) != nullptr ) 
773                 {
774                     if ( c_RelaxedInsert ) {
775                         free_node( pNew );
776                         m_stat.onRelaxedInsertFailed();
777                     }
778
779                     m_stat.onInsertRetry();
780                     return update_flags::retry;
781                 }
782
783                 if ( !c_bRelaxedInsert )
784                     fnCreateNode( pNew = alloc_node( key, 1, 0, pNode, nullptr, nullptr ));
785
786                 pNode->child( pNew, nDir, memory_model::memory_order_relaxed );
787                 pDamaged = fix_height_locked( pNode );
788             }
789             m_stat.onInsertSuccess();
790
791             fix_height_and_rebalance( pDamaged, disp );
792
793             return update_flags::result_insert;
794         }
795
796         template <typename Func>
797         int try_update_node( Func funcUpdate, node_type * pNode )
798         {
799             mapped_type * pOld;
800             {
801                 assert( pNode != nullptr );
802                 node_scoped_lock l( m_Monitor, *pNode );
803
804                 if ( pNode->version( memory_model::memory_order_relaxed ) == node_type::unlinked ) {
805                     if ( c_RelaxedInsert )
806                         disposer()(pVal);
807                     m_stat.onUpdateUnlinked();
808                     return update_flags::retry;
809                 }
810
811                 pOld = pNode->value( memory_model::memory_order_relaxed );
812                 mapped_type * pVal = funcUpdate( pNode );
813                 assert( pVal != nullptr );
814                 pNode->m_pValue.store( pVal, memory_model::memory_order_relaxed ));
815             }
816
817             if ( pOld ) {
818                 disposer()(pOld);
819                 m_stat.onDisposeValue();
820             }
821
822             m_stat.onUpdateSuccess();
823             return update_flags::result_update;
824         }
825
826         bool try_unlink_locked( node_type * pParent, node_type * pNode, rcu_disposer& disp )
827         {
828             // pParent and pNode must be locked
829             assert( !pParent->is_unlinked(memory_model::memory_order_relaxed) );
830
831             node_type * pParentLeft = pParent->m_pLeft.load( memory_model::memory_order_relaxed );
832             node_type * pParentRight = pParent->m_pRight.load( memory_model::memory_order_relaxed );
833             if ( pNode != pParentLeft && pNode != pParentRight ) {
834                 // node is no longer a child of parent
835                 return false;
836             }
837
838             assert( !pNode->is_unlinked());
839             assert( pParent == pNode->m_pParent.load(memory_model::memory_order_relaxed));
840
841             node_type * pLeft = pNode->m_pLeft.load( memory_model::memory_order_relaxed );
842             node_type * pRight = pNode->m_pRight.load( memory_model::memory_order_relaxed );
843             if ( pLeft != nullptr && pRight != nullptr ) {
844                 // splicing is no longer possible
845                 return false;
846             }
847             node_type * pSplice = pLeft ? pLeft : pRight;
848
849             if ( pParentLeft == pNode )
850                 pParent->m_pLeft.store( pSplice, memory_model::memory_order_relaxed );
851             else
852                 pParent->m_pRight.store( pSplice, memory_model::memory_order_relaxed );
853
854             if ( pSplice )
855                 pSplice->pParent.store( pParent, memory_model::memory_order_release );
856
857             // Mark the node as unlinked
858             pNode->version( node_type::unlinked, memory_model::memory_order_release );
859             disp.dispose( pNode );
860
861             return true;
862         }
863
864         //@endcond
865     private: // rotations
866         //@cond
867         int estimate_node_condition( node_type * pNode )
868         {
869             node_type * pLeft = pNode->m_pLeft.load( memory_model::memory_order_relaxed );
870             node_type * pRight = pNode->m_pRight.load( memory_model::memory_order_relaxed );
871
872             if ( (pLeft == nullptr || pRight == nullptr) && pNode->value( memory_model::memory_order_relaxed ) == nullptr )
873                 return unlink_required;
874
875             int h = pNode->height( memory_model::memory_order_relaxed );
876             int hL = pLeft ? pLeft->height( memory_model::memory_order_relaxed ) : 0;
877             int hR = pRight ? pRight->height( memory_model::memory_order_relaxed ) : 0;
878
879             int hNew = 1 + std::max( hL, hR );
880             int nBalance = hL - hR;
881
882             if ( nBalance < -1 || nBalance > 1 )
883                 return rebalance_required;
884
885             return h != hNew ? hNew : nothing_required;
886         }
887
888         node_type * fix_height( node_type * pNode )
889         {
890             assert( pNode != nullptr );
891             node_scoped_lock l( m_Monitor, *pNode );
892             return fix_height_locked( pNode );
893         }
894
895         node_type * fix_height_locked( node_type * pNode )
896         {
897             // pNode must be locked!!!
898             int h = estimate_node_condition( pNode );
899             switch ( h ) {
900                 case rebalance_required:
901                 case unlink_required:
902                     return pNode;
903                 case nothing_required:
904                     return nullptr;
905                 default:
906                     pNode->height( h, memory_model::memory_order_relaxed );
907                     return pNode->m_pParent.load( memory_model::memory_order_relaxed );
908             }
909         }
910
911         void fix_height_and_rebalance( node_type * pNode, rcu_disposer& disp )
912         {
913             while ( pNode && pNode->m_pParent.load( memory_model::memory_order_relaxed ) ) {
914                 int nCond = estimate_node_condition( pNode );
915                 if ( nCond == nothing_required || pNode->is_unlinked( memory_model::memory_order_relaxed ) )
916                     return;
917
918                 if ( nCond != unlink_required && nCond != rebalance_required )
919                     pNode = fix_height( pNode );
920                 else {
921                     node_type * pParent = pNode->m_pParent.load( memory_model::memry_order_relaxed );
922                     assert( pParent != nullptr );
923                     {
924                         node_scoped_lock lp( m_Monitor, *pParent );
925                         if ( !pParent->is_unlinked( memory_model::memory_order_relaxed )
926                              && pNode->m_pParent.load( memory_model::memory_order_relaxed ) == pParent ) 
927                         {
928                             node_scoped_lock ln( m_Monitor, *pNode );
929                             pNode = rebalance_locked( pParent, pNode, disp );
930                         }
931                     }
932                 }
933             }
934         }
935
936         node_type * rebalance_locked( node_type * pParent, node_type * pNode, rcu_disposer& disp )
937         {
938             // pParent and pNode shlould be locked.
939             // Returns a damaged node, or nullptr if no more rebalancing is necessary
940             assert( pNode->m_pParent.load( memory_model::memry_order_relaxed ) == pNode );
941             assert( m_pParent->m_pLeft.load( memory_model::memory_order_relaxed ) == pNode
942                  || m_pParent->m_pRight.load( memory_model::memory_order_relaxed ) == pNode );
943
944             node_type * pLeft = pNode->m_pLeft.load( memory_model::memory_order_relaxed );
945             node_type * pRight = pNode->m_pRight.load( memory_model::memory_order_relaxed );
946
947             if ( (pLeft == nullptr || pRight == nullptr) && pNode->value( memory_model::memory_order_relaxed ) == nullptr ) {
948                 if ( try_unlink_locked( pParent, pNode, disp ))
949                     return fix_height_locked( pParent );
950                 else {
951                     // retry needed for pNode
952                     return pNode;
953                 }
954             }
955
956             int h = pNode->height( memory_model::memory_order_relaxed );
957             int hL = pLeft ? pLeft->height( memory_model::memory_order_relaxed ) : 0;
958             int hR = pRight ? pRight->height( memory_model::memory_order_relaxed ) : 0;
959             int hNew = 1 + std::max( hL, hR );
960             int balance = hL - hR;
961
962             if ( balance > 1 )
963                 return rebalance_to_right_locked( pParent, pNode, pLeft, hR );
964             else if ( balance < -1 )
965                 return rebalance_to_left_locked( pParent, pNode, pRight, hL );
966             else if ( hNew != h ) {
967                 pNode->height( hNew, memory_model::memory_order_relaxed );
968
969                 // pParent is already locked
970                 return fix_height_locked( pParent );
971             }
972             else
973                 return nullptr;
974         }
975
976         node_type * rebalance_to_right_locked( node_type * pParent, node_type * pNode, node_type * pLeft, int hR )
977         {
978             assert( pNode->m_pParent.load( memory_model::memry_order_relaxed ) == pNode );
979             assert( m_pParent->m_pLeft.load( memory_model::memory_order_relaxed ) == pNode
980                  || m_pParent->m_pRight.load( memory_model::memory_order_relaxed ) == pNode );
981
982             // pParent and pNode is locked yet
983             // pNode->pLeft is too large, we will rotate-right.
984             // If pLeft->pRight is taller than pLeft->pLeft, then we will first rotate-left pLeft.
985
986             {
987                 assert( pLeft != nullptr );
988                 node_scoped_lock l( m_Monitor, *pLeft );
989                 if ( pNode->m_pLeft.load( memory_model::memory_order_relaxed ) != pLeft )
990                     return pNode; // retry for pNode
991
992                 int hL = pLeft->height( memory_model::memory_order_relaxed );
993                 if ( hL - hR <= 1 )
994                     return pNode; // retry
995
996                 node_type * pLRight = pLeft->m_pRight.load( memory_model::memory_order_relaxed );
997                 int hLR = pLRight ? pLRight->height( memory_model::memory_order_relaxed ) : 0;
998                 node_type * pLLeft = pLeft->m_pLeft.load( memory_model::memory_order_relaxed );
999                 int hLL = pLLeft ? pLLeft->height( memory_model::memory_order_relaxed ) : 0;
1000
1001                 if ( hLL > hLR ) {
1002                     // rotate right
1003                     return rotate_right_locked( pParent, pNode, pLeft, hR, hLL, pLRight, hLR );
1004                 }
1005                 else {
1006                     assert( pLRight != nullptr );
1007                     {
1008                         node_scoped_lock lr( m_Monitor, *pLRight );
1009                         if ( pLeft->m_pRight.load( memory_model::memory_order_relaxed ) != pLRight )
1010                             return pNode; // retry
1011
1012                         hLR = pLRight->height( memory_model::memory_order_relaxed );
1013                         if ( hLL > hLR )
1014                             return rotate_right_locked( pParent, pNode, pLeft, hR, hLL, pLRight, hLR );
1015
1016                         node_type * pLRLeft = pLRight->m_pLeft.load( memory_model::memory_order_relaxed );
1017                         int hLRL = pLRLeft ? pLRLeft->height( memory_model::memory_order_relaxed  ) : 0;
1018                         int balance = hLL - hLRL;
1019                         if ( balance >= -1 && balance <= 1 && !((hLL == 0 || hLRL == 0) && pLeft->value(memory_model::memory_order_relaxed) == nullptr) ) {
1020                             // nParent.child.left won't be damaged after a double rotation
1021                             return rotate_right_over_left_locked( pParent, pNode, pLeft, hR, hLL, pLRight, hLRL );
1022                         }
1023                     }
1024
1025                     // focus on pLeft, if necessary pNode will be balanced later
1026                     return rebalance_to_left_locked( pNode, pLeft, pLRight, hLL );
1027                 }
1028             }
1029         }
1030
1031         node_type * rebalance_to_left_locked( node_type * pParent, node_type * pNode, node_type * pRight, int hL )
1032         {
1033             assert( pNode->m_pParent.load( memory_model::memry_order_relaxed ) == pNode );
1034             assert( m_pParent->m_pLeft.load( memory_model::memory_order_relaxed ) == pNode
1035                     || m_pParent->m_pRight.load( memory_model::memory_order_relaxed ) == pNode );
1036
1037             // pParent and pNode is locked yet
1038             {
1039                 assert( pRight != nullptr );
1040                 node_scoped_lock l( m_Monitor, *pRight );
1041                 if ( pNode->m_pRight.load( memory_model::memory_order_relaxed ) != pRight )
1042                     return pNode; // retry for pNode
1043
1044                 int hR = pRight->height( memory_model::memory_order_relaxed );
1045                 if ( hL - hR >= -1 )
1046                     return pNode; // retry
1047
1048                 node_type * pRLeft = pRight->m_pLeft.load( memory_model::memory_order_relaxed );
1049                 int hRL = pRLeft > pRLeft->height( memory_model::memory_order_relaxed ) : 0;
1050                 node_type * pRRight = pRight->m_pRight.load( memory_model::memory_order_relaxed );
1051                 int hRR = pRRight ? pRRight->height( memory_model::memory_order_relaxed ) : 0;
1052                 if ( hRR > hRL )
1053                     return rotate_left_locked( pParent, pNode, hL, pRight, pRLeft, hRL, hRR );
1054
1055                 {
1056                     assert( pRLeft != nullptr );
1057                     node_scoped_lock lrl( m_Monitor, *pRLeft );
1058                     if ( pRight->m_pLeft.load( memory_model::memory_order_relaxed ) != pRLeft )
1059                         return pNode; // retry
1060
1061                     hRL = pRLeft->height( memory_model::memory_order_relaxed );
1062                     if ( hRR >= hRL )
1063                         return rotate_left_locked( pParent, pNode, hL, pRight, pRLeft, hRL, hRR );
1064
1065                     node_type * pRLRight = pRLeft->m_pRight.load( memory_model::memory_order_relaxed );
1066                     int hRLR = pRLRight ? pRLRight->height( memory_model::memory_order_relaxed ) : 0;
1067                     int balance = hRR - hRLR;
1068                     if ( b >= -1 && b <= 1 && !((hRR == 0 || hRLR == 0) && pRight->value( memory_odel::memory_order_relaxed ) == nullptr)
1069                          return rotate_left_over_right_locked( pParent, pNode, hL, pRight, pRLeft, hRR, hRLR );
1070                 }
1071                 return rebalance_to_right_locked( pNode, pRight, pRLeft, hRR );
1072             }
1073         }
1074
1075         static void begin_change( node_type * pNode, version_type version )
1076         {
1077             pNode->version( version | node_type::shrinking, memory_model::memory_order_release );
1078         }
1079         static void end_change( node_type * pNode, version_type version )
1080         {
1081             // Clear shrinking and unlinked flags and increment version
1082             pNode->version( (version | node_type::version_flags) + 1, memory_model::memory_order_release );
1083         }
1084
1085         node_type * rotate_right_locked( node_type * pParent, node_type * pNode, node_type * pLeft, int hR, int hLL, node_type * pLRight, int hLR )
1086         {
1087             version_type nodeVersion = pNode->version( memory_model::memory_order_relaxed );
1088             node_type * pParentLeft = pParent->m_pLeft.load( memory_model::memory_order_relaxed );
1089
1090             begin_change( pNode, nodeVersion );
1091
1092             pNode->m_pLeft.store( pLRight, memory_model::memory_order_relaxed );
1093             if ( pLRight != nullptr )
1094                 pLRight->m_pParent.store( pNode, memory_model::memory_order_relaxed  );
1095
1096             pLeft->m_pRight.store( pNode, memory_model::memory_order_relaxed );
1097             pNode->m_pParent.store( pLeft, memory_model::memory_order_relaxed );
1098
1099             if ( pParentLeft == pNode )
1100                 pParent->m_pLeft.store( pLeft, memory_model::memory_order_relaxed );
1101             else {
1102                 assert( pParent->m_pRight.load( memory_odel::memory_order_relaxed ) == pNode );
1103                 pParent->m_pRight.store( pLeft, memory_mdel::memory_order_relaxed );
1104             }
1105             pLeft->m_pParent.store( pParent, memory_model::memory_order_relaxed );
1106
1107             // fix up heights links
1108             int hNode = 1 + std::max( hLR, hR );
1109             pNode->height( hNode, memory_model::memory_order_relaxed );
1110             pLeft->height( 1 + std::max( hLL, hNode ), memory_model::memory_order_relaxed );
1111
1112             end_change( pNode, nodeVersion );
1113
1114             // We have damaged pParent, pNode (now parent.child.right), and pLeft (now
1115             // parent.child).  pNode is the deepest.  Perform as many fixes as we can
1116             // with the locks we've got.
1117
1118             // We've already fixed the height for pNode, but it might still be outside
1119             // our allowable balance range.  In that case a simple fix_height_locked()
1120             // won't help.
1121             int nodeBalance = hLR - hR;
1122             if ( nodeBalance < -1 || nodeBalance > 1 ) {
1123                 // we need another rotation at pNode
1124                 return pNode;
1125             }
1126
1127             // we've fixed balance and height damage for pNode, now handle
1128             // extra-routing node damage
1129             if ( (pLRight == nullptr || hR == 0) && pNode->value(memory_model::memory_order_relaxed) == nullptr ) {
1130                 // we need to remove pNode and then repair
1131                 return pNode;
1132             }
1133
1134             // we've already fixed the height at pLeft, do we need a rotation here?
1135             int leftBalance = hLL - hNode;
1136             if ( leftBalance < -1 || leftBalance > 1 )
1137                 return pLeft;
1138
1139             // pLeft might also have routing node damage (if pLeft.left was null)
1140             if ( hLL == 0 && pLeft->value( memory_model::memory_order_relaxed ) == nullptr )
1141                 return pLeft;
1142
1143             // try to fix the parent height while we've still got the lock
1144             return fix_height_locked( pParent );
1145         }
1146
1147         node_type * rotate_left_locked( node_type * pParent, node_type * pNode, int hL, node_type * pRight, node_type * pRLeft, int hRL, int hRR )
1148         {
1149             version_type nodeVersion = pNode->version( memory_model::memory_order_relaxed );
1150             node_type * pParentLeft = pParent->m_pLeft.load( memory_odel::memory_order_relaxed );
1151
1152             begin_change( pNode, nodeVersion );
1153
1154             // fix up pNode links, careful to be compatible with concurrent traversal for all but pNode
1155             pNode->m_pRight.store( pRLeft, memory_nodel::memory_order_relaxed );
1156             if ( pRLeft != nullptr )
1157                 pRLeft->m_pParent.store( pNode, memory_model::memory_order_relaxed );
1158
1159             pRight->m_pLeft.store( pNode, memory_model::memory_order_relaxed );
1160             pNode->m_pParent.store( pRight, memory_model::memory_order_relaxed );
1161
1162             if ( pParentLeft == pNode )
1163                 pParent->m_pLeft.store( pRight, memory_model::memory_order_relaxed );
1164             else {
1165                 assert( pParent->m_pRight.load( memory_odel::memory_order_relaxed ) == pNode );
1166                 pParent->m_pRight.store( pRight, memory_model::memory_model_relaxed );
1167             }
1168             pRight->m_pParent.store( pParent, memory_model::memory_order_relaxed );
1169
1170             // fix up heights
1171             int hNode = 1 + std::max( hL, hRL );
1172             pNode->height( hNode, memory_model::memory_order_relaxed );
1173             pRight->height( 1 + std::max( hNode, hRR ), memory_model::memory_order_relaxed );
1174
1175             end_change( pNode, nodeVersion );
1176
1177             int nodeBalance = hRL - hL;
1178             if ( nodeBalance < -1 || nodeBalance > 1 )
1179                 return pNode;
1180
1181             if ( (pRLeft == nullptr || hL == 0) && pNode->value( memory_model::memory_order_relaxed ) == nullptr )
1182                 return pNode;
1183
1184             int rightBalance = hRR - hNode;
1185             if ( rightBalance < -1 || rightBalance > 1 )
1186                 return pRight;
1187
1188             if ( hRR == 0 && pRight->value( memory_model::memory_order_relaxed ) == nullptr )
1189                 return pRight;
1190
1191             return fix_height_locked( pParent );
1192         }
1193
1194         node_type * rotate_right_over_left_locked( node_type * pParent, node_type * pNode, node_type * pLeft, int hR, int hLL, node_type * pLRight, int hLRL )
1195         {
1196             typename node_type::value_type nodeVersion = pNode->version( memory_model::memory_order_relaxed );
1197             typename node_type::value_type leftVersion = pLeft->version( memory_model::memory_order_relaxed );
1198
1199             node_type * pPL = pParent->m_pLeft.load( memory_model::memory_order_relaxed );
1200             node_type * pLRL = pLRight->m_pLeft.load( memory_model::memory_order_relaxed );
1201             node_type * pLRR = pLRight->m_pRight.load( memory_model::memory_order_relaxed );
1202             int hLRR = pLRR ? pLRR->hight( memory_model::memory_order_relaxed ) : 0;
1203
1204             begin_change( pNode, nodeVersion );
1205             begin_change( pLeft, leftVersion );
1206
1207             // fix up pNode links, careful about the order!
1208             pNode->m_pLeft.store( pLRR, memory_model::memory_order_relaxed );
1209             if ( pLRR != nullptr )
1210                 pLRR->m_pParent.store( pNode, memory_model::memory_order_relaxed );
1211
1212             pLeft->m_pRight.store( pLRL, memory_model::memory_order_relaxed );
1213             if ( pLRL != nullptr )
1214                 pLRL->m_pParent.store( pLeft, memory_model::memory_order_relaxed );
1215
1216             pLRight->m_pLeft.store( pLeft, memory_model::memory_order_relaxed );
1217             pLeft->m_pParent.store( pLRight, memory_model::memory_order_relaxed );
1218             pLRight->m_pRight.store( pNode, memory_model::memory_order_relaxed );
1219             pNode->m_pParent.store( pLRight, memory_model::memory_order_relaxed );
1220
1221             if ( pPL == pNode )
1222                 pParent->m_pLeft.store( pLRight, memory_model::memory_order_relaxed );
1223             else {
1224                 assert( Parent->m_pRight.load( memory_model::memory_order_relaxed ) == pNode );
1225                 pParent->m_pRight.store( pLRight, memory_model::memory_order_relaxed );
1226             }
1227             pLRight->m_pParent.store( pParent, memory_model::memory_order_relaxed );
1228
1229             // fix up heights
1230             int hNode = 1 + std::max( hLRR, hR );
1231             pNode->height( hNode, memory_model::memory_order_relaxed );
1232             int hLeft = 1 + std::max( hLL, hLRL );
1233             pLeft->height( hLeft, memory_model::memory_order_relaxed );
1234             pLRight->height( 1 + std::max( hLeft, hNode ), memory_model::memory_order_relaxed );
1235
1236             end_change( pNode, nodeVersion );
1237             end_change( pLeft, leftVersion );
1238
1239             // caller should have performed only a single rotation if pLeft was going
1240             // to end up damaged
1241             assert( hLL - hLRL <= 1 && hLRL - hLL <= 1 );
1242             assert( !((hLL == 0 || pLRL == nullptr) && pLeft->value( memory_model::memory_order_relaxed ) == nullptr ));
1243
1244             // We have damaged pParent, pLR (now parent.child), and pNode (now
1245             // parent.child.right).  pNode is the deepest.  Perform as many fixes as we
1246             // can with the locks we've got.
1247
1248             // We've already fixed the height for pNode, but it might still be outside
1249             // our allowable balance range.  In that case a simple fix_height_locked()
1250             // won't help.
1251             int nodeBalance = hLRR - hR;
1252             if ( nodeBalance < -1 || nodeBalance > 1 ) {
1253                 // we need another rotation at pNode
1254                 return pNode;
1255             }
1256
1257             // pNode might also be damaged by being an unnecessary routing node
1258             if ( (pLRR == nullptr || hR == 0) && pNode->value( memory_model::memory_order_relaxed ) == nullptr ) {
1259                 // repair involves splicing out pNode and maybe more rotations
1260                 return pNode;
1261             }
1262
1263             // we've already fixed the height at pLRight, do we need a rotation here?
1264             final int balanceLR = hLeft - hNode;
1265             if ( balanceLR < -1 || balanceLR > 1 )
1266                 return pLR;
1267
1268             // try to fix the parent height while we've still got the lock
1269             return fix_height_locked( pParent );
1270         }
1271
1272         node_type * rotate_left_over_right_locked( node_type * pParent, node_type * pNode, int hL, node_type * pRight, node_type * pRLeft, int hRR, int hRLR )
1273         {
1274             version_type nodeVersion = pNode->version( memory_model::memory_order_relaxed );
1275             version_type rightVersion = pRight->version( memory_model::memory_order_relaxed );
1276
1277             node_type * pPL = pParent->m_pLeft.load( memory_model::memory_order_relaxed );
1278             node_type * pRLL = pRLeft->m_pLeft.load( memory_model::memory_order_relaxed );
1279             node_type * pRLR = pRLeft->m_pRight.load( memory_model::memory_order_relaxed );
1280             int hRLL = pRLL ? pRLL->height( memory_model::memory_order_relaxed ) : 0;
1281
1282             begin_change( pNode, nodeVersion );
1283             begin_change( pRight, ghtVersion );
1284
1285             // fix up pNode links, careful about the order!
1286             pNode->m_pRight.store( pRLL, memory_model::memory_order_relaxed );
1287             if ( pRLL != nullptr )
1288                 pRLL->m_pParent.store( pNode, memory_model::memory_order_relaxed );
1289
1290             pRight->m_pLeft.store( pRLR, memory_model::memory_order_relaxed );
1291             if ( pRLR != nullptr )
1292                 pRLR->m_pParent.store( pRight, memory_model::memory_order_relaxed );
1293
1294             pRLeft->m_pRight.store( pRight, memory_model::memory_order_relaxed );
1295             pRight->m_pParent.store( pRLeft, memory_model::memory_order_relaxed );
1296             pRLeft->m_pLeft.store( pNode, memory_model::memory_order_relaxed );
1297             pNode->m_pParent.store( pRLeft, memory_model::memory_order_relaxed );
1298
1299             if ( pPL == pNode )
1300                 pParent->m_pLeft.store( pRLeft, memory_model::memory_order_relaxed );
1301             else {
1302                 assert( pParent->m_pRight.load( memory_model::memory_order_relaxed ) == pNode );
1303                 pParent->m_pRight.store( pRLeft, memory_model::memory_order_relaxed );
1304             }
1305             pRLeft->m_pParent.store( pParent, memory_model::memory_order_relaxed );
1306
1307             // fix up heights
1308             int hNode = 1 + std::max( hL, hRLL );
1309             pNode->height( hNode, memory_model::memory_order_relaxed );
1310             int hRight = 1 + std::max( hRLR, hRR );
1311             pRight->height( hRight, memory_model::memory_order_relaxed );
1312             pRLeft->height( 1 + std::max( hNode, hRight ), memory_model::memory_order_relaxed );
1313
1314             end_change( pNode, nodeVersion );
1315             end_change( pRight, rightVersion );
1316
1317             assert( hRR - hRLR <= 1 && hRLR - hRR <= 1 );
1318
1319             int nodeBalance = hRLL - hL;
1320             if ( nodeBalance < -1 || nodeBalance > 1 )
1321                 return pNode;
1322             if ( (pRLL == nullptr || hL == 0) && pNode->value( memory_model::memory_order_relaxed ) == nullptr )
1323                 return pNode;
1324
1325             int balRL = hRight - hNode;
1326             if ( balRL < -1 || balRL > 1 )
1327                 return pRLeft;
1328
1329             return fix_height_locked( pParent );
1330         }
1331
1332         //@endcond
1333     };
1334 }} // namespace cds::container
1335
1336 #endif // #ifndef CDSLIB_CONTAINER_IMPL_BRONSON_AVLTREE_MAP_RCU_H