//$$CDS-header$$

#ifndef _CDS_URCU_DETAILS_GPB_H
#define _CDS_URCU_DETAILS_GPB_H

#include <mutex>
#include <cds/urcu/details/gp.h>
#include <cds/algo/backoff_strategy.h>
#include <cds/container/vyukov_mpmc_cycle_queue.h>

namespace cds { namespace urcu {

    /// User-space general-purpose RCU with deferred (buffered) reclamation
    /**
        @headerfile cds/urcu/general_buffered.h

        This URCU implementation contains an internal buffer where retired objects are
        accumulated. When the buffer becomes full, the RCU \p synchronize function is called:
        it waits until all reader/updater threads have finished their read-side critical sections,
        i.e. until the RCU quiescent state comes. After that the buffer and all retired objects are freed.
        This synchronization cycle may be performed by any thread that calls the \p retire_ptr function.

        The \p Buffer contains items of \ref cds_urcu_retired_ptr "epoch_retired_ptr" type and it should support a queue interface with
        three functions (see the sketch below):
        - <tt> bool push( epoch_retired_ptr& p ) </tt> - places the retired pointer \p p into the queue. If the function
            returns \p false, the buffer is full and an RCU synchronization cycle must be processed.
        - <tt>bool pop( epoch_retired_ptr& p ) </tt> - pops the queue's head item into the \p p parameter; if the queue is empty
            this function must return \p false.
        - <tt>size_t size()</tt> - returns the queue's item count.

        The buffer is considered full if \p push returns \p false or the buffer size reaches the RCU threshold.
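
        For illustration only, a minimal sketch of a buffer type satisfying this interface.
        It is a thread-unsafe toy; the default \p cds::container::VyukovMPMCCycleQueue is the
        real, concurrent implementation. Since \p %general_buffered constructs its buffer from
        the capacity, a <tt>size_t</tt> constructor is assumed as well:
        \code
        #include <deque>

        struct toy_buffer {
            std::deque< cds::urcu::epoch_retired_ptr >  m_q;
            size_t const                                m_nCap;

            explicit toy_buffer( size_t nCapacity ): m_nCap( nCapacity ) {}

            bool push( cds::urcu::epoch_retired_ptr& p )
            {
                if ( m_q.size() >= m_nCap )
                    return false;   // full - the caller must run a synchronization cycle
                m_q.push_back( p );
                return true;
            }
            bool pop( cds::urcu::epoch_retired_ptr& p )
            {
                if ( m_q.empty() )
                    return false;
                p = m_q.front();
                m_q.pop_front();
                return true;
            }
            size_t size() const { return m_q.size(); }
        };
        \endcode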

        There is a wrapper \ref cds_urcu_general_buffered_gc "gc<general_buffered>" for the \p %general_buffered class
        that provides the unified RCU interface. You should use this wrapper class instead of \p %general_buffered directly.
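
        A typical usage sketch (hedged: it assumes the \p cds::Initialize / \p cds::Terminate
        framework calls and the thread manager behave as described in the library documentation):
        \code
        #include <cds/init.h>
        #include <cds/urcu/general_buffered.h>

        // Unified RCU interface over this implementation
        typedef cds::urcu::gc< cds::urcu::general_buffered<> > rcu_gpb;

        int main()
        {
            cds::Initialize();
            {
                // The wrapper's constructor creates the general_buffered singleton,
                // its destructor destroys the singleton
                rcu_gpb gpbRCU;

                // Attach the current thread to libcds infrastructure
                cds::threading::Manager::attachThread();

                // ... use RCU-based containers here ...

                cds::threading::Manager::detachThread();
            }
            cds::Terminate();
        }
        \endcode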

        Template arguments:
        - \p Buffer - buffer type. Default is \p cds::container::VyukovMPMCCycleQueue
        - \p Lock - mutex type, default is \p std::mutex
        - \p Backoff - back-off schema, default is \p cds::backoff::Default
    */
    template <
        class Buffer = cds::container::VyukovMPMCCycleQueue<
            epoch_retired_ptr
            ,cds::opt::buffer< cds::opt::v::dynamic_buffer< epoch_retired_ptr > >
        >
        ,class Lock = std::mutex
        ,class Backoff = cds::backoff::Default
    >
    class general_buffered: public details::gp_singleton< general_buffered_tag >
    {
        //@cond
        typedef details::gp_singleton< general_buffered_tag > base_class;
        //@endcond
    public:
        typedef general_buffered_tag rcu_tag ;  ///< RCU tag
        typedef Buffer  buffer_type ;   ///< Buffer type
        typedef Lock    lock_type   ;   ///< Lock type
        typedef Backoff back_off    ;   ///< Back-off type

        typedef base_class::thread_gc thread_gc ;   ///< Thread-side RCU part
        typedef typename thread_gc::scoped_lock scoped_lock ; ///< Access lock class

        static bool const c_bBuffered = true ; ///< This RCU buffers disposed elements

    protected:
        //@cond
        typedef details::gp_singleton_instance< rcu_tag >    singleton_ptr;
        //@endcond

    protected:
        //@cond
        buffer_type                     m_Buffer;
        atomics::atomic<uint64_t>       m_nCurEpoch;
        lock_type                       m_Lock;
        size_t const                    m_nCapacity;
        //@endcond

    public:
        /// Returns the singleton instance
        static general_buffered * instance()
        {
            return static_cast<general_buffered *>( base_class::instance() );
        }
        /// Checks if the singleton is created and ready to use
        static bool isUsed()
        {
            return singleton_ptr::s_pRCU != nullptr;
        }

    protected:
        //@cond
        general_buffered( size_t nBufferCapacity )
            : m_Buffer( nBufferCapacity )
            , m_nCurEpoch(0)
            , m_nCapacity( nBufferCapacity )
        {}

        ~general_buffered()
        {
            clear_buffer( (uint64_t) -1 );
        }

        void flip_and_wait()
        {
            back_off bkoff;
            base_class::flip_and_wait( bkoff );
        }

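        // Frees all retired objects whose epoch is not greater than nEpoch;
        // the first newer object met is pushed back and scanning stops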
        void clear_buffer( uint64_t nEpoch )
        {
            epoch_retired_ptr p;
            while ( m_Buffer.pop( p )) {
                if ( p.m_nEpoch <= nEpoch )
                    p.free();
                else {
                    push_buffer( p );
                    break;
                }
            }
        }

        // Returns true if synchronize() has been called, false otherwise
        bool push_buffer( epoch_retired_ptr& ep )
        {
            bool bPushed = m_Buffer.push( ep );
            if ( !bPushed || m_Buffer.size() >= capacity() ) {
                synchronize();
                if ( !bPushed )
                    ep.free();
                return true;
            }
            return false;
        }
        //@endcond

    public:
        /// Creates the singleton object
        /**
            The \p nBufferCapacity parameter defines the RCU threshold.
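
            A hedged sketch of direct use (normally the \p gc wrapper calls
            \p Construct / \p Destruct for you):
            \code
            // Create the singleton with a threshold of 1024 retired objects
            cds::urcu::general_buffered<>::Construct( 1024 );
            // ... work with RCU ...
            // Destroy the singleton, detaching all attached threads
            cds::urcu::general_buffered<>::Destruct( true );
            \endcode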
        */
        static void Construct( size_t nBufferCapacity = 256 )
        {
            if ( !singleton_ptr::s_pRCU )
                singleton_ptr::s_pRCU = new general_buffered( nBufferCapacity );
        }

        /// Destroys the singleton object
        static void Destruct( bool bDetachAll = false )
        {
            if ( isUsed() ) {
                instance()->clear_buffer( (uint64_t) -1 );
                if ( bDetachAll )
                    instance()->m_ThreadList.detach_all();
                delete instance();
                singleton_ptr::s_pRCU = nullptr;
            }
        }

    public:
        /// Retires the pointer \p p
        /**
            The method pushes the pointer \p p into the internal buffer.
            When the buffer becomes full, the \ref synchronize function is called
            to wait for the end of the grace period and then free all pointers from the buffer.
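
            A hedged usage sketch (it assumes \ref cds_urcu_retired_ptr "retired_ptr"
            is constructible from a raw pointer and a disposer function; note that
            \p retire_ptr may run a synchronization cycle, so it must not be called
            inside a read-side critical section):
            \code
            struct Foo { /* ... */ };
            void dispose_foo( void * p ) { delete static_cast<Foo *>( p ); }

            Foo * pFoo = new Foo;
            // ... pFoo is excluded from a container but may still be read by other threads ...
            cds::urcu::retired_ptr rp( pFoo, dispose_foo );
            cds::urcu::general_buffered<>::instance()->retire_ptr( rp );
            \endcode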
        */
        virtual void retire_ptr( retired_ptr& p )
        {
            if ( p.m_p ) {
                epoch_retired_ptr ep( p, m_nCurEpoch.load( atomics::memory_order_relaxed ));
                push_buffer( ep );
            }
        }

        /// Retires the pointer chain [\p itFirst, \p itLast)
        template <typename ForwardIterator>
        void batch_retire( ForwardIterator itFirst, ForwardIterator itLast )
        {
            uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_relaxed );
            while ( itFirst != itLast ) {
                epoch_retired_ptr ep( *itFirst, nEpoch );
                ++itFirst;
                push_buffer( ep );
            }
        }

        /// Waits until a grace period finishes and then clears the buffer
        void synchronize()
        {
            epoch_retired_ptr ep( retired_ptr(), m_nCurEpoch.load( atomics::memory_order_relaxed ));
            synchronize( ep );
        }

        //@cond
        bool synchronize( epoch_retired_ptr& ep )
        {
            uint64_t nEpoch;
            atomics::atomic_thread_fence( atomics::memory_order_acquire );
            {
                std::unique_lock<lock_type> sl( m_Lock );
                if ( ep.m_p && m_Buffer.push( ep ) )
                    return false;
                nEpoch = m_nCurEpoch.fetch_add( 1, atomics::memory_order_relaxed );
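                // Double flip: after two flips of the global grace-period bit every
                // thread that was inside a read-side critical section when the epoch
                // was advanced has passed through a quiescent state (the classic
                // two-phase wait of general-purpose user-space RCU)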
                flip_and_wait();
                flip_and_wait();
            }
            clear_buffer( nEpoch );
            atomics::atomic_thread_fence( atomics::memory_order_release );
            return true;
        }
        //@endcond

        /// Returns the internal buffer capacity
        size_t capacity() const
        {
            return m_nCapacity;
        }
    };

}} // namespace cds::urcu

#endif // #ifndef _CDS_URCU_DETAILS_GPB_H