typedef cds_test::stress_fixture base_class;
protected:
- template <class PQueue>
- class Producer: public cds_test::thread
- {
- typedef cds_test::thread base_class;
-
- public:
- Producer( cds_test::thread_pool& pool, PQueue& queue )
- : base_class( pool )
- , m_Queue( queue )
- {}
-
- Producer( Producer& src )
- : base_class( src )
- , m_Queue( src.m_Queue )
- {}
-
- virtual thread * clone()
- {
- return new Producer( *this );
- }
-
- virtual void test()
- {
- typedef typename PQueue::value_type value_type;
- for ( array_type::const_iterator it = m_arr.begin(); it != m_arr.end(); ++it ) {
- if ( !m_Queue.push( value_type( *it ) ))
- ++m_nPushError;
- }
- }
-
- void prepare( size_t nStart, size_t nEnd )
- {
- m_arr.reserve( nEnd - nStart );
- for ( size_t i = nStart; i < nEnd; ++i )
- m_arr.push_back( i );
- shuffle( m_arr.begin(), m_arr.end() );
- }
-
- public:
- PQueue& m_Queue;
- size_t m_nPushError = 0;
-
- typedef std::vector<size_t> array_type;
- array_type m_arr;
- };
-
template <class PQueue>
class Consumer: public cds_test::thread
{
template <class PQueue>
void test( PQueue& q )
{
- size_t const nThreadItemCount = s_nQueueSize / s_nThreadCount;
- s_nQueueSize = nThreadItemCount * s_nThreadCount;
-
cds_test::thread_pool& pool = get_pool();
propout() << std::make_pair( "thread_count", s_nThreadCount )
// push
{
- pool.add( new Producer<PQueue>( pool, q ), s_nThreadCount );
+ std::vector< size_t > arr;
+ arr.reserve( s_nQueueSize );
+ for ( size_t i = 0; i < s_nQueueSize; ++i )
+ arr.push_back( i );
+ shuffle( arr.begin(), arr.end() );
- size_t nStart = 0;
- for ( size_t i = 0; i < pool.size(); ++i ) {
- static_cast<Producer<PQueue>&>( pool.get(i) ).prepare( nStart, nStart + nThreadItemCount );
- nStart += nThreadItemCount;
- }
-
- std::chrono::milliseconds duration = pool.run();
- propout() << std::make_pair( "producer_duration", duration );
+ typedef typename PQueue::value_type value_type;
+ for ( auto it = arr.begin(); it != arr.end(); ++it )
+ q.push( value_type( *it ));
}
// pop
{
- pool.clear();
pool.add( new Consumer<PQueue>( pool, q ), s_nThreadCount );
std::chrono::milliseconds duration = pool.run();
TEST_F( fixture_t, pqueue_t ) \
{ \
typedef pqueue::Types<pqueue::simple_value>::pqueue_t pqueue_type; \
- pqueue_type pq( s_nQueueSize ); \
+ pqueue_type pq( s_nQueueSize + 1 ); \
test( pq ); \
}
CDSSTRESS_MSPriorityQueue( pqueue_pop, MSPriorityQueue_dyn_less )