Remove trailing spaces
[libcds.git] / cds / intrusive / basket_queue.h
1 //$$CDS-header$$
2
3 #ifndef __CDS_INTRUSIVE_BASKET_QUEUE_H
4 #define __CDS_INTRUSIVE_BASKET_QUEUE_H
5
6 #include <type_traits>
7 #include <cds/intrusive/details/single_link_struct.h>
8 #include <cds/details/marked_ptr.h>
9
10 namespace cds { namespace intrusive {
11
12     /// BasketQueue -related definitions
13     /** @ingroup cds_intrusive_helper
14     */
15     namespace basket_queue {
16         /// BasketQueue node
17         /**
18             Template parameters:
20             - GC - garbage collector used
21             - Tag - a \ref cds_intrusive_hook_tag "tag"
22             */
23         template <class GC, typename Tag = opt::none>
24         struct node
25         {
26             typedef GC      gc  ;   ///< Garbage collector
27             typedef Tag     tag ;   ///< tag
28
29             typedef cds::details::marked_ptr<node, 1>                    marked_ptr;        ///< marked pointer
30             typedef typename gc::template atomic_marked_ptr< marked_ptr> atomic_marked_ptr; ///< atomic marked pointer specific for GC
31
32             /// Rebind node for other template parameters
33             template <class GC2, typename Tag2 = tag>
34             struct rebind {
35                 typedef node<GC2, Tag2>  other ;    ///< Rebinding result
36             };
37
38             atomic_marked_ptr m_pNext ; ///< pointer to the next node in the container
39
40             node()
41                 : m_pNext( nullptr )
42             {}
43         };
44
45         using cds::intrusive::single_link::default_hook;
46
47         //@cond
48         template < typename HookType, typename... Options>
49         struct hook
50         {
51             typedef typename opt::make_options< default_hook, Options...>::type  options;
52             typedef typename options::gc    gc;
53             typedef typename options::tag   tag;
54             typedef node<gc, tag> node_type;
55             typedef HookType     hook_type;
56         };
57         //@endcond
58
59
60         /// Base hook
61         /**
62             \p Options are:
63             - opt::gc - garbage collector used.
64             - opt::tag - a \ref cds_intrusive_hook_tag "tag"
65         */
66         template < typename... Options >
67         struct base_hook: hook< opt::base_hook_tag, Options... >
68         {};
69
70         /// Member hook
71         /**
72             \p MemberOffset defines offset in bytes of \ref node member into your structure.
73             Use \p offsetof macro to define \p MemberOffset
74
75             \p Options are:
76             - opt::gc - garbage collector used.
77             - opt::tag - a \ref cds_intrusive_hook_tag "tag"
78         */
79         template < size_t MemberOffset, typename... Options >
80         struct member_hook: hook< opt::member_hook_tag, Options... >
81         {
82             //@cond
83             static size_t const c_nMemberOffset = MemberOffset;     // the MemberOffset template argument, republished as a constant
84             //@endcond
85         };
86
87         /// Traits hook
88         /**
89             \p NodeTraits defines type traits for node.
90             See \ref node_traits for \p NodeTraits interface description
91
92             \p Options are:
93             - opt::gc - garbage collector used.
94             - opt::tag - a \ref cds_intrusive_hook_tag "tag"
95         */
96         template <typename NodeTraits, typename... Options >
97         struct traits_hook: hook< opt::traits_hook_tag, Options... >
98         {
99             //@cond
100             using node_traits = NodeTraits;     // the NodeTraits template argument, republished as a member type
101             //@endcond
102         };
103
104         /// BasketQueue internal statistics. May be used for debugging or profiling
105         /**
106             Template argument \p Counter defines type of counter.
107             Default is \p cds::atomicity::event_counter, that is weak, i.e. it is not guaranteed
108             strict event counting.
109             You may use a stronger type of counter such as \p cds::atomicity::item_counter,
110             or even integral type, for example, \p int.
111         */
112         template <typename Counter = cds::atomicity::event_counter >
113         struct stat
114         {
115             typedef Counter counter_type;   ///< Counter type
116
117             counter_type m_EnqueueCount;    ///< Enqueue call count
118             counter_type m_DequeueCount;    ///< Dequeue call count
119             counter_type m_EnqueueRace;     ///< Count of enqueue race conditions encountered
120             counter_type m_DequeueRace;     ///< Count of dequeue race conditions encountered
121             counter_type m_AdvanceTailError;///< Count of "advance tail failed" events
122             counter_type m_BadTail;         ///< Count of events "Tail is not pointed to the last item in the queue"
123             counter_type m_TryAddBasket;    ///< Count of attemps adding new item to a basket (only or BasketQueue, for other queue this metric is not used)
124             counter_type m_AddBasketCount;  ///< Count of events "Enqueue a new item into basket" (only or BasketQueue, for other queue this metric is not used)
125
126             /// Register enqueue call
127             void onEnqueue() { ++m_EnqueueCount; }
128             /// Register dequeue call
129             void onDequeue() { ++m_DequeueCount; }
130             /// Register enqueue race event
131             void onEnqueueRace() { ++m_EnqueueRace; }
132             /// Register dequeue race event
133             void onDequeueRace() { ++m_DequeueRace; }
134             /// Register "advance tail failed" event
135             void onAdvanceTailFailed() { ++m_AdvanceTailError; }
136             /// Register event "Tail is not pointed to last item in the queue"
137             void onBadTail() { ++m_BadTail; }
138             /// Register an attempt t add new item to basket
139             void onTryAddBasket()           { ++m_TryAddBasket; }
140             /// Register event "Enqueue a new item into basket" (only or BasketQueue, for other queue this metric is not used)
141             void onAddBasket()              { ++m_AddBasketCount; }
142
143             //@cond
144             void reset()
145             {
146                 m_EnqueueCount.reset();
147                 m_DequeueCount.reset();
148                 m_EnqueueRace.reset();
149                 m_DequeueRace.reset();
150                 m_AdvanceTailError.reset();
151                 m_BadTail.reset();
152                 m_TryAddBasket.reset();
153                 m_AddBasketCount.reset();
154             }
155
156             stat& operator +=( stat const& s )
157             {
158                 m_EnqueueCount  += s.m_EnqueueCount.get();
159                 m_DequeueCount  += s.m_DequeueCount.get();
160                 m_EnqueueRace   += s.m_EnqueueRace.get();
161                 m_DequeueRace   += s.m_DequeueRace.get();
162                 m_AdvanceTailError += s.m_AdvanceTailError.get();
163                 m_BadTail       += s.m_BadTail.get();
164                 m_TryAddBasket  += s.m_TryAddBasket.get();
165                 m_AddBasketCount += s.m_AddBasketCount.get();
166                 return *this;
167             }
168             //@endcond
169         };
170
171         /// Dummy BasketQueue statistics - no counting is performed, no overhead. Support interface like \p basket_queue::stat
172         struct empty_stat
173         {
174             //@cond
175             void onEnqueue()            {}
176             void onDequeue()            {}
177             void onEnqueueRace()        {}
178             void onDequeueRace()        {}
179             void onAdvanceTailFailed()  {}
180             void onBadTail()            {}
181             void onTryAddBasket()       {}
182             void onAddBasket()          {}
183
184             void reset() {}
185             empty_stat& operator +=( empty_stat const& )
186             {
187                 return *this;
188             }
189             //@endcond
190         };
191
192         /// BasketQueue default type traits
193         struct traits
194         {
195             /// Back-off strategy
196             typedef cds::backoff::empty             back_off;
197
198             /// Hook, possible types are \p basket_queue::base_hook, \p basket_queue::member_hook, \p basket_queue::traits_hook
199             typedef basket_queue::base_hook<>       hook;
200
201             /// The functor used for dispose removed items. Default is \p opt::v::empty_disposer. This option is used for dequeuing
202             typedef opt::v::empty_disposer          disposer;
203
204             /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
205             typedef atomicity::empty_item_counter   item_counter;
206
207             /// Internal statistics (by default, disabled)
208             /**
209                 Possible option value are: \p basket_queue::stat, \p basket_queue::empty_stat (the default),
210                 user-provided class that supports \p %basket_queue::stat interface.
211             */
212             typedef basket_queue::empty_stat        stat;
213
214             /// C++ memory ordering model
215             /**
216                 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
217                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
218             */
219             typedef opt::v::relaxed_ordering        memory_model;
220
221             /// Link checking, see \p cds::opt::link_checker
222             static CDS_CONSTEXPR const opt::link_check_type link_checker = opt::debug_check_link;
223
224             /// Alignment for internal queue data. Default is \p opt::cache_line_alignment
225             enum { alignment = opt::cache_line_alignment };
226         };
227
228
229         /// Metafunction converting option list to \p basket_queue::traits
230         /**
231             Supported \p Options are:
232
233             - opt::hook - hook used. Possible hooks are: \p basket_queue::base_hook, \p basket_queue::member_hook, \p basket_queue::traits_hook.
234                 If the option is not specified, \p %basket_queue::base_hook<> is used.
235             - opt::back_off - back-off strategy used, default is \p cds::backoff::empty.
236             - opt::disposer - the functor used for dispose removed items. Default is \p opt::v::empty_disposer. This option is used
237                 when dequeuing.
238             - opt::link_checker - the type of node's link fields checking. Default is \p opt::debug_check_link
239             - opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
240                 To enable item counting use \p cds::atomicity::item_counter
241             - opt::stat - the type to gather internal statistics.
242                 Possible statistics types are: \p basket_queue::stat, \p basket_queue::empty_stat, user-provided class that supports \p %basket_queue::stat interface.
243                 Default is \p %basket_queue::empty_stat (internal statistics disabled).
244             - opt::alignment - the alignment for internal queue data. Default is \p opt::cache_line_alignment
245             - opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
246                 or \p opt::v::sequential_consistent (sequentially consistent memory model).
247
248             Example: declare \p %BasketQueue with item counting and internal statistics
249             \code
250             typedef cds::intrusive::BasketQueue< cds::gc::HP, Foo,
251                 typename cds::intrusive::basket_queue::make_traits<
252                     cds::intrusive::opt::hook< cds::intrusive::basket_queue::base_hook< cds::opt::gc<cds::gc::HP> >>,
253                     cds::opt::item_counter< cds::atomicity::item_counter >,
254                     cds::opt::stat< cds::intrusive::basket_queue::stat<> >
255                 >::type
256             > myQueue;
257             \endcode
258         */
259         template <typename... Options>
260         struct make_traits {
261 #   ifdef CDS_DOXYGEN_INVOKED
262             typedef implementation_defined type;   ///< Metafunction result
263 #   else
264             typedef typename cds::opt::make_options<
265                 typename cds::opt::find_type_traits< traits, Options... >::type
266                 , Options...
267             >::type type;
268 #   endif
269         };
270     }   // namespace basket_queue
271
272     /// Basket lock-free queue (intrusive variant)
273     /** @ingroup cds_intrusive_queue
274         Implementation of basket queue algorithm.
275
276         \par Source:
277             [2007] Moshe Hoffman, Ori Shalev, Nir Shavit "The Baskets Queue"
278
279         <b>Key idea</b>
280
281         In the "basket" approach, instead of
282         the traditional ordered list of nodes, the queue consists of an ordered list of groups
283         of nodes (logical baskets). The order of nodes in each basket need not be specified, and in
284         fact, it is easiest to maintain them in FIFO order. The baskets fulfill the following basic
285         rules:
286         - Each basket has a time interval in which all its nodes' enqueue operations overlap.
287         - The baskets are ordered by the order of their respective time intervals.
288         - For each basket, its nodes' dequeue operations occur after its time interval.
289         - The dequeue operations are performed according to the order of baskets.
290
291         Two properties define the FIFO order of nodes:
292         - The order of nodes in a basket is not specified.
293         - The order of nodes in different baskets is the FIFO-order of their respective baskets.
294
295         In algorithms such as the MS-queue or optimistic
296         queue, threads enqueue items by applying a Compare-and-swap (CAS) operation to the
297         queue's tail pointer, and all the threads that fail on a particular CAS operation (and also
298         the winner of that CAS) overlap in time. In particular, they share the time interval of
299         the CAS operation itself. Hence, all the threads that fail to CAS on the tail-node of
300         the queue may be inserted into the same basket. By integrating the basket-mechanism
301         as the back-off mechanism, the time usually spent on backing-off before trying to link
302         onto the new tail, can now be utilized to insert the failed operations into the basket,
303         allowing enqueues to complete sooner. In the meantime, the next successful CAS operations
304         by enqueues allow new baskets to be formed down the list, and these can be
305         filled concurrently. Moreover, the failed operations don't retry their link attempt on the
306         new tail, lowering the overall contention on it. This leads to a queue
307         algorithm that unlike all former concurrent queue algorithms requires virtually no tuning
308         of the backoff mechanisms to reduce contention, making the algorithm an attractive
309         out-of-the-box queue.
310
311         In order to enqueue, just as in MSQueue, a thread first tries to link the new node to
312         the last node. If it failed to do so, then another thread has already succeeded. Thus it
313         tries to insert the new node into the new basket that was created by the winner thread.
314         To dequeue a node, a thread first reads the head of the queue to obtain the
315         oldest basket. It may then dequeue any node in the oldest basket.
316
317         <b>Template arguments:</b>
318         - \p GC - garbage collector type: \p gc::HP, \p gc::DHP
319         - \p T - type of value to be stored in the queue
320         - \p Traits - queue traits, default is \p basket_queue::traits. You can use \p basket_queue::make_traits
321             metafunction to make your traits or just derive your traits from \p %basket_queue::traits:
322             \code
323             struct myTraits: public cds::intrusive::basket_queue::traits {
324                 typedef cds::intrusive::basket_queue::stat<> stat;
325                 typedef cds::atomicity::item_counter    item_counter;
326             };
327             typedef cds::intrusive::BasketQueue< cds::gc::HP, Foo, myTraits > myQueue;
328
329             // Equivalent make_traits example:
330             typedef cds::intrusive::BasketQueue< cds::gc::HP, Foo,
331                 typename cds::intrusive::basket_queue::make_traits<
332                     cds::opt::stat< cds::intrusive::basket_queue::stat<> >,
333                     cds::opt::item_counter< cds::atomicity::item_counter >
334                 >::type
335             > myQueue;
336             \endcode
337
338         Garbage collecting schema \p GC must be consistent with the \p basket_queue::node GC.
339
340         \par About item disposing
341         Like \p MSQueue, the Baskets queue algo has a key feature: even if the queue is empty it contains one item that is "dummy" one from
342         the standpoint of the algo. See \p dequeue() function doc for explanation.
343
344         \par Examples
345         \code
346         #include <cds/intrusive/basket_queue.h>
347         #include <cds/gc/hp.h>
348
349         namespace ci = cds::intrusive;
350         typedef cds::gc::HP hp_gc;
351
352         // Basket queue with Hazard Pointer garbage collector, base hook + item disposer:
353         struct Foo: public ci::basket_queue::node< hp_gc >
354         {
355             // Your data
356             ...
357         };
358
359         // Disposer for Foo struct just deletes the object passed in
360         struct fooDisposer {
361             void operator()( Foo * p )
362             {
363                 delete p;
364             }
365         };
366
367         struct fooTraits: public ci::basket_queue::traits {
368             typedef ci::basket_queue::base_hook< ci::opt::gc<hp_gc> > hook;
369             typedef fooDisposer disposer;
370         };
371         typedef ci::BasketQueue< hp_gc, Foo, fooTraits > fooQueue;
372
373         // BasketQueue with Hazard Pointer garbage collector,
374         // member hook + item disposer + item counter,
375         // without alignment of internal queue data:
376         struct Bar
377         {
378             // Your data
379             ...
380             ci::basket_queue::node< hp_gc > hMember;
381         };
382
383         struct barTraits: public
384             ci::basket_queue::make_traits<
385                 ci::opt::hook<
386                     ci::basket_queue::member_hook<
387                         offsetof(Bar, hMember)
388                         ,ci::opt::gc<hp_gc>
389                     >
390                 >
391                 ,ci::opt::disposer< fooDisposer >
392                 ,cds::opt::item_counter< cds::atomicity::item_counter >
393                 ,cds::opt::alignment< cds::opt::no_special_alignment >
394             >::type
395         {};
396         typedef ci::BasketQueue< hp_gc, Bar, barTraits > barQueue;
397         \endcode
398     */
399     template <typename GC, typename T, typename Traits = basket_queue::traits >
400     class BasketQueue
401     {
402     public:
403         typedef GC gc;          ///< Garbage collector
404         typedef T  value_type;  ///< type of value stored in the queue
405         typedef Traits traits;  ///< Queue traits
406         typedef typename traits::hook       hook;       ///< hook type
407         typedef typename hook::node_type    node_type;  ///< node type
408         typedef typename traits::disposer   disposer;   ///< disposer used
409         typedef typename get_node_traits< value_type, node_type, hook>::type node_traits;   ///< node traits
410         typedef typename single_link::get_link_checker< node_type, traits::link_checker >::type link_checker;   ///< link checker
411
412         typedef typename traits::back_off       back_off;     ///< back-off strategy
413         typedef typename traits::item_counter   item_counter; ///< Item counting policy used
414         typedef typename traits::stat           stat;         ///< Internal statistics policy used
415         typedef typename traits::memory_model   memory_model; ///< Memory ordering. See cds::opt::memory_model option
416
417         /// Rebind template arguments
418         template <typename GC2, typename T2, typename Traits2>
419         struct rebind {
420             using other = BasketQueue< GC2, T2, Traits2>;   ///< Rebinding result
421         };
422
423         static CDS_CONSTEXPR const size_t m_nHazardPtrCount = 6 ; ///< Count of hazard pointer required for the algorithm
424
425     protected:
426         //@cond
427         typedef typename node_type::marked_ptr   marked_ptr;
428         typedef typename node_type::atomic_marked_ptr atomic_marked_ptr;
429
430         typedef intrusive::node_to_value<BasketQueue> node_to_value;
431         typedef typename opt::details::alignment_setter< atomic_marked_ptr, traits::alignment >::type aligned_node_ptr;
432         typedef typename opt::details::alignment_setter< node_type, traits::alignment >::type dummy_node_type;
433
434         // GC and node_type::gc must be the same
435         static_assert( std::is_same<gc, typename node_type::gc>::value, "GC and node_type::gc must be the same");
436         //@endcond
437
438         aligned_node_ptr    m_pHead ;           ///< Queue's head pointer (aligned)
439         aligned_node_ptr    m_pTail ;           ///< Queue's tail pointer (aligned)
440         dummy_node_type     m_Dummy ;           ///< Dummy node; the constructor points both head and tail at it
441         item_counter        m_ItemCounter   ;   ///< Item counter
442         stat                m_Stat  ;           ///< Internal statistics
443         //@cond
444         size_t const        m_nMaxHops;         // hop threshold: do_dequeue() calls free_chain() after a successful claim once its walk reached this many hops
445         //@endcond
446
447         //@cond
448
449         template <size_t Count>
450         static marked_ptr guard_node( typename gc::template GuardArray<Count>& g, size_t idx, atomic_marked_ptr const& p )
451         {
                // Guards the node currently referenced by p in slot idx of g and returns the marked
                // pointer that was read. The guard is stored first and p is then read a second time;
                // the loop repeats until both reads agree, so the value returned is the one guarded.
452             marked_ptr pg;
453             while ( true ) {
454                 pg = p.load( memory_model::memory_order_relaxed );
455                 g.assign( idx, node_traits::to_value_ptr( pg.ptr() ) );
                    // Validate: p must still hold exactly the marked pointer (address and mark bits) that was guarded
456                 if ( p.load( memory_model::memory_order_acquire) == pg ) {
457                     return pg;
458                 }
459             }
460         }
461
462         static marked_ptr guard_node( typename gc::Guard& g, atomic_marked_ptr const& p )
463         {
                // Single-guard variant: guards the node currently referenced by p in g and returns the
                // marked pointer that was read. The guard is stored first and p is then read again;
                // the loop repeats until both reads agree.
464             marked_ptr pg;
465             while ( true ) {
466                 pg = p.load( memory_model::memory_order_relaxed );
467                 g.assign( node_traits::to_value_ptr( pg.ptr() ) );
                    // Validate: p must still hold exactly the marked pointer that was guarded
468                 if ( p.load( memory_model::memory_order_acquire) == pg ) {
469                     return pg;
470                 }
471             }
472         }
473
474         struct dequeue_result {
475             typename gc::template GuardArray<3>  guards;    // filled by do_dequeue(): slot 0 - head, slot 1 - tail, slot 2 - the node after the current position
476             node_type * pNext;                              // node claimed by do_dequeue(); assigned only on its bDeque == true path
477         };
478
479         bool do_dequeue( dequeue_result& res, bool bDeque )
480         {
481             // Note:
482             // If bDeque == false then the function is called from empty method and no real dequeuing operation is performed
                //
                // Returns false when head and tail are the same node and that node has no successor
                // (nothing to dequeue), true otherwise. When bDeque is true and a node has been claimed,
                // res.pNext is that node and slot 2 of res.guards still guards it.
483
484             back_off bkoff;
485
486             marked_ptr h;
487             marked_ptr t;
488             marked_ptr pNext;
489
490             while ( true ) {
                    // Guard slots: 0 - head, 1 - tail, 2 - successor of head
491                 h = guard_node( res.guards, 0, m_pHead );
492                 t = guard_node( res.guards, 1, m_pTail );
493                 pNext = guard_node( res.guards, 2, h->m_pNext );
494
                    // Work only on a snapshot whose head is still current; otherwise back off and retry
495                 if ( h == m_pHead.load( memory_model::memory_order_acquire ) ) {
496                     if ( h.ptr() == t.ptr() ) {
                            // Head and tail are the same node: without a successor there is nothing to dequeue
497                         if ( !pNext.ptr() )
498                             return false;
499
                            // A successor exists, so the tail lags behind. Follow the links while the tail
                            // is unchanged, keeping the node reached guarded in slot 2 ...
500                         {
501                             typename gc::Guard g;
502                             while ( pNext->m_pNext.load(memory_model::memory_order_relaxed).ptr() && m_pTail.load(memory_model::memory_order_relaxed) == t ) {
503                                 pNext = guard_node( g, pNext->m_pNext );
504                                 res.guards.assign( 2, g.template get<value_type>() );
505                             }
506                         }
507
                            // ... then try to swing the tail to it. The CAS result is not examined:
                            // control falls through to the back-off below and the loop starts over.
508                         m_pTail.compare_exchange_weak( t, marked_ptr(pNext.ptr()), memory_model::memory_order_release, memory_model::memory_order_relaxed );
509                     }
510                     else {
511                         marked_ptr iter( h );
512                         size_t hops = 0;
513
514                         typename gc::Guard g;
515
                            // Step over links that carry a mark bit, stopping at the tail or as soon as the
                            // head changes. g takes over the guard of the node that becomes iter while
                            // slot 2 moves on to its successor; hops counts the steps taken.
516                         while ( pNext.ptr() && pNext.bits() && iter.ptr() != t.ptr() && m_pHead.load(memory_model::memory_order_relaxed) == h ) {
517                             iter = pNext;
518                             g.assign( res.guards.template get<value_type>(2) );
519                             pNext = guard_node( res.guards, 2, pNext->m_pNext );
520                             ++hops;
521                         }
522
                            // Head moved during the walk: restart at once (skips the back-off and the race statistics)
523                         if ( m_pHead.load(memory_model::memory_order_relaxed) != h )
524                             continue;
525
                            // The walk ended on the tail node: try to advance the head to it and dispose
                            // the nodes passed over, then fall through to the back-off and retry
526                         if ( iter.ptr() == t.ptr() )
527                             free_chain( h, iter );
528                         else if ( bDeque ) {
529                             res.pNext = pNext.ptr();
530
                                // Claim pNext by setting the mark bit on the link iter->m_pNext that leads to it
531                             if ( iter->m_pNext.compare_exchange_weak( pNext, marked_ptr( pNext.ptr(), 1 ), memory_model::memory_order_release, memory_model::memory_order_relaxed ) ) {
                                    // After a walk of at least m_nMaxHops steps also try to advance the head up to pNext
532                                 if ( hops >= m_nMaxHops )
533                                     free_chain( h, pNext );
534                                 break;
535                             }
536                         }
537                         else
                                // bDeque == false: report true without claiming anything
538                             return true;
539                     }
540                 }
541
542                 if ( bDeque )
543                     m_Stat.onDequeueRace();
544                 bkoff();
545             }
546
                // The loop is left only through the break on the bDeque path above
547             if ( bDeque ) {
548                 --m_ItemCounter;
549                 m_Stat.onDequeue();
550             }
551
552             return true;
553         }
554
555         void free_chain( marked_ptr head, marked_ptr newHead )
556         {
557             // "head" and "newHead" are guarded
                //
                // Tries to move m_pHead from head to newHead (stored without mark bits). Only a caller
                // whose CAS succeeds disposes the nodes from head up to, but not including, newHead.
558
559             if ( m_pHead.compare_exchange_strong( head, marked_ptr(newHead.ptr()), memory_model::memory_order_release, memory_model::memory_order_relaxed ))
560             {
                    // Slot 0 guards the node being disposed, slot 1 its successor
561                 typename gc::template GuardArray<2> guards;
562                 guards.assign( 0, node_traits::to_value_ptr(head.ptr()) );
563                 while ( head.ptr() != newHead.ptr() ) {
564                     marked_ptr pNext = guard_node( guards, 1, head->m_pNext );
                        // Every link inside the removed chain is expected to carry a mark bit
565                     assert( pNext.bits() != 0 );
566                     dispose_node( head.ptr() );
                        // Hand the successor's guard over to slot 0 before stepping onto it
567                     guards.assign( 0, guards.template get<value_type>(1) );
568                     head = pNext;
569                 }
570             }
571         }
572
573         static void clear_links( node_type * pNode ) {
574             marked_ptr const pNull( nullptr );  // null link without mark bits
575             pNode->m_pNext.store( pNull, memory_model::memory_order_release );
576         }
577
578         void dispose_node( node_type * p )
579         {
580             if ( p != &m_Dummy ) {
581                 struct internal_disposer
582                 {
583                     void operator()( value_type * p )
584                     {
585                         assert( p != nullptr );
586                         BasketQueue::clear_links( node_traits::to_node_ptr( p ) );
587                         disposer()(p);
588                     }
589                 };
590                 gc::template retire<internal_disposer>( node_traits::to_value_ptr(p) );
591             }
592         }
593         //@endcond
594
595     public:
596         /// Initializes empty queue
597         BasketQueue()
598             : m_pHead( &m_Dummy )       // head and tail both start at the embedded dummy node
599             , m_pTail( &m_Dummy )
600             , m_nMaxHops( 3 )           // fixed hop threshold compared against the walk length in do_dequeue()
601         {}
602
603         /// Destructor clears the queue
604         /**
605             Since the baskets queue contains at least one item even
606             if the queue is empty, the destructor may call item disposer.
607         */
608         ~BasketQueue()
609         {
610             clear();
611
612             node_type * pHead = m_pHead.load(memory_model::memory_order_relaxed).ptr();
613             assert( pHead != nullptr );
614
                // Dispose every node still linked after the head node, resetting each node's
                // link before it is handed to dispose_node().
                // NOTE(review): only relaxed ordering is used here - presumably the destructor
                // assumes no other thread is accessing the queue any more; confirm.
615             {
616                 node_type * pNext = pHead->m_pNext.load( memory_model::memory_order_relaxed ).ptr();
617                 while ( pNext ) {
618                     node_type * p = pNext;
                        // Read the successor before the link of p is reset
619                     pNext = pNext->m_pNext.load( memory_model::memory_order_relaxed ).ptr();
620                     p->m_pNext.store( marked_ptr(), memory_model::memory_order_relaxed );
621                     dispose_node( p );
622                 }
623                 pHead->m_pNext.store( marked_ptr(), memory_model::memory_order_relaxed );
624                 //m_pTail.store( marked_ptr( pHead ), memory_model::memory_order_relaxed );
625             }
626
627             m_pHead.store( marked_ptr( nullptr ), memory_model::memory_order_relaxed );
628             m_pTail.store( marked_ptr( nullptr ), memory_model::memory_order_relaxed );
629
                // dispose_node() ignores m_Dummy, so this has an effect only when the head is another node
630             dispose_node( pHead );
631         }
632
633         /// Enqueues \p val value into the queue.
634         /** @anchor cds_intrusive_BasketQueue_enqueue
635             The function always returns \p true.

                The node obtained from \p val by \p node_traits::to_node_ptr() is the object that gets
                linked into the list. The function loops until one of two compare-and-swap operations
                on the \p m_pNext field of the observed tail node succeeds:
                - the observed tail has no successor and the new node is appended after it
                  (\p m_pTail is then advanced once; a failed advance is only counted in statistics), or
                - that first attempt failed and the new node is inserted between the observed tail
                  and its current successor (reported to statistics as \p onAddBasket()).

                When the observed tail already has a successor, the function instead walks the
                \p m_pNext chain and tries to move \p m_pTail forward before retrying.
                After the loop the item counter is incremented and \p onEnqueue() is reported.
636         */
637         bool enqueue( value_type& val )
638         {
639             node_type * pNew = node_traits::to_node_ptr( val );
640             link_checker::is_empty( pNew );     // presumably checks that the node is not already linked - confirm against link_checker
641
642             typename gc::Guard guard;           // guard handed to guard_node() for the observed tail
643             back_off bkoff;
644
645             marked_ptr t;                       // tail observed in the current iteration
646             while ( true ) {
                    // guard_node() is defined elsewhere in the class; presumably it loads m_pTail and protects it with the guard - confirm
647                 t = guard_node( guard, m_pTail );
648
649                 marked_ptr pNext = t->m_pNext.load(memory_model::memory_order_acquire );
650
651                 if ( pNext.ptr() == nullptr ) {
                        // The observed tail has no successor: try to link pNew directly after it
652                     pNew->m_pNext.store( marked_ptr(), memory_model::memory_order_release );
653                     if ( t->m_pNext.compare_exchange_weak( pNext, marked_ptr(pNew), memory_model::memory_order_release, memory_model::memory_order_relaxed ) ) {
                            // pNew is linked; try to swing m_pTail to it. A failure is only counted, never retried
654                         if ( !m_pTail.compare_exchange_strong( t, marked_ptr(pNew), memory_model::memory_order_release, memory_model::memory_order_relaxed ))
655                             m_Stat.onAdvanceTailFailed();
656                         break;
657                     }
658
659                     // The CAS above failed: try adding pNew to the basket instead
660                     m_Stat.onTryAddBasket();
661
662                     // Reread tail next; gNext is declared before the label so every "goto try_again" reuses the same guard
663                     typename gc::Guard gNext;
664
665                 try_again:
666                     pNext = guard_node( gNext, t->m_pNext );
667
668                     // Add to the basket only while t is still the tail, its successor is still pNext
                        // and pNext carries no mark bits
669                     if ( m_pTail.load(memory_model::memory_order_relaxed) == t
670                          && t->m_pNext.load( memory_model::memory_order_relaxed) == pNext
671                          && !pNext.bits()  )
672                     {
673                         bkoff();
                            // Insert pNew between t and its current successor pNext
674                         pNew->m_pNext.store( pNext, memory_model::memory_order_release );
675                         if ( t->m_pNext.compare_exchange_weak( pNext, marked_ptr( pNew ), memory_model::memory_order_release, memory_model::memory_order_relaxed )) {
676                             m_Stat.onAddBasket();
677                             break;
678                         }
679                         goto try_again;
680                     }
                        // Condition not met: fall through to onEnqueueRace() below and restart from a fresh tail
681                 }
682                 else {
683                     // Tail is misplaced (it already has a successor), advance it
684
685                     typename gc::template GuardArray<2> g;
                        // Slot 0 guards the candidate for the new tail; re-check the link after assigning the guard
686                     g.assign( 0, node_traits::to_value_ptr( pNext.ptr() ) );
687                     if ( t->m_pNext.load( memory_model::memory_order_relaxed ) != pNext ) {
688                         m_Stat.onEnqueueRace();
689                         bkoff();
690                         continue;
691                     }
692
693                     marked_ptr p;
694                     bool bTailOk = true;
                        // Walk forward while the candidate has a successor
695                     while ( (p = pNext->m_pNext.load( memory_model::memory_order_relaxed )).ptr() != nullptr )
696                     {
                            // Stop walking as soon as m_pTail no longer equals t
697                         bTailOk = m_pTail.load( memory_model::memory_order_relaxed ) == t;
698                         if ( !bTailOk )
699                             break;
700
                            // Guard the successor in slot 1, then verify the link did not change meanwhile;
                            // if it did, "continue" re-evaluates the loop condition and reloads p
701                         g.assign( 1, node_traits::to_value_ptr( p.ptr() ));
702                         if ( pNext->m_pNext.load(memory_model::memory_order_relaxed) != p )
703                             continue;
704                         pNext = p;
                            // Step forward: slot 0 is assigned what slot 1 currently holds
705                         g.assign( 0, g.template get<value_type>( 1 ) );
706                     }
                        // Try to move m_pTail from t to the last node reached; the new value is built from the raw pointer only
707                     if ( !bTailOk || !m_pTail.compare_exchange_weak( t, marked_ptr( pNext.ptr() ), memory_model::memory_order_release, memory_model::memory_order_relaxed ))
708                         m_Stat.onAdvanceTailFailed();
709
710                     m_Stat.onBadTail();
711                 }
712
                    // Reached only when this iteration did not link pNew; the loop retries
713                 m_Stat.onEnqueueRace();
714             }
715
716             ++m_ItemCounter;
717             m_Stat.onEnqueue();
718
719             return true;
720         }
721
722         /// Alias of \p enqueue(): forwards \p val to it and returns its result
723         bool push( value_type& val )
724         {
725             return this->enqueue( val );
726         }
727
728         /// Dequeues a value from the queue
729         /** @anchor cds_intrusive_BasketQueue_dequeue
730             If the queue is empty the function returns \p nullptr.
731
732             @note See \p MSQueue::dequeue() note about item disposing
733         */
734         value_type * dequeue()
735         {
736             dequeue_result res;
737
738             if ( do_dequeue( res, true ))
739                 return node_traits::to_value_ptr( *res.pNext );
740             return nullptr;
741         }
742
743         /// Alias of \p dequeue(): returns whatever \p dequeue() returns, \p nullptr included
744         value_type * pop()
745         {
746             return this->dequeue();
747         }
748
749         /// Tests whether the queue is empty
750         /**
751             This function is not \p const: it runs the same algorithm as \p dequeue()
752             (\p do_dequeue() with \p false as the second argument),
753             but it does not actually dequeue any item.
754         */
755         bool empty()
756         {
757             dequeue_result probe;
758             return do_dequeue( probe, false ) ? false : true;
759         }
760
761         /// Clears the queue
762         /**
763             Calls \p dequeue() in a loop and stops at the first \p nullptr result.
764             The disposer defined in template \p Traits is called for each item
765             that can be safely disposed.
766         */
767         void clear()
768         {
769             while ( dequeue() != nullptr ) {}
770         }
771
772         /// Returns the number of items as reported by the item counter
773         /**
774             The result depends on \p Traits (see basket_queue::traits::item_counter). With \p atomicity::empty_item_counter
775             this function always returns 0.
776
777             @note Even with a real item counter, a result of 0 does not mean that the queue
778             is empty. To check queue emptiness use the \p empty() method.
779         */
780         size_t size() const
781         {
782             return this->m_ItemCounter.value();
783         }
784
785         /// Gives read-only access to the queue's internal statistics object
786         const stat& statistics() const
787         {
788             return this->m_Stat;
789         }
790     };
791
792 }} // namespace cds::intrusive
793
794 #endif // #ifndef __CDS_INTRUSIVE_BASKET_QUEUE_H