Merge branch 'integration' of github.com:khizmax/libcds into integration
[libcds.git] / test / stress / queue / random.cpp
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8     
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #include "queue_type.h"
32
33 #include <vector>
34
// Multi-threaded queue stress test: every thread randomly interleaves push and pop operations
36 namespace {
37
38     static size_t s_nThreadCount = 16;
39     static size_t s_nQueueSize = 10000000;
40
41     std::atomic< size_t > s_nProducerCount(0);
42
43     class queue_random: public cds_test::stress_fixture
44     {
45         typedef cds_test::stress_fixture base_class;
46
47     protected:
48         struct value_type {
49             size_t      nNo;
50             size_t      nThread;
51
52             value_type() {}
53             value_type( size_t n ) : nNo( n ) {}
54         };
55
56         template <class Queue>
57         class Strain: public cds_test::thread
58         {
59             typedef cds_test::thread base_class;
60
61         public:
62             Strain( cds_test::thread_pool& pool, Queue& q, size_t nPushCount, size_t nSpread = 0 )
63                 : base_class( pool )
64                 , m_Queue( q )
65                 , m_nSpread( nSpread )
66                 , m_nTotalPushCount( nPushCount )
67             {}
68
69             Strain( Strain& src )
70                 : base_class( src )
71                 , m_Queue( src.m_Queue )
72                 , m_nSpread( src.m_nSpread )
73                 , m_nTotalPushCount( src.m_nTotalPushCount )
74             {}
75
76             virtual thread * clone()
77             {
78                 return new Strain( *this );
79             }
80
81             virtual void test()
82             {
83                 size_t const nThreadCount = s_nThreadCount;
84                 size_t const nTotalPush = m_nTotalPushCount;
85
86                 m_arrLastRead.resize( nThreadCount, 0 );
87                 m_arrPopCountPerThread.resize( nThreadCount, 0 );
88
89                 value_type node;
90
91                 while ( m_nPushCount < nTotalPush ) {
92                     if ( ( std::rand() & 3) != 3 ) {
93                         node.nThread = id();
94                         node.nNo = ++m_nPushCount;
95                         if ( !m_Queue.push( node )) {
96                             ++m_nPushError;
97                             --m_nPushCount;
98                         }
99                     }
100                     else
101                         pop( nThreadCount );
102                 }
103
104                 s_nProducerCount.fetch_sub( 1, std::memory_order_relaxed );
105
106                 while ( !m_Queue.empty() || s_nProducerCount.load( std::memory_order_relaxed ) != 0 )
107                     pop( nThreadCount );
108             }
109
110             bool pop( size_t nThreadCount )
111             {
112                 value_type node;
113                 node.nThread = nThreadCount;
114                 node.nNo = ~0;
115                 if ( m_Queue.pop( node )) {
116                     ++m_nPopCount;
117                     if ( node.nThread < nThreadCount ) {
118                         m_arrPopCountPerThread[ node.nThread ] += 1;
119                         if ( m_nSpread ) {
120                             if ( m_arrLastRead[ node.nThread ] > node.nNo ) {
121                                 if ( m_arrLastRead[ node.nThread ] - node.nNo > m_nSpread )
122                                     ++m_nRepeatValue;
123                             }
124                             else if ( m_arrLastRead[ node.nThread ] == node.nNo )
125                                 ++m_nRepeatValue;
126                             m_arrLastRead[ node.nThread ] = node.nNo;
127                         }
128                         else {
129                             if ( m_arrLastRead[ node.nThread ] < node.nNo )
130                                 m_arrLastRead[ node.nThread ] = node.nNo;
131                             else
132                                 ++m_nRepeatValue;
133                         }
134                     }
135                     else
136                         ++m_nUndefWriter;
137                 }
138                 else {
139                     ++m_nEmptyPop;
140                     return false;
141                 }
142                 return true;
143             }
144
145         public:
146             Queue&              m_Queue;
147
148             size_t  m_nPushCount = 0;
149             size_t  m_nPopCount  = 0;
150             size_t  m_nEmptyPop  = 0;
151
152             size_t  m_nUndefWriter = 0;
153             size_t  m_nRepeatValue = 0;
154             size_t  m_nPushError   = 0;
155
156             std::vector<size_t> m_arrLastRead;
157             std::vector<size_t> m_arrPopCountPerThread;
158
159             size_t const m_nSpread;
160             size_t const m_nTotalPushCount;
161         };
162
163     public:
164         static void SetUpTestCase()\r
165         {\r
166             cds_test::config const& cfg = get_config( "queue_random" );\r
167 \r
168             s_nThreadCount = cfg.get_size_t( "ThreadCount", s_nThreadCount );
169             s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
170
171             if ( s_nThreadCount == 0 )
172                 s_nThreadCount = 1;
173             if ( s_nQueueSize == 0 )
174                 s_nQueueSize = 1000;
175         }\r
176 \r
177         //static void TearDownTestCase();\r
178
179     protected:
180         template <class Queue>
181         void analyze( Queue& q  )
182         {
183             EXPECT_TRUE( q.empty() );
184
185             std::vector< size_t > arrPushCount;
186             arrPushCount.resize( s_nThreadCount, 0 );
187
188             size_t nPushTotal = 0;
189             size_t nPopTotal  = 0;
190             size_t nPushError = 0;
191
192             cds_test::thread_pool& pool = get_pool();
193             for ( size_t i = 0; i < pool.size(); ++i ) {
194                 Strain<Queue>& thr = static_cast<Strain<Queue> &>( pool.get(i));
195                 EXPECT_EQ( thr.m_nUndefWriter, 0 );
196                 EXPECT_EQ( thr.m_nRepeatValue, 0 );
197                 EXPECT_EQ( thr.m_nPushError, 0 );
198                 nPushError += thr.m_nPushError;
199
200                 arrPushCount[ thr.id() ] += thr.m_nPushCount;
201
202                 nPushTotal += thr.m_nPushCount;
203                 nPopTotal += thr.m_nPopCount;
204             }
205
206             EXPECT_EQ( nPushTotal, s_nQueueSize );
207             EXPECT_EQ( nPopTotal, s_nQueueSize );
208
209             size_t const nThreadPushCount = s_nQueueSize / s_nThreadCount;
210             for ( size_t i = 0; i < s_nThreadCount; ++i )
211                 EXPECT_EQ( arrPushCount[i], nThreadPushCount ) << "thread=" << i;
212         }
213
214         template <class Queue>
215         void test( Queue& q )
216         {
217             size_t nThreadPushCount = s_nQueueSize / s_nThreadCount;
218
219             cds_test::thread_pool& pool = get_pool();
220             pool.add( new Strain<Queue>( pool, q, nThreadPushCount ), s_nThreadCount );
221
222             s_nQueueSize = nThreadPushCount * s_nThreadCount;
223             propout() << std::make_pair( "thread_count", s_nThreadCount )
224                 << std::make_pair( "push_count", s_nQueueSize );
225
226             s_nProducerCount.store( pool.size(), std::memory_order_release );
227             std::chrono::milliseconds duration = pool.run();
228             propout() << std::make_pair( "duration", duration );
229
230             analyze( q );
231
232             propout() << q.statistics();
233         }
234     };
235
236     CDSSTRESS_MSQueue( queue_random )
237     CDSSTRESS_MoirQueue( queue_random )
238     CDSSTRESS_BasketQueue( queue_random )
239     CDSSTRESS_OptimsticQueue( queue_random )
240     CDSSTRESS_FCQueue( queue_random )
241     CDSSTRESS_FCDeque( queue_random )
242     CDSSTRESS_RWQueue( queue_random )
243     CDSSTRESS_StdQueue( queue_random )
244
245 #undef CDSSTRESS_Queue_F
246 #define CDSSTRESS_Queue_F( test_fixture, type_name, level ) \
247     TEST_F( test_fixture, type_name ) \
248     { \
249         if ( !check_detail_level( level )) return; \
250         typedef queue::Types< value_type >::type_name queue_type; \
251         queue_type queue( s_nQueueSize ); \
252         test( queue ); \
253     }
254
255     CDSSTRESS_TsigasQueue( queue_random )
256     CDSSTRESS_VyukovQueue( queue_random )
257
258 #undef CDSSTRESS_Queue_F
259
260     // ********************************************************************
261     // SegmentedQueue test
262
263     class segmented_queue_random
264         : public queue_random
265         , public ::testing::WithParamInterface< size_t >
266     {
267         typedef queue_random base_class;
268
269     protected:
270         template <typename Queue>
271         void test()
272         {
273             size_t quasi_factor = GetParam();
274
275             Queue q( quasi_factor );
276             propout() << std::make_pair( "quasi_factor", quasi_factor );
277
278             size_t nThreadPushCount = s_nQueueSize / s_nThreadCount;
279
280             cds_test::thread_pool& pool = get_pool();
281             pool.add( new Strain<Queue>( pool, q, nThreadPushCount, quasi_factor * 2 ), s_nThreadCount );
282
283             s_nQueueSize = nThreadPushCount * s_nThreadCount;
284             propout() << std::make_pair( "thread_count", s_nThreadCount )
285                 << std::make_pair( "push_count", s_nQueueSize );
286
287             s_nProducerCount.store( pool.size(), std::memory_order_release );
288             std::chrono::milliseconds duration = pool.run();
289             propout() << std::make_pair( "duration", duration );
290
291             analyze( q );
292
293             propout() << q.statistics();
294         }
295
296     public:
297         static std::vector< size_t > get_test_parameters()
298         {
299             cds_test::config const& cfg = cds_test::stress_fixture::get_config( "queue_push" );
300             bool bIterative = cfg.get_bool( "SegmentedQueue_Iterate", false );
301             size_t quasi_factor = cfg.get_size_t( "SegmentedQueue_SegmentSize", 256 );
302
303             std::vector<size_t> args;
304             if ( bIterative && quasi_factor > 4 ) {
305                 for ( size_t qf = 4; qf <= quasi_factor; qf *= 2 )
306                     args.push_back( qf );
307             }
308             else {
309                 if ( quasi_factor > 2 )
310                     args.push_back( quasi_factor );
311                 else
312                     args.push_back( 2 );
313             }
314
315             return args;
316         }
317     };
318
319 #define CDSSTRESS_Queue_F( test_fixture, type_name, level ) \
320     TEST_P( test_fixture, type_name ) \
321     { \
322         if ( !check_detail_level( level )) return; \
323         typedef typename queue::Types<value_type>::type_name queue_type; \
324         test< queue_type >(); \
325     }
326
327     CDSSTRESS_SegmentedQueue( segmented_queue_random )
328
329     INSTANTIATE_TEST_CASE_P( SQ,
330         segmented_queue_random,
331         ::testing::ValuesIn( segmented_queue_random::get_test_parameters()));
332 } // namespace