c556e81637db1cfed8937536c9d2d06ce8e48a91
[libcds.git] / cds / intrusive / basket_queue.h
1 //$$CDS-header$$
2
3 #ifndef __CDS_INTRUSIVE_BASKET_QUEUE_H
4 #define __CDS_INTRUSIVE_BASKET_QUEUE_H
5
6 #include <type_traits>
7 #include <cds/intrusive/details/base.h>
8 #include <cds/details/marked_ptr.h>
9 #include <cds/intrusive/details/queue_stat.h>
10 #include <cds/intrusive/details/single_link_struct.h>
11 #include <cds/ref.h>
12 #include <cds/intrusive/details/dummy_node_holder.h>
13
14 namespace cds { namespace intrusive {
15
16     /// BasketQueue-related definitions
17     /** @ingroup cds_intrusive_helper
18     */
19     namespace basket_queue {
20         /// BasketQueue node
21         /**
22             Template parameters:
23             - GC - garbage collector used
24             - Tag - a tag used to distinguish between different implementation
25         */
26         template <class GC, typename Tag = opt::none>
27         struct node: public GC::container_node
28         {
29             typedef GC      gc  ;   ///< Garbage collector
30             typedef Tag     tag ;   ///< tag
31
32             typedef cds::details::marked_ptr<node, 1>   marked_ptr         ;   ///< marked pointer
33             typedef typename gc::template atomic_marked_ptr< marked_ptr>     atomic_marked_ptr   ;   ///< atomic marked pointer specific for GC
34
35             /// Rebind node for other template parameters
36             template <class GC2, typename Tag2 = tag>
37             struct rebind {
38                 typedef node<GC2, Tag2>  other ;    ///< Rebinding result
39             };
40
41             atomic_marked_ptr m_pNext ; ///< pointer to the next node in the container
42
43             node()
44                 : m_pNext( nullptr )
45             {}
46         };
47
48         //@cond
49         // Specialization for HRC GC
50         template <typename Tag>
51         struct node< gc::HRC, Tag>: public gc::HRC::container_node
52         {
               // Specialization of the BasketQueue node for gc::HRC. Unlike the primary template it
               // overrides the cleanUp() / terminate() virtual functions declared below and provides
               // no rebind metafunction.
53             typedef gc::HRC gc  ;   ///< Garbage collector
54             typedef Tag     tag ;   ///< tag
55
56             typedef cds::details::marked_ptr<node, 1>   marked_ptr         ;   ///< marked pointer
57             typedef typename gc::template atomic_marked_ptr< marked_ptr>     atomic_marked_ptr   ;   ///< atomic marked pointer specific for GC
58
59             atomic_marked_ptr m_pNext ; ///< pointer to the next node in the container
60
               // Constructs a node whose next-link is null
61             node()
62                 : m_pNext( nullptr )
63             {}
64
65         protected:
               // Re-links m_pNext past successors whose m_bDeleted flag is set.
               // pGC must not be null (asserted); it is used to construct the two guards.
66             virtual void cleanUp( cds::gc::hrc::ThreadGC * pGC )
67             {
68                 assert( pGC != nullptr );
69                 typename gc::template GuardArray<2> aGuards( *pGC );
70
71                 while ( true ) {
                       // Guard slot 0 protects the current successor
72                     marked_ptr pNext = aGuards.protect( 0, m_pNext );
73                     if ( pNext.ptr() && pNext->m_bDeleted.load(atomics::memory_order_acquire) ) {
                           // The successor is flagged as deleted: protect its own successor in slot 1
                           // and try to swing our link over it. The CAS result is not checked;
                           // the loop re-reads m_pNext and re-examines it either way.
74                         marked_ptr p = aGuards.protect( 1, pNext->m_pNext );
75                         m_pNext.compare_exchange_strong( pNext, p, atomics::memory_order_acquire, atomics::memory_order_relaxed );
76                         continue;
77                     }
78                     else {
                           // No successor, or the successor is not flagged as deleted: done
79                         break;
80                     }
81                 }
82             }
83
               // Resets m_pNext to a default-constructed marked_ptr.
               // If bConcurrent is true the reset is performed by a CAS loop (release on success),
               // otherwise by a plain relaxed store. pGC is not used here.
84             virtual void terminate( cds::gc::hrc::ThreadGC * pGC, bool bConcurrent )
85             {
86                 if ( bConcurrent ) {
87                     marked_ptr pNext = m_pNext.load(atomics::memory_order_relaxed);
                       // compare_exchange_weak is retried with the same pNext variable until it succeeds
88                     do {} while ( !m_pNext.compare_exchange_weak( pNext, marked_ptr(), atomics::memory_order_release, atomics::memory_order_relaxed ) );
89                 }
90                 else {
91                     m_pNext.store( marked_ptr(), atomics::memory_order_relaxed );
92                 }
93             }
94         };
95         //@endcond
96
97         using single_link::default_hook;
98
99         //@cond
100         template < typename HookType, typename... Options>
101         struct hook
102         {
103             typedef typename opt::make_options< default_hook, Options...>::type  options;
104             typedef typename options::gc    gc;
105             typedef typename options::tag   tag;
106             typedef node<gc, tag> node_type;
107             typedef HookType     hook_type;
108         };
109         //@endcond
110
111
112         /// Base hook
113         /**
114             Supported \p Options:
115             - opt::gc - garbage collector used.
116             - opt::tag - tag
117         */
118         template <typename... Options>
119         struct base_hook : hook< opt::base_hook_tag, Options... >
120         {};
121
122         /// Member hook
123         /**
124             \p MemberOffset defines offset in bytes of \ref node member into your structure.
125             Use \p offsetof macro to define \p MemberOffset
126
127             \p Options are:
128             - opt::gc - garbage collector used.
129             - opt::tag - tag
130         */
131         template < size_t MemberOffset, typename... Options >
132         struct member_hook: public hook< opt::member_hook_tag, Options... >
133         {
134             //@cond
135             static const size_t c_nMemberOffset = MemberOffset;
136             //@endcond
137         };
138
139         /// Traits hook
140         /**
141             \p NodeTraits defines type traits for node.
142             See \ref node_traits for \p NodeTraits interface description
143
144             \p Options are:
145             - opt::gc - garbage collector used.
146             - opt::tag - tag
147         */
148         template <typename NodeTraits, typename... Options >
149         struct traits_hook: public hook< opt::traits_hook_tag, Options... >
150         {
151             //@cond
152             typedef NodeTraits node_traits;
153             //@endcond
154         };
155
156         /// Metafunction for selecting the appropriate link checking policy
            /**
                Alias of single_link::get_link_checker: \p Node is the node type,
                \p LinkType is the requested opt::link_check_type value.
            */
157         template < typename Node, opt::link_check_type LinkType > using get_link_checker = single_link::get_link_checker< Node, LinkType >;
158
159         /// Basket queue internal statistics. May be used for debugging or profiling
160         /**
161             Basket queue statistics derives from cds::intrusive::queue_stat
162             and extends it by two additional fields specific for the algorithm.
163         */
164         template <typename Counter = cds::atomicity::event_counter >
165         struct stat: public cds::intrusive::queue_stat< Counter >
166         {
167             //@cond
168             typedef cds::intrusive::queue_stat< Counter >   base_class;
169             typedef typename base_class::counter_type       counter_type;
170             //@endcond
171
172             counter_type m_TryAddBasket      ;  ///< Count of attemps adding new item to a basket (only or BasketQueue, for other queue this metric is not used)
173             counter_type m_AddBasketCount    ;  ///< Count of events "Enqueue a new item into basket" (only or BasketQueue, for other queue this metric is not used)
174
175             /// Register an attempt t add new item to basket
176             void onTryAddBasket()           { ++m_TryAddBasket; }
177             /// Register event "Enqueue a new item into basket" (only or BasketQueue, for other queue this metric is not used)
178             void onAddBasket()              { ++m_AddBasketCount; }
179
180             //@cond
181             void reset()
182             {
183                 base_class::reset();
184                 m_TryAddBasket.reset();
185                 m_AddBasketCount.reset();
186             }
187
188             stat& operator +=( stat const& s )
189             {
190                 base_class::operator +=( s );
191                 m_TryAddBasket += s.m_TryAddBasket.get();
192                 m_AddBasketCount += s.m_AddBasketCount.get();
193                 return *this;
194             }
195             //@endcond
196         };
197
198         /// Dummy basket queue statistics - no counting is performed. Support interface like \ref stat
199         struct dummy_stat: public cds::intrusive::queue_dummy_stat
200         {
201             //@cond
202             void onTryAddBasket()           {}
203             void onAddBasket()              {}
204
205             void reset() {}
206             dummy_stat& operator +=( dummy_stat const& )
207             {
208                 return *this;
209             }
210             //@endcond
211         };
212
213     }   // namespace basket_queue
214
215     /// Basket lock-free queue (intrusive variant)
216     /** @ingroup cds_intrusive_queue
217         Implementation of basket queue algorithm.
218
219         \par Source:
220             [2007] Moshe Hoffman, Ori Shalev, Nir Shavit "The Baskets Queue"
221
222         <b>Key idea</b>
223
224         In the "basket" approach, instead of
225         the traditional ordered list of nodes, the queue consists of an ordered list of groups
226         of nodes (logical baskets). The order of nodes in each basket need not be specified, and in
227         fact, it is easiest to maintain them in FIFO order. The baskets fulfill the following basic
228         rules:
229         - Each basket has a time interval in which all its nodes' enqueue operations overlap.
230         - The baskets are ordered by the order of their respective time intervals.
231         - For each basket, its nodes' dequeue operations occur after its time interval.
232         - The dequeue operations are performed according to the order of baskets.
233
234         Two properties define the FIFO order of nodes:
235         - The order of nodes in a basket is not specified.
236         - The order of nodes in different baskets is the FIFO-order of their respective baskets.
237
238         In algorithms such as the MS-queue or optimistic
239         queue, threads enqueue items by applying a Compare-and-swap (CAS) operation to the
240         queue's tail pointer, and all the threads that fail on a particular CAS operation (and also
241         the winner of that CAS) overlap in time. In particular, they share the time interval of
242         the CAS operation itself. Hence, all the threads that fail to CAS on the tail-node of
243         the queue may be inserted into the same basket. By integrating the basket-mechanism
244         as the back-off mechanism, the time usually spent on backing-off before trying to link
245         onto the new tail, can now be utilized to insert the failed operations into the basket,
246         allowing enqueues to complete sooner. In the meantime, the next successful CAS operations
247         by enqueues allow new baskets to be formed down the list, and these can be
248         filled concurrently. Moreover, the failed operations don't retry their link attempt on the
249         new tail, lowering the overall contention on it. This leads to a queue
250         algorithm that unlike all former concurrent queue algorithms requires virtually no tuning
251         of the backoff mechanisms to reduce contention, making the algorithm an attractive
252         out-of-the-box queue.
253
254         In order to enqueue, just as in MSQueue, a thread first tries to link the new node to
255         the last node. If it failed to do so, then another thread has already succeeded. Thus it
256         tries to insert the new node into the new basket that was created by the winner thread.
257         To dequeue a node, a thread first reads the head of the queue to obtain the
258         oldest basket. It may then dequeue any node in the oldest basket.
259
260         <b>Template arguments:</b>
261         - \p GC - garbage collector type: gc::HP, gc::HRC, gc::PTB
262         - \p T - type to be stored in the queue, should be convertible to \ref basket_queue::node
263         - \p Options - options
264
265         <b>Type of node</b>: \ref basket_queue::node
266
267         \p Options are:
268         - opt::hook - hook used. Possible values are: basket_queue::base_hook, basket_queue::member_hook, basket_queue::traits_hook.
269             If the option is not specified, <tt>basket_queue::base_hook<></tt> is used.
270             For Gidenstam's gc::HRC, only basket_queue::base_hook is supported.
271         - opt::back_off - back-off strategy used. If the option is not specified, the cds::backoff::empty is used.
272         - opt::disposer - the functor used for dispose removed items. Default is opt::v::empty_disposer. This option is used
273             in \ref dequeue function.
274         - opt::link_checker - the type of node's link fields checking. Default is \ref opt::debug_check_link
275             Note: for gc::HRC garbage collector, link checking policy is always selected as \ref opt::always_check_link.
276         - opt::item_counter - the type of item counting feature. Default is \ref atomicity::empty_item_counter (no item counting feature)
277         - opt::stat - the type to gather internal statistics.
278             Possible option value are: \ref basket_queue::stat, \ref basket_queue::dummy_stat,
279             user-provided class that supports basket_queue::stat interface.
280             Default is \ref basket_queue::dummy_stat.
281             Generic option intrusive::queue_stat and intrusive::queue_dummy_stat are acceptable too, however,
282             they will be automatically converted to basket_queue::stat and basket_queue::dummy_stat
283             respectively.
284         - opt::alignment - the alignment for internal queue data. Default is opt::cache_line_alignment
285         - opt::memory_model - C++ memory ordering model. Can be opt::v::relaxed_ordering (relaxed memory model, the default)
286             or opt::v::sequential_consistent (sequentially consistent memory model).
287
288         Garbage collecting schema \p GC must be consistent with the basket_queue::node GC.
289
290         \par About item disposing
291         Like MSQueue, the Baskets queue algo has a key feature: even if the queue is empty it contains one item that is "dummy" one from
292         the standpoint of the algo. See \ref dequeue function doc for explanation.
293
294         \par Examples
295         \code
296         #include <cds/intrusive/basket_queue.h>
297         #include <cds/gc/hp.h>
298
299         namespace ci = cds::intrusive;
300         typedef cds::gc::HP hp_gc;
301
302         // Basket queue with Hazard Pointer garbage collector, base hook + item disposer:
303         struct Foo: public ci::basket_queue::node< hp_gc >
304         {
305             // Your data
306             ...
307         };
308
309         // Disposer for Foo struct just deletes the object passed in
310         struct fooDisposer {
311             void operator()( Foo * p )
312             {
313                 delete p;
314             }
315         };
316
317         typedef ci::BasketQueue< hp_gc,
318             Foo
319             ,ci::opt::hook<
320                 ci::basket_queue::base_hook< ci::opt::gc<hp_gc> >
321             >
322             ,ci::opt::disposer< fooDisposer >
323         > fooQueue;
324
325         // BasketQueue with Hazard Pointer garbage collector,
326         // member hook + item disposer + item counter,
327         // without alignment of internal queue data:
328         struct Bar
329         {
330             // Your data
331             ...
332             ci::basket_queue::node< hp_gc > hMember;
333         };
334
335         typedef ci::BasketQueue< hp_gc,
336             Bar
337             ,ci::opt::hook<
338                 ci::basket_queue::member_hook<
339                     offsetof(Bar, hMember)
340                     ,ci::opt::gc<hp_gc>
341                 >
342             >
343             ,ci::opt::disposer< fooDisposer >
344             ,cds::opt::item_counter< cds::atomicity::item_counter >
345             ,cds::opt::alignment< cds::opt::no_special_alignment >
346         > barQueue;
347         \endcode
348     */
349     template <typename GC, typename T, typename... Options>
350     class BasketQueue
351     {
352         //@cond
353         struct default_options
354         {
355             typedef cds::backoff::empty             back_off;
356             typedef basket_queue::base_hook<>       hook;
357             typedef opt::v::empty_disposer          disposer;
358             typedef atomicity::empty_item_counter   item_counter;
359             typedef basket_queue::dummy_stat        stat;
360             typedef opt::v::relaxed_ordering        memory_model;
361             static const opt::link_check_type link_checker = opt::debug_check_link;
362             enum { alignment = opt::cache_line_alignment };
363         };
364         //@endcond
365
366     public:
367         //@cond
368         typedef typename opt::make_options<
369             typename cds::opt::find_type_traits< default_options, Options... >::type
370             ,Options...
371         >::type   options;
372
373         typedef typename std::conditional<
374             std::is_same<typename options::stat, cds::intrusive::queue_stat<> >::value
375             ,basket_queue::stat<>
376             ,typename std::conditional<
377                 std::is_same<typename options::stat, cds::intrusive::queue_dummy_stat>::value
378                 ,basket_queue::dummy_stat
379                 ,typename options::stat
380             >::type
381         >::type stat_type_;
382
383         //@endcond
384
385     public:
386         typedef T  value_type   ;   ///< type of value stored in the queue
387         typedef typename options::hook      hook        ;   ///< hook type
388         typedef typename hook::node_type    node_type   ;   ///< node type
389         typedef typename options::disposer  disposer    ;   ///< disposer used
390         typedef typename get_node_traits< value_type, node_type, hook>::type node_traits ;    ///< node traits
391         typedef typename basket_queue::get_link_checker< node_type, options::link_checker >::type link_checker   ;   ///< link checker
392
393         typedef GC gc          ;   ///< Garbage collector
394         typedef typename options::back_off  back_off    ;   ///< back-off strategy
395         typedef typename options::item_counter item_counter ;   ///< Item counting policy used
396 #ifdef CDS_DOXYGEN_INVOKED
397         typedef typename options::stat      stat        ;   ///< Internal statistics policy used
398 #else
399         typedef stat_type_  stat;
400 #endif
401         typedef typename options::memory_model  memory_model ;   ///< Memory ordering. See cds::opt::memory_model option
402
403         /// Rebinds the queue to other template arguments
404         template <typename GC2, typename T2, typename... Options2>
405         struct rebind {
406             using other = BasketQueue<GC2, T2, Options2...>;    ///< Rebinding result
407         };
408
409         static const size_t m_nHazardPtrCount = 6 ; ///< Number of hazard pointers required by the algorithm
410
411     protected:
412         //@cond
413
414         struct internal_disposer
415         {
416             void operator()( value_type * p )
417             {
418                 assert( p != nullptr );
419
420                 BasketQueue::clear_links( node_traits::to_node_ptr(p) );
421                 disposer()( p );
422             }
423         };
424
425         typedef typename node_type::marked_ptr   marked_ptr;
426         typedef typename node_type::atomic_marked_ptr atomic_marked_ptr;
427
428         typedef intrusive::node_to_value<BasketQueue> node_to_value;
429         typedef typename opt::details::alignment_setter< atomic_marked_ptr, options::alignment >::type aligned_node_ptr;
430         typedef typename opt::details::alignment_setter<
431             cds::intrusive::details::dummy_node< gc, node_type>,
432             options::alignment
433         >::type    dummy_node_type;
434
435         //@endcond
436
437         aligned_node_ptr    m_pHead ;           ///< Queue's head pointer (aligned)
438         aligned_node_ptr    m_pTail ;           ///< Queue's tail pointer (aligned)
439
440         dummy_node_type     m_Dummy ;           ///< dummy node; head and tail point to it right after construction
441         item_counter        m_ItemCounter   ;   ///< Item counter
442         stat                m_Stat  ;           ///< Internal statistics
443         //@cond
444         size_t const        m_nMaxHops;         // hop threshold: a successful dequeue that skipped at least this many marked links also calls free_chain()
445         //@endcond
445         //@endcond
446
447         //@cond
448
449         template <size_t Count>
450         static marked_ptr guard_node( typename gc::template GuardArray<Count>& g, size_t idx, atomic_marked_ptr const& p )
451         {
452             marked_ptr pg;
453             while ( true ) {
454                 pg = p.load( memory_model::memory_order_relaxed );
455                 g.assign( idx, node_traits::to_value_ptr( pg.ptr() ) );
456                 if ( p.load( memory_model::memory_order_acquire) == pg ) {
457                     return pg;
458                 }
459             }
460         }
461
462         static marked_ptr guard_node( typename gc::Guard& g, atomic_marked_ptr const& p )
463         {
464             marked_ptr pg;
465             while ( true ) {
466                 pg = p.load( memory_model::memory_order_relaxed );
467                 g.assign( node_traits::to_value_ptr( pg.ptr() ) );
468                 if ( p.load( memory_model::memory_order_acquire) == pg ) {
469                     return pg;
470                 }
471             }
472         }
473
474         struct dequeue_result {
                // Working state shared between do_dequeue() and its callers (dequeue(), empty())
475             typename gc::template GuardArray<3>  guards;   // do_dequeue() uses slot 0 for head, slot 1 for tail, slot 2 for the node being examined
476             node_type * pNext;                             // node claimed by do_dequeue(); assigned only when it is called with bDeque == true
477         };
478
479         bool do_dequeue( dequeue_result& res, bool bDeque )
480         {
481             // Note:
482             // If bDeque == false then the function is called from empty method and no real dequeuing operation is performed
                //
                // Returns false if the queue is empty (head and tail refer to the same node and that node
                // has no successor). Returns true otherwise:
                //  - bDeque == true:  a node has been claimed by marking the link that leads to it;
                //                     the node is stored in res.pNext, the item counter is decremented
                //                     and the dequeue event is registered in the statistics;
                //  - bDeque == false: an unclaimed node has been found, nothing is modified for it.
483
484             back_off bkoff;
485
486             marked_ptr h;
487             marked_ptr t;
488             marked_ptr pNext;
489
490             while ( true ) {
                    // Guard slots: 0 - head, 1 - tail, 2 - successor of head
491                 h = guard_node( res.guards, 0, m_pHead );
492                 t = guard_node( res.guards, 1, m_pTail );
493                 pNext = guard_node( res.guards, 2, h->m_pNext );
494
                    // Continue only if head has not changed since it was read
495                 if ( h == m_pHead.load( memory_model::memory_order_acquire ) ) {
496                     if ( h.ptr() == t.ptr() ) {
                            // Head and tail refer to the same node: no successor means the queue is empty
497                         if ( !pNext.ptr() )
498                             return false;
499
                            // Tail lags behind: walk to the last linked node (while tail is unchanged)
                            // keeping the current node in guard slot 2 ...
500                         {
501                             typename gc::Guard g;
502                             while ( pNext->m_pNext.load(memory_model::memory_order_relaxed).ptr() && m_pTail.load(memory_model::memory_order_relaxed) == t ) {
503                                 pNext = guard_node( g, pNext->m_pNext );
504                                 res.guards.assign( 2, g.template get<value_type>() );
505                             }
506                         }
507
                            // ... and try to move the tail there; the result is not checked, the outer loop retries
508                         m_pTail.compare_exchange_weak( t, marked_ptr(pNext.ptr()), memory_model::memory_order_release, memory_model::memory_order_relaxed );
509                     }
510                     else {
511                         marked_ptr iter( h );
512                         size_t hops = 0;
513
                            // g keeps the previous node (iter) guarded while slot 2 moves on to the next one
514                         typename gc::Guard g;
515
                            // Skip links that carry the mark bit (the mark is set by the CAS below when a node
                            // is claimed) until an unmarked link, the tail, or a head change is encountered
516                         while ( pNext.ptr() && pNext.bits() && iter.ptr() != t.ptr() && m_pHead.load(memory_model::memory_order_relaxed) == h ) {
517                             iter = pNext;
518                             g.assign( res.guards.template get<value_type>(2) );
519                             pNext = guard_node( res.guards, 2, pNext->m_pNext );
520                             ++hops;
521                         }
522
                            // Head has been moved by another thread: restart immediately (no back-off, no race statistics)
523                         if ( m_pHead.load(memory_model::memory_order_relaxed) != h )
524                             continue;
525
                            // The walk reached the tail: try to advance the head up to it and retire
                            // the skipped nodes, then retry
526                         if ( iter.ptr() == t.ptr() )
527                             free_chain( h, iter );
528                         else if ( bDeque ) {
529                             res.pNext = pNext.ptr();
530
                                // Claim pNext by setting the mark bit on the link iter -> pNext
531                             if ( iter->m_pNext.compare_exchange_weak( pNext, marked_ptr( pNext.ptr(), 1 ), memory_model::memory_order_release, memory_model::memory_order_relaxed ) ) {
                                    // Too many marked links have been skipped: advance the head as well
532                                 if ( hops >= m_nMaxHops )
533                                     free_chain( h, pNext );
534                                 break;
535                             }
536                         }
537                         else
                                // Emptiness probe: an unclaimed node exists
538                             return true;
539                     }
540                 }
541
                    // The attempt failed: register the race (real dequeue only), back off and retry
542                 if ( bDeque )
543                     m_Stat.onDequeueRace();
544                 bkoff();
545             }
546
                // Reached only via the break above, i.e. after a successful claim
547             if ( bDeque ) {
548                 --m_ItemCounter;
549                 m_Stat.onDequeue();
550             }
551
552             return true;
553         }
554
555         void free_chain( marked_ptr head, marked_ptr newHead )
556         {
557             // "head" and "newHead" are guarded
                //
                // Tries to move m_pHead from "head" to "newHead" (stored without mark bits).
                // Only if that CAS succeeds does this call pass the nodes from "head" up to,
                // but not including, "newHead" to dispose_node(). If the CAS fails nothing is done.
558
559             if ( m_pHead.compare_exchange_strong( head, marked_ptr(newHead.ptr()), memory_model::memory_order_release, memory_model::memory_order_relaxed ))
560             {
                    // Slot 0 guards the node being disposed, slot 1 guards its successor
561                 typename gc::template GuardArray<2> guards;
562                 guards.assign( 0, node_traits::to_value_ptr(head.ptr()) );
563                 while ( head.ptr() != newHead.ptr() ) {
564                     marked_ptr pNext = guard_node( guards, 1, head->m_pNext );
                        // Every link inside the chain being released is expected to carry the mark bit
565                     assert( pNext.bits() != 0 );
566                     dispose_node( head.ptr() );
                        // Step forward: the successor becomes the current node
567                     guards.assign( 0, guards.template get<value_type>(1) );
568                     head = pNext;
569                 }
570             }
571         }
572
573         static void clear_links( node_type * pNode )
574         {
                // Resets the node's next-link to an unmarked null pointer (release store).
                // Called by internal_disposer before the user-defined disposer is invoked.
575             pNode->m_pNext.store( marked_ptr( nullptr ), memory_model::memory_order_release );
576         }
577
578         void dispose_node( node_type * p )
579         {
580             if ( p != m_Dummy.get() ) {
581                 gc::template retire<internal_disposer>( node_traits::to_value_ptr(p) );
582             }
583             else
584                 m_Dummy.retire();
585         }
586         //@endcond
587
588     public:
589         /// Initializes empty queue
590         BasketQueue()
591             : m_pHead( nullptr )
592             , m_pTail( nullptr )
593             , m_nMaxHops( 3 )
594         {
595             // GC and node_type::gc must be the same
596             static_assert(( std::is_same<gc, typename node_type::gc>::value ), "GC and node_type::gc must be the same");
597
598             // For cds::gc::HRC, only one base_hook is allowed
599             static_assert((
600                 std::conditional<
601                     std::is_same<gc, cds::gc::HRC>::value,
602                     std::is_same< typename hook::hook_type, opt::base_hook_tag >,
603                     boost::true_type
604                 >::type::value
605             ), "For cds::gc::HRC, only base_hook is allowed");
606
607             // Head/tail initialization should be made via store call
608             // because of gc::HRC manages reference counting
609             m_pHead.store( marked_ptr(m_Dummy.get()), memory_model::memory_order_relaxed );
610             m_pTail.store( marked_ptr(m_Dummy.get()), memory_model::memory_order_relaxed );
611         }
612
613         /// Destructor clears the queue
614         /**
615             Since the baskets queue contains at least one item even
616             if the queue is empty, the destructor may call item disposer.
617         */
618         ~BasketQueue()
619         {
620             clear();
621
622             node_type * pHead = m_pHead.load(memory_model::memory_order_relaxed).ptr();
623             assert( pHead != nullptr );
624
625             {
626                 node_type * pNext = pHead->m_pNext.load( memory_model::memory_order_relaxed ).ptr();
627                 while ( pNext ) {
628                     node_type * p = pNext;
629                     pNext = pNext->m_pNext.load( memory_model::memory_order_relaxed ).ptr();
630                     p->m_pNext.store( marked_ptr(), memory_model::memory_order_relaxed );
631                     dispose_node( p );
632                 }
633                 pHead->m_pNext.store( marked_ptr(), memory_model::memory_order_relaxed );
634                 //m_pTail.store( marked_ptr( pHead ), memory_model::memory_order_relaxed );
635             }
636
637             m_pHead.store( marked_ptr( nullptr ), memory_model::memory_order_relaxed );
638             m_pTail.store( marked_ptr( nullptr ), memory_model::memory_order_relaxed );
639
640             dispose_node( pHead );
641         }
642
643         /// Returns queue's item count
644         /**
645             The value returned depends on opt::item_counter option. For atomicity::empty_item_counter,
646             this function always returns 0.
647
648             <b>Warning</b>: even if you use a real item counter and it returns 0, this does not mean that the queue
649             is empty. To check whether the queue is empty use \ref empty() method.
650         */
651         size_t size() const
652         {
653             return m_ItemCounter.value();
654         }
655
656         /// Returns a const reference to the internal statistics object (\p m_Stat)
657         const stat& statistics() const
658         {
659             return m_Stat;
660         }
661
662         /// Enqueues \p val value into the queue.
663         /** @anchor cds_intrusive_BasketQueue_enqueue
664             The function always returns \p true.

                The node of \p val is linked either directly after the current tail or, when that
                attempt loses a race, after the same tail node in front of the node another thread
                has just linked (the "basket"). On success the item counter is incremented and the
                enqueue event is registered in the statistics.
665         */
666         bool enqueue( value_type& val )
667         {
668             node_type * pNew = node_traits::to_node_ptr( val );
                // NOTE(review): presumably verifies that the node is not linked anywhere yet - confirm against link_checker
669             link_checker::is_empty( pNew );
670
671             typename gc::Guard guard;
672             back_off bkoff;
673
674             marked_ptr t;
675             while ( true ) {
                    // Guard the current tail and read its successor
676                 t = guard_node( guard, m_pTail );
677
678                 marked_ptr pNext = t->m_pNext.load(memory_model::memory_order_acquire );
679
680                 if ( pNext.ptr() == nullptr ) {
                        // The tail has no successor: try to link the new node right after it
681                     pNew->m_pNext.store( marked_ptr(), memory_model::memory_order_release );
682                     if ( t->m_pNext.compare_exchange_weak( pNext, marked_ptr(pNew), memory_model::memory_order_release, memory_model::memory_order_relaxed ) ) {
                            // Linked. Try to move the tail to the new node; a failure is only counted
683                         if ( !m_pTail.compare_exchange_strong( t, marked_ptr(pNew), memory_model::memory_order_release, memory_model::memory_order_relaxed ))
684                             m_Stat.onAdvanceTailFailed();
685                         break;
686                     }
687
688                     // Try adding to basket
689                     m_Stat.onTryAddBasket();
690
691                     // Reread tail next
692                     typename gc::Guard gNext;
693
694                 try_again:
695                     pNext = guard_node( gNext, t->m_pNext );
696
697                     // add to the basket
                        // Allowed only while the tail is still t, t's link is still pNext and that link is not marked
698                     if ( m_pTail.load(memory_model::memory_order_relaxed) == t
699                          && t->m_pNext.load( memory_model::memory_order_relaxed) == pNext
700                          && !pNext.bits()  )
701                     {
702                         bkoff();
                            // Insert the new node between t and t's current successor
703                         pNew->m_pNext.store( pNext, memory_model::memory_order_release );
704                         if ( t->m_pNext.compare_exchange_weak( pNext, marked_ptr( pNew ), memory_model::memory_order_release, memory_model::memory_order_relaxed )) {
705                             m_Stat.onAddBasket();
706                             break;
707                         }
708                         goto try_again;
709                     }
                        // Otherwise fall through to the race accounting below and restart from the new tail
710                 }
711                 else {
712                     // Tail is misplaced, advance it
713
                        // Slot 0 guards the node being visited, slot 1 its successor
714                     typename gc::template GuardArray<2> g;
715                     g.assign( 0, node_traits::to_value_ptr( pNext.ptr() ) );
                        // Re-check the link after publishing the guard
716                     if ( t->m_pNext.load( memory_model::memory_order_relaxed ) != pNext ) {
717                         m_Stat.onEnqueueRace();
718                         bkoff();
719                         continue;
720                     }
721
722                     marked_ptr p;
723                     bool bTailOk = true;
                        // Walk to the last linked node, giving up as soon as the tail is changed by someone else
724                     while ( (p = pNext->m_pNext.load( memory_model::memory_order_relaxed )).ptr() != nullptr )
725                     {
726                         bTailOk = m_pTail.load( memory_model::memory_order_relaxed ) == t;
727                         if ( !bTailOk )
728                             break;
729
730                         g.assign( 1, node_traits::to_value_ptr( p.ptr() ));
                            // The link changed while the guard was being published: re-read it
731                         if ( pNext->m_pNext.load(memory_model::memory_order_relaxed) != p )
732                             continue;
733                         pNext = p;
734                         g.assign( 0, g.template get<value_type>( 1 ) );
735                     }
736                     if ( !bTailOk || !m_pTail.compare_exchange_weak( t, marked_ptr( pNext.ptr() ), memory_model::memory_order_release, memory_model::memory_order_relaxed ))
737                         m_Stat.onAdvanceTailFailed();
738
739                     m_Stat.onBadTail();
740                 }
741
742                 m_Stat.onEnqueueRace();
743             }
744
                // Reached only via one of the breaks above, i.e. after the node has been linked
745             ++m_ItemCounter;
746             m_Stat.onEnqueue();
747
748             return true;
749         }
750
751         /// Dequeues a value from the queue
752         /** @anchor cds_intrusive_BasketQueue_dequeue
753             If the queue is empty the function returns \p nullptr.
754
755             <b>Warning</b>: see MSQueue::deque note about item disposing
756         */
757         value_type * dequeue()
758         {
759             dequeue_result res;
760
761             if ( do_dequeue( res, true ))
762                 return node_traits::to_value_ptr( *res.pNext );
763             return nullptr;
764         }
765
766         /// Synonym for \ref cds_intrusive_BasketQueue_enqueue "enqueue" function
767         bool push( value_type& val )
768         {
                // Forwards to enqueue() and returns its result
769             return enqueue( val );
770         }
771
772         /// Synonym for \ref cds_intrusive_BasketQueue_dequeue "dequeue" function
773         value_type * pop()
774         {
                // Forwards to dequeue() and returns its result
775             return dequeue();
776         }
777
778         /// Checks if the queue is empty
779         /**
780             Note that this function is not \p const.
781             The function is based on the \ref dequeue algorithm
782             but does not actually dequeue any item.
783         */
784         bool empty()
785         {
786             dequeue_result probe;
787             return do_dequeue( probe, false ) == false;
788         }
789
790         /// Clears the queue
791         /**
792             The function calls \ref dequeue repeatedly until it returns \p nullptr.
793             The disposer defined in template \p Options is called for each item
794             that can be safely disposed.
795         */
796         void clear()
797         {
798             while ( dequeue() != nullptr ) {}
799         }
800     };
801
802 }} // namespace cds::intrusive
803
804 #endif // #ifndef __CDS_INTRUSIVE_BASKET_QUEUE_H