3 #ifndef CDSLIB_URCU_DISPOSE_THREAD_H
4 #define CDSLIB_URCU_DISPOSE_THREAD_H
9 #include <condition_variable>
10 #include <cds/details/aligned_type.h>
11 #include <cds/algo/atomic.h>
13 namespace cds { namespace urcu {
15 /// Reclamation thread for \p general_threaded and \p signal_threaded URCU
17 The object of this class contains a reclamation thread object and
18 the necessary synchronization object(s). The object manages the reclamation thread
19 and defines a set of messages (i.e. methods) to communicate with the thread.
21 Template argument \p Buffer defines the buffer type of \ref general_threaded (or \ref signal_threaded) URCU.
23 template <class Buffer>
27 typedef Buffer buffer_type ; ///< Buffer type
30 typedef std::thread thread_type; ///< Type of the reclamation thread object
31 typedef std::mutex mutex_type; ///< Mutex type locked through \p unique_lock
32 typedef std::condition_variable condvar_type; ///< Condition variable type used for signalling between threads
33 typedef std::unique_lock< mutex_type > unique_lock; ///< Scoped lock over \p mutex_type that \p condvar_type can wait on
// Helper derived from thread_type (std::thread): constructing it launches the reclamation thread.
35 class dispose_thread_starter: public thread_type
// Thread entry point; receives the dispose_thread object that was passed to the constructor.
37 static void thread_func( dispose_thread * pThis )
// Forwards thread_func and pThis to the std::thread base constructor, which starts
// a new thread that invokes thread_func( pThis ).
43 dispose_thread_starter( dispose_thread * pThis )
44 : thread_type( thread_func, pThis )
// Storage for the thread object: a char array of sizeof(dispose_thread_starter) wrapped in
// cds::details::aligned_type with alignof(dispose_thread_starter).
48 typedef char thread_placeholder[ sizeof(dispose_thread_starter) ];
49 typename cds::details::aligned_type< thread_placeholder, alignof( dispose_thread_starter ) >::type m_threadPlaceholder;
// Pointer to the thread object after it has been constructed inside m_threadPlaceholder
50 dispose_thread_starter * m_DisposeThread;
52 // synchronization with disposing thread
// Notified by stop() and dispose() after they publish a buffer; the disposing thread waits on it
54 condvar_type m_cvDataReady;
56 // Task for thread (dispose cycle)
// Buffer to process; nullptr means that no task is pending
57 atomics::atomic<buffer_type *> m_pBuffer;
// Epoch bound of the pending task; it is copied by the disposing thread and passed to dispose_buffer().
// NOTE(review): volatile does not provide inter-thread synchronization; the visible reads and
// writes appear to be made with m_Mutex held - confirm.
58 uint64_t volatile m_nCurEpoch;
// Set to true by stop() before it joins the thread
61 atomics::atomic<bool> m_bQuit;
63 // disposing pass sync
// Notified by the disposing thread after it sets m_bReady; stop() and dispose() wait on it
64 condvar_type m_cvReady;
// true while the disposing thread is ready to accept a new task, false while it is busy
65 atomics::atomic<bool> m_bReady;
68 private: // methods called from disposing thread
// Local copy of the task buffer pointer read from m_pBuffer
72 buffer_type * pBuffer;
78 // signal that we are ready to dispose
80 unique_lock lock( m_Mutex );
81 m_bReady.store( true, atomics::memory_order_relaxed );
// wake a thread that is blocked on m_cvReady in stop() or dispose()
83 m_cvReady.notify_one();
86 // wait for a new data portion
87 unique_lock lock( m_Mutex );
// m_pBuffer stays nullptr until stop() or dispose() publishes a buffer;
// re-checking it in a loop also covers spurious wakeups of the condition variable
89 while ( (pBuffer = m_pBuffer.load( atomics::memory_order_relaxed )) == nullptr )
90 m_cvDataReady.wait( lock );
93 m_bReady.store( false, atomics::memory_order_relaxed ); // we are busy
// take local copies of the task parameters, then clear m_pBuffer so that
// the next wait blocks until a new buffer is published
95 bQuit = m_bQuit.load( atomics::memory_order_relaxed );
96 nCurEpoch = m_nCurEpoch;
97 m_pBuffer.store( nullptr, atomics::memory_order_relaxed );
// process the buffer using the local copies taken above
101 dispose_buffer( pBuffer, nCurEpoch );
// Walks \p pBuf from its front and pops entries whose m_nEpoch is not greater than \p nCurEpoch.
105 void dispose_buffer( buffer_type * pBuf, uint64_t nCurEpoch )
107 epoch_retired_ptr * p;
// the loop ends when front() returns nullptr (presumably: the buffer is empty - confirm)
108 while ( ( p = pBuf->front()) != nullptr ) {
// only entries with an epoch not greater than nCurEpoch are handled here
109 if ( p->m_nEpoch <= nCurEpoch ) {
// remove the handled entry from the buffer.
// NOTE(review): CDS_VERIFY presumably checks the result in debug builds and still
// evaluates pop_front() in release builds - confirm
111 CDS_VERIFY( pBuf->pop_front() );
// start with no pending task: the disposing thread waits while m_pBuffer is nullptr
122 : m_pBuffer( nullptr )
129 public: // methods called from any thread
130 /// Start reclamation thread
132 This function is called by the \ref general_threaded object to start
133 the internal reclamation thread.
// placement new: the thread object is constructed inside m_threadPlaceholder instead of on the heap;
// the dispose_thread_starter constructor launches the thread with this object as its argument
137 m_DisposeThread = new (m_threadPlaceholder) dispose_thread_starter( this );
140 /// Stop reclamation thread
142 This function is called by the \ref general_threaded object to
143 start a reclamation cycle and then terminate the reclamation thread.
145 \p buf is the buffer containing retired objects that are ready to be freed.
147 void stop( buffer_type& buf, uint64_t nCurEpoch )
150 unique_lock lock( m_Mutex );
152 // wait until the disposing thread reports that it is ready (m_bReady is set)
153 while ( !m_bReady.load( atomics::memory_order_relaxed ))
154 m_cvReady.wait( lock );
156 // hand over the new work and set the stop flag
157 m_nCurEpoch = nCurEpoch;
158 m_pBuffer.store( &buf, atomics::memory_order_relaxed );
159 m_bQuit.store( true, atomics::memory_order_relaxed );
// wake the disposing thread that waits on m_cvDataReady for a non-null m_pBuffer
161 m_cvDataReady.notify_one();
// block the caller until the disposing thread has finished
163 m_DisposeThread->join();
166 /// Start reclamation cycle
168 This function is called by the \ref general_threaded object
169 to notify the reclamation thread about new work.
170 \p buf is the buffer containing retired objects that are ready to be freed.
171 The reclamation thread should free all objects in \p buf
172 whose \p m_nEpoch field is no greater than \p nCurEpoch.
174 If the \p bSync parameter is \p true, the calling thread
175 waits until the disposing pass is done.
177 void dispose( buffer_type& buf, uint64_t nCurEpoch, bool bSync )
180 unique_lock lock( m_Mutex );
182 // wait until the previous disposing pass is done (the disposing thread sets m_bReady)
183 while ( !m_bReady.load( atomics::memory_order_relaxed ))
184 m_cvReady.wait( lock );
// publish the new task: mark the thread as busy, then hand over the epoch and the buffer
187 m_bReady.store( false, atomics::memory_order_relaxed );
188 m_nCurEpoch = nCurEpoch;
189 m_pBuffer.store( &buf, atomics::memory_order_relaxed );
// wake the disposing thread that waits on m_cvDataReady for a non-null m_pBuffer
191 m_cvDataReady.notify_one();
// NOTE(review): presumably the synchronous (bSync) path - wait until the disposing thread
// sets m_bReady again, i.e. the pass requested above is done; confirm
194 unique_lock lock( m_Mutex );
195 while ( !m_bReady.load( atomics::memory_order_relaxed ))
196 m_cvReady.wait( lock );
200 }} // namespace cds::urcu
202 #endif // #ifndef CDSLIB_URCU_DISPOSE_THREAD_H