3 #include "cppunit/thread.h"
4 #include "queue/queue_type.h"
5 #include "queue/queue_defs.h"
10 // Multi-threaded random queue test
13 #define TEST_CASE( Q, V ) void Q() { test< Types<V>::Q >(); }
14 #define TEST_BOUNDED( Q, V ) TEST_CASE( Q, V )
15 #define TEST_SEGMENTED( Q, V ) void Q() { test_segmented< Types< V >::Q >(); }
18 static size_t s_nReaderThreadCount = 4;
19 static size_t s_nWriterThreadCount = 4;
20 static size_t s_nQueueSize = 4000000;
28 class Queue_ReaderWriter: public CppUnitMini::TestCase
30 template <class Queue>
31 class WriterThread: public CppUnitMini::TestThread
33 virtual TestThread * clone()
35 return new WriterThread( *this );
43 WriterThread( CppUnitMini::ThreadPool& pool, Queue& q )
44 : CppUnitMini::TestThread( pool )
47 WriterThread( WriterThread& src )
48 : CppUnitMini::TestThread( src )
49 , m_Queue( src.m_Queue )
52 Queue_ReaderWriter& getTest()
54 return reinterpret_cast<Queue_ReaderWriter&>( m_Pool.m_Test );
59 cds::threading::Manager::attachThread();
63 cds::threading::Manager::detachThread();
68 size_t nPushCount = getTest().m_nThreadPushCount;
70 v.nWriterNo = m_nThreadNo;
74 m_fTime = m_Timer.duration();
76 while ( v.nNo < nPushCount ) {
77 if ( m_Queue.push( v ))
83 m_fTime = m_Timer.duration() - m_fTime;
84 getTest().m_nWriterDone.fetch_add( 1 );
88 template <class Queue>
89 class ReaderThread: public CppUnitMini::TestThread
91 virtual TestThread * clone()
93 return new ReaderThread( *this );
102 typedef std::vector<size_t> TPoppedData;
103 typedef std::vector<size_t>::iterator data_iterator;
104 typedef std::vector<size_t>::const_iterator const_data_iterator;
106 std::vector<TPoppedData> m_WriterData;
109 void initPoppedData()
111 const size_t nWriterCount = s_nWriterThreadCount;
112 const size_t nWriterPushCount = getTest().m_nThreadPushCount;
113 m_WriterData.resize( nWriterCount );
114 for ( size_t i = 0; i < nWriterCount; ++i )
115 m_WriterData[i].reserve( nWriterPushCount );
119 ReaderThread( CppUnitMini::ThreadPool& pool, Queue& q )
120 : CppUnitMini::TestThread( pool )
125 ReaderThread( ReaderThread& src )
126 : CppUnitMini::TestThread( src )
127 , m_Queue( src.m_Queue )
132 Queue_ReaderWriter& getTest()
134 return reinterpret_cast<Queue_ReaderWriter&>( m_Pool.m_Test );
139 cds::threading::Manager::attachThread();
143 cds::threading::Manager::detachThread();
151 const size_t nTotalWriters = s_nWriterThreadCount;
154 m_fTime = m_Timer.duration();
157 if ( m_Queue.pop( v ) ) {
159 if ( /*v.nWriterNo >= 0 &&*/ v.nWriterNo < nTotalWriters )
160 m_WriterData[ v.nWriterNo ].push_back( v.nNo );
167 if ( m_Queue.empty() ) {
168 if ( getTest().m_nWriterDone.load() >= nTotalWriters ) {
169 if ( m_Queue.empty() )
175 m_fTime = m_Timer.duration() - m_fTime;
180 size_t m_nThreadPushCount;
181 atomics::atomic<size_t> m_nWriterDone;
184 template <class Queue>
185 void analyze( CppUnitMini::ThreadPool& pool, Queue& testQueue, size_t nLeftOffset = 0, size_t nRightOffset = 0 )
187 typedef ReaderThread<Queue> Reader;
188 typedef WriterThread<Queue> Writer;
189 typedef typename Reader::const_data_iterator ReaderIterator;
191 size_t nPostTestPops = 0;
194 while ( testQueue.pop( v ))
198 double fTimeWriter = 0;
199 double fTimeReader = 0;
200 size_t nTotalPops = 0;
201 size_t nPopFalse = 0;
202 size_t nPoppedItems = 0;
203 size_t nPushFailed = 0;
205 std::vector< Reader * > arrReaders;
207 for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
208 Reader * pReader = dynamic_cast<Reader *>( *it );
210 fTimeReader += pReader->m_fTime;
211 nTotalPops += pReader->m_nPopped;
212 nPopFalse += pReader->m_nPopEmpty;
213 arrReaders.push_back( pReader );
214 CPPUNIT_CHECK_EX( pReader->m_nBadWriter == 0, "reader " << pReader->m_nThreadNo << " bad writer event count=" << pReader->m_nBadWriter );
217 for ( size_t n = 0; n < s_nWriterThreadCount; ++n )
218 nPopped += pReader->m_WriterData[n].size();
220 CPPUNIT_MSG( " Reader " << pReader->m_nThreadNo - s_nWriterThreadCount << " popped count=" << nPopped );
221 nPoppedItems += nPopped;
224 Writer * pWriter = dynamic_cast<Writer *>( *it );
225 CPPUNIT_ASSERT( pWriter != nullptr );
226 fTimeWriter += pWriter->m_fTime;
227 nPushFailed += pWriter->m_nPushFailed;
228 if ( !boost::is_base_of<cds::bounded_container, Queue>::value ) {
229 CPPUNIT_CHECK_EX( pWriter->m_nPushFailed == 0,
230 "writer " << pWriter->m_nThreadNo << " push failed count=" << pWriter->m_nPushFailed );
234 CPPUNIT_CHECK_EX( nTotalPops == nPoppedItems, "nTotalPops=" << nTotalPops << ", nPoppedItems=" << nPoppedItems );
236 CPPUNIT_MSG( " Readers: duration=" << fTimeReader / s_nReaderThreadCount << ", success pop=" << nTotalPops << ", failed pops=" << nPopFalse );
237 CPPUNIT_MSG( " Writers: duration=" << fTimeWriter / s_nWriterThreadCount << ", failed push=" << nPushFailed );
239 size_t nQueueSize = m_nThreadPushCount * s_nWriterThreadCount;
240 CPPUNIT_CHECK_EX( nTotalPops + nPostTestPops == nQueueSize, "popped=" << nTotalPops + nPostTestPops << " must be " << nQueueSize );
241 CPPUNIT_CHECK( testQueue.empty() );
243 // Test that all items have been popped
244 CPPUNIT_MSG( " Test consistency of popped sequence..." );
246 for ( size_t nWriter = 0; nWriter < s_nWriterThreadCount; ++nWriter ) {
247 std::vector<size_t> arrData;
248 arrData.reserve( m_nThreadPushCount );
250 for ( size_t nReader = 0; nReader < arrReaders.size(); ++nReader ) {
251 ReaderIterator it = arrReaders[nReader]->m_WriterData[nWriter].begin();
252 ReaderIterator itEnd = arrReaders[nReader]->m_WriterData[nWriter].end();
254 ReaderIterator itPrev = it;
255 for ( ++it; it != itEnd; ++it ) {
256 CPPUNIT_CHECK_EX( *itPrev < *it + nRightOffset, "Reader " << nReader << ", Writer " << nWriter << ": prev=" << *itPrev << ", cur=" << *it );
257 if ( ++nErrors > 10 )
263 for ( it = arrReaders[nReader]->m_WriterData[nWriter].begin(); it != itEnd; ++it )
264 arrData.push_back( *it );
266 std::sort( arrData.begin(), arrData.end() );
268 for ( size_t i=1; i < arrData.size(); ++i ) {
269 if ( arrData[i-1] + 1 != arrData[i] ) {
270 CPPUNIT_CHECK_EX( arrData[i-1] + 1 == arrData[i], "Writer " << nWriter << ": [" << (i-1) << "]=" << arrData[i-1] << ", [" << i << "]=" << arrData[i] );
271 if ( ++nErrors > 10 )
276 CPPUNIT_CHECK_EX( arrData[0] == 0, "Writer " << nWriter << "[0] != 0" );
277 CPPUNIT_CHECK_EX( arrData[arrData.size() - 1] == m_nThreadPushCount - 1, "Writer " << nWriter << "[last] != " << m_nThreadPushCount - 1 );
281 template <class Queue>
284 m_nThreadPushCount = s_nQueueSize / s_nWriterThreadCount;
285 CPPUNIT_MSG( " reader count=" << s_nReaderThreadCount << " writer count=" << s_nWriterThreadCount
286 << " item count=" << m_nThreadPushCount * s_nWriterThreadCount << "..." );
289 CppUnitMini::ThreadPool pool( *this );
291 m_nWriterDone.store( 0 );
293 // Writers must be first
294 pool.add( new WriterThread<Queue>( pool, testQueue ), s_nWriterThreadCount );
295 pool.add( new ReaderThread<Queue>( pool, testQueue ), s_nReaderThreadCount );
297 //CPPUNIT_MSG( " Reader/Writer test, reader count=" << s_nReaderThreadCount << " writer count=" << s_nWriterThreadCount << "..." );
300 analyze( pool, testQueue );
301 CPPUNIT_MSG( testQueue.statistics() );
304 template <class Queue>
305 void test_segmented()
307 m_nThreadPushCount = s_nQueueSize / s_nWriterThreadCount;
308 CPPUNIT_MSG( " reader count=" << s_nReaderThreadCount << " writer count=" << s_nWriterThreadCount
309 << " item count=" << m_nThreadPushCount * s_nWriterThreadCount << "..." );
311 for ( size_t nSegmentSize = 4; nSegmentSize <= 256; nSegmentSize *= 4 ) {
312 CPPUNIT_MSG( "Segment size: " << nSegmentSize );
314 Queue q( nSegmentSize );
315 CppUnitMini::ThreadPool pool( *this );
317 m_nWriterDone.store( 0 );
319 // Writers must be first
320 pool.add( new WriterThread<Queue>( pool, q ), s_nWriterThreadCount );
321 pool.add( new ReaderThread<Queue>( pool, q ), s_nReaderThreadCount );
325 analyze( pool, q, nSegmentSize * 2, nSegmentSize );
326 CPPUNIT_MSG( q.statistics() );
330 void setUpParams( const CppUnitMini::TestCfg& cfg ) {
331 s_nReaderThreadCount = cfg.getULong("ReaderCount", 4 );
332 s_nWriterThreadCount = cfg.getULong("WriterCount", 4 );
333 s_nQueueSize = cfg.getULong("QueueSize", 10000000 );
337 CDSUNIT_DECLARE_MoirQueue( Value )
338 CDSUNIT_DECLARE_MSQueue( Value )
339 CDSUNIT_DECLARE_OptimisticQueue( Value )
340 CDSUNIT_DECLARE_BasketQueue( Value )
341 CDSUNIT_DECLARE_FCQueue( Value )
342 CDSUNIT_DECLARE_FCDeque( Value )
343 CDSUNIT_DECLARE_SegmentedQueue( Value )
344 CDSUNIT_DECLARE_RWQueue( Value )
345 CDSUNIT_DECLARE_TsigasCysleQueue( Value )
346 CDSUNIT_DECLARE_VyukovMPMCCycleQueue( Value )
347 CDSUNIT_DECLARE_StdQueue( Value )
349 CPPUNIT_TEST_SUITE(Queue_ReaderWriter)
350 CDSUNIT_TEST_MoirQueue
352 CDSUNIT_TEST_OptimisticQueue
353 CDSUNIT_TEST_BasketQueue
356 CDSUNIT_TEST_SegmentedQueue
358 CDSUNIT_TEST_TsigasCysleQueue
359 CDSUNIT_TEST_VyukovMPMCCycleQueue
360 CDSUNIT_TEST_StdQueue
361 CPPUNIT_TEST_SUITE_END();
366 CPPUNIT_TEST_SUITE_REGISTRATION(queue::Queue_ReaderWriter);