/*
    This file is a part of libcds - Concurrent Data Structures library

    (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017

    Source code repo: http://github.com/khizmax/libcds/
    Download: http://sourceforge.net/projects/libcds/files/

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions are met:

    * Redistributions of source code must retain the above copyright notice, this
      list of conditions and the following disclaimer.

    * Redistributions in binary form must reproduce the above copyright notice,
      this list of conditions and the following disclaimer in the documentation
      and/or other materials provided with the distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
    AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
    IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
    FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
    DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
    SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
    OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
31 #ifndef CDSLIB_INTRUSIVE_FCQUEUE_H
32 #define CDSLIB_INTRUSIVE_FCQUEUE_H
34 #include <cds/algo/flat_combining.h>
35 #include <cds/algo/elimination_opt.h>
36 #include <cds/intrusive/options.h>
37 #include <boost/intrusive/list.hpp>
39 namespace cds { namespace intrusive {
41 /// \p FCQueue related definitions
44 /// \p FCQueue internal statistics
45 template <typename Counter = cds::atomicity::event_counter >
46 struct stat: public cds::algo::flat_combining::stat<Counter>
48 typedef cds::algo::flat_combining::stat<Counter> flat_combining_stat; ///< Flat-combining statistics
49 typedef typename flat_combining_stat::counter_type counter_type; ///< Counter type
51 counter_type m_nEnqueue ; ///< Count of push operations
52 counter_type m_nDequeue ; ///< Count of success pop operations
53 counter_type m_nFailedDeq ; ///< Count of failed pop operations (pop from empty queue)
54 counter_type m_nCollided ; ///< How many pairs of push/pop were collided, if elimination is enabled
57 void onEnqueue() { ++m_nEnqueue; }
58 void onDequeue( bool bFailed ) { if ( bFailed ) ++m_nFailedDeq; else ++m_nDequeue; }
59 void onCollide() { ++m_nCollided; }
63 /// FCQueue dummy statistics, no overhead
64 struct empty_stat: public cds::algo::flat_combining::empty_stat
68 void onDequeue(bool) {}
73 /// \p FCQueue type traits
74 struct traits: public cds::algo::flat_combining::traits
76 typedef cds::intrusive::opt::v::empty_disposer disposer ; ///< Disposer to erase removed elements. Used only in \p FCQueue::clear() function
77 typedef empty_stat stat; ///< Internal statistics
78 static constexpr const bool enable_elimination = false; ///< Enable \ref cds_elimination_description "elimination"
81 /// Metafunction converting option list to traits
84 - any \p cds::algo::flat_combining::make_traits options
85 - \p opt::disposer - the functor used to dispose removed items. Default is \p opt::intrusive::v::empty_disposer.
86 This option is used only in \p FCQueue::clear() function.
87 - \p opt::stat - internal statistics, possible type: \p fcqueue::stat, \p fcqueue::empty_stat (the default)
88 - \p opt::enable_elimination - enable/disable operation \ref cds_elimination_description "elimination"
89 By default, the elimination is disabled (\p false)
91 template <typename... Options>
93 # ifdef CDS_DOXYGEN_INVOKED
94 typedef implementation_defined type ; ///< Metafunction result
96 typedef typename cds::opt::make_options<
97 typename cds::opt::find_type_traits< traits, Options... >::type
102 } // namespace fcqueue
/// Flat-combining intrusive queue
/**
    @ingroup cds_intrusive_queue
    @ingroup cds_flat_combining_intrusive

    \ref cds_flat_combining_description "Flat combining" sequential intrusive queue.

    Template parameters:
    - \p T - a value type stored in the queue
    - \p Container - sequential intrusive container with \p push_back and \p pop_front functions.
        Default is \p boost::intrusive::list
    - \p Traits - type traits of flat combining, default is \p fcqueue::traits.
        \p fcqueue::make_traits metafunction can be used to construct \p %fcqueue::traits specialization
*/
119 ,class Container = boost::intrusive::list<T>
120 ,typename Traits = fcqueue::traits
123 #ifndef CDS_DOXYGEN_INVOKED
124 : public cds::algo::flat_combining::container
128 typedef T value_type; ///< Value type
129 typedef Container container_type; ///< Sequential container type
130 typedef Traits traits; ///< Queue traits
132 typedef typename traits::disposer disposer; ///< The disposer functor. The disposer is used only in \ref clear() function
133 typedef typename traits::stat stat; ///< Internal statistics type
134 static constexpr const bool c_bEliminationEnabled = traits::enable_elimination; ///< \p true if elimination is enabled
138 /// Queue operation IDs
140 op_enq = cds::algo::flat_combining::req_Operation, ///< Enqueue
143 op_clear_and_dispose ///< Clear and dispose
146 /// Flat combining publication list record
147 struct fc_record: public cds::algo::flat_combining::publication_record
149 value_type * pVal; ///< Value to enqueue or dequeue
150 bool bEmpty; ///< \p true if the queue is empty
154 /// Flat combining kernel
155 typedef cds::algo::flat_combining::kernel< fc_record, traits > fc_kernel;
159 mutable fc_kernel m_FlatCombining;
160 container_type m_Queue;
164 /// Initializes empty queue object
168 /// Initializes empty queue object and gives flat combining parameters
170 unsigned int nCompactFactor ///< Flat combining: publication list compacting factor
171 ,unsigned int nCombinePassCount ///< Flat combining: number of combining passes for combiner thread
173 : m_FlatCombining( nCompactFactor, nCombinePassCount )
176 /// Inserts a new element at the end of the queue
178 The function always returns \p true.
180 bool enqueue( value_type& val )
182 auto pRec = m_FlatCombining.acquire_record();
185 constexpr_if ( c_bEliminationEnabled )
186 m_FlatCombining.batch_combine( op_enq, pRec, *this );
188 m_FlatCombining.combine( op_enq, pRec, *this );
190 assert( pRec->is_done());
191 m_FlatCombining.release_record( pRec );
192 m_FlatCombining.internal_statistics().onEnqueue();
196 /// Inserts a new element at the end of the queue (a synonym for \ref enqueue)
197 bool push( value_type& val )
199 return enqueue( val );
202 /// Removes the next element from the queue
204 If the queue is empty the function returns \p nullptr
206 value_type * dequeue()
208 auto pRec = m_FlatCombining.acquire_record();
209 pRec->pVal = nullptr;
211 constexpr_if ( c_bEliminationEnabled )
212 m_FlatCombining.batch_combine( op_deq, pRec, *this );
214 m_FlatCombining.combine( op_deq, pRec, *this );
216 assert( pRec->is_done());
217 m_FlatCombining.release_record( pRec );
219 m_FlatCombining.internal_statistics().onDequeue( pRec->bEmpty );
223 /// Removes the next element from the queue (a synonym for \ref dequeue)
231 If \p bDispose is \p true, the disposer provided in \p Traits class' template parameter
232 will be called for each removed element.
234 void clear( bool bDispose = false )
236 auto pRec = m_FlatCombining.acquire_record();
238 constexpr_if ( c_bEliminationEnabled )
239 m_FlatCombining.batch_combine( bDispose ? op_clear_and_dispose : op_clear, pRec, *this );
241 m_FlatCombining.combine( bDispose ? op_clear_and_dispose : op_clear, pRec, *this );
243 assert( pRec->is_done());
244 m_FlatCombining.release_record( pRec );
247 /// Exclusive access to underlying queue object
249 The functor \p f can do any operation with underlying \p container_type in exclusive mode.
250 For example, you can iterate over the queue.
251 \p Func signature is:
253 void f( container_type& queue );
256 template <typename Func>
259 auto& queue = m_Queue;
260 m_FlatCombining.invoke_exclusive( [&queue, &f]() { f( queue ); } );
263 /// Exclusive access to underlying queue object
265 The functor \p f can do any operation with underlying \p container_type in exclusive mode.
266 For example, you can iterate over the queue.
267 \p Func signature is:
269 void f( container_type const& queue );
272 template <typename Func>
273 void apply( Func f ) const
275 auto const& queue = m_Queue;
276 m_FlatCombining.invoke_exclusive( [&queue, &f]() { f( queue ); } );
279 /// Returns the number of elements in the queue.
281 Note that <tt>size() == 0</tt> is not mean that the queue is empty because
282 combining record can be in process.
283 To check emptiness use \ref empty function.
287 return m_Queue.size();
290 /// Checks if the queue is empty
292 If the combining is in process the function waits while it is done.
297 auto const& queue = m_Queue;
298 m_FlatCombining.invoke_exclusive([&queue, &bRet]() { bRet = queue.empty(); });
302 /// Internal statistics
303 stat const& statistics() const
305 return m_FlatCombining.statistics();
308 public: // flat combining cooperation, not for direct use!
310 /// Flat combining supporting function. Do not call it directly!
312 The function is called by \ref cds::algo::flat_combining::kernel "flat combining kernel"
313 object if the current thread becomes a combiner. Invocation of the function means that
314 the queue should perform an action recorded in \p pRec.
316 void fc_apply( fc_record * pRec )
320 // this function is called under FC mutex, so switch TSan off
321 // All TSan warnings are false positive
322 //CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
324 switch ( pRec->op()) {
326 assert( pRec->pVal );
327 m_Queue.push_back( *(pRec->pVal ));
330 pRec->bEmpty = m_Queue.empty();
331 if ( !pRec->bEmpty ) {
332 pRec->pVal = &m_Queue.front();
339 case op_clear_and_dispose:
340 m_Queue.clear_and_dispose( disposer());
346 //CDS_TSAN_ANNOTATE_IGNORE_RW_END;
349 /// Batch-processing flat combining
350 void fc_process( typename fc_kernel::iterator itBegin, typename fc_kernel::iterator itEnd )
352 // this function is called under FC mutex, so switch TSan off
353 // All TSan warnings are false positive
354 //CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
356 typedef typename fc_kernel::iterator fc_iterator;
357 for ( fc_iterator it = itBegin, itPrev = itEnd; it != itEnd; ++it ) {
358 switch ( it->op( atomics::memory_order_acquire )) {
361 if ( m_Queue.empty()) {
362 if ( itPrev != itEnd && collide( *itPrev, *it ))
370 //CDS_TSAN_ANNOTATE_IGNORE_RW_END;
376 bool collide( fc_record& rec1, fc_record& rec2 )
378 assert( m_Queue.empty());
380 switch ( rec1.op()) {
382 if ( rec2.op() == op_deq ) {
384 rec2.pVal = rec1.pVal;
386 m_FlatCombining.operation_done( rec1 );
387 m_FlatCombining.operation_done( rec2 );
388 m_FlatCombining.internal_statistics().onCollide();
393 if ( rec2.op() == op_enq ) {
395 rec1.pVal = rec2.pVal;
397 m_FlatCombining.operation_done( rec1 );
398 m_FlatCombining.operation_done( rec2 );
399 m_FlatCombining.internal_statistics().onCollide();
409 }} // namespace cds::intrusive
411 #endif // #ifndef CDSLIB_INTRUSIVE_FCQUEUE_H