Fixed memory leaks in threaded URCU (general_threaded, signal_threaded)
[libcds.git] / cds / urcu / details / sig_buffered.h
1 //$$CDS-header$$
2
3 #ifndef CDSLIB_URCU_DETAILS_SIG_BUFFERED_H
4 #define CDSLIB_URCU_DETAILS_SIG_BUFFERED_H
5
6 #include <cds/urcu/details/sh.h>
7 #ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED
8
9 #include <mutex>
10 #include <limits>
11 #include <cds/algo/backoff_strategy.h>
12 #include <cds/container/vyukov_mpmc_cycle_queue.h>
13
14 namespace cds { namespace urcu {
15
16     /// User-space signal-handled RCU with deferred (buffered) reclamation
17     /**
18         @headerfile cds/urcu/signal_buffered.h
19
20         This URCU implementation contains an internal buffer where retired objects are
21         accumulated. When the buffer becomes full, the RCU \p synchronize function is called
22         that waits until all reader/updater threads end up their read-side critical sections,
23         i.e. until the RCU quiescent state comes. After that, the buffer and all retired objects are freed.
24         This synchronization cycle may be called in any thread that calls \p retire_ptr function.
25
26         The \p Buffer contains items of \ref cds_urcu_retired_ptr "retired_ptr" type and it should support a queue interface with
27         three functions:
28         - <tt> bool push( retired_ptr& p ) </tt> - places the retired pointer \p p into queue. If the function
29             returns \p false it means that the buffer is full and RCU synchronization cycle must be processed.
30         - <tt>bool pop( retired_ptr& p ) </tt> - pops queue's head item into \p p parameter; if the queue is empty
31             this function must return \p false
32         - <tt>size_t size()</tt> - returns queue's item count.
33
34         The buffer is considered as full if \p push returns \p false or the buffer size reaches the RCU threshold.
35
36         There is a wrapper \ref cds_urcu_signal_buffered_gc "gc<signal_buffered>" for \p %signal_buffered class
37         that provides the unified RCU interface. You should use this wrapper class instead of \p %signal_buffered
38
39         Template arguments:
40         - \p Buffer - buffer type. Default is cds::container::VyukovMPMCCycleQueue
41         - \p Lock - mutex type, default is \p std::mutex
42         - \p Backoff - back-off schema, default is cds::backoff::Default
43     */
    template <
        class Buffer = cds::container::VyukovMPMCCycleQueue< epoch_retired_ptr >
        ,class Lock = std::mutex
        ,class Backoff = cds::backoff::Default
    >
    class signal_buffered: public details::sh_singleton< signal_buffered_tag >
    {
        //@cond
        typedef details::sh_singleton< signal_buffered_tag > base_class;
        //@endcond
    public:
        typedef signal_buffered_tag rcu_tag ;  ///< RCU tag
        typedef Buffer  buffer_type ;   ///< Buffer type
        typedef Lock    lock_type   ;   ///< Lock type
        typedef Backoff back_off    ;   ///< Back-off type

        typedef base_class::thread_gc thread_gc ;   ///< Thread-side RCU part
        typedef typename thread_gc::scoped_lock scoped_lock ; ///< Access lock class

        static bool const c_bBuffered = true ; ///< This RCU buffers disposed elements

    protected:
        //@cond
        typedef details::sh_singleton_instance< rcu_tag >    singleton_ptr;
        //@endcond

    protected:
        //@cond
        buffer_type               m_Buffer;    // queue of retired pointers awaiting a grace period
        atomics::atomic<uint64_t> m_nCurEpoch; // global epoch counter; bumped by each synchronization cycle
        lock_type                 m_Lock;      // serializes synchronization cycles
        size_t const              m_nCapacity; // RCU threshold: buffer size that triggers synchronize()
        //@endcond

    public:
        /// Returns singleton instance
        static signal_buffered * instance()
        {
            return static_cast<signal_buffered *>( base_class::instance() );
        }
        /// Checks if the singleton is created and ready to use
        static bool isUsed()
        {
            return singleton_ptr::s_pRCU != nullptr;
        }

    protected:
        //@cond
        // Creates the singleton. \p nBufferCapacity is the RCU threshold,
        // \p nSignal is the signal number passed to the signal-handled RCU base engine.
        signal_buffered( size_t nBufferCapacity, int nSignal = SIGUSR1 )
            : base_class( nSignal )
            , m_Buffer( nBufferCapacity )
            , m_nCurEpoch(0)
            , m_nCapacity( nBufferCapacity )
        {}

        // Frees every retired pointer still buffered (epoch limit = max, i.e. all of them).
        ~signal_buffered()
        {
            clear_buffer( std::numeric_limits< uint64_t >::max() );
        }

        // Frees all buffered retired pointers whose epoch is not greater than \p nEpoch.
        // Pops items until the buffer is empty or an item with a newer epoch is met;
        // that item is pushed back (to the queue tail, so strict FIFO epoch order is
        // not preserved for it) and scanning stops.
        void clear_buffer( uint64_t nEpoch )
        {
            epoch_retired_ptr p;
            while ( m_Buffer.pop( p )) {
                if ( p.m_nEpoch <= nEpoch ) {
                    p.free();
                }
                else {
                    push_buffer( std::move(p) );
                    break;
                }
            }
        }

        // Buffers \p ep; starts a synchronization cycle if the buffer is full.
        // Returns \p true iff a synchronization cycle was performed, i.e. the push
        // failed or the buffer size reached capacity(). A pointer that could not
        // be buffered is freed here, after the grace period has elapsed.
        bool push_buffer( epoch_retired_ptr&& ep )
        {
            bool bPushed = m_Buffer.push( ep );
            if ( !bPushed || m_Buffer.size() >= capacity() ) {
                synchronize();
                if ( !bPushed ) {
                    // could not be buffered: a full grace period has just passed,
                    // so it is safe to dispose of the pointer immediately
                    ep.free();
                }
                return true;
            }
            return false;
        }
        //@endcond

    public:
        /// Creates singleton object
        /**
            The \p nBufferCapacity parameter defines RCU threshold.

            The \p nSignal parameter defines a signal number stated for RCU, default is \p SIGUSR1
        */
        static void Construct( size_t nBufferCapacity = 256, int nSignal = SIGUSR1 )
        {
            if ( !singleton_ptr::s_pRCU )
                singleton_ptr::s_pRCU = new signal_buffered( nBufferCapacity, nSignal );
        }

        /// Destroys singleton object
        static void Destruct( bool bDetachAll = false )
        {
            if ( isUsed() ) {
                // drain the buffer first so no retired pointer leaks
                instance()->clear_buffer( std::numeric_limits< uint64_t >::max());
                if ( bDetachAll )
                    instance()->m_ThreadList.detach_all();  // m_ThreadList inherited from base_class
                delete instance();  // destructor clears the buffer once more (no-op if already empty)
                singleton_ptr::s_pRCU = nullptr;
            }
        }

    public:
        /// Retire \p p pointer
        /**
            The method pushes \p p pointer to internal buffer.
            When the buffer becomes full \ref synchronize function is called
            to wait for the end of grace period and then to free all pointers from the buffer.
        */
        virtual void retire_ptr( retired_ptr& p )
        {
            if ( p.m_p )
                push_buffer( epoch_retired_ptr( p, m_nCurEpoch.load( atomics::memory_order_relaxed )));
        }

        /// Retires the pointer chain [\p itFirst, \p itLast)
        template <typename ForwardIterator>
        void batch_retire( ForwardIterator itFirst, ForwardIterator itLast )
        {
            // all pointers of the chain are stamped with the same (current) epoch
            uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_relaxed );
            while ( itFirst != itLast ) {
                epoch_retired_ptr ep( *itFirst, nEpoch );
                ++itFirst;
                push_buffer( std::move(ep));
            }
        }

        /// Retires the pointer chain until \p Func returns \p nullptr retired pointer
        template <typename Func>
        void batch_retire( Func e )
        {
            uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_relaxed );
            // e() is called repeatedly; iteration stops when it yields a retired_ptr
            // whose m_p is null. Note the next item is fetched before the current one
            // is pushed, since push_buffer may trigger a synchronization cycle.
            for ( retired_ptr p{ e() }; p.m_p; ) {
                epoch_retired_ptr ep( p, nEpoch );
                p = e();
                push_buffer( std::move(ep));
            }
        }

        /// Wait to finish a grace period and then clear the buffer
        void synchronize()
        {
            // a null retired pointer forces the full synchronization path below
            epoch_retired_ptr ep( retired_ptr(), m_nCurEpoch.load( atomics::memory_order_relaxed ));
            synchronize( ep );
        }

        //@cond
        // Buffers \p ep if possible; otherwise performs a full synchronization cycle.
        // Under m_Lock: if \p ep holds a pointer, fits into the buffer and the buffer
        // stays below the threshold, returns false without synchronizing. Otherwise the
        // epoch counter is advanced and the base engine's quiescent-state wait is run
        // twice (epoch flip + wait, with reader-side membars forced before and after).
        // After the grace period, every pointer retired up to the captured epoch is
        // freed (outside the lock; the buffer supports concurrent access) and true
        // is returned.
        bool synchronize( epoch_retired_ptr& ep )
        {
            uint64_t nEpoch;
            atomics::atomic_thread_fence( atomics::memory_order_acquire );
            {
                std::unique_lock<lock_type> sl( m_Lock );
                if ( ep.m_p && m_Buffer.push( ep ) && m_Buffer.size() < capacity())
                    return false;
                nEpoch = m_nCurEpoch.fetch_add( 1, atomics::memory_order_relaxed );

                back_off bkOff;
                base_class::force_membar_all_threads( bkOff );
                base_class::switch_next_epoch();
                bkOff.reset();
                base_class::wait_for_quiescent_state( bkOff );
                base_class::switch_next_epoch();
                bkOff.reset();
                base_class::wait_for_quiescent_state( bkOff );
                base_class::force_membar_all_threads( bkOff );
            }

            // grace period has elapsed: reclaim everything retired up to nEpoch
            clear_buffer( nEpoch );
            return true;
        }
        //@endcond

        /// Returns the threshold of internal buffer
        size_t capacity() const
        {
            return m_nCapacity;
        }

        /// Returns the signal number stated for RCU
        int signal_no() const
        {
            return base_class::signal_no();
        }
    };
240
241 }} // namespace cds::urcu
242
243 #endif // #ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED
244 #endif // #ifndef CDSLIB_URCU_DETAILS_SIG_BUFFERED_H