Uses different pass count for different parallel queue test cases
[libcds.git] / cds / intrusive / basket_queue.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #ifndef CDSLIB_INTRUSIVE_BASKET_QUEUE_H
32 #define CDSLIB_INTRUSIVE_BASKET_QUEUE_H
33
34 #include <type_traits>
35 #include <cds/intrusive/details/single_link_struct.h>
36 #include <cds/details/marked_ptr.h>
37
38 namespace cds { namespace intrusive {
39
40     /// BasketQueue -related definitions
41     /** @ingroup cds_intrusive_helper
42     */
43     namespace basket_queue {
44         /// BasketQueue node
45         /**
46             Template parameters:
47             Template parameters:
48             - GC - garbage collector used
49             - Tag - a \ref cds_intrusive_hook_tag "tag"
50             */
51         template <class GC, typename Tag = opt::none>
52         struct node
53         {
54             typedef GC      gc  ;   ///< Garbage collector
55             typedef Tag     tag ;   ///< tag
56
57             typedef cds::details::marked_ptr<node, 1>                    marked_ptr;        ///< marked pointer
58             typedef typename gc::template atomic_marked_ptr< marked_ptr> atomic_marked_ptr; ///< atomic marked pointer specific for GC
59
60             /// Rebind node for other template parameters
61             template <class GC2, typename Tag2 = tag>
62             struct rebind {
63                 typedef node<GC2, Tag2>  other ;    ///< Rebinding result
64             };
65
66             atomic_marked_ptr m_pNext ; ///< pointer to the next node in the container
67
68             node()
69             {
70                 m_pNext.store( marked_ptr(), atomics::memory_order_release );
71             }
72         };
73
74         using cds::intrusive::single_link::default_hook;
75
76         //@cond
77         template < typename HookType, typename... Options>
78         struct hook
79         {
80             typedef typename opt::make_options< default_hook, Options...>::type  options;
81             typedef typename options::gc    gc;
82             typedef typename options::tag   tag;
83             typedef node<gc, tag> node_type;
84             typedef HookType     hook_type;
85         };
86         //@endcond
87
88
89         /// Base hook
90         /**
91             \p Options are:
92             - opt::gc - garbage collector used.
93             - opt::tag - a \ref cds_intrusive_hook_tag "tag"
94         */
95         template < typename... Options >
96         struct base_hook: public hook< opt::base_hook_tag, Options... >
97         {};
98
99         /// Member hook
100         /**
101             \p MemberOffset defines offset in bytes of \ref node member into your structure.
102             Use \p offsetof macro to define \p MemberOffset
103
104             \p Options are:
105             - opt::gc - garbage collector used.
106             - opt::tag - a \ref cds_intrusive_hook_tag "tag"
107         */
108         template < size_t MemberOffset, typename... Options >
109         struct member_hook: public hook< opt::member_hook_tag, Options... >
110         {
111             //@cond
112             static const size_t c_nMemberOffset = MemberOffset;
113             //@endcond
114         };
115
116         /// Traits hook
117         /**
118             \p NodeTraits defines type traits for node.
119             See \ref node_traits for \p NodeTraits interface description
120
121             \p Options are:
122             - opt::gc - garbage collector used.
123             - opt::tag - a \ref cds_intrusive_hook_tag "tag"
124         */
125         template <typename NodeTraits, typename... Options >
126         struct traits_hook: public hook< opt::traits_hook_tag, Options... >
127         {
128             //@cond
129             typedef NodeTraits node_traits;
130             //@endcond
131         };
132
133         /// BasketQueue internal statistics. May be used for debugging or profiling
134         /**
135             Template argument \p Counter defines type of counter.
136             Default is \p cds::atomicity::event_counter, that is weak, i.e. it is not guaranteed
137             strict event counting.
138             You may use stronger type of counter like as \p cds::atomicity::item_counter,
139             or even integral type, for example, \p int.
140         */
141         template <typename Counter = cds::atomicity::event_counter >
142         struct stat
143         {
144             typedef Counter counter_type;   ///< Counter type
145
146             counter_type m_EnqueueCount;    ///< Enqueue call count
147             counter_type m_DequeueCount;    ///< Dequeue call count
148             counter_type m_EnqueueRace;     ///< Count of enqueue race conditions encountered
149             counter_type m_DequeueRace;     ///< Count of dequeue race conditions encountered
150             counter_type m_AdvanceTailError;///< Count of "advance tail failed" events
151             counter_type m_BadTail;         ///< Count of events "Tail is not pointed to the last item in the queue"
152             counter_type m_TryAddBasket;    ///< Count of attemps adding new item to a basket (only or BasketQueue, for other queue this metric is not used)
153             counter_type m_AddBasketCount;  ///< Count of events "Enqueue a new item into basket" (only or BasketQueue, for other queue this metric is not used)
154             counter_type m_EmptyDequeue;    ///< Count of dequeue from empty queue
155
156             /// Register enqueue call
157             void onEnqueue()                { ++m_EnqueueCount; }
158             /// Register dequeue call
159             void onDequeue()                { ++m_DequeueCount; }
160             /// Register enqueue race event
161             void onEnqueueRace()            { ++m_EnqueueRace; }
162             /// Register dequeue race event
163             void onDequeueRace()            { ++m_DequeueRace; }
164             /// Register "advance tail failed" event
165             void onAdvanceTailFailed()      { ++m_AdvanceTailError; }
166             /// Register event "Tail is not pointed to last item in the queue"
167             void onBadTail()                { ++m_BadTail; }
168             /// Register an attempt t add new item to basket
169             void onTryAddBasket()           { ++m_TryAddBasket; }
170             /// Register event "Enqueue a new item into basket" (only or BasketQueue, for other queue this metric is not used)
171             void onAddBasket()              { ++m_AddBasketCount; }
172             /// Register dequeuing from empty queue
173             void onEmptyDequeue()           { ++m_EmptyDequeue; }
174
175
176             //@cond
177             void reset()
178             {
179                 m_EnqueueCount.reset();
180                 m_DequeueCount.reset();
181                 m_EnqueueRace.reset();
182                 m_DequeueRace.reset();
183                 m_AdvanceTailError.reset();
184                 m_BadTail.reset();
185                 m_TryAddBasket.reset();
186                 m_AddBasketCount.reset();
187                 m_EmptyDequeue.reset();
188             }
189
190             stat& operator +=( stat const& s )
191             {
192                 m_EnqueueCount  += s.m_EnqueueCount.get();
193                 m_DequeueCount  += s.m_DequeueCount.get();
194                 m_EnqueueRace   += s.m_EnqueueRace.get();
195                 m_DequeueRace   += s.m_DequeueRace.get();
196                 m_AdvanceTailError += s.m_AdvanceTailError.get();
197                 m_BadTail       += s.m_BadTail.get();
198                 m_TryAddBasket  += s.m_TryAddBasket.get();
199                 m_AddBasketCount += s.m_AddBasketCount.get();
200                 m_EmptyDequeue  += s.m_EmptyDequeue.get();
201                 return *this;
202             }
203             //@endcond
204         };
205
206         /// Dummy BasketQueue statistics - no counting is performed, no overhead. Support interface like \p basket_queue::stat
207         struct empty_stat
208         {
209             //@cond
210             void onEnqueue()            const {}
211             void onDequeue()            const {}
212             void onEnqueueRace()        const {}
213             void onDequeueRace()        const {}
214             void onAdvanceTailFailed()  const {}
215             void onBadTail()            const {}
216             void onTryAddBasket()       const {}
217             void onAddBasket()          const {}
218             void onEmptyDequeue()       const {}
219
220             void reset() {}
221             empty_stat& operator +=( empty_stat const& )
222             {
223                 return *this;
224             }
225             //@endcond
226         };
227
228         /// BasketQueue default type traits
229         struct traits
230         {
231             /// Back-off strategy
232             typedef cds::backoff::empty             back_off;
233
234             /// Hook, possible types are \p basket_queue::base_hook, \p basket_queue::member_hook, \p basket_queue::traits_hook
235             typedef basket_queue::base_hook<>       hook;
236
237             /// The functor used for dispose removed items. Default is \p opt::v::empty_disposer. This option is used for dequeuing
238             typedef opt::v::empty_disposer          disposer;
239
240             /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
241             typedef atomicity::empty_item_counter   item_counter;
242
243             /// Internal statistics (by default, disabled)
244             /**
245                 Possible option value are: \p basket_queue::stat, \p basket_queue::empty_stat (the default),
246                 user-provided class that supports \p %basket_queue::stat interface.
247             */
248             typedef basket_queue::empty_stat        stat;
249
250             /// C++ memory ordering model
251             /**
252                 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
253                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
254             */
255             typedef opt::v::relaxed_ordering        memory_model;
256
257             /// Link checking, see \p cds::opt::link_checker
258             static CDS_CONSTEXPR const opt::link_check_type link_checker = opt::debug_check_link;
259
260             /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
261             enum { padding = opt::cache_line_padding };
262         };
263
264
265         /// Metafunction converting option list to \p basket_queue::traits
266         /**
267             Supported \p Options are:
268             - \p opt::hook - hook used. Possible hooks are: \p basket_queue::base_hook, \p basket_queue::member_hook, \p basket_queue::traits_hook.
269                 If the option is not specified, \p %basket_queue::base_hook<> is used.
270             - \p opt::back_off - back-off strategy used, default is \p cds::backoff::empty.
271             - \p opt::disposer - the functor used for dispose removed items. Default is \p opt::v::empty_disposer. This option is used
272                 when dequeuing.
273             - \p opt::link_checker - the type of node's link fields checking. Default is \p opt::debug_check_link
274             - \p opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
275                 To enable item counting use \p cds::atomicity::item_counter
276             - \p opt::stat - the type to gather internal statistics.
277                 Possible statistics types are: \p basket_queue::stat, \p basket_queue::empty_stat, user-provided class that supports \p %basket_queue::stat interface.
278                 Default is \p %basket_queue::empty_stat (internal statistics disabled).
279             - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
280             - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
281                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
282
283             Example: declare \p %BasketQueue with item counting and internal statistics
284             \code
285             typedef cds::intrusive::BasketQueue< cds::gc::HP, Foo,
286                 typename cds::intrusive::basket_queue::make_traits<
287                     cds::intrusive::opt:hook< cds::intrusive::basket_queue::base_hook< cds::opt::gc<cds:gc::HP> >>,
288                     cds::opt::item_counte< cds::atomicity::item_counter >,
289                     cds::opt::stat< cds::intrusive::basket_queue::stat<> >
290                 >::type
291             > myQueue;
292             \endcode
293         */
294         template <typename... Options>
295         struct make_traits {
296 #   ifdef CDS_DOXYGEN_INVOKED
297             typedef implementation_defined type;   ///< Metafunction result
298 #   else
299             typedef typename cds::opt::make_options<
300                 typename cds::opt::find_type_traits< traits, Options... >::type
301                 , Options...
302             >::type type;
303 #   endif
304         };
305     }   // namespace basket_queue
306
307     /// Basket lock-free queue (intrusive variant)
308     /** @ingroup cds_intrusive_queue
309         Implementation of basket queue algorithm.
310
311         \par Source:
312             [2007] Moshe Hoffman, Ori Shalev, Nir Shavit "The Baskets Queue"
313
314         <b>Key idea</b>
315
316         In the 'basket' approach, instead of
317         the traditional ordered list of nodes, the queue consists of an ordered list of groups
318         of nodes (logical baskets). The order of nodes in each basket need not be specified, and in
319         fact, it is easiest to maintain them in FIFO order. The baskets fulfill the following basic
320         rules:
321         - Each basket has a time interval in which all its nodes' enqueue operations overlap.
322         - The baskets are ordered by the order of their respective time intervals.
323         - For each basket, its nodes' dequeue operations occur after its time interval.
324         - The dequeue operations are performed according to the order of baskets.
325
326         Two properties define the FIFO order of nodes:
327         - The order of nodes in a basket is not specified.
328         - The order of nodes in different baskets is the FIFO-order of their respective baskets.
329
330         In algorithms such as the MS-queue or optimistic
331         queue, threads enqueue items by applying a Compare-and-swap (CAS) operation to the
332         queue's tail pointer, and all the threads that fail on a particular CAS operation (and also
333         the winner of that CAS) overlap in time. In particular, they share the time interval of
334         the CAS operation itself. Hence, all the threads that fail to CAS on the tail-node of
335         the queue may be inserted into the same basket. By integrating the basket-mechanism
336         as the back-off mechanism, the time usually spent on backing-off before trying to link
337         onto the new tail, can now be utilized to insert the failed operations into the basket,
338         allowing enqueues to complete sooner. In the meantime, the next successful CAS operations
339         by enqueues allow new baskets to be formed down the list, and these can be
340         filled concurrently. Moreover, the failed operations don't retry their link attempt on the
341         new tail, lowering the overall contention on it. This leads to a queue
342         algorithm that unlike all former concurrent queue algorithms requires virtually no tuning
343         of the backoff mechanisms to reduce contention, making the algorithm an attractive
344         out-of-the-box queue.
345
346         In order to enqueue, just as in \p MSQueue, a thread first tries to link the new node to
347         the last node. If it failed to do so, then another thread has already succeeded. Thus it
348         tries to insert the new node into the new basket that was created by the winner thread.
349         To dequeue a node, a thread first reads the head of the queue to obtain the
350         oldest basket. It may then dequeue any node in the oldest basket.
351
352         <b>Template arguments:</b>
353         - \p GC - garbage collector type: \p gc::HP, \p gc::DHP
354         - \p T - type of value to be stored in the queue
355         - \p Traits - queue traits, default is \p basket_queue::traits. You can use \p basket_queue::make_traits
356             metafunction to make your traits or just derive your traits from \p %basket_queue::traits:
357             \code
358             struct myTraits: public cds::intrusive::basket_queue::traits {
359                 typedef cds::intrusive::basket_queue::stat<> stat;
360                 typedef cds::atomicity::item_counter    item_counter;
361             };
362             typedef cds::intrusive::BasketQueue< cds::gc::HP, Foo, myTraits > myQueue;
363
364             // Equivalent make_traits example:
365             typedef cds::intrusive::BasketQueue< cds::gc::HP, Foo,
366                 typename cds::intrusive::basket_queue::make_traits<
367                     cds::opt::stat< cds::intrusive::basket_queue::stat<> >,
368                     cds::opt::item_counter< cds::atomicity::item_counter >
369                 >::type
370             > myQueue;
371             \endcode
372
373         Garbage collecting schema \p GC must be consistent with the \p basket_queue::node GC.
374
375         \par About item disposing
376         Like \p MSQueue, the Baskets queue algo has a key feature: even if the queue is empty it contains one item that is "dummy" one from
377         the standpoint of the algo. See \p dequeue() function doc for explanation.
378
379         \par Examples
380         \code
381         #include <cds/intrusive/basket_queue.h>
382         #include <cds/gc/hp.h>
383
384         namespace ci = cds::inrtusive;
385         typedef cds::gc::HP hp_gc;
386
387         // Basket queue with Hazard Pointer garbage collector, base hook + item disposer:
388         struct Foo: public ci::basket_queue::node< hp_gc >
389         {
390             // Your data
391             ...
392         };
393
394         // Disposer for Foo struct just deletes the object passed in
395         struct fooDisposer {
396             void operator()( Foo * p )
397             {
398                 delete p;
399             }
400         };
401
402         struct fooTraits: public ci::basket_queue::traits {
403             typedef ci::basket_queue::base_hook< ci::opt::gc<hp_gc> > hook;
404             typedef fooDisposer disposer;
405         };
406         typedef ci::BasketQueue< hp_gc, Foo, fooTraits > fooQueue;
407
408         // BasketQueue with Hazard Pointer garbage collector,
409         // member hook + item disposer + item counter,
410         // without padding of internal queue data:
411         struct Bar
412         {
413             // Your data
414             ...
415             ci::basket_queue::node< hp_gc > hMember;
416         };
417
418         struct barTraits: public
419             ci::basket_queue::make_traits<
420                 ci::opt::hook<
421                     ci::basket_queue::member_hook<
422                         offsetof(Bar, hMember)
423                         ,ci::opt::gc<hp_gc>
424                     >
425                 >
426                 ,ci::opt::disposer< fooDisposer >
427                 ,cds::opt::item_counter< cds::atomicity::item_counter >
428                 ,cds::opt::padding< cds::opt::no_special_padding >
429             >::type
430         {};
431         typedef ci::BasketQueue< hp_gc, Bar, barTraits > barQueue;
432         \endcode
433     */
434     template <typename GC, typename T, typename Traits = basket_queue::traits >
435     class BasketQueue
436     {
437     public:
438         typedef GC gc;          ///< Garbage collector
439         typedef T  value_type;  ///< type of value stored in the queue
440         typedef Traits traits;  ///< Queue traits
441         typedef typename traits::hook       hook;       ///< hook type
442         typedef typename hook::node_type    node_type;  ///< node type
443         typedef typename traits::disposer   disposer;   ///< disposer used
444         typedef typename get_node_traits< value_type, node_type, hook>::type node_traits;   ///< node traits
445         typedef typename single_link::get_link_checker< node_type, traits::link_checker >::type link_checker;   ///< link checker
446
447         typedef typename traits::back_off       back_off;     ///< back-off strategy
448         typedef typename traits::item_counter   item_counter; ///< Item counting policy used
449         typedef typename traits::stat           stat;         ///< Internal statistics policy used
450         typedef typename traits::memory_model   memory_model; ///< Memory ordering. See cds::opt::memory_model option
451
452         /// Rebind template arguments
453         template <typename GC2, typename T2, typename Traits2>
454         struct rebind {
455             typedef BasketQueue< GC2, T2, Traits2> other   ;   ///< Rebinding result
456         };
457
458         static CDS_CONSTEXPR const size_t c_nHazardPtrCount = 6 ; ///< Count of hazard pointer required for the algorithm
459
460     protected:
461         //@cond
462         typedef typename node_type::marked_ptr   marked_ptr;
463         typedef typename node_type::atomic_marked_ptr atomic_marked_ptr;
464
465         // GC and node_type::gc must be the same
466         static_assert( std::is_same<gc, typename node_type::gc>::value, "GC and node_type::gc must be the same");
467         //@endcond
468
469         atomic_marked_ptr    m_pHead ;           ///< Queue's head pointer (aligned)
470         //@cond
471         typename opt::details::apply_padding< atomic_marked_ptr, traits::padding >::padding_type pad1_;
472         //@endcond
473         atomic_marked_ptr    m_pTail ;           ///< Queue's tail pointer (aligned)
474         //@cond
475         typename opt::details::apply_padding< atomic_marked_ptr, traits::padding >::padding_type pad2_;
476         //@endcond
477         node_type           m_Dummy ;           ///< dummy node
478         //@cond
479         typename opt::details::apply_padding< node_type, traits::padding >::padding_type pad3_;
480         //@endcond
481         item_counter        m_ItemCounter   ;   ///< Item counter
482         stat                m_Stat  ;           ///< Internal statistics
483         //@cond
484         size_t const        m_nMaxHops;
485         //@endcond
486
487         //@cond
488
489         struct dequeue_result {
490             typename gc::template GuardArray<3>  guards;
491             node_type * pNext;
492         };
493
494         bool do_dequeue( dequeue_result& res, bool bDeque )
495         {
496             // Note:
497             // If bDeque == false then the function is called from empty method and no real dequeuing operation is performed
498
499             back_off bkoff;
500
501             marked_ptr h;
502             marked_ptr t;
503             marked_ptr pNext;
504
505             while ( true ) {
506                 h = res.guards.protect( 0, m_pHead, []( marked_ptr p ) -> value_type * { return node_traits::to_value_ptr( p.ptr());});
507                 t = res.guards.protect( 1, m_pTail, []( marked_ptr p ) -> value_type * { return node_traits::to_value_ptr( p.ptr());});
508                 pNext = res.guards.protect( 2, h->m_pNext, []( marked_ptr p ) -> value_type * { return node_traits::to_value_ptr( p.ptr());});
509
510                 if ( h == m_pHead.load( memory_model::memory_order_acquire )) {
511                     if ( h.ptr() == t.ptr()) {
512                         if ( !pNext.ptr()) {
513                             m_Stat.onEmptyDequeue();
514                             return false;
515                         }
516
517                         {
518                             typename gc::Guard g;
519                             while ( pNext->m_pNext.load(memory_model::memory_order_relaxed).ptr() && m_pTail.load(memory_model::memory_order_relaxed) == t ) {
520                                 pNext = g.protect( pNext->m_pNext, []( marked_ptr p ) -> value_type * { return node_traits::to_value_ptr( p.ptr());});
521                                 res.guards.copy( 2, g );
522                             }
523                         }
524
525                         m_pTail.compare_exchange_weak( t, marked_ptr(pNext.ptr()), memory_model::memory_order_acquire, atomics::memory_order_relaxed );
526                     }
527                     else {
528                         marked_ptr iter( h );
529                         size_t hops = 0;
530
531                         typename gc::Guard g;
532
533                         while ( pNext.ptr() && pNext.bits() && iter.ptr() != t.ptr() && m_pHead.load(memory_model::memory_order_relaxed) == h ) {
534                             iter = pNext;
535                             g.assign( res.guards.template get<value_type>(2));
536                             pNext = res.guards.protect( 2, pNext->m_pNext, []( marked_ptr p ) -> value_type * { return node_traits::to_value_ptr( p.ptr());});
537                             ++hops;
538                         }
539
540                         if ( m_pHead.load(memory_model::memory_order_relaxed) != h )
541                             continue;
542
543                         if ( iter.ptr() == t.ptr())
544                             free_chain( h, iter );
545                         else if ( bDeque ) {
546                             res.pNext = pNext.ptr();
547
548                             if ( iter->m_pNext.compare_exchange_weak( pNext, marked_ptr( pNext.ptr(), 1 ), memory_model::memory_order_acquire, atomics::memory_order_relaxed )) {
549                                 if ( hops >= m_nMaxHops )
550                                     free_chain( h, pNext );
551                                 break;
552                             }
553                         }
554                         else
555                             return true;
556                     }
557                 }
558
559                 if ( bDeque )
560                     m_Stat.onDequeueRace();
561                 bkoff();
562             }
563
564             if ( bDeque ) {
565                 --m_ItemCounter;
566                 m_Stat.onDequeue();
567             }
568
569             return true;
570         }
571
572         void free_chain( marked_ptr head, marked_ptr newHead )
573         {
574             // "head" and "newHead" are guarded
575
576             if ( m_pHead.compare_exchange_strong( head, marked_ptr(newHead.ptr()), memory_model::memory_order_release, atomics::memory_order_relaxed ))
577             {
578                 typename gc::template GuardArray<2> guards;
579                 guards.assign( 0, node_traits::to_value_ptr(head.ptr()));
580                 while ( head.ptr() != newHead.ptr()) {
581                     marked_ptr pNext = guards.protect( 1, head->m_pNext, []( marked_ptr p ) -> value_type * { return node_traits::to_value_ptr( p.ptr());});
582                     assert( pNext.bits() != 0 );
583                     dispose_node( head.ptr());
584                     guards.copy( 0, 1 );
585                     head = pNext;
586                 }
587             }
588         }
589
590         static void clear_links( node_type * pNode )
591         {
592             pNode->m_pNext.store( marked_ptr( nullptr ), memory_model::memory_order_release );
593         }
594
595         void dispose_node( node_type * p )
596         {
597             if ( p != &m_Dummy ) {
598                 struct internal_disposer
599                 {
600                     void operator()( value_type * p )
601                     {
602                         assert( p != nullptr );
603                         BasketQueue::clear_links( node_traits::to_node_ptr( p ));
604                         disposer()(p);
605                     }
606                 };
607                 gc::template retire<internal_disposer>( node_traits::to_value_ptr(p));
608             }
609         }
610         //@endcond
611
612     public:
613         /// Initializes empty queue
614         BasketQueue()
615             : m_pHead( &m_Dummy )
616             , m_pTail( &m_Dummy )
617             , m_nMaxHops( 3 )
618         {}
619
620         /// Destructor clears the queue
621         /**
622             Since the baskets queue contains at least one item even
623             if the queue is empty, the destructor may call item disposer.
624         */
625         ~BasketQueue()
626         {
627             clear();
628
629             node_type * pHead = m_pHead.load(memory_model::memory_order_relaxed).ptr();
630             assert( pHead != nullptr );
631
632             {
633                 node_type * pNext = pHead->m_pNext.load( memory_model::memory_order_relaxed ).ptr();
634                 while ( pNext ) {
635                     node_type * p = pNext;
636                     pNext = pNext->m_pNext.load( memory_model::memory_order_relaxed ).ptr();
637                     p->m_pNext.store( marked_ptr(), memory_model::memory_order_relaxed );
638                     dispose_node( p );
639                 }
640                 pHead->m_pNext.store( marked_ptr(), memory_model::memory_order_relaxed );
641                 //m_pTail.store( marked_ptr( pHead ), memory_model::memory_order_relaxed );
642             }
643
644             m_pHead.store( marked_ptr( nullptr ), memory_model::memory_order_relaxed );
645             m_pTail.store( marked_ptr( nullptr ), memory_model::memory_order_relaxed );
646
647             dispose_node( pHead );
648         }
649
650         /// Enqueues \p val value into the queue.
651         /** @anchor cds_intrusive_BasketQueue_enqueue
652             The function always returns \p true.
653         */
654         bool enqueue( value_type& val )
655         {
656             node_type * pNew = node_traits::to_node_ptr( val );
657             link_checker::is_empty( pNew );
658
659             typename gc::Guard guard;
660             typename gc::Guard gNext;
661             back_off bkoff;
662
663             marked_ptr t;
664             while ( true ) {
665                 t = guard.protect( m_pTail, []( marked_ptr p ) -> value_type * { return node_traits::to_value_ptr( p.ptr());});
666
667                 marked_ptr pNext = t->m_pNext.load(memory_model::memory_order_relaxed );
668
669                 if ( pNext.ptr() == nullptr ) {
670                     pNew->m_pNext.store( marked_ptr(), memory_model::memory_order_relaxed );
671                     if ( t->m_pNext.compare_exchange_weak( pNext, marked_ptr(pNew), memory_model::memory_order_release, atomics::memory_order_relaxed )) {
672                         if ( !m_pTail.compare_exchange_strong( t, marked_ptr(pNew), memory_model::memory_order_release, atomics::memory_order_relaxed ))
673                             m_Stat.onAdvanceTailFailed();
674                         break;
675                     }
676
677                     // Try adding to basket
678                     m_Stat.onTryAddBasket();
679
680                     // Reread tail next
681                 try_again:
682                     pNext = gNext.protect( t->m_pNext, []( marked_ptr p ) -> value_type * { return node_traits::to_value_ptr( p.ptr());});
683
684                     // add to the basket
685                     if ( m_pTail.load( memory_model::memory_order_relaxed ) == t
686                          && t->m_pNext.load( memory_model::memory_order_relaxed) == pNext
687                          && !pNext.bits())
688                     {
689                         bkoff();
690                         pNew->m_pNext.store( pNext, memory_model::memory_order_relaxed );
691                         if ( t->m_pNext.compare_exchange_weak( pNext, marked_ptr( pNew ), memory_model::memory_order_release, atomics::memory_order_relaxed )) {
692                             m_Stat.onAddBasket();
693                             break;
694                         }
695                         goto try_again;
696                     }
697                 }
698                 else {
699                     // Tail is misplaced, advance it
700
701                     typename gc::template GuardArray<2> g;
702                     g.assign( 0, node_traits::to_value_ptr( pNext.ptr()));
703                     if ( m_pTail.load( memory_model::memory_order_acquire ) != t
704                       || t->m_pNext.load( memory_model::memory_order_relaxed ) != pNext )
705                     {
706                         m_Stat.onEnqueueRace();
707                         bkoff();
708                         continue;
709                     }
710
711                     marked_ptr p;
712                     bool bTailOk = true;
713                     while ( (p = pNext->m_pNext.load( memory_model::memory_order_acquire )).ptr() != nullptr )
714                     {
715                         bTailOk = m_pTail.load( memory_model::memory_order_relaxed ) == t;
716                         if ( !bTailOk )
717                             break;
718
719                         g.assign( 1, node_traits::to_value_ptr( p.ptr()));
720                         if ( pNext->m_pNext.load( memory_model::memory_order_relaxed ) != p )
721                             continue;
722                         pNext = p;
723                         g.assign( 0, g.template get<value_type>( 1 ));
724                     }
725                     if ( !bTailOk || !m_pTail.compare_exchange_weak( t, marked_ptr( pNext.ptr()), memory_model::memory_order_release, atomics::memory_order_relaxed ))
726                         m_Stat.onAdvanceTailFailed();
727
728                     m_Stat.onBadTail();
729                 }
730
731                 m_Stat.onEnqueueRace();
732             }
733
734             ++m_ItemCounter;
735             m_Stat.onEnqueue();
736
737             return true;
738         }
739
740         /// Synonym for \p enqueue() function
741         bool push( value_type& val )
742         {
743             return enqueue( val );
744         }
745
746         /// Dequeues a value from the queue
747         /** @anchor cds_intrusive_BasketQueue_dequeue
748             If the queue is empty the function returns \p nullptr.
749
750             @note See \p MSQueue::dequeue() note about item disposing
751         */
752         value_type * dequeue()
753         {
754             dequeue_result res;
755
756             if ( do_dequeue( res, true ))
757                 return node_traits::to_value_ptr( *res.pNext );
758             return nullptr;
759         }
760
761         /// Synonym for \p dequeue() function
762         value_type * pop()
763         {
764             return dequeue();
765         }
766
767         /// Checks if the queue is empty
768         /**
769             Note that this function is not \p const.
770             The function is based on \p dequeue() algorithm
771             but really it does not dequeue any item.
772         */
773         bool empty()
774         {
775             dequeue_result res;
776             return !do_dequeue( res, false );
777         }
778
779         /// Clear the queue
780         /**
781             The function repeatedly calls \p dequeue() until it returns \p nullptr.
782             The disposer defined in template \p Traits is called for each item
783             that can be safely disposed.
784         */
785         void clear()
786         {
787             while ( dequeue());
788         }
789
790         /// Returns queue's item count
791         /**
792             The value returned depends on \p Traits (see basket_queue::traits::item_counter). For \p atomicity::empty_item_counter,
793             this function always returns 0.
794
795             @note Even if you use real item counter and it returns 0, this fact is not mean that the queue
796             is empty. To check queue emptyness use \p empty() method.
797         */
798         size_t size() const
799         {
800             return m_ItemCounter.value();
801         }
802
803         /// Returns reference to internal statistics
804         const stat& statistics() const
805         {
806             return m_Stat;
807         }
808     };
809
810 }} // namespace cds::intrusive
811
812 #endif // #ifndef CDSLIB_INTRUSIVE_BASKET_QUEUE_H