Merge branch 'master' into dev
[libcds.git] / test / stress / queue / push_pop.cpp
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #include "queue_type.h"
32
33 #include <vector>
34 #include <algorithm>
35 #include <type_traits>
36
37 // Multi-threaded queue push/pop test
38 namespace {
39
40     static size_t s_nConsumerThreadCount = 4;
41     static size_t s_nProducerThreadCount = 4;
42     static size_t s_nQueueSize = 4000000;
43     static size_t s_nHeavyValueSize = 100;
44
45     static std::atomic<size_t> s_nProducerDone( 0 );
46
47     struct old_value
48     {
49         size_t nNo;
50         size_t nWriterNo;
51     };
52
53     template<class Value = old_value>
54     class queue_push_pop: public cds_test::stress_fixture
55     {
56     protected:
57        using value_type = Value;
58
59         enum {
60             producer_thread,
61             consumer_thread
62         };
63
64         template <class Queue>
65         class Producer: public cds_test::thread
66         {
67             typedef cds_test::thread base_class;
68
69         public:
70             Producer( cds_test::thread_pool& pool, Queue& queue, size_t nPushCount )
71                 : base_class( pool, producer_thread )
72                 , m_Queue( queue )
73                 , m_nPushFailed( 0 )
74                 , m_nPushCount( nPushCount )
75             {}
76
77             Producer( Producer& src )
78                 : base_class( src )
79                 , m_Queue( src.m_Queue )
80                 , m_nPushFailed( 0 )
81                 , m_nPushCount( src.m_nPushCount )
82             {}
83
84             virtual thread * clone()
85             {
86                 return new Producer( *this );
87             }
88
89             virtual void test()
90             {
91                 size_t const nPushCount = m_nPushCount;
92                 value_type v;
93                 v.nWriterNo = id();
94                 v.nNo = 0;
95                 m_nPushFailed = 0;
96
97                 while ( v.nNo < nPushCount ) {
98                     if ( m_Queue.push( v ))
99                         ++v.nNo;
100                     else
101                         ++m_nPushFailed;
102                 }
103
104                 s_nProducerDone.fetch_add( 1 );
105             }
106
107         public:
108             Queue&              m_Queue;
109             size_t              m_nPushFailed;
110             size_t const        m_nPushCount;
111         };
112
113         template <class Queue>
114         class Consumer: public cds_test::thread
115         {
116             typedef cds_test::thread base_class;
117
118         public:
119             Queue&              m_Queue;
120             size_t const        m_nPushPerProducer;
121             size_t              m_nPopEmpty;
122             size_t              m_nPopped;
123             size_t              m_nBadWriter;
124
125             typedef std::vector<size_t> popped_data;
126             typedef std::vector<size_t>::iterator       data_iterator;
127             typedef std::vector<size_t>::const_iterator const_data_iterator;
128
129             std::vector<popped_data>        m_WriterData;
130
131         private:
132             void initPoppedData()
133             {
134                 const size_t nProducerCount = s_nProducerThreadCount;
135                 m_WriterData.resize( nProducerCount );
136                 for ( size_t i = 0; i < nProducerCount; ++i )
137                     m_WriterData[i].reserve( m_nPushPerProducer );
138             }
139
140         public:
141             Consumer( cds_test::thread_pool& pool, Queue& queue, size_t nPushPerProducer )
142                 : base_class( pool, consumer_thread )
143                 , m_Queue( queue )
144                 , m_nPushPerProducer( nPushPerProducer )
145                 , m_nPopEmpty( 0 )
146                 , m_nPopped( 0 )
147                 , m_nBadWriter( 0 )
148             {
149                 initPoppedData();
150             }
151             Consumer( Consumer& src )
152                 : base_class( src )
153                 , m_Queue( src.m_Queue )
154                 , m_nPushPerProducer( src.m_nPushPerProducer )
155                 , m_nPopEmpty( 0 )
156                 , m_nPopped( 0 )
157                 , m_nBadWriter( 0 )
158             {
159                 initPoppedData();
160             }
161
162             virtual thread * clone()
163             {
164                 return new Consumer( *this );
165             }
166
167             virtual void test()
168             {
169                 m_nPopEmpty = 0;
170                 m_nPopped = 0;
171                 m_nBadWriter = 0;
172                 const size_t nTotalWriters = s_nProducerThreadCount;
173                 value_type v;
174                 while ( true ) {
175                     if ( m_Queue.pop( v )) {
176                         ++m_nPopped;
177                         if ( v.nWriterNo < nTotalWriters )
178                             m_WriterData[ v.nWriterNo ].push_back( v.nNo );
179                         else
180                             ++m_nBadWriter;
181                     }
182                     else {
183                         ++m_nPopEmpty;
184
185                         if ( s_nProducerDone.load() >= nTotalWriters ) {
186                             if ( m_Queue.empty())
187                                 break;
188                         }
189                     }
190                 }
191             }
192         };
193
194     protected:
195         size_t m_nThreadPushCount;
196
197     protected:
198         template <class Queue>
199         void analyze( Queue& q, size_t /*nLeftOffset*/ = 0, size_t nRightOffset = 0 )
200         {
201             cds_test::thread_pool& pool = get_pool();
202
203             typedef Consumer<Queue> Consumer;
204             typedef Producer<Queue> Producer;
205
206             size_t nPostTestPops = 0;
207             {
208                 value_type v;
209                 while ( q.pop( v ))
210                     ++nPostTestPops;
211             }
212
213             size_t nTotalPops = 0;
214             size_t nPopFalse = 0;
215             size_t nPoppedItems = 0;
216             size_t nPushFailed = 0;
217
218             std::vector< Consumer * > arrConsumer;
219
220             for ( size_t i = 0; i < pool.size(); ++i ) {
221                 cds_test::thread& thr = pool.get(i);
222                 if ( thr.type() == consumer_thread ) {
223                     Consumer& consumer = static_cast<Consumer&>( thr );
224                     nTotalPops += consumer.m_nPopped;
225                     nPopFalse += consumer.m_nPopEmpty;
226                     arrConsumer.push_back( &consumer );
227                     EXPECT_EQ( consumer.m_nBadWriter, 0u ) << "consumer_thread_no " << i;
228
229                     size_t nPopped = 0;
230                     for ( size_t n = 0; n < s_nProducerThreadCount; ++n )
231                         nPopped += consumer.m_WriterData[n].size();
232
233                     nPoppedItems += nPopped;
234                 }
235                 else {
236                     assert( thr.type() == producer_thread );
237
238                     Producer& producer = static_cast<Producer&>( thr );
239                     nPushFailed += producer.m_nPushFailed;
240                     EXPECT_EQ( producer.m_nPushFailed, 0u ) << "producer_thread_no " << i;
241                 }
242             }
243             EXPECT_EQ( nTotalPops, nPoppedItems );
244
245             EXPECT_EQ( nTotalPops + nPostTestPops, s_nQueueSize ) << "nTotalPops=" << nTotalPops << ", nPostTestPops=" << nPostTestPops;
246             EXPECT_TRUE( q.empty());
247
248             // Test consistency of popped sequence
249             for ( size_t nWriter = 0; nWriter < s_nProducerThreadCount; ++nWriter ) {
250                 std::vector<size_t> arrData;
251                 arrData.reserve( m_nThreadPushCount );
252                 for ( size_t nReader = 0; nReader < arrConsumer.size(); ++nReader ) {
253                     auto it = arrConsumer[nReader]->m_WriterData[nWriter].begin();
254                     auto itEnd = arrConsumer[nReader]->m_WriterData[nWriter].end();
255                     if ( it != itEnd ) {
256                         auto itPrev = it;
257                         for ( ++it; it != itEnd; ++it ) {
258                             EXPECT_LT( *itPrev, *it + nRightOffset ) << "consumer=" << nReader << ", producer=" << nWriter;
259                             itPrev = it;
260                         }
261                     }
262
263                     for ( it = arrConsumer[nReader]->m_WriterData[nWriter].begin(); it != itEnd; ++it )
264                         arrData.push_back( *it );
265                 }
266
267                 std::sort( arrData.begin(), arrData.end());
268                 for ( size_t i=1; i < arrData.size(); ++i ) {
269                     EXPECT_EQ( arrData[i - 1] + 1, arrData[i] ) << "producer=" << nWriter;
270                 }
271
272                 EXPECT_EQ( arrData[0], 0u ) << "producer=" << nWriter;
273                 EXPECT_EQ( arrData[arrData.size() - 1], m_nThreadPushCount - 1 ) << "producer=" << nWriter;
274             }
275         }
276
277         template <class Queue>
278         void test_queue( Queue& q )
279         {
280             m_nThreadPushCount = s_nQueueSize / s_nProducerThreadCount;
281
282             cds_test::thread_pool& pool = get_pool();
283             pool.add( new Producer<Queue>( pool, q, m_nThreadPushCount ), s_nProducerThreadCount );
284             pool.add( new Consumer<Queue>( pool, q, m_nThreadPushCount ), s_nConsumerThreadCount );
285
286             s_nProducerDone.store( 0 );
287             s_nQueueSize = m_nThreadPushCount * s_nProducerThreadCount;
288
289             propout() << std::make_pair( "producer_count", s_nProducerThreadCount )
290                 << std::make_pair( "consumer_count", s_nConsumerThreadCount )
291                 << std::make_pair( "push_count", s_nQueueSize );
292
293             std::chrono::milliseconds duration = pool.run();
294
295             propout() << std::make_pair( "duration", duration );
296         }
297
298         template <class Queue>
299         void test( Queue& q )
300         {
301             test_queue( q );
302             analyze( q );
303             propout() << q.statistics();
304         }
305
306     private:
307         static void set_array_size( size_t size ) {
308             const bool tmp = fc_test::has_set_array_size<value_type>::value;
309             set_array_size(size, std::integral_constant<bool, tmp>());
310         }
311
312         static void set_array_size(size_t size, std::true_type){
313             value_type::set_array_size(size);
314         }
315
316         static void set_array_size(size_t, std::false_type)
317         {
318         }
319
320     public:
321         static void SetUpTestCase()
322         {
323             cds_test::config const& cfg = get_config( "queue_push_pop" );
324
325             s_nConsumerThreadCount = cfg.get_size_t( "ConsumerCount", s_nConsumerThreadCount );
326             s_nProducerThreadCount = cfg.get_size_t( "ProducerCount", s_nProducerThreadCount );
327             s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
328             s_nHeavyValueSize = cfg.get_size_t( "HeavyValueSize", s_nHeavyValueSize );
329
330             if ( s_nConsumerThreadCount == 0u )
331                 s_nConsumerThreadCount = 1;
332             if ( s_nProducerThreadCount == 0u )
333                 s_nProducerThreadCount = 1;
334             if ( s_nQueueSize == 0u )
335                 s_nQueueSize = 1000;
336             if ( s_nHeavyValueSize == 0 )
337                 s_nHeavyValueSize = 1;
338
339             set_array_size( s_nHeavyValueSize );
340         }
341
342         //static void TearDownTestCase();
343     };
344
345     using fc_with_heavy_value = queue_push_pop< fc_test::heavy_value<36000> >;
346     using simple_queue_push_pop = queue_push_pop<>;
347
348     CDSSTRESS_MSQueue( simple_queue_push_pop )
349     CDSSTRESS_MoirQueue( simple_queue_push_pop )
350     CDSSTRESS_BasketQueue( simple_queue_push_pop )
351     CDSSTRESS_OptimsticQueue( simple_queue_push_pop )
352     CDSSTRESS_FCQueue( simple_queue_push_pop )
353     CDSSTRESS_FCDeque( simple_queue_push_pop )
354     CDSSTRESS_FCDeque_HeavyValue( fc_with_heavy_value )
355     CDSSTRESS_RWQueue( simple_queue_push_pop )
356     CDSSTRESS_StdQueue( simple_queue_push_pop )
357
358 #undef CDSSTRESS_Queue_F
359 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
360     TEST_F( test_fixture, type_name ) \
361     { \
362         typedef queue::Types< value_type >::type_name queue_type; \
363         queue_type queue( s_nQueueSize ); \
364         test( queue ); \
365     }
366
367     CDSSTRESS_VyukovQueue( simple_queue_push_pop )
368
369 #undef CDSSTRESS_Queue_F
370
371
372     // ********************************************************************
373     // SegmentedQueue test
374
375     class segmented_queue_push_pop
376         : public queue_push_pop<>
377         , public ::testing::WithParamInterface< size_t >
378     {
379         typedef queue_push_pop<> base_class;
380
381     protected:
382
383         template <typename Queue>
384         void test()
385         {
386             size_t quasi_factor = GetParam();
387
388             Queue q( quasi_factor );
389             propout() << std::make_pair( "quasi_factor", quasi_factor );
390             base_class::test_queue( q );
391             analyze( q, quasi_factor * 2, quasi_factor );
392             propout() << q.statistics();
393         }
394
395     public:
396         static std::vector< size_t > get_test_parameters()
397         {
398             cds_test::config const& cfg = cds_test::stress_fixture::get_config( "queue_push_pop" );
399             bool bIterative = cfg.get_bool( "SegmentedQueue_Iterate", false );
400             size_t quasi_factor = cfg.get_size_t( "SegmentedQueue_SegmentSize", 256 );
401
402             std::vector<size_t> args;
403             if ( bIterative && quasi_factor > 4 ) {
404                 for ( size_t qf = 4; qf <= quasi_factor; qf *= 2 )
405                     args.push_back( qf );
406             } else {
407                 if ( quasi_factor > 2 )
408                     args.push_back( quasi_factor );
409                 else
410                     args.push_back( 2 );
411             }
412
413             return args;
414         }
415     };
416
417 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
418     TEST_P( test_fixture, type_name ) \
419     { \
420         typedef typename queue::Types<value_type>::type_name queue_type; \
421         test< queue_type >(); \
422     }
423
424     CDSSTRESS_SegmentedQueue( segmented_queue_push_pop )
425
426 #ifdef CDSTEST_GTEST_INSTANTIATE_TEST_CASE_P_HAS_4TH_ARG
427     static std::string get_test_parameter_name( testing::TestParamInfo<size_t> const& p )
428     {
429         return std::to_string( p.param );
430     }
431     INSTANTIATE_TEST_CASE_P( SQ,
432         segmented_queue_push_pop,
433         ::testing::ValuesIn( segmented_queue_push_pop::get_test_parameters()), get_test_parameter_name );
434 #else
435     INSTANTIATE_TEST_CASE_P( SQ,
436         segmented_queue_push_pop,
437         ::testing::ValuesIn( segmented_queue_push_pop::get_test_parameters()) );
438 #endif
439
440 } // namespace