/*
    This file is a part of libcds - Concurrent Data Structures library

    (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017

    Source code repo: http://github.com/khizmax/libcds/
    Download: http://sourceforge.net/projects/libcds/files/

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions are met:

    * Redistributions of source code must retain the above copyright notice, this
      list of conditions and the following disclaimer.

    * Redistributions in binary form must reproduce the above copyright notice,
      this list of conditions and the following disclaimer in the documentation
      and/or other materials provided with the distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
    AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
    IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
    FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
    DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
    SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
    OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

#include <algorithm>
#include <vector>

#include <cds/gc/hp.h>
#include <cds/os/thread.h>

namespace cds { namespace gc { namespace hp {

    namespace {
        void * default_alloc_memory( size_t size )
        {
            return new uintptr_t[( size + sizeof( uintptr_t ) - 1 ) / sizeof( uintptr_t )];
        }

        void default_free_memory( void* p )
        {
            delete[] reinterpret_cast<uintptr_t*>( p );
        }

        void* ( *s_alloc_memory )( size_t size ) = default_alloc_memory;
        void ( *s_free_memory )( void* p ) = default_free_memory;

        template <typename T>
        class allocator
        {
        public:
            typedef T   value_type;

            allocator() {}
            allocator( allocator const& ) {}
            template <class U>
            explicit allocator( allocator<U> const& ) {}

            static T* allocate( size_t nCount )
            {
                return reinterpret_cast<T*>( s_alloc_memory( sizeof( value_type ) * nCount ) );
            }

            static void deallocate( T* p, size_t /*nCount*/ )
            {
                s_free_memory( reinterpret_cast<void*>( p ) );
            }
        };

        struct defaults {
            static const size_t c_nHazardPointerPerThread = 8;
            static const size_t c_nMaxThreadCount = 100;
        };

        size_t calc_retired_size( size_t nSize, size_t nHPCount, size_t nThreadCount )
        {
            size_t const min_size = nHPCount * nThreadCount;
            return nSize < min_size ? min_size * 2 : nSize;
        }
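        // Example: with the defaults above (8 hazard pointers per thread, up to 100 threads)
        // calc_retired_size() computes min_size = 8 * 100 = 800, so a requested retired-array
        // size below 800 is bumped to 1600 entries; larger requests are used as given.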

        stat s_postmortem_stat;
    } // namespace

    /*static*/ CDS_EXPORT_API smr* smr::instance_ = nullptr;
    thread_local thread_data* tls_ = nullptr;

    /*static*/ CDS_EXPORT_API thread_data* smr::tls()
    {
        assert( tls_ != nullptr );
        return tls_;
    }

    struct smr::thread_record: thread_data
    {
        atomics::atomic<thread_record*>     m_pNextNode; ///< next hazard ptr record in list
        atomics::atomic<cds::OS::ThreadId>  m_idOwner;   ///< Owner thread id; 0 - the record is free (not owned)
        atomics::atomic<bool>               m_bFree;     ///< true if record is free (not owned)

        thread_record( guard* guards, size_t guard_count, retired_ptr* retired_arr, size_t retired_capacity )
            : thread_data( guards, guard_count, retired_arr, retired_capacity )
            , m_bFree( false )
        {}
    };

    /*static*/ CDS_EXPORT_API void smr::set_memory_allocator(
        void* ( *alloc_func )( size_t size ),
        void( *free_func )( void * p )
    )
    {
        // The memory allocation functions may be set only BEFORE the HP SMR singleton is initialized
        assert( instance_ == nullptr );

        s_alloc_memory = alloc_func;
        s_free_memory = free_func;
    }
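
    // Usage sketch (illustrative, not part of the library): an application may install its own
    // low-level allocation hooks, but only before the SMR singleton is constructed (see the
    // assert above). my_hp_alloc/my_hp_free are hypothetical application-side functions.
    //
    //      #include <cstdlib>
    //      #include <cds/gc/hp.h>
    //
    //      void* my_hp_alloc( size_t size ) { return std::malloc( size ); }
    //      void  my_hp_free( void* p )      { std::free( p ); }
    //
    //      // before cds::gc::hp::smr::construct() / any Hazard Pointer usage:
    //      cds::gc::hp::smr::set_memory_allocator( my_hp_alloc, my_hp_free );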


    /*static*/ CDS_EXPORT_API void smr::construct( size_t nHazardPtrCount, size_t nMaxThreadCount, size_t nMaxRetiredPtrCount, scan_type nScanType )
    {
        if ( !instance_ ) {
            instance_ = new( s_alloc_memory( sizeof( smr ))) smr( nHazardPtrCount, nMaxThreadCount, nMaxRetiredPtrCount, nScanType );
        }
    }

    /*static*/ CDS_EXPORT_API void smr::destruct( bool bDetachAll )
    {
        if ( instance_ ) {
            if ( bDetachAll )
                instance_->detach_all_thread();

            instance_->~smr();
            s_free_memory( instance_ );
            instance_ = nullptr;
        }
    }
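
    // Lifecycle sketch (illustrative): the SMR singleton is created once at program start and
    // destroyed once at shutdown. Passing 0 for the first three arguments selects the defaults
    // declared above; destruct( true ) also detaches any threads that are still attached.
    // Applications normally drive this through the cds::gc::HP facade rather than calling smr directly.
    //
    //      cds::gc::hp::smr::construct( 0, 0, 0, cds::gc::hp::classic );
    //      // ... threads attach, use Hazard Pointer based containers, detach ...
    //      cds::gc::hp::smr::destruct( true );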

    CDS_EXPORT_API smr::smr( size_t nHazardPtrCount, size_t nMaxThreadCount, size_t nMaxRetiredPtrCount, scan_type nScanType )
        : thread_list_( nullptr )
        , hazard_ptr_count_( nHazardPtrCount == 0 ? defaults::c_nHazardPointerPerThread : nHazardPtrCount )
        , max_thread_count_( nMaxThreadCount == 0 ? defaults::c_nMaxThreadCount : nMaxThreadCount )
        , max_retired_ptr_count_( calc_retired_size( nMaxRetiredPtrCount, hazard_ptr_count_, max_thread_count_ ))
        , scan_type_( nScanType )
        , scan_func_( nScanType == classic ? &smr::classic_scan : &smr::inplace_scan )
    {}

    CDS_EXPORT_API smr::~smr()
    {
        CDS_DEBUG_ONLY( const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId; )
        CDS_DEBUG_ONLY( const cds::OS::ThreadId mainThreadId = cds::OS::get_current_thread_id(); )

        CDS_HPSTAT( statistics( s_postmortem_stat ));

        thread_record* pHead = thread_list_.load( atomics::memory_order_relaxed );
        thread_list_.store( nullptr, atomics::memory_order_relaxed );

        thread_record* pNext = nullptr;
        for ( thread_record* hprec = pHead; hprec; hprec = pNext )
        {
            assert( hprec->m_idOwner.load( atomics::memory_order_relaxed ) == nullThreadId
                || hprec->m_idOwner.load( atomics::memory_order_relaxed ) == mainThreadId );

            retired_array& arr = hprec->retired_;
            for ( retired_ptr* cur{ arr.first() }, *last{ arr.last() }; cur != last; ++cur ) {
                cur->free();
                CDS_HPSTAT( ++s_postmortem_stat.free_count );
            }

            arr.reset( 0 );
            pNext = hprec->m_pNextNode.load( atomics::memory_order_relaxed );
            hprec->m_bFree.store( true, atomics::memory_order_relaxed );
            destroy_thread_data( hprec );
        }
    }


    CDS_EXPORT_API smr::thread_record* smr::create_thread_data()
    {
        size_t const guard_array_size = thread_hp_storage::calc_array_size( get_hazard_ptr_count());
        size_t const retired_array_size = retired_array::calc_array_size( get_max_retired_ptr_count());
        size_t const nSize = sizeof( thread_record ) + guard_array_size + retired_array_size;

        /*
            The memory is allocated as one contiguous block.
            Memory layout:
            +--------------------------+
            |                          |
            | thread_record            |
            |         hazards_         +---+
        +---|         retired_         |   |
        |   |                          |   |
        |   |--------------------------|   |
        |   | hazard_ptr[]             |<--+
        |   |                          |
        |   |                          |
        |   |--------------------------|
        +-->| retired_ptr[]            |
            |                          |
            |                          |
            +--------------------------+
        */

        char* mem = reinterpret_cast<char*>( s_alloc_memory( nSize ));
        return new( mem ) thread_record(
            reinterpret_cast<guard*>( mem + sizeof( thread_record )), get_hazard_ptr_count(),
            reinterpret_cast<retired_ptr*>( mem + sizeof( thread_record ) + guard_array_size ), get_max_retired_ptr_count()
        );
    }

    /*static*/ CDS_EXPORT_API void smr::destroy_thread_data( thread_record* pRec )
    {
        // all retired pointers must be freed
        assert( pRec->retired_.size() == 0 );

        pRec->~thread_record();
        s_free_memory( pRec );
    }


    CDS_EXPORT_API smr::thread_record* smr::alloc_thread_data()
    {
        //CDS_HAZARDPTR_STATISTIC( ++m_Stat.m_AllocHPRec )

        thread_record * hprec;
        const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId;
        const cds::OS::ThreadId curThreadId = cds::OS::get_current_thread_id();

        // First try to reuse a free (non-active) HP record
        for ( hprec = thread_list_.load( atomics::memory_order_acquire ); hprec; hprec = hprec->m_pNextNode.load( atomics::memory_order_relaxed ) ) {
            cds::OS::ThreadId thId = nullThreadId;
            if ( !hprec->m_idOwner.compare_exchange_strong( thId, curThreadId, atomics::memory_order_relaxed, atomics::memory_order_relaxed ) )
                continue;
            hprec->m_bFree.store( false, atomics::memory_order_release );
            return hprec;
        }

        // No HP records available for reuse
        // Allocate and push a new HP record
        hprec = create_thread_data();
        hprec->m_idOwner.store( curThreadId, atomics::memory_order_relaxed );

        thread_record* pOldHead = thread_list_.load( atomics::memory_order_relaxed );
        do {
            hprec->m_pNextNode.store( pOldHead, atomics::memory_order_relaxed );
        } while ( !thread_list_.compare_exchange_weak( pOldHead, hprec, atomics::memory_order_release, atomics::memory_order_acquire ) );

        return hprec;
    }

    CDS_EXPORT_API void smr::free_thread_data( smr::thread_record* pRec )
    {
        assert( pRec != nullptr );

        pRec->hazards_.clear();
        scan( pRec );
        help_scan( pRec );
        pRec->m_idOwner.store( cds::OS::c_NullThreadId, atomics::memory_order_release );
    }

    CDS_EXPORT_API void smr::detach_all_thread()
    {
        thread_record * pNext = nullptr;
        const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId;

        for ( thread_record * hprec = thread_list_.load( atomics::memory_order_relaxed ); hprec; hprec = pNext ) {
            pNext = hprec->m_pNextNode.load( atomics::memory_order_relaxed );
            if ( hprec->m_idOwner.load( atomics::memory_order_relaxed ) != nullThreadId ) {
                free_thread_data( hprec );
            }
        }
    }

    /*static*/ CDS_EXPORT_API void smr::attach_thread()
    {
        if ( !tls_ )
            tls_ = instance().alloc_thread_data();
    }

    /*static*/ CDS_EXPORT_API void smr::detach_thread()
    {
        thread_data* rec = tls_;
        if ( rec ) {
            tls_ = nullptr;
            instance().free_thread_data( static_cast<thread_record*>( rec ));
        }
    }
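
    // Per-thread sketch (illustrative): every thread that touches Hazard Pointer protected data
    // must attach itself before the first access and detach before it terminates. In libcds this
    // is normally driven by the threading manager; shown here with direct calls for clarity.
    //
    //      cds::gc::hp::smr::attach_thread();   // allocates or reuses a thread_record for this thread
    //      // ... work with HP-based containers ...
    //      cds::gc::hp::smr::detach_thread();   // scans remaining retired pointers and releases the record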


    CDS_EXPORT_API void smr::inplace_scan( thread_data* pThreadRec )
    {
        thread_record* pRec = static_cast<thread_record*>( pThreadRec );

        //CDS_HAZARDPTR_STATISTIC( ++m_Stat.m_ScanCallCount )

        // The in-place scan algorithm uses the LSB of each retired pointer as an internal mark.
        // That is correct only if every retired pointer is at least 2-byte aligned (its LSB is zero);
        // otherwise we fall back to the classic scan algorithm.

        // Check that every retired pointer has a zero LSB;
        // the LSB is used to mark pointers that cannot be deleted yet.
        retired_ptr* first_retired = pRec->retired_.first();
        retired_ptr* last_retired = pRec->retired_.last();
        if ( first_retired == last_retired )
            return;

        for ( auto it = first_retired; it != last_retired; ++it ) {
            if ( it->m_n & 1 ) {
                // found a pointer with its LSB set - fall back to classic_scan
                classic_scan( pRec );
                return;
            }
        }

        CDS_HPSTAT( ++pRec->scan_count_ );

        // Sort retired pointer array
        std::sort( first_retired, last_retired, retired_ptr::less );

        // Check double free
#   ifdef _DEBUG
        {
            auto it = first_retired;
            auto itPrev = it;
            while ( ++it != last_retired ) {
                assert( itPrev->m_p < it->m_p );
                itPrev = it;
            }
        }
#   endif

        // Search guarded pointers in retired array
        thread_record* pNode = thread_list_.load( atomics::memory_order_acquire );

        {
            retired_ptr dummy_retired;
            while ( pNode ) {
                if ( pNode->m_idOwner.load( atomics::memory_order_relaxed ) != cds::OS::c_NullThreadId ) {
                    thread_hp_storage& hpstg = pNode->hazards_;
                    for ( size_t i = 0; i < hazard_ptr_count_; ++i ) {
                        pRec->sync();
                        void * hptr = hpstg[i].get();
                        if ( hptr ) {
                            dummy_retired.m_p = hptr;
                            retired_ptr* it = std::lower_bound( first_retired, last_retired, dummy_retired, retired_ptr::less );
                            if ( it != last_retired && it->m_p == hptr ) {
                                // Mark retired pointer as guarded
                                it->m_n |= 1;
                            }
                        }
                    }
                }
                pNode = pNode->m_pNextNode.load( atomics::memory_order_relaxed );
            }
        }

        // Move all marked pointers to head of array
        {
            retired_ptr* insert_pos = first_retired;
            for ( retired_ptr* it = first_retired; it != last_retired; ++it ) {
                if ( it->m_n & 1 ) {
                    it->m_n &= ~uintptr_t(1);
                    if ( insert_pos != it )
                        *insert_pos = *it;
                    ++insert_pos;
                }
                else {
                    // Retired pointer may be freed
                    it->free();
                    CDS_HPSTAT( ++pRec->free_count_ );
                }
            }
            const size_t nDeferred = insert_pos - first_retired;
            pRec->retired_.reset( nDeferred );
        }
    }

    // cppcheck-suppress functionConst
    CDS_EXPORT_API void smr::classic_scan( thread_data* pThreadRec )
    {
        thread_record* pRec = static_cast<thread_record*>( pThreadRec );

        CDS_HPSTAT( ++pRec->scan_count_ );

        std::vector< void*, allocator<void*>>   plist;
        plist.reserve( get_max_thread_count() * get_hazard_ptr_count());
        assert( plist.size() == 0 );

        // Stage 1: Scan HP list and insert non-null values in plist

        thread_record* pNode = thread_list_.load( atomics::memory_order_acquire );

        while ( pNode ) {
            if ( pNode->m_idOwner.load( atomics::memory_order_relaxed ) != cds::OS::c_NullThreadId ) {
                for ( size_t i = 0; i < get_hazard_ptr_count(); ++i ) {
                    pRec->sync();
                    void * hptr = pNode->hazards_[i].get();
                    if ( hptr )
                        plist.push_back( hptr );
                }
            }
            pNode = pNode->m_pNextNode.load( atomics::memory_order_relaxed );
        }

        // Sort plist so that we can binary-search it below
        std::sort( plist.begin(), plist.end() );

        // Stage 2: Search plist
        retired_array& retired = pRec->retired_;

        retired_ptr* first_retired = retired.first();
        retired_ptr* last_retired = retired.last();

        {
            auto itBegin = plist.begin();
            auto itEnd = plist.end();
            retired_ptr* insert_pos = first_retired;
            for ( retired_ptr* it = first_retired; it != last_retired; ++it ) {
                if ( std::binary_search( itBegin, itEnd, it->m_p ) ) {
                    if ( insert_pos != it )
                        *insert_pos = *it;
                    ++insert_pos;
                }
                else {
                    it->free();
                    CDS_HPSTAT( ++pRec->free_count_ );
                }
            }

            retired.reset( insert_pos - first_retired );
        }
    }

    CDS_EXPORT_API void smr::help_scan( thread_data* pThis )
    {
        assert( static_cast<thread_record*>( pThis )->m_idOwner.load( atomics::memory_order_relaxed ) == cds::OS::get_current_thread_id() );

        CDS_HPSTAT( ++pThis->help_scan_count_ );

        const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId;
        const cds::OS::ThreadId curThreadId = cds::OS::get_current_thread_id();
        for ( thread_record* hprec = thread_list_.load( atomics::memory_order_acquire ); hprec; hprec = hprec->m_pNextNode.load( atomics::memory_order_relaxed ))
        {
            if ( hprec == static_cast<thread_record*>( pThis ))
                continue;

            // If m_bFree == true then hprec->retired_ is empty - no need to scan it
            if ( hprec->m_bFree.load( atomics::memory_order_acquire ))
                continue;

            // Take ownership of hprec if it is free.
            // Several threads may call help_scan() concurrently, so we rely on atomic operations only.
            {
                cds::OS::ThreadId curOwner = hprec->m_idOwner.load( atomics::memory_order_relaxed );
                if ( curOwner == nullThreadId ) {
                    if ( !hprec->m_idOwner.compare_exchange_strong( curOwner, curThreadId, atomics::memory_order_acquire, atomics::memory_order_relaxed ) )
                        continue;
                }
                else
                    continue;
            }

            // We now own the thread record and can check whether it has retired pointers.
            // If it does, we move them to pThis, which is private to the current thread.
            retired_array& src = hprec->retired_;
            retired_array& dest = pThis->retired_;
            assert( !dest.full() );

            retired_ptr* src_first = src.first();
            retired_ptr* src_last = src.last();

            for ( ; src_first != src_last; ++src_first ) {
                if ( !dest.push( std::move( *src_first )))
                    scan( pThis );
            }

            src.reset( 0 );

            hprec->m_bFree.store( true, atomics::memory_order_relaxed );
            hprec->m_idOwner.store( nullThreadId, atomics::memory_order_release );

            scan( pThis );
        }
    }

    CDS_EXPORT_API void smr::statistics( stat& st )
    {
        st.clear();
#   ifdef CDS_ENABLE_HPSTAT
        for ( thread_record* hprec = thread_list_.load( atomics::memory_order_acquire ); hprec; hprec = hprec->m_pNextNode.load( atomics::memory_order_relaxed ) )
        {
            ++st.thread_rec_count;
            st.guard_allocated += hprec->hazards_.alloc_guard_count_;
            st.guard_freed     += hprec->hazards_.free_guard_count_;
            st.retired_count   += hprec->retired_.retire_call_count_;
            st.free_count      += hprec->free_count_;
            st.scan_count      += hprec->scan_count_;
            st.help_scan_count += hprec->help_scan_count_;
        }
#   endif
    }

}}} // namespace cds::gc::hp

CDS_EXPORT_API /*static*/ cds::gc::HP::stat const& cds::gc::HP::postmortem_statistics()
{
    return cds::gc::hp::s_postmortem_stat;
}
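
// Illustrative only: postmortem statistics are gathered in smr::~smr() and are therefore meaningful
// after the SMR singleton has been destroyed; the counters are populated only when libcds is built
// with CDS_ENABLE_HPSTAT. A hypothetical shutdown report might look like this:
//
//      cds::gc::hp::smr::destruct( true );
//      cds::gc::HP::stat const& st = cds::gc::HP::postmortem_statistics();
//      std::cout << "HP retired: " << st.retired_count
//                << ", freed: "    << st.free_count
//                << ", scans: "    << st.scan_count << '\n';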