Removed trailing spaces
[libcds.git] / test / stress / queue / spsc_buffer.cpp
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #include "queue_type.h"
32
33 #include <vector>
34 #include <algorithm>
35 #include <type_traits>
36
37 // Single producer/single consumer buffer push/pop test
38 namespace {
39
40     static size_t s_nBufferSize = 1024*1024;
41     static size_t s_nPushCount = 1000000;
42
43     static std::atomic<size_t> s_nProducerDone( 0 );
44
45     class spsc_buffer: public cds_test::stress_fixture
46     {
47     protected:
48        typedef size_t value_type;
49
50         enum {
51             producer_thread,
52             consumer_thread
53         };
54
55         template <class Queue>
56         class Producer: public cds_test::thread
57         {
58             typedef cds_test::thread base_class;
59
60         public:
61             Producer( cds_test::thread_pool& pool, Queue& queue )
62                 : base_class( pool, producer_thread )
63                 , m_Queue( queue )
64             {}
65
66             Producer( Producer& src )
67                 : base_class( src )
68                 , m_Queue( src.m_Queue )
69             {}
70
71             virtual thread * clone()
72             {
73                 return new Producer( *this );
74             }
75
76             virtual void test()
77             {
78                 size_t const nPushCount = s_nPushCount;
79
80                 for ( size_t i = 0; i < nPushCount; ++i ) {
81                     size_t len = rand( 1024 ) + 64;
82                     void* buf = m_Queue.back( len );
83                     if ( buf ) {
84                         memset( buf, len % 256, len );
85                         m_Queue.push_back();
86                         m_nPushed += len;
87                     }
88                     else
89                         ++m_nPushFailed;
90                 }
91
92                 s_nProducerDone.fetch_add( 1 );
93             }
94
95         public:
96             Queue&              m_Queue;
97             size_t              m_nPushFailed = 0;
98             size_t              m_nPushed = 0;
99         };
100
101         template <class Queue>
102         class Consumer: public cds_test::thread
103         {
104             typedef cds_test::thread base_class;
105
106         public:
107             Queue&              m_Queue;
108             size_t              m_nPopEmpty = 0;
109             size_t              m_nPopped = 0;
110             size_t              m_nBadValue = 0;
111             size_t              m_nPopFrontFailed = 0;
112
113         public:
114             Consumer( cds_test::thread_pool& pool, Queue& queue )
115                 : base_class( pool, consumer_thread )
116                 , m_Queue( queue )
117             {}
118             Consumer( Consumer& src )
119                 : base_class( src )
120                 , m_Queue( src.m_Queue )
121             {}
122
123             virtual thread * clone()
124             {
125                 return new Consumer( *this );
126             }
127
128             virtual void test()
129             {
130                 while ( true ) {
131                     auto buf = m_Queue.front();
132                     if ( buf.first ) {
133                         m_nPopped += buf.second;
134
135                         uint8_t val = static_cast<uint8_t>( buf.second % 256 );
136                         uint8_t const* p = reinterpret_cast<uint8_t*>( buf.first );
137                         for ( uint8_t const* pEnd = p + buf.second; p < pEnd; ++p ) {
138                             if ( *p != val ) {
139                                 ++m_nBadValue;
140                                 break;
141                             }
142                         }
143
144                         if ( !m_Queue.pop_front())
145                             ++m_nPopFrontFailed;
146                     }
147                     else {
148                         ++m_nPopEmpty;
149                         if ( s_nProducerDone.load() != 0 ) {
150                             if ( m_Queue.empty())
151                                 break;
152                         }
153                     }
154                 }
155             }
156         };
157
158     protected:
159         size_t m_nThreadPushCount;
160
161     protected:
162         template <class Queue>
163         void test_queue( Queue& q )
164         {
165             cds_test::thread_pool& pool = get_pool();
166             auto producer = new Producer<Queue>( pool, q );
167             auto consumer = new Consumer<Queue>( pool, q );
168
169             pool.add( producer, 1 );
170             pool.add( consumer, 1 );
171
172             s_nProducerDone.store( 0 );
173
174             propout() << std::make_pair( "buffer_size", s_nBufferSize )
175                       << std::make_pair( "push_count", s_nPushCount );
176
177             std::chrono::milliseconds duration = pool.run();
178
179             propout() << std::make_pair( "duration", duration );
180
181             // analyze result
182             EXPECT_EQ( consumer->m_nBadValue, 0u );
183             EXPECT_EQ( consumer->m_nPopFrontFailed, 0u );
184             EXPECT_EQ( consumer->m_nPopped, producer->m_nPushed );
185
186             propout()
187                 << std::make_pair( "producer_push_length", producer->m_nPushed )
188                 << std::make_pair( "producer_push_failed", producer->m_nPushFailed )
189                 << std::make_pair( "consumer_pop_length", consumer->m_nPopped )
190                 << std::make_pair( "consumer_pop_empty", consumer->m_nPopEmpty )
191                 << std::make_pair( "consumer_bad_value", consumer->m_nBadValue )
192                 << std::make_pair( "consumer_pop_front_failed", consumer->m_nPopFrontFailed );
193         }
194
195         template <class Queue>
196         void test( Queue& q )
197         {
198             test_queue( q );
199             propout() << q.statistics();
200         }
201
202     public:
203         static void SetUpTestCase()
204         {
205             cds_test::config const& cfg = get_config( "spsc_buffer" );
206
207             s_nBufferSize = cfg.get_size_t( "BufferSize", s_nBufferSize );
208             s_nPushCount = cfg.get_size_t( "PushCount", s_nPushCount );
209
210             if ( s_nBufferSize < 1024 * 64 )
211                 s_nBufferSize = 1024 * 64;
212             if ( s_nPushCount == 0u )
213                 s_nPushCount = 1024;
214         }
215     };
216
217 #undef CDSSTRESS_Queue_F
218 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
219     TEST_F( test_fixture, type_name ) \
220     { \
221         typedef queue::Types< value_type >::type_name queue_type; \
222         queue_type queue( s_nBufferSize ); \
223         test( queue ); \
224     }
225
226     CDSSTRESS_WeakRingBuffer_void( spsc_buffer )
227
228 #undef CDSSTRESS_Queue_F
229
230 } // namespace