Uses different pass count for different parallel queue test cases
[libcds.git] / test / stress / queue / random.cpp
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #include "queue_type.h"
32
33 #include <vector>
34
35 // Multi-threaded queue test for random push/pop operation
36 namespace {
37
// Test parameters. The values below are only defaults: SetUpTestCase() replaces them
// with the "ThreadCount" / "QueueSize" settings read from the test configuration.
size_t s_nThreadCount{ 16 };
size_t s_nQueueSize{ 10000000 };

// Number of worker threads that have not finished their push phase yet.
// Set to the pool size before a run; each worker decrements it once it has pushed all its items.
std::atomic< size_t > s_nProducerCount{ 0 };
42
43     class queue_random: public cds_test::stress_fixture
44     {
45         typedef cds_test::stress_fixture base_class;
46
47     protected:
48         struct value_type {
49             size_t      nNo;
50             size_t      nThread;
51
52             value_type() {}
53             value_type( size_t n ) : nNo( n ) {}
54         };
55
56         template <class Queue>
57         class Strain: public cds_test::thread
58         {
59             typedef cds_test::thread base_class;
60
61         public:
62             Strain( cds_test::thread_pool& pool, Queue& q, size_t nPushCount, size_t nSpread = 0 )
63                 : base_class( pool )
64                 , m_Queue( q )
65                 , m_nSpread( nSpread )
66                 , m_nTotalPushCount( nPushCount )
67             {}
68
69             Strain( Strain& src )
70                 : base_class( src )
71                 , m_Queue( src.m_Queue )
72                 , m_nSpread( src.m_nSpread )
73                 , m_nTotalPushCount( src.m_nTotalPushCount )
74             {}
75
76             virtual thread * clone()
77             {
78                 return new Strain( *this );
79             }
80
81             virtual void test()
82             {
83                 size_t const nThreadCount = s_nThreadCount;
84                 size_t const nTotalPush = m_nTotalPushCount;
85
86                 m_arrLastRead.resize( nThreadCount, 0 );
87                 m_arrPopCountPerThread.resize( nThreadCount, 0 );
88
89                 value_type node;
90
91                 while ( m_nPushCount < nTotalPush ) {
92                     if ( ( std::rand() & 3) != 3 ) {
93                         node.nThread = id();
94                         node.nNo = ++m_nPushCount;
95                         if ( !m_Queue.push( node )) {
96                             ++m_nPushError;
97                             --m_nPushCount;
98                         }
99                     }
100                     else
101                         pop( nThreadCount );
102                 }
103
104                 s_nProducerCount.fetch_sub( 1, std::memory_order_relaxed );
105
106                 while ( !m_Queue.empty() || s_nProducerCount.load( std::memory_order_relaxed ) != 0 )
107                     pop( nThreadCount );
108             }
109
110             bool pop( size_t nThreadCount )
111             {
112                 value_type node;
113                 node.nThread = nThreadCount;
114                 node.nNo = ~0;
115                 if ( m_Queue.pop( node )) {
116                     ++m_nPopCount;
117                     if ( node.nThread < nThreadCount ) {
118                         m_arrPopCountPerThread[ node.nThread ] += 1;
119                         if ( m_nSpread ) {
120                             if ( m_arrLastRead[ node.nThread ] > node.nNo ) {
121                                 if ( m_arrLastRead[ node.nThread ] - node.nNo > m_nSpread )
122                                     ++m_nRepeatValue;
123                             }
124                             else if ( m_arrLastRead[ node.nThread ] == node.nNo )
125                                 ++m_nRepeatValue;
126                             m_arrLastRead[ node.nThread ] = node.nNo;
127                         }
128                         else {
129                             if ( m_arrLastRead[ node.nThread ] < node.nNo )
130                                 m_arrLastRead[ node.nThread ] = node.nNo;
131                             else
132                                 ++m_nRepeatValue;
133                         }
134                     }
135                     else
136                         ++m_nUndefWriter;
137                 }
138                 else {
139                     ++m_nEmptyPop;
140                     return false;
141                 }
142                 return true;
143             }
144
145         public:
146             Queue&              m_Queue;
147
148             size_t  m_nPushCount = 0;
149             size_t  m_nPopCount  = 0;
150             size_t  m_nEmptyPop  = 0;
151
152             size_t  m_nUndefWriter = 0;
153             size_t  m_nRepeatValue = 0;
154             size_t  m_nPushError   = 0;
155
156             std::vector<size_t> m_arrLastRead;
157             std::vector<size_t> m_arrPopCountPerThread;
158
159             size_t const m_nSpread;
160             size_t const m_nTotalPushCount;
161         };
162
163     public:
164         static void SetUpTestCase()
165         {
166             cds_test::config const& cfg = get_config( "queue_random" );
167
168             s_nThreadCount = cfg.get_size_t( "ThreadCount", s_nThreadCount );
169             s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
170
171             if ( s_nThreadCount == 0u )
172                 s_nThreadCount = 1;
173             if ( s_nQueueSize == 0u )
174                 s_nQueueSize = 1000;
175         }
176
177         //static void TearDownTestCase();
178
179     protected:
180         template <class Queue>
181         void analyze( Queue& q  )
182         {
183             EXPECT_TRUE( q.empty());
184
185             std::vector< size_t > arrPushCount;
186             arrPushCount.resize( s_nThreadCount, 0 );
187
188             size_t nPushTotal = 0;
189             size_t nPopTotal  = 0;
190             size_t nPushError = 0;
191
192             cds_test::thread_pool& pool = get_pool();
193             for ( size_t i = 0; i < pool.size(); ++i ) {
194                 Strain<Queue>& thr = static_cast<Strain<Queue> &>( pool.get(i));
195                 EXPECT_EQ( thr.m_nUndefWriter, 0u );
196                 EXPECT_EQ( thr.m_nRepeatValue, 0u );
197                 EXPECT_EQ( thr.m_nPushError, 0u );
198                 nPushError += thr.m_nPushError;
199
200                 arrPushCount[ thr.id() ] += thr.m_nPushCount;
201
202                 nPushTotal += thr.m_nPushCount;
203                 nPopTotal += thr.m_nPopCount;
204             }
205
206             EXPECT_EQ( nPushTotal, s_nQueueSize );
207             EXPECT_EQ( nPopTotal, s_nQueueSize );
208
209             size_t const nThreadPushCount = s_nQueueSize / s_nThreadCount;
210             for ( size_t i = 0; i < s_nThreadCount; ++i )
211                 EXPECT_EQ( arrPushCount[i], nThreadPushCount ) << "thread=" << i;
212         }
213
214         template <class Queue>
215         void test( Queue& q )
216         {
217             size_t nThreadPushCount = s_nQueueSize / s_nThreadCount;
218
219             cds_test::thread_pool& pool = get_pool();
220             pool.add( new Strain<Queue>( pool, q, nThreadPushCount ), s_nThreadCount );
221
222             s_nQueueSize = nThreadPushCount * s_nThreadCount;
223             propout() << std::make_pair( "thread_count", s_nThreadCount )
224                 << std::make_pair( "push_count", s_nQueueSize );
225
226             s_nProducerCount.store( pool.size(), std::memory_order_release );
227             std::chrono::milliseconds duration = pool.run();
228             propout() << std::make_pair( "duration", duration );
229
230             analyze( q );
231
232             propout() << q.statistics();
233         }
234     };
235
236     CDSSTRESS_MSQueue( queue_random )
237     CDSSTRESS_MoirQueue( queue_random )
238     CDSSTRESS_BasketQueue( queue_random )
239     CDSSTRESS_OptimsticQueue( queue_random )
240     CDSSTRESS_RWQueue( queue_random )
241
242 #undef CDSSTRESS_Queue_F
243 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
244     TEST_F( test_fixture, type_name ) \
245     { \
246         typedef queue::Types< value_type >::type_name queue_type; \
247         queue_type queue( s_nQueueSize ); \
248         test( queue ); \
249     }
250
251     CDSSTRESS_VyukovQueue( queue_random )
252
253 #undef CDSSTRESS_Queue_F
254
255     // ********************************************************************
256     // SegmentedQueue test
257
258     class segmented_queue_random
259         : public queue_random
260         , public ::testing::WithParamInterface< size_t >
261     {
262         typedef queue_random base_class;
263
264     protected:
265         template <typename Queue>
266         void test()
267         {
268             size_t quasi_factor = GetParam();
269
270             Queue q( quasi_factor );
271             propout() << std::make_pair( "quasi_factor", quasi_factor );
272
273             size_t nThreadPushCount = s_nQueueSize / s_nThreadCount;
274
275             cds_test::thread_pool& pool = get_pool();
276             pool.add( new Strain<Queue>( pool, q, nThreadPushCount, quasi_factor * 2 ), s_nThreadCount );
277
278             s_nQueueSize = nThreadPushCount * s_nThreadCount;
279             propout() << std::make_pair( "thread_count", s_nThreadCount )
280                 << std::make_pair( "push_count", s_nQueueSize );
281
282             s_nProducerCount.store( pool.size(), std::memory_order_release );
283             std::chrono::milliseconds duration = pool.run();
284             propout() << std::make_pair( "duration", duration );
285
286             analyze( q );
287
288             propout() << q.statistics();
289         }
290
291     public:
292         static std::vector< size_t > get_test_parameters()
293         {
294             cds_test::config const& cfg = cds_test::stress_fixture::get_config( "queue_push" );
295             bool bIterative = cfg.get_bool( "SegmentedQueue_Iterate", false );
296             size_t quasi_factor = cfg.get_size_t( "SegmentedQueue_SegmentSize", 256 );
297
298             std::vector<size_t> args;
299             if ( bIterative && quasi_factor > 4 ) {
300                 for ( size_t qf = 4; qf <= quasi_factor; qf *= 2 )
301                     args.push_back( qf );
302             }
303             else {
304                 if ( quasi_factor > 2 )
305                     args.push_back( quasi_factor );
306                 else
307                     args.push_back( 2 );
308             }
309
310             return args;
311         }
312     };
313
314 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
315     TEST_P( test_fixture, type_name ) \
316     { \
317         typedef typename queue::Types<value_type>::type_name queue_type; \
318         test< queue_type >(); \
319     }
320
321     CDSSTRESS_SegmentedQueue( segmented_queue_random )
322
323 #ifdef CDSTEST_GTEST_INSTANTIATE_TEST_CASE_P_HAS_4TH_ARG
324     static std::string get_test_parameter_name( testing::TestParamInfo<size_t> const& p )
325     {
326         return std::to_string( p.param );
327     }
328     INSTANTIATE_TEST_CASE_P( SQ,
329         segmented_queue_random,
330         ::testing::ValuesIn( segmented_queue_random::get_test_parameters()), get_test_parameter_name );
331 #else
332     INSTANTIATE_TEST_CASE_P( SQ,
333         segmented_queue_random,
334         ::testing::ValuesIn( segmented_queue_random::get_test_parameters()));
335 #endif
336
337 } // namespace