Fixed minor warnings
[libcds.git] / test / stress / queue / random.cpp
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #include "queue_type.h"
32
33 #include <vector>
34
35 // Multi-threaded queue test for random push/pop operation
36 namespace {
37
38     static size_t s_nThreadCount = 16;
39     static size_t s_nQueueSize = 10000000;
40
41     std::atomic< size_t > s_nProducerCount(0);
42
43     class queue_random: public cds_test::stress_fixture
44     {
45         typedef cds_test::stress_fixture base_class;
46
47     protected:
48         struct value_type {
49             size_t      nNo;
50             size_t      nThread;
51
52             value_type() {}
53             value_type( size_t n ) : nNo( n ) {}
54         };
55
56         template <class Queue>
57         class Strain: public cds_test::thread
58         {
59             typedef cds_test::thread base_class;
60
61         public:
62             Strain( cds_test::thread_pool& pool, Queue& q, size_t nPushCount, size_t nSpread = 0 )
63                 : base_class( pool )
64                 , m_Queue( q )
65                 , m_nSpread( nSpread )
66                 , m_nTotalPushCount( nPushCount )
67             {}
68
69             Strain( Strain& src )
70                 : base_class( src )
71                 , m_Queue( src.m_Queue )
72                 , m_nSpread( src.m_nSpread )
73                 , m_nTotalPushCount( src.m_nTotalPushCount )
74             {}
75
76             virtual thread * clone()
77             {
78                 return new Strain( *this );
79             }
80
81             virtual void test()
82             {
83                 size_t const nThreadCount = s_nThreadCount;
84                 size_t const nTotalPush = m_nTotalPushCount;
85
86                 m_arrLastRead.resize( nThreadCount, 0 );
87                 m_arrPopCountPerThread.resize( nThreadCount, 0 );
88
89                 value_type node;
90
91                 while ( m_nPushCount < nTotalPush ) {
92                     if ( ( std::rand() & 3) != 3 ) {
93                         node.nThread = id();
94                         node.nNo = ++m_nPushCount;
95                         if ( !m_Queue.push( node )) {
96                             ++m_nPushError;
97                             --m_nPushCount;
98                         }
99                     }
100                     else
101                         pop( nThreadCount );
102                 }
103
104                 s_nProducerCount.fetch_sub( 1, std::memory_order_relaxed );
105
106                 while ( !m_Queue.empty() || s_nProducerCount.load( std::memory_order_relaxed ) != 0 )
107                     pop( nThreadCount );
108             }
109
110             bool pop( size_t nThreadCount )
111             {
112                 value_type node;
113                 node.nThread = nThreadCount;
114                 node.nNo = ~0;
115                 if ( m_Queue.pop( node )) {
116                     ++m_nPopCount;
117                     if ( node.nThread < nThreadCount ) {
118                         m_arrPopCountPerThread[ node.nThread ] += 1;
119                         if ( m_nSpread ) {
120                             if ( m_arrLastRead[ node.nThread ] > node.nNo ) {
121                                 if ( m_arrLastRead[ node.nThread ] - node.nNo > m_nSpread )
122                                     ++m_nRepeatValue;
123                             }
124                             else if ( m_arrLastRead[ node.nThread ] == node.nNo )
125                                 ++m_nRepeatValue;
126                             m_arrLastRead[ node.nThread ] = node.nNo;
127                         }
128                         else {
129                             if ( m_arrLastRead[ node.nThread ] < node.nNo )
130                                 m_arrLastRead[ node.nThread ] = node.nNo;
131                             else
132                                 ++m_nRepeatValue;
133                         }
134                     }
135                     else
136                         ++m_nUndefWriter;
137                 }
138                 else {
139                     ++m_nEmptyPop;
140                     return false;
141                 }
142                 return true;
143             }
144
145         public:
146             Queue&              m_Queue;
147
148             size_t  m_nPushCount = 0;
149             size_t  m_nPopCount  = 0;
150             size_t  m_nEmptyPop  = 0;
151
152             size_t  m_nUndefWriter = 0;
153             size_t  m_nRepeatValue = 0;
154             size_t  m_nPushError   = 0;
155
156             std::vector<size_t> m_arrLastRead;
157             std::vector<size_t> m_arrPopCountPerThread;
158
159             size_t const m_nSpread;
160             size_t const m_nTotalPushCount;
161         };
162
163     public:
164         static void SetUpTestCase()
165         {
166             cds_test::config const& cfg = get_config( "queue_random" );
167
168             s_nThreadCount = cfg.get_size_t( "ThreadCount", s_nThreadCount );
169             s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
170
171             if ( s_nThreadCount == 0u )
172                 s_nThreadCount = 1;
173             if ( s_nQueueSize == 0u )
174                 s_nQueueSize = 1000;
175         }
176
177         //static void TearDownTestCase();
178
179     protected:
180         template <class Queue>
181         void analyze( Queue& q  )
182         {
183             EXPECT_TRUE( q.empty());
184
185             std::vector< size_t > arrPushCount;
186             arrPushCount.resize( s_nThreadCount, 0 );
187
188             size_t nPushTotal = 0;
189             size_t nPopTotal  = 0;
190             size_t nPushError = 0;
191
192             cds_test::thread_pool& pool = get_pool();
193             for ( size_t i = 0; i < pool.size(); ++i ) {
194                 Strain<Queue>& thr = static_cast<Strain<Queue> &>( pool.get(i));
195                 EXPECT_EQ( thr.m_nUndefWriter, 0u );
196                 EXPECT_EQ( thr.m_nRepeatValue, 0u );
197                 EXPECT_EQ( thr.m_nPushError, 0u );
198                 nPushError += thr.m_nPushError;
199
200                 arrPushCount[ thr.id() ] += thr.m_nPushCount;
201
202                 nPushTotal += thr.m_nPushCount;
203                 nPopTotal += thr.m_nPopCount;
204             }
205
206             EXPECT_EQ( nPushTotal, s_nQueueSize );
207             EXPECT_EQ( nPopTotal, s_nQueueSize );
208
209             size_t const nThreadPushCount = s_nQueueSize / s_nThreadCount;
210             for ( size_t i = 0; i < s_nThreadCount; ++i )
211                 EXPECT_EQ( arrPushCount[i], nThreadPushCount ) << "thread=" << i;
212         }
213
214         template <class Queue>
215         void test( Queue& q )
216         {
217             size_t nThreadPushCount = s_nQueueSize / s_nThreadCount;
218
219             cds_test::thread_pool& pool = get_pool();
220             pool.add( new Strain<Queue>( pool, q, nThreadPushCount ), s_nThreadCount );
221
222             s_nQueueSize = nThreadPushCount * s_nThreadCount;
223             propout() << std::make_pair( "thread_count", s_nThreadCount )
224                 << std::make_pair( "push_count", s_nQueueSize );
225
226             s_nProducerCount.store( pool.size(), std::memory_order_release );
227             std::chrono::milliseconds duration = pool.run();
228             propout() << std::make_pair( "duration", duration );
229
230             analyze( q );
231
232             propout() << q.statistics();
233         }
234     };
235
236     CDSSTRESS_MSQueue( queue_random )
237     CDSSTRESS_MoirQueue( queue_random )
238     CDSSTRESS_BasketQueue( queue_random )
239     CDSSTRESS_OptimsticQueue( queue_random )
240     CDSSTRESS_FCQueue( queue_random )
241     CDSSTRESS_FCDeque( queue_random )
242     CDSSTRESS_RWQueue( queue_random )
243     CDSSTRESS_StdQueue( queue_random )
244
245 #undef CDSSTRESS_Queue_F
246 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
247     TEST_F( test_fixture, type_name ) \
248     { \
249         typedef queue::Types< value_type >::type_name queue_type; \
250         queue_type queue( s_nQueueSize ); \
251         test( queue ); \
252     }
253
254     CDSSTRESS_VyukovQueue( queue_random )
255
256 #undef CDSSTRESS_Queue_F
257
258     // ********************************************************************
259     // SegmentedQueue test
260
261     class segmented_queue_random
262         : public queue_random
263         , public ::testing::WithParamInterface< size_t >
264     {
265         typedef queue_random base_class;
266
267     protected:
268         template <typename Queue>
269         void test()
270         {
271             size_t quasi_factor = GetParam();
272
273             Queue q( quasi_factor );
274             propout() << std::make_pair( "quasi_factor", quasi_factor );
275
276             size_t nThreadPushCount = s_nQueueSize / s_nThreadCount;
277
278             cds_test::thread_pool& pool = get_pool();
279             pool.add( new Strain<Queue>( pool, q, nThreadPushCount, quasi_factor * 2 ), s_nThreadCount );
280
281             s_nQueueSize = nThreadPushCount * s_nThreadCount;
282             propout() << std::make_pair( "thread_count", s_nThreadCount )
283                 << std::make_pair( "push_count", s_nQueueSize );
284
285             s_nProducerCount.store( pool.size(), std::memory_order_release );
286             std::chrono::milliseconds duration = pool.run();
287             propout() << std::make_pair( "duration", duration );
288
289             analyze( q );
290
291             propout() << q.statistics();
292         }
293
294     public:
295         static std::vector< size_t > get_test_parameters()
296         {
297             cds_test::config const& cfg = cds_test::stress_fixture::get_config( "queue_push" );
298             bool bIterative = cfg.get_bool( "SegmentedQueue_Iterate", false );
299             size_t quasi_factor = cfg.get_size_t( "SegmentedQueue_SegmentSize", 256 );
300
301             std::vector<size_t> args;
302             if ( bIterative && quasi_factor > 4 ) {
303                 for ( size_t qf = 4; qf <= quasi_factor; qf *= 2 )
304                     args.push_back( qf );
305             }
306             else {
307                 if ( quasi_factor > 2 )
308                     args.push_back( quasi_factor );
309                 else
310                     args.push_back( 2 );
311             }
312
313             return args;
314         }
315     };
316
317 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
318     TEST_P( test_fixture, type_name ) \
319     { \
320         typedef typename queue::Types<value_type>::type_name queue_type; \
321         test< queue_type >(); \
322     }
323
324     CDSSTRESS_SegmentedQueue( segmented_queue_random )
325
326 #ifdef CDSTEST_GTEST_INSTANTIATE_TEST_CASE_P_HAS_4TH_ARG
327     static std::string get_test_parameter_name( testing::TestParamInfo<size_t> const& p )
328     {
329         return std::to_string( p.param );
330     }
331     INSTANTIATE_TEST_CASE_P( SQ,
332         segmented_queue_random,
333         ::testing::ValuesIn( segmented_queue_random::get_test_parameters(), get_test_parameter_name ));
334 #else
335     INSTANTIATE_TEST_CASE_P( SQ,
336         segmented_queue_random,
337         ::testing::ValuesIn( segmented_queue_random::get_test_parameters() ));
338 #endif
339
340 } // namespace