//$$CDS-header$$

#ifndef CDSLIB_URCU_DETAILS_GPB_H
#define CDSLIB_URCU_DETAILS_GPB_H

#include <mutex>
#include <cds/urcu/details/gp.h>
#include <cds/algo/backoff_strategy.h>
#include <cds/container/vyukov_mpmc_cycle_queue.h>

namespace cds { namespace urcu {

    /// User-space general-purpose RCU with deferred (buffered) reclamation
    /**
        @headerfile cds/urcu/general_buffered.h

        This URCU implementation accumulates retired objects in an internal buffer.
        When the buffer becomes full, the RCU \p synchronize function is called:
        it waits until all reader/updater threads have left their read-side critical sections,
        i.e. until a quiescent state is reached, and then frees the buffer and all retired objects in it.
        This synchronization cycle may run in any thread that calls the \p retire_ptr function.

        The \p Buffer stores items of \ref cds_urcu_retired_ptr "epoch_retired_ptr" type and should provide a queue-like interface
        with three functions:
        - <tt>bool push( epoch_retired_ptr& p )</tt> - places the retired pointer \p p into the queue. If the function
            returns \p false, the buffer is full and an RCU synchronization cycle must be performed.
        - <tt>bool pop( epoch_retired_ptr& p )</tt> - pops the queue's head item into \p p; if the queue is empty,
            the function must return \p false.
        - <tt>size_t size()</tt> - returns the queue's item count.

        The buffer is considered full if \p push returns \p false or the buffer size reaches the RCU threshold;
        a sketch of the required interface is shown below.
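
        The following is only an illustrative, hypothetical outline of such a buffer type; the default
        \p cds::container::VyukovMPMCCycleQueue already satisfies it, and the capacity constructor mirrors
        how \p %general_buffered constructs its buffer:
        \code
        struct buffer_interface {
            explicit buffer_interface( size_t nCapacity ); // constructed with the RCU threshold as capacity
            bool push( cds::urcu::epoch_retired_ptr& p );  // returns false if the buffer is full
            bool pop( cds::urcu::epoch_retired_ptr& p );   // returns false if the buffer is empty
            size_t size() const;                           // current number of buffered items
        };
        \endcode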

        There is a wrapper \ref cds_urcu_general_buffered_gc "gc<general_buffered>" for the \p %general_buffered class
        that provides the unified RCU interface. You should use this wrapper class instead of \p %general_buffered directly;
        a typical setup is sketched after the template argument list.

        Template arguments:
        - \p Buffer - buffer type. Default is \p cds::container::VyukovMPMCCycleQueue
        - \p Lock - mutex type, default is \p std::mutex
        - \p Backoff - back-off schema, default is \p cds::backoff::Default
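
        The sketch below assumes the usual libcds initialization and thread-attachment calls
        (\p cds::Initialize, \p cds::threading::Manager), which are not defined in this header:
        \code
        #include <cds/init.h>
        #include <cds/urcu/general_buffered.h>

        // unified-interface wrapper over general_buffered
        typedef cds::urcu::gc< cds::urcu::general_buffered<> > rcu_gpb;

        int main()
        {
            cds::Initialize();
            {
                rcu_gpb theRCU;   // creates the general_buffered singleton

                // each thread that works with RCU must be attached to libcds
                cds::threading::Manager::attachThread();

                {
                    rcu_gpb::scoped_lock sl;    // read-side critical section
                    // ... access RCU-protected data ...
                }

                cds::threading::Manager::detachThread();
            }   // the rcu_gpb destructor destroys the singleton
            cds::Terminate();
        }
        \endcode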
    */
    template <
        class Buffer = cds::container::VyukovMPMCCycleQueue< epoch_retired_ptr >
        ,class Lock = std::mutex
        ,class Backoff = cds::backoff::Default
    >
    class general_buffered: public details::gp_singleton< general_buffered_tag >
    {
        //@cond
        typedef details::gp_singleton< general_buffered_tag > base_class;
        //@endcond
    public:
        typedef general_buffered_tag rcu_tag ;  ///< RCU tag
        typedef Buffer  buffer_type ;   ///< Buffer type
        typedef Lock    lock_type   ;   ///< Lock type
        typedef Backoff back_off    ;   ///< Back-off type

        typedef base_class::thread_gc thread_gc ;   ///< Thread-side RCU part
        typedef typename thread_gc::scoped_lock scoped_lock ; ///< Access lock class

        static bool const c_bBuffered = true ; ///< This RCU buffers disposed elements

    protected:
        //@cond
        typedef details::gp_singleton_instance< rcu_tag >    singleton_ptr;
        //@endcond

    protected:
        //@cond
        buffer_type                     m_Buffer;
        atomics::atomic<uint64_t>       m_nCurEpoch;
        lock_type                       m_Lock;
        size_t const                    m_nCapacity;
        //@endcond

    public:
        /// Returns singleton instance
        static general_buffered * instance()
        {
            return static_cast<general_buffered *>( base_class::instance() );
        }
        /// Checks if the singleton is created and ready to use
        static bool isUsed()
        {
            return singleton_ptr::s_pRCU != nullptr;
        }

    protected:
        //@cond
        general_buffered( size_t nBufferCapacity )
            : m_Buffer( nBufferCapacity )
            , m_nCurEpoch(0)
            , m_nCapacity( nBufferCapacity )
        {}

        ~general_buffered()
        {
            clear_buffer( (uint64_t) -1 );
        }

        void flip_and_wait()
        {
            back_off bkoff;
            base_class::flip_and_wait( bkoff );
        }

        // Frees all buffered retired objects whose epoch is not greater than nEpoch
        void clear_buffer( uint64_t nEpoch )
        {
            epoch_retired_ptr p;
            while ( m_Buffer.pop( p )) {
                if ( p.m_nEpoch <= nEpoch ) {
                    CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
                    p.free();
                    CDS_TSAN_ANNOTATE_IGNORE_RW_END;
                }
                else {
                    // the object belongs to a newer epoch - return it to the buffer and stop
                    push_buffer( p );
                    break;
                }
            }
        }

        // Returns true if synchronize() has been called, false otherwise
        bool push_buffer( epoch_retired_ptr& ep )
        {
            bool bPushed = m_Buffer.push( ep );
            if ( !bPushed || m_Buffer.size() >= capacity() ) {
                synchronize();
                if ( !bPushed ) {
                    // the push failed because the buffer was full; after synchronize()
                    // the grace period has elapsed, so the object can be freed right away
                    CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
                    ep.free();
                    CDS_TSAN_ANNOTATE_IGNORE_RW_END;
                }
                return true;
            }
            return false;
        }
        //@endcond

    public:
        /// Creates the singleton object
        /**
            The \p nBufferCapacity parameter defines the RCU threshold.
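
            Typically \p Construct and \p Destruct are invoked by the constructor and destructor of the
            \ref cds_urcu_general_buffered_gc "gc<general_buffered>" wrapper; the direct calls below are only a
            hypothetical sketch of raising the threshold:
            \code
            // at program start-up, before any retire_ptr() call
            cds::urcu::general_buffered<>::Construct( 4096 );

            // ... use the RCU ...

            // at shutdown: free everything still buffered, detach remaining threads
            cds::urcu::general_buffered<>::Destruct( true );
            \endcode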
        */
        static void Construct( size_t nBufferCapacity = 256 )
        {
            if ( !singleton_ptr::s_pRCU )
                singleton_ptr::s_pRCU = new general_buffered( nBufferCapacity );
        }

        /// Destroys the singleton object
        static void Destruct( bool bDetachAll = false )
        {
            if ( isUsed() ) {
                instance()->clear_buffer( (uint64_t) -1 );
                if ( bDetachAll )
                    instance()->m_ThreadList.detach_all();
                delete instance();
                singleton_ptr::s_pRCU = nullptr;
            }
        }

    public:
        /// Retires pointer \p p
        /**
            The method pushes the \p p pointer into the internal buffer.
            When the buffer becomes full, the \ref synchronize function is called
            to wait for the end of the grace period and then free all pointers from the buffer.
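
            A hypothetical sketch; the two-argument \p retired_ptr constructor taking a raw pointer and a
            \p free_retired_ptr_func disposer is assumed from <tt>cds/urcu/details/base.h</tt>:
            \code
            struct Foo { int nData; };

            static void dispose_foo( void * p )
            {
                delete static_cast<Foo *>( p );
            }

            void retire_foo( Foo * pUnlinked )
            {
                // pUnlinked must already be unlinked from the RCU-protected structure,
                // and this call must be made outside of a read-side critical section
                // because it may trigger synchronize()
                cds::urcu::retired_ptr rp( pUnlinked, dispose_foo );
                cds::urcu::general_buffered<>::instance()->retire_ptr( rp );
            }
            \endcode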
        */
        virtual void retire_ptr( retired_ptr& p )
        {
            if ( p.m_p ) {
                epoch_retired_ptr ep( p, m_nCurEpoch.load( atomics::memory_order_relaxed ));
                push_buffer( ep );
            }
        }

        /// Retires the pointer chain [\p itFirst, \p itLast)
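        /// The iterators must dereference to \ref cds_urcu_retired_ptr "retired_ptr"; each element is stamped with
        /// the current epoch and pushed through the same buffering path as \p retire_ptr.
        /// A hypothetical sketch, reusing the assumed \p retired_ptr constructor and \p dispose_foo disposer
        /// from the \p retire_ptr example above:
        /// \code
        /// #include <vector>
        ///
        /// std::vector<cds::urcu::retired_ptr> chain;
        /// chain.push_back( cds::urcu::retired_ptr( pFoo1, dispose_foo ));   // pFoo1, pFoo2 - unlinked nodes
        /// chain.push_back( cds::urcu::retired_ptr( pFoo2, dispose_foo ));
        /// cds::urcu::general_buffered<>::instance()->batch_retire( chain.begin(), chain.end() );
        /// \endcode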
        template <typename ForwardIterator>
        void batch_retire( ForwardIterator itFirst, ForwardIterator itLast )
        {
            uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_relaxed );
            while ( itFirst != itLast ) {
                epoch_retired_ptr ep( *itFirst, nEpoch );
                ++itFirst;
                push_buffer( ep );
            }
        }

        /// Waits until a grace period finishes and then clears the buffer
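        /// For example, a reclamation cycle can be forced directly (normally reclamation is triggered
        /// automatically when the buffer fills up, so an explicit call is rarely needed):
        /// \code
        /// // blocks until a grace period elapses, then frees the currently buffered retired objects
        /// cds::urcu::general_buffered<>::instance()->synchronize();
        /// \endcode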
        void synchronize()
        {
            epoch_retired_ptr ep( retired_ptr(), m_nCurEpoch.load( atomics::memory_order_relaxed ));
            synchronize( ep );
        }

        //@cond
        bool synchronize( epoch_retired_ptr& ep )
        {
            uint64_t nEpoch;
            atomics::atomic_thread_fence( atomics::memory_order_acquire );
            {
                std::unique_lock<lock_type> sl( m_Lock );

                // While this thread waited for the lock another thread may have drained the buffer;
                // if the pointer fits now, no grace period is needed
                if ( ep.m_p && m_Buffer.push( ep ) )
                    return false;
                nEpoch = m_nCurEpoch.fetch_add( 1, atomics::memory_order_relaxed );

                // Two flip_and_wait() phases guarantee that every reader that started
                // before this call has left its read-side critical section
                flip_and_wait();
                flip_and_wait();
            }
            clear_buffer( nEpoch );
            atomics::atomic_thread_fence( atomics::memory_order_release );
            return true;
        }
        //@endcond

        /// Returns internal buffer capacity
        size_t capacity() const
        {
            return m_nCapacity;
        }
    };

}} // namespace cds::urcu

#endif // #ifndef CDSLIB_URCU_DETAILS_GPB_H