Optimize fast path in inc_item_count.
[libcds.git] / cds / intrusive / split_list_rcu.h
1 //$$CDS-header$$
2
3 #ifndef CDSLIB_INTRUSIVE_SPLIT_LIST_RCU_H
4 #define CDSLIB_INTRUSIVE_SPLIT_LIST_RCU_H
5
6 #include <cds/intrusive/details/split_list_base.h>
7 #include <cds/details/binary_functor_wrapper.h>
8 #include <limits>
9
10 namespace cds { namespace intrusive {
11
12     /// Split-ordered list RCU specialization
13     /** @ingroup cds_intrusive_map
14         \anchor cds_intrusive_SplitListSet_rcu
15
16         Hash table implementation based on split-ordered list algorithm discovered by Ori Shalev and Nir Shavit, see
17         - [2003] Ori Shalev, Nir Shavit "Split-Ordered Lists - Lock-free Resizable Hash Tables"
18         - [2008] Nir Shavit "The Art of Multiprocessor Programming"
19
20         The split-ordered list is a lock-free implementation of an extensible unbounded hash table. It uses original
21         recursive split-ordering algorithm discovered by Ori Shalev and Nir Shavit that allows to split buckets
22         without moving an item on resizing, see \ref cds_SplitList_algo_desc "short algo description".
23
24         <b>Implementation</b>
25
26         Template parameters are:
27         - \p RCU - one of \ref cds_urcu_gc "RCU type"
28         - \p OrderedList - ordered list implementation used as bucket for hash set, for example, MichaelList, LazyList.
29             The intrusive ordered list implementation specifies the type \p T stored in the hash-set,
30             the comparing functor for the type \p T and other features specific for the ordered list.
31         - \p Traits - set traits, default isd \p split_list::traits.
32             Instead of defining \p Traits struct you may use option-based syntax with \p split_list::make_traits metafunction.
33
34         @note About reqired features of hash functor see \ref cds_SplitList_hash_functor "SplitList general description".
35
36         \par How to use
37         Before including <tt><cds/intrusive/split_list_rcu.h></tt> you should include appropriate RCU header file,
38         see \ref cds_urcu_gc "RCU type" for list of existing RCU class and corresponding header files.
39         For example, for \ref cds_urcu_general_buffered_gc "general-purpose buffered RCU" and
40         MichaelList-based split-list you should include:
41         \code
42         #include <cds/urcu/general_buffered.h>
43         #include <cds/intrusive/michael_list_rcu.h>
44         #include <cds/intrusive/split_list_rcu.h>
45
46         // Declare Michael's list for type Foo and default traits:
47         typedef cds::intrusive::MichaelList< cds::urcu::gc< cds::urcu::general_buffered<> >, Foo > rcu_michael_list;
48
49         // Declare split-list based on rcu_michael_list
50         typedef cds::intrusive::SplitListSet< cds::urcu::gc< cds::urcu::general_buffered<> >, rcu_michael_list > rcu_split_list;
51         \endcode
52
53     */
54     template <
55         class RCU,
56         class OrderedList,
57 #   ifdef CDS_DOXYGEN_INVOKED
58         class Traits = split_list::traits
59 #   else
60         class Traits
61 #   endif
62     >
63     class SplitListSet< cds::urcu::gc< RCU >, OrderedList, Traits >
64     {
65     public:
66         typedef cds::urcu::gc< RCU > gc;   ///< RCU garbage collector
67         typedef Traits           traits;   ///< Traits template parameters
68
69         /// Hash functor for \ref value_type and all its derivatives that you use
70         typedef typename cds::opt::v::hash_selector< typename traits::hash >::type   hash;
71
72         //@cond
73         typedef cds::intrusive::split_list::implementation_tag implementation_tag;
74         //@endcond
75
76     protected:
77         //@cond
78         typedef split_list::details::rebind_list_traits<OrderedList, traits> wrapped_ordered_list;
79         //@endcond
80
81     public:
82 #   ifdef CDS_DOXYGEN_INVOKED
83         typedef OrderedList         ordered_list;   ///< type of ordered list used as base for split-list
84 #   else
85         typedef typename wrapped_ordered_list::result    ordered_list;
86 #   endif
87         typedef typename ordered_list::value_type     value_type;     ///< type of value stored in the split-list
88         typedef typename ordered_list::key_comparator key_comparator; ///< key compare functor
89         typedef typename ordered_list::disposer       disposer;       ///< Node disposer functor
90         typedef typename ordered_list::rcu_lock       rcu_lock;       ///< RCU scoped lock
91         typedef typename ordered_list::exempt_ptr     exempt_ptr;     ///< pointer to extracted node
92         /// Group of \p extract_xxx functions require external locking if underlying ordered list requires that
93         static CDS_CONSTEXPR const bool c_bExtractLockExternal = ordered_list::c_bExtractLockExternal;
94
95         typedef typename traits::item_counter item_counter; ///< Item counter type
96         typedef typename traits::back_off     back_off;     ///< back-off strategy for spinning
97         typedef typename traits::memory_model memory_model; ///< Memory ordering. See cds::opt::memory_model option
98         typedef typename traits::stat         stat;         ///< Internal statistics
99
100     protected:
101         typedef typename ordered_list::node_type    list_node_type;  ///< Node type as declared in ordered list
102         typedef split_list::node<list_node_type>    node_type;       ///< split-list node type
103         typedef node_type                           dummy_node_type; ///< dummy node type
104
105         /// Split-list node traits
106         /**
107             This traits is intended for converting between underlying ordered list node type \ref list_node_type
108             and split-list node type \ref node_type
109         */
110         typedef split_list::node_traits<typename ordered_list::node_traits>  node_traits;
111
112         //@cond
113         /// Bucket table implementation
114         typedef typename split_list::details::bucket_table_selector<
115             traits::dynamic_bucket_table
116             , gc
117             , dummy_node_type
118             , opt::allocator< typename traits::allocator >
119             , opt::memory_model< memory_model >
120         >::type bucket_table;
121
122         //@endcond
123
124     protected:
125         //@cond
126         /// Ordered list wrapper to access protected members of OrderedList
127         class ordered_list_wrapper: public ordered_list
128         {
129             typedef ordered_list base_class;
130             typedef typename base_class::auxiliary_head       bucket_head_type;
131
132         public:
133             bool insert_at( dummy_node_type * pHead, value_type& val )
134             {
135                 assert( pHead != nullptr );
136                 bucket_head_type h(pHead);
137                 return base_class::insert_at( h, val );
138             }
139
140             template <typename Func>
141             bool insert_at( dummy_node_type * pHead, value_type& val, Func f )
142             {
143                 assert( pHead != nullptr );
144                 bucket_head_type h(pHead);
145                 return base_class::insert_at( h, val, f );
146             }
147
148             template <typename Func>
149             std::pair<bool, bool> ensure_at( dummy_node_type * pHead, value_type& val, Func func )
150             {
151                 assert( pHead != nullptr );
152                 bucket_head_type h(pHead);
153                 return base_class::ensure_at( h, val, func );
154             }
155
156             bool unlink_at( dummy_node_type * pHead, value_type& val )
157             {
158                 assert( pHead != nullptr );
159                 bucket_head_type h(pHead);
160                 return base_class::unlink_at( h, val );
161             }
162
163             template <typename Q, typename Compare, typename Func>
164             bool erase_at( dummy_node_type * pHead, split_list::details::search_value_type<Q> const& val, Compare cmp, Func f )
165             {
166                 assert( pHead != nullptr );
167                 bucket_head_type h(pHead);
168                 return base_class::erase_at( h, val, cmp, f );
169             }
170
171             template <typename Q, typename Compare>
172             bool erase_at( dummy_node_type * pHead, split_list::details::search_value_type<Q> const& val, Compare cmp )
173             {
174                 assert( pHead != nullptr );
175                 bucket_head_type h(pHead);
176                 return base_class::erase_at( h, val, cmp );
177             }
178
179             template <typename Q, typename Compare>
180             value_type * extract_at( dummy_node_type * pHead, split_list::details::search_value_type<Q>& val, Compare cmp )
181             {
182                 assert( pHead != nullptr );
183                 bucket_head_type h(pHead);
184                 return base_class::extract_at( h, val, cmp );
185             }
186
187             template <typename Q, typename Compare, typename Func>
188             bool find_at( dummy_node_type * pHead, split_list::details::search_value_type<Q>& val, Compare cmp, Func f ) const
189             {
190                 assert( pHead != nullptr );
191                 bucket_head_type h(pHead);
192                 return base_class::find_at( h, val, cmp, f );
193             }
194
195             template <typename Q, typename Compare>
196             bool find_at( dummy_node_type * pHead, split_list::details::search_value_type<Q> const & val, Compare cmp ) const
197             {
198                 assert( pHead != nullptr );
199                 bucket_head_type h(pHead);
200                 return base_class::find_at( h, val, cmp );
201             }
202
203             template <typename Q, typename Compare>
204             value_type * get_at( dummy_node_type * pHead, split_list::details::search_value_type<Q>& val, Compare cmp ) const
205             {
206                 assert( pHead != nullptr );
207                 bucket_head_type h(pHead);
208                 return base_class::get_at( h, val, cmp );
209             }
210
211             bool insert_aux_node( dummy_node_type * pNode )
212             {
213                 return base_class::insert_aux_node( pNode );
214             }
215             bool insert_aux_node( dummy_node_type * pHead, dummy_node_type * pNode )
216             {
217                 bucket_head_type h(pHead);
218                 return base_class::insert_aux_node( h, pNode );
219             }
220         };
221
222         template <typename Less>
223         struct less_wrapper: public cds::opt::details::make_comparator_from_less<Less>
224         {
225             typedef cds::opt::details::make_comparator_from_less<Less> base_wrapper;
226
227             template <typename Q1, typename Q2>
228             int operator()( split_list::details::search_value_type<Q1> const& v1, Q2 const& v2 ) const
229             {
230                 return base_wrapper::operator()( v1.val, v2 );
231             }
232
233             template <typename Q1, typename Q2>
234             int operator()( Q1 const& v1, split_list::details::search_value_type<Q2> const& v2 ) const
235             {
236                 return base_wrapper::operator()( v1, v2.val );
237             }
238         };
239         //@endcond
240
241     protected:
242         ordered_list_wrapper    m_List;             ///< Ordered list containing split-list items
243         bucket_table            m_Buckets;          ///< bucket table
244         atomics::atomic<size_t> m_nBucketCountLog2; ///< log2( current bucket count )
245         atomics::atomic<size_t> m_nMaxItemCount;    ///< number of items container can hold, before we have to resize
246         item_counter            m_ItemCounter;      ///< Item counter
247         hash                    m_HashFunctor;      ///< Hash functor
248         stat                    m_Stat;             ///< Internal stattistics accumulator
249
250     protected:
251         //@cond
252         typedef cds::details::Allocator< dummy_node_type, typename traits::allocator >   dummy_node_allocator;
253
254         dummy_node_type * alloc_dummy_node( size_t nHash )
255         {
256             m_Stat.onHeadNodeAllocated();
257             return dummy_node_allocator().New( nHash );
258         }
259         void free_dummy_node( dummy_node_type * p )
260         {
261             dummy_node_allocator().Delete( p );
262             m_Stat.onHeadNodeFreed();
263         }
264
265         /// Calculates hash value of \p key
266         template <typename Q>
267         size_t hash_value( Q const& key ) const
268         {
269             return m_HashFunctor( key );
270         }
271
272         size_t bucket_no( size_t nHash ) const
273         {
274             return nHash & ( (1 << m_nBucketCountLog2.load(memory_model::memory_order_relaxed)) - 1 );
275         }
276
277         static size_t parent_bucket( size_t nBucket )
278         {
279             assert( nBucket > 0 );
280             return nBucket & ~( 1 << bitop::MSBnz( nBucket ) );
281         }
282
283         dummy_node_type * init_bucket( size_t nBucket )
284         {
285             assert( nBucket > 0 );
286             size_t nParent = parent_bucket( nBucket );
287
288             dummy_node_type * pParentBucket = m_Buckets.bucket( nParent );
289             if ( pParentBucket == nullptr ) {
290                 pParentBucket = init_bucket( nParent );
291                 m_Stat.onRecursiveInitBucket();
292             }
293
294             assert( pParentBucket != nullptr );
295
296             // Allocate a dummy node for new bucket
297             {
298                 dummy_node_type * pBucket = alloc_dummy_node( split_list::dummy_hash( nBucket ) );
299                 if ( m_List.insert_aux_node( pParentBucket, pBucket ) ) {
300                     m_Buckets.bucket( nBucket, pBucket );
301                     m_Stat.onNewBucket();
302                     return pBucket;
303                 }
304                 free_dummy_node( pBucket );
305             }
306
307             // Another thread set the bucket. Wait while it done
308
309             // In this point, we must wait while nBucket is empty.
310             // The compiler can decide that waiting loop can be "optimized" (stripped)
311             // To prevent this situation, we use waiting on volatile bucket_head_ptr pointer.
312             //
313             m_Stat.onBucketInitContenton();
314             back_off bkoff;
315             while ( true ) {
316                 dummy_node_type volatile * p = m_Buckets.bucket( nBucket );
317                 if ( p != nullptr )
318                     return const_cast<dummy_node_type *>( p );
319                 bkoff();
320                 m_Stat.onBusyWaitBucketInit();
321             }
322         }
323
324         dummy_node_type * get_bucket( size_t nHash )
325         {
326             size_t nBucket = bucket_no( nHash );
327
328             dummy_node_type * pHead = m_Buckets.bucket( nBucket );
329             if ( pHead == nullptr )
330                 pHead = init_bucket( nBucket );
331
332             assert( pHead->is_dummy() );
333
334             return pHead;
335         }
336
337         void init()
338         {
339             // GC and OrderedList::gc must be the same
340             static_assert( std::is_same<gc, typename ordered_list::gc>::value, "GC and OrderedList::gc must be the same");
341
342             // atomicity::empty_item_counter is not allowed as a item counter
343             static_assert( !std::is_same<item_counter, cds::atomicity::empty_item_counter>::value,
344                            "cds::atomicity::empty_item_counter is not allowed as a item counter");
345
346             // Initialize bucket 0
347             dummy_node_type * pNode = alloc_dummy_node( 0 /*split_list::dummy_hash(0)*/ );
348
349             // insert_aux_node cannot return false for empty list
350             CDS_VERIFY( m_List.insert_aux_node( pNode ));
351
352             m_Buckets.bucket( 0, pNode );
353         }
354
355         static size_t max_item_count( size_t nBucketCount, size_t nLoadFactor )
356         {
357             return nBucketCount * nLoadFactor;
358         }
359
360         void inc_item_count()
361         {
362             size_t nMaxCount = m_nMaxItemCount.load(memory_model::memory_order_relaxed);
363             if ( ++m_ItemCounter <= nMaxCount )
364                 return;
365
366             const size_t nLoadFactor = m_Buckets.load_factor();
367             size_t sz = m_nBucketCountLog2.load(memory_model::memory_order_relaxed);
368             const size_t nBucketCount = static_cast<size_t>(1) << sz;
369             if ( nMaxCount < max_item_count( nBucketCount, nLoadFactor ))
370                 return; // someone already have updated m_nBucketCountLog2, so stop here
371
372             const size_t nNewMaxCount = (nBucketCount < m_Buckets.capacity()) ? max_item_count( nBucketCount << 1, nLoadFactor )
373                                                                               : std::numeric_limits<size_t>::max();
374             m_nMaxItemCount.compare_exchange_strong( nMaxCount, nNewMaxCount, memory_model::memory_order_relaxed,
375                 memory_model::memory_order_relaxed );
376             m_nBucketCountLog2.compare_exchange_strong( sz, sz + 1, memory_model::memory_order_seq_cst, memory_model::memory_order_relaxed );
377         }
378
379         template <typename Q, typename Compare, typename Func>
380         bool find_( Q& val, Compare cmp, Func f )
381         {
382             size_t nHash = hash_value( val );
383             split_list::details::search_value_type<Q>  sv( val, split_list::regular_hash( nHash ));
384             dummy_node_type * pHead = get_bucket( nHash );
385             assert( pHead != nullptr );
386
387             return m_Stat.onFind( m_List.find_at( pHead, sv, cmp,
388                 [&f](value_type& item, split_list::details::search_value_type<Q>& val){ f(item, val.val ); }));
389         }
390
391         template <typename Q, typename Compare>
392         bool find_value( Q const& val, Compare cmp )
393         {
394             size_t nHash = hash_value( val );
395             split_list::details::search_value_type<Q const>  sv( val, split_list::regular_hash( nHash ));
396             dummy_node_type * pHead = get_bucket( nHash );
397             assert( pHead != nullptr );
398
399             return m_Stat.onFind( m_List.find_at( pHead, sv, cmp ));
400         }
401
402         template <typename Q, typename Compare>
403         value_type * get_( Q const& val, Compare cmp )
404         {
405             size_t nHash = hash_value( val );
406             split_list::details::search_value_type<Q const>  sv( val, split_list::regular_hash( nHash ));
407             dummy_node_type * pHead = get_bucket( nHash );
408             assert( pHead != nullptr );
409
410             value_type * p = m_List.get_at( pHead, sv, cmp );
411             m_Stat.onFind( p != nullptr );
412             return p;
413         }
414
415         template <typename Q, typename Compare>
416         value_type * extract_( Q const& val, Compare cmp )
417         {
418             size_t nHash = hash_value( val );
419             split_list::details::search_value_type<Q const>  sv( val, split_list::regular_hash( nHash ));
420             dummy_node_type * pHead = get_bucket( nHash );
421             assert( pHead != nullptr );
422
423             value_type * pNode = m_List.extract_at( pHead, sv, cmp );
424             if ( pNode ) {
425                 --m_ItemCounter;
426                 m_Stat.onExtractSuccess();
427             }
428             else
429                 m_Stat.onExtractFailed();
430             return pNode;
431         }
432
433         template <typename Q, typename Less>
434         value_type * extract_with_( Q const& val, Less pred )
435         {
436             CDS_UNUSED( pred );
437             return extract_( val, typename wrapped_ordered_list::template make_compare_from_less<Less>());
438         }
439
440         template <typename Q, typename Compare>
441         bool erase_( const Q& val, Compare cmp )
442         {
443             size_t nHash = hash_value( val );
444             split_list::details::search_value_type<Q const>  sv( val, split_list::regular_hash( nHash ));
445             dummy_node_type * pHead = get_bucket( nHash );
446             assert( pHead != nullptr );
447
448             if ( m_List.erase_at( pHead, sv, cmp ) ) {
449                 --m_ItemCounter;
450                 m_Stat.onEraseSuccess();
451                 return true;
452             }
453             m_Stat.onEraseFailed();
454             return false;
455         }
456
457         template <typename Q, typename Compare, typename Func>
458         bool erase_( Q const& val, Compare cmp, Func f )
459         {
460             size_t nHash = hash_value( val );
461             split_list::details::search_value_type<Q const>  sv( val, split_list::regular_hash( nHash ));
462             dummy_node_type * pHead = get_bucket( nHash );
463             assert( pHead != nullptr );
464
465             if ( m_List.erase_at( pHead, sv, cmp, f )) {
466                 --m_ItemCounter;
467                 m_Stat.onEraseSuccess();
468                 return true;
469             }
470             m_Stat.onEraseFailed();
471             return false;
472         }
473
474         //@endcond
475
476     public:
477         /// Initialize split-ordered list of default capacity
478         /**
479             The default capacity is defined in bucket table constructor.
480             See split_list::expandable_bucket_table, split_list::static_ducket_table
481             which selects by split_list::dynamic_bucket_table option.
482         */
483         SplitListSet()
484             : m_nBucketCountLog2(1)
485             , m_nMaxItemCount( max_item_count(2, m_Buckets.load_factor()) )
486         {
487             init();
488         }
489
490         /// Initialize split-ordered list
491         SplitListSet(
492             size_t nItemCount           ///< estimate average of item count
493             , size_t nLoadFactor = 1    ///< load factor - average item count per bucket. Small integer up to 8, default is 1.
494             )
495             : m_Buckets( nItemCount, nLoadFactor )
496             , m_nBucketCountLog2(1)
497             , m_nMaxItemCount( max_item_count(2, m_Buckets.load_factor()) )
498         {
499             init();
500         }
501
502     public:
503         /// Inserts new node
504         /**
505             The function inserts \p val in the set if it does not contain
506             an item with key equal to \p val.
507
508             The function makes RCU lock internally.
509
510             Returns \p true if \p val is placed into the set, \p false otherwise.
511         */
512         bool insert( value_type& val )
513         {
514             size_t nHash = hash_value( val );
515             dummy_node_type * pHead = get_bucket( nHash );
516             assert( pHead != nullptr );
517
518             node_traits::to_node_ptr( val )->m_nHash = split_list::regular_hash( nHash );
519
520             if ( m_List.insert_at( pHead, val )) {
521                 inc_item_count();
522                 m_Stat.onInsertSuccess();
523                 return true;
524             }
525             m_Stat.onInsertFailed();
526             return false;
527         }
528
529         /// Inserts new node
530         /**
531             This function is intended for derived non-intrusive containers.
532
533             The function allows to split creating of new item into two part:
534             - create item with key only
535             - insert new item into the set
536             - if inserting is success, calls  \p f functor to initialize value-field of \p val.
537
538             The functor signature is:
539             \code
540                 void func( value_type& val );
541             \endcode
542             where \p val is the item inserted.
543
544             The function makes RCU lock internally.
545
546             @warning For \ref cds_intrusive_MichaelList_rcu "MichaelList" as the bucket see \ref cds_intrusive_item_creating "insert item troubleshooting".
547             \ref cds_intrusive_LazyList_rcu "LazyList" provides exclusive access to inserted item and does not require any node-level
548             synchronization.
549             */
550         template <typename Func>
551         bool insert( value_type& val, Func f )
552         {
553             size_t nHash = hash_value( val );
554             dummy_node_type * pHead = get_bucket( nHash );
555             assert( pHead != nullptr );
556
557             node_traits::to_node_ptr( val )->m_nHash = split_list::regular_hash( nHash );
558
559             if ( m_List.insert_at( pHead, val, f )) {
560                 inc_item_count();
561                 m_Stat.onInsertSuccess();
562                 return true;
563             }
564             m_Stat.onInsertFailed();
565             return false;
566         }
567
568         /// Ensures that the \p val exists in the set
569         /**
570             The operation performs inserting or changing data with lock-free manner.
571
572             If the item \p val is not found in the set, then \p val is inserted into the set.
573             Otherwise, the functor \p func is called with item found.
574             The functor signature is:
575             \code
576                 void func( bool bNew, value_type& item, value_type& val );
577             \endcode
578             with arguments:
579             - \p bNew - \p true if the item has been inserted, \p false otherwise
580             - \p item - item of the set
581             - \p val - argument \p val passed into the \p ensure function
582             If new item has been inserted (i.e. \p bNew is \p true) then \p item and \p val arguments
583             refers to the same thing.
584
585             The function makes RCU lock internally.
586
587             Returns std::pair<bool, bool> where \p first is \p true if operation is successfull,
588             \p second is \p true if new item has been added or \p false if the item with \p key
589             already is in the set.
590
591             @warning For \ref cds_intrusive_MichaelList_rcu "MichaelList" as the bucket see \ref cds_intrusive_item_creating "insert item troubleshooting".
592             \ref cds_intrusive_LazyList_rcu "LazyList" provides exclusive access to inserted item and does not require any node-level
593             synchronization.
594             */
595         template <typename Func>
596         std::pair<bool, bool> ensure( value_type& val, Func func )
597         {
598             size_t nHash = hash_value( val );
599             dummy_node_type * pHead = get_bucket( nHash );
600             assert( pHead != nullptr );
601
602             node_traits::to_node_ptr( val )->m_nHash = split_list::regular_hash( nHash );
603
604             std::pair<bool, bool> bRet = m_List.ensure_at( pHead, val, func );
605             if ( bRet.first && bRet.second ) {
606                 inc_item_count();
607                 m_Stat.onEnsureNew();
608             }
609             else
610                 m_Stat.onEnsureExist();
611             return bRet;
612         }
613
614         /// Unlinks the item \p val from the set
615         /**
616             The function searches the item \p val in the set and unlinks it from the set
617             if it is found and is equal to \p val.
618
619             Difference between \ref erase and \p unlink functions: \p erase finds <i>a key</i>
620             and deletes the item found. \p unlink finds an item by key and deletes it
621             only if \p val is an item of that set, i.e. the pointer to item found
622             is equal to <tt> &val </tt>.
623
624             RCU \p synchronize method can be called, therefore, RCU should not be locked.
625
626             The function returns \p true if success and \p false otherwise.
627         */
628         bool unlink( value_type& val )
629         {
630             size_t nHash = hash_value( val );
631             dummy_node_type * pHead = get_bucket( nHash );
632             assert( pHead != nullptr );
633
634             if ( m_List.unlink_at( pHead, val ) ) {
635                 --m_ItemCounter;
636                 m_Stat.onEraseSuccess();
637                 return true;
638             }
639             m_Stat.onEraseFailed();
640             return false;
641         }
642
643         /// Deletes the item from the set
644         /** \anchor cds_intrusive_SplitListSet_rcu_erase
645             The function searches an item with key equal to \p key in the set,
646             unlinks it from the set, and returns \p true.
647             If the item with key equal to \p key is not found the function return \p false.
648
649             Difference between \ref erase and \p unlink functions: \p erase finds <i>a key</i>
650             and deletes the item found. \p unlink finds an item by key and deletes it
651             only if \p key is an item of that set, i.e. the pointer to item found
652             is equal to <tt> &key </tt>.
653
654             RCU \p synchronize method can be called, therefore, RCU should not be locked.
655
656             Note the hash functor should accept a parameter of type \p Q that can be not the same as \p value_type.
657         */
658         template <typename Q>
659         bool erase( Q const& key )
660         {
661             return erase_( key, key_comparator() );
662         }
663
664         /// Deletes the item from the set using \p pred for searching
665         /**
666             The function is an analog of \ref cds_intrusive_SplitListSet_rcu_erase "erase(Q const&)"
667             but \p cmp is used for key compare.
668             \p Less has the interface like \p std::less.
669             \p pred must imply the same element order as the comparator used for building the set.
670         */
671         template <typename Q, typename Less>
672         bool erase_with( Q const& key, Less pred )
673         {
674             CDS_UNUSED( pred );
675             return erase_( key, typename wrapped_ordered_list::template make_compare_from_less<Less>() );
676         }
677
678         /// Deletes the item from the set
679         /** \anchor cds_intrusive_SplitListSet_rcu_erase_func
680             The function searches an item with key equal to \p key in the set,
681             call \p f functor with item found, unlinks it from the set, and returns \p true.
682             The \ref disposer specified by \p OrderedList class template parameter is called
683             by garbage collector \p GC asynchronously.
684
685             The \p Func interface is
686             \code
687             struct functor {
688                 void operator()( value_type const& item );
689             };
690             \endcode
691
692             If the item with key equal to \p key is not found the function return \p false.
693
694             RCU \p synchronize method can be called, therefore, RCU should not be locked.
695
696             Note the hash functor should accept a parameter of type \p Q that can be not the same as \p value_type.
697         */
698         template <typename Q, typename Func>
699         bool erase( Q const& key, Func f )
700         {
701             return erase_( key, key_comparator(), f );
702         }
703
704         /// Deletes the item from the set using \p pred for searching
705         /**
706             The function is an analog of \ref cds_intrusive_SplitListSet_rcu_erase_func "erase(Q const&, Func)"
707             but \p cmp is used for key compare.
708             \p Less has the interface like \p std::less.
709             \p pred must imply the same element order as the comparator used for building the set.
710         */
711         template <typename Q, typename Less, typename Func>
712         bool erase_with( Q const& key, Less pred, Func f )
713         {
714             CDS_UNUSED( pred );
715             return erase_( key, typename wrapped_ordered_list::template make_compare_from_less<Less>(), f );
716         }
717
718         /// Extracts an item from the set
719         /** \anchor cds_intrusive_SplitListSet_rcu_extract
720             The function searches an item with key equal to \p key in the set,
721             unlinks it, and returns \ref cds::urcu::exempt_ptr "exempt_ptr" pointer to the item found.
722             If the item with the key equal to \p key is not found the function returns an empty \p exempt_ptr.
723
724             @note The function does NOT call RCU read-side lock or synchronization,
725             and does NOT dispose the item found. It just excludes the item from the set
726             and returns a pointer to item found.
727             You should lock RCU before calling of the function, and you should synchronize RCU
728             outside the RCU lock before reusing returned pointer.
729
730             \code
731             typedef cds::urcu::gc< general_buffered<> > rcu;
732             typedef cds::intrusive::MichaelList< rcu, Foo > rcu_michael_list;
733             typedef cds::intrusive::SplitListSet< rcu, rcu_michael_list, foo_traits > rcu_splitlist_set;
734
735             rcu_splitlist_set theSet;
736             // ...
737
738             rcu_splitlist_set::exempt_ptr p;
739             {
740                 // first, we should lock RCU
741                 rcu_splitlist_set::rcu_lock lock;
742
743                 // Now, you can apply extract function
744                 // Note that you must not delete the item found inside the RCU lock
745                 p = theList.extract( 10 );
746                 if ( p ) {
747                     // do something with p
748                     ...
749                 }
750             }
751
752             // We may safely release p here
753             // release() passes the pointer to RCU reclamation cycle:
754             // it invokes RCU retire_ptr function with the disposer you provided for rcu_michael_list.
755             p.release();
756             \endcode
757         */
758         template <typename Q>
759         exempt_ptr extract( Q const& key )
760         {
761             return exempt_ptr(extract_( key, key_comparator() ));
762         }
763
764         /// Extracts an item from the set using \p pred for searching
765         /**
766             The function is an analog of \p extract(Q const&) but \p pred is used for key compare.
767             \p Less functor has the interface like \p std::less.
768             \p pred must imply the same element order as the comparator used for building the set.
769         */
770         template <typename Q, typename Less>
771         exempt_ptr extract_with( Q const& key, Less pred )
772         {
773             return exempt_ptr( extract_with_( key, pred ));
774         }
775
776         /// Finds the key \p key
777         /** \anchor cds_intrusive_SplitListSet_rcu_find_func
778             The function searches the item with key equal to \p key and calls the functor \p f for item found.
779             The interface of \p Func functor is:
780             \code
781             struct functor {
782                 void operator()( value_type& item, Q& key );
783             };
784             \endcode
785             where \p item is the item found, \p key is the <tt>find</tt> function argument.
786
787             The functor can change non-key fields of \p item. Note that the functor is only guarantee
788             that \p item cannot be disposed during functor is executing.
789             The functor does not serialize simultaneous access to the set \p item. If such access is
790             possible you must provide your own synchronization schema on item level to exclude unsafe item modifications.
791
792             The \p key argument is non-const since it can be used as \p f functor destination i.e., the functor
793             can modify both arguments.
794
795             Note the hash functor specified for class \p Traits template parameter
796             should accept a parameter of type \p Q that can be not the same as \p value_type.
797
798             The function applies RCU lock internally.
799
800             The function returns \p true if \p key is found, \p false otherwise.
801         */
802         template <typename Q, typename Func>
803         bool find( Q& key, Func f )
804         {
805             return find_( key, key_comparator(), f );
806         }
807         //@cond
808         template <typename Q, typename Func>
809         bool find( Q const& key, Func f )
810         {
811             return find_( key, key_comparator(), f );
812         }
813         //@endcond
814
815         /// Finds the key \p key with \p pred predicate for comparing
816         /**
817             The function is an analog of \ref cds_intrusive_SplitListSet_rcu_find_func "find(Q&, Func)"
818             but \p cmp is used for key compare.
819             \p Less has the interface like \p std::less.
820             \p cmp must imply the same element order as the comparator used for building the set.
821         */
822         template <typename Q, typename Less, typename Func>
823         bool find_with( Q& key, Less pred, Func f )
824         {
825             CDS_UNUSED( pred );
826             return find_( key, typename wrapped_ordered_list::template make_compare_from_less<Less>(), f );
827         }
828         //@cond
829         template <typename Q, typename Less, typename Func>
830         bool find_with( Q const& key, Less pred, Func f )
831         {
832             CDS_UNUSED( pred );
833             return find_( key, typename wrapped_ordered_list::template make_compare_from_less<Less>(), f );
834         }
835         //@endcond
836
837         /// Finds the key \p key
838         /** \anchor cds_intrusive_SplitListSet_rcu_find_val
839             The function searches the item with key equal to \p key
840             and returns \p true if \p key found or \p false otherwise.
841         */
842         template <typename Q>
843         bool find( Q const& key )
844         {
845             return find_value( key, key_comparator() );
846         }
847
848         /// Finds the key \p key with \p pred predicate for comparing
849         /**
850             The function is an analog of \ref cds_intrusive_SplitListSet_rcu_find_val "find(Q const&)"
851             but \p cmp is used for key compare.
852             \p Less has the interface like \p std::less.
853             \p pred must imply the same element order as the comparator used for building the set.
854         */
855         template <typename Q, typename Less>
856         bool find_with( Q const& key, Less pred )
857         {
858             CDS_UNUSED( pred );
859             return find_value( key, typename wrapped_ordered_list::template make_compare_from_less<Less>() );
860         }
861
862         /// Finds the key \p key and return the item found
863         /** \anchor cds_intrusive_SplitListSet_rcu_get
864             The function searches the item with key equal to \p key and returns the pointer to item found.
865             If \p key is not found it returns \p nullptr.
866
867             Note the compare functor should accept a parameter of type \p Q that can be not the same as \p value_type.
868
869             RCU should be locked before call of this function.
870             Returned item is valid only while RCU is locked:
871             \code
872             cds::intrusive::SplitListSet< your_template_parameters > theSet;
873             // ...
874             {
875                 // Lock RCU
876                 hash_set::rcu_lock lock;
877
878                 foo * pVal = theSet.get( 5 );
879                 if ( pVal ) {
880                     // Deal with pVal
881                     //...
882                 }
883                 // Unlock RCU by rcu_lock destructor
884                 // pVal can be retired by disposer at any time after RCU has been unlocked
885             }
886             \endcode
887         */
888         template <typename Q>
889         value_type * get( Q const& key )
890         {
891             return get_( key, key_comparator() );
892         }
893
894         /// Finds the key \p key and return the item found
895         /**
896             The function is an analog of \ref cds_intrusive_SplitListSet_rcu_get "get(Q const&)"
897             but \p pred is used for comparing the keys.
898
899             \p Less functor has the semantics like \p std::less but should take arguments of type \ref value_type and \p Q
900             in any order.
901             \p pred must imply the same element order as the comparator used for building the set.
902         */
903         template <typename Q, typename Less>
904         value_type * get_with( Q const& key, Less pred )
905         {
906             CDS_UNUSED( pred );
907             return get_( key, typename wrapped_ordered_list::template make_compare_from_less<Less>());
908         }
909
910
911         /// Returns item count in the set
912         size_t size() const
913         {
914             return m_ItemCounter;
915         }
916
917         /// Checks if the set is empty
918         /**
919             Emptiness is checked by item counting: if item count is zero then the set is empty.
920             Thus, the correct item counting feature is an important part of split-list set implementation.
921         */
922         bool empty() const
923         {
924             return size() == 0;
925         }
926
927         /// Clears the set (not atomic)
928         void clear()
929         {
930             iterator it = begin();
931             while ( it != end() ) {
932                 iterator i(it);
933                 ++i;
934                 unlink( *it );
935                 it = i;
936             }
937         }
938
939         /// Returns internal statistics
940         stat const& statistics() const
941         {
942             return m_Stat;
943         }
944
945     protected:
946         //@cond
947         template <bool IsConst>
948         class iterator_type
949             :public split_list::details::iterator_type<node_traits, ordered_list, IsConst>
950         {
951             typedef split_list::details::iterator_type<node_traits, ordered_list, IsConst> iterator_base_class;
952             typedef typename iterator_base_class::list_iterator list_iterator;
953         public:
954             iterator_type()
955                 : iterator_base_class()
956             {}
957
958             iterator_type( iterator_type const& src )
959                 : iterator_base_class( src )
960             {}
961
962             // This ctor should be protected...
963             iterator_type( list_iterator itCur, list_iterator itEnd )
964                 : iterator_base_class( itCur, itEnd )
965             {}
966         };
967         //@endcond
968     public:
969         /// Forward iterator
970         /**
971             The forward iterator for a split-list has some features:
972             - it has no post-increment operator
973             - it depends on iterator of underlying \p OrderedList
974             - The iterator cannot be moved across thread boundary since it may contain GC's guard that is thread-private GC data.
975             - Iterator ensures thread-safety even if you delete the item that iterator points to. However, in case of concurrent
976               deleting operations it is no guarantee that you iterate all item in the split-list.
977
978             Therefore, the use of iterators in concurrent environment is not good idea. Use the iterator on the concurrent container
979             for debug purpose only.
980         */
981         typedef iterator_type<false>    iterator;
982         /// Const forward iterator
983         /**
984             For iterator's features and requirements see \ref iterator
985         */
986         typedef iterator_type<true>     const_iterator;
987
988         /// Returns a forward iterator addressing the first element in a split-list
989         /**
990             For empty list \code begin() == end() \endcode
991         */
992         iterator begin()
993         {
994             return iterator( m_List.begin(), m_List.end() );
995         }
996
997         /// Returns an iterator that addresses the location succeeding the last element in a split-list
998         /**
999             Do not use the value returned by <tt>end</tt> function to access any item.
1000
1001             The returned value can be used only to control reaching the end of the split-list.
1002             For empty list \code begin() == end() \endcode
1003         */
1004         iterator end()
1005         {
1006             return iterator( m_List.end(), m_List.end() );
1007         }
1008
1009         /// Returns a forward const iterator addressing the first element in a split-list
1010         const_iterator begin() const
1011         {
1012             return cbegin();
1013         }
1014         /// Returns a forward const iterator addressing the first element in a split-list
1015         const_iterator cbegin() const
1016         {
1017             return const_iterator( m_List.cbegin(), m_List.cend() );
1018         }
1019
1020         /// Returns an const iterator that addresses the location succeeding the last element in a split-list
1021         const_iterator end() const
1022         {
1023             return cend();
1024         }
1025         /// Returns an const iterator that addresses the location succeeding the last element in a split-list
1026         const_iterator cend() const
1027         {
1028             return const_iterator( m_List.cend(), m_List.cend() );
1029         }
1030
1031     };
1032
1033 }}  // namespace cds::intrusive
1034
1035 #endif // #ifndef CDSLIB_INTRUSIVE_SPLIT_LIST_RCU_H