Merge branch 'flat_combinig_add_stress_and_unint_tests' of https://github.com/mgalimu...
[libcds.git] / test / stress / queue / intrusive_push_pop.cpp
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #include "intrusive_queue_type.h"
32 #include <vector>
33 #include <algorithm>
34
35 // Multi-threaded random queue test
36 namespace {
37
38     static size_t s_nReaderThreadCount = 4;
39     static size_t s_nWriterThreadCount = 4;
40     static size_t s_nQueueSize = 4000000;
41
42     static unsigned int s_nFCPassCount = 8;
43     static unsigned int s_nFCCompactFactor = 64;
44
45     static atomics::atomic< size_t > s_nProducerCount(0);
46     static size_t s_nThreadPushCount;
47     static CDS_CONSTEXPR const size_t c_nBadConsumer = 0xbadc0ffe;
48
49     struct empty {};
50
51     template <typename Base = empty >
52     struct value_type: public Base
53     {
54         size_t      nNo;
55         size_t      nWriterNo;
56         size_t      nConsumer;
57     };
58
59     class intrusive_queue_push_pop: public cds_test::stress_fixture
60     {
61         typedef cds_test::stress_fixture base_class;
62
63     protected:
64         enum {
65             producer_thread,
66             consumer_thread
67         };
68
69         template <class Queue>
70         class Producer: public cds_test::thread
71         {
72             typedef cds_test::thread base_class;
73
74         public:
75             Producer( cds_test::thread_pool& pool, Queue& q )
76                 : base_class( pool, producer_thread )
77                 , m_Queue( q )
78             {}
79             Producer( Producer& src )
80                 : base_class( src )
81                 , m_Queue( src.m_Queue )
82             {}
83
84             virtual thread * clone()
85             {
86                 return new Producer( *this );
87             }
88
89             virtual void test()
90             {
91                 size_t i = 0;
92                 for ( typename Queue::value_type * p = m_pStart; p < m_pEnd; ) {
93                     p->nNo = i;
94                     p->nWriterNo = id();
95                     CDS_TSAN_ANNOTATE_HAPPENS_BEFORE( &p->nWriterNo );
96                     if ( m_Queue.push( *p )) {
97                         ++p;
98                         ++i;
99                     }
100                     else
101                         ++m_nPushFailed;
102                 }
103                 s_nProducerCount.fetch_sub( 1, atomics::memory_order_release );
104             }
105
106         public:
107             Queue&              m_Queue;
108             size_t              m_nPushFailed = 0;
109
110             // Interval in m_arrValue
111             typename Queue::value_type *       m_pStart;
112             typename Queue::value_type *       m_pEnd;
113         };
114
115         template <class Queue>
116         class Consumer: public cds_test::thread
117         {
118             typedef cds_test::thread base_class;
119
120         public:
121             Queue&              m_Queue;
122             size_t              m_nPopEmpty = 0;
123             size_t              m_nPopped = 0;
124             size_t              m_nBadWriter = 0;
125
126             typedef std::vector<size_t> TPoppedData;
127             typedef std::vector<size_t>::iterator       data_iterator;
128             typedef std::vector<size_t>::const_iterator const_data_iterator;
129
130             std::vector<TPoppedData>        m_WriterData;
131
132         private:
133             void initPoppedData()
134             {
135                 const size_t nWriterCount = s_nWriterThreadCount;
136                 const size_t nWriterPushCount = s_nThreadPushCount;
137                 m_WriterData.resize( nWriterCount );
138                 for ( size_t i = 0; i < nWriterCount; ++i )
139                     m_WriterData[i].reserve( nWriterPushCount );
140             }
141
142         public:
143             Consumer( cds_test::thread_pool& pool, Queue& q )
144                 : base_class( pool, consumer_thread )
145                 , m_Queue( q )
146             {
147                 initPoppedData();
148             }
149             Consumer( Consumer& src )
150                 : base_class( src )
151                 , m_Queue( src.m_Queue )
152             {
153                 initPoppedData();
154             }
155
156             virtual thread * clone()
157             {
158                 return new Consumer( *this );
159             }
160
161             virtual void test()
162             {
163                 size_t const nTotalWriters = s_nWriterThreadCount;
164
165                 while ( true ) {
166                     typename Queue::value_type * p = m_Queue.pop();
167                     if ( p ) {
168                         p->nConsumer = id();
169                         ++m_nPopped;
170                         CDS_TSAN_ANNOTATE_HAPPENS_AFTER( &p->nWriterNo );
171                         if ( p->nWriterNo < nTotalWriters )
172                             m_WriterData[ p->nWriterNo ].push_back( p->nNo );
173                         else
174                             ++m_nBadWriter;
175                     }
176                     else {
177                         ++m_nPopEmpty;
178                         if ( s_nProducerCount.load( atomics::memory_order_acquire ) == 0 && m_Queue.empty())
179                             break;
180                     }
181                 }
182             }
183         };
184
185         template <typename T>
186         class value_array
187         {
188             std::unique_ptr<T[]> m_pArr;
189         public:
190             value_array( size_t nSize )
191                 : m_pArr( new T[nSize] )
192             {}
193
194             T * get() const { return m_pArr.get(); }
195         };
196
197     public:
198         static void SetUpTestCase()
199         {
200             cds_test::config const& cfg = get_config( "queue_random" );
201
202             s_nReaderThreadCount = cfg.get_size_t( "ReaderCount", s_nReaderThreadCount );
203             s_nWriterThreadCount = cfg.get_size_t( "WriterCount", s_nWriterThreadCount );
204             s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
205
206             s_nFCPassCount = cfg.get_uint( "FCPassCount", s_nFCPassCount );
207             s_nFCCompactFactor = cfg.get_uint( "FCCompactFactor", s_nFCCompactFactor );
208
209             if ( s_nReaderThreadCount == 0u )
210                 s_nReaderThreadCount = 1;
211             if ( s_nWriterThreadCount == 0u )
212                 s_nWriterThreadCount = 1;
213             if ( s_nQueueSize == 0u )
214                 s_nQueueSize = 1000;
215         }
216
217         //static void TearDownTestCase();
218
219     protected:
220         template <class Queue>
221         void analyze( Queue& testQueue, size_t /*nLeftOffset*/, size_t nRightOffset )
222         {
223             typedef Consumer<Queue> Reader;
224             typedef typename Reader::const_data_iterator    ReaderIterator;
225
226             size_t nPostTestPops = 0;
227             while ( testQueue.pop())
228                 ++nPostTestPops;
229
230             size_t nTotalPops = 0;
231             size_t nPopFalse = 0;
232             size_t nPoppedItems = 0;
233             size_t nPushFailed = 0;
234
235             std::vector< Reader * > arrReaders;
236
237             cds_test::thread_pool& pool = get_pool();
238             for ( size_t i = 0; i < pool.size(); ++i ) {
239                 cds_test::thread& thr = pool.get( i );
240                 if ( thr.type() == consumer_thread ) {
241                     Consumer<Queue>& consumer = static_cast<Consumer<Queue>&>( thr );
242                     nTotalPops += consumer.m_nPopped;
243                     nPopFalse += consumer.m_nPopEmpty;
244                     arrReaders.push_back( &consumer );
245                     EXPECT_EQ( consumer.m_nBadWriter, 0u ) << "consumer " << (i - s_nWriterThreadCount);
246
247                     size_t nPopped = 0;
248                     for ( size_t n = 0; n < s_nWriterThreadCount; ++n )
249                         nPopped += consumer.m_WriterData[n].size();
250
251                     {
252                         std::stringstream s;
253                         s << "consumer" << (i - s_nWriterThreadCount) << "_popped";
254                         propout() << std::make_pair( s.str().c_str(), nPopped );
255                     }
256                     nPoppedItems += nPopped;
257                 }
258                 else {
259                     Producer<Queue>& producer = static_cast<Producer<Queue>&>( thr );
260                     nPushFailed += producer.m_nPushFailed;
261                     if ( !std::is_base_of<cds::bounded_container, Queue>::value ) {
262                         EXPECT_EQ( producer.m_nPushFailed, 0u ) << "producer " << i;
263                     }
264                 }
265             }
266             EXPECT_EQ( nTotalPops, nPoppedItems );
267
268             propout() << std::make_pair( "success_pop", nTotalPops )
269                 << std::make_pair( "empty_pop", nPopFalse )
270                 << std::make_pair( "failed_push", nPushFailed );
271
272             size_t nQueueSize = s_nThreadPushCount * s_nWriterThreadCount;
273             EXPECT_EQ( nTotalPops + nPostTestPops, nQueueSize );
274             EXPECT_TRUE( testQueue.empty());
275
276             // Test that all items have been popped
277             // Test FIFO order
278             for ( size_t nWriter = 0; nWriter < s_nWriterThreadCount; ++nWriter ) {
279                 std::vector<size_t> arrData;
280                 arrData.reserve( s_nThreadPushCount );
281                 for ( size_t nReader = 0; nReader < arrReaders.size(); ++nReader ) {
282                     ReaderIterator it = arrReaders[nReader]->m_WriterData[nWriter].begin();
283                     ReaderIterator itEnd = arrReaders[nReader]->m_WriterData[nWriter].end();
284                     if ( it != itEnd ) {
285                         ReaderIterator itPrev = it;
286                         for ( ++it; it != itEnd; ++it ) {
287                             EXPECT_LT( *itPrev, *it + nRightOffset )
288                                 << "Reader " << nReader << ", Writer " << nWriter << ": prev=" << *itPrev << ", cur=" << *it;
289                             itPrev = it;
290                         }
291                     }
292
293                     for ( it = arrReaders[nReader]->m_WriterData[nWriter].begin(); it != itEnd; ++it )
294                         arrData.push_back( *it );
295                 }
296                 std::sort( arrData.begin(), arrData.end());
297                 for ( size_t i=1; i < arrData.size(); ++i ) {
298                     if ( arrData[i-1] + 1 != arrData[i] ) {
299                         EXPECT_EQ( arrData[i-1] + 1,  arrData[i] ) << "Writer " << nWriter << ": [" << (i-1) << "]=" << arrData[i-1]
300                             << ", [" << i << "]=" << arrData[i];
301                     }
302                 }
303
304                 EXPECT_EQ( arrData[0], 0u ) << "Writer " << nWriter;
305                 EXPECT_EQ( arrData[arrData.size() - 1], s_nThreadPushCount - 1 ) << "Writer " << nWriter;
306             }
307         }
308
309         template <class Queue>
310         void test( Queue& q, value_array<typename Queue::value_type>& arrValue, size_t nLeftOffset, size_t nRightOffset )
311         {
312             s_nThreadPushCount = s_nQueueSize / s_nWriterThreadCount;
313             s_nQueueSize = s_nThreadPushCount * s_nWriterThreadCount;
314             propout() << std::make_pair( "producer_count", s_nWriterThreadCount )
315                 << std::make_pair( "consumer_count", s_nReaderThreadCount )
316                 << std::make_pair( "queue_size", s_nQueueSize );
317
318             typename Queue::value_type * pValStart = arrValue.get();
319             typename Queue::value_type * pValEnd = pValStart + s_nQueueSize;
320
321             cds_test::thread_pool& pool = get_pool();
322             s_nProducerCount.store( s_nWriterThreadCount, atomics::memory_order_release );
323
324             // Writers must be first
325             pool.add( new Producer<Queue>( pool, q ), s_nWriterThreadCount );
326             {
327                 for ( typename Queue::value_type * it = pValStart; it != pValEnd; ++it ) {
328                     it->nNo = 0;
329                     it->nWriterNo = 0;
330                     it->nConsumer = c_nBadConsumer;
331                 }
332
333                 typename Queue::value_type * pStart = pValStart;
334                 for ( size_t i = 0; i < pool.size(); ++i ) {
335                     Producer<Queue>& producer = static_cast<Producer<Queue>&>( pool.get( i ));
336                     producer.m_pStart = pStart;
337                     pStart += s_nThreadPushCount;
338                     producer.m_pEnd = pStart;
339                 }
340             }
341             pool.add( new Consumer<Queue>( pool, q ), s_nReaderThreadCount );
342
343             std::chrono::milliseconds duration = pool.run();
344             propout() << std::make_pair( "duration", duration );
345
346             // Check that all values have been dequeued
347             {
348                 size_t nBadConsumerCount = 0;
349                 typename Queue::value_type * pEnd = pValStart + s_nQueueSize;
350                 for ( typename Queue::value_type * it = pValStart; it != pEnd; ++it  ) {
351                     if ( it->nConsumer == c_nBadConsumer )
352                         ++nBadConsumerCount;
353                 }
354                 EXPECT_EQ( nBadConsumerCount, 0u );
355             }
356
357             analyze( q, nLeftOffset, nRightOffset );
358
359             propout() << q.statistics();
360         }
361     };
362
363 #define CDSSTRESS_QUEUE_F( QueueType, NodeType ) \
364     TEST_F( intrusive_queue_push_pop, QueueType ) \
365     { \
366         typedef value_type<NodeType> node_type; \
367         typedef typename queue::Types< node_type >::QueueType queue_type; \
368         value_array<typename queue_type::value_type> arrValue( s_nQueueSize ); \
369         { \
370             queue_type q; \
371             test( q, arrValue, 0, 0 ); \
372         } \
373         queue_type::gc::force_dispose(); \
374     }
375
376     CDSSTRESS_QUEUE_F( MSQueue_HP,       cds::intrusive::msqueue::node<cds::gc::HP> )
377     CDSSTRESS_QUEUE_F( MSQueue_HP_ic,    cds::intrusive::msqueue::node<cds::gc::HP> )
378     CDSSTRESS_QUEUE_F( MSQueue_HP_stat,  cds::intrusive::msqueue::node<cds::gc::HP> )
379     CDSSTRESS_QUEUE_F( MSQueue_DHP,      cds::intrusive::msqueue::node<cds::gc::DHP> )
380     CDSSTRESS_QUEUE_F( MSQueue_DHP_ic,   cds::intrusive::msqueue::node<cds::gc::DHP> )
381     CDSSTRESS_QUEUE_F( MSQueue_DHP_stat, cds::intrusive::msqueue::node<cds::gc::DHP> )
382
383     CDSSTRESS_QUEUE_F( MoirQueue_HP,       cds::intrusive::msqueue::node<cds::gc::HP> )
384     CDSSTRESS_QUEUE_F( MoirQueue_HP_ic,    cds::intrusive::msqueue::node<cds::gc::HP> )
385     CDSSTRESS_QUEUE_F( MoirQueue_HP_stat,  cds::intrusive::msqueue::node<cds::gc::HP> )
386     CDSSTRESS_QUEUE_F( MoirQueue_DHP,      cds::intrusive::msqueue::node<cds::gc::DHP> )
387     CDSSTRESS_QUEUE_F( MoirQueue_DHP_ic,   cds::intrusive::msqueue::node<cds::gc::DHP> )
388     CDSSTRESS_QUEUE_F( MoirQueue_DHP_stat, cds::intrusive::msqueue::node<cds::gc::DHP> )
389
390     CDSSTRESS_QUEUE_F( OptimisticQueue_HP,       cds::intrusive::optimistic_queue::node<cds::gc::HP> )
391     CDSSTRESS_QUEUE_F( OptimisticQueue_HP_ic,    cds::intrusive::optimistic_queue::node<cds::gc::HP> )
392     CDSSTRESS_QUEUE_F( OptimisticQueue_HP_stat,  cds::intrusive::optimistic_queue::node<cds::gc::HP> )
393     CDSSTRESS_QUEUE_F( OptimisticQueue_DHP,      cds::intrusive::optimistic_queue::node<cds::gc::DHP> )
394     CDSSTRESS_QUEUE_F( OptimisticQueue_DHP_ic,   cds::intrusive::optimistic_queue::node<cds::gc::DHP> )
395     CDSSTRESS_QUEUE_F( OptimisticQueue_DHP_stat, cds::intrusive::optimistic_queue::node<cds::gc::DHP> )
396
397     CDSSTRESS_QUEUE_F( BasketQueue_HP,       cds::intrusive::basket_queue::node<cds::gc::HP> )
398     CDSSTRESS_QUEUE_F( BasketQueue_HP_ic,    cds::intrusive::basket_queue::node<cds::gc::HP> )
399     CDSSTRESS_QUEUE_F( BasketQueue_HP_stat,  cds::intrusive::basket_queue::node<cds::gc::HP> )
400     CDSSTRESS_QUEUE_F( BasketQueue_DHP,      cds::intrusive::basket_queue::node<cds::gc::DHP> )
401     CDSSTRESS_QUEUE_F( BasketQueue_DHP_ic,   cds::intrusive::basket_queue::node<cds::gc::DHP> )
402     CDSSTRESS_QUEUE_F( BasketQueue_DHP_stat, cds::intrusive::basket_queue::node<cds::gc::DHP> )
403 #undef CDSSTRESS_QUEUE_F
404
405
406 #define CDSSTRESS_QUEUE_F( QueueType, NodeType ) \
407     TEST_F( intrusive_queue_push_pop, QueueType ) \
408     { \
409         typedef value_type<NodeType> node_type; \
410         typedef typename queue::Types< node_type >::QueueType queue_type; \
411         value_array<typename queue_type::value_type> arrValue( s_nQueueSize ); \
412         queue_type q( s_nFCCompactFactor, s_nFCPassCount ); \
413         test( q, arrValue, 0, 0 ); \
414     }
415
416     CDSSTRESS_QUEUE_F(FCQueue_list_delay2,                      boost::intrusive::list_base_hook<> )
417     CDSSTRESS_QUEUE_F(FCQueue_list_delay2_elimination,          boost::intrusive::list_base_hook<> )
418     CDSSTRESS_QUEUE_F(FCQueue_list_delay2_elimination_stat,     boost::intrusive::list_base_hook<> )
419     CDSSTRESS_QUEUE_F(FCQueue_list_expbackoff_elimination,      boost::intrusive::list_base_hook<> )
420     CDSSTRESS_QUEUE_F(FCQueue_list_expbackoff_elimination_stat, boost::intrusive::list_base_hook<> )
421     CDSSTRESS_QUEUE_F(FCQueue_list_wait_ss,                     boost::intrusive::list_base_hook<> )
422     CDSSTRESS_QUEUE_F(FCQueue_list_wait_ss_stat,                boost::intrusive::list_base_hook<> )
423     CDSSTRESS_QUEUE_F(FCQueue_list_wait_sm,                     boost::intrusive::list_base_hook<> )
424     CDSSTRESS_QUEUE_F(FCQueue_list_wait_sm_stat,                boost::intrusive::list_base_hook<> )
425     CDSSTRESS_QUEUE_F(FCQueue_list_wait_mm,                     boost::intrusive::list_base_hook<> )
426     CDSSTRESS_QUEUE_F(FCQueue_list_wait_mm_stat,                boost::intrusive::list_base_hook<> )
427 #undef CDSSTRESS_QUEUE_F
428
429
430 #define CDSSTRESS_QUEUE_F( QueueType ) \
431     TEST_F( intrusive_queue_push_pop, QueueType ) \
432     { \
433         typedef typename queue::Types< value_type<> >::QueueType queue_type; \
434         value_array<typename queue_type::value_type> arrValue( s_nQueueSize ); \
435         queue_type q( s_nQueueSize ); \
436         test( q, arrValue, 0, 0 ); \
437     }
438
439     CDSSTRESS_QUEUE_F( VyukovMPMCCycleQueue_dyn )
440     CDSSTRESS_QUEUE_F( VyukovMPMCCycleQueue_dyn_ic )
441 #undef CDSSTRESS_QUEUE_F
442
443
444     // ********************************************************************
445     // SegmentedQueue test
446
447     class intrusive_segmented_queue_push_pop
448         : public intrusive_queue_push_pop
449         , public ::testing::WithParamInterface< size_t >
450     {
451         typedef intrusive_queue_push_pop base_class;
452
453     protected:
454         template <typename Queue>
455         void test()
456         {
457             value_array<typename Queue::value_type> arrValue( s_nQueueSize ); \
458             {
459                 size_t quasi_factor = GetParam();
460
461                 Queue q( quasi_factor );
462                 propout() << std::make_pair( "quasi_factor", quasi_factor );
463
464                 base_class::test( q, arrValue, quasi_factor * 2, quasi_factor );
465             }
466             Queue::gc::force_dispose();
467         }
468
469     public:
470         static std::vector< size_t > get_test_parameters()
471         {
472             cds_test::config const& cfg = cds_test::stress_fixture::get_config( "intrusive_queue_push_pop" );
473             bool bIterative = cfg.get_bool( "SegmentedQueue_Iterate", false );
474             size_t quasi_factor = cfg.get_size_t( "SegmentedQueue_SegmentSize", 256 );
475
476             std::vector<size_t> args;
477             if ( bIterative && quasi_factor > 4 ) {
478                 for ( size_t qf = 4; qf <= quasi_factor; qf *= 2 )
479                     args.push_back( qf );
480             }
481             else {
482                 if ( quasi_factor > 2 )
483                     args.push_back( quasi_factor );
484                 else
485                     args.push_back( 2 );
486             }
487
488             return args;
489         }
490     };
491
492 #define CDSSTRESS_QUEUE_F( type_name ) \
493     TEST_P( intrusive_segmented_queue_push_pop, type_name ) \
494     { \
495         typedef typename queue::Types<value_type<>>::type_name queue_type; \
496         test< queue_type >(); \
497     }
498
499     CDSSTRESS_QUEUE_F( SegmentedQueue_HP_spin )
500     //CDSSTRESS_QUEUE_F( SegmentedQueue_HP_spin_padding )
501     CDSSTRESS_QUEUE_F( SegmentedQueue_HP_spin_stat )
502     CDSSTRESS_QUEUE_F( SegmentedQueue_HP_mutex )
503     //CDSSTRESS_QUEUE_F( SegmentedQueue_HP_mutex_padding )
504     CDSSTRESS_QUEUE_F( SegmentedQueue_HP_mutex_stat )
505     CDSSTRESS_QUEUE_F( SegmentedQueue_DHP_spin )
506     //CDSSTRESS_QUEUE_F( SegmentedQueue_DHP_spin_padding )
507     CDSSTRESS_QUEUE_F( SegmentedQueue_DHP_spin_stat )
508     CDSSTRESS_QUEUE_F( SegmentedQueue_DHP_mutex )
509     //CDSSTRESS_QUEUE_F( SegmentedQueue_DHP_mutex_padding )
510     CDSSTRESS_QUEUE_F( SegmentedQueue_DHP_mutex_stat )
511
512     INSTANTIATE_TEST_CASE_P( SQ,
513         intrusive_segmented_queue_push_pop,
514         ::testing::ValuesIn( intrusive_segmented_queue_push_pop::get_test_parameters()));
515
516 } // namespace