Uses different pass count for different parallel queue test cases
[libcds.git] / test / stress / queue / push_pop.cpp
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #include "queue_type.h"
32
33 #include <vector>
34 #include <algorithm>
35 #include <type_traits>
36
37 // Multi-threaded queue push/pop test
38 namespace {
39
40     static size_t s_nConsumerThreadCount = 4;
41     static size_t s_nProducerThreadCount = 4;
42     static size_t s_nQueueSize = 4000000;
43     static size_t s_nMSQueueSize = 4000000;
44     static size_t s_nMoirQueueSize = 4000000;
45     static size_t s_nBasketQueueSize = 4000000;
46     static size_t s_nOptimisticQueueSize = 4000000;
47     static size_t s_nRWQueueSize = 4000000;
48     static size_t s_nSegmentedQueueSize = 400000;
49     static size_t s_nVyukovQueueSize = 40000;
50     static size_t s_nHeavyValueSize = 100;
51
52     static std::atomic<size_t> s_nProducerDone( 0 );
53
54     struct old_value
55     {
56         size_t nNo;
57         size_t nWriterNo;
58     };
59
60     template<class Value = old_value>
61     class queue_push_pop: public cds_test::stress_fixture
62     {
63     protected:
64        using value_type = Value;
65
66         enum {
67             producer_thread,
68             consumer_thread
69         };
70
71         template <class Queue>
72         class Producer: public cds_test::thread
73         {
74             typedef cds_test::thread base_class;
75
76         public:
77             Producer( cds_test::thread_pool& pool, Queue& queue, size_t nPushCount )
78                 : base_class( pool, producer_thread )
79                 , m_Queue( queue )
80                 , m_nPushFailed( 0 )
81                 , m_nPushCount( nPushCount )
82             {}
83
84             Producer( Producer& src )
85                 : base_class( src )
86                 , m_Queue( src.m_Queue )
87                 , m_nPushFailed( 0 )
88                 , m_nPushCount( src.m_nPushCount )
89             {}
90
91             virtual thread * clone()
92             {
93                 return new Producer( *this );
94             }
95
96             virtual void test()
97             {
98                 size_t const nPushCount = m_nPushCount;
99                 value_type v;
100                 v.nWriterNo = id();
101                 v.nNo = 0;
102                 m_nPushFailed = 0;
103
104                 while ( v.nNo < nPushCount ) {
105                     if ( m_Queue.push( v ))
106                         ++v.nNo;
107                     else
108                         ++m_nPushFailed;
109                 }
110
111                 s_nProducerDone.fetch_add( 1 );
112             }
113
114         public:
115             Queue&              m_Queue;
116             size_t              m_nPushFailed;
117             size_t const        m_nPushCount;
118         };
119
120         template <class Queue>
121         class Consumer: public cds_test::thread
122         {
123             typedef cds_test::thread base_class;
124
125         public:
126             Queue&              m_Queue;
127             size_t const        m_nPushPerProducer;
128             size_t              m_nPopEmpty;
129             size_t              m_nPopped;
130             size_t              m_nBadWriter;
131
132             typedef std::vector<size_t> popped_data;
133             typedef std::vector<size_t>::iterator       data_iterator;
134             typedef std::vector<size_t>::const_iterator const_data_iterator;
135
136             std::vector<popped_data>        m_WriterData;
137
138         private:
139             void initPoppedData()
140             {
141                 const size_t nProducerCount = s_nProducerThreadCount;
142                 m_WriterData.resize( nProducerCount );
143                 for ( size_t i = 0; i < nProducerCount; ++i )
144                     m_WriterData[i].reserve( m_nPushPerProducer );
145             }
146
147         public:
148             Consumer( cds_test::thread_pool& pool, Queue& queue, size_t nPushPerProducer )
149                 : base_class( pool, consumer_thread )
150                 , m_Queue( queue )
151                 , m_nPushPerProducer( nPushPerProducer )
152                 , m_nPopEmpty( 0 )
153                 , m_nPopped( 0 )
154                 , m_nBadWriter( 0 )
155             {
156                 initPoppedData();
157             }
158             Consumer( Consumer& src )
159                 : base_class( src )
160                 , m_Queue( src.m_Queue )
161                 , m_nPushPerProducer( src.m_nPushPerProducer )
162                 , m_nPopEmpty( 0 )
163                 , m_nPopped( 0 )
164                 , m_nBadWriter( 0 )
165             {
166                 initPoppedData();
167             }
168
169             virtual thread * clone()
170             {
171                 return new Consumer( *this );
172             }
173
174             virtual void test()
175             {
176                 m_nPopEmpty = 0;
177                 m_nPopped = 0;
178                 m_nBadWriter = 0;
179                 const size_t nTotalWriters = s_nProducerThreadCount;
180                 value_type v;
181                 while ( true ) {
182                     if ( m_Queue.pop( v )) {
183                         ++m_nPopped;
184                         if ( v.nWriterNo < nTotalWriters )
185                             m_WriterData[ v.nWriterNo ].push_back( v.nNo );
186                         else
187                             ++m_nBadWriter;
188                     }
189                     else {
190                         ++m_nPopEmpty;
191
192                         if ( s_nProducerDone.load() >= nTotalWriters ) {
193                             if ( m_Queue.empty())
194                                 break;
195                         }
196                     }
197                 }
198             }
199         };
200
201     protected:
202         size_t m_nThreadPushCount;
203
204     protected:
205         template <class Queue>
206         void analyze( Queue& q, size_t /*nLeftOffset*/ = 0, size_t nRightOffset = 0 )
207         {
208             cds_test::thread_pool& pool = get_pool();
209
210             typedef Consumer<Queue> consumer_type;
211             typedef Producer<Queue> producer_type;
212
213             size_t nPostTestPops = 0;
214             {
215                 value_type v;
216                 while ( q.pop( v ))
217                     ++nPostTestPops;
218             }
219
220             size_t nTotalPops = 0;
221             size_t nPopFalse = 0;
222             size_t nPoppedItems = 0;
223             size_t nPushFailed = 0;
224
225             std::vector< consumer_type * > arrConsumer;
226
227             for ( size_t i = 0; i < pool.size(); ++i ) {
228                 cds_test::thread& thr = pool.get(i);
229                 if ( thr.type() == consumer_thread ) {
230                     consumer_type& consumer = static_cast<consumer_type&>( thr );
231                     nTotalPops += consumer.m_nPopped;
232                     nPopFalse += consumer.m_nPopEmpty;
233                     arrConsumer.push_back( &consumer );
234                     EXPECT_EQ( consumer.m_nBadWriter, 0u ) << "consumer_thread_no " << i;
235
236                     size_t nPopped = 0;
237                     for ( size_t n = 0; n < s_nProducerThreadCount; ++n )
238                         nPopped += consumer.m_WriterData[n].size();
239
240                     nPoppedItems += nPopped;
241                 }
242                 else {
243                     assert( thr.type() == producer_thread );
244
245                     producer_type& producer = static_cast<producer_type&>( thr );
246                     nPushFailed += producer.m_nPushFailed;
247                     EXPECT_EQ( producer.m_nPushFailed, 0u ) << "producer_thread_no " << i;
248                 }
249             }
250             EXPECT_EQ( nTotalPops, nPoppedItems );
251
252             EXPECT_EQ( nTotalPops + nPostTestPops, s_nQueueSize ) << "nTotalPops=" << nTotalPops << ", nPostTestPops=" << nPostTestPops;
253             EXPECT_TRUE( q.empty());
254
255             // Test consistency of popped sequence
256             for ( size_t nWriter = 0; nWriter < s_nProducerThreadCount; ++nWriter ) {
257                 std::vector<size_t> arrData;
258                 arrData.reserve( m_nThreadPushCount );
259                 for ( size_t nReader = 0; nReader < arrConsumer.size(); ++nReader ) {
260                     auto it = arrConsumer[nReader]->m_WriterData[nWriter].begin();
261                     auto itEnd = arrConsumer[nReader]->m_WriterData[nWriter].end();
262                     if ( it != itEnd ) {
263                         auto itPrev = it;
264                         for ( ++it; it != itEnd; ++it ) {
265                             EXPECT_LT( *itPrev, *it + nRightOffset ) << "consumer=" << nReader << ", producer=" << nWriter;
266                             itPrev = it;
267                         }
268                     }
269
270                     for ( it = arrConsumer[nReader]->m_WriterData[nWriter].begin(); it != itEnd; ++it )
271                         arrData.push_back( *it );
272                 }
273
274                 std::sort( arrData.begin(), arrData.end());
275                 for ( size_t i=1; i < arrData.size(); ++i ) {
276                     EXPECT_EQ( arrData[i - 1] + 1, arrData[i] ) << "producer=" << nWriter;
277                 }
278
279                 EXPECT_EQ( arrData[0], 0u ) << "producer=" << nWriter;
280                 EXPECT_EQ( arrData[arrData.size() - 1], m_nThreadPushCount - 1 ) << "producer=" << nWriter;
281             }
282         }
283
284         template <class Queue>
285         void test_queue( Queue& q )
286         {
287             m_nThreadPushCount = s_nQueueSize / s_nProducerThreadCount;
288
289             cds_test::thread_pool& pool = get_pool();
290             pool.add( new Producer<Queue>( pool, q, m_nThreadPushCount ), s_nProducerThreadCount );
291             pool.add( new Consumer<Queue>( pool, q, m_nThreadPushCount ), s_nConsumerThreadCount );
292
293             s_nProducerDone.store( 0 );
294             s_nQueueSize = m_nThreadPushCount * s_nProducerThreadCount;
295
296             propout() << std::make_pair( "producer_count", s_nProducerThreadCount )
297                 << std::make_pair( "consumer_count", s_nConsumerThreadCount )
298                 << std::make_pair( "push_count", s_nQueueSize );
299
300             std::chrono::milliseconds duration = pool.run();
301
302             propout() << std::make_pair( "duration", duration );
303         }
304
305         template <class Queue>
306         void test( Queue& q )
307         {
308             test_queue( q );
309             analyze( q );
310             propout() << q.statistics();
311         }
312
313     private:
314         static void set_array_size( size_t size ) {
315             const bool tmp = fc_test::has_set_array_size<value_type>::value;
316             set_array_size(size, std::integral_constant<bool, tmp>());
317         }
318
319         static void set_array_size(size_t size, std::true_type){
320             value_type::set_array_size(size);
321         }
322
323         static void set_array_size(size_t, std::false_type)
324         {
325         }
326
327     public:
328         static void SetUpTestCase()
329         {
330             cds_test::config const& cfg = get_config( "queue_push_pop" );
331
332             s_nConsumerThreadCount = cfg.get_size_t( "ConsumerCount", s_nConsumerThreadCount );
333             s_nProducerThreadCount = cfg.get_size_t( "ProducerCount", s_nProducerThreadCount );
334             s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
335
336             s_nMSQueueSize = cfg.get_size_t( "MSQueueSize", s_nMSQueueSize );
337             s_nMoirQueueSize = cfg.get_size_t( "MoirQueueSize", s_nMoirQueueSize );
338             s_nBasketQueueSize = cfg.get_size_t( "BasketQueueSize", s_nBasketQueueSize );
339             s_nOptimisticQueueSize = cfg.get_size_t( "OptimisticQueueSize", s_nOptimisticQueueSize );
340             s_nRWQueueSize = cfg.get_size_t( "RWQueueSize", s_nRWQueueSize );
341
342             s_nVyukovQueueSize = cfg.get_size_t( "VyukovQueueSize", s_nVyukovQueueSize );
343             s_nSegmentedQueueSize = cfg.get_size_t( "SegmentedQueueSize", s_nSegmentedQueueSize );
344             s_nHeavyValueSize = cfg.get_size_t( "HeavyValueSize", s_nHeavyValueSize );
345
346             if ( s_nConsumerThreadCount == 0u )
347                 s_nConsumerThreadCount = 1;
348             if ( s_nProducerThreadCount == 0u )
349                 s_nProducerThreadCount = 1;
350             if ( s_nQueueSize == 0u )
351                 s_nQueueSize = 1000;
352             if ( s_nHeavyValueSize == 0 )
353                 s_nHeavyValueSize = 1;
354
355             set_array_size( s_nHeavyValueSize );
356         }
357
358         //static void TearDownTestCase();
359     };
360
361     using fc_with_heavy_value = queue_push_pop< fc_test::heavy_value<36000> >;
362     using simple_queue_push_pop = queue_push_pop<>;
363
364 #undef CDSSTRESS_Queue_F
365 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
366     TEST_F( test_fixture, type_name ) \
367     { \
368         typedef queue::Types< value_type >::type_name queue_type; \
369         queue_type queue; \
370         s_nQueueSize = s_nMSQueueSize; \
371         test( queue ); \
372     }
373
374     CDSSTRESS_MSQueue( simple_queue_push_pop )
375
376 #undef CDSSTRESS_Queue_F
377 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
378     TEST_F( test_fixture, type_name ) \
379     { \
380         typedef queue::Types< value_type >::type_name queue_type; \
381         queue_type queue; \
382         s_nQueueSize = s_nMoirQueueSize; \
383         test( queue ); \
384     }
385     CDSSTRESS_MoirQueue( simple_queue_push_pop )
386
387 #undef CDSSTRESS_Queue_F
388 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
389     TEST_F( test_fixture, type_name ) \
390     { \
391         typedef queue::Types< value_type >::type_name queue_type; \
392         queue_type queue; \
393         s_nQueueSize = s_nBasketQueueSize; \
394         test( queue ); \
395     }
396     CDSSTRESS_BasketQueue( simple_queue_push_pop )
397
398 #undef CDSSTRESS_Queue_F
399 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
400     TEST_F( test_fixture, type_name ) \
401     { \
402         typedef queue::Types< value_type >::type_name queue_type; \
403         queue_type queue; \
404         s_nQueueSize = s_nOptimisticQueueSize; \
405         test( queue ); \
406     }
407     CDSSTRESS_OptimsticQueue( simple_queue_push_pop )
408
409 #undef CDSSTRESS_Queue_F
410 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
411     TEST_F( test_fixture, type_name ) \
412     { \
413         typedef queue::Types< value_type >::type_name queue_type; \
414         queue_type queue; \
415         s_nQueueSize = s_nRWQueueSize; \
416         test( queue ); \
417     }
418     CDSSTRESS_RWQueue( simple_queue_push_pop )
419
420 #undef CDSSTRESS_Queue_F
421 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
422     TEST_F( test_fixture, type_name ) \
423     { \
424         size_t old_queue_size = s_nQueueSize; \
425         s_nQueueSize = s_nVyukovQueueSize; \
426         typedef queue::Types< value_type >::type_name queue_type; \
427         queue_type queue( s_nQueueSize ); \
428         test( queue ); \
429         s_nQueueSize = old_queue_size; \
430     }
431
432     //CDSSTRESS_VyukovQueue( simple_queue_push_pop )
433
434     // ********************************************************************
435     // SegmentedQueue test
436
437     class segmented_queue_push_pop
438         : public queue_push_pop<>
439         , public ::testing::WithParamInterface< size_t >
440     {
441         typedef queue_push_pop<> base_class;
442
443     protected:
444
445         template <typename Queue>
446         void test()
447         {
448             size_t quasi_factor = GetParam();
449
450             Queue q( quasi_factor );
451             propout() << std::make_pair( "quasi_factor", quasi_factor );
452             base_class::test_queue( q );
453             analyze( q, quasi_factor * 2, quasi_factor );
454             propout() << q.statistics();
455         }
456
457     public:
458         static std::vector< size_t > get_test_parameters()
459         {
460             cds_test::config const& cfg = cds_test::stress_fixture::get_config( "queue_push_pop" );
461             bool bIterative = cfg.get_bool( "SegmentedQueue_Iterate", false );
462             size_t quasi_factor = cfg.get_size_t( "SegmentedQueue_SegmentSize", 256 );
463
464             std::vector<size_t> args;
465             if ( bIterative && quasi_factor > 4 ) {
466                 for ( size_t qf = 4; qf <= quasi_factor; qf *= 2 )
467                     args.push_back( qf );
468             } else {
469                 if ( quasi_factor > 2 )
470                     args.push_back( quasi_factor );
471                 else
472                     args.push_back( 2 );
473             }
474
475             return args;
476         }
477     };
478
479 #undef CDSSTRESS_Queue_F
480 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
481     TEST_P( test_fixture, type_name ) \
482     { \
483         typedef typename queue::Types<value_type>::type_name queue_type; \
484         s_nQueueSize = s_nSegmentedQueueSize; \
485         test< queue_type >(); \
486     }
487
488     CDSSTRESS_SegmentedQueue( segmented_queue_push_pop )
489
490 #ifdef CDSTEST_GTEST_INSTANTIATE_TEST_CASE_P_HAS_4TH_ARG
491     static std::string get_test_parameter_name( testing::TestParamInfo<size_t> const& p )
492     {
493         return std::to_string( p.param );
494     }
495     INSTANTIATE_TEST_CASE_P( SQ,
496         segmented_queue_push_pop,
497         ::testing::ValuesIn( segmented_queue_push_pop::get_test_parameters()), get_test_parameter_name );
498 #else
499     INSTANTIATE_TEST_CASE_P( SQ,
500         segmented_queue_push_pop,
501         ::testing::ValuesIn( segmented_queue_push_pop::get_test_parameters()));
502 #endif
503
504 } // namespace