Merge pull request #36 from khegeman/integration
[libcds.git] / cds / intrusive / basket_queue.h
1 //$$CDS-header$$
2
3 #ifndef CDSLIB_INTRUSIVE_BASKET_QUEUE_H
4 #define CDSLIB_INTRUSIVE_BASKET_QUEUE_H
5
6 #include <type_traits>
7 #include <cds/intrusive/details/single_link_struct.h>
8 #include <cds/details/marked_ptr.h>
9
10 namespace cds { namespace intrusive {
11
12     /// BasketQueue -related definitions
13     /** @ingroup cds_intrusive_helper
14     */
15     namespace basket_queue {
16         /// BasketQueue node
17         /**
18             Template parameters:
19             Template parameters:
20             - GC - garbage collector used
21             - Tag - a \ref cds_intrusive_hook_tag "tag"
22             */
23         template <class GC, typename Tag = opt::none>
24         struct node
25         {
26             typedef GC      gc  ;   ///< Garbage collector
27             typedef Tag     tag ;   ///< tag
28
29             typedef cds::details::marked_ptr<node, 1>                    marked_ptr;        ///< marked pointer
30             typedef typename gc::template atomic_marked_ptr< marked_ptr> atomic_marked_ptr; ///< atomic marked pointer specific for GC
31
32             /// Rebind node for other template parameters
33             template <class GC2, typename Tag2 = tag>
34             struct rebind {
35                 typedef node<GC2, Tag2>  other ;    ///< Rebinding result
36             };
37
38             atomic_marked_ptr m_pNext ; ///< pointer to the next node in the container
39
40             node()
41                 : m_pNext( nullptr )
42             {}
43         };
44
45         using cds::intrusive::single_link::default_hook;
46
47         //@cond
48         template < typename HookType, typename... Options>
49         struct hook
50         {
51             typedef typename opt::make_options< default_hook, Options...>::type  options;
52             typedef typename options::gc    gc;
53             typedef typename options::tag   tag;
54             typedef node<gc, tag> node_type;
55             typedef HookType     hook_type;
56         };
57         //@endcond
58
59
60         /// Base hook
61         /**
62             \p Options are:
63             - opt::gc - garbage collector used.
64             - opt::tag - a \ref cds_intrusive_hook_tag "tag"
65         */
66         template < typename... Options >
67         struct base_hook: public hook< opt::base_hook_tag, Options... >
68         {};
69
70         /// Member hook
71         /**
72             \p MemberOffset defines offset in bytes of \ref node member into your structure.
73             Use \p offsetof macro to define \p MemberOffset
74
75             \p Options are:
76             - opt::gc - garbage collector used.
77             - opt::tag - a \ref cds_intrusive_hook_tag "tag"
78         */
79         template < size_t MemberOffset, typename... Options >
80         struct member_hook: public hook< opt::member_hook_tag, Options... >
81         {
82             //@cond
83             static const size_t c_nMemberOffset = MemberOffset;
84             //@endcond
85         };
86
87         /// Traits hook
88         /**
89             \p NodeTraits defines type traits for node.
90             See \ref node_traits for \p NodeTraits interface description
91
92             \p Options are:
93             - opt::gc - garbage collector used.
94             - opt::tag - a \ref cds_intrusive_hook_tag "tag"
95         */
96         template <typename NodeTraits, typename... Options >
97         struct traits_hook: public hook< opt::traits_hook_tag, Options... >
98         {
99             //@cond
100             typedef NodeTraits node_traits;
101             //@endcond
102         };
103
104         /// BasketQueue internal statistics. May be used for debugging or profiling
105         /**
106             Template argument \p Counter defines type of counter.
107             Default is \p cds::atomicity::event_counter, that is weak, i.e. it is not guaranteed
108             strict event counting.
109             You may use stronger type of counter like as \p cds::atomicity::item_counter,
110             or even integral type, for example, \p int.
111         */
112         template <typename Counter = cds::atomicity::event_counter >
113         struct stat
114         {
115             typedef Counter counter_type;   ///< Counter type
116
117             counter_type m_EnqueueCount;    ///< Enqueue call count
118             counter_type m_DequeueCount;    ///< Dequeue call count
119             counter_type m_EnqueueRace;     ///< Count of enqueue race conditions encountered
120             counter_type m_DequeueRace;     ///< Count of dequeue race conditions encountered
121             counter_type m_AdvanceTailError;///< Count of "advance tail failed" events
122             counter_type m_BadTail;         ///< Count of events "Tail is not pointed to the last item in the queue"
123             counter_type m_TryAddBasket;    ///< Count of attemps adding new item to a basket (only or BasketQueue, for other queue this metric is not used)
124             counter_type m_AddBasketCount;  ///< Count of events "Enqueue a new item into basket" (only or BasketQueue, for other queue this metric is not used)
125             counter_type m_EmptyDequeue;    ///< Count of dequeue from empty queue
126
127             /// Register enqueue call
128             void onEnqueue()                { ++m_EnqueueCount; }
129             /// Register dequeue call
130             void onDequeue()                { ++m_DequeueCount; }
131             /// Register enqueue race event
132             void onEnqueueRace()            { ++m_EnqueueRace; }
133             /// Register dequeue race event
134             void onDequeueRace()            { ++m_DequeueRace; }
135             /// Register "advance tail failed" event
136             void onAdvanceTailFailed()      { ++m_AdvanceTailError; }
137             /// Register event "Tail is not pointed to last item in the queue"
138             void onBadTail()                { ++m_BadTail; }
139             /// Register an attempt t add new item to basket
140             void onTryAddBasket()           { ++m_TryAddBasket; }
141             /// Register event "Enqueue a new item into basket" (only or BasketQueue, for other queue this metric is not used)
142             void onAddBasket()              { ++m_AddBasketCount; }
143             /// Register dequeuing from empty queue
144             void onEmptyDequeue()           { ++m_EmptyDequeue; }
145
146
147             //@cond
148             void reset()
149             {
150                 m_EnqueueCount.reset();
151                 m_DequeueCount.reset();
152                 m_EnqueueRace.reset();
153                 m_DequeueRace.reset();
154                 m_AdvanceTailError.reset();
155                 m_BadTail.reset();
156                 m_TryAddBasket.reset();
157                 m_AddBasketCount.reset();
158                 m_EmptyDequeue.reset();
159             }
160
161             stat& operator +=( stat const& s )
162             {
163                 m_EnqueueCount  += s.m_EnqueueCount.get();
164                 m_DequeueCount  += s.m_DequeueCount.get();
165                 m_EnqueueRace   += s.m_EnqueueRace.get();
166                 m_DequeueRace   += s.m_DequeueRace.get();
167                 m_AdvanceTailError += s.m_AdvanceTailError.get();
168                 m_BadTail       += s.m_BadTail.get();
169                 m_TryAddBasket  += s.m_TryAddBasket.get();
170                 m_AddBasketCount += s.m_AddBasketCount.get();
171                 m_EmptyDequeue  += s.m_EmptyDequeue.get();
172                 return *this;
173             }
174             //@endcond
175         };
176
177         /// Dummy BasketQueue statistics - no counting is performed, no overhead. Support interface like \p basket_queue::stat
178         struct empty_stat
179         {
180             //@cond
181             void onEnqueue()            const {}
182             void onDequeue()            const {}
183             void onEnqueueRace()        const {}
184             void onDequeueRace()        const {}
185             void onAdvanceTailFailed()  const {}
186             void onBadTail()            const {}
187             void onTryAddBasket()       const {}
188             void onAddBasket()          const {}
189             void onEmptyDequeue()       const {}
190
191             void reset() {}
192             empty_stat& operator +=( empty_stat const& )
193             {
194                 return *this;
195             }
196             //@endcond
197         };
198
199         /// BasketQueue default type traits
200         struct traits
201         {
202             /// Back-off strategy
203             typedef cds::backoff::empty             back_off;
204
205             /// Hook, possible types are \p basket_queue::base_hook, \p basket_queue::member_hook, \p basket_queue::traits_hook
206             typedef basket_queue::base_hook<>       hook;
207
208             /// The functor used for dispose removed items. Default is \p opt::v::empty_disposer. This option is used for dequeuing
209             typedef opt::v::empty_disposer          disposer;
210
211             /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
212             typedef atomicity::empty_item_counter   item_counter;
213
214             /// Internal statistics (by default, disabled)
215             /**
216                 Possible option value are: \p basket_queue::stat, \p basket_queue::empty_stat (the default),
217                 user-provided class that supports \p %basket_queue::stat interface.
218             */
219             typedef basket_queue::empty_stat        stat;
220
221             /// C++ memory ordering model
222             /**
223                 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
224                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
225             */
226             typedef opt::v::relaxed_ordering        memory_model;
227
228             /// Link checking, see \p cds::opt::link_checker
229             static CDS_CONSTEXPR const opt::link_check_type link_checker = opt::debug_check_link;
230
231             /// Alignment for internal queue data. Default is \p opt::cache_line_alignment
232             enum { alignment = opt::cache_line_alignment };
233         };
234
235
236         /// Metafunction converting option list to \p basket_queue::traits
237         /**
238             Supported \p Options are:
239
240             - opt::hook - hook used. Possible hooks are: \p basket_queue::base_hook, \p basket_queue::member_hook, \p basket_queue::traits_hook.
241                 If the option is not specified, \p %basket_queue::base_hook<> is used.
242             - opt::back_off - back-off strategy used, default is \p cds::backoff::empty.
243             - opt::disposer - the functor used for dispose removed items. Default is \p opt::v::empty_disposer. This option is used
244                 when dequeuing.
245             - opt::link_checker - the type of node's link fields checking. Default is \p opt::debug_check_link
246             - opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
247                 To enable item counting use \p cds::atomicity::item_counter
248             - opt::stat - the type to gather internal statistics.
249                 Possible statistics types are: \p basket_queue::stat, \p basket_queue::empty_stat, user-provided class that supports \p %basket_queue::stat interface.
250                 Default is \p %basket_queue::empty_stat (internal statistics disabled).
251             - opt::alignment - the alignment for internal queue data. Default is \p opt::cache_line_alignment
252             - opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
253                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
254
255             Example: declare \p %BasketQueue with item counting and internal statistics
256             \code
257             typedef cds::intrusive::BasketQueue< cds::gc::HP, Foo,
258                 typename cds::intrusive::basket_queue::make_traits<
259                     cds::intrusive::opt:hook< cds::intrusive::basket_queue::base_hook< cds::opt::gc<cds:gc::HP> >>,
260                     cds::opt::item_counte< cds::atomicity::item_counter >,
261                     cds::opt::stat< cds::intrusive::basket_queue::stat<> >
262                 >::type
263             > myQueue;
264             \endcode
265         */
266         template <typename... Options>
267         struct make_traits {
268 #   ifdef CDS_DOXYGEN_INVOKED
269             typedef implementation_defined type;   ///< Metafunction result
270 #   else
271             typedef typename cds::opt::make_options<
272                 typename cds::opt::find_type_traits< traits, Options... >::type
273                 , Options...
274             >::type type;
275 #   endif
276         };
277     }   // namespace basket_queue
278
279     /// Basket lock-free queue (intrusive variant)
280     /** @ingroup cds_intrusive_queue
281         Implementation of basket queue algorithm.
282
283         \par Source:
284             [2007] Moshe Hoffman, Ori Shalev, Nir Shavit "The Baskets Queue"
285
286         <b>Key idea</b>
287
        In the "basket" approach, instead of
289         the traditional ordered list of nodes, the queue consists of an ordered list of groups
290         of nodes (logical baskets). The order of nodes in each basket need not be specified, and in
291         fact, it is easiest to maintain them in FIFO order. The baskets fulfill the following basic
292         rules:
        - Each basket has a time interval in which all its nodes' enqueue operations overlap.
        - The baskets are ordered by the order of their respective time intervals.
        - For each basket, its nodes' dequeue operations occur after its time interval.
296         - The dequeue operations are performed according to the order of baskets.
297
298         Two properties define the FIFO order of nodes:
299         - The order of nodes in a basket is not specified.
300         - The order of nodes in different baskets is the FIFO-order of their respective baskets.
301
302         In algorithms such as the MS-queue or optimistic
303         queue, threads enqueue items by applying a Compare-and-swap (CAS) operation to the
        queue's tail pointer, and all the threads that fail on a particular CAS operation (and also
305         the winner of that CAS) overlap in time. In particular, they share the time interval of
306         the CAS operation itself. Hence, all the threads that fail to CAS on the tail-node of
307         the queue may be inserted into the same basket. By integrating the basket-mechanism
308         as the back-off mechanism, the time usually spent on backing-off before trying to link
309         onto the new tail, can now be utilized to insert the failed operations into the basket,
310         allowing enqueues to complete sooner. In the meantime, the next successful CAS operations
311         by enqueues allow new baskets to be formed down the list, and these can be
        filled concurrently. Moreover, the failed operations don't retry their link attempt on the
313         new tail, lowering the overall contention on it. This leads to a queue
314         algorithm that unlike all former concurrent queue algorithms requires virtually no tuning
315         of the backoff mechanisms to reduce contention, making the algorithm an attractive
316         out-of-the-box queue.
317
318         In order to enqueue, just as in MSQueue, a thread first tries to link the new node to
319         the last node. If it failed to do so, then another thread has already succeeded. Thus it
320         tries to insert the new node into the new basket that was created by the winner thread.
321         To dequeue a node, a thread first reads the head of the queue to obtain the
322         oldest basket. It may then dequeue any node in the oldest basket.
323
324         <b>Template arguments:</b>
325         - \p GC - garbage collector type: \p gc::HP, \p gc::DHP
326         - \p T - type of value to be stored in the queue
327         - \p Traits - queue traits, default is \p basket_queue::traits. You can use \p basket_queue::make_traits
328             metafunction to make your traits or just derive your traits from \p %basket_queue::traits:
329             \code
330             struct myTraits: public cds::intrusive::basket_queue::traits {
331                 typedef cds::intrusive::basket_queue::stat<> stat;
332                 typedef cds::atomicity::item_counter    item_counter;
333             };
334             typedef cds::intrusive::BasketQueue< cds::gc::HP, Foo, myTraits > myQueue;
335
336             // Equivalent make_traits example:
337             typedef cds::intrusive::BasketQueue< cds::gc::HP, Foo,
338                 typename cds::intrusive::basket_queue::make_traits<
339                     cds::opt::stat< cds::intrusive::basket_queue::stat<> >,
340                     cds::opt::item_counter< cds::atomicity::item_counter >
341                 >::type
342             > myQueue;
343             \endcode
344
345         Garbage collecting schema \p GC must be consistent with the \p basket_queue::node GC.
346
347         \par About item disposing
348         Like \p MSQueue, the Baskets queue algo has a key feature: even if the queue is empty it contains one item that is "dummy" one from
349         the standpoint of the algo. See \p dequeue() function doc for explanation.
350
351         \par Examples
352         \code
353         #include <cds/intrusive/basket_queue.h>
354         #include <cds/gc/hp.h>
355
        namespace ci = cds::intrusive;
357         typedef cds::gc::HP hp_gc;
358
359         // Basket queue with Hazard Pointer garbage collector, base hook + item disposer:
360         struct Foo: public ci::basket_queue::node< hp_gc >
361         {
362             // Your data
363             ...
364         };
365
366         // Disposer for Foo struct just deletes the object passed in
367         struct fooDisposer {
368             void operator()( Foo * p )
369             {
370                 delete p;
371             }
372         };
373
374         struct fooTraits: public ci::basket_queue::traits {
375             typedef ci::basket_queue::base_hook< ci::opt::gc<hp_gc> > hook;
376             typedef fooDisposer disposer;
377         };
378         typedef ci::BasketQueue< hp_gc, Foo, fooTraits > fooQueue;
379
380         // BasketQueue with Hazard Pointer garbage collector,
381         // member hook + item disposer + item counter,
382         // without alignment of internal queue data:
383         struct Bar
384         {
385             // Your data
386             ...
387             ci::basket_queue::node< hp_gc > hMember;
388         };
389
390         struct barTraits: public
391             ci::basket_queue::make_traits<
392                 ci::opt::hook<
393                     ci::basket_queue::member_hook<
394                         offsetof(Bar, hMember)
395                         ,ci::opt::gc<hp_gc>
396                     >
397                 >
398                 ,ci::opt::disposer< fooDisposer >
399                 ,cds::opt::item_counter< cds::atomicity::item_counter >
400                 ,cds::opt::alignment< cds::opt::no_special_alignment >
401             >::type
402         {};
403         typedef ci::BasketQueue< hp_gc, Bar, barTraits > barQueue;
404         \endcode
405     */
406     template <typename GC, typename T, typename Traits = basket_queue::traits >
407     class BasketQueue
408     {
409     public:
410         typedef GC gc;          ///< Garbage collector
411         typedef T  value_type;  ///< type of value stored in the queue
412         typedef Traits traits;  ///< Queue traits
413         typedef typename traits::hook       hook;       ///< hook type
414         typedef typename hook::node_type    node_type;  ///< node type
415         typedef typename traits::disposer   disposer;   ///< disposer used
416         typedef typename get_node_traits< value_type, node_type, hook>::type node_traits;   ///< node traits
417         typedef typename single_link::get_link_checker< node_type, traits::link_checker >::type link_checker;   ///< link checker
418
419         typedef typename traits::back_off       back_off;     ///< back-off strategy
420         typedef typename traits::item_counter   item_counter; ///< Item counting policy used
421         typedef typename traits::stat           stat;         ///< Internal statistics policy used
422         typedef typename traits::memory_model   memory_model; ///< Memory ordering. See cds::opt::memory_model option
423
424         /// Rebind template arguments
425         template <typename GC2, typename T2, typename Traits2>
426         struct rebind {
427             typedef BasketQueue< GC2, T2, Traits2> other   ;   ///< Rebinding result
428         };
429
430         static CDS_CONSTEXPR const size_t m_nHazardPtrCount = 6 ; ///< Count of hazard pointer required for the algorithm
431
432     protected:
433         //@cond
434         typedef typename node_type::marked_ptr   marked_ptr;
435         typedef typename node_type::atomic_marked_ptr atomic_marked_ptr;
436
437         typedef intrusive::node_to_value<BasketQueue> node_to_value;
438         typedef typename opt::details::alignment_setter< atomic_marked_ptr, traits::alignment >::type aligned_node_ptr;
439         typedef typename opt::details::alignment_setter< node_type, traits::alignment >::type dummy_node_type;
440
441         // GC and node_type::gc must be the same
442         static_assert( std::is_same<gc, typename node_type::gc>::value, "GC and node_type::gc must be the same");
443         //@endcond
444
445         aligned_node_ptr    m_pHead ;           ///< Queue's head pointer (aligned)
446         aligned_node_ptr    m_pTail ;           ///< Queue's tail pointer (aligned)
447         dummy_node_type     m_Dummy ;           ///< dummy node
448         item_counter        m_ItemCounter   ;   ///< Item counter
449         stat                m_Stat  ;           ///< Internal statistics
450         //@cond
451         size_t const        m_nMaxHops;
452         //@endcond
453
454         //@cond
455
456         template <size_t Count>
457         static marked_ptr guard_node( typename gc::template GuardArray<Count>& g, size_t idx, atomic_marked_ptr const& p )
458         {
459             marked_ptr pg;
460             while ( true ) {
461                 pg = p.load( memory_model::memory_order_relaxed );
462                 g.assign( idx, node_traits::to_value_ptr( pg.ptr() ) );
463                 if ( p.load( memory_model::memory_order_acquire) == pg ) {
464                     return pg;
465                 }
466             }
467         }
468
469         static marked_ptr guard_node( typename gc::Guard& g, atomic_marked_ptr const& p )
470         {
471             marked_ptr pg;
472             while ( true ) {
473                 pg = p.load( memory_model::memory_order_relaxed );
474                 g.assign( node_traits::to_value_ptr( pg.ptr() ) );
475                 if ( p.load( memory_model::memory_order_acquire) == pg ) {
476                     return pg;
477                 }
478             }
479         }
480
481         struct dequeue_result {
482             typename gc::template GuardArray<3>  guards;
483             node_type * pNext;
484         };
485
486         bool do_dequeue( dequeue_result& res, bool bDeque )
487         {
488             // Note:
489             // If bDeque == false then the function is called from empty method and no real dequeuing operation is performed
490
491             back_off bkoff;
492
493             marked_ptr h;
494             marked_ptr t;
495             marked_ptr pNext;
496
497             while ( true ) {
498                 h = guard_node( res.guards, 0, m_pHead );
499                 t = guard_node( res.guards, 1, m_pTail );
500                 pNext = guard_node( res.guards, 2, h->m_pNext );
501
502                 if ( h == m_pHead.load( memory_model::memory_order_acquire ) ) {
503                     if ( h.ptr() == t.ptr() ) {
504                         if ( !pNext.ptr() ) {
505                             m_Stat.onEmptyDequeue();
506                             return false;
507                         }
508
509                         {
510                             typename gc::Guard g;
511                             while ( pNext->m_pNext.load(memory_model::memory_order_relaxed).ptr() && m_pTail.load(memory_model::memory_order_relaxed) == t ) {
512                                 pNext = guard_node( g, pNext->m_pNext );
513                                 res.guards.assign( 2, g.template get<value_type>() );
514                             }
515                         }
516
517                         m_pTail.compare_exchange_weak( t, marked_ptr(pNext.ptr()), memory_model::memory_order_release, memory_model::memory_order_relaxed );
518                     }
519                     else {
520                         marked_ptr iter( h );
521                         size_t hops = 0;
522
523                         typename gc::Guard g;
524
525                         while ( pNext.ptr() && pNext.bits() && iter.ptr() != t.ptr() && m_pHead.load(memory_model::memory_order_relaxed) == h ) {
526                             iter = pNext;
527                             g.assign( res.guards.template get<value_type>(2) );
528                             pNext = guard_node( res.guards, 2, pNext->m_pNext );
529                             ++hops;
530                         }
531
532                         if ( m_pHead.load(memory_model::memory_order_relaxed) != h )
533                             continue;
534
535                         if ( iter.ptr() == t.ptr() )
536                             free_chain( h, iter );
537                         else if ( bDeque ) {
538                             res.pNext = pNext.ptr();
539
540                             if ( iter->m_pNext.compare_exchange_weak( pNext, marked_ptr( pNext.ptr(), 1 ), memory_model::memory_order_release, memory_model::memory_order_relaxed ) ) {
541                                 if ( hops >= m_nMaxHops )
542                                     free_chain( h, pNext );
543                                 break;
544                             }
545                         }
546                         else
547                             return true;
548                     }
549                 }
550
551                 if ( bDeque )
552                     m_Stat.onDequeueRace();
553                 bkoff();
554             }
555
556             if ( bDeque ) {
557                 --m_ItemCounter;
558                 m_Stat.onDequeue();
559             }
560
561             return true;
562         }
563
564         void free_chain( marked_ptr head, marked_ptr newHead )
565         {
566             // "head" and "newHead" are guarded
567
568             if ( m_pHead.compare_exchange_strong( head, marked_ptr(newHead.ptr()), memory_model::memory_order_release, memory_model::memory_order_relaxed ))
569             {
570                 typename gc::template GuardArray<2> guards;
571                 guards.assign( 0, node_traits::to_value_ptr(head.ptr()) );
572                 while ( head.ptr() != newHead.ptr() ) {
573                     marked_ptr pNext = guard_node( guards, 1, head->m_pNext );
574                     assert( pNext.bits() != 0 );
575                     dispose_node( head.ptr() );
576                     guards.assign( 0, guards.template get<value_type>(1) );
577                     head = pNext;
578                 }
579             }
580         }
581
582         static void clear_links( node_type * pNode )
583         {
584             pNode->m_pNext.store( marked_ptr( nullptr ), memory_model::memory_order_release );
585         }
586
587         void dispose_node( node_type * p )
588         {
589             if ( p != &m_Dummy ) {
590                 struct internal_disposer
591                 {
592                     void operator()( value_type * p )
593                     {
594                         assert( p != nullptr );
595                         BasketQueue::clear_links( node_traits::to_node_ptr( p ) );
596                         disposer()(p);
597                     }
598                 };
599                 gc::template retire<internal_disposer>( node_traits::to_value_ptr(p) );
600             }
601         }
602         //@endcond
603
604     public:
605         /// Initializes empty queue
606         BasketQueue()
607             : m_pHead( &m_Dummy )
608             , m_pTail( &m_Dummy )
609             , m_nMaxHops( 3 )
610         {}
611
612         /// Destructor clears the queue
613         /**
614             Since the baskets queue contains at least one item even
615             if the queue is empty, the destructor may call item disposer.
616         */
617         ~BasketQueue()
618         {
619             clear();
620
621             node_type * pHead = m_pHead.load(memory_model::memory_order_relaxed).ptr();
622             assert( pHead != nullptr );
623
624             {
625                 node_type * pNext = pHead->m_pNext.load( memory_model::memory_order_relaxed ).ptr();
626                 while ( pNext ) {
627                     node_type * p = pNext;
628                     pNext = pNext->m_pNext.load( memory_model::memory_order_relaxed ).ptr();
629                     p->m_pNext.store( marked_ptr(), memory_model::memory_order_relaxed );
630                     dispose_node( p );
631                 }
632                 pHead->m_pNext.store( marked_ptr(), memory_model::memory_order_relaxed );
633                 //m_pTail.store( marked_ptr( pHead ), memory_model::memory_order_relaxed );
634             }
635
636             m_pHead.store( marked_ptr( nullptr ), memory_model::memory_order_relaxed );
637             m_pTail.store( marked_ptr( nullptr ), memory_model::memory_order_relaxed );
638
639             dispose_node( pHead );
640         }
641
642         /// Enqueues \p val value into the queue.
643         /** @anchor cds_intrusive_BasketQueue_enqueue
644             The function always returns \p true: its retry loop is left only after the node of \p val has been linked behind the observed tail node, either as a direct successor or, after a failed link attempt, in front of the node that currently follows the tail (the "basket").
645         */
646         bool enqueue( value_type& val )
647         {
648             node_type * pNew = node_traits::to_node_ptr( val ); // node representation of val, obtained through node_traits
649             link_checker::is_empty( pNew ); // NOTE(review): presumably verifies that pNew is not linked into a container yet - confirm against link_checker
650
651             typename gc::Guard guard;
652             back_off bkoff;
653
654             marked_ptr t;
655             while ( true ) {
656                 t = guard_node( guard, m_pTail ); // NOTE(review): presumably loads m_pTail and protects the node with guard - confirm against guard_node()
657
658                 marked_ptr pNext = t->m_pNext.load(memory_model::memory_order_acquire );
659
660                 if ( pNext.ptr() == nullptr ) { // t has no successor: try to link pNew directly behind it
661                     pNew->m_pNext.store( marked_ptr(), memory_model::memory_order_release ); // pNew is to become the last node, so its next pointer is an empty marked_ptr
662                     if ( t->m_pNext.compare_exchange_weak( pNext, marked_ptr(pNew), memory_model::memory_order_release, memory_model::memory_order_relaxed ) ) {
663                         if ( !m_pTail.compare_exchange_strong( t, marked_ptr(pNew), memory_model::memory_order_release, memory_model::memory_order_relaxed ))
664                             m_Stat.onAdvanceTailFailed(); // m_pTail was not moved to pNew by this call; the failure is only counted, not retried
665                         break; // pNew is linked regardless of the outcome of the tail update
666                     }
667
668                     // The link attempt did not succeed: try adding to basket
669                     m_Stat.onTryAddBasket();
670
671                     // Reread tail next (kept in a guard of its own)
672                     typename gc::Guard gNext;
673
674                 try_again:
675                     pNext = guard_node( gNext, t->m_pNext );
676
677                     // add to the basket, but only while t is still the tail, t->m_pNext still equals pNext and pNext carries no mark bits
678                     if ( m_pTail.load(memory_model::memory_order_relaxed) == t
679                          && t->m_pNext.load( memory_model::memory_order_relaxed) == pNext
680                          && !pNext.bits()  )
681                     {
682                         bkoff();
683                         pNew->m_pNext.store( pNext, memory_model::memory_order_release ); // pNew is inserted between t and the node that currently follows t
684                         if ( t->m_pNext.compare_exchange_weak( pNext, marked_ptr( pNew ), memory_model::memory_order_release, memory_model::memory_order_relaxed )) {
685                             m_Stat.onAddBasket();
686                             break;
687                         }
688                         goto try_again; // link attempt failed again: reread t->m_pNext and re-validate
689                     } // validation failed: fall through to the end of the loop body and restart from a freshly read tail
690                 }
691                 else {
692                     // Tail is misplaced (t already has a successor), advance it
693
694                     typename gc::template GuardArray<2> g;
695                     g.assign( 0, node_traits::to_value_ptr( pNext.ptr() ) ); // slot 0 holds the current candidate for the new tail
696                     if ( t->m_pNext.load( memory_model::memory_order_relaxed ) != pNext ) { // re-check the link after assigning the guard; restart on mismatch
697                         m_Stat.onEnqueueRace();
698                         bkoff();
699                         continue;
700                     }
701
702                     marked_ptr p;
703                     bool bTailOk = true;
704                     while ( (p = pNext->m_pNext.load( memory_model::memory_order_relaxed )).ptr() != nullptr )
705                     {
706                         bTailOk = m_pTail.load( memory_model::memory_order_relaxed ) == t;
707                         if ( !bTailOk )
708                             break; // m_pTail no longer equals t: stop walking the chain
709
710                         g.assign( 1, node_traits::to_value_ptr( p.ptr() )); // slot 1 holds the next hop p
711                         if ( pNext->m_pNext.load(memory_model::memory_order_relaxed) != p )
712                             continue; // the link changed after p was assigned to the guard: the loop condition reloads p
713                         pNext = p;
714                         g.assign( 0, g.template get<value_type>( 1 ) ); // p is the new candidate, so slot 0 takes over the value held in slot 1
715                     }
716                     if ( !bTailOk || !m_pTail.compare_exchange_weak( t, marked_ptr( pNext.ptr() ), memory_model::memory_order_release, memory_model::memory_order_relaxed ))
717                         m_Stat.onAdvanceTailFailed(); // counted both when the walk was abandoned and when the CAS on m_pTail failed
718
719                     m_Stat.onBadTail();
720                 }
721                 // Reached whenever this iteration did not link pNew; the outer loop then retries
722                 m_Stat.onEnqueueRace();
723             }
724             // pNew has been linked: update the item counter and the statistics
725             ++m_ItemCounter;
726             m_Stat.onEnqueue();
727
728             return true;
729         }
730
731         /// Alias of \p enqueue(): enqueues \p val and returns the value \p enqueue() returns
732         bool push( value_type& val )
733         {
734             return this->enqueue( val );
735         }
736
737         /// Dequeues a value from the queue
738         /** @anchor cds_intrusive_BasketQueue_dequeue
739             If the queue is empty the function returns \p nullptr.
740
741             @note See \p MSQueue::dequeue() note about item disposing
742         */
743         value_type * dequeue()
744         {
745             dequeue_result res;
746
747             if ( do_dequeue( res, true ))
748                 return node_traits::to_value_ptr( *res.pNext );
749             return nullptr;
750         }
751
752         /// Alias of \p dequeue(): returns whatever \p dequeue() returns
753         value_type * pop()
754         {
755             return this->dequeue();
756         }
757
758         /// Tests whether the queue is empty
759         /**
760             The function is not \p const: it is based on the \p dequeue() algorithm,
761             although it does not really dequeue any item.
762         */
763         bool empty()
764         {
765             dequeue_result probe;
766             const bool bHasItem = do_dequeue( probe, false );
767             return !bHasItem;
768         }
769
770         /// Clears the queue
771         /**
772             Calls \p dequeue() in a loop and stops at the first \p nullptr result.
773             The disposer defined in template \p Traits is called for each item
774             that can be safely disposed.
775         */
776         void clear()
777         {
778             while ( dequeue() != nullptr ) {}
779         }
780
781         /// Returns the current value of the queue's item counter
782         /**
783             The result depends on the item counter selected in \p Traits (see basket_queue::traits::item_counter);
784             with \p atomicity::empty_item_counter this function always returns 0.
785             @note A result of 0 does not mean that the queue is empty, even when a real item counter is used.
786             To check whether the queue is empty, use the \p empty() method.
787         */
788         size_t size() const
789         {
790             const size_t nItems = m_ItemCounter.value();
791             return nItems;
792         }
793
794         /// Gives read-only access to the queue's internal statistics object
795         const stat& statistics() const
796         {
797             return this->m_Stat;
798         }
799     };
800
801 }} // namespace cds::intrusive
802
803 #endif // #ifndef CDSLIB_INTRUSIVE_BASKET_QUEUE_H