2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #ifndef CDSLIB_INTRUSIVE_FCQUEUE_H
32 #define CDSLIB_INTRUSIVE_FCQUEUE_H
34 #include <cds/algo/flat_combining.h>
35 #include <cds/algo/elimination_opt.h>
36 #include <cds/intrusive/options.h>
37 #include <boost/intrusive/list.hpp>
39 namespace cds { namespace intrusive {
41 /// \p FCQueue related definitions
44 /// \p FCQueue internal statistics
45 template <typename Counter = cds::atomicity::event_counter >
46 struct stat: public cds::algo::flat_combining::stat<Counter>
48 typedef cds::algo::flat_combining::stat<Counter> flat_combining_stat; ///< Flat-combining statistics
49 typedef typename flat_combining_stat::counter_type counter_type; ///< Counter type
51 counter_type m_nEnqueue ; ///< Count of push operations
52 counter_type m_nDequeue ; ///< Count of success pop operations
53 counter_type m_nFailedDeq ; ///< Count of failed pop operations (pop from empty queue)
54 counter_type m_nCollided ; ///< How many pairs of push/pop were collided, if elimination is enabled
57 void onEnqueue() { ++m_nEnqueue; }
58 void onDequeue( bool bFailed ) { if ( bFailed ) ++m_nFailedDeq; else ++m_nDequeue; }
59 void onCollide() { ++m_nCollided; }
63 /// FCQueue dummy statistics, no overhead
64 struct empty_stat: public cds::algo::flat_combining::empty_stat
68 void onDequeue(bool) {}
73 /// \p FCQueue type traits
74 struct traits: public cds::algo::flat_combining::traits
76 typedef cds::intrusive::opt::v::empty_disposer disposer ; ///< Disposer to erase removed elements. Used only in \p FCQueue::clear() function
77 typedef empty_stat stat; ///< Internal statistics
78 static CDS_CONSTEXPR const bool enable_elimination = false; ///< Enable \ref cds_elimination_description "elimination"
81 /// Metafunction converting option list to traits
84 - \p opt::lock_type - mutex type, default is \p cds::sync::spin
85 - \p opt::back_off - back-off strategy, defalt is \p cds::backoff::Default
86 - \p opt::disposer - the functor used to dispose removed items. Default is \p opt::intrusive::v::empty_disposer.
87 This option is used only in \p FCQueue::clear() function.
88 - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
89 - \p opt::stat - internal statistics, possible type: \p fcqueue::stat, \p fcqueue::empty_stat (the default)
90 - \p opt::memory_model - C++ memory ordering model.
91 List of all available memory ordering see \p opt::memory_model.
92 Default is \p cds::opt::v:relaxed_ordering
93 - \p opt::enable_elimination - enable/disable operation \ref cds_elimination_description "elimination"
94 By default, the elimination is disabled (\p false)
96 template <typename... Options>
98 # ifdef CDS_DOXYGEN_INVOKED
99 typedef implementation_defined type ; ///< Metafunction result
101 typedef typename cds::opt::make_options<
102 typename cds::opt::find_type_traits< traits, Options... >::type
107 } // namespace fcqueue
109 /// Flat-combining intrusive queue
111 @ingroup cds_intrusive_queue
112 @ingroup cds_flat_combining_intrusive
114 \ref cds_flat_combining_description "Flat combining" sequential intrusive queue.
117 - \p T - a value type stored in the queue
118 - \p Container - sequential intrusive container with \p push_back and \p pop_front functions.
119 Default is \p boost::intrusive::list
120 - \p Traits - type traits of flat combining, default is \p fcqueue::traits.
121 \p fcqueue::make_traits metafunction can be used to construct \p %fcqueue::traits specialization
124 ,class Container = boost::intrusive::list<T>
125 ,typename Traits = fcqueue::traits
128 #ifndef CDS_DOXYGEN_INVOKED
129 : public cds::algo::flat_combining::container
133 typedef T value_type; ///< Value type
134 typedef Container container_type; ///< Sequential container type
135 typedef Traits traits; ///< Queue traits
137 typedef typename traits::disposer disposer; ///< The disposer functor. The disposer is used only in \ref clear() function
138 typedef typename traits::stat stat; ///< Internal statistics type
139 static CDS_CONSTEXPR const bool c_bEliminationEnabled = traits::enable_elimination; ///< \p true if elimination is enabled
143 /// Queue operation IDs
145 op_enq = cds::algo::flat_combining::req_Operation, ///< Enqueue
148 op_clear_and_dispose ///< Clear and dispose
151 /// Flat combining publication list record
152 struct fc_record: public cds::algo::flat_combining::publication_record
154 value_type * pVal; ///< Value to enqueue or dequeue
155 bool bEmpty; ///< \p true if the queue is empty
159 /// Flat combining kernel
160 typedef cds::algo::flat_combining::kernel< fc_record, traits > fc_kernel;
164 fc_kernel m_FlatCombining;
165 container_type m_Queue;
169 /// Initializes empty queue object
173 /// Initializes empty queue object and gives flat combining parameters
175 unsigned int nCompactFactor ///< Flat combining: publication list compacting factor
176 ,unsigned int nCombinePassCount ///< Flat combining: number of combining passes for combiner thread
178 : m_FlatCombining( nCompactFactor, nCombinePassCount )
181 /// Inserts a new element at the end of the queue
183 The function always returns \p true.
185 bool enqueue( value_type& val )
187 fc_record * pRec = m_FlatCombining.acquire_record();
190 if ( c_bEliminationEnabled )
191 m_FlatCombining.batch_combine( op_enq, pRec, *this );
193 m_FlatCombining.combine( op_enq, pRec, *this );
195 assert( pRec->is_done() );
196 m_FlatCombining.release_record( pRec );
197 m_FlatCombining.internal_statistics().onEnqueue();
201 /// Inserts a new element at the end of the queue (a synonym for \ref enqueue)
202 bool push( value_type& val )
204 return enqueue( val );
207 /// Removes the next element from the queue
209 If the queue is empty the function returns \p nullptr
211 value_type * dequeue()
213 fc_record * pRec = m_FlatCombining.acquire_record();
214 pRec->pVal = nullptr;
216 if ( c_bEliminationEnabled )
217 m_FlatCombining.batch_combine( op_deq, pRec, *this );
219 m_FlatCombining.combine( op_deq, pRec, *this );
221 assert( pRec->is_done() );
222 m_FlatCombining.release_record( pRec );
224 m_FlatCombining.internal_statistics().onDequeue( pRec->bEmpty );
228 /// Removes the next element from the queue (a synonym for \ref dequeue)
236 If \p bDispose is \p true, the disposer provided in \p Traits class' template parameter
237 will be called for each removed element.
239 void clear( bool bDispose = false )
241 fc_record * pRec = m_FlatCombining.acquire_record();
243 if ( c_bEliminationEnabled )
244 m_FlatCombining.batch_combine( bDispose ? op_clear_and_dispose : op_clear, pRec, *this );
246 m_FlatCombining.combine( bDispose ? op_clear_and_dispose : op_clear, pRec, *this );
248 assert( pRec->is_done() );
249 m_FlatCombining.release_record( pRec );
252 /// Returns the number of elements in the queue.
254 Note that <tt>size() == 0</tt> is not mean that the queue is empty because
255 combining record can be in process.
256 To check emptiness use \ref empty function.
260 return m_Queue.size();
263 /// Checks if the queue is empty
265 If the combining is in process the function waits while it is done.
269 m_FlatCombining.wait_while_combining();
270 return m_Queue.empty();
273 /// Internal statistics
274 stat const& statistics() const
276 return m_FlatCombining.statistics();
279 public: // flat combining cooperation, not for direct use!
281 /// Flat combining supporting function. Do not call it directly!
283 The function is called by \ref cds::algo::flat_combining::kernel "flat combining kernel"
284 object if the current thread becomes a combiner. Invocation of the function means that
285 the queue should perform an action recorded in \p pRec.
287 void fc_apply( fc_record * pRec )
291 // this function is called under FC mutex, so switch TSan off
292 // All TSan warnings are false positive
293 CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
295 switch ( pRec->op() ) {
297 assert( pRec->pVal );
298 m_Queue.push_back( *(pRec->pVal ) );
301 pRec->bEmpty = m_Queue.empty();
302 if ( !pRec->bEmpty ) {
303 pRec->pVal = &m_Queue.front();
310 case op_clear_and_dispose:
311 m_Queue.clear_and_dispose( disposer() );
317 CDS_TSAN_ANNOTATE_IGNORE_RW_END;
320 /// Batch-processing flat combining
321 void fc_process( typename fc_kernel::iterator itBegin, typename fc_kernel::iterator itEnd )
323 // this function is called under FC mutex, so switch TSan off
324 // All TSan warnings are false positive
325 CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
327 typedef typename fc_kernel::iterator fc_iterator;
328 for ( fc_iterator it = itBegin, itPrev = itEnd; it != itEnd; ++it ) {
329 switch ( it->op() ) {
332 if ( m_Queue.empty() ) {
333 if ( itPrev != itEnd && collide( *itPrev, *it ))
341 CDS_TSAN_ANNOTATE_IGNORE_RW_END;
347 bool collide( fc_record& rec1, fc_record& rec2 )
349 assert( m_Queue.empty() );
351 switch ( rec1.op() ) {
353 if ( rec2.op() == op_deq ) {
355 rec2.pVal = rec1.pVal;
357 m_FlatCombining.operation_done( rec1 );
358 m_FlatCombining.operation_done( rec2 );
359 m_FlatCombining.internal_statistics().onCollide();
364 if ( rec2.op() == op_enq ) {
366 rec1.pVal = rec2.pVal;
368 m_FlatCombining.operation_done( rec1 );
369 m_FlatCombining.operation_done( rec2 );
370 m_FlatCombining.internal_statistics().onCollide();
380 }} // namespace cds::intrusive
382 #endif // #ifndef CDSLIB_INTRUSIVE_FCQUEUE_H