Replace cds::lock::scoped_lock with std::unique_lock, remove cds/lock/scoped_lock.h
[libcds.git] cds/urcu/details/sig_threaded.h
//$$CDS-header$$

#ifndef _CDS_URCU_DETAILS_SIG_THREADED_H
#define _CDS_URCU_DETAILS_SIG_THREADED_H

#include <mutex>    // std::unique_lock
#include <cds/urcu/details/sh.h>
#ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED

#include <cds/urcu/dispose_thread.h>
#include <cds/algo/backoff_strategy.h>
#include <cds/container/vyukov_mpmc_cycle_queue.h>

namespace cds { namespace urcu {

    /// User-space signal-handled RCU with deferred threaded reclamation
    /**
        @headerfile cds/urcu/signal_threaded.h

        This implementation is similar to \ref signal_buffered, but a separate thread is created
        for deleting the retired objects. Like \p %signal_buffered, the class contains an internal buffer
        where retired objects are accumulated. When the buffer becomes full,
        the RCU \p synchronize function is called: it waits until all reader/updater threads end their read-side critical sections,
        i.e. until the RCU quiescent state comes. After that, a "work ready" message is sent to the reclamation thread,
        which frees the buffer.
        This synchronization cycle may be executed in any thread that calls the \ref retire_ptr function.

        There is a wrapper \ref cds_urcu_signal_threaded_gc "gc<signal_threaded>" for the \p %signal_threaded class
        that provides the unified RCU interface. You should use this wrapper class instead of \p %signal_threaded directly.

        Template arguments:
        - \p Buffer - buffer type with FIFO semantics. Default is cds::container::VyukovMPMCCycleQueue. See \ref signal_buffered
            for a description of the buffer interface. The buffer contains objects of type \ref epoch_retired_ptr,
            which has an additional \p m_nEpoch field specifying the epoch when the object
            was placed into the buffer. The \p %signal_threaded object has a global epoch counter
            that is incremented on each \p synchronize call. The epoch is used internally to prevent early deletion.
        - \p Lock - mutex type, default is \p std::mutex
        - \p DisposerThread - the reclamation thread class. Default is \ref cds::urcu::dispose_thread;
            see the description of that class for the required interface.
        - \p Backoff - back-off scheme, default is cds::backoff::Default
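
        A minimal usage sketch via the \ref cds_urcu_signal_threaded_gc "gc<signal_threaded>" wrapper
        (the attach/detach calls follow the usual libcds pattern; nothing here is specific to this class):
        \code
        #include <cds/urcu/signal_threaded.h>

        typedef cds::urcu::gc< cds::urcu::signal_threaded<> > rcu_type;

        int main()
        {
            cds::Initialize();
            {
                rcu_type theRCU;    // creates the singleton and starts the disposer thread

                // Attach each thread that works with RCU-based containers
                cds::threading::Manager::attachThread();

                // ... use containers parameterized with rcu_type ...

                cds::threading::Manager::detachThread();
            }   // theRCU destructor stops the disposer thread and destroys the singleton
            cds::Terminate();
        }
        \endcode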
    */
    template <
        class Buffer = cds::container::VyukovMPMCCycleQueue<
            epoch_retired_ptr
            ,cds::opt::buffer< cds::opt::v::dynamic_buffer< epoch_retired_ptr > >
        >
        ,class Lock = std::mutex
        ,class DisposerThread = dispose_thread<Buffer>
        ,class Backoff = cds::backoff::Default
    >
    class signal_threaded: public details::sh_singleton< signal_threaded_tag >
    {
        //@cond
        typedef details::sh_singleton< signal_threaded_tag > base_class;
        //@endcond
    public:
        typedef Buffer          buffer_type ;   ///< Buffer type
        typedef Lock            lock_type   ;   ///< Lock type
        typedef Backoff         back_off    ;   ///< Back-off scheme
        typedef DisposerThread  disposer_thread ;   ///< Disposer thread type

        typedef signal_threaded_tag     rcu_tag ;       ///< RCU tag
        typedef base_class::thread_gc   thread_gc ;     ///< Thread-side RCU part
        typedef typename thread_gc::scoped_lock scoped_lock ; ///< Access lock class

        static bool const c_bBuffered = true ; ///< This RCU buffers disposed elements

    protected:
        //@cond
        typedef details::sh_singleton_instance< rcu_tag >    singleton_ptr;

        struct scoped_disposer {
            void operator ()( signal_threaded * p )
            {
                delete p;
            }
        };
        //@endcond

    protected:
        //@cond
        buffer_type                     m_Buffer;
        atomics::atomic<uint64_t>       m_nCurEpoch;
        lock_type                       m_Lock;
        size_t const                    m_nCapacity;
        disposer_thread                 m_DisposerThread;
        //@endcond

    public:
        /// Returns singleton instance
        static signal_threaded * instance()
        {
            return static_cast<signal_threaded *>( base_class::instance() );
        }
        /// Checks if the singleton is created and ready to use
        static bool isUsed()
        {
            return singleton_ptr::s_pRCU != nullptr;
        }

    protected:
        //@cond
        signal_threaded( size_t nBufferCapacity, int nSignal = SIGUSR1 )
            : base_class( nSignal )
            , m_Buffer( nBufferCapacity )
            , m_nCurEpoch( 1 )
            , m_nCapacity( nBufferCapacity )
        {}

        // Returns true if synchronize() has been called, false otherwise
        bool push_buffer( epoch_retired_ptr& p )
        {
            bool bPushed = m_Buffer.push( p );
            if ( !bPushed || m_Buffer.size() >= capacity() ) {
                // Buffer is full (or the push failed): wait for a grace period
                // and hand the buffer over to the disposer thread
                synchronize();
                if ( !bPushed )
                    p.free();   // could not be buffered: free now, safe after the grace period
                return true;
            }
            return false;
        }

        //@endcond

    public:
        //@cond
        ~signal_threaded()
        {}
        //@endcond

        /// Creates singleton object and starts reclamation thread
        /**
            The \p nBufferCapacity parameter defines the RCU threshold: when the internal
            buffer reaches this size, synchronization is triggered.

            The \p nSignal parameter defines the signal number reserved for RCU, default is \p SIGUSR1
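
            Example of explicit construction (normally performed by the
            \ref cds_urcu_signal_threaded_gc "gc<signal_threaded>" wrapper; the values are illustrative):
            \code
            cds::urcu::signal_threaded<>::Construct( 1024, SIGUSR2 );
            // ... create threads, use RCU-based containers ...
            cds::urcu::signal_threaded<>::Destruct( true );
            \endcode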
        */
        static void Construct( size_t nBufferCapacity = 256, int nSignal = SIGUSR1 )
        {
            if ( !singleton_ptr::s_pRCU ) {
                std::unique_ptr< signal_threaded, scoped_disposer > pRCU( new signal_threaded( nBufferCapacity, nSignal ) );
                pRCU->m_DisposerThread.start();

                singleton_ptr::s_pRCU = pRCU.release();
            }
        }

        /// Destroys singleton object and terminates internal reclamation thread
        static void Destruct( bool bDetachAll = false )
        {
            if ( isUsed() ) {
                signal_threaded * pThis = instance();
                if ( bDetachAll )
                    pThis->m_ThreadList.detach_all();

                pThis->m_DisposerThread.stop( pThis->m_Buffer, pThis->m_nCurEpoch.load( atomics::memory_order_acquire ));

                delete pThis;
                singleton_ptr::s_pRCU = nullptr;
            }
        }

    public:
        /// Retires \p p pointer
        /**
            The method pushes the \p p pointer to the internal buffer.
            When the buffer becomes full, the \ref synchronize function is called
            to wait for the end of the grace period, and then
            a message is sent to the reclamation thread.
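
            A sketch of direct usage (\p pFoo and \p fooDisposer are illustrative;
            containers normally retire pointers through the gc wrapper):
            \code
            // void fooDisposer( void * p ) { delete static_cast<Foo *>( p ); }
            cds::urcu::retired_ptr rp( pFoo, fooDisposer );
            cds::urcu::signal_threaded<>::instance()->retire_ptr( rp );
            \endcode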
        */
        virtual void retire_ptr( retired_ptr& p )
        {
            if ( p.m_p ) {
                epoch_retired_ptr ep( p, m_nCurEpoch.load( atomics::memory_order_acquire ) );
                push_buffer( ep );
            }
        }

        /// Retires the pointer chain [\p itFirst, \p itLast)
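        /// A sketch of batched retirement (\p pFoo, \p pBar and \p fooDisposer are illustrative):
        /// \code
        /// std::vector<cds::urcu::retired_ptr> chain;
        /// chain.push_back( cds::urcu::retired_ptr( pFoo, fooDisposer ));
        /// chain.push_back( cds::urcu::retired_ptr( pBar, fooDisposer ));
        /// cds::urcu::signal_threaded<>::instance()->batch_retire( chain.begin(), chain.end() );
        /// \endcode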
        template <typename ForwardIterator>
        void batch_retire( ForwardIterator itFirst, ForwardIterator itLast )
        {
            uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_relaxed );
            while ( itFirst != itLast ) {
                epoch_retired_ptr p( *itFirst, nEpoch );
                ++itFirst;
                push_buffer( p );
            }
        }

        /// Waits for a grace period to finish and passes the buffer to the disposer thread
        void synchronize()
        {
            synchronize( false );
        }

        //@cond
        void synchronize( bool bSync )
        {
            uint64_t nPrevEpoch = m_nCurEpoch.fetch_add( 1, atomics::memory_order_release );

            atomics::atomic_thread_fence( atomics::memory_order_acquire );
            {
                std::unique_lock<lock_type> sl( m_Lock );

                // Classic two-phase grace period of signal-based URCU:
                // force a memory barrier in every reader thread via the RCU signal,
                // then flip the global epoch twice, waiting after each flip until
                // all readers have left their read-side critical sections
                back_off bkOff;
                base_class::force_membar_all_threads( bkOff );
                base_class::switch_next_epoch();
                bkOff.reset();
                base_class::wait_for_quiescent_state( bkOff );
                base_class::switch_next_epoch();
                bkOff.reset();
                base_class::wait_for_quiescent_state( bkOff );
                base_class::force_membar_all_threads( bkOff );

                // Hand objects retired before nPrevEpoch over to the disposer thread
                m_DisposerThread.dispose( m_Buffer, nPrevEpoch, bSync );
            }
        }
        void force_dispose()
        {
            synchronize( true );
        }
        //@endcond

        /// Returns the threshold (capacity) of the internal buffer
        size_t capacity() const
        {
            return m_nCapacity;
        }

        /// Returns the signal number reserved for RCU
        int signal_no() const
        {
            return base_class::signal_no();
        }
    };
}} // namespace cds::urcu

#endif // #ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED
#endif // #ifndef _CDS_URCU_DETAILS_SIG_THREADED_H