Fixed TSan warnings in SplitList
[libcds.git] / cds / intrusive / split_list_rcu.h
1 //$$CDS-header$$
2
3 #ifndef CDSLIB_INTRUSIVE_SPLIT_LIST_RCU_H
4 #define CDSLIB_INTRUSIVE_SPLIT_LIST_RCU_H
5
6 #include <limits>
7
8 #include <cds/intrusive/details/split_list_base.h>
9 #include <cds/details/binary_functor_wrapper.h>
10
11 namespace cds { namespace intrusive {
12
13     /// Split-ordered list RCU specialization
14     /** @ingroup cds_intrusive_map
15         \anchor cds_intrusive_SplitListSet_rcu
16
17         Hash table implementation based on split-ordered list algorithm discovered by Ori Shalev and Nir Shavit, see
18         - [2003] Ori Shalev, Nir Shavit "Split-Ordered Lists - Lock-free Resizable Hash Tables"
19         - [2008] Nir Shavit "The Art of Multiprocessor Programming"
20
21         The split-ordered list is a lock-free implementation of an extensible unbounded hash table. It uses original
22         recursive split-ordering algorithm discovered by Ori Shalev and Nir Shavit that allows to split buckets
23         without moving an item on resizing, see \ref cds_SplitList_algo_desc "short algo description".
24
25         <b>Implementation</b>
26
27         Template parameters are:
28         - \p RCU - one of \ref cds_urcu_gc "RCU type"
29         - \p OrderedList - ordered list implementation used as bucket for hash set, for example, MichaelList, LazyList.
30             The intrusive ordered list implementation specifies the type \p T stored in the hash-set,
31             the comparing functor for the type \p T and other features specific for the ordered list.
32         - \p Traits - set traits, default isd \p split_list::traits.
33             Instead of defining \p Traits struct you may use option-based syntax with \p split_list::make_traits metafunction.
34
35         @note About reqired features of hash functor see \ref cds_SplitList_hash_functor "SplitList general description".
36
37         \par How to use
38         Before including <tt><cds/intrusive/split_list_rcu.h></tt> you should include appropriate RCU header file,
39         see \ref cds_urcu_gc "RCU type" for list of existing RCU class and corresponding header files.
40         For example, for \ref cds_urcu_general_buffered_gc "general-purpose buffered RCU" and
41         MichaelList-based split-list you should include:
42         \code
43         #include <cds/urcu/general_buffered.h>
44         #include <cds/intrusive/michael_list_rcu.h>
45         #include <cds/intrusive/split_list_rcu.h>
46
47         // Declare Michael's list for type Foo and default traits:
48         typedef cds::intrusive::MichaelList< cds::urcu::gc< cds::urcu::general_buffered<> >, Foo > rcu_michael_list;
49
50         // Declare split-list based on rcu_michael_list
51         typedef cds::intrusive::SplitListSet< cds::urcu::gc< cds::urcu::general_buffered<> >, rcu_michael_list > rcu_split_list;
52         \endcode
53
54     */
55     template <
56         class RCU,
57         class OrderedList,
58 #   ifdef CDS_DOXYGEN_INVOKED
59         class Traits = split_list::traits
60 #   else
61         class Traits
62 #   endif
63     >
64     class SplitListSet< cds::urcu::gc< RCU >, OrderedList, Traits >
65     {
66     public:
67         typedef cds::urcu::gc< RCU > gc;   ///< RCU garbage collector
68         typedef Traits           traits;   ///< Traits template parameters
69
70         /// Hash functor for \ref value_type and all its derivatives that you use
71         typedef typename cds::opt::v::hash_selector< typename traits::hash >::type   hash;
72
73         //@cond
74         typedef cds::intrusive::split_list::implementation_tag implementation_tag;
75         //@endcond
76
77     protected:
78         //@cond
79         typedef split_list::details::rebind_list_traits<OrderedList, traits> wrapped_ordered_list;
80         //@endcond
81
82     public:
83 #   ifdef CDS_DOXYGEN_INVOKED
84         typedef OrderedList         ordered_list;   ///< type of ordered list used as base for split-list
85 #   else
86         typedef typename wrapped_ordered_list::result    ordered_list;
87 #   endif
88         typedef typename ordered_list::value_type     value_type;     ///< type of value stored in the split-list
89         typedef typename ordered_list::key_comparator key_comparator; ///< key compare functor
90         typedef typename ordered_list::disposer       disposer;       ///< Node disposer functor
91         typedef typename ordered_list::rcu_lock       rcu_lock;       ///< RCU scoped lock
92         typedef typename ordered_list::exempt_ptr     exempt_ptr;     ///< pointer to extracted node
93         /// Group of \p extract_xxx functions require external locking if underlying ordered list requires that
94         static CDS_CONSTEXPR const bool c_bExtractLockExternal = ordered_list::c_bExtractLockExternal;
95
96         typedef typename traits::item_counter item_counter; ///< Item counter type
97         typedef typename traits::back_off     back_off;     ///< back-off strategy for spinning
98         typedef typename traits::memory_model memory_model; ///< Memory ordering. See cds::opt::memory_model option
99         typedef typename traits::stat         stat;         ///< Internal statistics
100
101     protected:
102         typedef typename ordered_list::node_type    list_node_type;  ///< Node type as declared in ordered list
103         typedef split_list::node<list_node_type>    node_type;       ///< split-list node type
104         typedef node_type                           dummy_node_type; ///< dummy node type
105
106         /// Split-list node traits
107         /**
108             This traits is intended for converting between underlying ordered list node type \ref list_node_type
109             and split-list node type \ref node_type
110         */
111         typedef split_list::node_traits<typename ordered_list::node_traits>  node_traits;
112
113         //@cond
114         /// Bucket table implementation
115         typedef typename split_list::details::bucket_table_selector<
116             traits::dynamic_bucket_table
117             , gc
118             , dummy_node_type
119             , opt::allocator< typename traits::allocator >
120             , opt::memory_model< memory_model >
121         >::type bucket_table;
122
123         //@endcond
124
125     protected:
126         //@cond
127         /// Ordered list wrapper to access protected members of OrderedList
128         class ordered_list_wrapper: public ordered_list
129         {
130             typedef ordered_list base_class;
131             typedef typename base_class::auxiliary_head       bucket_head_type;
132
133         public:
134             bool insert_at( dummy_node_type * pHead, value_type& val )
135             {
136                 assert( pHead != nullptr );
137                 bucket_head_type h(pHead);
138                 return base_class::insert_at( h, val );
139             }
140
141             template <typename Func>
142             bool insert_at( dummy_node_type * pHead, value_type& val, Func f )
143             {
144                 assert( pHead != nullptr );
145                 bucket_head_type h(pHead);
146                 return base_class::insert_at( h, val, f );
147             }
148
149             template <typename Func>
150             std::pair<bool, bool> ensure_at( dummy_node_type * pHead, value_type& val, Func func )
151             {
152                 assert( pHead != nullptr );
153                 bucket_head_type h(pHead);
154                 return base_class::ensure_at( h, val, func );
155             }
156
157             bool unlink_at( dummy_node_type * pHead, value_type& val )
158             {
159                 assert( pHead != nullptr );
160                 bucket_head_type h(pHead);
161                 return base_class::unlink_at( h, val );
162             }
163
164             template <typename Q, typename Compare, typename Func>
165             bool erase_at( dummy_node_type * pHead, split_list::details::search_value_type<Q> const& val, Compare cmp, Func f )
166             {
167                 assert( pHead != nullptr );
168                 bucket_head_type h(pHead);
169                 return base_class::erase_at( h, val, cmp, f );
170             }
171
172             template <typename Q, typename Compare>
173             bool erase_at( dummy_node_type * pHead, split_list::details::search_value_type<Q> const& val, Compare cmp )
174             {
175                 assert( pHead != nullptr );
176                 bucket_head_type h(pHead);
177                 return base_class::erase_at( h, val, cmp );
178             }
179
180             template <typename Q, typename Compare>
181             value_type * extract_at( dummy_node_type * pHead, split_list::details::search_value_type<Q>& val, Compare cmp )
182             {
183                 assert( pHead != nullptr );
184                 bucket_head_type h(pHead);
185                 return base_class::extract_at( h, val, cmp );
186             }
187
188             template <typename Q, typename Compare, typename Func>
189             bool find_at( dummy_node_type * pHead, split_list::details::search_value_type<Q>& val, Compare cmp, Func f ) const
190             {
191                 assert( pHead != nullptr );
192                 bucket_head_type h(pHead);
193                 return base_class::find_at( h, val, cmp, f );
194             }
195
196             template <typename Q, typename Compare>
197             bool find_at( dummy_node_type * pHead, split_list::details::search_value_type<Q> const & val, Compare cmp ) const
198             {
199                 assert( pHead != nullptr );
200                 bucket_head_type h(pHead);
201                 return base_class::find_at( h, val, cmp );
202             }
203
204             template <typename Q, typename Compare>
205             value_type * get_at( dummy_node_type * pHead, split_list::details::search_value_type<Q>& val, Compare cmp ) const
206             {
207                 assert( pHead != nullptr );
208                 bucket_head_type h(pHead);
209                 return base_class::get_at( h, val, cmp );
210             }
211
212             bool insert_aux_node( dummy_node_type * pNode )
213             {
214                 return base_class::insert_aux_node( pNode );
215             }
216             bool insert_aux_node( dummy_node_type * pHead, dummy_node_type * pNode )
217             {
218                 bucket_head_type h(pHead);
219                 return base_class::insert_aux_node( h, pNode );
220             }
221         };
222
223         template <typename Less>
224         struct less_wrapper: public cds::opt::details::make_comparator_from_less<Less>
225         {
226             typedef cds::opt::details::make_comparator_from_less<Less> base_wrapper;
227
228             template <typename Q1, typename Q2>
229             int operator()( split_list::details::search_value_type<Q1> const& v1, Q2 const& v2 ) const
230             {
231                 return base_wrapper::operator()( v1.val, v2 );
232             }
233
234             template <typename Q1, typename Q2>
235             int operator()( Q1 const& v1, split_list::details::search_value_type<Q2> const& v2 ) const
236             {
237                 return base_wrapper::operator()( v1, v2.val );
238             }
239         };
240         //@endcond
241
242     protected:
243         ordered_list_wrapper    m_List;             ///< Ordered list containing split-list items
244         bucket_table            m_Buckets;          ///< bucket table
245         atomics::atomic<size_t> m_nBucketCountLog2; ///< log2( current bucket count )
246         atomics::atomic<size_t> m_nMaxItemCount;    ///< number of items container can hold, before we have to resize
247         item_counter            m_ItemCounter;      ///< Item counter
248         hash                    m_HashFunctor;      ///< Hash functor
249         stat                    m_Stat;             ///< Internal stattistics accumulator
250
251     protected:
252         //@cond
253         typedef cds::details::Allocator< dummy_node_type, typename traits::allocator >   dummy_node_allocator;
254
255         dummy_node_type * alloc_dummy_node( size_t nHash )
256         {
257             m_Stat.onHeadNodeAllocated();
258             CDS_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN;
259             dummy_node_type * p = dummy_node_allocator().New( nHash );
260             CDS_TSAN_ANNOTATE_IGNORE_WRITES_END;
261             return p;
262         }
263         void free_dummy_node( dummy_node_type * p )
264         {
265             CDS_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN;
266             dummy_node_allocator().Delete( p );
267             CDS_TSAN_ANNOTATE_IGNORE_WRITES_END;
268             m_Stat.onHeadNodeFreed();
269         }
270
271         /// Calculates hash value of \p key
272         template <typename Q>
273         size_t hash_value( Q const& key ) const
274         {
275             return m_HashFunctor( key );
276         }
277
278         size_t bucket_no( size_t nHash ) const
279         {
280             return nHash & ( (1 << m_nBucketCountLog2.load(memory_model::memory_order_relaxed)) - 1 );
281         }
282
283         static size_t parent_bucket( size_t nBucket )
284         {
285             assert( nBucket > 0 );
286             return nBucket & ~( 1 << bitop::MSBnz( nBucket ) );
287         }
288
289         dummy_node_type * init_bucket( size_t nBucket )
290         {
291             assert( nBucket > 0 );
292             size_t nParent = parent_bucket( nBucket );
293
294             dummy_node_type * pParentBucket = m_Buckets.bucket( nParent );
295             if ( pParentBucket == nullptr ) {
296                 pParentBucket = init_bucket( nParent );
297                 m_Stat.onRecursiveInitBucket();
298             }
299
300             assert( pParentBucket != nullptr );
301
302             // Allocate a dummy node for new bucket
303             {
304                 dummy_node_type * pBucket = alloc_dummy_node( split_list::dummy_hash( nBucket ) );
305                 if ( m_List.insert_aux_node( pParentBucket, pBucket ) ) {
306                     m_Buckets.bucket( nBucket, pBucket );
307                     m_Stat.onNewBucket();
308                     return pBucket;
309                 }
310                 free_dummy_node( pBucket );
311             }
312
313             // Another thread set the bucket. Wait while it done
314
315             // In this point, we must wait while nBucket is empty.
316             // The compiler can decide that waiting loop can be "optimized" (stripped)
317             // To prevent this situation, we use waiting on volatile bucket_head_ptr pointer.
318             //
319             m_Stat.onBucketInitContenton();
320             back_off bkoff;
321             while ( true ) {
322                 dummy_node_type volatile * p = m_Buckets.bucket( nBucket );
323                 if ( p != nullptr )
324                     return const_cast<dummy_node_type *>( p );
325                 bkoff();
326                 m_Stat.onBusyWaitBucketInit();
327             }
328         }
329
330         dummy_node_type * get_bucket( size_t nHash )
331         {
332             size_t nBucket = bucket_no( nHash );
333
334             dummy_node_type * pHead = m_Buckets.bucket( nBucket );
335             if ( pHead == nullptr )
336                 pHead = init_bucket( nBucket );
337
338             assert( pHead->is_dummy() );
339
340             return pHead;
341         }
342
343         void init()
344         {
345             // GC and OrderedList::gc must be the same
346             static_assert( std::is_same<gc, typename ordered_list::gc>::value, "GC and OrderedList::gc must be the same");
347
348             // atomicity::empty_item_counter is not allowed as a item counter
349             static_assert( !std::is_same<item_counter, cds::atomicity::empty_item_counter>::value,
350                            "cds::atomicity::empty_item_counter is not allowed as a item counter");
351
352             // Initialize bucket 0
353             dummy_node_type * pNode = alloc_dummy_node( 0 /*split_list::dummy_hash(0)*/ );
354
355             // insert_aux_node cannot return false for empty list
356             CDS_VERIFY( m_List.insert_aux_node( pNode ));
357
358             m_Buckets.bucket( 0, pNode );
359         }
360
361         static size_t max_item_count( size_t nBucketCount, size_t nLoadFactor )
362         {
363             return nBucketCount * nLoadFactor;
364         }
365
366         void inc_item_count()
367         {
368             size_t nMaxCount = m_nMaxItemCount.load(memory_model::memory_order_relaxed);
369             if ( ++m_ItemCounter <= nMaxCount )
370                 return;
371
372             size_t sz = m_nBucketCountLog2.load(memory_model::memory_order_relaxed);
373             const size_t nBucketCount = static_cast<size_t>(1) << sz;
374             if ( nBucketCount < m_Buckets.capacity() ) {
375                 // we may grow the bucket table
376                 const size_t nLoadFactor = m_Buckets.load_factor();
377                 if ( nMaxCount < max_item_count( nBucketCount, nLoadFactor ))
378                     return; // someone already have updated m_nBucketCountLog2, so stop here
379
380                 m_nMaxItemCount.compare_exchange_strong( nMaxCount, max_item_count( nBucketCount << 1, nLoadFactor ), 
381                                                          memory_model::memory_order_relaxed, atomics::memory_order_relaxed );
382                 m_nBucketCountLog2.compare_exchange_strong( sz, sz + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed );
383             }
384             else
385                 m_nMaxItemCount.store( std::numeric_limits<size_t>::max(), memory_model::memory_order_relaxed );
386         }
387
388         template <typename Q, typename Compare, typename Func>
389         bool find_( Q& val, Compare cmp, Func f )
390         {
391             size_t nHash = hash_value( val );
392             split_list::details::search_value_type<Q>  sv( val, split_list::regular_hash( nHash ));
393             dummy_node_type * pHead = get_bucket( nHash );
394             assert( pHead != nullptr );
395
396             return m_Stat.onFind( m_List.find_at( pHead, sv, cmp,
397                 [&f](value_type& item, split_list::details::search_value_type<Q>& val){ f(item, val.val ); }));
398         }
399
400         template <typename Q, typename Compare>
401         bool find_value( Q const& val, Compare cmp )
402         {
403             size_t nHash = hash_value( val );
404             split_list::details::search_value_type<Q const>  sv( val, split_list::regular_hash( nHash ));
405             dummy_node_type * pHead = get_bucket( nHash );
406             assert( pHead != nullptr );
407
408             return m_Stat.onFind( m_List.find_at( pHead, sv, cmp ));
409         }
410
411         template <typename Q, typename Compare>
412         value_type * get_( Q const& val, Compare cmp )
413         {
414             size_t nHash = hash_value( val );
415             split_list::details::search_value_type<Q const>  sv( val, split_list::regular_hash( nHash ));
416             dummy_node_type * pHead = get_bucket( nHash );
417             assert( pHead != nullptr );
418
419             value_type * p = m_List.get_at( pHead, sv, cmp );
420             m_Stat.onFind( p != nullptr );
421             return p;
422         }
423
424         template <typename Q, typename Compare>
425         value_type * extract_( Q const& val, Compare cmp )
426         {
427             size_t nHash = hash_value( val );
428             split_list::details::search_value_type<Q const>  sv( val, split_list::regular_hash( nHash ));
429             dummy_node_type * pHead = get_bucket( nHash );
430             assert( pHead != nullptr );
431
432             value_type * pNode = m_List.extract_at( pHead, sv, cmp );
433             if ( pNode ) {
434                 --m_ItemCounter;
435                 m_Stat.onExtractSuccess();
436             }
437             else
438                 m_Stat.onExtractFailed();
439             return pNode;
440         }
441
442         template <typename Q, typename Less>
443         value_type * extract_with_( Q const& val, Less pred )
444         {
445             CDS_UNUSED( pred );
446             return extract_( val, typename wrapped_ordered_list::template make_compare_from_less<Less>());
447         }
448
449         template <typename Q, typename Compare>
450         bool erase_( const Q& val, Compare cmp )
451         {
452             size_t nHash = hash_value( val );
453             split_list::details::search_value_type<Q const>  sv( val, split_list::regular_hash( nHash ));
454             dummy_node_type * pHead = get_bucket( nHash );
455             assert( pHead != nullptr );
456
457             if ( m_List.erase_at( pHead, sv, cmp ) ) {
458                 --m_ItemCounter;
459                 m_Stat.onEraseSuccess();
460                 return true;
461             }
462             m_Stat.onEraseFailed();
463             return false;
464         }
465
466         template <typename Q, typename Compare, typename Func>
467         bool erase_( Q const& val, Compare cmp, Func f )
468         {
469             size_t nHash = hash_value( val );
470             split_list::details::search_value_type<Q const>  sv( val, split_list::regular_hash( nHash ));
471             dummy_node_type * pHead = get_bucket( nHash );
472             assert( pHead != nullptr );
473
474             if ( m_List.erase_at( pHead, sv, cmp, f )) {
475                 --m_ItemCounter;
476                 m_Stat.onEraseSuccess();
477                 return true;
478             }
479             m_Stat.onEraseFailed();
480             return false;
481         }
482
483         //@endcond
484
485     public:
486         /// Initialize split-ordered list of default capacity
487         /**
488             The default capacity is defined in bucket table constructor.
489             See split_list::expandable_bucket_table, split_list::static_ducket_table
490             which selects by split_list::dynamic_bucket_table option.
491         */
492         SplitListSet()
493             : m_nBucketCountLog2(1)
494             , m_nMaxItemCount( max_item_count(2, m_Buckets.load_factor()) )
495         {
496             init();
497         }
498
499         /// Initialize split-ordered list
500         SplitListSet(
501             size_t nItemCount           ///< estimate average of item count
502             , size_t nLoadFactor = 1    ///< load factor - average item count per bucket. Small integer up to 8, default is 1.
503             )
504             : m_Buckets( nItemCount, nLoadFactor )
505             , m_nBucketCountLog2(1)
506             , m_nMaxItemCount( max_item_count(2, m_Buckets.load_factor()) )
507         {
508             init();
509         }
510
511     public:
512         /// Inserts new node
513         /**
514             The function inserts \p val in the set if it does not contain
515             an item with key equal to \p val.
516
517             The function makes RCU lock internally.
518
519             Returns \p true if \p val is placed into the set, \p false otherwise.
520         */
521         bool insert( value_type& val )
522         {
523             size_t nHash = hash_value( val );
524             dummy_node_type * pHead = get_bucket( nHash );
525             assert( pHead != nullptr );
526
527             node_traits::to_node_ptr( val )->m_nHash = split_list::regular_hash( nHash );
528
529             if ( m_List.insert_at( pHead, val )) {
530                 inc_item_count();
531                 m_Stat.onInsertSuccess();
532                 return true;
533             }
534             m_Stat.onInsertFailed();
535             return false;
536         }
537
538         /// Inserts new node
539         /**
540             This function is intended for derived non-intrusive containers.
541
542             The function allows to split creating of new item into two part:
543             - create item with key only
544             - insert new item into the set
545             - if inserting is success, calls  \p f functor to initialize value-field of \p val.
546
547             The functor signature is:
548             \code
549                 void func( value_type& val );
550             \endcode
551             where \p val is the item inserted.
552
553             The function makes RCU lock internally.
554
555             @warning For \ref cds_intrusive_MichaelList_rcu "MichaelList" as the bucket see \ref cds_intrusive_item_creating "insert item troubleshooting".
556             \ref cds_intrusive_LazyList_rcu "LazyList" provides exclusive access to inserted item and does not require any node-level
557             synchronization.
558             */
559         template <typename Func>
560         bool insert( value_type& val, Func f )
561         {
562             size_t nHash = hash_value( val );
563             dummy_node_type * pHead = get_bucket( nHash );
564             assert( pHead != nullptr );
565
566             node_traits::to_node_ptr( val )->m_nHash = split_list::regular_hash( nHash );
567
568             if ( m_List.insert_at( pHead, val, f )) {
569                 inc_item_count();
570                 m_Stat.onInsertSuccess();
571                 return true;
572             }
573             m_Stat.onInsertFailed();
574             return false;
575         }
576
577         /// Ensures that the \p val exists in the set
578         /**
579             The operation performs inserting or changing data with lock-free manner.
580
581             If the item \p val is not found in the set, then \p val is inserted into the set.
582             Otherwise, the functor \p func is called with item found.
583             The functor signature is:
584             \code
585                 void func( bool bNew, value_type& item, value_type& val );
586             \endcode
587             with arguments:
588             - \p bNew - \p true if the item has been inserted, \p false otherwise
589             - \p item - item of the set
590             - \p val - argument \p val passed into the \p ensure function
591             If new item has been inserted (i.e. \p bNew is \p true) then \p item and \p val arguments
592             refers to the same thing.
593
594             The function makes RCU lock internally.
595
596             Returns std::pair<bool, bool> where \p first is \p true if operation is successfull,
597             \p second is \p true if new item has been added or \p false if the item with \p key
598             already is in the set.
599
600             @warning For \ref cds_intrusive_MichaelList_rcu "MichaelList" as the bucket see \ref cds_intrusive_item_creating "insert item troubleshooting".
601             \ref cds_intrusive_LazyList_rcu "LazyList" provides exclusive access to inserted item and does not require any node-level
602             synchronization.
603             */
604         template <typename Func>
605         std::pair<bool, bool> ensure( value_type& val, Func func )
606         {
607             size_t nHash = hash_value( val );
608             dummy_node_type * pHead = get_bucket( nHash );
609             assert( pHead != nullptr );
610
611             node_traits::to_node_ptr( val )->m_nHash = split_list::regular_hash( nHash );
612
613             std::pair<bool, bool> bRet = m_List.ensure_at( pHead, val, func );
614             if ( bRet.first && bRet.second ) {
615                 inc_item_count();
616                 m_Stat.onEnsureNew();
617             }
618             else
619                 m_Stat.onEnsureExist();
620             return bRet;
621         }
622
623         /// Unlinks the item \p val from the set
624         /**
625             The function searches the item \p val in the set and unlinks it from the set
626             if it is found and is equal to \p val.
627
628             Difference between \ref erase and \p unlink functions: \p erase finds <i>a key</i>
629             and deletes the item found. \p unlink finds an item by key and deletes it
630             only if \p val is an item of that set, i.e. the pointer to item found
631             is equal to <tt> &val </tt>.
632
633             RCU \p synchronize method can be called, therefore, RCU should not be locked.
634
635             The function returns \p true if success and \p false otherwise.
636         */
637         bool unlink( value_type& val )
638         {
639             size_t nHash = hash_value( val );
640             dummy_node_type * pHead = get_bucket( nHash );
641             assert( pHead != nullptr );
642
643             if ( m_List.unlink_at( pHead, val ) ) {
644                 --m_ItemCounter;
645                 m_Stat.onEraseSuccess();
646                 return true;
647             }
648             m_Stat.onEraseFailed();
649             return false;
650         }
651
652         /// Deletes the item from the set
653         /** \anchor cds_intrusive_SplitListSet_rcu_erase
654             The function searches an item with key equal to \p key in the set,
655             unlinks it from the set, and returns \p true.
656             If the item with key equal to \p key is not found the function return \p false.
657
658             Difference between \ref erase and \p unlink functions: \p erase finds <i>a key</i>
659             and deletes the item found. \p unlink finds an item by key and deletes it
660             only if \p key is an item of that set, i.e. the pointer to item found
661             is equal to <tt> &key </tt>.
662
663             RCU \p synchronize method can be called, therefore, RCU should not be locked.
664
665             Note the hash functor should accept a parameter of type \p Q that can be not the same as \p value_type.
666         */
667         template <typename Q>
668         bool erase( Q const& key )
669         {
670             return erase_( key, key_comparator() );
671         }
672
673         /// Deletes the item from the set using \p pred for searching
674         /**
675             The function is an analog of \ref cds_intrusive_SplitListSet_rcu_erase "erase(Q const&)"
676             but \p cmp is used for key compare.
677             \p Less has the interface like \p std::less.
678             \p pred must imply the same element order as the comparator used for building the set.
679         */
680         template <typename Q, typename Less>
681         bool erase_with( Q const& key, Less pred )
682         {
683             CDS_UNUSED( pred );
684             return erase_( key, typename wrapped_ordered_list::template make_compare_from_less<Less>() );
685         }
686
687         /// Deletes the item from the set
688         /** \anchor cds_intrusive_SplitListSet_rcu_erase_func
689             The function searches an item with key equal to \p key in the set,
690             call \p f functor with item found, unlinks it from the set, and returns \p true.
691             The \ref disposer specified by \p OrderedList class template parameter is called
692             by garbage collector \p GC asynchronously.
693
694             The \p Func interface is
695             \code
696             struct functor {
697                 void operator()( value_type const& item );
698             };
699             \endcode
700
701             If the item with key equal to \p key is not found the function return \p false.
702
703             RCU \p synchronize method can be called, therefore, RCU should not be locked.
704
705             Note the hash functor should accept a parameter of type \p Q that can be not the same as \p value_type.
706         */
707         template <typename Q, typename Func>
708         bool erase( Q const& key, Func f )
709         {
710             return erase_( key, key_comparator(), f );
711         }
712
713         /// Deletes the item from the set using \p pred for searching
714         /**
715             The function is an analog of \ref cds_intrusive_SplitListSet_rcu_erase_func "erase(Q const&, Func)"
716             but \p cmp is used for key compare.
717             \p Less has the interface like \p std::less.
718             \p pred must imply the same element order as the comparator used for building the set.
719         */
720         template <typename Q, typename Less, typename Func>
721         bool erase_with( Q const& key, Less pred, Func f )
722         {
723             CDS_UNUSED( pred );
724             return erase_( key, typename wrapped_ordered_list::template make_compare_from_less<Less>(), f );
725         }
726
727         /// Extracts an item from the set
728         /** \anchor cds_intrusive_SplitListSet_rcu_extract
729             The function searches an item with key equal to \p key in the set,
730             unlinks it, and returns \ref cds::urcu::exempt_ptr "exempt_ptr" pointer to the item found.
731             If the item with the key equal to \p key is not found the function returns an empty \p exempt_ptr.
732
733             @note The function does NOT call RCU read-side lock or synchronization,
734             and does NOT dispose the item found. It just excludes the item from the set
735             and returns a pointer to item found.
736             You should lock RCU before calling of the function, and you should synchronize RCU
737             outside the RCU lock before reusing returned pointer.
738
739             \code
740             typedef cds::urcu::gc< general_buffered<> > rcu;
741             typedef cds::intrusive::MichaelList< rcu, Foo > rcu_michael_list;
742             typedef cds::intrusive::SplitListSet< rcu, rcu_michael_list, foo_traits > rcu_splitlist_set;
743
744             rcu_splitlist_set theSet;
745             // ...
746
747             rcu_splitlist_set::exempt_ptr p;
748             {
749                 // first, we should lock RCU
750                 rcu_splitlist_set::rcu_lock lock;
751
752                 // Now, you can apply extract function
753                 // Note that you must not delete the item found inside the RCU lock
754                 p = theList.extract( 10 );
755                 if ( p ) {
756                     // do something with p
757                     ...
758                 }
759             }
760
761             // We may safely release p here
762             // release() passes the pointer to RCU reclamation cycle:
763             // it invokes RCU retire_ptr function with the disposer you provided for rcu_michael_list.
764             p.release();
765             \endcode
766         */
767         template <typename Q>
768         exempt_ptr extract( Q const& key )
769         {
770             return exempt_ptr(extract_( key, key_comparator() ));
771         }
772
773         /// Extracts an item from the set using \p pred for searching
774         /**
775             The function is an analog of \p extract(Q const&) but \p pred is used for key compare.
776             \p Less functor has the interface like \p std::less.
777             \p pred must imply the same element order as the comparator used for building the set.
778         */
779         template <typename Q, typename Less>
780         exempt_ptr extract_with( Q const& key, Less pred )
781         {
782             return exempt_ptr( extract_with_( key, pred ));
783         }
784
785         /// Finds the key \p key
786         /** \anchor cds_intrusive_SplitListSet_rcu_find_func
787             The function searches the item with key equal to \p key and calls the functor \p f for item found.
788             The interface of \p Func functor is:
789             \code
790             struct functor {
791                 void operator()( value_type& item, Q& key );
792             };
793             \endcode
794             where \p item is the item found, \p key is the <tt>find</tt> function argument.
795
796             The functor can change non-key fields of \p item. Note that the functor is only guarantee
797             that \p item cannot be disposed during functor is executing.
798             The functor does not serialize simultaneous access to the set \p item. If such access is
799             possible you must provide your own synchronization schema on item level to exclude unsafe item modifications.
800
801             The \p key argument is non-const since it can be used as \p f functor destination i.e., the functor
802             can modify both arguments.
803
804             Note the hash functor specified for class \p Traits template parameter
805             should accept a parameter of type \p Q that can be not the same as \p value_type.
806
807             The function applies RCU lock internally.
808
809             The function returns \p true if \p key is found, \p false otherwise.
810         */
811         template <typename Q, typename Func>
812         bool find( Q& key, Func f )
813         {
814             return find_( key, key_comparator(), f );
815         }
816         //@cond
817         template <typename Q, typename Func>
818         bool find( Q const& key, Func f )
819         {
820             return find_( key, key_comparator(), f );
821         }
822         //@endcond
823
824         /// Finds the key \p key with \p pred predicate for comparing
825         /**
826             The function is an analog of \ref cds_intrusive_SplitListSet_rcu_find_func "find(Q&, Func)"
827             but \p cmp is used for key compare.
828             \p Less has the interface like \p std::less.
829             \p cmp must imply the same element order as the comparator used for building the set.
830         */
831         template <typename Q, typename Less, typename Func>
832         bool find_with( Q& key, Less pred, Func f )
833         {
834             CDS_UNUSED( pred );
835             return find_( key, typename wrapped_ordered_list::template make_compare_from_less<Less>(), f );
836         }
837         //@cond
838         template <typename Q, typename Less, typename Func>
839         bool find_with( Q const& key, Less pred, Func f )
840         {
841             CDS_UNUSED( pred );
842             return find_( key, typename wrapped_ordered_list::template make_compare_from_less<Less>(), f );
843         }
844         //@endcond
845
846         /// Finds the key \p key
847         /** \anchor cds_intrusive_SplitListSet_rcu_find_val
848             The function searches the item with key equal to \p key
849             and returns \p true if \p key found or \p false otherwise.
850         */
851         template <typename Q>
852         bool find( Q const& key )
853         {
854             return find_value( key, key_comparator() );
855         }
856
857         /// Finds the key \p key with \p pred predicate for comparing
858         /**
859             The function is an analog of \ref cds_intrusive_SplitListSet_rcu_find_val "find(Q const&)"
860             but \p cmp is used for key compare.
861             \p Less has the interface like \p std::less.
862             \p pred must imply the same element order as the comparator used for building the set.
863         */
864         template <typename Q, typename Less>
865         bool find_with( Q const& key, Less pred )
866         {
867             CDS_UNUSED( pred );
868             return find_value( key, typename wrapped_ordered_list::template make_compare_from_less<Less>() );
869         }
870
871         /// Finds the key \p key and return the item found
872         /** \anchor cds_intrusive_SplitListSet_rcu_get
873             The function searches the item with key equal to \p key and returns the pointer to item found.
874             If \p key is not found it returns \p nullptr.
875
876             Note the compare functor should accept a parameter of type \p Q that can be not the same as \p value_type.
877
878             RCU should be locked before call of this function.
879             Returned item is valid only while RCU is locked:
880             \code
881             cds::intrusive::SplitListSet< your_template_parameters > theSet;
882             // ...
883             {
884                 // Lock RCU
885                 hash_set::rcu_lock lock;
886
887                 foo * pVal = theSet.get( 5 );
888                 if ( pVal ) {
889                     // Deal with pVal
890                     //...
891                 }
892                 // Unlock RCU by rcu_lock destructor
893                 // pVal can be retired by disposer at any time after RCU has been unlocked
894             }
895             \endcode
896         */
897         template <typename Q>
898         value_type * get( Q const& key )
899         {
900             return get_( key, key_comparator() );
901         }
902
903         /// Finds the key \p key and return the item found
904         /**
905             The function is an analog of \ref cds_intrusive_SplitListSet_rcu_get "get(Q const&)"
906             but \p pred is used for comparing the keys.
907
908             \p Less functor has the semantics like \p std::less but should take arguments of type \ref value_type and \p Q
909             in any order.
910             \p pred must imply the same element order as the comparator used for building the set.
911         */
912         template <typename Q, typename Less>
913         value_type * get_with( Q const& key, Less pred )
914         {
915             CDS_UNUSED( pred );
916             return get_( key, typename wrapped_ordered_list::template make_compare_from_less<Less>());
917         }
918
919
920         /// Returns item count in the set
921         size_t size() const
922         {
923             return m_ItemCounter;
924         }
925
926         /// Checks if the set is empty
927         /**
928             Emptiness is checked by item counting: if item count is zero then the set is empty.
929             Thus, the correct item counting feature is an important part of split-list set implementation.
930         */
931         bool empty() const
932         {
933             return size() == 0;
934         }
935
936         /// Clears the set (not atomic)
937         void clear()
938         {
939             iterator it = begin();
940             while ( it != end() ) {
941                 iterator i(it);
942                 ++i;
943                 unlink( *it );
944                 it = i;
945             }
946         }
947
948         /// Returns internal statistics
949         stat const& statistics() const
950         {
951             return m_Stat;
952         }
953
954     protected:
955         //@cond
956         template <bool IsConst>
957         class iterator_type
958             :public split_list::details::iterator_type<node_traits, ordered_list, IsConst>
959         {
960             typedef split_list::details::iterator_type<node_traits, ordered_list, IsConst> iterator_base_class;
961             typedef typename iterator_base_class::list_iterator list_iterator;
962         public:
963             iterator_type()
964                 : iterator_base_class()
965             {}
966
967             iterator_type( iterator_type const& src )
968                 : iterator_base_class( src )
969             {}
970
971             // This ctor should be protected...
972             iterator_type( list_iterator itCur, list_iterator itEnd )
973                 : iterator_base_class( itCur, itEnd )
974             {}
975         };
976         //@endcond
977     public:
978         /// Forward iterator
979         /**
980             The forward iterator for a split-list has some features:
981             - it has no post-increment operator
982             - it depends on iterator of underlying \p OrderedList
983             - The iterator cannot be moved across thread boundary since it may contain GC's guard that is thread-private GC data.
984             - Iterator ensures thread-safety even if you delete the item that iterator points to. However, in case of concurrent
985               deleting operations it is no guarantee that you iterate all item in the split-list.
986
987             Therefore, the use of iterators in concurrent environment is not good idea. Use the iterator on the concurrent container
988             for debug purpose only.
989         */
990         typedef iterator_type<false>    iterator;
991         /// Const forward iterator
992         /**
993             For iterator's features and requirements see \ref iterator
994         */
995         typedef iterator_type<true>     const_iterator;
996
997         /// Returns a forward iterator addressing the first element in a split-list
998         /**
999             For empty list \code begin() == end() \endcode
1000         */
1001         iterator begin()
1002         {
1003             return iterator( m_List.begin(), m_List.end() );
1004         }
1005
1006         /// Returns an iterator that addresses the location succeeding the last element in a split-list
1007         /**
1008             Do not use the value returned by <tt>end</tt> function to access any item.
1009
1010             The returned value can be used only to control reaching the end of the split-list.
1011             For empty list \code begin() == end() \endcode
1012         */
1013         iterator end()
1014         {
1015             return iterator( m_List.end(), m_List.end() );
1016         }
1017
1018         /// Returns a forward const iterator addressing the first element in a split-list
1019         const_iterator begin() const
1020         {
1021             return cbegin();
1022         }
1023         /// Returns a forward const iterator addressing the first element in a split-list
1024         const_iterator cbegin() const
1025         {
1026             return const_iterator( m_List.cbegin(), m_List.cend() );
1027         }
1028
1029         /// Returns an const iterator that addresses the location succeeding the last element in a split-list
1030         const_iterator end() const
1031         {
1032             return cend();
1033         }
1034         /// Returns an const iterator that addresses the location succeeding the last element in a split-list
1035         const_iterator cend() const
1036         {
1037             return const_iterator( m_List.cend(), m_List.cend() );
1038         }
1039
1040     };
1041
1042 }}  // namespace cds::intrusive
1043
1044 #endif // #ifndef CDSLIB_INTRUSIVE_SPLIT_LIST_RCU_H