Migrated queue stress test to gtest framework
[libcds.git] / test / stress / queue / push_pop.cpp
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8     
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.     
29 */
30
31 #include "queue_type.h"
32
33 #include <vector>
34 #include <algorithm>
35
36 // Multi-threaded queue push/pop test
37 namespace {
38
39     static size_t s_nConsumerThreadCount = 4;
40     static size_t s_nProducerThreadCount = 4;
41     static size_t s_nQueueSize = 4000000;
42
43     static std::atomic<size_t> s_nProducerDone( 0 );
44
45     class queue_push_pop: public cds_test::stress_fixture
46     {
47     protected:
48         struct value_type
49         {
50             size_t      nNo;
51             size_t      nWriterNo;
52         };
53
54         enum {
55             producer_thread,
56             consumer_thread
57         };
58
59         template <class Queue>
60         class Producer: public cds_test::thread
61         {
62             typedef cds_test::thread base_class;
63
64         public:
65             Producer( cds_test::thread_pool& pool, Queue& queue, size_t nPushCount )
66                 : base_class( pool, producer_thread )
67                 , m_Queue( queue )
68                 , m_nPushFailed( 0 )
69                 , m_nPushCount( nPushCount )
70             {}
71
72             Producer( Producer& src )
73                 : base_class( src )
74                 , m_Queue( src.m_Queue )
75                 , m_nPushFailed( 0 )
76                 , m_nPushCount( src.m_nPushCount )
77             {}
78
79             virtual thread * clone()
80             {
81                 return new Producer( *this );
82             }
83
84             virtual void test()
85             {
86                 size_t const nPushCount = m_nPushCount;
87                 value_type v;
88                 v.nWriterNo = id();
89                 v.nNo = 0;
90                 m_nPushFailed = 0;
91
92                 while ( v.nNo < nPushCount ) {
93                     if ( m_Queue.push( v ))
94                         ++v.nNo;
95                     else
96                         ++m_nPushFailed;
97                 }
98
99                 s_nProducerDone.fetch_add( 1 );
100             }
101         public:
102             Queue&              m_Queue;
103             size_t              m_nPushFailed;
104             size_t const        m_nPushCount;
105         };
106
107         template <class Queue>
108         class Consumer: public cds_test::thread
109         {
110             typedef cds_test::thread base_class;
111
112         public:
113             Queue&              m_Queue;
114             size_t const        m_nPushPerProducer;
115             size_t              m_nPopEmpty;
116             size_t              m_nPopped;
117             size_t              m_nBadWriter;
118
119             typedef std::vector<size_t> popped_data;
120             typedef std::vector<size_t>::iterator       data_iterator;
121             typedef std::vector<size_t>::const_iterator const_data_iterator;
122
123             std::vector<popped_data>        m_WriterData;
124
125         private:
126             void initPoppedData()
127             {
128                 const size_t nProducerCount = s_nProducerThreadCount;
129                 m_WriterData.resize( nProducerCount );
130                 for ( size_t i = 0; i < nProducerCount; ++i )
131                     m_WriterData[i].reserve( m_nPushPerProducer );
132             }
133
134         public:
135             Consumer( cds_test::thread_pool& pool, Queue& queue, size_t nPushPerProducer )
136                 : base_class( pool, consumer_thread )
137                 , m_Queue( queue )
138                 , m_nPushPerProducer( nPushPerProducer )
139                 , m_nPopEmpty( 0 )
140                 , m_nPopped( 0 )
141                 , m_nBadWriter( 0 )
142             {
143                 initPoppedData();
144             }
145             Consumer( Consumer& src )
146                 : base_class( src )
147                 , m_Queue( src.m_Queue )
148                 , m_nPushPerProducer( src.m_nPushPerProducer )
149                 , m_nPopEmpty( 0 )
150                 , m_nPopped( 0 )
151                 , m_nBadWriter( 0 )
152             {
153                 initPoppedData();
154             }
155
156             virtual thread * clone()
157             {
158                 return new Consumer( *this );
159             }
160
161             virtual void test()
162             {
163                 m_nPopEmpty = 0;
164                 m_nPopped = 0;
165                 m_nBadWriter = 0;
166                 const size_t nTotalWriters = s_nProducerThreadCount;
167                 value_type v;
168                 while ( true ) {
169                     if ( m_Queue.pop( v ) ) {
170                         ++m_nPopped;
171                         if ( v.nWriterNo < nTotalWriters )
172                             m_WriterData[ v.nWriterNo ].push_back( v.nNo );
173                         else
174                             ++m_nBadWriter;
175                     }
176                     else
177                         ++m_nPopEmpty;
178
179                     if ( m_Queue.empty() ) {
180                         if ( s_nProducerDone.load() >= nTotalWriters ) {
181                             if ( m_Queue.empty() )
182                                 break;
183                         }
184                     }
185                 }
186             }
187         };
188
189     protected:
190         size_t m_nThreadPushCount;
191
192     protected:
193         template <class Queue>
194         void analyze( Queue& q, size_t /*nLeftOffset*/ = 0, size_t nRightOffset = 0 )
195         {
196             cds_test::thread_pool& pool = get_pool();
197
198             typedef Consumer<Queue> Consumer;
199             typedef Producer<Queue> Producer;
200
201             size_t nPostTestPops = 0;
202             {
203                 value_type v;
204                 while ( q.pop( v ))
205                     ++nPostTestPops;
206             }
207
208             size_t nTotalPops = 0;
209             size_t nPopFalse = 0;
210             size_t nPoppedItems = 0;
211             size_t nPushFailed = 0;
212
213             std::vector< Consumer * > arrConsumer;
214
215             for ( size_t i = 0; i < pool.size(); ++i ) {
216                 cds_test::thread& thr = pool.get(i);
217                 if ( thr.type() == consumer_thread ) {
218                     Consumer& consumer = static_cast<Consumer&>( thr );
219                     nTotalPops += consumer.m_nPopped;
220                     nPopFalse += consumer.m_nPopEmpty;
221                     arrConsumer.push_back( &consumer );
222                     EXPECT_EQ( consumer.m_nBadWriter, 0 ) << "consumer_thread_no " << i;
223
224                     size_t nPopped = 0;
225                     for ( size_t n = 0; n < s_nProducerThreadCount; ++n )
226                         nPopped += consumer.m_WriterData[n].size();
227
228                     nPoppedItems += nPopped;
229                 }
230                 else {
231                     assert( thr.type() == producer_thread );
232
233                     Producer& producer = static_cast<Producer&>( thr );
234                     nPushFailed += producer.m_nPushFailed;
235                     EXPECT_EQ( producer.m_nPushFailed, 0 ) << "producer_thread_no " << i;
236                 }
237             }
238             EXPECT_EQ( nTotalPops, nPoppedItems );
239
240             EXPECT_EQ( nTotalPops + nPostTestPops, s_nQueueSize ) << "nTotalPops=" << nTotalPops << ", nPostTestPops=" << nPostTestPops;
241             EXPECT_TRUE( q.empty() );
242
243             // Test consistency of popped sequence
244             for ( size_t nWriter = 0; nWriter < s_nProducerThreadCount; ++nWriter ) {
245                 std::vector<size_t> arrData;
246                 arrData.reserve( m_nThreadPushCount );
247                 for ( size_t nReader = 0; nReader < arrConsumer.size(); ++nReader ) {
248                     auto it = arrConsumer[nReader]->m_WriterData[nWriter].begin();
249                     auto itEnd = arrConsumer[nReader]->m_WriterData[nWriter].end();
250                     if ( it != itEnd ) {
251                         auto itPrev = it;
252                         for ( ++it; it != itEnd; ++it ) {
253                             EXPECT_LT( *itPrev, *it + nRightOffset ) << "consumer=" << nReader << ", producer=" << nWriter;
254                             itPrev = it;
255                         }
256                     }
257
258                     for ( it = arrConsumer[nReader]->m_WriterData[nWriter].begin(); it != itEnd; ++it )
259                         arrData.push_back( *it );
260                 }
261
262                 std::sort( arrData.begin(), arrData.end() );
263                 for ( size_t i=1; i < arrData.size(); ++i ) {
264                     EXPECT_EQ( arrData[i - 1] + 1, arrData[i] ) << "producer=" << nWriter;
265                 }
266
267                 EXPECT_EQ( arrData[0], 0 ) << "producer=" << nWriter;
268                 EXPECT_EQ( arrData[arrData.size() - 1], m_nThreadPushCount - 1 ) << "producer=" << nWriter;
269             }
270         }
271
272         template <class Queue>
273         void test_queue( Queue& q )
274         {
275             m_nThreadPushCount = s_nQueueSize / s_nProducerThreadCount;
276
277             cds_test::thread_pool& pool = get_pool();
278             pool.add( new Producer<Queue>( pool, q, m_nThreadPushCount ), s_nProducerThreadCount );
279             pool.add( new Consumer<Queue>( pool, q, m_nThreadPushCount ), s_nConsumerThreadCount );
280
281             s_nProducerDone.store( 0 );
282             s_nQueueSize = m_nThreadPushCount * s_nProducerThreadCount;
283
284             propout() << std::make_pair( "producer_count", s_nProducerThreadCount )
285                 << std::make_pair( "consumer_count", s_nConsumerThreadCount )
286                 << std::make_pair( "push_count", s_nQueueSize );
287
288             std::chrono::milliseconds duration = pool.run();
289
290             propout() << std::make_pair( "duration", duration );
291         }
292
293         template <class Queue>
294         void test( Queue& q )
295         {
296             test_queue( q );
297             analyze( q );
298             propout() << q.statistics();
299         }
300
301     public:
302         static void SetUpTestCase()\r
303         {\r
304             cds_test::config const& cfg = get_config( "queue_push_pop" );\r
305 \r
306             s_nConsumerThreadCount = cfg.get_size_t( "ConsumerCount", s_nConsumerThreadCount );
307             s_nProducerThreadCount = cfg.get_size_t( "ProducerCount", s_nProducerThreadCount );
308             s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
309
310             if ( s_nConsumerThreadCount == 0 )
311                 s_nConsumerThreadCount = 1;
312             if ( s_nProducerThreadCount == 0 )
313                 s_nProducerThreadCount = 1;
314             if ( s_nQueueSize == 0 )
315                 s_nQueueSize = 1000;
316         }\r
317 \r
318         //static void TearDownTestCase();\r
319     };
320
321     CDSSTRESS_MSQueue( queue_push_pop )
322     CDSSTRESS_MoirQueue( queue_push_pop )
323     CDSSTRESS_BasketQueue( queue_push_pop )
324     CDSSTRESS_OptimsticQueue( queue_push_pop )
325     CDSSTRESS_FCQueue( queue_push_pop )
326     CDSSTRESS_FCDeque( queue_push_pop )
327     CDSSTRESS_RWQueue( queue_push_pop )
328     CDSSTRESS_StdQueue( queue_push_pop )
329
330 #undef CDSSTRESS_Queue_F
331 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
332     TEST_F( test_fixture, type_name ) \
333     { \
334         typedef queue::Types< value_type >::type_name queue_type; \
335         queue_type queue( s_nQueueSize ); \
336         test( queue ); \
337     }
338
339     CDSSTRESS_TsigasQueue( queue_push_pop )
340     CDSSTRESS_VyukovQueue( queue_push_pop )
341
342 #undef CDSSTRESS_Queue_F
343
344
345     // ********************************************************************
346     // SegmentedQueue test
347
348     class segmented_queue_push_pop
349         : public queue_push_pop
350         , public ::testing::WithParamInterface< size_t >
351     {
352         typedef queue_push_pop base_class;
353
354     protected:
355
356         template <typename Queue>
357         void test()
358         {
359             size_t quasi_factor = GetParam();
360
361             Queue q( quasi_factor );
362             propout() << std::make_pair( "quasi_factor", quasi_factor );
363             base_class::test_queue( q );
364             analyze( q, quasi_factor * 2, quasi_factor );
365             propout() << q.statistics();
366         }
367
368     public:
369         static std::vector< size_t > get_test_parameters()
370         {
371             cds_test::config const& cfg = cds_test::stress_fixture::get_config( "queue_push_pop" );
372             bool bIterative = cfg.get_bool( "SegmentedQueue_Iterate", false );
373             size_t quasi_factor = cfg.get_size_t( "SegmentedQueue_SegmentSize", 256 );
374
375             std::vector<size_t> args;
376             if ( bIterative && quasi_factor > 4 ) {
377                 for ( size_t qf = 4; qf <= quasi_factor; qf *= 2 )
378                     args.push_back( qf );
379             }
380             else {
381                 if ( quasi_factor > 2 )
382                     args.push_back( quasi_factor );
383                 else
384                     args.push_back( 2 );
385             }
386
387             return args;
388         }
389     };
390
391 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
392     TEST_P( test_fixture, type_name ) \
393     { \
394         typedef typename queue::Types<value_type>::type_name queue_type; \
395         test< queue_type >(); \
396     }
397
398     CDSSTRESS_SegmentedQueue( segmented_queue_push_pop )
399
400     INSTANTIATE_TEST_CASE_P( SQ,
401         segmented_queue_push_pop,
402         ::testing::ValuesIn( segmented_queue_push_pop::get_test_parameters()));
403
404 } // namespace