Added more error messages
[libcds.git] / test / stress / pqueue / pop.cpp
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8     
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #include "pqueue_type.h"
32 #include "item.h"
33
34 namespace {
35     static size_t s_nThreadCount = 8;
36     static size_t s_nQueueSize = 2000000;
37
38     class pqueue_pop: public cds_test::stress_fixture
39     {
40         typedef cds_test::stress_fixture base_class;
41
42     protected:
43         template <class PQueue>
44         class Producer: public cds_test::thread
45         {
46             typedef cds_test::thread base_class;
47
48         public:
49             Producer( cds_test::thread_pool& pool, PQueue& queue )
50                 : base_class( pool )
51                 , m_Queue( queue )
52             {}
53
54             Producer( Producer& src )
55                 : base_class( src )
56                 , m_Queue( src.m_Queue )
57             {}
58
59             virtual thread * clone()
60             {
61                 return new Producer( *this );
62             }
63
64             virtual void test()
65             {
66                 typedef typename PQueue::value_type value_type;
67                 for ( array_type::const_iterator it = m_arr.begin(); it != m_arr.end(); ++it ) {
68                     if ( !m_Queue.push( value_type( *it ) ))
69                         ++m_nPushError;
70                 }
71             }
72
73             void prepare( size_t nStart, size_t nEnd )
74             {
75                 m_arr.reserve( nEnd - nStart );
76                 for ( size_t i = nStart; i < nEnd; ++i )
77                     m_arr.push_back( i );
78                 shuffle( m_arr.begin(), m_arr.end() );
79             }
80
81         public:
82             PQueue&             m_Queue;
83             size_t              m_nPushError = 0;
84
85             typedef std::vector<size_t> array_type;
86             array_type          m_arr;
87         };
88
89         template <class PQueue>
90         class Consumer: public cds_test::thread
91         {
92             typedef cds_test::thread base_class;
93
94         public:
95             Consumer( cds_test::thread_pool& pool, PQueue& queue )
96                 : base_class( pool )
97                 , m_Queue( queue )
98             {}
99
100             Consumer( Consumer& src )
101                 : base_class( src )
102                 , m_Queue( src.m_Queue )
103             {}
104
105             virtual thread * clone()
106             {
107                 return new Consumer( *this );
108             }
109
110             virtual void test()
111             {
112                 typedef typename PQueue::value_type value_type;
113                 size_t nPrevKey;
114                 value_type val;
115                 if ( m_Queue.pop( val )) {
116                     ++m_nPopSuccess;
117                     nPrevKey = val.key;
118
119                     while ( !m_Queue.empty() ) {
120                         if ( m_Queue.pop( val )) {
121                             ++m_nPopSuccess;
122                             if ( val.key > nPrevKey ) {
123                                 ++m_nPopError;
124                                 m_arrFailedPops.emplace_back( failed_pops{ nPrevKey, val.key } );
125                             }
126                             else if ( val.key == nPrevKey ) {
127                                 ++m_nPopErrorEq;
128                                 m_arrFailedPops.emplace_back( failed_pops{ nPrevKey, val.key } );
129                             }
130                             nPrevKey = val.key;
131                         }
132                         else
133                             ++m_nPopFailed;
134                     }
135                 }
136                 else
137                     ++m_nPopFailed;
138             }
139
140         public:
141             PQueue&             m_Queue;
142             size_t              m_nPopError = 0;
143             size_t              m_nPopErrorEq = 0;
144             size_t              m_nPopSuccess = 0;
145             size_t              m_nPopFailed = 0;
146
147             struct failed_pops {
148                 size_t prev_key;
149                 size_t popped_key;
150             };
151             std::vector< failed_pops > m_arrFailedPops;
152         };
153
154     protected:
155
156         template <class PQueue>
157         void test( PQueue& q )
158         {
159             size_t const nThreadItemCount = s_nQueueSize / s_nThreadCount;
160             s_nQueueSize = nThreadItemCount * s_nThreadCount;
161
162             cds_test::thread_pool& pool = get_pool();
163
164             propout() << std::make_pair( "thread_count", s_nThreadCount )
165                 << std::make_pair( "push_count", s_nQueueSize );
166
167             // push
168             {
169                 pool.add( new Producer<PQueue>( pool, q ), s_nThreadCount );
170
171                 size_t nStart = 0;
172                 for ( size_t i = 0; i < pool.size(); ++i ) {
173                     static_cast<Producer<PQueue>&>( pool.get(i) ).prepare( nStart, nStart + nThreadItemCount );
174                     nStart += nThreadItemCount;
175                 }
176
177                 std::chrono::milliseconds duration = pool.run();
178                 propout() << std::make_pair( "producer_duration", duration );
179             }
180
181             // pop
182             {
183                 pool.clear();
184                 pool.add( new Consumer<PQueue>( pool, q ), s_nThreadCount );
185
186                 std::chrono::milliseconds duration = pool.run();
187                 propout() << std::make_pair( "consumer_duration", duration );
188
189                 // Analyze result
190                 size_t nTotalPopped = 0;
191                 size_t nTotalError = 0;
192                 size_t nTotalErrorEq = 0;
193                 size_t nTotalFailed = 0;
194                 for ( size_t i = 0; i < pool.size(); ++i ) {
195                     Consumer<PQueue>& cons = static_cast<Consumer<PQueue>&>( pool.get(i));
196
197                     nTotalPopped  += cons.m_nPopSuccess;
198                     nTotalError   += cons.m_nPopError;
199                     nTotalErrorEq += cons.m_nPopErrorEq;
200                     nTotalFailed  += cons.m_nPopFailed;
201
202                     if ( !cons.m_arrFailedPops.empty() ) {
203                         std::cerr << "Failed pop:";
204                         for ( size_t k = 0; k < cons.m_arrFailedPops.size(); ++k ) {
205                             std::cerr << "\nThread " << i << ": prev_key=" << cons.m_arrFailedPops[k].prev_key << " popped_key=" << cons.m_arrFailedPops[k].popped_key;
206                         }
207                         std::cerr << std::endl;
208                     }
209                 }
210
211                 propout()
212                     << std::make_pair( "total_popped", nTotalPopped )
213                     << std::make_pair( "error_pop_double", nTotalErrorEq )
214                     << std::make_pair( "error_priority_violation", nTotalError );
215
216                 EXPECT_EQ( nTotalPopped, s_nQueueSize );
217                 EXPECT_EQ( nTotalError, 0 );
218                 EXPECT_EQ( nTotalErrorEq, 0 );
219             }
220
221             propout() << q.statistics();
222         }
223
224     public:
225         static void SetUpTestCase()\r
226         {\r
227             cds_test::config const& cfg = get_config( "pqueue_pop" );\r
228 \r
229             s_nThreadCount = cfg.get_size_t( "ThreadCount", s_nThreadCount );
230             s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
231
232             if ( s_nThreadCount == 0 )
233                 s_nThreadCount = 1;
234             if ( s_nQueueSize == 0 )
235                 s_nQueueSize = 1000;\r
236         }
237
238         //static void TearDownTestCase();
239     };
240
241 #define CDSSTRESS_MSPriorityQueue( fixture_t, pqueue_t ) \
242     TEST_F( fixture_t, pqueue_t ) \
243     { \
244         typedef pqueue::Types<pqueue::simple_value>::pqueue_t pqueue_type; \
245         pqueue_type pq( s_nQueueSize ); \
246         test( pq ); \
247     }
248     CDSSTRESS_MSPriorityQueue( pqueue_pop, MSPriorityQueue_dyn_less )
249     CDSSTRESS_MSPriorityQueue( pqueue_pop, MSPriorityQueue_dyn_less_stat )
250     CDSSTRESS_MSPriorityQueue( pqueue_pop, MSPriorityQueue_dyn_cmp )
251     //CDSSTRESS_MSPriorityQueue( pqueue_pop, MSPriorityQueue_dyn_mutex ) // too slow
252
// Defines a TEST_F case named pqueue_t on fixture fixture_t for a queue type
// that is default-constructed. The queue object is allocated with new and
// owned by a std::unique_ptr for the duration of the test.
#define CDSSTRESS_MSPriorityQueue_static( fixture_t, pqueue_t ) \
    TEST_F( fixture_t, pqueue_t ) \
    { \
        typedef pqueue::Types<pqueue::simple_value>::pqueue_t pqueue_type; \
        std::unique_ptr< pqueue_type > pq( new pqueue_type ); \
        test( *pq ); \
    }
    // The static variants are currently disabled (the reason is not recorded here).
    // They are written against the _static macro above so that re-enabling them
    // uses the intended queue construction.
    //CDSSTRESS_MSPriorityQueue_static( pqueue_pop, MSPriorityQueue_static_less )
    //CDSSTRESS_MSPriorityQueue_static( pqueue_pop, MSPriorityQueue_static_less_stat )
    //CDSSTRESS_MSPriorityQueue_static( pqueue_pop, MSPriorityQueue_static_cmp )
    //CDSSTRESS_MSPriorityQueue_static( pqueue_pop, MSPriorityQueue_static_mutex )

264
265
266 #define CDSSTRESS_PriorityQueue( fixture_t, pqueue_t ) \
267     TEST_F( fixture_t, pqueue_t ) \
268     { \
269         typedef pqueue::Types<pqueue::simple_value>::pqueue_t pqueue_type; \
270         pqueue_type pq; \
271         test( pq ); \
272     }
273     CDSSTRESS_PriorityQueue( pqueue_pop, FCPQueue_vector )
274     CDSSTRESS_PriorityQueue( pqueue_pop, FCPQueue_vector_stat )
275     CDSSTRESS_PriorityQueue( pqueue_pop, FCPQueue_deque )
276     CDSSTRESS_PriorityQueue( pqueue_pop, FCPQueue_deque_stat )
277     CDSSTRESS_PriorityQueue( pqueue_pop, FCPQueue_boost_deque )
278     CDSSTRESS_PriorityQueue( pqueue_pop, FCPQueue_boost_deque_stat )
279     CDSSTRESS_PriorityQueue( pqueue_pop, FCPQueue_boost_stable_vector )
280     CDSSTRESS_PriorityQueue( pqueue_pop, FCPQueue_boost_stable_vector_stat )
281
282     CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_HP_max )
283     CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_HP_max_stat )
284     CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_HP_min )
285     CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_HP_min_stat )
286     CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_DHP_max )
287     CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_DHP_max_stat )
288     CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_DHP_min )
289     CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_DHP_min_stat )
290     // CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_gpi_max )
291     // CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_gpi_max_stat )
292     // CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_gpi_min )
293     // CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_gpi_min_stat )
294     CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_gpb_max )
295     CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_gpb_max_stat )
296     CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_gpb_min )
297     CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_gpb_min_stat )
298     CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_gpt_max )
299     CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_gpt_max_stat )
300     CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_gpt_min )
301     CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_gpt_min_stat )
302 #ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED
303     CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_shb_max )
304     CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_shb_max_stat )
305     CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_shb_min )
306     CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_shb_min_stat )
307     CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_sht_max )
308     CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_sht_max_stat )
309     CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_sht_min )
310     CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_sht_min_stat )
311 #endif
312
313     CDSSTRESS_PriorityQueue( pqueue_pop, SkipList_HP_max )
314     CDSSTRESS_PriorityQueue( pqueue_pop, SkipList_HP_max_stat )
315     CDSSTRESS_PriorityQueue( pqueue_pop, SkipList_HP_min )
316     CDSSTRESS_PriorityQueue( pqueue_pop, SkipList_HP_min_stat )
317     CDSSTRESS_PriorityQueue( pqueue_pop, SkipList_DHP_max )
318     CDSSTRESS_PriorityQueue( pqueue_pop, SkipList_DHP_max_stat )
319     CDSSTRESS_PriorityQueue( pqueue_pop, SkipList_DHP_min )
320     CDSSTRESS_PriorityQueue( pqueue_pop, SkipList_DHP_min_stat )
321     CDSSTRESS_PriorityQueue( pqueue_pop, SkipList_RCU_gpi_max )
322     CDSSTRESS_PriorityQueue( pqueue_pop, SkipList_RCU_gpi_min )
323     CDSSTRESS_PriorityQueue( pqueue_pop, SkipList_RCU_gpb_max )
324     CDSSTRESS_PriorityQueue( pqueue_pop, SkipList_RCU_gpb_min )
325     CDSSTRESS_PriorityQueue( pqueue_pop, SkipList_RCU_gpt_max )
326     CDSSTRESS_PriorityQueue( pqueue_pop, SkipList_RCU_gpt_min )
327 #ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED
328     CDSSTRESS_PriorityQueue( pqueue_pop, SkipList_RCU_shb_max )
329     CDSSTRESS_PriorityQueue( pqueue_pop, SkipList_RCU_shb_min )
330     CDSSTRESS_PriorityQueue( pqueue_pop, SkipList_RCU_sht_max )
331     CDSSTRESS_PriorityQueue( pqueue_pop, SkipList_RCU_sht_min )
332 #endif
333
334     CDSSTRESS_PriorityQueue( pqueue_pop, StdPQueue_vector_spin )
335     CDSSTRESS_PriorityQueue( pqueue_pop, StdPQueue_vector_mutex )
336     CDSSTRESS_PriorityQueue( pqueue_pop, StdPQueue_deque_spin )
337     CDSSTRESS_PriorityQueue( pqueue_pop, StdPQueue_deque_mutex )
338
339 } // namespace