3 #ifndef __CDS_INTRUSIVE_SEGMENTED_QUEUE_H
4 #define __CDS_INTRUSIVE_SEGMENTED_QUEUE_H
7 #include <cds/intrusive/details/base.h>
8 #include <cds/details/marked_ptr.h>
9 #include <cds/algo/int_algo.h>
10 #include <cds/lock/spinlock.h>
11 #include <cds/opt/permutation.h>
13 #include <boost/intrusive/slist.hpp>
15 #if CDS_COMPILER == CDS_COMPILER_MSVC
16 # pragma warning( push )
17 # pragma warning( disable: 4355 ) // warning C4355: 'this' : used in base member initializer list
20 namespace cds { namespace intrusive {
22 /// SegmentedQueue -related declarations
23 namespace segmented_queue {
25 /// SegmentedQueue internal statistics. May be used for debugging or profiling
26 template <typename Counter = cds::atomicity::event_counter >
// NOTE(review): the 'struct stat {' opening line is not visible in this chunk of the file.
// Counters below record queue events; Counter defaults to an event counter type.
28 typedef Counter counter_type; ///< Counter type
30 counter_type m_nPush; ///< Push count
31 counter_type m_nPushPopulated; ///< Number of attempts to push to populated (non-empty) cell
32 counter_type m_nPushContended; ///< Number of failed CAS when pushing
33 counter_type m_nPop; ///< Pop count
34 counter_type m_nPopEmpty; ///< Number of dequeuing from empty queue
35 counter_type m_nPopContended; ///< Number of failed CAS when popping
37 counter_type m_nCreateSegmentReq; ///< Number of requests to create new segment
38 counter_type m_nDeleteSegmentReq; ///< Number of requests to delete segment
39 counter_type m_nSegmentCreated; ///< Number of created segments
40 counter_type m_nSegmentDeleted; ///< Number of deleted segments
// Event hooks called by the queue implementation; each one increments its matching counter.
43 void onPush() { ++m_nPush; }
44 void onPushPopulated() { ++m_nPushPopulated; }
45 void onPushContended() { ++m_nPushContended; }
46 void onPop() { ++m_nPop; }
47 void onPopEmpty() { ++m_nPopEmpty; }
48 void onPopContended() { ++m_nPopContended; }
49 void onCreateSegmentReq() { ++m_nCreateSegmentReq; }
50 void onDeleteSegmentReq() { ++m_nDeleteSegmentReq; }
51 void onSegmentCreated() { ++m_nSegmentCreated; }
52 void onSegmentDeleted() { ++m_nSegmentDeleted; }
56 /// Dummy SegmentedQueue statistics, no overhead
// All hooks are const no-ops, so the compiler can remove statistics work entirely.
// NOTE(review): the 'struct empty_stat {' line and the onPop() hook are not visible
// in this chunk (gap in the sampled lines between onPushContended and onPopEmpty).
59 void onPush() const {}
60 void onPushPopulated() const {}
61 void onPushContended() const {}
63 void onPopEmpty() const {}
64 void onPopContended() const {}
65 void onCreateSegmentReq() const {}
66 void onDeleteSegmentReq() const {}
67 void onSegmentCreated() const {}
68 void onSegmentDeleted() const {}
72 /// SegmentedQueue default type traits
// NOTE(review): the 'struct type_traits {' opening line is not visible in this chunk.
74 /// Element disposer that is called when the item is dequeued. Default is opt::v::empty_disposer (no disposer)
75 typedef opt::v::empty_disposer disposer;
77 /// Item counter, default is atomicity::item_counter
79 The item counting is an essential part of segmented queue algorithm.
80 The \p empty() member function is based on checking <tt>size() == 0</tt>.
81 Therefore, dummy item counter like atomicity::empty_item_counter is not the proper counter.
83 typedef atomicity::item_counter item_counter;
85 /// Internal statistics, possible predefined types are \ref stat, \ref empty_stat (the default)
86 typedef segmented_queue::empty_stat stat;
88 /// Memory model, default is opt::v::relaxed_ordering. See cds::opt::memory_model for the full list of possible types
89 typedef opt::v::relaxed_ordering memory_model;
91 /// Alignment of critical data, default is cache line alignment. See cds::opt::alignment option specification
92 enum { alignment = opt::cache_line_alignment };
94 /// Segment allocator. Default is \ref CDS_DEFAULT_ALLOCATOR
95 typedef CDS_DEFAULT_ALLOCATOR allocator;
97 /// Lock type used to maintain an internal list of allocated segments
98 typedef cds::lock::Spin lock_type;
100 /// Random \ref cds::opt::permutation_generator "permutation generator" for sequence [0, quasi_factor)
101 typedef cds::opt::v::random2_permutation<int> permutation_generator;
104 /// Metafunction converting option list to traits for SegmentedQueue
106 The metafunction can be useful if a few fields in \ref type_traits should be changed.
109 typedef cds::intrusive::segmented_queue::make_traits<
110 cds::opt::item_counter< cds::atomicity::item_counter >
111 >::type my_segmented_queue_traits;
113 This code creates \p %SegmentedQueue type traits with item counting feature,
114 all other \p type_traits members left unchanged.
117 - \p opt::disposer - the functor used for dispose removed items.
118 - \p opt::stat - internal statistics, possible type: \ref stat, \ref empty_stat (the default)
119 - \p opt::item_counter - item counting feature. Note that atomicity::empty_item_counter is not suitable
121 - \p opt::memory_model - memory model, default is \p opt::v::relaxed_ordering.
122 See option description for the full list of possible models
123 - \p opt::alignment - the alignment for critical data, see option description for explanation
124 - \p opt::allocator - the allocator used to maintain segments.
125 - \p opt::lock_type - a mutual exclusion lock type used to maintain internal list of allocated
126 segments. Default is \p cds::opt::Spin, \p std::mutex is also suitable.
127 - \p opt::permutation_generator - a random permutation generator for sequence [0, quasi_factor),
128 default is cds::opt::v::random2_permutation<int>
130 template <typename... Options>
// NOTE(review): the 'struct make_traits {' line, the '#else' branch body and the
// closing '#endif' / '};' of this metafunction are not visible in this chunk.
132 # ifdef CDS_DOXYGEN_INVOKED
133 typedef implementation_defined type ; ///< Metafunction result
// Real implementation: merge user-supplied Options over the default type_traits.
135 typedef typename cds::opt::make_options<
136 typename cds::opt::find_type_traits< type_traits, Options... >::type
141 } // namespace segmented_queue
144 /** @ingroup cds_intrusive_queue
146 The queue is based on work
147 - [2010] Afek, Korland, Yanovsky "Quasi-Linearizability: relaxed consistency for improved concurrency"
149 In this paper the authors offer a relaxed version of linearizability, so-called quasi-linearizability,
150 that preserves some of the intuition, provides a flexible way to control the level of relaxation
151 and supports the implementation of more concurrent and scalable data structures.
152 Intuitively, the linearizability requires each run to be equivalent in some sense to a serial run
153 of the algorithm. This equivalence to some serial run imposes strong synchronization requirements
154 that in many cases results in limited scalability and synchronization bottleneck.
156 The general idea is that the queue maintains a linked list of segments, each segment is an array of
157 nodes in the size of the quasi factor, and each node has a deleted boolean marker, which states
158 if it has been dequeued. Each producer iterates over last segment in the linked list in some random
159 permutation order. When it finds an empty cell it performs a CAS operation attempting to enqueue its
160 new element. In case the entire segment has been scanned and no available cell is found (implying
161 that the segment is full), then it attempts to add a new segment to the list.
163 The dequeue operation is similar: the consumer iterates over the first segment in the linked list
164 in some random permutation order. When it finds an item which has not yet been dequeued, it performs
165 CAS on its deleted marker in order to "delete" it, if succeeded this item is considered dequeued.
166 In case the entire segment was scanned and all the nodes have already been dequeued (implying that
167 the segment is empty), then it attempts to remove this segment from the linked list and starts
168 the same process on the next segment. If there is no next segment, the queue is considered empty.
170 Based on the fact that most of the time threads do not add or remove segments, most of the work
171 is done in parallel on different cells in the segments. This ensures a controlled contention
172 depending on the segment size, which is quasi factor.
174 The segmented queue is an <i>unfair</i> queue since it violates the strong FIFO order but no more than
175 quasi factor. This means that the consumer dequeues <i>any</i> item from the current first segment.
178 - \p GC - a garbage collector, possible types are cds::gc::HP, cds::gc::PTB
179 - \p T - the type of values stored in the queue
180 - \p Traits - queue type traits, default is segmented_queue::type_traits.
181 segmented_queue::make_traits metafunction can be used to construct the
184 The queue stores the pointers to enqueued items so no special node hooks are needed.
186 template <class GC, typename T, typename Traits = segmented_queue::type_traits >
// NOTE(review): the 'class SegmentedQueue' declaration line itself is not visible here.
// Public typedefs re-export the traits for callers.
190 typedef GC gc ; ///< Garbage collector
191 typedef T value_type ; ///< type of the value stored in the queue
192 typedef Traits options ; ///< Queue's traits
194 typedef typename options::disposer disposer ; ///< value disposer, called only in \p clear() when the element to be dequeued
195 typedef typename options::allocator allocator ; ///< Allocator maintaining the segments
196 typedef typename options::memory_model memory_model; ///< Memory ordering. See cds::opt::memory_model option
197 typedef typename options::item_counter item_counter; ///< Item counting policy, see cds::opt::item_counter option setter
198 typedef typename options::stat stat ; ///< Internal statistics policy
199 typedef typename options::lock_type lock_type ; ///< Type of mutex for maintaining an internal list of allocated segments.
200 typedef typename options::permutation_generator permutation_generator; ///< Random permutation generator for sequence [0, quasi-factor)
202 static const size_t m_nHazardPtrCount = 2 ; ///< Count of hazard pointer required for the algorithm
206 // Segment cell. LSB is used as deleted mark
207 typedef cds::details::marked_ptr< value_type, 1 > cell;
210 struct segment: public boost::intrusive::slist_base_hook<>
// A segment is a fixed-size array of marked-pointer cells, linked into an
// intrusive slist. The cell array is co-allocated immediately after the struct.
// NOTE(review): the struct's opening brace, the constructor body / remaining
// initializer list, and init()'s braces are not visible in this chunk.
212 atomics::atomic< cell > * cells; // Cell array of size \ref m_nQuasiFactor
213 size_t version; // version tag (ABA prevention tag)
214 // cell array is placed here in one continuous memory block
216 // Initializes the segment
217 segment( size_t nCellCount )
218 // MSVC warning C4355: 'this': used in base member initializer list
219 : cells( reinterpret_cast< atomics::atomic< cell > * >( this + 1 ))
225 void init( size_t nCellCount )
// Zero every cell with relaxed stores, then publish with a release fence.
227 atomics::atomic< cell > * pLastCell = cells + nCellCount;
228 for ( atomics::atomic< cell > * pCell = cells; pCell < pLastCell; ++pCell )
229 pCell->store( cell(), atomics::memory_order_relaxed );
230 atomics::atomic_thread_fence( memory_model::memory_order_release );
// Head/tail pointers are cache-line aligned (per options::alignment) to avoid
// false sharing between producers and consumers.
237 typedef typename opt::details::alignment_setter< atomics::atomic<segment *>, options::alignment >::type aligned_segment_ptr;
// Intrusive singly-linked list of segments with O(1) push_back (cache_last).
244 typedef boost::intrusive::slist< segment, boost::intrusive::cache_last< true > > list_impl;
245 typedef std::unique_lock< lock_type > scoped_lock;
247 aligned_segment_ptr m_pHead;
248 aligned_segment_ptr m_pTail;
// m_Lock guards structural changes to the segment list (create_tail/remove_head).
251 mutable lock_type m_Lock;
252 size_t const m_nQuasiFactor;
256 struct segment_disposer
258 void operator()( segment * pSegment )
260 assert( pSegment != nullptr );
261 free_segment( pSegment );
265 struct gc_segment_disposer
267 void operator()( segment * pSegment )
269 assert( pSegment != nullptr );
270 retire_segment( pSegment );
// Constructor: stores the quasi factor (must already be a power of 2 — see the
// assert below) and a reference to the owner's statistics object.
// NOTE(review): the remaining member-initializer lines and the constructor's
// braces are not visible in this chunk.
275 segment_list( size_t nQuasiFactor, stat& st )
278 , m_nQuasiFactor( nQuasiFactor )
281 assert( cds::beans::is_power2( nQuasiFactor ));
// NOTE(review): the line below presumably belongs to ~segment_list() — the
// destructor signature is not visible here. It disposes all remaining segments
// through the GC-based disposer.
286 m_List.clear_and_dispose( gc_segment_disposer() );
289 segment * head( typename gc::Guard& guard )
291 return guard.protect( m_pHead );
294 segment * tail( typename gc::Guard& guard )
296 return guard.protect( m_pTail );
300 bool populated( segment const& s ) const
302 // The lock should be held
303 atomics::atomic< cell > const * pLastCell = s.cells + quasi_factor();
304 for ( atomics::atomic< cell > const * pCell = s.cells; pCell < pLastCell; ++pCell ) {
305 if ( !pCell->load( memory_model::memory_order_relaxed ).all() )
310 bool exhausted( segment const& s ) const
312 // The lock should be held
313 atomics::atomic< cell > const * pLastCell = s.cells + quasi_factor();
314 for ( atomics::atomic< cell > const * pCell = s.cells; pCell < pLastCell; ++pCell ) {
315 if ( !pCell->load( memory_model::memory_order_relaxed ).bits() )
// Appends a new tail segment, or returns the tail another thread already created.
// pTail is the caller's view of the tail; the (pointer, version) pair detects the
// case where pTail was removed and its memory reused (ABA) while unlocked.
// NOTE(review): several interior lines (braces, else-branches) are not visible
// in this chunk; comments below describe only what the visible lines show.
322 segment * create_tail( segment * pTail, typename gc::Guard& guard )
324 // pTail is guarded by GC
326 m_Stat.onCreateSegmentReq();
328 scoped_lock l( m_Lock );
// Another thread already appended a segment: publish it and return it.
330 if ( !m_List.empty() && ( pTail != &m_List.back() || get_version(pTail) != m_List.back().version )) {
331 m_pTail.store( &m_List.back(), memory_model::memory_order_relaxed );
333 return guard.assign( &m_List.back() );
// We only create a new segment when the current tail is really full.
336 assert( m_List.empty() || populated( m_List.back() ));
338 segment * pNew = allocate_segment();
339 m_Stat.onSegmentCreated();
341 if ( m_List.empty() )
342 m_pHead.store( pNew, memory_model::memory_order_relaxed );
343 m_List.push_back( *pNew );
// Release store publishes the fully-initialized segment to other threads.
344 m_pTail.store( pNew, memory_model::memory_order_release );
345 return guard.assign( pNew );
// Removes the exhausted head segment \p pHead and returns the new head
// (nullptr if the list became empty). The (pointer, version) check detects
// that another thread already replaced the head while we were unlocked.
// NOTE(review): several interior lines (braces, the pop_front call, the pRet
// declaration, final return) are not visible in this chunk; comments describe
// only the visible lines.
348 segment * remove_head( segment * pHead, typename gc::Guard& guard )
350 // pHead is guarded by GC
351 m_Stat.onDeleteSegmentReq();
355 scoped_lock l( m_Lock );
357 if ( m_List.empty() ) {
358 m_pTail.store( nullptr, memory_model::memory_order_relaxed );
359 m_pHead.store( nullptr, memory_model::memory_order_relaxed );
360 return guard.assign( nullptr );
// Head already changed under us: just publish and return the current front.
363 if ( pHead != &m_List.front() || get_version(pHead) != m_List.front().version ) {
364 m_pHead.store( &m_List.front(), memory_model::memory_order_relaxed );
365 return guard.assign( &m_List.front() );
// Only a fully-dequeued segment may be unlinked.
368 assert( exhausted(m_List.front()) );
371 if ( m_List.empty() ) {
372 pRet = guard.assign( nullptr );
373 m_pTail.store( nullptr, memory_model::memory_order_relaxed );
376 pRet = guard.assign( &m_List.front() );
377 m_pHead.store( pRet, memory_model::memory_order_release );
// The old head may still be referenced by readers: defer its deletion to the GC.
380 retire_segment( pHead );
381 m_Stat.onSegmentDeleted();
386 size_t quasi_factor() const
388 return m_nQuasiFactor;
392 typedef cds::details::Allocator< segment, allocator > segment_allocator;
394 static size_t get_version( segment * pSegment )
396 return pSegment ? pSegment->version : 0;
// Allocates one memory block holding the segment header plus its cell array
// (the segment constructor points 'cells' just past 'this').
// NOTE(review): the remaining NewBlock() arguments and the function's braces
// are not visible in this chunk.
399 segment * allocate_segment()
401 return segment_allocator().NewBlock( sizeof(segment) + sizeof(cell) * m_nQuasiFactor,
405 static void free_segment( segment * pSegment )
407 segment_allocator().Delete( pSegment );
410 static void retire_segment( segment * pSegment )
412 gc::template retire<segment_disposer>( pSegment );
418 segment_list m_SegmentList; ///< List of segments
420 item_counter m_ItemCounter; ///< Item counter
421 stat m_Stat; ///< Internal statistics
424 /// Initializes the empty queue
// NOTE(review): the constructor name line and its braces are not visible in this
// chunk. The quasi factor is rounded up to a power of 2 (ceil2) before use.
426 size_t nQuasiFactor ///< Quasi factor. If it is not a power of 2 it is rounded up to nearest power of 2. Minimum is 2.
428 : m_SegmentList( cds::beans::ceil2(nQuasiFactor), m_Stat )
// A dummy item counter would break empty(), which relies on size() == 0.
430 static_assert( (!std::is_same< item_counter, cds::atomicity::empty_item_counter >::value),
431 "cds::atomicity::empty_item_counter is not supported for SegmentedQueue"
433 assert( m_SegmentList.quasi_factor() > 1 );
436 /// Clears the queue and deletes all internal data
// NOTE(review): the destructor that this doc line describes is not visible here.
442 /// Inserts a new element at last segment of the queue
// Scans the tail segment's cells in random permutation order and CASes the value
// into the first empty cell; when the segment is full, appends a new segment.
// NOTE(review): several interior lines (the enclosing retry loop, the nullCell
// declaration, the item-counter increment, success return) are not visible in
// this chunk; comments describe only the visible lines.
443 bool enqueue( value_type& val )
445 // LSB is used as a flag in marked pointer
446 assert( (reinterpret_cast<uintptr_t>( &val ) & 1) == 0 );
448 typename gc::Guard segmentGuard;
449 segment * pTailSegment = m_SegmentList.tail( segmentGuard );
450 if ( !pTailSegment ) {
451 // no segments, create the new one
452 pTailSegment = m_SegmentList.create_tail( pTailSegment, segmentGuard );
453 assert( pTailSegment );
456 permutation_generator gen( quasi_factor() );
458 // First, increment item counter.
459 // We are sure that the item will be enqueued,
460 // but if we increment the counter after inserting we can get a negative counter value
461 // if dequeuing occurs before incrementing (enqueue/dequeue race)
465 CDS_DEBUG_ONLY( size_t nLoopCount = 0);
467 typename permutation_generator::integer_type i = gen;
468 CDS_DEBUG_ONLY( ++nLoopCount );
469 if ( pTailSegment->cells[i].load(memory_model::memory_order_relaxed).all() ) {
470 // Cell is not empty, go next
471 m_Stat.onPushPopulated();
474 // Empty cell found, try to enqueue here
476 if ( pTailSegment->cells[i].compare_exchange_strong( nullCell, cell( &val ),
477 memory_model::memory_order_release, atomics::memory_order_relaxed ))
// CAS failed: somebody else occupied the cell between our load and the CAS.
483 assert( nullCell.ptr() );
484 m_Stat.onPushContended();
486 } while ( gen.next() );
// Debug check: the permutation must have visited every cell exactly once.
488 assert( nLoopCount == quasi_factor());
490 // No available position, create a new segment
491 pTailSegment = m_SegmentList.create_tail( pTailSegment, segmentGuard );
493 // Get new permutation
498 /// Removes an element from first segment of the queue and returns it
500 If the queue is empty the function returns \p nullptr.
502 The disposer specified in \p Traits template argument is <b>not</b> called for returned item.
503 You should manually dispose the item:
506 void operator()( foo * p )
511 cds::intrusive::SegmentedQueue< cds::gc::HP, foo > theQueue;
515 foo * pItem = theQueue.dequeue();
519 // pItem is no longer needed and can be deleted
520 // Do it via gc::HP::retire
521 cds::gc::HP::template retire< my_disposer >( pItem );
// Dequeues one element; do_dequeue() leaves the dequeued item protected by
// itemGuard on success.
// NOTE(review): the function's tail (returning pVal on success / nullptr when
// the queue is empty) is not visible in this chunk.
524 value_type * dequeue()
526 typename gc::Guard itemGuard;
527 if ( do_dequeue( itemGuard )) {
528 value_type * pVal = itemGuard.template get<value_type>();
536 /// Synonym for \p enqueue(value_type&) member function
537 bool push( value_type& val )
539 return enqueue( val );
542 /// Synonym for \p dequeue() member function
548 /// Checks if the queue is empty
550 The original segmented queue algorithm does not allow to check emptiness accurately
551 because \p empty() is unlinearizable.
552 This function tests queue's emptiness checking <tt>size() == 0</tt>,
553 so, the item counting feature is an essential part of queue's algorithm.
// NOTE(review): the 'void clear()' signature and braces are not visible in this
// chunk; only the doc text and the delegating call remain.
562 The function repeatedly calls \ref dequeue until it returns \p nullptr.
563 The disposer specified in \p Traits template argument is called for each removed item.
567 clear_with( disposer() );
572 The function repeatedly calls \p dequeue() until it returns \p nullptr.
573 \p Disposer is called for each removed item.
// Disposer is passed by type only (the parameter is unnamed); each dequeued
// item is handed to the GC with Disposer as its reclamation functor.
// NOTE(review): the loop's closing braces (and any per-iteration guard reset)
// are not visible in this chunk.
575 template <class Disposer>
576 void clear_with( Disposer )
578 typename gc::Guard itemGuard;
579 while ( do_dequeue( itemGuard ) ) {
580 assert( itemGuard.template get<value_type>() );
581 gc::template retire<Disposer>( itemGuard.template get<value_type>() );
586 /// Returns queue's item count
589 return m_ItemCounter.value();
592 /// Returns reference to internal statistics
594 The type of internal statistics is specified by \p Traits template argument.
596 const stat& statistics() const
601 /// Returns quasi factor, a power-of-two number
602 size_t quasi_factor() const
604 return m_SegmentList.quasi_factor();
// Core dequeue: scans the head segment in random permutation order and tries to
// mark ("delete") the first live item. On success the item stays protected by
// itemGuard and true is returned.
// NOTE(review): several interior lines (the enclosing retry loop, the 'item'
// declaration, empty-queue returns, success path with onPop / item-counter
// update) are not visible in this chunk; comments describe only visible lines.
609 bool do_dequeue( typename gc::Guard& itemGuard )
611 typename gc::Guard segmentGuard;
612 segment * pHeadSegment = m_SegmentList.head( segmentGuard );
614 permutation_generator gen( quasi_factor() );
616 if ( !pHeadSegment ) {
// Tracks whether any empty cell was seen during the scan; an empty cell means
// the queue tail is still inside this segment, so the queue may be empty.
622 bool bHadNullValue = false;
624 CDS_DEBUG_ONLY( size_t nLoopCount = 0 );
626 typename permutation_generator::integer_type i = gen;
627 CDS_DEBUG_ONLY( ++nLoopCount );
630 // In segmented queue the cell cannot be reused
631 // So no loop is needed here to protect the cell
632 item = pHeadSegment->cells[i].load( memory_model::memory_order_relaxed );
633 itemGuard.assign( item.ptr() );
635 // Check if this cell is empty, which means an element
636 // can be enqueued to this cell in the future
638 bHadNullValue = true;
640 // If the item is not deleted yet
641 if ( !item.bits() ) {
642 // Try to mark the cell as deleted
643 if ( pHeadSegment->cells[i].compare_exchange_strong( item, item | 1,
644 memory_model::memory_order_acquire, atomics::memory_order_relaxed ))
// CAS failed: another consumer deleted this item first.
651 assert( item.bits() );
652 m_Stat.onPopContended();
655 } while ( gen.next() );
// Debug check: the permutation must have visited every cell exactly once.
657 assert( nLoopCount == quasi_factor() );
659 // scanning the entire segment without finding a candidate to dequeue
660 // If there was an empty cell, the queue is considered empty
661 if ( bHadNullValue ) {
666 // All nodes have been dequeued, we can safely remove the first segment
667 pHeadSegment = m_SegmentList.remove_head( pHeadSegment, segmentGuard );
669 // Get new permutation
675 }} // namespace cds::intrusive
677 #if CDS_COMPILER == CDS_COMPILER_MSVC
678 # pragma warning( pop )
681 #endif // #ifndef __CDS_INTRUSIVE_SEGMENTED_QUEUE_H