Removed TSan annotations, tuned memory ordering
[libcds.git] / tests / unit / queue / intrusive_queue_reader_writer.cpp
1 //$$CDS-header$$
2
3 #include "cppunit/thread.h"
4 #include "queue/intrusive_queue_type.h"
5 #include "queue/intrusive_queue_defs.h"
6 #include <vector>
7 #include <algorithm>
8
9 // Multi-threaded random queue test
10 namespace queue {
11
12 #define TEST_CASE( Q, HOOK )    void Q() { test< Types< Value<HOOK> >::Q >(); }
13 #define TEST_BOUNDED( Q )       void Q() { test_bounded< Types< Value<> >::Q >(); }
14 #define TEST_FCQUEUE( Q, HOOK ) void Q() { test_fcqueue< Types< Value<HOOK> >::Q >(); }
15 #define TEST_SEGMENTED( Q )     void Q() { test_segmented< Types< Value<> >::Q >(); }
16 #define TEST_BOOST( Q, HOOK )   void Q() { test_boost< Types< Value<HOOK> >::Q >(); }
17
18     namespace {
19         static size_t s_nReaderThreadCount = 4;
20         static size_t s_nWriterThreadCount = 4;
21         static size_t s_nQueueSize = 4000000;
22         static unsigned int s_nFCPassCount = 8;
23         static unsigned int s_nFCCompactFactor = 64;
24
25         struct empty {};
26
27         template <typename Base = empty >
28         struct Value: public Base
29         {
30             size_t      nNo;
31             size_t      nWriterNo;
32             size_t      nConsumer;
33         };
34     }
35
36     class IntrusiveQueue_ReaderWriter: public CppUnitMini::TestCase
37     {
38         template <class Queue>
39         class Producer: public CppUnitMini::TestThread
40         {
41             virtual TestThread *    clone()
42             {
43                 return new Producer( *this );
44             }
45         public:
46             Queue&              m_Queue;
47             double              m_fTime;
48             size_t              m_nPushFailed;
49
50             // Interval in m_arrValue
51             typename Queue::value_type *       m_pStart;
52             typename Queue::value_type *       m_pEnd;
53
54         public:
55             Producer( CppUnitMini::ThreadPool& pool, Queue& q )
56                 : CppUnitMini::TestThread( pool )
57                 , m_Queue( q )
58             {}
59             Producer( Producer& src )
60                 : CppUnitMini::TestThread( src )
61                 , m_Queue( src.m_Queue )
62             {}
63
64             IntrusiveQueue_ReaderWriter&  getTest()
65             {
66                 return static_cast<IntrusiveQueue_ReaderWriter&>( m_Pool.m_Test );
67             }
68
69             virtual void init()
70             {
71                 cds::threading::Manager::attachThread();
72             }
73             virtual void fini()
74             {
75                 cds::threading::Manager::detachThread();
76             }
77
78             virtual void test()
79             {
80                 m_nPushFailed = 0;
81
82                 m_fTime = m_Timer.duration();
83
84                 size_t i = 0;
85                 for ( typename Queue::value_type * p = m_pStart; p < m_pEnd; ) {
86                     p->nNo = i;
87                     p->nWriterNo = m_nThreadNo;
88                     if ( m_Queue.push( *p )) {
89                         ++p;
90                         ++i;
91                     }
92                     else
93                         ++m_nPushFailed;
94                 }
95
96                 m_fTime = m_Timer.duration() - m_fTime;
97                 getTest().m_nProducerCount.fetch_sub( 1, atomics::memory_order_release );
98             }
99         };
100
101         template <class Queue>
102         class Consumer: public CppUnitMini::TestThread
103         {
104             virtual TestThread *    clone()
105             {
106                 return new Consumer( *this );
107             }
108         public:
109             Queue&              m_Queue;
110             double              m_fTime;
111             size_t              m_nPopEmpty;
112             size_t              m_nPopped;
113             size_t              m_nBadWriter;
114
115             typedef std::vector<size_t> TPoppedData;
116             typedef std::vector<size_t>::iterator       data_iterator;
117             typedef std::vector<size_t>::const_iterator const_data_iterator;
118
119             std::vector<TPoppedData>        m_WriterData;
120
121         private:
122             void initPoppedData()
123             {
124                 const size_t nWriterCount = s_nWriterThreadCount;
125                 const size_t nWriterPushCount = getTest().m_nThreadPushCount;
126                 m_WriterData.resize( nWriterCount );
127                 for ( size_t i = 0; i < nWriterCount; ++i )
128                     m_WriterData[i].reserve( nWriterPushCount );
129             }
130
131         public:
132             Consumer( CppUnitMini::ThreadPool& pool, Queue& q )
133                 : CppUnitMini::TestThread( pool )
134                 , m_Queue( q )
135             {
136                 initPoppedData();
137             }
138             Consumer( Consumer& src )
139                 : CppUnitMini::TestThread( src )
140                 , m_Queue( src.m_Queue )
141             {
142                 initPoppedData();
143             }
144
145             IntrusiveQueue_ReaderWriter&  getTest()
146             {
147                 return static_cast<IntrusiveQueue_ReaderWriter&>( m_Pool.m_Test );
148             }
149
150             virtual void init()
151             {
152                 cds::threading::Manager::attachThread();
153             }
154             virtual void fini()
155             {
156                 cds::threading::Manager::detachThread();
157             }
158
159             virtual void test()
160             {
161                 m_nPopEmpty = 0;
162                 m_nPopped = 0;
163                 m_nBadWriter = 0;
164                 const size_t nTotalWriters = s_nWriterThreadCount;
165
166                 m_fTime = m_Timer.duration();
167
168                 while ( true ) {
169                     typename Queue::value_type * p = m_Queue.pop();
170                     if ( p ) {
171                         p->nConsumer = m_nThreadNo;
172                         ++m_nPopped;
173                         if ( p->nWriterNo < nTotalWriters )
174                             m_WriterData[ p->nWriterNo ].push_back( p->nNo );
175                         else
176                             ++m_nBadWriter;
177                     }
178                     else {
179                         ++m_nPopEmpty;
180                         if ( getTest().m_nProducerCount.load( atomics::memory_order_acquire ) == 0 && m_Queue.empty() )
181                             break;
182                     }
183                 }
184
185                 m_fTime = m_Timer.duration() - m_fTime;
186             }
187         };
188
189         template <typename T>
190         class value_array
191         {
192             T * m_pArr;
193         public:
194             value_array( size_t nSize )
195                 : m_pArr( new T[nSize] )
196             {}
197
198             ~value_array()
199             {
200                 delete [] m_pArr;
201             }
202
203             T * get() const { return m_pArr; }
204         };
205
206
207     protected:
208         size_t                  m_nThreadPushCount;
209         atomics::atomic<size_t>     m_nProducerCount;
210         static CDS_CONSTEXPR const size_t c_nBadConsumer = 0xbadc0ffe;
211
212     protected:
213         template <class Queue>
214         void analyze( CppUnitMini::ThreadPool& pool, Queue& testQueue, size_t /*nLeftOffset*/, size_t nRightOffset )
215         {
216             typedef Consumer<Queue> Reader;
217             typedef Producer<Queue> Writer;
218             typedef typename Reader::const_data_iterator    ReaderIterator;
219
220             size_t nPostTestPops = 0;
221             while ( testQueue.pop() )
222                 ++nPostTestPops;
223
224             double fTimeWriter = 0;
225             double fTimeReader = 0;
226             size_t nTotalPops = 0;
227             size_t nPopFalse = 0;
228             size_t nPoppedItems = 0;
229             size_t nPushFailed = 0;
230
231             std::vector< Reader * > arrReaders;
232
233             for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
234                 Reader * pReader = dynamic_cast<Reader *>( *it );
235                 if ( pReader ) {
236                     fTimeReader += pReader->m_fTime;
237                     nTotalPops += pReader->m_nPopped;
238                     nPopFalse += pReader->m_nPopEmpty;
239                     arrReaders.push_back( pReader );
240                     CPPUNIT_CHECK_EX( pReader->m_nBadWriter == 0, "reader " << pReader->m_nThreadNo << " bad writer event count=" << pReader->m_nBadWriter );
241
242                     size_t nPopped = 0;
243                     for ( size_t n = 0; n < s_nWriterThreadCount; ++n )
244                         nPopped += pReader->m_WriterData[n].size();
245
246                     CPPUNIT_MSG( "    Reader " << pReader->m_nThreadNo - s_nWriterThreadCount << " popped count=" << nPopped );
247                     nPoppedItems += nPopped;
248                 }
249                 else {
250                     Writer * pWriter = dynamic_cast<Writer *>( *it );
251                     CPPUNIT_ASSERT( pWriter != nullptr );
252                     fTimeWriter += pWriter->m_fTime;
253                     nPushFailed += pWriter->m_nPushFailed;
254                     if ( !boost::is_base_of<cds::bounded_container, Queue>::value ) {
255                         CPPUNIT_CHECK_EX( pWriter->m_nPushFailed == 0,
256                             "writer " << pWriter->m_nThreadNo << " push failed count=" << pWriter->m_nPushFailed );
257                     }
258                 }
259             }
260             CPPUNIT_CHECK_EX( nTotalPops == nPoppedItems, "nTotalPops=" << nTotalPops << ", nPoppedItems=" << nPoppedItems );
261
262             CPPUNIT_MSG( "    Readers: duration=" << fTimeReader / s_nReaderThreadCount << ", success pop=" << nTotalPops << ", failed pops=" << nPopFalse );
263             CPPUNIT_MSG( "    Writers: duration=" << fTimeWriter / s_nWriterThreadCount << ", failed push=" << nPushFailed );
264
265             size_t nQueueSize = m_nThreadPushCount * s_nWriterThreadCount;
266             CPPUNIT_CHECK_EX( nTotalPops + nPostTestPops == nQueueSize, "popped=" << nTotalPops + nPostTestPops << " must be " << nQueueSize );
267             CPPUNIT_CHECK( testQueue.empty() );
268
269             // Test that all items have been popped
270             // Test FIFO order
271             CPPUNIT_MSG( "   Test consistency of popped sequence..." );
272             size_t nErrors = 0;
273             for ( size_t nWriter = 0; nWriter < s_nWriterThreadCount; ++nWriter ) {
274                 std::vector<size_t> arrData;
275                 arrData.reserve( m_nThreadPushCount );
276                 nErrors = 0;
277                 for ( size_t nReader = 0; nReader < arrReaders.size(); ++nReader ) {
278                     ReaderIterator it = arrReaders[nReader]->m_WriterData[nWriter].begin();
279                     ReaderIterator itEnd = arrReaders[nReader]->m_WriterData[nWriter].end();
280                     if ( it != itEnd ) {
281                         ReaderIterator itPrev = it;
282                         for ( ++it; it != itEnd; ++it ) {
283                             CPPUNIT_CHECK_EX( *itPrev < *it + nRightOffset,
284                                 "Reader " << nReader << ", Writer " << nWriter << ": prev=" << *itPrev << ", cur=" << *it );
285                             if ( ++nErrors > 10 )
286                                 return;
287                             itPrev = it;
288                         }
289                     }
290
291                     for ( it = arrReaders[nReader]->m_WriterData[nWriter].begin(); it != itEnd; ++it )
292                         arrData.push_back( *it );
293                 }
294                 std::sort( arrData.begin(), arrData.end() );
295                 nErrors = 0;
296                 for ( size_t i=1; i < arrData.size(); ++i ) {
297                     if ( arrData[i-1] + 1 != arrData[i] ) {
298                         CPPUNIT_CHECK_EX( arrData[i-1] + 1 == arrData[i], "Writer " << nWriter << ": [" << (i-1) << "]=" << arrData[i-1] << ", [" << i << "]=" << arrData[i] );
299                         if ( ++nErrors > 10 )
300                             return;
301                     }
302                 }
303
304                 CPPUNIT_CHECK_EX( arrData[0] == 0, "Writer " << nWriter << "[0] != 0" );
305                 CPPUNIT_CHECK_EX( arrData[arrData.size() - 1] == m_nThreadPushCount - 1, "Writer " << nWriter << "[last] != " << m_nThreadPushCount - 1 );
306             }
307         }
308
309         template <class Queue>
310         void test_with( Queue& testQueue, value_array<typename Queue::value_type>& arrValue, size_t nLeftOffset, size_t nRightOffset )
311         {
312             m_nThreadPushCount = s_nQueueSize / s_nWriterThreadCount;
313             CPPUNIT_MSG( "    reader count=" << s_nReaderThreadCount << " writer count=" << s_nWriterThreadCount
314                 << " item count=" << m_nThreadPushCount * s_nWriterThreadCount << "..." );
315
316             typename Queue::value_type * pValStart = arrValue.get();
317             typename Queue::value_type * pValEnd = pValStart + s_nQueueSize;
318
319             CppUnitMini::ThreadPool pool( *this );
320
321             m_nProducerCount.store( s_nWriterThreadCount, atomics::memory_order_release );
322
323             // Writers must be first
324             pool.add( new Producer<Queue>( pool, testQueue ), s_nWriterThreadCount );
325             {
326                 for ( typename Queue::value_type * it = pValStart; it != pValEnd; ++it ) {
327                     it->nNo = 0;
328                     it->nWriterNo = 0;
329                     it->nConsumer = c_nBadConsumer;
330                 }
331
332                 typename Queue::value_type * pStart = pValStart;
333                 for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
334                     static_cast<Producer<Queue>* >( *it )->m_pStart = pStart;
335                     pStart += m_nThreadPushCount;
336                     static_cast<Producer<Queue>* >( *it )->m_pEnd = pStart;
337                 }
338             }
339             pool.add( new Consumer<Queue>( pool, testQueue ), s_nReaderThreadCount );
340
341             pool.run();
342
343             // Check that all values have been dequeued
344             {
345                 size_t nBadConsumerCount = 0;
346                 size_t nQueueSize = m_nThreadPushCount * s_nWriterThreadCount;
347                 typename Queue::value_type * pEnd = pValStart + nQueueSize;
348                 for ( typename Queue::value_type * it = pValStart; it != pEnd; ++it  ) {
349                     if ( it->nConsumer == c_nBadConsumer )
350                         ++nBadConsumerCount;
351                 }
352                 CPPUNIT_CHECK_EX( nBadConsumerCount == 0, "nBadConsumerCount=" << nBadConsumerCount );
353             }
354
355             analyze( pool, testQueue, nLeftOffset, nRightOffset );
356             CPPUNIT_MSG( testQueue.statistics() );
357         }
358
359         template <typename Queue>
360         void test()
361         {
362             value_array<typename Queue::value_type> arrValue( s_nQueueSize );
363             {
364                 {
365                     Queue q;
366                     test_with( q, arrValue, 0, 0 );
367                 }
368                 Queue::gc::force_dispose();
369             }
370         }
371
372         template <typename Queue>
373         void test_boost()
374         {
375             value_array<typename Queue::value_type> arrValue( s_nQueueSize );
376             {
377                 Queue q;
378                 test_with(q, arrValue, 0, 0);
379             }
380         }
381
382         template <typename Queue>
383         void test_bounded()
384         {
385             value_array<typename Queue::value_type> arrValue( s_nQueueSize );
386             Queue q;
387             test_with(q, arrValue, 0, 0);
388         }
389
390         template <typename Queue>
391         void test_fcqueue()
392         {
393             value_array<typename Queue::value_type> arrValue( s_nQueueSize );
394             CPPUNIT_MSG( "Combining pass count: " << s_nFCPassCount << ", compact factor: " << s_nFCCompactFactor );
395             Queue q( s_nFCCompactFactor, s_nFCPassCount );
396             test_with(q, arrValue, 0, 0);
397         }
398
399         template <typename Queue>
400         void test_segmented()
401         {
402             value_array<typename Queue::value_type> arrValue( s_nQueueSize );
403             for ( size_t nSegmentSize = 4; nSegmentSize <= 256; nSegmentSize *= 4 ) {
404                 CPPUNIT_MSG( "Segment size: " << nSegmentSize );
405                 {
406                     Queue q( nSegmentSize );
407                     test_with( q, arrValue, nSegmentSize * 2, nSegmentSize );
408                 }
409                 Queue::gc::force_dispose();
410             }
411         }
412
413         template <typename Queue>
414         void test_spqueue()
415         {
416             value_array<typename Queue::value_type> arrValue( s_nQueueSize );
417             for ( size_t nArraySize = 2; nArraySize <= 64; nArraySize *= 2 ) {
418                 CPPUNIT_MSG( "Array size: " << nArraySize );
419                 {
420                     Queue q( nArraySize );
421                     test_with( q, arrValue, 0, 0 );
422                 }
423                 Queue::gc::force_dispose();
424             }
425         }
426
427         void setUpParams( const CppUnitMini::TestCfg& cfg ) {
428             s_nReaderThreadCount = cfg.getULong("ReaderCount", 4 );
429             s_nWriterThreadCount = cfg.getULong("WriterCount", 4 );
430             s_nQueueSize = cfg.getULong("QueueSize", 10000000 );
431             s_nFCPassCount = cfg.getUInt("FCPassCount", 8);
432             s_nFCCompactFactor = cfg.getUInt("FCCompactFactor", 64);
433         }
434
435     protected:
436         CDSUNIT_DECLARE_MSQueue
437         CDSUNIT_DECLARE_MoirQueue
438         CDSUNIT_DECLARE_OptimisticQueue
439         CDSUNIT_DECLARE_BasketQueue
440         CDSUNIT_DECLARE_FCQueue
441         CDSUNIT_DECLARE_SegmentedQueue
442         CDSUNIT_DECLARE_TsigasCycleQueue
443         CDSUNIT_DECLARE_VyukovMPMCCycleQueue
444         CDSUNIT_DECLARE_BoostSList
445
446
447         CPPUNIT_TEST_SUITE(IntrusiveQueue_ReaderWriter)
448             CDSUNIT_TEST_MSQueue
449             CDSUNIT_TEST_MoirQueue
450             CDSUNIT_TEST_OptimisticQueue
451             CDSUNIT_TEST_BasketQueue
452             CDSUNIT_TEST_FCQueue
453             CDSUNIT_TEST_SegmentedQueue
454             CDSUNIT_TEST_TsigasCycleQueue
455             CDSUNIT_TEST_VyukovMPMCCycleQueue
456             CDSUNIT_TEST_BoostSList
457         CPPUNIT_TEST_SUITE_END();
458     };
459
460 } // namespace queue
461
462 CPPUNIT_TEST_SUITE_REGISTRATION(queue::IntrusiveQueue_ReaderWriter);