44baffdb6a96a6d17f0178bc287ea7d6c662d484
[libcds.git] / test / include / cds_test / thread.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13     list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16     this list of conditions and the following disclaimer in the documentation
17     and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #ifndef CDSTEST_THREAD_H
32 #define CDSTEST_THREAD_H
33
34 #include <cds_test/ext_gtest.h>
35 #include <vector>
36 #include <thread>
37 #include <condition_variable>
38 #include <mutex>
39 #include <chrono>
40 #include <cds/threading/model.h>
41
42 namespace cds_test {
43
44     // Forwards
45     class thread;
46     class thread_pool;
47
48     // Test thread
49     class thread
50     {
51         void run();
52
53     protected: // thread_pool interface
54         thread( thread const& sample );
55
56         virtual ~thread()
57         {}
58
59     protected:
60         virtual thread * clone() = 0;
61         virtual void test() = 0;
62
63         virtual void SetUp()
64         {
65             cds::threading::Manager::attachThread();
66         }
67
68         virtual void TearDown()
69         {
70             cds::threading::Manager::detachThread();
71         }
72
73     public:
74         explicit thread( thread_pool& master, int type = 0 );
75
76         thread_pool& pool() { return m_pool; }
77         int type() const { return m_type; }
78         size_t id() const { return m_id;  }
79         bool time_elapsed() const;
80
81     private:
82         friend class thread_pool;
83
84         thread_pool&    m_pool;
85         int const       m_type;
86         size_t const    m_id;
87     };
88
89     // Pool of test threads
90     class thread_pool
91     {
92         class barrier
93         {
94         public:
95             barrier()
96                 : m_count( 0 )
97             {}
98
99             void reset( size_t count )
100             {
101                 std::unique_lock< std::mutex > lock( m_mtx );
102                 m_count = count;
103             }
104
105             bool wait()
106             {
107                 std::unique_lock< std::mutex > lock( m_mtx );
108                 if ( --m_count == 0 ) {
109                     m_cv.notify_all();
110                     return true;
111                 }
112
113                 while ( m_count != 0 )
114                     m_cv.wait( lock );
115
116                 return false;
117             }
118
119         private:
120             size_t      m_count;
121             std::mutex  m_mtx;
122             std::condition_variable m_cv;
123         };
124
125         class initial_gate
126         {
127         public:
128             initial_gate()
129                 : m_ready( false )
130             {}
131
132             void wait()
133             {
134                 std::unique_lock< std::mutex > lock( m_mtx );
135                 while ( !m_ready )
136                     m_cv.wait( lock );
137             }
138
139             void ready()
140             {
141                 std::unique_lock< std::mutex > lock( m_mtx );
142                 m_ready = true;
143                 m_cv.notify_all();
144             }
145
146             void reset()
147             {
148                 std::unique_lock< std::mutex > lock( m_mtx );
149                 m_ready = false;
150             }
151
152         private:
153             std::mutex  m_mtx;
154             std::condition_variable m_cv;
155             bool        m_ready;
156         };
157
158     public:
159         explicit thread_pool( ::testing::Test& fixture )
160             : m_fixture( fixture )
161             , m_bTimeElapsed( false )
162         {}
163
164         ~thread_pool()
165         {
166             clear();
167         }
168
169         void add( thread * what )
170         {
171             m_workers.push_back( what );
172         }
173
174         void add( thread * what, size_t count )
175         {
176             add( what );
177             for ( size_t i = 1; i < count; ++i ) {
178                 thread * p = what->clone();
179                 add( p );
180             }
181         }
182
183         std::chrono::milliseconds run()
184         {
185             return run( std::chrono::seconds::zero());
186         }
187
188         std::chrono::milliseconds run( std::chrono::seconds duration )
189         {
190             m_startBarrier.reset( m_workers.size() + 1 );
191             m_stopBarrier.reset( m_workers.size() + 1 );
192
193             // Create threads
194             std::vector< std::thread > threads;
195             threads.reserve( m_workers.size());
196             for ( auto w : m_workers )
197                 threads.emplace_back( &thread::run, w );
198
199             // The pool is intialized
200             m_startPoint.ready();
201
202             m_bTimeElapsed.store( false, std::memory_order_release );
203
204             auto native_duration = std::chrono::duration_cast<std::chrono::steady_clock::duration>(duration);
205
206             // The pool is ready to start all workers
207             m_startBarrier.wait();
208
209             auto time_start = std::chrono::steady_clock::now();
210             auto const expected_end = time_start + native_duration;
211
212             if ( duration != std::chrono::seconds::zero()) {
213                 for ( ;; ) {
214                     std::this_thread::sleep_for( native_duration );
215                     auto time_now = std::chrono::steady_clock::now();
216                     if ( time_now >= expected_end )
217                         break;
218                     native_duration = expected_end - time_now;
219                 }
220             }
221             m_bTimeElapsed.store( true, std::memory_order_release );
222
223             // Waiting for all workers done
224             m_stopBarrier.wait();
225
226             auto time_end = std::chrono::steady_clock::now();
227
228             for ( auto& t : threads )
229                 t.join();
230
231             return m_testDuration = std::chrono::duration_cast<std::chrono::milliseconds>(time_end - time_start);
232         }
233
234         size_t size() const             { return m_workers.size(); }
235         thread& get( size_t idx ) const { return *m_workers.at( idx ); }
236
237         template <typename Fixture>
238         Fixture& fixture()
239         {
240             return static_cast<Fixture&>(m_fixture);
241         }
242
243         std::chrono::milliseconds duration() const { return m_testDuration; }
244
245         void clear()
246         {
247             for ( auto t : m_workers )
248                 delete t;
249             m_workers.clear();
250             m_startPoint.reset();
251         }
252
253         void reset()
254         {
255             clear();
256         }
257
258     protected: // thread interface
259         size_t get_next_id()
260         {
261             return m_workers.size();
262         }
263
264         void ready_to_start( thread& /*who*/ )
265         {
266             // Called from test thread
267
268             // Wait until the pool is ready
269             m_startPoint.wait();
270
271             // Wait until all thread ready
272             m_startBarrier.wait();
273         }
274
275         void thread_done( thread& /*who*/ )
276         {
277             // Called from test thread
278             m_stopBarrier.wait();
279         }
280
281     private:
282         friend class thread;
283
284         ::testing::Test&        m_fixture;
285         std::vector<thread *>   m_workers;
286
287         initial_gate            m_startPoint;
288         barrier                 m_startBarrier;
289         barrier                 m_stopBarrier;
290
291         std::atomic<bool> m_bTimeElapsed;
292         std::chrono::milliseconds m_testDuration;
293     };
294
295     inline thread::thread( thread_pool& master, int type /*= 0*/ )
296         : m_pool( master )
297         , m_type( type )
298         , m_id( master.get_next_id())
299     {}
300
301     inline thread::thread( thread const& sample )
302         : m_pool( sample.m_pool )
303         , m_type( sample.m_type )
304         , m_id( m_pool.get_next_id())
305     {}
306
307     inline void thread::run()
308     {
309         SetUp();
310         m_pool.ready_to_start( *this );
311         test();
312         m_pool.thread_done( *this );
313         TearDown();
314     }
315
316     inline bool thread::time_elapsed() const
317     {
318         return m_pool.m_bTimeElapsed.load( std::memory_order_acquire );
319     }
320
321 } // namespace cds_test
322
323 #endif // CDSTEST_THREAD_H