0f2b9793a479f848a4a2c551ff062920eff359d1
[libcds.git] / test / stress / queue / push_pop.cpp
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #include "queue_type.h"
32
33 #include <vector>
34 #include <algorithm>
35
36 // Multi-threaded queue push/pop test
37 namespace {
38
39     static size_t s_nConsumerThreadCount = 4;
40     static size_t s_nProducerThreadCount = 4;
41     static size_t s_nQueueSize = 4000000;
42
43     static std::atomic<size_t> s_nProducerDone( 0 );
44
45     struct old_value
46         {
47         size_t nNo;
48                 size_t nWriterNo;
49         };
50
51     template<class Value = old_value>
52     class queue_push_pop: public cds_test::stress_fixture
53     {
54     protected:
55                 using value_type = Value;
56
57             enum {
58             producer_thread,
59             consumer_thread
60         };
61
62         template <class Queue>
63         class Producer: public cds_test::thread
64         {
65             typedef cds_test::thread base_class;
66
67         public:
68             Producer( cds_test::thread_pool& pool, Queue& queue, size_t nPushCount )
69                 : base_class( pool, producer_thread )
70                 , m_Queue( queue )
71                 , m_nPushFailed( 0 )
72                 , m_nPushCount( nPushCount )
73             {}
74
75             Producer( Producer& src )
76                 : base_class( src )
77                 , m_Queue( src.m_Queue )
78                 , m_nPushFailed( 0 )
79                 , m_nPushCount( src.m_nPushCount )
80             {}
81
82             virtual thread * clone()
83             {
84                 return new Producer( *this );
85             }
86
87             virtual void test()
88             {
89                 size_t const nPushCount = m_nPushCount;
90                 value_type v;
91                 v.nWriterNo = id();
92                 v.nNo = 0;
93                 m_nPushFailed = 0;
94
95                 while ( v.nNo < nPushCount ) {
96                     if ( m_Queue.push( v ))
97                         ++v.nNo;
98                     else
99                         ++m_nPushFailed;
100                 }
101
102                 s_nProducerDone.fetch_add( 1 );
103             }
104
105         public:
106             Queue&              m_Queue;
107             size_t              m_nPushFailed;
108             size_t const        m_nPushCount;
109         };
110
111         template <class Queue>
112         class Consumer: public cds_test::thread
113         {
114             typedef cds_test::thread base_class;
115
116         public:
117             Queue&              m_Queue;
118             size_t const        m_nPushPerProducer;
119             size_t              m_nPopEmpty;
120             size_t              m_nPopped;
121             size_t              m_nBadWriter;
122
123             typedef std::vector<size_t> popped_data;
124             typedef std::vector<size_t>::iterator       data_iterator;
125             typedef std::vector<size_t>::const_iterator const_data_iterator;
126
127             std::vector<popped_data>        m_WriterData;
128
129         private:
130             void initPoppedData()
131             {
132                 const size_t nProducerCount = s_nProducerThreadCount;
133                 m_WriterData.resize( nProducerCount );
134                 for ( size_t i = 0; i < nProducerCount; ++i )
135                     m_WriterData[i].reserve( m_nPushPerProducer );
136             }
137
138         public:
139             Consumer( cds_test::thread_pool& pool, Queue& queue, size_t nPushPerProducer )
140                 : base_class( pool, consumer_thread )
141                 , m_Queue( queue )
142                 , m_nPushPerProducer( nPushPerProducer )
143                 , m_nPopEmpty( 0 )
144                 , m_nPopped( 0 )
145                 , m_nBadWriter( 0 )
146             {
147                 initPoppedData();
148             }
149             Consumer( Consumer& src )
150                 : base_class( src )
151                 , m_Queue( src.m_Queue )
152                 , m_nPushPerProducer( src.m_nPushPerProducer )
153                 , m_nPopEmpty( 0 )
154                 , m_nPopped( 0 )
155                 , m_nBadWriter( 0 )
156             {
157                 initPoppedData();
158             }
159
160             virtual thread * clone()
161             {
162                 return new Consumer( *this );
163             }
164
165             virtual void test()
166             {
167                 m_nPopEmpty = 0;
168                 m_nPopped = 0;
169                 m_nBadWriter = 0;
170                 const size_t nTotalWriters = s_nProducerThreadCount;
171                 value_type v;
172                 while ( true ) {
173                     if ( m_Queue.pop( v )) {
174                         ++m_nPopped;
175                         if ( v.nWriterNo < nTotalWriters )
176                             m_WriterData[ v.nWriterNo ].push_back( v.nNo );
177                         else
178                             ++m_nBadWriter;
179                     }
180                     else {
181                         ++m_nPopEmpty;
182
183                         if ( s_nProducerDone.load() >= nTotalWriters ) {
184                             if ( m_Queue.empty())
185                                 break;
186                         }
187                     }
188                 }
189             }
190         };
191
192     protected:
193         size_t m_nThreadPushCount;
194
195     protected:
196         template <class Queue>
197         void analyze( Queue& q, size_t /*nLeftOffset*/ = 0, size_t nRightOffset = 0 )
198         {
199             cds_test::thread_pool& pool = get_pool();
200
201             typedef Consumer<Queue> Consumer;
202             typedef Producer<Queue> Producer;
203
204             size_t nPostTestPops = 0;
205             {
206                 value_type v;
207                 while ( q.pop( v ))
208                     ++nPostTestPops;
209             }
210
211             size_t nTotalPops = 0;
212             size_t nPopFalse = 0;
213             size_t nPoppedItems = 0;
214             size_t nPushFailed = 0;
215
216             std::vector< Consumer * > arrConsumer;
217
218             for ( size_t i = 0; i < pool.size(); ++i ) {
219                 cds_test::thread& thr = pool.get(i);
220                 if ( thr.type() == consumer_thread ) {
221                     Consumer& consumer = static_cast<Consumer&>( thr );
222                     nTotalPops += consumer.m_nPopped;
223                     nPopFalse += consumer.m_nPopEmpty;
224                     arrConsumer.push_back( &consumer );
225                     EXPECT_EQ( consumer.m_nBadWriter, 0u ) << "consumer_thread_no " << i;
226
227                     size_t nPopped = 0;
228                     for ( size_t n = 0; n < s_nProducerThreadCount; ++n )
229                         nPopped += consumer.m_WriterData[n].size();
230
231                     nPoppedItems += nPopped;
232                 }
233                 else {
234                     assert( thr.type() == producer_thread );
235
236                     Producer& producer = static_cast<Producer&>( thr );
237                     nPushFailed += producer.m_nPushFailed;
238                     EXPECT_EQ( producer.m_nPushFailed, 0u ) << "producer_thread_no " << i;
239                 }
240             }
241             EXPECT_EQ( nTotalPops, nPoppedItems );
242
243             EXPECT_EQ( nTotalPops + nPostTestPops, s_nQueueSize ) << "nTotalPops=" << nTotalPops << ", nPostTestPops=" << nPostTestPops;
244             EXPECT_TRUE( q.empty());
245
246             // Test consistency of popped sequence
247             for ( size_t nWriter = 0; nWriter < s_nProducerThreadCount; ++nWriter ) {
248                 std::vector<size_t> arrData;
249                 arrData.reserve( m_nThreadPushCount );
250                 for ( size_t nReader = 0; nReader < arrConsumer.size(); ++nReader ) {
251                     auto it = arrConsumer[nReader]->m_WriterData[nWriter].begin();
252                     auto itEnd = arrConsumer[nReader]->m_WriterData[nWriter].end();
253                     if ( it != itEnd ) {
254                         auto itPrev = it;
255                         for ( ++it; it != itEnd; ++it ) {
256                             EXPECT_LT( *itPrev, *it + nRightOffset ) << "consumer=" << nReader << ", producer=" << nWriter;
257                             itPrev = it;
258                         }
259                     }
260
261                     for ( it = arrConsumer[nReader]->m_WriterData[nWriter].begin(); it != itEnd; ++it )
262                         arrData.push_back( *it );
263                 }
264
265                 std::sort( arrData.begin(), arrData.end());
266                 for ( size_t i=1; i < arrData.size(); ++i ) {
267                     EXPECT_EQ( arrData[i - 1] + 1, arrData[i] ) << "producer=" << nWriter;
268                 }
269
270                 EXPECT_EQ( arrData[0], 0u ) << "producer=" << nWriter;
271                 EXPECT_EQ( arrData[arrData.size() - 1], m_nThreadPushCount - 1 ) << "producer=" << nWriter;
272             }
273         }
274
275         template <class Queue>
276         void test_queue( Queue& q )
277         {
278             m_nThreadPushCount = s_nQueueSize / s_nProducerThreadCount;
279
280             cds_test::thread_pool& pool = get_pool();
281             pool.add( new Producer<Queue>( pool, q, m_nThreadPushCount ), s_nProducerThreadCount );
282             pool.add( new Consumer<Queue>( pool, q, m_nThreadPushCount ), s_nConsumerThreadCount );
283
284             s_nProducerDone.store( 0 );
285             s_nQueueSize = m_nThreadPushCount * s_nProducerThreadCount;
286
287             propout() << std::make_pair( "producer_count", s_nProducerThreadCount )
288                 << std::make_pair( "consumer_count", s_nConsumerThreadCount )
289                 << std::make_pair( "push_count", s_nQueueSize );
290
291             std::chrono::milliseconds duration = pool.run();
292
293             propout() << std::make_pair( "duration", duration );
294         }
295
296         template <class Queue>
297         void test( Queue& q )
298         {
299             test_queue( q );
300             analyze( q );
301             propout() << q.statistics();
302         }
303
304     public:
305         static void SetUpTestCase()
306         {
307             cds_test::config const& cfg = get_config( "queue_push_pop" );
308
309             s_nConsumerThreadCount = cfg.get_size_t( "ConsumerCount", s_nConsumerThreadCount );
310             s_nProducerThreadCount = cfg.get_size_t( "ProducerCount", s_nProducerThreadCount );
311             s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
312
313             if ( s_nConsumerThreadCount == 0u )
314                 s_nConsumerThreadCount = 1;
315             if ( s_nProducerThreadCount == 0u )
316                 s_nProducerThreadCount = 1;
317             if ( s_nQueueSize == 0u )
318                 s_nQueueSize = 1000;
319         }
320
321         //static void TearDownTestCase();
322     };
323
324     using value_for_fc_with_heavy_value = queue_push_pop< HeavyValue<10> >;
325     using old_queue_push_pop = queue_push_pop<>;
326
327 //    CDSSTRESS_MSQueue( old_queue_push_pop )
328 //    CDSSTRESS_MoirQueue( old_queue_push_pop )
329 //    CDSSTRESS_BasketQueue( old_queue_push_pop )
330 //    CDSSTRESS_OptimsticQueue( old_queue_push_pop )
331 //    CDSSTRESS_FCQueue( old_queue_push_pop )
332         CDSSTRESS_FCDeque_HeavyValue( value_for_fc_with_heavy_value )
333 //    CDSSTRESS_FCDeque( old_queue_push_pop )
334 //    CDSSTRESS_RWQueue( old_queue_push_pop )
335 //    CDSSTRESS_StdQueue( old_queue_push_pop )
336
337 #undef CDSSTRESS_Queue_F
338 #define CDSSTRESS_Queue_F( test_fixture, type_name, level ) \
339     TEST_F( test_fixture, type_name ) \
340     { \
341         if ( !check_detail_level( level )) return; \
342         typedef queue::Types< value_type >::type_name queue_type; \
343         queue_type queue( s_nQueueSize ); \
344         test( queue ); \
345     }
346
347     CDSSTRESS_VyukovQueue( old_queue_push_pop )
348
349 #undef CDSSTRESS_Queue_F
350
351
352     // ********************************************************************
353     // SegmentedQueue test
354
355     class segmented_queue_push_pop
356         : public queue_push_pop<>
357         , public ::testing::WithParamInterface< size_t >
358     {
359         typedef queue_push_pop<> base_class;
360
361     protected:
362
363         template <typename Queue>
364         void test()
365         {
366             size_t quasi_factor = GetParam();
367
368             Queue q( quasi_factor );
369             propout() << std::make_pair( "quasi_factor", quasi_factor );
370             base_class::test_queue( q );
371             analyze( q, quasi_factor * 2, quasi_factor );
372             propout() << q.statistics();
373         }
374
375     public:
376         static std::vector< size_t > get_test_parameters()
377         {
378             cds_test::config const& cfg = cds_test::stress_fixture::get_config( "queue_push_pop" );
379             bool bIterative = cfg.get_bool( "SegmentedQueue_Iterate", false );
380             size_t quasi_factor = cfg.get_size_t( "SegmentedQueue_SegmentSize", 256 );
381
382             std::vector<size_t> args;
383             if ( bIterative && quasi_factor > 4 ) {
384                 for ( size_t qf = 4; qf <= quasi_factor; qf *= 2 )
385                     args.push_back( qf );
386             }
387             else {
388                 if ( quasi_factor > 2 )
389                     args.push_back( quasi_factor );
390                 else
391                     args.push_back( 2 );
392             }
393
394             return args;
395         }
396     };
397
398 #define CDSSTRESS_Queue_F( test_fixture, type_name, level ) \
399     TEST_P( test_fixture, type_name ) \
400     { \
401         if ( !check_detail_level( level )) return; \
402         typedef typename queue::Types<value_type>::type_name queue_type; \
403         test< queue_type >(); \
404     }
405
406     CDSSTRESS_SegmentedQueue( segmented_queue_push_pop )
407
408     INSTANTIATE_TEST_CASE_P( SQ,
409         segmented_queue_push_pop,
410         ::testing::ValuesIn( segmented_queue_push_pop::get_test_parameters()));
411
412 } // namespace