3 #include "cppunit/thread.h"
4 #include "pqueue/pqueue_item.h"
5 #include "pqueue/pqueue_type.h"
12 #define TEST_CASE( Q ) void Q() { test< Types<pqueue::SimpleValue>::Q >(); }
13 #define TEST_BOUNDED( Q ) void Q() { test_bounded< Types<pqueue::SimpleValue>::Q >(); }
16 static size_t s_nPushThreadCount = 4;
17 static size_t s_nPopThreadCount = 4;
18 static size_t s_nQueueSize = 2000000;
24 class PQueue_PushPop: public CppUnitMini::TestCase
27 template <class PQueue>
28 class Pusher: public CppUnitMini::TestThread
30 virtual TestThread * clone()
32 return new Pusher( *this );
38 typedef std::vector<size_t> array_type;
42 Pusher( CppUnitMini::ThreadPool& pool, PQueue& q )
43 : CppUnitMini::TestThread( pool )
47 : CppUnitMini::TestThread( src )
48 , m_Queue( src.m_Queue )
51 PQueue_PushPop& getTest()
53 return static_cast<PQueue_PushPop&>( m_Pool.m_Test );
58 cds::threading::Manager::attachThread();
62 cds::threading::Manager::detachThread();
69 for ( array_type::const_iterator it = m_arr.begin(); it != m_arr.end(); ++it ) {
70 if ( !m_Queue.push( SimpleValue( *it ) ))
74 getTest().end_pusher();
77 void prepare( size_t nStart, size_t nEnd )
79 m_arr.reserve( nEnd - nStart );
80 for ( size_t i = nStart; i < nEnd; ++i )
82 shuffle( m_arr.begin(), m_arr.end() );
86 template <class PQueue>
87 class Popper: public CppUnitMini::TestThread
89 virtual TestThread * clone()
91 return new Popper( *this );
98 typedef std::vector<size_t> array_type;
102 Popper( CppUnitMini::ThreadPool& pool, PQueue& q )
103 : CppUnitMini::TestThread( pool )
106 Popper( Popper& src )
107 : CppUnitMini::TestThread( src )
108 , m_Queue( src.m_Queue )
111 PQueue_PushPop& getTest()
113 return static_cast<PQueue_PushPop&>( m_Pool.m_Test );
118 cds::threading::Manager::attachThread();
122 cds::threading::Manager::detachThread();
131 while ( getTest().pushing() || !m_Queue.empty() ) {
132 if ( m_Queue.pop( val ))
140 atomics::atomic<size_t> m_nPusherCount;
143 m_nPusherCount.fetch_sub( 1, atomics::memory_order_relaxed );
147 return m_nPusherCount.load( atomics::memory_order_relaxed ) != 0;
151 template <class PQueue>
155 test_with( testQueue );
158 template <class PQueue>
161 std::unique_ptr<PQueue> pq( new PQueue(s_nQueueSize) );
162 test_with( *pq.get() );
165 template <class PQueue>
166 void test_with( PQueue& testQueue )
168 size_t const nThreadItemCount = s_nQueueSize / s_nPushThreadCount;
170 CppUnitMini::ThreadPool pool( *this );
171 pool.add( new Pusher<PQueue>( pool, testQueue ), s_nPushThreadCount );
174 for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
175 Pusher<PQueue> * pThread = static_cast<Pusher<PQueue> *>(*it);
176 pThread->prepare( nStart, nStart + nThreadItemCount );
177 nStart += nThreadItemCount;
180 pool.add( new Popper<PQueue>( pool, testQueue ), s_nPopThreadCount );
182 m_nPusherCount.store( s_nPushThreadCount, atomics::memory_order_release );
183 CPPUNIT_MSG( " push thread count=" << s_nPushThreadCount << " pop thread count=" << s_nPopThreadCount
184 << ", item count=" << nThreadItemCount * s_nPushThreadCount << " ..." );
186 CPPUNIT_MSG( " Duration=" << pool.avgDuration() );
189 size_t nTotalPopped = 0;
190 size_t nPushFailed = 0;
191 size_t nPopFailed = 0;
192 for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
193 Popper<PQueue> * pPopper = dynamic_cast<Popper<PQueue> *>(*it);
195 nTotalPopped += pPopper->m_nPopSuccess;
196 nPopFailed += pPopper->m_nPopFailed;
199 Pusher<PQueue> * pPusher = dynamic_cast<Pusher<PQueue> *>(*it);
201 nPushFailed += pPusher->m_nPushError;
205 CPPUNIT_MSG( " Total: popped=" << nTotalPopped << ", empty pop=" << nPopFailed << ", push error=" << nPushFailed );
206 CPPUNIT_CHECK( nTotalPopped == nThreadItemCount * s_nPushThreadCount );
207 CPPUNIT_CHECK( nPushFailed == 0 );
209 check_statistics( testQueue.statistics() );
210 CPPUNIT_MSG( testQueue.statistics() );
213 void setUpParams( const CppUnitMini::TestCfg& cfg ) {
214 s_nPushThreadCount = cfg.getULong("PushThreadCount", (unsigned long) s_nPushThreadCount );
215 s_nPopThreadCount = cfg.getULong("PopThreadCount", (unsigned long) s_nPopThreadCount );
216 s_nQueueSize = cfg.getULong("QueueSize", (unsigned long) s_nQueueSize );
220 #include "pqueue/pqueue_defs.h"
221 CDSUNIT_DECLARE_MSPriorityQueue
222 CDSUNIT_DECLARE_EllenBinTree
223 CDSUNIT_DECLARE_SkipList
224 CDSUNIT_DECLARE_FCPriorityQueue
225 CDSUNIT_DECLARE_StdPQueue
227 CPPUNIT_TEST_SUITE(PQueue_PushPop)
228 CDSUNIT_TEST_MSPriorityQueue
229 CDSUNIT_TEST_EllenBinTree
230 CDSUNIT_TEST_SkipList
231 CDSUNIT_TEST_FCPriorityQueue
232 CDUNIT_TEST_StdPQueue
233 CPPUNIT_TEST_SUITE_END();
238 CPPUNIT_TEST_SUITE_REGISTRATION(pqueue::PQueue_PushPop);