3 #ifndef __CDS_INTRUSIVE_FCQUEUE_H
4 #define __CDS_INTRUSIVE_FCQUEUE_H
6 #include <cds/algo/flat_combining.h>
7 #include <cds/algo/elimination_opt.h>
8 #include <cds/intrusive/options.h>
9 #include <boost/intrusive/list.hpp>
11 namespace cds { namespace intrusive {
13 /// FCQueue related definitions
16 /// FCQueue internal statistics
17 template <typename Counter = cds::atomicity::event_counter >
18 struct stat: public cds::algo::flat_combining::stat<Counter>
20 typedef cds::algo::flat_combining::stat<Counter> flat_combining_stat; ///< Flat-combining statistics
21 typedef typename flat_combining_stat::counter_type counter_type; ///< Counter type
23 counter_type m_nEnqueue ; ///< Count of push operations
24 counter_type m_nDequeue ; ///< Count of success pop operations
25 counter_type m_nFailedDeq ; ///< Count of failed pop operations (pop from empty queue)
26 counter_type m_nCollided ; ///< How many pairs of push/pop were collided, if elimination is enabled
29 void onEnqueue() { ++m_nEnqueue; }
30 void onDequeue( bool bFailed ) { if ( bFailed ) ++m_nFailedDeq; else ++m_nDequeue; }
31 void onCollide() { ++m_nCollided; }
35 /// FCQueue dummy statistics, no overhead
36 struct empty_stat: public cds::algo::flat_combining::empty_stat
40 void onDequeue(bool) {}
45 /// FCQueue type traits
46 struct traits: public cds::algo::flat_combining::type_traits
48 typedef cds::intrusive::opt::v::empty_disposer disposer ; ///< Disposer to erase removed elements. Used only in \p FCQueue::clear() function
49 typedef empty_stat stat; ///< Internal statistics
50 static CDS_CONSTEXPR const bool enable_elimination = false; ///< Enable \ref cds_elimination_description "elimination"
53 /// Metafunction converting option list to traits
56 - \p opt::lock_type - mutex type, default is \p cds::lock::Spin
57 - \p opt::back_off - back-off strategy, defalt is \p cds::backoff::Default
58 - \p opt::disposer - the functor used for dispose removed items. Default is \p opt::intrusive::v::empty_disposer.
59 This option is used only in \p FCQueue::clear() function.
60 - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
61 - \p opt::stat - internal statistics, possible type: \p fcqueue::stat, \p fcqueue::empty_stat (the default)
62 - \p opt::memory_model - C++ memory ordering model.
63 List of all available memory ordering see \p opt::memory_model.
64 Default is \p cds::opt::v:relaxed_ordering
65 - \p opt::enable_elimination - enable/disable operation \ref cds_elimination_description "elimination"
66 By default, the elimination is disabled.
68 template <typename... Options>
70 # ifdef CDS_DOXYGEN_INVOKED
71 typedef implementation_defined type ; ///< Metafunction result
73 typedef typename cds::opt::make_options<
74 typename cds::opt::find_type_traits< traits, Options... >::type
79 } // namespace fcqueue
81 /// Flat-combining intrusive queue
83 @ingroup cds_intrusive_queue
84 @ingroup cds_flat_combining_intrusive
86 \ref cds_flat_combining_description "Flat combining" sequential intrusive queue.
89 - \p T - a value type stored in the queue
90 - \p Container - sequential intrusive container with \p push_back and \p pop_front functions.
91 Default is \p boost::intrusive::list
92 - \p Traits - type traits of flat combining, default is \p fcqueue::traits.
93 \p fcqueue::make_traits metafunction can be used to construct \p %fcqueue::traits specialization
96 ,class Container = boost::intrusive::list<T>
97 ,typename Traits = fcqueue::traits
100 #ifndef CDS_DOXYGEN_INVOKED
101 : public cds::algo::flat_combining::container
105 typedef T value_type; ///< Value type
106 typedef Container container_type; ///< Sequential container type
107 typedef Traits traits; ///< Queue traits
109 typedef typename traits::disposer disposer; ///< The disposer functor. The disposer is used only in \ref clear() function
110 typedef typename traits::stat stat; ///< Internal statistics type
111 static CDS_CONSTEXPR const bool c_bEliminationEnabled = traits::enable_elimination; ///< \p true if elimination is enabled
115 /// Queue operation IDs
117 op_enq = cds::algo::flat_combining::req_Operation, ///< Enqueue
120 op_clear_and_dispose ///< Clear and dispose
123 /// Flat combining publication list record
124 struct fc_record: public cds::algo::flat_combining::publication_record
126 value_type * pVal; ///< Value to enqueue or dequeue
127 bool bEmpty; ///< \p true if the queue is empty
131 /// Flat combining kernel
132 typedef cds::algo::flat_combining::kernel< fc_record, traits > fc_kernel;
136 fc_kernel m_FlatCombining;
137 container_type m_Queue;
141 /// Initializes empty queue object
145 /// Initializes empty queue object and gives flat combining parameters
147 unsigned int nCompactFactor ///< Flat combining: publication list compacting factor
148 ,unsigned int nCombinePassCount ///< Flat combining: number of combining passes for combiner thread
150 : m_FlatCombining( nCompactFactor, nCombinePassCount )
153 /// Inserts a new element at the end of the queue
155 The function always returns \p true.
157 bool enqueue( value_type& val )
159 fc_record * pRec = m_FlatCombining.acquire_record();
162 if ( c_bEliminationEnabled )
163 m_FlatCombining.batch_combine( op_enq, pRec, *this );
165 m_FlatCombining.combine( op_enq, pRec, *this );
167 assert( pRec->is_done() );
168 m_FlatCombining.release_record( pRec );
169 m_FlatCombining.internal_statistics().onEnqueue();
173 /// Inserts a new element at the end of the queue (a synonym for \ref enqueue)
174 bool push( value_type& val )
176 return enqueue( val );
179 /// Removes the next element from the queue
181 If the queue is empty the function returns \p nullptr
183 value_type * dequeue()
185 fc_record * pRec = m_FlatCombining.acquire_record();
186 pRec->pVal = nullptr;
188 if ( c_bEliminationEnabled )
189 m_FlatCombining.batch_combine( op_deq, pRec, *this );
191 m_FlatCombining.combine( op_deq, pRec, *this );
193 assert( pRec->is_done() );
194 m_FlatCombining.release_record( pRec );
196 m_FlatCombining.internal_statistics().onDequeue( pRec->bEmpty );
200 /// Removes the next element from the queue (a synonym for \ref dequeue)
208 If \p bDispose is \p true, the disposer provided in \p Traits class' template parameter
209 will be called for each removed element.
211 void clear( bool bDispose = false )
213 fc_record * pRec = m_FlatCombining.acquire_record();
215 if ( c_bEliminationEnabled )
216 m_FlatCombining.batch_combine( bDispose ? op_clear_and_dispose : op_clear, pRec, *this );
218 m_FlatCombining.combine( bDispose ? op_clear_and_dispose : op_clear, pRec, *this );
220 assert( pRec->is_done() );
221 m_FlatCombining.release_record( pRec );
224 /// Returns the number of elements in the queue.
226 Note that <tt>size() == 0</tt> is not mean that the queue is empty because
227 combining record can be in process.
228 To check emptiness use \ref empty function.
232 return m_Queue.size();
235 /// Checks if the queue is empty
237 If the combining is in process the function waits while it is done.
241 m_FlatCombining.wait_while_combining();
242 return m_Queue.empty();
245 /// Internal statistics
246 stat const& statistics() const
248 return m_FlatCombining.statistics();
251 public: // flat combining cooperation, not for direct use!
253 /// Flat combining supporting function. Do not call it directly!
255 The function is called by \ref cds::algo::flat_combining::kernel "flat combining kernel"
256 object if the current thread becomes a combiner. Invocation of the function means that
257 the queue should perform an action recorded in \p pRec.
259 void fc_apply( fc_record * pRec )
263 switch ( pRec->op() ) {
265 assert( pRec->pVal );
266 m_Queue.push_back( *(pRec->pVal ) );
269 pRec->bEmpty = m_Queue.empty();
270 if ( !pRec->bEmpty ) {
271 pRec->pVal = &m_Queue.front();
278 case op_clear_and_dispose:
279 m_Queue.clear_and_dispose( disposer() );
287 /// Batch-processing flat combining
288 void fc_process( typename fc_kernel::iterator itBegin, typename fc_kernel::iterator itEnd )
290 typedef typename fc_kernel::iterator fc_iterator;
291 for ( fc_iterator it = itBegin, itPrev = itEnd; it != itEnd; ++it ) {
292 switch ( it->op() ) {
295 if ( m_Queue.empty() ) {
296 if ( itPrev != itEnd && collide( *itPrev, *it ))
309 bool collide( fc_record& rec1, fc_record& rec2 )
311 assert( m_Queue.empty() );
313 switch ( rec1.op() ) {
315 if ( rec2.op() == op_deq ) {
317 rec2.pVal = rec1.pVal;
319 m_FlatCombining.operation_done( rec1 );
320 m_FlatCombining.operation_done( rec2 );
321 m_FlatCombining.internal_statistics().onCollide();
326 if ( rec2.op() == op_enq ) {
328 rec1.pVal = rec2.pVal;
330 m_FlatCombining.operation_done( rec1 );
331 m_FlatCombining.operation_done( rec2 );
332 m_FlatCombining.internal_statistics().onCollide();
342 }} // namespace cds::intrusive
344 #endif // #ifndef __CDS_INTRUSIVE_FCQUEUE_H