bcb4e5f9755138eae3892ef74a2f5fe4b20014f1
[libcds.git] / tests / unit / queue / queue_pop.cpp
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8     
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.     
29 */
30
31 #include "cppunit/thread.h"
32 #include "queue/queue_type.h"
33 #include "queue/queue_defs.h"
34
35 // Multi-threaded queue test for pop operation
36 namespace queue {
37
38 #define TEST_CASE( Q, V )       void Q() { test< Types<V>::Q >(); }
39 #define TEST_BOUNDED( Q, V )    void Q() { test_bounded< Types<V>::Q >(); }
40 #define TEST_SEGMENTED( Q, V )  void Q() { test_segmented< Types<V>::Q >(); }
41
42     namespace ns_Queue_Pop {
43         static size_t s_nThreadCount = 8;
44         static size_t s_nQueueSize = 20000000 ;   // no more than 20 million records
45
46         struct SimpleValue {
47             size_t    nNo;
48
49             SimpleValue(): nNo(0) {}
50             SimpleValue( size_t n ): nNo(n) {}
51             size_t getNo() const { return  nNo; }
52         };
53     }
54     using namespace ns_Queue_Pop;
55
56     class Queue_Pop: public CppUnitMini::TestCase
57     {
58         template <class Queue>
59         class Thread: public CppUnitMini::TestThread
60         {
61             virtual TestThread *    clone()
62             {
63                 return new Thread( *this );
64             }
65         public:
66             Queue&              m_Queue;
67             double              m_fTime;
68             long *              m_arr;
69             size_t              m_nPopCount;
70
71         public:
72             Thread( CppUnitMini::ThreadPool& pool, Queue& q )
73                 : CppUnitMini::TestThread( pool )
74                 , m_Queue( q )
75             {
76                 m_arr = new long[s_nQueueSize];
77             }
78             Thread( Thread& src )
79                 : CppUnitMini::TestThread( src )
80                 , m_Queue( src.m_Queue )
81             {
82                 m_arr = new long[s_nQueueSize];
83             }
84             ~Thread()
85             {
86                 delete [] m_arr;
87             }
88
89             Queue_Pop&  getTest()
90             {
91                 return reinterpret_cast<Queue_Pop&>( m_Pool.m_Test );
92             }
93
94             virtual void init()
95             {
96                 cds::threading::Manager::attachThread();
97                 memset(m_arr, 0, sizeof(m_arr[0]) * s_nQueueSize );
98             }
99             virtual void fini()
100             {
101                 cds::threading::Manager::detachThread();
102             }
103
104             virtual void test()
105             {
106                 m_fTime = m_Timer.duration();
107
108                 typedef typename Queue::value_type value_type;
109                 value_type value = value_type();
110                 size_t nPopCount = 0;
111                 while ( m_Queue.pop( value ) ) {
112                     ++m_arr[ value.getNo() ];
113                     ++nPopCount;
114                 }
115                 m_nPopCount = nPopCount;
116
117                 m_fTime = m_Timer.duration() - m_fTime;
118             }
119         };
120
121     protected:
122
123         template <class Queue>
124         void analyze( CppUnitMini::ThreadPool& pool, Queue& testQueue  )
125         {
126             size_t * arr = new size_t[ s_nQueueSize ];
127             memset(arr, 0, sizeof(arr[0]) * s_nQueueSize );
128
129             double fTime = 0;
130             size_t nTotalPops = 0;
131             for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
132                 Thread<Queue> * pThread = reinterpret_cast<Thread<Queue> *>(*it);
133                 for ( size_t i = 0; i < s_nQueueSize; ++i )
134                     arr[i] += pThread->m_arr[i];
135                 nTotalPops += pThread->m_nPopCount;
136                 fTime += pThread->m_fTime;
137             }
138             CPPUNIT_MSG( "     Duration=" << (fTime / s_nThreadCount) );
139             CPPUNIT_CHECK_EX( nTotalPops == s_nQueueSize, "Total pop=" << nTotalPops << ", queue size=" << s_nQueueSize);
140             CPPUNIT_CHECK( testQueue.empty() )
141
142             size_t nError = 0;
143             for ( size_t i = 0; i < s_nQueueSize; ++i ) {
144                 if ( arr[i] != 1 ) {
145                     CPPUNIT_MSG( "   ERROR: Item " << i << " has not been popped" );
146                     CPPUNIT_ASSERT( ++nError <= 10 );
147                 }
148             }
149
150             delete [] arr;
151         }
152
153         template <class Queue>
154         void test()
155         {
156             Queue testQueue;
157             CppUnitMini::ThreadPool pool( *this );
158             pool.add( new Thread<Queue>( pool, testQueue ), s_nThreadCount );
159
160             CPPUNIT_MSG( "   Create queue size =" << s_nQueueSize << " ...");
161             cds::OS::Timer      timer;
162             for ( size_t i = 0; i < s_nQueueSize; ++i )
163                 testQueue.push( i );
164             CPPUNIT_MSG( "     Duration=" << timer.duration() );
165
166             CPPUNIT_MSG( "   Pop test, thread count=" << s_nThreadCount << " ...");
167             pool.run();
168
169             analyze( pool, testQueue );
170
171             CPPUNIT_MSG( testQueue.statistics() );
172         }
173
174         template <class Queue>
175         void test_bounded()
176         {
177             Queue testQueue( s_nQueueSize );
178             CppUnitMini::ThreadPool pool( *this );
179             pool.add( new Thread<Queue>( pool, testQueue ), s_nThreadCount );
180
181             CPPUNIT_MSG( "   Create queue size =" << s_nQueueSize << " ...");
182             cds::OS::Timer      timer;
183             for ( size_t i = 0; i < s_nQueueSize; ++i )
184                 testQueue.push( i );
185             CPPUNIT_MSG( "     Duration=" << timer.duration() );
186
187             CPPUNIT_MSG( "   Pop test, thread count=" << s_nThreadCount << " ...");
188             pool.run();
189
190             analyze( pool, testQueue );
191
192             CPPUNIT_MSG( testQueue.statistics() );
193         }
194
195         template <class Queue>
196         void test_segmented()
197         {
198             for ( size_t nSegmentSize = 4; nSegmentSize <= 256; nSegmentSize *= 4 ) {
199                 CPPUNIT_MSG( "Segment size: " << nSegmentSize );
200
201                 Queue testQueue( nSegmentSize );
202                 CppUnitMini::ThreadPool pool( *this );
203                 pool.add( new Thread<Queue>( pool, testQueue ), s_nThreadCount );
204
205                 CPPUNIT_MSG( "   Create queue size =" << s_nQueueSize << " ...");
206                 cds::OS::Timer      timer;
207                 for ( size_t i = 0; i < s_nQueueSize; ++i )
208                     testQueue.push( i );
209                 CPPUNIT_MSG( "     Duration=" << timer.duration() );
210
211                 CPPUNIT_MSG( "   Pop test, thread count=" << s_nThreadCount << " ...");
212                 pool.run();
213
214                 analyze( pool, testQueue );
215
216                 CPPUNIT_MSG( testQueue.statistics() );
217             }
218         }
219
220         void setUpParams( const CppUnitMini::TestCfg& cfg ) {
221             s_nThreadCount = cfg.getULong("ThreadCount", 8 );
222             s_nQueueSize = cfg.getULong("QueueSize", 20000000 );
223         }
224
225     protected:
226         CDSUNIT_DECLARE_MoirQueue( SimpleValue )
227         CDSUNIT_DECLARE_MSQueue( SimpleValue )
228         CDSUNIT_DECLARE_OptimisticQueue( SimpleValue )
229         CDSUNIT_DECLARE_BasketQueue( SimpleValue )
230         CDSUNIT_DECLARE_FCQueue( SimpleValue )
231         CDSUNIT_DECLARE_FCDeque( SimpleValue )
232         CDSUNIT_DECLARE_SegmentedQueue( SimpleValue )
233         CDSUNIT_DECLARE_RWQueue( SimpleValue )
234         CDSUNIT_DECLARE_TsigasCycleQueue( SimpleValue )
235         CDSUNIT_DECLARE_VyukovMPMCCycleQueue( SimpleValue )
236         CDSUNIT_DECLARE_StdQueue( SimpleValue )
237
238         CPPUNIT_TEST_SUITE(Queue_Pop)
239             CDSUNIT_TEST_MoirQueue
240             CDSUNIT_TEST_MSQueue
241             CDSUNIT_TEST_OptimisticQueue
242             CDSUNIT_TEST_BasketQueue
243             CDSUNIT_TEST_FCQueue
244             CDSUNIT_TEST_FCDeque
245             CDSUNIT_TEST_SegmentedQueue
246             CDSUNIT_TEST_RWQueue
247             CDSUNIT_TEST_TsigasCycleQueue
248             CDSUNIT_TEST_VyukovMPMCCycleQueue
249             CDSUNIT_TEST_StdQueue
250         CPPUNIT_TEST_SUITE_END();
251     };
252
253 } // namespace queue
254
255 CPPUNIT_TEST_SUITE_REGISTRATION(queue::Queue_Pop);