Move libcds 1.6.0 from SVN
[libcds.git] / tests / unit / pqueue / push_pop.cpp
1 //$$CDS-header$$
2
3 #include "cppunit/thread.h"
4 #include "pqueue/pqueue_item.h"
5 #include "pqueue/pqueue_type.h"
6
7 #include <vector>
8 #include <algorithm>    // random_shuffle
9 #include <cds/details/std/memory.h>
10
11 namespace pqueue {
12
13 #define TEST_CASE( Q ) void Q() { test< Types<pqueue::SimpleValue>::Q >(); }
14 #define TEST_BOUNDED( Q ) void Q() { test_bounded< Types<pqueue::SimpleValue>::Q >(); }
15
16     namespace {
17         static size_t s_nPushThreadCount = 4;
18         static size_t s_nPopThreadCount = 4;
19         static size_t s_nQueueSize = 2000000;
20     }
21 } // namespace pqueue
22
23 namespace pqueue {
24
25     class PQueue_PushPop: public CppUnitMini::TestCase
26     {
27
28         template <class PQueue>
29         class Pusher: public CppUnitMini::TestThread
30         {
31             virtual TestThread *    clone()
32             {
33                 return new Pusher( *this );
34             }
35         public:
36             PQueue&             m_Queue;
37             size_t              m_nPushError;
38
39             typedef std::vector<size_t> array_type;
40             array_type          m_arr;
41
42         public:
43             Pusher( CppUnitMini::ThreadPool& pool, PQueue& q )
44                 : CppUnitMini::TestThread( pool )
45                 , m_Queue( q )
46             {}
47             Pusher( Pusher& src )
48                 : CppUnitMini::TestThread( src )
49                 , m_Queue( src.m_Queue )
50             {}
51
52             PQueue_PushPop&  getTest()
53             {
54                 return static_cast<PQueue_PushPop&>( m_Pool.m_Test );
55             }
56
57             virtual void init()
58             {
59                 cds::threading::Manager::attachThread();
60             }
61             virtual void fini()
62             {
63                 cds::threading::Manager::detachThread();
64             }
65
66             virtual void test()
67             {
68                 m_nPushError = 0;
69
70                 for ( array_type::const_iterator it = m_arr.begin(); it != m_arr.end(); ++it ) {
71                     if ( !m_Queue.push( SimpleValue( *it ) ))
72                         ++m_nPushError;
73                 }
74
75                 getTest().end_pusher();
76             }
77
78             void prepare( size_t nStart, size_t nEnd )
79             {
80                 m_arr.reserve( nEnd - nStart );
81                 for ( size_t i = nStart; i < nEnd; ++i )
82                     m_arr.push_back( i );
83                 std::random_shuffle( m_arr.begin(), m_arr.end() );
84             }
85         };
86
87         template <class PQueue>
88         class Popper: public CppUnitMini::TestThread
89         {
90             virtual TestThread *    clone()
91             {
92                 return new Popper( *this );
93             }
94         public:
95             PQueue&             m_Queue;
96             size_t              m_nPopSuccess;
97             size_t              m_nPopFailed;
98
99             typedef std::vector<size_t> array_type;
100             array_type          m_arr;
101
102         public:
103             Popper( CppUnitMini::ThreadPool& pool, PQueue& q )
104                 : CppUnitMini::TestThread( pool )
105                 , m_Queue( q )
106             {}
107             Popper( Popper& src )
108                 : CppUnitMini::TestThread( src )
109                 , m_Queue( src.m_Queue )
110             {}
111
112             PQueue_PushPop&  getTest()
113             {
114                 return static_cast<PQueue_PushPop&>( m_Pool.m_Test );
115             }
116
117             virtual void init()
118             {
119                 cds::threading::Manager::attachThread();
120             }
121             virtual void fini()
122             {
123                 cds::threading::Manager::detachThread();
124             }
125
126             virtual void test()
127             {
128                 m_nPopSuccess = 0;
129                 m_nPopFailed = 0;
130
131                 SimpleValue val;
132                 while ( getTest().pushing() || !m_Queue.empty() ) {
133                     if ( m_Queue.pop( val ))
134                         ++m_nPopSuccess;
135                     else
136                         ++m_nPopFailed;
137                 }
138             }
139         };
140
141         size_t  m_nPusherCount;
142         void end_pusher()
143         {
144             --m_nPusherCount;
145         }
146         bool pushing() const
147         {
148             return m_nPusherCount != 0;
149         }
150
151     protected:
152         template <class PQueue>
153         void test()
154         {
155             PQueue testQueue;
156             test_with( testQueue );
157         }
158
159         template <class PQueue>
160         void test_bounded()
161         {
162             std::unique_ptr<PQueue> pq( new PQueue(s_nQueueSize) );
163             test_with( *pq.get() );
164         }
165
166         template <class PQueue>
167         void test_with( PQueue& testQueue )
168         {
169             size_t const nThreadItemCount = s_nQueueSize / s_nPushThreadCount;
170
171             CppUnitMini::ThreadPool pool( *this );
172             pool.add( new Pusher<PQueue>( pool, testQueue ), s_nPushThreadCount );
173
174             size_t nStart = 0;
175             for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
176                 Pusher<PQueue> * pThread = static_cast<Pusher<PQueue> *>(*it);
177                 pThread->prepare( nStart, nStart + nThreadItemCount );
178                 nStart += nThreadItemCount;
179             }
180
181             pool.add( new Popper<PQueue>( pool, testQueue ), s_nPopThreadCount );
182
183             m_nPusherCount = s_nPushThreadCount;
184             CPPUNIT_MSG( "   push thread count=" << s_nPushThreadCount << " pop thread count=" << s_nPopThreadCount
185                 << ", item count=" << nThreadItemCount * s_nPushThreadCount << " ..." );
186             pool.run();
187             CPPUNIT_MSG( "     Duration=" << pool.avgDuration() );
188
189             // Analyze result
190             size_t nTotalPopped = 0;
191             size_t nPushFailed = 0;
192             size_t nPopFailed = 0;
193             for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
194                 Popper<PQueue> * pPopper = dynamic_cast<Popper<PQueue> *>(*it);
195                 if ( pPopper ) {
196                     nTotalPopped += pPopper->m_nPopSuccess;
197                     nPopFailed += pPopper->m_nPopFailed;
198                 }
199                 else {
200                     Pusher<PQueue> * pPusher = dynamic_cast<Pusher<PQueue> *>(*it);
201                     assert( pPusher );
202                     nPushFailed += pPusher->m_nPushError;
203                 }
204             }
205
206             CPPUNIT_MSG( "   Total: popped=" << nTotalPopped << ", empty pop=" << nPopFailed << ", push error=" << nPushFailed );
207             CPPUNIT_CHECK( nTotalPopped == nThreadItemCount * s_nPushThreadCount );
208             CPPUNIT_CHECK( nPushFailed == 0 );
209
210             check_statistics( testQueue.statistics() );
211             CPPUNIT_MSG( testQueue.statistics() );
212         }
213
214         void setUpParams( const CppUnitMini::TestCfg& cfg ) {
215             s_nPushThreadCount = cfg.getULong("PushThreadCount", (unsigned long) s_nPushThreadCount );
216             s_nPopThreadCount = cfg.getULong("PopThreadCount", (unsigned long) s_nPopThreadCount );
217             s_nQueueSize = cfg.getULong("QueueSize", (unsigned long) s_nQueueSize );
218         }
219
220     protected:
221 #include "pqueue/pqueue_defs.h"
222         CDSUNIT_DECLARE_MSPriorityQueue
223         CDSUNIT_DECLARE_EllenBinTree
224         CDSUNIT_DECLARE_SkipList
225         CDSUNIT_DECLARE_FCPriorityQueue
226         CDSUNIT_DECLARE_StdPQueue
227
228         CPPUNIT_TEST_SUITE(PQueue_PushPop)
229             CDSUNIT_TEST_MSPriorityQueue
230             CDSUNIT_TEST_EllenBinTree
231             CDSUNIT_TEST_SkipList
232             CDSUNIT_TEST_FCPriorityQueue
233             CDUNIT_TEST_StdPQueue
234         CPPUNIT_TEST_SUITE_END();
235     };
236
237 } // namespace queue
238
239 CPPUNIT_TEST_SUITE_REGISTRATION(pqueue::PQueue_PushPop);