eed476bc5fd66bb98531aa51af0ec0469169ca12
[libcds.git] / cds / intrusive / optimistic_queue.h
1 //$$CDS-header$$
2
3 #ifndef __CDS_INTRUSIVE_OPTIMISTIC_QUEUE_H
4 #define __CDS_INTRUSIVE_OPTIMISTIC_QUEUE_H
5
6 #include <cds/intrusive/base.h>
7 #include <cds/cxx11_atomic.h>
8 #include <cds/gc/default_gc.h>
9 #include <cds/gc/hrc/gc_fwd.h>
10 #include <cds/intrusive/queue_stat.h>
11 #include <cds/ref.h>
12
13 #include <cds/details/std/type_traits.h>
14
15 namespace cds { namespace intrusive {
16
17     /// Optimistic queue related definitions
18     /** @ingroup cds_intrusive_helper
19     */
20     namespace optimistic_queue {
21
22         /// Optimistic queue node
23         /**
24             Template parameters:
25             - GC - garbage collector used. gc::HRC is not supported.
26             - Tag - a tag used to distinguish between different implementation
27         */
28         template <class GC, typename Tag = opt::none>
29         struct node: public GC::container_node
30         {
31             typedef GC  gc  ;   ///< Garbage collector
32             typedef Tag tag ;   ///< tag
33
34             typedef typename gc::template atomic_ref<node>    atomic_node_ptr    ;    ///< atomic pointer
35
36             atomic_node_ptr m_pPrev ;   ///< Pointer to previous node
37             atomic_node_ptr m_pNext ;   ///< Pointer to next node
38
39             CDS_CONSTEXPR node() CDS_NOEXCEPT
40                 : m_pPrev( null_ptr<node *>() )
41                 , m_pNext( null_ptr<node *>() )
42             {}
43         };
44
45         //@cond
46         struct default_hook {
47             typedef cds::gc::default_gc gc;
48             typedef opt::none           tag;
49         };
50         //@endcond
51
52         //@cond
53         template < typename HookType, CDS_DECL_OPTIONS2>
54         struct hook
55         {
56             typedef typename opt::make_options< default_hook, CDS_OPTIONS2>::type  options;
57             typedef typename options::gc    gc;
58             typedef typename options::tag   tag;
59             typedef node<gc, tag> node_type;
60             typedef HookType     hook_type;
61         };
62         //@endcond
63
64         /// Base hook
65         /**
66             \p Options are:
67             - opt::gc - garbage collector used.
68             - opt::tag - tag
69         */
70         template < CDS_DECL_OPTIONS2 >
71         struct base_hook: public hook< opt::base_hook_tag, CDS_OPTIONS2 >
72         {};
73
74         /// Member hook
75         /**
76             \p MemberOffset defines offset in bytes of \ref node member into your structure.
77             Use \p offsetof macro to define \p MemberOffset
78
79             \p Options are:
80             - opt::gc - garbage collector used.
81             - opt::tag - tag
82         */
83         template < size_t MemberOffset, CDS_DECL_OPTIONS2 >
84         struct member_hook: public hook< opt::member_hook_tag, CDS_OPTIONS2 >
85         {
86             //@cond
87             static const size_t c_nMemberOffset = MemberOffset;
88             //@endcond
89         };
90
91         /// Traits hook
92         /**
93             \p NodeTraits defines type traits for node.
94             See \ref node_traits for \p NodeTraits interface description
95
96             \p Options are:
97             - opt::gc - garbage collector used.
98             - opt::tag - tag
99         */
100         template <typename NodeTraits, CDS_DECL_OPTIONS2 >
101         struct traits_hook: public hook< opt::traits_hook_tag, CDS_OPTIONS2 >
102         {
103             //@cond
104             typedef NodeTraits node_traits;
105             //@endcond
106         };
107
108         /// Check link
109         template <typename Node>
110         struct link_checker {
111             //@cond
112             typedef Node node_type;
113             //@endcond
114
115             /// Checks if the link fields of node \p pNode is NULL
116             /**
117                 An asserting is generated if \p pNode link fields is not NULL
118             */
119             static void is_empty( const node_type * pNode )
120             {
121                 assert( pNode->m_pNext.load(CDS_ATOMIC::memory_order_relaxed) == null_ptr<node_type *>() );
122                 assert( pNode->m_pPrev.load(CDS_ATOMIC::memory_order_relaxed) == null_ptr<node_type *>() );
123             }
124         };
125
126         /// Metafunction for selecting appropriate link checking policy
127         template < typename Node, opt::link_check_type LinkType >
128         struct get_link_checker
129         {
130             //@cond
131             typedef intrusive::opt::v::empty_link_checker<Node>  type;
132             //@endcond
133         };
134
135         //@cond
136         template < typename Node >
137         struct get_link_checker< Node, opt::always_check_link >
138         {
139             typedef link_checker<Node>  type;
140         };
141         template < typename Node >
142         struct get_link_checker< Node, opt::debug_check_link >
143         {
144 #       ifdef _DEBUG
145             typedef link_checker<Node>  type;
146 #       else
147             typedef intrusive::opt::v::empty_link_checker<Node>  type;
148 #       endif
149         };
150         //@endcond
151
152         /// OptimisticQueue internal statistics. May be used for debugging or profiling
153         /**
154             Template argument \p Counter defines type of counter.
155             Default is cds::atomicity::event_counter, that is weak, i.e. it is not guaranteed
156             strict event counting.
157             You may use stronger type of counter like as cds::atomicity::item_counter,
158             or even integral type, for example, \p int.
159
160             The class extends intrusive::queue_stat interface for OptimisticQueue.
161         */
162         template <typename Counter = cds::atomicity::event_counter >
163         struct stat: public cds::intrusive::queue_stat<Counter>
164         {
165             //@cond
166             typedef cds::intrusive::queue_stat<Counter> base_class;
167             typedef typename base_class::counter_type    counter_type;
168             //@endcond
169
170             counter_type    m_FixListCount  ;   ///< Count of fix list event
171
172             /// Register fix list event
173             void onFixList()    { ++m_FixListCount; }
174
175             //@cond
176             void reset()
177             {
178                 base_class::reset();
179                 m_FixListCount.reset();
180             }
181
182             stat& operator +=( stat const& s )
183             {
184                 base_class::operator +=( s );
185                 m_FixListCount += s.m_FixListCount.get();
186                 return *this;
187             }
188             //@endcond
189         };
190
191         /// Dummy OptimisticQueue statistics - no counting is performed. Support interface like \ref optimistic_queue::stat
192         struct dummy_stat: public cds::intrusive::queue_dummy_stat
193         {
194             //@cond
195             void onFixList() {}
196
197             void reset() {}
198             dummy_stat& operator +=( dummy_stat const& )
199             {
200                 return *this;
201             }
202             //@endcond
203         };
204
205     }   // namespace optimistic_queue
206
207     /// Optimistic queue
208     /** @ingroup cds_intrusive_queue
209         Implementation of Ladan-Mozes & Shavit optimistic queue algorithm.
210
211         \par Source:
212             [2008] Edya Ladan-Mozes, Nir Shavit "An Optimistic Approach to Lock-Free FIFO Queues"
213
214         Template arguments:
215         - \p GC - garbage collector type: gc::HP, gc::PTB. Note that gc::HRC is <b>not</b> supported
216         - \p T - type to be stored in the queue
217         - \p Options - options
218
219         Type of node: \ref optimistic_queue::node.
220
221         \p Options are:
222         - opt::hook - hook used. Possible values are: optimistic_queue::base_hook, optimistic_queue::member_hook, optimistic_queue::traits_hook.
223             If the option is not specified, <tt>optimistic_queue::base_hook<></tt> is used.
224         - opt::back_off - back-off strategy used. If the option is not specified, the cds::backoff::empty is used.
225         - opt::disposer - the functor used for dispose removed items. Default is opt::v::empty_disposer. This option is used
226             in \ref dequeue function.
227         - opt::link_checker - the type of node's link fields checking. Default is \ref opt::debug_check_link
228         - opt::item_counter - the type of item counting feature. Default is \ref atomicity::empty_item_counter
229         - opt::stat - the type to gather internal statistics.
230             Possible option value are: optimistic_queue::stat, optimistic_queue::dummy_stat,
231             user-provided class that supports optimistic_queue::stat interface.
232             Generic option intrusive::queue_stat and intrusive::queue_dummy_stat are acceptable too, however,
233             they will be automatically converted to optimistic_queue::stat and optimistic_queue::dummy_stat
234             respectively.
235             Default is \ref optimistic_queue::dummy_stat.
236         - opt::alignment - the alignment for internal queue data. Default is opt::cache_line_alignment
237         - opt::memory_model - C++ memory ordering model. Can be opt::v::relaxed_ordering (relaxed memory model, the default)
238             or opt::v::sequential_consistent (sequentially consisnent memory model).
239
240         Garbage collecting schema \p GC must be consistent with the optimistic_queue::node GC.
241
242         \par About item disposing
243         The optimistic queue algo has a key feature: even if the queue is empty it contains one item that is "dummy" one from
244         the standpoint of the algo. See \ref dequeue function for explanation.
245
246         \par Examples
247         \code
248         #include <cds/intrusive/optimistic_queue.h>
249         #include <cds/gc/hp.h>
250
251         namespace ci = cds::inrtusive;
252         typedef cds::gc::HP hp_gc;
253
254         // Optimistic queue with Hazard Pointer garbage collector, base hook + item counter:
255         struct Foo: public ci::optimistic_queue::node< hp_gc >
256         {
257             // Your data
258             ...
259         };
260
261         typedef ci::OptimisticQueue< hp_gc,
262             Foo
263             ,ci::opt::hook<
264                 ci::optimistic_queue::base_hook< ci::opt::gc< hp_gc > >
265             >
266             ,cds::opt::item_counter< cds::atomicity::item_counter >
267         > FooQueue;
268
269         // Optimistic queue with Hazard Pointer garbage collector, member hook, no item counter:
270         struct Bar
271         {
272             // Your data
273             ...
274             ci::optimistic_queue::node< hp_gc > hMember;
275         };
276
277         typedef ci::OptimisticQueue< hp_gc,
278             Bar
279             ,ci::opt::hook<
280                 ci::optimistic_queue::member_hook<
281                     offsetof(Bar, hMember)
282                     ,ci::opt::gc< hp_gc >
283                 >
284             >
285         > BarQueue;
286
287         \endcode
288     */
289     template <typename GC, typename T, CDS_DECL_OPTIONS9>
290     class OptimisticQueue
291     {
292         //@cond
293         struct default_options
294         {
295             typedef cds::backoff::empty             back_off;
296             typedef optimistic_queue::base_hook<>   hook;
297             typedef opt::v::empty_disposer          disposer;
298             typedef atomicity::empty_item_counter   item_counter;
299             typedef opt::v::relaxed_ordering        memory_model;
300             typedef optimistic_queue::dummy_stat    stat;
301             static const opt::link_check_type link_checker = opt::debug_check_link;
302             enum { alignment = opt::cache_line_alignment };
303         };
304         //@endcond
305
306     public:
307         //@cond
308         typedef typename opt::make_options<
309             typename cds::opt::find_type_traits< default_options, CDS_OPTIONS9 >::type
310             ,CDS_OPTIONS9
311         >::type   options;
312
313         typedef typename std::conditional<
314             std::is_same<typename options::stat, cds::intrusive::queue_stat<> >::value
315             ,optimistic_queue::stat<>
316             ,typename std::conditional<
317                 std::is_same<typename options::stat, cds::intrusive::queue_dummy_stat>::value
318                 ,optimistic_queue::dummy_stat
319                 ,typename options::stat
320             >::type
321         >::type stat_type_;
322
323         //@endcond
324
325     public:
326         typedef T  value_type   ;   ///< type of value stored in the queue
327         typedef typename options::hook      hook        ;   ///< hook type
328         typedef typename hook::node_type    node_type   ;   ///< node type
329         typedef typename options::disposer  disposer    ;   ///< disposer used
330         typedef typename get_node_traits< value_type, node_type, hook>::type node_traits ;    ///< node traits
331         typedef typename optimistic_queue::get_link_checker< node_type, options::link_checker >::type link_checker   ;   ///< link checker
332
333         typedef GC gc          ;   ///< Garbage collector
334         typedef typename options::back_off  back_off    ;   ///< back-off strategy
335         typedef typename options::item_counter item_counter ;   ///< Item counting policy used
336         typedef typename options::memory_model  memory_model      ;   ///< Memory ordering. See cds::opt::memory_model option
337 #ifdef CDS_DOXYGEN_INVOKED
338         typedef typename options::stat      stat        ;   ///< Internal statistics policy used
339 #else
340         typedef stat_type_  stat;
341 #endif
342
343         /// Rebind template arguments
344         template <typename GC2, typename T2, CDS_DECL_OTHER_OPTIONS9>
345         struct rebind {
346             typedef OptimisticQueue< GC2, T2, CDS_OTHER_OPTIONS9> other   ;   ///< Rebinding result
347         };
348
349     protected:
350         //@cond
351
352         struct internal_disposer
353         {
354             void operator ()( value_type * p )
355             {
356                 assert( p != null_ptr<value_type *>());
357
358                 OptimisticQueue::clear_links( node_traits::to_node_ptr(*p) );
359                 disposer()( p );
360             }
361         };
362
363         typedef intrusive::node_to_value<OptimisticQueue> node_to_value;
364         typedef typename opt::details::alignment_setter< typename node_type::atomic_node_ptr, options::alignment >::type aligned_node_ptr;
365         //@endcond
366
367         aligned_node_ptr m_pTail ;   ///< Pointer to tail node
368         aligned_node_ptr m_pHead ;   ///< Pointer to head node
369         node_type        m_Dummy ;           ///< dummy node
370
371         item_counter        m_ItemCounter   ;   ///< Item counter
372         stat                m_Stat          ;   ///< Internal statistics
373
374         static CDS_CONSTEXPR_CONST size_t c_nHazardPtrCount = 5 ; ///< Count of hazard pointer required for the algorithm
375
376     protected:
377         //@cond
378         static void clear_links( node_type * pNode )
379         {
380             pNode->m_pNext.store( null_ptr<node_type *>(), memory_model::memory_order_release );
381             pNode->m_pPrev.store( null_ptr<node_type *>(), memory_model::memory_order_release );
382         }
383
384         struct dequeue_result {
385             typename gc::template GuardArray<3>  guards;
386
387             node_type * pHead;
388             node_type * pNext;
389         };
390
391         bool do_dequeue( dequeue_result& res )
392         {
393             node_type * pTail;
394             node_type * pHead;
395             node_type * pFirstNodePrev;
396             back_off bkoff;
397
398             while ( true ) { // Try till success or empty
399                 pHead = res.guards.protect( 0, m_pHead, node_to_value() );
400                 pTail = res.guards.protect( 1, m_pTail, node_to_value() );
401                 assert( pHead != null_ptr<node_type *>() );
402                 pFirstNodePrev = res.guards.protect( 2, pHead->m_pPrev, node_to_value() );
403
404                 if ( pHead == m_pHead.load(memory_model::memory_order_relaxed)) {
405                     if ( pTail != pHead ) {
406                         if ( pFirstNodePrev == null_ptr<node_type *>()
407                           || pFirstNodePrev->m_pNext.load(memory_model::memory_order_relaxed) != pHead )
408                         {
409                             fix_list( pTail, pHead );
410                             continue;
411                         }
412                         if ( m_pHead.compare_exchange_weak( pHead, pFirstNodePrev, memory_model::memory_order_release, CDS_ATOMIC::memory_order_relaxed )) {
413                             // dequeue success
414                             break;
415                         }
416                     }
417                     else {
418                         // the queue is empty
419                         return false;
420                     }
421                 }
422
423                 m_Stat.onDequeueRace();
424                 bkoff();
425             }
426
427             --m_ItemCounter;
428             m_Stat.onDequeue();
429
430             res.pHead = pHead;
431             res.pNext = pFirstNodePrev;
432             return true;
433         }
434
435
436         /// Helper function for optimistic queue. Corrects \p prev pointer of queue's nodes if it is needed
437         void fix_list( node_type * pTail, node_type * pHead )
438         {
439             // pTail and pHead are already guarded
440
441             node_type * pCurNode;
442             node_type * pCurNodeNext;
443
444             typename gc::template GuardArray<2> guards;
445
446             pCurNode = pTail;
447             while ( pCurNode != pHead ) { // While not at head
448                 pCurNodeNext = guards.protect(0, pCurNode->m_pNext, node_to_value() );
449                 if ( pHead != m_pHead.load(memory_model::memory_order_relaxed) )
450                     break;
451                 pCurNodeNext->m_pPrev.store( pCurNode, memory_model::memory_order_release );
452                 guards.assign( 1, node_traits::to_value_ptr( pCurNode = pCurNodeNext ));
453             }
454
455             m_Stat.onFixList();
456         }
457
458         void dispose_result( dequeue_result& res )
459         {
460             dispose_node( res.pHead );
461         }
462
463         void dispose_node( node_type * p )
464         {
465             assert( p != null_ptr<node_type *>());
466
467             if ( p != &m_Dummy ) {
468                 gc::template retire<internal_disposer>( node_traits::to_value_ptr(p) );
469             }
470         }
471
472         //@endcond
473
474     public:
475         /// Constructor creates empty queue
476         OptimisticQueue()
477             : m_pTail( null_ptr<node_type *>() )
478             , m_pHead( null_ptr<node_type *>() )
479         {
480             // GC and node_type::gc must be the same
481             static_assert(( std::is_same<gc, typename node_type::gc>::value ), "GC and node_type::gc must be the same");
482
483             // cds::gc::HRC is not allowed
484             static_assert(( !std::is_same<gc, cds::gc::HRC>::value ), "cds::gc::HRC is not allowed here");
485
486             m_pTail.store( &m_Dummy, memory_model::memory_order_relaxed );
487             m_pHead.store( &m_Dummy, memory_model::memory_order_relaxed );
488         }
489
490         ~OptimisticQueue()
491         {
492             clear();
493             node_type * pHead = m_pHead.load(memory_model::memory_order_relaxed);
494             CDS_DEBUG_DO( node_type * pTail = m_pTail.load(memory_model::memory_order_relaxed); )
495             CDS_DEBUG_DO( assert( pHead == pTail ); )
496             assert( pHead != null_ptr<node_type *>() );
497
498             m_pHead.store( null_ptr<node_type *>(), memory_model::memory_order_relaxed );
499             m_pTail.store( null_ptr<node_type *>(), memory_model::memory_order_relaxed );
500
501             dispose_node( pHead );
502         }
503
504         /// @anchor cds_intrusive_OptimisticQueue_enqueue Enqueues \p data in lock-free manner. Always return \a true
505         bool enqueue( value_type& val )
506         {
507             node_type * pNew = node_traits::to_node_ptr( val );
508             link_checker::is_empty( pNew );
509
510             typename gc::template GuardArray<2> guards;
511             back_off bkoff;
512
513             guards.assign( 1, &val );
514             node_type * pTail = guards.protect( 0, m_pTail, node_to_value() )  ;   // Read the tail
515             while( true ) {
516                 pNew->m_pNext.store( pTail, memory_model::memory_order_release );
517                 if ( m_pTail.compare_exchange_strong( pTail, pNew, memory_model::memory_order_release, CDS_ATOMIC::memory_order_relaxed ) ) {     // Try to CAS the tail
518                     pTail->m_pPrev.store( pNew, memory_model::memory_order_release )     ;           // Success, write prev
519                     ++m_ItemCounter;
520                     m_Stat.onEnqueue();
521                     break ;                             // Enqueue done!
522                 }
523                 guards.assign( 0, node_traits::to_value_ptr( pTail ) )   ;  // pTail has been changed by CAS above
524                 m_Stat.onEnqueueRace();
525                 bkoff();
526             }
527             return true;
528         }
529
530         /// Dequeues a value from the queue
531         /** @anchor cds_intrusive_OptimisticQueue_dequeue
532             If the queue is empty the function returns \a NULL
533
534             \par Warning
535             The queue algorithm has following feature: when \p dequeue is called,
536             the item returning is still queue's top, and previous top is disposed:
537
538             \code
539             before dequeuing         Dequeue               after dequeuing
540             +------------------+                           +------------------+
541       Top ->|      Item 1      |  -> Dispose Item 1        |      Item 2      | <- Top
542             +------------------+                           +------------------+
543             |      Item 2      |  -> Return Item 2         |       ...        |
544             +------------------+
545             |       ...        |
546             \endcode
547
548             \p dequeue function returns Item 2, that becomes new top of queue, and calls
549             the disposer for Item 1, that was queue's top on function entry.
550             Thus, you cannot manually delete item returned because it is still included in
551             item sequence and it has valuable link field that must not be zeroed.
552             The item may be deleted only in disposer call.
553         */
554         value_type * dequeue()
555         {
556             dequeue_result res;
557             if ( do_dequeue( res )) {
558                 dispose_result( res );
559
560                 return node_traits::to_value_ptr( *res.pNext );
561             }
562             return null_ptr<value_type *>();
563         }
564
565         /// Synonym for @ref cds_intrusive_OptimisticQueue_enqueue "enqueue"
566         bool push( value_type& val )
567         {
568             return enqueue( val );
569         }
570
571         /// Synonym for \ref cds_intrusive_OptimisticQueue_dequeue "dequeue"
572         value_type * pop()
573         {
574             return dequeue();
575         }
576
577         /// Checks if queue is empty
578         bool empty() const
579         {
580             return m_pTail.load(memory_model::memory_order_relaxed) == m_pHead.load(memory_model::memory_order_relaxed);
581         }
582
583         /// Clear the stack
584         /**
585             The function repeatedly calls \ref dequeue until it returns NULL.
586             The disposer defined in template \p Options is called for each item
587             that can be safely disposed.
588         */
589         void clear()
590         {
591             value_type * pv;
592             while ( (pv = dequeue()) != null_ptr<value_type *>() );
593         }
594
595         /// Returns queue's item count
596         /**
597             The value returned depends on opt::item_counter option. For atomicity::empty_item_counter,
598             this function always returns 0.
599
600             <b>Warning</b>: even if you use real item counter and it returns 0, this fact is not mean that the queue
601             is empty. To check queue emptyness use \ref empty() method.
602         */
603         size_t size() const
604         {
605             return m_ItemCounter.value();
606         }
607
608         /// Returns refernce to internal statistics
609         const stat& statistics() const
610         {
611             return m_Stat;
612         }
613     };
614
615 }}  // namespace cds::intrusive
616
617 #endif // #ifndef __CDS_INTRUSIVE_OPTIMISTIC_QUEUE_H