Updated copyright
[libcds.git] / test / stress / queue / push_pop.cpp
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #include "queue_type.h"
32
33 #include <vector>
34 #include <algorithm>
35
36 // Multi-threaded queue push/pop test
37 namespace {
38
39     static size_t s_nConsumerThreadCount = 4;
40     static size_t s_nProducerThreadCount = 4;
41     static size_t s_nQueueSize = 4000000;
42
43     static std::atomic<size_t> s_nProducerDone( 0 );
44
45     class queue_push_pop: public cds_test::stress_fixture
46     {
47     protected:
48         struct value_type
49         {
50             size_t      nNo;
51             size_t      nWriterNo;
52         };
53
54         enum {
55             producer_thread,
56             consumer_thread
57         };
58
59         template <class Queue>
60         class Producer: public cds_test::thread
61         {
62             typedef cds_test::thread base_class;
63
64         public:
65             Producer( cds_test::thread_pool& pool, Queue& queue, size_t nPushCount )
66                 : base_class( pool, producer_thread )
67                 , m_Queue( queue )
68                 , m_nPushFailed( 0 )
69                 , m_nPushCount( nPushCount )
70             {}
71
72             Producer( Producer& src )
73                 : base_class( src )
74                 , m_Queue( src.m_Queue )
75                 , m_nPushFailed( 0 )
76                 , m_nPushCount( src.m_nPushCount )
77             {}
78
79             virtual thread * clone()
80             {
81                 return new Producer( *this );
82             }
83
84             virtual void test()
85             {
86                 size_t const nPushCount = m_nPushCount;
87                 value_type v;
88                 v.nWriterNo = id();
89                 v.nNo = 0;
90                 m_nPushFailed = 0;
91
92                 while ( v.nNo < nPushCount ) {
93                     if ( m_Queue.push( v ))
94                         ++v.nNo;
95                     else
96                         ++m_nPushFailed;
97                 }
98
99                 s_nProducerDone.fetch_add( 1 );
100             }
101
102         public:
103             Queue&              m_Queue;
104             size_t              m_nPushFailed;
105             size_t const        m_nPushCount;
106         };
107
108         template <class Queue>
109         class Consumer: public cds_test::thread
110         {
111             typedef cds_test::thread base_class;
112
113         public:
114             Queue&              m_Queue;
115             size_t const        m_nPushPerProducer;
116             size_t              m_nPopEmpty;
117             size_t              m_nPopped;
118             size_t              m_nBadWriter;
119
120             typedef std::vector<size_t> popped_data;
121             typedef std::vector<size_t>::iterator       data_iterator;
122             typedef std::vector<size_t>::const_iterator const_data_iterator;
123
124             std::vector<popped_data>        m_WriterData;
125
126         private:
127             void initPoppedData()
128             {
129                 const size_t nProducerCount = s_nProducerThreadCount;
130                 m_WriterData.resize( nProducerCount );
131                 for ( size_t i = 0; i < nProducerCount; ++i )
132                     m_WriterData[i].reserve( m_nPushPerProducer );
133             }
134
135         public:
136             Consumer( cds_test::thread_pool& pool, Queue& queue, size_t nPushPerProducer )
137                 : base_class( pool, consumer_thread )
138                 , m_Queue( queue )
139                 , m_nPushPerProducer( nPushPerProducer )
140                 , m_nPopEmpty( 0 )
141                 , m_nPopped( 0 )
142                 , m_nBadWriter( 0 )
143             {
144                 initPoppedData();
145             }
146             Consumer( Consumer& src )
147                 : base_class( src )
148                 , m_Queue( src.m_Queue )
149                 , m_nPushPerProducer( src.m_nPushPerProducer )
150                 , m_nPopEmpty( 0 )
151                 , m_nPopped( 0 )
152                 , m_nBadWriter( 0 )
153             {
154                 initPoppedData();
155             }
156
157             virtual thread * clone()
158             {
159                 return new Consumer( *this );
160             }
161
162             virtual void test()
163             {
164                 m_nPopEmpty = 0;
165                 m_nPopped = 0;
166                 m_nBadWriter = 0;
167                 const size_t nTotalWriters = s_nProducerThreadCount;
168                 value_type v;
169                 while ( true ) {
170                     if ( m_Queue.pop( v )) {
171                         ++m_nPopped;
172                         if ( v.nWriterNo < nTotalWriters )
173                             m_WriterData[ v.nWriterNo ].push_back( v.nNo );
174                         else
175                             ++m_nBadWriter;
176                     }
177                     else {
178                         ++m_nPopEmpty;
179
180                         if ( s_nProducerDone.load() >= nTotalWriters ) {
181                             if ( m_Queue.empty())
182                                 break;
183                         }
184                     }
185                 }
186             }
187         };
188
189     protected:
190         size_t m_nThreadPushCount;
191
192     protected:
193         template <class Queue>
194         void analyze( Queue& q, size_t /*nLeftOffset*/ = 0, size_t nRightOffset = 0 )
195         {
196             cds_test::thread_pool& pool = get_pool();
197
198             typedef Consumer<Queue> Consumer;
199             typedef Producer<Queue> Producer;
200
201             size_t nPostTestPops = 0;
202             {
203                 value_type v;
204                 while ( q.pop( v ))
205                     ++nPostTestPops;
206             }
207
208             size_t nTotalPops = 0;
209             size_t nPopFalse = 0;
210             size_t nPoppedItems = 0;
211             size_t nPushFailed = 0;
212
213             std::vector< Consumer * > arrConsumer;
214
215             for ( size_t i = 0; i < pool.size(); ++i ) {
216                 cds_test::thread& thr = pool.get(i);
217                 if ( thr.type() == consumer_thread ) {
218                     Consumer& consumer = static_cast<Consumer&>( thr );
219                     nTotalPops += consumer.m_nPopped;
220                     nPopFalse += consumer.m_nPopEmpty;
221                     arrConsumer.push_back( &consumer );
222                     EXPECT_EQ( consumer.m_nBadWriter, 0u ) << "consumer_thread_no " << i;
223
224                     size_t nPopped = 0;
225                     for ( size_t n = 0; n < s_nProducerThreadCount; ++n )
226                         nPopped += consumer.m_WriterData[n].size();
227
228                     nPoppedItems += nPopped;
229                 }
230                 else {
231                     assert( thr.type() == producer_thread );
232
233                     Producer& producer = static_cast<Producer&>( thr );
234                     nPushFailed += producer.m_nPushFailed;
235                     EXPECT_EQ( producer.m_nPushFailed, 0u ) << "producer_thread_no " << i;
236                 }
237             }
238             EXPECT_EQ( nTotalPops, nPoppedItems );
239
240             EXPECT_EQ( nTotalPops + nPostTestPops, s_nQueueSize ) << "nTotalPops=" << nTotalPops << ", nPostTestPops=" << nPostTestPops;
241             EXPECT_TRUE( q.empty());
242
243             // Test consistency of popped sequence
244             for ( size_t nWriter = 0; nWriter < s_nProducerThreadCount; ++nWriter ) {
245                 std::vector<size_t> arrData;
246                 arrData.reserve( m_nThreadPushCount );
247                 for ( size_t nReader = 0; nReader < arrConsumer.size(); ++nReader ) {
248                     auto it = arrConsumer[nReader]->m_WriterData[nWriter].begin();
249                     auto itEnd = arrConsumer[nReader]->m_WriterData[nWriter].end();
250                     if ( it != itEnd ) {
251                         auto itPrev = it;
252                         for ( ++it; it != itEnd; ++it ) {
253                             EXPECT_LT( *itPrev, *it + nRightOffset ) << "consumer=" << nReader << ", producer=" << nWriter;
254                             itPrev = it;
255                         }
256                     }
257
258                     for ( it = arrConsumer[nReader]->m_WriterData[nWriter].begin(); it != itEnd; ++it )
259                         arrData.push_back( *it );
260                 }
261
262                 std::sort( arrData.begin(), arrData.end());
263                 for ( size_t i=1; i < arrData.size(); ++i ) {
264                     EXPECT_EQ( arrData[i - 1] + 1, arrData[i] ) << "producer=" << nWriter;
265                 }
266
267                 EXPECT_EQ( arrData[0], 0u ) << "producer=" << nWriter;
268                 EXPECT_EQ( arrData[arrData.size() - 1], m_nThreadPushCount - 1 ) << "producer=" << nWriter;
269             }
270         }
271
272         template <class Queue>
273         void test_queue( Queue& q )
274         {
275             m_nThreadPushCount = s_nQueueSize / s_nProducerThreadCount;
276
277             cds_test::thread_pool& pool = get_pool();
278             pool.add( new Producer<Queue>( pool, q, m_nThreadPushCount ), s_nProducerThreadCount );
279             pool.add( new Consumer<Queue>( pool, q, m_nThreadPushCount ), s_nConsumerThreadCount );
280
281             s_nProducerDone.store( 0 );
282             s_nQueueSize = m_nThreadPushCount * s_nProducerThreadCount;
283
284             propout() << std::make_pair( "producer_count", s_nProducerThreadCount )
285                 << std::make_pair( "consumer_count", s_nConsumerThreadCount )
286                 << std::make_pair( "push_count", s_nQueueSize );
287
288             std::chrono::milliseconds duration = pool.run();
289
290             propout() << std::make_pair( "duration", duration );
291         }
292
293         template <class Queue>
294         void test( Queue& q )
295         {
296             test_queue( q );
297             analyze( q );
298             propout() << q.statistics();
299         }
300
301     public:
302         static void SetUpTestCase()
303         {
304             cds_test::config const& cfg = get_config( "queue_push_pop" );
305
306             s_nConsumerThreadCount = cfg.get_size_t( "ConsumerCount", s_nConsumerThreadCount );
307             s_nProducerThreadCount = cfg.get_size_t( "ProducerCount", s_nProducerThreadCount );
308             s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
309
310             if ( s_nConsumerThreadCount == 0u )
311                 s_nConsumerThreadCount = 1;
312             if ( s_nProducerThreadCount == 0u )
313                 s_nProducerThreadCount = 1;
314             if ( s_nQueueSize == 0u )
315                 s_nQueueSize = 1000;
316         }
317
318         //static void TearDownTestCase();
319     };
320
321     CDSSTRESS_MSQueue( queue_push_pop )
322     CDSSTRESS_MoirQueue( queue_push_pop )
323     CDSSTRESS_BasketQueue( queue_push_pop )
324     CDSSTRESS_OptimsticQueue( queue_push_pop )
325     CDSSTRESS_FCQueue( queue_push_pop )
326     CDSSTRESS_FCDeque( queue_push_pop )
327     CDSSTRESS_RWQueue( queue_push_pop )
328     CDSSTRESS_StdQueue( queue_push_pop )
329
330 #undef CDSSTRESS_Queue_F
331 #define CDSSTRESS_Queue_F( test_fixture, type_name, level ) \
332     TEST_F( test_fixture, type_name ) \
333     { \
334         if ( !check_detail_level( level )) return; \
335         typedef queue::Types< value_type >::type_name queue_type; \
336         queue_type queue( s_nQueueSize ); \
337         test( queue ); \
338     }
339
340     CDSSTRESS_VyukovQueue( queue_push_pop )
341
342 #undef CDSSTRESS_Queue_F
343
344
345     // ********************************************************************
346     // SegmentedQueue test
347
348     class segmented_queue_push_pop
349         : public queue_push_pop
350         , public ::testing::WithParamInterface< size_t >
351     {
352         typedef queue_push_pop base_class;
353
354     protected:
355
356         template <typename Queue>
357         void test()
358         {
359             size_t quasi_factor = GetParam();
360
361             Queue q( quasi_factor );
362             propout() << std::make_pair( "quasi_factor", quasi_factor );
363             base_class::test_queue( q );
364             analyze( q, quasi_factor * 2, quasi_factor );
365             propout() << q.statistics();
366         }
367
368     public:
369         static std::vector< size_t > get_test_parameters()
370         {
371             cds_test::config const& cfg = cds_test::stress_fixture::get_config( "queue_push_pop" );
372             bool bIterative = cfg.get_bool( "SegmentedQueue_Iterate", false );
373             size_t quasi_factor = cfg.get_size_t( "SegmentedQueue_SegmentSize", 256 );
374
375             std::vector<size_t> args;
376             if ( bIterative && quasi_factor > 4 ) {
377                 for ( size_t qf = 4; qf <= quasi_factor; qf *= 2 )
378                     args.push_back( qf );
379             }
380             else {
381                 if ( quasi_factor > 2 )
382                     args.push_back( quasi_factor );
383                 else
384                     args.push_back( 2 );
385             }
386
387             return args;
388         }
389     };
390
391 #define CDSSTRESS_Queue_F( test_fixture, type_name, level ) \
392     TEST_P( test_fixture, type_name ) \
393     { \
394         if ( !check_detail_level( level )) return; \
395         typedef typename queue::Types<value_type>::type_name queue_type; \
396         test< queue_type >(); \
397     }
398
399     CDSSTRESS_SegmentedQueue( segmented_queue_push_pop )
400
401     INSTANTIATE_TEST_CASE_P( SQ,
402         segmented_queue_push_pop,
403         ::testing::ValuesIn( segmented_queue_push_pop::get_test_parameters()));
404
405 } // namespace