Move libcds 1.6.0 from SVN
[libcds.git] / tests / unit / queue / queue_reader_writer.cpp
1 //$$CDS-header$$
2
3 #include "cppunit/thread.h"
4 #include "queue/queue_type.h"
5 #include "queue/queue_defs.h"
6
7 #include <vector>
8 #include <algorithm>
9
10 // Multi-threaded random queue test
11 namespace queue {
12
13 #define TEST_CASE( Q, V )       void Q() { test< Types<V>::Q >(); }
14 #define TEST_BOUNDED( Q, V )    TEST_CASE( Q, V )
15 #define TEST_SEGMENTED( Q, V )  void Q() { test_segmented< Types< V >::Q >(); }
16
17     namespace {
18         static size_t s_nReaderThreadCount = 4;
19         static size_t s_nWriterThreadCount = 4;
20         static size_t s_nQueueSize = 4000000;
21
22         struct Value {
23             size_t      nNo;
24             size_t      nWriterNo;
25         };
26     }
27
28     class Queue_ReaderWriter: public CppUnitMini::TestCase
29     {
30         template <class Queue>
31         class WriterThread: public CppUnitMini::TestThread
32         {
33             virtual TestThread *    clone()
34             {
35                 return new WriterThread( *this );
36             }
37         public:
38             Queue&              m_Queue;
39             double              m_fTime;
40             size_t              m_nPushFailed;
41
42         public:
43             WriterThread( CppUnitMini::ThreadPool& pool, Queue& q )
44                 : CppUnitMini::TestThread( pool )
45                 , m_Queue( q )
46             {}
47             WriterThread( WriterThread& src )
48                 : CppUnitMini::TestThread( src )
49                 , m_Queue( src.m_Queue )
50             {}
51
52             Queue_ReaderWriter&  getTest()
53             {
54                 return reinterpret_cast<Queue_ReaderWriter&>( m_Pool.m_Test );
55             }
56
57             virtual void init()
58             {
59                 cds::threading::Manager::attachThread();
60             }
61             virtual void fini()
62             {
63                 cds::threading::Manager::detachThread();
64             }
65
66             virtual void test()
67             {
68                 size_t nPushCount = getTest().m_nThreadPushCount;
69                 Value v;
70                 v.nWriterNo = m_nThreadNo;
71                 v.nNo = 0;
72                 m_nPushFailed = 0;
73
74                 m_fTime = m_Timer.duration();
75
76                 while ( v.nNo < nPushCount ) {
77                     if ( m_Queue.push( v ))
78                         ++v.nNo;
79                     else
80                         ++m_nPushFailed;
81                 }
82
83                 m_fTime = m_Timer.duration() - m_fTime;
84                 getTest().m_nWriterDone.fetch_add( 1 );
85             }
86         };
87
88         template <class Queue>
89         class ReaderThread: public CppUnitMini::TestThread
90         {
91             virtual TestThread *    clone()
92             {
93                 return new ReaderThread( *this );
94             }
95         public:
96             Queue&              m_Queue;
97             double              m_fTime;
98             size_t              m_nPopEmpty;
99             size_t              m_nPopped;
100             size_t              m_nBadWriter;
101
102             typedef std::vector<size_t> TPoppedData;
103             typedef std::vector<size_t>::iterator       data_iterator;
104             typedef std::vector<size_t>::const_iterator const_data_iterator;
105
106             std::vector<TPoppedData>        m_WriterData;
107
108         private:
109             void initPoppedData()
110             {
111                 const size_t nWriterCount = s_nWriterThreadCount;
112                 const size_t nWriterPushCount = getTest().m_nThreadPushCount;
113                 m_WriterData.resize( nWriterCount );
114                 for ( size_t i = 0; i < nWriterCount; ++i )
115                     m_WriterData[i].reserve( nWriterPushCount );
116             }
117
118         public:
119             ReaderThread( CppUnitMini::ThreadPool& pool, Queue& q )
120                 : CppUnitMini::TestThread( pool )
121                 , m_Queue( q )
122             {
123                 initPoppedData();
124             }
125             ReaderThread( ReaderThread& src )
126                 : CppUnitMini::TestThread( src )
127                 , m_Queue( src.m_Queue )
128             {
129                 initPoppedData();
130             }
131
132             Queue_ReaderWriter&  getTest()
133             {
134                 return reinterpret_cast<Queue_ReaderWriter&>( m_Pool.m_Test );
135             }
136
137             virtual void init()
138             {
139                 cds::threading::Manager::attachThread();
140             }
141             virtual void fini()
142             {
143                 cds::threading::Manager::detachThread();
144             }
145
146             virtual void test()
147             {
148                 m_nPopEmpty = 0;
149                 m_nPopped = 0;
150                 m_nBadWriter = 0;
151                 const size_t nTotalWriters = s_nWriterThreadCount;
152                 Value v;
153
154                 m_fTime = m_Timer.duration();
155
156                 while ( true ) {
157                     if ( m_Queue.pop( v ) ) {
158                         ++m_nPopped;
159                         if ( /*v.nWriterNo >= 0 &&*/ v.nWriterNo < nTotalWriters )
160                             m_WriterData[ v.nWriterNo ].push_back( v.nNo );
161                         else
162                             ++m_nBadWriter;
163                     }
164                     else
165                         ++m_nPopEmpty;
166
167                     if ( m_Queue.empty() ) {
168                         if ( getTest().m_nWriterDone.load() >= nTotalWriters ) {
169                             if ( m_Queue.empty() )
170                                     break;
171                         }
172                     }
173                 }
174
175                 m_fTime = m_Timer.duration() - m_fTime;
176             }
177         };
178
179     protected:
180         size_t                  m_nThreadPushCount;
181         CDS_ATOMIC::atomic<size_t>     m_nWriterDone;
182
183     protected:
184         template <class Queue>
185         void analyze( CppUnitMini::ThreadPool& pool, Queue& testQueue, size_t nLeftOffset = 0, size_t nRightOffset = 0  )
186         {
187             typedef ReaderThread<Queue> Reader;
188             typedef WriterThread<Queue> Writer;
189             typedef typename Reader::const_data_iterator    ReaderIterator;
190
191             size_t nPostTestPops = 0;
192             {
193                 Value v;
194                 while ( testQueue.pop( v ))
195                     ++nPostTestPops;
196             }
197
198             double fTimeWriter = 0;
199             double fTimeReader = 0;
200             size_t nTotalPops = 0;
201             size_t nPopFalse = 0;
202             size_t nPoppedItems = 0;
203             size_t nPushFailed = 0;
204
205             std::vector< Reader * > arrReaders;
206
207             for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
208                 Reader * pReader = dynamic_cast<Reader *>( *it );
209                 if ( pReader ) {
210                     fTimeReader += pReader->m_fTime;
211                     nTotalPops += pReader->m_nPopped;
212                     nPopFalse += pReader->m_nPopEmpty;
213                     arrReaders.push_back( pReader );
214                     CPPUNIT_CHECK_EX( pReader->m_nBadWriter == 0, "reader " << pReader->m_nThreadNo << " bad writer event count=" << pReader->m_nBadWriter );
215
216                     size_t nPopped = 0;
217                     for ( size_t n = 0; n < s_nWriterThreadCount; ++n )
218                         nPopped += pReader->m_WriterData[n].size();
219
220                     CPPUNIT_MSG( "    Reader " << pReader->m_nThreadNo - s_nWriterThreadCount << " popped count=" << nPopped );
221                     nPoppedItems += nPopped;
222                 }
223                 else {
224                     Writer * pWriter = dynamic_cast<Writer *>( *it );
225                     CPPUNIT_ASSERT( pWriter != NULL );
226                     fTimeWriter += pWriter->m_fTime;
227                     nPushFailed += pWriter->m_nPushFailed;
228                     if ( !boost::is_base_of<cds::bounded_container, Queue>::value ) {
229                         CPPUNIT_CHECK_EX( pWriter->m_nPushFailed == 0,
230                             "writer " << pWriter->m_nThreadNo << " push failed count=" << pWriter->m_nPushFailed );
231                     }
232                 }
233             }
234             CPPUNIT_CHECK_EX( nTotalPops == nPoppedItems, "nTotalPops=" << nTotalPops << ", nPoppedItems=" << nPoppedItems );
235
236             CPPUNIT_MSG( "    Readers: duration=" << fTimeReader / s_nReaderThreadCount << ", success pop=" << nTotalPops << ", failed pops=" << nPopFalse );
237             CPPUNIT_MSG( "    Writers: duration=" << fTimeWriter / s_nWriterThreadCount << ", failed push=" << nPushFailed );
238
239             size_t nQueueSize = m_nThreadPushCount * s_nWriterThreadCount;
240             CPPUNIT_CHECK_EX( nTotalPops + nPostTestPops == nQueueSize, "popped=" << nTotalPops + nPostTestPops << " must be " << nQueueSize );
241             CPPUNIT_CHECK( testQueue.empty() );
242
243             // Test that all items have been popped
244             CPPUNIT_MSG( "   Test consistency of popped sequence..." );
245             size_t nErrors = 0;
246             for ( size_t nWriter = 0; nWriter < s_nWriterThreadCount; ++nWriter ) {
247                 std::vector<size_t> arrData;
248                 arrData.reserve( m_nThreadPushCount );
249                 nErrors = 0;
250                 for ( size_t nReader = 0; nReader < arrReaders.size(); ++nReader ) {
251                     ReaderIterator it = arrReaders[nReader]->m_WriterData[nWriter].begin();
252                     ReaderIterator itEnd = arrReaders[nReader]->m_WriterData[nWriter].end();
253                     if ( it != itEnd ) {
254                         ReaderIterator itPrev = it;
255                         for ( ++it; it != itEnd; ++it ) {
256                             CPPUNIT_CHECK_EX( *itPrev < *it + nRightOffset, "Reader " << nReader << ", Writer " << nWriter << ": prev=" << *itPrev << ", cur=" << *it );
257                             if ( ++nErrors > 10 )
258                                 return;
259                             itPrev = it;
260                         }
261                     }
262
263                     for ( it = arrReaders[nReader]->m_WriterData[nWriter].begin(); it != itEnd; ++it )
264                         arrData.push_back( *it );
265                 }
266                 std::sort( arrData.begin(), arrData.end() );
267                 nErrors = 0;
268                 for ( size_t i=1; i < arrData.size(); ++i ) {
269                     if ( arrData[i-1] + 1 != arrData[i] ) {
270                         CPPUNIT_CHECK_EX( arrData[i-1] + 1 == arrData[i], "Writer " << nWriter << ": [" << (i-1) << "]=" << arrData[i-1] << ", [" << i << "]=" << arrData[i] );
271                         if ( ++nErrors > 10 )
272                             return;
273                     }
274                 }
275
276                 CPPUNIT_CHECK_EX( arrData[0] == 0, "Writer " << nWriter << "[0] != 0" );
277                 CPPUNIT_CHECK_EX( arrData[arrData.size() - 1] == m_nThreadPushCount - 1, "Writer " << nWriter << "[last] != " << m_nThreadPushCount - 1 );
278             }
279         }
280
281         template <class Queue>
282         void test()
283         {
284             m_nThreadPushCount = s_nQueueSize / s_nWriterThreadCount;
285             CPPUNIT_MSG( "    reader count=" << s_nReaderThreadCount << " writer count=" << s_nWriterThreadCount
286                 << " item count=" << m_nThreadPushCount * s_nWriterThreadCount << "..." );
287
288             Queue testQueue;
289             CppUnitMini::ThreadPool pool( *this );
290
291             m_nWriterDone.store( 0 );
292
293             // Writers must be first
294             pool.add( new WriterThread<Queue>( pool, testQueue ), s_nWriterThreadCount );
295             pool.add( new ReaderThread<Queue>( pool, testQueue ), s_nReaderThreadCount );
296
297             //CPPUNIT_MSG( "   Reader/Writer test, reader count=" << s_nReaderThreadCount << " writer count=" << s_nWriterThreadCount << "..." );
298             pool.run();
299
300             analyze( pool, testQueue );
301             CPPUNIT_MSG( testQueue.statistics() );
302         }
303
304         template <class Queue>
305         void test_segmented()
306         {
307             m_nThreadPushCount = s_nQueueSize / s_nWriterThreadCount;
308             CPPUNIT_MSG( "    reader count=" << s_nReaderThreadCount << " writer count=" << s_nWriterThreadCount
309                 << " item count=" << m_nThreadPushCount * s_nWriterThreadCount << "..." );
310
311             for ( size_t nSegmentSize = 4; nSegmentSize <= 256; nSegmentSize *= 4 ) {
312                 CPPUNIT_MSG( "Segment size: " << nSegmentSize );
313
314                 Queue q( nSegmentSize );
315                 CppUnitMini::ThreadPool pool( *this );
316
317                 m_nWriterDone.store( 0 );
318
319                 // Writers must be first
320                 pool.add( new WriterThread<Queue>( pool, q ), s_nWriterThreadCount );
321                 pool.add( new ReaderThread<Queue>( pool, q ), s_nReaderThreadCount );
322
323                 pool.run();
324
325                 analyze( pool, q, nSegmentSize * 2, nSegmentSize );
326                 CPPUNIT_MSG( q.statistics() );
327             }
328         }
329
330         void setUpParams( const CppUnitMini::TestCfg& cfg ) {
331             s_nReaderThreadCount = cfg.getULong("ReaderCount", 4 );
332             s_nWriterThreadCount = cfg.getULong("WriterCount", 4 );
333             s_nQueueSize = cfg.getULong("QueueSize", 10000000 );
334         }
335
336     protected:
337         CDSUNIT_DECLARE_MoirQueue( Value )
338         CDSUNIT_DECLARE_MSQueue( Value )
339         CDSUNIT_DECLARE_OptimisticQueue( Value )
340         CDSUNIT_DECLARE_BasketQueue( Value )
341         CDSUNIT_DECLARE_FCQueue( Value )
342         CDSUNIT_DECLARE_FCDeque( Value )
343         CDSUNIT_DECLARE_SegmentedQueue( Value )
344         CDSUNIT_DECLARE_RWQueue( Value )
345         CDSUNIT_DECLARE_MichaelDeque( Value )
346         CDSUNIT_DECLARE_TsigasCysleQueue( Value )
347         CDSUNIT_DECLARE_VyukovMPMCCycleQueue( Value )
348         CDSUNIT_DECLARE_StdQueue( Value )
349
350         CPPUNIT_TEST_SUITE(Queue_ReaderWriter)
351             CDSUNIT_TEST_MoirQueue
352             CDSUNIT_TEST_MSQueue
353             CDSUNIT_TEST_OptimisticQueue
354             CDSUNIT_TEST_BasketQueue
355             CDSUNIT_TEST_FCQueue
356             CDSUNIT_TEST_FCDeque
357             CDSUNIT_TEST_SegmentedQueue
358             CDSUNIT_TEST_RWQueue
359             CDSUNIT_TEST_MichaelDeque
360             CDSUNIT_TEST_TsigasCysleQueue
361             CDSUNIT_TEST_VyukovMPMCCycleQueue
362             CDSUNIT_TEST_StdQueue
363         CPPUNIT_TEST_SUITE_END();
364     };
365
366 } // namespace queue
367
368 CPPUNIT_TEST_SUITE_REGISTRATION(queue::Queue_ReaderWriter);