0f73ba3913f7a8e481535f597d5b656994915736
[libcds.git] / cds / urcu / details / sig_buffered.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8     
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.     
29 */
30
31 #ifndef CDSLIB_URCU_DETAILS_SIG_BUFFERED_H
32 #define CDSLIB_URCU_DETAILS_SIG_BUFFERED_H
33
34 #include <cds/urcu/details/sh.h>
35 #ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED
36
37 #include <mutex>
38 #include <limits>
39 #include <cds/algo/backoff_strategy.h>
40 #include <cds/container/vyukov_mpmc_cycle_queue.h>
41
42 namespace cds { namespace urcu {
43
44     /// User-space signal-handled RCU with deferred (buffered) reclamation
45     /**
46         @headerfile cds/urcu/signal_buffered.h
47
48         This URCU implementation contains an internal buffer where retired objects are
49         accumulated. When the buffer becomes full, the RCU \p synchronize function is called
50         that waits until all reader/updater threads end up their read-side critical sections,
51         i.e. until the RCU quiescent state will come. After that the buffer and all retired objects are freed.
52         This synchronization cycle may be called in any thread that calls \p retire_ptr function.
53
54         The \p Buffer contains items of \ref cds_urcu_retired_ptr "retired_ptr" type and it should support a queue interface with
55         three function:
56         - <tt> bool push( retired_ptr& p ) </tt> - places the retired pointer \p p into queue. If the function
57             returns \p false it means that the buffer is full and RCU synchronization cycle must be processed.
58         - <tt>bool pop( retired_ptr& p ) </tt> - pops queue's head item into \p p parameter; if the queue is empty
59             this function must return \p false
60         - <tt>size_t size()</tt> - returns queue's item count.
61
62         The buffer is considered as full if \p push returns \p false or the buffer size reaches the RCU threshold.
63
64         There is a wrapper \ref cds_urcu_signal_buffered_gc "gc<signal_buffered>" for \p %signal_buffered class
65         that provides unified RCU interface. You should use this wrapper class instead \p %signal_buffered
66
67         Template arguments:
68         - \p Buffer - buffer type. Default is cds::container::VyukovMPMCCycleQueue
69         - \p Lock - mutex type, default is \p std::mutex
70         - \p Backoff - back-off schema, default is cds::backoff::Default
71     */
72     template <
73         class Buffer = cds::container::VyukovMPMCCycleQueue< epoch_retired_ptr >
74         ,class Lock = std::mutex
75         ,class Backoff = cds::backoff::Default
76     >
77     class signal_buffered: public details::sh_singleton< signal_buffered_tag >
78     {
79         //@cond
80         typedef details::sh_singleton< signal_buffered_tag > base_class;
81         //@endcond
82     public:
83         typedef signal_buffered_tag rcu_tag ;  ///< RCU tag
84         typedef Buffer  buffer_type ;   ///< Buffer type
85         typedef Lock    lock_type   ;   ///< Lock type
86         typedef Backoff back_off    ;   ///< Back-off type
87
88         typedef base_class::thread_gc thread_gc ;   ///< Thread-side RCU part
89         typedef typename thread_gc::scoped_lock scoped_lock ; ///< Access lock class
90
91         static bool const c_bBuffered = true ; ///< This RCU buffers disposed elements
92
93     protected:
94         //@cond
95         typedef details::sh_singleton_instance< rcu_tag >    singleton_ptr;
96         //@endcond
97
98     protected:
99         //@cond
100         buffer_type               m_Buffer;
101         atomics::atomic<uint64_t> m_nCurEpoch;
102         lock_type                 m_Lock;
103         size_t const              m_nCapacity;
104         //@endcond
105
106     public:
107         /// Returns singleton instance
108         static signal_buffered * instance()
109         {
110             return static_cast<signal_buffered *>( base_class::instance() );
111         }
112         /// Checks if the singleton is created and ready to use
113         static bool isUsed()
114         {
115             return singleton_ptr::s_pRCU != nullptr;
116         }
117
118     protected:
119         //@cond
120         signal_buffered( size_t nBufferCapacity, int nSignal = SIGUSR1 )
121             : base_class( nSignal )
122             , m_Buffer( nBufferCapacity )
123             , m_nCurEpoch(0)
124             , m_nCapacity( nBufferCapacity )
125         {}
126
127         ~signal_buffered()
128         {
129             clear_buffer( std::numeric_limits< uint64_t >::max() );
130         }
131
132         void clear_buffer( uint64_t nEpoch )
133         {
134             epoch_retired_ptr p;
135             while ( m_Buffer.pop( p )) {
136                 if ( p.m_nEpoch <= nEpoch ) {
137                     p.free();
138                 }
139                 else {
140                     push_buffer( std::move(p) );
141                     break;
142                 }
143             }
144         }
145
146         bool push_buffer( epoch_retired_ptr&& ep )
147         {
148             bool bPushed = m_Buffer.push( ep );
149             if ( !bPushed || m_Buffer.size() >= capacity() ) {
150                 synchronize();
151                 if ( !bPushed ) {
152                     ep.free();
153                 }
154                 return true;
155             }
156             return false;
157         }
158         //@endcond
159
160     public:
161         /// Creates singleton object
162         /**
163             The \p nBufferCapacity parameter defines RCU threshold.
164
165             The \p nSignal parameter defines a signal number stated for RCU, default is \p SIGUSR1
166         */
167         static void Construct( size_t nBufferCapacity = 256, int nSignal = SIGUSR1 )
168         {
169             if ( !singleton_ptr::s_pRCU )
170                 singleton_ptr::s_pRCU = new signal_buffered( nBufferCapacity, nSignal );
171         }
172
173         /// Destroys singleton object
174         static void Destruct( bool bDetachAll = false )
175         {
176             if ( isUsed() ) {
177                 instance()->clear_buffer( std::numeric_limits< uint64_t >::max());
178                 if ( bDetachAll )
179                     instance()->m_ThreadList.detach_all();
180                 delete instance();
181                 singleton_ptr::s_pRCU = nullptr;
182             }
183         }
184
185     public:
186         /// Retire \p p pointer
187         /**
188             The method pushes \p p pointer to internal buffer.
189             When the buffer becomes full \ref synchronize function is called
190             to wait for the end of grace period and then to free all pointers from the buffer.
191         */
192         virtual void retire_ptr( retired_ptr& p )
193         {
194             if ( p.m_p )
195                 push_buffer( epoch_retired_ptr( p, m_nCurEpoch.load( atomics::memory_order_relaxed )));
196         }
197
198         /// Retires the pointer chain [\p itFirst, \p itLast)
199         template <typename ForwardIterator>
200         void batch_retire( ForwardIterator itFirst, ForwardIterator itLast )
201         {
202             uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_relaxed );
203             while ( itFirst != itLast ) {
204                 epoch_retired_ptr ep( *itFirst, nEpoch );
205                 ++itFirst;
206                 push_buffer( std::move(ep));
207             }
208         }
209
210         /// Retires the pointer chain until \p Func returns \p nullptr retired pointer
211         template <typename Func>
212         void batch_retire( Func e )
213         {
214             uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_relaxed );
215             for ( retired_ptr p{ e() }; p.m_p; ) {
216                 epoch_retired_ptr ep( p, nEpoch );
217                 p = e();
218                 push_buffer( std::move(ep));
219             }
220         }
221
222         /// Wait to finish a grace period and then clear the buffer
223         void synchronize()
224         {
225             epoch_retired_ptr ep( retired_ptr(), m_nCurEpoch.load( atomics::memory_order_relaxed ));
226             synchronize( ep );
227         }
228
229         //@cond
230         bool synchronize( epoch_retired_ptr& ep )
231         {
232             uint64_t nEpoch;
233             atomics::atomic_thread_fence( atomics::memory_order_acquire );
234             {
235                 std::unique_lock<lock_type> sl( m_Lock );
236                 if ( ep.m_p && m_Buffer.push( ep ) && m_Buffer.size() < capacity())
237                     return false;
238                 nEpoch = m_nCurEpoch.fetch_add( 1, atomics::memory_order_relaxed );
239
240                 back_off bkOff;
241                 base_class::force_membar_all_threads( bkOff );
242                 base_class::switch_next_epoch();
243                 bkOff.reset();
244                 base_class::wait_for_quiescent_state( bkOff );
245                 base_class::switch_next_epoch();
246                 bkOff.reset();
247                 base_class::wait_for_quiescent_state( bkOff );
248                 base_class::force_membar_all_threads( bkOff );
249             }
250
251             clear_buffer( nEpoch );
252             return true;
253         }
254         //@endcond
255
256         /// Returns the threshold of internal buffer
257         size_t capacity() const
258         {
259             return m_nCapacity;
260         }
261
262         /// Returns the signal number stated for RCU
263         int signal_no() const
264         {
265             return base_class::signal_no();
266         }
267     };
268
269 }} // namespace cds::urcu
270
271 #endif // #ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED
272 #endif // #ifndef CDSLIB_URCU_DETAILS_SIG_BUFFERED_H