b12a19982c3dfab794fe55c27e620344890ee6dc
[libcds.git] / test / stress / queue / push_pop.cpp
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #include "queue_type.h"
32
33 #include <vector>
34 #include <algorithm>
35 #include <type_traits>
36
37 // Multi-threaded queue push/pop test
38 namespace {
39
40     static size_t s_nConsumerThreadCount = 4;
41     static size_t s_nProducerThreadCount = 4;
42     static size_t s_nQueueSize = 4000000;
43     static size_t s_nVyukovQueueSize = 40000;
44     static size_t s_nHeavyValueSize = 100;
45
46     static std::atomic<size_t> s_nProducerDone( 0 );
47
48     struct old_value
49     {
50         size_t nNo;
51         size_t nWriterNo;
52     };
53
54     template<class Value = old_value>
55     class queue_push_pop: public cds_test::stress_fixture
56     {
57     protected:
58        using value_type = Value;
59
60         enum {
61             producer_thread,
62             consumer_thread
63         };
64
65         template <class Queue>
66         class Producer: public cds_test::thread
67         {
68             typedef cds_test::thread base_class;
69
70         public:
71             Producer( cds_test::thread_pool& pool, Queue& queue, size_t nPushCount )
72                 : base_class( pool, producer_thread )
73                 , m_Queue( queue )
74                 , m_nPushFailed( 0 )
75                 , m_nPushCount( nPushCount )
76             {}
77
78             Producer( Producer& src )
79                 : base_class( src )
80                 , m_Queue( src.m_Queue )
81                 , m_nPushFailed( 0 )
82                 , m_nPushCount( src.m_nPushCount )
83             {}
84
85             virtual thread * clone()
86             {
87                 return new Producer( *this );
88             }
89
90             virtual void test()
91             {
92                 size_t const nPushCount = m_nPushCount;
93                 value_type v;
94                 v.nWriterNo = id();
95                 v.nNo = 0;
96                 m_nPushFailed = 0;
97
98                 while ( v.nNo < nPushCount ) {
99                     if ( m_Queue.push( v ))
100                         ++v.nNo;
101                     else
102                         ++m_nPushFailed;
103                 }
104
105                 s_nProducerDone.fetch_add( 1 );
106             }
107
108         public:
109             Queue&              m_Queue;
110             size_t              m_nPushFailed;
111             size_t const        m_nPushCount;
112         };
113
114         template <class Queue>
115         class Consumer: public cds_test::thread
116         {
117             typedef cds_test::thread base_class;
118
119         public:
120             Queue&              m_Queue;
121             size_t const        m_nPushPerProducer;
122             size_t              m_nPopEmpty;
123             size_t              m_nPopped;
124             size_t              m_nBadWriter;
125
126             typedef std::vector<size_t> popped_data;
127             typedef std::vector<size_t>::iterator       data_iterator;
128             typedef std::vector<size_t>::const_iterator const_data_iterator;
129
130             std::vector<popped_data>        m_WriterData;
131
132         private:
133             void initPoppedData()
134             {
135                 const size_t nProducerCount = s_nProducerThreadCount;
136                 m_WriterData.resize( nProducerCount );
137                 for ( size_t i = 0; i < nProducerCount; ++i )
138                     m_WriterData[i].reserve( m_nPushPerProducer );
139             }
140
141         public:
142             Consumer( cds_test::thread_pool& pool, Queue& queue, size_t nPushPerProducer )
143                 : base_class( pool, consumer_thread )
144                 , m_Queue( queue )
145                 , m_nPushPerProducer( nPushPerProducer )
146                 , m_nPopEmpty( 0 )
147                 , m_nPopped( 0 )
148                 , m_nBadWriter( 0 )
149             {
150                 initPoppedData();
151             }
152             Consumer( Consumer& src )
153                 : base_class( src )
154                 , m_Queue( src.m_Queue )
155                 , m_nPushPerProducer( src.m_nPushPerProducer )
156                 , m_nPopEmpty( 0 )
157                 , m_nPopped( 0 )
158                 , m_nBadWriter( 0 )
159             {
160                 initPoppedData();
161             }
162
163             virtual thread * clone()
164             {
165                 return new Consumer( *this );
166             }
167
168             virtual void test()
169             {
170                 m_nPopEmpty = 0;
171                 m_nPopped = 0;
172                 m_nBadWriter = 0;
173                 const size_t nTotalWriters = s_nProducerThreadCount;
174                 value_type v;
175                 while ( true ) {
176                     if ( m_Queue.pop( v )) {
177                         ++m_nPopped;
178                         if ( v.nWriterNo < nTotalWriters )
179                             m_WriterData[ v.nWriterNo ].push_back( v.nNo );
180                         else
181                             ++m_nBadWriter;
182                     }
183                     else {
184                         ++m_nPopEmpty;
185
186                         if ( s_nProducerDone.load() >= nTotalWriters ) {
187                             if ( m_Queue.empty())
188                                 break;
189                         }
190                     }
191                 }
192             }
193         };
194
195     protected:
196         size_t m_nThreadPushCount;
197
198     protected:
199         template <class Queue>
200         void analyze( Queue& q, size_t /*nLeftOffset*/ = 0, size_t nRightOffset = 0 )
201         {
202             cds_test::thread_pool& pool = get_pool();
203
204             typedef Consumer<Queue> consumer_type;
205             typedef Producer<Queue> producer_type;
206
207             size_t nPostTestPops = 0;
208             {
209                 value_type v;
210                 while ( q.pop( v ))
211                     ++nPostTestPops;
212             }
213
214             size_t nTotalPops = 0;
215             size_t nPopFalse = 0;
216             size_t nPoppedItems = 0;
217             size_t nPushFailed = 0;
218
219             std::vector< consumer_type * > arrConsumer;
220
221             for ( size_t i = 0; i < pool.size(); ++i ) {
222                 cds_test::thread& thr = pool.get(i);
223                 if ( thr.type() == consumer_thread ) {
224                     consumer_type& consumer = static_cast<consumer_type&>( thr );
225                     nTotalPops += consumer.m_nPopped;
226                     nPopFalse += consumer.m_nPopEmpty;
227                     arrConsumer.push_back( &consumer );
228                     EXPECT_EQ( consumer.m_nBadWriter, 0u ) << "consumer_thread_no " << i;
229
230                     size_t nPopped = 0;
231                     for ( size_t n = 0; n < s_nProducerThreadCount; ++n )
232                         nPopped += consumer.m_WriterData[n].size();
233
234                     nPoppedItems += nPopped;
235                 }
236                 else {
237                     assert( thr.type() == producer_thread );
238
239                     producer_type& producer = static_cast<producer_type&>( thr );
240                     nPushFailed += producer.m_nPushFailed;
241                     EXPECT_EQ( producer.m_nPushFailed, 0u ) << "producer_thread_no " << i;
242                 }
243             }
244             EXPECT_EQ( nTotalPops, nPoppedItems );
245
246             EXPECT_EQ( nTotalPops + nPostTestPops, s_nQueueSize ) << "nTotalPops=" << nTotalPops << ", nPostTestPops=" << nPostTestPops;
247             EXPECT_TRUE( q.empty());
248
249             // Test consistency of popped sequence
250             for ( size_t nWriter = 0; nWriter < s_nProducerThreadCount; ++nWriter ) {
251                 std::vector<size_t> arrData;
252                 arrData.reserve( m_nThreadPushCount );
253                 for ( size_t nReader = 0; nReader < arrConsumer.size(); ++nReader ) {
254                     auto it = arrConsumer[nReader]->m_WriterData[nWriter].begin();
255                     auto itEnd = arrConsumer[nReader]->m_WriterData[nWriter].end();
256                     if ( it != itEnd ) {
257                         auto itPrev = it;
258                         for ( ++it; it != itEnd; ++it ) {
259                             EXPECT_LT( *itPrev, *it + nRightOffset ) << "consumer=" << nReader << ", producer=" << nWriter;
260                             itPrev = it;
261                         }
262                     }
263
264                     for ( it = arrConsumer[nReader]->m_WriterData[nWriter].begin(); it != itEnd; ++it )
265                         arrData.push_back( *it );
266                 }
267
268                 std::sort( arrData.begin(), arrData.end());
269                 for ( size_t i=1; i < arrData.size(); ++i ) {
270                     EXPECT_EQ( arrData[i - 1] + 1, arrData[i] ) << "producer=" << nWriter;
271                 }
272
273                 EXPECT_EQ( arrData[0], 0u ) << "producer=" << nWriter;
274                 EXPECT_EQ( arrData[arrData.size() - 1], m_nThreadPushCount - 1 ) << "producer=" << nWriter;
275             }
276         }
277
278         template <class Queue>
279         void test_queue( Queue& q )
280         {
281             m_nThreadPushCount = s_nQueueSize / s_nProducerThreadCount;
282
283             cds_test::thread_pool& pool = get_pool();
284             pool.add( new Producer<Queue>( pool, q, m_nThreadPushCount ), s_nProducerThreadCount );
285             pool.add( new Consumer<Queue>( pool, q, m_nThreadPushCount ), s_nConsumerThreadCount );
286
287             s_nProducerDone.store( 0 );
288             s_nQueueSize = m_nThreadPushCount * s_nProducerThreadCount;
289
290             propout() << std::make_pair( "producer_count", s_nProducerThreadCount )
291                 << std::make_pair( "consumer_count", s_nConsumerThreadCount )
292                 << std::make_pair( "push_count", s_nQueueSize );
293
294             std::chrono::milliseconds duration = pool.run();
295
296             propout() << std::make_pair( "duration", duration );
297         }
298
299         template <class Queue>
300         void test( Queue& q )
301         {
302             test_queue( q );
303             analyze( q );
304             propout() << q.statistics();
305         }
306
307     private:
308         static void set_array_size( size_t size ) {
309             const bool tmp = fc_test::has_set_array_size<value_type>::value;
310             set_array_size(size, std::integral_constant<bool, tmp>());
311         }
312
313         static void set_array_size(size_t size, std::true_type){
314             value_type::set_array_size(size);
315         }
316
317         static void set_array_size(size_t, std::false_type)
318         {
319         }
320
321     public:
322         static void SetUpTestCase()
323         {
324             cds_test::config const& cfg = get_config( "queue_push_pop" );
325
326             s_nConsumerThreadCount = cfg.get_size_t( "ConsumerCount", s_nConsumerThreadCount );
327             s_nProducerThreadCount = cfg.get_size_t( "ProducerCount", s_nProducerThreadCount );
328             s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
329             s_nVyukovQueueSize = cfg.get_size_t( "VyukovQueueSize", s_nVyukovQueueSize );
330             s_nHeavyValueSize = cfg.get_size_t( "HeavyValueSize", s_nHeavyValueSize );
331
332             if ( s_nConsumerThreadCount == 0u )
333                 s_nConsumerThreadCount = 1;
334             if ( s_nProducerThreadCount == 0u )
335                 s_nProducerThreadCount = 1;
336             if ( s_nQueueSize == 0u )
337                 s_nQueueSize = 1000;
338             if ( s_nHeavyValueSize == 0 )
339                 s_nHeavyValueSize = 1;
340
341             set_array_size( s_nHeavyValueSize );
342         }
343
344         //static void TearDownTestCase();
345     };
346
347     using fc_with_heavy_value = queue_push_pop< fc_test::heavy_value<36000> >;
348     using simple_queue_push_pop = queue_push_pop<>;
349
350     CDSSTRESS_MSQueue( simple_queue_push_pop )
351     CDSSTRESS_MoirQueue( simple_queue_push_pop )
352     CDSSTRESS_BasketQueue( simple_queue_push_pop )
353     CDSSTRESS_OptimsticQueue( simple_queue_push_pop )
354     CDSSTRESS_RWQueue( simple_queue_push_pop )
355
356 #undef CDSSTRESS_Queue_F
357 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
358     TEST_F( test_fixture, type_name ) \
359     { \
360         size_t old_queue_size = s_nQueueSize; \
361         s_nQueueSize = s_nVyukovQueueSize; \
362         typedef queue::Types< value_type >::type_name queue_type; \
363         queue_type queue( s_nQueueSize ); \
364         test( queue ); \
365         s_nQueueSize = old_queue_size; \
366     }
367
368     CDSSTRESS_VyukovQueue( simple_queue_push_pop )
369
370 #undef CDSSTRESS_Queue_F
371
372
373     // ********************************************************************
374     // SegmentedQueue test
375
376     class segmented_queue_push_pop
377         : public queue_push_pop<>
378         , public ::testing::WithParamInterface< size_t >
379     {
380         typedef queue_push_pop<> base_class;
381
382     protected:
383
384         template <typename Queue>
385         void test()
386         {
387             size_t quasi_factor = GetParam();
388
389             Queue q( quasi_factor );
390             propout() << std::make_pair( "quasi_factor", quasi_factor );
391             base_class::test_queue( q );
392             analyze( q, quasi_factor * 2, quasi_factor );
393             propout() << q.statistics();
394         }
395
396     public:
397         static std::vector< size_t > get_test_parameters()
398         {
399             cds_test::config const& cfg = cds_test::stress_fixture::get_config( "queue_push_pop" );
400             bool bIterative = cfg.get_bool( "SegmentedQueue_Iterate", false );
401             size_t quasi_factor = cfg.get_size_t( "SegmentedQueue_SegmentSize", 256 );
402
403             std::vector<size_t> args;
404             if ( bIterative && quasi_factor > 4 ) {
405                 for ( size_t qf = 4; qf <= quasi_factor; qf *= 2 )
406                     args.push_back( qf );
407             } else {
408                 if ( quasi_factor > 2 )
409                     args.push_back( quasi_factor );
410                 else
411                     args.push_back( 2 );
412             }
413
414             return args;
415         }
416     };
417
418 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
419     TEST_P( test_fixture, type_name ) \
420     { \
421         typedef typename queue::Types<value_type>::type_name queue_type; \
422         test< queue_type >(); \
423     }
424
425     CDSSTRESS_SegmentedQueue( segmented_queue_push_pop )
426
427 #ifdef CDSTEST_GTEST_INSTANTIATE_TEST_CASE_P_HAS_4TH_ARG
428     static std::string get_test_parameter_name( testing::TestParamInfo<size_t> const& p )
429     {
430         return std::to_string( p.param );
431     }
432     INSTANTIATE_TEST_CASE_P( SQ,
433         segmented_queue_push_pop,
434         ::testing::ValuesIn( segmented_queue_push_pop::get_test_parameters()), get_test_parameter_name );
435 #else
436     INSTANTIATE_TEST_CASE_P( SQ,
437         segmented_queue_push_pop,
438         ::testing::ValuesIn( segmented_queue_push_pop::get_test_parameters()));
439 #endif
440
441 } // namespace