/*
    This file is a part of libcds - Concurrent Data Structures library

    (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017

    Source code repo: http://github.com/khizmax/libcds/
    Download: http://sourceforge.net/projects/libcds/files/

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions are met:

    * Redistributions of source code must retain the above copyright notice, this
      list of conditions and the following disclaimer.

    * Redistributions in binary form must reproduce the above copyright notice,
      this list of conditions and the following disclaimer in the documentation
      and/or other materials provided with the distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
    AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
    IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
    FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
    DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
    SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
    OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

#include <algorithm>
#include <vector>

#include <cds/gc/dhp.h>
#include <cds/os/thread.h>

namespace cds { namespace gc { namespace dhp {

    namespace {
        void * default_alloc_memory( size_t size )
        {
            return new uintptr_t[( size + sizeof( uintptr_t ) - 1 ) / sizeof( uintptr_t )];
        }

        void default_free_memory( void* p )
        {
            delete[] reinterpret_cast<uintptr_t*>( p );
        }

        struct defaults {
            static size_t const c_extended_guard_block_size = 16;
        };

        void* ( *s_alloc_memory )( size_t size ) = default_alloc_memory;
        void( *s_free_memory )( void* p ) = default_free_memory;

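        // Minimal allocator adapter over the replaceable s_alloc_memory/s_free_memory
        // hooks. It is used below for the std::vector<void*> built in smr::scan(), so
        // that even the temporary hazard-pointer snapshot goes through the user-supplied
        // memory functions (see smr::set_memory_allocator()).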
        template <typename T>
        class allocator
        {
        public:
            typedef T   value_type;

            allocator() {}
            allocator( allocator const& ) {}
            template <class U>
            explicit allocator( allocator<U> const& ) {}

            static T* allocate( size_t nCount )
            {
                return reinterpret_cast<T*>( s_alloc_memory( sizeof( value_type ) * nCount ));
            }

            static void deallocate( T* p, size_t /*nCount*/ )
            {
                s_free_memory( reinterpret_cast<void*>( p ));
            }
        };

        stat s_postmortem_stat;
    } // namespace

    /*static*/ CDS_EXPORT_API smr* smr::instance_ = nullptr;
    thread_local thread_data* tls_ = nullptr;

    CDS_EXPORT_API hp_allocator::~hp_allocator()
    {
        while ( guard_block* gp = static_cast<guard_block*>( free_list_.get())) {
            gp->~guard_block();
            s_free_memory( gp );
        }
    }

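    // Allocates a block of "extended" guards: reuse a block from the lock-free
    // free_list_ if one is available, otherwise carve a new guard_block plus
    // c_extended_guard_block_size guards out of a single s_alloc_memory() chunk.
    // In both cases the guards are (re)linked below into an intrusive free list
    // before the block is handed to the caller.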
    CDS_EXPORT_API guard_block* hp_allocator::alloc()
    {
        guard_block* gb;
        auto block = free_list_.get();
        if ( block )
            gb = static_cast< guard_block* >( block );
        else {
            // allocate new block
            gb = new( s_alloc_memory( sizeof( guard_block ) + sizeof( guard ) * defaults::c_extended_guard_block_size )) guard_block;
            new ( gb->first()) guard[defaults::c_extended_guard_block_size];

            CDS_HPSTAT( block_allocated_.fetch_add( 1, atomics::memory_order_relaxed ));
        }

        // link the guards of the block into a free list
        guard* p = gb->first();
        for ( guard* last = p + defaults::c_extended_guard_block_size - 1; p != last; ++p ) {
            p->clear( atomics::memory_order_relaxed );
            p->next_ = p + 1;
        }
        p->next_ = nullptr;
        p->clear();

        return gb;
    }

    CDS_EXPORT_API retired_allocator::~retired_allocator()
    {
        while ( retired_block* rb = static_cast<retired_block*>( free_list_.get() ) ) {
            rb->~retired_block();
            s_free_memory( rb );
        }
    }

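    // Same reuse-or-allocate strategy as hp_allocator::alloc(), but for blocks of
    // retired pointers: a retired_block header followed by retired_block::c_capacity
    // retired_ptr cells in one contiguous allocation. Unlike guards, the cells are
    // not chained; the block is simply reset (next_ = nullptr) and returned.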
    CDS_EXPORT_API retired_block* retired_allocator::alloc()
    {
        retired_block* rb;
        auto block = free_list_.get();
        if ( block )
            rb = static_cast< retired_block* >( block );
        else {
            // allocate new block
            rb = new( s_alloc_memory( sizeof( retired_block ) + sizeof( retired_ptr ) * retired_block::c_capacity )) retired_block;
            new ( rb->first()) retired_ptr[retired_block::c_capacity];
            CDS_HPSTAT( block_allocated_.fetch_add( 1, atomics::memory_order_relaxed ) );
        }

        rb->next_ = nullptr;
        return rb;
    }

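    // Per-thread SMR record. It extends thread_data (hazard and retired arrays) with
    // the intrusive link of the global thread list and two ownership markers:
    // m_idOwner identifies the owning thread (c_NullThreadId when the record is free)
    // and m_bFree tells help_scan() that the record has no retired data to adopt.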
    struct smr::thread_record: thread_data
    {
        atomics::atomic<thread_record*>     m_pNextNode; ///< next hazard ptr record in list
        atomics::atomic<cds::OS::ThreadId>  m_idOwner;   ///< Owner thread id; 0 - the record is free (not owned)
        atomics::atomic<bool>               m_bFree;     ///< true if record is free (not owned)

        thread_record( guard* guards, size_t guard_count )
            : thread_data( guards, guard_count )
            , m_bFree( false )
        {}
    };

    /*static*/ CDS_EXPORT_API thread_data* smr::tls()
    {
        assert( tls_ != nullptr );
        return tls_;
    }

    /*static*/ CDS_EXPORT_API void smr::set_memory_allocator(
        void* ( *alloc_func )( size_t size ),
        void( *free_func )( void * p )
    )
    {
        // The memory allocation functions must be set BEFORE the DHP SMR instance is constructed
        assert( instance_ == nullptr );

        s_alloc_memory = alloc_func;
        s_free_memory = free_func;
    }

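    // Illustrative sketch (not part of the library): a client that wants DHP internals
    // to allocate with malloc/free could install the hooks before constructing the SMR
    // instance, since construct() itself allocates through s_alloc_memory:
    //
    //      void* my_alloc( size_t size ) { return std::malloc( size ); }
    //      void  my_free( void* p )      { std::free( p ); }
    //
    //      cds::gc::dhp::smr::set_memory_allocator( my_alloc, my_free );
    //      cds::gc::dhp::smr::construct( 16 );   // 16 initial hazard pointers per thread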
    /*static*/ CDS_EXPORT_API void smr::construct( size_t nInitialHazardPtrCount )
    {
        if ( !instance_ ) {
            instance_ = new( s_alloc_memory( sizeof( smr ))) smr( nInitialHazardPtrCount );
        }
    }

    /*static*/ CDS_EXPORT_API void smr::destruct( bool bDetachAll )
    {
        if ( instance_ ) {
            if ( bDetachAll )
                instance_->detach_all_thread();

            instance_->~smr();
            s_free_memory( instance_ );
            instance_ = nullptr;
        }
    }

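    // The constructor only records the sizing policy: requests for fewer than 4 initial
    // hazard pointers per thread are bumped to 16, and last_plist_size_ (the reserve()
    // hint for the hazard-pointer snapshot built in scan()) starts at 64x that value,
    // presumably to cover a few dozen threads without reallocation.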
    CDS_EXPORT_API smr::smr( size_t nInitialHazardPtrCount )
        : thread_list_( nullptr )
        , initial_hazard_count_( nInitialHazardPtrCount < 4 ? 16 : nInitialHazardPtrCount )
        , last_plist_size_( initial_hazard_count_ * 64 )
    {}

    CDS_EXPORT_API smr::~smr()
    {
        CDS_DEBUG_ONLY( const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId; )
        CDS_DEBUG_ONLY( const cds::OS::ThreadId mainThreadId = cds::OS::get_current_thread_id(); )

        CDS_HPSTAT( statistics( s_postmortem_stat ) );

        thread_record* pHead = thread_list_.load( atomics::memory_order_relaxed );
        thread_list_.store( nullptr, atomics::memory_order_relaxed );

        thread_record* pNext = nullptr;
        for ( thread_record* hprec = pHead; hprec; hprec = pNext )
        {
            assert( hprec->m_idOwner.load( atomics::memory_order_relaxed ) == nullThreadId
                || hprec->m_idOwner.load( atomics::memory_order_relaxed ) == mainThreadId );

            retired_array& retired = hprec->retired_;

            // delete retired data
            for ( retired_block* block = retired.list_head_; block && block != retired.current_block_; block = block->next_ ) {
                for ( retired_ptr* p = block->first(); p != block->last(); ++p ) {
                    p->free();
                    CDS_HPSTAT( ++s_postmortem_stat.free_count );
                }
            }
            if ( retired.current_block_ ) {
                for ( retired_ptr* p = retired.current_block_->first(); p != retired.current_cell_; ++p ) {
                    p->free();
                    CDS_HPSTAT( ++s_postmortem_stat.free_count );
                }
            }
            hprec->retired_.fini();
            hprec->hazards_.clear();

            pNext = hprec->m_pNextNode.load( atomics::memory_order_relaxed );
            hprec->m_bFree.store( true, atomics::memory_order_relaxed );
            destroy_thread_data( hprec );
        }
    }

    /*static*/ CDS_EXPORT_API void smr::attach_thread()
    {
        if ( !tls_ )
            tls_ = instance().alloc_thread_data();
    }

    /*static*/ CDS_EXPORT_API void smr::detach_thread()
    {
        thread_data* rec = tls_;
        if ( rec ) {
            tls_ = nullptr;
            instance().free_thread_data( static_cast<thread_record*>( rec ) );
        }
    }

    CDS_EXPORT_API void smr::detach_all_thread()
    {
        thread_record * pNext = nullptr;
        const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId;

        for ( thread_record * hprec = thread_list_.load( atomics::memory_order_relaxed ); hprec; hprec = pNext ) {
            pNext = hprec->m_pNextNode.load( atomics::memory_order_relaxed );
            if ( hprec->m_idOwner.load( atomics::memory_order_relaxed ) != nullThreadId ) {
                free_thread_data( hprec );
            }
        }
    }

    CDS_EXPORT_API smr::thread_record* smr::create_thread_data()
    {
        size_t const guard_array_size = sizeof( guard ) * initial_hazard_count_;

        /*
            The memory is allocated as one contiguous block.
            Memory layout:
            +--------------------------+
            |                          |
            | thread_record            |
            |         hazards_         +---+
            |         retired_         |   |
            |                          |   |
            |--------------------------|   |
            | hazard_ptr[]             |<--+
            |  initial HP array        |
            |                          |
            +--------------------------+
        */

        char* mem = reinterpret_cast<char*>( s_alloc_memory( sizeof( thread_record ) + guard_array_size ));
        return new( mem ) thread_record(
            reinterpret_cast<guard*>( mem + sizeof( thread_record ) ), initial_hazard_count_
        );
    }

    /*static*/ CDS_EXPORT_API void smr::destroy_thread_data( thread_record* pRec )
    {
        // all retired pointers must be freed
        pRec->~thread_record();
        s_free_memory( pRec );
    }

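    // Called on thread attach. First it tries to recycle the record of a detached thread
    // by CAS-ing its m_idOwner from c_NullThreadId to the current thread id; if no free
    // record exists, a new one is created and pushed onto the lock-free thread_list_
    // with a CAS loop. Records are never unlinked from the list, only marked free.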
    CDS_EXPORT_API smr::thread_record* smr::alloc_thread_data()
    {
        thread_record * hprec = nullptr;
        const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId;
        const cds::OS::ThreadId curThreadId = cds::OS::get_current_thread_id();

        // First try to reuse a free (non-active) DHP record
        for ( hprec = thread_list_.load( atomics::memory_order_acquire ); hprec; hprec = hprec->m_pNextNode.load( atomics::memory_order_relaxed ) ) {
            cds::OS::ThreadId thId = nullThreadId;
            if ( !hprec->m_idOwner.compare_exchange_strong( thId, curThreadId, atomics::memory_order_relaxed, atomics::memory_order_relaxed ) )
                continue;
            hprec->m_bFree.store( false, atomics::memory_order_release );
            break;
        }

        if ( !hprec ) {
            // No HP records available for reuse
            // Allocate and push a new HP record
            hprec = create_thread_data();
            hprec->m_idOwner.store( curThreadId, atomics::memory_order_relaxed );

            thread_record* pOldHead = thread_list_.load( atomics::memory_order_relaxed );
            do {
                hprec->m_pNextNode.store( pOldHead, atomics::memory_order_relaxed );
            } while ( !thread_list_.compare_exchange_weak( pOldHead, hprec, atomics::memory_order_release, atomics::memory_order_acquire ) );
        }

        hprec->hazards_.init();
        hprec->retired_.init();

        return hprec;
    }

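    // Called on thread detach (and from detach_all_thread()). The record's hazard
    // pointers are cleared, then scan()/help_scan() try to reclaim as much retired
    // data as possible. If everything was freed the record is marked m_bFree; otherwise
    // only the trailing empty retired blocks are returned to retired_allocator_.
    // Finally m_idOwner is reset so the record can be reused by another thread.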
    CDS_EXPORT_API void smr::free_thread_data( thread_record* pRec )
    {
        assert( pRec != nullptr );
        //CDS_HAZARDPTR_STATISTIC( ++m_Stat.m_RetireHPRec )

        pRec->hazards_.clear();
        scan( pRec );
        help_scan( pRec );

        if ( pRec->retired_.empty() ) {
            pRec->retired_.fini();
            pRec->m_bFree.store( true, std::memory_order_release );
        }
        else {
            // Free all empty blocks
            retired_block* free_block = pRec->retired_.current_block_->next_;
            if ( free_block ) {
                pRec->retired_.current_block_->next_ = nullptr;
                while ( free_block ) {
                    retired_block* next = free_block->next_;
                    retired_allocator_.free( free_block );
                    free_block = next;
                    --pRec->retired_.block_count_;
                }
            }
        }

        pRec->m_idOwner.store( cds::OS::c_NullThreadId, atomics::memory_order_release );
    }

    namespace {
        typedef std::vector<void*, allocator<void*>> hp_vector;

        inline void copy_hazards( hp_vector& vect, guard const* arr, size_t size )
        {
            for ( guard const* end = arr + size; arr != end; ++arr ) {
                void* hp = arr->get();
                if ( hp )
                    vect.push_back( hp );
            }
        }

        inline size_t retire_data( hp_vector const& plist, retired_array& stg, retired_block* block, size_t block_size )
        {
            auto hp_begin = plist.begin();
            auto hp_end = plist.end();
            size_t count = 0;

            for ( retired_ptr* p = block->first(), *end = p + block_size; p != end; ++p ) {
                if ( cds_unlikely( std::binary_search( hp_begin, hp_end, p->m_p )))
                    stg.repush( p );
                else {
                    p->free();
                    ++count;
                }
            }

            return count;
        }

    } // namespace

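    // Classic two-stage hazard-pointer scan.
    // Stage 1 snapshots every non-null hazard pointer of every attached thread into
    // plist and sorts it. Stage 2 walks this thread's retired blocks: pointers found
    // in plist (binary search) are still protected and are re-pushed onto the retired
    // array, everything else is freed via retired_ptr::free(). If too little was
    // reclaimed and the last block is completely full, the retired array is extended.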
    CDS_EXPORT_API void smr::scan( thread_data* pThreadRec )
    {
        thread_record* pRec = static_cast<thread_record*>( pThreadRec );

        CDS_HPSTAT( ++pRec->scan_call_count_ );

        hp_vector plist;
        size_t plist_size = last_plist_size_.load( std::memory_order_relaxed );
        plist.reserve( plist_size );

        // Stage 1: Scan HP list and insert non-null values in plist
        thread_record* pNode = thread_list_.load( atomics::memory_order_acquire );
        while ( pNode ) {
            if ( pNode->m_idOwner.load( std::memory_order_relaxed ) != cds::OS::c_NullThreadId ) {
                copy_hazards( plist, pNode->hazards_.array_, pNode->hazards_.initial_capacity_ );

                for ( guard_block* block = pNode->hazards_.extended_list_; block; block = block->next_ )
                    copy_hazards( plist, block->first(), defaults::c_extended_guard_block_size );
            }

            pNode = pNode->m_pNextNode.load( atomics::memory_order_relaxed );
        }

        // Store plist size for next scan() call (vector reallocation optimization)
        if ( plist.size() > plist_size )
            last_plist_size_.compare_exchange_weak( plist_size, plist.size(), std::memory_order_relaxed, std::memory_order_relaxed );

        // Sort plist so that retire_data() can use binary search
        std::sort( plist.begin(), plist.end() );

        // Stage 2: Search plist
        size_t free_count = 0;
        size_t retired_count = 0;
        retired_block* last_block = pRec->retired_.current_block_;
        retired_ptr*   last_block_cell = pRec->retired_.current_cell_;

        pRec->retired_.current_block_ = pRec->retired_.list_head_;
        pRec->retired_.current_cell_ = pRec->retired_.current_block_->first();

        for ( retired_block* block = pRec->retired_.list_head_; block; block = block->next_ ) {
            bool const end_block = block == last_block;
            size_t const size = end_block ? last_block_cell - block->first() : retired_block::c_capacity;

            retired_count += retired_block::c_capacity;
            free_count += retire_data( plist, pRec->retired_, block, size );

            if ( end_block )
                break;
        }
        CDS_HPSTAT( pRec->free_call_count_ += free_count );

        // If the count of freed elements is too small, increase retired array
        if ( free_count < retired_count / 4 && last_block == pRec->retired_.list_tail_ && last_block_cell == last_block->last() )
            pRec->retired_.extend();
    }

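    // "Help scan": adopt the retired data of records whose owner thread has detached.
    // For every non-free record the current thread tries to claim ownership with a CAS
    // on m_idOwner; on success all of that record's retired pointers are moved into
    // this thread's retired array (running scan() whenever the array overflows), the
    // record is marked free, and ownership is released. A final scan() then reclaims
    // whatever became unprotected.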
    CDS_EXPORT_API void smr::help_scan( thread_data* pThis )
    {
        assert( static_cast<thread_record*>( pThis )->m_idOwner.load( atomics::memory_order_relaxed ) == cds::OS::get_current_thread_id() );
        CDS_HPSTAT( ++pThis->help_scan_call_count_ );

        const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId;
        const cds::OS::ThreadId curThreadId = cds::OS::get_current_thread_id();
        for ( thread_record* hprec = thread_list_.load( atomics::memory_order_acquire ); hprec; hprec = hprec->m_pNextNode.load( atomics::memory_order_relaxed ) )
        {
            if ( hprec == static_cast<thread_record*>( pThis ))
                continue;

            // If m_bFree == true then hprec->retired_ is empty - we don't need to scan it
            if ( hprec->m_bFree.load( atomics::memory_order_acquire ) ) {
                assert( hprec->retired_.empty() );
                continue;
            }

            // Try to own hprec.
            // Several threads may attempt this concurrently, so ownership is claimed with a CAS
            {
                cds::OS::ThreadId curOwner = hprec->m_idOwner.load( atomics::memory_order_relaxed );
                if ( curOwner == nullThreadId ) {
                    if ( !hprec->m_idOwner.compare_exchange_strong( curOwner, curThreadId, atomics::memory_order_acquire, atomics::memory_order_relaxed ) )
                        continue;
                }
                else
                    continue;
            }

            // We own the thread record now. Check whether it has retired pointers;
            // if it does, move them to pThis, which is private to the current thread.
            retired_array& src = hprec->retired_;
            retired_array& dest = pThis->retired_;

            for ( retired_block* block = src.list_head_; block; block = block->next_ ) {
                retired_ptr* last = block == src.current_block_ ? src.current_cell_ : block->last();
                for ( retired_ptr* p = block->first(); p != last; ++p ) {
                    if ( !dest.push( *p ) )
                        scan( pThis );
                }

                if ( block == src.current_block_ )
                    break;
            }

            src.fini();
            hprec->m_bFree.store( true, atomics::memory_order_relaxed );
            hprec->m_idOwner.store( nullThreadId, atomics::memory_order_release );
        }

        scan( pThis );
    }

    CDS_EXPORT_API void smr::statistics( stat& st )
    {
        st.clear();
#   ifdef CDS_ENABLE_HPSTAT
        for ( thread_record* hprec = thread_list_.load( atomics::memory_order_acquire ); hprec; hprec = hprec->m_pNextNode.load( atomics::memory_order_relaxed ) )
        {
            ++st.thread_rec_count;
            st.guard_allocated      += hprec->hazards_.alloc_guard_count_;
            st.guard_freed          += hprec->hazards_.free_guard_count_;
            st.hp_extend_count      += hprec->hazards_.extend_call_count_;
            st.retired_count        += hprec->retired_.retire_call_count_;
            st.retired_extend_count += hprec->retired_.extend_call_count_;
            st.free_count           += hprec->free_call_count_;
            st.scan_count           += hprec->scan_call_count_;
            st.help_scan_count      += hprec->help_scan_call_count_;
        }

        st.hp_block_count = hp_allocator_.block_allocated_.load( atomics::memory_order_relaxed );
        st.retired_block_count = retired_allocator_.block_allocated_.load( atomics::memory_order_relaxed );
#   endif
    }


}}} // namespace cds::gc::dhp

CDS_EXPORT_API /*static*/ cds::gc::DHP::stat const& cds::gc::DHP::postmortem_statistics()
{
    return cds::gc::dhp::s_postmortem_stat;
}