Added copyright and license
[libcds.git] / tests / unit / pqueue / push.cpp
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8     
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.     
29 */
30
31 #include "cppunit/thread.h"
32 #include "pqueue/pqueue_item.h"
33 #include "pqueue/pqueue_type.h"
34
35 #include <vector>
36 #include <memory>
37
namespace pqueue {

    // Workload parameters of the push stress test. These are the defaults;
    // PQueue_Push::setUpParams() replaces them with the "ThreadCount" and
    // "QueueSize" configuration values. The unnamed namespace keeps both
    // variables local to this translation unit.
    namespace {
        size_t s_nQueueSize = 2000000;  // total item budget, split evenly between the pusher threads
        size_t s_nThreadCount = 8;      // number of concurrent pusher threads
    }

// Each macro expands to one test member function named Q that runs the
// corresponding driver template on the queue type Types<SimpleValue>::Q.
#define TEST_CASE( Q ) \
    void Q() { test< Types<pqueue::SimpleValue>::Q >(); }
#define TEST_BOUNDED( Q ) \
    void Q() { test_bounded< Types<pqueue::SimpleValue>::Q >(); }

} // namespace pqueue
48
49 namespace pqueue {
50
51     class PQueue_Push: public CppUnitMini::TestCase
52     {
53         template <class PQueue>
54         class Pusher: public CppUnitMini::TestThread
55         {
56             virtual TestThread *    clone()
57             {
58                 return new Pusher( *this );
59             }
60         public:
61             PQueue&             m_Queue;
62             size_t              m_nPushError;
63
64             typedef std::vector<size_t> array_type;
65             array_type          m_arr;
66
67         public:
68             Pusher( CppUnitMini::ThreadPool& pool, PQueue& q )
69                 : CppUnitMini::TestThread( pool )
70                 , m_Queue( q )
71             {}
72             Pusher( Pusher& src )
73                 : CppUnitMini::TestThread( src )
74                 , m_Queue( src.m_Queue )
75             {}
76
77             PQueue_Push&  getTest()
78             {
79                 return static_cast<PQueue_Push&>( m_Pool.m_Test );
80             }
81
82             virtual void init()
83             {
84                 cds::threading::Manager::attachThread();
85             }
86             virtual void fini()
87             {
88                 cds::threading::Manager::detachThread();
89             }
90
91             virtual void test()
92             {
93                 m_nPushError = 0;
94
95                 for ( array_type::const_iterator it = m_arr.begin(); it != m_arr.end(); ++it ) {
96                     if ( !m_Queue.push( SimpleValue( *it ) ))
97                         ++m_nPushError;
98                 }
99             }
100
101             void prepare( size_t nStart, size_t nEnd )
102             {
103                 m_arr.reserve( nEnd - nStart );
104                 for ( size_t i = nStart; i < nEnd; ++i )
105                     m_arr.push_back( i );
106                 shuffle( m_arr.begin(), m_arr.end() );
107             }
108         };
109
110     protected:
111         template <class PQueue>
112         void analyze( CppUnitMini::ThreadPool& pool, PQueue& testQueue  )
113         {
114             size_t nThreadItems = s_nQueueSize / s_nThreadCount;
115             for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
116                 Pusher<PQueue> * pThread = static_cast<Pusher<PQueue> *>(*it);
117                 CPPUNIT_CHECK_EX( pThread->m_nPushError == 0, "Thread push error count=" << pThread->m_nPushError );
118             }
119             CPPUNIT_MSG( "     Duration=" << pool.avgDuration() );
120             CPPUNIT_ASSERT( !testQueue.empty() );
121
122             typedef std::vector<size_t> vector_type;
123             vector_type arr;
124             arr.reserve( s_nQueueSize );
125
126             cds::OS::Timer      timer;
127             CPPUNIT_MSG( "   Pop (single-threaded)..." );
128             size_t nPopped = 0;
129             SimpleValue val;
130             while ( testQueue.pop( val )) {
131                 nPopped++;
132                 arr.push_back( val.key );
133             }
134             CPPUNIT_MSG( "     Duration=" << timer.duration() );
135
136             CPPUNIT_CHECK( arr.size() == nThreadItems * s_nThreadCount );
137             vector_type::const_iterator it = arr.begin();
138             size_t nPrev = *it;
139             ++it;
140             size_t nErrCount = 0;
141             for ( vector_type::const_iterator itEnd = arr.end(); it != itEnd; ++it ) {
142                 if ( nPrev - 1 != *it ) {
143                     if ( ++nErrCount < 10 ) {
144                         CPPUNIT_CHECK_EX( nPrev - 1 == *it, "Expected=" << nPrev - 1 << ", current=" << *it );
145                     }
146                 }
147                 nPrev = *it;
148             }
149
150             CPPUNIT_CHECK_EX( nErrCount == 0, "Error count=" << nErrCount );
151         }
152
153         template <class PQueue>
154         void test()
155         {
156             PQueue testQueue;
157
158             CppUnitMini::ThreadPool pool( *this );
159             pool.add( new Pusher<PQueue>( pool, testQueue ), s_nThreadCount );
160
161             size_t nStart = 0;
162             size_t nThreadItemCount = s_nQueueSize / s_nThreadCount;
163             for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
164                 Pusher<PQueue> * pThread = static_cast<Pusher<PQueue> *>(*it);
165                 pThread->prepare( nStart, nStart + nThreadItemCount );
166                 nStart += nThreadItemCount;
167             }
168
169             CPPUNIT_MSG( "   Push test, thread count=" << s_nThreadCount << ", item count=" << nThreadItemCount * s_nThreadCount << " ..." );
170             pool.run();
171
172             analyze( pool, testQueue );
173
174             CPPUNIT_MSG( testQueue.statistics() );
175         }
176
177         template <class PQueue>
178         void test_bounded()
179         {
180             size_t nStart = 0;
181             size_t nThreadItemCount = s_nQueueSize / s_nThreadCount;
182
183             std::unique_ptr<PQueue> pq( new PQueue(s_nQueueSize) );
184
185             CppUnitMini::ThreadPool pool( *this );
186             pool.add( new Pusher<PQueue>( pool, *pq ), s_nThreadCount );
187
188             for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
189                 Pusher<PQueue> * pThread = static_cast<Pusher<PQueue> *>(*it);
190                 pThread->prepare( nStart, nStart + nThreadItemCount );
191                 nStart += nThreadItemCount;
192             }
193
194             CPPUNIT_MSG( "   Push test, thread count=" << s_nThreadCount << ", item count=" << nThreadItemCount * s_nThreadCount << " ..." );
195             pool.run();
196
197             analyze( pool, *pq );
198
199             CPPUNIT_MSG( pq->statistics() );
200         }
201
202         void setUpParams( const CppUnitMini::TestCfg& cfg ) {
203             s_nThreadCount = cfg.getULong("ThreadCount", (unsigned long) s_nThreadCount );
204             s_nQueueSize = cfg.getULong("QueueSize", (unsigned long) s_nQueueSize );
205         }
206
207     protected:
208 #include "pqueue/pqueue_defs.h"
209         CDSUNIT_DECLARE_MSPriorityQueue
210         CDSUNIT_DECLARE_EllenBinTree
211         CDSUNIT_DECLARE_SkipList
212         CDSUNIT_DECLARE_FCPriorityQueue
213         CDSUNIT_DECLARE_StdPQueue
214
215         CPPUNIT_TEST_SUITE(PQueue_Push)
216             CDSUNIT_TEST_MSPriorityQueue
217             CDSUNIT_TEST_EllenBinTree
218             CDSUNIT_TEST_SkipList
219             CDSUNIT_TEST_FCPriorityQueue
220             CDUNIT_TEST_StdPQueue
221         CPPUNIT_TEST_SUITE_END();
222     };
223
224 } // namespace pqueue
225
// Hands the PQueue_Push suite to the test framework's registration macro
// (presumably so the test runner can discover it - macro defined outside this file).
226 CPPUNIT_TEST_SUITE_REGISTRATION(pqueue::PQueue_Push);