/*
    This file is a part of libcds - Concurrent Data Structures library

    (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017

    Source code repo: http://github.com/khizmax/libcds/
    Download: http://sourceforge.net/projects/libcds/files/

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions are met:

    * Redistributions of source code must retain the above copyright notice, this
      list of conditions and the following disclaimer.

    * Redistributions in binary form must reproduce the above copyright notice,
      this list of conditions and the following disclaimer in the documentation
      and/or other materials provided with the distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
    AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
    IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
    FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
    DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
    SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
    OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

#include <algorithm>
#include <vector>

#include <cds/gc/dhp.h>
#include <cds/os/thread.h>

namespace cds { namespace gc { namespace dhp {

    namespace {
        void * default_alloc_memory( size_t size )
        {
            return new uintptr_t[( size + sizeof( uintptr_t ) - 1 ) / sizeof( uintptr_t )];
        }

        void default_free_memory( void* p )
        {
            delete[] reinterpret_cast<uintptr_t*>( p );
        }

        struct defaults {
            static size_t const c_extended_guard_block_size = 16;
        };

        void* ( *s_alloc_memory )( size_t size ) = default_alloc_memory;
        void( *s_free_memory )( void* p ) = default_free_memory;

        template <typename T>
        class allocator
        {
        public:
            typedef T   value_type;

            allocator() {}
            allocator( allocator const& ) {}
            template <class U>
            explicit allocator( allocator<U> const& ) {}

            static T* allocate( size_t nCount )
            {
                return reinterpret_cast<T*>( s_alloc_memory( sizeof( value_type ) * nCount ));
            }

            static void deallocate( T* p, size_t /*nCount*/ )
            {
                s_free_memory( reinterpret_cast<void*>( p ));
            }
        };

        stat s_postmortem_stat;
    } // namespace

    /*static*/ CDS_EXPORT_API smr* smr::instance_ = nullptr;
    thread_local thread_data* tls_ = nullptr;

    CDS_EXPORT_API hp_allocator::~hp_allocator()
    {
        while ( guard_block* gp = static_cast<guard_block*>( free_list_.get())) {
            gp->~guard_block();
            s_free_memory( gp );
        }
    }

    CDS_EXPORT_API guard_block* hp_allocator::alloc()
    {
        guard_block* gb;
        auto block = free_list_.get();
        if ( block )
            gb = static_cast< guard_block* >( block );
        else {
            // allocate new block
            gb = new( s_alloc_memory( sizeof( guard_block ) + sizeof( guard ) * defaults::c_extended_guard_block_size )) guard_block;
            new ( gb->first()) guard[defaults::c_extended_guard_block_size];

            CDS_HPSTAT( block_allocated_.fetch_add( 1, atomics::memory_order_relaxed ));
        }

        // links guards in the block
        guard* p = gb->first();
        for ( guard* last = p + defaults::c_extended_guard_block_size - 1; p != last; ++p ) {
            p->clear( atomics::memory_order_relaxed );
            p->next_ = p + 1;
        }
        p->next_ = nullptr;
        p->clear();

        return gb;
    }

    CDS_EXPORT_API retired_allocator::~retired_allocator()
    {
        while ( retired_block* rb = static_cast<retired_block*>( free_list_.get())) {
            rb->~retired_block();
            s_free_memory( rb );
        }
    }

    CDS_EXPORT_API retired_block* retired_allocator::alloc()
    {
        retired_block* rb;
        auto block = free_list_.get();
        if ( block )
            rb = static_cast< retired_block* >( block );
        else {
            // allocate new block
            rb = new( s_alloc_memory( sizeof( retired_block ) + sizeof( retired_ptr ) * retired_block::c_capacity )) retired_block;
            new ( rb->first()) retired_ptr[retired_block::c_capacity];
            CDS_HPSTAT( block_allocated_.fetch_add( 1, atomics::memory_order_relaxed ));
        }

        rb->next_ = nullptr;
        return rb;
    }

    struct smr::thread_record: thread_data
    {
        atomics::atomic<thread_record*>     m_pNextNode; ///< next hazard ptr record in list
        atomics::atomic<cds::OS::ThreadId>  m_idOwner;   ///< Owner thread id; 0 - the record is free (not owned)
        atomics::atomic<bool>               m_bFree;     ///< true if record is free (not owned)

        thread_record( guard* guards, size_t guard_count )
            : thread_data( guards, guard_count )
            , m_pNextNode( nullptr )
            , m_idOwner( cds::OS::c_NullThreadId )
            , m_bFree( false )
        {}
    };

    /*static*/ CDS_EXPORT_API thread_data* smr::tls()
    {
        assert( tls_ != nullptr );
        return tls_;
    }

    /*static*/ CDS_EXPORT_API void smr::set_memory_allocator(
        void* ( *alloc_func )( size_t size ),
        void( *free_func )( void * p )
    )
    {
        // The memory allocation functions must be set BEFORE initializing DHP SMR
        assert( instance_ == nullptr );

        s_alloc_memory = alloc_func;
        s_free_memory = free_func;
    }
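
    // Usage sketch (illustrative, not part of the library): custom allocation
    // routines have to be installed before the SMR singleton is created, e.g. at
    // application start-up. The my_alloc()/my_free() names below are hypothetical;
    // set_memory_allocator(), construct() and destruct() are the functions defined
    // in this file.
    //
    //      void* my_alloc( size_t size ) { return ::malloc( size ); }
    //      void  my_free( void* p )      { ::free( p ); }
    //
    //      int main() {
    //          cds::gc::dhp::smr::set_memory_allocator( my_alloc, my_free );
    //          cds::gc::dhp::smr::construct( 16 );     // initial hazard-pointer count per thread
    //          // ... work with DHP-based containers ...
    //          cds::gc::dhp::smr::destruct( true );    // detach remaining threads, free retired data
    //      }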

    /*static*/ CDS_EXPORT_API void smr::construct( size_t nInitialHazardPtrCount )
    {
        if ( !instance_ ) {
            instance_ = new( s_alloc_memory( sizeof( smr ))) smr( nInitialHazardPtrCount );
        }
    }

    /*static*/ CDS_EXPORT_API void smr::destruct( bool bDetachAll )
    {
        if ( instance_ ) {
            if ( bDetachAll )
                instance_->detach_all_thread();

            instance_->~smr();
            s_free_memory( instance_ );
            instance_ = nullptr;
        }
    }

    CDS_EXPORT_API smr::smr( size_t nInitialHazardPtrCount )
        : initial_hazard_count_( nInitialHazardPtrCount < 4 ? 16 : nInitialHazardPtrCount )
        , last_plist_size_( initial_hazard_count_ * 64 )
    {
        thread_list_.store( nullptr, atomics::memory_order_release );
    }

    CDS_EXPORT_API smr::~smr()
    {
        CDS_DEBUG_ONLY( const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId; )
        CDS_DEBUG_ONLY( const cds::OS::ThreadId mainThreadId = cds::OS::get_current_thread_id(); )

        CDS_HPSTAT( statistics( s_postmortem_stat ));

        thread_record* pHead = thread_list_.load( atomics::memory_order_relaxed );
        thread_list_.store( nullptr, atomics::memory_order_release );

        thread_record* pNext = nullptr;
        for ( thread_record* hprec = pHead; hprec; hprec = pNext )
        {
            assert( hprec->m_idOwner.load( atomics::memory_order_relaxed ) == nullThreadId
                || hprec->m_idOwner.load( atomics::memory_order_relaxed ) == mainThreadId );

            retired_array& retired = hprec->retired_;

            // delete retired data
            for ( retired_block* block = retired.list_head_; block && block != retired.current_block_; block = block->next_ ) {
                for ( retired_ptr* p = block->first(); p != block->last(); ++p ) {
                    p->free();
                    CDS_HPSTAT( ++s_postmortem_stat.free_count );
                }
            }
            if ( retired.current_block_ ) {
                for ( retired_ptr* p = retired.current_block_->first(); p != retired.current_cell_; ++p ) {
                    p->free();
                    CDS_HPSTAT( ++s_postmortem_stat.free_count );
                }
            }
            hprec->retired_.fini();
            hprec->hazards_.clear();

            pNext = hprec->m_pNextNode.load( atomics::memory_order_relaxed );
            hprec->m_bFree.store( true, atomics::memory_order_relaxed );
            destroy_thread_data( hprec );
        }
    }

    /*static*/ CDS_EXPORT_API void smr::attach_thread()
    {
        if ( !tls_ )
            tls_ = instance().alloc_thread_data();
    }

    /*static*/ CDS_EXPORT_API void smr::detach_thread()
    {
        thread_data* rec = tls_;
        if ( rec ) {
            tls_ = nullptr;
            instance().free_thread_data( static_cast<thread_record*>( rec ));
        }
    }
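
    // Per-thread lifecycle sketch (illustrative): every thread that works with
    // DHP-protected containers needs its own thread_record, obtained via
    // attach_thread() and released via detach_thread(). In client code this is
    // normally routed through the cds::threading manager rather than by calling
    // the smr functions directly; the worker() function below is hypothetical.
    //
    //      void worker() {
    //          cds::gc::dhp::smr::attach_thread();     // allocates or reuses a thread_record
    //          // ... access DHP-based containers ...
    //          cds::gc::dhp::smr::detach_thread();     // scans and hands off this thread's retired pointers
    //      }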

    CDS_EXPORT_API void smr::detach_all_thread()
    {
        thread_record * pNext = nullptr;
        const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId;

        for ( thread_record * hprec = thread_list_.load( atomics::memory_order_relaxed ); hprec; hprec = pNext ) {
            pNext = hprec->m_pNextNode.load( atomics::memory_order_relaxed );
            if ( hprec->m_idOwner.load( atomics::memory_order_relaxed ) != nullThreadId ) {
                free_thread_data( hprec );
            }
        }
    }

    CDS_EXPORT_API smr::thread_record* smr::create_thread_data()
    {
        size_t const guard_array_size = sizeof( guard ) * initial_hazard_count_;

        /*
            The memory is allocated as a single contiguous block.
            Memory layout:
            +--------------------------+
            |                          |
            | thread_record            |
            |         hazards_         +---+
            |         retired_         |   |
            |                          |   |
            |--------------------------|   |
            | hazard_ptr[]             |<--+
            |  initial HP array        |
            |                          |
            +--------------------------+
        */

        char* mem = reinterpret_cast<char*>( s_alloc_memory( sizeof( thread_record ) + guard_array_size ));
        return new( mem ) thread_record(
            reinterpret_cast<guard*>( mem + sizeof( thread_record )), initial_hazard_count_
        );
    }

    /*static*/ CDS_EXPORT_API void smr::destroy_thread_data( thread_record* pRec )
    {
        // all retired pointers must be freed
        pRec->~thread_record();
        s_free_memory( pRec );
    }

    CDS_EXPORT_API smr::thread_record* smr::alloc_thread_data()
    {
        thread_record * hprec = nullptr;
        const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId;
        const cds::OS::ThreadId curThreadId = cds::OS::get_current_thread_id();

        // First try to reuse a free (non-active) DHP record
        for ( hprec = thread_list_.load( atomics::memory_order_acquire ); hprec; hprec = hprec->m_pNextNode.load( atomics::memory_order_acquire )) {
            cds::OS::ThreadId thId = nullThreadId;
            if ( !hprec->m_idOwner.compare_exchange_strong( thId, curThreadId, atomics::memory_order_relaxed, atomics::memory_order_relaxed ))
                continue;
            hprec->m_bFree.store( false, atomics::memory_order_release );
            break;
        }

        if ( !hprec ) {
            // No HP records available for reuse
            // Allocate and push a new HP record
            hprec = create_thread_data();
            hprec->m_idOwner.store( curThreadId, atomics::memory_order_relaxed );

            thread_record* pOldHead = thread_list_.load( atomics::memory_order_acquire );
            do {
                hprec->m_pNextNode.store( pOldHead, atomics::memory_order_release );
            } while ( !thread_list_.compare_exchange_weak( pOldHead, hprec, atomics::memory_order_release, atomics::memory_order_acquire ));
        }

        hprec->hazards_.init();
        hprec->retired_.init();

        return hprec;
    }

    CDS_EXPORT_API void smr::free_thread_data( thread_record* pRec )
    {
        assert( pRec != nullptr );
        //CDS_HAZARDPTR_STATISTIC( ++m_Stat.m_RetireHPRec )

        pRec->hazards_.clear();
        scan( pRec );
        help_scan( pRec );

        if ( pRec->retired_.empty()) {
            pRec->retired_.fini();
            pRec->m_bFree.store( true, std::memory_order_release );
        }
        else {
            // Free all empty blocks
            retired_block* free_block = pRec->retired_.current_block_->next_;
            if ( free_block ) {
                pRec->retired_.current_block_->next_ = nullptr;
                while ( free_block ) {
                    retired_block* next = free_block->next_;
                    retired_allocator_.free( free_block );
                    free_block = next;
                    --pRec->retired_.block_count_;
                }
            }
        }

        pRec->m_idOwner.store( cds::OS::c_NullThreadId, atomics::memory_order_release );
    }

    namespace {
        typedef std::vector<void*, allocator<void*>> hp_vector;

        inline void copy_hazards( hp_vector& vect, guard const* arr, size_t size )
        {
            for ( guard const* end = arr + size; arr != end; ++arr ) {
                void* hp = arr->get();
                if ( hp )
                    vect.push_back( hp );
            }
        }

        inline size_t retire_data( hp_vector const& plist, retired_array& stg, retired_block* block, size_t block_size )
        {
            auto hp_begin = plist.begin();
            auto hp_end = plist.end();
            size_t count = 0;

            for ( retired_ptr* p = block->first(), *end = p + block_size; p != end; ++p ) {
                if ( cds_unlikely( std::binary_search( hp_begin, hp_end, p->m_p )))
                    stg.repush( p );
                else {
                    p->free();
                    ++count;
                }
            }

            return count;
        }

    } // namespace

    CDS_EXPORT_API void smr::scan( thread_data* pThreadRec )
    {
        thread_record* pRec = static_cast<thread_record*>( pThreadRec );

        CDS_HPSTAT( ++pRec->scan_call_count_ );

        hp_vector plist;
        size_t plist_size = last_plist_size_.load( std::memory_order_relaxed );
        plist.reserve( plist_size );

        // Stage 1: Scan HP list and insert non-null values in plist
        thread_record* pNode = thread_list_.load( atomics::memory_order_acquire );
        while ( pNode ) {
            if ( pNode->m_idOwner.load( std::memory_order_relaxed ) != cds::OS::c_NullThreadId ) {
                copy_hazards( plist, pNode->hazards_.array_, pNode->hazards_.initial_capacity_ );

                for ( guard_block* block = pNode->hazards_.extended_list_.load( atomics::memory_order_acquire );
                    block;
                    block = block->next_block_.load( atomics::memory_order_acquire ))
                {
                    copy_hazards( plist, block->first(), defaults::c_extended_guard_block_size );
                }
            }

            pNode = pNode->m_pNextNode.load( atomics::memory_order_relaxed );
        }

        // Store plist size for next scan() call (vector reallocation optimization)
        if ( plist.size() > plist_size )
            last_plist_size_.compare_exchange_weak( plist_size, plist.size(), std::memory_order_relaxed, std::memory_order_relaxed );

        // Sort plist to allow binary search
        std::sort( plist.begin(), plist.end());

        // Stage 2: Search plist
        size_t free_count = 0;
        size_t retired_count = 0;
        retired_block* last_block = pRec->retired_.current_block_;
        retired_ptr*   last_block_cell = pRec->retired_.current_cell_;

        pRec->retired_.current_block_ = pRec->retired_.list_head_;
        pRec->retired_.current_cell_ = pRec->retired_.current_block_->first();

        for ( retired_block* block = pRec->retired_.list_head_; block; block = block->next_ ) {
            bool const end_block = block == last_block;
            size_t const size = end_block ? last_block_cell - block->first() : retired_block::c_capacity;

            retired_count += retired_block::c_capacity;
            free_count += retire_data( plist, pRec->retired_, block, size );

            if ( end_block )
                break;
        }
        CDS_HPSTAT( pRec->free_call_count_ += free_count );

        // If the number of freed elements is too small, extend the retired array
        if ( free_count < retired_count / 4 && last_block == pRec->retired_.list_tail_ && last_block_cell == last_block->last())
            pRec->retired_.extend();
    }

    CDS_EXPORT_API void smr::help_scan( thread_data* pThis )
    {
        assert( static_cast<thread_record*>( pThis )->m_idOwner.load( atomics::memory_order_relaxed ) == cds::OS::get_current_thread_id());
        CDS_HPSTAT( ++pThis->help_scan_call_count_ );

        const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId;
        const cds::OS::ThreadId curThreadId = cds::OS::get_current_thread_id();
        for ( thread_record* hprec = thread_list_.load( atomics::memory_order_acquire ); hprec; hprec = hprec->m_pNextNode.load( atomics::memory_order_relaxed ))
        {
            if ( hprec == static_cast<thread_record*>( pThis ))
                continue;

            // If m_bFree == true then hprec->retired_ is empty; we don't need to scan it
            if ( hprec->m_bFree.load( atomics::memory_order_acquire )) {
                CDS_TSAN_ANNOTATE_IGNORE_READS_BEGIN;
                assert( hprec->retired_.empty());
                CDS_TSAN_ANNOTATE_IGNORE_READS_END;
                continue;
            }

            // Try to take ownership of hprec.
            // Several threads may work concurrently, so we use an atomic CAS.
            {
                cds::OS::ThreadId curOwner = hprec->m_idOwner.load( atomics::memory_order_relaxed );
                if ( curOwner == nullThreadId ) {
                    if ( !hprec->m_idOwner.compare_exchange_strong( curOwner, curThreadId, atomics::memory_order_acquire, atomics::memory_order_relaxed ))
                        continue;
                }
                else
                    continue;
            }

            // We own the thread record. Now check whether it has retired pointers.
            // If it has any, move them to pThis, which is private to the current thread.
            retired_array& src = hprec->retired_;
            retired_array& dest = pThis->retired_;

            for ( retired_block* block = src.list_head_; block; block = block->next_ ) {
                retired_ptr* last = block == src.current_block_ ? src.current_cell_ : block->last();
                for ( retired_ptr* p = block->first(); p != last; ++p ) {
                    if ( !dest.push( *p ))
                        scan( pThis );
                }

                if ( block == src.current_block_ )
                    break;
            }

            src.fini();
            hprec->m_bFree.store( true, atomics::memory_order_relaxed );
            hprec->m_idOwner.store( nullThreadId, atomics::memory_order_release );
        }

        scan( pThis );
    }

    CDS_EXPORT_API void smr::statistics( stat& st )
    {
        st.clear();
#   ifdef CDS_ENABLE_HPSTAT
        for ( thread_record* hprec = thread_list_.load( atomics::memory_order_acquire ); hprec; hprec = hprec->m_pNextNode.load( atomics::memory_order_relaxed ))
        {
            CDS_TSAN_ANNOTATE_IGNORE_READS_BEGIN;
            ++st.thread_rec_count;
            st.guard_allocated      += hprec->hazards_.alloc_guard_count_;
            st.guard_freed          += hprec->hazards_.free_guard_count_;
            st.hp_extend_count      += hprec->hazards_.extend_call_count_;
            st.retired_count        += hprec->retired_.retire_call_count_;
            st.retired_extend_count += hprec->retired_.extend_call_count_;
            st.free_count           += hprec->free_call_count_;
            st.scan_count           += hprec->scan_call_count_;
            st.help_scan_count      += hprec->help_scan_call_count_;
            CDS_TSAN_ANNOTATE_IGNORE_READS_END;
        }

        CDS_TSAN_ANNOTATE_IGNORE_READS_BEGIN;
        st.hp_block_count = hp_allocator_.block_allocated_.load( atomics::memory_order_relaxed );
        st.retired_block_count = retired_allocator_.block_allocated_.load( atomics::memory_order_relaxed );
        CDS_TSAN_ANNOTATE_IGNORE_READS_END;
#   endif
    }


}}} // namespace cds::gc::dhp

CDS_EXPORT_API /*static*/ cds::gc::DHP::stat const& cds::gc::DHP::postmortem_statistics()
{
    return cds::gc::dhp::s_postmortem_stat;
}

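// Usage sketch (illustrative): with CDS_ENABLE_HPSTAT defined, the post-mortem
// statistics collected in smr::~smr() can be inspected after the SMR singleton
// has been destroyed. The field names used below are the ones filled in
// smr::statistics() above.
//
//      cds::gc::DHP::stat const& st = cds::gc::DHP::postmortem_statistics();
//      std::cout << "guards allocated: " << st.guard_allocated
//                << ", retired: "        << st.retired_count
//                << ", freed: "          << st.free_count << std::endl;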