10625e3062e7b1ae0cfdb8a4333e0447519785b4
[libcds.git] / tests / unit / pqueue / push_pop.cpp
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8     
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.     
29 */
30
31 #include "cppunit/thread.h"
32 #include "pqueue/pqueue_item.h"
33 #include "pqueue/pqueue_type.h"
34
35 #include <vector>
36 #include <memory>
37
38 namespace pqueue {
39
40 #define TEST_CASE( Q ) void Q() { test< Types<pqueue::SimpleValue>::Q >(); }
41 #define TEST_BOUNDED( Q ) void Q() { test_bounded< Types<pqueue::SimpleValue>::Q >(); }
42
43     namespace {
44         static size_t s_nPushThreadCount = 4;
45         static size_t s_nPopThreadCount = 4;
46         static size_t s_nQueueSize = 2000000;
47     }
48 } // namespace pqueue
49
50 namespace pqueue {
51
52     class PQueue_PushPop: public CppUnitMini::TestCase
53     {
54
55         template <class PQueue>
56         class Pusher: public CppUnitMini::TestThread
57         {
58             virtual TestThread *    clone()
59             {
60                 return new Pusher( *this );
61             }
62         public:
63             PQueue&             m_Queue;
64             size_t              m_nPushError;
65
66             typedef std::vector<size_t> array_type;
67             array_type          m_arr;
68
69         public:
70             Pusher( CppUnitMini::ThreadPool& pool, PQueue& q )
71                 : CppUnitMini::TestThread( pool )
72                 , m_Queue( q )
73             {}
74             Pusher( Pusher& src )
75                 : CppUnitMini::TestThread( src )
76                 , m_Queue( src.m_Queue )
77             {}
78
79             PQueue_PushPop&  getTest()
80             {
81                 return static_cast<PQueue_PushPop&>( m_Pool.m_Test );
82             }
83
84             virtual void init()
85             {
86                 cds::threading::Manager::attachThread();
87             }
88             virtual void fini()
89             {
90                 cds::threading::Manager::detachThread();
91             }
92
93             virtual void test()
94             {
95                 m_nPushError = 0;
96
97                 for ( array_type::const_iterator it = m_arr.begin(); it != m_arr.end(); ++it ) {
98                     if ( !m_Queue.push( SimpleValue( *it ) ))
99                         ++m_nPushError;
100                 }
101
102                 getTest().end_pusher();
103             }
104
105             void prepare( size_t nStart, size_t nEnd )
106             {
107                 m_arr.reserve( nEnd - nStart );
108                 for ( size_t i = nStart; i < nEnd; ++i )
109                     m_arr.push_back( i );
110                 shuffle( m_arr.begin(), m_arr.end() );
111             }
112         };
113
114         template <class PQueue>
115         class Popper: public CppUnitMini::TestThread
116         {
117             virtual TestThread *    clone()
118             {
119                 return new Popper( *this );
120             }
121         public:
122             PQueue&             m_Queue;
123             size_t              m_nPopSuccess;
124             size_t              m_nPopFailed;
125
126             typedef std::vector<size_t> array_type;
127             array_type          m_arr;
128
129         public:
130             Popper( CppUnitMini::ThreadPool& pool, PQueue& q )
131                 : CppUnitMini::TestThread( pool )
132                 , m_Queue( q )
133             {}
134             Popper( Popper& src )
135                 : CppUnitMini::TestThread( src )
136                 , m_Queue( src.m_Queue )
137             {}
138
139             PQueue_PushPop&  getTest()
140             {
141                 return static_cast<PQueue_PushPop&>( m_Pool.m_Test );
142             }
143
144             virtual void init()
145             {
146                 cds::threading::Manager::attachThread();
147             }
148             virtual void fini()
149             {
150                 cds::threading::Manager::detachThread();
151             }
152
153             virtual void test()
154             {
155                 m_nPopSuccess = 0;
156                 m_nPopFailed = 0;
157
158                 SimpleValue val;
159                 while ( getTest().pushing() || !m_Queue.empty() ) {
160                     if ( m_Queue.pop( val ))
161                         ++m_nPopSuccess;
162                     else
163                         ++m_nPopFailed;
164                 }
165             }
166         };
167
168         atomics::atomic<size_t>  m_nPusherCount;
169         void end_pusher()
170         {
171             m_nPusherCount.fetch_sub( 1, atomics::memory_order_relaxed );
172         }
173         bool pushing() const
174         {
175             return m_nPusherCount.load( atomics::memory_order_relaxed ) != 0;
176         }
177
178     protected:
179         template <class PQueue>
180         void test()
181         {
182             PQueue testQueue;
183             test_with( testQueue );
184         }
185
186         template <class PQueue>
187         void test_bounded()
188         {
189             std::unique_ptr<PQueue> pq( new PQueue(s_nQueueSize) );
190             test_with( *pq.get() );
191         }
192
193         template <class PQueue>
194         void test_with( PQueue& testQueue )
195         {
196             size_t const nThreadItemCount = s_nQueueSize / s_nPushThreadCount;
197
198             CppUnitMini::ThreadPool pool( *this );
199             pool.add( new Pusher<PQueue>( pool, testQueue ), s_nPushThreadCount );
200
201             size_t nStart = 0;
202             for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
203                 Pusher<PQueue> * pThread = static_cast<Pusher<PQueue> *>(*it);
204                 pThread->prepare( nStart, nStart + nThreadItemCount );
205                 nStart += nThreadItemCount;
206             }
207
208             pool.add( new Popper<PQueue>( pool, testQueue ), s_nPopThreadCount );
209
210             m_nPusherCount.store( s_nPushThreadCount, atomics::memory_order_release );
211             CPPUNIT_MSG( "   push thread count=" << s_nPushThreadCount << " pop thread count=" << s_nPopThreadCount
212                 << ", item count=" << nThreadItemCount * s_nPushThreadCount << " ..." );
213             pool.run();
214             CPPUNIT_MSG( "     Duration=" << pool.avgDuration() );
215
216             // Analyze result
217             size_t nTotalPopped = 0;
218             size_t nPushFailed = 0;
219             size_t nPopFailed = 0;
220             for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
221                 Popper<PQueue> * pPopper = dynamic_cast<Popper<PQueue> *>(*it);
222                 if ( pPopper ) {
223                     nTotalPopped += pPopper->m_nPopSuccess;
224                     nPopFailed += pPopper->m_nPopFailed;
225                 }
226                 else {
227                     Pusher<PQueue> * pPusher = dynamic_cast<Pusher<PQueue> *>(*it);
228                     assert( pPusher );
229                     nPushFailed += pPusher->m_nPushError;
230                 }
231             }
232
233             CPPUNIT_MSG( "   Total: popped=" << nTotalPopped << ", empty pop=" << nPopFailed << ", push error=" << nPushFailed );
234             CPPUNIT_CHECK( nTotalPopped == nThreadItemCount * s_nPushThreadCount );
235             CPPUNIT_CHECK( nPushFailed == 0 );
236
237             check_statistics( testQueue.statistics() );
238             CPPUNIT_MSG( testQueue.statistics() );
239         }
240
241         void setUpParams( const CppUnitMini::TestCfg& cfg ) {
242             s_nPushThreadCount = cfg.getULong("PushThreadCount", (unsigned long) s_nPushThreadCount );
243             s_nPopThreadCount = cfg.getULong("PopThreadCount", (unsigned long) s_nPopThreadCount );
244             s_nQueueSize = cfg.getULong("QueueSize", (unsigned long) s_nQueueSize );
245         }
246
247     protected:
248 #include "pqueue/pqueue_defs.h"
249         CDSUNIT_DECLARE_MSPriorityQueue
250         CDSUNIT_DECLARE_EllenBinTree
251         CDSUNIT_DECLARE_SkipList
252         CDSUNIT_DECLARE_FCPriorityQueue
253         CDSUNIT_DECLARE_StdPQueue
254
255         CPPUNIT_TEST_SUITE(PQueue_PushPop)
256             CDSUNIT_TEST_MSPriorityQueue
257             CDSUNIT_TEST_EllenBinTree
258             CDSUNIT_TEST_SkipList
259             CDSUNIT_TEST_FCPriorityQueue
260             CDUNIT_TEST_StdPQueue
261         CPPUNIT_TEST_SUITE_END();
262     };
263
264 } // namespace queue
265
266 CPPUNIT_TEST_SUITE_REGISTRATION(pqueue::PQueue_PushPop);