Updated copyright
[libcds.git] / cds / intrusive / basket_queue.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #ifndef CDSLIB_INTRUSIVE_BASKET_QUEUE_H
32 #define CDSLIB_INTRUSIVE_BASKET_QUEUE_H
33
34 #include <type_traits>
35 #include <cds/intrusive/details/single_link_struct.h>
36 #include <cds/details/marked_ptr.h>
37
38 namespace cds { namespace intrusive {
39
40     /// BasketQueue -related definitions
41     /** @ingroup cds_intrusive_helper
42     */
43     namespace basket_queue {
44         /// BasketQueue node
45         /**
46             Template parameters:
47             Template parameters:
48             - GC - garbage collector used
49             - Tag - a \ref cds_intrusive_hook_tag "tag"
50             */
51         template <class GC, typename Tag = opt::none>
52         struct node
53         {
54             typedef GC      gc  ;   ///< Garbage collector
55             typedef Tag     tag ;   ///< tag
56
57             typedef cds::details::marked_ptr<node, 1>                    marked_ptr;        ///< marked pointer
58             typedef typename gc::template atomic_marked_ptr< marked_ptr> atomic_marked_ptr; ///< atomic marked pointer specific for GC
59
60             /// Rebind node for other template parameters
61             template <class GC2, typename Tag2 = tag>
62             struct rebind {
63                 typedef node<GC2, Tag2>  other ;    ///< Rebinding result
64             };
65
66             atomic_marked_ptr m_pNext ; ///< pointer to the next node in the container
67
68             node()
69                 : m_pNext( nullptr )
70             {}
71         };
72
73         using cds::intrusive::single_link::default_hook;
74
75         //@cond
76         template < typename HookType, typename... Options>
77         struct hook
78         {
79             typedef typename opt::make_options< default_hook, Options...>::type  options;
80             typedef typename options::gc    gc;
81             typedef typename options::tag   tag;
82             typedef node<gc, tag> node_type;
83             typedef HookType     hook_type;
84         };
85         //@endcond
86
87
88         /// Base hook
89         /**
90             \p Options are:
91             - opt::gc - garbage collector used.
92             - opt::tag - a \ref cds_intrusive_hook_tag "tag"
93         */
94         template < typename... Options >
95         struct base_hook: public hook< opt::base_hook_tag, Options... >
96         {};
97
98         /// Member hook
99         /**
100             \p MemberOffset defines offset in bytes of \ref node member into your structure.
101             Use \p offsetof macro to define \p MemberOffset
102
103             \p Options are:
104             - opt::gc - garbage collector used.
105             - opt::tag - a \ref cds_intrusive_hook_tag "tag"
106         */
107         template < size_t MemberOffset, typename... Options >
108         struct member_hook: public hook< opt::member_hook_tag, Options... >
109         {
110             //@cond
111             static const size_t c_nMemberOffset = MemberOffset;
112             //@endcond
113         };
114
115         /// Traits hook
116         /**
117             \p NodeTraits defines type traits for node.
118             See \ref node_traits for \p NodeTraits interface description
119
120             \p Options are:
121             - opt::gc - garbage collector used.
122             - opt::tag - a \ref cds_intrusive_hook_tag "tag"
123         */
124         template <typename NodeTraits, typename... Options >
125         struct traits_hook: public hook< opt::traits_hook_tag, Options... >
126         {
127             //@cond
128             typedef NodeTraits node_traits;
129             //@endcond
130         };
131
132         /// BasketQueue internal statistics. May be used for debugging or profiling
133         /**
134             Template argument \p Counter defines type of counter.
135             Default is \p cds::atomicity::event_counter, that is weak, i.e. it is not guaranteed
136             strict event counting.
137             You may use stronger type of counter like as \p cds::atomicity::item_counter,
138             or even integral type, for example, \p int.
139         */
140         template <typename Counter = cds::atomicity::event_counter >
141         struct stat
142         {
143             typedef Counter counter_type;   ///< Counter type
144
145             counter_type m_EnqueueCount;    ///< Enqueue call count
146             counter_type m_DequeueCount;    ///< Dequeue call count
147             counter_type m_EnqueueRace;     ///< Count of enqueue race conditions encountered
148             counter_type m_DequeueRace;     ///< Count of dequeue race conditions encountered
149             counter_type m_AdvanceTailError;///< Count of "advance tail failed" events
150             counter_type m_BadTail;         ///< Count of events "Tail is not pointed to the last item in the queue"
151             counter_type m_TryAddBasket;    ///< Count of attemps adding new item to a basket (only or BasketQueue, for other queue this metric is not used)
152             counter_type m_AddBasketCount;  ///< Count of events "Enqueue a new item into basket" (only or BasketQueue, for other queue this metric is not used)
153             counter_type m_EmptyDequeue;    ///< Count of dequeue from empty queue
154
155             /// Register enqueue call
156             void onEnqueue()                { ++m_EnqueueCount; }
157             /// Register dequeue call
158             void onDequeue()                { ++m_DequeueCount; }
159             /// Register enqueue race event
160             void onEnqueueRace()            { ++m_EnqueueRace; }
161             /// Register dequeue race event
162             void onDequeueRace()            { ++m_DequeueRace; }
163             /// Register "advance tail failed" event
164             void onAdvanceTailFailed()      { ++m_AdvanceTailError; }
165             /// Register event "Tail is not pointed to last item in the queue"
166             void onBadTail()                { ++m_BadTail; }
167             /// Register an attempt t add new item to basket
168             void onTryAddBasket()           { ++m_TryAddBasket; }
169             /// Register event "Enqueue a new item into basket" (only or BasketQueue, for other queue this metric is not used)
170             void onAddBasket()              { ++m_AddBasketCount; }
171             /// Register dequeuing from empty queue
172             void onEmptyDequeue()           { ++m_EmptyDequeue; }
173
174
175             //@cond
176             void reset()
177             {
178                 m_EnqueueCount.reset();
179                 m_DequeueCount.reset();
180                 m_EnqueueRace.reset();
181                 m_DequeueRace.reset();
182                 m_AdvanceTailError.reset();
183                 m_BadTail.reset();
184                 m_TryAddBasket.reset();
185                 m_AddBasketCount.reset();
186                 m_EmptyDequeue.reset();
187             }
188
189             stat& operator +=( stat const& s )
190             {
191                 m_EnqueueCount  += s.m_EnqueueCount.get();
192                 m_DequeueCount  += s.m_DequeueCount.get();
193                 m_EnqueueRace   += s.m_EnqueueRace.get();
194                 m_DequeueRace   += s.m_DequeueRace.get();
195                 m_AdvanceTailError += s.m_AdvanceTailError.get();
196                 m_BadTail       += s.m_BadTail.get();
197                 m_TryAddBasket  += s.m_TryAddBasket.get();
198                 m_AddBasketCount += s.m_AddBasketCount.get();
199                 m_EmptyDequeue  += s.m_EmptyDequeue.get();
200                 return *this;
201             }
202             //@endcond
203         };
204
205         /// Dummy BasketQueue statistics - no counting is performed, no overhead. Support interface like \p basket_queue::stat
206         struct empty_stat
207         {
208             //@cond
209             void onEnqueue()            const {}
210             void onDequeue()            const {}
211             void onEnqueueRace()        const {}
212             void onDequeueRace()        const {}
213             void onAdvanceTailFailed()  const {}
214             void onBadTail()            const {}
215             void onTryAddBasket()       const {}
216             void onAddBasket()          const {}
217             void onEmptyDequeue()       const {}
218
219             void reset() {}
220             empty_stat& operator +=( empty_stat const& )
221             {
222                 return *this;
223             }
224             //@endcond
225         };
226
227         /// BasketQueue default type traits
228         struct traits
229         {
230             /// Back-off strategy
231             typedef cds::backoff::empty             back_off;
232
233             /// Hook, possible types are \p basket_queue::base_hook, \p basket_queue::member_hook, \p basket_queue::traits_hook
234             typedef basket_queue::base_hook<>       hook;
235
236             /// The functor used for dispose removed items. Default is \p opt::v::empty_disposer. This option is used for dequeuing
237             typedef opt::v::empty_disposer          disposer;
238
239             /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
240             typedef atomicity::empty_item_counter   item_counter;
241
242             /// Internal statistics (by default, disabled)
243             /**
244                 Possible option value are: \p basket_queue::stat, \p basket_queue::empty_stat (the default),
245                 user-provided class that supports \p %basket_queue::stat interface.
246             */
247             typedef basket_queue::empty_stat        stat;
248
249             /// C++ memory ordering model
250             /**
251                 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
252                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
253             */
254             typedef opt::v::relaxed_ordering        memory_model;
255
256             /// Link checking, see \p cds::opt::link_checker
257             static CDS_CONSTEXPR const opt::link_check_type link_checker = opt::debug_check_link;
258
259             /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
260             enum { padding = opt::cache_line_padding };
261         };
262
263
264         /// Metafunction converting option list to \p basket_queue::traits
265         /**
266             Supported \p Options are:
267             - \p opt::hook - hook used. Possible hooks are: \p basket_queue::base_hook, \p basket_queue::member_hook, \p basket_queue::traits_hook.
268                 If the option is not specified, \p %basket_queue::base_hook<> is used.
269             - \p opt::back_off - back-off strategy used, default is \p cds::backoff::empty.
270             - \p opt::disposer - the functor used for dispose removed items. Default is \p opt::v::empty_disposer. This option is used
271                 when dequeuing.
272             - \p opt::link_checker - the type of node's link fields checking. Default is \p opt::debug_check_link
273             - \p opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
274                 To enable item counting use \p cds::atomicity::item_counter
275             - \p opt::stat - the type to gather internal statistics.
276                 Possible statistics types are: \p basket_queue::stat, \p basket_queue::empty_stat, user-provided class that supports \p %basket_queue::stat interface.
277                 Default is \p %basket_queue::empty_stat (internal statistics disabled).
278             - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
279             - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
280                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
281
282             Example: declare \p %BasketQueue with item counting and internal statistics
283             \code
284             typedef cds::intrusive::BasketQueue< cds::gc::HP, Foo,
285                 typename cds::intrusive::basket_queue::make_traits<
286                     cds::intrusive::opt:hook< cds::intrusive::basket_queue::base_hook< cds::opt::gc<cds:gc::HP> >>,
287                     cds::opt::item_counte< cds::atomicity::item_counter >,
288                     cds::opt::stat< cds::intrusive::basket_queue::stat<> >
289                 >::type
290             > myQueue;
291             \endcode
292         */
293         template <typename... Options>
294         struct make_traits {
295 #   ifdef CDS_DOXYGEN_INVOKED
296             typedef implementation_defined type;   ///< Metafunction result
297 #   else
298             typedef typename cds::opt::make_options<
299                 typename cds::opt::find_type_traits< traits, Options... >::type
300                 , Options...
301             >::type type;
302 #   endif
303         };
304     }   // namespace basket_queue
305
306     /// Basket lock-free queue (intrusive variant)
307     /** @ingroup cds_intrusive_queue
308         Implementation of basket queue algorithm.
309
310         \par Source:
311             [2007] Moshe Hoffman, Ori Shalev, Nir Shavit "The Baskets Queue"
312
313         <b>Key idea</b>
314
315         In the 'basket' approach, instead of
316         the traditional ordered list of nodes, the queue consists of an ordered list of groups
317         of nodes (logical baskets). The order of nodes in each basket need not be specified, and in
318         fact, it is easiest to maintain them in FIFO order. The baskets fulfill the following basic
319         rules:
320         - Each basket has a time interval in which all its nodes' enqueue operations overlap.
321         - The baskets are ordered by the order of their respective time intervals.
322         - For each basket, its nodes' dequeue operations occur after its time interval.
323         - The dequeue operations are performed according to the order of baskets.
324
325         Two properties define the FIFO order of nodes:
326         - The order of nodes in a basket is not specified.
327         - The order of nodes in different baskets is the FIFO-order of their respective baskets.
328
329         In algorithms such as the MS-queue or optimistic
330         queue, threads enqueue items by applying a Compare-and-swap (CAS) operation to the
331         queue's tail pointer, and all the threads that fail on a particular CAS operation (and also
332         the winner of that CAS) overlap in time. In particular, they share the time interval of
333         the CAS operation itself. Hence, all the threads that fail to CAS on the tail-node of
334         the queue may be inserted into the same basket. By integrating the basket-mechanism
335         as the back-off mechanism, the time usually spent on backing-off before trying to link
336         onto the new tail, can now be utilized to insert the failed operations into the basket,
337         allowing enqueues to complete sooner. In the meantime, the next successful CAS operations
338         by enqueues allow new baskets to be formed down the list, and these can be
339         filled concurrently. Moreover, the failed operations don't retry their link attempt on the
340         new tail, lowering the overall contention on it. This leads to a queue
341         algorithm that unlike all former concurrent queue algorithms requires virtually no tuning
342         of the backoff mechanisms to reduce contention, making the algorithm an attractive
343         out-of-the-box queue.
344
345         In order to enqueue, just as in \p MSQueue, a thread first tries to link the new node to
346         the last node. If it failed to do so, then another thread has already succeeded. Thus it
347         tries to insert the new node into the new basket that was created by the winner thread.
348         To dequeue a node, a thread first reads the head of the queue to obtain the
349         oldest basket. It may then dequeue any node in the oldest basket.
350
351         <b>Template arguments:</b>
352         - \p GC - garbage collector type: \p gc::HP, \p gc::DHP
353         - \p T - type of value to be stored in the queue
354         - \p Traits - queue traits, default is \p basket_queue::traits. You can use \p basket_queue::make_traits
355             metafunction to make your traits or just derive your traits from \p %basket_queue::traits:
356             \code
357             struct myTraits: public cds::intrusive::basket_queue::traits {
358                 typedef cds::intrusive::basket_queue::stat<> stat;
359                 typedef cds::atomicity::item_counter    item_counter;
360             };
361             typedef cds::intrusive::BasketQueue< cds::gc::HP, Foo, myTraits > myQueue;
362
363             // Equivalent make_traits example:
364             typedef cds::intrusive::BasketQueue< cds::gc::HP, Foo,
365                 typename cds::intrusive::basket_queue::make_traits<
366                     cds::opt::stat< cds::intrusive::basket_queue::stat<> >,
367                     cds::opt::item_counter< cds::atomicity::item_counter >
368                 >::type
369             > myQueue;
370             \endcode
371
372         Garbage collecting schema \p GC must be consistent with the \p basket_queue::node GC.
373
374         \par About item disposing
375         Like \p MSQueue, the Baskets queue algo has a key feature: even if the queue is empty it contains one item that is "dummy" one from
376         the standpoint of the algo. See \p dequeue() function doc for explanation.
377
378         \par Examples
379         \code
380         #include <cds/intrusive/basket_queue.h>
381         #include <cds/gc/hp.h>
382
383         namespace ci = cds::inrtusive;
384         typedef cds::gc::HP hp_gc;
385
386         // Basket queue with Hazard Pointer garbage collector, base hook + item disposer:
387         struct Foo: public ci::basket_queue::node< hp_gc >
388         {
389             // Your data
390             ...
391         };
392
393         // Disposer for Foo struct just deletes the object passed in
394         struct fooDisposer {
395             void operator()( Foo * p )
396             {
397                 delete p;
398             }
399         };
400
401         struct fooTraits: public ci::basket_queue::traits {
402             typedef ci::basket_queue::base_hook< ci::opt::gc<hp_gc> > hook;
403             typedef fooDisposer disposer;
404         };
405         typedef ci::BasketQueue< hp_gc, Foo, fooTraits > fooQueue;
406
407         // BasketQueue with Hazard Pointer garbage collector,
408         // member hook + item disposer + item counter,
409         // without padding of internal queue data:
410         struct Bar
411         {
412             // Your data
413             ...
414             ci::basket_queue::node< hp_gc > hMember;
415         };
416
417         struct barTraits: public
418             ci::basket_queue::make_traits<
419                 ci::opt::hook<
420                     ci::basket_queue::member_hook<
421                         offsetof(Bar, hMember)
422                         ,ci::opt::gc<hp_gc>
423                     >
424                 >
425                 ,ci::opt::disposer< fooDisposer >
426                 ,cds::opt::item_counter< cds::atomicity::item_counter >
427                 ,cds::opt::padding< cds::opt::no_special_padding >
428             >::type
429         {};
430         typedef ci::BasketQueue< hp_gc, Bar, barTraits > barQueue;
431         \endcode
432     */
433     template <typename GC, typename T, typename Traits = basket_queue::traits >
434     class BasketQueue
435     {
436     public:
437         typedef GC gc;          ///< Garbage collector
438         typedef T  value_type;  ///< type of value stored in the queue
439         typedef Traits traits;  ///< Queue traits
440         typedef typename traits::hook       hook;       ///< hook type
441         typedef typename hook::node_type    node_type;  ///< node type
442         typedef typename traits::disposer   disposer;   ///< disposer used
443         typedef typename get_node_traits< value_type, node_type, hook>::type node_traits;   ///< node traits
444         typedef typename single_link::get_link_checker< node_type, traits::link_checker >::type link_checker;   ///< link checker
445
446         typedef typename traits::back_off       back_off;     ///< back-off strategy
447         typedef typename traits::item_counter   item_counter; ///< Item counting policy used
448         typedef typename traits::stat           stat;         ///< Internal statistics policy used
449         typedef typename traits::memory_model   memory_model; ///< Memory ordering. See cds::opt::memory_model option
450
451         /// Rebind template arguments
452         template <typename GC2, typename T2, typename Traits2>
453         struct rebind {
454             typedef BasketQueue< GC2, T2, Traits2> other   ;   ///< Rebinding result
455         };
456
457         static CDS_CONSTEXPR const size_t c_nHazardPtrCount = 6 ; ///< Count of hazard pointer required for the algorithm
458
459     protected:
460         //@cond
461         typedef typename node_type::marked_ptr   marked_ptr;
462         typedef typename node_type::atomic_marked_ptr atomic_marked_ptr;
463
464         // GC and node_type::gc must be the same
465         static_assert( std::is_same<gc, typename node_type::gc>::value, "GC and node_type::gc must be the same");
466         //@endcond
467
468         atomic_marked_ptr    m_pHead ;           ///< Queue's head pointer (aligned)
469         //@cond
470         typename opt::details::apply_padding< atomic_marked_ptr, traits::padding >::padding_type pad1_;
471         //@endcond
472         atomic_marked_ptr    m_pTail ;           ///< Queue's tail pointer (aligned)
473         //@cond
474         typename opt::details::apply_padding< atomic_marked_ptr, traits::padding >::padding_type pad2_;
475         //@endcond
476         node_type           m_Dummy ;           ///< dummy node
477         //@cond
478         typename opt::details::apply_padding< node_type, traits::padding >::padding_type pad3_;
479         //@endcond
480         item_counter        m_ItemCounter   ;   ///< Item counter
481         stat                m_Stat  ;           ///< Internal statistics
482         //@cond
483         size_t const        m_nMaxHops;
484         //@endcond
485
486         //@cond
487
488         struct dequeue_result {
489             typename gc::template GuardArray<3>  guards;
490             node_type * pNext;
491         };
492
493         bool do_dequeue( dequeue_result& res, bool bDeque )
494         {
495             // Note:
496             // If bDeque == false then the function is called from empty method and no real dequeuing operation is performed
497
498             back_off bkoff;
499
500             marked_ptr h;
501             marked_ptr t;
502             marked_ptr pNext;
503
504             while ( true ) {
505                 h = res.guards.protect( 0, m_pHead, []( marked_ptr p ) -> value_type * { return node_traits::to_value_ptr( p.ptr());});
506                 t = res.guards.protect( 1, m_pTail, []( marked_ptr p ) -> value_type * { return node_traits::to_value_ptr( p.ptr());});
507                 pNext = res.guards.protect( 2, h->m_pNext, []( marked_ptr p ) -> value_type * { return node_traits::to_value_ptr( p.ptr());});
508
509                 if ( h == m_pHead.load( memory_model::memory_order_acquire )) {
510                     if ( h.ptr() == t.ptr()) {
511                         if ( !pNext.ptr()) {
512                             m_Stat.onEmptyDequeue();
513                             return false;
514                         }
515
516                         {
517                             typename gc::Guard g;
518                             while ( pNext->m_pNext.load(memory_model::memory_order_relaxed).ptr() && m_pTail.load(memory_model::memory_order_relaxed) == t ) {
519                                 pNext = g.protect( pNext->m_pNext, []( marked_ptr p ) -> value_type * { return node_traits::to_value_ptr( p.ptr());});
520                                 res.guards.copy( 2, g );
521                             }
522                         }
523
524                         m_pTail.compare_exchange_weak( t, marked_ptr(pNext.ptr()), memory_model::memory_order_acquire, atomics::memory_order_relaxed );
525                     }
526                     else {
527                         marked_ptr iter( h );
528                         size_t hops = 0;
529
530                         typename gc::Guard g;
531
532                         while ( pNext.ptr() && pNext.bits() && iter.ptr() != t.ptr() && m_pHead.load(memory_model::memory_order_relaxed) == h ) {
533                             iter = pNext;
534                             g.assign( res.guards.template get<value_type>(2));
535                             pNext = res.guards.protect( 2, pNext->m_pNext, []( marked_ptr p ) -> value_type * { return node_traits::to_value_ptr( p.ptr());});
536                             ++hops;
537                         }
538
539                         if ( m_pHead.load(memory_model::memory_order_relaxed) != h )
540                             continue;
541
542                         if ( iter.ptr() == t.ptr())
543                             free_chain( h, iter );
544                         else if ( bDeque ) {
545                             res.pNext = pNext.ptr();
546
547                             if ( iter->m_pNext.compare_exchange_weak( pNext, marked_ptr( pNext.ptr(), 1 ), memory_model::memory_order_acquire, atomics::memory_order_relaxed )) {
548                                 if ( hops >= m_nMaxHops )
549                                     free_chain( h, pNext );
550                                 break;
551                             }
552                         }
553                         else
554                             return true;
555                     }
556                 }
557
558                 if ( bDeque )
559                     m_Stat.onDequeueRace();
560                 bkoff();
561             }
562
563             if ( bDeque ) {
564                 --m_ItemCounter;
565                 m_Stat.onDequeue();
566             }
567
568             return true;
569         }
570
571         void free_chain( marked_ptr head, marked_ptr newHead )
572         {
573             // "head" and "newHead" are guarded
574
575             if ( m_pHead.compare_exchange_strong( head, marked_ptr(newHead.ptr()), memory_model::memory_order_release, atomics::memory_order_relaxed ))
576             {
577                 typename gc::template GuardArray<2> guards;
578                 guards.assign( 0, node_traits::to_value_ptr(head.ptr()));
579                 while ( head.ptr() != newHead.ptr()) {
580                     marked_ptr pNext = guards.protect( 1, head->m_pNext, []( marked_ptr p ) -> value_type * { return node_traits::to_value_ptr( p.ptr());});
581                     assert( pNext.bits() != 0 );
582                     dispose_node( head.ptr());
583                     guards.copy( 0, 1 );
584                     head = pNext;
585                 }
586             }
587         }
588
589         static void clear_links( node_type * pNode )
590         {
591             pNode->m_pNext.store( marked_ptr( nullptr ), memory_model::memory_order_release );
592         }
593
594         void dispose_node( node_type * p )
595         {
596             if ( p != &m_Dummy ) {
597                 struct internal_disposer
598                 {
599                     void operator()( value_type * p )
600                     {
601                         assert( p != nullptr );
602                         BasketQueue::clear_links( node_traits::to_node_ptr( p ));
603                         disposer()(p);
604                     }
605                 };
606                 gc::template retire<internal_disposer>( node_traits::to_value_ptr(p));
607             }
608         }
609         //@endcond
610
611     public:
612         /// Initializes empty queue
613         BasketQueue()
614             : m_pHead( &m_Dummy )
615             , m_pTail( &m_Dummy )
616             , m_nMaxHops( 3 )
617         {}
618
619         /// Destructor clears the queue
620         /**
621             Since the baskets queue contains at least one item even
622             if the queue is empty, the destructor may call item disposer.
623         */
624         ~BasketQueue()
625         {
626             clear();
627
628             node_type * pHead = m_pHead.load(memory_model::memory_order_relaxed).ptr();
629             assert( pHead != nullptr );
630
631             {
632                 node_type * pNext = pHead->m_pNext.load( memory_model::memory_order_relaxed ).ptr();
633                 while ( pNext ) {
634                     node_type * p = pNext;
635                     pNext = pNext->m_pNext.load( memory_model::memory_order_relaxed ).ptr();
636                     p->m_pNext.store( marked_ptr(), memory_model::memory_order_relaxed );
637                     dispose_node( p );
638                 }
639                 pHead->m_pNext.store( marked_ptr(), memory_model::memory_order_relaxed );
640                 //m_pTail.store( marked_ptr( pHead ), memory_model::memory_order_relaxed );
641             }
642
643             m_pHead.store( marked_ptr( nullptr ), memory_model::memory_order_relaxed );
644             m_pTail.store( marked_ptr( nullptr ), memory_model::memory_order_relaxed );
645
646             dispose_node( pHead );
647         }
648
649         /// Enqueues \p val value into the queue.
650         /** @anchor cds_intrusive_BasketQueue_enqueue
651             The function always returns \p true.
652         */
653         bool enqueue( value_type& val )
654         {
655             node_type * pNew = node_traits::to_node_ptr( val );
656             link_checker::is_empty( pNew );
657
658             typename gc::Guard guard;
659             typename gc::Guard gNext;
660             back_off bkoff;
661
662             marked_ptr t;
663             while ( true ) {
664                 t = guard.protect( m_pTail, []( marked_ptr p ) -> value_type * { return node_traits::to_value_ptr( p.ptr());});
665
666                 marked_ptr pNext = t->m_pNext.load(memory_model::memory_order_acquire );
667
668                 if ( pNext.ptr() == nullptr ) {
669                     pNew->m_pNext.store( marked_ptr(), memory_model::memory_order_relaxed );
670                     if ( t->m_pNext.compare_exchange_weak( pNext, marked_ptr(pNew), memory_model::memory_order_release, atomics::memory_order_relaxed )) {
671                         if ( !m_pTail.compare_exchange_strong( t, marked_ptr(pNew), memory_model::memory_order_release, atomics::memory_order_acquire ))
672                             m_Stat.onAdvanceTailFailed();
673                         break;
674                     }
675
676                     // Try adding to basket
677                     m_Stat.onTryAddBasket();
678
679                     // Reread tail next
680                 try_again:
681                     pNext = gNext.protect( t->m_pNext, []( marked_ptr p ) -> value_type * { return node_traits::to_value_ptr( p.ptr());});
682
683                     // add to the basket
684                     if ( m_pTail.load(memory_model::memory_order_acquire) == t
685                          && t->m_pNext.load( memory_model::memory_order_relaxed) == pNext
686                          && !pNext.bits())
687                     {
688                         bkoff();
689                         pNew->m_pNext.store( pNext, memory_model::memory_order_relaxed );
690                         if ( t->m_pNext.compare_exchange_weak( pNext, marked_ptr( pNew ), memory_model::memory_order_release, atomics::memory_order_relaxed )) {
691                             m_Stat.onAddBasket();
692                             break;
693                         }
694                         goto try_again;
695                     }
696                 }
697                 else {
698                     // Tail is misplaced, advance it
699
700                     typename gc::template GuardArray<2> g;
701                     g.assign( 0, node_traits::to_value_ptr( pNext.ptr()));
702                     if ( m_pTail.load( memory_model::memory_order_acquire ) != t
703
704                       || t->m_pNext.load( memory_model::memory_order_relaxed ) != pNext )
705
706                     {
707                         m_Stat.onEnqueueRace();
708                         bkoff();
709                         continue;
710                     }
711
712                     marked_ptr p;
713                     bool bTailOk = true;
714                     while ( (p = pNext->m_pNext.load( memory_model::memory_order_relaxed )).ptr() != nullptr )
715                     {
716                         bTailOk = m_pTail.load( memory_model::memory_order_acquire ) == t;
717                         if ( !bTailOk )
718                             break;
719
720                         g.assign( 1, node_traits::to_value_ptr( p.ptr()));
721                         if ( pNext->m_pNext.load(memory_model::memory_order_acquire) != p )
722                             continue;
723                         pNext = p;
724                         g.assign( 0, g.template get<value_type>( 1 ));
725                     }
726                     if ( !bTailOk || !m_pTail.compare_exchange_weak( t, marked_ptr( pNext.ptr()), memory_model::memory_order_release, atomics::memory_order_relaxed ))
727                         m_Stat.onAdvanceTailFailed();
728
729                     m_Stat.onBadTail();
730                 }
731
732                 m_Stat.onEnqueueRace();
733             }
734
735             ++m_ItemCounter;
736             m_Stat.onEnqueue();
737
738             return true;
739         }
740
741         /// Synonym for \p enqueue() function
742         bool push( value_type& val )
743         {
744             return enqueue( val );
745         }
746
747         /// Dequeues a value from the queue
748         /** @anchor cds_intrusive_BasketQueue_dequeue
749             If the queue is empty the function returns \p nullptr.
750
751             @note See \p MSQueue::dequeue() note about item disposing
752         */
753         value_type * dequeue()
754         {
755             dequeue_result res;
756
757             if ( do_dequeue( res, true ))
758                 return node_traits::to_value_ptr( *res.pNext );
759             return nullptr;
760         }
761
762         /// Synonym for \p dequeue() function
763         value_type * pop()
764         {
765             return dequeue();
766         }
767
768         /// Checks if the queue is empty
769         /**
770             Note that this function is not \p const.
771             The function is based on \p dequeue() algorithm
772             but really it does not dequeue any item.
773         */
774         bool empty()
775         {
776             dequeue_result res;
777             return !do_dequeue( res, false );
778         }
779
780         /// Clear the queue
781         /**
782             The function repeatedly calls \p dequeue() until it returns \p nullptr.
783             The disposer defined in template \p Traits is called for each item
784             that can be safely disposed.
785         */
786         void clear()
787         {
788             while ( dequeue());
789         }
790
791         /// Returns queue's item count
792         /**
793             The value returned depends on \p Traits (see basket_queue::traits::item_counter). For \p atomicity::empty_item_counter,
794             this function always returns 0.
795
796             @note Even if you use real item counter and it returns 0, this fact is not mean that the queue
797             is empty. To check queue emptyness use \p empty() method.
798         */
799         size_t size() const
800         {
801             return m_ItemCounter.value();
802         }
803
804         /// Returns reference to internal statistics
805         const stat& statistics() const
806         {
807             return m_Stat;
808         }
809     };
810
811 }} // namespace cds::intrusive
812
813 #endif // #ifndef CDSLIB_INTRUSIVE_BASKET_QUEUE_H