2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #include "queue_type.h"
36 // Multi-threaded queue push/pop test
// Default thread counts and total number of items to push. SetUpTestCase() replaces each
// of them with the "ConsumerCount", "ProducerCount" and "QueueSize" configuration values.
39 static size_t s_nConsumerThreadCount = 4;
40 static size_t s_nProducerThreadCount = 4;
41 static size_t s_nQueueSize = 4000000;
// Number of producers that have finished: a producer adds 1 when it is done, consumers
// compare the value with the producer count, and test_queue() resets it to 0 before a run.
43 static std::atomic<size_t> s_nProducerDone( 0 );
// Stress-test fixture for the multi-threaded queue push/pop test.
// Value is the element type passed through the queue; it defaults to old_value.
51 template<class Value = old_value>
52 class queue_push_pop: public cds_test::stress_fixture
// Element type alias; the test macros further down use it to select the queue type.
55 using value_type = Value;
// Producer thread: attempts m_Queue.push( v ) in a loop that runs while v.nNo is below
// the configured push count, then reports completion through s_nProducerDone.
62 template <class Queue>
63 class Producer: public cds_test::thread
65 typedef cds_test::thread base_class;
// pool       - thread pool, forwarded to the base class together with the producer_thread tag
// queue      - queue to push to
// nPushCount - stored in m_nPushCount; bounds v.nNo in the push loop
68 Producer( cds_test::thread_pool& pool, Queue& queue, size_t nPushCount )
69 : base_class( pool, producer_thread )
72 , m_nPushCount( nPushCount )
// Copy constructor used by clone(); takes m_Queue and m_nPushCount from src.
75 Producer( Producer& src )
77 , m_Queue( src.m_Queue )
79 , m_nPushCount( src.m_nPushCount )
// Returns a newly allocated copy of this producer.
// NOTE(review): the result is a raw owning pointer; presumably the thread pool takes ownership - confirm.
82 virtual thread * clone()
84 return new Producer( *this );
// Thread body. Local copy of the push count used as the loop bound.
89 size_t const nPushCount = m_nPushCount;
// Keep attempting pushes until v.nNo reaches nPushCount. What is done with the result of
// push() is on lines that are not visible in this listing.
95 while ( v.nNo < nPushCount ) {
96 if ( m_Queue.push( v ))
// Announce that this producer has finished; consumers read the counter.
102 s_nProducerDone.fetch_add( 1 );
// Read by analyze(), which expects it to be 0 for every producer.
107 size_t m_nPushFailed;
// Push count given at construction.
108 size_t const m_nPushCount;
// Consumer thread: pops values from the queue and records the nNo of each popped value
// in a vector selected by the value's nWriterNo.
111 template <class Queue>
112 class Consumer: public cds_test::thread
114 typedef cds_test::thread base_class;
// Capacity reserved for each per-producer vector in initPoppedData().
118 size_t const m_nPushPerProducer;
123 typedef std::vector<size_t> popped_data;
124 typedef std::vector<size_t>::iterator data_iterator;
125 typedef std::vector<size_t>::const_iterator const_data_iterator;
// One vector of popped nNo values per producer, indexed by nWriterNo.
127 std::vector<popped_data> m_WriterData;
// Sizes m_WriterData to one entry per producer and reserves m_nPushPerProducer
// elements in each entry.
130 void initPoppedData()
132 const size_t nProducerCount = s_nProducerThreadCount;
133 m_WriterData.resize( nProducerCount );
134 for ( size_t i = 0; i < nProducerCount; ++i )
135 m_WriterData[i].reserve( m_nPushPerProducer );
// pool             - thread pool, forwarded to the base class together with the consumer_thread tag
// queue            - queue to pop from
// nPushPerProducer - stored in m_nPushPerProducer
139 Consumer( cds_test::thread_pool& pool, Queue& queue, size_t nPushPerProducer )
140 : base_class( pool, consumer_thread )
142 , m_nPushPerProducer( nPushPerProducer )
// Copy constructor used by clone(); takes m_Queue and m_nPushPerProducer from src.
149 Consumer( Consumer& src )
151 , m_Queue( src.m_Queue )
152 , m_nPushPerProducer( src.m_nPushPerProducer )
// Returns a newly allocated copy of this consumer.
// NOTE(review): the result is a raw owning pointer; presumably the thread pool takes ownership - confirm.
160 virtual thread * clone()
162 return new Consumer( *this );
// Thread body. Number of producers; also the exclusive upper bound for a valid nWriterNo.
170 const size_t nTotalWriters = s_nProducerThreadCount;
173 if ( m_Queue.pop( v )) {
// Record the value only when nWriterNo is a valid index into m_WriterData.
175 if ( v.nWriterNo < nTotalWriters )
176 m_WriterData[ v.nWriterNo ].push_back( v.nNo );
// Check for all producers having reported completion and the queue being empty; the action
// taken in that case is on a line not visible in this listing (presumably it ends the loop).
183 if ( s_nProducerDone.load() >= nTotalWriters ) {
184 if ( m_Queue.empty())
// Number of items per producer; test_queue() sets it to s_nQueueSize / s_nProducerThreadCount.
193 size_t m_nThreadPushCount;
// Checks the data gathered by the pool threads.
//   q            - the queue that was exercised; expected to be empty
//   nLeftOffset  - unused (the parameter name is commented out)
//   nRightOffset - added to the right-hand side of the per-consumer ordering check
196 template <class Queue>
197 void analyze( Queue& q, size_t /*nLeftOffset*/ = 0, size_t nRightOffset = 0 )
199 cds_test::thread_pool& pool = get_pool();
// Within this function Consumer and Producer name the instantiations for this Queue type.
201 typedef Consumer<Queue> Consumer;
202 typedef Producer<Queue> Producer;
// Initialized to 0; no update of it is visible in this listing.
204 size_t nPostTestPops = 0;
// Totals accumulated over the pool threads.
211 size_t nTotalPops = 0;
212 size_t nPopFalse = 0;
213 size_t nPoppedItems = 0;
214 size_t nPushFailed = 0;
// Consumers found in the pool; used by the sequence check below.
216 std::vector< Consumer * > arrConsumer;
// Visit every thread in the pool and dispatch on its type tag.
218 for ( size_t i = 0; i < pool.size(); ++i ) {
219 cds_test::thread& thr = pool.get(i);
220 if ( thr.type() == consumer_thread ) {
221 Consumer& consumer = static_cast<Consumer&>( thr );
222 nTotalPops += consumer.m_nPopped;
223 nPopFalse += consumer.m_nPopEmpty;
224 arrConsumer.push_back( &consumer );
// m_nBadWriter must be 0 for every consumer.
225 EXPECT_EQ( consumer.m_nBadWriter, 0u ) << "consumer_thread_no " << i;
// Add up the sizes of this consumer's per-producer vectors
// (nPopped is declared on a line not visible in this listing).
228 for ( size_t n = 0; n < s_nProducerThreadCount; ++n )
229 nPopped += consumer.m_WriterData[n].size();
231 nPoppedItems += nPopped;
// A thread that is not a consumer is asserted to be a producer.
234 assert( thr.type() == producer_thread );
236 Producer& producer = static_cast<Producer&>( thr );
237 nPushFailed += producer.m_nPushFailed;
// m_nPushFailed must be 0 for every producer.
238 EXPECT_EQ( producer.m_nPushFailed, 0u ) << "producer_thread_no " << i;
// The consumers' pop counters must match the number of values they recorded.
241 EXPECT_EQ( nTotalPops, nPoppedItems );
// The pop total must equal s_nQueueSize, and the queue must be empty.
243 EXPECT_EQ( nTotalPops + nPostTestPops, s_nQueueSize ) << "nTotalPops=" << nTotalPops << ", nPostTestPops=" << nPostTestPops;
244 EXPECT_TRUE( q.empty());
246 // Test consistency of popped sequence
247 for ( size_t nWriter = 0; nWriter < s_nProducerThreadCount; ++nWriter ) {
// Receives every value of this producer, gathered from all consumers.
248 std::vector<size_t> arrData;
249 arrData.reserve( m_nThreadPushCount );
250 for ( size_t nReader = 0; nReader < arrConsumer.size(); ++nReader ) {
251 auto it = arrConsumer[nReader]->m_WriterData[nWriter].begin();
252 auto itEnd = arrConsumer[nReader]->m_WriterData[nWriter].end();
// Ordering check within one consumer: *itPrev must be less than *it + nRightOffset.
// itPrev is declared on a line not visible in this listing; presumably it is the preceding element.
255 for ( ++it; it != itEnd; ++it ) {
256 EXPECT_LT( *itPrev, *it + nRightOffset ) << "consumer=" << nReader << ", producer=" << nWriter;
// Append this consumer's values for the producer to the merged vector.
261 for ( it = arrConsumer[nReader]->m_WriterData[nWriter].begin(); it != itEnd; ++it )
262 arrData.push_back( *it );
// After sorting, neighbouring merged values must differ by exactly 1.
265 std::sort( arrData.begin(), arrData.end());
266 for ( size_t i=1; i < arrData.size(); ++i ) {
267 EXPECT_EQ( arrData[i - 1] + 1, arrData[i] ) << "producer=" << nWriter;
// The merged values must start at 0 and end at m_nThreadPushCount - 1.
// NOTE(review): both accesses assume arrData is non-empty, i.e. m_nThreadPushCount > 0 - confirm.
270 EXPECT_EQ( arrData[0], 0u ) << "producer=" << nWriter;
271 EXPECT_EQ( arrData[arrData.size() - 1], m_nThreadPushCount - 1 ) << "producer=" << nWriter;
// Sets up the thread pool for queue q, runs it and reports the run's properties.
275 template <class Queue>
276 void test_queue( Queue& q )
// Items per producer (integer division).
278 m_nThreadPushCount = s_nQueueSize / s_nProducerThreadCount;
280 cds_test::thread_pool& pool = get_pool();
// Register the producer and consumer prototypes with their thread counts.
281 pool.add( new Producer<Queue>( pool, q, m_nThreadPushCount ), s_nProducerThreadCount );
282 pool.add( new Consumer<Queue>( pool, q, m_nThreadPushCount ), s_nConsumerThreadCount );
// Reset the finished-producer counter before the run.
284 s_nProducerDone.store( 0 );
// Round the total down to a multiple of the producer count, matching the integer division above.
285 s_nQueueSize = m_nThreadPushCount * s_nProducerThreadCount;
287 propout() << std::make_pair( "producer_count", s_nProducerThreadCount )
288 << std::make_pair( "consumer_count", s_nConsumerThreadCount )
289 << std::make_pair( "push_count", s_nQueueSize );
// Run the pool and report the duration it returns.
291 std::chrono::milliseconds duration = pool.run();
293 propout() << std::make_pair( "duration", duration );
// Test entry point for queue q. Only the final statement, which writes q.statistics()
// to propout(), is visible in this listing; the preceding body lines are not shown.
296 template <class Queue>
297 void test( Queue& q )
301 propout() << q.statistics();
// Loads the test parameters from the "queue_push_pop" configuration section.
305 static void SetUpTestCase()
307 cds_test::config const& cfg = get_config( "queue_push_pop" );
// The current value of each static is passed as the second argument of get_size_t().
309 s_nConsumerThreadCount = cfg.get_size_t( "ConsumerCount", s_nConsumerThreadCount );
310 s_nProducerThreadCount = cfg.get_size_t( "ProducerCount", s_nProducerThreadCount );
311 s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
// A thread count of 0 is replaced by 1.
313 if ( s_nConsumerThreadCount == 0u )
314 s_nConsumerThreadCount = 1;
315 if ( s_nProducerThreadCount == 0u )
316 s_nProducerThreadCount = 1;
// The replacement applied to a queue size of 0 is on a line not visible in this listing.
317 if ( s_nQueueSize == 0u )
321 //static void TearDownTestCase();
// Fixture aliases: one using HeavyValue<10> as the element type, one using the default element type.
324 using value_for_fc_with_heavy_value = queue_push_pop< HeavyValue<10> >;
325 using old_queue_push_pop = queue_push_pop<>;
// Test-suite instantiations. Only the FCDeque heavy-value suite is active in this group;
// the remaining ones are commented out.
327 // CDSSTRESS_MSQueue( old_queue_push_pop )
328 // CDSSTRESS_MoirQueue( old_queue_push_pop )
329 // CDSSTRESS_BasketQueue( old_queue_push_pop )
330 // CDSSTRESS_OptimsticQueue( old_queue_push_pop )
331 // CDSSTRESS_FCQueue( old_queue_push_pop )
332 CDSSTRESS_FCDeque_HeavyValue( value_for_fc_with_heavy_value )
333 // CDSSTRESS_FCDeque( old_queue_push_pop )
334 // CDSSTRESS_RWQueue( old_queue_push_pop )
335 // CDSSTRESS_StdQueue( old_queue_push_pop )
// Redefines CDSSTRESS_Queue_F for the CDSSTRESS_VyukovQueue suite: the generated TEST_F
// returns early when check_detail_level( level ) fails and otherwise constructs the queue
// with s_nQueueSize as its constructor argument. The macro is undefined again afterwards.
// NOTE(review): s_nQueueSize is presumably the queue capacity here - confirm against the queue type.
337 #undef CDSSTRESS_Queue_F
338 #define CDSSTRESS_Queue_F( test_fixture, type_name, level ) \
339 TEST_F( test_fixture, type_name ) \
341 if ( !check_detail_level( level )) return; \
342 typedef queue::Types< value_type >::type_name queue_type; \
343 queue_type queue( s_nQueueSize ); \
347 CDSSTRESS_VyukovQueue( old_queue_push_pop )
349 #undef CDSSTRESS_Queue_F
352 // ********************************************************************
353 // SegmentedQueue test
// Value-parameterized variant of the push/pop fixture: the size_t test parameter is
// used as the quasi factor passed to the queue constructor.
355 class segmented_queue_push_pop
356 : public queue_push_pop<>
357 , public ::testing::WithParamInterface< size_t >
359 typedef queue_push_pop<> base_class;
// Test body (its signature line is not visible in this listing): builds a Queue from the
// current test parameter, runs the base test and analyzes the result.
363 template <typename Queue>
366 size_t quasi_factor = GetParam();
368 Queue q( quasi_factor );
369 propout() << std::make_pair( "quasi_factor", quasi_factor );
370 base_class::test_queue( q );
// Passes quasi_factor * 2 as the left offset and quasi_factor as the right offset.
371 analyze( q, quasi_factor * 2, quasi_factor );
372 propout() << q.statistics();
// Builds the list of quasi factors to test from the "queue_push_pop" configuration section.
376 static std::vector< size_t > get_test_parameters()
378 cds_test::config const& cfg = cds_test::stress_fixture::get_config( "queue_push_pop" );
379 bool bIterative = cfg.get_bool( "SegmentedQueue_Iterate", false );
380 size_t quasi_factor = cfg.get_size_t( "SegmentedQueue_SegmentSize", 256 );
382 std::vector<size_t> args;
// Iterative mode: add 4, 8, 16, ... doubling while the value does not exceed quasi_factor.
383 if ( bIterative && quasi_factor > 4 ) {
384 for ( size_t qf = 4; qf <= quasi_factor; qf *= 2 )
385 args.push_back( qf );
// Adds the configured value when it is greater than 2. The lines connecting this to the
// branch above, and the handling of smaller values, are not visible in this listing.
388 if ( quasi_factor > 2 )
389 args.push_back( quasi_factor );
// CDSSTRESS_Queue_F variant for the segmented queue: generates a TEST_P that returns early
// when check_detail_level( level ) fails and otherwise calls test< queue_type >().
// It is applied to segmented_queue_push_pop through CDSSTRESS_SegmentedQueue.
398 #define CDSSTRESS_Queue_F( test_fixture, type_name, level ) \
399 TEST_P( test_fixture, type_name ) \
401 if ( !check_detail_level( level )) return; \
402 typedef typename queue::Types<value_type>::type_name queue_type; \
403 test< queue_type >(); \
406 CDSSTRESS_SegmentedQueue( segmented_queue_push_pop )
// Instantiates the segmented queue tests under the prefix SQ, once for each value
// returned by segmented_queue_push_pop::get_test_parameters().
408 INSTANTIATE_TEST_CASE_P( SQ,
409 segmented_queue_push_pop,
410 ::testing::ValuesIn( segmented_queue_push_pop::get_test_parameters()));