2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #include "queue_type.h"
35 #include <type_traits>
37 // Multi-threaded queue push/pop test
40 static size_t s_nConsumerThreadCount = 4;
41 static size_t s_nProducerThreadCount = 4;
42 static size_t s_nQueueSize = 4000000;
43 static size_t s_nVyukovQueueSize = 40000;
44 static size_t s_nHeavyValueSize = 100;
46 static std::atomic<size_t> s_nProducerDone( 0 );
54 template<class Value = old_value>
55 class queue_push_pop: public cds_test::stress_fixture
58 using value_type = Value;
65 template <class Queue>
66 class Producer: public cds_test::thread
68 typedef cds_test::thread base_class;
71 Producer( cds_test::thread_pool& pool, Queue& queue, size_t nPushCount )
72 : base_class( pool, producer_thread )
75 , m_nPushCount( nPushCount )
78 Producer( Producer& src )
80 , m_Queue( src.m_Queue )
82 , m_nPushCount( src.m_nPushCount )
85 virtual thread * clone()
87 return new Producer( *this );
92 size_t const nPushCount = m_nPushCount;
98 while ( v.nNo < nPushCount ) {
99 if ( m_Queue.push( v ))
105 s_nProducerDone.fetch_add( 1 );
110 size_t m_nPushFailed;
111 size_t const m_nPushCount;
114 template <class Queue>
115 class Consumer: public cds_test::thread
117 typedef cds_test::thread base_class;
121 size_t const m_nPushPerProducer;
126 typedef std::vector<size_t> popped_data;
127 typedef std::vector<size_t>::iterator data_iterator;
128 typedef std::vector<size_t>::const_iterator const_data_iterator;
130 std::vector<popped_data> m_WriterData;
133 void initPoppedData()
135 const size_t nProducerCount = s_nProducerThreadCount;
136 m_WriterData.resize( nProducerCount );
137 for ( size_t i = 0; i < nProducerCount; ++i )
138 m_WriterData[i].reserve( m_nPushPerProducer );
142 Consumer( cds_test::thread_pool& pool, Queue& queue, size_t nPushPerProducer )
143 : base_class( pool, consumer_thread )
145 , m_nPushPerProducer( nPushPerProducer )
152 Consumer( Consumer& src )
154 , m_Queue( src.m_Queue )
155 , m_nPushPerProducer( src.m_nPushPerProducer )
163 virtual thread * clone()
165 return new Consumer( *this );
173 const size_t nTotalWriters = s_nProducerThreadCount;
176 if ( m_Queue.pop( v )) {
178 if ( v.nWriterNo < nTotalWriters )
179 m_WriterData[ v.nWriterNo ].push_back( v.nNo );
186 if ( s_nProducerDone.load() >= nTotalWriters ) {
187 if ( m_Queue.empty())
196 size_t m_nThreadPushCount;
199 template <class Queue>
200 void analyze( Queue& q, size_t /*nLeftOffset*/ = 0, size_t nRightOffset = 0 )
202 cds_test::thread_pool& pool = get_pool();
204 typedef Consumer<Queue> consumer_type;
205 typedef Producer<Queue> producer_type;
207 size_t nPostTestPops = 0;
214 size_t nTotalPops = 0;
215 size_t nPopFalse = 0;
216 size_t nPoppedItems = 0;
217 size_t nPushFailed = 0;
219 std::vector< consumer_type * > arrConsumer;
221 for ( size_t i = 0; i < pool.size(); ++i ) {
222 cds_test::thread& thr = pool.get(i);
223 if ( thr.type() == consumer_thread ) {
224 consumer_type& consumer = static_cast<consumer_type&>( thr );
225 nTotalPops += consumer.m_nPopped;
226 nPopFalse += consumer.m_nPopEmpty;
227 arrConsumer.push_back( &consumer );
228 EXPECT_EQ( consumer.m_nBadWriter, 0u ) << "consumer_thread_no " << i;
231 for ( size_t n = 0; n < s_nProducerThreadCount; ++n )
232 nPopped += consumer.m_WriterData[n].size();
234 nPoppedItems += nPopped;
237 assert( thr.type() == producer_thread );
239 producer_type& producer = static_cast<producer_type&>( thr );
240 nPushFailed += producer.m_nPushFailed;
241 EXPECT_EQ( producer.m_nPushFailed, 0u ) << "producer_thread_no " << i;
244 EXPECT_EQ( nTotalPops, nPoppedItems );
246 EXPECT_EQ( nTotalPops + nPostTestPops, s_nQueueSize ) << "nTotalPops=" << nTotalPops << ", nPostTestPops=" << nPostTestPops;
247 EXPECT_TRUE( q.empty());
249 // Test consistency of popped sequence
250 for ( size_t nWriter = 0; nWriter < s_nProducerThreadCount; ++nWriter ) {
251 std::vector<size_t> arrData;
252 arrData.reserve( m_nThreadPushCount );
253 for ( size_t nReader = 0; nReader < arrConsumer.size(); ++nReader ) {
254 auto it = arrConsumer[nReader]->m_WriterData[nWriter].begin();
255 auto itEnd = arrConsumer[nReader]->m_WriterData[nWriter].end();
258 for ( ++it; it != itEnd; ++it ) {
259 EXPECT_LT( *itPrev, *it + nRightOffset ) << "consumer=" << nReader << ", producer=" << nWriter;
264 for ( it = arrConsumer[nReader]->m_WriterData[nWriter].begin(); it != itEnd; ++it )
265 arrData.push_back( *it );
268 std::sort( arrData.begin(), arrData.end());
269 for ( size_t i=1; i < arrData.size(); ++i ) {
270 EXPECT_EQ( arrData[i - 1] + 1, arrData[i] ) << "producer=" << nWriter;
273 EXPECT_EQ( arrData[0], 0u ) << "producer=" << nWriter;
274 EXPECT_EQ( arrData[arrData.size() - 1], m_nThreadPushCount - 1 ) << "producer=" << nWriter;
278 template <class Queue>
279 void test_queue( Queue& q )
281 m_nThreadPushCount = s_nQueueSize / s_nProducerThreadCount;
283 cds_test::thread_pool& pool = get_pool();
284 pool.add( new Producer<Queue>( pool, q, m_nThreadPushCount ), s_nProducerThreadCount );
285 pool.add( new Consumer<Queue>( pool, q, m_nThreadPushCount ), s_nConsumerThreadCount );
287 s_nProducerDone.store( 0 );
288 s_nQueueSize = m_nThreadPushCount * s_nProducerThreadCount;
290 propout() << std::make_pair( "producer_count", s_nProducerThreadCount )
291 << std::make_pair( "consumer_count", s_nConsumerThreadCount )
292 << std::make_pair( "push_count", s_nQueueSize );
294 std::chrono::milliseconds duration = pool.run();
296 propout() << std::make_pair( "duration", duration );
299 template <class Queue>
300 void test( Queue& q )
304 propout() << q.statistics();
308 static void set_array_size( size_t size ) {
309 const bool tmp = fc_test::has_set_array_size<value_type>::value;
310 set_array_size(size, std::integral_constant<bool, tmp>());
313 static void set_array_size(size_t size, std::true_type){
314 value_type::set_array_size(size);
317 static void set_array_size(size_t, std::false_type)
322 static void SetUpTestCase()
324 cds_test::config const& cfg = get_config( "queue_push_pop" );
326 s_nConsumerThreadCount = cfg.get_size_t( "ConsumerCount", s_nConsumerThreadCount );
327 s_nProducerThreadCount = cfg.get_size_t( "ProducerCount", s_nProducerThreadCount );
328 s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
329 s_nVyukovQueueSize = cfg.get_size_t( "VyukovQueueSize", s_nVyukovQueueSize );
330 s_nHeavyValueSize = cfg.get_size_t( "HeavyValueSize", s_nHeavyValueSize );
332 if ( s_nConsumerThreadCount == 0u )
333 s_nConsumerThreadCount = 1;
334 if ( s_nProducerThreadCount == 0u )
335 s_nProducerThreadCount = 1;
336 if ( s_nQueueSize == 0u )
338 if ( s_nHeavyValueSize == 0 )
339 s_nHeavyValueSize = 1;
341 set_array_size( s_nHeavyValueSize );
344 //static void TearDownTestCase();
347 using fc_with_heavy_value = queue_push_pop< fc_test::heavy_value<36000> >;
348 using simple_queue_push_pop = queue_push_pop<>;
350 CDSSTRESS_MSQueue( simple_queue_push_pop )
351 CDSSTRESS_MoirQueue( simple_queue_push_pop )
352 CDSSTRESS_BasketQueue( simple_queue_push_pop )
353 CDSSTRESS_OptimsticQueue( simple_queue_push_pop )
354 CDSSTRESS_RWQueue( simple_queue_push_pop )
356 #undef CDSSTRESS_Queue_F
357 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
358 TEST_F( test_fixture, type_name ) \
360 size_t old_queue_size = s_nQueueSize; \
361 s_nQueueSize = s_nVyukovQueueSize; \
362 typedef queue::Types< value_type >::type_name queue_type; \
363 queue_type queue( s_nQueueSize ); \
365 s_nQueueSize = old_queue_size; \
368 CDSSTRESS_VyukovQueue( simple_queue_push_pop )
370 #undef CDSSTRESS_Queue_F
373 // ********************************************************************
374 // SegmentedQueue test
376 class segmented_queue_push_pop
377 : public queue_push_pop<>
378 , public ::testing::WithParamInterface< size_t >
380 typedef queue_push_pop<> base_class;
384 template <typename Queue>
387 size_t quasi_factor = GetParam();
389 Queue q( quasi_factor );
390 propout() << std::make_pair( "quasi_factor", quasi_factor );
391 base_class::test_queue( q );
392 analyze( q, quasi_factor * 2, quasi_factor );
393 propout() << q.statistics();
397 static std::vector< size_t > get_test_parameters()
399 cds_test::config const& cfg = cds_test::stress_fixture::get_config( "queue_push_pop" );
400 bool bIterative = cfg.get_bool( "SegmentedQueue_Iterate", false );
401 size_t quasi_factor = cfg.get_size_t( "SegmentedQueue_SegmentSize", 256 );
403 std::vector<size_t> args;
404 if ( bIterative && quasi_factor > 4 ) {
405 for ( size_t qf = 4; qf <= quasi_factor; qf *= 2 )
406 args.push_back( qf );
408 if ( quasi_factor > 2 )
409 args.push_back( quasi_factor );
418 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
419 TEST_P( test_fixture, type_name ) \
421 typedef typename queue::Types<value_type>::type_name queue_type; \
422 test< queue_type >(); \
425 CDSSTRESS_SegmentedQueue( segmented_queue_push_pop )
427 #ifdef CDSTEST_GTEST_INSTANTIATE_TEST_CASE_P_HAS_4TH_ARG
428 static std::string get_test_parameter_name( testing::TestParamInfo<size_t> const& p )
430 return std::to_string( p.param );
432 INSTANTIATE_TEST_CASE_P( SQ,
433 segmented_queue_push_pop,
434 ::testing::ValuesIn( segmented_queue_push_pop::get_test_parameters()), get_test_parameter_name );
436 INSTANTIATE_TEST_CASE_P( SQ,
437 segmented_queue_push_pop,
438 ::testing::ValuesIn( segmented_queue_push_pop::get_test_parameters()));