Merge branch 'master' into dev
[libcds.git] / test / stress / queue / push.cpp
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #include "queue_type.h"
32
33 // Multi-threaded queue test for push operation
34 namespace {
35
// Number of producer threads; SetUpTestCase() may replace it with the
// "ThreadCount" value of the "queue_push" configuration section.
size_t s_nThreadCount = 8;

// Total number of items pushed by all producers together (default: 20 million);
// SetUpTestCase() may replace it with the "QueueSize" configuration value.
size_t s_nQueueSize = 20000000;
38
39     class queue_push: public cds_test::stress_fixture
40     {
41     protected:
42         struct value_type
43         {
44             size_t      nNo;
45
46             value_type()
47                 : nNo( 0 )
48             {}
49
50             value_type( size_t n )
51                 : nNo( n )
52             {}
53         };
54
55         template <class Queue>
56         class Producer: public cds_test::thread
57         {
58             typedef cds_test::thread base_class;
59
60         public:
61             Producer( cds_test::thread_pool& pool, Queue& queue )
62                 : base_class( pool )
63                 , m_Queue( queue )
64                 , m_nStartItem( 0 )
65                 , m_nEndItem( 0 )
66                 , m_nPushError( 0 )
67             {}
68
69             Producer( Producer& src )
70                 : base_class( src )
71                 , m_Queue( src.m_Queue )
72                 , m_nStartItem( 0 )
73                 , m_nEndItem( 0 )
74                 , m_nPushError( 0 )
75             {}
76
77             virtual thread * clone()
78             {
79                 return new Producer( *this );
80             }
81
82             virtual void test()
83             {
84                 for ( size_t nItem = m_nStartItem; nItem < m_nEndItem; ++nItem ) {
85                     if ( !m_Queue.push( nItem ))
86                         ++m_nPushError;
87                 }
88             }
89
90         public:
91             Queue&              m_Queue;
92             size_t              m_nStartItem;
93             size_t              m_nEndItem;
94             size_t              m_nPushError;
95         };
96
97     public:
98         static void SetUpTestCase()
99         {
100             cds_test::config const& cfg = get_config( "queue_push" );
101
102             s_nThreadCount = cfg.get_size_t( "ThreadCount", s_nThreadCount );
103             s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
104
105             if ( s_nThreadCount == 0u )
106                 s_nThreadCount = 1;
107             if ( s_nQueueSize == 0u )
108                 s_nQueueSize = 1000;
109         }
110
111         //static void TearDownTestCase();
112
113     protected:
114         template <class Queue>
115         void test( Queue& q )
116         {
117             cds_test::thread_pool& pool = get_pool();
118
119             pool.add( new Producer<Queue>( pool, q ), s_nThreadCount );
120
121             size_t nStart = 0;
122             size_t nThreadItemCount = s_nQueueSize / s_nThreadCount;
123             for ( size_t i = 0; i < pool.size(); ++i ) {
124                 Producer<Queue>& thread = static_cast<Producer<Queue>&>(pool.get( i ));
125                 thread.m_nStartItem = nStart;
126                 nStart += nThreadItemCount;
127                 thread.m_nEndItem = nStart;
128             }
129
130             s_nQueueSize = nThreadItemCount * s_nThreadCount;
131             propout() << std::make_pair( "thread_count", s_nThreadCount )
132                 << std::make_pair( "push_count", s_nQueueSize );
133
134             std::chrono::milliseconds duration = pool.run();
135
136             propout() << std::make_pair( "duration", duration );
137
138             analyze( q );
139
140             propout() << q.statistics();
141         }
142
143         template <class Queue>
144         void analyze( Queue& q )
145         {
146             size_t nThreadItems = s_nQueueSize / s_nThreadCount;
147             cds_test::thread_pool& pool = get_pool();
148
149             for ( size_t i = 0; i < pool.size(); ++i ) {
150                 Producer<Queue>& thread = static_cast<Producer<Queue>&>(pool.get( i ));
151                 EXPECT_EQ( thread.m_nPushError, 0u ) << " producer thread " << i;
152             }
153             EXPECT_TRUE( !q.empty());
154
155             std::unique_ptr< uint8_t[] > arr( new uint8_t[s_nQueueSize] );
156             memset( arr.get(), 0, sizeof(arr[0]) * s_nQueueSize );
157
158             size_t nPopped = 0;
159             value_type val;
160             while ( q.pop( val )) {
161                 nPopped++;
162                 ++arr[ val.nNo ];
163             }
164
165             size_t nTotalItems = nThreadItems * s_nThreadCount;
166             for ( size_t i = 0; i < nTotalItems; ++i ) {
167                 EXPECT_EQ( arr[i], 1 ) << "i=" << i;
168             }
169         }
170     };
171
172     CDSSTRESS_MSQueue( queue_push )
173     CDSSTRESS_MoirQueue( queue_push )
174     CDSSTRESS_BasketQueue( queue_push )
175     CDSSTRESS_OptimsticQueue( queue_push )
176     CDSSTRESS_FCQueue( queue_push )
177     CDSSTRESS_FCDeque( queue_push )
178     CDSSTRESS_RWQueue( queue_push )
179     CDSSTRESS_StdQueue( queue_push )
180
181 #undef CDSSTRESS_Queue_F
182 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
183     TEST_F( test_fixture, type_name ) \
184     { \
185         typedef queue::Types< value_type >::type_name queue_type; \
186         queue_type queue( s_nQueueSize ); \
187         test( queue ); \
188     }
189
190     CDSSTRESS_VyukovQueue( queue_push )
191
192 #undef CDSSTRESS_Queue_F
193
194
195     // ********************************************************************
196     // SegmentedQueue test
197
198     class segmented_queue_push
199         : public queue_push
200         , public ::testing::WithParamInterface< size_t >
201     {
202         typedef queue_push base_class;
203
204     protected:
205         template <typename Queue>
206         void test()
207         {
208             size_t quasi_factor = GetParam();
209
210             Queue q( quasi_factor );
211             propout() << std::make_pair( "quasi_factor", quasi_factor );
212             base_class::test( q );
213         }
214
215     public:
216         static std::vector< size_t > get_test_parameters()
217         {
218             cds_test::config const& cfg = cds_test::stress_fixture::get_config( "queue_push" );
219             bool bIterative = cfg.get_bool( "SegmentedQueue_Iterate", false );
220             size_t quasi_factor = cfg.get_size_t( "SegmentedQueue_SegmentSize", 256 );
221
222             std::vector<size_t> args;
223             if ( bIterative && quasi_factor > 4 ) {
224                 for ( size_t qf = 4; qf <= quasi_factor; qf *= 2 )
225                     args.push_back( qf );
226             }
227             else {
228                 if ( quasi_factor > 2 )
229                     args.push_back( quasi_factor );
230                 else
231                     args.push_back( 2 );
232             }
233
234             return args;
235         }
236     };
237
238 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
239     TEST_P( test_fixture, type_name ) \
240     { \
241         typedef typename queue::Types<value_type>::type_name queue_type; \
242         test< queue_type >(); \
243     }
244
245     CDSSTRESS_SegmentedQueue( segmented_queue_push )
246
247 #ifdef CDSTEST_GTEST_INSTANTIATE_TEST_CASE_P_HAS_4TH_ARG
248     static std::string get_test_parameter_name( testing::TestParamInfo<size_t> const& p )
249     {
250         return std::to_string( p.param );
251     }
252     INSTANTIATE_TEST_CASE_P( SQ,
253         segmented_queue_push,
254         ::testing::ValuesIn( segmented_queue_push::get_test_parameters()), get_test_parameter_name );
255 #else
256         INSTANTIATE_TEST_CASE_P( SQ,
257             segmented_queue_push,
258             ::testing::ValuesIn( segmented_queue_push::get_test_parameters()) );
259 #endif
260
261
262 } // namespace