2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #ifndef CDSLIB_URCU_DETAILS_SIG_THREADED_H
32 #define CDSLIB_URCU_DETAILS_SIG_THREADED_H
34 #include <cds/urcu/details/sh.h>
35 #ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED
37 #include <mutex> //unique_lock
39 #include <cds/urcu/dispose_thread.h>
40 #include <cds/algo/backoff_strategy.h>
41 #include <cds/container/vyukov_mpmc_cycle_queue.h>
43 namespace cds { namespace urcu {
45 /// User-space signal-handled RCU with deferred threaded reclamation
47 @headerfile cds/urcu/signal_threaded.h
49 This implementation is similar to \ref signal_buffered but a separate thread is created
50 for deleting the retired objects. Like \p %signal_buffered, the class contains an internal buffer
51 where retired objects are accumulated. When the buffer becomes full,
52 the RCU \p synchronize function is called that waits until all reader/updater threads finish their read-side critical sections,
53 i.e. until the RCU quiescent state is reached. After that the "work ready" message is sent to the reclamation thread.
54 The reclamation thread frees the buffer.
55 This synchronization cycle may be called in any thread that calls \ref retire_ptr function.
57 There is a wrapper \ref cds_urcu_signal_threaded_gc "gc<signal_threaded>" for \p %signal_threaded class
58 that provides a unified RCU interface. You should use this wrapper class instead of \p %signal_threaded
61 - \p Buffer - buffer type with FIFO semantics. Default is \p cds::container::VyukovMPSCCycleQueue. See \ref signal_buffered
62 for description of buffer's interface. The buffer contains the objects of \ref epoch_retired_ptr
63 type that contains additional \p m_nEpoch field. This field specifies an epoch when the object
64 has been placed into the buffer. The \p %signal_threaded object has a global epoch counter
65 that is incremented on each \p synchronize() call. The epoch is used internally to prevent early deletion.
66 - \p Lock - mutex type, default is \p std::mutex
67 - \p DisposerThread - the reclamation thread class. Default is \ref cds::urcu::dispose_thread,
68 see the description of this class for required interface.
69 - \p Backoff - back-off scheme, default is \p cds::backoff::Default
72 class Buffer = cds::container::VyukovMPSCCycleQueue< epoch_retired_ptr >
73 ,class Lock = std::mutex
74 ,class DisposerThread = dispose_thread<Buffer>
75 ,class Backoff = cds::backoff::Default
77 class signal_threaded: public details::sh_singleton< signal_threaded_tag >
// Shorthand for the signal-handling singleton base this class derives from
80 typedef details::sh_singleton< signal_threaded_tag > base_class;
83 typedef Buffer buffer_type ; ///< Buffer type
84 typedef Lock lock_type ; ///< Lock type
85 typedef Backoff back_off ; ///< Back-off scheme
86 typedef DisposerThread disposer_thread ; ///< Disposer thread type
88 typedef signal_threaded_tag rcu_tag ; ///< RCU tag
89 typedef base_class::thread_gc thread_gc ; ///< Thread-side RCU part
90 typedef typename thread_gc::scoped_lock scoped_lock ; ///< Access lock class
92 static bool const c_bBuffered = true ; ///< This RCU buffers disposed elements
// Holder of the singleton pointer \p s_pRCU that Construct() sets and Destruct() resets
96 typedef details::sh_singleton_instance< rcu_tag > singleton_ptr;
// Deleter type of the std::unique_ptr that holds the new instance inside Construct()
98 struct scoped_disposer {
99 void operator ()( signal_threaded * p )
// Buffer of retired pointers: filled by push_buffer() and passed to the disposer thread
108 buffer_type m_Buffer;
// Global epoch counter: incremented by synchronize( bool ); its value tags every retired pointer
109 atomics::atomic<uint64_t> m_nCurEpoch;
// Buffer capacity as passed to the constructor
111 size_t const m_nCapacity;
// Reclamation thread object: started in Construct(), stopped in Destruct()
112 disposer_thread m_DisposerThread;
116 /// Returns singleton instance (the base class instance pointer cast down to \p signal_threaded)
117 static signal_threaded * instance()
119 return static_cast<signal_threaded *>( base_class::instance() );
121 /// Checks if the singleton is created and ready to use, i.e. the stored \p s_pRCU pointer is not null
124 return singleton_ptr::s_pRCU != nullptr;
// Constructor. \p nSignal is forwarded to the base class; \p nBufferCapacity is passed
// to the buffer constructor and also stored as the capacity value.
129 signal_threaded( size_t nBufferCapacity, int nSignal = SIGUSR1 )
130 : base_class( nSignal )
131 , m_Buffer( nBufferCapacity )
133 , m_nCapacity( nBufferCapacity )
// Tries to push the retired pointer \p p into the internal buffer
136 // Return: true - synchronize has been called, false - otherwise
137 bool push_buffer( epoch_retired_ptr&& p )
// p is a named rvalue reference, so push() receives it as an lvalue here
139 bool bPushed = m_Buffer.push( p );
// The branch is taken when the push failed or the buffer size has reached capacity()
140 if ( !bPushed || m_Buffer.size() >= capacity() ) {
158 /// Creates singleton object and starts reclamation thread
160 The \p nBufferCapacity parameter defines RCU threshold.
162 The \p nSignal parameter defines a signal number stated for RCU, default is \p SIGUSR1
164 static void Construct( size_t nBufferCapacity = 256, int nSignal = SIGUSR1 )
// Checks that no singleton pointer has been stored yet
166 if ( !singleton_ptr::s_pRCU ) {
// The new object is held by a unique_ptr (deleter: scoped_disposer) until release() below
// NOTE(review): std::unique_ptr needs <memory>; the only standard header included directly is <mutex> - presumably it arrives transitively, confirm
167 std::unique_ptr< signal_threaded, scoped_disposer > pRCU( new signal_threaded( nBufferCapacity, nSignal ) );
168 pRCU->m_DisposerThread.start();
// Store the raw pointer as the singleton; the unique_ptr gives up ownership
170 singleton_ptr::s_pRCU = pRCU.release();
174 /// Destroys singleton object and terminates internal reclamation thread
175 static void Destruct( bool bDetachAll = false )
178 signal_threaded * pThis = instance();
// NOTE(review): presumably performed only when bDetachAll is true - confirm
180 pThis->m_ThreadList.detach_all();
// Stop the reclamation thread, passing it the buffer and the maximum uint64_t value as the epoch argument
// NOTE(review): std::numeric_limits needs <limits>; the only standard header included directly is <mutex> - presumably it arrives transitively, confirm
182 pThis->m_DisposerThread.stop( pThis->m_Buffer, std::numeric_limits< uint64_t >::max());
// Reset the stored singleton pointer
185 singleton_ptr::s_pRCU = nullptr;
190 /// Retires \p p pointer
192 The method pushes \p p pointer to internal buffer.
193 When the buffer becomes full \ref synchronize function is called
194 to wait for the end of grace period and then
195 a message is sent to the reclamation thread.
197 virtual void retire_ptr( retired_ptr& p )
// The pointer is tagged with the current epoch value (acquire load) and handed to push_buffer()
// NOTE(review): both batch_retire() overloads read the epoch with memory_order_relaxed - confirm the difference is intended
200 push_buffer( epoch_retired_ptr( p, m_nCurEpoch.load( atomics::memory_order_acquire )));
203 /// Retires the pointer chain [\p itFirst, \p itLast)
204 template <typename ForwardIterator>
205 void batch_retire( ForwardIterator itFirst, ForwardIterator itLast )
// The epoch is read once (relaxed load) before the loop and used to tag the pointers of the chain
207 uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_relaxed );
208 while ( itFirst != itLast ) {
// Wrap the current element together with the epoch, then move it into push_buffer()
209 epoch_retired_ptr ep( *itFirst, nEpoch );
211 push_buffer( std::move(ep));
215 /// Retires the pointer chain until \p Func returns \p nullptr retired pointer
216 template <typename Func>
217 void batch_retire( Func e )
// The epoch is read once (relaxed load) before the loop
219 uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_relaxed );
// e() supplies the first retired pointer; the loop runs while the pointer's m_p is non-null
220 for ( retired_ptr p{ e() }; p.m_p; ) {
// Wrap the pointer together with the epoch, then move it into push_buffer()
221 epoch_retired_ptr ep( p, nEpoch );
223 push_buffer( std::move(ep));
228 /// Waits to finish a grace period and calls disposing thread
// Delegates to synchronize( bool ) with bSync == false
231 synchronize( false );
// \p bSync is forwarded unchanged to the disposer thread's dispose() call below
235 void synchronize( bool bSync )
// Advance the global epoch; the value before the increment is what the disposer thread receives
237 uint64_t nPrevEpoch = m_nCurEpoch.fetch_add( 1, atomics::memory_order_release );
// m_Lock is acquired before the membar / epoch-switch / quiescent-state sequence
240 std::unique_lock<lock_type> sl( m_Lock );
243 base_class::force_membar_all_threads( bkOff );
// The epoch is switched twice, each switch followed by a wait for the quiescent state
244 base_class::switch_next_epoch();
246 base_class::wait_for_quiescent_state( bkOff );
247 base_class::switch_next_epoch();
249 base_class::wait_for_quiescent_state( bkOff );
250 base_class::force_membar_all_threads( bkOff );
// Hand the buffer to the disposer thread together with the pre-increment epoch
252 m_DisposerThread.dispose( m_Buffer, nPrevEpoch, bSync );
261 /// Returns the threshold of internal buffer (the value push_buffer() compares the buffer size against)
262 size_t capacity() const
267 /// Returns the signal number stated for RCU (as reported by the base class)
268 int signal_no() const
270 return base_class::signal_no();
273 }} // namespace cds::urcu
275 #endif // #ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED
276 #endif // #ifndef CDSLIB_URCU_DETAILS_SIG_THREADED_H