398ca66b197631ba53e1d9ec31310c5093461a2e
[libcds.git] / test / stress / queue / push.cpp
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #include "queue_type.h"
32 #include "../misc/common.h"
33
34 // Multi-threaded queue test for push operation
35 namespace {
36
// Default test parameters; queue_push::SetUpTestCase() overrides them with the
// "ThreadCount" and "QueueSize" values of the "queue_push" config section.
// The enclosing unnamed namespace already gives both variables internal linkage.
size_t s_nThreadCount = 8;          // number of producer threads
size_t s_nQueueSize   = 20000000;   // total items to push: no more than 20 million records
39
40     class queue_push: public cds_test::stress_fixture
41     {
42     protected:
43         struct value_type
44         {
45             size_t      nNo;
46
47             value_type()
48                 : nNo( 0 )
49             {}
50
51             value_type( size_t n )
52                 : nNo( n )
53             {}
54         };
55
56         template <class Queue>
57         class Producer: public cds_test::thread
58         {
59             typedef cds_test::thread base_class;
60
61         public:
62             Producer( cds_test::thread_pool& pool, Queue& queue )
63                 : base_class( pool )
64                 , m_Queue( queue )
65                 , m_nStartItem( 0 )
66                 , m_nEndItem( 0 )
67                 , m_nPushError( 0 )
68             {}
69
70             Producer( Producer& src )
71                 : base_class( src )
72                 , m_Queue( src.m_Queue )
73                 , m_nStartItem( 0 )
74                 , m_nEndItem( 0 )
75                 , m_nPushError( 0 )
76             {}
77
78             virtual thread * clone()
79             {
80                 return new Producer( *this );
81             }
82
83             virtual void test()
84             {
85                 for ( size_t nItem = m_nStartItem; nItem < m_nEndItem; ++nItem ) {
86                     if ( !m_Queue.push( nItem ))
87                         ++m_nPushError;
88                 }
89             }
90
91         public:
92             Queue&              m_Queue;
93             size_t              m_nStartItem;
94             size_t              m_nEndItem;
95             size_t              m_nPushError;
96         };
97
98     public:
99         static void SetUpTestCase()
100         {
101             cds_test::config const& cfg = get_config( "queue_push" );
102
103             s_nThreadCount = cfg.get_size_t( "ThreadCount", s_nThreadCount );
104             s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
105
106             if ( s_nThreadCount == 0u )
107                 s_nThreadCount = 1;
108             if ( s_nQueueSize == 0u )
109                 s_nQueueSize = 1000;
110         }
111
112         //static void TearDownTestCase();
113
114     protected:
115         template <class Queue>
116         void test( Queue& q )
117         {
118             cds_test::thread_pool& pool = get_pool();
119
120             pool.add( new Producer<Queue>( pool, q ), s_nThreadCount );
121
122             size_t nStart = 0;
123             size_t nThreadItemCount = s_nQueueSize / s_nThreadCount;
124             for ( size_t i = 0; i < pool.size(); ++i ) {
125                 Producer<Queue>& thread = static_cast<Producer<Queue>&>(pool.get( i ));
126                 thread.m_nStartItem = nStart;
127                 nStart += nThreadItemCount;
128                 thread.m_nEndItem = nStart;
129             }
130
131             s_nQueueSize = nThreadItemCount * s_nThreadCount;
132             propout() << std::make_pair( "thread_count", s_nThreadCount )
133                 << std::make_pair( "push_count", s_nQueueSize );
134
135             std::chrono::milliseconds duration = pool.run();
136
137             propout() << std::make_pair( "duration", duration );
138
139             DEBUG(analyze( q ));
140
141             propout() << q.statistics();
142         }
143
144         template <class Queue>
145         void analyze( Queue& q )
146         {
147             size_t nThreadItems = s_nQueueSize / s_nThreadCount;
148             cds_test::thread_pool& pool = get_pool();
149
150             for ( size_t i = 0; i < pool.size(); ++i ) {
151                 Producer<Queue>& thread = static_cast<Producer<Queue>&>(pool.get( i ));
152                 EXPECT_EQ( thread.m_nPushError, 0u ) << " producer thread " << i;
153             }
154             EXPECT_TRUE( !q.empty());
155
156             std::unique_ptr< uint8_t[] > arr( new uint8_t[s_nQueueSize] );
157             memset( arr.get(), 0, sizeof(arr[0]) * s_nQueueSize );
158
159             size_t nPopped = 0;
160             value_type val;
161             while ( q.pop( val )) {
162                 nPopped++;
163                 ++arr[ val.nNo ];
164             }
165
166             size_t nTotalItems = nThreadItems * s_nThreadCount;
167             for ( size_t i = 0; i < nTotalItems; ++i ) {
168                 EXPECT_EQ( arr[i], 1 ) << "i=" << i;
169             }
170         }
171     };
172
// Register the push test for each queue family on the queue_push fixture.
// NOTE(review): the CDSSTRESS_* macros are defined outside this file (presumably
// in queue_type.h) and are expected to expand through CDSSTRESS_Queue_F - confirm.
CDSSTRESS_MSQueue( queue_push )
CDSSTRESS_MoirQueue( queue_push )
CDSSTRESS_BasketQueue( queue_push )
CDSSTRESS_OptimsticQueue( queue_push )
CDSSTRESS_RWQueue( queue_push )
178
179 #undef CDSSTRESS_Queue_F
180 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
181     TEST_F( test_fixture, type_name ) \
182     { \
183         typedef queue::Types< value_type >::type_name queue_type; \
184         queue_type queue( s_nQueueSize ); \
185         test( queue ); \
186     }
187
188     CDSSTRESS_VyukovQueue( queue_push )
189
190 #undef CDSSTRESS_Queue_F
191
192
193     // ********************************************************************
194     // SegmentedQueue test
195
196     class segmented_queue_push
197         : public queue_push
198         , public ::testing::WithParamInterface< size_t >
199     {
200         typedef queue_push base_class;
201
202     protected:
203         template <typename Queue>
204         void test()
205         {
206             size_t quasi_factor = GetParam();
207
208             Queue q( quasi_factor );
209             propout() << std::make_pair( "quasi_factor", quasi_factor );
210             base_class::test( q );
211         }
212
213     public:
214         static std::vector< size_t > get_test_parameters()
215         {
216             cds_test::config const& cfg = cds_test::stress_fixture::get_config( "queue_push" );
217             bool bIterative = cfg.get_bool( "SegmentedQueue_Iterate", false );
218             size_t quasi_factor = cfg.get_size_t( "SegmentedQueue_SegmentSize", 256 );
219
220             std::vector<size_t> args;
221             if ( bIterative && quasi_factor > 4 ) {
222                 for ( size_t qf = 4; qf <= quasi_factor; qf *= 2 )
223                     args.push_back( qf );
224             }
225             else {
226                 if ( quasi_factor > 2 )
227                     args.push_back( quasi_factor );
228                 else
229                     args.push_back( 2 );
230             }
231
232             return args;
233         }
234     };
235
236 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
237     TEST_P( test_fixture, type_name ) \
238     { \
239         typedef typename queue::Types<value_type>::type_name queue_type; \
240         test< queue_type >(); \
241     }
242
243     CDSSTRESS_SegmentedQueue( segmented_queue_push )
244
245 #ifdef CDSTEST_GTEST_INSTANTIATE_TEST_CASE_P_HAS_4TH_ARG
246     static std::string get_test_parameter_name( testing::TestParamInfo<size_t> const& p )
247     {
248         return std::to_string( p.param );
249     }
250     INSTANTIATE_TEST_CASE_P( SQ,
251         segmented_queue_push,
252         ::testing::ValuesIn( segmented_queue_push::get_test_parameters()), get_test_parameter_name );
253 #else
254         INSTANTIATE_TEST_CASE_P( SQ,
255             segmented_queue_push,
256             ::testing::ValuesIn( segmented_queue_push::get_test_parameters()));
257 #endif
258
259
260 } // namespace