-//$$CDS-header$$
+/*
+ This file is a part of libcds - Concurrent Data Structures library
-#ifndef __CDS_CONTAINER_FCQUEUE_H
-#define __CDS_CONTAINER_FCQUEUE_H
+ (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
+
+ Source code repo: http://github.com/khizmax/libcds/
+ Download: http://sourceforge.net/projects/libcds/files/
+
+ Redistribution and use in source and binary forms, with or without
+ modification, are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+
+ * Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+ FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+ OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+#ifndef CDSLIB_CONTAINER_FCQUEUE_H
+#define CDSLIB_CONTAINER_FCQUEUE_H
#include <cds/algo/flat_combining.h>
#include <cds/algo/elimination_opt.h>
};
/// FCQueue type traits
- struct type_traits: public cds::algo::flat_combining::type_traits
+ struct traits: public cds::algo::flat_combining::traits
{
typedef empty_stat stat; ///< Internal statistics
- static CDS_CONSTEXPR_CONST bool enable_elimination = false; ///< Enable \ref cds_elimination_description "elimination"
+ static CDS_CONSTEXPR const bool enable_elimination = false; ///< Enable \ref cds_elimination_description "elimination"
};
/// Metafunction converting option list to traits
/**
- This is a wrapper for <tt> cds::opt::make_options< type_traits, Options...> </tt>
\p Options are:
- - \p opt::lock_type - mutex type, default is \p cds::lock::Spin
- - \p opt::back_off - back-off strategy, defalt is \p cds::backoff::Default
- - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
- - \p opt::stat - internal statistics, possible type: \ref stat, \ref empty_stat (the default)
- - \p opt::memory_model - C++ memory ordering model.
- List of all available memory ordering see opt::memory_model.
- Default if cds::opt::v:relaxed_ordering
+ - any \p cds::algo::flat_combining::make_traits options
+ - \p opt::stat - internal statistics, possible type: \p fcqueue::stat, \p fcqueue::empty_stat (the default)
- \p opt::enable_elimination - enable/disable operation \ref cds_elimination_description "elimination"
By default, the elimination is disabled. For queue, the elimination is possible if the queue
is empty.
*/
- template <CDS_DECL_OPTIONS8>
+ template <typename... Options>
struct make_traits {
# ifdef CDS_DOXYGEN_INVOKED
typedef implementation_defined type ; ///< Metafunction result
# else
typedef typename cds::opt::make_options<
- typename cds::opt::find_type_traits< type_traits, CDS_OPTIONS8 >::type
- ,CDS_OPTIONS8
+ typename cds::opt::find_type_traits< traits, Options... >::type
+ ,Options...
>::type type;
# endif
};
Template parameters:
- \p T - a value type stored in the queue
- \p Queue - sequential queue implementation, default is \p std::queue<T>
- - \p Trats - type traits of flat combining, default is \p fcqueue::type_traits.
- \p fcqueue::make_traits metafunction can be used to construct specialized \p %type_traits
+ - \p Trats - type traits of flat combining, default is \p fcqueue::traits.
+ \p fcqueue::make_traits metafunction can be used to construct \p %fcqueue::traits specialization.
*/
template <typename T,
class Queue = std::queue<T>,
- typename Traits = fcqueue::type_traits
+ typename Traits = fcqueue::traits
>
class FCQueue
#ifndef CDS_DOXYGEN_INVOKED
public:
typedef T value_type; ///< Value type
typedef Queue queue_type; ///< Sequential queue class
- typedef Traits type_traits; ///< Queue type traits
+ typedef Traits traits; ///< Queue type traits
- typedef typename type_traits::stat stat; ///< Internal statistics type
- static CDS_CONSTEXPR_CONST bool c_bEliminationEnabled = type_traits::enable_elimination; ///< \p true if elimination is enabled
+ typedef typename traits::stat stat; ///< Internal statistics type
+ static CDS_CONSTEXPR const bool c_bEliminationEnabled = traits::enable_elimination; ///< \p true if elimination is enabled
protected:
//@cond
/// Queue operation IDs
enum fc_operation {
op_enq = cds::algo::flat_combining::req_Operation, ///< Enqueue
- op_enq_move, ///< Enqueue (move semantics)
+ op_enq_move, ///< Enqueue (move semantics)
op_deq, ///< Dequeue
op_clear ///< Clear
};
//@endcond
/// Flat combining kernel
- typedef cds::algo::flat_combining::kernel< fc_record, type_traits > fc_kernel;
+ typedef cds::algo::flat_combining::kernel< fc_record, traits > fc_kernel;
protected:
//@cond
- fc_kernel m_FlatCombining;
- queue_type m_Queue;
+ mutable fc_kernel m_FlatCombining;
+ queue_type m_Queue;
//@endcond
public:
*/
bool enqueue( value_type const& val )
{
- fc_record * pRec = m_FlatCombining.acquire_record();
+ auto pRec = m_FlatCombining.acquire_record();
pRec->pValEnq = &val;
if ( c_bEliminationEnabled )
else
m_FlatCombining.combine( op_enq, pRec, *this );
- assert( pRec->is_done() );
+ assert( pRec->is_done());
m_FlatCombining.release_record( pRec );
m_FlatCombining.internal_statistics().onEnqueue();
return true;
return enqueue( val );
}
-# ifdef CDS_MOVE_SEMANTICS_SUPPORT
/// Inserts a new element at the end of the queue (move semantics)
/**
\p val is moved to inserted element
*/
bool enqueue( value_type&& val )
{
- fc_record * pRec = m_FlatCombining.acquire_record();
+ auto pRec = m_FlatCombining.acquire_record();
pRec->pValEnq = &val;
if ( c_bEliminationEnabled )
else
m_FlatCombining.combine( op_enq_move, pRec, *this );
- assert( pRec->is_done() );
+ assert( pRec->is_done());
m_FlatCombining.release_record( pRec );
m_FlatCombining.internal_statistics().onEnqMove();
{
return enqueue( val );
}
-# endif
/// Removes the next element from the queue
/**
*/
bool dequeue( value_type& val )
{
- fc_record * pRec = m_FlatCombining.acquire_record();
+ auto pRec = m_FlatCombining.acquire_record();
pRec->pValDeq = &val;
if ( c_bEliminationEnabled )
else
m_FlatCombining.combine( op_deq, pRec, *this );
- assert( pRec->is_done() );
+ assert( pRec->is_done());
m_FlatCombining.release_record( pRec );
m_FlatCombining.internal_statistics().onDequeue( pRec->bEmpty );
/// Clears the queue
void clear()
{
- fc_record * pRec = m_FlatCombining.acquire_record();
+ auto pRec = m_FlatCombining.acquire_record();
if ( c_bEliminationEnabled )
m_FlatCombining.batch_combine( op_clear, pRec, *this );
else
m_FlatCombining.combine( op_clear, pRec, *this );
- assert( pRec->is_done() );
+ assert( pRec->is_done());
m_FlatCombining.release_record( pRec );
}
*/
bool empty() const
{
- m_FlatCombining.wait_while_combining();
- return m_Queue.empty();
+ bool bRet = false;
+ auto const& queue = m_Queue;
+ m_FlatCombining.invoke_exclusive( [&queue, &bRet]() { bRet = queue.empty(); } );
+ return bRet;
}
/// Internal statistics
{
assert( pRec );
- switch ( pRec->op() ) {
+ // this function is called under FC mutex, so switch TSan off
+ CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
+
+ switch ( pRec->op()) {
case op_enq:
assert( pRec->pValEnq );
- m_Queue.push( *(pRec->pValEnq ) );
+ m_Queue.push( *(pRec->pValEnq ));
break;
-# ifdef CDS_MOVE_SEMANTICS_SUPPORT
case op_enq_move:
assert( pRec->pValEnq );
- m_Queue.push( std::move( *(pRec->pValEnq )) );
+ m_Queue.push( std::move( *(pRec->pValEnq )));
break;
-# endif
case op_deq:
assert( pRec->pValDeq );
pRec->bEmpty = m_Queue.empty();
if ( !pRec->bEmpty ) {
- *(pRec->pValDeq) = m_Queue.front();
+ *(pRec->pValDeq) = std::move( m_Queue.front());
m_Queue.pop();
}
break;
case op_clear:
- while ( !m_Queue.empty() )
+ while ( !m_Queue.empty())
m_Queue.pop();
break;
default:
assert(false);
break;
}
+ CDS_TSAN_ANNOTATE_IGNORE_RW_END;
}
/// Batch-processing flat combining
void fc_process( typename fc_kernel::iterator itBegin, typename fc_kernel::iterator itEnd )
{
typedef typename fc_kernel::iterator fc_iterator;
+
+ // this function is called under FC mutex, so switch TSan off
+ CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
+
for ( fc_iterator it = itBegin, itPrev = itEnd; it != itEnd; ++it ) {
- switch ( it->op() ) {
+ switch ( it->op()) {
case op_enq:
case op_enq_move:
case op_deq:
- if ( m_Queue.empty() ) {
+ if ( m_Queue.empty()) {
if ( itPrev != itEnd && collide( *itPrev, *it ))
itPrev = itEnd;
else
break;
}
}
+ CDS_TSAN_ANNOTATE_IGNORE_RW_END;
}
//@endcond
//@cond
bool collide( fc_record& rec1, fc_record& rec2 )
{
- switch ( rec1.op() ) {
+ switch ( rec1.op()) {
case op_enq:
if ( rec2.op() == op_deq ) {
assert(rec1.pValEnq);
goto collided;
}
break;
-# ifdef CDS_MOVE_SEMANTICS_SUPPORT
case op_enq_move:
if ( rec2.op() == op_deq ) {
assert(rec1.pValEnq);
goto collided;
}
break;
-# endif
case op_deq:
- switch ( rec2.op() ) {
+ switch ( rec2.op()) {
case op_enq:
-# ifdef CDS_MOVE_SEMANTICS_SUPPORT
case op_enq_move:
-# endif
return collide( rec2, rec1 );
}
}
};
}} // namespace cds::container
-#endif // #ifndef __CDS_CONTAINER_FCQUEUE_H
+#endif // #ifndef CDSLIB_CONTAINER_FCQUEUE_H