65b81a264498681c0bbdfb627f4181caae9bba62
[libcds.git] / test / stress / queue / random.cpp
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8     
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.     
29 */
30
31 #include "queue_type.h"
32
33 #include <vector>
34
35 // Multi-threaded queue test for random push/pop operation
36 namespace {
37
38     static size_t s_nThreadCount = 16;
39     static size_t s_nQueueSize = 10000000;
40
41     std::atomic< size_t > s_nProducerCount(0);
42
43     class queue_random: public cds_test::stress_fixture
44     {
45         typedef cds_test::stress_fixture base_class;
46
47     protected:
48         struct value_type {
49             size_t      nNo;
50             size_t      nThread;
51
52             value_type() {}
53             value_type( size_t n ) : nNo( n ) {}
54         };
55
56         template <class Queue>
57         class Strain: public cds_test::thread
58         {
59             typedef cds_test::thread base_class;
60
61         public:
62             Strain( cds_test::thread_pool& pool, Queue& q, size_t nPushCount, size_t nSpread = 0 )
63                 : base_class( pool )
64                 , m_Queue( q )
65                 , m_nSpread( nSpread )
66                 , m_nTotalPushCount( nPushCount )
67             {}
68
69             Strain( Strain& src )
70                 : base_class( src )
71                 , m_Queue( src.m_Queue )
72                 , m_nSpread( src.m_nSpread )
73                 , m_nTotalPushCount( src.m_nTotalPushCount )
74             {}
75
76             virtual thread * clone()
77             {
78                 return new Strain( *this );
79             }
80
81             virtual void test()
82             {
83                 size_t const nThreadCount = s_nThreadCount;
84                 size_t const nTotalPush = m_nTotalPushCount;
85
86                 m_arrLastRead.resize( nThreadCount, 0 );
87                 m_arrPopCountPerThread.resize( nThreadCount, 0 );
88
89                 value_type node;
90
91                 while ( m_nPushCount < nTotalPush ) {
92                     if ( (rand() & 3) != 3 ) {
93                         node.nThread = id();
94                         node.nNo = ++m_nPushCount;
95                         if ( !m_Queue.push( node )) {
96                             ++m_nPushError;
97                             --m_nPushCount;
98                         }
99                     }
100                     else
101                         pop( nThreadCount );
102                 }
103
104                 s_nProducerCount.fetch_sub( 1, std::memory_order_relaxed );
105
106                 while ( !m_Queue.empty() || s_nProducerCount.load( std::memory_order_relaxed ) != 0 )
107                     pop( nThreadCount );
108             }
109
110             bool pop( size_t nThreadCount )
111             {
112                 value_type node;
113                 node.nThread = nThreadCount;
114                 node.nNo = ~0;
115                 if ( m_Queue.pop( node )) {
116                     ++m_nPopCount;
117                     if ( node.nThread < nThreadCount ) {
118                         m_arrPopCountPerThread[ node.nThread ] += 1;
119                         if ( m_nSpread ) {
120                             if ( m_arrLastRead[ node.nThread ] > node.nNo ) {
121                                 if ( m_arrLastRead[ node.nThread ] - node.nNo > m_nSpread )
122                                     ++m_nRepeatValue;
123                             }
124                             else if ( m_arrLastRead[ node.nThread ] == node.nNo )
125                                 ++m_nRepeatValue;
126                             m_arrLastRead[ node.nThread ] = node.nNo;
127                         }
128                         else {
129                             if ( m_arrLastRead[ node.nThread ] < node.nNo )
130                                 m_arrLastRead[ node.nThread ] = node.nNo;
131                             else
132                                 ++m_nRepeatValue;
133                         }
134                     }
135                     else
136                         ++m_nUndefWriter;
137                 }
138                 else {
139                     ++m_nEmptyPop;
140                     return false;
141                 }
142                 return true;
143             }
144
145         public:
146             Queue&              m_Queue;
147
148             size_t  m_nPushCount = 0;
149             size_t  m_nPopCount  = 0;
150             size_t  m_nEmptyPop  = 0;
151
152             size_t  m_nUndefWriter = 0;
153             size_t  m_nRepeatValue = 0;
154             size_t  m_nPushError   = 0;
155
156             std::vector<size_t> m_arrLastRead;
157             std::vector<size_t> m_arrPopCountPerThread;
158
159             size_t const m_nSpread;
160             size_t const m_nTotalPushCount;
161         };
162
163     public:
164         static void SetUpTestCase()\r
165         {\r
166             cds_test::config const& cfg = get_config( "queue_random" );\r
167 \r
168             s_nThreadCount = cfg.get_size_t( "ThreadCount", s_nThreadCount );
169             s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
170
171             if ( s_nThreadCount == 0 )
172                 s_nThreadCount = 1;
173             if ( s_nQueueSize == 0 )
174                 s_nQueueSize = 1000;
175         }\r
176 \r
177         //static void TearDownTestCase();\r
178
179     protected:
180         template <class Queue>
181         void analyze( Queue& q  )
182         {
183             EXPECT_TRUE( q.empty() );
184
185             std::vector< size_t > arrPushCount;
186             arrPushCount.resize( s_nThreadCount, 0 );
187
188             size_t nPushTotal = 0;
189             size_t nPopTotal  = 0;
190             size_t nPushError = 0;
191
192             cds_test::thread_pool& pool = get_pool();
193             for ( size_t i = 0; i < pool.size(); ++i ) {
194                 Strain<Queue>& thr = static_cast<Strain<Queue> &>( pool.get(i));
195                 EXPECT_EQ( thr.m_nUndefWriter, 0 );
196                 EXPECT_EQ( thr.m_nRepeatValue, 0 );
197                 EXPECT_EQ( thr.m_nPushError, 0 );
198                 nPushError += thr.m_nPushError;
199
200                 arrPushCount[ thr.id() ] += thr.m_nPushCount;
201
202                 nPushTotal += thr.m_nPushCount;
203                 nPopTotal += thr.m_nPopCount;
204             }
205
206             EXPECT_EQ( nPushTotal, s_nQueueSize );
207             EXPECT_EQ( nPopTotal, s_nQueueSize );
208
209             size_t const nThreadPushCount = s_nQueueSize / s_nThreadCount;
210             for ( size_t i = 0; i < s_nThreadCount; ++i )
211                 EXPECT_EQ( arrPushCount[i], nThreadPushCount ) << "thread=" << i;
212         }
213
214         template <class Queue>
215         void test( Queue& q )
216         {
217             size_t nThreadPushCount = s_nQueueSize / s_nThreadCount;
218
219             cds_test::thread_pool& pool = get_pool();
220             pool.add( new Strain<Queue>( pool, q, nThreadPushCount ), s_nThreadCount );
221
222             s_nQueueSize = nThreadPushCount * s_nThreadCount;
223             propout() << std::make_pair( "thread_count", s_nThreadCount )
224                 << std::make_pair( "push_count", s_nQueueSize );
225
226             s_nProducerCount.store( pool.size(), std::memory_order_release );
227             std::chrono::milliseconds duration = pool.run();
228             propout() << std::make_pair( "duration", duration );
229
230             analyze( q );
231
232             propout() << q.statistics();
233         }
234     };
235
236     CDSSTRESS_MSQueue( queue_random )
237     CDSSTRESS_MoirQueue( queue_random )
238     CDSSTRESS_BasketQueue( queue_random )
239     CDSSTRESS_OptimsticQueue( queue_random )
240     CDSSTRESS_FCQueue( queue_random )
241     CDSSTRESS_FCDeque( queue_random )
242     CDSSTRESS_RWQueue( queue_random )
243     CDSSTRESS_StdQueue( queue_random )
244
245 #undef CDSSTRESS_Queue_F
246 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
247     TEST_F( test_fixture, type_name ) \
248     { \
249         typedef queue::Types< value_type >::type_name queue_type; \
250         queue_type queue( s_nQueueSize ); \
251         test( queue ); \
252     }
253
254     CDSSTRESS_TsigasQueue( queue_random )
255     CDSSTRESS_VyukovQueue( queue_random )
256
257 #undef CDSSTRESS_Queue_F
258
259     // ********************************************************************
260     // SegmentedQueue test
261
262     class segmented_queue_random
263         : public queue_random
264         , public ::testing::WithParamInterface< size_t >
265     {
266         typedef queue_random base_class;
267
268     protected:
269         template <typename Queue>
270         void test()
271         {
272             size_t quasi_factor = GetParam();
273
274             Queue q( quasi_factor );
275             propout() << std::make_pair( "quasi_factor", quasi_factor );
276
277             size_t nThreadPushCount = s_nQueueSize / s_nThreadCount;
278
279             cds_test::thread_pool& pool = get_pool();
280             pool.add( new Strain<Queue>( pool, q, nThreadPushCount, quasi_factor * 2 ), s_nThreadCount );
281
282             s_nQueueSize = nThreadPushCount * s_nThreadCount;
283             propout() << std::make_pair( "thread_count", s_nThreadCount )
284                 << std::make_pair( "push_count", s_nQueueSize );
285
286             s_nProducerCount.store( pool.size(), std::memory_order_release );
287             std::chrono::milliseconds duration = pool.run();
288             propout() << std::make_pair( "duration", duration );
289
290             analyze( q );
291
292             propout() << q.statistics();
293         }
294
295     public:
296         static std::vector< size_t > get_test_parameters()
297         {
298             cds_test::config const& cfg = cds_test::stress_fixture::get_config( "queue_push" );
299             bool bIterative = cfg.get_bool( "SegmentedQueue_Iterate", false );
300             size_t quasi_factor = cfg.get_size_t( "SegmentedQueue_SegmentSize", 256 );
301
302             std::vector<size_t> args;
303             if ( bIterative && quasi_factor > 4 ) {
304                 for ( size_t qf = 4; qf <= quasi_factor; qf *= 2 )
305                     args.push_back( qf );
306             }
307             else {
308                 if ( quasi_factor > 2 )
309                     args.push_back( quasi_factor );
310                 else
311                     args.push_back( 2 );
312             }
313
314             return args;
315         }
316     };
317
318 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
319     TEST_P( test_fixture, type_name ) \
320     { \
321         typedef typename queue::Types<value_type>::type_name queue_type; \
322         test< queue_type >(); \
323     }
324
325     CDSSTRESS_SegmentedQueue( segmented_queue_random )
326
327     INSTANTIATE_TEST_CASE_P( SQ,
328         segmented_queue_random,
329         ::testing::ValuesIn( segmented_queue_random::get_test_parameters()));
330 } // namespace