f4cc2053f95197eb4d9ade668e7f8fe5b4a39f4c
[libcds.git] / test / stress / pqueue / push_pop.cpp
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8     
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #include "pqueue_type.h"
32 #include "item.h"
33
34 namespace {
35     static size_t s_nPushThreadCount = 4;
36     static size_t s_nPopThreadCount = 4;
37     static size_t s_nQueueSize = 2000000;
38
39     atomics::atomic<size_t>  s_nProducerCount(0);
40
41     class pqueue_push_pop: public cds_test::stress_fixture
42     {
43         typedef cds_test::stress_fixture base_class;
44
45     public:
46         enum {
47             producer_thread,
48             consumer_thread
49         };
50
51         template <class PQueue>
52         class Producer: public cds_test::thread
53         {
54             typedef cds_test::thread base_class;
55
56         public:
57             Producer( cds_test::thread_pool& pool, PQueue& queue )
58                 : base_class( pool, producer_thread )
59                 , m_Queue( queue )
60             {}
61
62             Producer( Producer& src )
63                 : base_class( src )
64                 , m_Queue( src.m_Queue )
65             {}
66
67             virtual thread * clone()
68             {
69                 return new Producer( *this );
70             }
71
72             virtual void test()
73             {
74                 typedef typename PQueue::value_type value_type;
75
76                 for ( auto it = m_arr.begin(); it != m_arr.end(); ++it ) {
77                     if ( !m_Queue.push( value_type( *it )))
78                         ++m_nPushError;
79                 }
80
81                 s_nProducerCount.fetch_sub( 1, atomics::memory_order_relaxed );
82             }
83
84             void prepare( size_t nStart, size_t nEnd )
85             {
86                 m_arr.reserve( nEnd - nStart );
87                 for ( size_t i = nStart; i < nEnd; ++i )
88                     m_arr.push_back( i );
89                 shuffle( m_arr.begin(), m_arr.end() );
90             }
91
92         public:
93             PQueue&             m_Queue;
94             size_t              m_nPushError = 0;
95
96             typedef std::vector<size_t> array_type;
97             array_type          m_arr;
98         };
99
100         template <class PQueue>
101         class Consumer: public cds_test::thread
102         {
103             typedef cds_test::thread base_class;
104
105         public:
106             Consumer( cds_test::thread_pool& pool, PQueue& queue )
107                 : base_class( pool, consumer_thread )
108                 , m_Queue( queue )
109             {}
110
111             Consumer( Consumer& src )
112                 : base_class( src )
113                 , m_Queue( src.m_Queue )
114             {}
115
116             virtual thread * clone()
117             {
118                 return new Consumer( *this );
119             }
120
121             virtual void test()
122             {
123                 typename PQueue::value_type val;
124                 while ( s_nProducerCount.load( atomics::memory_order_relaxed ) != 0 || !m_Queue.empty() ) {
125                     if ( m_Queue.pop( val ))
126                         ++m_nPopSuccess;
127                     else
128                         ++m_nPopFailed;
129                 }
130             }
131
132         public:
133             PQueue&             m_Queue;
134             size_t              m_nPopSuccess = 0;
135             size_t              m_nPopFailed = 0;
136
137             typedef std::vector<size_t> array_type;
138             array_type          m_arr;
139         };
140
141     protected:
142
143         template <class PQueue>
144         void test( PQueue& q )
145         {
146             size_t const nThreadItemCount = s_nQueueSize / s_nPushThreadCount;
147             s_nQueueSize = nThreadItemCount * s_nPushThreadCount;
148
149             propout() << std::make_pair( "producer_count", s_nPushThreadCount )
150                 << std::make_pair( "consunmer_count", s_nPopThreadCount )
151                 << std::make_pair( "queue_size", s_nQueueSize );
152
153             cds_test::thread_pool& pool = get_pool();
154             pool.add( new Producer<PQueue>( pool, q ), s_nPushThreadCount );
155
156             size_t nStart = 0;
157             for ( size_t i = 0; i < pool.size(); ++i ) {
158                 static_cast<Producer<PQueue>&>(pool.get( i )).prepare( nStart, nStart + nThreadItemCount );
159                 nStart += nThreadItemCount;
160             }
161
162             pool.add( new Consumer<PQueue>( pool, q ), s_nPopThreadCount );
163
164             s_nProducerCount.store( s_nPushThreadCount, atomics::memory_order_release );
165
166             std::chrono::milliseconds duration = pool.run();
167             propout() << std::make_pair( "duration", duration );
168
169             // Analyze result
170             size_t nTotalPopped = 0;
171             size_t nPushFailed = 0;
172             size_t nPopFailed = 0;
173             for ( size_t i = 0; i < pool.size(); ++i ) {
174                 cds_test::thread& t = pool.get(i);
175                 if ( t.type() == consumer_thread ) {
176                     Consumer<PQueue>& cons = static_cast<Consumer<PQueue>&>( t );
177                     nTotalPopped += cons.m_nPopSuccess;
178                     nPopFailed += cons.m_nPopFailed;
179                 }
180                 else {
181                     assert( t.type() == producer_thread );
182                     Producer<PQueue>& prod = static_cast<Producer<PQueue>&>(t);
183                     nPushFailed += prod.m_nPushError;
184                     EXPECT_EQ( prod.m_nPushError , 0 ) << "producer " << i;
185                 }
186             }
187
188             propout() << std::make_pair( "total_popped", nTotalPopped )
189                 << std::make_pair( "empty_pop", nPopFailed )
190                 << std::make_pair( "push_error", nPushFailed );
191
192             EXPECT_EQ( nTotalPopped, s_nQueueSize );
193             EXPECT_EQ( nPushFailed, 0 );
194
195             //check_statistics( testQueue.statistics() );
196             propout() << q.statistics();
197         }
198
199     public:
200         static void SetUpTestCase()\r
201         {\r
202             cds_test::config const& cfg = get_config( "pqueue_push_pop" );\r
203 \r
204             s_nPushThreadCount = cfg.get_size_t( "PushThreadCount", s_nPushThreadCount );
205             s_nPopThreadCount = cfg.get_size_t( "PopThreadCount", s_nPopThreadCount );
206             s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
207
208             if ( s_nPushThreadCount == 0 )
209                 s_nPushThreadCount = 1;
210             if ( s_nPopThreadCount == 0 )
211                 s_nPopThreadCount = 1;
212             if ( s_nQueueSize == 0 )
213                 s_nQueueSize = 1000;\r
214         }
215
216         //static void TearDownTestCase();
217     };
218
219 #define CDSSTRESS_MSPriorityQueue( fixture_t, pqueue_t ) \
220     TEST_F( fixture_t, pqueue_t ) \
221     { \
222         typedef pqueue::Types<pqueue::simple_value>::pqueue_t pqueue_type; \
223         pqueue_type pq( s_nQueueSize ); \
224         test( pq ); \
225     }
226     CDSSTRESS_MSPriorityQueue( pqueue_push_pop, MSPriorityQueue_dyn_less )
227     CDSSTRESS_MSPriorityQueue( pqueue_push_pop, MSPriorityQueue_dyn_less_stat )
228     CDSSTRESS_MSPriorityQueue( pqueue_push_pop, MSPriorityQueue_dyn_cmp )
229     //CDSSTRESS_MSPriorityQueue( pqueue_push_pop, MSPriorityQueue_dyn_mutex ) // too slow
230
// Defines a test of fixture_t for the queue type pqueue::Types<simple_value>::pqueue_t.
// Unlike CDSSTRESS_MSPriorityQueue, the queue is default-constructed (no size argument)
// and allocated on the heap; the unique_ptr releases it when the test body ends.
#define CDSSTRESS_MSPriorityQueue_static( fixture_t, pqueue_t ) \
    TEST_F( fixture_t, pqueue_t ) \
    { \
        typedef pqueue::Types<pqueue::simple_value>::pqueue_t pqueue_type; \
        std::unique_ptr< pqueue_type > pq( new pqueue_type ); \
        test( *pq ); \
    }
    // Disabled. Note: these must go through the _static macro above - the plain
    // CDSSTRESS_MSPriorityQueue macro passes s_nQueueSize to the queue constructor.
    //CDSSTRESS_MSPriorityQueue_static( pqueue_push_pop, MSPriorityQueue_static_less )
    //CDSSTRESS_MSPriorityQueue_static( pqueue_push_pop, MSPriorityQueue_static_less_stat )
    //CDSSTRESS_MSPriorityQueue_static( pqueue_push_pop, MSPriorityQueue_static_cmp )
    //CDSSTRESS_MSPriorityQueue_static( pqueue_push_pop, MSPriorityQueue_static_mutex )
242
243
244 #define CDSSTRESS_PriorityQueue( fixture_t, pqueue_t ) \
245     TEST_F( fixture_t, pqueue_t ) \
246     { \
247         typedef pqueue::Types<pqueue::simple_value>::pqueue_t pqueue_type; \
248         pqueue_type pq; \
249         test( pq ); \
250     }
251     CDSSTRESS_PriorityQueue( pqueue_push_pop, FCPQueue_vector )
252     CDSSTRESS_PriorityQueue( pqueue_push_pop, FCPQueue_vector_stat )
253     CDSSTRESS_PriorityQueue( pqueue_push_pop, FCPQueue_deque )
254     CDSSTRESS_PriorityQueue( pqueue_push_pop, FCPQueue_deque_stat )
255     CDSSTRESS_PriorityQueue( pqueue_push_pop, FCPQueue_boost_deque )
256     CDSSTRESS_PriorityQueue( pqueue_push_pop, FCPQueue_boost_deque_stat )
257     CDSSTRESS_PriorityQueue( pqueue_push_pop, FCPQueue_boost_stable_vector )
258     CDSSTRESS_PriorityQueue( pqueue_push_pop, FCPQueue_boost_stable_vector_stat )
259
260     CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_HP_max )
261     CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_HP_max_stat )
262     CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_HP_min )
263     CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_HP_min_stat )
264     CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_DHP_max )
265     CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_DHP_max_stat )
266     CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_DHP_min )
267     CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_DHP_min_stat )
268     // CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_gpi_max )
269     // CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_gpi_max_stat )
270     // CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_gpi_min )
271     // CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_gpi_min_stat )
272     CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_gpb_max )
273     CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_gpb_max_stat )
274     CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_gpb_min )
275     CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_gpb_min_stat )
276     CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_gpt_max )
277     CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_gpt_max_stat )
278     CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_gpt_min )
279     CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_gpt_min_stat )
280 #ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED
281     CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_shb_max )
282     CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_shb_max_stat )
283     CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_shb_min )
284     CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_shb_min_stat )
285     CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_sht_max )
286     CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_sht_max_stat )
287     CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_sht_min )
288     CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_sht_min_stat )
289 #endif
290
291     CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList_HP_max )
292     CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList_HP_max_stat )
293     CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList_HP_min )
294     CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList_HP_min_stat )
295     CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList_DHP_max )
296     CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList_DHP_max_stat )
297     CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList_DHP_min )
298     CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList_DHP_min_stat )
299     CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList_RCU_gpi_max )
300     CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList_RCU_gpi_min )
301     CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList_RCU_gpb_max )
302     CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList_RCU_gpb_min )
303     CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList_RCU_gpt_max )
304     CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList_RCU_gpt_min )
305 #ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED
306     CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList_RCU_shb_max )
307     CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList_RCU_shb_min )
308     CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList_RCU_sht_max )
309     CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList_RCU_sht_min )
310 #endif
311
312     CDSSTRESS_PriorityQueue( pqueue_push_pop, StdPQueue_vector_spin )
313     CDSSTRESS_PriorityQueue( pqueue_push_pop, StdPQueue_vector_mutex )
314     CDSSTRESS_PriorityQueue( pqueue_push_pop, StdPQueue_deque_spin )
315     CDSSTRESS_PriorityQueue( pqueue_push_pop, StdPQueue_deque_mutex )
316
317 } // namespace