add find(Q const&, Func), find_with(Q const&, Pred, Func) to all sets
[libcds.git] / cds / intrusive / skip_list_rcu.h
1 //$$CDS-header$$
2
3 #ifndef __CDS_INTRUSIVE_SKIP_LIST_RCU_H
4 #define __CDS_INTRUSIVE_SKIP_LIST_RCU_H
5
6 #include <type_traits>
7 #include <memory>
8 #include <cds/intrusive/details/skip_list_base.h>
9 #include <cds/opt/compare.h>
10 #include <cds/urcu/details/check_deadlock.h>
11 #include <cds/details/binary_functor_wrapper.h>
12 #include <cds/urcu/exempt_ptr.h>
13
14
15 namespace cds { namespace intrusive {
16
17     //@cond
18     namespace skip_list {
19
20         template <class RCU, typename Tag>
21         class node< cds::urcu::gc< RCU >, Tag >
22         {
23         public:
24             typedef cds::urcu::gc< RCU >    gc; ///< Garbage collector
25             typedef Tag     tag               ; ///< tag
26
27             // Mark bits:
28             //  bit 0 - the item is logically deleted
29             //  bit 1 - the item is extracted (only for level 0)
30             typedef cds::details::marked_ptr<node, 3> marked_ptr;        ///< marked pointer
31             typedef atomics::atomic< marked_ptr >     atomic_marked_ptr; ///< atomic marked pointer
32             typedef atomic_marked_ptr                 tower_item_type;
33
34         protected:
35             atomic_marked_ptr       m_pNext;     ///< Next item in bottom-list (list at level 0)
36         public:
37             node *                  m_pDelChain; ///< Deleted node chain (local for a thread)
38 #       ifdef _DEBUG
39             bool volatile           m_bLinked;
40             bool volatile           m_bUnlinked;
41 #       endif
42         protected:
43             unsigned int            m_nHeight;   ///< Node height (size of m_arrNext array). For node at level 0 the height is 1.
44             atomic_marked_ptr *     m_arrNext;   ///< Array of next items for levels 1 .. m_nHeight - 1. For node at level 0 \p m_arrNext is \p nullptr
45
46         public:
47             /// Constructs a node of height 1 (a bottom-list node)
48             CDS_CONSTEXPR node()
49                 : m_pNext( nullptr )
50                 , m_pDelChain( nullptr )
51 #       ifdef _DEBUG
52                 , m_bLinked( false )
53                 , m_bUnlinked( false )
54 #       endif
55                 , m_nHeight(1)
56                 , m_arrNext( nullptr )
57             {}
58
59 #       ifdef _DEBUG
60             ~node()
61             {
62                 assert( !m_bLinked || m_bUnlinked );
63             }
64 #       endif
65
66             /// Constructs a node of height \p nHeight
67             void make_tower( unsigned int nHeight, atomic_marked_ptr * nextTower )
68             {
69                 assert( nHeight > 0 );
70                 assert( (nHeight == 1 && nextTower == nullptr)  // bottom-list node
71                         || (nHeight > 1 && nextTower != nullptr)   // node at level of more than 0
72                     );
73
74                 m_arrNext = nextTower;
75                 m_nHeight = nHeight;
76             }
77
78             atomic_marked_ptr * release_tower()
79             {
80                 atomic_marked_ptr * pTower = m_arrNext;
81                 m_arrNext = nullptr;
82                 m_nHeight = 1;
83                 return pTower;
84             }
85
86             atomic_marked_ptr * get_tower() const
87             {
88                 return m_arrNext;
89             }
90
91             void clear_tower()
92             {
93                 for ( unsigned int nLevel = 1; nLevel < m_nHeight; ++nLevel )
94                     next(nLevel).store( marked_ptr(), atomics::memory_order_relaxed );
95             }
96
97             /// Access to element of next pointer array
98             atomic_marked_ptr& next( unsigned int nLevel )
99             {
100                 assert( nLevel < height() );
101                 assert( nLevel == 0 || (nLevel > 0 && m_arrNext != nullptr) );
102
103                 return nLevel ? m_arrNext[nLevel - 1] : m_pNext;
104             }
105
106             /// Access to element of next pointer array (const version)
107             atomic_marked_ptr const& next( unsigned int nLevel ) const
108             {
109                 assert( nLevel < height() );
110                 assert( nLevel == 0 || nLevel > 0 && m_arrNext != nullptr );
111
112                 return nLevel ? m_arrNext[nLevel - 1] : m_pNext;
113             }
114
115             /// Access to element of next pointer array (same as \ref next function)
116             atomic_marked_ptr& operator[]( unsigned int nLevel )
117             {
118                 return next( nLevel );
119             }
120
121             /// Access to element of next pointer array (same as \ref next function)
122             atomic_marked_ptr const& operator[]( unsigned int nLevel ) const
123             {
124                 return next( nLevel );
125             }
126
127             /// Height of the node
128             unsigned int height() const
129             {
130                 return m_nHeight;
131             }
132
133             /// Clears internal links
134             void clear()
135             {
136                 assert( m_arrNext == nullptr );
137                 m_pNext.store( marked_ptr(), atomics::memory_order_release );
138                 m_pDelChain = nullptr;
139             }
140
141             bool is_cleared() const
142             {
143                 return m_pNext == atomic_marked_ptr()
144                     && m_arrNext == nullptr
145                     && m_nHeight <= 1;
146             }
147         };
148     } // namespace skip_list
149     //@endcond
150
151     //@cond
152     namespace skip_list { namespace details {
153
154         template <class RCU, typename NodeTraits, typename BackOff, bool IsConst>
155         class iterator< cds::urcu::gc< RCU >, NodeTraits, BackOff, IsConst >
156         {
157         public:
158             typedef cds::urcu::gc< RCU >                gc;
159             typedef NodeTraits                          node_traits;
160             typedef BackOff                             back_off;
161             typedef typename node_traits::node_type     node_type;
162             typedef typename node_traits::value_type    value_type;
163             static bool const c_isConst = IsConst;
164
165             typedef typename std::conditional< c_isConst, value_type const &, value_type &>::type   value_ref;
166
167         protected:
168             typedef typename node_type::marked_ptr          marked_ptr;
169             typedef typename node_type::atomic_marked_ptr   atomic_marked_ptr;
170
171             node_type *             m_pNode;
172
173         protected:
174             void next()
175             {
176                 // RCU should be locked before iterating!!!
177                 assert( gc::is_locked() );
178
179                 back_off bkoff;
180
181                 for (;;) {
182                     if ( m_pNode->next( m_pNode->height() - 1 ).load( atomics::memory_order_acquire ).bits() ) {
183                         // Current node is marked as deleted. So, its next pointer can point to anything
184                         // In this case we interrupt our iteration and returns end() iterator.
185                         *this = iterator();
186                         return;
187                     }
188
189                     marked_ptr p = m_pNode->next(0).load( atomics::memory_order_relaxed );
190                     node_type * pp = p.ptr();
191                     if ( p.bits() ) {
192                         // p is marked as deleted. Spin waiting for physical removal
193                         bkoff();
194                         continue;
195                     }
196                     else if ( pp && pp->next( pp->height() - 1 ).load( atomics::memory_order_relaxed ).bits() ) {
197                         // p is marked as deleted. Spin waiting for physical removal
198                         bkoff();
199                         continue;
200                     }
201
202                     m_pNode = pp;
203                     break;
204                 }
205             }
206
207         public: // for internal use only!!!
208             iterator( node_type& refHead )
209                 : m_pNode( nullptr )
210             {
211                 // RCU should be locked before iterating!!!
212                 assert( gc::is_locked() );
213
214                 back_off bkoff;
215
216                 for (;;) {
217                     marked_ptr p = refHead.next(0).load( atomics::memory_order_relaxed );
218                     if ( !p.ptr() ) {
219                         // empty skip-list
220                         break;
221                     }
222
223                     node_type * pp = p.ptr();
224                     // Logically deleted node is marked from highest level
225                     if ( !pp->next( pp->height() - 1 ).load( atomics::memory_order_acquire ).bits() ) {
226                         m_pNode = pp;
227                         break;
228                     }
229
230                     bkoff();
231                 }
232             }
233
234         public:
235             iterator()
236                 : m_pNode( nullptr )
237             {
238                 // RCU should be locked before iterating!!!
239                 assert( gc::is_locked() );
240             }
241
242             iterator( iterator const& s)
243                 : m_pNode( s.m_pNode )
244             {
245                 // RCU should be locked before iterating!!!
246                 assert( gc::is_locked() );
247             }
248
249             value_type * operator ->() const
250             {
251                 assert( m_pNode != nullptr );
252                 assert( node_traits::to_value_ptr( m_pNode ) != nullptr );
253
254                 return node_traits::to_value_ptr( m_pNode );
255             }
256
257             value_ref operator *() const
258             {
259                 assert( m_pNode != nullptr );
260                 assert( node_traits::to_value_ptr( m_pNode ) != nullptr );
261
262                 return *node_traits::to_value_ptr( m_pNode );
263             }
264
265             /// Pre-increment
266             iterator& operator ++()
267             {
268                 next();
269                 return *this;
270             }
271
272             iterator& operator = (const iterator& src)
273             {
274                 m_pNode = src.m_pNode;
275                 return *this;
276             }
277
278             template <typename Bkoff, bool C>
279             bool operator ==(iterator<gc, node_traits, Bkoff, C> const& i ) const
280             {
281                 return m_pNode == i.m_pNode;
282             }
283             template <typename Bkoff, bool C>
284             bool operator !=(iterator<gc, node_traits, Bkoff, C> const& i ) const
285             {
286                 return !( *this == i );
287             }
288         };
289     }}  // namespace skip_list::details
290     //@endcond
291
292     /// Lock-free skip-list set (template specialization for \ref cds_urcu_desc "RCU")
293     /** @ingroup cds_intrusive_map
294         @anchor cds_intrusive_SkipListSet_rcu
295
296         The implementation of well-known probabilistic data structure called skip-list
297         invented by W.Pugh in his papers:
298             - [1989] W.Pugh Skip Lists: A Probabilistic Alternative to Balanced Trees
299             - [1990] W.Pugh A Skip List Cookbook
300
301         A skip-list is a probabilistic data structure that provides expected logarithmic
302         time search without the need of rebalance. The skip-list is a collection of sorted
303         linked list. Nodes are ordered by key. Each node is linked into a subset of the lists.
304         Each list has a level, ranging from 0 to 32. The bottom-level list contains
305         all the nodes, and each higher-level list is a sublist of the lower-level lists.
306         Each node is created with a random top level (with a random height), and belongs
307         to all lists up to that level. The probability that a node has the height 1 is 1/2.
308         The probability that a node has the height N is 1/2 ** N (more precisely,
309         the distribution depends on an random generator provided, but our generators
310         have this property).
311
312         The lock-free variant of skip-list is implemented according to book
313             - [2008] M.Herlihy, N.Shavit "The Art of Multiprocessor Programming",
314                 chapter 14.4 "A Lock-Free Concurrent Skiplist".
315
316         <b>Template arguments</b>:
317             - \p RCU - one of \ref cds_urcu_gc "RCU type"
318             - \p T - type to be stored in the list. The type must be based on \p skip_list::node (for \p skip_list::base_hook)
319                 or it must have a member of type \p skip_list::node (for \p skip_list::member_hook).
320             - \p Traits - set traits, default is \p skip_list::traits
321                 It is possible to declare option-based list with \p cds::intrusive::skip_list::make_traits metafunction 
322                 instead of \p Traits template argument.
323
324         @note Before including <tt><cds/intrusive/skip_list_rcu.h></tt> you should include appropriate RCU header file,
325         see \ref cds_urcu_gc "RCU type" for list of existing RCU class and corresponding header files.
326
327         <b>Iterators</b>
328
329         The class supports a forward iterator (\ref iterator and \ref const_iterator).
330         The iteration is ordered.
331
332         You may iterate over skip-list set items only under RCU lock.
333         Only in this case the iterator is thread-safe since
334         while RCU is locked any set's item cannot be reclaimed.
335
336         @note The requirement of RCU lock during iterating means that any type of modification of the skip list
337         (i.e. inserting, erasing and so on) is not possible.
338
339         @warning The iterator object cannot be passed between threads.
340
341         Example how to use skip-list set iterators:
342         \code
343         // First, you should include the header for RCU type you have chosen
344         #include <cds/urcu/general_buffered.h>
345         #include <cds/intrusive/skip_list_rcu.h>
346
347         typedef cds::urcu::gc< cds::urcu::general_buffered<> > rcu_type;
348
349         struct Foo {
350             // ...
351         };
352
353         // Traits for your skip-list.
354         // At least, you should define cds::opt::less or cds::opt::compare for Foo struct
355         struct my_traits: public cds::intrusive::skip_list::traits
356         {
357             // ...
358         };
359         typedef cds::intrusive::SkipListSet< rcu_type, Foo, my_traits > my_skiplist_set;
360
361         my_skiplist_set theSet;
362
363         // ...
364
365         // Begin iteration
366         {
367             // Apply RCU locking manually
368             typename rcu_type::scoped_lock sl;
369
370             for ( auto it = theList.begin(); it != theList.end(); ++it ) {
371                 // ...
372             }
373
374             // rcu_type::scoped_lock destructor releases RCU lock implicitly
375         }
376         \endcode
377
378         The iterator class supports the following minimalistic interface:
379         \code
380         struct iterator {
381             // Default ctor
382             iterator();
383
384             // Copy ctor
385             iterator( iterator const& s);
386
387             value_type * operator ->() const;
388             value_type& operator *() const;
389
390             // Pre-increment
391             iterator& operator ++();
392
393             // Copy assignment
394             iterator& operator = (const iterator& src);
395
396             bool operator ==(iterator const& i ) const;
397             bool operator !=(iterator const& i ) const;
398         };
399         \endcode
400         Note, the iterator object returned by \ref end, \p cend member functions points to \p nullptr and should not be dereferenced.
401
402         <b>How to use</b>
403
404         You should incorporate skip_list::node into your struct \p T and provide
405         appropriate skip_list::traits::hook in your \p Traits template parameters. Usually, for \p Traits you
406         define a struct based on \p skip_list::traits.
407
408         Example for <tt>cds::urcu::general_buffered<></tt> RCU and base hook:
409         \code
410         // First, you should include the header for RCU type you have chosen
411         #include <cds/urcu/general_buffered.h>
412
413         // Include RCU skip-list specialization
414         #include <cds/intrusive/skip_list_rcu.h>
415
416         // RCU type typedef
417         typedef cds::urcu::gc< cds::urcu::general_buffered<> > rcu_type;
418
419         // Data stored in skip list
420         struct my_data: public cds::intrusive::skip_list::node< rcu_type >
421         {
422             // key field
423             std::string     strKey;
424
425             // other data
426             // ...
427         };
428
429         // my_data compare functor
430         struct my_data_cmp {
431             int operator()( const my_data& d1, const my_data& d2 )
432             {
433                 return d1.strKey.compare( d2.strKey );
434             }
435
436             int operator()( const my_data& d, const std::string& s )
437             {
438                 return d.strKey.compare(s);
439             }
440
441             int operator()( const std::string& s, const my_data& d )
442             {
443                 return s.compare( d.strKey );
444             }
445         };
446
447         // Declare traits
448         struct my_traits: public cds::intrusive::skip_list::traits
449         {
450             typedef cds::intrusive::skip_list::base_hook< cds::opt::gc< rcu_type > >   hook;
451             typedef my_data_cmp compare;
452         };
453
454         // Declare skip-list set type
455         typedef cds::intrusive::SkipListSet< rcu_type, my_data, my_traits >     traits_based_set;
456         \endcode
457
458         Equivalent option-based code:
459         \code
460         #include <cds/urcu/general_buffered.h>
461         #include <cds/intrusive/skip_list_rcu.h>
462
463         typedef cds::urcu::gc< cds::urcu::general_buffered<> > rcu_type;
464
465         struct my_data {
466             // see above
467         };
468         struct compare {
469             // see above
470         };
471
472         // Declare option-based skip-list set
473         typedef cds::intrusive::SkipListSet< rcu_type
474             ,my_data
475             , typename cds::intrusive::skip_list::make_traits<
476                 cds::intrusive::opt::hook< cds::intrusive::skip_list::base_hook< cds::opt::gc< rcu_type > > >
477                 ,cds::intrusive::opt::compare< my_data_cmp >
478             >::type
479         > option_based_set;
480
481         \endcode
482     */
483     template <
484         class RCU
485        ,typename T
486 #ifdef CDS_DOXYGEN_INVOKED
487        ,typename Traits = skip_list::traits
488 #else
489        ,typename Traits
490 #endif
491     >
492     class SkipListSet< cds::urcu::gc< RCU >, T, Traits >
493     {
494     public:
495         typedef cds::urcu::gc< RCU > gc; ///< Garbage collector
496         typedef T       value_type;      ///< type of value stored in the skip-list
497         typedef Traits  traits;          ///< Traits template parameter
498
499         typedef typename traits::hook    hook;      ///< hook type
500         typedef typename hook::node_type node_type; ///< node type
501
502 #   ifdef CDS_DOXYGEN_INVOKED
503         typedef implementation_defined key_comparator  ;    ///< key comparison functor based on \p Traits::compare and \p Traits::less
504 #   else
505         typedef typename opt::details::make_comparator< value_type, traits >::type key_comparator;
506 #   endif
507
508         typedef typename traits::disposer  disposer;   ///< disposer
509         typedef typename get_node_traits< value_type, node_type, hook>::type node_traits;    ///< node traits
510
511         typedef typename traits::item_counter  item_counter;   ///< Item counting policy used
512         typedef typename traits::memory_model  memory_model;   ///< Memory ordering, see \p cds::opt::memory_model option
513         typedef typename traits::random_level_generator    random_level_generator;   ///< random level generator
514         typedef typename traits::allocator     allocator_type; ///< allocator for maintaining array of next pointers of the node
515         typedef typename traits::back_off      back_off;       ///< Back-off strategy
516         typedef typename traits::stat          stat;           ///< internal statistics type
517         typedef typename traits::rcu_check_deadlock rcu_check_deadlock; ///< Deadlock checking policy
518         typedef typename gc::scoped_lock       rcu_lock;      ///< RCU scoped lock
519         static CDS_CONSTEXPR const bool c_bExtractLockExternal = false; ///< Group of \p extract_xxx functions does not require external locking
520
521
522         /// Max node height. The actual node height should be in range <tt>[0 .. c_nMaxHeight)</tt>
523         /**
524             The max height is specified by \ref skip_list::random_level_generator "random level generator" constant \p m_nUpperBound
525             but it should be no more than 32 (\ref skip_list::c_nHeightLimit).
526         */
527         static unsigned int const c_nMaxHeight = std::conditional<
528             (random_level_generator::c_nUpperBound <= skip_list::c_nHeightLimit),
529             std::integral_constant< unsigned int, random_level_generator::c_nUpperBound >,
530             std::integral_constant< unsigned int, skip_list::c_nHeightLimit >
531         >::type::value;
532
533         //@cond
534         static unsigned int const c_nMinHeight = 5;
535         //@endcond
536
537     protected:
538         typedef typename node_type::atomic_marked_ptr   atomic_node_ptr ;   ///< Atomic marked node pointer
539         typedef typename node_type::marked_ptr          marked_node_ptr ;   ///< Node marked pointer
540
541     protected:
542         //@cond
543         typedef skip_list::details::intrusive_node_builder< node_type, atomic_node_ptr, allocator_type > intrusive_node_builder;
544
545         typedef typename std::conditional<
546             std::is_same< typename traits::internal_node_builder, cds::opt::none >::value
547             ,intrusive_node_builder
548             ,typename traits::internal_node_builder
549         >::type node_builder;
550
551         typedef std::unique_ptr< node_type, typename node_builder::node_disposer >    scoped_node_ptr;
552
553         struct position {
554             node_type *   pPrev[ c_nMaxHeight ];
555             node_type *   pSucc[ c_nMaxHeight ];
556             node_type *   pNext[ c_nMaxHeight ];
557
558             node_type *   pCur;
559             node_type *   pDelChain;
560
561             position()
562                 : pDelChain( nullptr )
563             {}
564 #       ifdef _DEBUG
565             ~position()
566             {
567                 assert( pDelChain == nullptr );
568             }
569 #       endif
570         };
571
572         typedef cds::urcu::details::check_deadlock_policy< gc, rcu_check_deadlock>   check_deadlock_policy;
573         //@endcond
574
575     protected:
576         skip_list::details::head_node< node_type > m_Head;   ///< head tower (max height)
577
578         item_counter                m_ItemCounter;      ///< item counter
579         random_level_generator      m_RandomLevelGen;   ///< random level generator instance
580         atomics::atomic<unsigned int>    m_nHeight;     ///< estimated high level
581         atomics::atomic<node_type *>     m_pDeferredDelChain ;   ///< Deferred deleted node chain
582         mutable stat                m_Stat;             ///< internal statistics
583
584     protected:
585         //@cond
586         unsigned int random_level()
587         {
588             // Random generator produces a number from range [0..31]
589             // We need a number from range [1..32]
590             return m_RandomLevelGen() + 1;
591         }
592
593         template <typename Q>
594         node_type * build_node( Q v )
595         {
596             return node_builder::make_tower( v, m_RandomLevelGen );
597         }
598
599         static void dispose_node( value_type * pVal )
600         {
601             assert( pVal );
602
603             typename node_builder::node_disposer()( node_traits::to_node_ptr(pVal) );
604             disposer()( pVal );
605         }
606
607         struct node_disposer
608         {
609             void operator()( value_type * pVal )
610             {
611                 dispose_node( pVal );
612             }
613         };
614         //@endcond
615
616     public:
617         using exempt_ptr = cds::urcu::exempt_ptr< gc, value_type, value_type, node_disposer, void >; ///< pointer to extracted node
618
619     protected:
620         //@cond
621
622         bool is_extracted( marked_node_ptr const p ) const
623         {
624             return (p.bits() & 2) != 0;
625         }
626
627         template <typename Q, typename Compare >
628         bool find_position( Q const& val, position& pos, Compare cmp, bool bStopIfFound )
629         {
630             assert( gc::is_locked() );
631
632             node_type * pPred;
633             marked_node_ptr pSucc;
634             marked_node_ptr pCur;
635             int nCmp = 1;
636
637         retry:
638             pPred = m_Head.head();
639
640             for ( int nLevel = static_cast<int>(c_nMaxHeight - 1); nLevel >= 0; --nLevel ) {
641
642                 while ( true ) {
643                     pCur = pPred->next( nLevel ).load( memory_model::memory_order_relaxed );
644                     if ( pCur.bits() ) {
645                         // pCur.bits() means that pPred is logically deleted
646                         goto retry;
647                     }
648
649                     if ( pCur.ptr() == nullptr ) {
650                         // end of the list at level nLevel - goto next level
651                         break;
652                     }
653
654                     // pSucc contains deletion mark for pCur
655                     pSucc = pCur->next( nLevel ).load( memory_model::memory_order_relaxed );
656
657                     if ( pPred->next( nLevel ).load( memory_model::memory_order_relaxed ).all() != pCur.ptr() )
658                         goto retry;
659
660                     if ( pSucc.bits() ) {
661                         // pCur is marked, i.e. logically deleted.
662                         marked_node_ptr p( pCur.ptr() );
663                         if ( pPred->next( nLevel ).compare_exchange_strong( p, marked_node_ptr( pSucc.ptr() ),
664                              memory_model::memory_order_release, atomics::memory_order_relaxed ))
665                         {
666                             if ( nLevel == 0 ) {
667 #                       ifdef _DEBUG
668                                 pCur->m_bUnlinked = true;
669 #                       endif
670
671                                 if ( !is_extracted( pSucc )) {
672                                     // We cannot free the node at this moment since RCU is locked
673                                     // Link deleted nodes to a chain to free later
674                                     link_for_remove( pos, pCur.ptr() );
675                                     m_Stat.onEraseWhileFind();
676                                 }
677                                 else {
678                                     m_Stat.onExtractWhileFind();
679                                 }
680                             }
681                         }
682                         goto retry;
683                     }
684                     else {
685                         nCmp = cmp( *node_traits::to_value_ptr( pCur.ptr()), val );
686                         if ( nCmp < 0 )
687                             pPred = pCur.ptr();
688                         else if ( nCmp == 0 && bStopIfFound )
689                             goto found;
690                         else
691                             break;
692                     }
693                 }
694
695                 // Next level
696                 pos.pPrev[ nLevel ] = pPred;
697                 pos.pSucc[ nLevel ] = pCur.ptr();
698             }
699
700             if ( nCmp != 0 )
701                 return false;
702
703         found:
704             pos.pCur = pCur.ptr();
705             return pCur.ptr() && nCmp == 0;
706         }
707
708         bool find_min_position( position& pos )
709         {
710             assert( gc::is_locked() );
711
712             node_type * pPred;
713             marked_node_ptr pSucc;
714             marked_node_ptr pCur;
715
716         retry:
717             pPred = m_Head.head();
718
719             for ( int nLevel = static_cast<int>(c_nMaxHeight - 1); nLevel >= 0; --nLevel ) {
720
721                 pCur = pPred->next( nLevel ).load( memory_model::memory_order_relaxed );
722                 // pCur.bits() means that pPred is logically deleted
723                 // head cannot be deleted
724                 assert( pCur.bits() == 0 );
725
726                 if ( pCur.ptr() ) {
727
728                     // pSucc contains deletion mark for pCur
729                     pSucc = pCur->next( nLevel ).load( memory_model::memory_order_relaxed );
730
731                     if ( pPred->next( nLevel ).load( memory_model::memory_order_relaxed ).all() != pCur.ptr() )
732                         goto retry;
733
734                     if ( pSucc.bits() ) {
735                         // pCur is marked, i.e. logically deleted.
736                         marked_node_ptr p( pCur.ptr() );
737                         if ( pPred->next( nLevel ).compare_exchange_strong( p, marked_node_ptr( pSucc.ptr() ),
738                             memory_model::memory_order_release, atomics::memory_order_relaxed ))
739                         {
740                             if ( nLevel == 0 ) {
741 #                       ifdef _DEBUG
742                                 pCur->m_bUnlinked = true;
743 #                       endif
744
745                                 if ( !is_extracted( pSucc )) {
746                                     // We cannot free the node at this moment since RCU is locked
747                                     // Link deleted nodes to a chain to free later
748                                     link_for_remove( pos, pCur.ptr() );
749                                     m_Stat.onEraseWhileFind();
750                                 }
751                                 else {
752                                     m_Stat.onExtractWhileFind();
753                                 }
754                             }
755                         }
756                         goto retry;
757                     }
758                 }
759
760                 // Next level
761                 pos.pPrev[ nLevel ] = pPred;
762                 pos.pSucc[ nLevel ] = pCur.ptr();
763             }
764             return (pos.pCur = pCur.ptr()) != nullptr;
765         }
766
767         bool find_max_position( position& pos )
768         {
769             assert( gc::is_locked() );
770
771             node_type * pPred;
772             marked_node_ptr pSucc;
773             marked_node_ptr pCur;
774
775 retry:
776             pPred = m_Head.head();
777
778             for ( int nLevel = static_cast<int>(c_nMaxHeight - 1); nLevel >= 0; --nLevel ) {
779
780                 while ( true ) {
781                     pCur = pPred->next( nLevel ).load( memory_model::memory_order_relaxed );
782                     if ( pCur.bits() ) {
783                         // pCur.bits() means that pPred is logically deleted
784                         goto retry;
785                     }
786
787                     if ( pCur.ptr() == nullptr ) {
788                         // end of the list at level nLevel - goto next level
789                         break;
790                     }
791
792                     // pSucc contains deletion mark for pCur
793                     pSucc = pCur->next( nLevel ).load( memory_model::memory_order_relaxed );
794
795                     if ( pPred->next( nLevel ).load( memory_model::memory_order_relaxed ).all() != pCur.ptr() )
796                         goto retry;
797
798                     if ( pSucc.bits() ) {
799                         // pCur is marked, i.e. logically deleted.
800                         marked_node_ptr p( pCur.ptr() );
801                         if ( pPred->next( nLevel ).compare_exchange_strong( p, marked_node_ptr( pSucc.ptr() ),
802                             memory_model::memory_order_release, atomics::memory_order_relaxed ))
803                         {
804                             if ( nLevel == 0 ) {
805 #                       ifdef _DEBUG
806                                 pCur->m_bUnlinked = true;
807 #                       endif
808
809                                 if ( !is_extracted( pSucc )) {
810                                     // We cannot free the node at this moment since RCU is locked
811                                     // Link deleted nodes to a chain to free later
812                                     link_for_remove( pos, pCur.ptr() );
813                                     m_Stat.onEraseWhileFind();
814                                 }
815                                 else {
816                                     m_Stat.onExtractWhileFind();
817                                 }
818                             }
819                         }
820                         goto retry;
821                     }
822                     else {
823                         if ( !pSucc.ptr() )
824                             break;
825
826                         pPred = pCur.ptr();
827                     }
828                 }
829
830                 // Next level
831                 pos.pPrev[ nLevel ] = pPred;
832                 pos.pSucc[ nLevel ] = pCur.ptr();
833             }
834
835             return (pos.pCur = pCur.ptr()) != nullptr;
836         }
837
838         template <typename Func>
839         bool insert_at_position( value_type& val, node_type * pNode, position& pos, Func f )
840         {
841             assert( gc::is_locked() );
842
843             unsigned int nHeight = pNode->height();
844             pNode->clear_tower();
845
846             {
847                 marked_node_ptr p( pos.pSucc[0] );
848                 pNode->next( 0 ).store( p, memory_model::memory_order_release );
849                 if ( !pos.pPrev[0]->next(0).compare_exchange_strong( p, marked_node_ptr(pNode), memory_model::memory_order_release, atomics::memory_order_relaxed )) {
850                     return false;
851                 }
852 #       ifdef _DEBUG
853                 pNode->m_bLinked = true;
854 #       endif
855                 f( val );
856             }
857
858             for ( unsigned int nLevel = 1; nLevel < nHeight; ++nLevel ) {
859                 marked_node_ptr p;
860                 while ( true ) {
861                     marked_node_ptr q( pos.pSucc[ nLevel ]);
862                     if ( !pNode->next( nLevel ).compare_exchange_strong( p, q, memory_model::memory_order_acquire, atomics::memory_order_relaxed )) {
863                         // pNode has been marked as removed while we are inserting it
864                         // Stop inserting
865                         assert( p.bits() );
866                         m_Stat.onLogicDeleteWhileInsert();
867                         return true;
868                     }
869                     p = q;
870                     if ( pos.pPrev[nLevel]->next(nLevel).compare_exchange_strong( q, marked_node_ptr( pNode ), memory_model::memory_order_release, atomics::memory_order_relaxed ) )
871                         break;
872
873                     // Renew insert position
874                     m_Stat.onRenewInsertPosition();
875                     if ( !find_position( val, pos, key_comparator(), false )) {
876                         // The node has been deleted while we are inserting it
877                         m_Stat.onNotFoundWhileInsert();
878                         return true;
879                     }
880                 }
881             }
882             return true;
883         }
884
885         static void link_for_remove( position& pos, node_type * pDel )
886         {
887             assert( pDel->m_pDelChain == nullptr );
888
889             pDel->m_pDelChain = pos.pDelChain;
890             pos.pDelChain = pDel;
891         }
892
893         template <typename Func>
894         bool try_remove_at( node_type * pDel, position& pos, Func f, bool bExtract )
895         {
896             assert( pDel != nullptr );
897             assert( gc::is_locked() );
898
899             marked_node_ptr pSucc;
900
901             // logical deletion (marking)
902             for ( unsigned int nLevel = pDel->height() - 1; nLevel > 0; --nLevel ) {
903                 pSucc = pDel->next(nLevel).load( memory_model::memory_order_relaxed );
904                 while ( true ) {
905                     if ( pSucc.bits()
906                       || pDel->next(nLevel).compare_exchange_weak( pSucc, pSucc | 1, memory_model::memory_order_acquire, atomics::memory_order_relaxed ))
907                     {
908                         break;
909                     }
910                 }
911             }
912
913             pSucc = pDel->next(0).load( memory_model::memory_order_relaxed );
914             while ( true ) {
915                 if ( pSucc.bits() )
916                     return false;
917
918                 int const nMask = bExtract ? 3 : 1;
919                 if ( pDel->next(0).compare_exchange_strong( pSucc, pSucc | nMask, memory_model::memory_order_acquire, atomics::memory_order_relaxed ))
920                 {
921                     f( *node_traits::to_value_ptr( pDel ));
922
923                     // physical deletion
924                     // try fast erase
925                     pSucc = pDel;
926                     for ( int nLevel = static_cast<int>( pDel->height() - 1 ); nLevel >= 0; --nLevel ) {
927                         if ( !pos.pPrev[nLevel]->next(nLevel).compare_exchange_strong( pSucc,
928                             marked_node_ptr( pDel->next(nLevel).load(memory_model::memory_order_relaxed).ptr() ),
929                             memory_model::memory_order_release, atomics::memory_order_relaxed) )
930                         {
931                             // Do slow erase
932                             find_position( *node_traits::to_value_ptr(pDel), pos, key_comparator(), false );
933                             if ( bExtract )
934                                 m_Stat.onSlowExtract();
935                             else
936                                 m_Stat.onSlowErase();
937 #                        ifdef _DEBUG
938                             assert( pDel->m_bUnlinked );
939 #                        endif
940                             return true;
941                         }
942                     }
943
944 #               ifdef _DEBUG
945                     pDel->m_bUnlinked = true;
946 #               endif
947                     if ( !bExtract ) {
948                         // We cannot free the node at this moment since RCU is locked
949                         // Link deleted nodes to a chain to free later
950                         link_for_remove( pos, pDel );
951                         m_Stat.onFastErase();
952                     }
953                     else
954                         m_Stat.onFastExtract();
955
956                     return true;
957                 }
958             }
959         }
960
961         enum finsd_fastpath_result {
962             find_fastpath_found,
963             find_fastpath_not_found,
964             find_fastpath_abort
965         };
966         template <typename Q, typename Compare, typename Func>
967         finsd_fastpath_result find_fastpath( Q& val, Compare cmp, Func f ) const
968         {
969             node_type * pPred;
970             marked_node_ptr pCur;
971             marked_node_ptr pSucc;
972             marked_node_ptr pNull;
973
974             back_off bkoff;
975
976             pPred = m_Head.head();
977             for ( int nLevel = static_cast<int>(m_nHeight.load(memory_model::memory_order_relaxed) - 1); nLevel >= 0; --nLevel ) {
978                 pCur = pPred->next(nLevel).load( memory_model::memory_order_acquire );
979                 if ( pCur == pNull )
980                     continue;
981
982                 while ( pCur != pNull ) {
983                     if ( pCur.bits() ) {
984                         // Wait until pCur is removed
985                         unsigned int nAttempt = 0;
986                         while ( pCur.bits() && nAttempt++ < 16 ) {
987                             bkoff();
988                             pCur = pPred->next(nLevel).load( memory_model::memory_order_acquire );
989                         }
990                         bkoff.reset();
991
992                         if ( pCur.bits() ) {
993                             // Maybe, we are on deleted node sequence
994                             // Abort searching, try slow-path
995                             return find_fastpath_abort;
996                         }
997                     }
998
999                     if ( pCur.ptr() ) {
1000                         int nCmp = cmp( *node_traits::to_value_ptr( pCur.ptr() ), val );
1001                         if ( nCmp < 0 ) {
1002                             pPred = pCur.ptr();
1003                             pCur = pCur->next(nLevel).load( memory_model::memory_order_acquire );
1004                         }
1005                         else if ( nCmp == 0 ) {
1006                             // found
1007                             f( *node_traits::to_value_ptr( pCur.ptr() ), val );
1008                             return find_fastpath_found;
1009                         }
1010                         else // pCur > val - go down
1011                             break;
1012                     }
1013                 }
1014             }
1015
1016             return find_fastpath_not_found;
1017         }
1018
1019         template <typename Q, typename Compare, typename Func>
1020         bool find_slowpath( Q& val, Compare cmp, Func f, position& pos )
1021         {
1022             if ( find_position( val, pos, cmp, true )) {
1023                 assert( cmp( *node_traits::to_value_ptr( pos.pCur ), val ) == 0 );
1024
1025                 f( *node_traits::to_value_ptr( pos.pCur ), val );
1026                 return true;
1027             }
1028             else
1029                 return false;
1030         }
1031
1032         template <typename Q, typename Compare, typename Func>
1033         bool do_find_with( Q& val, Compare cmp, Func f )
1034         {
1035             position pos;
1036             bool bRet;
1037
1038             rcu_lock l;
1039
1040             switch ( find_fastpath( val, cmp, f )) {
1041             case find_fastpath_found:
1042                 m_Stat.onFindFastSuccess();
1043                 return true;
1044             case find_fastpath_not_found:
1045                 m_Stat.onFindFastFailed();
1046                 return false;
1047             default:
1048                 break;
1049             }
1050
1051             if ( find_slowpath( val, cmp, f, pos )) {
1052                 m_Stat.onFindSlowSuccess();
1053                 bRet = true;
1054             }
1055             else {
1056                 m_Stat.onFindSlowFailed();
1057                 bRet = false;
1058             }
1059
1060             defer_chain( pos );
1061
1062             return bRet;
1063         }
1064
1065         template <typename Q, typename Compare, typename Func>
1066         bool do_erase( Q const& val, Compare cmp, Func f )
1067         {
1068             check_deadlock_policy::check();
1069
1070             position pos;
1071             bool bRet;
1072
1073             {
1074                 rcu_lock rcuLock;
1075
1076                 if ( !find_position( val, pos, cmp, false ) ) {
1077                     m_Stat.onEraseFailed();
1078                     bRet = false;
1079                 }
1080                 else {
1081                     node_type * pDel = pos.pCur;
1082                     assert( cmp( *node_traits::to_value_ptr( pDel ), val ) == 0 );
1083
1084                     unsigned int nHeight = pDel->height();
1085                     if ( try_remove_at( pDel, pos, f, false )) {
1086                         --m_ItemCounter;
1087                         m_Stat.onRemoveNode( nHeight );
1088                         m_Stat.onEraseSuccess();
1089                         bRet = true;
1090                     }
1091                     else {
1092                         m_Stat.onEraseFailed();
1093                         bRet = false;
1094                     }
1095                 }
1096             }
1097
1098             dispose_chain( pos );
1099             return bRet;
1100         }
1101
1102         template <typename Q, typename Compare>
1103         value_type * do_extract_key( Q const& key, Compare cmp )
1104         {
1105             // RCU should be locked!!!
1106             assert( gc::is_locked() );
1107
1108             position pos;
1109             node_type * pDel;
1110
1111             if ( !find_position( key, pos, cmp, false ) ) {
1112                 m_Stat.onExtractFailed();
1113                 pDel = nullptr;
1114             }
1115             else {
1116                 pDel = pos.pCur;
1117                 assert( cmp( *node_traits::to_value_ptr( pDel ), key ) == 0 );
1118
1119                 unsigned int const nHeight = pDel->height();
1120
1121                 if ( try_remove_at( pDel, pos, [](value_type const&) {}, true )) {
1122                     --m_ItemCounter;
1123                     m_Stat.onRemoveNode( nHeight );
1124                     m_Stat.onExtractSuccess();
1125                 }
1126                 else {
1127                     m_Stat.onExtractFailed();
1128                     pDel = nullptr;
1129                 }
1130             }
1131
1132             defer_chain( pos );
1133             return pDel ? node_traits::to_value_ptr( pDel ) : nullptr;
1134         }
1135
1136         template <typename Q>
1137         value_type * do_extract( Q const& key )
1138         {
1139             check_deadlock_policy::check();
1140             value_type * pDel = nullptr;
1141             {
1142                 rcu_lock l;
1143                 pDel = do_extract_key( key, key_comparator() );
1144             }
1145
1146             dispose_deferred();
1147             return pDel;
1148         }
1149
1150         template <typename Q, typename Less>
1151         value_type * do_extract_with( Q const& key, Less pred )
1152         {
1153             check_deadlock_policy::check();
1154             value_type * pDel = nullptr;
1155
1156             {
1157                 rcu_lock l;
1158                 pDel = do_extract_key( key, cds::opt::details::make_comparator_from_less<Less>() );
1159             }
1160
1161             dispose_deferred();
1162             return pDel;
1163         }
1164
1165         value_type * do_extract_min()
1166         {
1167             assert( !gc::is_locked() );
1168
1169             position pos;
1170             node_type * pDel;
1171
1172             {
1173                 rcu_lock l;
1174
1175                 if ( !find_min_position( pos ) ) {
1176                     m_Stat.onExtractMinFailed();
1177                     pDel = nullptr;
1178                 }
1179                 else {
1180                     pDel = pos.pCur;
1181                     unsigned int const nHeight = pDel->height();
1182
1183                     if ( try_remove_at( pDel, pos, []( value_type const& ) {}, true ) ) {
1184                         --m_ItemCounter;
1185                         m_Stat.onRemoveNode( nHeight );
1186                         m_Stat.onExtractMinSuccess();
1187                     }
1188                     else {
1189                         m_Stat.onExtractMinFailed();
1190                         pDel = nullptr;
1191                     }
1192                 }
1193
1194                 defer_chain( pos );
1195             }
1196
1197             dispose_deferred();
1198             return pDel ? node_traits::to_value_ptr( pDel ) : nullptr;
1199         }
1200
1201         value_type * do_extract_max()
1202         {
1203             assert( !gc::is_locked() );
1204
1205             position pos;
1206             node_type * pDel;
1207
1208             {
1209                 rcu_lock l;
1210
1211                 if ( !find_max_position( pos ) ) {
1212                     m_Stat.onExtractMaxFailed();
1213                     pDel = nullptr;
1214                 }
1215                 else {
1216                     pDel = pos.pCur;
1217                     unsigned int const nHeight = pDel->height();
1218
1219                     if ( try_remove_at( pDel, pos, []( value_type const& ) {}, true ) ) {
1220                         --m_ItemCounter;
1221                         m_Stat.onRemoveNode( nHeight );
1222                         m_Stat.onExtractMaxSuccess();
1223                     }
1224                     else {
1225                         m_Stat.onExtractMaxFailed();
1226                         pDel = nullptr;
1227                     }
1228                 }
1229
1230                 defer_chain( pos );
1231             }
1232
1233             dispose_deferred();
1234             return pDel ? node_traits::to_value_ptr( pDel ) : nullptr;
1235         }
1236
1237         void increase_height( unsigned int nHeight )
1238         {
1239             unsigned int nCur = m_nHeight.load( memory_model::memory_order_relaxed );
1240             if ( nCur < nHeight )
1241                 m_nHeight.compare_exchange_strong( nCur, nHeight, memory_model::memory_order_release, atomics::memory_order_relaxed );
1242         }
1243
1244         class deferred_list_iterator
1245         {
1246             node_type * pCur;
1247         public:
1248             explicit deferred_list_iterator( node_type * p )
1249                 : pCur(p)
1250             {}
1251             deferred_list_iterator()
1252                 : pCur( nullptr )
1253             {}
1254
1255             cds::urcu::retired_ptr operator *() const
1256             {
1257                 return cds::urcu::retired_ptr( node_traits::to_value_ptr(pCur), dispose_node );
1258             }
1259
1260             void operator ++()
1261             {
1262                 pCur = pCur->m_pDelChain;
1263             }
1264
1265             bool operator ==( deferred_list_iterator const& i ) const
1266             {
1267                 return pCur == i.pCur;
1268             }
1269             bool operator !=( deferred_list_iterator const& i ) const
1270             {
1271                 return !operator ==( i );
1272             }
1273         };
1274
1275         void dispose_chain( node_type * pHead )
1276         {
1277             // RCU should NOT be locked
1278             check_deadlock_policy::check();
1279
1280             gc::batch_retire( deferred_list_iterator( pHead ), deferred_list_iterator() );
1281         }
1282
1283         void dispose_chain( position& pos )
1284         {
1285             // RCU should NOT be locked
1286             check_deadlock_policy::check();
1287
1288             // Delete local chain
1289             if ( pos.pDelChain ) {
1290                 dispose_chain( pos.pDelChain );
1291                 pos.pDelChain = nullptr;
1292             }
1293
1294             // Delete deferred chain
1295             dispose_deferred();
1296         }
1297
1298         void dispose_deferred()
1299         {
1300             dispose_chain( m_pDeferredDelChain.exchange( nullptr, memory_model::memory_order_acq_rel ) );
1301         }
1302
1303         void defer_chain( position& pos )
1304         {
1305             if ( pos.pDelChain ) {
1306                 node_type * pHead = pos.pDelChain;
1307                 node_type * pTail = pHead;
1308                 while ( pTail->m_pDelChain )
1309                     pTail = pTail->m_pDelChain;
1310
1311                 node_type * pDeferList = m_pDeferredDelChain.load( memory_model::memory_order_relaxed );
1312                 do {
1313                     pTail->m_pDelChain = pDeferList;
1314                 } while ( !m_pDeferredDelChain.compare_exchange_weak( pDeferList, pHead, memory_model::memory_order_acq_rel, atomics::memory_order_relaxed ));
1315
1316                 pos.pDelChain = nullptr;
1317             }
1318         }
1319
1320         //@endcond
1321
1322     public:
1323         /// Default constructor
1324         SkipListSet()
1325             : m_Head( c_nMaxHeight )
1326             , m_nHeight( c_nMinHeight )
1327             , m_pDeferredDelChain( nullptr )
1328         {
1329             static_assert( (std::is_same< gc, typename node_type::gc >::value), "GC and node_type::gc must be the same type" );
1330
1331             // Barrier for head node
1332             atomics::atomic_thread_fence( memory_model::memory_order_release );
1333         }
1334
1335         /// Clears and destructs the skip-list
1336         ~SkipListSet()
1337         {
1338             clear();
1339         }
1340
1341     public:
1342         /// Iterator type
1343         typedef skip_list::details::iterator< gc, node_traits, back_off, false >  iterator;
1344
1345         /// Const iterator type
1346         typedef skip_list::details::iterator< gc, node_traits, back_off, true >   const_iterator;
1347
1348         /// Returns a forward iterator addressing the first element in a set
1349         iterator begin()
1350         {
1351             return iterator( *m_Head.head() );
1352         }
1353
1354         /// Returns a forward const iterator addressing the first element in a set
1355         const_iterator begin() const
1356         {
1357             return const_iterator( *m_Head.head() );
1358         }
1359
1360         /// Returns a forward const iterator addressing the first element in a set
1361         const_iterator cbegin() const
1362         {
1363             return const_iterator( *m_Head.head() );
1364         }
1365
1366         /// Returns a forward iterator that addresses the location succeeding the last element in a set.
1367         iterator end()
1368         {
1369             return iterator();
1370         }
1371
1372         /// Returns a forward const iterator that addresses the location succeeding the last element in a set.
1373         const_iterator end() const
1374         {
1375             return const_iterator();
1376         }
1377
1378         /// Returns a forward const iterator that addresses the location succeeding the last element in a set.
1379         const_iterator cend() const
1380         {
1381             return const_iterator();
1382         }
1383
1384     public:
1385         /// Inserts new node
1386         /**
1387             The function inserts \p val in the set if it does not contain
1388             an item with key equal to \p val.
1389
1390             The function applies RCU lock internally.
1391
1392             Returns \p true if \p val is placed into the set, \p false otherwise.
1393         */
1394         bool insert( value_type& val )
1395         {
1396             return insert( val, []( value_type& ) {} );
1397         }
1398
1399         /// Inserts new node
1400         /**
1401             This function is intended for derived non-intrusive containers.
1402
1403             The function allows to split creating of new item into two part:
1404             - create item with key only
1405             - insert new item into the set
1406             - if inserting is success, calls  \p f functor to initialize value-field of \p val.
1407
1408             The functor signature is:
1409             \code
1410                 void func( value_type& val );
1411             \endcode
1412             where \p val is the item inserted. User-defined functor \p f should guarantee that during changing
1413             \p val no any other changes could be made on this set's item by concurrent threads.
1414             The user-defined functor is called only if the inserting is success.
1415
1416             RCU \p synchronize method can be called. RCU should not be locked.
1417         */
1418         template <typename Func>
1419         bool insert( value_type& val, Func f )
1420         {
1421             check_deadlock_policy::check();
1422
1423             position pos;
1424             bool bRet;
1425
1426             {
1427                 node_type * pNode = node_traits::to_node_ptr( val );
1428                 scoped_node_ptr scp( pNode );
1429                 unsigned int nHeight = pNode->height();
1430                 bool bTowerOk = nHeight > 1 && pNode->get_tower() != nullptr;
1431                 bool bTowerMade = false;
1432
1433                 rcu_lock rcuLock;
1434
1435                 while ( true )
1436                 {
1437                     bool bFound = find_position( val, pos, key_comparator(), true );
1438                     if ( bFound ) {
1439                         // scoped_node_ptr deletes the node tower if we create it
1440                         if ( !bTowerMade )
1441                             scp.release();
1442
1443                         m_Stat.onInsertFailed();
1444                         bRet = false;
1445                         break;
1446                     }
1447
1448                     if ( !bTowerOk ) {
1449                         build_node( pNode );
1450                         nHeight = pNode->height();
1451                         bTowerMade =
1452                             bTowerOk = true;
1453                     }
1454
1455                     if ( !insert_at_position( val, pNode, pos, f )) {
1456                         m_Stat.onInsertRetry();
1457                         continue;
1458                     }
1459
1460                     increase_height( nHeight );
1461                     ++m_ItemCounter;
1462                     m_Stat.onAddNode( nHeight );
1463                     m_Stat.onInsertSuccess();
1464                     scp.release();
1465                     bRet =  true;
1466                     break;
1467                 }
1468             }
1469
1470             dispose_chain( pos );
1471
1472             return bRet;
1473         }
1474
1475         /// Ensures that the \p val exists in the set
1476         /**
1477             The operation performs inserting or changing data with lock-free manner.
1478
1479             If the item \p val is not found in the set, then \p val is inserted into the set.
1480             Otherwise, the functor \p func is called with item found.
1481             The functor signature is:
1482             \code
1483                 void func( bool bNew, value_type& item, value_type& val );
1484             \endcode
1485             with arguments:
1486             - \p bNew - \p true if the item has been inserted, \p false otherwise
1487             - \p item - item of the set
1488             - \p val - argument \p val passed into the \p %ensure() function
1489             If new item has been inserted (i.e. \p bNew is \p true) then \p item and \p val arguments
1490             refer to the same thing.
1491
1492             The functor can change non-key fields of the \p item; however, \p func must guarantee
1493             that during changing no any other modifications could be made on this item by concurrent threads.
1494
1495             RCU \p synchronize method can be called. RCU should not be locked.
1496
1497             Returns std::pair<bool, bool> where \p first is \p true if operation is successfull,
1498             \p second is \p true if new item has been added or \p false if the item with \p key
1499             already is in the set.
1500
1501             @warning See \ref cds_intrusive_item_creating "insert item troubleshooting"
1502         */
1503         template <typename Func>
1504         std::pair<bool, bool> ensure( value_type& val, Func func )
1505         {
1506             check_deadlock_policy::check();
1507
1508             position pos;
1509             std::pair<bool, bool> bRet( true, false );
1510
1511             {
1512                 node_type * pNode = node_traits::to_node_ptr( val );
1513                 scoped_node_ptr scp( pNode );
1514                 unsigned int nHeight = pNode->height();
1515                 bool bTowerOk = nHeight > 1 && pNode->get_tower() != nullptr;
1516                 bool bTowerMade = false;
1517
1518                 rcu_lock rcuLock;
1519                 while ( true )
1520                 {
1521                     bool bFound = find_position( val, pos, key_comparator(), true );
1522                     if ( bFound ) {
1523                         // scoped_node_ptr deletes the node tower if we create it before
1524                         if ( !bTowerMade )
1525                             scp.release();
1526
1527                         func( false, *node_traits::to_value_ptr(pos.pCur), val );
1528                         m_Stat.onEnsureExist();
1529                         break;
1530                     }
1531
1532                     if ( !bTowerOk ) {
1533                         build_node( pNode );
1534                         nHeight = pNode->height();
1535                         bTowerMade =
1536                             bTowerOk = true;
1537                     }
1538
1539                     if ( !insert_at_position( val, pNode, pos, [&func]( value_type& item ) { func( true, item, item ); })) {
1540                         m_Stat.onInsertRetry();
1541                         continue;
1542                     }
1543
1544                     increase_height( nHeight );
1545                     ++m_ItemCounter;
1546                     scp.release();
1547                     m_Stat.onAddNode( nHeight );
1548                     m_Stat.onEnsureNew();
1549                     bRet.second = true;
1550                     break;
1551                 }
1552             }
1553
1554             dispose_chain( pos );
1555
1556             return bRet;
1557         }
1558
1559         /// Unlinks the item \p val from the set
1560         /**
1561             The function searches the item \p val in the set and unlink it from the set
1562             if it is found and is equal to \p val.
1563
1564             Difference between \p erase() and \p %unlink() functions: \p erase() finds <i>a key</i>
1565             and deletes the item found. \p %unlink() searches an item by key and deletes it
1566             only if \p val is an item of that set, i.e. the pointer to item found
1567             is equal to <tt> &val </tt>.
1568
1569             RCU \p synchronize method can be called. RCU should not be locked.
1570
1571             The \ref disposer specified in \p Traits class template parameter is called
1572             by garbage collector \p GC asynchronously.
1573
1574             The function returns \p true if success and \p false otherwise.
1575         */
1576         bool unlink( value_type& val )
1577         {
1578             check_deadlock_policy::check();
1579
1580             position pos;
1581             bool bRet;
1582
1583             {
1584                 rcu_lock rcuLock;
1585
1586                 if ( !find_position( val, pos, key_comparator(), false ) ) {
1587                     m_Stat.onUnlinkFailed();
1588                     bRet = false;
1589                 }
1590                 else {
1591                     node_type * pDel = pos.pCur;
1592                     assert( key_comparator()( *node_traits::to_value_ptr( pDel ), val ) == 0 );
1593
1594                     unsigned int nHeight = pDel->height();
1595
1596                     if ( node_traits::to_value_ptr( pDel ) == &val && try_remove_at( pDel, pos, [](value_type const&) {}, false )) {
1597                         --m_ItemCounter;
1598                         m_Stat.onRemoveNode( nHeight );
1599                         m_Stat.onUnlinkSuccess();
1600                         bRet = true;
1601                     }
1602                     else {
1603                         m_Stat.onUnlinkFailed();
1604                         bRet = false;
1605                     }
1606                 }
1607             }
1608
1609             dispose_chain( pos );
1610
1611             return bRet;
1612         }
1613
1614         /// Extracts the item from the set with specified \p key
1615         /** \anchor cds_intrusive_SkipListSet_rcu_extract
1616             The function searches an item with key equal to \p key in the set,
1617             unlinks it from the set, and returns \ref cds::urcu::exempt_ptr "exempt_ptr" pointer to the item found.
1618             If the item with key equal to \p key is not found the function returns an empty \p exempt_ptr.
1619
1620             Note the compare functor should accept a parameter of type \p Q that can be not the same as \p value_type.
1621
1622             RCU \p synchronize method can be called. RCU should NOT be locked.
1623             The function does not call the disposer for the item found.
1624             The disposer will be implicitly invoked when the returned object is destroyed or when
1625             its \p release() member function is called.
1626             Example:
1627             \code
1628             typedef cds::intrusive::SkipListSet< cds::urcu::gc< cds::urcu::general_buffered<> >, foo, my_traits > skip_list;
1629             skip_list theList;
1630             // ...
1631
1632             typename skip_list::exempt_ptr ep( theList.extract( 5 ));
1633             if ( ep ) {
1634                 // Deal with ep
1635                 //...
1636
1637                 // Dispose returned item.
1638                 ep.release();
1639             }
1640             \endcode
1641         */
1642         template <typename Q>
1643         exempt_ptr extract( Q const& key )
1644         {
1645             return exempt_ptr( do_extract( key ));
1646         }
1647
1648         /// Extracts the item from the set with comparing functor \p pred
1649         /**
1650             The function is an analog of \ref cds_intrusive_SkipListSet_rcu_extract "extract(exempt_ptr&, Q const&)"
1651             but \p pred predicate is used for key comparing.
1652             \p Less has the interface like \p std::less.
1653             \p pred must imply the same element order as the comparator used for building the set.
1654         */
1655         template <typename Q, typename Less>
1656         exempt_ptr extract_with( Q const& key, Less pred )
1657         {
1658             return exempt_ptr( do_extract_with( key, pred ));
1659         }
1660
1661         /// Extracts an item with minimal key from the list
1662         /**
1663             The function searches an item with minimal key, unlinks it, and returns \ref cds::urcu::exempt_ptr "exempt_ptr" pointer to the item.
1664             If the skip-list is empty the function returns an empty \p exempt_ptr.
1665
1666             RCU \p synchronize method can be called. RCU should NOT be locked.
1667             The function does not call the disposer for the item found.
1668             The disposer will be implicitly invoked when the returned object is destroyed or when
1669             its \p release() member function is manually called.
1670             Example:
1671             \code
1672             typedef cds::intrusive::SkipListSet< cds::urcu::gc< cds::urcu::general_buffered<> >, foo, my_traits > skip_list;
1673             skip_list theList;
1674             // ...
1675
1676             typename skip_list::exempt_ptr ep(theList.extract_min());
1677             if ( ep ) {
1678                 // Deal with ep
1679                 //...
1680
1681                 // Dispose returned item.
1682                 ep.release();
1683             }
1684             \endcode
1685
1686             @note Due the concurrent nature of the list, the function extracts <i>nearly</i> minimum key.
1687             It means that the function gets leftmost item and tries to unlink it.
1688             During unlinking, a concurrent thread may insert an item with key less than leftmost item's key.
1689             So, the function returns the item with minimum key at the moment of list traversing.
1690         */
1691         exempt_ptr extract_min()
1692         {
1693             return exempt_ptr( do_extract_min());
1694         }
1695
1696         /// Extracts an item with maximal key from the list
1697         /**
1698             The function searches an item with maximal key, unlinks it, and returns \ref cds::urcu::exempt_ptr "exempt_ptr" pointer to the item.
1699             If the skip-list is empty the function returns an empty \p exempt_ptr.
1700
1701             RCU \p synchronize method can be called. RCU should NOT be locked.
1702             The function does not call the disposer for the item found.
1703             The disposer will be implicitly invoked when the returned object is destroyed or when
1704             its \p release() member function is manually called.
1705             Example:
1706             \code
1707             typedef cds::intrusive::SkipListSet< cds::urcu::gc< cds::urcu::general_buffered<> >, foo, my_traits > skip_list;
1708             skip_list theList;
1709             // ...
1710
1711             typename skip_list::exempt_ptr ep( theList.extract_max() );
1712             if ( ep ) {
1713                 // Deal with ep
1714                 //...
1715                 // Dispose returned item.
1716                 ep.release();
1717             }
1718             \endcode
1719
1720             @note Due the concurrent nature of the list, the function extracts <i>nearly</i> maximal key.
1721             It means that the function gets rightmost item and tries to unlink it.
1722             During unlinking, a concurrent thread can insert an item with key greater than rightmost item's key.
1723             So, the function returns the item with maximum key at the moment of list traversing.
1724         */
1725         exempt_ptr extract_max()
1726         {
1727             return exempt_ptr( do_extract_max());
1728         }
1729
1730         /// Deletes the item from the set
1731         /** \anchor cds_intrusive_SkipListSet_rcu_erase
1732             The function searches an item with key equal to \p key in the set,
1733             unlinks it from the set, and returns \p true.
1734             If the item with key equal to \p key is not found the function return \p false.
1735
1736             Note the hash functor should accept a parameter of type \p Q that can be not the same as \p value_type.
1737
1738             RCU \p synchronize method can be called. RCU should not be locked.
1739         */
1740         template <typename Q>
1741         bool erase( const Q& key )
1742         {
1743             return do_erase( key, key_comparator(), [](value_type const&) {} );
1744         }
1745
1746         /// Delete the item from the set with comparing functor \p pred
1747         /**
1748             The function is an analog of \ref cds_intrusive_SkipListSet_rcu_erase "erase(Q const&)"
1749             but \p pred predicate is used for key comparing.
1750             \p Less has the interface like \p std::less.
1751             \p pred must imply the same element order as the comparator used for building the set.
1752         */
1753         template <typename Q, typename Less>
1754         bool erase_with( const Q& key, Less pred )
1755         {
1756             return do_erase( key, cds::opt::details::make_comparator_from_less<Less>(), [](value_type const&) {} );
1757         }
1758
1759         /// Deletes the item from the set
1760         /** \anchor cds_intrusive_SkipListSet_rcu_erase_func
1761             The function searches an item with key equal to \p key in the set,
1762             call \p f functor with item found, unlinks it from the set, and returns \p true.
1763             The \ref disposer specified in \p Traits class template parameter is called
1764             by garbage collector \p GC asynchronously.
1765
1766             The \p Func interface is
1767             \code
1768             struct functor {
1769                 void operator()( value_type const& item );
1770             };
1771             \endcode
1772             If the item with key equal to \p key is not found the function return \p false.
1773
1774             Note the hash functor should accept a parameter of type \p Q that can be not the same as \p value_type.
1775
1776             RCU \p synchronize method can be called. RCU should not be locked.
1777         */
1778         template <typename Q, typename Func>
1779         bool erase( Q const& key, Func f )
1780         {
1781             return do_erase( key, key_comparator(), f );
1782         }
1783
1784         /// Delete the item from the set with comparing functor \p pred
1785         /**
1786             The function is an analog of \ref cds_intrusive_SkipListSet_rcu_erase_func "erase(Q const&, Func)"
1787             but \p pred predicate is used for key comparing.
1788             \p Less has the interface like \p std::less.
1789             \p pred must imply the same element order as the comparator used for building the set.
1790         */
1791         template <typename Q, typename Less, typename Func>
1792         bool erase_with( Q const& key, Less pred, Func f )
1793         {
1794             return do_erase( key, cds::opt::details::make_comparator_from_less<Less>(), f );
1795         }
1796
1797         /// Finds \p key
1798         /** @anchor cds_intrusive_SkipListSet_rcu_find_func
1799             The function searches the item with key equal to \p key and calls the functor \p f for item found.
1800             The interface of \p Func functor is:
1801             \code
1802             struct functor {
1803                 void operator()( value_type& item, Q& key );
1804             };
1805             \endcode
1806             where \p item is the item found, \p key is the <tt>find</tt> function argument.
1807
1808             The functor can change non-key fields of \p item. Note that the functor is only guarantee
1809             that \p item cannot be disposed during functor is executing.
1810             The functor does not serialize simultaneous access to the set \p item. If such access is
1811             possible you must provide your own synchronization schema on item level to exclude unsafe item modifications.
1812
1813             The \p key argument is non-const since it can be used as \p f functor destination i.e., the functor
1814             can modify both arguments.
1815
1816             The function applies RCU lock internally.
1817
1818             The function returns \p true if \p key is found, \p false otherwise.
1819         */
1820         template <typename Q, typename Func>
1821         bool find( Q& key, Func f )
1822         {
1823             return do_find_with( key, key_comparator(), f );
1824         }
1825         //@cond
1826         template <typename Q, typename Func>
1827         bool find( Q const& key, Func f )
1828         {
1829             return do_find_with( key, key_comparator(), f );
1830         }
1831         //@endcond
1832
1833         /// Finds the key \p key with comparing functor \p pred
1834         /**
1835             The function is an analog of \ref cds_intrusive_SkipListSet_rcu_find_func "find(Q&, Func)"
1836             but \p cmp is used for key comparison.
1837             \p Less functor has the interface like \p std::less.
1838             \p cmp must imply the same element order as the comparator used for building the set.
1839         */
1840         template <typename Q, typename Less, typename Func>
1841         bool find_with( Q& key, Less pred, Func f )
1842         {
1843             return do_find_with( key, cds::opt::details::make_comparator_from_less<Less>(), f );
1844         }
1845         //@cond
1846         template <typename Q, typename Less, typename Func>
1847         bool find_with( Q const& key, Less pred, Func f )
1848         {
1849             return do_find_with( key, cds::opt::details::make_comparator_from_less<Less>(), f );
1850         }
1851         //@endcond
1852
1853         /// Finds \p key
1854         /** @anchor cds_intrusive_SkipListSet_rcu_find_val
1855             The function searches the item with key equal to \p key
1856             and returns \p true if it is found, and \p false otherwise.
1857
1858             The function applies RCU lock internally.
1859         */
1860         template <typename Q>
1861         bool find( Q const& key )
1862         {
1863             return do_find_with( key, key_comparator(), [](value_type& , Q const& ) {} );
1864         }
1865
1866         /// Finds \p key with comparing functor \p pred
1867         /**
1868             The function is an analog of \ref cds_intrusive_SkipListSet_rcu_find_val "find(Q const&)"
1869             but \p pred is used for key compare.
1870             \p Less functor has the interface like \p std::less.
1871             \p pred must imply the same element order as the comparator used for building the set.
1872         */
1873         template <typename Q, typename Less>
1874         bool find_with( Q const& key, Less pred )
1875         {
1876             return do_find_with( key, cds::opt::details::make_comparator_from_less<Less>(), [](value_type& , Q const& ) {} );
1877         }
1878
1879         /// Finds \p key and return the item found
1880         /** \anchor cds_intrusive_SkipListSet_rcu_get
1881             The function searches the item with key equal to \p key and returns the pointer to item found.
1882             If \p key is not found it returns \p nullptr.
1883
1884             Note the compare functor should accept a parameter of type \p Q that can be not the same as \p value_type.
1885
1886             RCU should be locked before call of this function.
1887             Returned item is valid only while RCU is locked:
1888             \code
1889             typedef cds::intrusive::SkipListSet< cds::urcu::gc< cds::urcu::general_buffered<> >, foo, my_traits > skip_list;
1890             skip_list theList;
1891             // ...
1892             {
1893                 // Lock RCU
1894                 skip_list::rcu_lock lock;
1895
1896                 foo * pVal = theList.get( 5 );
1897                 if ( pVal ) {
1898                     // Deal with pVal
1899                     //...
1900                 }
1901             }
1902             // Unlock RCU by rcu_lock destructor
1903             // pVal can be retired by disposer at any time after RCU has been unlocked
1904             \endcode
1905
1906             After RCU unlocking the \p %force_dispose member function can be called manually,
1907             see \ref force_dispose for explanation.
1908         */
1909         template <typename Q>
1910         value_type * get( Q const& key )
1911         {
1912             assert( gc::is_locked());
1913
1914             value_type * pFound;
1915             return do_find_with( key, key_comparator(), [&pFound](value_type& found, Q const& ) { pFound = &found; } )
1916                 ? pFound : nullptr;
1917         }
1918
1919         /// Finds \p key and return the item found
1920         /**
1921             The function is an analog of \ref cds_intrusive_SkipListSet_rcu_get "get(Q const&)"
1922             but \p pred is used for comparing the keys.
1923
1924             \p Less functor has the semantics like \p std::less but should take arguments of type \ref value_type and \p Q
1925             in any order.
1926             \p pred must imply the same element order as the comparator used for building the set.
1927         */
1928         template <typename Q, typename Less>
1929         value_type * get_with( Q const& key, Less pred )
1930         {
1931             assert( gc::is_locked());
1932
1933             value_type * pFound;
1934             return do_find_with( key, cds::opt::details::make_comparator_from_less<Less>(),
1935                 [&pFound](value_type& found, Q const& ) { pFound = &found; } )
1936                 ? pFound : nullptr;
1937         }
1938
1939         /// Returns item count in the set
1940         /**
1941             The value returned depends on item counter type provided by \p Traits template parameter.
1942             For \p atomicity::empty_item_counter the function always returns 0.
1943             Therefore, the function is not suitable for checking the set emptiness, use \p empty()
1944             member function for this purpose.
1945         */
1946         size_t size() const
1947         {
1948             return m_ItemCounter;
1949         }
1950
1951         /// Checks if the set is empty
1952         bool empty() const
1953         {
1954             return m_Head.head()->next( 0 ).load( memory_model::memory_order_relaxed ) == nullptr;
1955         }
1956
1957         /// Clears the set (not atomic)
1958         /**
1959             The function unlink all items from the set.
1960             The function is not atomic, thus, in multi-threaded environment with parallel insertions
1961             this sequence
1962             \code
1963             set.clear();
1964             assert( set.empty() );
1965             \endcode
1966             the assertion could be raised.
1967
1968             For each item the \p disposer will be called automatically after unlinking.
1969         */
1970         void clear()
1971         {
1972             exempt_ptr ep;
1973             while ( (ep = extract_min()) );
1974         }
1975
1976         /// Returns maximum height of skip-list. The max height is a constant for each object and does not exceed 32.
1977         static CDS_CONSTEXPR unsigned int max_height() CDS_NOEXCEPT
1978         {
1979             return c_nMaxHeight;
1980         }
1981
1982         /// Returns const reference to internal statistics
1983         stat const& statistics() const
1984         {
1985             return m_Stat;
1986         }
1987
1988         /// Clears internal list of ready-to-remove items passing it to RCU reclamation cycle
1989         /** @anchor cds_intrusive_SkipListSet_rcu_force_dispose
1990             Skip list has complex multi-step algorithm for removing an item. In fact, when you
1991             remove the item it is just marked as removed that is enough for the success of your operation.
1992             Actual removing can take place in the future, in another call or even in another thread.
1993             Inside RCU lock the removed item cannot be passed to RCU reclamation cycle
1994             since it can lead to deadlock. To solve this problem, the current skip list implementation
1995             has internal list of items which is ready to remove but is not yet passed to RCU reclamation.
1996             Usually, this list will be passed to RCU reclamation in the next suitable call of skip list member function.
1997             In some cases we want to pass it to RCU reclamation immediately after RCU unlocking.
1998             This function provides such opportunity: it checks whether the RCU is not locked and if it is true
1999             the function passes the internal ready-to-remove list to RCU reclamation cycle.
2000
2001             The RCU \p synchronize can be called.
2002         */
2003         void force_dispose()
2004         {
2005             if ( !gc::is_locked() )
2006                 dispose_deferred();
2007         }
2008     };
2009
2010 }} // namespace cds::intrusive
2011
2012
2013 #endif // #ifndef __CDS_INTRUSIVE_SKIP_LIST_RCU_H