issue#11: cds: changed __CDS_ guard prefix to CDSLIB_ for all .h files
[libcds.git] / tests / unit / pqueue / push.cpp
1 //$$CDS-header$$
2
3 #include "cppunit/thread.h"
4 #include "pqueue/pqueue_item.h"
5 #include "pqueue/pqueue_type.h"
6
7 #include <vector>
8 #include <algorithm>    // random_shuffle
9 #include <memory>
10
11 namespace pqueue {
12
13 #define TEST_CASE( Q ) void Q() { test< Types<pqueue::SimpleValue>::Q >(); }
14 #define TEST_BOUNDED( Q ) void Q() { test_bounded< Types<pqueue::SimpleValue>::Q >(); }
15
16     namespace {
17         static size_t s_nThreadCount = 8;
18         static size_t s_nQueueSize = 2000000;
19     }
20 } // namespace pqueue
21
22 namespace pqueue {
23
24     class PQueue_Push: public CppUnitMini::TestCase
25     {
26         template <class PQueue>
27         class Pusher: public CppUnitMini::TestThread
28         {
29             virtual TestThread *    clone()
30             {
31                 return new Pusher( *this );
32             }
33         public:
34             PQueue&             m_Queue;
35             size_t              m_nPushError;
36
37             typedef std::vector<size_t> array_type;
38             array_type          m_arr;
39
40         public:
41             Pusher( CppUnitMini::ThreadPool& pool, PQueue& q )
42                 : CppUnitMini::TestThread( pool )
43                 , m_Queue( q )
44             {}
45             Pusher( Pusher& src )
46                 : CppUnitMini::TestThread( src )
47                 , m_Queue( src.m_Queue )
48             {}
49
50             PQueue_Push&  getTest()
51             {
52                 return static_cast<PQueue_Push&>( m_Pool.m_Test );
53             }
54
55             virtual void init()
56             {
57                 cds::threading::Manager::attachThread();
58             }
59             virtual void fini()
60             {
61                 cds::threading::Manager::detachThread();
62             }
63
64             virtual void test()
65             {
66                 m_nPushError = 0;
67
68                 for ( array_type::const_iterator it = m_arr.begin(); it != m_arr.end(); ++it ) {
69                     if ( !m_Queue.push( SimpleValue( *it ) ))
70                         ++m_nPushError;
71                 }
72             }
73
74             void prepare( size_t nStart, size_t nEnd )
75             {
76                 m_arr.reserve( nEnd - nStart );
77                 for ( size_t i = nStart; i < nEnd; ++i )
78                     m_arr.push_back( i );
79                 std::random_shuffle( m_arr.begin(), m_arr.end() );
80             }
81         };
82
83     protected:
84         template <class PQueue>
85         void analyze( CppUnitMini::ThreadPool& pool, PQueue& testQueue  )
86         {
87             size_t nThreadItems = s_nQueueSize / s_nThreadCount;
88             for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
89                 Pusher<PQueue> * pThread = static_cast<Pusher<PQueue> *>(*it);
90                 CPPUNIT_CHECK_EX( pThread->m_nPushError == 0, "Thread push error count=" << pThread->m_nPushError );
91             }
92             CPPUNIT_MSG( "     Duration=" << pool.avgDuration() );
93             CPPUNIT_ASSERT( !testQueue.empty() );
94
95             typedef std::vector<size_t> vector_type;
96             vector_type arr;
97             arr.reserve( s_nQueueSize );
98
99             cds::OS::Timer      timer;
100             CPPUNIT_MSG( "   Pop (single-threaded)..." );
101             size_t nPopped = 0;
102             SimpleValue val;
103             while ( testQueue.pop( val )) {
104                 nPopped++;
105                 arr.push_back( val.key );
106             }
107             CPPUNIT_MSG( "     Duration=" << timer.duration() );
108
109             CPPUNIT_CHECK( arr.size() == nThreadItems * s_nThreadCount );
110             vector_type::const_iterator it = arr.begin();
111             size_t nPrev = *it;
112             ++it;
113             size_t nErrCount = 0;
114             for ( vector_type::const_iterator itEnd = arr.end(); it != itEnd; ++it ) {
115                 if ( nPrev - 1 != *it ) {
116                     if ( ++nErrCount < 10 ) {
117                         CPPUNIT_CHECK_EX( nPrev - 1 == *it, "Expected=" << nPrev - 1 << ", current=" << *it );
118                     }
119                 }
120                 nPrev = *it;
121             }
122
123             CPPUNIT_CHECK_EX( nErrCount == 0, "Error count=" << nErrCount );
124         }
125
126         template <class PQueue>
127         void test()
128         {
129             PQueue testQueue;
130
131             CppUnitMini::ThreadPool pool( *this );
132             pool.add( new Pusher<PQueue>( pool, testQueue ), s_nThreadCount );
133
134             size_t nStart = 0;
135             size_t nThreadItemCount = s_nQueueSize / s_nThreadCount;
136             for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
137                 Pusher<PQueue> * pThread = static_cast<Pusher<PQueue> *>(*it);
138                 pThread->prepare( nStart, nStart + nThreadItemCount );
139                 nStart += nThreadItemCount;
140             }
141
142             CPPUNIT_MSG( "   Push test, thread count=" << s_nThreadCount << ", item count=" << nThreadItemCount * s_nThreadCount << " ..." );
143             pool.run();
144
145             analyze( pool, testQueue );
146
147             CPPUNIT_MSG( testQueue.statistics() );
148         }
149
150         template <class PQueue>
151         void test_bounded()
152         {
153             size_t nStart = 0;
154             size_t nThreadItemCount = s_nQueueSize / s_nThreadCount;
155
156             std::unique_ptr<PQueue> pq( new PQueue(s_nQueueSize) );
157
158             CppUnitMini::ThreadPool pool( *this );
159             pool.add( new Pusher<PQueue>( pool, *pq ), s_nThreadCount );
160
161             for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
162                 Pusher<PQueue> * pThread = static_cast<Pusher<PQueue> *>(*it);
163                 pThread->prepare( nStart, nStart + nThreadItemCount );
164                 nStart += nThreadItemCount;
165             }
166
167             CPPUNIT_MSG( "   Push test, thread count=" << s_nThreadCount << ", item count=" << nThreadItemCount * s_nThreadCount << " ..." );
168             pool.run();
169
170             analyze( pool, *pq );
171
172             CPPUNIT_MSG( pq->statistics() );
173         }
174
175         void setUpParams( const CppUnitMini::TestCfg& cfg ) {
176             s_nThreadCount = cfg.getULong("ThreadCount", (unsigned long) s_nThreadCount );
177             s_nQueueSize = cfg.getULong("QueueSize", (unsigned long) s_nQueueSize );
178         }
179
180     protected:
181 #include "pqueue/pqueue_defs.h"
182         CDSUNIT_DECLARE_MSPriorityQueue
183         CDSUNIT_DECLARE_EllenBinTree
184         CDSUNIT_DECLARE_SkipList
185         CDSUNIT_DECLARE_FCPriorityQueue
186         CDSUNIT_DECLARE_StdPQueue
187
188         CPPUNIT_TEST_SUITE(PQueue_Push)
189             CDSUNIT_TEST_MSPriorityQueue
190             CDSUNIT_TEST_EllenBinTree
191             CDSUNIT_TEST_SkipList
192             CDSUNIT_TEST_FCPriorityQueue
193             CDUNIT_TEST_StdPQueue
194         CPPUNIT_TEST_SUITE_END();
195     };
196
197 } // namespace queue
198
199 CPPUNIT_TEST_SUITE_REGISTRATION(pqueue::PQueue_Push);