//$$CDS-header$$

#ifndef CDSLIB_URCU_DETAILS_SIG_BUFFERED_H
#define CDSLIB_URCU_DETAILS_SIG_BUFFERED_H

#include <cds/urcu/details/sh.h>
#ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED

#include <mutex>
#include <cds/algo/backoff_strategy.h>
#include <cds/container/vyukov_mpmc_cycle_queue.h>

namespace cds { namespace urcu {

    /// User-space signal-handled RCU with deferred (buffered) reclamation
    /**
        @headerfile cds/urcu/signal_buffered.h

        This URCU implementation contains an internal buffer where retired objects are
        accumulated. When the buffer becomes full, the RCU \p synchronize function is called
        that waits until all reader/updater threads leave their read-side critical sections,
        i.e. until the RCU quiescent state is reached. After that, the buffer is drained and all
        retired objects are freed. This synchronization cycle may be performed by any thread
        that calls the \p retire_ptr function.

        The \p Buffer contains items of \ref cds_urcu_retired_ptr "epoch_retired_ptr" type
        and should support a queue interface with three functions (see the sketch below):
        - <tt>bool push( epoch_retired_ptr& p )</tt> - places the retired pointer \p p into the queue.
            If the function returns \p false, the buffer is full and an RCU synchronization
            cycle must be performed.
        - <tt>bool pop( epoch_retired_ptr& p )</tt> - pops the queue's head item into the \p p parameter;
            if the queue is empty, this function must return \p false.
        - <tt>size_t size()</tt> - returns the queue's item count.

        The buffer is considered full if \p push returns \p false or the buffer size reaches the RCU threshold.
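
        For illustration, here is a minimal sketch of a conforming buffer interface; the type
        \p MyBuffer is hypothetical, while the default \p cds::container::VyukovMPMCCycleQueue
        already satisfies this interface:
        \code
        struct MyBuffer {
            explicit MyBuffer( size_t nCapacity );        // bounded by the RCU threshold
            bool push( cds::urcu::epoch_retired_ptr& p ); // false if the buffer is full
            bool pop( cds::urcu::epoch_retired_ptr& p );  // false if the buffer is empty
            size_t size() const;                          // current item count
        };
        \endcode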

        There is a wrapper \ref cds_urcu_signal_buffered_gc "gc<signal_buffered>" for the \p %signal_buffered class
        that provides a unified RCU interface. You should use this wrapper class instead of \p %signal_buffered directly.

        Template arguments:
        - \p Buffer - buffer type. Default is \p cds::container::VyukovMPMCCycleQueue
        - \p Lock - mutex type, default is \p std::mutex
        - \p Backoff - back-off scheme, default is \p cds::backoff::Default
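
        A minimal usage sketch, assuming the \ref cds_urcu_signal_buffered_gc "gc" wrapper
        forwards its constructor arguments to \p Construct() as the other buffered URCU
        wrappers do:
        \code
        #include <cds/urcu/signal_buffered.h>

        typedef cds::urcu::gc< cds::urcu::signal_buffered<> > rcu_type;

        int main() {
            cds::Initialize();
            {
                // Creates the RCU singleton: buffer capacity 256, signal SIGUSR1
                rcu_type theRCU( 256, SIGUSR1 );

                // Attach each thread that works with RCU-based containers,
                // then use containers specialized with rcu_type ...
            }
            // The rcu_type destructor calls Destruct()
            cds::Terminate();
        }
        \endcode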
    */
    template <
        class Buffer = cds::container::VyukovMPMCCycleQueue< epoch_retired_ptr >
        ,class Lock = std::mutex
        ,class Backoff = cds::backoff::Default
    >
    class signal_buffered: public details::sh_singleton< signal_buffered_tag >
    {
        //@cond
        typedef details::sh_singleton< signal_buffered_tag > base_class;
        //@endcond
    public:
        typedef signal_buffered_tag rcu_tag ;  ///< RCU tag
        typedef Buffer  buffer_type ;   ///< Buffer type
        typedef Lock    lock_type   ;   ///< Lock type
        typedef Backoff back_off    ;   ///< Back-off type

        typedef base_class::thread_gc thread_gc ;   ///< Thread-side RCU part
        typedef typename thread_gc::scoped_lock scoped_lock ; ///< Access lock class

        static bool const c_bBuffered = true ; ///< This RCU buffers disposed elements

    protected:
        //@cond
        typedef details::sh_singleton_instance< rcu_tag >    singleton_ptr;
        //@endcond

    protected:
        //@cond
        buffer_type               m_Buffer;
        atomics::atomic<uint64_t> m_nCurEpoch;
        lock_type                 m_Lock;
        size_t const              m_nCapacity;
        //@endcond

    public:
        /// Returns singleton instance
        static signal_buffered * instance()
        {
            return static_cast<signal_buffered *>( base_class::instance() );
        }
        /// Checks if the singleton is created and ready to use
        static bool isUsed()
        {
            return singleton_ptr::s_pRCU != nullptr;
        }

    protected:
        //@cond
        signal_buffered( size_t nBufferCapacity, int nSignal = SIGUSR1 )
            : base_class( nSignal )
            , m_Buffer( nBufferCapacity )
            , m_nCurEpoch(0)
            , m_nCapacity( nBufferCapacity )
        {}

        ~signal_buffered()
        {
            // (uint64_t) -1 is the maximal epoch: free all buffered items
            clear_buffer( (uint64_t) -1 );
        }

        // Frees all buffered items retired no later than nEpoch.
        // An item of a newer epoch stops the scan and is returned to the buffer.
        void clear_buffer( uint64_t nEpoch )
        {
            epoch_retired_ptr p;
            while ( m_Buffer.pop( p )) {
                if ( p.m_nEpoch <= nEpoch ) {
                    p.free();
                }
                else {
                    push_buffer( std::move(p) );
                    break;
                }
            }
        }

        // Returns true if a synchronization cycle has been performed
        bool push_buffer( epoch_retired_ptr&& ep )
        {
            bool bPushed = m_Buffer.push( ep );
            if ( !bPushed || m_Buffer.size() >= capacity() ) {
                synchronize();
                if ( !bPushed ) {
                    // The buffer was full and ep could not be pushed;
                    // a grace period has just elapsed, so free the item right now
                    ep.free();
                }
                return true;
            }
            return false;
        }
        //@endcond

    public:
        /// Creates singleton object
        /**
            The \p nBufferCapacity parameter defines the RCU threshold.

            The \p nSignal parameter defines the signal number reserved for RCU, default is \p SIGUSR1
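
            A direct-call sketch; normally the singleton is created and destroyed by the
            \ref cds_urcu_signal_buffered_gc "gc" wrapper instead:
            \code
            cds::urcu::signal_buffered<>::Construct( 1024, SIGUSR2 );
            // ... attach threads and use the RCU singleton ...
            cds::urcu::signal_buffered<>::Destruct( true );
            \endcode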
        */
        static void Construct( size_t nBufferCapacity = 256, int nSignal = SIGUSR1 )
        {
            if ( !singleton_ptr::s_pRCU )
                singleton_ptr::s_pRCU = new signal_buffered( nBufferCapacity, nSignal );
        }

        /// Destroys singleton object
        static void Destruct( bool bDetachAll = false )
        {
            if ( isUsed() ) {
                instance()->clear_buffer( (uint64_t) -1 );
                if ( bDetachAll )
                    instance()->m_ThreadList.detach_all();
                delete instance();
                singleton_ptr::s_pRCU = nullptr;
            }
        }

    public:
        /// Retires \p p pointer
        /**
            The method pushes the \p p pointer to the internal buffer.
            When the buffer becomes full, the \ref synchronize function is called
            to wait for the end of a grace period and then free all pointers from the buffer.
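
            A sketch of retiring an object; the object \p pObj and the disposer function
            \p myDisposer are hypothetical:
            \code
            cds::urcu::retired_ptr rp( pObj, myDisposer );
            cds::urcu::signal_buffered<>::instance()->retire_ptr( rp );
            \endcode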
        */
        virtual void retire_ptr( retired_ptr& p )
        {
            if ( p.m_p )
                push_buffer( epoch_retired_ptr( p, m_nCurEpoch.load( atomics::memory_order_relaxed )));
        }

        /// Retires the pointer chain [\p itFirst, \p itLast)
        template <typename ForwardIterator>
        void batch_retire( ForwardIterator itFirst, ForwardIterator itLast )
        {
            uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_relaxed );
            while ( itFirst != itLast ) {
                push_buffer( epoch_retired_ptr( *itFirst, nEpoch ));
                ++itFirst;
            }
        }

        /// Retires the pointer chain
        /**
            The functor \p e takes no arguments and returns \p retired_ptr;
            the chain ends when \p e() returns a \p retired_ptr whose \p m_p field is \p nullptr.
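
            A minimal sketch; the vector \p vec and the lambda are hypothetical:
            \code
            std::vector<cds::urcu::retired_ptr> vec; // filled elsewhere
            size_t i = 0;
            cds::urcu::signal_buffered<>::instance()->batch_retire(
                [&]() -> cds::urcu::retired_ptr {
                    return i < vec.size() ? vec[i++] : cds::urcu::retired_ptr();
                });
            \endcode
        */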
        template <typename Func>
        void batch_retire( Func e )
        {
            uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_relaxed );
            for ( retired_ptr p{ e() }; p.m_p; p = e() )
                push_buffer( epoch_retired_ptr( p, nEpoch ));
        }

        /// Waits until the end of a grace period and then clears the buffer
        void synchronize()
        {
            epoch_retired_ptr ep( retired_ptr(), m_nCurEpoch.load( atomics::memory_order_relaxed ));
            synchronize( ep );
        }

        //@cond
        bool synchronize( epoch_retired_ptr& ep )
        {
            uint64_t nEpoch;
            atomics::atomic_thread_fence( atomics::memory_order_acquire );
            {
                std::unique_lock<lock_type> sl( m_Lock );

                // If the item fits into the buffer and the threshold has not been
                // reached, no grace period is needed
                if ( ep.m_p && m_Buffer.push( ep ) && m_Buffer.size() < capacity())
                    return false;
                nEpoch = m_nCurEpoch.fetch_add( 1, atomics::memory_order_relaxed );

                // The epoch is flipped twice so that every reader that entered
                // a read-side critical section before the cycle started has left it
                back_off bkOff;
                base_class::force_membar_all_threads( bkOff );
                base_class::switch_next_epoch();
                bkOff.reset();
                base_class::wait_for_quiescent_state( bkOff );
                base_class::switch_next_epoch();
                bkOff.reset();
                base_class::wait_for_quiescent_state( bkOff );
                base_class::force_membar_all_threads( bkOff );
            }

            clear_buffer( nEpoch );
            return true;
        }
        //@endcond

        /// Returns the threshold of the internal buffer
        size_t capacity() const
        {
            return m_nCapacity;
        }

        /// Returns the signal number reserved for RCU
        int signal_no() const
        {
            return base_class::signal_no();
        }
    };

}} // namespace cds::urcu

#endif // #ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED
#endif // #ifndef CDSLIB_URCU_DETAILS_SIG_BUFFERED_H