22b2b35208edab835d6b6b6a3de573273f370b38
[libcds.git] / test / stress / queue / intrusive_push_pop.cpp
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #include "intrusive_queue_type.h"
32 #include <vector>
33 #include <algorithm>
34
// Multi-threaded push/pop stress test for intrusive queues
36 namespace {
37
38     static size_t s_nReaderThreadCount = 4;
39     static size_t s_nWriterThreadCount = 4;
40     static size_t s_nQueueSize = 4000000;
41
42     static unsigned int s_nFCPassCount = 8;
43     static unsigned int s_nFCCompactFactor = 64;
44
45     static atomics::atomic< size_t > s_nProducerCount(0);
46     static size_t s_nThreadPushCount;
47     static CDS_CONSTEXPR const size_t c_nBadConsumer = 0xbadc0ffe;
48
49     struct empty {};
50
51     template <typename Base = empty >
52     struct value_type: public Base
53     {
54         size_t      nNo;
55         size_t      nWriterNo;
56         size_t      nConsumer;
57     };
58
59     class intrusive_queue_push_pop: public cds_test::stress_fixture
60     {
61         typedef cds_test::stress_fixture base_class;
62
63     protected:
64         enum {
65             producer_thread,
66             consumer_thread
67         };
68
69         template <class Queue>
70         class Producer: public cds_test::thread
71         {
72             typedef cds_test::thread base_class;
73
74         public:
75             Producer( cds_test::thread_pool& pool, Queue& q )
76                 : base_class( pool, producer_thread )
77                 , m_Queue( q )
78             {}
79             Producer( Producer& src )
80                 : base_class( src )
81                 , m_Queue( src.m_Queue )
82             {}
83
84             virtual thread * clone()
85             {
86                 return new Producer( *this );
87             }
88
89             virtual void test()
90             {
91                 size_t i = 0;
92                 for ( typename Queue::value_type * p = m_pStart; p < m_pEnd; ) {
93                     p->nNo = i;
94                     p->nWriterNo = id();
95                     CDS_TSAN_ANNOTATE_HAPPENS_BEFORE( &p->nWriterNo );
96                     if ( m_Queue.push( *p )) {
97                         ++p;
98                         ++i;
99                     }
100                     else
101                         ++m_nPushFailed;
102                 }
103                 s_nProducerCount.fetch_sub( 1, atomics::memory_order_release );
104             }
105
106         public:
107             Queue&              m_Queue;
108             size_t              m_nPushFailed = 0;
109
110             // Interval in m_arrValue
111             typename Queue::value_type *       m_pStart;
112             typename Queue::value_type *       m_pEnd;
113         };
114
115         template <class Queue>
116         class Consumer: public cds_test::thread
117         {
118             typedef cds_test::thread base_class;
119
120         public:
121             Queue&              m_Queue;
122             size_t              m_nPopEmpty = 0;
123             size_t              m_nPopped = 0;
124             size_t              m_nBadWriter = 0;
125
126             typedef std::vector<size_t> TPoppedData;
127             typedef std::vector<size_t>::iterator       data_iterator;
128             typedef std::vector<size_t>::const_iterator const_data_iterator;
129
130             std::vector<TPoppedData>        m_WriterData;
131
132         private:
133             void initPoppedData()
134             {
135                 const size_t nWriterCount = s_nWriterThreadCount;
136                 const size_t nWriterPushCount = s_nThreadPushCount;
137                 m_WriterData.resize( nWriterCount );
138                 for ( size_t i = 0; i < nWriterCount; ++i )
139                     m_WriterData[i].reserve( nWriterPushCount );
140             }
141
142         public:
143             Consumer( cds_test::thread_pool& pool, Queue& q )
144                 : base_class( pool, consumer_thread )
145                 , m_Queue( q )
146             {
147                 initPoppedData();
148             }
149             Consumer( Consumer& src )
150                 : base_class( src )
151                 , m_Queue( src.m_Queue )
152             {
153                 initPoppedData();
154             }
155
156             virtual thread * clone()
157             {
158                 return new Consumer( *this );
159             }
160
161             virtual void test()
162             {
163                 size_t const nTotalWriters = s_nWriterThreadCount;
164
165                 while ( true ) {
166                     typename Queue::value_type * p = m_Queue.pop();
167                     if ( p ) {
168                         p->nConsumer = id();
169                         ++m_nPopped;
170                         CDS_TSAN_ANNOTATE_HAPPENS_AFTER( &p->nWriterNo );
171                         if ( p->nWriterNo < nTotalWriters )
172                             m_WriterData[ p->nWriterNo ].push_back( p->nNo );
173                         else
174                             ++m_nBadWriter;
175                     }
176                     else {
177                         ++m_nPopEmpty;
178                         if ( s_nProducerCount.load( atomics::memory_order_acquire ) == 0 && m_Queue.empty())
179                             break;
180                     }
181                 }
182             }
183         };
184
185         template <typename T>
186         class value_array
187         {
188             std::unique_ptr<T[]> m_pArr;
189         public:
190             value_array( size_t nSize )
191                 : m_pArr( new T[nSize] )
192             {}
193
194             T * get() const { return m_pArr.get(); }
195         };
196
197     public:
198         static void SetUpTestCase()
199         {
200             cds_test::config const& cfg = get_config( "queue_random" );
201
202             s_nReaderThreadCount = cfg.get_size_t( "ReaderCount", s_nReaderThreadCount );
203             s_nWriterThreadCount = cfg.get_size_t( "WriterCount", s_nWriterThreadCount );
204             s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
205
206             s_nFCPassCount = cfg.get_uint( "FCPassCount", s_nFCPassCount );
207             s_nFCCompactFactor = cfg.get_uint( "FCCompactFactor", s_nFCCompactFactor );
208
209             if ( s_nReaderThreadCount == 0u )
210                 s_nReaderThreadCount = 1;
211             if ( s_nWriterThreadCount == 0u )
212                 s_nWriterThreadCount = 1;
213             if ( s_nQueueSize == 0u )
214                 s_nQueueSize = 1000;
215         }
216
217         //static void TearDownTestCase();
218
219     protected:
220         template <class Queue>
221         void analyze( Queue& testQueue, size_t /*nLeftOffset*/, size_t nRightOffset )
222         {
223             typedef Consumer<Queue> Reader;
224             typedef Producer<Queue> Writer;
225             typedef typename Reader::const_data_iterator    ReaderIterator;
226
227             size_t nPostTestPops = 0;
228             while ( testQueue.pop())
229                 ++nPostTestPops;
230
231             size_t nTotalPops = 0;
232             size_t nPopFalse = 0;
233             size_t nPoppedItems = 0;
234             size_t nPushFailed = 0;
235
236             std::vector< Reader * > arrReaders;
237
238             cds_test::thread_pool& pool = get_pool();
239             for ( size_t i = 0; i < pool.size(); ++i ) {
240                 cds_test::thread& thr = pool.get( i );
241                 if ( thr.type() == consumer_thread ) {
242                     Consumer<Queue>& consumer = static_cast<Consumer<Queue>&>( thr );
243                     nTotalPops += consumer.m_nPopped;
244                     nPopFalse += consumer.m_nPopEmpty;
245                     arrReaders.push_back( &consumer );
246                     EXPECT_EQ( consumer.m_nBadWriter, 0u ) << "consumer " << (i - s_nWriterThreadCount);
247
248                     size_t nPopped = 0;
249                     for ( size_t n = 0; n < s_nWriterThreadCount; ++n )
250                         nPopped += consumer.m_WriterData[n].size();
251
252                     {
253                         std::stringstream s;
254                         s << "consumer" << (i - s_nWriterThreadCount) << "_popped";
255                         propout() << std::make_pair( s.str().c_str(), nPopped );
256                     }
257                     nPoppedItems += nPopped;
258                 }
259                 else {
260                     Producer<Queue>& producer = static_cast<Producer<Queue>&>( thr );
261                     nPushFailed += producer.m_nPushFailed;
262                     if ( !std::is_base_of<cds::bounded_container, Queue>::value ) {
263                         EXPECT_EQ( producer.m_nPushFailed, 0u ) << "producer " << i;
264                     }
265                 }
266             }
267             EXPECT_EQ( nTotalPops, nPoppedItems );
268
269             propout() << std::make_pair( "success_pop", nTotalPops )
270                 << std::make_pair( "empty_pop", nPopFalse )
271                 << std::make_pair( "failed_push", nPushFailed );
272
273             size_t nQueueSize = s_nThreadPushCount * s_nWriterThreadCount;
274             EXPECT_EQ( nTotalPops + nPostTestPops, nQueueSize );
275             EXPECT_TRUE( testQueue.empty());
276
277             // Test that all items have been popped
278             // Test FIFO order
279             for ( size_t nWriter = 0; nWriter < s_nWriterThreadCount; ++nWriter ) {
280                 std::vector<size_t> arrData;
281                 arrData.reserve( s_nThreadPushCount );
282                 for ( size_t nReader = 0; nReader < arrReaders.size(); ++nReader ) {
283                     ReaderIterator it = arrReaders[nReader]->m_WriterData[nWriter].begin();
284                     ReaderIterator itEnd = arrReaders[nReader]->m_WriterData[nWriter].end();
285                     if ( it != itEnd ) {
286                         ReaderIterator itPrev = it;
287                         for ( ++it; it != itEnd; ++it ) {
288                             EXPECT_LT( *itPrev, *it + nRightOffset )
289                                 << "Reader " << nReader << ", Writer " << nWriter << ": prev=" << *itPrev << ", cur=" << *it;
290                             itPrev = it;
291                         }
292                     }
293
294                     for ( it = arrReaders[nReader]->m_WriterData[nWriter].begin(); it != itEnd; ++it )
295                         arrData.push_back( *it );
296                 }
297                 std::sort( arrData.begin(), arrData.end());
298                 for ( size_t i=1; i < arrData.size(); ++i ) {
299                     if ( arrData[i-1] + 1 != arrData[i] ) {
300                         EXPECT_EQ( arrData[i-1] + 1,  arrData[i] ) << "Writer " << nWriter << ": [" << (i-1) << "]=" << arrData[i-1]
301                             << ", [" << i << "]=" << arrData[i];
302                     }
303                 }
304
305                 EXPECT_EQ( arrData[0], 0u ) << "Writer " << nWriter;
306                 EXPECT_EQ( arrData[arrData.size() - 1], s_nThreadPushCount - 1 ) << "Writer " << nWriter;
307             }
308         }
309
310         template <class Queue>
311         void test( Queue& q, value_array<typename Queue::value_type>& arrValue, size_t nLeftOffset, size_t nRightOffset )
312         {
313             s_nThreadPushCount = s_nQueueSize / s_nWriterThreadCount;
314             s_nQueueSize = s_nThreadPushCount * s_nWriterThreadCount;
315             propout() << std::make_pair( "producer_count", s_nWriterThreadCount )
316                 << std::make_pair( "consumer_count", s_nReaderThreadCount )
317                 << std::make_pair( "queue_size", s_nQueueSize );
318
319             typename Queue::value_type * pValStart = arrValue.get();
320             typename Queue::value_type * pValEnd = pValStart + s_nQueueSize;
321
322             cds_test::thread_pool& pool = get_pool();
323             s_nProducerCount.store( s_nWriterThreadCount, atomics::memory_order_release );
324
325             // Writers must be first
326             pool.add( new Producer<Queue>( pool, q ), s_nWriterThreadCount );
327             {
328                 for ( typename Queue::value_type * it = pValStart; it != pValEnd; ++it ) {
329                     it->nNo = 0;
330                     it->nWriterNo = 0;
331                     it->nConsumer = c_nBadConsumer;
332                 }
333
334                 typename Queue::value_type * pStart = pValStart;
335                 for ( size_t i = 0; i < pool.size(); ++i ) {
336                     Producer<Queue>& producer = static_cast<Producer<Queue>&>( pool.get( i ));
337                     producer.m_pStart = pStart;
338                     pStart += s_nThreadPushCount;
339                     producer.m_pEnd = pStart;
340                 }
341             }
342             pool.add( new Consumer<Queue>( pool, q ), s_nReaderThreadCount );
343
344             std::chrono::milliseconds duration = pool.run();
345             propout() << std::make_pair( "duration", duration );
346
347             // Check that all values have been dequeued
348             {
349                 size_t nBadConsumerCount = 0;
350                 typename Queue::value_type * pEnd = pValStart + s_nQueueSize;
351                 for ( typename Queue::value_type * it = pValStart; it != pEnd; ++it  ) {
352                     if ( it->nConsumer == c_nBadConsumer )
353                         ++nBadConsumerCount;
354                 }
355                 EXPECT_EQ( nBadConsumerCount, 0u );
356             }
357
358             analyze( q, nLeftOffset, nRightOffset );
359
360             propout() << q.statistics();
361         }
362     };
363
364 #define CDSSTRESS_QUEUE_F( QueueType, NodeType ) \
365     TEST_F( intrusive_queue_push_pop, QueueType ) \
366     { \
367         typedef value_type<NodeType> node_type; \
368         typedef typename queue::Types< node_type >::QueueType queue_type; \
369         value_array<typename queue_type::value_type> arrValue( s_nQueueSize ); \
370         { \
371             queue_type q; \
372             test( q, arrValue, 0, 0 ); \
373         } \
374         queue_type::gc::force_dispose(); \
375     }
376
377     CDSSTRESS_QUEUE_F( MSQueue_HP,       cds::intrusive::msqueue::node<cds::gc::HP> )
378     CDSSTRESS_QUEUE_F( MSQueue_HP_ic,    cds::intrusive::msqueue::node<cds::gc::HP> )
379     CDSSTRESS_QUEUE_F( MSQueue_HP_stat,  cds::intrusive::msqueue::node<cds::gc::HP> )
380     CDSSTRESS_QUEUE_F( MSQueue_DHP,      cds::intrusive::msqueue::node<cds::gc::DHP> )
381     CDSSTRESS_QUEUE_F( MSQueue_DHP_ic,   cds::intrusive::msqueue::node<cds::gc::DHP> )
382     CDSSTRESS_QUEUE_F( MSQueue_DHP_stat, cds::intrusive::msqueue::node<cds::gc::DHP> )
383
384     CDSSTRESS_QUEUE_F( MoirQueue_HP,       cds::intrusive::msqueue::node<cds::gc::HP> )
385     CDSSTRESS_QUEUE_F( MoirQueue_HP_ic,    cds::intrusive::msqueue::node<cds::gc::HP> )
386     CDSSTRESS_QUEUE_F( MoirQueue_HP_stat,  cds::intrusive::msqueue::node<cds::gc::HP> )
387     CDSSTRESS_QUEUE_F( MoirQueue_DHP,      cds::intrusive::msqueue::node<cds::gc::DHP> )
388     CDSSTRESS_QUEUE_F( MoirQueue_DHP_ic,   cds::intrusive::msqueue::node<cds::gc::DHP> )
389     CDSSTRESS_QUEUE_F( MoirQueue_DHP_stat, cds::intrusive::msqueue::node<cds::gc::DHP> )
390
391     CDSSTRESS_QUEUE_F( OptimisticQueue_HP,       cds::intrusive::optimistic_queue::node<cds::gc::HP> )
392     CDSSTRESS_QUEUE_F( OptimisticQueue_HP_ic,    cds::intrusive::optimistic_queue::node<cds::gc::HP> )
393     CDSSTRESS_QUEUE_F( OptimisticQueue_HP_stat,  cds::intrusive::optimistic_queue::node<cds::gc::HP> )
394     CDSSTRESS_QUEUE_F( OptimisticQueue_DHP,      cds::intrusive::optimistic_queue::node<cds::gc::DHP> )
395     CDSSTRESS_QUEUE_F( OptimisticQueue_DHP_ic,   cds::intrusive::optimistic_queue::node<cds::gc::DHP> )
396     CDSSTRESS_QUEUE_F( OptimisticQueue_DHP_stat, cds::intrusive::optimistic_queue::node<cds::gc::DHP> )
397
398     CDSSTRESS_QUEUE_F( BasketQueue_HP,       cds::intrusive::basket_queue::node<cds::gc::HP> )
399     CDSSTRESS_QUEUE_F( BasketQueue_HP_ic,    cds::intrusive::basket_queue::node<cds::gc::HP> )
400     CDSSTRESS_QUEUE_F( BasketQueue_HP_stat,  cds::intrusive::basket_queue::node<cds::gc::HP> )
401     CDSSTRESS_QUEUE_F( BasketQueue_DHP,      cds::intrusive::basket_queue::node<cds::gc::DHP> )
402     CDSSTRESS_QUEUE_F( BasketQueue_DHP_ic,   cds::intrusive::basket_queue::node<cds::gc::DHP> )
403     CDSSTRESS_QUEUE_F( BasketQueue_DHP_stat, cds::intrusive::basket_queue::node<cds::gc::DHP> )
404 #undef CDSSTRESS_QUEUE_F
405
406
407 #define CDSSTRESS_QUEUE_F( QueueType, NodeType ) \
408     TEST_F( intrusive_queue_push_pop, QueueType ) \
409     { \
410         typedef value_type<NodeType> node_type; \
411         typedef typename queue::Types< node_type >::QueueType queue_type; \
412         value_array<typename queue_type::value_type> arrValue( s_nQueueSize ); \
413         queue_type q( s_nFCCompactFactor, s_nFCPassCount ); \
414         test( q, arrValue, 0, 0 ); \
415     }
416
417     CDSSTRESS_QUEUE_F(FCQueue_list_delay2,                      boost::intrusive::list_base_hook<> )
418     CDSSTRESS_QUEUE_F(FCQueue_list_delay2_elimination,          boost::intrusive::list_base_hook<> )
419     CDSSTRESS_QUEUE_F(FCQueue_list_delay2_elimination_stat,     boost::intrusive::list_base_hook<> )
420     CDSSTRESS_QUEUE_F(FCQueue_list_expbackoff_elimination,      boost::intrusive::list_base_hook<> )
421     CDSSTRESS_QUEUE_F(FCQueue_list_expbackoff_elimination_stat, boost::intrusive::list_base_hook<> )
422     CDSSTRESS_QUEUE_F(FCQueue_list_wait_ss,                     boost::intrusive::list_base_hook<> )
423     CDSSTRESS_QUEUE_F(FCQueue_list_wait_ss_stat,                boost::intrusive::list_base_hook<> )
424     CDSSTRESS_QUEUE_F(FCQueue_list_wait_sm,                     boost::intrusive::list_base_hook<> )
425     CDSSTRESS_QUEUE_F(FCQueue_list_wait_sm_stat,                boost::intrusive::list_base_hook<> )
426     CDSSTRESS_QUEUE_F(FCQueue_list_wait_mm,                     boost::intrusive::list_base_hook<> )
427     CDSSTRESS_QUEUE_F(FCQueue_list_wait_mm_stat,                boost::intrusive::list_base_hook<> )
428 #undef CDSSTRESS_QUEUE_F
429
430
431 #define CDSSTRESS_QUEUE_F( QueueType ) \
432     TEST_F( intrusive_queue_push_pop, QueueType ) \
433     { \
434         typedef typename queue::Types< value_type<> >::QueueType queue_type; \
435         value_array<typename queue_type::value_type> arrValue( s_nQueueSize ); \
436         queue_type q( s_nQueueSize ); \
437         test( q, arrValue, 0, 0 ); \
438     }
439
440     CDSSTRESS_QUEUE_F( VyukovMPMCCycleQueue_dyn )
441     CDSSTRESS_QUEUE_F( VyukovMPMCCycleQueue_dyn_ic )
442 #undef CDSSTRESS_QUEUE_F
443
444
445     // ********************************************************************
446     // SegmentedQueue test
447
448     class intrusive_segmented_queue_push_pop
449         : public intrusive_queue_push_pop
450         , public ::testing::WithParamInterface< size_t >
451     {
452         typedef intrusive_queue_push_pop base_class;
453
454     protected:
455         template <typename Queue>
456         void test()
457         {
458             value_array<typename Queue::value_type> arrValue( s_nQueueSize ); \
459             {
460                 size_t quasi_factor = GetParam();
461
462                 Queue q( quasi_factor );
463                 propout() << std::make_pair( "quasi_factor", quasi_factor );
464
465                 base_class::test( q, arrValue, quasi_factor * 2, quasi_factor );
466             }
467             Queue::gc::force_dispose();
468         }
469
470     public:
471         static std::vector< size_t > get_test_parameters()
472         {
473             cds_test::config const& cfg = cds_test::stress_fixture::get_config( "intrusive_queue_push_pop" );
474             bool bIterative = cfg.get_bool( "SegmentedQueue_Iterate", false );
475             size_t quasi_factor = cfg.get_size_t( "SegmentedQueue_SegmentSize", 256 );
476
477             std::vector<size_t> args;
478             if ( bIterative && quasi_factor > 4 ) {
479                 for ( size_t qf = 4; qf <= quasi_factor; qf *= 2 )
480                     args.push_back( qf );
481             }
482             else {
483                 if ( quasi_factor > 2 )
484                     args.push_back( quasi_factor );
485                 else
486                     args.push_back( 2 );
487             }
488
489             return args;
490         }
491     };
492
493 #define CDSSTRESS_QUEUE_F( type_name ) \
494     TEST_P( intrusive_segmented_queue_push_pop, type_name ) \
495     { \
496         typedef typename queue::Types<value_type<>>::type_name queue_type; \
497         test< queue_type >(); \
498     }
499
500     CDSSTRESS_QUEUE_F( SegmentedQueue_HP_spin )
501     //CDSSTRESS_QUEUE_F( SegmentedQueue_HP_spin_padding )
502     CDSSTRESS_QUEUE_F( SegmentedQueue_HP_spin_stat )
503     CDSSTRESS_QUEUE_F( SegmentedQueue_HP_mutex )
504     //CDSSTRESS_QUEUE_F( SegmentedQueue_HP_mutex_padding )
505     CDSSTRESS_QUEUE_F( SegmentedQueue_HP_mutex_stat )
506     CDSSTRESS_QUEUE_F( SegmentedQueue_DHP_spin )
507     //CDSSTRESS_QUEUE_F( SegmentedQueue_DHP_spin_padding )
508     CDSSTRESS_QUEUE_F( SegmentedQueue_DHP_spin_stat )
509     CDSSTRESS_QUEUE_F( SegmentedQueue_DHP_mutex )
510     //CDSSTRESS_QUEUE_F( SegmentedQueue_DHP_mutex_padding )
511     CDSSTRESS_QUEUE_F( SegmentedQueue_DHP_mutex_stat )
512
513     INSTANTIATE_TEST_CASE_P( SQ,
514         intrusive_segmented_queue_push_pop,
515         ::testing::ValuesIn( intrusive_segmented_queue_push_pop::get_test_parameters()));
516
517 } // namespace