2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #include "queue_type.h"
35 #include <type_traits>
37 // Multi-threaded queue push/pop test
// Tunable test parameters. The size_t values below are only defaults:
// SetUpTestCase() reloads each of them from the "queue_push_pop" config section.
// Number of consumer (popping) threads; config key "ConsumerCount".
40 static size_t s_nConsumerThreadCount = 4;
// Number of producer (pushing) threads; config key "ProducerCount".
41 static size_t s_nProducerThreadCount = 4;
// Total number of items pushed in one run; config key "QueueSize".
// test_queue() splits it evenly between the producers.
42 static size_t s_nQueueSize = 4000000;
// Value the SegmentedQueue tests assign to s_nQueueSize; config key "SegmentedQueueSize".
43 static size_t s_nSegmentedQueueSize = 400000;
// Value the VyukovQueue test macro assigns to s_nQueueSize; config key "VyukovQueueSize".
44 static size_t s_nVyukovQueueSize = 40000;
// Size passed to set_array_size() in SetUpTestCase(); config key "HeavyValueSize".
45 static size_t s_nHeavyValueSize = 100;
// Number of producers that have finished pushing: reset to 0 in test_queue(),
// incremented once by each Producer, and polled by each Consumer
// (presumably to decide when it may stop popping).
47 static std::atomic<size_t> s_nProducerDone( 0 );
// Stress-test fixture for concurrent push/pop on a queue.
// Value is the element type stored in the queue under test; it defaults to old_value.
55 template<class Value = old_value>
56 class queue_push_pop: public cds_test::stress_fixture
// Element type; referenced by set_array_size() and by the test macros
// (queue::Types< value_type >).
59 using value_type = Value;
// Producer thread: pushes items into the shared queue and reports completion
// through s_nProducerDone.
// Queue must provide push( v ) with a result usable as an if-condition.
66 template <class Queue>
67 class Producer: public cds_test::thread
69 typedef cds_test::thread base_class;
// pool       - thread pool forwarded to the base class
// queue      - queue under test
// nPushCount - number of items this producer has to push
72 Producer( cds_test::thread_pool& pool, Queue& queue, size_t nPushCount )
// The producer_thread tag lets analyze() tell producers from consumers via thr.type().
73 : base_class( pool, producer_thread )
76 , m_nPushCount( nPushCount )
// Copy constructor used by clone(): the copy refers to the same queue and
// keeps the same push count as src.
79 Producer( Producer& src )
81 , m_Queue( src.m_Queue )
83 , m_nPushCount( src.m_nPushCount )
// Returns a new heap-allocated copy of this producer.
// NOTE(review): ownership of the raw pointer is presumably taken by the thread pool - confirm.
86 virtual thread * clone()
88 return new Producer( *this );
// Thread body: keep calling push( v ) until v.nNo reaches nPushCount.
// NOTE(review): v.nNo is presumably advanced only after a successful push and
// m_nPushFailed incremented otherwise - confirm against the complete loop body.
93 size_t const nPushCount = m_nPushCount;
99 while ( v.nNo < nPushCount ) {
100 if ( m_Queue.push( v ))
// Tell the consumers that this producer has finished.
106 s_nProducerDone.fetch_add( 1 );
// Push-failure counter (going by its name); analyze() expects it to be 0 for every producer.
111 size_t m_nPushFailed;
// Number of items to push; fixed at construction.
112 size_t const m_nPushCount;
// Consumer thread: pops items from the shared queue and records, per producer,
// the sequence numbers it received.
// Queue must provide pop( v ) and empty().
115 template <class Queue>
116 class Consumer: public cds_test::thread
118 typedef cds_test::thread base_class;
// Number of items each producer pushes; used to pre-size the per-producer buffers.
122 size_t const m_nPushPerProducer;
// Sequence numbers received from one producer, in the order this consumer popped them.
127 typedef std::vector<size_t> popped_data;
128 typedef std::vector<size_t>::iterator data_iterator;
129 typedef std::vector<size_t>::const_iterator const_data_iterator;
// m_WriterData[w] holds the v.nNo of every popped item whose v.nWriterNo == w.
131 std::vector<popped_data> m_WriterData;
// Creates one buffer per producer thread and reserves m_nPushPerProducer
// entries in each of them up front.
134 void initPoppedData()
136 const size_t nProducerCount = s_nProducerThreadCount;
137 m_WriterData.resize( nProducerCount );
138 for ( size_t i = 0; i < nProducerCount; ++i )
139 m_WriterData[i].reserve( m_nPushPerProducer );
// pool             - thread pool forwarded to the base class
// queue            - queue under test
// nPushPerProducer - number of items each producer pushes
143 Consumer( cds_test::thread_pool& pool, Queue& queue, size_t nPushPerProducer )
// The consumer_thread tag lets analyze() tell consumers from producers via thr.type().
144 : base_class( pool, consumer_thread )
146 , m_nPushPerProducer( nPushPerProducer )
// Copy constructor used by clone(): the copy refers to the same queue and
// keeps the same per-producer push count as src.
153 Consumer( Consumer& src )
155 , m_Queue( src.m_Queue )
156 , m_nPushPerProducer( src.m_nPushPerProducer )
// Returns a new heap-allocated copy of this consumer.
// NOTE(review): ownership of the raw pointer is presumably taken by the thread pool - confirm.
164 virtual thread * clone()
166 return new Consumer( *this );
// Thread body: each popped item is filed under the producer that wrote it.
174 const size_t nTotalWriters = s_nProducerThreadCount;
177 if ( m_Queue.pop( v )) {
// Validate the producer index before using it to subscript m_WriterData.
// NOTE(review): out-of-range indices are presumably counted in m_nBadWriter,
// which analyze() expects to be 0 - confirm.
179 if ( v.nWriterNo < nTotalWriters )
180 m_WriterData[ v.nWriterNo ].push_back( v.nNo );
// Once every producer has reported completion, check whether the queue is empty
// (presumably the exit condition of the pop loop - TODO confirm).
187 if ( s_nProducerDone.load() >= nTotalWriters ) {
188 if ( m_Queue.empty())
// Number of items each producer pushes; computed in test_queue() as
// s_nQueueSize / s_nProducerThreadCount and used by analyze() as the expected
// length of every producer's sequence.
197 size_t m_nThreadPushCount;
// Validates the results of a finished run.
//   q            - the queue that was tested; it is expected to be empty.
//   (2nd param)  - unnamed and unused; its former name nLeftOffset is commented out.
//   nRightOffset - slack added to the right-hand side of the per-consumer
//                  ordering check; with the default 0 the sequence numbers seen
//                  by one consumer from one producer must strictly increase.
200 template <class Queue>
201 void analyze( Queue& q, size_t /*nLeftOffset*/ = 0, size_t nRightOffset = 0 )
203 cds_test::thread_pool& pool = get_pool();
205 typedef Consumer<Queue> consumer_type;
206 typedef Producer<Queue> producer_type;
208 size_t nPostTestPops = 0;
// Totals accumulated over all threads of the pool.
215 size_t nTotalPops = 0;
216 size_t nPopFalse = 0;
217 size_t nPoppedItems = 0;
218 size_t nPushFailed = 0;
// Consumers are collected here for the sequence-consistency pass below.
220 std::vector< consumer_type * > arrConsumer;
// Pass 1: walk every thread of the pool and gather its counters.
222 for ( size_t i = 0; i < pool.size(); ++i ) {
223 cds_test::thread& thr = pool.get(i);
// The type tag set in the thread constructors selects the matching downcast.
224 if ( thr.type() == consumer_thread ) {
225 consumer_type& consumer = static_cast<consumer_type&>( thr );
226 nTotalPops += consumer.m_nPopped;
227 nPopFalse += consumer.m_nPopEmpty;
228 arrConsumer.push_back( &consumer );
229 EXPECT_EQ( consumer.m_nBadWriter, 0u ) << "consumer_thread_no " << i;
// Count every item this consumer recorded, summed over all producers.
232 for ( size_t n = 0; n < s_nProducerThreadCount; ++n )
233 nPopped += consumer.m_WriterData[n].size();
235 nPoppedItems += nPopped;
// Any thread that is not a consumer must be a producer.
238 assert( thr.type() == producer_thread );
240 producer_type& producer = static_cast<producer_type&>( thr );
241 nPushFailed += producer.m_nPushFailed;
242 EXPECT_EQ( producer.m_nPushFailed, 0u ) << "producer_thread_no " << i;
// The consumers' pop counters must agree with the number of recorded items.
245 EXPECT_EQ( nTotalPops, nPoppedItems );
// Every pushed item must be accounted for: pops during the run plus nPostTestPops.
247 EXPECT_EQ( nTotalPops + nPostTestPops, s_nQueueSize ) << "nTotalPops=" << nTotalPops << ", nPostTestPops=" << nPostTestPops;
248 EXPECT_TRUE( q.empty());
250 // Test consistency of popped sequence
// Pass 2: for each producer, check the order seen by each consumer and then
// the completeness of the merged data.
251 for ( size_t nWriter = 0; nWriter < s_nProducerThreadCount; ++nWriter ) {
// Merged sequence numbers of this producer, gathered from all consumers.
252 std::vector<size_t> arrData;
253 arrData.reserve( m_nThreadPushCount );
254 for ( size_t nReader = 0; nReader < arrConsumer.size(); ++nReader ) {
255 auto it = arrConsumer[nReader]->m_WriterData[nWriter].begin();
256 auto itEnd = arrConsumer[nReader]->m_WriterData[nWriter].end();
// Ordering check over the items one consumer got from this producer.
// NOTE(review): itPrev presumably trails it by one element, and the loop is
// presumably guarded against an empty range - confirm.
259 for ( ++it; it != itEnd; ++it ) {
260 EXPECT_LT( *itPrev, *it + nRightOffset ) << "consumer=" << nReader << ", producer=" << nWriter;
// Append this consumer's share to the merged sequence.
265 for ( it = arrConsumer[nReader]->m_WriterData[nWriter].begin(); it != itEnd; ++it )
266 arrData.push_back( *it );
// After sorting, the merged data must be consecutive values with no gap and no duplicate ...
269 std::sort( arrData.begin(), arrData.end());
270 for ( size_t i=1; i < arrData.size(); ++i ) {
271 EXPECT_EQ( arrData[i - 1] + 1, arrData[i] ) << "producer=" << nWriter;
// ... starting at 0 and ending at m_nThreadPushCount - 1.
// NOTE(review): arrData[0] and arrData[size() - 1] are read without a visible
// emptiness check; an empty arrData would be indexed out of range - confirm.
274 EXPECT_EQ( arrData[0], 0u ) << "producer=" << nWriter;
275 EXPECT_EQ( arrData[arrData.size() - 1], m_nThreadPushCount - 1 ) << "producer=" << nWriter;
// Runs one producer/consumer stress pass over q and reports its parameters
// and duration through propout(). Result checking is done separately by analyze().
279 template <class Queue>
280 void test_queue( Queue& q )
// Each producer pushes an equal share; integer division drops any remainder.
282 m_nThreadPushCount = s_nQueueSize / s_nProducerThreadCount;
284 cds_test::thread_pool& pool = get_pool();
// One prototype thread per role plus the number of threads wanted
// (presumably replicated by the pool through clone() - TODO confirm).
285 pool.add( new Producer<Queue>( pool, q, m_nThreadPushCount ), s_nProducerThreadCount );
286 pool.add( new Consumer<Queue>( pool, q, m_nThreadPushCount ), s_nConsumerThreadCount );
// No producer has finished yet.
288 s_nProducerDone.store( 0 );
// Round the total down to what will really be pushed, so that analyze()
// compares the pop count against the exact number of items.
289 s_nQueueSize = m_nThreadPushCount * s_nProducerThreadCount;
291 propout() << std::make_pair( "producer_count", s_nProducerThreadCount )
292 << std::make_pair( "consumer_count", s_nConsumerThreadCount )
293 << std::make_pair( "push_count", s_nQueueSize );
// pool.run() returns the elapsed time of the run.
295 std::chrono::milliseconds duration = pool.run();
297 propout() << std::make_pair( "duration", duration );
// Complete test of queue q.
// NOTE(review): presumably calls test_queue( q ) and analyze( q ) before the
// statistics report below - confirm.
300 template <class Queue>
301 void test( Queue& q )
// Publish the queue's internal statistics together with the test properties.
305 propout() << q.statistics();
// Forwards size to value_type::set_array_size() when the trait
// fc_test::has_set_array_size<value_type> reports that value_type provides it.
// The choice is made at compile time by overload resolution on a tag argument.
309 static void set_array_size( size_t size ) {
// The trait result must be a compile-time constant to serve as a template argument.
310 const bool tmp = fc_test::has_set_array_size<value_type>::value;
311 set_array_size(size, std::integral_constant<bool, tmp>());
// Overload chosen when the trait is true: delegate to the value type.
314 static void set_array_size(size_t size, std::true_type){
315 value_type::set_array_size(size);
// Overload chosen when the trait is false: the size parameter is unnamed, i.e. unused.
318 static void set_array_size(size_t, std::false_type)
// Test-case setup: loads the tunables from the "queue_push_pop" configuration
// section, keeping the current value of a variable as the default for its key,
// then sanitises zero values.
323 static void SetUpTestCase()
325 cds_test::config const& cfg = get_config( "queue_push_pop" );
327 s_nConsumerThreadCount = cfg.get_size_t( "ConsumerCount", s_nConsumerThreadCount );
328 s_nProducerThreadCount = cfg.get_size_t( "ProducerCount", s_nProducerThreadCount );
329 s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
330 s_nVyukovQueueSize = cfg.get_size_t( "VyukovQueueSize", s_nVyukovQueueSize );
331 s_nSegmentedQueueSize = cfg.get_size_t( "SegmentedQueueSize", s_nSegmentedQueueSize );
332 s_nHeavyValueSize = cfg.get_size_t( "HeavyValueSize", s_nHeavyValueSize );
// At least one thread of each kind is required; a producer count of 0 would
// also make test_queue() divide by zero.
334 if ( s_nConsumerThreadCount == 0u )
335 s_nConsumerThreadCount = 1;
336 if ( s_nProducerThreadCount == 0u )
337 s_nProducerThreadCount = 1;
// NOTE(review): a zero queue size is presumably replaced by a non-zero
// fallback value here - confirm the statement guarded by this check.
338 if ( s_nQueueSize == 0u )
340 if ( s_nHeavyValueSize == 0 )
341 s_nHeavyValueSize = 1;
// Apply the configured size to the value type (has an effect only for value
// types that provide set_array_size()).
343 set_array_size( s_nHeavyValueSize );
346 //static void TearDownTestCase();
// Fixture instantiated with the value type fc_test::heavy_value<36000>.
349 using fc_with_heavy_value = queue_push_pop< fc_test::heavy_value<36000> >;
// Fixture with the default value type (old_value).
350 using simple_queue_push_pop = queue_push_pop<>;
// The CDSSTRESS_* macros are defined outside this file (presumably in
// queue_type.h) and are expected to emit the tests of one queue family for the
// given fixture - TODO confirm.
352 CDSSTRESS_MSQueue( simple_queue_push_pop )
353 CDSSTRESS_MoirQueue( simple_queue_push_pop )
354 CDSSTRESS_BasketQueue( simple_queue_push_pop )
355 CDSSTRESS_OptimsticQueue( simple_queue_push_pop )
356 CDSSTRESS_RWQueue( simple_queue_push_pop )
// Replace the per-test generator for the VyukovQueue family: the generated
// test saves s_nQueueSize, substitutes s_nVyukovQueueSize for it, passes that
// value to the queue constructor, and restores the saved s_nQueueSize at the end.
// The instantiation below is commented out, so this definition currently
// generates no test before it is #undef'ed again.
358 #undef CDSSTRESS_Queue_F
359 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
360 TEST_F( test_fixture, type_name ) \
362 size_t old_queue_size = s_nQueueSize; \
363 s_nQueueSize = s_nVyukovQueueSize; \
364 typedef queue::Types< value_type >::type_name queue_type; \
365 queue_type queue( s_nQueueSize ); \
367 s_nQueueSize = old_queue_size; \
370 //CDSSTRESS_VyukovQueue( simple_queue_push_pop )
372 #undef CDSSTRESS_Queue_F
375 // ********************************************************************
376 // SegmentedQueue test
// Value-parameterised fixture for SegmentedQueue: the size_t test parameter is
// the queue's quasi-factor, which is passed to the queue constructor.
378 class segmented_queue_push_pop
379 : public queue_push_pop<>
380 , public ::testing::WithParamInterface< size_t >
382 typedef queue_push_pop<> base_class;
// Test body invoked by the TEST_P macro as test< queue_type >(): builds a queue
// from the current parameter, runs the stress pass and validates it.
386 template <typename Queue>
389 size_t quasi_factor = GetParam();
391 Queue q( quasi_factor );
392 propout() << std::make_pair( "quasi_factor", quasi_factor );
393 base_class::test_queue( q );
// Relaxed ordering check: quasi_factor is used as nRightOffset, so one consumer
// may see items of a producer out of order by less than quasi_factor.
// The second argument (quasi_factor * 2) is ignored by analyze().
394 analyze( q, quasi_factor * 2, quasi_factor );
395 propout() << q.statistics();
// Builds the list of quasi-factors the test case is instantiated with, from
// the "queue_push_pop" configuration section:
//   "SegmentedQueue_Iterate"     (default false) - test a series of sizes
//   "SegmentedQueue_SegmentSize" (default 256)   - the (largest) size
399 static std::vector< size_t > get_test_parameters()
401 cds_test::config const& cfg = cds_test::stress_fixture::get_config( "queue_push_pop" );
402 bool bIterative = cfg.get_bool( "SegmentedQueue_Iterate", false );
403 size_t quasi_factor = cfg.get_size_t( "SegmentedQueue_SegmentSize", 256 );
405 std::vector<size_t> args;
// Iterative mode: 4, 8, 16, ... doubling while the value does not exceed quasi_factor.
// NOTE(review): qf *= 2 would wrap around for a quasi_factor above half of
// the size_t range - harmless for realistic segment sizes, but confirm.
406 if ( bIterative && quasi_factor > 4 ) {
407 for ( size_t qf = 4; qf <= quasi_factor; qf *= 2 )
408 args.push_back( qf );
// Single-value mode: use the configured size when it is greater than 2.
// NOTE(review): presumably this is the else-branch of the check above and a
// fallback value is used for sizes <= 2 - confirm.
410 if ( quasi_factor > 2 )
411 args.push_back( quasi_factor );
// Per-test generator for the parameterised SegmentedQueue fixture: each
// generated TEST_P assigns s_nSegmentedQueueSize to s_nQueueSize and runs the
// fixture's test< queue_type >().
// NOTE(review): unlike the VyukovQueue variant, no restore of the previous
// s_nQueueSize is visible here - confirm whether that is intended.
420 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
421 TEST_P( test_fixture, type_name ) \
423 typedef typename queue::Types<value_type>::type_name queue_type; \
424 s_nQueueSize = s_nSegmentedQueueSize; \
425 test< queue_type >(); \
428 CDSSTRESS_SegmentedQueue( segmented_queue_push_pop )
// Instantiate the SegmentedQueue tests (prefix "SQ") with the quasi-factors
// returned by get_test_parameters(). When the gtest in use accepts a 4th
// argument to INSTANTIATE_TEST_CASE_P, a name generator is supplied as well.
430 #ifdef CDSTEST_GTEST_INSTANTIATE_TEST_CASE_P_HAS_4TH_ARG
// Name generator: the decimal text of the quasi-factor parameter.
431 static std::string get_test_parameter_name( testing::TestParamInfo<size_t> const& p )
433 return std::to_string( p.param );
// Instantiation with the custom parameter names.
435 INSTANTIATE_TEST_CASE_P( SQ,
436 segmented_queue_push_pop,
437 ::testing::ValuesIn( segmented_queue_push_pop::get_test_parameters()), get_test_parameter_name );
// Instantiation without a name generator.
// NOTE(review): presumably the #else branch of the check above - confirm.
439 INSTANTIATE_TEST_CASE_P( SQ,
440 segmented_queue_push_pop,
441 ::testing::ValuesIn( segmented_queue_push_pop::get_test_parameters()));