//$$CDS-header$$

#ifndef CDSLIB_URCU_DETAILS_GPB_H
#define CDSLIB_URCU_DETAILS_GPB_H

#include <mutex>
#include <limits>
#include <cds/urcu/details/gp.h>
#include <cds/algo/backoff_strategy.h>
#include <cds/container/vyukov_mpmc_cycle_queue.h>

namespace cds { namespace urcu {

    /// User-space general-purpose RCU with deferred (buffered) reclamation
    /**
        @headerfile cds/urcu/general_buffered.h

        This URCU implementation accumulates retired objects in an internal buffer.
        When the buffer becomes full, the RCU \p synchronize function is called: it waits
        until all reader/updater threads have left their read-side critical sections,
        i.e. until the RCU quiescent state is reached, and then frees the buffered
        retired objects. The synchronization cycle may run in any thread that calls
        the \p retire_ptr function.

        The \p Buffer contains items of \ref cds_urcu_retired_ptr "epoch_retired_ptr" type
        and should support a queue interface with three functions:
        - <tt> bool push( epoch_retired_ptr& p ) </tt> - places the retired pointer \p p into the queue.
            If the function returns \p false, the buffer is full and an RCU synchronization
            cycle must be performed.
        - <tt>bool pop( epoch_retired_ptr& p ) </tt> - pops the queue's head item into \p p;
            if the queue is empty, the function must return \p false.
        - <tt>size_t size()</tt> - returns the queue's item count.

        The buffer is considered full if \p push() returns \p false or the buffer size reaches the RCU threshold.
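
        For illustration only, a single-threaded toy buffer satisfying this interface
        (the \p toy_buffer name is hypothetical; real code should keep the default
        \p VyukovMPMCCycleQueue, which is thread-safe):
        \code
        #include <deque>

        struct toy_buffer {
            std::deque< cds::urcu::epoch_retired_ptr > m_queue;
            size_t const m_nCapacity;

            toy_buffer( size_t nCapacity ): m_nCapacity( nCapacity ) {}

            bool push( cds::urcu::epoch_retired_ptr& p )
            {
                if ( m_queue.size() >= m_nCapacity )
                    return false;   // full - the caller must run a synchronization cycle
                m_queue.push_back( p );
                return true;
            }

            bool pop( cds::urcu::epoch_retired_ptr& p )
            {
                if ( m_queue.empty())
                    return false;
                p = m_queue.front();
                m_queue.pop_front();
                return true;
            }

            size_t size() const { return m_queue.size(); }
        };
        \endcode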

        There is a wrapper \ref cds_urcu_general_buffered_gc "gc<general_buffered>" for the \p %general_buffered class
        that provides the unified RCU interface. You should use this wrapper class instead of \p %general_buffered.

        Template arguments:
        - \p Buffer - buffer type. Default is \p cds::container::VyukovMPMCCycleQueue
        - \p Lock - mutex type, default is \p std::mutex
        - \p Backoff - back-off schema, default is \p cds::backoff::Default
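
        A minimal usage sketch via the \ref cds_urcu_general_buffered_gc "gc<general_buffered>"
        wrapper (the read-side body is illustrative only):
        \code
        #include <cds/init.h>
        #include <cds/urcu/general_buffered.h>

        typedef cds::urcu::gc< cds::urcu::general_buffered<> > rcu_gpb;

        int main()
        {
            cds::Initialize();
            {
                rcu_gpb gpbRCU;     // constructs the general_buffered singleton

                // Attach the current thread to libcds infrastructure
                cds::threading::Manager::attachThread();

                {
                    // Read-side critical section
                    rcu_gpb::scoped_lock sl;
                    // ... access RCU-protected data here ...
                }

                cds::threading::Manager::detachThread();
            }
            cds::Terminate();
        }
        \endcode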
    */
    template <
        class Buffer = cds::container::VyukovMPMCCycleQueue< epoch_retired_ptr >
        ,class Lock = std::mutex
        ,class Backoff = cds::backoff::Default
    >
    class general_buffered: public details::gp_singleton< general_buffered_tag >
    {
        //@cond
        typedef details::gp_singleton< general_buffered_tag > base_class;
        //@endcond
    public:
        typedef general_buffered_tag rcu_tag ;  ///< RCU tag
        typedef Buffer  buffer_type ;   ///< Buffer type
        typedef Lock    lock_type   ;   ///< Lock type
        typedef Backoff back_off    ;   ///< Back-off type

        typedef base_class::thread_gc thread_gc ;   ///< Thread-side RCU part
        typedef typename thread_gc::scoped_lock scoped_lock ; ///< Access lock class

        static bool const c_bBuffered = true ; ///< This RCU buffers disposed elements

    protected:
        //@cond
        typedef details::gp_singleton_instance< rcu_tag >    singleton_ptr;
        //@endcond

    protected:
        //@cond
        buffer_type                m_Buffer;
        atomics::atomic<uint64_t>  m_nCurEpoch;
        lock_type                  m_Lock;
        size_t const               m_nCapacity;
        //@endcond

    public:
        /// Returns singleton instance
        static general_buffered * instance()
        {
            return static_cast<general_buffered *>( base_class::instance() );
        }
        /// Checks if the singleton is created and ready to use
        static bool isUsed()
        {
            return singleton_ptr::s_pRCU != nullptr;
        }

    protected:
        //@cond
        general_buffered( size_t nBufferCapacity )
            : m_Buffer( nBufferCapacity )
            , m_nCurEpoch(0)
            , m_nCapacity( nBufferCapacity )
        {}

        ~general_buffered()
        {
            clear_buffer( std::numeric_limits< uint64_t >::max());
        }

        void flip_and_wait()
        {
            back_off bkoff;
            base_class::flip_and_wait( bkoff );
        }

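        // Frees all retired pointers whose epoch is not later than nEpoch.
        // The first pointer belonging to a newer epoch, if met, is pushed
        // back to the buffer and scanning stops.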
        void clear_buffer( uint64_t nEpoch )
        {
            epoch_retired_ptr p;
            while ( m_Buffer.pop( p )) {
                if ( p.m_nEpoch <= nEpoch ) {
                    p.free();
                }
                else {
                    push_buffer( std::move(p) );
                    break;
                }
            }
        }

        // Returns true if synchronize() has been called, false otherwise
        bool push_buffer( epoch_retired_ptr&& ep )
        {
            bool bPushed = m_Buffer.push( ep );
            if ( !bPushed || m_Buffer.size() >= capacity() ) {
                synchronize();
                if ( !bPushed ) {
                    ep.free();
                }
                return true;
            }
            return false;
        }
        //@endcond

    public:
        /// Creates singleton object
        /**
            The \p nBufferCapacity parameter defines the RCU threshold.
        */
        static void Construct( size_t nBufferCapacity = 256 )
        {
            if ( !singleton_ptr::s_pRCU )
                singleton_ptr::s_pRCU = new general_buffered( nBufferCapacity );
        }

        /// Destroys singleton object
        static void Destruct( bool bDetachAll = false )
        {
            if ( isUsed() ) {
                instance()->clear_buffer( std::numeric_limits< uint64_t >::max());
                if ( bDetachAll )
                    instance()->m_ThreadList.detach_all();
                delete instance();
                singleton_ptr::s_pRCU = nullptr;
            }
        }

    public:
        /// Retires \p p pointer
        /**
            The method pushes the \p p pointer to the internal buffer.
            When the buffer becomes full, the \ref synchronize function is called
            to wait for the end of a grace period and then free all pointers from the buffer.
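
            A minimal sketch (\p Foo and the disposer lambda are illustrative only):
            \code
            Foo * p = ...;
            cds::urcu::retired_ptr rp( p, []( void * x ) { delete static_cast<Foo *>( x ); });
            cds::urcu::general_buffered<>::instance()->retire_ptr( rp );
            \endcode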
        */
        virtual void retire_ptr( retired_ptr& p )
        {
            if ( p.m_p )
                push_buffer( epoch_retired_ptr( p, m_nCurEpoch.load( atomics::memory_order_relaxed )));
        }

        /// Retires the pointer chain [\p itFirst, \p itLast)
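        /**
            A minimal sketch: retiring a batch of pointers collected in a
            \p std::vector (the container and its filling are illustrative only):
            \code
            std::vector<cds::urcu::retired_ptr> chain;
            // ... fill chain with retired_ptr( ptr, free_func ) items ...
            cds::urcu::general_buffered<>::instance()->batch_retire( chain.begin(), chain.end());
            \endcode
        */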
        template <typename ForwardIterator>
        void batch_retire( ForwardIterator itFirst, ForwardIterator itLast )
        {
            uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_relaxed );
            while ( itFirst != itLast ) {
                epoch_retired_ptr ep( *itFirst, nEpoch );
                ++itFirst;
                push_buffer( std::move(ep) );
            }
        }

        /// Retires the pointer chain until \p Func returns a retired pointer with \p nullptr payload
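        /**
            A minimal sketch, assuming a hypothetical captureless producer
            \p next_retired() that returns a \p retired_ptr whose \p m_p member
            is \p nullptr when the chain is exhausted:
            \code
            cds::urcu::general_buffered<>::instance()->batch_retire(
                []() -> cds::urcu::retired_ptr { return next_retired(); });
            \endcode
        */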
        template <typename Func>
        void batch_retire( Func e )
        {
            uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_relaxed );
            for ( retired_ptr p{ e() }; p.m_p; ) {
                epoch_retired_ptr ep( p, nEpoch );
                p = e();
                push_buffer( std::move(ep));
            }
        }

        /// Waits until the end of a grace period and then clears the buffer
        void synchronize()
        {
            epoch_retired_ptr ep( retired_ptr(), m_nCurEpoch.load( atomics::memory_order_relaxed ));
            synchronize( ep );
        }

        //@cond
        bool synchronize( epoch_retired_ptr& ep )
        {
            uint64_t nEpoch;
            atomics::atomic_thread_fence( atomics::memory_order_acquire );
            {
                std::unique_lock<lock_type> sl( m_Lock );
                if ( ep.m_p && m_Buffer.push( ep ) )
                    return false;
                nEpoch = m_nCurEpoch.fetch_add( 1, atomics::memory_order_relaxed );
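
                // Two flip-and-wait phases are required: a reader that entered
                // its critical section just before the epoch counter flip may
                // still be active after the first wait, so a second phase is
                // needed to guarantee that all pre-existing readers have finished.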
                flip_and_wait();
                flip_and_wait();
            }
            clear_buffer( nEpoch );
            atomics::atomic_thread_fence( atomics::memory_order_release );
            return true;
        }
        //@endcond

        /// Returns internal buffer capacity
        size_t capacity() const
        {
            return m_nCapacity;
        }
    };

}} // namespace cds::urcu

#endif // #ifndef CDSLIB_URCU_DETAILS_GPB_H