5569052a9789d9e6f4513cb53dfa2005dc612dcb
[libcds.git] / cds / intrusive / skip_list_rcu.h
1 //$$CDS-header$$
2
3 #ifndef __CDS_INTRUSIVE_SKIP_LIST_RCU_H
4 #define __CDS_INTRUSIVE_SKIP_LIST_RCU_H
5
6 #include <type_traits>
7 #include <memory>
8 #include <cds/intrusive/details/skip_list_base.h>
9 #include <cds/opt/compare.h>
10 #include <cds/urcu/details/check_deadlock.h>
11 #include <cds/details/binary_functor_wrapper.h>
12 #include <cds/urcu/exempt_ptr.h>
13
14
15 namespace cds { namespace intrusive {
16
17     //@cond
18     namespace skip_list {
19
20         template <class RCU, typename Tag>
21         class node< cds::urcu::gc< RCU >, Tag >
22         {
23         public:
24             typedef cds::urcu::gc< RCU >    gc; ///< Garbage collector
25             typedef Tag     tag               ; ///< tag
26
27             // Mark bits:
28             //  bit 0 - the item is logically deleted
29             //  bit 1 - the item is extracted (only for level 0)
30             typedef cds::details::marked_ptr<node, 3> marked_ptr;        ///< marked pointer
31             typedef atomics::atomic< marked_ptr >     atomic_marked_ptr; ///< atomic marked pointer
32             typedef atomic_marked_ptr                 tower_item_type;
33
34         protected:
35             atomic_marked_ptr       m_pNext;     ///< Next item in bottom-list (list at level 0)
36         public:
37             node *                  m_pDelChain; ///< Deleted node chain (local for a thread)
38 #       ifdef _DEBUG
39             bool volatile           m_bLinked;
40             bool volatile           m_bUnlinked;
41 #       endif
42         protected:
43             unsigned int            m_nHeight;   ///< Node height (size of m_arrNext array). For node at level 0 the height is 1.
44             atomic_marked_ptr *     m_arrNext;   ///< Array of next items for levels 1 .. m_nHeight - 1. For node at level 0 \p m_arrNext is \p nullptr
45
46         public:
47             /// Constructs a node of height 1 (a bottom-list node)
48             CDS_CONSTEXPR node()
49                 : m_pNext( nullptr )
50                 , m_pDelChain( nullptr )
51 #       ifdef _DEBUG
52                 , m_bLinked( false )
53                 , m_bUnlinked( false )
54 #       endif
55                 , m_nHeight(1)
56                 , m_arrNext( nullptr )
57             {}
58
59 #       ifdef _DEBUG
60             ~node()
61             {
62                 assert( !m_bLinked || m_bUnlinked );
63             }
64 #       endif
65
66             /// Constructs a node of height \p nHeight
67             void make_tower( unsigned int nHeight, atomic_marked_ptr * nextTower )
68             {
69                 assert( nHeight > 0 );
70                 assert( (nHeight == 1 && nextTower == nullptr)  // bottom-list node
71                         || (nHeight > 1 && nextTower != nullptr)   // node at level of more than 0
72                     );
73
74                 m_arrNext = nextTower;
75                 m_nHeight = nHeight;
76             }
77
78             atomic_marked_ptr * release_tower()
79             {
80                 atomic_marked_ptr * pTower = m_arrNext;
81                 m_arrNext = nullptr;
82                 m_nHeight = 1;
83                 return pTower;
84             }
85
86             atomic_marked_ptr * get_tower() const
87             {
88                 return m_arrNext;
89             }
90
91             void clear_tower()
92             {
93                 for ( unsigned int nLevel = 1; nLevel < m_nHeight; ++nLevel )
94                     next(nLevel).store( marked_ptr(), atomics::memory_order_relaxed );
95             }
96
97             /// Access to element of next pointer array
98             atomic_marked_ptr& next( unsigned int nLevel )
99             {
100                 assert( nLevel < height() );
101                 assert( nLevel == 0 || (nLevel > 0 && m_arrNext != nullptr) );
102
103                 return nLevel ? m_arrNext[nLevel - 1] : m_pNext;
104             }
105
106             /// Access to element of next pointer array (const version)
107             atomic_marked_ptr const& next( unsigned int nLevel ) const
108             {
109                 assert( nLevel < height() );
110                 assert( nLevel == 0 || nLevel > 0 && m_arrNext != nullptr );
111
112                 return nLevel ? m_arrNext[nLevel - 1] : m_pNext;
113             }
114
115             /// Access to element of next pointer array (same as \ref next function)
116             atomic_marked_ptr& operator[]( unsigned int nLevel )
117             {
118                 return next( nLevel );
119             }
120
121             /// Access to element of next pointer array (same as \ref next function)
122             atomic_marked_ptr const& operator[]( unsigned int nLevel ) const
123             {
124                 return next( nLevel );
125             }
126
127             /// Height of the node
128             unsigned int height() const
129             {
130                 return m_nHeight;
131             }
132
133             /// Clears internal links
134             void clear()
135             {
136                 assert( m_arrNext == nullptr );
137                 m_pNext.store( marked_ptr(), atomics::memory_order_release );
138                 m_pDelChain = nullptr;
139             }
140
141             bool is_cleared() const
142             {
143                 return m_pNext == atomic_marked_ptr()
144                     && m_arrNext == nullptr
145                     && m_nHeight <= 1;
146             }
147         };
148     } // namespace skip_list
149     //@endcond
150
151     //@cond
152     namespace skip_list { namespace details {
153
154         template <class RCU, typename NodeTraits, typename BackOff, bool IsConst>
155         class iterator< cds::urcu::gc< RCU >, NodeTraits, BackOff, IsConst >
156         {
157         public:
158             typedef cds::urcu::gc< RCU >                gc;
159             typedef NodeTraits                          node_traits;
160             typedef BackOff                             back_off;
161             typedef typename node_traits::node_type     node_type;
162             typedef typename node_traits::value_type    value_type;
163             static bool const c_isConst = IsConst;
164
165             typedef typename std::conditional< c_isConst, value_type const &, value_type &>::type   value_ref;
166
167         protected:
168             typedef typename node_type::marked_ptr          marked_ptr;
169             typedef typename node_type::atomic_marked_ptr   atomic_marked_ptr;
170
171             node_type *             m_pNode;
172
173         protected:
174             void next()
175             {
176                 // RCU should be locked before iterating!!!
177                 assert( gc::is_locked() );
178
179                 back_off bkoff;
180
181                 for (;;) {
182                     if ( m_pNode->next( m_pNode->height() - 1 ).load( atomics::memory_order_acquire ).bits() ) {
183                         // Current node is marked as deleted. So, its next pointer can point to anything
184                         // In this case we interrupt our iteration and returns end() iterator.
185                         *this = iterator();
186                         return;
187                     }
188
189                     marked_ptr p = m_pNode->next(0).load( atomics::memory_order_relaxed );
190                     node_type * pp = p.ptr();
191                     if ( p.bits() ) {
192                         // p is marked as deleted. Spin waiting for physical removal
193                         bkoff();
194                         continue;
195                     }
196                     else if ( pp && pp->next( pp->height() - 1 ).load( atomics::memory_order_relaxed ).bits() ) {
197                         // p is marked as deleted. Spin waiting for physical removal
198                         bkoff();
199                         continue;
200                     }
201
202                     m_pNode = pp;
203                     break;
204                 }
205             }
206
207         public: // for internal use only!!!
208             iterator( node_type& refHead )
209                 : m_pNode( nullptr )
210             {
211                 // RCU should be locked before iterating!!!
212                 assert( gc::is_locked() );
213
214                 back_off bkoff;
215
216                 for (;;) {
217                     marked_ptr p = refHead.next(0).load( atomics::memory_order_relaxed );
218                     if ( !p.ptr() ) {
219                         // empty skip-list
220                         break;
221                     }
222
223                     node_type * pp = p.ptr();
224                     // Logically deleted node is marked from highest level
225                     if ( !pp->next( pp->height() - 1 ).load( atomics::memory_order_acquire ).bits() ) {
226                         m_pNode = pp;
227                         break;
228                     }
229
230                     bkoff();
231                 }
232             }
233
234         public:
235             iterator()
236                 : m_pNode( nullptr )
237             {
238                 // RCU should be locked before iterating!!!
239                 assert( gc::is_locked() );
240             }
241
242             iterator( iterator const& s)
243                 : m_pNode( s.m_pNode )
244             {
245                 // RCU should be locked before iterating!!!
246                 assert( gc::is_locked() );
247             }
248
249             value_type * operator ->() const
250             {
251                 assert( m_pNode != nullptr );
252                 assert( node_traits::to_value_ptr( m_pNode ) != nullptr );
253
254                 return node_traits::to_value_ptr( m_pNode );
255             }
256
257             value_ref operator *() const
258             {
259                 assert( m_pNode != nullptr );
260                 assert( node_traits::to_value_ptr( m_pNode ) != nullptr );
261
262                 return *node_traits::to_value_ptr( m_pNode );
263             }
264
265             /// Pre-increment
266             iterator& operator ++()
267             {
268                 next();
269                 return *this;
270             }
271
272             iterator& operator = (const iterator& src)
273             {
274                 m_pNode = src.m_pNode;
275                 return *this;
276             }
277
278             template <typename Bkoff, bool C>
279             bool operator ==(iterator<gc, node_traits, Bkoff, C> const& i ) const
280             {
281                 return m_pNode == i.m_pNode;
282             }
283             template <typename Bkoff, bool C>
284             bool operator !=(iterator<gc, node_traits, Bkoff, C> const& i ) const
285             {
286                 return !( *this == i );
287             }
288         };
289     }}  // namespace skip_list::details
290     //@endcond
291
292     /// Lock-free skip-list set (template specialization for \ref cds_urcu_desc "RCU")
293     /** @ingroup cds_intrusive_map
294         @anchor cds_intrusive_SkipListSet_rcu
295
296         The implementation of well-known probabilistic data structure called skip-list
297         invented by W.Pugh in his papers:
298             - [1989] W.Pugh Skip Lists: A Probabilistic Alternative to Balanced Trees
299             - [1990] W.Pugh A Skip List Cookbook
300
301         A skip-list is a probabilistic data structure that provides expected logarithmic
302         time search without the need of rebalance. The skip-list is a collection of sorted
303         linked list. Nodes are ordered by key. Each node is linked into a subset of the lists.
304         Each list has a level, ranging from 0 to 32. The bottom-level list contains
305         all the nodes, and each higher-level list is a sublist of the lower-level lists.
306         Each node is created with a random top level (with a random height), and belongs
307         to all lists up to that level. The probability that a node has the height 1 is 1/2.
308         The probability that a node has the height N is 1/2 ** N (more precisely,
309         the distribution depends on an random generator provided, but our generators
310         have this property).
311
312         The lock-free variant of skip-list is implemented according to book
313             - [2008] M.Herlihy, N.Shavit "The Art of Multiprocessor Programming",
314                 chapter 14.4 "A Lock-Free Concurrent Skiplist".
315
316         <b>Template arguments</b>:
317             - \p RCU - one of \ref cds_urcu_gc "RCU type"
318             - \p T - type to be stored in the list. The type must be based on \p skip_list::node (for \p skip_list::base_hook)
319                 or it must have a member of type \p skip_list::node (for \p skip_list::member_hook).
320             - \p Traits - set traits, default is \p skip_list::type_traits
321                 It is possible to declare option-based list with \p cds::intrusive::skip_list::make_traits metafunction 
322                 instead of \p Traits template argument.
323
324         @note Before including <tt><cds/intrusive/skip_list_rcu.h></tt> you should include appropriate RCU header file,
325         see \ref cds_urcu_gc "RCU type" for list of existing RCU class and corresponding header files.
326
327         <b>Iterators</b>
328
329         The class supports a forward iterator (\ref iterator and \ref const_iterator).
330         The iteration is ordered.
331
332         You may iterate over skip-list set items only under RCU lock.
333         Only in this case the iterator is thread-safe since
334         while RCU is locked any set's item cannot be reclaimed.
335
336         @note The requirement of RCU lock during iterating means that any type of modification of the skip list
337         (i.e. inserting, erasing and so on) is not possible.
338
339         @warning The iterator object cannot be passed between threads.
340
341         Example how to use skip-list set iterators:
342         \code
343         // First, you should include the header for RCU type you have chosen
344         #include <cds/urcu/general_buffered.h>
345         #include <cds/intrusive/skip_list_rcu.h>
346
347         typedef cds::urcu::gc< cds::urcu::general_buffered<> > rcu_type;
348
349         struct Foo {
350             // ...
351         };
352
353         // Traits for your skip-list.
354         // At least, you should define cds::opt::less or cds::opt::compare for Foo struct
355         struct my_traits: public cds::intrusive::skip_list::traits
356         {
357             // ...
358         };
359         typedef cds::intrusive::SkipListSet< rcu_type, Foo, my_traits > my_skiplist_set;
360
361         my_skiplist_set theSet;
362
363         // ...
364
365         // Begin iteration
366         {
367             // Apply RCU locking manually
368             typename rcu_type::scoped_lock sl;
369
370             for ( auto it = theList.begin(); it != theList.end(); ++it ) {
371                 // ...
372             }
373
374             // rcu_type::scoped_lock destructor releases RCU lock implicitly
375         }
376         \endcode
377
378         The iterator class supports the following minimalistic interface:
379         \code
380         struct iterator {
381             // Default ctor
382             iterator();
383
384             // Copy ctor
385             iterator( iterator const& s);
386
387             value_type * operator ->() const;
388             value_type& operator *() const;
389
390             // Pre-increment
391             iterator& operator ++();
392
393             // Copy assignment
394             iterator& operator = (const iterator& src);
395
396             bool operator ==(iterator const& i ) const;
397             bool operator !=(iterator const& i ) const;
398         };
399         \endcode
400         Note, the iterator object returned by \ref end, \p cend member functions points to \p nullptr and should not be dereferenced.
401
402         <b>How to use</b>
403
404         You should incorporate skip_list::node into your struct \p T and provide
405         appropriate skip_list::traits::hook in your \p Traits template parameters. Usually, for \p Traits you
406         define a struct based on \p skip_list::traits.
407
408         Example for <tt>cds::urcu::general_buffered<></tt> RCU and base hook:
409         \code
410         // First, you should include the header for RCU type you have chosen
411         #include <cds/urcu/general_buffered.h>
412
413         // Include RCU skip-list specialization
414         #include <cds/intrusive/skip_list_rcu.h>
415
416         // RCU type typedef
417         typedef cds::urcu::gc< cds::urcu::general_buffered<> > rcu_type;
418
419         // Data stored in skip list
420         struct my_data: public cds::intrusive::skip_list::node< rcu_type >
421         {
422             // key field
423             std::string     strKey;
424
425             // other data
426             // ...
427         };
428
429         // my_data compare functor
430         struct my_data_cmp {
431             int operator()( const my_data& d1, const my_data& d2 )
432             {
433                 return d1.strKey.compare( d2.strKey );
434             }
435
436             int operator()( const my_data& d, const std::string& s )
437             {
438                 return d.strKey.compare(s);
439             }
440
441             int operator()( const std::string& s, const my_data& d )
442             {
443                 return s.compare( d.strKey );
444             }
445         };
446
447         // Declare traits
448         struct my_traits: public cds::intrusive::skip_list::traits
449         {
450             typedef cds::intrusive::skip_list::base_hook< cds::opt::gc< rcu_type > >   hook;
451             typedef my_data_cmp compare;
452         };
453
454         // Declare skip-list set type
455         typedef cds::intrusive::SkipListSet< rcu_type, my_data, my_traits >     traits_based_set;
456         \endcode
457
458         Equivalent option-based code:
459         \code
460         #include <cds/urcu/general_buffered.h>
461         #include <cds/intrusive/skip_list_rcu.h>
462
463         typedef cds::urcu::gc< cds::urcu::general_buffered<> > rcu_type;
464
465         struct my_data {
466             // see above
467         };
468         struct compare {
469             // see above
470         };
471
472         // Declare option-based skip-list set
473         typedef cds::intrusive::SkipListSet< rcu_type
474             ,my_data
475             , typename cds::intrusive::skip_list::make_traits<
476                 cds::intrusive::opt::hook< cds::intrusive::skip_list::base_hook< cds::opt::gc< rcu_type > > >
477                 ,cds::intrusive::opt::compare< my_data_cmp >
478             >::type
479         > option_based_set;
480
481         \endcode
482     */
483     template <
484         class RCU
485        ,typename T
486 #ifdef CDS_DOXYGEN_INVOKED
487        ,typename Traits = skip_list::traits
488 #else
489        ,typename Traits
490 #endif
491     >
492     class SkipListSet< cds::urcu::gc< RCU >, T, Traits >
493     {
494     public:
495         typedef cds::urcu::gc< RCU > gc; ///< Garbage collector
496         typedef T       value_type;      ///< type of value stored in the skip-list
497         typedef Traits  traits;          ///< Traits template parameter
498
499         typedef typename traits::hook    hook;      ///< hook type
500         typedef typename hook::node_type node_type; ///< node type
501
502 #   ifdef CDS_DOXYGEN_INVOKED
503         typedef implementation_defined key_comparator  ;    ///< key comparison functor based on \p Traits::compare and \p Traits::less
504 #   else
505         typedef typename opt::details::make_comparator< value_type, traits >::type key_comparator;
506 #   endif
507
508         typedef typename traits::disposer  disposer;   ///< disposer
509         typedef typename get_node_traits< value_type, node_type, hook>::type node_traits;    ///< node traits
510
511         typedef typename traits::item_counter  item_counter;   ///< Item counting policy used
512         typedef typename traits::memory_model  memory_model;   ///< Memory ordering, see \p cds::opt::memory_model option
513         typedef typename traits::random_level_generator    random_level_generator;   ///< random level generator
514         typedef typename traits::allocator     allocator_type; ///< allocator for maintaining array of next pointers of the node
515         typedef typename traits::back_off      back_off;       ///< Back-off strategy
516         typedef typename traits::stat          stat;           ///< internal statistics type
517         typedef typename traits::rcu_check_deadlock rcu_check_deadlock; ///< Deadlock checking policy
518         typedef typename gc::scoped_lock       rcu_lock;      ///< RCU scoped lock
519         static CDS_CONSTEXPR const bool c_bExtractLockExternal = false; ///< Group of \p extract_xxx functions does not require external locking
520
521
522         /// Max node height. The actual node height should be in range <tt>[0 .. c_nMaxHeight)</tt>
523         /**
524             The max height is specified by \ref skip_list::random_level_generator "random level generator" constant \p m_nUpperBound
525             but it should be no more than 32 (\ref skip_list::c_nHeightLimit).
526         */
527         static unsigned int const c_nMaxHeight = std::conditional<
528             (random_level_generator::c_nUpperBound <= skip_list::c_nHeightLimit),
529             std::integral_constant< unsigned int, random_level_generator::c_nUpperBound >,
530             std::integral_constant< unsigned int, skip_list::c_nHeightLimit >
531         >::type::value;
532
533         //@cond
534         static unsigned int const c_nMinHeight = 5;
535         //@endcond
536
537     protected:
538         typedef typename node_type::atomic_marked_ptr   atomic_node_ptr ;   ///< Atomic marked node pointer
539         typedef typename node_type::marked_ptr          marked_node_ptr ;   ///< Node marked pointer
540
541     protected:
542         //@cond
543         typedef skip_list::details::intrusive_node_builder< node_type, atomic_node_ptr, allocator_type > intrusive_node_builder;
544
545         typedef typename std::conditional<
546             std::is_same< typename traits::internal_node_builder, cds::opt::none >::value
547             ,intrusive_node_builder
548             ,typename traits::internal_node_builder
549         >::type node_builder;
550
551         typedef std::unique_ptr< node_type, typename node_builder::node_disposer >    scoped_node_ptr;
552
553         struct position {
554             node_type *   pPrev[ c_nMaxHeight ];
555             node_type *   pSucc[ c_nMaxHeight ];
556             node_type *   pNext[ c_nMaxHeight ];
557
558             node_type *   pCur;
559             node_type *   pDelChain;
560
561             position()
562                 : pDelChain( nullptr )
563             {}
564 #       ifdef _DEBUG
565             ~position()
566             {
567                 assert( pDelChain == nullptr );
568             }
569 #       endif
570         };
571
572         typedef cds::urcu::details::check_deadlock_policy< gc, rcu_check_deadlock>   check_deadlock_policy;
573         //@endcond
574
575     protected:
576         skip_list::details::head_node< node_type > m_Head;   ///< head tower (max height)
577
578         item_counter                m_ItemCounter;      ///< item counter
579         random_level_generator      m_RandomLevelGen;   ///< random level generator instance
580         atomics::atomic<unsigned int>    m_nHeight;     ///< estimated high level
581         atomics::atomic<node_type *>     m_pDeferredDelChain ;   ///< Deferred deleted node chain
582         mutable stat                m_Stat;             ///< internal statistics
583
584     protected:
585         //@cond
586         unsigned int random_level()
587         {
588             // Random generator produces a number from range [0..31]
589             // We need a number from range [1..32]
590             return m_RandomLevelGen() + 1;
591         }
592
593         template <typename Q>
594         node_type * build_node( Q v )
595         {
596             return node_builder::make_tower( v, m_RandomLevelGen );
597         }
598
599         static void dispose_node( value_type * pVal )
600         {
601             assert( pVal );
602
603             typename node_builder::node_disposer()( node_traits::to_node_ptr(pVal) );
604             disposer()( pVal );
605         }
606
607         struct node_disposer
608         {
609             void operator()( value_type * pVal )
610             {
611                 dispose_node( pVal );
612             }
613         };
614         //@endcond
615
616     public:
617         typedef cds::urcu::exempt_ptr< gc, value_type, value_type, node_disposer, void > exempt_ptr; ///< pointer to extracted node
618
619     protected:
620         //@cond
621
622         bool is_extracted( marked_node_ptr const p ) const
623         {
624             return (p.bits() & 2) != 0;
625         }
626
627         template <typename Q, typename Compare >
628         bool find_position( Q const& val, position& pos, Compare cmp, bool bStopIfFound )
629         {
630             assert( gc::is_locked() );
631
632             node_type * pPred;
633             marked_node_ptr pSucc;
634             marked_node_ptr pCur;
635             int nCmp = 1;
636
637         retry:
638             pPred = m_Head.head();
639
640             for ( int nLevel = static_cast<int>(c_nMaxHeight - 1); nLevel >= 0; --nLevel ) {
641
642                 while ( true ) {
643                     pCur = pPred->next( nLevel ).load( memory_model::memory_order_relaxed );
644                     if ( pCur.bits() ) {
645                         // pCur.bits() means that pPred is logically deleted
646                         goto retry;
647                     }
648
649                     if ( pCur.ptr() == nullptr ) {
650                         // end of the list at level nLevel - goto next level
651                         break;
652                     }
653
654                     // pSucc contains deletion mark for pCur
655                     pSucc = pCur->next( nLevel ).load( memory_model::memory_order_relaxed );
656
657                     if ( pPred->next( nLevel ).load( memory_model::memory_order_relaxed ).all() != pCur.ptr() )
658                         goto retry;
659
660                     if ( pSucc.bits() ) {
661                         // pCur is marked, i.e. logically deleted.
662                         marked_node_ptr p( pCur.ptr() );
663                         if ( pPred->next( nLevel ).compare_exchange_strong( p, marked_node_ptr( pSucc.ptr() ),
664                              memory_model::memory_order_release, atomics::memory_order_relaxed ))
665                         {
666                             if ( nLevel == 0 ) {
667 #                       ifdef _DEBUG
668                                 pCur->m_bUnlinked = true;
669 #                       endif
670
671                                 if ( !is_extracted( pSucc )) {
672                                     // We cannot free the node at this moment since RCU is locked
673                                     // Link deleted nodes to a chain to free later
674                                     link_for_remove( pos, pCur.ptr() );
675                                     m_Stat.onEraseWhileFind();
676                                 }
677                                 else {
678                                     m_Stat.onExtractWhileFind();
679                                 }
680                             }
681                         }
682                         goto retry;
683                     }
684                     else {
685                         nCmp = cmp( *node_traits::to_value_ptr( pCur.ptr()), val );
686                         if ( nCmp < 0 )
687                             pPred = pCur.ptr();
688                         else if ( nCmp == 0 && bStopIfFound )
689                             goto found;
690                         else
691                             break;
692                     }
693                 }
694
695                 // Next level
696                 pos.pPrev[ nLevel ] = pPred;
697                 pos.pSucc[ nLevel ] = pCur.ptr();
698             }
699
700             if ( nCmp != 0 )
701                 return false;
702
703         found:
704             pos.pCur = pCur.ptr();
705             return pCur.ptr() && nCmp == 0;
706         }
707
708         bool find_min_position( position& pos )
709         {
710             assert( gc::is_locked() );
711
712             node_type * pPred;
713             marked_node_ptr pSucc;
714             marked_node_ptr pCur;
715
716         retry:
717             pPred = m_Head.head();
718
719             for ( int nLevel = static_cast<int>(c_nMaxHeight - 1); nLevel >= 0; --nLevel ) {
720
721                 pCur = pPred->next( nLevel ).load( memory_model::memory_order_relaxed );
722                 // pCur.bits() means that pPred is logically deleted
723                 // head cannot be deleted
724                 assert( pCur.bits() == 0 );
725
726                 if ( pCur.ptr() ) {
727
728                     // pSucc contains deletion mark for pCur
729                     pSucc = pCur->next( nLevel ).load( memory_model::memory_order_relaxed );
730
731                     if ( pPred->next( nLevel ).load( memory_model::memory_order_relaxed ).all() != pCur.ptr() )
732                         goto retry;
733
734                     if ( pSucc.bits() ) {
735                         // pCur is marked, i.e. logically deleted.
736                         marked_node_ptr p( pCur.ptr() );
737                         if ( pPred->next( nLevel ).compare_exchange_strong( p, marked_node_ptr( pSucc.ptr() ),
738                             memory_model::memory_order_release, atomics::memory_order_relaxed ))
739                         {
740                             if ( nLevel == 0 ) {
741 #                       ifdef _DEBUG
742                                 pCur->m_bUnlinked = true;
743 #                       endif
744
745                                 if ( !is_extracted( pSucc )) {
746                                     // We cannot free the node at this moment since RCU is locked
747                                     // Link deleted nodes to a chain to free later
748                                     link_for_remove( pos, pCur.ptr() );
749                                     m_Stat.onEraseWhileFind();
750                                 }
751                                 else {
752                                     m_Stat.onExtractWhileFind();
753                                 }
754                             }
755                         }
756                         goto retry;
757                     }
758                 }
759
760                 // Next level
761                 pos.pPrev[ nLevel ] = pPred;
762                 pos.pSucc[ nLevel ] = pCur.ptr();
763             }
764             return (pos.pCur = pCur.ptr()) != nullptr;
765         }
766
767         bool find_max_position( position& pos )
768         {
769             assert( gc::is_locked() );
770
771             node_type * pPred;
772             marked_node_ptr pSucc;
773             marked_node_ptr pCur;
774
775 retry:
776             pPred = m_Head.head();
777
778             for ( int nLevel = static_cast<int>(c_nMaxHeight - 1); nLevel >= 0; --nLevel ) {
779
780                 while ( true ) {
781                     pCur = pPred->next( nLevel ).load( memory_model::memory_order_relaxed );
782                     if ( pCur.bits() ) {
783                         // pCur.bits() means that pPred is logically deleted
784                         goto retry;
785                     }
786
787                     if ( pCur.ptr() == nullptr ) {
788                         // end of the list at level nLevel - goto next level
789                         break;
790                     }
791
792                     // pSucc contains deletion mark for pCur
793                     pSucc = pCur->next( nLevel ).load( memory_model::memory_order_relaxed );
794
795                     if ( pPred->next( nLevel ).load( memory_model::memory_order_relaxed ).all() != pCur.ptr() )
796                         goto retry;
797
798                     if ( pSucc.bits() ) {
799                         // pCur is marked, i.e. logically deleted.
800                         marked_node_ptr p( pCur.ptr() );
801                         if ( pPred->next( nLevel ).compare_exchange_strong( p, marked_node_ptr( pSucc.ptr() ),
802                             memory_model::memory_order_release, atomics::memory_order_relaxed ))
803                         {
804                             if ( nLevel == 0 ) {
805 #                       ifdef _DEBUG
806                                 pCur->m_bUnlinked = true;
807 #                       endif
808
809                                 if ( !is_extracted( pSucc )) {
810                                     // We cannot free the node at this moment since RCU is locked
811                                     // Link deleted nodes to a chain to free later
812                                     link_for_remove( pos, pCur.ptr() );
813                                     m_Stat.onEraseWhileFind();
814                                 }
815                                 else {
816                                     m_Stat.onExtractWhileFind();
817                                 }
818                             }
819                         }
820                         goto retry;
821                     }
822                     else {
823                         if ( !pSucc.ptr() )
824                             break;
825
826                         pPred = pCur.ptr();
827                     }
828                 }
829
830                 // Next level
831                 pos.pPrev[ nLevel ] = pPred;
832                 pos.pSucc[ nLevel ] = pCur.ptr();
833             }
834
835             return (pos.pCur = pCur.ptr()) != nullptr;
836         }
837
838         template <typename Func>
839         bool insert_at_position( value_type& val, node_type * pNode, position& pos, Func f )
840         {
841             assert( gc::is_locked() );
842
843             unsigned int nHeight = pNode->height();
844             pNode->clear_tower();
845
846             {
847                 marked_node_ptr p( pos.pSucc[0] );
848                 pNode->next( 0 ).store( p, memory_model::memory_order_release );
849                 if ( !pos.pPrev[0]->next(0).compare_exchange_strong( p, marked_node_ptr(pNode), memory_model::memory_order_release, atomics::memory_order_relaxed )) {
850                     return false;
851                 }
852 #       ifdef _DEBUG
853                 pNode->m_bLinked = true;
854 #       endif
855                 f( val );
856             }
857
858             for ( unsigned int nLevel = 1; nLevel < nHeight; ++nLevel ) {
859                 marked_node_ptr p;
860                 while ( true ) {
861                     marked_node_ptr q( pos.pSucc[ nLevel ]);
862                     if ( !pNode->next( nLevel ).compare_exchange_strong( p, q, memory_model::memory_order_acquire, atomics::memory_order_relaxed )) {
863                         // pNode has been marked as removed while we are inserting it
864                         // Stop inserting
865                         assert( p.bits() );
866                         m_Stat.onLogicDeleteWhileInsert();
867                         return true;
868                     }
869                     p = q;
870                     if ( pos.pPrev[nLevel]->next(nLevel).compare_exchange_strong( q, marked_node_ptr( pNode ), memory_model::memory_order_release, atomics::memory_order_relaxed ) )
871                         break;
872
873                     // Renew insert position
874                     m_Stat.onRenewInsertPosition();
875                     if ( !find_position( val, pos, key_comparator(), false )) {
876                         // The node has been deleted while we are inserting it
877                         m_Stat.onNotFoundWhileInsert();
878                         return true;
879                     }
880                 }
881             }
882             return true;
883         }
884
885         static void link_for_remove( position& pos, node_type * pDel )
886         {
887             assert( pDel->m_pDelChain == nullptr );
888
889             pDel->m_pDelChain = pos.pDelChain;
890             pos.pDelChain = pDel;
891         }
892
893         template <typename Func>
894         bool try_remove_at( node_type * pDel, position& pos, Func f, bool bExtract )
895         {
896             assert( pDel != nullptr );
897             assert( gc::is_locked() );
898
899             marked_node_ptr pSucc;
900
901             // logical deletion (marking)
902             for ( unsigned int nLevel = pDel->height() - 1; nLevel > 0; --nLevel ) {
903                 pSucc = pDel->next(nLevel).load( memory_model::memory_order_relaxed );
904                 while ( true ) {
905                     if ( pSucc.bits()
906                       || pDel->next(nLevel).compare_exchange_weak( pSucc, pSucc | 1, memory_model::memory_order_acquire, atomics::memory_order_relaxed ))
907                     {
908                         break;
909                     }
910                 }
911             }
912
913             pSucc = pDel->next(0).load( memory_model::memory_order_relaxed );
914             while ( true ) {
915                 if ( pSucc.bits() )
916                     return false;
917
918                 int const nMask = bExtract ? 3 : 1;
919                 if ( pDel->next(0).compare_exchange_strong( pSucc, pSucc | nMask, memory_model::memory_order_acquire, atomics::memory_order_relaxed ))
920                 {
921                     f( *node_traits::to_value_ptr( pDel ));
922
923                     // physical deletion
924                     // try fast erase
925                     pSucc = pDel;
926                     for ( int nLevel = static_cast<int>( pDel->height() - 1 ); nLevel >= 0; --nLevel ) {
927                         if ( !pos.pPrev[nLevel]->next(nLevel).compare_exchange_strong( pSucc,
928                             marked_node_ptr( pDel->next(nLevel).load(memory_model::memory_order_relaxed).ptr() ),
929                             memory_model::memory_order_release, atomics::memory_order_relaxed) )
930                         {
931                             // Do slow erase
932                             find_position( *node_traits::to_value_ptr(pDel), pos, key_comparator(), false );
933                             if ( bExtract )
934                                 m_Stat.onSlowExtract();
935                             else
936                                 m_Stat.onSlowErase();
937 #                        ifdef _DEBUG
938                             assert( pDel->m_bUnlinked );
939 #                        endif
940                             return true;
941                         }
942                     }
943
944 #               ifdef _DEBUG
945                     pDel->m_bUnlinked = true;
946 #               endif
947                     if ( !bExtract ) {
948                         // We cannot free the node at this moment since RCU is locked
949                         // Link deleted nodes to a chain to free later
950                         link_for_remove( pos, pDel );
951                         m_Stat.onFastErase();
952                     }
953                     else
954                         m_Stat.onFastExtract();
955
956                     return true;
957                 }
958             }
959         }
960
961         enum finsd_fastpath_result {
962             find_fastpath_found,
963             find_fastpath_not_found,
964             find_fastpath_abort
965         };
966         template <typename Q, typename Compare, typename Func>
967         finsd_fastpath_result find_fastpath( Q& val, Compare cmp, Func f ) const
968         {
969             node_type * pPred;
970             marked_node_ptr pCur;
971             marked_node_ptr pSucc;
972             marked_node_ptr pNull;
973
974             back_off bkoff;
975
976             pPred = m_Head.head();
977             for ( int nLevel = static_cast<int>(m_nHeight.load(memory_model::memory_order_relaxed) - 1); nLevel >= 0; --nLevel ) {
978                 pCur = pPred->next(nLevel).load( memory_model::memory_order_acquire );
979                 if ( pCur == pNull )
980                     continue;
981
982                 while ( pCur != pNull ) {
983                     if ( pCur.bits() ) {
984                         // Wait until pCur is removed
985                         unsigned int nAttempt = 0;
986                         while ( pCur.bits() && nAttempt++ < 16 ) {
987                             bkoff();
988                             pCur = pPred->next(nLevel).load( memory_model::memory_order_acquire );
989                         }
990                         bkoff.reset();
991
992                         if ( pCur.bits() ) {
993                             // Maybe, we are on deleted node sequence
994                             // Abort searching, try slow-path
995                             return find_fastpath_abort;
996                         }
997                     }
998
999                     if ( pCur.ptr() ) {
1000                         int nCmp = cmp( *node_traits::to_value_ptr( pCur.ptr() ), val );
1001                         if ( nCmp < 0 ) {
1002                             pPred = pCur.ptr();
1003                             pCur = pCur->next(nLevel).load( memory_model::memory_order_acquire );
1004                         }
1005                         else if ( nCmp == 0 ) {
1006                             // found
1007                             f( *node_traits::to_value_ptr( pCur.ptr() ), val );
1008                             return find_fastpath_found;
1009                         }
1010                         else // pCur > val - go down
1011                             break;
1012                     }
1013                 }
1014             }
1015
1016             return find_fastpath_not_found;
1017         }
1018
1019         template <typename Q, typename Compare, typename Func>
1020         bool find_slowpath( Q& val, Compare cmp, Func f, position& pos )
1021         {
1022             if ( find_position( val, pos, cmp, true )) {
1023                 assert( cmp( *node_traits::to_value_ptr( pos.pCur ), val ) == 0 );
1024
1025                 f( *node_traits::to_value_ptr( pos.pCur ), val );
1026                 return true;
1027             }
1028             else
1029                 return false;
1030         }
1031
1032         template <typename Q, typename Compare, typename Func>
1033         bool do_find_with( Q& val, Compare cmp, Func f )
1034         {
1035             position pos;
1036             bool bRet;
1037
1038             rcu_lock l;
1039
1040             switch ( find_fastpath( val, cmp, f )) {
1041             case find_fastpath_found:
1042                 m_Stat.onFindFastSuccess();
1043                 return true;
1044             case find_fastpath_not_found:
1045                 m_Stat.onFindFastFailed();
1046                 return false;
1047             default:
1048                 break;
1049             }
1050
1051             if ( find_slowpath( val, cmp, f, pos )) {
1052                 m_Stat.onFindSlowSuccess();
1053                 bRet = true;
1054             }
1055             else {
1056                 m_Stat.onFindSlowFailed();
1057                 bRet = false;
1058             }
1059
1060             defer_chain( pos );
1061
1062             return bRet;
1063         }
1064
1065         template <typename Q, typename Compare, typename Func>
1066         bool do_erase( Q const& val, Compare cmp, Func f )
1067         {
1068             check_deadlock_policy::check();
1069
1070             position pos;
1071             bool bRet;
1072
1073             {
1074                 rcu_lock rcuLock;
1075
1076                 if ( !find_position( val, pos, cmp, false ) ) {
1077                     m_Stat.onEraseFailed();
1078                     bRet = false;
1079                 }
1080                 else {
1081                     node_type * pDel = pos.pCur;
1082                     assert( cmp( *node_traits::to_value_ptr( pDel ), val ) == 0 );
1083
1084                     unsigned int nHeight = pDel->height();
1085                     if ( try_remove_at( pDel, pos, f, false )) {
1086                         --m_ItemCounter;
1087                         m_Stat.onRemoveNode( nHeight );
1088                         m_Stat.onEraseSuccess();
1089                         bRet = true;
1090                     }
1091                     else {
1092                         m_Stat.onEraseFailed();
1093                         bRet = false;
1094                     }
1095                 }
1096             }
1097
1098             dispose_chain( pos );
1099             return bRet;
1100         }
1101
1102         template <typename Q, typename Compare>
1103         value_type * do_extract_key( Q const& key, Compare cmp )
1104         {
1105             // RCU should be locked!!!
1106             assert( gc::is_locked() );
1107
1108             position pos;
1109             node_type * pDel;
1110
1111             if ( !find_position( key, pos, cmp, false ) ) {
1112                 m_Stat.onExtractFailed();
1113                 pDel = nullptr;
1114             }
1115             else {
1116                 pDel = pos.pCur;
1117                 assert( cmp( *node_traits::to_value_ptr( pDel ), key ) == 0 );
1118
1119                 unsigned int const nHeight = pDel->height();
1120
1121                 if ( try_remove_at( pDel, pos, [](value_type const&) {}, true )) {
1122                     --m_ItemCounter;
1123                     m_Stat.onRemoveNode( nHeight );
1124                     m_Stat.onExtractSuccess();
1125                 }
1126                 else {
1127                     m_Stat.onExtractFailed();
1128                     pDel = nullptr;
1129                 }
1130             }
1131
1132             defer_chain( pos );
1133             return pDel ? node_traits::to_value_ptr( pDel ) : nullptr;
1134         }
1135
1136         template <typename ExemptPtr, typename Q>
1137         bool do_extract( ExemptPtr& result, Q const& key )
1138         {
1139             check_deadlock_policy::check();
1140
1141             bool bReturn;
1142             {
1143                 rcu_lock l;
1144                 value_type * pDel = do_extract_key( key, key_comparator() );
1145                 bReturn = pDel != nullptr;
1146                 if ( bReturn )
1147                     result = pDel;
1148             }
1149
1150             dispose_deferred();
1151             return bReturn;
1152         }
1153
1154         template <typename ExemptPtr, typename Q, typename Less>
1155         bool do_extract_with( ExemptPtr& result, Q const& key, Less pred )
1156         {
1157             check_deadlock_policy::check();
1158
1159             bool bReturn;
1160             {
1161                 rcu_lock l;
1162                 value_type * pDel = do_extract_key( key, cds::opt::details::make_comparator_from_less<Less>() );
1163                 bReturn = pDel != nullptr;
1164                 if ( bReturn )
1165                     result = pDel;
1166             }
1167
1168             dispose_deferred();
1169             return bReturn;
1170         }
1171
1172         node_type * do_extract_min()
1173         {
1174             assert( gc::is_locked() );
1175
1176             position pos;
1177             node_type * pDel;
1178
1179             if ( !find_min_position( pos ) ) {
1180                 m_Stat.onExtractMinFailed();
1181                 pDel = nullptr;
1182             }
1183             else {
1184                 pDel = pos.pCur;
1185                 unsigned int const nHeight = pDel->height();
1186
1187                 if ( try_remove_at( pDel, pos, [](value_type const&) {}, true )) {
1188                     --m_ItemCounter;
1189                     m_Stat.onRemoveNode( nHeight );
1190                     m_Stat.onExtractMinSuccess();
1191                 }
1192                 else {
1193                     m_Stat.onExtractMinFailed();
1194                     pDel = nullptr;
1195                 }
1196             }
1197
1198             defer_chain( pos );
1199             return pDel;
1200         }
1201
1202         template <typename ExemptPtr>
1203         bool do_extract_min( ExemptPtr& result )
1204         {
1205             check_deadlock_policy::check();
1206
1207             bool bReturn;
1208             {
1209                 rcu_lock l;
1210                 node_type * pDel = do_extract_min();
1211                 bReturn = pDel != nullptr;
1212                 if ( bReturn )
1213                     result = node_traits::to_value_ptr(pDel);
1214             }
1215
1216             dispose_deferred();
1217             return bReturn;
1218         }
1219
1220         node_type * do_extract_max()
1221         {
1222             assert( gc::is_locked() );
1223
1224             position pos;
1225             node_type * pDel;
1226
1227             if ( !find_max_position( pos ) ) {
1228                 m_Stat.onExtractMaxFailed();
1229                 pDel = nullptr;
1230             }
1231             else {
1232                 pDel = pos.pCur;
1233                 unsigned int const nHeight = pDel->height();
1234
1235                 if ( try_remove_at( pDel, pos, [](value_type const&) {}, true )) {
1236                     --m_ItemCounter;
1237                     m_Stat.onRemoveNode( nHeight );
1238                     m_Stat.onExtractMaxSuccess();
1239                 }
1240                 else {
1241                     m_Stat.onExtractMaxFailed();
1242                     pDel = nullptr;
1243                 }
1244             }
1245
1246             defer_chain( pos );
1247             return pDel;
1248         }
1249
1250         template <typename ExemptPtr>
1251         bool do_extract_max( ExemptPtr& result )
1252         {
1253             check_deadlock_policy::check();
1254
1255             bool bReturn;
1256             {
1257                 rcu_lock l;
1258                 node_type * pDel = do_extract_max();
1259                 bReturn = pDel != nullptr;
1260                 if ( bReturn )
1261                     result = node_traits::to_value_ptr(pDel);
1262             }
1263
1264             dispose_deferred();
1265             return bReturn;
1266         }
1267
1268         void increase_height( unsigned int nHeight )
1269         {
1270             unsigned int nCur = m_nHeight.load( memory_model::memory_order_relaxed );
1271             if ( nCur < nHeight )
1272                 m_nHeight.compare_exchange_strong( nCur, nHeight, memory_model::memory_order_release, atomics::memory_order_relaxed );
1273         }
1274
1275         class deferred_list_iterator
1276         {
1277             node_type * pCur;
1278         public:
1279             explicit deferred_list_iterator( node_type * p )
1280                 : pCur(p)
1281             {}
1282             deferred_list_iterator()
1283                 : pCur( nullptr )
1284             {}
1285
1286             cds::urcu::retired_ptr operator *() const
1287             {
1288                 return cds::urcu::retired_ptr( node_traits::to_value_ptr(pCur), dispose_node );
1289             }
1290
1291             void operator ++()
1292             {
1293                 pCur = pCur->m_pDelChain;
1294             }
1295
1296             bool operator ==( deferred_list_iterator const& i ) const
1297             {
1298                 return pCur == i.pCur;
1299             }
1300             bool operator !=( deferred_list_iterator const& i ) const
1301             {
1302                 return !operator ==( i );
1303             }
1304         };
1305
1306         void dispose_chain( node_type * pHead )
1307         {
1308             // RCU should NOT be locked
1309             check_deadlock_policy::check();
1310
1311             gc::batch_retire( deferred_list_iterator( pHead ), deferred_list_iterator() );
1312         }
1313
1314         void dispose_chain( position& pos )
1315         {
1316             // RCU should NOT be locked
1317             check_deadlock_policy::check();
1318
1319             // Delete local chain
1320             if ( pos.pDelChain ) {
1321                 dispose_chain( pos.pDelChain );
1322                 pos.pDelChain = nullptr;
1323             }
1324
1325             // Delete deferred chain
1326             dispose_deferred();
1327         }
1328
1329         void dispose_deferred()
1330         {
1331             dispose_chain( m_pDeferredDelChain.exchange( nullptr, memory_model::memory_order_acq_rel ) );
1332         }
1333
1334         void defer_chain( position& pos )
1335         {
1336             if ( pos.pDelChain ) {
1337                 node_type * pHead = pos.pDelChain;
1338                 node_type * pTail = pHead;
1339                 while ( pTail->m_pDelChain )
1340                     pTail = pTail->m_pDelChain;
1341
1342                 node_type * pDeferList = m_pDeferredDelChain.load( memory_model::memory_order_relaxed );
1343                 do {
1344                     pTail->m_pDelChain = pDeferList;
1345                 } while ( !m_pDeferredDelChain.compare_exchange_weak( pDeferList, pHead, memory_model::memory_order_acq_rel, atomics::memory_order_relaxed ));
1346
1347                 pos.pDelChain = nullptr;
1348             }
1349         }
1350
1351         //@endcond
1352
1353     public:
1354         /// Default constructor
1355         SkipListSet()
1356             : m_Head( c_nMaxHeight )
1357             , m_nHeight( c_nMinHeight )
1358             , m_pDeferredDelChain( nullptr )
1359         {
1360             static_assert( (std::is_same< gc, typename node_type::gc >::value), "GC and node_type::gc must be the same type" );
1361
1362             // Barrier for head node
1363             atomics::atomic_thread_fence( memory_model::memory_order_release );
1364         }
1365
1366         /// Clears and destructs the skip-list
1367         ~SkipListSet()
1368         {
1369             clear();
1370         }
1371
1372     public:
1373         /// Iterator type
1374         typedef skip_list::details::iterator< gc, node_traits, back_off, false >  iterator;
1375
1376         /// Const iterator type
1377         typedef skip_list::details::iterator< gc, node_traits, back_off, true >   const_iterator;
1378
1379         /// Returns a forward iterator addressing the first element in a set
1380         iterator begin()
1381         {
1382             return iterator( *m_Head.head() );
1383         }
1384
1385         /// Returns a forward const iterator addressing the first element in a set
1386         const_iterator begin() const
1387         {
1388             return const_iterator( *m_Head.head() );
1389         }
1390
1391         /// Returns a forward const iterator addressing the first element in a set
1392         const_iterator cbegin() const
1393         {
1394             return const_iterator( *m_Head.head() );
1395         }
1396
1397         /// Returns a forward iterator that addresses the location succeeding the last element in a set.
1398         iterator end()
1399         {
1400             return iterator();
1401         }
1402
1403         /// Returns a forward const iterator that addresses the location succeeding the last element in a set.
1404         const_iterator end() const
1405         {
1406             return const_iterator();
1407         }
1408
1409         /// Returns a forward const iterator that addresses the location succeeding the last element in a set.
1410         const_iterator cend() const
1411         {
1412             return const_iterator();
1413         }
1414
1415     public:
1416         /// Inserts new node
1417         /**
1418             The function inserts \p val in the set if it does not contain
1419             an item with key equal to \p val.
1420
1421             The function applies RCU lock internally.
1422
1423             Returns \p true if \p val is placed into the set, \p false otherwise.
1424         */
1425         bool insert( value_type& val )
1426         {
1427             return insert( val, []( value_type& ) {} );
1428         }
1429
1430         /// Inserts new node
1431         /**
1432             This function is intended for derived non-intrusive containers.
1433
1434             The function allows to split creating of new item into two part:
1435             - create item with key only
1436             - insert new item into the set
1437             - if inserting is success, calls  \p f functor to initialize value-field of \p val.
1438
1439             The functor signature is:
1440             \code
1441                 void func( value_type& val );
1442             \endcode
1443             where \p val is the item inserted. User-defined functor \p f should guarantee that during changing
1444             \p val no any other changes could be made on this set's item by concurrent threads.
1445             The user-defined functor is called only if the inserting is success.
1446
1447             RCU \p synchronize method can be called. RCU should not be locked.
1448         */
1449         template <typename Func>
1450         bool insert( value_type& val, Func f )
1451         {
1452             check_deadlock_policy::check();
1453
1454             position pos;
1455             bool bRet;
1456
1457             {
1458                 node_type * pNode = node_traits::to_node_ptr( val );
1459                 scoped_node_ptr scp( pNode );
1460                 unsigned int nHeight = pNode->height();
1461                 bool bTowerOk = nHeight > 1 && pNode->get_tower() != nullptr;
1462                 bool bTowerMade = false;
1463
1464                 rcu_lock rcuLock;
1465
1466                 while ( true )
1467                 {
1468                     bool bFound = find_position( val, pos, key_comparator(), true );
1469                     if ( bFound ) {
1470                         // scoped_node_ptr deletes the node tower if we create it
1471                         if ( !bTowerMade )
1472                             scp.release();
1473
1474                         m_Stat.onInsertFailed();
1475                         bRet = false;
1476                         break;
1477                     }
1478
1479                     if ( !bTowerOk ) {
1480                         build_node( pNode );
1481                         nHeight = pNode->height();
1482                         bTowerMade =
1483                             bTowerOk = true;
1484                     }
1485
1486                     if ( !insert_at_position( val, pNode, pos, f )) {
1487                         m_Stat.onInsertRetry();
1488                         continue;
1489                     }
1490
1491                     increase_height( nHeight );
1492                     ++m_ItemCounter;
1493                     m_Stat.onAddNode( nHeight );
1494                     m_Stat.onInsertSuccess();
1495                     scp.release();
1496                     bRet =  true;
1497                     break;
1498                 }
1499             }
1500
1501             dispose_chain( pos );
1502
1503             return bRet;
1504         }
1505
1506         /// Ensures that the \p val exists in the set
1507         /**
1508             The operation performs inserting or changing data with lock-free manner.
1509
1510             If the item \p val is not found in the set, then \p val is inserted into the set.
1511             Otherwise, the functor \p func is called with item found.
1512             The functor signature is:
1513             \code
1514                 void func( bool bNew, value_type& item, value_type& val );
1515             \endcode
1516             with arguments:
1517             - \p bNew - \p true if the item has been inserted, \p false otherwise
1518             - \p item - item of the set
1519             - \p val - argument \p val passed into the \p %ensure() function
1520             If new item has been inserted (i.e. \p bNew is \p true) then \p item and \p val arguments
1521             refer to the same thing.
1522
1523             The functor can change non-key fields of the \p item; however, \p func must guarantee
1524             that during changing no any other modifications could be made on this item by concurrent threads.
1525
1526             RCU \p synchronize method can be called. RCU should not be locked.
1527
1528             Returns std::pair<bool, bool> where \p first is \p true if operation is successfull,
1529             \p second is \p true if new item has been added or \p false if the item with \p key
1530             already is in the set.
1531
1532             @warning See \ref cds_intrusive_item_creating "insert item troubleshooting"
1533         */
1534         template <typename Func>
1535         std::pair<bool, bool> ensure( value_type& val, Func func )
1536         {
1537             check_deadlock_policy::check();
1538
1539             position pos;
1540             std::pair<bool, bool> bRet( true, false );
1541
1542             {
1543                 node_type * pNode = node_traits::to_node_ptr( val );
1544                 scoped_node_ptr scp( pNode );
1545                 unsigned int nHeight = pNode->height();
1546                 bool bTowerOk = nHeight > 1 && pNode->get_tower() != nullptr;
1547                 bool bTowerMade = false;
1548
1549                 rcu_lock rcuLock;
1550                 while ( true )
1551                 {
1552                     bool bFound = find_position( val, pos, key_comparator(), true );
1553                     if ( bFound ) {
1554                         // scoped_node_ptr deletes the node tower if we create it before
1555                         if ( !bTowerMade )
1556                             scp.release();
1557
1558                         func( false, *node_traits::to_value_ptr(pos.pCur), val );
1559                         m_Stat.onEnsureExist();
1560                         break;
1561                     }
1562
1563                     if ( !bTowerOk ) {
1564                         build_node( pNode );
1565                         nHeight = pNode->height();
1566                         bTowerMade =
1567                             bTowerOk = true;
1568                     }
1569
1570                     if ( !insert_at_position( val, pNode, pos, [&func]( value_type& item ) { func( true, item, item ); })) {
1571                         m_Stat.onInsertRetry();
1572                         continue;
1573                     }
1574
1575                     increase_height( nHeight );
1576                     ++m_ItemCounter;
1577                     scp.release();
1578                     m_Stat.onAddNode( nHeight );
1579                     m_Stat.onEnsureNew();
1580                     bRet.second = true;
1581                     break;
1582                 }
1583             }
1584
1585             dispose_chain( pos );
1586
1587             return bRet;
1588         }
1589
1590         /// Unlinks the item \p val from the set
1591         /**
1592             The function searches the item \p val in the set and unlink it from the set
1593             if it is found and is equal to \p val.
1594
1595             Difference between \p erase() and \p %unlink() functions: \p erase() finds <i>a key</i>
1596             and deletes the item found. \p %unlink() searches an item by key and deletes it
1597             only if \p val is an item of that set, i.e. the pointer to item found
1598             is equal to <tt> &val </tt>.
1599
1600             RCU \p synchronize method can be called. RCU should not be locked.
1601
1602             The \ref disposer specified in \p Traits class template parameter is called
1603             by garbage collector \p GC asynchronously.
1604
1605             The function returns \p true if success and \p false otherwise.
1606         */
1607         bool unlink( value_type& val )
1608         {
1609             check_deadlock_policy::check();
1610
1611             position pos;
1612             bool bRet;
1613
1614             {
1615                 rcu_lock rcuLock;
1616
1617                 if ( !find_position( val, pos, key_comparator(), false ) ) {
1618                     m_Stat.onUnlinkFailed();
1619                     bRet = false;
1620                 }
1621                 else {
1622                     node_type * pDel = pos.pCur;
1623                     assert( key_comparator()( *node_traits::to_value_ptr( pDel ), val ) == 0 );
1624
1625                     unsigned int nHeight = pDel->height();
1626
1627                     if ( node_traits::to_value_ptr( pDel ) == &val && try_remove_at( pDel, pos, [](value_type const&) {}, false )) {
1628                         --m_ItemCounter;
1629                         m_Stat.onRemoveNode( nHeight );
1630                         m_Stat.onUnlinkSuccess();
1631                         bRet = true;
1632                     }
1633                     else {
1634                         m_Stat.onUnlinkFailed();
1635                         bRet = false;
1636                     }
1637                 }
1638             }
1639
1640             dispose_chain( pos );
1641
1642             return bRet;
1643         }
1644
1645         /// Extracts the item from the set with specified \p key
1646         /** \anchor cds_intrusive_SkipListSet_rcu_extract
1647             The function searches an item with key equal to \p key in the set,
1648             unlinks it from the set, places it to \p result parameter, and returns \p true.
1649             If the item with key equal to \p key is not found the function returns \p false.
1650
1651             Note the compare functor should accept a parameter of type \p Q that can be not the same as \p value_type.
1652
1653             RCU \p synchronize method can be called. RCU should NOT be locked.
1654             The function does not call the disposer for the item found.
1655             The disposer will be implicitly invoked when \p result object is destroyed or when
1656             <tt>result.release()</tt> is called, see cds::urcu::exempt_ptr for explanation.
1657             @note Before reusing \p result object you should call its \p release() method.
1658             Example:
1659             \code
1660             typedef cds::intrusive::SkipListSet< cds::urcu::gc< cds::urcu::general_buffered<> >, foo, my_traits > skip_list;
1661             skip_list theList;
1662             // ...
1663
1664             typename skip_list::exempt_ptr ep;
1665             if ( theList.extract( ep, 5 ) ) {
1666                 // Deal with ep
1667                 //...
1668
1669                 // Dispose returned item.
1670                 ep.release();
1671             }
1672             \endcode
1673         */
1674         template <typename Q>
1675         bool extract( exempt_ptr& result, Q const& key )
1676         {
1677             return do_extract( result, key );
1678         }
1679
1680         /// Extracts the item from the set with comparing functor \p pred
1681         /**
1682             The function is an analog of \ref cds_intrusive_SkipListSet_rcu_extract "extract(exempt_ptr&, Q const&)"
1683             but \p pred predicate is used for key comparing.
1684             \p Less has the interface like \p std::less.
1685             \p pred must imply the same element order as the comparator used for building the set.
1686         */
1687         template <typename Q, typename Less>
1688         bool extract_with( exempt_ptr& result, Q const& key, Less pred )
1689         {
1690             return do_extract_with( result, key, pred );
1691         }
1692
1693         /// Extracts an item with minimal key from the list
1694         /**
1695             The function searches an item with minimal key, unlinks it, and returns the item found in \p result parameter.
1696             If the skip-list is empty the function returns \p false.
1697
1698             RCU \p synchronize method can be called. RCU should NOT be locked.
1699             The function does not call the disposer for the item found.
1700             The disposer will be implicitly invoked when \p result object is destroyed or when
1701             <tt>result.release()</tt> is called, see cds::urcu::exempt_ptr for explanation.
1702             @note Before reusing \p result object you should call its \p release() method.
1703             Example:
1704             \code
1705             typedef cds::intrusive::SkipListSet< cds::urcu::gc< cds::urcu::general_buffered<> >, foo, my_traits > skip_list;
1706             skip_list theList;
1707             // ...
1708
1709             typename skip_list::exempt_ptr ep;
1710             if ( theList.extract_min(ep)) {
1711                 // Deal with ep
1712                 //...
1713
1714                 // Dispose returned item.
1715                 ep.release();
1716             }
1717             \endcode
1718
1719             @note Due the concurrent nature of the list, the function extracts <i>nearly</i> minimum key.
1720             It means that the function gets leftmost item and tries to unlink it.
1721             During unlinking, a concurrent thread may insert an item with key less than leftmost item's key.
1722             So, the function returns the item with minimum key at the moment of list traversing.
1723         */
1724         bool extract_min( exempt_ptr& result )
1725         {
1726             return do_extract_min( result );
1727         }
1728
1729         /// Extracts an item with maximal key from the list
1730         /**
1731             The function searches an item with maximal key, unlinks it, and returns the item found in \p result parameter.
1732             If the skip-list is empty the function returns \p false.
1733
1734             RCU \p synchronize method can be called. RCU should NOT be locked.
1735             The function does not call the disposer for the item found.
1736             The disposer will be implicitly invoked when \p result object is destroyed or when
1737             <tt>result.release()</tt> is called, see cds::urcu::exempt_ptr for explanation.
1738             @note Before reusing \p result object you should call its \p release() method.
1739             Example:
1740             \code
1741             typedef cds::intrusive::SkipListSet< cds::urcu::gc< cds::urcu::general_buffered<> >, foo, my_traits > skip_list;
1742             skip_list theList;
1743             // ...
1744
1745             typename skip_list::exempt_ptr ep;
1746             if ( theList.extract_max(ep) ) {
1747                 // Deal with ep
1748                 //...
1749                 // Dispose returned item.
1750                 ep.release();
1751             }
1752             \endcode
1753
1754             @note Due the concurrent nature of the list, the function extracts <i>nearly</i> maximal key.
1755             It means that the function gets rightmost item and tries to unlink it.
1756             During unlinking, a concurrent thread can insert an item with key greater than rightmost item's key.
1757             So, the function returns the item with maximum key at the moment of list traversing.
1758         */
1759         bool extract_max( exempt_ptr& result )
1760         {
1761             return do_extract_max( result );
1762         }
1763
1764         /// Deletes the item from the set
1765         /** \anchor cds_intrusive_SkipListSet_rcu_erase
1766             The function searches an item with key equal to \p key in the set,
1767             unlinks it from the set, and returns \p true.
1768             If the item with key equal to \p key is not found the function return \p false.
1769
1770             Note the hash functor should accept a parameter of type \p Q that can be not the same as \p value_type.
1771
1772             RCU \p synchronize method can be called. RCU should not be locked.
1773         */
1774         template <typename Q>
1775         bool erase( const Q& key )
1776         {
1777             return do_erase( key, key_comparator(), [](value_type const&) {} );
1778         }
1779
1780         /// Delete the item from the set with comparing functor \p pred
1781         /**
1782             The function is an analog of \ref cds_intrusive_SkipListSet_rcu_erase "erase(Q const&)"
1783             but \p pred predicate is used for key comparing.
1784             \p Less has the interface like \p std::less.
1785             \p pred must imply the same element order as the comparator used for building the set.
1786         */
1787         template <typename Q, typename Less>
1788         bool erase_with( const Q& key, Less pred )
1789         {
1790             return do_erase( key, cds::opt::details::make_comparator_from_less<Less>(), [](value_type const&) {} );
1791         }
1792
1793         /// Deletes the item from the set
1794         /** \anchor cds_intrusive_SkipListSet_rcu_erase_func
1795             The function searches an item with key equal to \p key in the set,
1796             call \p f functor with item found, unlinks it from the set, and returns \p true.
1797             The \ref disposer specified in \p Traits class template parameter is called
1798             by garbage collector \p GC asynchronously.
1799
1800             The \p Func interface is
1801             \code
1802             struct functor {
1803                 void operator()( value_type const& item );
1804             };
1805             \endcode
1806             If the item with key equal to \p key is not found the function return \p false.
1807
1808             Note the hash functor should accept a parameter of type \p Q that can be not the same as \p value_type.
1809
1810             RCU \p synchronize method can be called. RCU should not be locked.
1811         */
1812         template <typename Q, typename Func>
1813         bool erase( Q const& key, Func f )
1814         {
1815             return do_erase( key, key_comparator(), f );
1816         }
1817
1818         /// Delete the item from the set with comparing functor \p pred
1819         /**
1820             The function is an analog of \ref cds_intrusive_SkipListSet_rcu_erase_func "erase(Q const&, Func)"
1821             but \p pred predicate is used for key comparing.
1822             \p Less has the interface like \p std::less.
1823             \p pred must imply the same element order as the comparator used for building the set.
1824         */
1825         template <typename Q, typename Less, typename Func>
1826         bool erase_with( Q const& key, Less pred, Func f )
1827         {
1828             return do_erase( key, cds::opt::details::make_comparator_from_less<Less>(), f );
1829         }
1830
1831         /// Finds \p key
1832         /** @anchor cds_intrusive_SkipListSet_rcu_find_func
1833             The function searches the item with key equal to \p key and calls the functor \p f for item found.
1834             The interface of \p Func functor is:
1835             \code
1836             struct functor {
1837                 void operator()( value_type& item, Q& key );
1838             };
1839             \endcode
1840             where \p item is the item found, \p key is the <tt>find</tt> function argument.
1841
1842             The functor can change non-key fields of \p item. Note that the functor is only guarantee
1843             that \p item cannot be disposed during functor is executing.
1844             The functor does not serialize simultaneous access to the set \p item. If such access is
1845             possible you must provide your own synchronization schema on item level to exclude unsafe item modifications.
1846
1847             The \p key argument is non-const since it can be used as \p f functor destination i.e., the functor
1848             can modify both arguments.
1849
1850             The function applies RCU lock internally.
1851
1852             The function returns \p true if \p key is found, \p false otherwise.
1853         */
1854         template <typename Q, typename Func>
1855         bool find( Q& key, Func f )
1856         {
1857             return do_find_with( key, key_comparator(), f );
1858         }
1859
1860         /// Finds the key \p key with comparing functor \p pred
1861         /**
1862             The function is an analog of \ref cds_intrusive_SkipListSet_rcu_find_func "find(Q&, Func)"
1863             but \p cmp is used for key comparison.
1864             \p Less functor has the interface like \p std::less.
1865             \p cmp must imply the same element order as the comparator used for building the set.
1866         */
1867         template <typename Q, typename Less, typename Func>
1868         bool find_with( Q& key, Less pred, Func f )
1869         {
1870             return do_find_with( key, cds::opt::details::make_comparator_from_less<Less>(), f );
1871         }
1872
1873         /// Finds \p key
1874         /** @anchor cds_intrusive_SkipListSet_rcu_find_val
1875             The function searches the item with key equal to \p key
1876             and returns \p true if it is found, and \p false otherwise.
1877
1878             The function applies RCU lock internally.
1879         */
1880         template <typename Q>
1881         bool find( Q const& key )
1882         {
1883             return do_find_with( key, key_comparator(), [](value_type& , Q const& ) {} );
1884         }
1885
1886         /// Finds \p key with comparing functor \p pred
1887         /**
1888             The function is an analog of \ref cds_intrusive_SkipListSet_rcu_find_val "find(Q const&)"
1889             but \p pred is used for key compare.
1890             \p Less functor has the interface like \p std::less.
1891             \p pred must imply the same element order as the comparator used for building the set.
1892         */
1893         template <typename Q, typename Less>
1894         bool find_with( Q const& key, Less pred )
1895         {
1896             return do_find_with( key, cds::opt::details::make_comparator_from_less<Less>(), [](value_type& , Q const& ) {} );
1897         }
1898
1899         /// Finds \p key and return the item found
1900         /** \anchor cds_intrusive_SkipListSet_rcu_get
1901             The function searches the item with key equal to \p key and returns the pointer to item found.
1902             If \p key is not found it returns \p nullptr.
1903
1904             Note the compare functor should accept a parameter of type \p Q that can be not the same as \p value_type.
1905
1906             RCU should be locked before call of this function.
1907             Returned item is valid only while RCU is locked:
1908             \code
1909             typedef cds::intrusive::SkipListSet< cds::urcu::gc< cds::urcu::general_buffered<> >, foo, my_traits > skip_list;
1910             skip_list theList;
1911             // ...
1912             {
1913                 // Lock RCU
1914                 skip_list::rcu_lock lock;
1915
1916                 foo * pVal = theList.get( 5 );
1917                 if ( pVal ) {
1918                     // Deal with pVal
1919                     //...
1920                 }
1921             }
1922             // Unlock RCU by rcu_lock destructor
1923             // pVal can be retired by disposer at any time after RCU has been unlocked
1924             \endcode
1925
1926             After RCU unlocking the \p %force_dispose member function can be called manually,
1927             see \ref force_dispose for explanation.
1928         */
1929         template <typename Q>
1930         value_type * get( Q const& key )
1931         {
1932             assert( gc::is_locked());
1933
1934             value_type * pFound;
1935             return do_find_with( key, key_comparator(), [&pFound](value_type& found, Q const& ) { pFound = &found; } )
1936                 ? pFound : nullptr;
1937         }
1938
1939         /// Finds \p key and return the item found
1940         /**
1941             The function is an analog of \ref cds_intrusive_SkipListSet_rcu_get "get(Q const&)"
1942             but \p pred is used for comparing the keys.
1943
1944             \p Less functor has the semantics like \p std::less but should take arguments of type \ref value_type and \p Q
1945             in any order.
1946             \p pred must imply the same element order as the comparator used for building the set.
1947         */
1948         template <typename Q, typename Less>
1949         value_type * get_with( Q const& key, Less pred )
1950         {
1951             assert( gc::is_locked());
1952
1953             value_type * pFound;
1954             return do_find_with( key, cds::opt::details::make_comparator_from_less<Less>(),
1955                 [&pFound](value_type& found, Q const& ) { pFound = &found; } )
1956                 ? pFound : nullptr;
1957         }
1958
1959         /// Returns item count in the set
1960         /**
1961             The value returned depends on item counter type provided by \p Traits template parameter.
1962             For \p atomicity::empty_item_counter the function always returns 0.
1963             Therefore, the function is not suitable for checking the set emptiness, use \p empty()
1964             member function for this purpose.
1965         */
1966         size_t size() const
1967         {
1968             return m_ItemCounter;
1969         }
1970
1971         /// Checks if the set is empty
1972         bool empty() const
1973         {
1974             return m_Head.head()->next( 0 ).load( memory_model::memory_order_relaxed ) == nullptr;
1975         }
1976
1977         /// Clears the set (not atomic)
1978         /**
1979             The function unlink all items from the set.
1980             The function is not atomic, thus, in multi-threaded environment with parallel insertions
1981             this sequence
1982             \code
1983             set.clear();
1984             assert( set.empty() );
1985             \endcode
1986             the assertion could be raised.
1987
1988             For each item the \p disposer will be called automatically after unlinking.
1989         */
1990         void clear()
1991         {
1992             exempt_ptr ep;
1993             while ( extract_min(ep) )
1994                 ep.release();
1995         }
1996
1997         /// Returns maximum height of skip-list. The max height is a constant for each object and does not exceed 32.
1998         static CDS_CONSTEXPR unsigned int max_height() CDS_NOEXCEPT
1999         {
2000             return c_nMaxHeight;
2001         }
2002
2003         /// Returns const reference to internal statistics
2004         stat const& statistics() const
2005         {
2006             return m_Stat;
2007         }
2008
2009         /// Clears internal list of ready-to-remove items passing it to RCU reclamation cycle
2010         /** @anchor cds_intrusive_SkipListSet_rcu_force_dispose
2011             Skip list has complex multi-step algorithm for removing an item. In fact, when you
2012             remove the item it is just marked as removed that is enough for the success of your operation.
2013             Actual removing can take place in the future, in another call or even in another thread.
2014             Inside RCU lock the removed item cannot be passed to RCU reclamation cycle
2015             since it can lead to deadlock. To solve this problem, the current skip list implementation
2016             has internal list of items which is ready to remove but is not yet passed to RCU reclamation.
2017             Usually, this list will be passed to RCU reclamation in the next suitable call of skip list member function.
2018             In some cases we want to pass it to RCU reclamation immediately after RCU unlocking.
2019             This function provides such opportunity: it checks whether the RCU is not locked and if it is true
2020             the function passes the internal ready-to-remove list to RCU reclamation cycle.
2021
2022             The RCU \p synchronize can be called.
2023         */
2024         void force_dispose()
2025         {
2026             if ( !gc::is_locked() )
2027                 dispose_deferred();
2028         }
2029     };
2030
2031 }} // namespace cds::intrusive
2032
2033
2034 #endif // #ifndef __CDS_INTRUSIVE_SKIP_LIST_RCU_H