MSQueue, MoirQueue refactoring done (issues #1, #2, #3)
[libcds.git] / tests / unit / queue / queue_pop.cpp
1 //$$CDS-header$$
2
3 #include "cppunit/thread.h"
4 #include "queue/queue_type.h"
5 #include "queue/queue_defs.h"
6
7 // Multi-threaded queue test for pop operation
8 namespace queue {
9
// Test-method generators. Each macro defines a parameterless member function
// named after the queue type and calls test<>(), test_bounded<>() or
// test_segmented<>() with the type Types<ValueType>::QueueName.
#define TEST_CASE( QueueName, ValueType ) \
    void QueueName() { test< Types<ValueType>::QueueName >(); }
#define TEST_BOUNDED( QueueName, ValueType ) \
    void QueueName() { test_bounded< Types<ValueType>::QueueName >(); }
#define TEST_SEGMENTED( QueueName, ValueType ) \
    void QueueName() { test_segmented< Types<ValueType>::QueueName >(); }
13
namespace ns_Queue_Pop {
    // Default test parameters; setUpParams() overwrites both from the test configuration.
    static size_t s_nQueueSize = 20 * 1000 * 1000;  // no more than 20 million records
    static size_t s_nThreadCount = 8;

    // Queue element: carries nothing but the sequence number of the item.
    struct SimpleValue {
        size_t    nNo;

        // Doubles as the default constructor (nNo == 0) and as the implicit
        // conversion from size_t that the push( i ) calls of the test rely on.
        SimpleValue( size_t nItemNo = 0 )
            : nNo( nItemNo )
        {}

        size_t getNo() const
        {
            return nNo;
        }
    };
}
using namespace ns_Queue_Pop;
27
28     class Queue_Pop: public CppUnitMini::TestCase
29     {
30         template <class Queue>
31         class Thread: public CppUnitMini::TestThread
32         {
33             virtual TestThread *    clone()
34             {
35                 return new Thread( *this );
36             }
37         public:
38             Queue&              m_Queue;
39             double              m_fTime;
40             long *              m_arr;
41             size_t              m_nPopCount;
42
43         public:
44             Thread( CppUnitMini::ThreadPool& pool, Queue& q )
45                 : CppUnitMini::TestThread( pool )
46                 , m_Queue( q )
47             {
48                 m_arr = new long[s_nQueueSize];
49             }
50             Thread( Thread& src )
51                 : CppUnitMini::TestThread( src )
52                 , m_Queue( src.m_Queue )
53             {
54                 m_arr = new long[s_nQueueSize];
55             }
56             ~Thread()
57             {
58                 delete [] m_arr;
59             }
60
61             Queue_Pop&  getTest()
62             {
63                 return reinterpret_cast<Queue_Pop&>( m_Pool.m_Test );
64             }
65
66             virtual void init()
67             {
68                 cds::threading::Manager::attachThread();
69                 memset(m_arr, 0, sizeof(m_arr[0]) * s_nQueueSize );
70             }
71             virtual void fini()
72             {
73                 cds::threading::Manager::detachThread();
74             }
75
76             virtual void test()
77             {
78                 m_fTime = m_Timer.duration();
79
80                 typedef typename Queue::value_type value_type;
81                 value_type value = value_type();
82                 size_t nPopCount = 0;
83                 while ( m_Queue.pop( value ) ) {
84                     ++m_arr[ value.getNo() ];
85                     ++nPopCount;
86                 }
87                 m_nPopCount = nPopCount;
88
89                 m_fTime = m_Timer.duration() - m_fTime;
90             }
91         };
92
93     protected:
94
95         template <class Queue>
96         void analyze( CppUnitMini::ThreadPool& pool, Queue& testQueue  )
97         {
98             size_t * arr = new size_t[ s_nQueueSize ];
99             memset(arr, 0, sizeof(arr[0]) * s_nQueueSize );
100
101             double fTime = 0;
102             size_t nTotalPops = 0;
103             for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
104                 Thread<Queue> * pThread = reinterpret_cast<Thread<Queue> *>(*it);
105                 for ( size_t i = 0; i < s_nQueueSize; ++i )
106                     arr[i] += pThread->m_arr[i];
107                 nTotalPops += pThread->m_nPopCount;
108                 fTime += pThread->m_fTime;
109             }
110             CPPUNIT_MSG( "     Duration=" << (fTime / s_nThreadCount) );
111             CPPUNIT_CHECK_EX( nTotalPops == s_nQueueSize, "Total pop=" << nTotalPops << ", queue size=" << s_nQueueSize);
112             CPPUNIT_CHECK( testQueue.empty() )
113
114             size_t nError = 0;
115             for ( size_t i = 0; i < s_nQueueSize; ++i ) {
116                 if ( arr[i] != 1 ) {
117                     CPPUNIT_MSG( "   ERROR: Item " << i << " has not been popped" );
118                     CPPUNIT_ASSERT( ++nError <= 10 );
119                 }
120             }
121
122             delete [] arr;
123         }
124
125         template <class Queue>
126         void test()
127         {
128             Queue testQueue;
129             CppUnitMini::ThreadPool pool( *this );
130             pool.add( new Thread<Queue>( pool, testQueue ), s_nThreadCount );
131
132             CPPUNIT_MSG( "   Create queue size =" << s_nQueueSize << " ...");
133             cds::OS::Timer      timer;
134             for ( size_t i = 0; i < s_nQueueSize; ++i )
135                 testQueue.push( i );
136             CPPUNIT_MSG( "     Duration=" << timer.duration() );
137
138             CPPUNIT_MSG( "   Pop test, thread count=" << s_nThreadCount << " ...");
139             pool.run();
140
141             analyze( pool, testQueue );
142
143             CPPUNIT_MSG( testQueue.statistics() );
144         }
145
146         template <class Queue>
147         void test_bounded()
148         {
149             Queue testQueue( s_nQueueSize );
150             CppUnitMini::ThreadPool pool( *this );
151             pool.add( new Thread<Queue>( pool, testQueue ), s_nThreadCount );
152
153             CPPUNIT_MSG( "   Create queue size =" << s_nQueueSize << " ...");
154             cds::OS::Timer      timer;
155             for ( size_t i = 0; i < s_nQueueSize; ++i )
156                 testQueue.push( i );
157             CPPUNIT_MSG( "     Duration=" << timer.duration() );
158
159             CPPUNIT_MSG( "   Pop test, thread count=" << s_nThreadCount << " ...");
160             pool.run();
161
162             analyze( pool, testQueue );
163
164             CPPUNIT_MSG( testQueue.statistics() );
165         }
166
167         template <class Queue>
168         void test_segmented()
169         {
170             for ( size_t nSegmentSize = 4; nSegmentSize <= 256; nSegmentSize *= 4 ) {
171                 CPPUNIT_MSG( "Segment size: " << nSegmentSize );
172
173                 Queue testQueue( nSegmentSize );
174                 CppUnitMini::ThreadPool pool( *this );
175                 pool.add( new Thread<Queue>( pool, testQueue ), s_nThreadCount );
176
177                 CPPUNIT_MSG( "   Create queue size =" << s_nQueueSize << " ...");
178                 cds::OS::Timer      timer;
179                 for ( size_t i = 0; i < s_nQueueSize; ++i )
180                     testQueue.push( i );
181                 CPPUNIT_MSG( "     Duration=" << timer.duration() );
182
183                 CPPUNIT_MSG( "   Pop test, thread count=" << s_nThreadCount << " ...");
184                 pool.run();
185
186                 analyze( pool, testQueue );
187
188                 CPPUNIT_MSG( testQueue.statistics() );
189             }
190         }
191
192         void setUpParams( const CppUnitMini::TestCfg& cfg ) {
193             s_nThreadCount = cfg.getULong("ThreadCount", 8 );
194             s_nQueueSize = cfg.getULong("QueueSize", 20000000 );
195         }
196
197     protected:
198         CDSUNIT_DECLARE_MoirQueue( SimpleValue )
199         CDSUNIT_DECLARE_MSQueue( SimpleValue )
200         CDSUNIT_DECLARE_OptimisticQueue( SimpleValue )
201         CDSUNIT_DECLARE_BasketQueue( SimpleValue )
202         CDSUNIT_DECLARE_FCQueue( SimpleValue )
203         CDSUNIT_DECLARE_FCDeque( SimpleValue )
204         CDSUNIT_DECLARE_SegmentedQueue( SimpleValue )
205         CDSUNIT_DECLARE_RWQueue( SimpleValue )
206         CDSUNIT_DECLARE_TsigasCysleQueue( SimpleValue )
207         CDSUNIT_DECLARE_VyukovMPMCCycleQueue( SimpleValue )
208         CDSUNIT_DECLARE_StdQueue( SimpleValue )
209
210         CPPUNIT_TEST_SUITE(Queue_Pop)
211             CDSUNIT_TEST_MoirQueue
212             CDSUNIT_TEST_MSQueue
213             CDSUNIT_TEST_OptimisticQueue
214             CDSUNIT_TEST_BasketQueue
215             CDSUNIT_TEST_FCQueue
216             CDSUNIT_TEST_FCDeque
217             CDSUNIT_TEST_SegmentedQueue
218             CDSUNIT_TEST_RWQueue
219             CDSUNIT_TEST_TsigasCysleQueue
220             CDSUNIT_TEST_VyukovMPMCCycleQueue
221             CDSUNIT_TEST_StdQueue
222         CPPUNIT_TEST_SUITE_END();
223     };
224
225 } // namespace queue
226
// Presumably makes the queue::Queue_Pop suite known to the test runner; the macro is defined outside this file.
227 CPPUNIT_TEST_SUITE_REGISTRATION(queue::Queue_Pop);