TSan exam:
[libcds.git] / tests / unit / pqueue / push_pop.cpp
1 //$$CDS-header$$
2
3 #include "cppunit/thread.h"
4 #include "pqueue/pqueue_item.h"
5 #include "pqueue/pqueue_type.h"
6
7 #include <vector>
8 #include <memory>
9
10 namespace pqueue {
11
// Declares a test method Q() that runs test<>() on the queue type Types<pqueue::SimpleValue>::Q
#define TEST_CASE( Q ) \
    void Q() { test< Types<pqueue::SimpleValue>::Q >(); }

// Declares a test method Q() that runs test_bounded<>() on the queue type Types<pqueue::SimpleValue>::Q
#define TEST_BOUNDED( Q ) \
    void Q() { test_bounded< Types<pqueue::SimpleValue>::Q >(); }
14
15     namespace {
16         static size_t s_nPushThreadCount = 4;
17         static size_t s_nPopThreadCount = 4;
18         static size_t s_nQueueSize = 2000000;
19     }
20 } // namespace pqueue
21
22 namespace pqueue {
23
24     class PQueue_PushPop: public CppUnitMini::TestCase
25     {
26
27         template <class PQueue>
28         class Pusher: public CppUnitMini::TestThread
29         {
30             virtual TestThread *    clone()
31             {
32                 return new Pusher( *this );
33             }
34         public:
35             PQueue&             m_Queue;
36             size_t              m_nPushError;
37
38             typedef std::vector<size_t> array_type;
39             array_type          m_arr;
40
41         public:
42             Pusher( CppUnitMini::ThreadPool& pool, PQueue& q )
43                 : CppUnitMini::TestThread( pool )
44                 , m_Queue( q )
45             {}
46             Pusher( Pusher& src )
47                 : CppUnitMini::TestThread( src )
48                 , m_Queue( src.m_Queue )
49             {}
50
51             PQueue_PushPop&  getTest()
52             {
53                 return static_cast<PQueue_PushPop&>( m_Pool.m_Test );
54             }
55
56             virtual void init()
57             {
58                 cds::threading::Manager::attachThread();
59             }
60             virtual void fini()
61             {
62                 cds::threading::Manager::detachThread();
63             }
64
65             virtual void test()
66             {
67                 m_nPushError = 0;
68
69                 for ( array_type::const_iterator it = m_arr.begin(); it != m_arr.end(); ++it ) {
70                     if ( !m_Queue.push( SimpleValue( *it ) ))
71                         ++m_nPushError;
72                 }
73
74                 getTest().end_pusher();
75             }
76
77             void prepare( size_t nStart, size_t nEnd )
78             {
79                 m_arr.reserve( nEnd - nStart );
80                 for ( size_t i = nStart; i < nEnd; ++i )
81                     m_arr.push_back( i );
82                 shuffle( m_arr.begin(), m_arr.end() );
83             }
84         };
85
86         template <class PQueue>
87         class Popper: public CppUnitMini::TestThread
88         {
89             virtual TestThread *    clone()
90             {
91                 return new Popper( *this );
92             }
93         public:
94             PQueue&             m_Queue;
95             size_t              m_nPopSuccess;
96             size_t              m_nPopFailed;
97
98             typedef std::vector<size_t> array_type;
99             array_type          m_arr;
100
101         public:
102             Popper( CppUnitMini::ThreadPool& pool, PQueue& q )
103                 : CppUnitMini::TestThread( pool )
104                 , m_Queue( q )
105             {}
106             Popper( Popper& src )
107                 : CppUnitMini::TestThread( src )
108                 , m_Queue( src.m_Queue )
109             {}
110
111             PQueue_PushPop&  getTest()
112             {
113                 return static_cast<PQueue_PushPop&>( m_Pool.m_Test );
114             }
115
116             virtual void init()
117             {
118                 cds::threading::Manager::attachThread();
119             }
120             virtual void fini()
121             {
122                 cds::threading::Manager::detachThread();
123             }
124
125             virtual void test()
126             {
127                 m_nPopSuccess = 0;
128                 m_nPopFailed = 0;
129
130                 SimpleValue val;
131                 while ( getTest().pushing() || !m_Queue.empty() ) {
132                     if ( m_Queue.pop( val ))
133                         ++m_nPopSuccess;
134                     else
135                         ++m_nPopFailed;
136                 }
137             }
138         };
139
140         atomics::atomic<size_t>  m_nPusherCount;
141         void end_pusher()
142         {
143             m_nPusherCount.fetch_sub( 1, atomics::memory_order_relaxed );
144         }
145         bool pushing() const
146         {
147             return m_nPusherCount.load( atomics::memory_order_relaxed ) != 0;
148         }
149
150     protected:
151         template <class PQueue>
152         void test()
153         {
154             PQueue testQueue;
155             test_with( testQueue );
156         }
157
158         template <class PQueue>
159         void test_bounded()
160         {
161             std::unique_ptr<PQueue> pq( new PQueue(s_nQueueSize) );
162             test_with( *pq.get() );
163         }
164
165         template <class PQueue>
166         void test_with( PQueue& testQueue )
167         {
168             size_t const nThreadItemCount = s_nQueueSize / s_nPushThreadCount;
169
170             CppUnitMini::ThreadPool pool( *this );
171             pool.add( new Pusher<PQueue>( pool, testQueue ), s_nPushThreadCount );
172
173             size_t nStart = 0;
174             for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
175                 Pusher<PQueue> * pThread = static_cast<Pusher<PQueue> *>(*it);
176                 pThread->prepare( nStart, nStart + nThreadItemCount );
177                 nStart += nThreadItemCount;
178             }
179
180             pool.add( new Popper<PQueue>( pool, testQueue ), s_nPopThreadCount );
181
182             m_nPusherCount.store( s_nPushThreadCount, atomics::memory_order_release );
183             CPPUNIT_MSG( "   push thread count=" << s_nPushThreadCount << " pop thread count=" << s_nPopThreadCount
184                 << ", item count=" << nThreadItemCount * s_nPushThreadCount << " ..." );
185             pool.run();
186             CPPUNIT_MSG( "     Duration=" << pool.avgDuration() );
187
188             // Analyze result
189             size_t nTotalPopped = 0;
190             size_t nPushFailed = 0;
191             size_t nPopFailed = 0;
192             for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
193                 Popper<PQueue> * pPopper = dynamic_cast<Popper<PQueue> *>(*it);
194                 if ( pPopper ) {
195                     nTotalPopped += pPopper->m_nPopSuccess;
196                     nPopFailed += pPopper->m_nPopFailed;
197                 }
198                 else {
199                     Pusher<PQueue> * pPusher = dynamic_cast<Pusher<PQueue> *>(*it);
200                     assert( pPusher );
201                     nPushFailed += pPusher->m_nPushError;
202                 }
203             }
204
205             CPPUNIT_MSG( "   Total: popped=" << nTotalPopped << ", empty pop=" << nPopFailed << ", push error=" << nPushFailed );
206             CPPUNIT_CHECK( nTotalPopped == nThreadItemCount * s_nPushThreadCount );
207             CPPUNIT_CHECK( nPushFailed == 0 );
208
209             check_statistics( testQueue.statistics() );
210             CPPUNIT_MSG( testQueue.statistics() );
211         }
212
213         void setUpParams( const CppUnitMini::TestCfg& cfg ) {
214             s_nPushThreadCount = cfg.getULong("PushThreadCount", (unsigned long) s_nPushThreadCount );
215             s_nPopThreadCount = cfg.getULong("PopThreadCount", (unsigned long) s_nPopThreadCount );
216             s_nQueueSize = cfg.getULong("QueueSize", (unsigned long) s_nQueueSize );
217         }
218
219     protected:
        // Test method declarations. The CDSUNIT_DECLARE_* macros are not defined in this
        // file; presumably pqueue/pqueue_defs.h defines them in terms of the TEST_CASE /
        // TEST_BOUNDED macros declared at the top of this file - TODO confirm.
220 #include "pqueue/pqueue_defs.h"
221         CDSUNIT_DECLARE_MSPriorityQueue
222         CDSUNIT_DECLARE_EllenBinTree
223         CDSUNIT_DECLARE_SkipList
224         CDSUNIT_DECLARE_FCPriorityQueue
225         CDSUNIT_DECLARE_StdPQueue
226
        // Test suite definition for PQueue_PushPop; the *_TEST_* macros presumably list
        // the methods declared above - TODO confirm.
        // NOTE(review): CDUNIT_TEST_StdPQueue lacks the 'S' that the other CDSUNIT_TEST_*
        // names have - verify that this spelling matches the macro in pqueue_defs.h.
227         CPPUNIT_TEST_SUITE(PQueue_PushPop)
228             CDSUNIT_TEST_MSPriorityQueue
229             CDSUNIT_TEST_EllenBinTree
230             CDSUNIT_TEST_SkipList
231             CDSUNIT_TEST_FCPriorityQueue
232             CDUNIT_TEST_StdPQueue
233         CPPUNIT_TEST_SUITE_END();
234     };
235
236 } // namespace pqueue
237
238 CPPUNIT_TEST_SUITE_REGISTRATION(pqueue::PQueue_PushPop);