SkipListMap/Set refactoring
[libcds.git] / cds / intrusive / skip_list_rcu.h
1 //$$CDS-header$$
2
3 #ifndef __CDS_INTRUSIVE_SKIP_LIST_RCU_H
4 #define __CDS_INTRUSIVE_SKIP_LIST_RCU_H
5
6 #include <type_traits>
7 #include <memory>
8 #include <cds/intrusive/details/skip_list_base.h>
9 #include <cds/opt/compare.h>
10 #include <cds/urcu/details/check_deadlock.h>
11 #include <cds/details/binary_functor_wrapper.h>
12 #include <cds/urcu/exempt_ptr.h>
13
14
15 namespace cds { namespace intrusive {
16
17     //@cond
18     namespace skip_list {
19
20         template <class RCU, typename Tag>
21         class node< cds::urcu::gc< RCU >, Tag >
22         {
23         public:
24             typedef cds::urcu::gc< RCU >    gc; ///< Garbage collector
25             typedef Tag     tag               ; ///< tag
26
27             // Mark bits:
28             //  bit 0 - the item is logically deleted
29             //  bit 1 - the item is extracted (only for level 0)
30             typedef cds::details::marked_ptr<node, 3> marked_ptr;        ///< marked pointer
31             typedef atomics::atomic< marked_ptr >     atomic_marked_ptr; ///< atomic marked pointer
32             typedef atomic_marked_ptr                 tower_item_type;
33
34         protected:
35             atomic_marked_ptr       m_pNext;     ///< Next item in bottom-list (list at level 0)
36         public:
37             node *                  m_pDelChain; ///< Deleted node chain (local for a thread)
38 #       ifdef _DEBUG
39             bool volatile           m_bLinked;
40             bool volatile           m_bUnlinked;
41 #       endif
42         protected:
43             unsigned int            m_nHeight;   ///< Node height (size of m_arrNext array). For node at level 0 the height is 1.
44             atomic_marked_ptr *     m_arrNext;   ///< Array of next items for levels 1 .. m_nHeight - 1. For node at level 0 \p m_arrNext is \p nullptr
45
46         public:
47             /// Constructs a node of height 1 (a bottom-list node)
48             CDS_CONSTEXPR node()
49                 : m_pNext( nullptr )
50                 , m_pDelChain( nullptr )
51 #       ifdef _DEBUG
52                 , m_bLinked( false )
53                 , m_bUnlinked( false )
54 #       endif
55                 , m_nHeight(1)
56                 , m_arrNext( nullptr )
57             {}
58
59 #       ifdef _DEBUG
60             ~node()
61             {
62                 assert( !m_bLinked || m_bUnlinked );
63             }
64 #       endif
65
66             /// Constructs a node of height \p nHeight
67             void make_tower( unsigned int nHeight, atomic_marked_ptr * nextTower )
68             {
69                 assert( nHeight > 0 );
70                 assert( (nHeight == 1 && nextTower == nullptr)  // bottom-list node
71                         || (nHeight > 1 && nextTower != nullptr)   // node at level of more than 0
72                     );
73
74                 m_arrNext = nextTower;
75                 m_nHeight = nHeight;
76             }
77
78             atomic_marked_ptr * release_tower()
79             {
80                 atomic_marked_ptr * pTower = m_arrNext;
81                 m_arrNext = nullptr;
82                 m_nHeight = 1;
83                 return pTower;
84             }
85
86             atomic_marked_ptr * get_tower() const
87             {
88                 return m_arrNext;
89             }
90
91             void clear_tower()
92             {
93                 for ( unsigned int nLevel = 1; nLevel < m_nHeight; ++nLevel )
94                     next(nLevel).store( marked_ptr(), atomics::memory_order_relaxed );
95             }
96
97             /// Access to element of next pointer array
98             atomic_marked_ptr& next( unsigned int nLevel )
99             {
100                 assert( nLevel < height() );
101                 assert( nLevel == 0 || (nLevel > 0 && m_arrNext != nullptr) );
102
103                 return nLevel ? m_arrNext[nLevel - 1] : m_pNext;
104             }
105
106             /// Access to element of next pointer array (const version)
107             atomic_marked_ptr const& next( unsigned int nLevel ) const
108             {
109                 assert( nLevel < height() );
110                 assert( nLevel == 0 || nLevel > 0 && m_arrNext != nullptr );
111
112                 return nLevel ? m_arrNext[nLevel - 1] : m_pNext;
113             }
114
115             /// Access to element of next pointer array (same as \ref next function)
116             atomic_marked_ptr& operator[]( unsigned int nLevel )
117             {
118                 return next( nLevel );
119             }
120
121             /// Access to element of next pointer array (same as \ref next function)
122             atomic_marked_ptr const& operator[]( unsigned int nLevel ) const
123             {
124                 return next( nLevel );
125             }
126
127             /// Height of the node
128             unsigned int height() const
129             {
130                 return m_nHeight;
131             }
132
133             /// Clears internal links
134             void clear()
135             {
136                 assert( m_arrNext == nullptr );
137                 m_pNext.store( marked_ptr(), atomics::memory_order_release );
138                 m_pDelChain = nullptr;
139             }
140
141             bool is_cleared() const
142             {
143                 return m_pNext == atomic_marked_ptr()
144                     && m_arrNext == nullptr
145                     && m_nHeight <= 1;
146             }
147         };
148     } // namespace skip_list
149     //@endcond
150
151     //@cond
152     namespace skip_list { namespace details {
153
154         template <class RCU, typename NodeTraits, typename BackOff, bool IsConst>
155         class iterator< cds::urcu::gc< RCU >, NodeTraits, BackOff, IsConst >
156         {
157         public:
158             typedef cds::urcu::gc< RCU >                gc;
159             typedef NodeTraits                          node_traits;
160             typedef BackOff                             back_off;
161             typedef typename node_traits::node_type     node_type;
162             typedef typename node_traits::value_type    value_type;
163             static bool const c_isConst = IsConst;
164
165             typedef typename std::conditional< c_isConst, value_type const &, value_type &>::type   value_ref;
166
167         protected:
168             typedef typename node_type::marked_ptr          marked_ptr;
169             typedef typename node_type::atomic_marked_ptr   atomic_marked_ptr;
170
171             node_type *             m_pNode;
172
173         protected:
174             void next()
175             {
176                 // RCU should be locked before iterating!!!
177                 assert( gc::is_locked() );
178
179                 back_off bkoff;
180
181                 for (;;) {
182                     if ( m_pNode->next( m_pNode->height() - 1 ).load( atomics::memory_order_acquire ).bits() ) {
183                         // Current node is marked as deleted. So, its next pointer can point to anything
184                         // In this case we interrupt our iteration and returns end() iterator.
185                         *this = iterator();
186                         return;
187                     }
188
189                     marked_ptr p = m_pNode->next(0).load( atomics::memory_order_relaxed );
190                     node_type * pp = p.ptr();
191                     if ( p.bits() ) {
192                         // p is marked as deleted. Spin waiting for physical removal
193                         bkoff();
194                         continue;
195                     }
196                     else if ( pp && pp->next( pp->height() - 1 ).load( atomics::memory_order_relaxed ).bits() ) {
197                         // p is marked as deleted. Spin waiting for physical removal
198                         bkoff();
199                         continue;
200                     }
201
202                     m_pNode = pp;
203                     break;
204                 }
205             }
206
207         public: // for internal use only!!!
208             iterator( node_type& refHead )
209                 : m_pNode( nullptr )
210             {
211                 // RCU should be locked before iterating!!!
212                 assert( gc::is_locked() );
213
214                 back_off bkoff;
215
216                 for (;;) {
217                     marked_ptr p = refHead.next(0).load( atomics::memory_order_relaxed );
218                     if ( !p.ptr() ) {
219                         // empty skip-list
220                         break;
221                     }
222
223                     node_type * pp = p.ptr();
224                     // Logically deleted node is marked from highest level
225                     if ( !pp->next( pp->height() - 1 ).load( atomics::memory_order_acquire ).bits() ) {
226                         m_pNode = pp;
227                         break;
228                     }
229
230                     bkoff();
231                 }
232             }
233
234         public:
235             iterator()
236                 : m_pNode( nullptr )
237             {
238                 // RCU should be locked before iterating!!!
239                 assert( gc::is_locked() );
240             }
241
242             iterator( iterator const& s)
243                 : m_pNode( s.m_pNode )
244             {
245                 // RCU should be locked before iterating!!!
246                 assert( gc::is_locked() );
247             }
248
249             value_type * operator ->() const
250             {
251                 assert( m_pNode != nullptr );
252                 assert( node_traits::to_value_ptr( m_pNode ) != nullptr );
253
254                 return node_traits::to_value_ptr( m_pNode );
255             }
256
257             value_ref operator *() const
258             {
259                 assert( m_pNode != nullptr );
260                 assert( node_traits::to_value_ptr( m_pNode ) != nullptr );
261
262                 return *node_traits::to_value_ptr( m_pNode );
263             }
264
265             /// Pre-increment
266             iterator& operator ++()
267             {
268                 next();
269                 return *this;
270             }
271
272             iterator& operator = (const iterator& src)
273             {
274                 m_pNode = src.m_pNode;
275                 return *this;
276             }
277
278             template <typename Bkoff, bool C>
279             bool operator ==(iterator<gc, node_traits, Bkoff, C> const& i ) const
280             {
281                 return m_pNode == i.m_pNode;
282             }
283             template <typename Bkoff, bool C>
284             bool operator !=(iterator<gc, node_traits, Bkoff, C> const& i ) const
285             {
286                 return !( *this == i );
287             }
288         };
289     }}  // namespace skip_list::details
290     //@endcond
291
292     /// Lock-free skip-list set (template specialization for \ref cds_urcu_desc "RCU")
293     /** @ingroup cds_intrusive_map
294         @anchor cds_intrusive_SkipListSet_rcu
295
296         The implementation of well-known probabilistic data structure called skip-list
297         invented by W.Pugh in his papers:
298             - [1989] W.Pugh Skip Lists: A Probabilistic Alternative to Balanced Trees
299             - [1990] W.Pugh A Skip List Cookbook
300
301         A skip-list is a probabilistic data structure that provides expected logarithmic
302         time search without the need of rebalance. The skip-list is a collection of sorted
303         linked list. Nodes are ordered by key. Each node is linked into a subset of the lists.
304         Each list has a level, ranging from 0 to 32. The bottom-level list contains
305         all the nodes, and each higher-level list is a sublist of the lower-level lists.
306         Each node is created with a random top level (with a random height), and belongs
307         to all lists up to that level. The probability that a node has the height 1 is 1/2.
308         The probability that a node has the height N is 1/2 ** N (more precisely,
309         the distribution depends on an random generator provided, but our generators
310         have this property).
311
312         The lock-free variant of skip-list is implemented according to book
313             - [2008] M.Herlihy, N.Shavit "The Art of Multiprocessor Programming",
314                 chapter 14.4 "A Lock-Free Concurrent Skiplist".
315
316         <b>Template arguments</b>:
317             - \p RCU - one of \ref cds_urcu_gc "RCU type"
318             - \p T - type to be stored in the list. The type must be based on \p skip_list::node (for \p skip_list::base_hook)
319                 or it must have a member of type \p skip_list::node (for \p skip_list::member_hook).
320             - \p Traits - set traits, default is \p skip_list::type_traits
321                 It is possible to declare option-based list with \p cds::intrusive::skip_list::make_traits metafunction 
322                 instead of \p Traits template argument.
323
324         @note Before including <tt><cds/intrusive/skip_list_rcu.h></tt> you should include appropriate RCU header file,
325         see \ref cds_urcu_gc "RCU type" for list of existing RCU class and corresponding header files.
326
327         <b>Iterators</b>
328
329         The class supports a forward iterator (\ref iterator and \ref const_iterator).
330         The iteration is ordered.
331
332         You may iterate over skip-list set items only under RCU lock.
333         Only in this case the iterator is thread-safe since
334         while RCU is locked any set's item cannot be reclaimed.
335
336         @note The requirement of RCU lock during iterating means that any type of modification of the skip list
337         (i.e. inserting, erasing and so on) is not possible.
338
339         @warning The iterator object cannot be passed between threads.
340
341         Example how to use skip-list set iterators:
342         \code
343         // First, you should include the header for RCU type you have chosen
344         #include <cds/urcu/general_buffered.h>
345         #include <cds/intrusive/skip_list_rcu.h>
346
347         typedef cds::urcu::gc< cds::urcu::general_buffered<> > rcu_type;
348
349         struct Foo {
350             // ...
351         };
352
353         // Traits for your skip-list.
354         // At least, you should define cds::opt::less or cds::opt::compare for Foo struct
355         struct my_traits: public cds::intrusive::skip_list::traits
356         {
357             // ...
358         };
359         typedef cds::intrusive::SkipListSet< rcu_type, Foo, my_traits > my_skiplist_set;
360
361         my_skiplist_set theSet;
362
363         // ...
364
365         // Begin iteration
366         {
367             // Apply RCU locking manually
368             typename rcu_type::scoped_lock sl;
369
370             for ( auto it = theList.begin(); it != theList.end(); ++it ) {
371                 // ...
372             }
373
374             // rcu_type::scoped_lock destructor releases RCU lock implicitly
375         }
376         \endcode
377
378         The iterator class supports the following minimalistic interface:
379         \code
380         struct iterator {
381             // Default ctor
382             iterator();
383
384             // Copy ctor
385             iterator( iterator const& s);
386
387             value_type * operator ->() const;
388             value_type& operator *() const;
389
390             // Pre-increment
391             iterator& operator ++();
392
393             // Copy assignment
394             iterator& operator = (const iterator& src);
395
396             bool operator ==(iterator const& i ) const;
397             bool operator !=(iterator const& i ) const;
398         };
399         \endcode
400         Note, the iterator object returned by \ref end, \p cend member functions points to \p nullptr and should not be dereferenced.
401
402         <b>How to use</b>
403
404         You should incorporate skip_list::node into your struct \p T and provide
405         appropriate skip_list::traits::hook in your \p Traits template parameters. Usually, for \p Traits you
406         define a struct based on \p skip_list::traits.
407
408         Example for <tt>cds::urcu::general_buffered<></tt> RCU and base hook:
409         \code
410         // First, you should include the header for RCU type you have chosen
411         #include <cds/urcu/general_buffered.h>
412
413         // Include RCU skip-list specialization
414         #include <cds/intrusive/skip_list_rcu.h>
415
416         // RCU type typedef
417         typedef cds::urcu::gc< cds::urcu::general_buffered<> > rcu_type;
418
419         // Data stored in skip list
420         struct my_data: public cds::intrusive::skip_list::node< rcu_type >
421         {
422             // key field
423             std::string     strKey;
424
425             // other data
426             // ...
427         };
428
429         // my_data compare functor
430         struct my_data_cmp {
431             int operator()( const my_data& d1, const my_data& d2 )
432             {
433                 return d1.strKey.compare( d2.strKey );
434             }
435
436             int operator()( const my_data& d, const std::string& s )
437             {
438                 return d.strKey.compare(s);
439             }
440
441             int operator()( const std::string& s, const my_data& d )
442             {
443                 return s.compare( d.strKey );
444             }
445         };
446
447         // Declare traits
448         struct my_traits: public cds::intrusive::skip_list::traits
449         {
450             typedef cds::intrusive::skip_list::base_hook< cds::opt::gc< rcu_type > >   hook;
451             typedef my_data_cmp compare;
452         };
453
454         // Declare skip-list set type
455         typedef cds::intrusive::SkipListSet< rcu_type, my_data, my_traits >     traits_based_set;
456         \endcode
457
458         Equivalent option-based code:
459         \code
460         #include <cds/urcu/general_buffered.h>
461         #include <cds/intrusive/skip_list_rcu.h>
462
463         typedef cds::urcu::gc< cds::urcu::general_buffered<> > rcu_type;
464
465         struct my_data {
466             // see above
467         };
468         struct compare {
469             // see above
470         };
471
472         // Declare option-based skip-list set
473         typedef cds::intrusive::SkipListSet< rcu_type
474             ,my_data
475             , typename cds::intrusive::skip_list::make_traits<
476                 cds::intrusive::opt::hook< cds::intrusive::skip_list::base_hook< cds::opt::gc< rcu_type > > >
477                 ,cds::intrusive::opt::compare< my_data_cmp >
478             >::type
479         > option_based_set;
480
481         \endcode
482     */
483     template <
484         class RCU
485        ,typename T
486 #ifdef CDS_DOXYGEN_INVOKED
487        ,typename Traits = skip_list::traits
488 #else
489        ,typename Traits
490 #endif
491     >
492     class SkipListSet< cds::urcu::gc< RCU >, T, Traits >
493     {
494     public:
495         typedef cds::urcu::gc< RCU > gc; ///< Garbage collector
496         typedef T       value_type;      ///< type of value stored in the skip-list
497         typedef Traits  traits;          ///< Traits template parameter
498
499         typedef typename traits::hook    hook;      ///< hook type
500         typedef typename hook::node_type node_type; ///< node type
501
502 #   ifdef CDS_DOXYGEN_INVOKED
503         typedef implementation_defined key_comparator  ;    ///< key comparison functor based on \p Traits::compare and \p Traits::less
504 #   else
505         typedef typename opt::details::make_comparator< value_type, traits >::type key_comparator;
506 #   endif
507
508         typedef typename traits::disposer  disposer;   ///< disposer
509         typedef typename get_node_traits< value_type, node_type, hook>::type node_traits;    ///< node traits
510
511         typedef typename traits::item_counter  item_counter;   ///< Item counting policy used
512         typedef typename traits::memory_model  memory_model;   ///< Memory ordering, see \p cds::opt::memory_model option
513         typedef typename traits::random_level_generator    random_level_generator;   ///< random level generator
514         typedef typename traits::allocator     allocator_type; ///< allocator for maintaining array of next pointers of the node
515         typedef typename traits::back_off      back_off;       ///< Back-off strategy
516         typedef typename traits::stat          stat;           ///< internal statistics type
517         typedef typename traits::rcu_check_deadlock rcu_check_deadlock; ///< Deadlock checking policy
518         typedef typename gc::scoped_lock       rcu_lock;      ///< RCU scoped lock
519         static CDS_CONSTEXPR const bool c_bExtractLockExternal = false; ///< Group of \p extract_xxx functions does not require external locking
520
521
522         /// Max node height. The actual node height should be in range <tt>[0 .. c_nMaxHeight)</tt>
523         /**
524             The max height is specified by \ref skip_list::random_level_generator "random level generator" constant \p m_nUpperBound
525             but it should be no more than 32 (\ref skip_list::c_nHeightLimit).
526         */
527         static unsigned int const c_nMaxHeight = std::conditional<
528             (random_level_generator::c_nUpperBound <= skip_list::c_nHeightLimit),
529             std::integral_constant< unsigned int, random_level_generator::c_nUpperBound >,
530             std::integral_constant< unsigned int, skip_list::c_nHeightLimit >
531         >::type::value;
532
533         //@cond
534         static unsigned int const c_nMinHeight = 5;
535         //@endcond
536
537     protected:
538         typedef typename node_type::atomic_marked_ptr   atomic_node_ptr ;   ///< Atomic marked node pointer
539         typedef typename node_type::marked_ptr          marked_node_ptr ;   ///< Node marked pointer
540
541     protected:
542         //@cond
543         typedef skip_list::details::intrusive_node_builder< node_type, atomic_node_ptr, allocator_type > intrusive_node_builder;
544
545         typedef typename std::conditional<
546             std::is_same< typename traits::internal_node_builder, cds::opt::none >::value
547             ,intrusive_node_builder
548             ,typename traits::internal_node_builder
549         >::type node_builder;
550
551         typedef std::unique_ptr< node_type, typename node_builder::node_disposer >    scoped_node_ptr;
552
553         struct position {
554             node_type *   pPrev[ c_nMaxHeight ];
555             node_type *   pSucc[ c_nMaxHeight ];
556             node_type *   pNext[ c_nMaxHeight ];
557
558             node_type *   pCur;
559             node_type *   pDelChain;
560
561             position()
562                 : pDelChain( nullptr )
563             {}
564 #       ifdef _DEBUG
565             ~position()
566             {
567                 assert( pDelChain == nullptr );
568             }
569 #       endif
570         };
571
572         typedef cds::urcu::details::check_deadlock_policy< gc, rcu_check_deadlock>   check_deadlock_policy;
573         //@endcond
574
575     protected:
576         skip_list::details::head_node< node_type > m_Head;   ///< head tower (max height)
577
578         item_counter                m_ItemCounter;      ///< item counter
579         random_level_generator      m_RandomLevelGen;   ///< random level generator instance
580         atomics::atomic<unsigned int>    m_nHeight;     ///< estimated high level
581         atomics::atomic<node_type *>     m_pDeferredDelChain ;   ///< Deferred deleted node chain
582         mutable stat                m_Stat;             ///< internal statistics
583
584     protected:
585         //@cond
586         unsigned int random_level()
587         {
588             // Random generator produces a number from range [0..31]
589             // We need a number from range [1..32]
590             return m_RandomLevelGen() + 1;
591         }
592
593         template <typename Q>
594         node_type * build_node( Q v )
595         {
596             return node_builder::make_tower( v, m_RandomLevelGen );
597         }
598
599         static void dispose_node( value_type * pVal )
600         {
601             assert( pVal );
602
603             typename node_builder::node_disposer()( node_traits::to_node_ptr(pVal) );
604             disposer()( pVal );
605         }
606
607         struct node_disposer
608         {
609             void operator()( value_type * pVal )
610             {
611                 dispose_node( pVal );
612             }
613         };
614         //@endcond
615
616     public:
617         typedef cds::urcu::exempt_ptr< gc, value_type, value_type, node_disposer, void > exempt_ptr; ///< pointer to extracted node
618
619     protected:
620         //@cond
621
622         bool is_extracted( marked_node_ptr const p ) const
623         {
624             return (p.bits() & 2) != 0;
625         }
626
627         template <typename Q, typename Compare >
628         bool find_position( Q const& val, position& pos, Compare cmp, bool bStopIfFound )
629         {
630             assert( gc::is_locked() );
631
632             node_type * pPred;
633             marked_node_ptr pSucc;
634             marked_node_ptr pCur;
635             int nCmp = 1;
636
637         retry:
638             pPred = m_Head.head();
639
640             for ( int nLevel = static_cast<int>(c_nMaxHeight - 1); nLevel >= 0; --nLevel ) {
641
642                 while ( true ) {
643                     pCur = pPred->next( nLevel ).load( memory_model::memory_order_relaxed );
644                     if ( pCur.bits() ) {
645                         // pCur.bits() means that pPred is logically deleted
646                         goto retry;
647                     }
648
649                     if ( pCur.ptr() == nullptr ) {
650                         // end of the list at level nLevel - goto next level
651                         break;
652                     }
653
654                     // pSucc contains deletion mark for pCur
655                     pSucc = pCur->next( nLevel ).load( memory_model::memory_order_relaxed );
656
657                     if ( pPred->next( nLevel ).load( memory_model::memory_order_relaxed ).all() != pCur.ptr() )
658                         goto retry;
659
660                     if ( pSucc.bits() ) {
661                         // pCur is marked, i.e. logically deleted.
662                         marked_node_ptr p( pCur.ptr() );
663                         if ( pPred->next( nLevel ).compare_exchange_strong( p, marked_node_ptr( pSucc.ptr() ),
664                              memory_model::memory_order_release, atomics::memory_order_relaxed ))
665                         {
666                             if ( nLevel == 0 ) {
667 #                       ifdef _DEBUG
668                                 pCur->m_bUnlinked = true;
669 #                       endif
670
671                                 if ( !is_extracted( pSucc )) {
672                                     // We cannot free the node at this moment since RCU is locked
673                                     // Link deleted nodes to a chain to free later
674                                     link_for_remove( pos, pCur.ptr() );
675                                     m_Stat.onEraseWhileFind();
676                                 }
677                                 else {
678                                     m_Stat.onExtractWhileFind();
679                                 }
680                             }
681                         }
682                         goto retry;
683                     }
684                     else {
685                         nCmp = cmp( *node_traits::to_value_ptr( pCur.ptr()), val );
686                         if ( nCmp < 0 )
687                             pPred = pCur.ptr();
688                         else if ( nCmp == 0 && bStopIfFound )
689                             goto found;
690                         else
691                             break;
692                     }
693                 }
694
695                 // Next level
696                 pos.pPrev[ nLevel ] = pPred;
697                 pos.pSucc[ nLevel ] = pCur.ptr();
698             }
699
700             if ( nCmp != 0 )
701                 return false;
702
703         found:
704             pos.pCur = pCur.ptr();
705             return pCur.ptr() && nCmp == 0;
706         }
707
708         bool find_min_position( position& pos )
709         {
710             assert( gc::is_locked() );
711
712             node_type * pPred;
713             marked_node_ptr pSucc;
714             marked_node_ptr pCur;
715
716         retry:
717             pPred = m_Head.head();
718
719             for ( int nLevel = static_cast<int>(c_nMaxHeight - 1); nLevel >= 0; --nLevel ) {
720
721                 pCur = pPred->next( nLevel ).load( memory_model::memory_order_relaxed );
722                 // pCur.bits() means that pPred is logically deleted
723                 // head cannot be deleted
724                 assert( pCur.bits() == 0 );
725
726                 if ( pCur.ptr() ) {
727
728                     // pSucc contains deletion mark for pCur
729                     pSucc = pCur->next( nLevel ).load( memory_model::memory_order_relaxed );
730
731                     if ( pPred->next( nLevel ).load( memory_model::memory_order_relaxed ).all() != pCur.ptr() )
732                         goto retry;
733
734                     if ( pSucc.bits() ) {
735                         // pCur is marked, i.e. logically deleted.
736                         marked_node_ptr p( pCur.ptr() );
737                         if ( pPred->next( nLevel ).compare_exchange_strong( p, marked_node_ptr( pSucc.ptr() ),
738                             memory_model::memory_order_release, atomics::memory_order_relaxed ))
739                         {
740                             if ( nLevel == 0 ) {
741 #                       ifdef _DEBUG
742                                 pCur->m_bUnlinked = true;
743 #                       endif
744
745                                 if ( !is_extracted( pSucc )) {
746                                     // We cannot free the node at this moment since RCU is locked
747                                     // Link deleted nodes to a chain to free later
748                                     link_for_remove( pos, pCur.ptr() );
749                                     m_Stat.onEraseWhileFind();
750                                 }
751                                 else {
752                                     m_Stat.onExtractWhileFind();
753                                 }
754                             }
755                         }
756                         goto retry;
757                     }
758                 }
759
760                 // Next level
761                 pos.pPrev[ nLevel ] = pPred;
762                 pos.pSucc[ nLevel ] = pCur.ptr();
763             }
764             return (pos.pCur = pCur.ptr()) != nullptr;
765         }
766
767         bool find_max_position( position& pos )
768         {
769             assert( gc::is_locked() );
770
771             node_type * pPred;
772             marked_node_ptr pSucc;
773             marked_node_ptr pCur;
774
775 retry:
776             pPred = m_Head.head();
777
778             for ( int nLevel = static_cast<int>(c_nMaxHeight - 1); nLevel >= 0; --nLevel ) {
779
780                 while ( true ) {
781                     pCur = pPred->next( nLevel ).load( memory_model::memory_order_relaxed );
782                     if ( pCur.bits() ) {
783                         // pCur.bits() means that pPred is logically deleted
784                         goto retry;
785                     }
786
787                     if ( pCur.ptr() == nullptr ) {
788                         // end of the list at level nLevel - goto next level
789                         break;
790                     }
791
792                     // pSucc contains deletion mark for pCur
793                     pSucc = pCur->next( nLevel ).load( memory_model::memory_order_relaxed );
794
795                     if ( pPred->next( nLevel ).load( memory_model::memory_order_relaxed ).all() != pCur.ptr() )
796                         goto retry;
797
798                     if ( pSucc.bits() ) {
799                         // pCur is marked, i.e. logically deleted.
800                         marked_node_ptr p( pCur.ptr() );
801                         if ( pPred->next( nLevel ).compare_exchange_strong( p, marked_node_ptr( pSucc.ptr() ),
802                             memory_model::memory_order_release, atomics::memory_order_relaxed ))
803                         {
804                             if ( nLevel == 0 ) {
805 #                       ifdef _DEBUG
806                                 pCur->m_bUnlinked = true;
807 #                       endif
808
809                                 if ( !is_extracted( pSucc )) {
810                                     // We cannot free the node at this moment since RCU is locked
811                                     // Link deleted nodes to a chain to free later
812                                     link_for_remove( pos, pCur.ptr() );
813                                     m_Stat.onEraseWhileFind();
814                                 }
815                                 else {
816                                     m_Stat.onExtractWhileFind();
817                                 }
818                             }
819                         }
820                         goto retry;
821                     }
822                     else {
823                         if ( !pSucc.ptr() )
824                             break;
825
826                         pPred = pCur.ptr();
827                     }
828                 }
829
830                 // Next level
831                 pos.pPrev[ nLevel ] = pPred;
832                 pos.pSucc[ nLevel ] = pCur.ptr();
833             }
834
835             return (pos.pCur = pCur.ptr()) != nullptr;
836         }
837
838         template <typename Func>
839         bool insert_at_position( value_type& val, node_type * pNode, position& pos, Func f )
840         {
841             assert( gc::is_locked() );
842
843             unsigned int nHeight = pNode->height();
844             pNode->clear_tower();
845
846             {
847                 marked_node_ptr p( pos.pSucc[0] );
848                 pNode->next( 0 ).store( p, memory_model::memory_order_release );
849                 if ( !pos.pPrev[0]->next(0).compare_exchange_strong( p, marked_node_ptr(pNode), memory_model::memory_order_release, atomics::memory_order_relaxed )) {
850                     return false;
851                 }
852 #       ifdef _DEBUG
853                 pNode->m_bLinked = true;
854 #       endif
855                 f( val );
856             }
857
858             for ( unsigned int nLevel = 1; nLevel < nHeight; ++nLevel ) {
859                 marked_node_ptr p;
860                 while ( true ) {
861                     marked_node_ptr q( pos.pSucc[ nLevel ]);
862                     if ( !pNode->next( nLevel ).compare_exchange_strong( p, q, memory_model::memory_order_acquire, atomics::memory_order_relaxed )) {
863                         // pNode has been marked as removed while we are inserting it
864                         // Stop inserting
865                         assert( p.bits() );
866                         m_Stat.onLogicDeleteWhileInsert();
867                         return true;
868                     }
869                     p = q;
870                     if ( pos.pPrev[nLevel]->next(nLevel).compare_exchange_strong( q, marked_node_ptr( pNode ), memory_model::memory_order_release, atomics::memory_order_relaxed ) )
871                         break;
872
873                     // Renew insert position
874                     m_Stat.onRenewInsertPosition();
875                     if ( !find_position( val, pos, key_comparator(), false )) {
876                         // The node has been deleted while we are inserting it
877                         m_Stat.onNotFoundWhileInsert();
878                         return true;
879                     }
880                 }
881             }
882             return true;
883         }
884
885         static void link_for_remove( position& pos, node_type * pDel )
886         {
887             assert( pDel->m_pDelChain == nullptr );
888
889             pDel->m_pDelChain = pos.pDelChain;
890             pos.pDelChain = pDel;
891         }
892
893         template <typename Func>
894         bool try_remove_at( node_type * pDel, position& pos, Func f, bool bExtract )
895         {
896             assert( pDel != nullptr );
897             assert( gc::is_locked() );
898
899             marked_node_ptr pSucc;
900
901             // logical deletion (marking)
902             for ( unsigned int nLevel = pDel->height() - 1; nLevel > 0; --nLevel ) {
903                 pSucc = pDel->next(nLevel).load( memory_model::memory_order_relaxed );
904                 while ( true ) {
905                     if ( pSucc.bits()
906                       || pDel->next(nLevel).compare_exchange_weak( pSucc, pSucc | 1, memory_model::memory_order_acquire, atomics::memory_order_relaxed ))
907                     {
908                         break;
909                     }
910                 }
911             }
912
913             pSucc = pDel->next(0).load( memory_model::memory_order_relaxed );
914             while ( true ) {
915                 if ( pSucc.bits() )
916                     return false;
917
918                 int const nMask = bExtract ? 3 : 1;
919                 if ( pDel->next(0).compare_exchange_strong( pSucc, pSucc | nMask, memory_model::memory_order_acquire, atomics::memory_order_relaxed ))
920                 {
921                     f( *node_traits::to_value_ptr( pDel ));
922
923                     // physical deletion
924                     // try fast erase
925                     pSucc = pDel;
926                     for ( int nLevel = static_cast<int>( pDel->height() - 1 ); nLevel >= 0; --nLevel ) {
927                         if ( !pos.pPrev[nLevel]->next(nLevel).compare_exchange_strong( pSucc,
928                             marked_node_ptr( pDel->next(nLevel).load(memory_model::memory_order_relaxed).ptr() ),
929                             memory_model::memory_order_release, atomics::memory_order_relaxed) )
930                         {
931                             // Do slow erase
932                             find_position( *node_traits::to_value_ptr(pDel), pos, key_comparator(), false );
933                             if ( bExtract )
934                                 m_Stat.onSlowExtract();
935                             else
936                                 m_Stat.onSlowErase();
937 #                        ifdef _DEBUG
938                             assert( pDel->m_bUnlinked );
939 #                        endif
940                             return true;
941                         }
942                     }
943
944 #               ifdef _DEBUG
945                     pDel->m_bUnlinked = true;
946 #               endif
947                     if ( !bExtract ) {
948                         // We cannot free the node at this moment since RCU is locked
949                         // Link deleted nodes to a chain to free later
950                         link_for_remove( pos, pDel );
951                         m_Stat.onFastErase();
952                     }
953                     else
954                         m_Stat.onFastExtract();
955
956                     return true;
957                 }
958             }
959         }
960
961         enum finsd_fastpath_result {
962             find_fastpath_found,
963             find_fastpath_not_found,
964             find_fastpath_abort
965         };
966         template <typename Q, typename Compare, typename Func>
967         finsd_fastpath_result find_fastpath( Q& val, Compare cmp, Func f ) const
968         {
969             node_type * pPred;
970             marked_node_ptr pCur;
971             marked_node_ptr pSucc;
972             marked_node_ptr pNull;
973
974             back_off bkoff;
975
976             pPred = m_Head.head();
977             for ( int nLevel = static_cast<int>(m_nHeight.load(memory_model::memory_order_relaxed) - 1); nLevel >= 0; --nLevel ) {
978                 pCur = pPred->next(nLevel).load( memory_model::memory_order_acquire );
979                 if ( pCur == pNull )
980                     continue;
981
982                 while ( pCur != pNull ) {
983                     if ( pCur.bits() ) {
984                         // Wait until pCur is removed
985                         unsigned int nAttempt = 0;
986                         while ( pCur.bits() && nAttempt++ < 16 ) {
987                             bkoff();
988                             pCur = pPred->next(nLevel).load( memory_model::memory_order_acquire );
989                         }
990                         bkoff.reset();
991
992                         if ( pCur.bits() ) {
993                             // Maybe, we are on deleted node sequence
994                             // Abort searching, try slow-path
995                             return find_fastpath_abort;
996                         }
997                     }
998
999                     if ( pCur.ptr() ) {
1000                         int nCmp = cmp( *node_traits::to_value_ptr( pCur.ptr() ), val );
1001                         if ( nCmp < 0 ) {
1002                             pPred = pCur.ptr();
1003                             pCur = pCur->next(nLevel).load( memory_model::memory_order_acquire );
1004                         }
1005                         else if ( nCmp == 0 ) {
1006                             // found
1007                             f( *node_traits::to_value_ptr( pCur.ptr() ), val );
1008                             return find_fastpath_found;
1009                         }
1010                         else // pCur > val - go down
1011                             break;
1012                     }
1013                 }
1014             }
1015
1016             return find_fastpath_not_found;
1017         }
1018
1019         template <typename Q, typename Compare, typename Func>
1020         bool find_slowpath( Q& val, Compare cmp, Func f, position& pos )
1021         {
1022             if ( find_position( val, pos, cmp, true )) {
1023                 assert( cmp( *node_traits::to_value_ptr( pos.pCur ), val ) == 0 );
1024
1025                 f( *node_traits::to_value_ptr( pos.pCur ), val );
1026                 return true;
1027             }
1028             else
1029                 return false;
1030         }
1031
1032         template <typename Q, typename Compare, typename Func>
1033         bool do_find_with( Q& val, Compare cmp, Func f )
1034         {
1035             position pos;
1036             bool bRet;
1037
1038             rcu_lock l;
1039
1040             switch ( find_fastpath( val, cmp, f )) {
1041             case find_fastpath_found:
1042                 m_Stat.onFindFastSuccess();
1043                 return true;
1044             case find_fastpath_not_found:
1045                 m_Stat.onFindFastFailed();
1046                 return false;
1047             default:
1048                 break;
1049             }
1050
1051             if ( find_slowpath( val, cmp, f, pos )) {
1052                 m_Stat.onFindSlowSuccess();
1053                 bRet = true;
1054             }
1055             else {
1056                 m_Stat.onFindSlowFailed();
1057                 bRet = false;
1058             }
1059
1060             defer_chain( pos );
1061
1062             return bRet;
1063         }
1064
1065         template <typename Q, typename Compare, typename Func>
1066         bool do_erase( Q const& val, Compare cmp, Func f )
1067         {
1068             check_deadlock_policy::check();
1069
1070             position pos;
1071             bool bRet;
1072
1073             {
1074                 rcu_lock rcuLock;
1075
1076                 if ( !find_position( val, pos, cmp, false ) ) {
1077                     m_Stat.onEraseFailed();
1078                     bRet = false;
1079                 }
1080                 else {
1081                     node_type * pDel = pos.pCur;
1082                     assert( cmp( *node_traits::to_value_ptr( pDel ), val ) == 0 );
1083
1084                     unsigned int nHeight = pDel->height();
1085                     if ( try_remove_at( pDel, pos, f, false )) {
1086                         --m_ItemCounter;
1087                         m_Stat.onRemoveNode( nHeight );
1088                         m_Stat.onEraseSuccess();
1089                         bRet = true;
1090                     }
1091                     else {
1092                         m_Stat.onEraseFailed();
1093                         bRet = false;
1094                     }
1095                 }
1096             }
1097
1098             dispose_chain( pos );
1099             return bRet;
1100         }
1101
1102         template <typename Q, typename Compare>
1103         value_type * do_extract_key( Q const& key, Compare cmp )
1104         {
1105             // RCU should be locked!!!
1106             assert( gc::is_locked() );
1107
1108             position pos;
1109             node_type * pDel;
1110
1111             if ( !find_position( key, pos, cmp, false ) ) {
1112                 m_Stat.onExtractFailed();
1113                 pDel = nullptr;
1114             }
1115             else {
1116                 pDel = pos.pCur;
1117                 assert( cmp( *node_traits::to_value_ptr( pDel ), key ) == 0 );
1118
1119                 unsigned int const nHeight = pDel->height();
1120
1121                 if ( try_remove_at( pDel, pos, [](value_type const&) {}, true )) {
1122                     --m_ItemCounter;
1123                     m_Stat.onRemoveNode( nHeight );
1124                     m_Stat.onExtractSuccess();
1125                 }
1126                 else {
1127                     m_Stat.onExtractFailed();
1128                     pDel = nullptr;
1129                 }
1130             }
1131
1132             defer_chain( pos );
1133             return pDel ? node_traits::to_value_ptr( pDel ) : nullptr;
1134         }
1135
1136         template <typename ExemptPtr, typename Q>
1137         bool do_extract( ExemptPtr& result, Q const& key )
1138         {
1139             check_deadlock_policy::check();
1140
1141             bool bReturn;
1142             {
1143                 rcu_lock l;
1144                 value_type * pDel = do_extract_key( key, key_comparator() );
1145                 bReturn = pDel != nullptr;
1146                 if ( bReturn )
1147                     result = pDel;
1148             }
1149
1150             dispose_deferred();
1151             return bReturn;
1152         }
1153
1154         template <typename ExemptPtr, typename Q, typename Less>
1155         bool do_extract_with( ExemptPtr& result, Q const& key, Less pred )
1156         {
1157             check_deadlock_policy::check();
1158
1159             bool bReturn;
1160             {
1161                 rcu_lock l;
1162                 value_type * pDel = do_extract_key( key, cds::opt::details::make_comparator_from_less<Less>() );
1163                 bReturn = pDel != nullptr;
1164                 if ( bReturn )
1165                     result = pDel;
1166             }
1167
1168             dispose_deferred();
1169             return bReturn;
1170         }
1171
1172         node_type * do_extract_min()
1173         {
1174             assert( gc::is_locked() );
1175
1176             position pos;
1177             node_type * pDel;
1178
1179             if ( !find_min_position( pos ) ) {
1180                 m_Stat.onExtractMinFailed();
1181                 pDel = nullptr;
1182             }
1183             else {
1184                 pDel = pos.pCur;
1185                 unsigned int const nHeight = pDel->height();
1186
1187                 if ( try_remove_at( pDel, pos, [](value_type const&) {}, true )) {
1188                     --m_ItemCounter;
1189                     m_Stat.onRemoveNode( nHeight );
1190                     m_Stat.onExtractMinSuccess();
1191                 }
1192                 else {
1193                     m_Stat.onExtractMinFailed();
1194                     pDel = nullptr;
1195                 }
1196             }
1197
1198             defer_chain( pos );
1199             return pDel;
1200         }
1201
1202         template <typename ExemptPtr>
1203         bool do_extract_min( ExemptPtr& result )
1204         {
1205             check_deadlock_policy::check();
1206
1207             bool bReturn;
1208             {
1209                 rcu_lock l;
1210                 node_type * pDel = do_extract_min();
1211                 bReturn = pDel != nullptr;
1212                 if ( bReturn )
1213                     result = node_traits::to_value_ptr(pDel);
1214             }
1215
1216             dispose_deferred();
1217             return bReturn;
1218         }
1219
1220         node_type * do_extract_max()
1221         {
1222             assert( gc::is_locked() );
1223
1224             position pos;
1225             node_type * pDel;
1226
1227             if ( !find_max_position( pos ) ) {
1228                 m_Stat.onExtractMaxFailed();
1229                 pDel = nullptr;
1230             }
1231             else {
1232                 pDel = pos.pCur;
1233                 unsigned int const nHeight = pDel->height();
1234
1235                 if ( try_remove_at( pDel, pos, [](value_type const&) {}, true )) {
1236                     --m_ItemCounter;
1237                     m_Stat.onRemoveNode( nHeight );
1238                     m_Stat.onExtractMaxSuccess();
1239                 }
1240                 else {
1241                     m_Stat.onExtractMaxFailed();
1242                     pDel = nullptr;
1243                 }
1244             }
1245
1246             defer_chain( pos );
1247             return pDel;
1248         }
1249
1250         template <typename ExemptPtr>
1251         bool do_extract_max( ExemptPtr& result )
1252         {
1253             check_deadlock_policy::check();
1254
1255             bool bReturn;
1256             {
1257                 rcu_lock l;
1258                 node_type * pDel = do_extract_max();
1259                 bReturn = pDel != nullptr;
1260                 if ( bReturn )
1261                     result = node_traits::to_value_ptr(pDel);
1262             }
1263
1264             dispose_deferred();
1265             return bReturn;
1266         }
1267
1268         void increase_height( unsigned int nHeight )
1269         {
1270             unsigned int nCur = m_nHeight.load( memory_model::memory_order_relaxed );
1271             if ( nCur < nHeight )
1272                 m_nHeight.compare_exchange_strong( nCur, nHeight, memory_model::memory_order_release, atomics::memory_order_relaxed );
1273         }
1274
1275         class deferred_list_iterator
1276         {
1277             node_type * pCur;
1278         public:
1279             explicit deferred_list_iterator( node_type * p )
1280                 : pCur(p)
1281             {}
1282             deferred_list_iterator()
1283                 : pCur( nullptr )
1284             {}
1285
1286             cds::urcu::retired_ptr operator *() const
1287             {
1288                 return cds::urcu::retired_ptr( node_traits::to_value_ptr(pCur), dispose_node );
1289             }
1290
1291             void operator ++()
1292             {
1293                 pCur = pCur->m_pDelChain;
1294             }
1295
1296             bool operator ==( deferred_list_iterator const& i ) const
1297             {
1298                 return pCur == i.pCur;
1299             }
1300             bool operator !=( deferred_list_iterator const& i ) const
1301             {
1302                 return !operator ==( i );
1303             }
1304         };
1305
1306         void dispose_chain( node_type * pHead )
1307         {
1308             // RCU should NOT be locked
1309             check_deadlock_policy::check();
1310
1311             gc::batch_retire( deferred_list_iterator( pHead ), deferred_list_iterator() );
1312         }
1313
1314         void dispose_chain( position& pos )
1315         {
1316             // RCU should NOT be locked
1317             check_deadlock_policy::check();
1318
1319             // Delete local chain
1320             if ( pos.pDelChain ) {
1321                 dispose_chain( pos.pDelChain );
1322                 pos.pDelChain = nullptr;
1323             }
1324
1325             // Delete deferred chain
1326             dispose_deferred();
1327         }
1328
1329         void dispose_deferred()
1330         {
1331             dispose_chain( m_pDeferredDelChain.exchange( nullptr, memory_model::memory_order_acq_rel ) );
1332         }
1333
1334         void defer_chain( position& pos )
1335         {
1336             if ( pos.pDelChain ) {
1337                 node_type * pHead = pos.pDelChain;
1338                 node_type * pTail = pHead;
1339                 while ( pTail->m_pDelChain )
1340                     pTail = pTail->m_pDelChain;
1341
1342                 node_type * pDeferList = m_pDeferredDelChain.load( memory_model::memory_order_relaxed );
1343                 do {
1344                     pTail->m_pDelChain = pDeferList;
1345                 } while ( !m_pDeferredDelChain.compare_exchange_weak( pDeferList, pHead, memory_model::memory_order_acq_rel, atomics::memory_order_relaxed ));
1346
1347                 pos.pDelChain = nullptr;
1348             }
1349         }
1350
1351         //@endcond
1352
1353     public:
1354         /// Default constructor
1355         SkipListSet()
1356             : m_Head( c_nMaxHeight )
1357             , m_nHeight( c_nMinHeight )
1358             , m_pDeferredDelChain( nullptr )
1359         {
1360             static_assert( (std::is_same< gc, typename node_type::gc >::value), "GC and node_type::gc must be the same type" );
1361
1362             // Barrier for head node
1363             atomics::atomic_thread_fence( memory_model::memory_order_release );
1364         }
1365
1366         /// Clears and destructs the skip-list
1367         ~SkipListSet()
1368         {
1369             clear();
1370         }
1371
1372     public:
1373         /// Iterator type
1374         typedef skip_list::details::iterator< gc, node_traits, back_off, false >  iterator;
1375
1376         /// Const iterator type
1377         typedef skip_list::details::iterator< gc, node_traits, back_off, true >   const_iterator;
1378
1379         /// Returns a forward iterator addressing the first element in a set
1380         iterator begin()
1381         {
1382             return iterator( *m_Head.head() );
1383         }
1384
1385         /// Returns a forward const iterator addressing the first element in a set
1386         const_iterator begin() const
1387         {
1388             return const_iterator( *m_Head.head() );
1389         }
1390
1391         /// Returns a forward const iterator addressing the first element in a set
1392         const_iterator cbegin() const
1393         {
1394             return const_iterator( *m_Head.head() );
1395         }
1396
1397         /// Returns a forward iterator that addresses the location succeeding the last element in a set.
1398         iterator end()
1399         {
1400             return iterator();
1401         }
1402
1403         /// Returns a forward const iterator that addresses the location succeeding the last element in a set.
1404         const_iterator end() const
1405         {
1406             return const_iterator();
1407         }
1408
1409         /// Returns a forward const iterator that addresses the location succeeding the last element in a set.
1410         const_iterator cend() const
1411         {
1412             return const_iterator();
1413         }
1414
1415     public:
1416         /// Inserts new node
1417         /**
1418             The function inserts \p val in the set if it does not contain
1419             an item with key equal to \p val.
1420
1421             The function applies RCU lock internally.
1422
1423             Returns \p true if \p val is placed into the set, \p false otherwise.
1424         */
1425         bool insert( value_type& val )
1426         {
1427             return insert( val, []( value_type& ) {} );
1428         }
1429
1430         /// Inserts new node
1431         /**
1432             This function is intended for derived non-intrusive containers.
1433
1434             The function allows to split creating of new item into two part:
1435             - create item with key only
1436             - insert new item into the set
1437             - if inserting is success, calls  \p f functor to initialize value-field of \p val.
1438
1439             The functor signature is:
1440             \code
1441                 void func( value_type& val );
1442             \endcode
1443             where \p val is the item inserted. User-defined functor \p f should guarantee that during changing
1444             \p val no any other changes could be made on this set's item by concurrent threads.
1445             The user-defined functor is called only if the inserting is success and may be passed by reference
1446             using \p std::ref.
1447
1448             RCU \p synchronize method can be called. RCU should not be locked.
1449         */
1450         template <typename Func>
1451         bool insert( value_type& val, Func f )
1452         {
1453             check_deadlock_policy::check();
1454
1455             position pos;
1456             bool bRet;
1457
1458             {
1459                 node_type * pNode = node_traits::to_node_ptr( val );
1460                 scoped_node_ptr scp( pNode );
1461                 unsigned int nHeight = pNode->height();
1462                 bool bTowerOk = nHeight > 1 && pNode->get_tower() != nullptr;
1463                 bool bTowerMade = false;
1464
1465                 rcu_lock rcuLock;
1466
1467                 while ( true )
1468                 {
1469                     bool bFound = find_position( val, pos, key_comparator(), true );
1470                     if ( bFound ) {
1471                         // scoped_node_ptr deletes the node tower if we create it
1472                         if ( !bTowerMade )
1473                             scp.release();
1474
1475                         m_Stat.onInsertFailed();
1476                         bRet = false;
1477                         break;
1478                     }
1479
1480                     if ( !bTowerOk ) {
1481                         build_node( pNode );
1482                         nHeight = pNode->height();
1483                         bTowerMade =
1484                             bTowerOk = true;
1485                     }
1486
1487                     if ( !insert_at_position( val, pNode, pos, f )) {
1488                         m_Stat.onInsertRetry();
1489                         continue;
1490                     }
1491
1492                     increase_height( nHeight );
1493                     ++m_ItemCounter;
1494                     m_Stat.onAddNode( nHeight );
1495                     m_Stat.onInsertSuccess();
1496                     scp.release();
1497                     bRet =  true;
1498                     break;
1499                 }
1500             }
1501
1502             dispose_chain( pos );
1503
1504             return bRet;
1505         }
1506
1507         /// Ensures that the \p val exists in the set
1508         /**
1509             The operation performs inserting or changing data with lock-free manner.
1510
1511             If the item \p val is not found in the set, then \p val is inserted into the set.
1512             Otherwise, the functor \p func is called with item found.
1513             The functor signature is:
1514             \code
1515                 void func( bool bNew, value_type& item, value_type& val );
1516             \endcode
1517             with arguments:
1518             - \p bNew - \p true if the item has been inserted, \p false otherwise
1519             - \p item - item of the set
1520             - \p val - argument \p val passed into the \p %ensure() function
1521             If new item has been inserted (i.e. \p bNew is \p true) then \p item and \p val arguments
1522             refer to the same thing.
1523
1524             The functor can change non-key fields of the \p item; however, \p func must guarantee
1525             that during changing no any other modifications could be made on this item by concurrent threads.
1526
1527             RCU \p synchronize method can be called. RCU should not be locked.
1528
1529             Returns std::pair<bool, bool> where \p first is \p true if operation is successfull,
1530             \p second is \p true if new item has been added or \p false if the item with \p key
1531             already is in the set.
1532
1533             @warning See \ref cds_intrusive_item_creating "insert item troubleshooting"
1534         */
1535         template <typename Func>
1536         std::pair<bool, bool> ensure( value_type& val, Func func )
1537         {
1538             check_deadlock_policy::check();
1539
1540             position pos;
1541             std::pair<bool, bool> bRet( true, false );
1542
1543             {
1544                 node_type * pNode = node_traits::to_node_ptr( val );
1545                 scoped_node_ptr scp( pNode );
1546                 unsigned int nHeight = pNode->height();
1547                 bool bTowerOk = nHeight > 1 && pNode->get_tower() != nullptr;
1548                 bool bTowerMade = false;
1549
1550                 rcu_lock rcuLock;
1551                 while ( true )
1552                 {
1553                     bool bFound = find_position( val, pos, key_comparator(), true );
1554                     if ( bFound ) {
1555                         // scoped_node_ptr deletes the node tower if we create it before
1556                         if ( !bTowerMade )
1557                             scp.release();
1558
1559                         func( false, *node_traits::to_value_ptr(pos.pCur), val );
1560                         m_Stat.onEnsureExist();
1561                         break;
1562                     }
1563
1564                     if ( !bTowerOk ) {
1565                         build_node( pNode );
1566                         nHeight = pNode->height();
1567                         bTowerMade =
1568                             bTowerOk = true;
1569                     }
1570
1571                     if ( !insert_at_position( val, pNode, pos, [&func]( value_type& item ) { func( true, item, item ); })) {
1572                         m_Stat.onInsertRetry();
1573                         continue;
1574                     }
1575
1576                     increase_height( nHeight );
1577                     ++m_ItemCounter;
1578                     scp.release();
1579                     m_Stat.onAddNode( nHeight );
1580                     m_Stat.onEnsureNew();
1581                     bRet.second = true;
1582                     break;
1583                 }
1584             }
1585
1586             dispose_chain( pos );
1587
1588             return bRet;
1589         }
1590
1591         /// Unlinks the item \p val from the set
1592         /**
1593             The function searches the item \p val in the set and unlink it from the set
1594             if it is found and is equal to \p val.
1595
1596             Difference between \p erase() and \p %unlink() functions: \p erase() finds <i>a key</i>
1597             and deletes the item found. \p %unlink() searches an item by key and deletes it
1598             only if \p val is an item of that set, i.e. the pointer to item found
1599             is equal to <tt> &val </tt>.
1600
1601             RCU \p synchronize method can be called. RCU should not be locked.
1602
1603             The \ref disposer specified in \p Traits class template parameter is called
1604             by garbage collector \p GC asynchronously.
1605
1606             The function returns \p true if success and \p false otherwise.
1607         */
1608         bool unlink( value_type& val )
1609         {
1610             check_deadlock_policy::check();
1611
1612             position pos;
1613             bool bRet;
1614
1615             {
1616                 rcu_lock rcuLock;
1617
1618                 if ( !find_position( val, pos, key_comparator(), false ) ) {
1619                     m_Stat.onUnlinkFailed();
1620                     bRet = false;
1621                 }
1622                 else {
1623                     node_type * pDel = pos.pCur;
1624                     assert( key_comparator()( *node_traits::to_value_ptr( pDel ), val ) == 0 );
1625
1626                     unsigned int nHeight = pDel->height();
1627
1628                     if ( node_traits::to_value_ptr( pDel ) == &val && try_remove_at( pDel, pos, [](value_type const&) {}, false )) {
1629                         --m_ItemCounter;
1630                         m_Stat.onRemoveNode( nHeight );
1631                         m_Stat.onUnlinkSuccess();
1632                         bRet = true;
1633                     }
1634                     else {
1635                         m_Stat.onUnlinkFailed();
1636                         bRet = false;
1637                     }
1638                 }
1639             }
1640
1641             dispose_chain( pos );
1642
1643             return bRet;
1644         }
1645
1646         /// Extracts the item from the set with specified \p key
1647         /** \anchor cds_intrusive_SkipListSet_rcu_extract
1648             The function searches an item with key equal to \p key in the set,
1649             unlinks it from the set, places it to \p result parameter, and returns \p true.
1650             If the item with key equal to \p key is not found the function returns \p false.
1651
1652             Note the compare functor should accept a parameter of type \p Q that can be not the same as \p value_type.
1653
1654             RCU \p synchronize method can be called. RCU should NOT be locked.
1655             The function does not call the disposer for the item found.
1656             The disposer will be implicitly invoked when \p result object is destroyed or when
1657             <tt>result.release()</tt> is called, see cds::urcu::exempt_ptr for explanation.
1658             @note Before reusing \p result object you should call its \p release() method.
1659             Example:
1660             \code
1661             typedef cds::intrusive::SkipListSet< cds::urcu::gc< cds::urcu::general_buffered<> >, foo, my_traits > skip_list;
1662             skip_list theList;
1663             // ...
1664
1665             typename skip_list::exempt_ptr ep;
1666             if ( theList.extract( ep, 5 ) ) {
1667                 // Deal with ep
1668                 //...
1669
1670                 // Dispose returned item.
1671                 ep.release();
1672             }
1673             \endcode
1674         */
1675         template <typename Q>
1676         bool extract( exempt_ptr& result, Q const& key )
1677         {
1678             return do_extract( result, key );
1679         }
1680
1681         /// Extracts the item from the set with comparing functor \p pred
1682         /**
1683             The function is an analog of \ref cds_intrusive_SkipListSet_rcu_extract "extract(exempt_ptr&, Q const&)"
1684             but \p pred predicate is used for key comparing.
1685             \p Less has the interface like \p std::less.
1686             \p pred must imply the same element order as the comparator used for building the set.
1687         */
1688         template <typename Q, typename Less>
1689         bool extract_with( exempt_ptr& result, Q const& key, Less pred )
1690         {
1691             return do_extract_with( result, key, pred );
1692         }
1693
1694         /// Extracts an item with minimal key from the list
1695         /**
1696             The function searches an item with minimal key, unlinks it, and returns the item found in \p result parameter.
1697             If the skip-list is empty the function returns \p false.
1698
1699             RCU \p synchronize method can be called. RCU should NOT be locked.
1700             The function does not call the disposer for the item found.
1701             The disposer will be implicitly invoked when \p result object is destroyed or when
1702             <tt>result.release()</tt> is called, see cds::urcu::exempt_ptr for explanation.
1703             @note Before reusing \p result object you should call its \p release() method.
1704             Example:
1705             \code
1706             typedef cds::intrusive::SkipListSet< cds::urcu::gc< cds::urcu::general_buffered<> >, foo, my_traits > skip_list;
1707             skip_list theList;
1708             // ...
1709
1710             typename skip_list::exempt_ptr ep;
1711             if ( theList.extract_min(ep)) {
1712                 // Deal with ep
1713                 //...
1714
1715                 // Dispose returned item.
1716                 ep.release();
1717             }
1718             \endcode
1719
1720             @note Due the concurrent nature of the list, the function extracts <i>nearly</i> minimum key.
1721             It means that the function gets leftmost item and tries to unlink it.
1722             During unlinking, a concurrent thread may insert an item with key less than leftmost item's key.
1723             So, the function returns the item with minimum key at the moment of list traversing.
1724         */
1725         bool extract_min( exempt_ptr& result )
1726         {
1727             return do_extract_min( result );
1728         }
1729
1730         /// Extracts an item with maximal key from the list
1731         /**
1732             The function searches an item with maximal key, unlinks it, and returns the item found in \p result parameter.
1733             If the skip-list is empty the function returns \p false.
1734
1735             RCU \p synchronize method can be called. RCU should NOT be locked.
1736             The function does not call the disposer for the item found.
1737             The disposer will be implicitly invoked when \p result object is destroyed or when
1738             <tt>result.release()</tt> is called, see cds::urcu::exempt_ptr for explanation.
1739             @note Before reusing \p result object you should call its \p release() method.
1740             Example:
1741             \code
1742             typedef cds::intrusive::SkipListSet< cds::urcu::gc< cds::urcu::general_buffered<> >, foo, my_traits > skip_list;
1743             skip_list theList;
1744             // ...
1745
1746             typename skip_list::exempt_ptr ep;
1747             if ( theList.extract_max(ep) ) {
1748                 // Deal with ep
1749                 //...
1750                 // Dispose returned item.
1751                 ep.release();
1752             }
1753             \endcode
1754
1755             @note Due the concurrent nature of the list, the function extracts <i>nearly</i> maximal key.
1756             It means that the function gets rightmost item and tries to unlink it.
1757             During unlinking, a concurrent thread can insert an item with key greater than rightmost item's key.
1758             So, the function returns the item with maximum key at the moment of list traversing.
1759         */
1760         bool extract_max( exempt_ptr& result )
1761         {
1762             return do_extract_max( result );
1763         }
1764
1765         /// Deletes the item from the set
1766         /** \anchor cds_intrusive_SkipListSet_rcu_erase
1767             The function searches an item with key equal to \p key in the set,
1768             unlinks it from the set, and returns \p true.
1769             If the item with key equal to \p key is not found the function return \p false.
1770
1771             Note the hash functor should accept a parameter of type \p Q that can be not the same as \p value_type.
1772
1773             RCU \p synchronize method can be called. RCU should not be locked.
1774         */
1775         template <typename Q>
1776         bool erase( const Q& key )
1777         {
1778             return do_erase( key, key_comparator(), [](value_type const&) {} );
1779         }
1780
1781         /// Delete the item from the set with comparing functor \p pred
1782         /**
1783             The function is an analog of \ref cds_intrusive_SkipListSet_rcu_erase "erase(Q const&)"
1784             but \p pred predicate is used for key comparing.
1785             \p Less has the interface like \p std::less.
1786             \p pred must imply the same element order as the comparator used for building the set.
1787         */
1788         template <typename Q, typename Less>
1789         bool erase_with( const Q& key, Less pred )
1790         {
1791             return do_erase( key, cds::opt::details::make_comparator_from_less<Less>(), [](value_type const&) {} );
1792         }
1793
1794         /// Deletes the item from the set
1795         /** \anchor cds_intrusive_SkipListSet_rcu_erase_func
1796             The function searches an item with key equal to \p key in the set,
1797             call \p f functor with item found, unlinks it from the set, and returns \p true.
1798             The \ref disposer specified in \p Traits class template parameter is called
1799             by garbage collector \p GC asynchronously.
1800
1801             The \p Func interface is
1802             \code
1803             struct functor {
1804                 void operator()( value_type const& item );
1805             };
1806             \endcode
1807             If the item with key equal to \p key is not found the function return \p false.
1808
1809             Note the hash functor should accept a parameter of type \p Q that can be not the same as \p value_type.
1810
1811             RCU \p synchronize method can be called. RCU should not be locked.
1812         */
1813         template <typename Q, typename Func>
1814         bool erase( Q const& key, Func f )
1815         {
1816             return do_erase( key, key_comparator(), f );
1817         }
1818
1819         /// Delete the item from the set with comparing functor \p pred
1820         /**
1821             The function is an analog of \ref cds_intrusive_SkipListSet_rcu_erase_func "erase(Q const&, Func)"
1822             but \p pred predicate is used for key comparing.
1823             \p Less has the interface like \p std::less.
1824             \p pred must imply the same element order as the comparator used for building the set.
1825         */
1826         template <typename Q, typename Less, typename Func>
1827         bool erase_with( Q const& key, Less pred, Func f )
1828         {
1829             return do_erase( key, cds::opt::details::make_comparator_from_less<Less>(), f );
1830         }
1831
1832         /// Finds \p key
1833         /** @anchor cds_intrusive_SkipListSet_rcu_find_func
1834             The function searches the item with key equal to \p key and calls the functor \p f for item found.
1835             The interface of \p Func functor is:
1836             \code
1837             struct functor {
1838                 void operator()( value_type& item, Q& key );
1839             };
1840             \endcode
1841             where \p item is the item found, \p key is the <tt>find</tt> function argument.
1842
1843             You can pass \p f argument by value or by reference using \p std::ref.
1844
1845             The functor can change non-key fields of \p item. Note that the functor is only guarantee
1846             that \p item cannot be disposed during functor is executing.
1847             The functor does not serialize simultaneous access to the set \p item. If such access is
1848             possible you must provide your own synchronization schema on item level to exclude unsafe item modifications.
1849
1850             The \p key argument is non-const since it can be used as \p f functor destination i.e., the functor
1851             can modify both arguments.
1852
1853             The function applies RCU lock internally.
1854
1855             The function returns \p true if \p key is found, \p false otherwise.
1856         */
1857         template <typename Q, typename Func>
1858         bool find( Q& key, Func f )
1859         {
1860             return do_find_with( key, key_comparator(), f );
1861         }
1862
1863         /// Finds the key \p key with comparing functor \p pred
1864         /**
1865             The function is an analog of \ref cds_intrusive_SkipListSet_rcu_find_func "find(Q&, Func)"
1866             but \p cmp is used for key comparison.
1867             \p Less functor has the interface like \p std::less.
1868             \p cmp must imply the same element order as the comparator used for building the set.
1869         */
1870         template <typename Q, typename Less, typename Func>
1871         bool find_with( Q& key, Less pred, Func f )
1872         {
1873             return do_find_with( key, cds::opt::details::make_comparator_from_less<Less>(), f );
1874         }
1875
1876         /// Finds \p key
1877         /** @anchor cds_intrusive_SkipListSet_rcu_find_val
1878             The function searches the item with key equal to \p key
1879             and returns \p true if it is found, and \p false otherwise.
1880
1881             The function applies RCU lock internally.
1882         */
1883         template <typename Q>
1884         bool find( Q const& key )
1885         {
1886             return do_find_with( key, key_comparator(), [](value_type& , Q const& ) {} );
1887         }
1888
1889         /// Finds \p key with comparing functor \p pred
1890         /**
1891             The function is an analog of \ref cds_intrusive_SkipListSet_rcu_find_val "find(Q const&)"
1892             but \p pred is used for key compare.
1893             \p Less functor has the interface like \p std::less.
1894             \p pred must imply the same element order as the comparator used for building the set.
1895         */
1896         template <typename Q, typename Less>
1897         bool find_with( Q const& key, Less pred )
1898         {
1899             return do_find_with( key, cds::opt::details::make_comparator_from_less<Less>(), [](value_type& , Q const& ) {} );
1900         }
1901
1902         /// Finds \p key and return the item found
1903         /** \anchor cds_intrusive_SkipListSet_rcu_get
1904             The function searches the item with key equal to \p key and returns the pointer to item found.
1905             If \p key is not found it returns \p nullptr.
1906
1907             Note the compare functor should accept a parameter of type \p Q that can be not the same as \p value_type.
1908
1909             RCU should be locked before call of this function.
1910             Returned item is valid only while RCU is locked:
1911             \code
1912             typedef cds::intrusive::SkipListSet< cds::urcu::gc< cds::urcu::general_buffered<> >, foo, my_traits > skip_list;
1913             skip_list theList;
1914             // ...
1915             {
1916                 // Lock RCU
1917                 skip_list::rcu_lock lock;
1918
1919                 foo * pVal = theList.get( 5 );
1920                 if ( pVal ) {
1921                     // Deal with pVal
1922                     //...
1923                 }
1924             }
1925             // Unlock RCU by rcu_lock destructor
1926             // pVal can be retired by disposer at any time after RCU has been unlocked
1927             \endcode
1928
1929             After RCU unlocking the \p %force_dispose member function can be called manually,
1930             see \ref force_dispose for explanation.
1931         */
1932         template <typename Q>
1933         value_type * get( Q const& key )
1934         {
1935             assert( gc::is_locked());
1936
1937             value_type * pFound;
1938             return do_find_with( key, key_comparator(), [&pFound](value_type& found, Q const& ) { pFound = &found; } )
1939                 ? pFound : nullptr;
1940         }
1941
1942         /// Finds \p key and return the item found
1943         /**
1944             The function is an analog of \ref cds_intrusive_SkipListSet_rcu_get "get(Q const&)"
1945             but \p pred is used for comparing the keys.
1946
1947             \p Less functor has the semantics like \p std::less but should take arguments of type \ref value_type and \p Q
1948             in any order.
1949             \p pred must imply the same element order as the comparator used for building the set.
1950         */
1951         template <typename Q, typename Less>
1952         value_type * get_with( Q const& key, Less pred )
1953         {
1954             assert( gc::is_locked());
1955
1956             value_type * pFound;
1957             return do_find_with( key, cds::opt::details::make_comparator_from_less<Less>(),
1958                 [&pFound](value_type& found, Q const& ) { pFound = &found; } )
1959                 ? pFound : nullptr;
1960         }
1961
1962         /// Returns item count in the set
1963         /**
1964             The value returned depends on item counter type provided by \p Traits template parameter.
1965             For \p atomicity::empty_item_counter the function always returns 0.
1966             Therefore, the function is not suitable for checking the set emptiness, use \p empty()
1967             member function for this purpose.
1968         */
1969         size_t size() const
1970         {
1971             return m_ItemCounter;
1972         }
1973
1974         /// Checks if the set is empty
1975         bool empty() const
1976         {
1977             return m_Head.head()->next( 0 ).load( memory_model::memory_order_relaxed ) == nullptr;
1978         }
1979
1980         /// Clears the set (not atomic)
1981         /**
1982             The function unlink all items from the set.
1983             The function is not atomic, thus, in multi-threaded environment with parallel insertions
1984             this sequence
1985             \code
1986             set.clear();
1987             assert( set.empty() );
1988             \endcode
1989             the assertion could be raised.
1990
1991             For each item the \p disposer will be called automatically after unlinking.
1992         */
1993         void clear()
1994         {
1995             exempt_ptr ep;
1996             while ( extract_min(ep) )
1997                 ep.release();
1998         }
1999
2000         /// Returns maximum height of skip-list. The max height is a constant for each object and does not exceed 32.
2001         static CDS_CONSTEXPR unsigned int max_height() CDS_NOEXCEPT
2002         {
2003             return c_nMaxHeight;
2004         }
2005
2006         /// Returns const reference to internal statistics
2007         stat const& statistics() const
2008         {
2009             return m_Stat;
2010         }
2011
2012         /// Clears internal list of ready-to-remove items passing it to RCU reclamation cycle
2013         /** @anchor cds_intrusive_SkipListSet_rcu_force_dispose
2014             Skip list has complex multi-step algorithm for removing an item. In fact, when you
2015             remove the item it is just marked as removed that is enough for the success of your operation.
2016             Actual removing can take place in the future, in another call or even in another thread.
2017             Inside RCU lock the removed item cannot be passed to RCU reclamation cycle
2018             since it can lead to deadlock. To solve this problem, the current skip list implementation
2019             has internal list of items which is ready to remove but is not yet passed to RCU reclamation.
2020             Usually, this list will be passed to RCU reclamation in the next suitable call of skip list member function.
2021             In some cases we want to pass it to RCU reclamation immediately after RCU unlocking.
2022             This function provides such opportunity: it checks whether the RCU is not locked and if it is true
2023             the function passes the internal ready-to-remove list to RCU reclamation cycle.
2024
2025             The RCU \p synchronize can be called.
2026         */
2027         void force_dispose()
2028         {
2029             if ( !gc::is_locked() )
2030                 dispose_deferred();
2031         }
2032     };
2033
2034 }} // namespace cds::intrusive
2035
2036
2037 #endif // #ifndef __CDS_INTRUSIVE_SKIP_LIST_RCU_H