Fixed missing stat measuring in DHP
[libcds.git] / src / dhp.cpp
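This change wires the per-thread CDS_HPSTAT counters into the post-mortem statistics that are collected when the DHP SMR singleton is destroyed. Below is a minimal sketch of how an application might read them (assuming a libcds build with CDS_ENABLE_HPSTAT defined; the setup/teardown calls follow the usual libcds pattern and may differ in detail):

#include <iostream>
#include <cds/init.h>
#include <cds/gc/dhp.h>
#include <cds/threading/model.h>

int main()
{
    cds::Initialize();
    {
        cds::gc::DHP dhpGC;                         // constructs the DHP SMR singleton
        cds::threading::Manager::attachThread();    // attach the main thread

        // ... work with DHP-based containers here ...

        cds::threading::Manager::detachThread();
    }   // ~DHP() destroys the singleton; smr::~smr() snapshots s_postmortem_stat

    // Counters stay zero unless the library was built with CDS_ENABLE_HPSTAT
    cds::gc::DHP::stat const& st = cds::gc::DHP::postmortem_statistics();
    std::cout << "scan() calls:        " << st.scan_count << "\n"
              << "help_scan() calls:   " << st.help_scan_count << "\n"
              << "freed retired ptrs:  " << st.free_count << "\n";

    cds::Terminate();
    return 0;
}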
/*
    This file is a part of libcds - Concurrent Data Structures library

    (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017

    Source code repo: http://github.com/khizmax/libcds/
    Download: http://sourceforge.net/projects/libcds/files/

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions are met:

    * Redistributions of source code must retain the above copyright notice, this
      list of conditions and the following disclaimer.

    * Redistributions in binary form must reproduce the above copyright notice,
      this list of conditions and the following disclaimer in the documentation
      and/or other materials provided with the distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
    AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
    IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
    FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
    DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
    SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
    OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

#include <algorithm>
#include <vector>

#include <cds/gc/dhp.h>
#include <cds/os/thread.h>

namespace cds { namespace gc { namespace dhp {

    namespace {
        void * default_alloc_memory( size_t size )
        {
            return new uintptr_t[( size + sizeof( uintptr_t ) - 1 ) / sizeof( uintptr_t )];
        }

        void default_free_memory( void* p )
        {
            delete[] reinterpret_cast<uintptr_t*>( p );
        }

        struct defaults {
            static size_t const c_extended_guard_block_size = 16;
        };

        void* ( *s_alloc_memory )( size_t size ) = default_alloc_memory;
        void ( *s_free_memory )( void* p ) = default_free_memory;

        template <typename T>
        class allocator
        {
        public:
            typedef T   value_type;

            allocator() {}
            allocator( allocator const& ) {}
            template <class U>
            explicit allocator( allocator<U> const& ) {}

            static T* allocate( size_t nCount )
            {
                return reinterpret_cast<T*>( s_alloc_memory( sizeof( value_type ) * nCount ));
            }

            static void deallocate( T* p, size_t /*nCount*/ )
            {
                s_free_memory( reinterpret_cast<void*>( p ));
            }
        };

        stat s_postmortem_stat;
    } // namespace

    /*static*/ CDS_EXPORT_API smr* smr::instance_ = nullptr;
    thread_local thread_data* tls_ = nullptr;

    CDS_EXPORT_API hp_allocator::~hp_allocator()
    {
        while ( guard_block* gp = static_cast<guard_block*>( free_list_.get())) {
            gp->~guard_block();
            s_free_memory( gp );
        }
    }

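    // Takes a guard block from the lock-free free list or, if the list is empty, allocates
    // a new block of defaults::c_extended_guard_block_size guards, then links the guards of
    // the block into a singly linked free chain terminated by nullptr.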
    CDS_EXPORT_API guard_block* hp_allocator::alloc()
    {
        guard_block* gb;
        auto block = free_list_.get();
        if ( block )
            gb = static_cast<guard_block*>( block );
        else {
            // allocate new block
            gb = new( s_alloc_memory( sizeof( guard_block ) + sizeof( guard ) * defaults::c_extended_guard_block_size )) guard_block;
            new ( gb->first()) guard[defaults::c_extended_guard_block_size];

            CDS_HPSTAT( block_allocated_.fetch_add( 1, atomics::memory_order_relaxed ));
        }

        // link the guards of the block into a free chain
        guard* p = gb->first();
        for ( guard* last = p + defaults::c_extended_guard_block_size - 1; p != last; ++p ) {
            p->clear( atomics::memory_order_relaxed );
            p->next_ = p + 1;
        }
        p->next_ = nullptr;
        p->clear();

        return gb;
    }

    CDS_EXPORT_API retired_allocator::~retired_allocator()
    {
        while ( retired_block* rb = static_cast<retired_block*>( free_list_.get())) {
            rb->~retired_block();
            s_free_memory( rb );
        }
    }

    CDS_EXPORT_API retired_block* retired_allocator::alloc()
    {
        retired_block* rb;
        auto block = free_list_.get();
        if ( block )
            rb = static_cast<retired_block*>( block );
        else {
            // allocate new block
            rb = new( s_alloc_memory( sizeof( retired_block ) + sizeof( retired_ptr ) * retired_block::c_capacity )) retired_block;
            new ( rb->first()) retired_ptr[retired_block::c_capacity];
            CDS_HPSTAT( block_allocated_.fetch_add( 1, atomics::memory_order_relaxed ));
        }

        rb->next_ = nullptr;
        return rb;
    }

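    // Per-thread SMR record: the thread_data base holds the hazard guard array and the
    // retired array; m_idOwner and m_bFree track ownership so that records of detached or
    // dead threads can be reused by alloc_thread_data() or scavenged by help_scan().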
    struct smr::thread_record: thread_data
    {
        atomics::atomic<thread_record*>     m_pNextNode; ///< next hazard ptr record in list
        atomics::atomic<cds::OS::ThreadId>  m_idOwner;   ///< Owner thread id; 0 - the record is free (not owned)
        atomics::atomic<bool>               m_bFree;     ///< true if record is free (not owned)

        thread_record( guard* guards, size_t guard_count )
            : thread_data( guards, guard_count )
            , m_bFree( false )
        {}
    };

    /*static*/ CDS_EXPORT_API thread_data* smr::tls()
    {
        assert( tls_ != nullptr );
        return tls_;
    }

    /*static*/ CDS_EXPORT_API void smr::set_memory_allocator(
        void* ( *alloc_func )( size_t size ),
        void ( *free_func )( void * p )
    )
    {
        // The memory allocation functions may be set only BEFORE the DHP SMR is initialized!
        assert( instance_ == nullptr );

        s_alloc_memory = alloc_func;
        s_free_memory = free_func;
    }

    /*static*/ CDS_EXPORT_API void smr::construct( size_t nInitialHazardPtrCount )
    {
        if ( !instance_ ) {
            instance_ = new( s_alloc_memory( sizeof( smr ))) smr( nInitialHazardPtrCount );
        }
    }

    /*static*/ CDS_EXPORT_API void smr::destruct( bool bDetachAll )
    {
        if ( instance_ ) {
            if ( bDetachAll )
                instance_->detach_all_thread();

            instance_->~smr();
            s_free_memory( instance_ );
            instance_ = nullptr;
        }
    }

    CDS_EXPORT_API smr::smr( size_t nInitialHazardPtrCount )
        : thread_list_( nullptr )
        , initial_hazard_count_( nInitialHazardPtrCount < 4 ? 16 : nInitialHazardPtrCount )
        , last_plist_size_( initial_hazard_count_ * 64 )
    {}

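    // Destroys the SMR singleton: collects the post-mortem statistics, frees every pointer
    // still kept in the per-thread retired arrays, and destroys all thread records.
    // At this point only the main thread (or no live thread) may still own a record.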
    CDS_EXPORT_API smr::~smr()
    {
        CDS_DEBUG_ONLY( const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId; )
        CDS_DEBUG_ONLY( const cds::OS::ThreadId mainThreadId = cds::OS::get_current_thread_id(); )

        CDS_HPSTAT( statistics( s_postmortem_stat ));

        thread_record* pHead = thread_list_.load( atomics::memory_order_relaxed );
        thread_list_.store( nullptr, atomics::memory_order_relaxed );

        thread_record* pNext = nullptr;
        for ( thread_record* hprec = pHead; hprec; hprec = pNext )
        {
            assert( hprec->m_idOwner.load( atomics::memory_order_relaxed ) == nullThreadId
                || hprec->m_idOwner.load( atomics::memory_order_relaxed ) == mainThreadId
                || !cds::OS::is_thread_alive( hprec->m_idOwner.load( atomics::memory_order_relaxed ))
            );

            retired_array& retired = hprec->retired_;

            // delete retired data
            for ( retired_block* block = retired.list_head_; block && block != retired.current_block_; block = block->next_ ) {
                for ( retired_ptr* p = block->first(); p != block->last(); ++p ) {
                    p->free();
                    CDS_HPSTAT( ++s_postmortem_stat.free_count );
                }
            }
            if ( retired.current_block_ ) {
                for ( retired_ptr* p = retired.current_block_->first(); p != retired.current_cell_; ++p ) {
                    p->free();
                    CDS_HPSTAT( ++s_postmortem_stat.free_count );
                }
            }
            hprec->retired_.fini();
            hprec->hazards_.clear();

            pNext = hprec->m_pNextNode.load( atomics::memory_order_relaxed );
            hprec->m_bFree.store( true, atomics::memory_order_relaxed );
            destroy_thread_data( hprec );
        }
    }

    /*static*/ CDS_EXPORT_API void smr::attach_thread()
    {
        if ( !tls_ )
            tls_ = instance().alloc_thread_data();
    }

    /*static*/ CDS_EXPORT_API void smr::detach_thread()
    {
        thread_data* rec = tls_;
        if ( rec ) {
            tls_ = nullptr;
            instance().free_thread_data( static_cast<thread_record*>( rec ));
        }
    }

    CDS_EXPORT_API void smr::detach_all_thread()
    {
        thread_record* pNext = nullptr;
        const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId;

        for ( thread_record* hprec = thread_list_.load( atomics::memory_order_relaxed ); hprec; hprec = pNext ) {
            pNext = hprec->m_pNextNode.load( atomics::memory_order_relaxed );
            if ( hprec->m_idOwner.load( atomics::memory_order_relaxed ) != nullThreadId ) {
                free_thread_data( hprec );
            }
        }
    }

    CDS_EXPORT_API smr::thread_record* smr::create_thread_data()
    {
        size_t const guard_array_size = sizeof( guard ) * initial_hazard_count_;

        /*
            The memory is allocated as one contiguous block.
            Memory layout:
            +--------------------------+
            |                          |
            | thread_record            |
            |         hazards_         +---+
            |         retired_         |   |
            |                          |   |
            |--------------------------|   |
            | hazard_ptr[]             |<--+
            |  initial HP array        |
            |                          |
            +--------------------------+
        */

        char* mem = reinterpret_cast<char*>( s_alloc_memory( sizeof( thread_record ) + guard_array_size ));
        return new( mem ) thread_record(
            reinterpret_cast<guard*>( mem + sizeof( thread_record )), initial_hazard_count_
        );
    }

    /*static*/ CDS_EXPORT_API void smr::destroy_thread_data( thread_record* pRec )
    {
        // all retired pointers must be freed
        pRec->~thread_record();
        s_free_memory( pRec );
    }

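    // Attaches the calling thread: tries to capture a free thread_record by CAS-ing
    // m_idOwner from null to the current thread id; if no record is available, allocates
    // a new one and pushes it onto the lock-free thread_list_.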
    CDS_EXPORT_API smr::thread_record* smr::alloc_thread_data()
    {
        thread_record* hprec = nullptr;
        const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId;
        const cds::OS::ThreadId curThreadId = cds::OS::get_current_thread_id();

        // First try to reuse a free (non-active) DHP record
        for ( hprec = thread_list_.load( atomics::memory_order_acquire ); hprec; hprec = hprec->m_pNextNode.load( atomics::memory_order_relaxed )) {
            cds::OS::ThreadId thId = nullThreadId;
            if ( !hprec->m_idOwner.compare_exchange_strong( thId, curThreadId, atomics::memory_order_relaxed, atomics::memory_order_relaxed ))
                continue;
            hprec->m_bFree.store( false, atomics::memory_order_release );
            break;
        }

        if ( !hprec ) {
            // No HP records available for reuse
            // Allocate and push a new HP record
            hprec = create_thread_data();
            hprec->m_idOwner.store( curThreadId, atomics::memory_order_relaxed );

            thread_record* pOldHead = thread_list_.load( atomics::memory_order_relaxed );
            do {
                hprec->m_pNextNode.store( pOldHead, atomics::memory_order_relaxed );
            } while ( !thread_list_.compare_exchange_weak( pOldHead, hprec, atomics::memory_order_release, atomics::memory_order_acquire ));
        }

        hprec->hazards_.init();
        hprec->retired_.init();

        return hprec;
    }

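    // Detaches a thread record: clears its hazard pointers, tries to free its retired
    // pointers via scan()/help_scan(), then either marks the record free (if the retired
    // array is empty) or returns the now-empty retired blocks to the allocator, and finally
    // releases ownership so the record can be reused by another thread.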
    CDS_EXPORT_API void smr::free_thread_data( thread_record* pRec )
    {
        assert( pRec != nullptr );
        //CDS_HAZARDPTR_STATISTIC( ++m_Stat.m_RetireHPRec )

        pRec->hazards_.clear();
        scan( pRec );
        help_scan( pRec );

        if ( pRec->retired_.empty()) {
            pRec->retired_.fini();
            pRec->m_bFree.store( true, std::memory_order_release );
        }
        else {
            // Free all empty blocks
            retired_block* free_block = pRec->retired_.current_block_->next_;
            if ( free_block ) {
                pRec->retired_.current_block_->next_ = nullptr;
                while ( free_block ) {
                    retired_block* next = free_block->next_;
                    retired_allocator_.free( free_block );
                    free_block = next;
                    --pRec->retired_.block_count_;
                }
            }
        }

        pRec->m_idOwner.store( cds::OS::c_NullThreadId, atomics::memory_order_release );
    }

    namespace {
        typedef std::vector<void*, allocator<void*>> hp_vector;

        inline void copy_hazards( hp_vector& vect, guard const* arr, size_t size )
        {
            for ( guard const* end = arr + size; arr != end; ++arr ) {
                void* hp = arr->get();
                if ( hp )
                    vect.push_back( hp );
            }
        }

        inline size_t retire_data( hp_vector const& plist, retired_array& stg, retired_block* block, size_t block_size )
        {
            auto hp_begin = plist.begin();
            auto hp_end = plist.end();
            size_t count = 0;

            for ( retired_ptr* p = block->first(), *end = p + block_size; p != end; ++p ) {
                if ( cds_unlikely( std::binary_search( hp_begin, hp_end, p->m_p )))
                    stg.repush( p );
                else {
                    p->free();
                    ++count;
                }
            }

            return count;
        }

    } // namespace

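    // Classic two-stage scan:
    //   Stage 1 - collect all currently protected (hazard) pointers of all threads into plist;
    //   Stage 2 - for each retired pointer, free it if it is not found in the sorted plist,
    //             otherwise keep it in the retired array for a later scan.
    // If too few pointers could be freed, the retired array is extended.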
    CDS_EXPORT_API void smr::scan( thread_data* pThreadRec )
    {
        thread_record* pRec = static_cast<thread_record*>( pThreadRec );

        CDS_HPSTAT( ++pRec->scan_call_count_ );

        hp_vector plist;
        size_t plist_size = last_plist_size_.load( std::memory_order_relaxed );
        plist.reserve( plist_size );

        // Stage 1: Scan HP list and insert non-null values into plist
        thread_record* pNode = thread_list_.load( atomics::memory_order_acquire );
        while ( pNode ) {
            if ( pNode->m_idOwner.load( std::memory_order_relaxed ) != cds::OS::c_NullThreadId ) {
                copy_hazards( plist, pNode->hazards_.array_, pNode->hazards_.initial_capacity_ );

                for ( guard_block* block = pNode->hazards_.extended_list_; block; block = block->next_ )
                    copy_hazards( plist, block->first(), defaults::c_extended_guard_block_size );
            }

            pNode = pNode->m_pNextNode.load( atomics::memory_order_relaxed );
        }

        // Store plist size for the next scan() call (vector reallocation optimization)
        if ( plist.size() > plist_size )
            last_plist_size_.compare_exchange_weak( plist_size, plist.size(), std::memory_order_relaxed, std::memory_order_relaxed );

        // Sort plist to allow binary search
        std::sort( plist.begin(), plist.end());

        // Stage 2: Search plist
        size_t free_count = 0;
        size_t retired_count = 0;
        retired_block* last_block = pRec->retired_.current_block_;
        retired_ptr*   last_block_cell = pRec->retired_.current_cell_;

        pRec->retired_.current_block_ = pRec->retired_.list_head_;
        pRec->retired_.current_cell_ = pRec->retired_.current_block_->first();

        for ( retired_block* block = pRec->retired_.list_head_; block; block = block->next_ ) {
            bool const end_block = block == last_block;
            size_t const size = end_block ? last_block_cell - block->first() : retired_block::c_capacity;

            retired_count += retired_block::c_capacity;
            free_count += retire_data( plist, pRec->retired_, block, size );

            if ( end_block )
                break;
        }
        CDS_HPSTAT( pRec->free_call_count_ += free_count );

        // If the number of freed elements is too small, extend the retired array
        if ( free_count < retired_count / 4 && last_block == pRec->retired_.list_tail_ && last_block_cell == last_block->last())
            pRec->retired_.extend();
    }

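    // Scavenges the retired arrays of free or dead threads: takes ownership of such a record
    // with a CAS on m_idOwner, moves its retired pointers into the current thread's retired
    // array (invoking scan() whenever the destination is full), then releases the record.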
    CDS_EXPORT_API void smr::help_scan( thread_data* pThis )
    {
        assert( static_cast<thread_record*>( pThis )->m_idOwner.load( atomics::memory_order_relaxed ) == cds::OS::get_current_thread_id());
        CDS_HPSTAT( ++pThis->help_scan_call_count_ );

        const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId;
        const cds::OS::ThreadId curThreadId = cds::OS::get_current_thread_id();
        for ( thread_record* hprec = thread_list_.load( atomics::memory_order_acquire ); hprec; hprec = hprec->m_pNextNode.load( atomics::memory_order_relaxed ))
        {
            // If m_bFree == true then hprec->retired_ is empty - we don't need to examine it
            if ( hprec->m_bFree.load( atomics::memory_order_acquire )) {
                assert( hprec->retired_.empty());
                continue;
            }

            // Try to own hprec.
            // Several threads may work concurrently, so ownership is taken with an atomic CAS
            {
                cds::OS::ThreadId curOwner = hprec->m_idOwner.load( atomics::memory_order_relaxed );
                if ( curOwner == nullThreadId || !cds::OS::is_thread_alive( curOwner )) {
                    if ( !hprec->m_idOwner.compare_exchange_strong( curOwner, curThreadId, atomics::memory_order_acquire, atomics::memory_order_relaxed ))
                        continue;
                }
                else
                    continue;
            }

            // We own the thread record successfully. Now we can see whether it has retired pointers.
            // If it has any, we move them to pThis, which is private to the current thread.
            retired_array& src = hprec->retired_;
            retired_array& dest = pThis->retired_;

            for ( retired_block* block = src.list_head_; block; block = block->next_ ) {
                retired_ptr* last = block == src.current_block_ ? src.current_cell_ : block->last();
                for ( retired_ptr* p = block->first(); p != last; ++p ) {
                    if ( !dest.push( *p ))
                        scan( pThis );
                }

                if ( block == src.current_block_ )
                    break;
            }

            src.fini();
            hprec->m_bFree.store( true, atomics::memory_order_relaxed );
            hprec->m_idOwner.store( nullThreadId, atomics::memory_order_release );
        }

        scan( pThis );
    }

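    // Aggregates the per-thread CDS_HPSTAT counters into st; meaningful only when the
    // library is built with CDS_ENABLE_HPSTAT, otherwise st stays cleared.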
    void smr::statistics( stat& st )
    {
        st.clear();
#   ifdef CDS_ENABLE_HPSTAT
        for ( thread_record* hprec = thread_list_.load( atomics::memory_order_acquire ); hprec; hprec = hprec->m_pNextNode.load( atomics::memory_order_relaxed ))
        {
            ++st.thread_rec_count;
            st.guard_allocated      += hprec->hazards_.alloc_guard_count_;
            st.guard_freed          += hprec->hazards_.free_guard_count_;
            st.hp_extend_count      += hprec->hazards_.extend_call_count_;
            st.retired_count        += hprec->retired_.retire_call_count_;
            st.retired_extend_count += hprec->retired_.extend_call_count_;
            st.free_count           += hprec->free_call_count_;
            st.scan_count           += hprec->scan_call_count_;
            st.help_scan_count      += hprec->help_scan_call_count_;
        }

        st.hp_block_count = hp_allocator_.block_allocated_.load( atomics::memory_order_relaxed );
        st.retired_block_count = retired_allocator_.block_allocated_.load( atomics::memory_order_relaxed );
#   endif
    }


}}} // namespace cds::gc::dhp

/*static*/ cds::gc::DHP::stat const& cds::gc::DHP::postmortem_statistics()
{
    return cds::gc::dhp::s_postmortem_stat;
}