3 #ifndef CDSLIB_URCU_DETAILS_SIG_THREADED_H
4 #define CDSLIB_URCU_DETAILS_SIG_THREADED_H
6 #include <mutex> //unique_lock
7 #include <cds/urcu/details/sh.h>
8 #ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED
10 #include <cds/urcu/dispose_thread.h>
11 #include <cds/algo/backoff_strategy.h>
12 #include <cds/container/vyukov_mpmc_cycle_queue.h>
14 namespace cds { namespace urcu {
16 /// User-space signal-handled RCU with deferred threaded reclamation
18 @headerfile cds/urcu/signal_threaded.h
20 This implementation is similar to \ref signal_buffered but separate thread is created
21 for deleting the retired objects. Like \p %signal_buffered, the class contains an internal buffer
22 where retired objects are accumulated. When the buffer becomes full,
23 the RCU \p synchronize function is called that waits until all reader/updater threads end up their read-side critical sections,
24 i.e. until the RCU quiescent state will come. After that the "work ready" message is sent to reclamation thread.
25 The reclamation thread frees the buffer.
26 This synchronization cycle may be called in any thread that calls \ref retire_ptr function.
28 There is a wrapper \ref cds_urcu_signal_threaded_gc "gc<signal_threaded>" for \p %signal_threaded class
29 that provides unified RCU interface. You should use this wrapper class instead \p %signal_threaded
32 - \p Buffer - buffer type with FIFO semantics. Default is cds::container::VyukovMPMCCycleQueue. See \ref signal_buffered
33 for description of buffer's interface. The buffer contains the objects of \ref epoch_retired_ptr
34 type that contains additional \p m_nEpoch field. This field specifies an epoch when the object
35 has been placed into the buffer. The \p %signal_threaded object has a global epoch counter
36 that is incremented on each \p synchronize call. The epoch is used internally to prevent early deletion.
37 - \p Lock - mutex type, default is \p std::mutex
38 - \p DisposerThread - the reclamation thread class. Default is \ref cds::urcu::dispose_thread,
39 see the description of this class for required interface.
40 - \p Backoff - back-off schema, default is cds::backoff::Default
43 class Buffer = cds::container::VyukovMPMCCycleQueue< epoch_retired_ptr >
44 ,class Lock = std::mutex
45 ,class DisposerThread = dispose_thread<Buffer>
46 ,class Backoff = cds::backoff::Default
48 class signal_threaded: public details::sh_singleton< signal_threaded_tag >
51 typedef details::sh_singleton< signal_threaded_tag > base_class;
54 typedef Buffer buffer_type ; ///< Buffer type
55 typedef Lock lock_type ; ///< Lock type
56 typedef Backoff back_off ; ///< Back-off scheme
57 typedef DisposerThread disposer_thread ; ///< Disposer thread type
59 typedef signal_threaded_tag rcu_tag ; ///< Thread-side RCU part
60 typedef base_class::thread_gc thread_gc ; ///< Access lock class
61 typedef typename thread_gc::scoped_lock scoped_lock ; ///< Access lock class
63 static bool const c_bBuffered = true ; ///< This RCU buffers disposed elements
67 typedef details::sh_singleton_instance< rcu_tag > singleton_ptr;
69 struct scoped_disposer {
70 void operator ()( signal_threaded * p )
80 atomics::atomic<uint64_t> m_nCurEpoch;
82 size_t const m_nCapacity;
83 disposer_thread m_DisposerThread;
87 /// Returns singleton instance
88 static signal_threaded * instance()
90 return static_cast<signal_threaded *>( base_class::instance() );
92 /// Checks if the singleton is created and ready to use
95 return singleton_ptr::s_pRCU != nullptr;
100 signal_threaded( size_t nBufferCapacity, int nSignal = SIGUSR1 )
101 : base_class( nSignal )
102 , m_Buffer( nBufferCapacity )
104 , m_nCapacity( nBufferCapacity )
107 // Return: true - synchronize has been called, false - otherwise
108 bool push_buffer( epoch_retired_ptr& p )
110 bool bPushed = m_Buffer.push( p );
111 if ( !bPushed || m_Buffer.size() >= capacity() ) {
114 CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
116 CDS_TSAN_ANNOTATE_IGNORE_RW_END;
131 /// Creates singleton object and starts reclamation thread
133 The \p nBufferCapacity parameter defines RCU threshold.
135 The \p nSignal parameter defines a signal number stated for RCU, default is \p SIGUSR1
137 static void Construct( size_t nBufferCapacity = 256, int nSignal = SIGUSR1 )
139 if ( !singleton_ptr::s_pRCU ) {
140 std::unique_ptr< signal_threaded, scoped_disposer > pRCU( new signal_threaded( nBufferCapacity, nSignal ) );
141 pRCU->m_DisposerThread.start();
143 singleton_ptr::s_pRCU = pRCU.release();
147 /// Destroys singleton object and terminates internal reclamation thread
148 static void Destruct( bool bDetachAll = false )
151 signal_threaded * pThis = instance();
153 pThis->m_ThreadList.detach_all();
155 pThis->m_DisposerThread.stop( pThis->m_Buffer, pThis->m_nCurEpoch.load( atomics::memory_order_acquire ));
158 singleton_ptr::s_pRCU = nullptr;
163 /// Retires \p p pointer
165 The method pushes \p p pointer to internal buffer.
166 When the buffer becomes full \ref synchronize function is called
167 to wait for the end of grace period and then
168 a message is sent to the reclamation thread.
170 virtual void retire_ptr( retired_ptr& p )
173 epoch_retired_ptr ep( p, m_nCurEpoch.load( atomics::memory_order_acquire ) );
178 /// Retires the pointer chain [\p itFirst, \p itLast)
179 template <typename ForwardIterator>
180 void batch_retire( ForwardIterator itFirst, ForwardIterator itLast )
182 uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_relaxed );
183 while ( itFirst != itLast ) {
184 epoch_retired_ptr p( *itFirst, nEpoch );
190 /// Waits to finish a grace period and calls disposing thread
193 synchronize( false );
197 void synchronize( bool bSync )
199 uint64_t nPrevEpoch = m_nCurEpoch.fetch_add( 1, atomics::memory_order_release );
201 atomics::atomic_thread_fence( atomics::memory_order_acquire );
203 std::unique_lock<lock_type> sl( m_Lock );
206 base_class::force_membar_all_threads( bkOff );
207 base_class::switch_next_epoch();
209 base_class::wait_for_quiescent_state( bkOff );
210 base_class::switch_next_epoch();
212 base_class::wait_for_quiescent_state( bkOff );
213 base_class::force_membar_all_threads( bkOff );
215 m_DisposerThread.dispose( m_Buffer, nPrevEpoch, bSync );
224 /// Returns the threshold of internal buffer
225 size_t capacity() const
230 /// Returns the signal number stated for RCU
231 int signal_no() const
233 return base_class::signal_no();
236 }} // namespace cds::urcu
238 #endif // #ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED
239 #endif // #ifndef CDSLIB_URCU_DETAILS_SIG_THREADED_H