fb9217c97d5d25289afb5e750143b10636c95cb7
[libcds.git] / tests / unit / queue / intrusive_queue_reader_writer.cpp
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8     
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.     
29 */
30
31 #include "cppunit/thread.h"
32 #include "queue/intrusive_queue_type.h"
33 #include "queue/intrusive_queue_defs.h"
34 #include <vector>
35 #include <algorithm>
36
37 // Multi-threaded random queue test
38 namespace queue {
39
// Test-case generators. Each macro defines a member function named after the queue
// type Q; the function runs the matching test driver on Types< Value<...> >::Q.
#define TEST_CASE( Q, HOOK ) \
    void Q() { test< Types< Value<HOOK> >::Q >(); }

#define TEST_BOUNDED( Q ) \
    void Q() { test_bounded< Types< Value<> >::Q >(); }

#define TEST_FCQUEUE( Q, HOOK ) \
    void Q() { test_fcqueue< Types< Value<HOOK> >::Q >(); }

#define TEST_SEGMENTED( Q ) \
    void Q() { test_segmented< Types< Value<> >::Q >(); }

#define TEST_BOOST( Q, HOOK ) \
    void Q() { test_boost< Types< Value<HOOK> >::Q >(); }
45
46     namespace {
47         static size_t s_nReaderThreadCount = 4;
48         static size_t s_nWriterThreadCount = 4;
49         static size_t s_nQueueSize = 4000000;
50         static unsigned int s_nFCPassCount = 8;
51         static unsigned int s_nFCCompactFactor = 64;
52
53         struct empty {};
54
55         template <typename Base = empty >
56         struct Value: public Base
57         {
58             size_t      nNo;
59             size_t      nWriterNo;
60             size_t      nConsumer;
61         };
62     }
63
64     class IntrusiveQueue_ReaderWriter: public CppUnitMini::TestCase
65     {
66         template <class Queue>
67         class Producer: public CppUnitMini::TestThread
68         {
69             virtual TestThread *    clone()
70             {
71                 return new Producer( *this );
72             }
73         public:
74             Queue&              m_Queue;
75             double              m_fTime;
76             size_t              m_nPushFailed;
77
78             // Interval in m_arrValue
79             typename Queue::value_type *       m_pStart;
80             typename Queue::value_type *       m_pEnd;
81
82         public:
83             Producer( CppUnitMini::ThreadPool& pool, Queue& q )
84                 : CppUnitMini::TestThread( pool )
85                 , m_Queue( q )
86             {}
87             Producer( Producer& src )
88                 : CppUnitMini::TestThread( src )
89                 , m_Queue( src.m_Queue )
90             {}
91
92             IntrusiveQueue_ReaderWriter&  getTest()
93             {
94                 return static_cast<IntrusiveQueue_ReaderWriter&>( m_Pool.m_Test );
95             }
96
97             virtual void init()
98             {
99                 cds::threading::Manager::attachThread();
100             }
101             virtual void fini()
102             {
103                 cds::threading::Manager::detachThread();
104             }
105
106             virtual void test()
107             {
108                 m_nPushFailed = 0;
109
110                 m_fTime = m_Timer.duration();
111
112                 size_t i = 0;
113                 for ( typename Queue::value_type * p = m_pStart; p < m_pEnd; ) {
114                     p->nNo = i;
115                     p->nWriterNo = m_nThreadNo;
116                     CDS_TSAN_ANNOTATE_HAPPENS_BEFORE( &p->nWriterNo );
117                     if ( m_Queue.push( *p )) {
118                         ++p;
119                         ++i;
120                     }
121                     else
122                         ++m_nPushFailed;
123                 }
124
125                 m_fTime = m_Timer.duration() - m_fTime;
126                 getTest().m_nProducerCount.fetch_sub( 1, atomics::memory_order_release );
127             }
128         };
129
130         template <class Queue>
131         class Consumer: public CppUnitMini::TestThread
132         {
133             virtual TestThread *    clone()
134             {
135                 return new Consumer( *this );
136             }
137         public:
138             Queue&              m_Queue;
139             double              m_fTime;
140             size_t              m_nPopEmpty;
141             size_t              m_nPopped;
142             size_t              m_nBadWriter;
143
144             typedef std::vector<size_t> TPoppedData;
145             typedef std::vector<size_t>::iterator       data_iterator;
146             typedef std::vector<size_t>::const_iterator const_data_iterator;
147
148             std::vector<TPoppedData>        m_WriterData;
149
150         private:
151             void initPoppedData()
152             {
153                 const size_t nWriterCount = s_nWriterThreadCount;
154                 const size_t nWriterPushCount = getTest().m_nThreadPushCount;
155                 m_WriterData.resize( nWriterCount );
156                 for ( size_t i = 0; i < nWriterCount; ++i )
157                     m_WriterData[i].reserve( nWriterPushCount );
158             }
159
160         public:
161             Consumer( CppUnitMini::ThreadPool& pool, Queue& q )
162                 : CppUnitMini::TestThread( pool )
163                 , m_Queue( q )
164             {
165                 initPoppedData();
166             }
167             Consumer( Consumer& src )
168                 : CppUnitMini::TestThread( src )
169                 , m_Queue( src.m_Queue )
170             {
171                 initPoppedData();
172             }
173
174             IntrusiveQueue_ReaderWriter&  getTest()
175             {
176                 return static_cast<IntrusiveQueue_ReaderWriter&>( m_Pool.m_Test );
177             }
178
179             virtual void init()
180             {
181                 cds::threading::Manager::attachThread();
182             }
183             virtual void fini()
184             {
185                 cds::threading::Manager::detachThread();
186             }
187
188             virtual void test()
189             {
190                 m_nPopEmpty = 0;
191                 m_nPopped = 0;
192                 m_nBadWriter = 0;
193                 const size_t nTotalWriters = s_nWriterThreadCount;
194
195                 m_fTime = m_Timer.duration();
196
197                 while ( true ) {
198                     typename Queue::value_type * p = m_Queue.pop();
199                     if ( p ) {
200                         p->nConsumer = m_nThreadNo;
201                         ++m_nPopped;
202                         CDS_TSAN_ANNOTATE_HAPPENS_AFTER( &p->nWriterNo );
203                         if ( p->nWriterNo < nTotalWriters )
204                             m_WriterData[ p->nWriterNo ].push_back( p->nNo );
205                         else
206                             ++m_nBadWriter;
207                     }
208                     else {
209                         ++m_nPopEmpty;
210                         if ( getTest().m_nProducerCount.load( atomics::memory_order_acquire ) == 0 && m_Queue.empty() )
211                             break;
212                     }
213                 }
214
215                 m_fTime = m_Timer.duration() - m_fTime;
216             }
217         };
218
219         template <typename T>
220         class value_array
221         {
222             T * m_pArr;
223         public:
224             value_array( size_t nSize )
225                 : m_pArr( new T[nSize] )
226             {}
227
228             ~value_array()
229             {
230                 delete [] m_pArr;
231             }
232
233             T * get() const { return m_pArr; }
234         };
235
236
    protected:
        // Number of items each writer thread pushes; computed in test_with()
        size_t                  m_nThreadPushCount;
        // Number of writers that have not finished yet: set in test_with(),
        // decremented by Producer::test(), polled by Consumer::test() as the stop condition
        atomics::atomic<size_t>     m_nProducerCount;
        // Marker written to Value::nConsumer before the run; an item that still
        // carries it after the run has not been popped by any reader thread
        static CDS_CONSTEXPR const size_t c_nBadConsumer = 0xbadc0ffe;
241
242     protected:
243         template <class Queue>
244         void analyze( CppUnitMini::ThreadPool& pool, Queue& testQueue, size_t /*nLeftOffset*/, size_t nRightOffset )
245         {
246             typedef Consumer<Queue> Reader;
247             typedef Producer<Queue> Writer;
248             typedef typename Reader::const_data_iterator    ReaderIterator;
249
250             size_t nPostTestPops = 0;
251             while ( testQueue.pop() )
252                 ++nPostTestPops;
253
254             double fTimeWriter = 0;
255             double fTimeReader = 0;
256             size_t nTotalPops = 0;
257             size_t nPopFalse = 0;
258             size_t nPoppedItems = 0;
259             size_t nPushFailed = 0;
260
261             std::vector< Reader * > arrReaders;
262
263             for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
264                 Reader * pReader = dynamic_cast<Reader *>( *it );
265                 if ( pReader ) {
266                     fTimeReader += pReader->m_fTime;
267                     nTotalPops += pReader->m_nPopped;
268                     nPopFalse += pReader->m_nPopEmpty;
269                     arrReaders.push_back( pReader );
270                     CPPUNIT_CHECK_EX( pReader->m_nBadWriter == 0, "reader " << pReader->m_nThreadNo << " bad writer event count=" << pReader->m_nBadWriter );
271
272                     size_t nPopped = 0;
273                     for ( size_t n = 0; n < s_nWriterThreadCount; ++n )
274                         nPopped += pReader->m_WriterData[n].size();
275
276                     CPPUNIT_MSG( "    Reader " << pReader->m_nThreadNo - s_nWriterThreadCount << " popped count=" << nPopped );
277                     nPoppedItems += nPopped;
278                 }
279                 else {
280                     Writer * pWriter = dynamic_cast<Writer *>( *it );
281                     CPPUNIT_ASSERT( pWriter != nullptr );
282                     fTimeWriter += pWriter->m_fTime;
283                     nPushFailed += pWriter->m_nPushFailed;
284                     if ( !boost::is_base_of<cds::bounded_container, Queue>::value ) {
285                         CPPUNIT_CHECK_EX( pWriter->m_nPushFailed == 0,
286                             "writer " << pWriter->m_nThreadNo << " push failed count=" << pWriter->m_nPushFailed );
287                     }
288                 }
289             }
290             CPPUNIT_CHECK_EX( nTotalPops == nPoppedItems, "nTotalPops=" << nTotalPops << ", nPoppedItems=" << nPoppedItems );
291
292             CPPUNIT_MSG( "    Readers: duration=" << fTimeReader / s_nReaderThreadCount << ", success pop=" << nTotalPops << ", failed pops=" << nPopFalse );
293             CPPUNIT_MSG( "    Writers: duration=" << fTimeWriter / s_nWriterThreadCount << ", failed push=" << nPushFailed );
294
295             size_t nQueueSize = m_nThreadPushCount * s_nWriterThreadCount;
296             CPPUNIT_CHECK_EX( nTotalPops + nPostTestPops == nQueueSize, "popped=" << nTotalPops + nPostTestPops << " must be " << nQueueSize );
297             CPPUNIT_CHECK( testQueue.empty() );
298
299             // Test that all items have been popped
300             // Test FIFO order
301             CPPUNIT_MSG( "   Test consistency of popped sequence..." );
302             size_t nErrors = 0;
303             for ( size_t nWriter = 0; nWriter < s_nWriterThreadCount; ++nWriter ) {
304                 std::vector<size_t> arrData;
305                 arrData.reserve( m_nThreadPushCount );
306                 nErrors = 0;
307                 for ( size_t nReader = 0; nReader < arrReaders.size(); ++nReader ) {
308                     ReaderIterator it = arrReaders[nReader]->m_WriterData[nWriter].begin();
309                     ReaderIterator itEnd = arrReaders[nReader]->m_WriterData[nWriter].end();
310                     if ( it != itEnd ) {
311                         ReaderIterator itPrev = it;
312                         for ( ++it; it != itEnd; ++it ) {
313                             CPPUNIT_CHECK_EX( *itPrev < *it + nRightOffset,
314                                 "Reader " << nReader << ", Writer " << nWriter << ": prev=" << *itPrev << ", cur=" << *it );
315                             if ( ++nErrors > 10 )
316                                 return;
317                             itPrev = it;
318                         }
319                     }
320
321                     for ( it = arrReaders[nReader]->m_WriterData[nWriter].begin(); it != itEnd; ++it )
322                         arrData.push_back( *it );
323                 }
324                 std::sort( arrData.begin(), arrData.end() );
325                 nErrors = 0;
326                 for ( size_t i=1; i < arrData.size(); ++i ) {
327                     if ( arrData[i-1] + 1 != arrData[i] ) {
328                         CPPUNIT_CHECK_EX( arrData[i-1] + 1 == arrData[i], "Writer " << nWriter << ": [" << (i-1) << "]=" << arrData[i-1] << ", [" << i << "]=" << arrData[i] );
329                         if ( ++nErrors > 10 )
330                             return;
331                     }
332                 }
333
334                 CPPUNIT_CHECK_EX( arrData[0] == 0, "Writer " << nWriter << "[0] != 0" );
335                 CPPUNIT_CHECK_EX( arrData[arrData.size() - 1] == m_nThreadPushCount - 1, "Writer " << nWriter << "[last] != " << m_nThreadPushCount - 1 );
336             }
337         }
338
339         template <class Queue>
340         void test_with( Queue& testQueue, value_array<typename Queue::value_type>& arrValue, size_t nLeftOffset, size_t nRightOffset )
341         {
342             m_nThreadPushCount = s_nQueueSize / s_nWriterThreadCount;
343             CPPUNIT_MSG( "    reader count=" << s_nReaderThreadCount << " writer count=" << s_nWriterThreadCount
344                 << " item count=" << m_nThreadPushCount * s_nWriterThreadCount << "..." );
345
346             typename Queue::value_type * pValStart = arrValue.get();
347             typename Queue::value_type * pValEnd = pValStart + s_nQueueSize;
348
349             CppUnitMini::ThreadPool pool( *this );
350
351             m_nProducerCount.store( s_nWriterThreadCount, atomics::memory_order_release );
352
353             // Writers must be first
354             pool.add( new Producer<Queue>( pool, testQueue ), s_nWriterThreadCount );
355             {
356                 for ( typename Queue::value_type * it = pValStart; it != pValEnd; ++it ) {
357                     it->nNo = 0;
358                     it->nWriterNo = 0;
359                     it->nConsumer = c_nBadConsumer;
360                 }
361
362                 typename Queue::value_type * pStart = pValStart;
363                 for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
364                     static_cast<Producer<Queue>* >( *it )->m_pStart = pStart;
365                     pStart += m_nThreadPushCount;
366                     static_cast<Producer<Queue>* >( *it )->m_pEnd = pStart;
367                 }
368             }
369             pool.add( new Consumer<Queue>( pool, testQueue ), s_nReaderThreadCount );
370
371             pool.run();
372
373             // Check that all values have been dequeued
374             {
375                 size_t nBadConsumerCount = 0;
376                 size_t nQueueSize = m_nThreadPushCount * s_nWriterThreadCount;
377                 typename Queue::value_type * pEnd = pValStart + nQueueSize;
378                 for ( typename Queue::value_type * it = pValStart; it != pEnd; ++it  ) {
379                     if ( it->nConsumer == c_nBadConsumer )
380                         ++nBadConsumerCount;
381                 }
382                 CPPUNIT_CHECK_EX( nBadConsumerCount == 0, "nBadConsumerCount=" << nBadConsumerCount );
383             }
384
385             analyze( pool, testQueue, nLeftOffset, nRightOffset );
386             CPPUNIT_MSG( testQueue.statistics() );
387         }
388
389         template <typename Queue>
390         void test()
391         {
392             value_array<typename Queue::value_type> arrValue( s_nQueueSize );
393             {
394                 {
395                     Queue q;
396                     test_with( q, arrValue, 0, 0 );
397                 }
398                 Queue::gc::force_dispose();
399             }
400         }
401
402         template <typename Queue>
403         void test_boost()
404         {
405             value_array<typename Queue::value_type> arrValue( s_nQueueSize );
406             {
407                 Queue q;
408                 test_with(q, arrValue, 0, 0);
409             }
410         }
411
412         template <typename Queue>
413         void test_bounded()
414         {
415             value_array<typename Queue::value_type> arrValue( s_nQueueSize );
416             Queue q;
417             test_with(q, arrValue, 0, 0);
418         }
419
420         template <typename Queue>
421         void test_fcqueue()
422         {
423             value_array<typename Queue::value_type> arrValue( s_nQueueSize );
424             CPPUNIT_MSG( "Combining pass count: " << s_nFCPassCount << ", compact factor: " << s_nFCCompactFactor );
425             Queue q( s_nFCCompactFactor, s_nFCPassCount );
426             test_with(q, arrValue, 0, 0);
427         }
428
429         template <typename Queue>
430         void test_segmented()
431         {
432             value_array<typename Queue::value_type> arrValue( s_nQueueSize );
433             for ( size_t nSegmentSize = 4; nSegmentSize <= 256; nSegmentSize *= 4 ) {
434                 CPPUNIT_MSG( "Segment size: " << nSegmentSize );
435                 {
436                     Queue q( nSegmentSize );
437                     test_with( q, arrValue, nSegmentSize * 2, nSegmentSize );
438                 }
439                 Queue::gc::force_dispose();
440             }
441         }
442
443         template <typename Queue>
444         void test_spqueue()
445         {
446             value_array<typename Queue::value_type> arrValue( s_nQueueSize );
447             for ( size_t nArraySize = 2; nArraySize <= 64; nArraySize *= 2 ) {
448                 CPPUNIT_MSG( "Array size: " << nArraySize );
449                 {
450                     Queue q( nArraySize );
451                     test_with( q, arrValue, 0, 0 );
452                 }
453                 Queue::gc::force_dispose();
454             }
455         }
456
457         void setUpParams( const CppUnitMini::TestCfg& cfg ) {
458             s_nReaderThreadCount = cfg.getULong("ReaderCount", 4 );
459             s_nWriterThreadCount = cfg.getULong("WriterCount", 4 );
460             s_nQueueSize = cfg.getULong("QueueSize", 10000000 );
461             s_nFCPassCount = cfg.getUInt("FCPassCount", 8);
462             s_nFCCompactFactor = cfg.getUInt("FCCompactFactor", 64);
463         }
464
    protected:
        // Declarations of the test-case member functions, one macro per queue family.
        // NOTE(review): the CDSUNIT_DECLARE_* macros are not defined in this file; presumably they
        // come from queue/intrusive_queue_defs.h and expand through the TEST_* macros defined
        // at the top of this file - confirm.
        CDSUNIT_DECLARE_MSQueue
        CDSUNIT_DECLARE_MoirQueue
        CDSUNIT_DECLARE_OptimisticQueue
        CDSUNIT_DECLARE_BasketQueue
        CDSUNIT_DECLARE_FCQueue
        CDSUNIT_DECLARE_SegmentedQueue
        CDSUNIT_DECLARE_TsigasCycleQueue
        CDSUNIT_DECLARE_VyukovMPMCCycleQueue
        CDSUNIT_DECLARE_BoostSList


        // Test suite definition: the same queue families in the same order as the declarations above.
        // NOTE(review): the CDSUNIT_TEST_* macros are defined elsewhere as well - presumably each
        // one lists the test cases of its family for the suite; confirm.
        CPPUNIT_TEST_SUITE(IntrusiveQueue_ReaderWriter)
            CDSUNIT_TEST_MSQueue
            CDSUNIT_TEST_MoirQueue
            CDSUNIT_TEST_OptimisticQueue
            CDSUNIT_TEST_BasketQueue
            CDSUNIT_TEST_FCQueue
            CDSUNIT_TEST_SegmentedQueue
            CDSUNIT_TEST_TsigasCycleQueue
            CDSUNIT_TEST_VyukovMPMCCycleQueue
            CDSUNIT_TEST_BoostSList
        CPPUNIT_TEST_SUITE_END();
488     };
489
490 } // namespace queue
491
492 CPPUNIT_TEST_SUITE_REGISTRATION(queue::IntrusiveQueue_ReaderWriter);