Runs fewer test cases (only 2 threads)
[libcds.git] / test / stress / queue / push_pop.cpp
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #include "queue_type.h"
32
33 #include <vector>
34 #include <algorithm>
35 #include <type_traits>
36
37 // Multi-threaded queue push/pop test
38 namespace {
39
// Default test parameters; queue_push_pop::SetUpTestCase() reloads them from the test config.
// The enclosing anonymous namespace already gives every variable internal linkage.
size_t s_nConsumerThreadCount{ 4 };     // number of consumer threads
size_t s_nProducerThreadCount{ 4 };     // number of producer threads
size_t s_nQueueSize{ 4000000 };         // total number of items pushed during one test
size_t s_nSegmentedQueueSize{ 400000 }; // value of s_nQueueSize used by SegmentedQueue tests
size_t s_nVyukovQueueSize{ 40000 };     // value of s_nQueueSize used by VyukovQueue tests
size_t s_nHeavyValueSize{ 100 };        // array size handed to value types that support set_array_size()

// Number of producer threads that have finished pushing; consumers poll it to know when to stop
std::atomic<size_t> s_nProducerDone{ 0 };
48
// Default item type stored in the tested queues.
// Kept as a plain aggregate: the field order is part of its interface.
struct old_value
{
    // Sequence number of the item; a producer numbers its items 0, 1, 2, ...
    size_t nNo;

    // id() of the producer thread that pushed the item
    size_t nWriterNo;
};
54
55     template<class Value = old_value>
56     class queue_push_pop: public cds_test::stress_fixture
57     {
58     protected:
59        using value_type = Value;
60
61         enum {
62             producer_thread,
63             consumer_thread
64         };
65
66         template <class Queue>
67         class Producer: public cds_test::thread
68         {
69             typedef cds_test::thread base_class;
70
71         public:
72             Producer( cds_test::thread_pool& pool, Queue& queue, size_t nPushCount )
73                 : base_class( pool, producer_thread )
74                 , m_Queue( queue )
75                 , m_nPushFailed( 0 )
76                 , m_nPushCount( nPushCount )
77             {}
78
79             Producer( Producer& src )
80                 : base_class( src )
81                 , m_Queue( src.m_Queue )
82                 , m_nPushFailed( 0 )
83                 , m_nPushCount( src.m_nPushCount )
84             {}
85
86             virtual thread * clone()
87             {
88                 return new Producer( *this );
89             }
90
91             virtual void test()
92             {
93                 size_t const nPushCount = m_nPushCount;
94                 value_type v;
95                 v.nWriterNo = id();
96                 v.nNo = 0;
97                 m_nPushFailed = 0;
98
99                 while ( v.nNo < nPushCount ) {
100                     if ( m_Queue.push( v ))
101                         ++v.nNo;
102                     else
103                         ++m_nPushFailed;
104                 }
105
106                 s_nProducerDone.fetch_add( 1 );
107             }
108
109         public:
110             Queue&              m_Queue;
111             size_t              m_nPushFailed;
112             size_t const        m_nPushCount;
113         };
114
115         template <class Queue>
116         class Consumer: public cds_test::thread
117         {
118             typedef cds_test::thread base_class;
119
120         public:
121             Queue&              m_Queue;
122             size_t const        m_nPushPerProducer;
123             size_t              m_nPopEmpty;
124             size_t              m_nPopped;
125             size_t              m_nBadWriter;
126
127             typedef std::vector<size_t> popped_data;
128             typedef std::vector<size_t>::iterator       data_iterator;
129             typedef std::vector<size_t>::const_iterator const_data_iterator;
130
131             std::vector<popped_data>        m_WriterData;
132
133         private:
134             void initPoppedData()
135             {
136                 const size_t nProducerCount = s_nProducerThreadCount;
137                 m_WriterData.resize( nProducerCount );
138                 for ( size_t i = 0; i < nProducerCount; ++i )
139                     m_WriterData[i].reserve( m_nPushPerProducer );
140             }
141
142         public:
143             Consumer( cds_test::thread_pool& pool, Queue& queue, size_t nPushPerProducer )
144                 : base_class( pool, consumer_thread )
145                 , m_Queue( queue )
146                 , m_nPushPerProducer( nPushPerProducer )
147                 , m_nPopEmpty( 0 )
148                 , m_nPopped( 0 )
149                 , m_nBadWriter( 0 )
150             {
151                 initPoppedData();
152             }
153             Consumer( Consumer& src )
154                 : base_class( src )
155                 , m_Queue( src.m_Queue )
156                 , m_nPushPerProducer( src.m_nPushPerProducer )
157                 , m_nPopEmpty( 0 )
158                 , m_nPopped( 0 )
159                 , m_nBadWriter( 0 )
160             {
161                 initPoppedData();
162             }
163
164             virtual thread * clone()
165             {
166                 return new Consumer( *this );
167             }
168
169             virtual void test()
170             {
171                 m_nPopEmpty = 0;
172                 m_nPopped = 0;
173                 m_nBadWriter = 0;
174                 const size_t nTotalWriters = s_nProducerThreadCount;
175                 value_type v;
176                 while ( true ) {
177                     if ( m_Queue.pop( v )) {
178                         ++m_nPopped;
179                         if ( v.nWriterNo < nTotalWriters )
180                             m_WriterData[ v.nWriterNo ].push_back( v.nNo );
181                         else
182                             ++m_nBadWriter;
183                     }
184                     else {
185                         ++m_nPopEmpty;
186
187                         if ( s_nProducerDone.load() >= nTotalWriters ) {
188                             if ( m_Queue.empty())
189                                 break;
190                         }
191                     }
192                 }
193             }
194         };
195
196     protected:
197         size_t m_nThreadPushCount;
198
199     protected:
200         template <class Queue>
201         void analyze( Queue& q, size_t /*nLeftOffset*/ = 0, size_t nRightOffset = 0 )
202         {
203             cds_test::thread_pool& pool = get_pool();
204
205             typedef Consumer<Queue> consumer_type;
206             typedef Producer<Queue> producer_type;
207
208             size_t nPostTestPops = 0;
209             {
210                 value_type v;
211                 while ( q.pop( v ))
212                     ++nPostTestPops;
213             }
214
215             size_t nTotalPops = 0;
216             size_t nPopFalse = 0;
217             size_t nPoppedItems = 0;
218             size_t nPushFailed = 0;
219
220             std::vector< consumer_type * > arrConsumer;
221
222             for ( size_t i = 0; i < pool.size(); ++i ) {
223                 cds_test::thread& thr = pool.get(i);
224                 if ( thr.type() == consumer_thread ) {
225                     consumer_type& consumer = static_cast<consumer_type&>( thr );
226                     nTotalPops += consumer.m_nPopped;
227                     nPopFalse += consumer.m_nPopEmpty;
228                     arrConsumer.push_back( &consumer );
229                     EXPECT_EQ( consumer.m_nBadWriter, 0u ) << "consumer_thread_no " << i;
230
231                     size_t nPopped = 0;
232                     for ( size_t n = 0; n < s_nProducerThreadCount; ++n )
233                         nPopped += consumer.m_WriterData[n].size();
234
235                     nPoppedItems += nPopped;
236                 }
237                 else {
238                     assert( thr.type() == producer_thread );
239
240                     producer_type& producer = static_cast<producer_type&>( thr );
241                     nPushFailed += producer.m_nPushFailed;
242                     EXPECT_EQ( producer.m_nPushFailed, 0u ) << "producer_thread_no " << i;
243                 }
244             }
245             EXPECT_EQ( nTotalPops, nPoppedItems );
246
247             EXPECT_EQ( nTotalPops + nPostTestPops, s_nQueueSize ) << "nTotalPops=" << nTotalPops << ", nPostTestPops=" << nPostTestPops;
248             EXPECT_TRUE( q.empty());
249
250             // Test consistency of popped sequence
251             for ( size_t nWriter = 0; nWriter < s_nProducerThreadCount; ++nWriter ) {
252                 std::vector<size_t> arrData;
253                 arrData.reserve( m_nThreadPushCount );
254                 for ( size_t nReader = 0; nReader < arrConsumer.size(); ++nReader ) {
255                     auto it = arrConsumer[nReader]->m_WriterData[nWriter].begin();
256                     auto itEnd = arrConsumer[nReader]->m_WriterData[nWriter].end();
257                     if ( it != itEnd ) {
258                         auto itPrev = it;
259                         for ( ++it; it != itEnd; ++it ) {
260                             EXPECT_LT( *itPrev, *it + nRightOffset ) << "consumer=" << nReader << ", producer=" << nWriter;
261                             itPrev = it;
262                         }
263                     }
264
265                     for ( it = arrConsumer[nReader]->m_WriterData[nWriter].begin(); it != itEnd; ++it )
266                         arrData.push_back( *it );
267                 }
268
269                 std::sort( arrData.begin(), arrData.end());
270                 for ( size_t i=1; i < arrData.size(); ++i ) {
271                     EXPECT_EQ( arrData[i - 1] + 1, arrData[i] ) << "producer=" << nWriter;
272                 }
273
274                 EXPECT_EQ( arrData[0], 0u ) << "producer=" << nWriter;
275                 EXPECT_EQ( arrData[arrData.size() - 1], m_nThreadPushCount - 1 ) << "producer=" << nWriter;
276             }
277         }
278
279         template <class Queue>
280         void test_queue( Queue& q )
281         {
282             m_nThreadPushCount = s_nQueueSize / s_nProducerThreadCount;
283
284             cds_test::thread_pool& pool = get_pool();
285             pool.add( new Producer<Queue>( pool, q, m_nThreadPushCount ), s_nProducerThreadCount );
286             pool.add( new Consumer<Queue>( pool, q, m_nThreadPushCount ), s_nConsumerThreadCount );
287
288             s_nProducerDone.store( 0 );
289             s_nQueueSize = m_nThreadPushCount * s_nProducerThreadCount;
290
291             propout() << std::make_pair( "producer_count", s_nProducerThreadCount )
292                 << std::make_pair( "consumer_count", s_nConsumerThreadCount )
293                 << std::make_pair( "push_count", s_nQueueSize );
294
295             std::chrono::milliseconds duration = pool.run();
296
297             propout() << std::make_pair( "duration", duration );
298         }
299
300         template <class Queue>
301         void test( Queue& q )
302         {
303             test_queue( q );
304             analyze( q );
305             propout() << q.statistics();
306         }
307
308     private:
309         static void set_array_size( size_t size ) {
310             const bool tmp = fc_test::has_set_array_size<value_type>::value;
311             set_array_size(size, std::integral_constant<bool, tmp>());
312         }
313
314         static void set_array_size(size_t size, std::true_type){
315             value_type::set_array_size(size);
316         }
317
318         static void set_array_size(size_t, std::false_type)
319         {
320         }
321
322     public:
323         static void SetUpTestCase()
324         {
325             cds_test::config const& cfg = get_config( "queue_push_pop" );
326
327             s_nConsumerThreadCount = cfg.get_size_t( "ConsumerCount", s_nConsumerThreadCount );
328             s_nProducerThreadCount = cfg.get_size_t( "ProducerCount", s_nProducerThreadCount );
329             s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
330             s_nVyukovQueueSize = cfg.get_size_t( "VyukovQueueSize", s_nVyukovQueueSize );
331             s_nSegmentedQueueSize = cfg.get_size_t( "SegmentedQueueSize", s_nSegmentedQueueSize );
332             s_nHeavyValueSize = cfg.get_size_t( "HeavyValueSize", s_nHeavyValueSize );
333
334             if ( s_nConsumerThreadCount == 0u )
335                 s_nConsumerThreadCount = 1;
336             if ( s_nProducerThreadCount == 0u )
337                 s_nProducerThreadCount = 1;
338             if ( s_nQueueSize == 0u )
339                 s_nQueueSize = 1000;
340             if ( s_nHeavyValueSize == 0 )
341                 s_nHeavyValueSize = 1;
342
343             set_array_size( s_nHeavyValueSize );
344         }
345
346         //static void TearDownTestCase();
347     };
348
349     using fc_with_heavy_value = queue_push_pop< fc_test::heavy_value<36000> >;
350     using simple_queue_push_pop = queue_push_pop<>;
351
352     CDSSTRESS_MSQueue( simple_queue_push_pop )
353     CDSSTRESS_MoirQueue( simple_queue_push_pop )
354     CDSSTRESS_BasketQueue( simple_queue_push_pop )
355     CDSSTRESS_OptimsticQueue( simple_queue_push_pop )
356     CDSSTRESS_RWQueue( simple_queue_push_pop )
357
358 #undef CDSSTRESS_Queue_F
359 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
360     TEST_F( test_fixture, type_name ) \
361     { \
362         size_t old_queue_size = s_nQueueSize; \
363         s_nQueueSize = s_nVyukovQueueSize; \
364         typedef queue::Types< value_type >::type_name queue_type; \
365         queue_type queue( s_nQueueSize ); \
366         test( queue ); \
367         s_nQueueSize = old_queue_size; \
368     }
369
370     //CDSSTRESS_VyukovQueue( simple_queue_push_pop )
371
372 #undef CDSSTRESS_Queue_F
373
374
375     // ********************************************************************
376     // SegmentedQueue test
377
378     class segmented_queue_push_pop
379         : public queue_push_pop<>
380         , public ::testing::WithParamInterface< size_t >
381     {
382         typedef queue_push_pop<> base_class;
383
384     protected:
385
386         template <typename Queue>
387         void test()
388         {
389             size_t quasi_factor = GetParam();
390
391             Queue q( quasi_factor );
392             propout() << std::make_pair( "quasi_factor", quasi_factor );
393             base_class::test_queue( q );
394             analyze( q, quasi_factor * 2, quasi_factor );
395             propout() << q.statistics();
396         }
397
398     public:
399         static std::vector< size_t > get_test_parameters()
400         {
401             cds_test::config const& cfg = cds_test::stress_fixture::get_config( "queue_push_pop" );
402             bool bIterative = cfg.get_bool( "SegmentedQueue_Iterate", false );
403             size_t quasi_factor = cfg.get_size_t( "SegmentedQueue_SegmentSize", 256 );
404
405             std::vector<size_t> args;
406             if ( bIterative && quasi_factor > 4 ) {
407                 for ( size_t qf = 4; qf <= quasi_factor; qf *= 2 )
408                     args.push_back( qf );
409             } else {
410                 if ( quasi_factor > 2 )
411                     args.push_back( quasi_factor );
412                 else
413                     args.push_back( 2 );
414             }
415
416             return args;
417         }
418     };
419
420 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
421     TEST_P( test_fixture, type_name ) \
422     { \
423         typedef typename queue::Types<value_type>::type_name queue_type; \
424         s_nQueueSize = s_nSegmentedQueueSize; \
425         test< queue_type >(); \
426     }
427
428     CDSSTRESS_SegmentedQueue( segmented_queue_push_pop )
429
430 #ifdef CDSTEST_GTEST_INSTANTIATE_TEST_CASE_P_HAS_4TH_ARG
431     static std::string get_test_parameter_name( testing::TestParamInfo<size_t> const& p )
432     {
433         return std::to_string( p.param );
434     }
435     INSTANTIATE_TEST_CASE_P( SQ,
436         segmented_queue_push_pop,
437         ::testing::ValuesIn( segmented_queue_push_pop::get_test_parameters()), get_test_parameter_name );
438 #else
439     INSTANTIATE_TEST_CASE_P( SQ,
440         segmented_queue_push_pop,
441         ::testing::ValuesIn( segmented_queue_push_pop::get_test_parameters()));
442 #endif
443
444 } // namespace