Issue #59: added missing ifdef _DEBUG for assertions
[libcds.git] / cds / intrusive / segmented_queue.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8     
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.     
29 */
30
31 #ifndef CDSLIB_INTRUSIVE_SEGMENTED_QUEUE_H
32 #define CDSLIB_INTRUSIVE_SEGMENTED_QUEUE_H
33
34 #include <mutex>
35 #include <cds/intrusive/details/base.h>
36 #include <cds/details/marked_ptr.h>
37 #include <cds/algo/int_algo.h>
38 #include <cds/sync/spinlock.h>
39 #include <cds/opt/permutation.h>
40
41 #include <boost/intrusive/slist.hpp>
42
43 #if CDS_COMPILER == CDS_COMPILER_MSVC
44 #   pragma warning( push )
45 #   pragma warning( disable: 4355 ) // warning C4355: 'this' : used in base member initializer list
46 #endif
47
48 namespace cds { namespace intrusive {
49
50     /// SegmentedQueue -related declarations
51     namespace segmented_queue {
52
53         /// SegmentedQueue internal statistics. May be used for debugging or profiling
54         template <typename Counter = cds::atomicity::event_counter >
55         struct stat
56         {
57             typedef Counter  counter_type;  ///< Counter type
58
59             counter_type    m_nPush;            ///< Push count
60             counter_type    m_nPushPopulated;   ///< Number of attempts to push to populated (non-empty) cell
61             counter_type    m_nPushContended;   ///< Number of failed CAS when pushing
62             counter_type    m_nPop;             ///< Pop count
63             counter_type    m_nPopEmpty;        ///< Number of dequeuing from empty queue
64             counter_type    m_nPopContended;    ///< Number of failed CAS when popping
65
66             counter_type    m_nCreateSegmentReq;    ///< Number of request to create new segment
67             counter_type    m_nDeleteSegmentReq;    ///< Number to request to delete segment
68             counter_type    m_nSegmentCreated;  ///< Number of created segments
69             counter_type    m_nSegmentDeleted;  ///< Number of deleted segments
70
71             //@cond
72             void onPush()               { ++m_nPush; }
73             void onPushPopulated()      { ++m_nPushPopulated; }
74             void onPushContended()      { ++m_nPushContended; }
75             void onPop()                { ++m_nPop;  }
76             void onPopEmpty()           { ++m_nPopEmpty; }
77             void onPopContended()       { ++m_nPopContended; }
78             void onCreateSegmentReq()   { ++m_nCreateSegmentReq; }
79             void onDeleteSegmentReq()   { ++m_nDeleteSegmentReq; }
80             void onSegmentCreated()     { ++m_nSegmentCreated; }
81             void onSegmentDeleted()     { ++m_nSegmentDeleted; }
82             //@endcond
83         };
84
85         /// Dummy SegmentedQueue statistics, no overhead
86         struct empty_stat {
87             //@cond
88             void onPush() const             {}
89             void onPushPopulated() const    {}
90             void onPushContended() const    {}
91             void onPop() const              {}
92             void onPopEmpty() const         {}
93             void onPopContended() const     {}
94             void onCreateSegmentReq() const {}
95             void onDeleteSegmentReq() const {}
96             void onSegmentCreated() const   {}
97             void onSegmentDeleted() const   {}
98             //@endcond
99         };
100
101         /// SegmentedQueue default traits
102         struct traits {
103             /// Element disposer that is called when the item to be dequeued. Default is opt::v::empty_disposer (no disposer)
104             typedef opt::v::empty_disposer disposer;
105
106             /// Item counter, default is atomicity::item_counter
107             /**
108                 The item counting is an essential part of segmented queue algorithm.
109                 The \p empty() member function is based on checking <tt>size() == 0</tt>.
110                 Therefore, dummy item counter like atomicity::empty_item_counter is not the proper counter.
111             */
112             typedef atomicity::item_counter item_counter;
113
114             /// Internal statistics, possible predefined types are \ref stat, \ref empty_stat (the default)
115             typedef segmented_queue::empty_stat        stat;
116
117             /// Memory model, default is opt::v::relaxed_ordering. See cds::opt::memory_model for the full list of possible types
118             typedef opt::v::relaxed_ordering  memory_model;
119
120             /// Alignment of critical data, default is cache line alignment. See cds::opt::alignment option specification
121             enum { alignment = opt::cache_line_alignment };
122
123             /// Padding of segment data, default is no special padding
124             /**
125                 The segment is just an array of atomic data pointers,
126                 so, the high load leads to false sharing and performance degradation.
127                 A padding of segment data can eliminate false sharing issue.
128                 On the other hand, the padding leads to increase segment size.
129             */
130             enum { padding = opt::no_special_padding };
131
132             /// Segment allocator. Default is \ref CDS_DEFAULT_ALLOCATOR
133             typedef CDS_DEFAULT_ALLOCATOR allocator;
134
135             /// Lock type used to maintain an internal list of allocated segments
136             typedef cds::sync::spin lock_type;
137
138             /// Random \ref cds::opt::permutation_generator "permutation generator" for sequence [0, quasi_factor)
139             typedef cds::opt::v::random2_permutation<int>    permutation_generator;
140         };
141
142         /// Metafunction converting option list to traits for SegmentedQueue
143         /**
144             The metafunction can be useful if a few fields in \p segmented_queue::traits should be changed.
145             For example:
146             \code
147             typedef cds::intrusive::segmented_queue::make_traits<
148                 cds::opt::item_counter< cds::atomicity::item_counter >
149             >::type my_segmented_queue_traits;
150             \endcode
151             This code creates \p %SegmentedQueue type traits with item counting feature,
152             all other \p %segmented_queue::traits members left unchanged.
153
154             \p Options are:
155             - \p opt::disposer - the functor used to dispose removed items.
156             - \p opt::stat - internal statistics, possible type: \p segmented_queue::stat, \p segmented_queue::empty_stat (the default)
157             - \p opt::item_counter - item counting feature. Note that \p atomicity::empty_item_counetr is not suitable
158                 for segmented queue.
159             - \p opt::memory_model - memory model, default is \p opt::v::relaxed_ordering.
160                 See option description for the full list of possible models
161             - \p opt::alignment - the alignment for critical data, see option description for explanation
162             - \p opt::padding - the padding of segment data, default no special padding.
163                 See \p traits::padding for explanation.
164             - \p opt::allocator - the allocator to be used for maintaining segments.
165             - \p opt::lock_type - a mutual exclusion lock type used to maintain internal list of allocated
166                 segments. Default is \p cds::opt::Spin, \p std::mutex is also suitable.
167             - \p opt::permutation_generator - a random permutation generator for sequence [0, quasi_factor),
168                 default is \p cds::opt::v::random2_permutation<int>
169         */
170         template <typename... Options>
171         struct make_traits {
172 #   ifdef CDS_DOXYGEN_INVOKED
173             typedef implementation_defined type ;   ///< Metafunction result
174 #   else
175             typedef typename cds::opt::make_options<
176                 typename cds::opt::find_type_traits< traits, Options... >::type
177                 ,Options...
178             >::type   type;
179 #   endif
180         };
181     } // namespace segmented_queue
182
183     /// Segmented queue
184     /** @ingroup cds_intrusive_queue
185
186         The queue is based on work
187         - [2010] Afek, Korland, Yanovsky "Quasi-Linearizability: relaxed consistency for improved concurrency"
188
189         In this paper the authors offer a relaxed version of linearizability, so-called quasi-linearizability,
190         that preserves some of the intuition, provides a flexible way to control the level of relaxation
191         and supports th implementation of more concurrent and scalable data structure.
192         Intuitively, the linearizability requires each run to be equivalent in some sense to a serial run
193         of the algorithm. This equivalence to some serial run imposes strong synchronization requirements
194         that in many cases results in limited scalability and synchronization bottleneck.
195
196         The general idea is that the queue maintains a linked list of segments, each segment is an array of
197         nodes in the size of the quasi factor, and each node has a deleted boolean marker, which states
198         if it has been dequeued. Each producer iterates over last segment in the linked list in some random
199         permutation order. Whet it finds an empty cell it performs a CAS operation attempting to enqueue its
200         new element. In case the entire segment has been scanned and no available cell is found (implying
201         that the segment is full), then it attempts to add a new segment to the list.
202
203         The dequeue operation is similar: the consumer iterates over the first segment in the linked list
204         in some random permutation order. When it finds an item which has not yet been dequeued, it performs
205         CAS on its deleted marker in order to "delete" it, if succeeded this item is considered dequeued.
206         In case the entire segment was scanned and all the nodes have already been dequeued (implying that
207         the segment is empty), then it attempts to remove this segment from the linked list and starts
208         the same process on the next segment. If there is no next segment, the queue is considered empty.
209
210         Based on the fact that most of the time threads do not add or remove segments, most of the work
211         is done in parallel on different cells in the segments. This ensures a controlled contention
212         depending on the segment size, which is quasi factor.
213
214         The segmented queue is an <i>unfair</i> queue since it violates the strong FIFO order but no more than
215         quasi factor. This means that the consumer dequeues <i>any</i> item from the current first segment.
216
217         Template parameters:
218         - \p GC - a garbage collector, possible types are cds::gc::HP, cds::gc::DHP
219         - \p T - the type of values stored in the queue
220         - \p Traits - queue type traits, default is \p segmented_queue::traits.
221             \p segmented_queue::make_traits metafunction can be used to construct the
222             type traits.
223
224         The queue stores the pointers to enqueued items so no special node hooks are needed.
225     */
226     template <class GC, typename T, typename Traits = segmented_queue::traits >
227     class SegmentedQueue
228     {
229     public:
230         typedef GC  gc;         ///< Garbage collector
231         typedef T   value_type; ///< type of the value stored in the queue
232         typedef Traits traits;  ///< Queue traits
233
234         typedef typename traits::disposer      disposer    ;   ///< value disposer, called only in \p clear() when the element to be dequeued
235         typedef typename traits::allocator     allocator;   ///< Allocator maintaining the segments
236         typedef typename traits::memory_model  memory_model;   ///< Memory ordering. See cds::opt::memory_model option
237         typedef typename traits::item_counter  item_counter;   ///< Item counting policy, see cds::opt::item_counter option setter
238         typedef typename traits::stat          stat;   ///< Internal statistics policy
239         typedef typename traits::lock_type     lock_type;   ///< Type of mutex for maintaining an internal list of allocated segments.
240         typedef typename traits::permutation_generator permutation_generator; ///< Random permutation generator for sequence [0, quasi-factor)
241
242         static const size_t c_nHazardPtrCount = 2 ; ///< Count of hazard pointer required for the algorithm
243
244     protected:
245         //@cond
246         // Segment cell. LSB is used as deleted mark
247         typedef cds::details::marked_ptr< value_type, 1 > regular_cell;
248         typedef atomics::atomic< regular_cell > atomic_cell;
249         typedef typename cds::opt::details::apply_padding< atomic_cell, traits::padding >::type cell;
250
251         // Segment
252         struct segment: public boost::intrusive::slist_base_hook<>
253         {
254             cell * cells;    // Cell array of size \ref m_nQuasiFactor
255             size_t version;  // version tag (ABA prevention tag)
256             // cell array is placed here in one continuous memory block
257
258             // Initializes the segment
259             segment( size_t nCellCount )
260                 // MSVC warning C4355: 'this': used in base member initializer list
261                 : cells( reinterpret_cast< cell *>( this + 1 ))
262                 , version( 0 )
263             {
264                 init( nCellCount );
265             }
266
267             segment() = delete;
268
269             void init( size_t nCellCount )
270             {
271                 cell * pLastCell = cells + nCellCount;
272                 for ( cell* pCell = cells; pCell < pLastCell; ++pCell )
273                     pCell->data.store( regular_cell(), atomics::memory_order_relaxed );
274                 atomics::atomic_thread_fence( memory_model::memory_order_release );
275             }
276         };
277
278         typedef typename opt::details::alignment_setter< atomics::atomic<segment *>, traits::alignment >::type aligned_segment_ptr;
279         //@endcond
280
281     protected:
282         //@cond
283         class segment_list
284         {
285             typedef boost::intrusive::slist< segment, boost::intrusive::cache_last< true > > list_impl;
286             typedef std::unique_lock< lock_type > scoped_lock;
287
288             aligned_segment_ptr m_pHead;
289             aligned_segment_ptr m_pTail;
290
291             list_impl           m_List;
292             mutable lock_type   m_Lock;
293             size_t const        m_nQuasiFactor;
294             stat&               m_Stat;
295
296         private:
297             struct segment_disposer
298             {
299                 void operator()( segment * pSegment )
300                 {
301                     assert( pSegment != nullptr );
302                     free_segment( pSegment );
303                 }
304             };
305
306             struct gc_segment_disposer
307             {
308                 void operator()( segment * pSegment )
309                 {
310                     assert( pSegment != nullptr );
311                     retire_segment( pSegment );
312                 }
313             };
314
315         public:
316             segment_list( size_t nQuasiFactor, stat& st )
317                 : m_pHead( nullptr )
318                 , m_pTail( nullptr )
319                 , m_nQuasiFactor( nQuasiFactor )
320                 , m_Stat( st )
321             {
322                 assert( cds::beans::is_power2( nQuasiFactor ));
323             }
324
325             ~segment_list()
326             {
327                 m_List.clear_and_dispose( gc_segment_disposer() );
328             }
329
330             segment * head( typename gc::Guard& guard )
331             {
332                 return guard.protect( m_pHead );
333             }
334
335             segment * tail( typename gc::Guard& guard )
336             {
337                 return guard.protect( m_pTail );
338             }
339
340 #       ifdef _DEBUG
341             bool populated( segment const& s ) const
342             {
343                 // The lock should be held
344                 cell const * pLastCell = s.cells + quasi_factor();
345                 for ( cell const * pCell = s.cells; pCell < pLastCell; ++pCell ) {
346                     if ( !pCell->data.load( memory_model::memory_order_relaxed ).all() )
347                         return false;
348                 }
349                 return true;
350             }
351             bool exhausted( segment const& s ) const
352             {
353                 // The lock should be held
354                 cell const * pLastCell = s.cells + quasi_factor();
355                 for ( cell const * pCell = s.cells; pCell < pLastCell; ++pCell ) {
356                     if ( !pCell->data.load( memory_model::memory_order_relaxed ).bits() )
357                         return false;
358                 }
359                 return true;
360             }
361 #       endif
362
363             segment * create_tail( segment * pTail, typename gc::Guard& guard )
364             {
365                 // pTail is guarded by GC
366
367                 m_Stat.onCreateSegmentReq();
368
369                 scoped_lock l( m_Lock );
370
371                 if ( !m_List.empty() && ( pTail != &m_List.back() || get_version(pTail) != m_List.back().version )) {
372                     m_pTail.store( &m_List.back(), memory_model::memory_order_relaxed );
373
374                     return guard.assign( &m_List.back() );
375                 }
376
377 #           ifdef _DEBUG
378                 assert( m_List.empty() || populated( m_List.back() ));
379 #           endif
380
381                 segment * pNew = allocate_segment();
382                 m_Stat.onSegmentCreated();
383
384                 if ( m_List.empty() )
385                     m_pHead.store( pNew, memory_model::memory_order_release );
386                 m_List.push_back( *pNew );
387                 m_pTail.store( pNew, memory_model::memory_order_release );
388                 return guard.assign( pNew );
389             }
390
391             segment * remove_head( segment * pHead, typename gc::Guard& guard )
392             {
393                 // pHead is guarded by GC
394                 m_Stat.onDeleteSegmentReq();
395
396                 segment * pRet;
397                 {
398                     scoped_lock l( m_Lock );
399
400                     if ( m_List.empty() ) {
401                         m_pTail.store( nullptr, memory_model::memory_order_relaxed );
402                         m_pHead.store( nullptr, memory_model::memory_order_relaxed );
403                         return guard.assign( nullptr );
404                     }
405
406                     if ( pHead != &m_List.front() || get_version(pHead) != m_List.front().version ) {
407                         m_pHead.store( &m_List.front(), memory_model::memory_order_relaxed );
408                         return guard.assign( &m_List.front() );
409                     }
410
411 #           ifdef _DEBUG
412                     assert( exhausted( m_List.front()));
413 #           endif
414
415                     m_List.pop_front();
416                     if ( m_List.empty() ) {
417                         pRet = guard.assign( nullptr );
418                         m_pTail.store( nullptr, memory_model::memory_order_relaxed );
419                     }
420                     else
421                         pRet = guard.assign( &m_List.front() );
422                     m_pHead.store( pRet, memory_model::memory_order_release );
423                 }
424
425                 retire_segment( pHead );
426                 m_Stat.onSegmentDeleted();
427
428                 return pRet;
429             }
430
431             size_t quasi_factor() const
432             {
433                 return m_nQuasiFactor;
434             }
435
436         private:
437             typedef cds::details::Allocator< segment, allocator >   segment_allocator;
438
439             static size_t get_version( segment * pSegment )
440             {
441                 return pSegment ? pSegment->version : 0;
442             }
443
444             segment * allocate_segment()
445             {
446                 return segment_allocator().NewBlock( sizeof(segment) + sizeof(cell) * m_nQuasiFactor, quasi_factor() );
447             }
448
449             static void free_segment( segment * pSegment )
450             {
451                 segment_allocator().Delete( pSegment );
452             }
453
454             static void retire_segment( segment * pSegment )
455             {
456                 gc::template retire<segment_disposer>( pSegment );
457             }
458         };
459         //@endcond
460
461     protected:
462         segment_list              m_SegmentList;  ///< List of segments
463
464         item_counter              m_ItemCounter;  ///< Item counter
465         stat                      m_Stat;         ///< Internal statistics
466
467     public:
468         /// Initializes the empty queue
469         SegmentedQueue(
470             size_t nQuasiFactor     ///< Quasi factor. If it is not a power of 2 it is rounded up to nearest power of 2. Minimum is 2.
471             )
472             : m_SegmentList( cds::beans::ceil2(nQuasiFactor), m_Stat )
473         {
474             static_assert( (!std::is_same< item_counter, cds::atomicity::empty_item_counter >::value),
475                 "cds::atomicity::empty_item_counter is not supported for SegmentedQueue"
476                 );
477             assert( m_SegmentList.quasi_factor() > 1 );
478         }
479
480         /// Clears the queue and deletes all internal data
481         ~SegmentedQueue()
482         {
483             clear();
484         }
485
486         /// Inserts a new element at last segment of the queue
487         bool enqueue( value_type& val )
488         {
489             // LSB is used as a flag in marked pointer
490             assert( (reinterpret_cast<uintptr_t>( &val ) & 1) == 0 );
491
492             typename gc::Guard segmentGuard;
493             segment * pTailSegment = m_SegmentList.tail( segmentGuard );
494             if ( !pTailSegment ) {
495                 // no segments, create the new one
496                 pTailSegment = m_SegmentList.create_tail( pTailSegment, segmentGuard );
497                 assert( pTailSegment );
498             }
499
500             permutation_generator gen( quasi_factor() );
501
502             // First, increment item counter.
503             // We sure that the item will be enqueued
504             // but if we increment the counter after inserting we can get a negative counter value
505             // if dequeuing occurs before incrementing (enqueue/dequeue race)
506             ++m_ItemCounter;
507
508             while ( true ) {
509                 CDS_DEBUG_ONLY( size_t nLoopCount = 0);
510                 do {
511                     typename permutation_generator::integer_type i = gen;
512                     CDS_DEBUG_ONLY( ++nLoopCount );
513                     if ( pTailSegment->cells[i].data.load(memory_model::memory_order_relaxed).all() ) {
514                         // Cell is not empty, go next
515                         m_Stat.onPushPopulated();
516                     }
517                     else {
518                         // Empty cell found, try to enqueue here
519                         regular_cell nullCell;
520                         if ( pTailSegment->cells[i].data.compare_exchange_strong( nullCell, regular_cell( &val ),
521                             memory_model::memory_order_release, atomics::memory_order_relaxed ))
522                         {
523                             // Ok to push item
524                             m_Stat.onPush();
525                             return true;
526                         }
527                         assert( nullCell.ptr() );
528                         m_Stat.onPushContended();
529                     }
530                 } while ( gen.next() );
531
532                 assert( nLoopCount == quasi_factor());
533
534                 // No available position, create a new segment
535                 pTailSegment = m_SegmentList.create_tail( pTailSegment, segmentGuard );
536
537                 // Get new permutation
538                 gen.reset();
539             }
540         }
541
542         /// Removes an element from first segment of the queue and returns it
543         /**
544             If the queue is empty the function returns \p nullptr.
545
546             The disposer specified in \p Traits template argument is <b>not</b> called for returned item.
547             You should manually dispose the item:
548             <code>
549             struct my_disposer {
550                 void operator()( foo * p )
551                 {
552                     delete p;
553                 }
554             };
555             cds::intrusive::SegmentedQueue< cds::gc::HP, foo > theQueue;
556             // ...
557
558             // Dequeue an item
559             foo * pItem = theQueue.dequeue();
560             // deal with pItem
561             //...
562
563             // pItem is not longer needed and can be deleted
564             // Do it via gc::HP::retire
565             cds::gc::HP::template retire< my_disposer >( pItem );
566             </code>
567         */
568         value_type * dequeue()
569         {
570             typename gc::Guard itemGuard;
571             if ( do_dequeue( itemGuard )) {
572                 value_type * pVal = itemGuard.template get<value_type>();
573                 assert( pVal );
574                 return pVal;
575             }
576             return nullptr;
577
578         }
579
580         /// Synonym for \p enqueue(value_type&) member function
581         bool push( value_type& val )
582         {
583             return enqueue( val );
584         }
585
586         /// Synonym for \p dequeue() member function
587         value_type * pop()
588         {
589             return dequeue();
590         }
591
592         /// Checks if the queue is empty
593         /**
594             The original segmented queue algorithm does not allow to check emptiness accurately
595             because \p empty() is unlinearizable.
596             This function tests queue's emptiness checking <tt>size() == 0</tt>,
597             so, the item counting feature is an essential part of queue's algorithm.
598         */
599         bool empty() const
600         {
601             return size() == 0;
602         }
603
604         /// Clear the queue
605         /**
606             The function repeatedly calls \p dequeue() until it returns \p nullptr.
607             The disposer specified in \p Traits template argument is called for each removed item.
608         */
609         void clear()
610         {
611             clear_with( disposer() );
612         }
613
614         /// Clear the queue
615         /**
616             The function repeatedly calls \p dequeue() until it returns \p nullptr.
617             \p Disposer is called for each removed item.
618         */
619         template <class Disposer>
620         void clear_with( Disposer )
621         {
622             typename gc::Guard itemGuard;
623             while ( do_dequeue( itemGuard ) ) {
624                 assert( itemGuard.template get<value_type>() );
625                 gc::template retire<Disposer>( itemGuard.template get<value_type>() );
626                 itemGuard.clear();
627             }
628         }
629
630         /// Returns queue's item count
631         size_t size() const
632         {
633             return m_ItemCounter.value();
634         }
635
636         /// Returns reference to internal statistics
637         /**
638             The type of internal statistics is specified by \p Traits template argument.
639         */
640         const stat& statistics() const
641         {
642             return m_Stat;
643         }
644
645         /// Returns quasi factor, a power-of-two number
646         size_t quasi_factor() const
647         {
648             return m_SegmentList.quasi_factor();
649         }
650
651     protected:
652         //@cond
653         bool do_dequeue( typename gc::Guard& itemGuard )
654         {
655             typename gc::Guard segmentGuard;
656             segment * pHeadSegment = m_SegmentList.head( segmentGuard );
657
658             permutation_generator gen( quasi_factor() );
659             while ( true ) {
660                 if ( !pHeadSegment ) {
661                     // Queue is empty
662                     m_Stat.onPopEmpty();
663                     return false;
664                 }
665
666                 bool bHadNullValue = false;
667                 regular_cell item;
668                 CDS_DEBUG_ONLY( size_t nLoopCount = 0 );
669                 do {
670                     typename permutation_generator::integer_type i = gen;
671                     CDS_DEBUG_ONLY( ++nLoopCount );
672
673                     // Guard the item
674                     // In segmented queue the cell cannot be reused
675                     // So no loop is needed here to protect the cell
676                     item = pHeadSegment->cells[i].data.load( memory_model::memory_order_relaxed );
677                     itemGuard.assign( item.ptr() );
678
679                     // Check if this cell is empty, which means an element
680                     // can be enqueued to this cell in the future
681                     if ( !item.ptr() )
682                         bHadNullValue = true;
683                     else {
684                         // If the item is not deleted yet
685                         if ( !item.bits() ) {
686                             // Try to mark the cell as deleted
687                             if ( pHeadSegment->cells[i].data.compare_exchange_strong( item, item | 1,
688                                 memory_model::memory_order_acquire, atomics::memory_order_relaxed ))
689                             {
690                                 --m_ItemCounter;
691                                 m_Stat.onPop();
692
693                                 return true;
694                             }
695                             assert( item.bits() );
696                             m_Stat.onPopContended();
697                         }
698                     }
699                 } while ( gen.next() );
700
701                 assert( nLoopCount == quasi_factor() );
702
703                 // scanning the entire segment without finding a candidate to dequeue
704                 // If there was an empty cell, the queue is considered empty
705                 if ( bHadNullValue ) {
706                     m_Stat.onPopEmpty();
707                     return false;
708                 }
709
710                 // All nodes have been dequeued, we can safely remove the first segment
711                 pHeadSegment = m_SegmentList.remove_head( pHeadSegment, segmentGuard );
712
713                 // Get new permutation
714                 gen.reset();
715             }
716         }
717         //@endcond
718     };
719 }} // namespace cds::intrusive
720
721 #if CDS_COMPILER == CDS_COMPILER_MSVC
722 #   pragma warning( pop )
723 #endif
724
725 #endif // #ifndef CDSLIB_INTRUSIVE_SEGMENTED_QUEUE_H