[libcds.git] / src / dhp.cpp
/*
    This file is a part of libcds - Concurrent Data Structures library

    (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017

    Source code repo: http://github.com/khizmax/libcds/
    Download: http://sourceforge.net/projects/libcds/files/

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions are met:

    * Redistributions of source code must retain the above copyright notice, this
      list of conditions and the following disclaimer.

    * Redistributions in binary form must reproduce the above copyright notice,
      this list of conditions and the following disclaimer in the documentation
      and/or other materials provided with the distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
    AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
    IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
    FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
    DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
    SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
    OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

#include <algorithm>
#include <cstdint>
#include <vector>

#include <cds/gc/dhp.h>
#include <cds/os/thread.h>

namespace cds { namespace gc { namespace dhp {

    namespace {
        void* default_alloc_memory( size_t size )
        {
            // allocate in uintptr_t units, rounding the requested byte count up
            return new uintptr_t[( size + sizeof( uintptr_t ) - 1 ) / sizeof( uintptr_t )];
        }

        void default_free_memory( void* p )
        {
            delete[] reinterpret_cast<uintptr_t*>( p );
        }

        struct defaults {
            static size_t const c_extended_guard_block_size = 16;
        };

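        // Process-wide memory hooks. They default to the new/delete-based helpers
        // above and may be replaced via smr::set_memory_allocator(), but only
        // before the SMR singleton is constructed.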
        void* ( *s_alloc_memory )( size_t size ) = default_alloc_memory;
        void ( *s_free_memory )( void* p ) = default_free_memory;

        // minimal STL-compatible allocator that routes all requests through the hooks
        template <typename T>
        class allocator
        {
        public:
            typedef T   value_type;

            allocator() {}
            allocator( allocator const& ) {}
            template <class U>
            explicit allocator( allocator<U> const& ) {}

            static T* allocate( size_t nCount )
            {
                return reinterpret_cast<T*>( s_alloc_memory( sizeof( value_type ) * nCount ));
            }

            static void deallocate( T* p, size_t /*nCount*/ )
            {
                s_free_memory( reinterpret_cast<void*>( p ));
            }
        };

        // statistics accumulated by smr::~smr() and returned by DHP::postmortem_statistics()
        stat s_postmortem_stat;
    } // namespace

    /*static*/ CDS_EXPORT_API smr* smr::instance_ = nullptr;
    thread_local thread_data* tls_ = nullptr;

    CDS_EXPORT_API hp_allocator::~hp_allocator()
    {
        while ( guard_block* gp = static_cast<guard_block*>( free_list_.get())) {
            gp->~guard_block();
            s_free_memory( gp );
        }
    }

    CDS_EXPORT_API guard_block* hp_allocator::alloc()
    {
        guard_block* gb;
        auto block = free_list_.get();
        if ( block )
            gb = static_cast<guard_block*>( block );
        else {
            // allocate a new block
            gb = new( s_alloc_memory( sizeof( guard_block ) + sizeof( guard ) * defaults::c_extended_guard_block_size )) guard_block;
            // construct the guards one by one; per-element placement new avoids
            // the implementation-defined overhead of array placement new, which
            // could exceed the exact buffer size allocated above
            for ( guard* g = gb->first(), *end = g + defaults::c_extended_guard_block_size; g != end; ++g )
                new( g ) guard;

            CDS_HPSTAT( block_allocated_.fetch_add( 1, atomics::memory_order_relaxed ));
        }

        // link the guards inside the block into a singly-linked free list
        guard* p = gb->first();
        for ( guard* last = p + defaults::c_extended_guard_block_size - 1; p != last; ++p ) {
            p->clear( atomics::memory_order_relaxed );
            p->next_ = p + 1;
        }
        p->next_ = nullptr;
        p->clear();

        return gb;
    }

    CDS_EXPORT_API retired_allocator::~retired_allocator()
    {
        while ( retired_block* rb = static_cast<retired_block*>( free_list_.get())) {
            rb->~retired_block();
            s_free_memory( rb );
        }
    }

    CDS_EXPORT_API retired_block* retired_allocator::alloc()
    {
        retired_block* rb;
        auto block = free_list_.get();
        if ( block )
            rb = static_cast<retired_block*>( block );
        else {
            // allocate a new block
            rb = new( s_alloc_memory( sizeof( retired_block ) + sizeof( retired_ptr ) * retired_block::c_capacity )) retired_block;
            // per-element placement new, as in hp_allocator::alloc()
            for ( retired_ptr* p = rb->first(), *end = p + retired_block::c_capacity; p != end; ++p )
                new( p ) retired_ptr;
            CDS_HPSTAT( block_allocated_.fetch_add( 1, atomics::memory_order_relaxed ));
        }

        rb->next_ = nullptr;
        return rb;
    }

    struct smr::thread_record: thread_data
    {
        atomics::atomic<thread_record*>     m_pNextNode; ///< next hazard ptr record in list
        atomics::atomic<cds::OS::ThreadId>  m_idOwner;   ///< owner thread id; c_NullThreadId - the record is free (not owned)
        atomics::atomic<bool>               m_bFree;     ///< true if record is free (not owned)

        thread_record( guard* guards, size_t guard_count )
            : thread_data( guards, guard_count )
            , m_bFree( false )
        {}
    };

    /*static*/ CDS_EXPORT_API thread_data* smr::tls()
    {
        assert( tls_ != nullptr );
        return tls_;
    }

    /*static*/ CDS_EXPORT_API void smr::set_memory_allocator(
        void* ( *alloc_func )( size_t size ),
        void ( *free_func )( void* p )
    )
    {
        // The memory allocation functions may be set only BEFORE the DHP SMR is initialized
        assert( instance_ == nullptr );

        s_alloc_memory = alloc_func;
        s_free_memory = free_func;
    }
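
    // A minimal usage sketch (my_alloc/my_free are hypothetical user-supplied
    // functions matching the signatures above):
    //
    //      cds::gc::dhp::smr::set_memory_allocator( my_alloc, my_free );
    //      cds::gc::dhp::smr::construct( 16 );  // all DHP memory now goes through my_alloc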

    /*static*/ CDS_EXPORT_API void smr::construct( size_t nInitialHazardPtrCount )
    {
        if ( !instance_ ) {
            instance_ = new( s_alloc_memory( sizeof( smr ))) smr( nInitialHazardPtrCount );
        }
    }

    /*static*/ CDS_EXPORT_API void smr::destruct( bool bDetachAll )
    {
        if ( instance_ ) {
            if ( bDetachAll )
                instance_->detach_all_thread();

            instance_->~smr();
            s_free_memory( instance_ );
            instance_ = nullptr;
        }
    }

    CDS_EXPORT_API smr::smr( size_t nInitialHazardPtrCount )
        : thread_list_( nullptr )
        , initial_hazard_count_( nInitialHazardPtrCount < 4 ? 16 : nInitialHazardPtrCount )
        , last_plist_size_( initial_hazard_count_ * 64 )
    {}

    CDS_EXPORT_API smr::~smr()
    {
        CDS_DEBUG_ONLY( const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId; )
        CDS_DEBUG_ONLY( const cds::OS::ThreadId mainThreadId = cds::OS::get_current_thread_id(); )

        CDS_HPSTAT( statistics( s_postmortem_stat ));

        thread_record* pHead = thread_list_.load( atomics::memory_order_relaxed );
        thread_list_.store( nullptr, atomics::memory_order_relaxed );

        thread_record* pNext = nullptr;
        for ( thread_record* hprec = pHead; hprec; hprec = pNext )
        {
            assert( hprec->m_idOwner.load( atomics::memory_order_relaxed ) == nullThreadId
                || hprec->m_idOwner.load( atomics::memory_order_relaxed ) == mainThreadId );

            retired_array& retired = hprec->retired_;

            // delete retired data
            for ( retired_block* block = retired.list_head_; block && block != retired.current_block_; block = block->next_ ) {
                for ( retired_ptr* p = block->first(); p != block->last(); ++p ) {
                    p->free();
                    CDS_HPSTAT( ++s_postmortem_stat.free_count );
                }
            }
            if ( retired.current_block_ ) {
                for ( retired_ptr* p = retired.current_block_->first(); p != retired.current_cell_; ++p ) {
                    p->free();
                    CDS_HPSTAT( ++s_postmortem_stat.free_count );
                }
            }
            hprec->retired_.fini();
            hprec->hazards_.clear();

            pNext = hprec->m_pNextNode.load( atomics::memory_order_relaxed );
            hprec->m_bFree.store( true, atomics::memory_order_relaxed );
            destroy_thread_data( hprec );
        }
    }

    /*static*/ CDS_EXPORT_API void smr::attach_thread()
    {
        if ( !tls_ )
            tls_ = instance().alloc_thread_data();
    }

    /*static*/ CDS_EXPORT_API void smr::detach_thread()
    {
        thread_data* rec = tls_;
        if ( rec ) {
            tls_ = nullptr;
            instance().free_thread_data( static_cast<thread_record*>( rec ));
        }
    }

    CDS_EXPORT_API void smr::detach_all_thread()
    {
        thread_record* pNext = nullptr;
        const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId;

        for ( thread_record* hprec = thread_list_.load( atomics::memory_order_relaxed ); hprec; hprec = pNext ) {
            pNext = hprec->m_pNextNode.load( atomics::memory_order_relaxed );
            if ( hprec->m_idOwner.load( atomics::memory_order_relaxed ) != nullThreadId ) {
                free_thread_data( hprec );
            }
        }
    }

    CDS_EXPORT_API smr::thread_record* smr::create_thread_data()
    {
        size_t const guard_array_size = sizeof( guard ) * initial_hazard_count_;

        /*
            The memory is allocated as one contiguous block.
            Memory layout:
            +--------------------------+
            |                          |
            | thread_record            |
            |         hazards_         +---+
            |         retired_         |   |
            |                          |   |
            |--------------------------|   |
            | hazard_ptr[]             |<--+
            |  initial HP array        |
            |                          |
            +--------------------------+
        */

        char* mem = reinterpret_cast<char*>( s_alloc_memory( sizeof( thread_record ) + guard_array_size ));
        return new( mem ) thread_record(
            reinterpret_cast<guard*>( mem + sizeof( thread_record )), initial_hazard_count_
        );
    }

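    // The record and its initial guard array were allocated as a single contiguous
    // block in create_thread_data(), so one s_free_memory() call releases both.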
    /*static*/ CDS_EXPORT_API void smr::destroy_thread_data( thread_record* pRec )
    {
        // all retired pointers must be freed
        pRec->~thread_record();
        s_free_memory( pRec );
    }

    CDS_EXPORT_API smr::thread_record* smr::alloc_thread_data()
    {
        thread_record* hprec = nullptr;
        const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId;
        const cds::OS::ThreadId curThreadId = cds::OS::get_current_thread_id();

        // First try to reuse a free (non-active) DHP record
        for ( hprec = thread_list_.load( atomics::memory_order_acquire ); hprec; hprec = hprec->m_pNextNode.load( atomics::memory_order_relaxed )) {
            cds::OS::ThreadId thId = nullThreadId;
            if ( !hprec->m_idOwner.compare_exchange_strong( thId, curThreadId, atomics::memory_order_relaxed, atomics::memory_order_relaxed ))
                continue;
            hprec->m_bFree.store( false, atomics::memory_order_release );
            break;
        }

        if ( !hprec ) {
            // No HP records available for reuse
            // Allocate and push a new HP record
            hprec = create_thread_data();
            hprec->m_idOwner.store( curThreadId, atomics::memory_order_relaxed );

            // lock-free push onto the head of thread_list_
            thread_record* pOldHead = thread_list_.load( atomics::memory_order_relaxed );
            do {
                hprec->m_pNextNode.store( pOldHead, atomics::memory_order_relaxed );
            } while ( !thread_list_.compare_exchange_weak( pOldHead, hprec, atomics::memory_order_release, atomics::memory_order_acquire ));
        }

        hprec->hazards_.init();
        hprec->retired_.init();

        return hprec;
    }

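    // Called when a thread detaches: flushes the record's retired pointers via
    // scan()/help_scan(), returns empty retired blocks to the allocator, and
    // releases ownership so that alloc_thread_data() can reuse the record.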
    CDS_EXPORT_API void smr::free_thread_data( thread_record* pRec )
    {
        assert( pRec != nullptr );

        pRec->hazards_.clear();
        scan( pRec );
        help_scan( pRec );

        if ( pRec->retired_.empty()) {
            pRec->retired_.fini();
            pRec->m_bFree.store( true, atomics::memory_order_release );
        }
        else {
            // free all empty blocks
            retired_block* free_block = pRec->retired_.current_block_->next_;
            if ( free_block ) {
                pRec->retired_.current_block_->next_ = nullptr;
                while ( free_block ) {
                    retired_block* next = free_block->next_;
                    retired_allocator_.free( free_block );
                    free_block = next;
                    --pRec->retired_.block_count_;
                }
            }
        }

        pRec->m_idOwner.store( cds::OS::c_NullThreadId, atomics::memory_order_release );
    }

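    // scan() helpers: copy_hazards() collects the non-null hazard pointers of a
    // guard array into a vector; retire_data() frees every retired pointer that is
    // not present in the sorted hazard-pointer list and re-pushes the ones that are.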
    namespace {
        typedef std::vector<void*, allocator<void*>> hp_vector;

        inline void copy_hazards( hp_vector& vect, guard const* arr, size_t size )
        {
            for ( guard const* end = arr + size; arr != end; ++arr ) {
                void* hp = arr->get();
                if ( hp )
                    vect.push_back( hp );
            }
        }

        inline size_t retire_data( hp_vector const& plist, retired_array& stg, retired_block* block, size_t block_size )
        {
            auto hp_begin = plist.begin();
            auto hp_end = plist.end();
            size_t count = 0;

            for ( retired_ptr* p = block->first(), *end = p + block_size; p != end; ++p ) {
                if ( cds_unlikely( std::binary_search( hp_begin, hp_end, p->m_p )))
                    stg.repush( p );
                else {
                    p->free();
                    ++count;
                }
            }

            return count;
        }

    } // namespace

    CDS_EXPORT_API void smr::scan( thread_data* pThreadRec )
    {
        thread_record* pRec = static_cast<thread_record*>( pThreadRec );

        CDS_HPSTAT( ++pRec->scan_call_count_ );

        hp_vector plist;
        size_t plist_size = last_plist_size_.load( std::memory_order_relaxed );
        plist.reserve( plist_size );

        // Stage 1: scan the HP list and insert non-null values into plist
        thread_record* pNode = thread_list_.load( atomics::memory_order_acquire );
        while ( pNode ) {
            if ( pNode->m_idOwner.load( std::memory_order_relaxed ) != cds::OS::c_NullThreadId ) {
                copy_hazards( plist, pNode->hazards_.array_, pNode->hazards_.initial_capacity_ );

                for ( guard_block* block = pNode->hazards_.extended_list_; block; block = block->next_ )
                    copy_hazards( plist, block->first(), defaults::c_extended_guard_block_size );
            }

            pNode = pNode->m_pNextNode.load( atomics::memory_order_relaxed );
        }

        // Store plist size for the next scan() call (vector reallocation optimization)
        if ( plist.size() > plist_size )
            last_plist_size_.compare_exchange_weak( plist_size, plist.size(), std::memory_order_relaxed, std::memory_order_relaxed );

        // Sort plist so that retire_data() can use std::binary_search()
        std::sort( plist.begin(), plist.end());

        // Stage 2: search plist
        size_t free_count = 0;
        size_t retired_count = 0;
        retired_block* last_block = pRec->retired_.current_block_;
        retired_ptr*   last_block_cell = pRec->retired_.current_cell_;

        pRec->retired_.current_block_ = pRec->retired_.list_head_;
        pRec->retired_.current_cell_ = pRec->retired_.current_block_->first();

        for ( retired_block* block = pRec->retired_.list_head_; block; block = block->next_ ) {
            bool const end_block = block == last_block;
            size_t const size = end_block ? last_block_cell - block->first() : retired_block::c_capacity;

            retired_count += retired_block::c_capacity;
            free_count += retire_data( plist, pRec->retired_, block, size );

            if ( end_block )
                break;
        }
        CDS_HPSTAT( pRec->free_call_count_ += free_count );

        // If the count of freed elements is too small, extend the retired array
        if ( free_count < retired_count / 4 && last_block == pRec->retired_.list_tail_ && last_block_cell == last_block->last())
            pRec->retired_.extend();
    }

    CDS_EXPORT_API void smr::help_scan( thread_data* pThis )
    {
        assert( static_cast<thread_record*>( pThis )->m_idOwner.load( atomics::memory_order_relaxed ) == cds::OS::get_current_thread_id());
        CDS_HPSTAT( ++pThis->help_scan_call_count_ );

        const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId;
        const cds::OS::ThreadId curThreadId = cds::OS::get_current_thread_id();
        for ( thread_record* hprec = thread_list_.load( atomics::memory_order_acquire ); hprec; hprec = hprec->m_pNextNode.load( atomics::memory_order_relaxed ))
        {
            if ( hprec == static_cast<thread_record*>( pThis ))
                continue;

            // If m_bFree == true then hprec->retired_ is empty - we do not need to scan it
            if ( hprec->m_bFree.load( atomics::memory_order_acquire )) {
                assert( hprec->retired_.empty());
                continue;
            }

            // Try to take ownership of hprec.
            // Several threads may attempt this concurrently, so CAS is used
            {
                cds::OS::ThreadId curOwner = hprec->m_idOwner.load( atomics::memory_order_relaxed );
                if ( curOwner == nullThreadId ) {
                    if ( !hprec->m_idOwner.compare_exchange_strong( curOwner, curThreadId, atomics::memory_order_acquire, atomics::memory_order_relaxed ))
                        continue;
                }
                else
                    continue;
            }

            // We own the thread record. Now check whether it has retired pointers;
            // if it has any, move them to pThis, which is private to the current thread.
            retired_array& src = hprec->retired_;
            retired_array& dest = pThis->retired_;

            for ( retired_block* block = src.list_head_; block; block = block->next_ ) {
                retired_ptr* last = block == src.current_block_ ? src.current_cell_ : block->last();
                for ( retired_ptr* p = block->first(); p != last; ++p ) {
                    if ( !dest.push( *p ))
                        scan( pThis );
                }

                if ( block == src.current_block_ )
                    break;
            }

            src.fini();
            hprec->m_bFree.store( true, atomics::memory_order_relaxed );
            hprec->m_idOwner.store( nullThreadId, atomics::memory_order_release );
        }

        scan( pThis );
    }

    CDS_EXPORT_API void smr::statistics( stat& st )
    {
        st.clear();
#   ifdef CDS_ENABLE_HPSTAT
        for ( thread_record* hprec = thread_list_.load( atomics::memory_order_acquire ); hprec; hprec = hprec->m_pNextNode.load( atomics::memory_order_relaxed ))
        {
            ++st.thread_rec_count;
            st.guard_allocated      += hprec->hazards_.alloc_guard_count_;
            st.guard_freed          += hprec->hazards_.free_guard_count_;
            st.hp_extend_count      += hprec->hazards_.extend_call_count_;
            st.retired_count        += hprec->retired_.retire_call_count_;
            st.retired_extend_count += hprec->retired_.extend_call_count_;
            st.free_count           += hprec->free_call_count_;
            st.scan_count           += hprec->scan_call_count_;
            st.help_scan_count      += hprec->help_scan_call_count_;
        }

        st.hp_block_count = hp_allocator_.block_allocated_.load( atomics::memory_order_relaxed );
        st.retired_block_count = retired_allocator_.block_allocated_.load( atomics::memory_order_relaxed );
#   endif
    }

}}} // namespace cds::gc::dhp

CDS_EXPORT_API /*static*/ cds::gc::DHP::stat const& cds::gc::DHP::postmortem_statistics()
{
    return cds::gc::dhp::s_postmortem_stat;
}