/*
    This file is a part of libcds - Concurrent Data Structures library

    (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017

    Source code repo: http://github.com/khizmax/libcds/
    Download: http://sourceforge.net/projects/libcds/files/

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions are met:

    * Redistributions of source code must retain the above copyright notice, this
      list of conditions and the following disclaimer.

    * Redistributions in binary form must reproduce the above copyright notice,
      this list of conditions and the following disclaimer in the documentation
      and/or other materials provided with the distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
    AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
    IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
    FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
    DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
    SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
    OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

#include <algorithm>
#include <vector>

#include <cds/gc/dhp.h>
#include <cds/os/thread.h>

namespace cds { namespace gc { namespace dhp {

    namespace {
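        // Default memory hooks: raw memory is allocated as an array of uintptr_t,
        // so the requested size is rounded up to a whole number of machine words.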
        void * default_alloc_memory( size_t size )
        {
            return new uintptr_t[( size + sizeof( uintptr_t ) - 1 ) / sizeof( uintptr_t )];
        }

        void default_free_memory( void* p )
        {
            delete[] reinterpret_cast<uintptr_t*>( p );
        }

        struct defaults {
            static size_t const c_extended_guard_block_size = 16;
        };

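        // Active memory hooks; they can be replaced via smr::set_memory_allocator()
        // before the SMR singleton is constructed.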
        void* ( *s_alloc_memory )( size_t size ) = default_alloc_memory;
        void( *s_free_memory )( void* p ) = default_free_memory;

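        // Minimal std-compatible allocator that routes all allocations through the
        // hooks above; used below for the hazard-pointer snapshot vector in scan().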
        template <typename T>
        class allocator
        {
        public:
            typedef T   value_type;

            allocator() {}
            allocator( allocator const& ) {}
            template <class U>
            explicit allocator( allocator<U> const& ) {}

            static T* allocate( size_t nCount )
            {
                return reinterpret_cast<T*>( s_alloc_memory( sizeof( value_type ) * nCount ));
            }

            static void deallocate( T* p, size_t /*nCount*/ )
            {
                s_free_memory( reinterpret_cast<void*>( p ));
            }
        };

    } // namespace

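    // Global SMR singleton and the per-thread DHP record; the latter is created
    // lazily by smr::attach_thread() and released by smr::detach_thread().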
    /*static*/ CDS_EXPORT_API smr* smr::instance_ = nullptr;
    thread_local thread_data* tls_ = nullptr;

    CDS_EXPORT_API hp_allocator::~hp_allocator()
    {
        while ( guard_block* gp = static_cast<guard_block*>( free_list_.get())) {
            gp->~guard_block();
            s_free_memory( gp );
        }
    }

    CDS_EXPORT_API guard_block* hp_allocator::alloc()
    {
        guard_block* gb;
        auto block = free_list_.get();
        if ( block )
            gb = static_cast< guard_block* >( block );
        else {
            // allocate new block
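            // The block header and its guard array share one contiguous allocation
            // (the array is assumed to live directly behind the header, where first() points)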
            gb = new( s_alloc_memory( sizeof( guard_block ) + sizeof( guard ) * defaults::c_extended_guard_block_size )) guard_block;
            new ( gb->first() ) guard[defaults::c_extended_guard_block_size];
        }

        // link the guards of the block into a singly linked free list
        guard* p = gb->first();
        for ( guard* last = p + defaults::c_extended_guard_block_size - 1; p != last; ++p ) {
            p->clear( atomics::memory_order_relaxed );
            p->next_ = p + 1;
        }
        p->next_ = nullptr;
        p->clear();

        return gb;
    }

    CDS_EXPORT_API retired_allocator::~retired_allocator()
    {
        while ( retired_block* rb = static_cast<retired_block*>( free_list_.get() ) ) {
            rb->~retired_block();
            s_free_memory( rb );
        }
    }

    CDS_EXPORT_API retired_block* retired_allocator::alloc()
    {
        retired_block* rb;
        auto block = free_list_.get();
        if ( block )
            rb = static_cast< retired_block* >( block );
        else {
            // allocate new block
            rb = new( s_alloc_memory( sizeof( retired_block ) + sizeof( retired_ptr ) * retired_block::c_capacity )) retired_block;
            new ( rb->first()) retired_ptr[retired_block::c_capacity];
        }

        rb->next_ = nullptr;
        return rb;
    }

    struct smr::thread_record: thread_data
    {
        atomics::atomic<thread_record*>     m_pNextNode; ///< next hazard ptr record in list
        atomics::atomic<cds::OS::ThreadId>  m_idOwner;   ///< Owner thread id; 0 - the record is free (not owned)
        atomics::atomic<bool>               m_bFree;     ///< true if record is free (not owned)

        thread_record( guard* guards, size_t guard_count )
            : thread_data( guards, guard_count )
            , m_bFree( false )
        {}
    };

    /*static*/ CDS_EXPORT_API thread_data* smr::tls()
    {
        assert( tls_ != nullptr );
        return tls_;
    }

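    // Hypothetical caller-side sketch (not part of this file): custom hooks must be
    // installed before the DHP garbage collector is constructed, e.g.
    //      cds::gc::dhp::smr::set_memory_allocator( my_alloc, my_free );
    //      cds::gc::DHP gc;    // constructs the SMR singleton afterwards
    // where my_alloc/my_free are user-provided functions with the signatures below.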
    /*static*/ CDS_EXPORT_API void smr::set_memory_allocator(
        void* ( *alloc_func )( size_t size ),
        void( *free_func )( void * p )
    )
    {
        // The memory allocation functions may only be set BEFORE the DHP SMR is initialized
        assert( instance_ == nullptr );

        s_alloc_memory = alloc_func;
        s_free_memory = free_func;
    }

    /*static*/ CDS_EXPORT_API void smr::construct( size_t nInitialHazardPtrCount )
    {
        if ( !instance_ ) {
            instance_ = new( s_alloc_memory( sizeof( smr ))) smr( nInitialHazardPtrCount );
        }
    }

    /*static*/ CDS_EXPORT_API void smr::destruct( bool bDetachAll )
    {
        if ( instance_ ) {
            if ( bDetachAll )
                instance_->detach_all_thread();

            instance_->~smr();
            s_free_memory( instance_ );
            instance_ = nullptr;
        }
    }

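    // A requested initial hazard-pointer count below 4 falls back to 16 guards per thread;
    // last_plist_size_ is the initial capacity heuristic for the hazard-pointer snapshot
    // built by scan().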
    CDS_EXPORT_API smr::smr( size_t nInitialHazardPtrCount )
        : thread_list_( nullptr )
        , initial_hazard_count_( nInitialHazardPtrCount < 4 ? 16 : nInitialHazardPtrCount )
        , last_plist_size_( initial_hazard_count_ * 64 )
    {}

    CDS_EXPORT_API smr::~smr()
    {
        CDS_DEBUG_ONLY( const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId; )
        CDS_DEBUG_ONLY( const cds::OS::ThreadId mainThreadId = cds::OS::get_current_thread_id(); )

        thread_record* pHead = thread_list_.load( atomics::memory_order_relaxed );
        thread_list_.store( nullptr, atomics::memory_order_relaxed );

        thread_record* pNext = nullptr;
        for ( thread_record* hprec = pHead; hprec; hprec = pNext )
        {
            assert( hprec->m_idOwner.load( atomics::memory_order_relaxed ) == nullThreadId
                || hprec->m_idOwner.load( atomics::memory_order_relaxed ) == mainThreadId
                || !cds::OS::is_thread_alive( hprec->m_idOwner.load( atomics::memory_order_relaxed ) )
            );

            retired_array& retired = hprec->retired_;

            // delete retired data
            for ( retired_block* block = retired.list_head_; block && block != retired.current_block_; block = block->next_ ) {
                for ( retired_ptr* p = block->first(); p != block->last(); ++p )
                    p->free();
            }
            if ( retired.current_block_ ) {
                for ( retired_ptr* p = retired.current_block_->first(); p != retired.current_cell_; ++p )
                    p->free();
            }
            hprec->retired_.fini();
            hprec->hazards_.clear();

            pNext = hprec->m_pNextNode.load( atomics::memory_order_relaxed );
            hprec->m_bFree.store( true, atomics::memory_order_relaxed );
            destroy_thread_data( hprec );
        }
    }

    /*static*/ CDS_EXPORT_API void smr::attach_thread()
    {
        if ( !tls_ )
            tls_ = instance().alloc_thread_data();
    }

    /*static*/ CDS_EXPORT_API void smr::detach_thread()
    {
        thread_data* rec = tls_;
        if ( rec ) {
            tls_ = nullptr;
            instance().free_thread_data( static_cast<thread_record*>( rec ) );
        }
    }

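    // Called from destruct( true ): releases thread records that are still marked as
    // owned, e.g. by threads that never called detach_thread().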
    CDS_EXPORT_API void smr::detach_all_thread()
    {
        thread_record * pNext = nullptr;
        const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId;

        for ( thread_record * hprec = thread_list_.load( atomics::memory_order_relaxed ); hprec; hprec = pNext ) {
            pNext = hprec->m_pNextNode.load( atomics::memory_order_relaxed );
            if ( hprec->m_idOwner.load( atomics::memory_order_relaxed ) != nullThreadId ) {
                free_thread_data( hprec );
            }
        }
    }

    CDS_EXPORT_API smr::thread_record* smr::create_thread_data()
    {
        size_t const guard_array_size = sizeof( guard ) * initial_hazard_count_;

        /*
            The memory is allocated as one contiguous block
            Memory layout:
            +--------------------------+
            |                          |
            | thread_record            |
            |         hazards_         +---+
            |         retired_         |   |
            |                          |   |
            |--------------------------|   |
            | hazard_ptr[]             |<--+
            |  initial HP array        |
            |                          |
            +--------------------------+
        */

        char* mem = reinterpret_cast<char*>( s_alloc_memory( sizeof( thread_record ) + guard_array_size ));
        return new( mem ) thread_record(
            reinterpret_cast<guard*>( mem + sizeof( thread_record ) ), initial_hazard_count_
        );
    }

    /*static*/ CDS_EXPORT_API void smr::destroy_thread_data( thread_record* pRec )
    {
        // all retired pointers must be freed
        pRec->~thread_record();
        s_free_memory( pRec );
    }

    CDS_EXPORT_API smr::thread_record* smr::alloc_thread_data()
    {
        thread_record * hprec = nullptr;
        const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId;
        const cds::OS::ThreadId curThreadId = cds::OS::get_current_thread_id();

        // First try to reuse a free (non-active) DHP record
        for ( hprec = thread_list_.load( atomics::memory_order_acquire ); hprec; hprec = hprec->m_pNextNode.load( atomics::memory_order_relaxed ) ) {
            cds::OS::ThreadId thId = nullThreadId;
            if ( !hprec->m_idOwner.compare_exchange_strong( thId, curThreadId, atomics::memory_order_relaxed, atomics::memory_order_relaxed ) )
                continue;
            hprec->m_bFree.store( false, atomics::memory_order_release );
            break;
        }

        if ( !hprec ) {
            // No HP records available for reuse
            // Allocate and push a new HP record
            hprec = create_thread_data();
            hprec->m_idOwner.store( curThreadId, atomics::memory_order_relaxed );

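            // publish the new record by pushing it onto the lock-free thread list
            // (CAS loop on the list head)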
            thread_record* pOldHead = thread_list_.load( atomics::memory_order_relaxed );
            do {
                hprec->m_pNextNode.store( pOldHead, atomics::memory_order_relaxed );
            } while ( !thread_list_.compare_exchange_weak( pOldHead, hprec, atomics::memory_order_release, atomics::memory_order_acquire ) );
        }

        hprec->hazards_.init();
        hprec->retired_.init();

        return hprec;
    }

    CDS_EXPORT_API void smr::free_thread_data( thread_record* pRec )
    {
        assert( pRec != nullptr );
        //CDS_HAZARDPTR_STATISTIC( ++m_Stat.m_RetireHPRec )

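        // Drop this thread's hazard pointers, reclaim whatever can be reclaimed,
        // and adopt retired lists abandoned by dead threads before the record is released.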
        pRec->hazards_.clear();
        scan( pRec );
        help_scan( pRec );

        if ( pRec->retired_.empty() ) {
            pRec->retired_.fini();
            pRec->m_bFree.store( true, std::memory_order_release );
        }
        else {
            // Free all empty blocks
            retired_block* free_block = pRec->retired_.current_block_->next_;
            if ( free_block ) {
                pRec->retired_.current_block_->next_ = nullptr;
                while ( free_block ) {
                    retired_block* next = free_block->next_;
                    retired_allocator_.free( free_block );
                    free_block = next;
                    --pRec->retired_.block_count_;
                }
            }
        }

        pRec->m_idOwner.store( cds::OS::c_NullThreadId, atomics::memory_order_release );
    }

    namespace {
        typedef std::vector<void*, allocator<void*>> hp_vector;

        inline void copy_hazards( hp_vector& vect, guard const* arr, size_t size )
        {
            for ( guard const* end = arr + size; arr != end; ++arr ) {
                void* hp = arr->get();
                if ( hp )
                    vect.push_back( hp );
            }
        }

        inline size_t retire_data( hp_vector const& plist, retired_array& stg, retired_block* block, size_t block_size )
        {
            auto hp_begin = plist.begin();
            auto hp_end = plist.end();
            size_t count = 0;

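            // A retired pointer still present in the sorted hazard snapshot cannot be freed
            // yet and is re-pushed into the retired array; everything else is freed now.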
            for ( retired_ptr* p = block->first(), *end = p + block_size; p != end; ++p ) {
                if ( cds_unlikely( std::binary_search( hp_begin, hp_end, p->m_p )))
                    stg.safe_push( p );
                else {
                    p->free();
                    ++count;
                }
            }

            return count;
        }

    } // namespace

    CDS_EXPORT_API void smr::scan( thread_data* pThreadRec )
    {
        thread_record* pRec = static_cast<thread_record*>( pThreadRec );

        hp_vector plist;
        size_t plist_size = last_plist_size_.load( std::memory_order_relaxed );
        plist.reserve( plist_size );

        // Stage 1: Scan HP list and insert non-null values in plist
        thread_record* pNode = thread_list_.load( atomics::memory_order_acquire );
        while ( pNode ) {
            if ( pNode->m_idOwner.load( std::memory_order_relaxed ) != cds::OS::c_NullThreadId ) {
                copy_hazards( plist, pNode->hazards_.array_, pNode->hazards_.initial_capacity_ );

                for ( guard_block* block = pNode->hazards_.extended_list_; block; block = block->next_ )
                    copy_hazards( plist, block->first(), defaults::c_extended_guard_block_size );
            }

            pNode = pNode->m_pNextNode.load( atomics::memory_order_relaxed );
        }

        // Store plist size for next scan() call (vector reallocation optimization)
        if ( plist.size() > plist_size )
            last_plist_size_.compare_exchange_weak( plist_size, plist.size(), std::memory_order_relaxed, std::memory_order_relaxed );

        // Sort plist so that retire_data() can use binary search
        std::sort( plist.begin(), plist.end() );

        // Stage 2: Search plist
        size_t free_count = 0;
        retired_block* last_block = pRec->retired_.current_block_;
        retired_ptr*   last_block_cell = pRec->retired_.current_cell_;

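        // Rewind the retired array to its head: retire_data() re-pushes still-guarded
        // pointers via safe_push(), so the survivors are compacted to the front.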
        pRec->retired_.current_block_ = pRec->retired_.list_head_;
        pRec->retired_.current_cell_ = pRec->retired_.current_block_->first();

        for ( retired_block* block = pRec->retired_.list_head_; block; block = block->next_ ) {
            bool const end_block = block == last_block;
            size_t const size = end_block ? last_block_cell - block->first() : retired_block::c_capacity;

            free_count += retire_data( plist, pRec->retired_, block, size );

            if ( end_block )
                break;
        }

        // If nothing could be freed and the retired array is completely full, extend it
        if ( free_count == 0 && last_block == pRec->retired_.list_tail_ && last_block_cell == last_block->last() )
            pRec->retired_.extend();
    }

    CDS_EXPORT_API void smr::help_scan( thread_data* pThis )
    {
        assert( static_cast<thread_record*>( pThis )->m_idOwner.load( atomics::memory_order_relaxed ) == cds::OS::get_current_thread_id() );

        const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId;
        const cds::OS::ThreadId curThreadId = cds::OS::get_current_thread_id();
        for ( thread_record* hprec = thread_list_.load( atomics::memory_order_acquire ); hprec; hprec = hprec->m_pNextNode.load( atomics::memory_order_relaxed ) )
        {
            // If m_bFree == true then hprec->retired_ is empty - we don't need to scan it
            if ( hprec->m_bFree.load( atomics::memory_order_acquire ) ) {
                assert( hprec->retired_.empty() );
                continue;
            }

            // Try to take ownership of hprec.
            // Several threads may attempt this concurrently, so ownership is claimed with an atomic CAS.
            {
                cds::OS::ThreadId curOwner = hprec->m_idOwner.load( atomics::memory_order_relaxed );
                if ( curOwner == nullThreadId || !cds::OS::is_thread_alive( curOwner ) ) {
                    if ( !hprec->m_idOwner.compare_exchange_strong( curOwner, curThreadId, atomics::memory_order_acquire, atomics::memory_order_relaxed ) )
                        continue;
                }
                else
                    continue;
            }

            // We own the thread record now, so we can see whether it has retired pointers.
            // If it has any, we move them to pThis, which is private to the current thread.
            retired_array& src = hprec->retired_;
            retired_array& dest = pThis->retired_;

            for ( retired_block* block = src.list_head_; block; block = block->next_ ) {
                retired_ptr* last = block == src.current_block_ ? src.current_cell_ : block->last();
                for ( retired_ptr* p = block->first(); p != last; ++p ) {
                    if ( !dest.push( *p ) )
                        scan( pThis );
                }

                if ( block == src.current_block_ )
                    break;
            }

            src.fini();
            hprec->m_bFree.store( true, atomics::memory_order_relaxed );
            hprec->m_idOwner.store( nullThreadId, atomics::memory_order_release );
        }

        scan( pThis );
    }

}}} // namespace cds::gc::dhp