Replace cds::lock::scoped_lock with std::unique_lock, remove cds/lock/scoped_lock.h
[libcds.git] / cds / urcu / details / sig_buffered.h
1 //$$CDS-header$$
2
3 #ifndef _CDS_URCU_DETAILS_SIG_BUFFERED_H
4 #define _CDS_URCU_DETAILS_SIG_BUFFERED_H
5
6 #include <cds/urcu/details/sh.h>
7 #ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED
8
9 #include <mutex>
10 #include <cds/algo/backoff_strategy.h>
11 #include <cds/container/vyukov_mpmc_cycle_queue.h>
12
13 namespace cds { namespace urcu {
14
15     /// User-space signal-handled RCU with deferred (buffered) reclamation
16     /**
17         @headerfile cds/urcu/signal_buffered.h
18
19         This URCU implementation contains an internal buffer where retired objects are
20         accumulated. When the buffer becomes full, the RCU \p synchronize function is called
21         that waits until all reader/updater threads end up their read-side critical sections,
22         i.e. until the RCU quiescent state will come. After that the buffer and all retired objects are freed.
23         This synchronization cycle may be called in any thread that calls \p retire_ptr function.
24
25         The \p Buffer contains items of \ref cds_urcu_retired_ptr "retired_ptr" type and it should support a queue interface with
26         three function:
27         - <tt> bool push( retired_ptr& p ) </tt> - places the retired pointer \p p into queue. If the function
28             returns \p false it means that the buffer is full and RCU synchronization cycle must be processed.
29         - <tt>bool pop( retired_ptr& p ) </tt> - pops queue's head item into \p p parameter; if the queue is empty
30             this function must return \p false
31         - <tt>size_t size()</tt> - returns queue's item count.
32
33         The buffer is considered as full if \p push returns \p false or the buffer size reaches the RCU threshold.
34
35         There is a wrapper \ref cds_urcu_signal_buffered_gc "gc<signal_buffered>" for \p %signal_buffered class
36         that provides unified RCU interface. You should use this wrapper class instead \p %signal_buffered
37
38         Template arguments:
39         - \p Buffer - buffer type. Default is cds::container::VyukovMPMCCycleQueue
40         - \p Lock - mutex type, default is \p std::mutex
41         - \p Backoff - back-off schema, default is cds::backoff::Default
42     */
    template <
        class Buffer = cds::container::VyukovMPMCCycleQueue<
            epoch_retired_ptr
            ,cds::opt::buffer< cds::opt::v::dynamic_buffer< epoch_retired_ptr > >
        >
        ,class Lock = std::mutex
        ,class Backoff = cds::backoff::Default
    >
    class signal_buffered: public details::sh_singleton< signal_buffered_tag >
    {
        //@cond
        typedef details::sh_singleton< signal_buffered_tag > base_class;
        //@endcond
    public:
        typedef signal_buffered_tag rcu_tag ;  ///< RCU tag
        typedef Buffer  buffer_type ;   ///< Buffer type
        typedef Lock    lock_type   ;   ///< Lock type
        typedef Backoff back_off    ;   ///< Back-off type

        typedef base_class::thread_gc thread_gc ;   ///< Thread-side RCU part
        typedef typename thread_gc::scoped_lock scoped_lock ; ///< Access lock class

        static bool const c_bBuffered = true ; ///< This RCU buffers disposed elements

    protected:
        //@cond
        // Holder of the singleton pointer (s_pRCU); managed by Construct()/Destruct().
        typedef details::sh_singleton_instance< rcu_tag >    singleton_ptr;
        //@endcond

    protected:
        //@cond
        buffer_type                     m_Buffer;       // Queue of (retired pointer, epoch) pairs awaiting reclamation
        atomics::atomic<uint64_t>    m_nCurEpoch;       // Current epoch counter; bumped once per synchronize() cycle
        lock_type                       m_Lock;         // Serializes the grace-period protocol in synchronize()
        size_t const                    m_nCapacity;    // RCU threshold: buffer fill level that triggers synchronize()
        //@endcond

    public:
        /// Returns singleton instance
        static signal_buffered * instance()
        {
            return static_cast<signal_buffered *>( base_class::instance() );
        }
        /// Checks if the singleton is created and ready to use
        static bool isUsed()
        {
            return singleton_ptr::s_pRCU != nullptr;
        }

    protected:
        //@cond
        // Constructs the singleton; \p nBufferCapacity is the RCU threshold,
        // \p nSignal the POSIX signal number passed to the base class for
        // reader synchronization (default SIGUSR1).
        signal_buffered( size_t nBufferCapacity, int nSignal = SIGUSR1 )
            : base_class( nSignal )
            , m_Buffer( nBufferCapacity )
            , m_nCurEpoch(0)
            , m_nCapacity( nBufferCapacity )
        {}

        ~signal_buffered()
        {
            // (uint64_t)-1 compares >= any stored epoch, so every buffered item is freed.
            clear_buffer( (uint64_t) -1 );
        }

        // Frees all buffered retired pointers whose epoch is <= nEpoch.
        // The first item found with a newer epoch is put back (via push_buffer,
        // which re-appends it at the queue tail) and scanning stops.
        void clear_buffer( uint64_t nEpoch )
        {
            epoch_retired_ptr p;
            while ( m_Buffer.pop( p )) {
                if ( p.m_nEpoch <= nEpoch )
                    p.free();
                else {
                    push_buffer( p );
                    break;
                }
            }
        }

        // Places \p ep into the buffer. If the push fails or the buffer has
        // reached the RCU threshold, runs a full synchronize() cycle (freeing
        // \p ep directly when it could not be pushed) and returns true;
        // returns false when \p ep was merely buffered.
        bool push_buffer( epoch_retired_ptr& ep )
        {
            bool bPushed = m_Buffer.push( ep );
            if ( !bPushed || m_Buffer.size() >= capacity() ) {
                synchronize();
                if ( !bPushed )
                    ep.free();
                return true;
            }
            return false;
        }
        //@endcond

    public:
        /// Creates singleton object
        /**
            The \p nBufferCapacity parameter defines RCU threshold.

            The \p nSignal parameter defines a signal number stated for RCU, default is \p SIGUSR1
        */
        static void Construct( size_t nBufferCapacity = 256, int nSignal = SIGUSR1 )
        {
            // Idempotent: a second call is a no-op while the singleton exists.
            if ( !singleton_ptr::s_pRCU )
                singleton_ptr::s_pRCU = new signal_buffered( nBufferCapacity, nSignal );
        }

        /// Destroys singleton object
        static void Destruct( bool bDetachAll = false )
        {
            if ( isUsed() ) {
                // Drain the buffer first; the destructor repeats this but by then it is empty.
                instance()->clear_buffer( (uint64_t) -1 );
                if ( bDetachAll )
                    // m_ThreadList comes from the base class; presumably detaches
                    // every attached reader thread — confirm in sh_singleton.
                    instance()->m_ThreadList.detach_all();
                delete instance();
                singleton_ptr::s_pRCU = nullptr;
            }
        }

    public:
        /// Retire \p p pointer
        /**
            The method pushes \p p pointer to internal buffer.
            When the buffer becomes full \ref synchronize function is called
            to wait for the end of grace period and then to free all pointers from the buffer.
        */
        virtual void retire_ptr( retired_ptr& p )
        {
            if ( p.m_p ) {
                // Tag the pointer with the current epoch; relaxed load is sufficient
                // since the epoch is only compared against values published under m_Lock.
                epoch_retired_ptr ep( p, m_nCurEpoch.load( atomics::memory_order_relaxed ));
                push_buffer( ep );
            }
        }

        /// Retires the pointer chain [\p itFirst, \p itLast)
        template <typename ForwardIterator>
        void batch_retire( ForwardIterator itFirst, ForwardIterator itLast )
        {
            // The epoch is sampled once for the whole batch; a synchronize() cycle
            // triggered mid-batch by push_buffer() leaves later items tagged with
            // the older (already-expired) epoch, which only makes them reclaimable sooner.
            uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_relaxed );
            while ( itFirst != itLast ) {
                epoch_retired_ptr ep( *itFirst, nEpoch );
                ++itFirst;
                push_buffer( ep );
            }
        }

        /// Wait to finish a grace period and then clear the buffer
        void synchronize()
        {
            // Null retired_ptr forces the two-argument overload past its fast path,
            // so a full grace-period cycle always runs.
            epoch_retired_ptr ep( retired_ptr(), m_nCurEpoch.load( atomics::memory_order_relaxed ));
            synchronize( ep );
        }

        //@cond
        // Runs the grace-period protocol. If \p ep carries a pointer and fits
        // into the buffer below the threshold, returns false without synchronizing.
        // Otherwise advances the epoch and waits for all readers, then frees
        // every buffered item retired in the old epoch; returns true.
        bool synchronize( epoch_retired_ptr& ep )
        {
            uint64_t nEpoch;
            atomics::atomic_thread_fence( atomics::memory_order_acquire );
            {
                std::unique_lock<lock_type> sl( m_Lock );
                // Fast path: buffered without reaching the threshold — no cycle needed.
                if ( ep.m_p && m_Buffer.push( ep ) && m_Buffer.size() < capacity())
                    return false;
                // Open a new epoch; items retired from now on belong to nEpoch+1
                // and survive the clear_buffer() below.
                nEpoch = m_nCurEpoch.fetch_add( 1, atomics::memory_order_relaxed );

                back_off bkOff;
                // Grace-period wait, performed under m_Lock. The base class drives
                // readers via the registered signal (see sh_singleton); the epoch is
                // switched and quiesced twice — presumably the classic two-phase
                // scheme of signal-based URCU — with memory-barrier broadcasts
                // bracketing the whole sequence. Do not reorder these calls.
                base_class::force_membar_all_threads( bkOff );
                base_class::switch_next_epoch();
                bkOff.reset();
                base_class::wait_for_quiescent_state( bkOff );
                base_class::switch_next_epoch();
                bkOff.reset();
                base_class::wait_for_quiescent_state( bkOff );
                base_class::force_membar_all_threads( bkOff );
            }

            // Outside the lock: reclaim everything retired up to the epoch just closed.
            clear_buffer( nEpoch );
            return true;
        }
        //@endcond

        /// Returns the threshold of internal buffer
        size_t capacity() const
        {
            return m_nCapacity;
        }

        /// Returns the signal number stated for RCU
        int signal_no() const
        {
            return base_class::signal_no();
        }
    };
230
231 }} // namespace cds::urcu
232
233 #endif // #ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED
234 #endif // #ifndef _CDS_URCU_DETAILS_SIG_BUFFERED_H