Move libcds 1.6.0 from SVN
[libcds.git] / tests / unit / queue / queue_random.cpp
1 //$$CDS-header$$
2
3 #include "cppunit/thread.h"
4 #include "queue/queue_type.h"
5 #include "queue/queue_defs.h"
6
7
8 #include <vector>
9 #include <boost/type_traits/is_base_of.hpp>
10
11 // Multi-threaded queue test for random push/pop operation
12 namespace queue {
13
// Test-method generators used inside the Queue_Random class body.
// NOTE(review): presumably these are expanded by the CDSUNIT_DECLARE_* macros
// from queue/queue_defs.h - confirm there.

// Defines member function Q() that runs test<>() on the queue type Types<V>::Q.
#define TEST_CASE( Q, V ) \
    void Q() { test< Types<V>::Q >(); }

// Bounded queues are tested exactly like the generic case.
#define TEST_BOUNDED( Q, V ) \
    TEST_CASE( Q, V )

// Defines member function Q() that runs test_segmented<>() on Types<V>::Q.
#define TEST_SEGMENTED( Q, V ) \
    void Q() { test_segmented< Types< V >::Q >(); }
17
// File-local configuration and payload type of the random push/pop test.
namespace ns_Queue_Random {
    // Number of worker threads (start-up default).
    static size_t s_nThreadCount = 16;
    // Total number of items pushed by all threads together (start-up default).
    static size_t s_nQueueSize = 10000000;

    // Queue payload: identifies the producing thread and the per-producer
    // sequence number of the item.
    struct SimpleValue {
        size_t      nNo;        // sequence number assigned by the producer
        size_t      nThread;    // number of the producing thread

        // Every member is initialized so that a freshly constructed value can
        // be copied or inspected without reading indeterminate memory.
        SimpleValue(): nNo( 0 ), nThread( 0 ) {}
        SimpleValue( size_t n ): nNo( n ), nThread( 0 ) {}

        // Returns the sequence number.
        size_t getNo() const { return nNo; }
    };
}

using namespace ns_Queue_Random;
33
34     class Queue_Random: public CppUnitMini::TestCase
35     {
36         typedef CppUnitMini::TestCase base_class;
37
38         template <class Queue>
39         class Thread: public CppUnitMini::TestThread
40         {
41             virtual TestThread *    clone()
42             {
43                 return new Thread( *this );
44             }
45         public:
46             Queue&              m_Queue;
47             double              m_fTime;
48
49             size_t  m_nPushCount;
50             size_t  m_nPopCount;
51             size_t  m_nEmptyPop;
52
53             size_t  m_nUndefWriter;
54             size_t  m_nRepeatValue;
55             size_t  m_nPushError        ;    // push error count
56
57             std::vector<size_t> m_arrLastRead;
58             std::vector<size_t> m_arrPopCountPerThread;
59
60             size_t const m_nSpread;
61
62         public:
63             Thread( CppUnitMini::ThreadPool& pool, Queue& q, size_t nSpread = 0 )
64                 : CppUnitMini::TestThread( pool )
65                 , m_Queue( q )
66                 , m_nSpread( nSpread )
67             {}
68             Thread( Thread& src )
69                 : CppUnitMini::TestThread( src )
70                 , m_Queue( src.m_Queue )
71                 , m_nSpread( src.m_nSpread )
72             {}
73
74             Queue_Random&  getTest()
75             {
76                 return reinterpret_cast<Queue_Random&>( m_Pool.m_Test );
77             }
78
79             virtual void init()
80             {
81                 cds::threading::Manager::attachThread();
82                 m_nPushCount =
83                     m_nPopCount =
84                     m_nEmptyPop =
85                     m_nUndefWriter =
86                     m_nRepeatValue =
87                     m_nPushError = 0;
88
89                 m_arrLastRead.resize( s_nThreadCount, 0 );
90                 m_arrPopCountPerThread.resize( s_nThreadCount, 0 );
91             }
92             virtual void fini()
93             {
94                 cds::threading::Manager::detachThread();
95             }
96
97             virtual void test()
98             {
99                 size_t const nThreadCount = s_nThreadCount;
100                 size_t const nTotalPush = getTest().m_nThreadPushCount;
101
102                 SimpleValue node;
103
104                 m_fTime = m_Timer.duration();
105
106                 bool bNextPop = false;
107                 while ( m_nPushCount < nTotalPush ) {
108                     if ( !bNextPop && (rand() & 3) != 3 ) {
109                         // push
110                         node.nThread = m_nThreadNo;
111                         node.nNo = ++m_nPushCount;
112                         if ( !m_Queue.push( node )) {
113                             ++m_nPushError;
114                             --m_nPushCount;
115                         }
116
117                     }
118                     else {
119                         // pop
120                         pop( nThreadCount );
121                         bNextPop = false;
122                     }
123                 }
124
125                 size_t nPopLoop = 0;
126                 while ( !m_Queue.empty() && nPopLoop < 1000000 ) {
127                     if ( pop( nThreadCount ) )
128                         nPopLoop = 0;
129                     else
130                         ++nPopLoop;
131                 }
132
133
134                 m_fTime = m_Timer.duration() - m_fTime;
135             }
136
137             bool pop( size_t nThreadCount )
138             {
139                 SimpleValue node;
140                 node.nThread = -1;
141                 node.nNo = -1;
142                 if ( m_Queue.pop( node )) {
143                     ++m_nPopCount;
144                     if ( node.nThread < nThreadCount ) {
145                         m_arrPopCountPerThread[ node.nThread ] += 1;
146                         if ( m_nSpread ) {
147                             if ( m_arrLastRead[ node.nThread ] > node.nNo ) {
148                                 if ( m_arrLastRead[ node.nThread ] - node.nNo > m_nSpread )
149                                     ++m_nRepeatValue;
150                             }
151                             else if ( m_arrLastRead[ node.nThread ] == node.nNo )
152                                 ++m_nRepeatValue;
153                             m_arrLastRead[ node.nThread ] = node.nNo;
154                         }
155                         else {
156                             if ( m_arrLastRead[ node.nThread ] < node.nNo ) {
157                                 m_arrLastRead[ node.nThread ] = node.nNo;
158                             }
159                             else
160                                 ++m_nRepeatValue;
161                         }
162
163                         //if ( node.nNo < m_Test.m_nPushCount )
164                         //    m_Test.m_pRead[ node.nWriter ][ node.nNo ] = node.nNo;
165                     }
166                     else {
167                         ++m_nUndefWriter;
168                     }
169                 }
170                 else {
171                     ++m_nEmptyPop;
172                     return false;
173                 }
174                 return true;
175             }
176         };
177
178     protected:
179         size_t  m_nThreadPushCount;
180
181     protected:
182         template <class Queue>
183         void analyze( CppUnitMini::ThreadPool& pool, Queue& testQueue  )
184         {
185             CPPUNIT_CHECK( testQueue.empty() );
186
187             std::vector< size_t > arrPushCount;
188             arrPushCount.resize( s_nThreadCount, 0 );
189
190             size_t nPushTotal = 0;
191             size_t nPopTotal  = 0;
192             double fTime = 0;
193             size_t nPushError = 0;
194
195             for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
196                 Thread<Queue> * pThread = static_cast<Thread<Queue> *>( *it );
197                 CPPUNIT_CHECK( pThread->m_nUndefWriter == 0 );
198                 CPPUNIT_CHECK_EX( pThread->m_nRepeatValue == 0, "nRepeatValue=" << pThread->m_nRepeatValue );
199                 if ( !boost::is_base_of<cds::bounded_container, Queue>::value ) {
200                     CPPUNIT_CHECK( pThread->m_nPushError == 0 );
201                 }
202                 else
203                     nPushError += pThread->m_nPushError;
204
205                 arrPushCount[ pThread->m_nThreadNo ] += pThread->m_nPushCount;
206
207                 nPushTotal += pThread->m_nPushCount;
208                 nPopTotal += pThread->m_nPopCount;
209                 fTime += pThread->m_fTime;
210             }
211
212             CPPUNIT_MSG( "     Duration=" << (fTime /= s_nThreadCount) );
213             if ( boost::is_base_of<cds::bounded_container, Queue>::value ) {
214                 CPPUNIT_MSG( "         push error (when queue is full)=" << nPushError );
215             }
216
217             size_t nTotalItems = m_nThreadPushCount * s_nThreadCount;
218
219             CPPUNIT_CHECK_EX( nPushTotal == nTotalItems, "nPushTotal=" << nPushTotal << ", nTotalItems=" << nTotalItems );
220             CPPUNIT_CHECK_EX( nPopTotal == nTotalItems, "nPopTotal=" << nPopTotal << ", nTotalItems=" << nTotalItems );
221
222             for ( size_t i = 0; i < s_nThreadCount; ++i )
223                 CPPUNIT_CHECK( arrPushCount[i] == m_nThreadPushCount );
224         }
225
226         template <class Queue>
227         void test()
228         {
229             CPPUNIT_MSG( "Random push/pop test\n    thread count=" << s_nThreadCount << ", push count=" << s_nQueueSize << " ..." );
230
231             m_nThreadPushCount = s_nQueueSize / s_nThreadCount;
232
233             Queue testQueue;
234             CppUnitMini::ThreadPool pool( *this );
235             pool.add( new Thread<Queue>( pool, testQueue ), s_nThreadCount );
236
237             pool.run();
238
239             analyze( pool, testQueue );
240             CPPUNIT_MSG( testQueue.statistics() );
241         }
242
243         template <class Queue>
244         void test_segmented()
245         {
246             CPPUNIT_MSG( "Random push/pop test\n    thread count=" << s_nThreadCount << ", push count=" << s_nQueueSize << " ..." );
247
248             m_nThreadPushCount = s_nQueueSize / s_nThreadCount;
249
250             for ( size_t nSegmentSize = 4; nSegmentSize <= 256; nSegmentSize *= 4 ) {
251                 CPPUNIT_MSG( "Segment size: " << nSegmentSize );
252
253                 Queue testQueue( nSegmentSize );
254                 CppUnitMini::ThreadPool pool( *this );
255                 pool.add( new Thread<Queue>( pool, testQueue, nSegmentSize * 2 ), s_nThreadCount );
256
257                 pool.run();
258
259                 analyze( pool, testQueue );
260                 CPPUNIT_MSG( testQueue.statistics() );
261             }
262         }
263
264         void setUpParams( const CppUnitMini::TestCfg& cfg ) {
265             s_nThreadCount = cfg.getULong("ThreadCount", 8 );
266             s_nQueueSize = cfg.getULong("QueueSize", 20000000 );
267         }
268
269     protected:
270         CDSUNIT_DECLARE_MoirQueue( SimpleValue )
271         CDSUNIT_DECLARE_MSQueue( SimpleValue )
272         CDSUNIT_DECLARE_OptimisticQueue( SimpleValue )
273         CDSUNIT_DECLARE_BasketQueue( SimpleValue )
274         CDSUNIT_DECLARE_FCQueue( SimpleValue )
275         CDSUNIT_DECLARE_FCDeque( SimpleValue )
276         CDSUNIT_DECLARE_SegmentedQueue( SimpleValue )
277         CDSUNIT_DECLARE_RWQueue( SimpleValue )
278         CDSUNIT_DECLARE_MichaelDeque( SimpleValue )
279         CDSUNIT_DECLARE_TsigasCysleQueue( SimpleValue )
280         CDSUNIT_DECLARE_VyukovMPMCCycleQueue( SimpleValue )
281         CDSUNIT_DECLARE_StdQueue( SimpleValue )
282
283         CPPUNIT_TEST_SUITE(Queue_Random)
284             CDSUNIT_TEST_MoirQueue
285             CDSUNIT_TEST_MSQueue
286             CDSUNIT_TEST_OptimisticQueue
287             CDSUNIT_TEST_BasketQueue
288             CDSUNIT_TEST_FCQueue
289             CDSUNIT_TEST_FCDeque
290             CDSUNIT_TEST_SegmentedQueue
291             CDSUNIT_TEST_RWQueue
292             CDSUNIT_TEST_MichaelDeque
293             CDSUNIT_TEST_TsigasCysleQueue
294             CDSUNIT_TEST_VyukovMPMCCycleQueue
295             CDSUNIT_TEST_StdQueue
296         CPPUNIT_TEST_SUITE_END();
297     };
298
299 } // namespace queue
300
// Passes the suite class to the framework's registration macro.
// NOTE(review): presumably this is what makes queue::Queue_Random visible to
// the test runner - confirm against the cppunit mini-framework headers.
301 CPPUNIT_TEST_SUITE_REGISTRATION(queue::Queue_Random);