2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #include "cppunit/thread.h"
32 #include "pqueue/pqueue_item.h"
33 #include "pqueue/pqueue_type.h"
40 #define TEST_CASE( Q ) void Q() { test< Types<pqueue::SimpleValue>::Q >(); }
41 #define TEST_BOUNDED( Q ) void Q() { test_bounded< Types<pqueue::SimpleValue>::Q >(); }
44 static size_t s_nPushThreadCount = 4;
45 static size_t s_nPopThreadCount = 4;
46 static size_t s_nQueueSize = 2000000;
52 class PQueue_PushPop: public CppUnitMini::TestCase
55 template <class PQueue>
56 class Pusher: public CppUnitMini::TestThread
58 virtual TestThread * clone()
60 return new Pusher( *this );
66 typedef std::vector<size_t> array_type;
70 Pusher( CppUnitMini::ThreadPool& pool, PQueue& q )
71 : CppUnitMini::TestThread( pool )
75 : CppUnitMini::TestThread( src )
76 , m_Queue( src.m_Queue )
79 PQueue_PushPop& getTest()
81 return static_cast<PQueue_PushPop&>( m_Pool.m_Test );
86 cds::threading::Manager::attachThread();
90 cds::threading::Manager::detachThread();
97 for ( array_type::const_iterator it = m_arr.begin(); it != m_arr.end(); ++it ) {
98 if ( !m_Queue.push( SimpleValue( *it ) ))
102 getTest().end_pusher();
105 void prepare( size_t nStart, size_t nEnd )
107 m_arr.reserve( nEnd - nStart );
108 for ( size_t i = nStart; i < nEnd; ++i )
109 m_arr.push_back( i );
110 shuffle( m_arr.begin(), m_arr.end() );
114 template <class PQueue>
115 class Popper: public CppUnitMini::TestThread
117 virtual TestThread * clone()
119 return new Popper( *this );
123 size_t m_nPopSuccess;
126 typedef std::vector<size_t> array_type;
130 Popper( CppUnitMini::ThreadPool& pool, PQueue& q )
131 : CppUnitMini::TestThread( pool )
134 Popper( Popper& src )
135 : CppUnitMini::TestThread( src )
136 , m_Queue( src.m_Queue )
139 PQueue_PushPop& getTest()
141 return static_cast<PQueue_PushPop&>( m_Pool.m_Test );
146 cds::threading::Manager::attachThread();
150 cds::threading::Manager::detachThread();
159 while ( getTest().pushing() || !m_Queue.empty() ) {
160 if ( m_Queue.pop( val ))
168 atomics::atomic<size_t> m_nPusherCount;
171 m_nPusherCount.fetch_sub( 1, atomics::memory_order_relaxed );
175 return m_nPusherCount.load( atomics::memory_order_relaxed ) != 0;
179 template <class PQueue>
183 test_with( testQueue );
186 template <class PQueue>
189 std::unique_ptr<PQueue> pq( new PQueue(s_nQueueSize) );
190 test_with( *pq.get() );
193 template <class PQueue>
194 void test_with( PQueue& testQueue )
196 size_t const nThreadItemCount = s_nQueueSize / s_nPushThreadCount;
198 CppUnitMini::ThreadPool pool( *this );
199 pool.add( new Pusher<PQueue>( pool, testQueue ), s_nPushThreadCount );
202 for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
203 Pusher<PQueue> * pThread = static_cast<Pusher<PQueue> *>(*it);
204 pThread->prepare( nStart, nStart + nThreadItemCount );
205 nStart += nThreadItemCount;
208 pool.add( new Popper<PQueue>( pool, testQueue ), s_nPopThreadCount );
210 m_nPusherCount.store( s_nPushThreadCount, atomics::memory_order_release );
211 CPPUNIT_MSG( " push thread count=" << s_nPushThreadCount << " pop thread count=" << s_nPopThreadCount
212 << ", item count=" << nThreadItemCount * s_nPushThreadCount << " ..." );
214 CPPUNIT_MSG( " Duration=" << pool.avgDuration() );
217 size_t nTotalPopped = 0;
218 size_t nPushFailed = 0;
219 size_t nPopFailed = 0;
220 for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
221 Popper<PQueue> * pPopper = dynamic_cast<Popper<PQueue> *>(*it);
223 nTotalPopped += pPopper->m_nPopSuccess;
224 nPopFailed += pPopper->m_nPopFailed;
227 Pusher<PQueue> * pPusher = dynamic_cast<Pusher<PQueue> *>(*it);
229 nPushFailed += pPusher->m_nPushError;
233 CPPUNIT_MSG( " Total: popped=" << nTotalPopped << ", empty pop=" << nPopFailed << ", push error=" << nPushFailed );
234 CPPUNIT_CHECK( nTotalPopped == nThreadItemCount * s_nPushThreadCount );
235 CPPUNIT_CHECK( nPushFailed == 0 );
237 check_statistics( testQueue.statistics() );
238 CPPUNIT_MSG( testQueue.statistics() );
241 void setUpParams( const CppUnitMini::TestCfg& cfg ) {
242 s_nPushThreadCount = cfg.getULong("PushThreadCount", (unsigned long) s_nPushThreadCount );
243 s_nPopThreadCount = cfg.getULong("PopThreadCount", (unsigned long) s_nPopThreadCount );
244 s_nQueueSize = cfg.getULong("QueueSize", (unsigned long) s_nQueueSize );
248 #include "pqueue/pqueue_defs.h"
249 CDSUNIT_DECLARE_MSPriorityQueue
250 CDSUNIT_DECLARE_EllenBinTree
251 CDSUNIT_DECLARE_SkipList
252 CDSUNIT_DECLARE_FCPriorityQueue
253 CDSUNIT_DECLARE_StdPQueue
255 CPPUNIT_TEST_SUITE(PQueue_PushPop)
256 CDSUNIT_TEST_MSPriorityQueue
257 CDSUNIT_TEST_EllenBinTree
258 CDSUNIT_TEST_SkipList
259 CDSUNIT_TEST_FCPriorityQueue
260 CDUNIT_TEST_StdPQueue
261 CPPUNIT_TEST_SUITE_END();
266 CPPUNIT_TEST_SUITE_REGISTRATION(pqueue::PQueue_PushPop);