2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #include "cppunit/thread.h"
32 #include "queue/queue_type.h"
33 #include "queue/queue_defs.h"
36 // Multi-threaded queue test for push operation
// Test-method generators. Each macro expands to a function named Q whose body
// forwards to one of the driver templates, instantiated with the queue type
// Types<V>::Q (Types is not defined in this file — presumably it comes from
// queue/queue_type.h; confirm).
// TEST_CASE forwards to test<>().
39 #define TEST_CASE( Q, V ) void Q() { test< Types<V>::Q >(); }
// TEST_BOUNDED forwards to test_bounded<>().
40 #define TEST_BOUNDED( Q, V ) void Q() { test_bounded< Types<V>::Q >(); }
// TEST_SEGMENTED forwards to test_segmented<>().
41 #define TEST_SEGMENTED( Q, V ) void Q() { test_segmented< Types<V>::Q >(); }
// Shared test parameters and the element type used by the push test.
43 namespace ns_Queue_Push {
// Number of worker threads added to the pool; default 8, reassigned in setUpParams().
44 static size_t s_nThreadCount = 8;
// Total item budget, split evenly between the worker threads; default 20000000,
// reassigned in setUpParams().
45 static size_t s_nQueueSize = 20000000 ; // no more than 20 million records
// Visible members of SimpleValue, a wrapper around one item number (nNo).
// NOTE(review): the struct header and the declaration of nNo are not visible
// in this listing.
// Default constructor: item number 0.
50 SimpleValue(): nNo(0) {}
// Construct from an item number. The constructor is not explicit, so a size_t
// converts implicitly — presumably this is what lets the workers call
// push( nItem ) with a plain size_t; confirm against the queue's value type.
51 SimpleValue( size_t n ): nNo(n) {}
// Returns the stored item number.
52 size_t getNo() const { return nNo; }
// Make the parameters and SimpleValue usable unqualified in the code below.
55 using namespace ns_Queue_Push;
// Test case: several threads push disjoint ranges of item numbers into one
// queue; the result is then popped and checked single-threaded in analyze().
57 class Queue_Push: public CppUnitMini::TestCase
// Worker thread for queue type Queue. It pushes the item numbers
// [m_nStartItem, m_nEndItem) into m_Queue, stores the elapsed time in m_fTime
// and exposes an error counter m_nPushError; those members are read and
// written from outside by the drivers and by analyze().
// NOTE(review): the member declarations are not visible in this listing.
59 template <class Queue>
60 class Thread: public CppUnitMini::TestThread
// Returns a new heap-allocated copy of this worker, made with the copy
// constructor. Presumably ThreadPool::add() uses it to create the requested
// number of workers from one prototype — confirm.
62 virtual TestThread * clone()
64 return new Thread( *this );
// Constructs a worker attached to pool and operating on queue q.
// NOTE(review): the remaining member initializers are not visible here.
74 Thread( CppUnitMini::ThreadPool& pool, Queue& q )
75 : CppUnitMini::TestThread( pool )
// Initializer list of what appears to be the copy constructor (its signature
// line is not visible): copies the base part and the queue binding from src.
79 : CppUnitMini::TestThread( src )
80 , m_Queue( src.m_Queue )
// Accessor body (function header not visible): returns the pool's m_Test
// viewed as the Queue_Push test case.
// NOTE(review): reinterpret_cast is used for what looks like a base-to-derived
// conversion; static_cast is the conventional choice — confirm the declared
// type of m_Test.
85 return reinterpret_cast<Queue_Push&>( m_Pool.m_Test );
// Attach the current thread to the cds threading manager. The enclosing
// function header is not visible — presumably the thread start-up hook.
90 cds::threading::Manager::attachThread();
// Matching detach; the enclosing function header is not visible — presumably
// the thread shutdown hook.
94 cds::threading::Manager::detachThread();
// Timed push loop (enclosing function header not visible).
// Remember the timer reading at the start of the loop...
99 m_fTime = m_Timer.duration();
102 for ( size_t nItem = m_nStartItem; nItem < m_nEndItem; ++nItem ) {
// A false return from push() is treated as a failure. The statement executed
// in that case is not visible here — presumably it increments m_nPushError.
103 if ( !m_Queue.push( nItem ))
// ...and turn it into the elapsed time of the loop.
107 m_fTime = m_Timer.duration() - m_fTime;
// Checks the outcome of a push run.
//   pool      - thread pool whose entries are treated as Thread<Queue> workers
//   testQueue - the queue the workers pushed into; it is drained here
// Steps: report per-thread push errors and the mean push duration, pop every
// element single-threaded while counting how often each item number occurs,
// then walk the item numbers [0, nTotalItems) and report problems.
112 template <class Queue>
113 void analyze( CppUnitMini::ThreadPool& pool, Queue& testQueue )
// Items per worker. This is an integer division, so a remainder of
// s_nQueueSize is not assigned to any worker.
// NOTE(review): divides by s_nThreadCount without checking for zero.
115 size_t nThreadItems = s_nQueueSize / s_nThreadCount;
117 for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
// NOTE(review): reinterpret_cast from the pool element to Thread<Queue>*;
// static_cast is the conventional downcast — confirm the iterator value type.
118 Thread<Queue> * pThread = reinterpret_cast<Thread<Queue> *>(*it);
119 fTime += pThread->m_fTime;
120 if ( pThread->m_nPushError != 0 )
121 CPPUNIT_MSG(" ERROR: thread push error count=" << pThread->m_nPushError );
// Mean of the per-thread push durations.
123 CPPUNIT_MSG( " Duration=" << (fTime / s_nThreadCount) );
// After the push phase the queue is expected to hold data.
124 CPPUNIT_CHECK( !testQueue.empty() )
// One occurrence counter per item number, zero-filled.
// NOTE(review): raw new[]; the matching delete[] is not visible in this
// listing, and an early exit from CPPUNIT_ASSERT below would skip it —
// confirm, or consider std::vector<size_t>.
126 size_t * arr = new size_t[ s_nQueueSize ];
127 memset(arr, 0, sizeof(arr[0]) * s_nQueueSize );
// Times the single-threaded pop phase.
129 cds::OS::Timer timer;
130 CPPUNIT_MSG( " Pop (single-threaded)..." );
132 SimpleValue val = SimpleValue();
133 while ( testQueue.pop( val )) {
// NOTE(review): the index is not range-checked against s_nQueueSize.
135 ++arr[ val.getNo() ];
137 CPPUNIT_MSG( " Duration=" << timer.duration() );
// Number of items actually assigned to workers (see the truncating division above).
139 size_t nTotalItems = nThreadItems * s_nThreadCount;
141 for ( size_t i = 0; i < nTotalItems; ++i ) {
// NOTE(review): the condition guarding this message is not visible in the listing.
143 CPPUNIT_MSG( " ERROR: Item " << i << " has not been pushed" );
// Counts reported items; presumably aborts the check once more than 10 have
// been reported — CPPUNIT_ASSERT semantics to be confirmed.
144 CPPUNIT_ASSERT( ++nError <= 10 );
// Push-test driver for a queue type. The signature line, the queue declaration
// and the declaration of nStart are not visible in this listing — presumably
// this is test<Queue>(), the function TEST_CASE forwards to.
// Creates s_nThreadCount workers on one queue, gives each a contiguous range
// of item numbers, then verifies the result with analyze() and prints the
// queue statistics.
151 template <class Queue>
156 CppUnitMini::ThreadPool pool( *this );
157 pool.add( new Thread<Queue>( pool, testQueue ), s_nThreadCount );
// Items per worker (integer division; a remainder is not assigned).
// NOTE(review): divides by s_nThreadCount without checking for zero.
160 size_t nThreadItemCount = s_nQueueSize / s_nThreadCount;
161 for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
162 Thread<Queue> * pThread = reinterpret_cast<Thread<Queue> *>(*it);
// Worker range is [nStart, nStart + nThreadItemCount); ranges are consecutive.
163 pThread->m_nStartItem = nStart;
164 nStart += nThreadItemCount;
165 pThread->m_nEndItem = nStart;
// NOTE(review): the statement that actually runs the pool is not visible in
// this listing; it presumably sits between this message and analyze().
168 CPPUNIT_MSG( " Push test, thread count=" << s_nThreadCount << " ...");
171 analyze( pool, testQueue );
173 CPPUNIT_MSG( testQueue.statistics() );
// Push-test driver for queues constructed with a size argument. The signature
// line and the declaration of nStart are not visible in this listing —
// presumably this is test_bounded<Queue>(), the function TEST_BOUNDED
// forwards to.
// Same flow as the unbounded driver: add workers, assign item ranges,
// analyze(), print statistics.
176 template <class Queue>
// Items per worker (integer division; a remainder is not assigned).
// NOTE(review): divides by s_nThreadCount without checking for zero.
180 size_t nThreadItemCount = s_nQueueSize / s_nThreadCount;
// The queue receives s_nQueueSize as its constructor argument (presumably its
// capacity — confirm against the queue type).
182 Queue testQueue( s_nQueueSize );
184 CppUnitMini::ThreadPool pool( *this );
185 pool.add( new Thread<Queue>( pool, testQueue ), s_nThreadCount );
187 for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
188 Thread<Queue> * pThread = reinterpret_cast<Thread<Queue> *>(*it);
// Worker range is [nStart, nStart + nThreadItemCount); ranges are consecutive.
189 pThread->m_nStartItem = nStart;
190 nStart += nThreadItemCount;
191 pThread->m_nEndItem = nStart;
// NOTE(review): the statement that actually runs the pool is not visible in
// this listing; it presumably sits between this message and analyze().
194 CPPUNIT_MSG( " Push test, thread count=" << s_nThreadCount << " ...");
197 analyze( pool, testQueue );
199 CPPUNIT_MSG( testQueue.statistics() );
// Push-test driver for queues constructed with a segment size. Repeats the
// whole push/verify cycle for segment sizes 4, 16, 64 and 256, building a
// fresh queue and a fresh thread pool for each size.
202 template <class Queue>
203 void test_segmented()
205 CPPUNIT_MSG( " Push test, thread count=" << s_nThreadCount << " ...");
// nSegmentSize is multiplied by 4 each round: 4, 16, 64, 256.
206 for ( size_t nSegmentSize = 4; nSegmentSize <= 256; nSegmentSize *= 4 ) {
207 CPPUNIT_MSG( "Segment size: " << nSegmentSize );
209 Queue testQueue( nSegmentSize );
211 CppUnitMini::ThreadPool pool( *this );
212 pool.add( new Thread<Queue>( pool, testQueue ), s_nThreadCount );
// Items per worker (integer division; a remainder is not assigned).
// NOTE(review): divides by s_nThreadCount without checking for zero.
215 size_t nThreadItemCount = s_nQueueSize / s_nThreadCount;
216 for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
217 Thread<Queue> * pThread = reinterpret_cast<Thread<Queue> *>(*it);
// Worker range is [nStart, nStart + nThreadItemCount); ranges are consecutive.
// NOTE(review): the declaration/reset of nStart is not visible in this listing.
218 pThread->m_nStartItem = nStart;
219 nStart += nThreadItemCount;
220 pThread->m_nEndItem = nStart;
// NOTE(review): the statement that actually runs the pool is not visible in
// this listing; it presumably precedes analyze().
225 analyze( pool, testQueue );
227 CPPUNIT_MSG( testQueue.statistics() );
// Loads the test parameters from the configuration.
//   cfg - configuration section to read from
// "ThreadCount" (default 8) is stored in s_nThreadCount and "QueueSize"
// (default 20000000) in s_nQueueSize.
// NOTE(review): a configured ThreadCount of 0 is accepted as-is, and the
// drivers and analyze() compute s_nQueueSize / s_nThreadCount, which would
// then divide by zero — consider clamping to at least 1.
231 void setUpParams( const CppUnitMini::TestCfg& cfg ) {
232 s_nThreadCount = cfg.getULong("ThreadCount", 8 );
233 s_nQueueSize = cfg.getULong("QueueSize", 20000000 );
// Per-queue-family declaration macros, each invoked with SimpleValue as the
// value type. The macro definitions are not in this file (presumably in
// queue/queue_defs.h); they presumably declare the test methods for every
// variant of the named queue family via TEST_CASE / TEST_BOUNDED /
// TEST_SEGMENTED — confirm.
237 CDSUNIT_DECLARE_MoirQueue( SimpleValue )
238 CDSUNIT_DECLARE_MSQueue( SimpleValue )
239 CDSUNIT_DECLARE_OptimisticQueue( SimpleValue )
240 CDSUNIT_DECLARE_BasketQueue( SimpleValue )
241 CDSUNIT_DECLARE_FCQueue( SimpleValue )
242 CDSUNIT_DECLARE_FCDeque( SimpleValue )
243 CDSUNIT_DECLARE_SegmentedQueue( SimpleValue )
244 CDSUNIT_DECLARE_RWQueue( SimpleValue )
245 CDSUNIT_DECLARE_TsigasCycleQueue( SimpleValue )
246 CDSUNIT_DECLARE_VyukovMPMCCycleQueue( SimpleValue )
247 CDSUNIT_DECLARE_StdQueue( SimpleValue )
// Test-suite table for Queue_Push: lists the queue families whose tests are
// part of the suite.
// NOTE(review): MSQueue, FCQueue, FCDeque and RWQueue are declared above, but
// no matching CDSUNIT_TEST_* entry is visible in this listing. Check whether
// those lines are merely missing from the listing or really absent from the
// suite.
249 CPPUNIT_TEST_SUITE(Queue_Push)
250 CDSUNIT_TEST_MoirQueue
252 CDSUNIT_TEST_OptimisticQueue
253 CDSUNIT_TEST_BasketQueue
256 CDSUNIT_TEST_SegmentedQueue
258 CDSUNIT_TEST_TsigasCycleQueue
259 CDSUNIT_TEST_VyukovMPMCCycleQueue
260 CDSUNIT_TEST_StdQueue
261 CPPUNIT_TEST_SUITE_END();
// Registers the queue::Queue_Push suite with the test framework.
266 CPPUNIT_TEST_SUITE_REGISTRATION(queue::Queue_Push);