Added copyright and license
[libcds.git] / tests / unit / queue / queue_random.cpp
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8     
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.     
29 */
30
31 #include "cppunit/thread.h"
32 #include "queue/queue_type.h"
33 #include "queue/queue_defs.h"
34
35
36 #include <vector>
37 #include <boost/type_traits/is_base_of.hpp>
38
39 // Multi-threaded queue test for random push/pop operation
40 namespace queue {
41
// Test-method generators. Each macro expands to a member function named Q
// that runs one of the test drivers for the queue type Types<V>::Q.

// Plain queue: run the test<>() driver.
#define TEST_CASE( Q, V ) \
    void Q() \
    { \
        test< Types<V>::Q >(); \
    }

// Bounded queue: same driver as TEST_CASE.
#define TEST_BOUNDED( Q, V ) TEST_CASE( Q, V )

// Segmented queue: run the test_segmented<>() driver.
#define TEST_SEGMENTED( Q, V ) \
    void Q() \
    { \
        test_segmented< Types< V >::Q >(); \
    }
45
46     namespace ns_Queue_Random {
47         static size_t s_nThreadCount = 16;
48         static size_t s_nQueueSize = 10000000;
49
50         struct SimpleValue {
51             size_t      nNo;
52             size_t      nThread;
53
54             SimpleValue() {}
55             SimpleValue( size_t n ): nNo(n) {}
56             size_t getNo() const { return  nNo; }
57         };
58     }
59
60     using namespace ns_Queue_Random;
61
62     class Queue_Random: public CppUnitMini::TestCase
63     {
64         typedef CppUnitMini::TestCase base_class;
65
66         template <class Queue>
67         class Thread: public CppUnitMini::TestThread
68         {
69             virtual TestThread *    clone()
70             {
71                 return new Thread( *this );
72             }
73         public:
74             Queue&              m_Queue;
75             double              m_fTime;
76
77             size_t  m_nPushCount;
78             size_t  m_nPopCount;
79             size_t  m_nEmptyPop;
80
81             size_t  m_nUndefWriter;
82             size_t  m_nRepeatValue;
83             size_t  m_nPushError        ;    // push error count
84
85             std::vector<size_t> m_arrLastRead;
86             std::vector<size_t> m_arrPopCountPerThread;
87
88             size_t const m_nSpread;
89
90         public:
91             Thread( CppUnitMini::ThreadPool& pool, Queue& q, size_t nSpread = 0 )
92                 : CppUnitMini::TestThread( pool )
93                 , m_Queue( q )
94                 , m_nSpread( nSpread )
95             {}
96             Thread( Thread& src )
97                 : CppUnitMini::TestThread( src )
98                 , m_Queue( src.m_Queue )
99                 , m_nSpread( src.m_nSpread )
100             {}
101
102             Queue_Random&  getTest()
103             {
104                 return reinterpret_cast<Queue_Random&>( m_Pool.m_Test );
105             }
106
107             virtual void init()
108             {
109                 cds::threading::Manager::attachThread();
110                 m_nPushCount =
111                     m_nPopCount =
112                     m_nEmptyPop =
113                     m_nUndefWriter =
114                     m_nRepeatValue =
115                     m_nPushError = 0;
116
117                 m_arrLastRead.resize( s_nThreadCount, 0 );
118                 m_arrPopCountPerThread.resize( s_nThreadCount, 0 );
119             }
120             virtual void fini()
121             {
122                 cds::threading::Manager::detachThread();
123             }
124
125             virtual void test()
126             {
127                 size_t const nThreadCount = s_nThreadCount;
128                 size_t const nTotalPush = getTest().m_nThreadPushCount;
129
130                 SimpleValue node;
131
132                 m_fTime = m_Timer.duration();
133
134                 bool bNextPop = false;
135                 while ( m_nPushCount < nTotalPush ) {
136                     if ( !bNextPop && (rand() & 3) != 3 ) {
137                         // push
138                         node.nThread = m_nThreadNo;
139                         node.nNo = ++m_nPushCount;
140                         if ( !m_Queue.push( node )) {
141                             ++m_nPushError;
142                             --m_nPushCount;
143                         }
144
145                     }
146                     else {
147                         // pop
148                         pop( nThreadCount );
149                         bNextPop = false;
150                     }
151                 }
152
153                 size_t nPopLoop = 0;
154                 while ( !m_Queue.empty() && nPopLoop < 1000000 ) {
155                     if ( pop( nThreadCount ) )
156                         nPopLoop = 0;
157                     else
158                         ++nPopLoop;
159                 }
160
161
162                 m_fTime = m_Timer.duration() - m_fTime;
163             }
164
165             bool pop( size_t nThreadCount )
166             {
167                 SimpleValue node;
168                 node.nThread = -1;
169                 node.nNo = -1;
170                 if ( m_Queue.pop( node )) {
171                     ++m_nPopCount;
172                     if ( node.nThread < nThreadCount ) {
173                         m_arrPopCountPerThread[ node.nThread ] += 1;
174                         if ( m_nSpread ) {
175                             if ( m_arrLastRead[ node.nThread ] > node.nNo ) {
176                                 if ( m_arrLastRead[ node.nThread ] - node.nNo > m_nSpread )
177                                     ++m_nRepeatValue;
178                             }
179                             else if ( m_arrLastRead[ node.nThread ] == node.nNo )
180                                 ++m_nRepeatValue;
181                             m_arrLastRead[ node.nThread ] = node.nNo;
182                         }
183                         else {
184                             if ( m_arrLastRead[ node.nThread ] < node.nNo ) {
185                                 m_arrLastRead[ node.nThread ] = node.nNo;
186                             }
187                             else
188                                 ++m_nRepeatValue;
189                         }
190
191                         //if ( node.nNo < m_Test.m_nPushCount )
192                         //    m_Test.m_pRead[ node.nWriter ][ node.nNo ] = node.nNo;
193                     }
194                     else {
195                         ++m_nUndefWriter;
196                     }
197                 }
198                 else {
199                     ++m_nEmptyPop;
200                     return false;
201                 }
202                 return true;
203             }
204         };
205
206     protected:
207         size_t  m_nThreadPushCount;
208
209     protected:
210         template <class Queue>
211         void analyze( CppUnitMini::ThreadPool& pool, Queue& testQueue  )
212         {
213             CPPUNIT_CHECK( testQueue.empty() );
214
215             std::vector< size_t > arrPushCount;
216             arrPushCount.resize( s_nThreadCount, 0 );
217
218             size_t nPushTotal = 0;
219             size_t nPopTotal  = 0;
220             double fTime = 0;
221             size_t nPushError = 0;
222
223             for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
224                 Thread<Queue> * pThread = static_cast<Thread<Queue> *>( *it );
225                 CPPUNIT_CHECK( pThread->m_nUndefWriter == 0 );
226                 CPPUNIT_CHECK_EX( pThread->m_nRepeatValue == 0, "nRepeatValue=" << pThread->m_nRepeatValue );
227                 if ( !boost::is_base_of<cds::bounded_container, Queue>::value ) {
228                     CPPUNIT_CHECK( pThread->m_nPushError == 0 );
229                 }
230                 else
231                     nPushError += pThread->m_nPushError;
232
233                 arrPushCount[ pThread->m_nThreadNo ] += pThread->m_nPushCount;
234
235                 nPushTotal += pThread->m_nPushCount;
236                 nPopTotal += pThread->m_nPopCount;
237                 fTime += pThread->m_fTime;
238             }
239
240             CPPUNIT_MSG( "     Duration=" << (fTime /= s_nThreadCount) );
241             if ( boost::is_base_of<cds::bounded_container, Queue>::value ) {
242                 CPPUNIT_MSG( "         push error (when queue is full)=" << nPushError );
243             }
244
245             size_t nTotalItems = m_nThreadPushCount * s_nThreadCount;
246
247             CPPUNIT_CHECK_EX( nPushTotal == nTotalItems, "nPushTotal=" << nPushTotal << ", nTotalItems=" << nTotalItems );
248             CPPUNIT_CHECK_EX( nPopTotal == nTotalItems, "nPopTotal=" << nPopTotal << ", nTotalItems=" << nTotalItems );
249
250             for ( size_t i = 0; i < s_nThreadCount; ++i )
251                 CPPUNIT_CHECK( arrPushCount[i] == m_nThreadPushCount );
252         }
253
254         template <class Queue>
255         void test()
256         {
257             CPPUNIT_MSG( "Random push/pop test\n    thread count=" << s_nThreadCount << ", push count=" << s_nQueueSize << " ..." );
258
259             m_nThreadPushCount = s_nQueueSize / s_nThreadCount;
260
261             Queue testQueue;
262             CppUnitMini::ThreadPool pool( *this );
263             pool.add( new Thread<Queue>( pool, testQueue ), s_nThreadCount );
264
265             pool.run();
266
267             analyze( pool, testQueue );
268             CPPUNIT_MSG( testQueue.statistics() );
269         }
270
271         template <class Queue>
272         void test_segmented()
273         {
274             CPPUNIT_MSG( "Random push/pop test\n    thread count=" << s_nThreadCount << ", push count=" << s_nQueueSize << " ..." );
275
276             m_nThreadPushCount = s_nQueueSize / s_nThreadCount;
277
278             for ( size_t nSegmentSize = 4; nSegmentSize <= 256; nSegmentSize *= 4 ) {
279                 CPPUNIT_MSG( "Segment size: " << nSegmentSize );
280
281                 Queue testQueue( nSegmentSize );
282                 CppUnitMini::ThreadPool pool( *this );
283                 pool.add( new Thread<Queue>( pool, testQueue, nSegmentSize * 2 ), s_nThreadCount );
284
285                 pool.run();
286
287                 analyze( pool, testQueue );
288                 CPPUNIT_MSG( testQueue.statistics() );
289             }
290         }
291
292         void setUpParams( const CppUnitMini::TestCfg& cfg ) {
293             s_nThreadCount = cfg.getULong("ThreadCount", 8 );
294             s_nQueueSize = cfg.getULong("QueueSize", 20000000 );
295         }
296
297     protected:
298         CDSUNIT_DECLARE_MoirQueue( SimpleValue )
299         CDSUNIT_DECLARE_MSQueue( SimpleValue )
300         CDSUNIT_DECLARE_OptimisticQueue( SimpleValue )
301         CDSUNIT_DECLARE_BasketQueue( SimpleValue )
302         CDSUNIT_DECLARE_FCQueue( SimpleValue )
303         CDSUNIT_DECLARE_FCDeque( SimpleValue )
304         CDSUNIT_DECLARE_SegmentedQueue( SimpleValue )
305         CDSUNIT_DECLARE_RWQueue( SimpleValue )
306         CDSUNIT_DECLARE_TsigasCycleQueue( SimpleValue )
307         CDSUNIT_DECLARE_VyukovMPMCCycleQueue( SimpleValue )
308         CDSUNIT_DECLARE_StdQueue( SimpleValue )
309
310         CPPUNIT_TEST_SUITE(Queue_Random)
311             CDSUNIT_TEST_MoirQueue
312             CDSUNIT_TEST_MSQueue
313             CDSUNIT_TEST_OptimisticQueue
314             CDSUNIT_TEST_BasketQueue
315             CDSUNIT_TEST_FCQueue
316             CDSUNIT_TEST_FCDeque
317             CDSUNIT_TEST_SegmentedQueue
318             CDSUNIT_TEST_RWQueue
319             CDSUNIT_TEST_TsigasCycleQueue
320             CDSUNIT_TEST_VyukovMPMCCycleQueue
321             CDSUNIT_TEST_StdQueue
322         CPPUNIT_TEST_SUITE_END();
323     };
324
325 } // namespace queue
326
// NOTE(review): presumably registers queue::Queue_Random with the CppUnitMini
// test runner; the macro is defined outside of this file - confirm.
327 CPPUNIT_TEST_SUITE_REGISTRATION(queue::Queue_Random);