Merge branch 'integration' into dev
[libcds.git] / cds / intrusive / skip_list_rcu.h
1 //$$CDS-header$$
2
3 #ifndef CDSLIB_INTRUSIVE_SKIP_LIST_RCU_H
4 #define CDSLIB_INTRUSIVE_SKIP_LIST_RCU_H
5
6 #include <type_traits>
7 #include <memory>
8 #include <cds/intrusive/details/skip_list_base.h>
9 #include <cds/opt/compare.h>
10 #include <cds/urcu/details/check_deadlock.h>
11 #include <cds/details/binary_functor_wrapper.h>
12 #include <cds/urcu/exempt_ptr.h>
13
14
15 namespace cds { namespace intrusive {
16
17     //@cond
18     namespace skip_list {
19
20         template <class RCU, typename Tag>
21         class node< cds::urcu::gc< RCU >, Tag >
22         {
23         public:
24             typedef cds::urcu::gc< RCU >    gc; ///< Garbage collector
25             typedef Tag     tag               ; ///< tag
26
27             // Mark bits:
28             //  bit 0 - the item is logically deleted
29             //  bit 1 - the item is extracted (only for level 0)
30             typedef cds::details::marked_ptr<node, 3> marked_ptr;        ///< marked pointer
31             typedef atomics::atomic< marked_ptr >     atomic_marked_ptr; ///< atomic marked pointer
32             typedef atomic_marked_ptr                 tower_item_type;
33
34         protected:
35             atomic_marked_ptr       m_pNext;     ///< Next item in bottom-list (list at level 0)
36         public:
37             node *                  m_pDelChain; ///< Deleted node chain (local for a thread)
38 #       ifdef _DEBUG
39             bool volatile           m_bLinked;
40             bool volatile           m_bUnlinked;
41 #       endif
42         protected:
43             unsigned int            m_nHeight;   ///< Node height (size of m_arrNext array). For node at level 0 the height is 1.
44             atomic_marked_ptr *     m_arrNext;   ///< Array of next items for levels 1 .. m_nHeight - 1. For node at level 0 \p m_arrNext is \p nullptr
45
46         public:
47             /// Constructs a node of height 1 (a bottom-list node)
48             CDS_CONSTEXPR node()
49                 : m_pNext( nullptr )
50                 , m_pDelChain( nullptr )
51 #       ifdef _DEBUG
52                 , m_bLinked( false )
53                 , m_bUnlinked( false )
54 #       endif
55                 , m_nHeight(1)
56                 , m_arrNext( nullptr )
57             {}
58
59 #       ifdef _DEBUG
60             ~node()
61             {
62                 assert( !m_bLinked || m_bUnlinked );
63             }
64 #       endif
65
66             /// Constructs a node of height \p nHeight
67             void make_tower( unsigned int nHeight, atomic_marked_ptr * nextTower )
68             {
69                 assert( nHeight > 0 );
70                 assert( (nHeight == 1 && nextTower == nullptr)  // bottom-list node
71                         || (nHeight > 1 && nextTower != nullptr)   // node at level of more than 0
72                     );
73
74                 m_arrNext = nextTower;
75                 m_nHeight = nHeight;
76             }
77
78             atomic_marked_ptr * release_tower()
79             {
80                 atomic_marked_ptr * pTower = m_arrNext;
81                 m_arrNext = nullptr;
82                 m_nHeight = 1;
83                 return pTower;
84             }
85
86             atomic_marked_ptr * get_tower() const
87             {
88                 return m_arrNext;
89             }
90
91             void clear_tower()
92             {
93                 for ( unsigned int nLevel = 1; nLevel < m_nHeight; ++nLevel )
94                     next(nLevel).store( marked_ptr(), atomics::memory_order_relaxed );
95             }
96
97             /// Access to element of next pointer array
98             atomic_marked_ptr& next( unsigned int nLevel )
99             {
100                 assert( nLevel < height() );
101                 assert( nLevel == 0 || (nLevel > 0 && m_arrNext != nullptr) );
102
103                 return nLevel ? m_arrNext[nLevel - 1] : m_pNext;
104             }
105
106             /// Access to element of next pointer array (const version)
107             atomic_marked_ptr const& next( unsigned int nLevel ) const
108             {
109                 assert( nLevel < height() );
110                 assert( nLevel == 0 || nLevel > 0 && m_arrNext != nullptr );
111
112                 return nLevel ? m_arrNext[nLevel - 1] : m_pNext;
113             }
114
115             /// Access to element of next pointer array (same as \ref next function)
116             atomic_marked_ptr& operator[]( unsigned int nLevel )
117             {
118                 return next( nLevel );
119             }
120
121             /// Access to element of next pointer array (same as \ref next function)
122             atomic_marked_ptr const& operator[]( unsigned int nLevel ) const
123             {
124                 return next( nLevel );
125             }
126
127             /// Height of the node
128             unsigned int height() const
129             {
130                 return m_nHeight;
131             }
132
133             /// Clears internal links
134             void clear()
135             {
136                 assert( m_arrNext == nullptr );
137                 m_pNext.store( marked_ptr(), atomics::memory_order_release );
138                 m_pDelChain = nullptr;
139             }
140
141             bool is_cleared() const
142             {
143                 return m_pNext == atomic_marked_ptr()
144                     && m_arrNext == nullptr
145                     && m_nHeight <= 1;
146             }
147         };
148     } // namespace skip_list
149     //@endcond
150
151     //@cond
152     namespace skip_list { namespace details {
153
154         template <class RCU, typename NodeTraits, typename BackOff, bool IsConst>
155         class iterator< cds::urcu::gc< RCU >, NodeTraits, BackOff, IsConst >
156         {
157         public:
158             typedef cds::urcu::gc< RCU >                gc;
159             typedef NodeTraits                          node_traits;
160             typedef BackOff                             back_off;
161             typedef typename node_traits::node_type     node_type;
162             typedef typename node_traits::value_type    value_type;
163             static bool const c_isConst = IsConst;
164
165             typedef typename std::conditional< c_isConst, value_type const &, value_type &>::type   value_ref;
166
167         protected:
168             typedef typename node_type::marked_ptr          marked_ptr;
169             typedef typename node_type::atomic_marked_ptr   atomic_marked_ptr;
170
171             node_type *             m_pNode;
172
173         protected:
174             void next()
175             {
176                 // RCU should be locked before iterating!!!
177                 assert( gc::is_locked() );
178
179                 back_off bkoff;
180
181                 for (;;) {
182                     if ( m_pNode->next( m_pNode->height() - 1 ).load( atomics::memory_order_acquire ).bits() ) {
183                         // Current node is marked as deleted. So, its next pointer can point to anything
184                         // In this case we interrupt our iteration and returns end() iterator.
185                         *this = iterator();
186                         return;
187                     }
188
189                     marked_ptr p = m_pNode->next(0).load( atomics::memory_order_relaxed );
190                     node_type * pp = p.ptr();
191                     if ( p.bits() ) {
192                         // p is marked as deleted. Spin waiting for physical removal
193                         bkoff();
194                         continue;
195                     }
196                     else if ( pp && pp->next( pp->height() - 1 ).load( atomics::memory_order_relaxed ).bits() ) {
197                         // p is marked as deleted. Spin waiting for physical removal
198                         bkoff();
199                         continue;
200                     }
201
202                     m_pNode = pp;
203                     break;
204                 }
205             }
206
207         public: // for internal use only!!!
208             iterator( node_type& refHead )
209                 : m_pNode( nullptr )
210             {
211                 // RCU should be locked before iterating!!!
212                 assert( gc::is_locked() );
213
214                 back_off bkoff;
215
216                 for (;;) {
217                     marked_ptr p = refHead.next(0).load( atomics::memory_order_relaxed );
218                     if ( !p.ptr() ) {
219                         // empty skip-list
220                         break;
221                     }
222
223                     node_type * pp = p.ptr();
224                     // Logically deleted node is marked from highest level
225                     if ( !pp->next( pp->height() - 1 ).load( atomics::memory_order_acquire ).bits() ) {
226                         m_pNode = pp;
227                         break;
228                     }
229
230                     bkoff();
231                 }
232             }
233
234         public:
235             iterator()
236                 : m_pNode( nullptr )
237             {
238                 // RCU should be locked before iterating!!!
239                 assert( gc::is_locked() );
240             }
241
242             iterator( iterator const& s)
243                 : m_pNode( s.m_pNode )
244             {
245                 // RCU should be locked before iterating!!!
246                 assert( gc::is_locked() );
247             }
248
249             value_type * operator ->() const
250             {
251                 assert( m_pNode != nullptr );
252                 assert( node_traits::to_value_ptr( m_pNode ) != nullptr );
253
254                 return node_traits::to_value_ptr( m_pNode );
255             }
256
257             value_ref operator *() const
258             {
259                 assert( m_pNode != nullptr );
260                 assert( node_traits::to_value_ptr( m_pNode ) != nullptr );
261
262                 return *node_traits::to_value_ptr( m_pNode );
263             }
264
265             /// Pre-increment
266             iterator& operator ++()
267             {
268                 next();
269                 return *this;
270             }
271
272             iterator& operator = (const iterator& src)
273             {
274                 m_pNode = src.m_pNode;
275                 return *this;
276             }
277
278             template <typename Bkoff, bool C>
279             bool operator ==(iterator<gc, node_traits, Bkoff, C> const& i ) const
280             {
281                 return m_pNode == i.m_pNode;
282             }
283             template <typename Bkoff, bool C>
284             bool operator !=(iterator<gc, node_traits, Bkoff, C> const& i ) const
285             {
286                 return !( *this == i );
287             }
288         };
289     }}  // namespace skip_list::details
290     //@endcond
291
292     /// Lock-free skip-list set (template specialization for \ref cds_urcu_desc "RCU")
293     /** @ingroup cds_intrusive_map
294         @anchor cds_intrusive_SkipListSet_rcu
295
296         The implementation of well-known probabilistic data structure called skip-list
297         invented by W.Pugh in his papers:
298             - [1989] W.Pugh Skip Lists: A Probabilistic Alternative to Balanced Trees
299             - [1990] W.Pugh A Skip List Cookbook
300
301         A skip-list is a probabilistic data structure that provides expected logarithmic
302         time search without the need of rebalance. The skip-list is a collection of sorted
303         linked list. Nodes are ordered by key. Each node is linked into a subset of the lists.
304         Each list has a level, ranging from 0 to 32. The bottom-level list contains
305         all the nodes, and each higher-level list is a sublist of the lower-level lists.
306         Each node is created with a random top level (with a random height), and belongs
307         to all lists up to that level. The probability that a node has the height 1 is 1/2.
308         The probability that a node has the height N is 1/2 ** N (more precisely,
309         the distribution depends on an random generator provided, but our generators
310         have this property).
311
312         The lock-free variant of skip-list is implemented according to book
313             - [2008] M.Herlihy, N.Shavit "The Art of Multiprocessor Programming",
314                 chapter 14.4 "A Lock-Free Concurrent Skiplist".
315
316         <b>Template arguments</b>:
317             - \p RCU - one of \ref cds_urcu_gc "RCU type"
318             - \p T - type to be stored in the list. The type must be based on \p skip_list::node (for \p skip_list::base_hook)
319                 or it must have a member of type \p skip_list::node (for \p skip_list::member_hook).
320             - \p Traits - set traits, default is \p skip_list::traits
321                 It is possible to declare option-based list with \p cds::intrusive::skip_list::make_traits metafunction
322                 instead of \p Traits template argument.
323
324         @note Before including <tt><cds/intrusive/skip_list_rcu.h></tt> you should include appropriate RCU header file,
325         see \ref cds_urcu_gc "RCU type" for list of existing RCU class and corresponding header files.
326
327         <b>Iterators</b>
328
329         The class supports a forward iterator (\ref iterator and \ref const_iterator).
330         The iteration is ordered.
331
332         You may iterate over skip-list set items only under RCU lock.
333         Only in this case the iterator is thread-safe since
334         while RCU is locked any set's item cannot be reclaimed.
335
336         @note The requirement of RCU lock during iterating means that any type of modification of the skip list
337         (i.e. inserting, erasing and so on) is not possible.
338
339         @warning The iterator object cannot be passed between threads.
340
341         Example how to use skip-list set iterators:
342         \code
343         // First, you should include the header for RCU type you have chosen
344         #include <cds/urcu/general_buffered.h>
345         #include <cds/intrusive/skip_list_rcu.h>
346
347         typedef cds::urcu::gc< cds::urcu::general_buffered<> > rcu_type;
348
349         struct Foo {
350             // ...
351         };
352
353         // Traits for your skip-list.
354         // At least, you should define cds::opt::less or cds::opt::compare for Foo struct
355         struct my_traits: public cds::intrusive::skip_list::traits
356         {
357             // ...
358         };
359         typedef cds::intrusive::SkipListSet< rcu_type, Foo, my_traits > my_skiplist_set;
360
361         my_skiplist_set theSet;
362
363         // ...
364
365         // Begin iteration
366         {
367             // Apply RCU locking manually
368             typename rcu_type::scoped_lock sl;
369
370             for ( auto it = theList.begin(); it != theList.end(); ++it ) {
371                 // ...
372             }
373
374             // rcu_type::scoped_lock destructor releases RCU lock implicitly
375         }
376         \endcode
377
378         The iterator class supports the following minimalistic interface:
379         \code
380         struct iterator {
381             // Default ctor
382             iterator();
383
384             // Copy ctor
385             iterator( iterator const& s);
386
387             value_type * operator ->() const;
388             value_type& operator *() const;
389
390             // Pre-increment
391             iterator& operator ++();
392
393             // Copy assignment
394             iterator& operator = (const iterator& src);
395
396             bool operator ==(iterator const& i ) const;
397             bool operator !=(iterator const& i ) const;
398         };
399         \endcode
400         Note, the iterator object returned by \ref end, \p cend member functions points to \p nullptr and should not be dereferenced.
401
402         <b>How to use</b>
403
404         You should incorporate skip_list::node into your struct \p T and provide
405         appropriate skip_list::traits::hook in your \p Traits template parameters. Usually, for \p Traits you
406         define a struct based on \p skip_list::traits.
407
408         Example for <tt>cds::urcu::general_buffered<></tt> RCU and base hook:
409         \code
410         // First, you should include the header for RCU type you have chosen
411         #include <cds/urcu/general_buffered.h>
412
413         // Include RCU skip-list specialization
414         #include <cds/intrusive/skip_list_rcu.h>
415
416         // RCU type typedef
417         typedef cds::urcu::gc< cds::urcu::general_buffered<> > rcu_type;
418
419         // Data stored in skip list
420         struct my_data: public cds::intrusive::skip_list::node< rcu_type >
421         {
422             // key field
423             std::string     strKey;
424
425             // other data
426             // ...
427         };
428
429         // my_data compare functor
430         struct my_data_cmp {
431             int operator()( const my_data& d1, const my_data& d2 )
432             {
433                 return d1.strKey.compare( d2.strKey );
434             }
435
436             int operator()( const my_data& d, const std::string& s )
437             {
438                 return d.strKey.compare(s);
439             }
440
441             int operator()( const std::string& s, const my_data& d )
442             {
443                 return s.compare( d.strKey );
444             }
445         };
446
447         // Declare traits
448         struct my_traits: public cds::intrusive::skip_list::traits
449         {
450             typedef cds::intrusive::skip_list::base_hook< cds::opt::gc< rcu_type > >   hook;
451             typedef my_data_cmp compare;
452         };
453
454         // Declare skip-list set type
455         typedef cds::intrusive::SkipListSet< rcu_type, my_data, my_traits >     traits_based_set;
456         \endcode
457
458         Equivalent option-based code:
459         \code
460         #include <cds/urcu/general_buffered.h>
461         #include <cds/intrusive/skip_list_rcu.h>
462
463         typedef cds::urcu::gc< cds::urcu::general_buffered<> > rcu_type;
464
465         struct my_data {
466             // see above
467         };
468         struct compare {
469             // see above
470         };
471
472         // Declare option-based skip-list set
473         typedef cds::intrusive::SkipListSet< rcu_type
474             ,my_data
475             , typename cds::intrusive::skip_list::make_traits<
476                 cds::intrusive::opt::hook< cds::intrusive::skip_list::base_hook< cds::opt::gc< rcu_type > > >
477                 ,cds::intrusive::opt::compare< my_data_cmp >
478             >::type
479         > option_based_set;
480
481         \endcode
482     */
483     template <
484         class RCU
485        ,typename T
486 #ifdef CDS_DOXYGEN_INVOKED
487        ,typename Traits = skip_list::traits
488 #else
489        ,typename Traits
490 #endif
491     >
492     class SkipListSet< cds::urcu::gc< RCU >, T, Traits >
493     {
494     public:
495         typedef cds::urcu::gc< RCU > gc; ///< Garbage collector
496         typedef T       value_type;      ///< type of value stored in the skip-list
497         typedef Traits  traits;          ///< Traits template parameter
498
499         typedef typename traits::hook    hook;      ///< hook type
500         typedef typename hook::node_type node_type; ///< node type
501
502 #   ifdef CDS_DOXYGEN_INVOKED
503         typedef implementation_defined key_comparator  ;    ///< key comparison functor based on \p Traits::compare and \p Traits::less
504 #   else
505         typedef typename opt::details::make_comparator< value_type, traits >::type key_comparator;
506 #   endif
507
508         typedef typename traits::disposer  disposer;   ///< disposer
509         typedef typename get_node_traits< value_type, node_type, hook>::type node_traits;    ///< node traits
510
511         typedef typename traits::item_counter  item_counter;   ///< Item counting policy used
512         typedef typename traits::memory_model  memory_model;   ///< Memory ordering, see \p cds::opt::memory_model option
513         typedef typename traits::random_level_generator    random_level_generator;   ///< random level generator
514         typedef typename traits::allocator     allocator_type; ///< allocator for maintaining array of next pointers of the node
515         typedef typename traits::back_off      back_off;       ///< Back-off strategy
516         typedef typename traits::stat          stat;           ///< internal statistics type
517         typedef typename traits::rcu_check_deadlock rcu_check_deadlock; ///< Deadlock checking policy
518         typedef typename gc::scoped_lock       rcu_lock;      ///< RCU scoped lock
519         static CDS_CONSTEXPR const bool c_bExtractLockExternal = false; ///< Group of \p extract_xxx functions does not require external locking
520
521
522         /// Max node height. The actual node height should be in range <tt>[0 .. c_nMaxHeight)</tt>
523         /**
524             The max height is specified by \ref skip_list::random_level_generator "random level generator" constant \p m_nUpperBound
525             but it should be no more than 32 (\ref skip_list::c_nHeightLimit).
526         */
527         static unsigned int const c_nMaxHeight = std::conditional<
528             (random_level_generator::c_nUpperBound <= skip_list::c_nHeightLimit),
529             std::integral_constant< unsigned int, random_level_generator::c_nUpperBound >,
530             std::integral_constant< unsigned int, skip_list::c_nHeightLimit >
531         >::type::value;
532
533         //@cond
534         static unsigned int const c_nMinHeight = 5;
535         typedef cds::intrusive::skip_list::implementation_tag implementation_tag;
536         //@endcond
537
538     protected:
539         typedef typename node_type::atomic_marked_ptr   atomic_node_ptr ;   ///< Atomic marked node pointer
540         typedef typename node_type::marked_ptr          marked_node_ptr ;   ///< Node marked pointer
541
542     protected:
543         //@cond
544         typedef skip_list::details::intrusive_node_builder< node_type, atomic_node_ptr, allocator_type > intrusive_node_builder;
545
546         typedef typename std::conditional<
547             std::is_same< typename traits::internal_node_builder, cds::opt::none >::value
548             ,intrusive_node_builder
549             ,typename traits::internal_node_builder
550         >::type node_builder;
551
552         typedef std::unique_ptr< node_type, typename node_builder::node_disposer >    scoped_node_ptr;
553
554         struct position {
555             node_type *   pPrev[ c_nMaxHeight ];
556             node_type *   pSucc[ c_nMaxHeight ];
557             node_type *   pNext[ c_nMaxHeight ];
558
559             node_type *   pCur;
560             node_type *   pDelChain;
561
562             position()
563                 : pDelChain( nullptr )
564             {}
565 #       ifdef _DEBUG
566             ~position()
567             {
568                 assert( pDelChain == nullptr );
569             }
570 #       endif
571         };
572
573         typedef cds::urcu::details::check_deadlock_policy< gc, rcu_check_deadlock>   check_deadlock_policy;
574         //@endcond
575
576     protected:
577         skip_list::details::head_node< node_type > m_Head;   ///< head tower (max height)
578
579         item_counter                m_ItemCounter;      ///< item counter
580         random_level_generator      m_RandomLevelGen;   ///< random level generator instance
581         atomics::atomic<unsigned int>    m_nHeight;     ///< estimated high level
582         atomics::atomic<node_type *>     m_pDeferredDelChain ;   ///< Deferred deleted node chain
583         mutable stat                m_Stat;             ///< internal statistics
584
585     protected:
586         //@cond
587         unsigned int random_level()
588         {
589             // Random generator produces a number from range [0..31]
590             // We need a number from range [1..32]
591             return m_RandomLevelGen() + 1;
592         }
593
594         template <typename Q>
595         node_type * build_node( Q v )
596         {
597             return node_builder::make_tower( v, m_RandomLevelGen );
598         }
599
600         static void dispose_node( value_type * pVal )
601         {
602             assert( pVal );
603
604             typename node_builder::node_disposer()( node_traits::to_node_ptr(pVal) );
605             disposer()( pVal );
606         }
607
608         struct node_disposer
609         {
610             void operator()( value_type * pVal )
611             {
612                 dispose_node( pVal );
613             }
614         };
615         //@endcond
616
617     public:
618         using exempt_ptr = cds::urcu::exempt_ptr< gc, value_type, value_type, node_disposer, void >; ///< pointer to extracted node
619
620     protected:
621         //@cond
622
623         bool is_extracted( marked_node_ptr const p ) const
624         {
625             return (p.bits() & 2) != 0;
626         }
627
628         template <typename Q, typename Compare >
629         bool find_position( Q const& val, position& pos, Compare cmp, bool bStopIfFound )
630         {
631             assert( gc::is_locked() );
632
633             node_type * pPred;
634             marked_node_ptr pSucc;
635             marked_node_ptr pCur;
636             int nCmp = 1;
637
638         retry:
639             pPred = m_Head.head();
640
641             for ( int nLevel = static_cast<int>(c_nMaxHeight - 1); nLevel >= 0; --nLevel ) {
642
643                 while ( true ) {
644                     pCur = pPred->next( nLevel ).load( memory_model::memory_order_relaxed );
645                     if ( pCur.bits() ) {
646                         // pCur.bits() means that pPred is logically deleted
647                         goto retry;
648                     }
649
650                     if ( pCur.ptr() == nullptr ) {
651                         // end of the list at level nLevel - goto next level
652                         break;
653                     }
654
655                     // pSucc contains deletion mark for pCur
656                     pSucc = pCur->next( nLevel ).load( memory_model::memory_order_relaxed );
657
658                     if ( pPred->next( nLevel ).load( memory_model::memory_order_relaxed ).all() != pCur.ptr() )
659                         goto retry;
660
661                     if ( pSucc.bits() ) {
662                         // pCur is marked, i.e. logically deleted.
663                         marked_node_ptr p( pCur.ptr() );
664                         if ( pPred->next( nLevel ).compare_exchange_strong( p, marked_node_ptr( pSucc.ptr() ),
665                              memory_model::memory_order_release, atomics::memory_order_relaxed ))
666                         {
667                             if ( nLevel == 0 ) {
668 #                       ifdef _DEBUG
669                                 pCur->m_bUnlinked = true;
670 #                       endif
671
672                                 if ( !is_extracted( pSucc )) {
673                                     // We cannot free the node at this moment since RCU is locked
674                                     // Link deleted nodes to a chain to free later
675                                     link_for_remove( pos, pCur.ptr() );
676                                     m_Stat.onEraseWhileFind();
677                                 }
678                                 else {
679                                     m_Stat.onExtractWhileFind();
680                                 }
681                             }
682                         }
683                         goto retry;
684                     }
685                     else {
686                         nCmp = cmp( *node_traits::to_value_ptr( pCur.ptr()), val );
687                         if ( nCmp < 0 )
688                             pPred = pCur.ptr();
689                         else if ( nCmp == 0 && bStopIfFound )
690                             goto found;
691                         else
692                             break;
693                     }
694                 }
695
696                 // Next level
697                 pos.pPrev[ nLevel ] = pPred;
698                 pos.pSucc[ nLevel ] = pCur.ptr();
699             }
700
701             if ( nCmp != 0 )
702                 return false;
703
704         found:
705             pos.pCur = pCur.ptr();
706             return pCur.ptr() && nCmp == 0;
707         }
708
709         bool find_min_position( position& pos )
710         {
711             assert( gc::is_locked() );
712
713             node_type * pPred;
714             marked_node_ptr pSucc;
715             marked_node_ptr pCur;
716
717         retry:
718             pPred = m_Head.head();
719
720             for ( int nLevel = static_cast<int>(c_nMaxHeight - 1); nLevel >= 0; --nLevel ) {
721
722                 pCur = pPred->next( nLevel ).load( memory_model::memory_order_relaxed );
723                 // pCur.bits() means that pPred is logically deleted
724                 // head cannot be deleted
725                 assert( pCur.bits() == 0 );
726
727                 if ( pCur.ptr() ) {
728
729                     // pSucc contains deletion mark for pCur
730                     pSucc = pCur->next( nLevel ).load( memory_model::memory_order_relaxed );
731
732                     if ( pPred->next( nLevel ).load( memory_model::memory_order_relaxed ).all() != pCur.ptr() )
733                         goto retry;
734
735                     if ( pSucc.bits() ) {
736                         // pCur is marked, i.e. logically deleted.
737                         marked_node_ptr p( pCur.ptr() );
738                         if ( pPred->next( nLevel ).compare_exchange_strong( p, marked_node_ptr( pSucc.ptr() ),
739                             memory_model::memory_order_release, atomics::memory_order_relaxed ))
740                         {
741                             if ( nLevel == 0 ) {
742 #                       ifdef _DEBUG
743                                 pCur->m_bUnlinked = true;
744 #                       endif
745
746                                 if ( !is_extracted( pSucc )) {
747                                     // We cannot free the node at this moment since RCU is locked
748                                     // Link deleted nodes to a chain to free later
749                                     link_for_remove( pos, pCur.ptr() );
750                                     m_Stat.onEraseWhileFind();
751                                 }
752                                 else {
753                                     m_Stat.onExtractWhileFind();
754                                 }
755                             }
756                         }
757                         goto retry;
758                     }
759                 }
760
761                 // Next level
762                 pos.pPrev[ nLevel ] = pPred;
763                 pos.pSucc[ nLevel ] = pCur.ptr();
764             }
765             return (pos.pCur = pCur.ptr()) != nullptr;
766         }
767
768         bool find_max_position( position& pos )
769         {
770             assert( gc::is_locked() );
771
772             node_type * pPred;
773             marked_node_ptr pSucc;
774             marked_node_ptr pCur;
775
776 retry:
777             pPred = m_Head.head();
778
779             for ( int nLevel = static_cast<int>(c_nMaxHeight - 1); nLevel >= 0; --nLevel ) {
780
781                 while ( true ) {
782                     pCur = pPred->next( nLevel ).load( memory_model::memory_order_relaxed );
783                     if ( pCur.bits() ) {
784                         // pCur.bits() means that pPred is logically deleted
785                         goto retry;
786                     }
787
788                     if ( pCur.ptr() == nullptr ) {
789                         // end of the list at level nLevel - goto next level
790                         break;
791                     }
792
793                     // pSucc contains deletion mark for pCur
794                     pSucc = pCur->next( nLevel ).load( memory_model::memory_order_relaxed );
795
796                     if ( pPred->next( nLevel ).load( memory_model::memory_order_relaxed ).all() != pCur.ptr() )
797                         goto retry;
798
799                     if ( pSucc.bits() ) {
800                         // pCur is marked, i.e. logically deleted.
801                         marked_node_ptr p( pCur.ptr() );
802                         if ( pPred->next( nLevel ).compare_exchange_strong( p, marked_node_ptr( pSucc.ptr() ),
803                             memory_model::memory_order_release, atomics::memory_order_relaxed ))
804                         {
805                             if ( nLevel == 0 ) {
806 #                       ifdef _DEBUG
807                                 pCur->m_bUnlinked = true;
808 #                       endif
809
810                                 if ( !is_extracted( pSucc )) {
811                                     // We cannot free the node at this moment since RCU is locked
812                                     // Link deleted nodes to a chain to free later
813                                     link_for_remove( pos, pCur.ptr() );
814                                     m_Stat.onEraseWhileFind();
815                                 }
816                                 else {
817                                     m_Stat.onExtractWhileFind();
818                                 }
819                             }
820                         }
821                         goto retry;
822                     }
823                     else {
824                         if ( !pSucc.ptr() )
825                             break;
826
827                         pPred = pCur.ptr();
828                     }
829                 }
830
831                 // Next level
832                 pos.pPrev[ nLevel ] = pPred;
833                 pos.pSucc[ nLevel ] = pCur.ptr();
834             }
835
836             return (pos.pCur = pCur.ptr()) != nullptr;
837         }
838
839         template <typename Func>
840         bool insert_at_position( value_type& val, node_type * pNode, position& pos, Func f )
841         {
842             assert( gc::is_locked() );
843
844             unsigned int nHeight = pNode->height();
845             pNode->clear_tower();
846
847             {
848                 marked_node_ptr p( pos.pSucc[0] );
849                 pNode->next( 0 ).store( p, memory_model::memory_order_release );
850                 if ( !pos.pPrev[0]->next(0).compare_exchange_strong( p, marked_node_ptr(pNode), memory_model::memory_order_release, atomics::memory_order_relaxed )) {
851                     return false;
852                 }
853 #       ifdef _DEBUG
854                 pNode->m_bLinked = true;
855 #       endif
856                 f( val );
857             }
858
859             for ( unsigned int nLevel = 1; nLevel < nHeight; ++nLevel ) {
860                 marked_node_ptr p;
861                 while ( true ) {
862                     marked_node_ptr q( pos.pSucc[ nLevel ]);
863                     if ( !pNode->next( nLevel ).compare_exchange_strong( p, q, memory_model::memory_order_acquire, atomics::memory_order_relaxed )) {
864                         // pNode has been marked as removed while we are inserting it
865                         // Stop inserting
866                         assert( p.bits() );
867                         m_Stat.onLogicDeleteWhileInsert();
868                         return true;
869                     }
870                     p = q;
871                     if ( pos.pPrev[nLevel]->next(nLevel).compare_exchange_strong( q, marked_node_ptr( pNode ), memory_model::memory_order_release, atomics::memory_order_relaxed ) )
872                         break;
873
874                     // Renew insert position
875                     m_Stat.onRenewInsertPosition();
876                     if ( !find_position( val, pos, key_comparator(), false )) {
877                         // The node has been deleted while we are inserting it
878                         m_Stat.onNotFoundWhileInsert();
879                         return true;
880                     }
881                 }
882             }
883             return true;
884         }
885
886         static void link_for_remove( position& pos, node_type * pDel )
887         {
888             assert( pDel->m_pDelChain == nullptr );
889
890             pDel->m_pDelChain = pos.pDelChain;
891             pos.pDelChain = pDel;
892         }
893
894         template <typename Func>
895         bool try_remove_at( node_type * pDel, position& pos, Func f, bool bExtract )
896         {
897             assert( pDel != nullptr );
898             assert( gc::is_locked() );
899
900             marked_node_ptr pSucc;
901
902             // logical deletion (marking)
903             for ( unsigned int nLevel = pDel->height() - 1; nLevel > 0; --nLevel ) {
904                 pSucc = pDel->next(nLevel).load( memory_model::memory_order_relaxed );
905                 while ( true ) {
906                     if ( pSucc.bits()
907                       || pDel->next(nLevel).compare_exchange_weak( pSucc, pSucc | 1, memory_model::memory_order_acquire, atomics::memory_order_relaxed ))
908                     {
909                         break;
910                     }
911                 }
912             }
913
914             pSucc = pDel->next(0).load( memory_model::memory_order_relaxed );
915             while ( true ) {
916                 if ( pSucc.bits() )
917                     return false;
918
919                 int const nMask = bExtract ? 3 : 1;
920                 if ( pDel->next(0).compare_exchange_strong( pSucc, pSucc | nMask, memory_model::memory_order_acquire, atomics::memory_order_relaxed ))
921                 {
922                     f( *node_traits::to_value_ptr( pDel ));
923
924                     // physical deletion
925                     // try fast erase
926                     pSucc = pDel;
927                     for ( int nLevel = static_cast<int>( pDel->height() - 1 ); nLevel >= 0; --nLevel ) {
928                         if ( !pos.pPrev[nLevel]->next(nLevel).compare_exchange_strong( pSucc,
929                             marked_node_ptr( pDel->next(nLevel).load(memory_model::memory_order_relaxed).ptr() ),
930                             memory_model::memory_order_release, atomics::memory_order_relaxed) )
931                         {
932                             // Do slow erase
933                             find_position( *node_traits::to_value_ptr(pDel), pos, key_comparator(), false );
934                             if ( bExtract )
935                                 m_Stat.onSlowExtract();
936                             else
937                                 m_Stat.onSlowErase();
938 #                        ifdef _DEBUG
939                             assert( pDel->m_bUnlinked );
940 #                        endif
941                             return true;
942                         }
943                     }
944
945 #               ifdef _DEBUG
946                     pDel->m_bUnlinked = true;
947 #               endif
948                     if ( !bExtract ) {
949                         // We cannot free the node at this moment since RCU is locked
950                         // Link deleted nodes to a chain to free later
951                         link_for_remove( pos, pDel );
952                         m_Stat.onFastErase();
953                     }
954                     else
955                         m_Stat.onFastExtract();
956
957                     return true;
958                 }
959             }
960         }
961
962         enum finsd_fastpath_result {
963             find_fastpath_found,
964             find_fastpath_not_found,
965             find_fastpath_abort
966         };
967         template <typename Q, typename Compare, typename Func>
968         finsd_fastpath_result find_fastpath( Q& val, Compare cmp, Func f ) const
969         {
970             node_type * pPred;
971             marked_node_ptr pCur;
972             marked_node_ptr pSucc;
973             marked_node_ptr pNull;
974
975             back_off bkoff;
976
977             pPred = m_Head.head();
978             for ( int nLevel = static_cast<int>(m_nHeight.load(memory_model::memory_order_relaxed) - 1); nLevel >= 0; --nLevel ) {
979                 pCur = pPred->next(nLevel).load( memory_model::memory_order_acquire );
980                 if ( pCur == pNull )
981                     continue;
982
983                 while ( pCur != pNull ) {
984                     if ( pCur.bits() ) {
985                         // Wait until pCur is removed
986                         unsigned int nAttempt = 0;
987                         while ( pCur.bits() && nAttempt++ < 16 ) {
988                             bkoff();
989                             pCur = pPred->next(nLevel).load( memory_model::memory_order_acquire );
990                         }
991                         bkoff.reset();
992
993                         if ( pCur.bits() ) {
994                             // Maybe, we are on deleted node sequence
995                             // Abort searching, try slow-path
996                             return find_fastpath_abort;
997                         }
998                     }
999
1000                     if ( pCur.ptr() ) {
1001                         int nCmp = cmp( *node_traits::to_value_ptr( pCur.ptr() ), val );
1002                         if ( nCmp < 0 ) {
1003                             pPred = pCur.ptr();
1004                             pCur = pCur->next(nLevel).load( memory_model::memory_order_acquire );
1005                         }
1006                         else if ( nCmp == 0 ) {
1007                             // found
1008                             f( *node_traits::to_value_ptr( pCur.ptr() ), val );
1009                             return find_fastpath_found;
1010                         }
1011                         else // pCur > val - go down
1012                             break;
1013                     }
1014                 }
1015             }
1016
1017             return find_fastpath_not_found;
1018         }
1019
1020         template <typename Q, typename Compare, typename Func>
1021         bool find_slowpath( Q& val, Compare cmp, Func f, position& pos )
1022         {
1023             if ( find_position( val, pos, cmp, true )) {
1024                 assert( cmp( *node_traits::to_value_ptr( pos.pCur ), val ) == 0 );
1025
1026                 f( *node_traits::to_value_ptr( pos.pCur ), val );
1027                 return true;
1028             }
1029             else
1030                 return false;
1031         }
1032
1033         template <typename Q, typename Compare, typename Func>
1034         bool do_find_with( Q& val, Compare cmp, Func f )
1035         {
1036             position pos;
1037             bool bRet;
1038
1039             rcu_lock l;
1040
1041             switch ( find_fastpath( val, cmp, f )) {
1042             case find_fastpath_found:
1043                 m_Stat.onFindFastSuccess();
1044                 return true;
1045             case find_fastpath_not_found:
1046                 m_Stat.onFindFastFailed();
1047                 return false;
1048             default:
1049                 break;
1050             }
1051
1052             if ( find_slowpath( val, cmp, f, pos )) {
1053                 m_Stat.onFindSlowSuccess();
1054                 bRet = true;
1055             }
1056             else {
1057                 m_Stat.onFindSlowFailed();
1058                 bRet = false;
1059             }
1060
1061             defer_chain( pos );
1062
1063             return bRet;
1064         }
1065
1066         template <typename Q, typename Compare, typename Func>
1067         bool do_erase( Q const& val, Compare cmp, Func f )
1068         {
1069             check_deadlock_policy::check();
1070
1071             position pos;
1072             bool bRet;
1073
1074             {
1075                 rcu_lock rcuLock;
1076
1077                 if ( !find_position( val, pos, cmp, false ) ) {
1078                     m_Stat.onEraseFailed();
1079                     bRet = false;
1080                 }
1081                 else {
1082                     node_type * pDel = pos.pCur;
1083                     assert( cmp( *node_traits::to_value_ptr( pDel ), val ) == 0 );
1084
1085                     unsigned int nHeight = pDel->height();
1086                     if ( try_remove_at( pDel, pos, f, false )) {
1087                         --m_ItemCounter;
1088                         m_Stat.onRemoveNode( nHeight );
1089                         m_Stat.onEraseSuccess();
1090                         bRet = true;
1091                     }
1092                     else {
1093                         m_Stat.onEraseFailed();
1094                         bRet = false;
1095                     }
1096                 }
1097             }
1098
1099             dispose_chain( pos );
1100             return bRet;
1101         }
1102
1103         template <typename Q, typename Compare>
1104         value_type * do_extract_key( Q const& key, Compare cmp )
1105         {
1106             // RCU should be locked!!!
1107             assert( gc::is_locked() );
1108
1109             position pos;
1110             node_type * pDel;
1111
1112             if ( !find_position( key, pos, cmp, false ) ) {
1113                 m_Stat.onExtractFailed();
1114                 pDel = nullptr;
1115             }
1116             else {
1117                 pDel = pos.pCur;
1118                 assert( cmp( *node_traits::to_value_ptr( pDel ), key ) == 0 );
1119
1120                 unsigned int const nHeight = pDel->height();
1121
1122                 if ( try_remove_at( pDel, pos, [](value_type const&) {}, true )) {
1123                     --m_ItemCounter;
1124                     m_Stat.onRemoveNode( nHeight );
1125                     m_Stat.onExtractSuccess();
1126                 }
1127                 else {
1128                     m_Stat.onExtractFailed();
1129                     pDel = nullptr;
1130                 }
1131             }
1132
1133             defer_chain( pos );
1134             return pDel ? node_traits::to_value_ptr( pDel ) : nullptr;
1135         }
1136
1137         template <typename Q>
1138         value_type * do_extract( Q const& key )
1139         {
1140             check_deadlock_policy::check();
1141             value_type * pDel = nullptr;
1142             {
1143                 rcu_lock l;
1144                 pDel = do_extract_key( key, key_comparator() );
1145             }
1146
1147             dispose_deferred();
1148             return pDel;
1149         }
1150
1151         template <typename Q, typename Less>
1152         value_type * do_extract_with( Q const& key, Less pred )
1153         {
1154             CDS_UNUSED(pred);
1155             check_deadlock_policy::check();
1156             value_type * pDel = nullptr;
1157
1158             {
1159                 rcu_lock l;
1160                 pDel = do_extract_key( key, cds::opt::details::make_comparator_from_less<Less>() );
1161             }
1162
1163             dispose_deferred();
1164             return pDel;
1165         }
1166
1167         value_type * do_extract_min()
1168         {
1169             assert( !gc::is_locked() );
1170
1171             position pos;
1172             node_type * pDel;
1173
1174             {
1175                 rcu_lock l;
1176
1177                 if ( !find_min_position( pos ) ) {
1178                     m_Stat.onExtractMinFailed();
1179                     pDel = nullptr;
1180                 }
1181                 else {
1182                     pDel = pos.pCur;
1183                     unsigned int const nHeight = pDel->height();
1184
1185                     if ( try_remove_at( pDel, pos, []( value_type const& ) {}, true ) ) {
1186                         --m_ItemCounter;
1187                         m_Stat.onRemoveNode( nHeight );
1188                         m_Stat.onExtractMinSuccess();
1189                     }
1190                     else {
1191                         m_Stat.onExtractMinFailed();
1192                         pDel = nullptr;
1193                     }
1194                 }
1195
1196                 defer_chain( pos );
1197             }
1198
1199             dispose_deferred();
1200             return pDel ? node_traits::to_value_ptr( pDel ) : nullptr;
1201         }
1202
1203         value_type * do_extract_max()
1204         {
1205             assert( !gc::is_locked() );
1206
1207             position pos;
1208             node_type * pDel;
1209
1210             {
1211                 rcu_lock l;
1212
1213                 if ( !find_max_position( pos ) ) {
1214                     m_Stat.onExtractMaxFailed();
1215                     pDel = nullptr;
1216                 }
1217                 else {
1218                     pDel = pos.pCur;
1219                     unsigned int const nHeight = pDel->height();
1220
1221                     if ( try_remove_at( pDel, pos, []( value_type const& ) {}, true ) ) {
1222                         --m_ItemCounter;
1223                         m_Stat.onRemoveNode( nHeight );
1224                         m_Stat.onExtractMaxSuccess();
1225                     }
1226                     else {
1227                         m_Stat.onExtractMaxFailed();
1228                         pDel = nullptr;
1229                     }
1230                 }
1231
1232                 defer_chain( pos );
1233             }
1234
1235             dispose_deferred();
1236             return pDel ? node_traits::to_value_ptr( pDel ) : nullptr;
1237         }
1238
1239         void increase_height( unsigned int nHeight )
1240         {
1241             unsigned int nCur = m_nHeight.load( memory_model::memory_order_relaxed );
1242             if ( nCur < nHeight )
1243                 m_nHeight.compare_exchange_strong( nCur, nHeight, memory_model::memory_order_release, atomics::memory_order_relaxed );
1244         }
1245
1246         class deferred_list_iterator
1247         {
1248             node_type * pCur;
1249         public:
1250             explicit deferred_list_iterator( node_type * p )
1251                 : pCur(p)
1252             {}
1253             deferred_list_iterator()
1254                 : pCur( nullptr )
1255             {}
1256
1257             cds::urcu::retired_ptr operator *() const
1258             {
1259                 return cds::urcu::retired_ptr( node_traits::to_value_ptr(pCur), dispose_node );
1260             }
1261
1262             void operator ++()
1263             {
1264                 pCur = pCur->m_pDelChain;
1265             }
1266
1267             bool operator ==( deferred_list_iterator const& i ) const
1268             {
1269                 return pCur == i.pCur;
1270             }
1271             bool operator !=( deferred_list_iterator const& i ) const
1272             {
1273                 return !operator ==( i );
1274             }
1275         };
1276
1277         void dispose_chain( node_type * pHead )
1278         {
1279             // RCU should NOT be locked
1280             check_deadlock_policy::check();
1281
1282             gc::batch_retire( deferred_list_iterator( pHead ), deferred_list_iterator() );
1283         }
1284
1285         void dispose_chain( position& pos )
1286         {
1287             // RCU should NOT be locked
1288             check_deadlock_policy::check();
1289
1290             // Delete local chain
1291             if ( pos.pDelChain ) {
1292                 dispose_chain( pos.pDelChain );
1293                 pos.pDelChain = nullptr;
1294             }
1295
1296             // Delete deferred chain
1297             dispose_deferred();
1298         }
1299
1300         void dispose_deferred()
1301         {
1302             dispose_chain( m_pDeferredDelChain.exchange( nullptr, memory_model::memory_order_acq_rel ) );
1303         }
1304
1305         void defer_chain( position& pos )
1306         {
1307             if ( pos.pDelChain ) {
1308                 node_type * pHead = pos.pDelChain;
1309                 node_type * pTail = pHead;
1310                 while ( pTail->m_pDelChain )
1311                     pTail = pTail->m_pDelChain;
1312
1313                 node_type * pDeferList = m_pDeferredDelChain.load( memory_model::memory_order_relaxed );
1314                 do {
1315                     pTail->m_pDelChain = pDeferList;
1316                 } while ( !m_pDeferredDelChain.compare_exchange_weak( pDeferList, pHead, memory_model::memory_order_acq_rel, atomics::memory_order_relaxed ));
1317
1318                 pos.pDelChain = nullptr;
1319             }
1320         }
1321
1322         //@endcond
1323
1324     public:
1325         /// Default constructor
1326         SkipListSet()
1327             : m_Head( c_nMaxHeight )
1328             , m_nHeight( c_nMinHeight )
1329             , m_pDeferredDelChain( nullptr )
1330         {
1331             static_assert( (std::is_same< gc, typename node_type::gc >::value), "GC and node_type::gc must be the same type" );
1332
1333             // Barrier for head node
1334             atomics::atomic_thread_fence( memory_model::memory_order_release );
1335         }
1336
1337         /// Clears and destructs the skip-list
1338         ~SkipListSet()
1339         {
1340             clear();
1341         }
1342
1343     public:
1344         /// Iterator type
1345         typedef skip_list::details::iterator< gc, node_traits, back_off, false >  iterator;
1346
1347         /// Const iterator type
1348         typedef skip_list::details::iterator< gc, node_traits, back_off, true >   const_iterator;
1349
1350         /// Returns a forward iterator addressing the first element in a set
1351         iterator begin()
1352         {
1353             return iterator( *m_Head.head() );
1354         }
1355
1356         /// Returns a forward const iterator addressing the first element in a set
1357         const_iterator begin() const
1358         {
1359             return const_iterator( *m_Head.head() );
1360         }
1361
1362         /// Returns a forward const iterator addressing the first element in a set
1363         const_iterator cbegin() const
1364         {
1365             return const_iterator( *m_Head.head() );
1366         }
1367
1368         /// Returns a forward iterator that addresses the location succeeding the last element in a set.
1369         iterator end()
1370         {
1371             return iterator();
1372         }
1373
1374         /// Returns a forward const iterator that addresses the location succeeding the last element in a set.
1375         const_iterator end() const
1376         {
1377             return const_iterator();
1378         }
1379
1380         /// Returns a forward const iterator that addresses the location succeeding the last element in a set.
1381         const_iterator cend() const
1382         {
1383             return const_iterator();
1384         }
1385
1386     public:
1387         /// Inserts new node
1388         /**
1389             The function inserts \p val in the set if it does not contain
1390             an item with key equal to \p val.
1391
1392             The function applies RCU lock internally.
1393
1394             Returns \p true if \p val is placed into the set, \p false otherwise.
1395         */
1396         bool insert( value_type& val )
1397         {
1398             return insert( val, []( value_type& ) {} );
1399         }
1400
1401         /// Inserts new node
1402         /**
1403             This function is intended for derived non-intrusive containers.
1404
1405             The function allows to split creating of new item into two part:
1406             - create item with key only
1407             - insert new item into the set
1408             - if inserting is success, calls  \p f functor to initialize value-field of \p val.
1409
1410             The functor signature is:
1411             \code
1412                 void func( value_type& val );
1413             \endcode
1414             where \p val is the item inserted. User-defined functor \p f should guarantee that during changing
1415             \p val no any other changes could be made on this set's item by concurrent threads.
1416             The user-defined functor is called only if the inserting is success.
1417
1418             RCU \p synchronize method can be called. RCU should not be locked.
1419         */
1420         template <typename Func>
1421         bool insert( value_type& val, Func f )
1422         {
1423             check_deadlock_policy::check();
1424
1425             position pos;
1426             bool bRet;
1427
1428             {
1429                 node_type * pNode = node_traits::to_node_ptr( val );
1430                 scoped_node_ptr scp( pNode );
1431                 unsigned int nHeight = pNode->height();
1432                 bool bTowerOk = nHeight > 1 && pNode->get_tower() != nullptr;
1433                 bool bTowerMade = false;
1434
1435                 rcu_lock rcuLock;
1436
1437                 while ( true )
1438                 {
1439                     bool bFound = find_position( val, pos, key_comparator(), true );
1440                     if ( bFound ) {
1441                         // scoped_node_ptr deletes the node tower if we create it
1442                         if ( !bTowerMade )
1443                             scp.release();
1444
1445                         m_Stat.onInsertFailed();
1446                         bRet = false;
1447                         break;
1448                     }
1449
1450                     if ( !bTowerOk ) {
1451                         build_node( pNode );
1452                         nHeight = pNode->height();
1453                         bTowerMade =
1454                             bTowerOk = true;
1455                     }
1456
1457                     if ( !insert_at_position( val, pNode, pos, f )) {
1458                         m_Stat.onInsertRetry();
1459                         continue;
1460                     }
1461
1462                     increase_height( nHeight );
1463                     ++m_ItemCounter;
1464                     m_Stat.onAddNode( nHeight );
1465                     m_Stat.onInsertSuccess();
1466                     scp.release();
1467                     bRet =  true;
1468                     break;
1469                 }
1470             }
1471
1472             dispose_chain( pos );
1473
1474             return bRet;
1475         }
1476
1477         /// Ensures that the \p val exists in the set
1478         /**
1479             The operation performs inserting or changing data with lock-free manner.
1480
1481             If the item \p val is not found in the set, then \p val is inserted into the set.
1482             Otherwise, the functor \p func is called with item found.
1483             The functor signature is:
1484             \code
1485                 void func( bool bNew, value_type& item, value_type& val );
1486             \endcode
1487             with arguments:
1488             - \p bNew - \p true if the item has been inserted, \p false otherwise
1489             - \p item - item of the set
1490             - \p val - argument \p val passed into the \p %ensure() function
1491             If new item has been inserted (i.e. \p bNew is \p true) then \p item and \p val arguments
1492             refer to the same thing.
1493
1494             The functor can change non-key fields of the \p item; however, \p func must guarantee
1495             that during changing no any other modifications could be made on this item by concurrent threads.
1496
1497             RCU \p synchronize method can be called. RCU should not be locked.
1498
1499             Returns std::pair<bool, bool> where \p first is \p true if operation is successfull,
1500             \p second is \p true if new item has been added or \p false if the item with \p key
1501             already is in the set.
1502
1503             @warning See \ref cds_intrusive_item_creating "insert item troubleshooting"
1504         */
1505         template <typename Func>
1506         std::pair<bool, bool> ensure( value_type& val, Func func )
1507         {
1508             check_deadlock_policy::check();
1509
1510             position pos;
1511             std::pair<bool, bool> bRet( true, false );
1512
1513             {
1514                 node_type * pNode = node_traits::to_node_ptr( val );
1515                 scoped_node_ptr scp( pNode );
1516                 unsigned int nHeight = pNode->height();
1517                 bool bTowerOk = nHeight > 1 && pNode->get_tower() != nullptr;
1518                 bool bTowerMade = false;
1519
1520                 rcu_lock rcuLock;
1521                 while ( true )
1522                 {
1523                     bool bFound = find_position( val, pos, key_comparator(), true );
1524                     if ( bFound ) {
1525                         // scoped_node_ptr deletes the node tower if we create it before
1526                         if ( !bTowerMade )
1527                             scp.release();
1528
1529                         func( false, *node_traits::to_value_ptr(pos.pCur), val );
1530                         m_Stat.onEnsureExist();
1531                         break;
1532                     }
1533
1534                     if ( !bTowerOk ) {
1535                         build_node( pNode );
1536                         nHeight = pNode->height();
1537                         bTowerMade =
1538                             bTowerOk = true;
1539                     }
1540
1541                     if ( !insert_at_position( val, pNode, pos, [&func]( value_type& item ) { func( true, item, item ); })) {
1542                         m_Stat.onInsertRetry();
1543                         continue;
1544                     }
1545
1546                     increase_height( nHeight );
1547                     ++m_ItemCounter;
1548                     scp.release();
1549                     m_Stat.onAddNode( nHeight );
1550                     m_Stat.onEnsureNew();
1551                     bRet.second = true;
1552                     break;
1553                 }
1554             }
1555
1556             dispose_chain( pos );
1557
1558             return bRet;
1559         }
1560
1561         /// Unlinks the item \p val from the set
1562         /**
1563             The function searches the item \p val in the set and unlink it from the set
1564             if it is found and is equal to \p val.
1565
1566             Difference between \p erase() and \p %unlink() functions: \p erase() finds <i>a key</i>
1567             and deletes the item found. \p %unlink() searches an item by key and deletes it
1568             only if \p val is an item of that set, i.e. the pointer to item found
1569             is equal to <tt> &val </tt>.
1570
1571             RCU \p synchronize method can be called. RCU should not be locked.
1572
1573             The \ref disposer specified in \p Traits class template parameter is called
1574             by garbage collector \p GC asynchronously.
1575
1576             The function returns \p true if success and \p false otherwise.
1577         */
1578         bool unlink( value_type& val )
1579         {
1580             check_deadlock_policy::check();
1581
1582             position pos;
1583             bool bRet;
1584
1585             {
1586                 rcu_lock rcuLock;
1587
1588                 if ( !find_position( val, pos, key_comparator(), false ) ) {
1589                     m_Stat.onUnlinkFailed();
1590                     bRet = false;
1591                 }
1592                 else {
1593                     node_type * pDel = pos.pCur;
1594                     assert( key_comparator()( *node_traits::to_value_ptr( pDel ), val ) == 0 );
1595
1596                     unsigned int nHeight = pDel->height();
1597
1598                     if ( node_traits::to_value_ptr( pDel ) == &val && try_remove_at( pDel, pos, [](value_type const&) {}, false )) {
1599                         --m_ItemCounter;
1600                         m_Stat.onRemoveNode( nHeight );
1601                         m_Stat.onUnlinkSuccess();
1602                         bRet = true;
1603                     }
1604                     else {
1605                         m_Stat.onUnlinkFailed();
1606                         bRet = false;
1607                     }
1608                 }
1609             }
1610
1611             dispose_chain( pos );
1612
1613             return bRet;
1614         }
1615
1616         /// Extracts the item from the set with specified \p key
1617         /** \anchor cds_intrusive_SkipListSet_rcu_extract
1618             The function searches an item with key equal to \p key in the set,
1619             unlinks it from the set, and returns \ref cds::urcu::exempt_ptr "exempt_ptr" pointer to the item found.
1620             If the item with key equal to \p key is not found the function returns an empty \p exempt_ptr.
1621
1622             Note the compare functor should accept a parameter of type \p Q that can be not the same as \p value_type.
1623
1624             RCU \p synchronize method can be called. RCU should NOT be locked.
1625             The function does not call the disposer for the item found.
1626             The disposer will be implicitly invoked when the returned object is destroyed or when
1627             its \p release() member function is called.
1628             Example:
1629             \code
1630             typedef cds::intrusive::SkipListSet< cds::urcu::gc< cds::urcu::general_buffered<> >, foo, my_traits > skip_list;
1631             skip_list theList;
1632             // ...
1633
1634             typename skip_list::exempt_ptr ep( theList.extract( 5 ));
1635             if ( ep ) {
1636                 // Deal with ep
1637                 //...
1638
1639                 // Dispose returned item.
1640                 ep.release();
1641             }
1642             \endcode
1643         */
1644         template <typename Q>
1645         exempt_ptr extract( Q const& key )
1646         {
1647             return exempt_ptr( do_extract( key ));
1648         }
1649
1650         /// Extracts the item from the set with comparing functor \p pred
1651         /**
1652             The function is an analog of \p extract(Q const&) but \p pred predicate is used for key comparing.
1653             \p Less has the interface like \p std::less.
1654             \p pred must imply the same element order as the comparator used for building the set.
1655         */
1656         template <typename Q, typename Less>
1657         exempt_ptr extract_with( Q const& key, Less pred )
1658         {
1659             return exempt_ptr( do_extract_with( key, pred ));
1660         }
1661
1662         /// Extracts an item with minimal key from the list
1663         /**
1664             The function searches an item with minimal key, unlinks it, and returns \ref cds::urcu::exempt_ptr "exempt_ptr" pointer to the item.
1665             If the skip-list is empty the function returns an empty \p exempt_ptr.
1666
1667             RCU \p synchronize method can be called. RCU should NOT be locked.
1668             The function does not call the disposer for the item found.
1669             The disposer will be implicitly invoked when the returned object is destroyed or when
1670             its \p release() member function is manually called.
1671             Example:
1672             \code
1673             typedef cds::intrusive::SkipListSet< cds::urcu::gc< cds::urcu::general_buffered<> >, foo, my_traits > skip_list;
1674             skip_list theList;
1675             // ...
1676
1677             typename skip_list::exempt_ptr ep(theList.extract_min());
1678             if ( ep ) {
1679                 // Deal with ep
1680                 //...
1681
1682                 // Dispose returned item.
1683                 ep.release();
1684             }
1685             \endcode
1686
1687             @note Due the concurrent nature of the list, the function extracts <i>nearly</i> minimum key.
1688             It means that the function gets leftmost item and tries to unlink it.
1689             During unlinking, a concurrent thread may insert an item with key less than leftmost item's key.
1690             So, the function returns the item with minimum key at the moment of list traversing.
1691         */
1692         exempt_ptr extract_min()
1693         {
1694             return exempt_ptr( do_extract_min());
1695         }
1696
1697         /// Extracts an item with maximal key from the list
1698         /**
1699             The function searches an item with maximal key, unlinks it, and returns \ref cds::urcu::exempt_ptr "exempt_ptr" pointer to the item.
1700             If the skip-list is empty the function returns an empty \p exempt_ptr.
1701
1702             RCU \p synchronize method can be called. RCU should NOT be locked.
1703             The function does not call the disposer for the item found.
1704             The disposer will be implicitly invoked when the returned object is destroyed or when
1705             its \p release() member function is manually called.
1706             Example:
1707             \code
1708             typedef cds::intrusive::SkipListSet< cds::urcu::gc< cds::urcu::general_buffered<> >, foo, my_traits > skip_list;
1709             skip_list theList;
1710             // ...
1711
1712             typename skip_list::exempt_ptr ep( theList.extract_max() );
1713             if ( ep ) {
1714                 // Deal with ep
1715                 //...
1716                 // Dispose returned item.
1717                 ep.release();
1718             }
1719             \endcode
1720
1721             @note Due the concurrent nature of the list, the function extracts <i>nearly</i> maximal key.
1722             It means that the function gets rightmost item and tries to unlink it.
1723             During unlinking, a concurrent thread can insert an item with key greater than rightmost item's key.
1724             So, the function returns the item with maximum key at the moment of list traversing.
1725         */
1726         exempt_ptr extract_max()
1727         {
1728             return exempt_ptr( do_extract_max());
1729         }
1730
1731         /// Deletes the item from the set
1732         /** \anchor cds_intrusive_SkipListSet_rcu_erase
1733             The function searches an item with key equal to \p key in the set,
1734             unlinks it from the set, and returns \p true.
1735             If the item with key equal to \p key is not found the function return \p false.
1736
1737             Note the hash functor should accept a parameter of type \p Q that can be not the same as \p value_type.
1738
1739             RCU \p synchronize method can be called. RCU should not be locked.
1740         */
1741         template <typename Q>
1742         bool erase( const Q& key )
1743         {
1744             return do_erase( key, key_comparator(), [](value_type const&) {} );
1745         }
1746
1747         /// Delete the item from the set with comparing functor \p pred
1748         /**
1749             The function is an analog of \ref cds_intrusive_SkipListSet_rcu_erase "erase(Q const&)"
1750             but \p pred predicate is used for key comparing.
1751             \p Less has the interface like \p std::less.
1752             \p pred must imply the same element order as the comparator used for building the set.
1753         */
1754         template <typename Q, typename Less>
1755         bool erase_with( const Q& key, Less pred )
1756         {
1757             CDS_UNUSED( pred );
1758             return do_erase( key, cds::opt::details::make_comparator_from_less<Less>(), [](value_type const&) {} );
1759         }
1760
1761         /// Deletes the item from the set
1762         /** \anchor cds_intrusive_SkipListSet_rcu_erase_func
1763             The function searches an item with key equal to \p key in the set,
1764             call \p f functor with item found, unlinks it from the set, and returns \p true.
1765             The \ref disposer specified in \p Traits class template parameter is called
1766             by garbage collector \p GC asynchronously.
1767
1768             The \p Func interface is
1769             \code
1770             struct functor {
1771                 void operator()( value_type const& item );
1772             };
1773             \endcode
1774             If the item with key equal to \p key is not found the function return \p false.
1775
1776             Note the hash functor should accept a parameter of type \p Q that can be not the same as \p value_type.
1777
1778             RCU \p synchronize method can be called. RCU should not be locked.
1779         */
1780         template <typename Q, typename Func>
1781         bool erase( Q const& key, Func f )
1782         {
1783             return do_erase( key, key_comparator(), f );
1784         }
1785
1786         /// Delete the item from the set with comparing functor \p pred
1787         /**
1788             The function is an analog of \ref cds_intrusive_SkipListSet_rcu_erase_func "erase(Q const&, Func)"
1789             but \p pred predicate is used for key comparing.
1790             \p Less has the interface like \p std::less.
1791             \p pred must imply the same element order as the comparator used for building the set.
1792         */
1793         template <typename Q, typename Less, typename Func>
1794         bool erase_with( Q const& key, Less pred, Func f )
1795         {
1796             CDS_UNUSED( pred );
1797             return do_erase( key, cds::opt::details::make_comparator_from_less<Less>(), f );
1798         }
1799
1800         /// Finds \p key
1801         /** @anchor cds_intrusive_SkipListSet_rcu_find_func
1802             The function searches the item with key equal to \p key and calls the functor \p f for item found.
1803             The interface of \p Func functor is:
1804             \code
1805             struct functor {
1806                 void operator()( value_type& item, Q& key );
1807             };
1808             \endcode
1809             where \p item is the item found, \p key is the <tt>find</tt> function argument.
1810
1811             The functor can change non-key fields of \p item. Note that the functor is only guarantee
1812             that \p item cannot be disposed during functor is executing.
1813             The functor does not serialize simultaneous access to the set \p item. If such access is
1814             possible you must provide your own synchronization schema on item level to exclude unsafe item modifications.
1815
1816             The \p key argument is non-const since it can be used as \p f functor destination i.e., the functor
1817             can modify both arguments.
1818
1819             The function applies RCU lock internally.
1820
1821             The function returns \p true if \p key is found, \p false otherwise.
1822         */
1823         template <typename Q, typename Func>
1824         bool find( Q& key, Func f )
1825         {
1826             return do_find_with( key, key_comparator(), f );
1827         }
1828         //@cond
1829         template <typename Q, typename Func>
1830         bool find( Q const& key, Func f )
1831         {
1832             return do_find_with( key, key_comparator(), f );
1833         }
1834         //@endcond
1835
1836         /// Finds the key \p key with comparing functor \p pred
1837         /**
1838             The function is an analog of \ref cds_intrusive_SkipListSet_rcu_find_func "find(Q&, Func)"
1839             but \p cmp is used for key comparison.
1840             \p Less functor has the interface like \p std::less.
1841             \p cmp must imply the same element order as the comparator used for building the set.
1842         */
1843         template <typename Q, typename Less, typename Func>
1844         bool find_with( Q& key, Less pred, Func f )
1845         {
1846             CDS_UNUSED( pred );
1847             return do_find_with( key, cds::opt::details::make_comparator_from_less<Less>(), f );
1848         }
1849         //@cond
1850         template <typename Q, typename Less, typename Func>
1851         bool find_with( Q const& key, Less pred, Func f )
1852         {
1853             CDS_UNUSED( pred );
1854             return do_find_with( key, cds::opt::details::make_comparator_from_less<Less>(), f );
1855         }
1856         //@endcond
1857
1858         /// Finds \p key
1859         /** @anchor cds_intrusive_SkipListSet_rcu_find_val
1860             The function searches the item with key equal to \p key
1861             and returns \p true if it is found, and \p false otherwise.
1862
1863             The function applies RCU lock internally.
1864         */
1865         template <typename Q>
1866         bool find( Q const& key )
1867         {
1868             return do_find_with( key, key_comparator(), [](value_type& , Q const& ) {} );
1869         }
1870
1871         /// Finds \p key with comparing functor \p pred
1872         /**
1873             The function is an analog of \ref cds_intrusive_SkipListSet_rcu_find_val "find(Q const&)"
1874             but \p pred is used for key compare.
1875             \p Less functor has the interface like \p std::less.
1876             \p pred must imply the same element order as the comparator used for building the set.
1877         */
1878         template <typename Q, typename Less>
1879         bool find_with( Q const& key, Less pred )
1880         {
1881             CDS_UNUSED( pred );
1882             return do_find_with( key, cds::opt::details::make_comparator_from_less<Less>(), [](value_type& , Q const& ) {} );
1883         }
1884
1885         /// Finds \p key and return the item found
1886         /** \anchor cds_intrusive_SkipListSet_rcu_get
1887             The function searches the item with key equal to \p key and returns the pointer to item found.
1888             If \p key is not found it returns \p nullptr.
1889
1890             Note the compare functor should accept a parameter of type \p Q that can be not the same as \p value_type.
1891
1892             RCU should be locked before call of this function.
1893             Returned item is valid only while RCU is locked:
1894             \code
1895             typedef cds::intrusive::SkipListSet< cds::urcu::gc< cds::urcu::general_buffered<> >, foo, my_traits > skip_list;
1896             skip_list theList;
1897             // ...
1898             {
1899                 // Lock RCU
1900                 skip_list::rcu_lock lock;
1901
1902                 foo * pVal = theList.get( 5 );
1903                 if ( pVal ) {
1904                     // Deal with pVal
1905                     //...
1906                 }
1907             }
1908             // Unlock RCU by rcu_lock destructor
1909             // pVal can be retired by disposer at any time after RCU has been unlocked
1910             \endcode
1911
1912             After RCU unlocking the \p %force_dispose member function can be called manually,
1913             see \ref force_dispose for explanation.
1914         */
1915         template <typename Q>
1916         value_type * get( Q const& key )
1917         {
1918             assert( gc::is_locked());
1919
1920             value_type * pFound;
1921             return do_find_with( key, key_comparator(), [&pFound](value_type& found, Q const& ) { pFound = &found; } )
1922                 ? pFound : nullptr;
1923         }
1924
1925         /// Finds \p key and return the item found
1926         /**
1927             The function is an analog of \ref cds_intrusive_SkipListSet_rcu_get "get(Q const&)"
1928             but \p pred is used for comparing the keys.
1929
1930             \p Less functor has the semantics like \p std::less but should take arguments of type \ref value_type and \p Q
1931             in any order.
1932             \p pred must imply the same element order as the comparator used for building the set.
1933         */
1934         template <typename Q, typename Less>
1935         value_type * get_with( Q const& key, Less pred )
1936         {
1937             CDS_UNUSED( pred );
1938             assert( gc::is_locked());
1939
1940             value_type * pFound;
1941             return do_find_with( key, cds::opt::details::make_comparator_from_less<Less>(),
1942                 [&pFound](value_type& found, Q const& ) { pFound = &found; } )
1943                 ? pFound : nullptr;
1944         }
1945
1946         /// Returns item count in the set
1947         /**
1948             The value returned depends on item counter type provided by \p Traits template parameter.
1949             For \p atomicity::empty_item_counter the function always returns 0.
1950             Therefore, the function is not suitable for checking the set emptiness, use \p empty()
1951             member function for this purpose.
1952         */
1953         size_t size() const
1954         {
1955             return m_ItemCounter;
1956         }
1957
1958         /// Checks if the set is empty
1959         bool empty() const
1960         {
1961             return m_Head.head()->next( 0 ).load( memory_model::memory_order_relaxed ) == nullptr;
1962         }
1963
1964         /// Clears the set (not atomic)
1965         /**
1966             The function unlink all items from the set.
1967             The function is not atomic, thus, in multi-threaded environment with parallel insertions
1968             this sequence
1969             \code
1970             set.clear();
1971             assert( set.empty() );
1972             \endcode
1973             the assertion could be raised.
1974
1975             For each item the \p disposer will be called automatically after unlinking.
1976         */
1977         void clear()
1978         {
1979             exempt_ptr ep;
1980             while ( (ep = extract_min()) );
1981         }
1982
1983         /// Returns maximum height of skip-list. The max height is a constant for each object and does not exceed 32.
1984         static CDS_CONSTEXPR unsigned int max_height() CDS_NOEXCEPT
1985         {
1986             return c_nMaxHeight;
1987         }
1988
1989         /// Returns const reference to internal statistics
1990         stat const& statistics() const
1991         {
1992             return m_Stat;
1993         }
1994
1995         /// Clears internal list of ready-to-remove items passing it to RCU reclamation cycle
1996         /** @anchor cds_intrusive_SkipListSet_rcu_force_dispose
1997             Skip list has complex multi-step algorithm for removing an item. In fact, when you
1998             remove the item it is just marked as removed that is enough for the success of your operation.
1999             Actual removing can take place in the future, in another call or even in another thread.
2000             Inside RCU lock the removed item cannot be passed to RCU reclamation cycle
2001             since it can lead to deadlock. To solve this problem, the current skip list implementation
2002             has internal list of items which is ready to remove but is not yet passed to RCU reclamation.
2003             Usually, this list will be passed to RCU reclamation in the next suitable call of skip list member function.
2004             In some cases we want to pass it to RCU reclamation immediately after RCU unlocking.
2005             This function provides such opportunity: it checks whether the RCU is not locked and if it is true
2006             the function passes the internal ready-to-remove list to RCU reclamation cycle.
2007
2008             The RCU \p synchronize can be called.
2009         */
2010         void force_dispose()
2011         {
2012             if ( !gc::is_locked() )
2013                 dispose_deferred();
2014         }
2015     };
2016
2017 }} // namespace cds::intrusive
2018
2019
2020 #endif // #ifndef CDSLIB_INTRUSIVE_SKIP_LIST_RCU_H