// Move libcds 1.6.0 from SVN
// [libcds.git] / tests / unit / queue / queue_push.cpp
//$$CDS-header$$
2
#include <vector>

#include "cppunit/thread.h"
#include "queue/queue_type.h"
#include "queue/queue_defs.h"
6
7
// Multi-threaded queue test for the push operation
9 namespace queue {
10
// Test-method generators.
// Each macro expands to the definition of a member function named Q that
// forwards to one of the Queue_Push test templates, instantiated for the queue
// type Types<V>::Q:
//   TEST_CASE      -> test<>()            (queue is default-constructed)
//   TEST_BOUNDED   -> test_bounded<>()    (queue is constructed with s_nQueueSize)
//   TEST_SEGMENTED -> test_segmented<>()  (queue is constructed with a segment size)
// NOTE(review): presumably consumed by the CDSUNIT_DECLARE_* macros from
// queue/queue_defs.h - confirm there.
#define TEST_CASE( Q, V )       void Q() { test< Types<V>::Q >(); }
#define TEST_BOUNDED( Q, V )    void Q() { test_bounded< Types<V>::Q >(); }
#define TEST_SEGMENTED( Q, V )  void Q() { test_segmented< Types<V>::Q >(); }
14
15     namespace ns_Queue_Push {
16         static size_t s_nThreadCount = 8;
17         static size_t s_nQueueSize = 20000000 ;   // no more than 20 million records
18
19         struct SimpleValue {
20             size_t      nNo;
21
22             SimpleValue(): nNo(0) {}
23             SimpleValue( size_t n ): nNo(n) {}
24             size_t getNo() const { return  nNo; }
25         };
26     }
27     using namespace ns_Queue_Push;
28
29     class Queue_Push: public CppUnitMini::TestCase
30     {
31         template <class Queue>
32         class Thread: public CppUnitMini::TestThread
33         {
34             virtual TestThread *    clone()
35             {
36                 return new Thread( *this );
37             }
38         public:
39             Queue&              m_Queue;
40             double              m_fTime;
41             size_t              m_nStartItem;
42             size_t              m_nEndItem;
43             size_t              m_nPushError;
44
45         public:
46             Thread( CppUnitMini::ThreadPool& pool, Queue& q )
47                 : CppUnitMini::TestThread( pool )
48                 , m_Queue( q )
49             {}
50             Thread( Thread& src )
51                 : CppUnitMini::TestThread( src )
52                 , m_Queue( src.m_Queue )
53             {}
54
55             Queue_Push&  getTest()
56             {
57                 return reinterpret_cast<Queue_Push&>( m_Pool.m_Test );
58             }
59
60             virtual void init()
61             {
62                 cds::threading::Manager::attachThread();
63             }
64             virtual void fini()
65             {
66                 cds::threading::Manager::detachThread();
67             }
68
69             virtual void test()
70             {
71                 m_fTime = m_Timer.duration();
72
73                 m_nPushError = 0;
74                 for ( size_t nItem = m_nStartItem; nItem < m_nEndItem; ++nItem ) {
75                     if ( !m_Queue.push( nItem ))
76                         ++m_nPushError;
77                 }
78
79                 m_fTime = m_Timer.duration() - m_fTime;
80             }
81         };
82
83     protected:
84         template <class Queue>
85         void analyze( CppUnitMini::ThreadPool& pool, Queue& testQueue  )
86         {
87             size_t nThreadItems = s_nQueueSize / s_nThreadCount;
88             double fTime = 0;
89             for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
90                 Thread<Queue> * pThread = reinterpret_cast<Thread<Queue> *>(*it);
91                 fTime += pThread->m_fTime;
92                 if ( pThread->m_nPushError != 0 )
93                     CPPUNIT_MSG("     ERROR: thread push error count=" << pThread->m_nPushError );
94             }
95             CPPUNIT_MSG( "     Duration=" << (fTime / s_nThreadCount) );
96             CPPUNIT_CHECK( !testQueue.empty() )
97
98             size_t * arr = new size_t[ s_nQueueSize ];
99             memset(arr, 0, sizeof(arr[0]) * s_nQueueSize );
100
101             cds::OS::Timer      timer;
102             CPPUNIT_MSG( "   Pop (single-threaded)..." );
103             size_t nPopped = 0;
104             SimpleValue val = SimpleValue();
105             while ( testQueue.pop( val )) {
106                 nPopped++;
107                 ++arr[ val.getNo() ];
108             }
109             CPPUNIT_MSG( "     Duration=" << timer.duration() );
110
111             size_t nTotalItems = nThreadItems * s_nThreadCount;
112             size_t nError = 0;
113             for ( size_t i = 0; i < nTotalItems; ++i ) {
114                 if ( arr[i] != 1 ) {
115                     CPPUNIT_MSG( "   ERROR: Item " << i << " has not been pushed" );
116                     CPPUNIT_ASSERT( ++nError <= 10 );
117                 }
118             }
119
120             delete [] arr;
121         }
122
123         template <class Queue>
124         void test()
125         {
126             Queue testQueue;
127
128             CppUnitMini::ThreadPool pool( *this );
129             pool.add( new Thread<Queue>( pool, testQueue ), s_nThreadCount );
130
131             size_t nStart = 0;
132             size_t nThreadItemCount = s_nQueueSize / s_nThreadCount;
133             for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
134                 Thread<Queue> * pThread = reinterpret_cast<Thread<Queue> *>(*it);
135                 pThread->m_nStartItem = nStart;
136                 nStart += nThreadItemCount;
137                 pThread->m_nEndItem = nStart;
138             }
139
140             CPPUNIT_MSG( "   Push test, thread count=" << s_nThreadCount << " ...");
141             pool.run();
142
143             analyze( pool, testQueue );
144
145             CPPUNIT_MSG( testQueue.statistics() );
146         }
147
148         template <class Queue>
149         void test_bounded()
150         {
151             size_t nStart = 0;
152             size_t nThreadItemCount = s_nQueueSize / s_nThreadCount;
153
154             Queue testQueue( s_nQueueSize );
155
156             CppUnitMini::ThreadPool pool( *this );
157             pool.add( new Thread<Queue>( pool, testQueue ), s_nThreadCount );
158
159             for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
160                 Thread<Queue> * pThread = reinterpret_cast<Thread<Queue> *>(*it);
161                 pThread->m_nStartItem = nStart;
162                 nStart += nThreadItemCount;
163                 pThread->m_nEndItem = nStart;
164             }
165
166             CPPUNIT_MSG( "   Push test, thread count=" << s_nThreadCount << " ...");
167             pool.run();
168
169             analyze( pool, testQueue );
170
171             CPPUNIT_MSG( testQueue.statistics() );
172         }
173
174         template <class Queue>
175         void test_segmented()
176         {
177             CPPUNIT_MSG( "   Push test, thread count=" << s_nThreadCount << " ...");
178             for ( size_t nSegmentSize = 4; nSegmentSize <= 256; nSegmentSize *= 4 ) {
179                 CPPUNIT_MSG( "Segment size: " << nSegmentSize );
180
181                 Queue testQueue( nSegmentSize );
182
183                 CppUnitMini::ThreadPool pool( *this );
184                 pool.add( new Thread<Queue>( pool, testQueue ), s_nThreadCount );
185
186                 size_t nStart = 0;
187                 size_t nThreadItemCount = s_nQueueSize / s_nThreadCount;
188                 for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
189                     Thread<Queue> * pThread = reinterpret_cast<Thread<Queue> *>(*it);
190                     pThread->m_nStartItem = nStart;
191                     nStart += nThreadItemCount;
192                     pThread->m_nEndItem = nStart;
193                 }
194
195                 pool.run();
196
197                 analyze( pool, testQueue );
198
199                 CPPUNIT_MSG( testQueue.statistics() );
200             }
201         }
202
203         void setUpParams( const CppUnitMini::TestCfg& cfg ) {
204             s_nThreadCount = cfg.getULong("ThreadCount", 8 );
205             s_nQueueSize = cfg.getULong("QueueSize", 20000000 );
206         }
207
208     protected:
209         CDSUNIT_DECLARE_MoirQueue( SimpleValue )
210         CDSUNIT_DECLARE_MSQueue( SimpleValue )
211         CDSUNIT_DECLARE_OptimisticQueue( SimpleValue )
212         CDSUNIT_DECLARE_BasketQueue( SimpleValue )
213         CDSUNIT_DECLARE_FCQueue( SimpleValue )
214         CDSUNIT_DECLARE_FCDeque( SimpleValue )
215         CDSUNIT_DECLARE_SegmentedQueue( SimpleValue )
216         CDSUNIT_DECLARE_RWQueue( SimpleValue )
217         CDSUNIT_DECLARE_MichaelDeque( SimpleValue )
218         CDSUNIT_DECLARE_TsigasCysleQueue( SimpleValue )
219         CDSUNIT_DECLARE_VyukovMPMCCycleQueue( SimpleValue )
220         CDSUNIT_DECLARE_StdQueue( SimpleValue )
221
222         CPPUNIT_TEST_SUITE(Queue_Push)
223             CDSUNIT_TEST_MoirQueue
224             CDSUNIT_TEST_MSQueue
225             CDSUNIT_TEST_OptimisticQueue
226             CDSUNIT_TEST_BasketQueue
227             CDSUNIT_TEST_FCQueue
228             CDSUNIT_TEST_FCDeque
229             CDSUNIT_TEST_SegmentedQueue
230             CDSUNIT_TEST_RWQueue
231             CDSUNIT_TEST_MichaelDeque
232             CDSUNIT_TEST_TsigasCysleQueue
233             CDSUNIT_TEST_VyukovMPMCCycleQueue
234             CDSUNIT_TEST_StdQueue
235         CPPUNIT_TEST_SUITE_END();
236     };
237
238 } // namespace queue
239
240 CPPUNIT_TEST_SUITE_REGISTRATION(queue::Queue_Push);