bce484853513a737bb5736732f030b09a30925de
[libcds.git] / tests / unit / queue / queue_push.cpp
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8     
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.     
29 */
30
31 #include "cppunit/thread.h"
32 #include "queue/queue_type.h"
33 #include "queue/queue_defs.h"
34
35
36 // Multi-threaded queue test for push operation
37 namespace queue {
38
39 #define TEST_CASE( Q, V )       void Q() { test< Types<V>::Q >(); }
40 #define TEST_BOUNDED( Q, V )    void Q() { test_bounded< Types<V>::Q >(); }
41 #define TEST_SEGMENTED( Q, V )  void Q() { test_segmented< Types<V>::Q >(); }
42
43     namespace ns_Queue_Push {
44         static size_t s_nThreadCount = 8;
45         static size_t s_nQueueSize = 20000000 ;   // no more than 20 million records
46
47         struct SimpleValue {
48             size_t      nNo;
49
50             SimpleValue(): nNo(0) {}
51             SimpleValue( size_t n ): nNo(n) {}
52             size_t getNo() const { return  nNo; }
53         };
54     }
55     using namespace ns_Queue_Push;
56
57     class Queue_Push: public CppUnitMini::TestCase
58     {
59         template <class Queue>
60         class Thread: public CppUnitMini::TestThread
61         {
62             virtual TestThread *    clone()
63             {
64                 return new Thread( *this );
65             }
66         public:
67             Queue&              m_Queue;
68             double              m_fTime;
69             size_t              m_nStartItem;
70             size_t              m_nEndItem;
71             size_t              m_nPushError;
72
73         public:
74             Thread( CppUnitMini::ThreadPool& pool, Queue& q )
75                 : CppUnitMini::TestThread( pool )
76                 , m_Queue( q )
77             {}
78             Thread( Thread& src )
79                 : CppUnitMini::TestThread( src )
80                 , m_Queue( src.m_Queue )
81             {}
82
83             Queue_Push&  getTest()
84             {
85                 return reinterpret_cast<Queue_Push&>( m_Pool.m_Test );
86             }
87
88             virtual void init()
89             {
90                 cds::threading::Manager::attachThread();
91             }
92             virtual void fini()
93             {
94                 cds::threading::Manager::detachThread();
95             }
96
97             virtual void test()
98             {
99                 m_fTime = m_Timer.duration();
100
101                 m_nPushError = 0;
102                 for ( size_t nItem = m_nStartItem; nItem < m_nEndItem; ++nItem ) {
103                     if ( !m_Queue.push( nItem ))
104                         ++m_nPushError;
105                 }
106
107                 m_fTime = m_Timer.duration() - m_fTime;
108             }
109         };
110
111     protected:
112         template <class Queue>
113         void analyze( CppUnitMini::ThreadPool& pool, Queue& testQueue  )
114         {
115             size_t nThreadItems = s_nQueueSize / s_nThreadCount;
116             double fTime = 0;
117             for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
118                 Thread<Queue> * pThread = reinterpret_cast<Thread<Queue> *>(*it);
119                 fTime += pThread->m_fTime;
120                 if ( pThread->m_nPushError != 0 )
121                     CPPUNIT_MSG("     ERROR: thread push error count=" << pThread->m_nPushError );
122             }
123             CPPUNIT_MSG( "     Duration=" << (fTime / s_nThreadCount) );
124             CPPUNIT_CHECK( !testQueue.empty() )
125
126             size_t * arr = new size_t[ s_nQueueSize ];
127             memset(arr, 0, sizeof(arr[0]) * s_nQueueSize );
128
129             cds::OS::Timer      timer;
130             CPPUNIT_MSG( "   Pop (single-threaded)..." );
131             size_t nPopped = 0;
132             SimpleValue val = SimpleValue();
133             while ( testQueue.pop( val )) {
134                 nPopped++;
135                 ++arr[ val.getNo() ];
136             }
137             CPPUNIT_MSG( "     Duration=" << timer.duration() );
138
139             size_t nTotalItems = nThreadItems * s_nThreadCount;
140             size_t nError = 0;
141             for ( size_t i = 0; i < nTotalItems; ++i ) {
142                 if ( arr[i] != 1 ) {
143                     CPPUNIT_MSG( "   ERROR: Item " << i << " has not been pushed" );
144                     CPPUNIT_ASSERT( ++nError <= 10 );
145                 }
146             }
147
148             delete [] arr;
149         }
150
151         template <class Queue>
152         void test()
153         {
154             Queue testQueue;
155
156             CppUnitMini::ThreadPool pool( *this );
157             pool.add( new Thread<Queue>( pool, testQueue ), s_nThreadCount );
158
159             size_t nStart = 0;
160             size_t nThreadItemCount = s_nQueueSize / s_nThreadCount;
161             for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
162                 Thread<Queue> * pThread = reinterpret_cast<Thread<Queue> *>(*it);
163                 pThread->m_nStartItem = nStart;
164                 nStart += nThreadItemCount;
165                 pThread->m_nEndItem = nStart;
166             }
167
168             CPPUNIT_MSG( "   Push test, thread count=" << s_nThreadCount << " ...");
169             pool.run();
170
171             analyze( pool, testQueue );
172
173             CPPUNIT_MSG( testQueue.statistics() );
174         }
175
176         template <class Queue>
177         void test_bounded()
178         {
179             size_t nStart = 0;
180             size_t nThreadItemCount = s_nQueueSize / s_nThreadCount;
181
182             Queue testQueue( s_nQueueSize );
183
184             CppUnitMini::ThreadPool pool( *this );
185             pool.add( new Thread<Queue>( pool, testQueue ), s_nThreadCount );
186
187             for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
188                 Thread<Queue> * pThread = reinterpret_cast<Thread<Queue> *>(*it);
189                 pThread->m_nStartItem = nStart;
190                 nStart += nThreadItemCount;
191                 pThread->m_nEndItem = nStart;
192             }
193
194             CPPUNIT_MSG( "   Push test, thread count=" << s_nThreadCount << " ...");
195             pool.run();
196
197             analyze( pool, testQueue );
198
199             CPPUNIT_MSG( testQueue.statistics() );
200         }
201
202         template <class Queue>
203         void test_segmented()
204         {
205             CPPUNIT_MSG( "   Push test, thread count=" << s_nThreadCount << " ...");
206             for ( size_t nSegmentSize = 4; nSegmentSize <= 256; nSegmentSize *= 4 ) {
207                 CPPUNIT_MSG( "Segment size: " << nSegmentSize );
208
209                 Queue testQueue( nSegmentSize );
210
211                 CppUnitMini::ThreadPool pool( *this );
212                 pool.add( new Thread<Queue>( pool, testQueue ), s_nThreadCount );
213
214                 size_t nStart = 0;
215                 size_t nThreadItemCount = s_nQueueSize / s_nThreadCount;
216                 for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
217                     Thread<Queue> * pThread = reinterpret_cast<Thread<Queue> *>(*it);
218                     pThread->m_nStartItem = nStart;
219                     nStart += nThreadItemCount;
220                     pThread->m_nEndItem = nStart;
221                 }
222
223                 pool.run();
224
225                 analyze( pool, testQueue );
226
227                 CPPUNIT_MSG( testQueue.statistics() );
228             }
229         }
230
231         void setUpParams( const CppUnitMini::TestCfg& cfg ) {
232             s_nThreadCount = cfg.getULong("ThreadCount", 8 );
233             s_nQueueSize = cfg.getULong("QueueSize", 20000000 );
234         }
235
236     protected:
237         CDSUNIT_DECLARE_MoirQueue( SimpleValue )
238         CDSUNIT_DECLARE_MSQueue( SimpleValue )
239         CDSUNIT_DECLARE_OptimisticQueue( SimpleValue )
240         CDSUNIT_DECLARE_BasketQueue( SimpleValue )
241         CDSUNIT_DECLARE_FCQueue( SimpleValue )
242         CDSUNIT_DECLARE_FCDeque( SimpleValue )
243         CDSUNIT_DECLARE_SegmentedQueue( SimpleValue )
244         CDSUNIT_DECLARE_RWQueue( SimpleValue )
245         CDSUNIT_DECLARE_TsigasCycleQueue( SimpleValue )
246         CDSUNIT_DECLARE_VyukovMPMCCycleQueue( SimpleValue )
247         CDSUNIT_DECLARE_StdQueue( SimpleValue )
248
249         CPPUNIT_TEST_SUITE(Queue_Push)
250             CDSUNIT_TEST_MoirQueue
251             CDSUNIT_TEST_MSQueue
252             CDSUNIT_TEST_OptimisticQueue
253             CDSUNIT_TEST_BasketQueue
254             CDSUNIT_TEST_FCQueue
255             CDSUNIT_TEST_FCDeque
256             CDSUNIT_TEST_SegmentedQueue
257             CDSUNIT_TEST_RWQueue
258             CDSUNIT_TEST_TsigasCycleQueue
259             CDSUNIT_TEST_VyukovMPMCCycleQueue
260             CDSUNIT_TEST_StdQueue
261         CPPUNIT_TEST_SUITE_END();
262     };
263
264 } // namespace queue
265
266 CPPUNIT_TEST_SUITE_REGISTRATION(queue::Queue_Push);