Fixed bug in RCU batch_retire()
[libcds.git] cds/urcu/details/gpt.h
//$$CDS-header$$

#ifndef CDSLIB_URCU_DETAILS_GPT_H
#define CDSLIB_URCU_DETAILS_GPT_H

#include <mutex>    // std::unique_lock
#include <cds/urcu/details/gp.h>
#include <cds/urcu/dispose_thread.h>
#include <cds/algo/backoff_strategy.h>
#include <cds/container/vyukov_mpmc_cycle_queue.h>

namespace cds { namespace urcu {

    /// User-space general-purpose RCU with deferred threaded reclamation
    /**
        @headerfile cds/urcu/general_threaded.h

        This implementation is similar to \ref general_buffered, but a separate thread is created
        to delete the retired objects. Like \p %general_buffered, the class contains an internal buffer
        where retired objects are accumulated. When the buffer becomes full,
        the RCU \p synchronize function is called; it waits until all reader/updater threads have ended
        their read-side critical sections, i.e. until the RCU quiescent state comes. After that, a "work ready"
        message is sent to the reclamation thread, which frees the buffered objects.
        This synchronization cycle may be performed by any thread that calls the \ref retire_ptr function.

        There is a wrapper \ref cds_urcu_general_threaded_gc "gc<general_threaded>" for the \p %general_threaded class
        that provides the unified RCU interface. You should use this wrapper class instead of \p %general_threaded.

        Template arguments:
        - \p Buffer - buffer type with FIFO semantics. Default is cds::container::VyukovMPMCCycleQueue. See \ref general_buffered
            for a description of the buffer's interface. The buffer contains objects of \ref epoch_retired_ptr
            type, which has an additional \p m_nEpoch field. This field specifies the epoch when the object
            was placed into the buffer. The \p %general_threaded object has a global epoch counter
            that is incremented on each \p synchronize call. The epoch is used internally to prevent early deletion.
        - \p Lock - mutex type, default is \p std::mutex
        - \p DisposerThread - the reclamation thread class. Default is \ref cds::urcu::dispose_thread;
            see the description of this class for the required interface.
        - \p Backoff - back-off schema, default is \p cds::backoff::Default
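
        \par Usage
        A minimal sketch (assuming the unified \p gc wrapper declared in
        <tt>cds/urcu/general_threaded.h</tt> and the usual libcds initialization
        sequence; names are illustrative):
        \code
        #include <cds/init.h>
        #include <cds/urcu/general_threaded.h>

        // Unified RCU type built on general_threaded
        typedef cds::urcu::gc< cds::urcu::general_threaded<> > rcu_type;

        int main()
        {
            cds::Initialize();
            {
                rcu_type theRCU;    // creates the singleton, starts the disposer thread
                cds::threading::Manager::attachThread();

                {
                    // Read-side critical section
                    rcu_type::scoped_lock sl;
                    // ... access RCU-protected data ...
                }
            }
            cds::Terminate();
        }
        \endcode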
    */
    template <
        class Buffer = cds::container::VyukovMPMCCycleQueue< epoch_retired_ptr >
        ,class Lock = std::mutex
        ,class DisposerThread = dispose_thread<Buffer>
        ,class Backoff = cds::backoff::Default
    >
    class general_threaded: public details::gp_singleton< general_threaded_tag >
    {
        //@cond
        typedef details::gp_singleton< general_threaded_tag > base_class;
        //@endcond
    public:
        typedef Buffer          buffer_type ;   ///< Buffer type
        typedef Lock            lock_type   ;   ///< Lock type
        typedef Backoff         back_off    ;   ///< Back-off scheme
        typedef DisposerThread  disposer_thread ;   ///< Disposer thread type

        typedef general_threaded_tag    rcu_tag ;       ///< RCU tag
        typedef base_class::thread_gc   thread_gc ;     ///< Thread-side RCU part (thread-specific data)
        typedef typename thread_gc::scoped_lock scoped_lock ; ///< Access lock class

        static bool const c_bBuffered = true ; ///< This RCU buffers disposed elements

    protected:
        //@cond
        typedef details::gp_singleton_instance< rcu_tag >    singleton_ptr;

        struct scoped_disposer {
            void operator ()( general_threaded * p )
            {
                delete p;
            }
        };
        //@endcond

    protected:
        //@cond
        buffer_type                     m_Buffer;
        atomics::atomic<uint64_t>       m_nCurEpoch;
        lock_type                       m_Lock;
        size_t const                    m_nCapacity;
        disposer_thread                 m_DisposerThread;
        //@endcond

    public:
        /// Returns singleton instance
        static general_threaded * instance()
        {
            return static_cast<general_threaded *>( base_class::instance() );
        }
        /// Checks if the singleton is created and ready to use
        static bool isUsed()
        {
            return singleton_ptr::s_pRCU != nullptr;
        }

    protected:
        //@cond
        general_threaded( size_t nBufferCapacity )
            : m_Buffer( nBufferCapacity )
            , m_nCurEpoch( 1 )
            , m_nCapacity( nBufferCapacity )
        {}

        void flip_and_wait()
        {
            back_off bkoff;
            base_class::flip_and_wait( bkoff );
        }

        // Returns true if synchronize() has been called, false otherwise
        bool push_buffer( epoch_retired_ptr&& p )
        {
            bool bPushed = m_Buffer.push( p );
            if ( !bPushed || m_Buffer.size() >= capacity() ) {
                synchronize();
                if ( !bPushed ) {
                    // The buffer rejected the pointer, but synchronize() has just
                    // finished a grace period, so the object can be freed immediately
                    p.free();
                }
                return true;
            }
            return false;
        }

        //@endcond

    public:
        //@cond
        ~general_threaded()
        {}
        //@endcond

        /// Creates the singleton object and starts the reclamation thread
        /**
            The \p nBufferCapacity parameter defines the threshold of the internal buffer:
            when the buffer reaches this size, \p synchronize() is called and the buffered
            objects are passed to the reclamation thread.
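
            A hedged sketch of the raw singleton lifecycle (usually hidden behind the
            \p gc wrapper; the capacity value is arbitrary):
            \code
            // Create the singleton with a 1024-element buffer; starts the disposer thread
            cds::urcu::general_threaded<>::Construct( 1024 );

            // ... retire pointers, run read-side critical sections ...

            // Stop the disposer thread; true detaches all attached threads
            cds::urcu::general_threaded<>::Destruct( true );
            \endcode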
        */
        static void Construct( size_t nBufferCapacity = 256 )
        {
            if ( !singleton_ptr::s_pRCU ) {
                std::unique_ptr< general_threaded, scoped_disposer > pRCU( new general_threaded( nBufferCapacity ) );
                pRCU->m_DisposerThread.start();

                singleton_ptr::s_pRCU = pRCU.release();
            }
        }

        /// Destroys singleton object and terminates internal reclamation thread
        static void Destruct( bool bDetachAll = false )
        {
            if ( isUsed() ) {
                general_threaded * pThis = instance();
                if ( bDetachAll )
                    pThis->m_ThreadList.detach_all();

                pThis->m_DisposerThread.stop( pThis->m_Buffer, pThis->m_nCurEpoch.load( atomics::memory_order_acquire ));

                delete pThis;
                singleton_ptr::s_pRCU = nullptr;
            }
        }

    public:
        /// Retires \p p pointer
        /**
            The method pushes the \p p pointer to the internal buffer.
            When the buffer becomes full, the \ref synchronize function is called
            to wait for the end of the grace period, and then
            a message is sent to the reclamation thread.
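
            A minimal sketch of retiring an object through the raw interface
            (\p Foo and \p fooDisposer are hypothetical; normally the \p gc wrapper
            is used instead):
            \code
            struct Foo { /* ... */ };
            void fooDisposer( void * p ) { delete static_cast<Foo *>( p ); }

            // The object is freed no sooner than the current grace period ends
            cds::urcu::retired_ptr rp( new Foo, fooDisposer );
            cds::urcu::general_threaded<>::instance()->retire_ptr( rp );
            \endcode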
        */
        virtual void retire_ptr( retired_ptr& p )
        {
            if ( p.m_p )
                push_buffer( epoch_retired_ptr( p, m_nCurEpoch.load( atomics::memory_order_acquire )));
        }

        /// Retires the pointer chain [\p itFirst, \p itLast)
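        /// A sketch, assuming the iterators dereference to \p retired_ptr and that
        /// \p Foo / \p fooDisposer are the hypothetical type and disposer from \ref retire_ptr:
        /// \code
        /// std::vector<cds::urcu::retired_ptr> chain;
        /// chain.push_back( cds::urcu::retired_ptr( new Foo, fooDisposer ));
        /// // ... collect more retired pointers ...
        /// cds::urcu::general_threaded<>::instance()->batch_retire( chain.begin(), chain.end() );
        /// \endcode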
        template <typename ForwardIterator>
        void batch_retire( ForwardIterator itFirst, ForwardIterator itLast )
        {
            uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_relaxed );
            while ( itFirst != itLast ) {
                epoch_retired_ptr ep( *itFirst, nEpoch );
                ++itFirst;
                push_buffer( std::move(ep));
            }
        }

        /// Retires a pointer chain: \p Func is called repeatedly until it returns a \p retired_ptr with null \p m_p
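        /// A sketch with a hypothetical generator: \p arr is an array of \p retired_ptr
        /// of size \p n, and the chain is terminated by a default-constructed
        /// \p retired_ptr (assumed to have a null \p m_p):
        /// \code
        /// size_t i = 0;
        /// cds::urcu::general_threaded<>::instance()->batch_retire(
        ///     [&]() { return i < n ? arr[i++] : cds::urcu::retired_ptr(); }
        /// );
        /// \endcode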
        template <typename Func>
        void batch_retire( Func e )
        {
            uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_relaxed );
            for ( retired_ptr p{ e() }; p.m_p; ) {
                epoch_retired_ptr ep( p, nEpoch );
                p = e();
                push_buffer( std::move(ep));
            }
        }

        /// Waits for a grace period to finish, then passes the accumulated buffer to the disposer thread
        void synchronize()
        {
            synchronize( false );
        }

        //@cond
        void synchronize( bool bSync )
        {
            uint64_t nPrevEpoch = m_nCurEpoch.fetch_add( 1, atomics::memory_order_release );

            atomics::atomic_thread_fence( atomics::memory_order_acquire );
            {
                std::unique_lock<lock_type> sl( m_Lock );
                flip_and_wait();
                flip_and_wait();

                m_DisposerThread.dispose( m_Buffer, nPrevEpoch, bSync );
            }
            atomics::atomic_thread_fence( atomics::memory_order_release );
        }
        void force_dispose()
        {
            synchronize( true );
        }
        //@endcond

        /// Returns the threshold of the internal buffer
        size_t capacity() const
        {
            return m_nCapacity;
        }
    };
}} // namespace cds::urcu

#endif // #ifndef CDSLIB_URCU_DETAILS_GPT_H