#ifndef CDSLIB_INTRUSIVE_SEGMENTED_QUEUE_H
#define CDSLIB_INTRUSIVE_SEGMENTED_QUEUE_H

#include <cds/intrusive/details/base.h>
#include <cds/details/marked_ptr.h>
#include <cds/algo/int_algo.h>
#include <cds/sync/spinlock.h>
#include <cds/opt/permutation.h>

#include <boost/intrusive/slist.hpp>

#if CDS_COMPILER == CDS_COMPILER_MSVC
#   pragma warning( push )
#   pragma warning( disable: 4355 ) // warning C4355: 'this' : used in base member initializer list
#endif
namespace cds { namespace intrusive {

    /// SegmentedQueue-related declarations
    namespace segmented_queue {
        /// SegmentedQueue internal statistics. May be used for debugging or profiling
        template <typename Counter = cds::atomicity::event_counter >
        struct stat
        {
            typedef Counter counter_type;   ///< Counter type

            counter_type    m_nPush;            ///< Push count
            counter_type    m_nPushPopulated;   ///< Number of attempts to push to a populated (non-empty) cell
            counter_type    m_nPushContended;   ///< Number of failed CAS when pushing
            counter_type    m_nPop;             ///< Pop count
            counter_type    m_nPopEmpty;        ///< Number of dequeue attempts from an empty queue
            counter_type    m_nPopContended;    ///< Number of failed CAS when popping

            counter_type    m_nCreateSegmentReq;    ///< Number of requests to create a new segment
            counter_type    m_nDeleteSegmentReq;    ///< Number of requests to delete a segment
            counter_type    m_nSegmentCreated;      ///< Number of created segments
            counter_type    m_nSegmentDeleted;      ///< Number of deleted segments

            void onPush()               { ++m_nPush; }
            void onPushPopulated()      { ++m_nPushPopulated; }
            void onPushContended()      { ++m_nPushContended; }
            void onPop()                { ++m_nPop; }
            void onPopEmpty()           { ++m_nPopEmpty; }
            void onPopContended()       { ++m_nPopContended; }
            void onCreateSegmentReq()   { ++m_nCreateSegmentReq; }
            void onDeleteSegmentReq()   { ++m_nDeleteSegmentReq; }
            void onSegmentCreated()     { ++m_nSegmentCreated; }
            void onSegmentDeleted()     { ++m_nSegmentDeleted; }
        };
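        // The counters can be read at run time through the queue's statistics()
        // accessor, e.g. theQueue.statistics().m_nPush.get() - a sketch; it assumes
        // a queue instance theQueue whose traits select this stat type.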
        /// Dummy SegmentedQueue statistics, no overhead
        struct empty_stat
        {
            void onPush() const             {}
            void onPushPopulated() const    {}
            void onPushContended() const    {}
            void onPop() const              {}
            void onPopEmpty() const         {}
            void onPopContended() const     {}
            void onCreateSegmentReq() const {}
            void onDeleteSegmentReq() const {}
            void onSegmentCreated() const   {}
            void onSegmentDeleted() const   {}
        };
        /// SegmentedQueue default traits
        struct traits
        {
            /// Element disposer that is called for each dequeued item. Default is \p opt::v::empty_disposer (no disposer)
            typedef opt::v::empty_disposer disposer;

            /// Item counter, default is \p atomicity::item_counter
            /**
                Item counting is an essential part of the segmented queue algorithm.
                The \p empty() member function is based on checking <tt>size() == 0</tt>.
                Therefore, a dummy item counter like \p atomicity::empty_item_counter is not a proper counter.
            */
            typedef atomicity::item_counter item_counter;

            /// Internal statistics, possible predefined types are \ref stat, \ref empty_stat (the default)
            typedef segmented_queue::empty_stat stat;

            /// Memory model, default is \p opt::v::relaxed_ordering. See \p cds::opt::memory_model for the full list of possible types
            typedef opt::v::relaxed_ordering memory_model;

            /// Alignment of critical data, default is cache line alignment. See \p cds::opt::alignment option specification
            enum { alignment = opt::cache_line_alignment };

            /// Padding of segment data, default is no special padding
            /**
                The segment is just an array of atomic data pointers,
                so high load leads to false sharing and performance degradation.
                Padding the segment data can eliminate the false sharing issue;
                on the other hand, padding increases the segment size.
            */
            enum { padding = opt::no_special_padding };

            /// Segment allocator. Default is \ref CDS_DEFAULT_ALLOCATOR
            typedef CDS_DEFAULT_ALLOCATOR allocator;

            /// Lock type used to maintain an internal list of allocated segments
            typedef cds::sync::spin lock_type;

            /// Random \ref cds::opt::permutation_generator "permutation generator" for the sequence [0, quasi_factor)
            typedef cds::opt::v::random2_permutation<int> permutation_generator;
        };
        /// Metafunction converting an option list to traits for SegmentedQueue
        /**
            The metafunction can be useful if only a few fields of \p segmented_queue::traits should be changed.
            For example:
            \code
            typedef cds::intrusive::segmented_queue::make_traits<
                cds::opt::item_counter< cds::atomicity::item_counter >
            >::type my_segmented_queue_traits;
            \endcode
            This code creates \p %SegmentedQueue type traits with the item counting feature;
            all other \p %segmented_queue::traits members are left unchanged.

            \p Options are:
            - \p opt::disposer - the functor used to dispose removed items.
            - \p opt::stat - internal statistics, possible types: \p segmented_queue::stat, \p segmented_queue::empty_stat (the default)
            - \p opt::item_counter - item counting feature. Note that \p atomicity::empty_item_counter is not suitable
                for the segmented queue.
            - \p opt::memory_model - memory model, default is \p opt::v::relaxed_ordering.
                See option description for the full list of possible models
            - \p opt::alignment - the alignment of critical data, see option description for explanation
            - \p opt::padding - the padding of segment data, default is no special padding.
                See \p traits::padding for explanation.
            - \p opt::allocator - the allocator used to maintain segments.
            - \p opt::lock_type - a mutual exclusion lock type used to maintain the internal list of allocated
                segments. Default is \p cds::sync::spin; \p std::mutex is also suitable.
            - \p opt::permutation_generator - a random permutation generator for the sequence [0, quasi_factor),
                default is \p cds::opt::v::random2_permutation<int>
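
            Alternatively, the same traits can be built by deriving a struct from
            \p segmented_queue::traits (a sketch; the struct name is arbitrary):
            \code
            struct my_segmented_queue_traits: public cds::intrusive::segmented_queue::traits
            {
                typedef cds::atomicity::item_counter item_counter;
            };
            \endcode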
        */
        template <typename... Options>
        struct make_traits {
#   ifdef CDS_DOXYGEN_INVOKED
            typedef implementation_defined type;   ///< Metafunction result
#   else
            typedef typename cds::opt::make_options<
                typename cds::opt::find_type_traits< traits, Options... >::type
                , Options...
            >::type type;
#   endif
        };

    } // namespace segmented_queue
    /// Segmented queue
    /** @ingroup cds_intrusive_queue

        The queue is based on the work
        - [2010] Afek, Korland, Yanovsky "Quasi-Linearizability: relaxed consistency for improved concurrency"

        In this paper the authors offer a relaxed version of linearizability, so-called quasi-linearizability,
        that preserves some of the intuition, provides a flexible way to control the level of relaxation,
        and supports the implementation of more concurrent and scalable data structures.
        Intuitively, linearizability requires each run to be equivalent in some sense to a serial run
        of the algorithm. This equivalence to some serial run imposes strong synchronization requirements
        that in many cases result in limited scalability and a synchronization bottleneck.

        The general idea is that the queue maintains a linked list of segments; each segment is an array of
        nodes whose size is the quasi factor, and each node has a deleted boolean marker, which states
        whether it has been dequeued. Each producer iterates over the last segment in the linked list in some
        random permutation order. When it finds an empty cell it performs a CAS operation attempting to enqueue
        its new element. In case the entire segment has been scanned and no available cell is found (implying
        that the segment is full), it attempts to add a new segment to the list.

        The dequeue operation is similar: the consumer iterates over the first segment in the linked list
        in some random permutation order. When it finds an item which has not yet been dequeued, it performs
        a CAS on its deleted marker in order to "delete" it; if this succeeds, the item is considered dequeued.
        In case the entire segment was scanned and all the nodes have already been dequeued (implying that
        the segment is empty), it attempts to remove this segment from the linked list and starts
        the same process on the next segment. If there is no next segment, the queue is considered empty.

        Based on the fact that most of the time threads do not add or remove segments, most of the work
        is done in parallel on different cells in the segments. This ensures a controlled contention
        depending on the segment size, which is the quasi factor.

        The segmented queue is an <i>unfair</i> queue since it violates the strong FIFO order, but by no more
        than the quasi factor. This means that the consumer dequeues <i>any</i> item from the current first segment.
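
        A usage sketch (not from the library documentation; it assumes the usual libcds
        setup has been done: \p cds::Initialize(), construction of the \p cds::gc::HP
        singleton, and attaching the current thread):
        \code
        #include <cds/gc/hp.h>
        #include <cds/intrusive/segmented_queue.h>

        struct foo {
            int payload;
        };

        // Queue of foo with quasi factor 16
        typedef cds::intrusive::SegmentedQueue< cds::gc::HP, foo > queue_type;
        queue_type theQueue( 16 );

        foo * p = new foo;
        theQueue.enqueue( *p );          // the queue stores the pointer to p

        foo * pDeq = theQueue.dequeue(); // returns the pointer; the disposer is NOT called
        if ( pDeq ) {
            // ... use pDeq, then dispose it manually, e.g. via cds::gc::HP::retire
        }
        \endcode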
        Template parameters:
        - \p GC - a garbage collector, possible types are \p cds::gc::HP, \p cds::gc::DHP
        - \p T - the type of values stored in the queue
        - \p Traits - queue type traits, default is \p segmented_queue::traits.
            The \p segmented_queue::make_traits metafunction can be used to construct the queue traits.

        The queue stores pointers to enqueued items, so no special node hooks are needed.
    */
    template <class GC, typename T, typename Traits = segmented_queue::traits >
    class SegmentedQueue
    {
    public:
        typedef GC     gc;         ///< Garbage collector
        typedef T      value_type; ///< type of the value stored in the queue
        typedef Traits traits;     ///< Queue traits

        typedef typename traits::disposer      disposer;     ///< value disposer, called only from \p clear() for each removed element
        typedef typename traits::allocator     allocator;    ///< Allocator maintaining the segments
        typedef typename traits::memory_model  memory_model; ///< Memory ordering. See \p cds::opt::memory_model option
        typedef typename traits::item_counter  item_counter; ///< Item counting policy, see \p cds::opt::item_counter option setter
        typedef typename traits::stat          stat;         ///< Internal statistics policy
        typedef typename traits::lock_type     lock_type;    ///< Type of mutex for maintaining an internal list of allocated segments
        typedef typename traits::permutation_generator permutation_generator; ///< Random permutation generator for the sequence [0, quasi factor)

        static const size_t c_nHazardPtrCount = 2; ///< Count of hazard pointers required for the algorithm
    protected:
        // Segment cell. LSB is used as deleted mark
        typedef cds::details::marked_ptr< value_type, 1 > regular_cell;
        typedef atomics::atomic< regular_cell >           atomic_cell;
        typedef typename cds::opt::details::apply_padding< atomic_cell, traits::padding >::type cell;
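        // A cell value thus combines the item pointer (ptr()) with a deleted flag in
        // the LSB: bits() != 0 means the item has already been dequeued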
        // Segment
        struct segment: public boost::intrusive::slist_base_hook<>
        {
            cell * cells;   // Cell array of size \ref m_nQuasiFactor
            size_t version; // version tag (ABA prevention tag)
            // cell array is placed here in one continuous memory block

            // Initializes the segment
            segment( size_t nCellCount )
                // MSVC warning C4355: 'this': used in base member initializer list
                : cells( reinterpret_cast< cell *>( this + 1 ))
                , version( 0 )
            {
                init( nCellCount );
            }

            segment() = delete;

            void init( size_t nCellCount )
            {
                cell * pLastCell = cells + nCellCount;
                for ( cell* pCell = cells; pCell < pLastCell; ++pCell )
                    pCell->data.store( regular_cell(), atomics::memory_order_relaxed );
                // make the zero-initialized cells visible to threads that acquire the segment pointer
                atomics::atomic_thread_fence( memory_model::memory_order_release );
            }
        };
        typedef typename opt::details::alignment_setter< atomics::atomic<segment *>, traits::alignment >::type aligned_segment_ptr;

    protected:
        // Maintains the list of allocated segments
        class segment_list
        {
            typedef boost::intrusive::slist< segment, boost::intrusive::cache_last< true > > list_impl;
            typedef std::unique_lock< lock_type > scoped_lock;

            aligned_segment_ptr m_pHead;
            aligned_segment_ptr m_pTail;

            list_impl           m_List;
            mutable lock_type   m_Lock;
            size_t const        m_nQuasiFactor;
            stat&               m_Stat;
        private:
            struct segment_disposer
            {
                void operator()( segment * pSegment )
                {
                    assert( pSegment != nullptr );
                    free_segment( pSegment );
                }
            };

            struct gc_segment_disposer
            {
                void operator()( segment * pSegment )
                {
                    assert( pSegment != nullptr );
                    retire_segment( pSegment );
                }
            };
        public:
            segment_list( size_t nQuasiFactor, stat& st )
                : m_pHead( nullptr )
                , m_pTail( nullptr )
                , m_nQuasiFactor( nQuasiFactor )
                , m_Stat( st )
            {
                assert( cds::beans::is_power2( nQuasiFactor ));
            }

            ~segment_list()
            {
                m_List.clear_and_dispose( gc_segment_disposer());
            }

            segment * head( typename gc::Guard& guard )
            {
                return guard.protect( m_pHead );
            }

            segment * tail( typename gc::Guard& guard )
            {
                return guard.protect( m_pTail );
            }
            bool populated( segment const& s ) const
            {
                // The lock should be held
                cell const * pLastCell = s.cells + quasi_factor();
                for ( cell const * pCell = s.cells; pCell < pLastCell; ++pCell ) {
                    if ( !pCell->data.load( memory_model::memory_order_relaxed ).all())
                        return false;   // an empty cell found - the segment is not full yet
                }
                return true;
            }

            bool exhausted( segment const& s ) const
            {
                // The lock should be held
                cell const * pLastCell = s.cells + quasi_factor();
                for ( cell const * pCell = s.cells; pCell < pLastCell; ++pCell ) {
                    if ( !pCell->data.load( memory_model::memory_order_relaxed ).bits())
                        return false;   // a not-yet-deleted cell found - the segment is not exhausted
                }
                return true;
            }
            segment * create_tail( segment * pTail, typename gc::Guard& guard )
            {
                // pTail is guarded by GC

                m_Stat.onCreateSegmentReq();

                scoped_lock l( m_Lock );

                // Another thread may have already created a new tail segment
                if ( !m_List.empty() && ( pTail != &m_List.back() || get_version(pTail) != m_List.back().version )) {
                    m_pTail.store( &m_List.back(), memory_model::memory_order_relaxed );

                    return guard.assign( &m_List.back());
                }

                assert( m_List.empty() || populated( m_List.back()));

                segment * pNew = allocate_segment();
                m_Stat.onSegmentCreated();

                if ( m_List.empty())
                    m_pHead.store( pNew, memory_model::memory_order_release );
                m_List.push_back( *pNew );
                m_pTail.store( pNew, memory_model::memory_order_release );
                return guard.assign( pNew );
            }
            segment * remove_head( segment * pHead, typename gc::Guard& guard )
            {
                // pHead is guarded by GC
                m_Stat.onDeleteSegmentReq();

                segment * pRet;
                {
                    scoped_lock l( m_Lock );

                    if ( m_List.empty()) {
                        m_pTail.store( nullptr, memory_model::memory_order_relaxed );
                        m_pHead.store( nullptr, memory_model::memory_order_relaxed );
                        return guard.assign( nullptr );
                    }

                    // Another thread may have already removed the head segment
                    if ( pHead != &m_List.front() || get_version(pHead) != m_List.front().version ) {
                        m_pHead.store( &m_List.front(), memory_model::memory_order_relaxed );
                        return guard.assign( &m_List.front());
                    }

                    assert( exhausted( m_List.front()));

                    m_List.pop_front();
                    if ( m_List.empty()) {
                        pRet = guard.assign( nullptr );
                        m_pTail.store( nullptr, memory_model::memory_order_relaxed );
                    }
                    else
                        pRet = guard.assign( &m_List.front());
                    m_pHead.store( pRet, memory_model::memory_order_release );
                }

                retire_segment( pHead );
                m_Stat.onSegmentDeleted();

                return pRet;
            }
            size_t quasi_factor() const
            {
                return m_nQuasiFactor;
            }
        private:
            typedef cds::details::Allocator< segment, allocator > segment_allocator;

            static size_t get_version( segment * pSegment )
            {
                return pSegment ? pSegment->version : 0;
            }

            segment * allocate_segment()
            {
                // the cell array lives in the same memory block, just past the segment header
                return segment_allocator().NewBlock( sizeof(segment) + sizeof(cell) * m_nQuasiFactor, quasi_factor());
            }

            static void free_segment( segment * pSegment )
            {
                segment_allocator().Delete( pSegment );
            }

            static void retire_segment( segment * pSegment )
            {
                gc::template retire<segment_disposer>( pSegment );
            }
        };
    protected:
        segment_list    m_SegmentList;  ///< List of segments

        item_counter    m_ItemCounter;  ///< Item counter
        stat            m_Stat;         ///< Internal statistics

    public:
        /// Initializes the empty queue
        SegmentedQueue(
            size_t nQuasiFactor  ///< Quasi factor. If it is not a power of 2 it is rounded up to the nearest power of 2. Minimum is 2.
            )
            : m_SegmentList( cds::beans::ceil2(nQuasiFactor), m_Stat )
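            // ceil2() rounds up to a power of 2; e.g. a requested factor of 10 becomes 16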
        {
            static_assert( (!std::is_same< item_counter, cds::atomicity::empty_item_counter >::value),
                "cds::atomicity::empty_item_counter is not supported for SegmentedQueue"
                );
            assert( m_SegmentList.quasi_factor() > 1 );
        }
        /// Clears the queue and deletes all internal data
        ~SegmentedQueue()
        {
            clear();
        }
        /// Inserts a new element at the last segment of the queue
        bool enqueue( value_type& val )
        {
            // LSB is used as a flag in marked pointer
            assert( (reinterpret_cast<uintptr_t>( &val ) & 1) == 0 );

            typename gc::Guard segmentGuard;
            segment * pTailSegment = m_SegmentList.tail( segmentGuard );
            if ( !pTailSegment ) {
                // no segments, create the new one
                pTailSegment = m_SegmentList.create_tail( pTailSegment, segmentGuard );
                assert( pTailSegment );
            }

            permutation_generator gen( quasi_factor());

            // First, increment the item counter.
            // We are sure that the item will be enqueued,
            // but if we incremented the counter after inserting we could get a negative counter value
            // if dequeuing occurred before incrementing (enqueue/dequeue race)
            ++m_ItemCounter;

            while ( true ) {
                CDS_DEBUG_ONLY( size_t nLoopCount = 0 );
                do {
                    typename permutation_generator::integer_type i = gen;
                    CDS_DEBUG_ONLY( ++nLoopCount );

                    if ( pTailSegment->cells[i].data.load(memory_model::memory_order_relaxed).all()) {
                        // Cell is not empty, go next
                        m_Stat.onPushPopulated();
                    }
                    else {
                        // Empty cell found, try to enqueue here
                        regular_cell nullCell;
                        if ( pTailSegment->cells[i].data.compare_exchange_strong( nullCell, regular_cell( &val ),
                            memory_model::memory_order_release, atomics::memory_order_relaxed ))
                        {
                            // Ok to push the item
                            m_Stat.onPush();
                            return true;
                        }
                        assert( nullCell.ptr());
                        m_Stat.onPushContended();
                    }
                } while ( gen.next());

                assert( nLoopCount == quasi_factor());

                // No available position, create a new segment
                pTailSegment = m_SegmentList.create_tail( pTailSegment, segmentGuard );

                // Get new permutation
                gen.reset();
            }
        }
        /// Removes an element from the first segment of the queue and returns it
        /**
            If the queue is empty the function returns \p nullptr.

            The disposer specified in the \p Traits template argument is <b>not</b> called for the returned item.
            You should manually dispose the item:
            \code
            struct my_disposer {
                void operator()( foo * p )
                {
                    delete p;
                }
            };
            cds::intrusive::SegmentedQueue< cds::gc::HP, foo > theQueue( 16 );
            // ...

            // Dequeue an item
            foo * pItem = theQueue.dequeue();
            // deal with pItem
            //...

            // pItem is no longer needed and can be deleted
            // Do it via gc::HP::retire
            cds::gc::HP::template retire< my_disposer >( pItem );
            \endcode
        */
        value_type * dequeue()
        {
            typename gc::Guard itemGuard;
            if ( do_dequeue( itemGuard )) {
                value_type * pVal = itemGuard.template get<value_type>();
                return pVal;
            }
            return nullptr;
        }
        /// Synonym for \p enqueue(value_type&) member function
        bool push( value_type& val )
        {
            return enqueue( val );
        }
        /// Synonym for \p dequeue() member function
        value_type * pop()
        {
            return dequeue();
        }
        /// Checks if the queue is empty
        /**
            The original segmented queue algorithm does not allow checking emptiness accurately
            because \p empty() is not linearizable.
            This function tests the queue's emptiness by checking <tt>size() == 0</tt>,
            so the item counting feature is an essential part of the queue's algorithm.
        */
        bool empty() const
        {
            return size() == 0;
        }
        /// Clears the queue
        /**
            The function repeatedly calls \p dequeue() until it returns \p nullptr.
            The disposer specified in the \p Traits template argument is called for each removed item.
        */
        void clear()
        {
            clear_with( disposer());
        }
        /// Clears the queue
        /**
            The function repeatedly calls \p dequeue() until it returns \p nullptr.
            \p Disposer is called for each removed item.
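
            A usage sketch (\p foo and \p my_disposer are illustrative names,
            not part of the library):
            \code
            struct my_disposer {
                void operator()( foo * p ) { delete p; }
            };
            theQueue.clear_with( my_disposer());
            \endcode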
        */
        template <class Disposer>
        void clear_with( Disposer )
        {
            typename gc::Guard itemGuard;
            while ( do_dequeue( itemGuard )) {
                assert( itemGuard.template get<value_type>());
                gc::template retire<Disposer>( itemGuard.template get<value_type>());
                itemGuard.clear();
            }
        }
        /// Returns queue's item count
        size_t size() const
        {
            return m_ItemCounter.value();
        }
        /// Returns reference to internal statistics
        /**
            The type of internal statistics is specified by the \p Traits template argument.
        */
        const stat& statistics() const
        {
            return m_Stat;
        }
        /// Returns quasi factor, a power-of-two number
        size_t quasi_factor() const
        {
            return m_SegmentList.quasi_factor();
        }
    protected:
        bool do_dequeue( typename gc::Guard& itemGuard )
        {
            typename gc::Guard segmentGuard;
            segment * pHeadSegment = m_SegmentList.head( segmentGuard );

            permutation_generator gen( quasi_factor());
            while ( true ) {
                if ( !pHeadSegment ) {
                    // Queue is empty
                    m_Stat.onPopEmpty();
                    return false;
                }

                bool bHadNullValue = false;
                regular_cell item;
                CDS_DEBUG_ONLY( size_t nLoopCount = 0 );
                do {
                    typename permutation_generator::integer_type i = gen;
                    CDS_DEBUG_ONLY( ++nLoopCount );

                    // Guard the item.
                    // In a segmented queue the cell cannot be reused,
                    // so no loop is needed here to protect the cell
                    item = pHeadSegment->cells[i].data.load( memory_model::memory_order_relaxed );
                    itemGuard.assign( item.ptr());

                    // Check if this cell is empty, which means an element
                    // can be enqueued to this cell in the future
                    if ( !item.ptr())
                        bHadNullValue = true;
                    else {
                        // If the item is not deleted yet
                        if ( !item.bits()) {
                            // Try to mark the cell as deleted
                            if ( pHeadSegment->cells[i].data.compare_exchange_strong( item, item | 1,
                                memory_model::memory_order_acquire, atomics::memory_order_relaxed ))
                            {
                                --m_ItemCounter;
                                m_Stat.onPop();
                                return true;
                            }
                            assert( item.bits());
                            m_Stat.onPopContended();
                        }
                    }
                } while ( gen.next());

                assert( nLoopCount == quasi_factor());

                // We have scanned the entire segment without finding a candidate to dequeue.
                // If there was an empty cell, the queue is considered empty
                if ( bHadNullValue ) {
                    m_Stat.onPopEmpty();
                    return false;
                }

                // All nodes have been dequeued, we can safely remove the first segment
                pHeadSegment = m_SegmentList.remove_head( pHeadSegment, segmentGuard );

                // Get new permutation
                gen.reset();
            }
        }
    };
}} // namespace cds::intrusive
#if CDS_COMPILER == CDS_COMPILER_MSVC
#   pragma warning( pop )
#endif

#endif // #ifndef CDSLIB_INTRUSIVE_SEGMENTED_QUEUE_H