Replace NULL with nullptr
[libcds.git] / cds / intrusive / basket_queue.h
1 //$$CDS-header$$
2
3 #ifndef __CDS_INTRUSIVE_BASKET_QUEUE_H
4 #define __CDS_INTRUSIVE_BASKET_QUEUE_H
5
6 #include <type_traits>
7 #include <cds/intrusive/base.h>
8 #include <cds/details/marked_ptr.h>
9 #include <cds/intrusive/queue_stat.h>
10 #include <cds/intrusive/single_link_struct.h>
11 #include <cds/ref.h>
12 #include <cds/intrusive/details/dummy_node_holder.h>
13
14 namespace cds { namespace intrusive {
15
16     /// BasketQueue -related definitions
17     /** @ingroup cds_intrusive_helper
18     */
19     namespace basket_queue {
20         /// BasketQueue node
21         /**
22             Template parameters:
23             - GC - garbage collector used
24             - Tag - a tag used to distinguish between different implementation
25         */
26         template <class GC, typename Tag = opt::none>
27         struct node: public GC::container_node
28         {
29             typedef GC      gc  ;   ///< Garbage collector
30             typedef Tag     tag ;   ///< tag
31
32             typedef cds::details::marked_ptr<node, 1>   marked_ptr         ;   ///< marked pointer
33             typedef typename gc::template atomic_marked_ptr< marked_ptr>     atomic_marked_ptr   ;   ///< atomic marked pointer specific for GC
34
35             /// Rebind node for other template parameters
36             template <class GC2, typename Tag2 = tag>
37             struct rebind {
38                 typedef node<GC2, Tag2>  other ;    ///< Rebinding result
39             };
40
41             atomic_marked_ptr m_pNext ; ///< pointer to the next node in the container
42
43             node()
44                 : m_pNext( nullptr )
45             {}
46         };
47
48         //@cond
49         // Specialization for HRC GC
50         template <typename Tag>
51         struct node< gc::HRC, Tag>: public gc::HRC::container_node
52         {
53             typedef gc::HRC gc  ;   ///< Garbage collector
54             typedef Tag     tag ;   ///< tag
55
56             typedef cds::details::marked_ptr<node, 1>   marked_ptr         ;   ///< marked pointer
57             typedef typename gc::template atomic_marked_ptr< marked_ptr>     atomic_marked_ptr   ;   ///< atomic marked pointer specific for GC
58
59             atomic_marked_ptr m_pNext ; ///< pointer to the next node in the container
60
61             node()
62                 : m_pNext( nullptr )
63             {}
64
65         protected:
66             virtual void cleanUp( cds::gc::hrc::ThreadGC * pGC )
67             {
68                 assert( pGC != nullptr );
69                 typename gc::template GuardArray<2> aGuards( *pGC );
70
71                 while ( true ) {
72                     marked_ptr pNext = aGuards.protect( 0, m_pNext );
73                     if ( pNext.ptr() && pNext->m_bDeleted.load(CDS_ATOMIC::memory_order_acquire) ) {
74                         marked_ptr p = aGuards.protect( 1, pNext->m_pNext );
75                         m_pNext.compare_exchange_strong( pNext, p, CDS_ATOMIC::memory_order_acquire, CDS_ATOMIC::memory_order_relaxed );
76                         continue;
77                     }
78                     else {
79                         break;
80                     }
81                 }
82             }
83
84             virtual void terminate( cds::gc::hrc::ThreadGC * pGC, bool bConcurrent )
85             {
86                 if ( bConcurrent ) {
87                     marked_ptr pNext = m_pNext.load(CDS_ATOMIC::memory_order_relaxed);
88                     do {} while ( !m_pNext.compare_exchange_weak( pNext, marked_ptr(), CDS_ATOMIC::memory_order_release, CDS_ATOMIC::memory_order_relaxed ) );
89                 }
90                 else {
91                     m_pNext.store( marked_ptr(), CDS_ATOMIC::memory_order_relaxed );
92                 }
93             }
94         };
95         //@endcond
96
97         using single_link::default_hook;
98
99         //@cond
100         template < typename HookType, CDS_DECL_OPTIONS2>
101         struct hook
102         {
103             typedef typename opt::make_options< default_hook, CDS_OPTIONS2>::type  options;
104             typedef typename options::gc    gc;
105             typedef typename options::tag   tag;
106             typedef node<gc, tag> node_type;
107             typedef HookType     hook_type;
108         };
109         //@endcond
110
111
112         /// Base hook
113         /**
114             \p Options are:
115             - opt::gc - garbage collector used.
116             - opt::tag - tag
117         */
118         template < CDS_DECL_OPTIONS2 >
119         struct base_hook: public hook< opt::base_hook_tag, CDS_OPTIONS2 >
120         {};
121
122         /// Member hook
123         /**
124             \p MemberOffset defines offset in bytes of \ref node member into your structure.
125             Use \p offsetof macro to define \p MemberOffset
126
127             \p Options are:
128             - opt::gc - garbage collector used.
129             - opt::tag - tag
130         */
131         template < size_t MemberOffset, CDS_DECL_OPTIONS2 >
132         struct member_hook: public hook< opt::member_hook_tag, CDS_OPTIONS2 >
133         {
134             //@cond
135             static const size_t c_nMemberOffset = MemberOffset;
136             //@endcond
137         };
138
139         /// Traits hook
140         /**
141             \p NodeTraits defines type traits for node.
142             See \ref node_traits for \p NodeTraits interface description
143
144             \p Options are:
145             - opt::gc - garbage collector used.
146             - opt::tag - tag
147         */
148         template <typename NodeTraits, CDS_DECL_OPTIONS2 >
149         struct traits_hook: public hook< opt::traits_hook_tag, CDS_OPTIONS2 >
150         {
151             //@cond
152             typedef NodeTraits node_traits;
153             //@endcond
154         };
155
156
157 #if defined(CDS_CXX11_TEMPLATE_ALIAS_SUPPORT) && !defined(CDS_DOXYGEN_INVOKED)
158         template < typename Node, opt::link_check_type LinkType > using get_link_checker = single_link::get_link_checker< Node, LinkType >;
159 #else
160         /// Metafunction for selecting appropriate link checking policy
161         template < typename Node, opt::link_check_type LinkType >
162         struct get_link_checker: public single_link::get_link_checker< Node, LinkType >
163         {};
164
165 #endif
166
167         /// Basket queue internal statistics. May be used for debugging or profiling
168         /**
169             Basket queue statistics derives from cds::intrusive::queue_stat
170             and extends it by two additional fields specific for the algorithm.
171         */
172         template <typename Counter = cds::atomicity::event_counter >
173         struct stat: public cds::intrusive::queue_stat< Counter >
174         {
175             //@cond
176             typedef cds::intrusive::queue_stat< Counter >   base_class;
177             typedef typename base_class::counter_type       counter_type;
178             //@endcond
179
180             counter_type m_TryAddBasket      ;  ///< Count of attemps adding new item to a basket (only or BasketQueue, for other queue this metric is not used)
181             counter_type m_AddBasketCount    ;  ///< Count of events "Enqueue a new item into basket" (only or BasketQueue, for other queue this metric is not used)
182
183             /// Register an attempt t add new item to basket
184             void onTryAddBasket()           { ++m_TryAddBasket; }
185             /// Register event "Enqueue a new item into basket" (only or BasketQueue, for other queue this metric is not used)
186             void onAddBasket()              { ++m_AddBasketCount; }
187
188             //@cond
189             void reset()
190             {
191                 base_class::reset();
192                 m_TryAddBasket.reset();
193                 m_AddBasketCount.reset();
194             }
195
196             stat& operator +=( stat const& s )
197             {
198                 base_class::operator +=( s );
199                 m_TryAddBasket += s.m_TryAddBasket.get();
200                 m_AddBasketCount += s.m_AddBasketCount.get();
201                 return *this;
202             }
203             //@endcond
204         };
205
206         /// Dummy basket queue statistics - no counting is performed. Support interface like \ref stat
207         struct dummy_stat: public cds::intrusive::queue_dummy_stat
208         {
209             //@cond
210             void onTryAddBasket()           {}
211             void onAddBasket()              {}
212
213             void reset() {}
214             dummy_stat& operator +=( dummy_stat const& )
215             {
216                 return *this;
217             }
218             //@endcond
219         };
220
221     }   // namespace basket_queue
222
223     /// Basket lock-free queue (intrusive variant)
224     /** @ingroup cds_intrusive_queue
225         Implementation of basket queue algorithm.
226
227         \par Source:
228             [2007] Moshe Hoffman, Ori Shalev, Nir Shavit "The Baskets Queue"
229
230         <b>Key idea</b>
231
232         In the \93basket\94 approach, instead of
233         the traditional ordered list of nodes, the queue consists of an ordered list of groups
234         of nodes (logical baskets). The order of nodes in each basket need not be specified, and in
235         fact, it is easiest to maintain them in FIFO order. The baskets fulfill the following basic
236         rules:
237         - Each basket has a time interval in which all its nodes\92 enqueue operations overlap.
238         - The baskets are ordered by the order of their respective time intervals.
239         - For each basket, its nodes\92 dequeue operations occur after its time interval.
240         - The dequeue operations are performed according to the order of baskets.
241
242         Two properties define the FIFO order of nodes:
243         - The order of nodes in a basket is not specified.
244         - The order of nodes in different baskets is the FIFO-order of their respective baskets.
245
246         In algorithms such as the MS-queue or optimistic
247         queue, threads enqueue items by applying a Compare-and-swap (CAS) operation to the
248         queue\92s tail pointer, and all the threads that fail on a particular CAS operation (and also
249         the winner of that CAS) overlap in time. In particular, they share the time interval of
250         the CAS operation itself. Hence, all the threads that fail to CAS on the tail-node of
251         the queue may be inserted into the same basket. By integrating the basket-mechanism
252         as the back-off mechanism, the time usually spent on backing-off before trying to link
253         onto the new tail, can now be utilized to insert the failed operations into the basket,
254         allowing enqueues to complete sooner. In the meantime, the next successful CAS operations
255         by enqueues allow new baskets to be formed down the list, and these can be
256         filled concurrently. Moreover, the failed operations don\92t retry their link attempt on the
257         new tail, lowering the overall contention on it. This leads to a queue
258         algorithm that unlike all former concurrent queue algorithms requires virtually no tuning
259         of the backoff mechanisms to reduce contention, making the algorithm an attractive
260         out-of-the-box queue.
261
262         In order to enqueue, just as in MSQueue, a thread first tries to link the new node to
263         the last node. If it failed to do so, then another thread has already succeeded. Thus it
264         tries to insert the new node into the new basket that was created by the winner thread.
265         To dequeue a node, a thread first reads the head of the queue to obtain the
266         oldest basket. It may then dequeue any node in the oldest basket.
267
268         <b>Template arguments:</b>
269         - \p GC - garbage collector type: gc::HP, gc::HRC, gc::PTB
270         - \p T - type to be stored in the queue, should be convertible to \ref single_link::node
271         - \p Options - options
272
273         <b>Type of node</b>: \ref single_link::node
274
275         \p Options are:
276         - opt::hook - hook used. Possible values are: basket_queue::base_hook, basket_queue::member_hook, basket_queue::traits_hook.
277             If the option is not specified, <tt>basket_queue::base_hook<></tt> is used.
278             For Gidenstam's gc::HRC, only basket_queue::base_hook is supported.
279         - opt::back_off - back-off strategy used. If the option is not specified, the cds::backoff::empty is used.
280         - opt::disposer - the functor used for dispose removed items. Default is opt::v::empty_disposer. This option is used
281             in \ref dequeue function.
282         - opt::link_checker - the type of node's link fields checking. Default is \ref opt::debug_check_link
283             Note: for gc::HRC garbage collector, link checking policy is always selected as \ref opt::always_check_link.
284         - opt::item_counter - the type of item counting feature. Default is \ref atomicity::empty_item_counter (no item counting feature)
285         - opt::stat - the type to gather internal statistics.
286             Possible option value are: \ref basket_queue::stat, \ref basket_queue::dummy_stat,
287             user-provided class that supports basket_queue::stat interface.
288             Default is \ref basket_queue::dummy_stat.
289             Generic option intrusive::queue_stat and intrusive::queue_dummy_stat are acceptable too, however,
290             they will be automatically converted to basket_queue::stat and basket_queue::dummy_stat
291             respectively.
292         - opt::alignment - the alignment for internal queue data. Default is opt::cache_line_alignment
293         - opt::memory_model - C++ memory ordering model. Can be opt::v::relaxed_ordering (relaxed memory model, the default)
294             or opt::v::sequential_consistent (sequentially consisnent memory model).
295
296         Garbage collecting schema \p GC must be consistent with the basket_queue::node GC.
297
298         \par About item disposing
299         Like MSQueue, the Baskets queue algo has a key feature: even if the queue is empty it contains one item that is "dummy" one from
300         the standpoint of the algo. See \ref dequeue function doc for explanation.
301
302         \par Examples
303         \code
304         #include <cds/intrusive/basket_queue.h>
305         #include <cds/gc/hp.h>
306
307         namespace ci = cds::inrtusive;
308         typedef cds::gc::HP hp_gc;
309
310         // Basket queue with Hazard Pointer garbage collector, base hook + item disposer:
311         struct Foo: public ci::basket_queue::node< hp_gc >
312         {
313             // Your data
314             ...
315         };
316
317         // Disposer for Foo struct just deletes the object passed in
318         struct fooDisposer {
319             void operator()( Foo * p )
320             {
321                 delete p;
322             }
323         };
324
325         typedef ci::BasketQueue< hp_gc,
326             Foo
327             ,ci::opt::hook<
328                 ci::basket_queue::base_hook< ci::opt::gc<hp_gc> >
329             >
330             ,ci::opt::disposer< fooDisposer >
331         > fooQueue;
332
333         // BasketQueue with Hazard Pointer garbage collector,
334         // member hook + item disposer + item counter,
335         // without alignment of internal queue data:
336         struct Bar
337         {
338             // Your data
339             ...
340             ci::basket_queue::node< hp_gc > hMember;
341         };
342
343         typedef ci::BasketQueue< hp_gc,
344             Foo
345             ,ci::opt::hook<
346                 ci::basket_queue::member_hook<
347                     offsetof(Bar, hMember)
348                     ,ci::opt::gc<hp_gc>
349                 >
350             >
351             ,ci::opt::disposer< fooDisposer >
352             ,cds::opt::item_counter< cds::atomicity::item_counter >
353             ,cds::opt::alignment< cds::opt::no_special_alignment >
354         > barQueue;
355         \endcode
356     */
357     template <typename GC, typename T, CDS_DECL_OPTIONS9>
358     class BasketQueue
359     {
360         //@cond
361         struct default_options
362         {
363             typedef cds::backoff::empty             back_off;
364             typedef basket_queue::base_hook<>       hook;
365             typedef opt::v::empty_disposer          disposer;
366             typedef atomicity::empty_item_counter   item_counter;
367             typedef basket_queue::dummy_stat        stat;
368             typedef opt::v::relaxed_ordering        memory_model;
369             static const opt::link_check_type link_checker = opt::debug_check_link;
370             enum { alignment = opt::cache_line_alignment };
371         };
372         //@endcond
373
374     public:
375         //@cond
376         typedef typename opt::make_options<
377             typename cds::opt::find_type_traits< default_options, CDS_OPTIONS9 >::type
378             ,CDS_OPTIONS9
379         >::type   options;
380
381         typedef typename std::conditional<
382             std::is_same<typename options::stat, cds::intrusive::queue_stat<> >::value
383             ,basket_queue::stat<>
384             ,typename std::conditional<
385                 std::is_same<typename options::stat, cds::intrusive::queue_dummy_stat>::value
386                 ,basket_queue::dummy_stat
387                 ,typename options::stat
388             >::type
389         >::type stat_type_;
390
391         //@endcond
392
393     public:
394         typedef T  value_type   ;   ///< type of value stored in the queue
395         typedef typename options::hook      hook        ;   ///< hook type
396         typedef typename hook::node_type    node_type   ;   ///< node type
397         typedef typename options::disposer  disposer    ;   ///< disposer used
398         typedef typename get_node_traits< value_type, node_type, hook>::type node_traits ;    ///< node traits
399         typedef typename basket_queue::get_link_checker< node_type, options::link_checker >::type link_checker   ;   ///< link checker
400
401         typedef GC gc          ;   ///< Garbage collector
402         typedef typename options::back_off  back_off    ;   ///< back-off strategy
403         typedef typename options::item_counter item_counter ;   ///< Item counting policy used
404 #ifdef CDS_DOXYGEN_INVOKED
405         typedef typename options::stat      stat        ;   ///< Internal statistics policy used
406 #else
407         typedef stat_type_  stat;
408 #endif
409         typedef typename options::memory_model  memory_model ;   ///< Memory ordering. See cds::opt::memory_model option
410
411         /// Rebind template arguments
412         template <typename GC2, typename T2, CDS_DECL_OTHER_OPTIONS9>
413         struct rebind {
414             typedef BasketQueue< GC2, T2, CDS_OTHER_OPTIONS9> other   ;   ///< Rebinding result
415         };
416
417         static const size_t m_nHazardPtrCount = 6 ; ///< Count of hazard pointer required for the algorithm
418
419     protected:
420         //@cond
421
422         struct internal_disposer
423         {
424             void operator()( value_type * p )
425             {
426                 assert( p != nullptr );
427
428                 BasketQueue::clear_links( node_traits::to_node_ptr(p) );
429                 disposer()( p );
430             }
431         };
432
433         typedef typename node_type::marked_ptr   marked_ptr;
434         typedef typename node_type::atomic_marked_ptr atomic_marked_ptr;
435
436         typedef intrusive::node_to_value<BasketQueue> node_to_value;
437         typedef typename opt::details::alignment_setter< atomic_marked_ptr, options::alignment >::type aligned_node_ptr;
438         typedef typename opt::details::alignment_setter<
439             cds::intrusive::details::dummy_node< gc, node_type>,
440             options::alignment
441         >::type    dummy_node_type;
442
443         //@endcond
444
445         aligned_node_ptr    m_pHead ;           ///< Queue's head pointer (aligned)
446         aligned_node_ptr    m_pTail ;           ///< Queue's tail pointer (aligned)
447
448         dummy_node_type     m_Dummy ;           ///< dummy node
449         item_counter        m_ItemCounter   ;   ///< Item counter
450         stat                m_Stat  ;           ///< Internal statistics
451         //@cond
452         size_t const        m_nMaxHops;
453         //@endcond
454
455         //@cond
456
457         template <size_t Count>
458         static marked_ptr guard_node( typename gc::template GuardArray<Count>& g, size_t idx, atomic_marked_ptr const& p )
459         {
460             marked_ptr pg;
461             while ( true ) {
462                 pg = p.load( memory_model::memory_order_relaxed );
463                 g.assign( idx, node_traits::to_value_ptr( pg.ptr() ) );
464                 if ( p.load( memory_model::memory_order_acquire) == pg ) {
465                     return pg;
466                 }
467             }
468         }
469
470         static marked_ptr guard_node( typename gc::Guard& g, atomic_marked_ptr const& p )
471         {
472             marked_ptr pg;
473             while ( true ) {
474                 pg = p.load( memory_model::memory_order_relaxed );
475                 g.assign( node_traits::to_value_ptr( pg.ptr() ) );
476                 if ( p.load( memory_model::memory_order_acquire) == pg ) {
477                     return pg;
478                 }
479             }
480         }
481
482         struct dequeue_result {
483             typename gc::template GuardArray<3>  guards;
484             node_type * pNext;
485         };
486
487         bool do_dequeue( dequeue_result& res, bool bDeque )
488         {
489             // Note:
490             // If bDeque == false then the function is called from empty method and no real dequeuing operation is performed
491
492             back_off bkoff;
493
494             marked_ptr h;
495             marked_ptr t;
496             marked_ptr pNext;
497
498             while ( true ) {
499                 h = guard_node( res.guards, 0, m_pHead );
500                 t = guard_node( res.guards, 1, m_pTail );
501                 pNext = guard_node( res.guards, 2, h->m_pNext );
502
503                 if ( h == m_pHead.load( memory_model::memory_order_acquire ) ) {
504                     if ( h.ptr() == t.ptr() ) {
505                         if ( !pNext.ptr() )
506                             return false;
507
508                         {
509                             typename gc::Guard g;
510                             while ( pNext->m_pNext.load(memory_model::memory_order_relaxed).ptr() && m_pTail.load(memory_model::memory_order_relaxed) == t ) {
511                                 pNext = guard_node( g, pNext->m_pNext );
512                                 res.guards.assign( 2, g.template get<value_type>() );
513                             }
514                         }
515
516                         m_pTail.compare_exchange_weak( t, marked_ptr(pNext.ptr()), memory_model::memory_order_release, memory_model::memory_order_relaxed );
517                     }
518                     else {
519                         marked_ptr iter( h );
520                         size_t hops = 0;
521
522                         typename gc::Guard g;
523
524                         while ( pNext.ptr() && pNext.bits() && iter.ptr() != t.ptr() && m_pHead.load(memory_model::memory_order_relaxed) == h ) {
525                             iter = pNext;
526                             g.assign( res.guards.template get<value_type>(2) );
527                             pNext = guard_node( res.guards, 2, pNext->m_pNext );
528                             ++hops;
529                         }
530
531                         if ( m_pHead.load(memory_model::memory_order_relaxed) != h )
532                             continue;
533
534                         if ( iter.ptr() == t.ptr() )
535                             free_chain( h, iter );
536                         else if ( bDeque ) {
537                             res.pNext = pNext.ptr();
538
539                             if ( iter->m_pNext.compare_exchange_weak( pNext, marked_ptr( pNext.ptr(), 1 ), memory_model::memory_order_release, memory_model::memory_order_relaxed ) ) {
540                                 if ( hops >= m_nMaxHops )
541                                     free_chain( h, pNext );
542                                 break;
543                             }
544                         }
545                         else
546                             return true;
547                     }
548                 }
549
550                 if ( bDeque )
551                     m_Stat.onDequeueRace();
552                 bkoff();
553             }
554
555             if ( bDeque ) {
556                 --m_ItemCounter;
557                 m_Stat.onDequeue();
558             }
559
560             return true;
561         }
562
563         void free_chain( marked_ptr head, marked_ptr newHead )
564         {
565             // "head" and "newHead" are guarded
566
567             if ( m_pHead.compare_exchange_strong( head, marked_ptr(newHead.ptr()), memory_model::memory_order_release, memory_model::memory_order_relaxed ))
568             {
569                 typename gc::template GuardArray<2> guards;
570                 guards.assign( 0, node_traits::to_value_ptr(head.ptr()) );
571                 while ( head.ptr() != newHead.ptr() ) {
572                     marked_ptr pNext = guard_node( guards, 1, head->m_pNext );
573                     assert( pNext.bits() != 0 );
574                     dispose_node( head.ptr() );
575                     guards.assign( 0, guards.template get<value_type>(1) );
576                     head = pNext;
577                 }
578             }
579         }
580
581         static void clear_links( node_type * pNode )
582         {
583             pNode->m_pNext.store( marked_ptr( nullptr ), memory_model::memory_order_release );
584         }
585
586         void dispose_node( node_type * p )
587         {
588             if ( p != m_Dummy.get() ) {
589                 gc::template retire<internal_disposer>( node_traits::to_value_ptr(p) );
590             }
591             else
592                 m_Dummy.retire();
593         }
594         //@endcond
595
596     public:
597         /// Initializes empty queue
598         BasketQueue()
599             : m_pHead( nullptr )
600             , m_pTail( nullptr )
601             , m_nMaxHops( 3 )
602         {
603             // GC and node_type::gc must be the same
604             static_assert(( std::is_same<gc, typename node_type::gc>::value ), "GC and node_type::gc must be the same");
605
606             // For cds::gc::HRC, only one base_hook is allowed
607             static_assert((
608                 std::conditional<
609                     std::is_same<gc, cds::gc::HRC>::value,
610                     std::is_same< typename hook::hook_type, opt::base_hook_tag >,
611                     boost::true_type
612                 >::type::value
613             ), "For cds::gc::HRC, only base_hook is allowed");
614
615             // Head/tail initialization should be made via store call
616             // because of gc::HRC manages reference counting
617             m_pHead.store( marked_ptr(m_Dummy.get()), memory_model::memory_order_relaxed );
618             m_pTail.store( marked_ptr(m_Dummy.get()), memory_model::memory_order_relaxed );
619         }
620
621         /// Destructor clears the queue
622         /**
623             Since the baskets queue contains at least one item even
624             if the queue is empty, the destructor may call item disposer.
625         */
626         ~BasketQueue()
627         {
628             clear();
629
630             node_type * pHead = m_pHead.load(memory_model::memory_order_relaxed).ptr();
631             assert( pHead != nullptr );
632
633             {
634                 node_type * pNext = pHead->m_pNext.load( memory_model::memory_order_relaxed ).ptr();
635                 while ( pNext ) {
636                     node_type * p = pNext;
637                     pNext = pNext->m_pNext.load( memory_model::memory_order_relaxed ).ptr();
638                     p->m_pNext.store( marked_ptr(), memory_model::memory_order_relaxed );
639                     dispose_node( p );
640                 }
641                 pHead->m_pNext.store( marked_ptr(), memory_model::memory_order_relaxed );
642                 //m_pTail.store( marked_ptr( pHead ), memory_model::memory_order_relaxed );
643             }
644
645             m_pHead.store( marked_ptr( nullptr ), memory_model::memory_order_relaxed );
646             m_pTail.store( marked_ptr( nullptr ), memory_model::memory_order_relaxed );
647
648             dispose_node( pHead );
649         }
650
651         /// Returns queue's item count
652         /**
653             The value returned depends on opt::item_counter option. For atomicity::empty_item_counter,
654             this function always returns 0.
655
656             <b>Warning</b>: even if you use real item counter and it returns 0, this fact is not mean that the queue
657             is empty. To check queue emptyness use \ref empty() method.
658         */
659         size_t size() const
660         {
661             return m_ItemCounter.value();
662         }
663
664         /// Returns reference to internal statistics
665         const stat& statistics() const
666         {
667             return m_Stat;
668         }
669
670         /// Enqueues \p val value into the queue.
671         /** @anchor cds_intrusive_BasketQueue_enqueue
672             The function always returns \p true.
673         */
674         bool enqueue( value_type& val )
675         {
676             node_type * pNew = node_traits::to_node_ptr( val );
677             link_checker::is_empty( pNew );
678
679             typename gc::Guard guard;
680             back_off bkoff;
681
682             marked_ptr t;
683             while ( true ) {
684                 t = guard_node( guard, m_pTail );
685
686                 marked_ptr pNext = t->m_pNext.load(memory_model::memory_order_acquire );
687
688                 if ( pNext.ptr() == nullptr ) {
689                     pNew->m_pNext.store( marked_ptr(), memory_model::memory_order_release );
690                     if ( t->m_pNext.compare_exchange_weak( pNext, marked_ptr(pNew), memory_model::memory_order_release, memory_model::memory_order_relaxed ) ) {
691                         if ( !m_pTail.compare_exchange_strong( t, marked_ptr(pNew), memory_model::memory_order_release, memory_model::memory_order_relaxed ))
692                             m_Stat.onAdvanceTailFailed();
693                         break;
694                     }
695
696                     // Try adding to basket
697                     m_Stat.onTryAddBasket();
698
699                     // Reread tail next
700                     typename gc::Guard gNext;
701
702                 try_again:
703                     pNext = guard_node( gNext, t->m_pNext );
704
705                     // add to the basket
706                     if ( m_pTail.load(memory_model::memory_order_relaxed) == t
707                          && t->m_pNext.load( memory_model::memory_order_relaxed) == pNext
708                          && !pNext.bits()  )
709                     {
710                         bkoff();
711                         pNew->m_pNext.store( pNext, memory_model::memory_order_release );
712                         if ( t->m_pNext.compare_exchange_weak( pNext, marked_ptr( pNew ), memory_model::memory_order_release, memory_model::memory_order_relaxed )) {
713                             m_Stat.onAddBasket();
714                             break;
715                         }
716                         goto try_again;
717                     }
718                 }
719                 else {
720                     // Tail is misplaced, advance it
721
722                     typename gc::template GuardArray<2> g;
723                     g.assign( 0, node_traits::to_value_ptr( pNext.ptr() ) );
724                     if ( t->m_pNext.load( memory_model::memory_order_relaxed ) != pNext ) {
725                         m_Stat.onEnqueueRace();
726                         bkoff();
727                         continue;
728                     }
729
730                     marked_ptr p;
731                     bool bTailOk = true;
732                     while ( (p = pNext->m_pNext.load( memory_model::memory_order_relaxed )).ptr() != nullptr )
733                     {
734                         bTailOk = m_pTail.load( memory_model::memory_order_relaxed ) == t;
735                         if ( !bTailOk )
736                             break;
737
738                         g.assign( 1, node_traits::to_value_ptr( p.ptr() ));
739                         if ( pNext->m_pNext.load(memory_model::memory_order_relaxed) != p )
740                             continue;
741                         pNext = p;
742                         g.assign( 0, g.template get<value_type>( 1 ) );
743                     }
744                     if ( !bTailOk || !m_pTail.compare_exchange_weak( t, marked_ptr( pNext.ptr() ), memory_model::memory_order_release, memory_model::memory_order_relaxed ))
745                         m_Stat.onAdvanceTailFailed();
746
747                     m_Stat.onBadTail();
748                 }
749
750                 m_Stat.onEnqueueRace();
751             }
752
753             ++m_ItemCounter;
754             m_Stat.onEnqueue();
755
756             return true;
757         }
758
759         /// Dequeues a value from the queue
760         /** @anchor cds_intrusive_BasketQueue_dequeue
761             If the queue is empty the function returns \p nullptr.
762
763             <b>Warning</b>: see MSQueue::deque note about item disposing
764         */
765         value_type * dequeue()
766         {
767             dequeue_result res;
768
769             if ( do_dequeue( res, true ))
770                 return node_traits::to_value_ptr( *res.pNext );
771             return nullptr;
772         }
773
774         /// Synonym for \ref cds_intrusive_BasketQueue_enqueue "enqueue" function
775         bool push( value_type& val )
776         {
777             return enqueue( val );
778         }
779
780         /// Synonym for \ref cds_intrusive_BasketQueue_dequeue "dequeue" function
781         value_type * pop()
782         {
783             return dequeue();
784         }
785
786         /// Checks if the queue is empty
787         /**
788             Note that this function is not \p const.
789             The function is based on \ref dequeue algorithm
790             but really does not dequeued any item.
791         */
792         bool empty()
793         {
794             dequeue_result res;
795             return !do_dequeue( res, false );
796         }
797
798         /// Clear the queue
799         /**
800             The function repeatedly calls \ref dequeue until it returns \p nullptr.
801             The disposer defined in template \p Options is called for each item
802             that can be safely disposed.
803         */
804         void clear()
805         {
806             while ( dequeue() );
807         }
808     };
809
810 }} // namespace cds::intrusive
811
812 #endif // #ifndef __CDS_INTRUSIVE_BASKET_QUEUE_H