5720172d5dc24a57b5a65822e42c5b28fd096b28
[libcds.git] / test / stress / queue / push_pop.cpp
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8     
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.     
29 */
30
31 #include "queue_type.h"
32
33 #include <vector>
34 #include <algorithm>
35
36 // Multi-threaded queue push/pop test
37 namespace {
38
    // Test parameters. The values below are defaults; SetUpTestCase() replaces them
    // with the settings read from the "queue_push_pop" configuration section.
    static size_t s_nConsumerThreadCount = 4;   // number of Consumer threads added to the pool
    static size_t s_nProducerThreadCount = 4;   // number of Producer threads added to the pool
    static size_t s_nQueueSize = 4000000;       // total number of items pushed by all producers together

    // Number of producers that have finished pushing; consumers read it to decide when to stop popping
    static std::atomic<size_t> s_nProducerDone( 0 );
44
45     class queue_push_pop: public cds_test::stress_fixture
46     {
47     protected:
        // Item stored in the queue under test
        struct value_type
        {
            size_t      nNo;        // sequence number of the item within its producer (starts at 0)
            size_t      nWriterNo;  // id() of the producer thread that pushed the item
        };
53
        // Thread kinds passed to the cds_test::thread constructor; analyze() compares
        // thread::type() against them to tell producers from consumers
        enum {
            producer_thread,
            consumer_thread
        };
58
59         template <class Queue>
60         class Producer: public cds_test::thread
61         {
62             typedef cds_test::thread base_class;
63
64         public:
65             Producer( cds_test::thread_pool& pool, Queue& queue, size_t nPushCount )
66                 : base_class( pool, producer_thread )
67                 , m_Queue( queue )
68                 , m_nPushFailed( 0 )
69                 , m_nPushCount( nPushCount )
70             {}
71
72             Producer( Producer& src )
73                 : base_class( src )
74                 , m_Queue( src.m_Queue )
75                 , m_nPushFailed( 0 )
76                 , m_nPushCount( src.m_nPushCount )
77             {}
78
79             virtual thread * clone()
80             {
81                 return new Producer( *this );
82             }
83
84             virtual void test()
85             {
86                 size_t const nPushCount = m_nPushCount;
87                 value_type v;
88                 v.nWriterNo = id();
89                 v.nNo = 0;
90                 m_nPushFailed = 0;
91
92                 while ( v.nNo < nPushCount ) {
93                     if ( m_Queue.push( v ))
94                         ++v.nNo;
95                     else
96                         ++m_nPushFailed;
97                 }
98
99                 s_nProducerDone.fetch_add( 1 );
100             }
101
102         public:
103             Queue&              m_Queue;
104             size_t              m_nPushFailed;
105             size_t const        m_nPushCount;
106         };
107
108         template <class Queue>
109         class Consumer: public cds_test::thread
110         {
111             typedef cds_test::thread base_class;
112
113         public:
114             Queue&              m_Queue;
115             size_t const        m_nPushPerProducer;
116             size_t              m_nPopEmpty;
117             size_t              m_nPopped;
118             size_t              m_nBadWriter;
119
120             typedef std::vector<size_t> popped_data;
121             typedef std::vector<size_t>::iterator       data_iterator;
122             typedef std::vector<size_t>::const_iterator const_data_iterator;
123
124             std::vector<popped_data>        m_WriterData;
125
126         private:
127             void initPoppedData()
128             {
129                 const size_t nProducerCount = s_nProducerThreadCount;
130                 m_WriterData.resize( nProducerCount );
131                 for ( size_t i = 0; i < nProducerCount; ++i )
132                     m_WriterData[i].reserve( m_nPushPerProducer );
133             }
134
135         public:
136             Consumer( cds_test::thread_pool& pool, Queue& queue, size_t nPushPerProducer )
137                 : base_class( pool, consumer_thread )
138                 , m_Queue( queue )
139                 , m_nPushPerProducer( nPushPerProducer )
140                 , m_nPopEmpty( 0 )
141                 , m_nPopped( 0 )
142                 , m_nBadWriter( 0 )
143             {
144                 initPoppedData();
145             }
146             Consumer( Consumer& src )
147                 : base_class( src )
148                 , m_Queue( src.m_Queue )
149                 , m_nPushPerProducer( src.m_nPushPerProducer )
150                 , m_nPopEmpty( 0 )
151                 , m_nPopped( 0 )
152                 , m_nBadWriter( 0 )
153             {
154                 initPoppedData();
155             }
156
157             virtual thread * clone()
158             {
159                 return new Consumer( *this );
160             }
161
162             virtual void test()
163             {
164                 m_nPopEmpty = 0;
165                 m_nPopped = 0;
166                 m_nBadWriter = 0;
167                 const size_t nTotalWriters = s_nProducerThreadCount;
168                 value_type v;
169                 while ( true ) {
170                     if ( m_Queue.pop( v ) ) {
171                         ++m_nPopped;
172                         if ( v.nWriterNo < nTotalWriters )
173                             m_WriterData[ v.nWriterNo ].push_back( v.nNo );
174                         else
175                             ++m_nBadWriter;
176                     }
177                     else
178                         ++m_nPopEmpty;
179
180                     if ( m_Queue.empty() ) {
181                         if ( s_nProducerDone.load() >= nTotalWriters ) {
182                             if ( m_Queue.empty() )
183                                 break;
184                         }
185                     }
186                 }
187             }
188         };
189
190     protected:
        size_t m_nThreadPushCount;  // number of items each producer pushes; computed by test_queue()
192
193     protected:
194         template <class Queue>
195         void analyze( Queue& q, size_t /*nLeftOffset*/ = 0, size_t nRightOffset = 0 )
196         {
197             cds_test::thread_pool& pool = get_pool();
198
199             typedef Consumer<Queue> Consumer;
200             typedef Producer<Queue> Producer;
201
202             size_t nPostTestPops = 0;
203             {
204                 value_type v;
205                 while ( q.pop( v ))
206                     ++nPostTestPops;
207             }
208
209             size_t nTotalPops = 0;
210             size_t nPopFalse = 0;
211             size_t nPoppedItems = 0;
212             size_t nPushFailed = 0;
213
214             std::vector< Consumer * > arrConsumer;
215
216             for ( size_t i = 0; i < pool.size(); ++i ) {
217                 cds_test::thread& thr = pool.get(i);
218                 if ( thr.type() == consumer_thread ) {
219                     Consumer& consumer = static_cast<Consumer&>( thr );
220                     nTotalPops += consumer.m_nPopped;
221                     nPopFalse += consumer.m_nPopEmpty;
222                     arrConsumer.push_back( &consumer );
223                     EXPECT_EQ( consumer.m_nBadWriter, 0 ) << "consumer_thread_no " << i;
224
225                     size_t nPopped = 0;
226                     for ( size_t n = 0; n < s_nProducerThreadCount; ++n )
227                         nPopped += consumer.m_WriterData[n].size();
228
229                     nPoppedItems += nPopped;
230                 }
231                 else {
232                     assert( thr.type() == producer_thread );
233
234                     Producer& producer = static_cast<Producer&>( thr );
235                     nPushFailed += producer.m_nPushFailed;
236                     EXPECT_EQ( producer.m_nPushFailed, 0 ) << "producer_thread_no " << i;
237                 }
238             }
239             EXPECT_EQ( nTotalPops, nPoppedItems );
240
241             EXPECT_EQ( nTotalPops + nPostTestPops, s_nQueueSize ) << "nTotalPops=" << nTotalPops << ", nPostTestPops=" << nPostTestPops;
242             EXPECT_TRUE( q.empty() );
243
244             // Test consistency of popped sequence
245             for ( size_t nWriter = 0; nWriter < s_nProducerThreadCount; ++nWriter ) {
246                 std::vector<size_t> arrData;
247                 arrData.reserve( m_nThreadPushCount );
248                 for ( size_t nReader = 0; nReader < arrConsumer.size(); ++nReader ) {
249                     auto it = arrConsumer[nReader]->m_WriterData[nWriter].begin();
250                     auto itEnd = arrConsumer[nReader]->m_WriterData[nWriter].end();
251                     if ( it != itEnd ) {
252                         auto itPrev = it;
253                         for ( ++it; it != itEnd; ++it ) {
254                             EXPECT_LT( *itPrev, *it + nRightOffset ) << "consumer=" << nReader << ", producer=" << nWriter;
255                             itPrev = it;
256                         }
257                     }
258
259                     for ( it = arrConsumer[nReader]->m_WriterData[nWriter].begin(); it != itEnd; ++it )
260                         arrData.push_back( *it );
261                 }
262
263                 std::sort( arrData.begin(), arrData.end() );
264                 for ( size_t i=1; i < arrData.size(); ++i ) {
265                     EXPECT_EQ( arrData[i - 1] + 1, arrData[i] ) << "producer=" << nWriter;
266                 }
267
268                 EXPECT_EQ( arrData[0], 0 ) << "producer=" << nWriter;
269                 EXPECT_EQ( arrData[arrData.size() - 1], m_nThreadPushCount - 1 ) << "producer=" << nWriter;
270             }
271         }
272
273         template <class Queue>
274         void test_queue( Queue& q )
275         {
276             m_nThreadPushCount = s_nQueueSize / s_nProducerThreadCount;
277
278             cds_test::thread_pool& pool = get_pool();
279             pool.add( new Producer<Queue>( pool, q, m_nThreadPushCount ), s_nProducerThreadCount );
280             pool.add( new Consumer<Queue>( pool, q, m_nThreadPushCount ), s_nConsumerThreadCount );
281
282             s_nProducerDone.store( 0 );
283             s_nQueueSize = m_nThreadPushCount * s_nProducerThreadCount;
284
285             propout() << std::make_pair( "producer_count", s_nProducerThreadCount )
286                 << std::make_pair( "consumer_count", s_nConsumerThreadCount )
287                 << std::make_pair( "push_count", s_nQueueSize );
288
289             std::chrono::milliseconds duration = pool.run();
290
291             propout() << std::make_pair( "duration", duration );
292         }
293
294         template <class Queue>
295         void test( Queue& q )
296         {
297             test_queue( q );
298             analyze( q );
299             propout() << q.statistics();
300         }
301
302     public:
303         static void SetUpTestCase()\r
304         {\r
305             cds_test::config const& cfg = get_config( "queue_push_pop" );\r
306 \r
307             s_nConsumerThreadCount = cfg.get_size_t( "ConsumerCount", s_nConsumerThreadCount );
308             s_nProducerThreadCount = cfg.get_size_t( "ProducerCount", s_nProducerThreadCount );
309             s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
310
311             if ( s_nConsumerThreadCount == 0 )
312                 s_nConsumerThreadCount = 1;
313             if ( s_nProducerThreadCount == 0 )
314                 s_nProducerThreadCount = 1;
315             if ( s_nQueueSize == 0 )
316                 s_nQueueSize = 1000;
317         }\r
318 \r
319         //static void TearDownTestCase();\r
320     };
321
    // Register the push/pop stress test for each queue family below.
    // The CDSSTRESS_* macros are not defined in this file (presumably they come from
    // queue_type.h); they are expanded with whatever CDSSTRESS_Queue_F is in effect here.
    CDSSTRESS_MSQueue( queue_push_pop )
    CDSSTRESS_MoirQueue( queue_push_pop )
    CDSSTRESS_BasketQueue( queue_push_pop )
    CDSSTRESS_OptimsticQueue( queue_push_pop )
    CDSSTRESS_FCQueue( queue_push_pop )
    CDSSTRESS_FCDeque( queue_push_pop )
    CDSSTRESS_RWQueue( queue_push_pop )
    CDSSTRESS_StdQueue( queue_push_pop )
330
// Replace the per-type test body for the queues registered below: the queue object is
// constructed with s_nQueueSize as its constructor argument (presumably the capacity of
// these bounded queues - confirm against queue_type.h) and handed to test().
// The macro is removed again once both queue families are registered.
#undef CDSSTRESS_Queue_F
#define CDSSTRESS_Queue_F( test_fixture, type_name ) \
    TEST_F( test_fixture, type_name ) \
    { \
        typedef queue::Types< value_type >::type_name queue_type; \
        queue_type queue( s_nQueueSize ); \
        test( queue ); \
    }

    CDSSTRESS_TsigasQueue( queue_push_pop )
    CDSSTRESS_VyukovQueue( queue_push_pop )

#undef CDSSTRESS_Queue_F
344
345
346     // ********************************************************************
347     // SegmentedQueue test
348
349     class segmented_queue_push_pop
350         : public queue_push_pop
351         , public ::testing::WithParamInterface< size_t >
352     {
353         typedef queue_push_pop base_class;
354
355     protected:
356
357         template <typename Queue>
358         void test()
359         {
360             size_t quasi_factor = GetParam();
361
362             Queue q( quasi_factor );
363             propout() << std::make_pair( "quasi_factor", quasi_factor );
364             base_class::test_queue( q );
365             analyze( q, quasi_factor * 2, quasi_factor );
366             propout() << q.statistics();
367         }
368
369     public:
370         static std::vector< size_t > get_test_parameters()
371         {
372             cds_test::config const& cfg = cds_test::stress_fixture::get_config( "queue_push_pop" );
373             bool bIterative = cfg.get_bool( "SegmentedQueue_Iterate", false );
374             size_t quasi_factor = cfg.get_size_t( "SegmentedQueue_SegmentSize", 256 );
375
376             std::vector<size_t> args;
377             if ( bIterative && quasi_factor > 4 ) {
378                 for ( size_t qf = 4; qf <= quasi_factor; qf *= 2 )
379                     args.push_back( qf );
380             }
381             else {
382                 if ( quasi_factor > 2 )
383                     args.push_back( quasi_factor );
384                 else
385                     args.push_back( 2 );
386             }
387
388             return args;
389         }
390     };
391
// Per-type test body for the parameterized fixture: only the queue type is chosen here,
// the queue object itself is created inside segmented_queue_push_pop::test<>()
#define CDSSTRESS_Queue_F( test_fixture, type_name ) \
    TEST_P( test_fixture, type_name ) \
    { \
        typedef typename queue::Types<value_type>::type_name queue_type; \
        test< queue_type >(); \
    }

    CDSSTRESS_SegmentedQueue( segmented_queue_push_pop )

    // Run every registered test once per value returned by get_test_parameters()
    INSTANTIATE_TEST_CASE_P( SQ,
        segmented_queue_push_pop,
        ::testing::ValuesIn( segmented_queue_push_pop::get_test_parameters()));
404
405 } // namespace