// libcds.git / src / hp.cpp
/*
    This file is a part of libcds - Concurrent Data Structures library

    (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017

    Source code repo: http://github.com/khizmax/libcds/
    Download: http://sourceforge.net/projects/libcds/files/

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions are met:

    * Redistributions of source code must retain the above copyright notice, this
      list of conditions and the following disclaimer.

    * Redistributions in binary form must reproduce the above copyright notice,
      this list of conditions and the following disclaimer in the documentation
      and/or other materials provided with the distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
    AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
    IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
    FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
    DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
    SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
    OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

#include <algorithm>
#include <vector>

#include <cds/gc/hp.h>
#include <cds/os/thread.h>

namespace cds { namespace gc { namespace hp {

    namespace {
        // Default allocator: rounds the byte count up to a whole number of
        // uintptr_t elements
        void* default_alloc_memory( size_t size )
        {
            return new uintptr_t[( size + sizeof( uintptr_t ) - 1 ) / sizeof( uintptr_t )];
        }

        void default_free_memory( void* p )
        {
            delete[] reinterpret_cast<uintptr_t*>( p );
        }

        void* ( *s_alloc_memory )( size_t size ) = default_alloc_memory;
        void ( *s_free_memory )( void* p ) = default_free_memory;

        template <typename T>
        class allocator
        {
        public:
            typedef T   value_type;

            allocator() {}
            allocator( allocator const& ) {}
            template <class U>
            explicit allocator( allocator<U> const& ) {}

            static T* allocate( size_t nCount )
            {
                return reinterpret_cast<T*>( s_alloc_memory( sizeof( value_type ) * nCount ));
            }

            static void deallocate( T* p, size_t /*nCount*/ )
            {
                s_free_memory( reinterpret_cast<void*>( p ));
            }
        };

        struct defaults {
            static const size_t c_nHazardPointerPerThread = 8;
            static const size_t c_nMaxThreadCount = 100;
        };

        // Capacity of the retired array: if the requested size is less than
        // the total number of hazard pointers in the system
        // (nHPCount per thread * nThreadCount threads), use twice that minimum
        size_t calc_retired_size( size_t nSize, size_t nHPCount, size_t nThreadCount )
        {
            size_t const min_size = nHPCount * nThreadCount;
            return nSize < min_size ? min_size * 2 : nSize;
        }
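
        // Worked example with the defaults above (just the arithmetic of
        // calc_retired_size, no extra logic): 8 hazard pointers per thread and
        // 100 threads give min_size = 8 * 100 = 800, so a zero (default)
        // request yields a retired capacity of 800 * 2 = 1600, while an
        // explicit request of, say, 4000 is returned unchanged.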

        stat s_postmortem_stat;
    } // namespace

    /*static*/ CDS_EXPORT_API smr* smr::instance_ = nullptr;
    thread_local thread_data* tls_ = nullptr;

    /*static*/ CDS_EXPORT_API thread_data* smr::tls()
    {
        assert( tls_ != nullptr );
        return tls_;
    }

    struct smr::thread_record: thread_data
    {
        atomics::atomic<thread_record*>     m_pNextNode; ///< next hazard ptr record in list
        atomics::atomic<cds::OS::ThreadId>  m_idOwner;   ///< Owner thread id; 0 - the record is free (not owned)
        atomics::atomic<bool>               m_bFree;     ///< true if record is free (not owned)

        thread_record( guard* guards, size_t guard_count, retired_ptr* retired_arr, size_t retired_capacity )
            : thread_data( guards, guard_count, retired_arr, retired_capacity )
            , m_pNextNode( nullptr )
            , m_idOwner( cds::OS::c_NullThreadId )
            , m_bFree( false )
        {}
    };

    /*static*/ CDS_EXPORT_API void smr::set_memory_allocator(
        void* ( *alloc_func )( size_t size ),
        void ( *free_func )( void* p )
    )
    {
        // The memory allocation functions must be set BEFORE initializing HP SMR
        assert( instance_ == nullptr );

        s_alloc_memory = alloc_func;
        s_free_memory = free_func;
    }
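
    /*
        Minimal usage sketch (not part of this translation unit): installing a
        custom allocator before the HP singleton is constructed. The names
        my_alloc/my_free are hypothetical user-provided functions; only the
        ordering requirement comes from the assert above.

            void* my_alloc( size_t size );   // user-defined
            void  my_free( void* p );        // user-defined

            cds::gc::hp::smr::set_memory_allocator( my_alloc, my_free );
            cds::Initialize();
            cds::gc::HP hpGC;   // constructs the SMR singleton using my_alloc
    */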

    /*static*/ CDS_EXPORT_API void smr::construct( size_t nHazardPtrCount, size_t nMaxThreadCount, size_t nMaxRetiredPtrCount, scan_type nScanType )
    {
        if ( !instance_ ) {
            instance_ = new( s_alloc_memory( sizeof( smr ))) smr( nHazardPtrCount, nMaxThreadCount, nMaxRetiredPtrCount, nScanType );
        }
    }

    /*static*/ CDS_EXPORT_API void smr::destruct( bool bDetachAll )
    {
        if ( instance_ ) {
            if ( bDetachAll )
                instance_->detach_all_thread();

            instance_->~smr();
            s_free_memory( instance_ );
            instance_ = nullptr;
        }
    }
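
    /*
        Typical lifecycle at the application level (a sketch using the public
        cds::gc::HP facade, which forwards to smr::construct()/smr::destruct();
        the scope and parameters are illustrative only):

            cds::Initialize();
            {
                cds::gc::HP hpGC;   // smr::construct() with default sizes
                // ... attach threads, use HP-based containers ...
            }                       // ~HP() ends up in smr::destruct()
            cds::Terminate();
    */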

    CDS_EXPORT_API smr::smr( size_t nHazardPtrCount, size_t nMaxThreadCount, size_t nMaxRetiredPtrCount, scan_type nScanType )
        : hazard_ptr_count_( nHazardPtrCount == 0 ? defaults::c_nHazardPointerPerThread : nHazardPtrCount )
        , max_thread_count_( nMaxThreadCount == 0 ? defaults::c_nMaxThreadCount : nMaxThreadCount )
        , max_retired_ptr_count_( calc_retired_size( nMaxRetiredPtrCount, hazard_ptr_count_, max_thread_count_ ))
        , scan_type_( nScanType )
        , scan_func_( nScanType == classic ? &smr::classic_scan : &smr::inplace_scan )
    {
        thread_list_.store( nullptr, atomics::memory_order_release );
    }

    CDS_EXPORT_API smr::~smr()
    {
        CDS_DEBUG_ONLY( const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId; )
        CDS_DEBUG_ONLY( const cds::OS::ThreadId mainThreadId = cds::OS::get_current_thread_id(); )

        CDS_HPSTAT( statistics( s_postmortem_stat ));

        thread_record* pHead = thread_list_.load( atomics::memory_order_relaxed );
        thread_list_.store( nullptr, atomics::memory_order_release );

        thread_record* pNext = nullptr;
        for ( thread_record* hprec = pHead; hprec; hprec = pNext )
        {
            assert( hprec->m_idOwner.load( atomics::memory_order_relaxed ) == nullThreadId
                || hprec->m_idOwner.load( atomics::memory_order_relaxed ) == mainThreadId );

            retired_array& arr = hprec->retired_;
            for ( retired_ptr* cur{ arr.first() }, *last{ arr.last() }; cur != last; ++cur ) {
                cur->free();
                CDS_HPSTAT( ++s_postmortem_stat.free_count );
            }

            arr.reset( 0 );
            pNext = hprec->m_pNextNode.load( atomics::memory_order_relaxed );
            hprec->m_bFree.store( true, atomics::memory_order_relaxed );
            destroy_thread_data( hprec );
        }
    }

    CDS_EXPORT_API smr::thread_record* smr::create_thread_data()
    {
        size_t const guard_array_size = thread_hp_storage::calc_array_size( get_hazard_ptr_count());
        size_t const retired_array_size = retired_array::calc_array_size( get_max_retired_ptr_count());
        size_t const nSize = sizeof( thread_record ) + guard_array_size + retired_array_size;

        /*
            The memory is allocated as one contiguous block.
            Memory layout:
            +--------------------------+
            |                          |
            | thread_record            |
            |         hazards_         +---+
        +---|         retired_         |   |
        |   |                          |   |
        |   |--------------------------|   |
        |   | hazard_ptr[]             |<--+
        |   |                          |
        |   |                          |
        |   |--------------------------|
        +-->| retired_ptr[]            |
            |                          |
            |                          |
            +--------------------------+
        */

        uint8_t* mem = reinterpret_cast<uint8_t*>( s_alloc_memory( nSize ));

        return new( mem ) thread_record(
            reinterpret_cast<guard*>( mem + sizeof( thread_record )),
            get_hazard_ptr_count(),
            reinterpret_cast<retired_ptr*>( mem + sizeof( thread_record ) + guard_array_size ),
            get_max_retired_ptr_count()
        );
    }

    /*static*/ CDS_EXPORT_API void smr::destroy_thread_data( thread_record* pRec )
    {
        // all retired pointers must be freed
        assert( pRec->retired_.size() == 0 );

        pRec->~thread_record();
        s_free_memory( pRec );
    }

    CDS_EXPORT_API smr::thread_record* smr::alloc_thread_data()
    {
        thread_record* hprec;
        const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId;
        const cds::OS::ThreadId curThreadId = cds::OS::get_current_thread_id();

        // First try to reuse a free (non-active) HP record
        for ( hprec = thread_list_.load( atomics::memory_order_acquire ); hprec; hprec = hprec->m_pNextNode.load( atomics::memory_order_acquire )) {
            cds::OS::ThreadId thId = nullThreadId;
            if ( !hprec->m_idOwner.compare_exchange_strong( thId, curThreadId, atomics::memory_order_relaxed, atomics::memory_order_relaxed ))
                continue;
            hprec->m_bFree.store( false, atomics::memory_order_release );
            return hprec;
        }

        // No HP records available for reuse:
        // allocate a new record and push it onto the head of the thread list
        hprec = create_thread_data();
        hprec->m_idOwner.store( curThreadId, atomics::memory_order_relaxed );

        // Lock-free push: on CAS failure pOldHead is reloaded, so the next
        // pointer is re-linked before each retry
        thread_record* pOldHead = thread_list_.load( atomics::memory_order_relaxed );
        do {
            hprec->m_pNextNode.store( pOldHead, atomics::memory_order_release );
        } while ( !thread_list_.compare_exchange_weak( pOldHead, hprec, atomics::memory_order_release, atomics::memory_order_acquire ));

        return hprec;
    }

    CDS_EXPORT_API void smr::free_thread_data( smr::thread_record* pRec )
    {
        assert( pRec != nullptr );

        pRec->hazards_.clear();
        scan( pRec );
        help_scan( pRec );
        pRec->m_idOwner.store( cds::OS::c_NullThreadId, atomics::memory_order_release );
    }

    CDS_EXPORT_API void smr::detach_all_thread()
    {
        thread_record* pNext = nullptr;
        const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId;

        for ( thread_record* hprec = thread_list_.load( atomics::memory_order_relaxed ); hprec; hprec = pNext ) {
            pNext = hprec->m_pNextNode.load( atomics::memory_order_relaxed );
            if ( hprec->m_idOwner.load( atomics::memory_order_relaxed ) != nullThreadId ) {
                free_thread_data( hprec );
            }
        }
    }

    /*static*/ CDS_EXPORT_API void smr::attach_thread()
    {
        if ( !tls_ )
            tls_ = instance().alloc_thread_data();
    }

    /*static*/ CDS_EXPORT_API void smr::detach_thread()
    {
        thread_data* rec = tls_;
        if ( rec ) {
            tls_ = nullptr;
            instance().free_thread_data( static_cast<thread_record*>( rec ));
        }
    }
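
    /*
        Sketch of how a worker thread is expected to use the pair above,
        normally through the cds::threading::Manager facade rather than by
        calling smr directly (worker_body is a hypothetical user function):

            void worker()
            {
                cds::threading::Manager::attachThread();   // ends up in smr::attach_thread()
                worker_body();                             // may use HP-based containers
                cds::threading::Manager::detachThread();   // ends up in smr::detach_thread()
            }
    */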

    CDS_EXPORT_API void smr::inplace_scan( thread_data* pThreadRec )
    {
        thread_record* pRec = static_cast<thread_record*>( pThreadRec );

        //CDS_HAZARDPTR_STATISTIC( ++m_Stat.m_ScanCallCount )

        // The in-place scan algorithm uses the LSB of a retired pointer as a
        // mark for internal purposes. This is correct only if every retired
        // pointer is at least 2-byte aligned, so its LSB is zero.
        // If that does not hold, fall back to the classic scan algorithm.

        // Check that all retired pointers have a zero LSB
        // (the LSB is used below to mark pointers that cannot be deleted yet)
        retired_ptr* first_retired = pRec->retired_.first();
        retired_ptr* last_retired = pRec->retired_.last();
        if ( first_retired == last_retired )
            return;

        for ( auto it = first_retired; it != last_retired; ++it ) {
            if ( it->m_n & 1 ) {
                // found a pointer with the LSB set - use classic_scan
                classic_scan( pRec );
                return;
            }
        }

        CDS_HPSTAT( ++pThreadRec->scan_count_ );

        // Sort the retired pointer array
        std::sort( first_retired, last_retired, retired_ptr::less );

        // Check for double free
#   ifdef _DEBUG
        {
            auto it = first_retired;
            auto itPrev = it;
            while ( ++it != last_retired ) {
                assert( itPrev->m_p < it->m_p );
                itPrev = it;
            }
        }
#   endif

        // Search for guarded pointers in the retired array
        thread_record* pNode = thread_list_.load( atomics::memory_order_acquire );

        {
            retired_ptr dummy_retired;
            while ( pNode ) {
                if ( pNode->m_idOwner.load( atomics::memory_order_relaxed ) != cds::OS::c_NullThreadId ) {
                    thread_hp_storage& hpstg = pNode->hazards_;
                    for ( size_t i = 0; i < hazard_ptr_count_; ++i ) {
                        pRec->sync();
                        void* hptr = hpstg[i].get();
                        if ( hptr ) {
                            dummy_retired.m_p = hptr;
                            retired_ptr* it = std::lower_bound( first_retired, last_retired, dummy_retired, retired_ptr::less );
                            if ( it != last_retired && it->m_p == hptr ) {
                                // Mark the retired pointer as guarded
                                it->m_n |= 1;
                            }
                        }
                    }
                }
                pNode = pNode->m_pNextNode.load( atomics::memory_order_relaxed );
            }
        }

        // Move all marked (still guarded) pointers to the head of the array;
        // the rest may be freed
        {
            retired_ptr* insert_pos = first_retired;
            for ( retired_ptr* it = first_retired; it != last_retired; ++it ) {
                if ( it->m_n & 1 ) {
                    it->m_n &= ~uintptr_t( 1 );
                    if ( insert_pos != it )
                        *insert_pos = *it;
                    ++insert_pos;
                }
                else {
                    // The retired pointer is not guarded and may be freed
                    it->free();
                    CDS_HPSTAT( ++pRec->free_count_ );
                }
            }
            const size_t nDeferred = insert_pos - first_retired;
            pRec->retired_.reset( nDeferred );
        }
    }

    // cppcheck-suppress functionConst
    CDS_EXPORT_API void smr::classic_scan( thread_data* pThreadRec )
    {
        thread_record* pRec = static_cast<thread_record*>( pThreadRec );

        CDS_HPSTAT( ++pThreadRec->scan_count_ );

        std::vector< void*, allocator<void*>> plist;
        plist.reserve( get_max_thread_count() * get_hazard_ptr_count());
        assert( plist.size() == 0 );

        // Stage 1: scan the HP list and collect all non-null hazard pointers into plist

        thread_record* pNode = thread_list_.load( atomics::memory_order_acquire );

        while ( pNode ) {
            if ( pNode->m_idOwner.load( atomics::memory_order_relaxed ) != cds::OS::c_NullThreadId ) {
                for ( size_t i = 0; i < get_hazard_ptr_count(); ++i ) {
                    pRec->sync();
                    void* hptr = pNode->hazards_[i].get();
                    if ( hptr )
                        plist.push_back( hptr );
                }
            }
            pNode = pNode->m_pNextNode.load( atomics::memory_order_relaxed );
        }

        // Sort plist to enable binary search
        std::sort( plist.begin(), plist.end());

        // Stage 2: search plist for each retired pointer
        retired_array& retired = pRec->retired_;

        retired_ptr* first_retired = retired.first();
        retired_ptr* last_retired = retired.last();

        {
            auto itBegin = plist.begin();
            auto itEnd = plist.end();
            retired_ptr* insert_pos = first_retired;
            for ( retired_ptr* it = first_retired; it != last_retired; ++it ) {
                if ( std::binary_search( itBegin, itEnd, it->m_p )) {
                    // Still guarded by some thread - keep it for the next scan
                    if ( insert_pos != it )
                        *insert_pos = *it;
                    ++insert_pos;
                }
                else {
                    it->free();
                    CDS_HPSTAT( ++pRec->free_count_ );
                }
            }

            retired.reset( insert_pos - first_retired );
        }
    }

    CDS_EXPORT_API void smr::help_scan( thread_data* pThis )
    {
        assert( static_cast<thread_record*>( pThis )->m_idOwner.load( atomics::memory_order_relaxed ) == cds::OS::get_current_thread_id());

        CDS_HPSTAT( ++pThis->help_scan_count_ );

        const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId;
        const cds::OS::ThreadId curThreadId = cds::OS::get_current_thread_id();
        for ( thread_record* hprec = thread_list_.load( atomics::memory_order_acquire ); hprec; hprec = hprec->m_pNextNode.load( atomics::memory_order_relaxed ))
        {
            if ( hprec == static_cast<thread_record*>( pThis ))
                continue;

            // If m_bFree == true then hprec->retired_ is empty - no need to scan it
            if ( hprec->m_bFree.load( atomics::memory_order_acquire ))
                continue;

            // Take ownership of hprec if it is not owned by any thread.
            // Several threads may be competing here, so only atomic operations are used.
            {
                cds::OS::ThreadId curOwner = hprec->m_idOwner.load( atomics::memory_order_relaxed );
                if ( curOwner == nullThreadId ) {
                    if ( !hprec->m_idOwner.compare_exchange_strong( curOwner, curThreadId, atomics::memory_order_acquire, atomics::memory_order_relaxed ))
                        continue;
                }
                else
                    continue;
            }

            // We now own the thread record. If it has retired pointers,
            // move them to pThis, which is private to the current thread.
            retired_array& src = hprec->retired_;
            retired_array& dest = pThis->retired_;
            assert( !dest.full());

            retired_ptr* src_first = src.first();
            retired_ptr* src_last = src.last();

            for ( ; src_first != src_last; ++src_first ) {
                if ( !dest.push( std::move( *src_first )))
                    scan( pThis );
            }

            src.interthread_clear();
            hprec->m_bFree.store( true, atomics::memory_order_release );
            hprec->m_idOwner.store( nullThreadId, atomics::memory_order_release );

            scan( pThis );
        }
    }
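
    /*
        Why help_scan exists, in one scenario (illustrative, no new API):
        a thread may terminate while its retired array is non-empty, because
        some of its retired pointers are still guarded by other threads.
        Those pointers would never be reclaimed if the record simply sat
        unowned, so the next thread that detaches (via free_thread_data()
        above) adopts the orphaned record, moves its retired pointers into
        its own private array, and lets a regular scan() free them once
        they are no longer guarded.
    */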

    CDS_EXPORT_API void smr::statistics( stat& st )
    {
        st.clear();
#   ifdef CDS_ENABLE_HPSTAT
        for ( thread_record* hprec = thread_list_.load( atomics::memory_order_acquire ); hprec; hprec = hprec->m_pNextNode.load( atomics::memory_order_relaxed ))
        {
            CDS_TSAN_ANNOTATE_IGNORE_READS_BEGIN;
            ++st.thread_rec_count;
            st.guard_allocated += hprec->hazards_.alloc_guard_count_;
            st.guard_freed     += hprec->hazards_.free_guard_count_;
            st.retired_count   += hprec->retired_.retire_call_count_;
            st.free_count      += hprec->free_count_;
            st.scan_count      += hprec->scan_count_;
            st.help_scan_count += hprec->help_scan_count_;
            CDS_TSAN_ANNOTATE_IGNORE_READS_END;
        }
#   endif
    }

}}} // namespace cds::gc::hp

CDS_EXPORT_API /*static*/ cds::gc::HP::stat const& cds::gc::HP::postmortem_statistics()
{
    return cds::gc::hp::s_postmortem_stat;
}
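
/*
    Usage sketch for the accessor above: the post-mortem statistics are filled
    from ~smr(), so they are meaningful only after the HP singleton has been
    destroyed, and only when the library is built with CDS_ENABLE_HPSTAT.

        cds::Initialize();
        {
            cds::gc::HP hpGC;
            // ... run HP-based containers ...
        }   // ~HP() fills s_postmortem_stat
        cds::gc::HP::stat const& st = cds::gc::HP::postmortem_statistics();
        // e.g. inspect st.retired_count, st.free_count, st.scan_count
        cds::Terminate();
*/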