2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #include "cppunit/thread.h"
32 #include "queue/queue_type.h"
33 #include "queue/queue_defs.h"
37 #include <boost/type_traits/is_base_of.hpp>
39 // Multi-threaded queue test for random push/pop operations
// Test-case generators. Each macro expands to a function void Q() that forwards
// to a templated driver instantiated with Types<V>::Q.
// NOTE(review): Types is not declared in this listing; presumably it comes from
// the queue headers included above - confirm.
// TEST_CASE forwards to test<>().
42 #define TEST_CASE( Q, V ) void Q() { test< Types<V>::Q >(); }
// TEST_BOUNDED is an alias of TEST_CASE and expands to the same test<>() call.
43 #define TEST_BOUNDED( Q, V ) TEST_CASE( Q, V )
// TEST_SEGMENTED forwards to test_segmented<>() instead of test<>().
44 #define TEST_SEGMENTED( Q, V ) void Q() { test_segmented< Types< V >::Q >(); }
// Shared parameters and the payload type used by the random push/pop test.
46 namespace ns_Queue_Random {
// Number of worker threads; setUpParams() overwrites it from the ThreadCount setting.
47 static size_t s_nThreadCount = 16;
// Total number of pushes split between the threads; setUpParams() overwrites it
// from the QueueSize setting.
48 static size_t s_nQueueSize = 10000000;
// NOTE(review): the declaration that owns the next two members (SimpleValue,
// judging by the constructor name) is not fully visible in this listing.
// Constructs a value carrying item number n.
55 SimpleValue( size_t n ): nNo(n) {}
// Returns the stored item number.
56 size_t getNo() const { return nNo; }
// Makes the names declared in ns_Queue_Random usable unqualified below.
60 using namespace ns_Queue_Random;
// Test fixture for the multi-threaded random push/pop queue test.
// Publicly derives from CppUnitMini::TestCase.
62 class Queue_Random: public CppUnitMini::TestCase
// Shorthand for the base test-case type.
64 typedef CppUnitMini::TestCase base_class;
// Worker thread of the test, parameterized by the queue type under test.
66 template <class Queue>
67 class Thread: public CppUnitMini::TestThread
// Returns a new Thread allocated with operator new and constructed from *this.
// NOTE(review): the caller presumably takes ownership of the raw pointer -
// confirm against CppUnitMini::ThreadPool.
69 virtual TestThread * clone()
71 return new Thread( *this );
// Per-thread counters and bookkeeping that analyze() reads after the run.
// analyze() requires this to be zero. NOTE(review): presumably counts popped
// items whose producer index is out of range - the increment is not visible here.
81 size_t m_nUndefWriter;
// analyze() requires this to be zero. NOTE(review): presumably counts items
// popped more than once - the increment is not visible here.
82 size_t m_nRepeatValue;
83 size_t m_nPushError ; // push error count
// Indexed by producer thread number: last sequence number recorded by pop().
85 std::vector<size_t> m_arrLastRead;
// Indexed by producer thread number: how many items of that producer pop() has taken.
86 std::vector<size_t> m_arrPopCountPerThread;
// Threshold that pop() compares a backwards jump in sequence numbers against;
// fixed at construction time.
88 size_t const m_nSpread;
// Creates a worker attached to pool; nSpread (default 0) is stored in m_nSpread.
// NOTE(review): no initializer using q is visible in this listing; the lines
// below initialize m_Queue from src.m_Queue, so an initializer taking q is
// presumably on a line missing here - confirm.
91 Thread( CppUnitMini::ThreadPool& pool, Queue& q, size_t nSpread = 0 )
92 : CppUnitMini::TestThread( pool )
94 , m_nSpread( nSpread )
// Initializer list of a constructor taking src: initializes the base part,
// m_Queue and m_nSpread from src.
// NOTE(review): the header line of this constructor is not visible here.
97 : CppUnitMini::TestThread( src )
98 , m_Queue( src.m_Queue )
99 , m_nSpread( src.m_nSpread )
// Returns the object reachable through m_Pool.m_Test, viewed as Queue_Random.
// NOTE(review): reinterpret_cast is used for this conversion; if m_Test is a
// reference to the TestCase base, static_cast would express the downcast more
// safely - confirm the declared type of m_Test.
102 Queue_Random& getTest()
104 return reinterpret_cast<Queue_Random&>( m_Pool.m_Test );
// Thread start-up and tear-down steps.
// NOTE(review): the headers of the enclosing functions are not visible in this listing.
// Calls the libcds threading manager attach hook for the current thread.
109 cds::threading::Manager::attachThread();
// Give both per-producer tables s_nThreadCount slots; newly added slots are 0.
117 m_arrLastRead.resize( s_nThreadCount, 0 );
118 m_arrPopCountPerThread.resize( s_nThreadCount, 0 );
// Calls the matching detach hook of the threading manager.
122 cds::threading::Manager::detachThread();
// Worker body: pushes items until the per-thread quota is reached while mixing
// in pops, then pops until the queue reports empty.
// NOTE(review): the function header and several lines of this body are not
// visible in this listing.
127 size_t const nThreadCount = s_nThreadCount;
// Per-thread push quota, read from the fixture.
128 size_t const nTotalPush = getTest().m_nThreadPushCount;
// Store the timer reading at start; it is subtracted from the final reading below.
132 m_fTime = m_Timer.duration();
134 bool bNextPop = false;
// Loop until m_nPushCount reaches the quota.
135 while ( m_nPushCount < nTotalPush ) {
// Push when no pop is forced and the low two bits of rand() are not both set,
// which holds for three of the four possible bit patterns.
// NOTE(review): rand() is not required to be thread-safe and this runs in
// several threads - confirm this is acceptable for the test.
136 if ( !bNextPop && (rand() & 3) != 3 ) {
// Tag the item with this thread number and a pre-incremented sequence number.
138 node.nThread = m_nThreadNo;
139 node.nNo = ++m_nPushCount;
// A false result from push() means the item was not enqueued.
140 if ( !m_Queue.push( node )) {
// Drain phase: keep popping while the queue is non-empty and nPopLoop is below 1000000.
154 while ( !m_Queue.empty() && nPopLoop < 1000000 ) {
155 if ( pop( nThreadCount ) )
// Elapsed time: final timer reading minus the reading stored at start.
162 m_fTime = m_Timer.duration() - m_fTime;
// Pops one item from m_Queue and checks it against the per-producer bookkeeping.
// nThreadCount is the exclusive upper bound for a valid producer index.
// NOTE(review): several lines of this function (counter updates, return
// statements) are not visible in this listing.
165 bool pop( size_t nThreadCount )
170 if ( m_Queue.pop( node )) {
// Only items whose producer index is in range are accounted for in this branch.
172 if ( node.nThread < nThreadCount ) {
173 m_arrPopCountPerThread[ node.nThread ] += 1;
// The popped sequence number is lower than the last one recorded for this producer.
175 if ( m_arrLastRead[ node.nThread ] > node.nNo ) {
// A backwards distance larger than m_nSpread is singled out here.
176 if ( m_arrLastRead[ node.nThread ] - node.nNo > m_nSpread )
// Same sequence number as the last recorded one.
179 else if ( m_arrLastRead[ node.nThread ] == node.nNo )
181 m_arrLastRead[ node.nThread ] = node.nNo;
// Higher sequence number: advance the recorded value for this producer.
184 if ( m_arrLastRead[ node.nThread ] < node.nNo ) {
185 m_arrLastRead[ node.nThread ] = node.nNo;
191 //if ( node.nNo < m_Test.m_nPushCount )
192 // m_Test.m_pRead[ node.nWriter ][ node.nNo ] = node.nNo;
// Number of items each worker thread pushes; the test drivers set it to
// s_nQueueSize / s_nThreadCount before starting the pool.
207 size_t m_nThreadPushCount;
// Verifies the outcome of a run: the queue must be empty, no worker may have
// recorded an undefined writer or a repeated value, and push/pop totals must
// match the expected item count.
// pool      - thread pool whose entries are cast to Thread<Queue>.
// testQueue - the queue that was exercised.
// NOTE(review): the declaration of fTime is not visible in this listing.
210 template <class Queue>
211 void analyze( CppUnitMini::ThreadPool& pool, Queue& testQueue )
213 CPPUNIT_CHECK( testQueue.empty() );
// Pushes per producer, indexed by thread number.
215 std::vector< size_t > arrPushCount;
216 arrPushCount.resize( s_nThreadCount, 0 );
218 size_t nPushTotal = 0;
219 size_t nPopTotal = 0;
221 size_t nPushError = 0;
// Accumulate counters over every worker in the pool.
223 for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
224 Thread<Queue> * pThread = static_cast<Thread<Queue> *>( *it );
225 CPPUNIT_CHECK( pThread->m_nUndefWriter == 0 );
226 CPPUNIT_CHECK_EX( pThread->m_nRepeatValue == 0, "nRepeatValue=" << pThread->m_nRepeatValue );
// Push failures are only an error for queues not derived from cds::bounded_container.
227 if ( !boost::is_base_of<cds::bounded_container, Queue>::value ) {
228 CPPUNIT_CHECK( pThread->m_nPushError == 0 );
231 nPushError += pThread->m_nPushError;
233 arrPushCount[ pThread->m_nThreadNo ] += pThread->m_nPushCount;
235 nPushTotal += pThread->m_nPushCount;
236 nPopTotal += pThread->m_nPopCount;
237 fTime += pThread->m_fTime;
// Report the per-thread times averaged over s_nThreadCount.
240 CPPUNIT_MSG( " Duration=" << (fTime /= s_nThreadCount) );
// For bounded containers the push failure count is reported, not checked.
241 if ( boost::is_base_of<cds::bounded_container, Queue>::value ) {
242 CPPUNIT_MSG( " push error (when queue is full)=" << nPushError );
// Expected total uses the per-thread quota, so it reflects the integer division
// done when the quota was computed.
245 size_t nTotalItems = m_nThreadPushCount * s_nThreadCount;
247 CPPUNIT_CHECK_EX( nPushTotal == nTotalItems, "nPushTotal=" << nPushTotal << ", nTotalItems=" << nTotalItems );
248 CPPUNIT_CHECK_EX( nPopTotal == nTotalItems, "nPopTotal=" << nPopTotal << ", nTotalItems=" << nTotalItems );
// Every producer must have pushed exactly its quota.
250 for ( size_t i = 0; i < s_nThreadCount; ++i )
251 CPPUNIT_CHECK( arrPushCount[i] == m_nThreadPushCount );
// Driver for one queue type: prints the parameters, computes the per-thread
// quota, adds s_nThreadCount workers sharing testQueue to a pool, then analyzes
// the result and prints the queue statistics.
// NOTE(review): the function header, the declaration of testQueue and the step
// that runs the pool are not visible in this listing.
254 template <class Queue>
257 CPPUNIT_MSG( "Random push/pop test\n thread count=" << s_nThreadCount << ", push count=" << s_nQueueSize << " ..." );
// Integer division: any remainder of s_nQueueSize is dropped.
259 m_nThreadPushCount = s_nQueueSize / s_nThreadCount;
262 CppUnitMini::ThreadPool pool( *this );
263 pool.add( new Thread<Queue>( pool, testQueue ), s_nThreadCount );
267 analyze( pool, testQueue );
268 CPPUNIT_MSG( testQueue.statistics() );
// Driver for segmented queues: repeats the random push/pop test for segment
// sizes 4, 16, 64 and 256, constructing a fresh queue for each size.
// NOTE(review): the step that runs the pool is not visible in this listing.
271 template <class Queue>
272 void test_segmented()
274 CPPUNIT_MSG( "Random push/pop test\n thread count=" << s_nThreadCount << ", push count=" << s_nQueueSize << " ..." );
// Integer division: any remainder of s_nQueueSize is dropped.
276 m_nThreadPushCount = s_nQueueSize / s_nThreadCount;
// The segment size is multiplied by 4 on every iteration.
278 for ( size_t nSegmentSize = 4; nSegmentSize <= 256; nSegmentSize *= 4 ) {
279 CPPUNIT_MSG( "Segment size: " << nSegmentSize );
281 Queue testQueue( nSegmentSize );
282 CppUnitMini::ThreadPool pool( *this );
// Workers get a spread of twice the segment size, which pop() uses as the
// threshold for a backwards jump in sequence numbers.
283 pool.add( new Thread<Queue>( pool, testQueue, nSegmentSize * 2 ), s_nThreadCount );
287 analyze( pool, testQueue );
288 CPPUNIT_MSG( testQueue.statistics() );
// Loads the test parameters from cfg into the file-level statics.
// cfg - configuration source queried with getULong(name, fallback).
// NOTE(review): the fallbacks used here (8 and 20000000) differ from the static
// initializers (16 and 10000000) - confirm which pair is intended.
// NOTE(review): a configured ThreadCount of 0 would make the drivers divide by
// zero when they compute s_nQueueSize / s_nThreadCount.
292 void setUpParams( const CppUnitMini::TestCfg& cfg ) {
293 s_nThreadCount = cfg.getULong("ThreadCount", 8 );
294 s_nQueueSize = cfg.getULong("QueueSize", 20000000 );
// Per-queue-family declaration macros, each invoked with SimpleValue as the
// value type.
// NOTE(review): the macros are defined outside this listing; presumably they
// expand to the test functions built with TEST_CASE, TEST_BOUNDED and
// TEST_SEGMENTED - confirm in the queue headers.
298 CDSUNIT_DECLARE_MoirQueue( SimpleValue )
299 CDSUNIT_DECLARE_MSQueue( SimpleValue )
300 CDSUNIT_DECLARE_OptimisticQueue( SimpleValue )
301 CDSUNIT_DECLARE_BasketQueue( SimpleValue )
302 CDSUNIT_DECLARE_FCQueue( SimpleValue )
303 CDSUNIT_DECLARE_FCDeque( SimpleValue )
304 CDSUNIT_DECLARE_SegmentedQueue( SimpleValue )
305 CDSUNIT_DECLARE_RWQueue( SimpleValue )
306 CDSUNIT_DECLARE_TsigasCycleQueue( SimpleValue )
307 CDSUNIT_DECLARE_VyukovMPMCCycleQueue( SimpleValue )
308 CDSUNIT_DECLARE_StdQueue( SimpleValue )
// Test suite definition for Queue_Random: lists the queue families included
// in the suite.
// NOTE(review): fewer families appear here than in the declaration list above
// (MSQueue, FCQueue, FCDeque and RWQueue are not visible); those entries may be
// on lines missing from this listing - confirm.
310 CPPUNIT_TEST_SUITE(Queue_Random)
311 CDSUNIT_TEST_MoirQueue
313 CDSUNIT_TEST_OptimisticQueue
314 CDSUNIT_TEST_BasketQueue
317 CDSUNIT_TEST_SegmentedQueue
319 CDSUNIT_TEST_TsigasCycleQueue
320 CDSUNIT_TEST_VyukovMPMCCycleQueue
321 CDSUNIT_TEST_StdQueue
322 CPPUNIT_TEST_SUITE_END();
// Registers the Queue_Random suite with the test framework.
// NOTE(review): the suite is named through namespace queue, whose opening is
// not visible in this listing.
327 CPPUNIT_TEST_SUITE_REGISTRATION(queue::Queue_Random);