//$$CDS-header$$

#ifndef CDSLIB_URCU_DETAILS_GPB_H
#define CDSLIB_URCU_DETAILS_GPB_H

#include <mutex>
#include <cds/urcu/details/gp.h>
#include <cds/algo/backoff_strategy.h>
#include <cds/container/vyukov_mpmc_cycle_queue.h>

namespace cds { namespace urcu {

    /// User-space general-purpose RCU with deferred (buffered) reclamation
    /**
        @headerfile cds/urcu/general_buffered.h

        This URCU implementation accumulates retired objects in an internal buffer.
        When the buffer becomes full, the RCU \p synchronize function is called: it waits
        until all reader/updater threads have finished their read-side critical sections,
        i.e. until a quiescent state is reached. After that, the buffer and all retired
        objects are freed. This synchronization cycle may run in any thread that calls
        the \p retire_ptr function.

        The \p Buffer contains items of \ref cds_urcu_retired_ptr "retired_ptr" type and
        must support a queue interface with three functions:
        - <tt> bool push( retired_ptr& p ) </tt> - places the retired pointer \p p into the queue.
            If the function returns \p false, the buffer is full and an RCU synchronization
            cycle must be performed.
        - <tt> bool pop( retired_ptr& p ) </tt> - pops the queue's head item into \p p;
            if the queue is empty, the function must return \p false.
        - <tt> size_t size() </tt> - returns the queue's item count.

        The buffer is considered full if \p push returns \p false or the buffer size
        reaches the RCU threshold; a sketch of a conforming buffer type follows.
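
        A minimal sketch of a conforming buffer type (illustrative only: \p MyBuffer is a
        hypothetical name; note that this class instantiates the buffer with
        \p epoch_retired_ptr items, as the default template argument shows):
        \code
        struct MyBuffer {
            // Places p into the queue; returns false if the buffer is full,
            // which forces a synchronization cycle.
            bool push( cds::urcu::epoch_retired_ptr& p );

            // Pops the queue's head item into p; returns false if the queue is empty.
            bool pop( cds::urcu::epoch_retired_ptr& p );

            // Returns the current item count.
            size_t size() const;
        };
        \endcode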

        There is a wrapper \ref cds_urcu_general_buffered_gc "gc<general_buffered>" for the
        \p %general_buffered class that provides the unified RCU interface. You should use
        this wrapper class instead of \p %general_buffered directly; see the usage sketch below.

        Template arguments:
        - \p Buffer - buffer type. Default is cds::container::VyukovMPMCCycleQueue
        - \p Lock - mutex type, default is \p std::mutex
        - \p Backoff - back-off strategy, default is cds::backoff::Default
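
        A minimal usage sketch (assuming the standard libcds initialization pattern; the
        snippet is illustrative and is not taken from this header):
        \code
        #include <cds/init.h>
        #include <cds/urcu/general_buffered.h>

        // Unified RCU interface over general_buffered
        typedef cds::urcu::gc< cds::urcu::general_buffered<> > rcu_gpb;

        int main() {
            cds::Initialize();      // initialize libcds infrastructure
            {
                rcu_gpb theRCU;     // construct the general_buffered RCU singleton

                // Every thread that works with RCU must be attached to libcds
                cds::threading::Manager::attachThread();
                {
                    rcu_gpb::scoped_lock sl;    // read-side critical section
                    // ... access RCU-protected data ...
                }
                cds::threading::Manager::detachThread();
            }
            cds::Terminate();
            return 0;
        }
        \endcode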
    */
    template <
        class Buffer = cds::container::VyukovMPMCCycleQueue< epoch_retired_ptr >
        ,class Lock = std::mutex
        ,class Backoff = cds::backoff::Default
    >
    class general_buffered: public details::gp_singleton< general_buffered_tag >
    {
        //@cond
        typedef details::gp_singleton< general_buffered_tag > base_class;
        //@endcond
    public:
        typedef general_buffered_tag rcu_tag ;  ///< RCU tag
        typedef Buffer  buffer_type ;   ///< Buffer type
        typedef Lock    lock_type   ;   ///< Lock type
        typedef Backoff back_off    ;   ///< Back-off type

        typedef base_class::thread_gc thread_gc ;   ///< Thread-side RCU part
        typedef typename thread_gc::scoped_lock scoped_lock ; ///< Access lock class

        static bool const c_bBuffered = true ; ///< This RCU buffers disposed elements

    protected:
        //@cond
        typedef details::gp_singleton_instance< rcu_tag >    singleton_ptr;
        //@endcond

    protected:
        //@cond
        buffer_type                     m_Buffer;
        atomics::atomic<uint64_t>       m_nCurEpoch;
        lock_type                       m_Lock;
        size_t const                    m_nCapacity;
        //@endcond

    public:
        /// Returns singleton instance
        static general_buffered * instance()
        {
            return static_cast<general_buffered *>( base_class::instance() );
        }
        /// Checks if the singleton is created and ready to use
        static bool isUsed()
        {
            return singleton_ptr::s_pRCU != nullptr;
        }

    protected:
        //@cond
        general_buffered( size_t nBufferCapacity )
            : m_Buffer( nBufferCapacity )
            , m_nCurEpoch(0)
            , m_nCapacity( nBufferCapacity )
        {}

        ~general_buffered()
        {
            // Free all remaining retired objects regardless of their epoch
            clear_buffer( (uint64_t) -1 );
        }

        // Flips the global epoch and waits until every attached thread
        // passes through a quiescent state
        void flip_and_wait()
        {
            back_off bkoff;
            base_class::flip_and_wait( bkoff );
        }

        // Frees all buffered objects retired no later than nEpoch;
        // the first younger object found (if any) is returned to the buffer
        // and scanning stops
        void clear_buffer( uint64_t nEpoch )
        {
            epoch_retired_ptr p;
            while ( m_Buffer.pop( p )) {
                if ( p.m_nEpoch <= nEpoch ) {
                    p.free();
                }
                else {
                    // The object was retired after nEpoch: its grace period
                    // has not elapsed yet, so put it back
                    push_buffer( std::move(p) );
                    break;
                }
            }
        }

        // Returns true if a synchronization cycle (synchronize()) has been performed,
        // false otherwise
        bool push_buffer( epoch_retired_ptr&& ep )
        {
            bool bPushed = m_Buffer.push( ep );
            if ( !bPushed || m_Buffer.size() >= capacity() ) {
                // The buffer is full: run a synchronization cycle
                synchronize();
                if ( !bPushed ) {
                    // ep could not be buffered; the grace period has just elapsed,
                    // so it is safe to free it right now
                    ep.free();
                }
                return true;
            }
            return false;
        }
        //@endcond

    public:
        /// Creates singleton object
        /**
            The \p nBufferCapacity parameter defines the RCU threshold: when the buffer
            size reaches this value, a synchronization cycle is forced.
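
            A hypothetical sketch of manual lifecycle management (normally the
            \ref cds_urcu_general_buffered_gc "gc<general_buffered>" wrapper calls
            \p Construct and \p Destruct for you):
            \code
            cds::urcu::general_buffered<>::Construct( 1024 ); // threshold = 1024 retired objects
            // ... attach threads, retire pointers ...
            cds::urcu::general_buffered<>::Destruct( true );  // detach all threads and destroy
            \endcode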
        */
        static void Construct( size_t nBufferCapacity = 256 )
        {
            if ( !singleton_ptr::s_pRCU )
                singleton_ptr::s_pRCU = new general_buffered( nBufferCapacity );
        }

        /// Destroys singleton object
        static void Destruct( bool bDetachAll = false )
        {
            if ( isUsed() ) {
                // Free everything that is still buffered
                instance()->clear_buffer( (uint64_t) -1 );
                if ( bDetachAll )
                    instance()->m_ThreadList.detach_all();
                delete instance();
                singleton_ptr::s_pRCU = nullptr;
            }
        }

    public:
        /// Retires pointer \p p
        /**
            The method pushes the pointer \p p into the internal buffer.
            When the buffer becomes full, the \ref synchronize function is called
            to wait for the end of the grace period and then free all pointers in the buffer.
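
            A hypothetical sketch (the type \p Foo, the object \p pFoo and the disposer
            \p disposeFoo are illustrative; it assumes \p retired_ptr can be constructed
            from a raw pointer and a <tt>void (*)( void * )</tt> free function):
            \code
            struct Foo { int nData; };
            void disposeFoo( void * p ) { delete static_cast<Foo *>( p ); }

            // pFoo is a node that has just been unlinked from an RCU-protected structure
            Foo * pFoo = new Foo;
            cds::urcu::retired_ptr rp( pFoo, disposeFoo );
            cds::urcu::general_buffered<>::instance()->retire_ptr( rp );
            \endcode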
        */
        virtual void retire_ptr( retired_ptr& p )
        {
            if ( p.m_p )
                push_buffer( epoch_retired_ptr( p, m_nCurEpoch.load( atomics::memory_order_relaxed )));
        }

        /// Retires the pointer chain <tt>[itFirst, itLast)</tt>
        /**
            Each item of the chain is pushed into the buffer with the current epoch.
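
            A hypothetical sketch (the vector \p chain and its contents are illustrative only):
            \code
            std::vector<cds::urcu::retired_ptr> chain;
            // ... fill the chain with retired pointers ...
            cds::urcu::general_buffered<>::instance()->batch_retire( chain.begin(), chain.end() );
            \endcode
        */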
        template <typename ForwardIterator>
        void batch_retire( ForwardIterator itFirst, ForwardIterator itLast )
        {
            uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_relaxed );
            while ( itFirst != itLast ) {
                epoch_retired_ptr ep( *itFirst, nEpoch );
                ++itFirst;
                push_buffer( std::move(ep) );
            }
        }

        /// Retires a pointer chain: \p Func is called until it returns a \p retired_ptr with a \p nullptr pointer field
        /**
            Each non-null retired pointer produced by \p e is pushed into the buffer with the current epoch.
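
            A hypothetical sketch (the array \p arr and the counter \p i are illustrative only):
            \code
            cds::urcu::retired_ptr arr[ 16 ];
            // ... fill arr; the element after the last valid one must have m_p == nullptr ...
            size_t i = 0;
            cds::urcu::general_buffered<>::instance()->batch_retire(
                [&]() -> cds::urcu::retired_ptr { return arr[ i++ ]; } );
            \endcode
        */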
        template <typename Func>
        void batch_retire( Func e )
        {
            uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_relaxed );
            for ( retired_ptr p{ e() }; p.m_p; ) {
                // Fetch the next item before pushing the current one: push_buffer
                // may run a synchronization cycle that frees ep, and the chain may
                // be linked through the retired objects themselves
                epoch_retired_ptr ep( p, nEpoch );
                p = e();
                push_buffer( std::move(ep));
            }
        }

        /// Waits for a grace period to finish and then clears the buffer
        void synchronize()
        {
            epoch_retired_ptr ep( retired_ptr(), m_nCurEpoch.load( atomics::memory_order_relaxed ));
            synchronize( ep );
        }

        //@cond
        bool synchronize( epoch_retired_ptr& ep )
        {
            uint64_t nEpoch;
            atomics::atomic_thread_fence( atomics::memory_order_acquire );
            {
                std::unique_lock<lock_type> sl( m_Lock );
                if ( ep.m_p && m_Buffer.push( ep ) )
                    return false;
                nEpoch = m_nCurEpoch.fetch_add( 1, atomics::memory_order_relaxed );
                // Two flips guarantee that every thread which was inside a read-side
                // critical section when the cycle started has left it
                flip_and_wait();
                flip_and_wait();
            }
            clear_buffer( nEpoch );
            atomics::atomic_thread_fence( atomics::memory_order_release );
            return true;
        }
        //@endcond

        /// Returns internal buffer capacity
        size_t capacity() const
        {
            return m_nCapacity;
        }
    };

}} // namespace cds::urcu

#endif // #ifndef CDSLIB_URCU_DETAILS_GPB_H