3 #ifndef CDSLIB_CONTAINER_FCQUEUE_H
4 #define CDSLIB_CONTAINER_FCQUEUE_H
6 #include <cds/algo/flat_combining.h>
7 #include <cds/algo/elimination_opt.h>
10 namespace cds { namespace container {
12 /// FCQueue related definitions
13 /** @ingroup cds_nonintrusive_helper
17 /// FCQueue internal statistics
18 template <typename Counter = cds::atomicity::event_counter >
19 struct stat: public cds::algo::flat_combining::stat<Counter>
21 typedef cds::algo::flat_combining::stat<Counter> flat_combining_stat; ///< Flat-combining statistics
22 typedef typename flat_combining_stat::counter_type counter_type; ///< Counter type
24 counter_type m_nEnqueue ; ///< Count of enqueue operations
25 counter_type m_nEnqMove ; ///< Count of enqueue operations with move semantics
26 counter_type m_nDequeue ; ///< Count of success dequeue operations
27 counter_type m_nFailedDeq ; ///< Count of failed dequeue operations (pop from empty queue)
28 counter_type m_nCollided ; ///< How many pairs of enqueue/dequeue were collided, if elimination is enabled
31 void onEnqueue() { ++m_nEnqueue; }
32 void onEnqMove() { ++m_nEnqMove; }
33 void onDequeue( bool bFailed ) { if ( bFailed ) ++m_nFailedDeq; else ++m_nDequeue; }
34 void onCollide() { ++m_nCollided; }
38 /// FCQueue dummy statistics, no overhead
39 struct empty_stat: public cds::algo::flat_combining::empty_stat
44 void onDequeue(bool) {}
49 /// FCQueue type traits
50 struct traits: public cds::algo::flat_combining::traits
52 typedef empty_stat stat; ///< Internal statistics
53 static CDS_CONSTEXPR const bool enable_elimination = false; ///< Enable \ref cds_elimination_description "elimination"
56 /// Metafunction converting option list to traits
59 - \p opt::lock_type - mutex type, default is \p cds::sync::spin
60 - \p opt::back_off - back-off strategy, default is \p cds::backoff::delay_of<2>
61 - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
62 - \p opt::stat - internal statistics, possible type: \p fcqueue::stat, \p fcqueue::empty_stat (the default)
63 - \p opt::memory_model - C++ memory ordering model.
64 List of all available memory ordering see \p opt::memory_model.
65 Default is \p cds::opt::v::relaxed_ordering
66 - \p opt::enable_elimination - enable/disable operation \ref cds_elimination_description "elimination"
67 By default, the elimination is disabled. For queue, the elimination is possible if the queue
70 template <typename... Options>
72 # ifdef CDS_DOXYGEN_INVOKED
73 typedef implementation_defined type ; ///< Metafunction result
75 typedef typename cds::opt::make_options<
76 typename cds::opt::find_type_traits< traits, Options... >::type
82 } // namespace fcqueue
84 /// Flat-combining queue
86 @ingroup cds_nonintrusive_queue
87 @ingroup cds_flat_combining_container
89 \ref cds_flat_combining_description "Flat combining" sequential queue.
90 The class can be considered as a concurrent FC-based wrapper for \p std::queue.
93 - \p T - a value type stored in the queue
94 - \p Queue - sequential queue implementation, default is \p std::queue<T>
95 - \p Traits - type traits of flat combining, default is \p fcqueue::traits.
96 \p fcqueue::make_traits metafunction can be used to construct \p %fcqueue::traits specialization.
99 class Queue = std::queue<T>,
100 typename Traits = fcqueue::traits
103 #ifndef CDS_DOXYGEN_INVOKED
104 : public cds::algo::flat_combining::container
108 typedef T value_type; ///< Value type
109 typedef Queue queue_type; ///< Sequential queue class
110 typedef Traits traits; ///< Queue type traits
112 typedef typename traits::stat stat; ///< Internal statistics type
113 static CDS_CONSTEXPR const bool c_bEliminationEnabled = traits::enable_elimination; ///< \p true if elimination is enabled
117 /// Queue operation IDs
119 op_enq = cds::algo::flat_combining::req_Operation, ///< Enqueue
120 op_enq_move, ///< Enqueue (move semantics)
126 /// Flat combining publication list record
127 struct fc_record: public cds::algo::flat_combining::publication_record
130 value_type const * pValEnq; ///< Value to enqueue
131 value_type * pValDeq; ///< Dequeue destination
133 bool bEmpty; ///< \p true if the queue is empty
137 /// Flat combining kernel
138 typedef cds::algo::flat_combining::kernel< fc_record, traits > fc_kernel;
142 fc_kernel m_FlatCombining;
147 /// Initializes empty queue object
151 /// Initializes empty queue object and gives flat combining parameters
153 unsigned int nCompactFactor ///< Flat combining: publication list compacting factor
154 ,unsigned int nCombinePassCount ///< Flat combining: number of combining passes for combiner thread
156 : m_FlatCombining( nCompactFactor, nCombinePassCount )
159 /// Inserts a new element at the end of the queue
161 The content of the new element initialized to a copy of \p val.
163 The function always returns \p true
165 bool enqueue( value_type const& val )
167 fc_record * pRec = m_FlatCombining.acquire_record();
168 pRec->pValEnq = &val;
170 if ( c_bEliminationEnabled )
171 m_FlatCombining.batch_combine( op_enq, pRec, *this );
173 m_FlatCombining.combine( op_enq, pRec, *this );
175 assert( pRec->is_done() );
176 m_FlatCombining.release_record( pRec );
177 m_FlatCombining.internal_statistics().onEnqueue();
181 /// Inserts a new element at the end of the queue (a synonym for \ref enqueue)
182 bool push( value_type const& val )
184 return enqueue( val );
187 /// Inserts a new element at the end of the queue (move semantics)
189 \p val is moved to inserted element
191 bool enqueue( value_type&& val )
193 fc_record * pRec = m_FlatCombining.acquire_record();
194 pRec->pValEnq = &val;
196 if ( c_bEliminationEnabled )
197 m_FlatCombining.batch_combine( op_enq_move, pRec, *this );
199 m_FlatCombining.combine( op_enq_move, pRec, *this );
201 assert( pRec->is_done() );
202 m_FlatCombining.release_record( pRec );
204 m_FlatCombining.internal_statistics().onEnqMove();
208 /// Inserts a new element at the end of the queue (move semantics, synonym for \p enqueue)
209 bool push( value_type&& val )
211 return enqueue( val );
214 /// Removes the next element from the queue
216 \p val takes a copy of the element
218 bool dequeue( value_type& val )
220 fc_record * pRec = m_FlatCombining.acquire_record();
221 pRec->pValDeq = &val;
223 if ( c_bEliminationEnabled )
224 m_FlatCombining.batch_combine( op_deq, pRec, *this );
226 m_FlatCombining.combine( op_deq, pRec, *this );
228 assert( pRec->is_done() );
229 m_FlatCombining.release_record( pRec );
231 m_FlatCombining.internal_statistics().onDequeue( pRec->bEmpty );
232 return !pRec->bEmpty;
235 /// Removes the next element from the queue (a synonym for \ref dequeue)
236 bool pop( value_type& val )
238 return dequeue( val );
244 fc_record * pRec = m_FlatCombining.acquire_record();
246 if ( c_bEliminationEnabled )
247 m_FlatCombining.batch_combine( op_clear, pRec, *this );
249 m_FlatCombining.combine( op_clear, pRec, *this );
251 assert( pRec->is_done() );
252 m_FlatCombining.release_record( pRec );
255 /// Returns the number of elements in the queue.
257 Note that <tt>size() == 0</tt> does not mean that the queue is empty because
258 combining record can be in process.
259 To check emptiness use \ref empty function.
263 return m_Queue.size();
266 /// Checks if the queue is empty
268 If the combining is in process the function waits while combining done.
272 fc_record * pRec = m_FlatCombining.acquire_record();
274 if ( c_bEliminationEnabled )
275 m_FlatCombining.batch_combine( op_empty, pRec, *this );
277 m_FlatCombining.combine( op_empty, pRec, *this );
279 assert( pRec->is_done() );
280 m_FlatCombining.release_record( pRec );
284 /// Internal statistics
285 stat const& statistics() const
287 return m_FlatCombining.statistics();
290 public: // flat combining cooperation, not for direct use!
292 /// Flat combining supporting function. Do not call it directly!
294 The function is called by \ref cds::algo::flat_combining::kernel "flat combining kernel"
295 object if the current thread becomes a combiner. Invocation of the function means that
296 the queue should perform an action recorded in \p pRec.
298 void fc_apply( fc_record * pRec )
302 // this function is called under FC mutex, so switch TSan off
303 CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
305 switch ( pRec->op() ) {
307 assert( pRec->pValEnq );
308 m_Queue.push( *(pRec->pValEnq ) );
311 assert( pRec->pValEnq );
312 m_Queue.push( std::move( *(pRec->pValEnq )) );
315 assert( pRec->pValDeq );
316 pRec->bEmpty = m_Queue.empty();
317 if ( !pRec->bEmpty ) {
318 *(pRec->pValDeq) = m_Queue.front();
323 while ( !m_Queue.empty() )
327 pRec->bEmpty = m_Queue.empty();
333 CDS_TSAN_ANNOTATE_IGNORE_RW_END;
336 /// Batch-processing flat combining
337 void fc_process( typename fc_kernel::iterator itBegin, typename fc_kernel::iterator itEnd )
339 typedef typename fc_kernel::iterator fc_iterator;
341 // this function is called under FC mutex, so switch TSan off
342 CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
344 for ( fc_iterator it = itBegin, itPrev = itEnd; it != itEnd; ++it ) {
345 switch ( it->op() ) {
349 if ( m_Queue.empty() ) {
350 if ( itPrev != itEnd && collide( *itPrev, *it ))
358 CDS_TSAN_ANNOTATE_IGNORE_RW_END;
364 bool collide( fc_record& rec1, fc_record& rec2 )
366 switch ( rec1.op() ) {
368 if ( rec2.op() == op_deq ) {
369 assert(rec1.pValEnq);
370 assert(rec2.pValDeq);
371 *rec2.pValDeq = *rec1.pValEnq;
377 if ( rec2.op() == op_deq ) {
378 assert(rec1.pValEnq);
379 assert(rec2.pValDeq);
380 *rec2.pValDeq = std::move( *rec1.pValEnq );
386 switch ( rec2.op() ) {
389 return collide( rec2, rec1 );
395 m_FlatCombining.operation_done( rec1 );
396 m_FlatCombining.operation_done( rec2 );
397 m_FlatCombining.internal_statistics().onCollide();
403 }} // namespace cds::container
405 #endif // #ifndef CDSLIB_CONTAINER_FCQUEUE_H