Move libcds 1.6.0 from SVN
[libcds.git] / cds / urcu / details / sig_buffered.h
1 //$$CDS-header$$
2
3 #ifndef _CDS_URCU_DETAILS_SIG_BUFFERED_H
4 #define _CDS_URCU_DETAILS_SIG_BUFFERED_H
5
6 #include <cds/urcu/details/sh.h>
7 #ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED
8
9 #include <cds/backoff_strategy.h>
10 #include <cds/container/vyukov_mpmc_cycle_queue.h>
11
12 #include <cds/details/std/mutex.h>
13
14 namespace cds { namespace urcu {
15
16     /// User-space signal-handled RCU with deferred (buffered) reclamation
17     /**
18         @headerfile cds/urcu/signal_buffered.h
19
20         This URCU implementation contains an internal buffer where retired objects are
21         accumulated. When the buffer becomes full, the RCU \p synchronize function is called
22         that waits until all reader/updater threads end up their read-side critical sections,
23         i.e. until the RCU quiescent state will come. After that the buffer and all retired objects are freed.
24         This synchronization cycle may be called in any thread that calls \p retire_ptr function.
25
26         The \p Buffer contains items of \ref cds_urcu_retired_ptr "retired_ptr" type and it should support a queue interface with
27         three function:
28         - <tt> bool push( retired_ptr& p ) </tt> - places the retired pointer \p p into queue. If the function
29             returns \p false it means that the buffer is full and RCU synchronization cycle must be processed.
30         - <tt>bool pop( retired_ptr& p ) </tt> - pops queue's head item into \p p parameter; if the queue is empty
31             this function must return \p false
32         - <tt>size_t size()</tt> - returns queue's item count.
33
34         The buffer is considered as full if \p push returns \p false or the buffer size reaches the RCU threshold.
35
36         There is a wrapper \ref cds_urcu_signal_buffered_gc "gc<signal_buffered>" for \p %signal_buffered class
37         that provides unified RCU interface. You should use this wrapper class instead \p %signal_buffered
38
39         Template arguments:
40         - \p Buffer - buffer type. Default is cds::container::VyukovMPMCCycleQueue
41         - \p Lock - mutex type, default is \p std::mutex
42         - \p Backoff - back-off schema, default is cds::backoff::Default
43     */
    template <
        class Buffer = cds::container::VyukovMPMCCycleQueue<
            epoch_retired_ptr
            ,cds::opt::buffer< cds::opt::v::dynamic_buffer< epoch_retired_ptr > >
        >
        ,class Lock = cds_std::mutex
        ,class Backoff = cds::backoff::Default
    >
    class signal_buffered: public details::sh_singleton< signal_buffered_tag >
    {
        //@cond
        typedef details::sh_singleton< signal_buffered_tag > base_class;
        //@endcond
    public:
        typedef signal_buffered_tag rcu_tag ;  ///< RCU tag
        typedef Buffer  buffer_type ;   ///< Buffer type
        typedef Lock    lock_type   ;   ///< Lock type
        typedef Backoff back_off    ;   ///< Back-off type

        typedef base_class::thread_gc thread_gc ;   ///< Thread-side RCU part
        typedef typename thread_gc::scoped_lock scoped_lock ; ///< Access lock class

        static bool const c_bBuffered = true ; ///< This RCU buffers disposed elements

    protected:
        //@cond
        typedef details::sh_singleton_instance< rcu_tag >    singleton_ptr;
        //@endcond

    protected:
        //@cond
        // Queue of retired pointers, each tagged with the epoch it was retired in
        buffer_type                     m_Buffer;
        // Current epoch counter; incremented once per synchronization cycle
        CDS_ATOMIC::atomic<uint64_t>    m_nCurEpoch;
        // Serializes synchronization cycles (grace-period waits)
        lock_type                       m_Lock;
        // Buffer threshold: reaching it triggers a synchronization cycle
        size_t const                    m_nCapacity;
        //@endcond

    public:
        /// Returns singleton instance
        static signal_buffered * instance()
        {
            return static_cast<signal_buffered *>( base_class::instance() );
        }
        /// Checks if the singleton is created and ready to use
        static bool isUsed()
        {
            return singleton_ptr::s_pRCU != null_ptr<singleton_vtbl *>();
        }

    protected:
        //@cond
        // Constructs the singleton: nBufferCapacity is the RCU threshold,
        // nSignal is the POSIX signal number handed to the base sh_singleton.
        signal_buffered( size_t nBufferCapacity, int nSignal = SIGUSR1 )
            : base_class( nSignal )
            , m_Buffer( nBufferCapacity )
            , m_nCurEpoch(0)
            , m_nCapacity( nBufferCapacity )
        {}

        ~signal_buffered()
        {
            // (uint64_t)-1 == max epoch: free every buffered retired pointer unconditionally
            clear_buffer( (uint64_t) -1 );
        }

        // Frees all retired pointers in the buffer whose epoch is not later than nEpoch.
        // The first pointer found with a newer epoch is re-queued via push_buffer and the
        // scan stops there (push_buffer may itself start a new synchronization cycle if
        // the buffer turns out to be full).
        void clear_buffer( uint64_t nEpoch )
        {
            epoch_retired_ptr p;
            while ( m_Buffer.pop( p )) {
                if ( p.m_nEpoch <= nEpoch )
                    p.free();
                else {
                    push_buffer( p );
                    break;
                }
            }
        }

        // Places ep into the buffer. If the push fails, or buffering it brings the
        // buffer up to the threshold, a synchronization cycle is performed; a pointer
        // that could not be buffered is freed right after the grace period.
        // Returns true iff a synchronization cycle was performed.
        bool push_buffer( epoch_retired_ptr& ep )
        {
            bool bPushed = m_Buffer.push( ep );
            if ( !bPushed || m_Buffer.size() >= capacity() ) {
                synchronize();
                if ( !bPushed )
                    ep.free();  // could not be buffered - safe to free now, grace period has elapsed
                return true;
            }
            return false;
        }
        //@endcond

    public:
        /// Creates singleton object
        /**
            The \p nBufferCapacity parameter defines RCU threshold.

            The \p nSignal parameter defines a signal number stated for RCU, default is \p SIGUSR1
        */
        static void Construct( size_t nBufferCapacity = 256, int nSignal = SIGUSR1 )
        {
            // Idempotent: a second call is a no-op while the singleton exists
            if ( !singleton_ptr::s_pRCU )
                singleton_ptr::s_pRCU = new signal_buffered( nBufferCapacity, nSignal );
        }

        /// Destroys singleton object
        static void Destruct( bool bDetachAll = false )
        {
            if ( isUsed() ) {
                // Drain the buffer completely before tearing down the singleton
                instance()->clear_buffer( (uint64_t) -1 );
                if ( bDetachAll )
                    instance()->m_ThreadList.detach_all();  // m_ThreadList lives in the base sh_singleton
                delete instance();
                singleton_ptr::s_pRCU = null_ptr<singleton_vtbl *>();
            }
        }

    public:
        /// Retire \p p pointer
        /**
            The method pushes \p p pointer to internal buffer.
            When the buffer becomes full \ref synchronize function is called
            to wait for the end of grace period and then to free all pointers from the buffer.
        */
        virtual void retire_ptr( retired_ptr& p )
        {
            if ( p.m_p ) {
                // Tag the pointer with the current epoch so clear_buffer() knows
                // which grace period must elapse before it may be freed
                epoch_retired_ptr ep( p, m_nCurEpoch.load( CDS_ATOMIC::memory_order_relaxed ));
                push_buffer( ep );
            }
        }

        /// Retires the pointer chain [\p itFirst, \p itLast)
        template <typename ForwardIterator>
        void batch_retire( ForwardIterator itFirst, ForwardIterator itLast )
        {
            // The epoch is sampled once for the whole batch; a synchronization cycle
            // triggered by push_buffer mid-batch does not re-sample it
            uint64_t nEpoch = m_nCurEpoch.load( CDS_ATOMIC::memory_order_relaxed );
            while ( itFirst != itLast ) {
                epoch_retired_ptr ep( *itFirst, nEpoch );
                ++itFirst;
                push_buffer( ep );
            }
        }

        /// Wait to finish a grace period and then clear the buffer
        void synchronize()
        {
            // A null retired_ptr forces the overload below to run a full cycle
            epoch_retired_ptr ep( retired_ptr(), m_nCurEpoch.load( CDS_ATOMIC::memory_order_relaxed ));
            synchronize( ep );
        }

        //@cond
        // Core synchronization cycle.
        // Fast path: if ep carries a pointer and can be buffered without reaching the
        // threshold, returns false without synchronizing. Otherwise, under m_Lock,
        // advances the epoch counter and performs two switch/wait rounds (the double
        // epoch switch of the URCU algorithm) bracketed by force_membar_all_threads -
        // which, per its name, appears to make every reader thread execute a memory
        // barrier via the RCU signal (NOTE(review): semantics live in sh_singleton).
        // Finally frees - outside the lock - everything retired up to the captured epoch.
        // Returns true iff a grace period was waited for.
        bool synchronize( epoch_retired_ptr& ep )
        {
            uint64_t nEpoch;
            CDS_ATOMIC::atomic_thread_fence( CDS_ATOMIC::memory_order_acquire );
            {
                cds::lock::scoped_lock<lock_type> sl( m_Lock );
                if ( ep.m_p && m_Buffer.push( ep ) && m_Buffer.size() < capacity())
                    return false;   // buffered below threshold - no cycle needed yet
                nEpoch = m_nCurEpoch.fetch_add( 1, CDS_ATOMIC::memory_order_relaxed );

                back_off bkOff;
                base_class::force_membar_all_threads( bkOff );
                base_class::switch_next_epoch();
                bkOff.reset();
                base_class::wait_for_quiescent_state( bkOff );
                base_class::switch_next_epoch();
                bkOff.reset();
                base_class::wait_for_quiescent_state( bkOff );
                base_class::force_membar_all_threads( bkOff );
            }

            // Outside the lock: reclaim everything retired up to (and including) nEpoch
            clear_buffer( nEpoch );
            return true;
        }
        //@endcond

        /// Returns the threshold of internal buffer
        size_t capacity() const
        {
            return m_nCapacity;
        }

        /// Returns the signal number stated for RCU
        int signal_no() const
        {
            return base_class::signal_no();
        }
    };
231
232 }} // namespace cds::urcu
233
234 #endif // #ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED
235 #endif // #ifndef _CDS_URCU_DETAILS_SIG_BUFFERED_H