Removed old tests
[libcds.git] / tests / unit / queue / queue_reader_writer.cpp
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8     
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.     
29 */
30
31 #include "cppunit/thread.h"
32 #include "queue/queue_type.h"
33 #include "queue/queue_defs.h"
34
35 #include <vector>
36 #include <algorithm>
37
38 // Multi-threaded reader/writer queue test: writer threads push while reader threads pop concurrently
39 namespace queue {
40
// Test-method generators: each one expands to a member function named Q that
// runs the scenario on the queue type Types<V>::Q.
// NOTE(review): presumably consumed by the CDSUNIT_DECLARE_* macros used further down - confirm.

// Plain queue: default-constructed and run through test<>().
#define TEST_CASE( Q, V ) \
    void Q() { test< Types<V>::Q >(); }

// Bounded queue: same expansion as TEST_CASE.
#define TEST_BOUNDED( Q, V ) \
    void Q() { test< Types<V>::Q >(); }

// Segmented queue: run through test_segmented<>().
#define TEST_SEGMENTED( Q, V ) \
    void Q() { test_segmented< Types< V >::Q >(); }
44
namespace {
    // Run settings; setUpParams() overwrites them from the test configuration.
    // The unnamed namespace already gives these internal linkage.
    size_t s_nReaderThreadCount = 4;    // number of popping threads
    size_t s_nWriterThreadCount = 4;    // number of pushing threads
    size_t s_nQueueSize = 4000000;      // total item count, divided between the writers

    // Element stored in the queue under test.
    struct Value {
        size_t nNo;         // sequence number within its writer (0, 1, 2, ...)
        size_t nWriterNo;   // thread number of the writer that pushed the item
    };
}
55
56     class Queue_ReaderWriter: public CppUnitMini::TestCase
57     {
58         template <class Queue>
59         class WriterThread: public CppUnitMini::TestThread
60         {
61             virtual TestThread *    clone()
62             {
63                 return new WriterThread( *this );
64             }
65         public:
66             Queue&              m_Queue;
67             double              m_fTime;
68             size_t              m_nPushFailed;
69
70         public:
71             WriterThread( CppUnitMini::ThreadPool& pool, Queue& q )
72                 : CppUnitMini::TestThread( pool )
73                 , m_Queue( q )
74             {}
75             WriterThread( WriterThread& src )
76                 : CppUnitMini::TestThread( src )
77                 , m_Queue( src.m_Queue )
78             {}
79
80             Queue_ReaderWriter&  getTest()
81             {
82                 return reinterpret_cast<Queue_ReaderWriter&>( m_Pool.m_Test );
83             }
84
85             virtual void init()
86             {
87                 cds::threading::Manager::attachThread();
88             }
89             virtual void fini()
90             {
91                 cds::threading::Manager::detachThread();
92             }
93
94             virtual void test()
95             {
96                 size_t nPushCount = getTest().m_nThreadPushCount;
97                 Value v;
98                 v.nWriterNo = m_nThreadNo;
99                 v.nNo = 0;
100                 m_nPushFailed = 0;
101
102                 m_fTime = m_Timer.duration();
103
104                 while ( v.nNo < nPushCount ) {
105                     if ( m_Queue.push( v ))
106                         ++v.nNo;
107                     else
108                         ++m_nPushFailed;
109                 }
110
111                 m_fTime = m_Timer.duration() - m_fTime;
112                 getTest().m_nWriterDone.fetch_add( 1 );
113             }
114         };
115
116         template <class Queue>
117         class ReaderThread: public CppUnitMini::TestThread
118         {
119             virtual TestThread *    clone()
120             {
121                 return new ReaderThread( *this );
122             }
123         public:
124             Queue&              m_Queue;
125             double              m_fTime;
126             size_t              m_nPopEmpty;
127             size_t              m_nPopped;
128             size_t              m_nBadWriter;
129
130             typedef std::vector<size_t> TPoppedData;
131             typedef std::vector<size_t>::iterator       data_iterator;
132             typedef std::vector<size_t>::const_iterator const_data_iterator;
133
134             std::vector<TPoppedData>        m_WriterData;
135
136         private:
137             void initPoppedData()
138             {
139                 const size_t nWriterCount = s_nWriterThreadCount;
140                 const size_t nWriterPushCount = getTest().m_nThreadPushCount;
141                 m_WriterData.resize( nWriterCount );
142                 for ( size_t i = 0; i < nWriterCount; ++i )
143                     m_WriterData[i].reserve( nWriterPushCount );
144             }
145
146         public:
147             ReaderThread( CppUnitMini::ThreadPool& pool, Queue& q )
148                 : CppUnitMini::TestThread( pool )
149                 , m_Queue( q )
150             {
151                 initPoppedData();
152             }
153             ReaderThread( ReaderThread& src )
154                 : CppUnitMini::TestThread( src )
155                 , m_Queue( src.m_Queue )
156             {
157                 initPoppedData();
158             }
159
160             Queue_ReaderWriter&  getTest()
161             {
162                 return reinterpret_cast<Queue_ReaderWriter&>( m_Pool.m_Test );
163             }
164
165             virtual void init()
166             {
167                 cds::threading::Manager::attachThread();
168             }
169             virtual void fini()
170             {
171                 cds::threading::Manager::detachThread();
172             }
173
174             virtual void test()
175             {
176                 m_nPopEmpty = 0;
177                 m_nPopped = 0;
178                 m_nBadWriter = 0;
179                 const size_t nTotalWriters = s_nWriterThreadCount;
180                 Value v;
181
182                 m_fTime = m_Timer.duration();
183
184                 while ( true ) {
185                     if ( m_Queue.pop( v ) ) {
186                         ++m_nPopped;
187                         if ( /*v.nWriterNo >= 0 &&*/ v.nWriterNo < nTotalWriters )
188                             m_WriterData[ v.nWriterNo ].push_back( v.nNo );
189                         else
190                             ++m_nBadWriter;
191                     }
192                     else
193                         ++m_nPopEmpty;
194
195                     if ( m_Queue.empty() ) {
196                         if ( getTest().m_nWriterDone.load() >= nTotalWriters ) {
197                             if ( m_Queue.empty() )
198                                     break;
199                         }
200                     }
201                 }
202
203                 m_fTime = m_Timer.duration() - m_fTime;
204             }
205         };
206
// Per-run state shared with the worker threads, which reach it through getTest().
207     protected:
208         size_t                  m_nThreadPushCount; // items each writer pushes; test()/test_segmented() set it to s_nQueueSize / s_nWriterThreadCount
209         atomics::atomic<size_t>     m_nWriterDone; // incremented by each WriterThread after its push loop; readers compare it with the writer count before stopping
210
211     protected:
212         template <class Queue>
213         void analyze( CppUnitMini::ThreadPool& pool, Queue& testQueue, size_t /*nLeftOffset*/ = 0, size_t nRightOffset = 0  )
214         {
215             typedef ReaderThread<Queue> Reader;
216             typedef WriterThread<Queue> Writer;
217             typedef typename Reader::const_data_iterator    ReaderIterator;
218
219             size_t nPostTestPops = 0;
220             {
221                 Value v;
222                 while ( testQueue.pop( v ))
223                     ++nPostTestPops;
224             }
225
226             double fTimeWriter = 0;
227             double fTimeReader = 0;
228             size_t nTotalPops = 0;
229             size_t nPopFalse = 0;
230             size_t nPoppedItems = 0;
231             size_t nPushFailed = 0;
232
233             std::vector< Reader * > arrReaders;
234
235             for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
236                 Reader * pReader = dynamic_cast<Reader *>( *it );
237                 if ( pReader ) {
238                     fTimeReader += pReader->m_fTime;
239                     nTotalPops += pReader->m_nPopped;
240                     nPopFalse += pReader->m_nPopEmpty;
241                     arrReaders.push_back( pReader );
242                     CPPUNIT_CHECK_EX( pReader->m_nBadWriter == 0, "reader " << pReader->m_nThreadNo << " bad writer event count=" << pReader->m_nBadWriter );
243
244                     size_t nPopped = 0;
245                     for ( size_t n = 0; n < s_nWriterThreadCount; ++n )
246                         nPopped += pReader->m_WriterData[n].size();
247
248                     CPPUNIT_MSG( "    Reader " << pReader->m_nThreadNo - s_nWriterThreadCount << " popped count=" << nPopped );
249                     nPoppedItems += nPopped;
250                 }
251                 else {
252                     Writer * pWriter = dynamic_cast<Writer *>( *it );
253                     CPPUNIT_ASSERT( pWriter != nullptr );
254                     fTimeWriter += pWriter->m_fTime;
255                     nPushFailed += pWriter->m_nPushFailed;
256                     if ( !boost::is_base_of<cds::bounded_container, Queue>::value ) {
257                         CPPUNIT_CHECK_EX( pWriter->m_nPushFailed == 0,
258                             "writer " << pWriter->m_nThreadNo << " push failed count=" << pWriter->m_nPushFailed );
259                     }
260                 }
261             }
262             CPPUNIT_CHECK_EX( nTotalPops == nPoppedItems, "nTotalPops=" << nTotalPops << ", nPoppedItems=" << nPoppedItems );
263
264             CPPUNIT_MSG( "    Readers: duration=" << fTimeReader / s_nReaderThreadCount << ", success pop=" << nTotalPops << ", failed pops=" << nPopFalse );
265             CPPUNIT_MSG( "    Writers: duration=" << fTimeWriter / s_nWriterThreadCount << ", failed push=" << nPushFailed );
266
267             size_t nQueueSize = m_nThreadPushCount * s_nWriterThreadCount;
268             CPPUNIT_CHECK_EX( nTotalPops + nPostTestPops == nQueueSize, "popped=" << nTotalPops + nPostTestPops << " must be " << nQueueSize );
269             CPPUNIT_CHECK( testQueue.empty() );
270
271             // Test that all items have been popped
272             CPPUNIT_MSG( "   Test consistency of popped sequence..." );
273             for ( size_t nWriter = 0; nWriter < s_nWriterThreadCount; ++nWriter ) {
274                 std::vector<size_t> arrData;
275                 arrData.reserve( m_nThreadPushCount );
276                 size_t nErrors = 0;
277                 for ( size_t nReader = 0; nReader < arrReaders.size(); ++nReader ) {
278                     ReaderIterator it = arrReaders[nReader]->m_WriterData[nWriter].begin();
279                     ReaderIterator itEnd = arrReaders[nReader]->m_WriterData[nWriter].end();
280                     if ( it != itEnd ) {
281                         ReaderIterator itPrev = it;
282                         for ( ++it; it != itEnd; ++it ) {
283                             CPPUNIT_CHECK_EX( *itPrev < *it + nRightOffset, "Reader " << nReader << ", Writer " << nWriter << ": prev=" << *itPrev << ", cur=" << *it );
284                             if ( ++nErrors > 10 )
285                                 return;
286                             itPrev = it;
287                         }
288                     }
289
290                     for ( it = arrReaders[nReader]->m_WriterData[nWriter].begin(); it != itEnd; ++it )
291                         arrData.push_back( *it );
292                 }
293
294                 std::sort( arrData.begin(), arrData.end() );
295                 nErrors = 0;
296                 for ( size_t i=1; i < arrData.size(); ++i ) {
297                     if ( arrData[i-1] + 1 != arrData[i] ) {
298                         CPPUNIT_CHECK_EX( arrData[i-1] + 1 == arrData[i], "Writer " << nWriter << ": [" << (i-1) << "]=" << arrData[i-1] << ", [" << i << "]=" << arrData[i] );
299                         if ( ++nErrors > 10 )
300                             return;
301                     }
302                 }
303
304                 CPPUNIT_CHECK_EX( arrData[0] == 0, "Writer " << nWriter << "[0] != 0" );
305                 CPPUNIT_CHECK_EX( arrData[arrData.size() - 1] == m_nThreadPushCount - 1, "Writer " << nWriter << "[last] != " << m_nThreadPushCount - 1 );
306             }
307         }
308
309         template <class Queue>
310         void test()
311         {
312             m_nThreadPushCount = s_nQueueSize / s_nWriterThreadCount;
313             CPPUNIT_MSG( "    reader count=" << s_nReaderThreadCount << " writer count=" << s_nWriterThreadCount
314                 << " item count=" << m_nThreadPushCount * s_nWriterThreadCount << "..." );
315
316             Queue testQueue;
317             CppUnitMini::ThreadPool pool( *this );
318
319             m_nWriterDone.store( 0 );
320
321             // Writers must be first
322             pool.add( new WriterThread<Queue>( pool, testQueue ), s_nWriterThreadCount );
323             pool.add( new ReaderThread<Queue>( pool, testQueue ), s_nReaderThreadCount );
324
325             //CPPUNIT_MSG( "   Reader/Writer test, reader count=" << s_nReaderThreadCount << " writer count=" << s_nWriterThreadCount << "..." );
326             pool.run();
327
328             analyze( pool, testQueue );
329             CPPUNIT_MSG( testQueue.statistics() );
330         }
331
332         template <class Queue>
333         void test_segmented()
334         {
335             m_nThreadPushCount = s_nQueueSize / s_nWriterThreadCount;
336             CPPUNIT_MSG( "    reader count=" << s_nReaderThreadCount << " writer count=" << s_nWriterThreadCount
337                 << " item count=" << m_nThreadPushCount * s_nWriterThreadCount << "..." );
338
339             for ( size_t nSegmentSize = 4; nSegmentSize <= 256; nSegmentSize *= 4 ) {
340                 CPPUNIT_MSG( "Segment size: " << nSegmentSize );
341
342                 Queue q( nSegmentSize );
343                 CppUnitMini::ThreadPool pool( *this );
344
345                 m_nWriterDone.store( 0 );
346
347                 // Writers must be first
348                 pool.add( new WriterThread<Queue>( pool, q ), s_nWriterThreadCount );
349                 pool.add( new ReaderThread<Queue>( pool, q ), s_nReaderThreadCount );
350
351                 pool.run();
352
353                 analyze( pool, q, nSegmentSize * 2, nSegmentSize );
354                 CPPUNIT_MSG( q.statistics() );
355             }
356         }
357
358         void setUpParams( const CppUnitMini::TestCfg& cfg ) {
359             s_nReaderThreadCount = cfg.getULong("ReaderCount", 4 );
360             s_nWriterThreadCount = cfg.getULong("WriterCount", 4 );
361             s_nQueueSize = cfg.getULong("QueueSize", 10000000 );
362         }
363
// Test-method declarations, one macro per queue family, all using Value as the element type.
// NOTE(review): the CDSUNIT_DECLARE_* / CDSUNIT_TEST_* macros are defined outside this file
// (presumably in queue/queue_defs.h) and presumably expand through the TEST_CASE /
// TEST_BOUNDED / TEST_SEGMENTED macros defined at the top of this file - confirm.
364     protected:
365         CDSUNIT_DECLARE_MoirQueue( Value )
366         CDSUNIT_DECLARE_MSQueue( Value )
367         CDSUNIT_DECLARE_OptimisticQueue( Value )
368         CDSUNIT_DECLARE_BasketQueue( Value )
369         CDSUNIT_DECLARE_FCQueue( Value )
370         CDSUNIT_DECLARE_FCDeque( Value )
371         CDSUNIT_DECLARE_SegmentedQueue( Value )
372         CDSUNIT_DECLARE_RWQueue( Value )
373         CDSUNIT_DECLARE_TsigasCycleQueue( Value )
374         CDSUNIT_DECLARE_VyukovMPMCCycleQueue( Value )
375         CDSUNIT_DECLARE_StdQueue( Value )
376
// Suite definition: one CDSUNIT_TEST_* entry for each CDSUNIT_DECLARE_* family above, in the same order.
377         CPPUNIT_TEST_SUITE(Queue_ReaderWriter)
378             CDSUNIT_TEST_MoirQueue
379             CDSUNIT_TEST_MSQueue
380             CDSUNIT_TEST_OptimisticQueue
381             CDSUNIT_TEST_BasketQueue
382             CDSUNIT_TEST_FCQueue
383             CDSUNIT_TEST_FCDeque
384             CDSUNIT_TEST_SegmentedQueue
385             CDSUNIT_TEST_RWQueue
386             CDSUNIT_TEST_TsigasCycleQueue
387             CDSUNIT_TEST_VyukovMPMCCycleQueue
388             CDSUNIT_TEST_StdQueue
389         CPPUNIT_TEST_SUITE_END();
390     };
391
392 } // namespace queue
393
// Registers the queue::Queue_ReaderWriter suite.
// NOTE(review): macro defined outside this file; presumably makes the suite visible to the test runner - confirm.
394 CPPUNIT_TEST_SUITE_REGISTRATION(queue::Queue_ReaderWriter);