Rebuilt threaded uRCU logic
[libcds.git] / cds / urcu / dispose_thread.h
1 //$$CDS-header$$1
2
3 #ifndef CDSLIB_URCU_DISPOSE_THREAD_H
4 #define CDSLIB_URCU_DISPOSE_THREAD_H
5
6 #include <memory>
7 #include <thread>
8 #include <mutex>
9 #include <condition_variable>
10 #include <cds/details/aligned_type.h>
11 #include <cds/algo/atomic.h>
12
13 namespace cds { namespace urcu {
14
15     /// Reclamation thread for \p general_threaded and \p signal_threaded URCU
16     /**
17         The object of this class contains a reclamation thread object and
18         necessary synchronization object(s). The object manages reclamation thread
19         and defines a set of messages (i.e. methods) to communicate with the thread.
20
21         Template argument \p Buffer defines the buffer type of \ref general_threaded (or \ref signal_threaded) URCU.
22     */
23     template <class Buffer>
24     class dispose_thread
25     {
26     public:
27         typedef Buffer  buffer_type ;   ///< Buffer type
28     private:
29         //@cond
30         typedef std::thread             thread_type;
31         typedef std::mutex              mutex_type;
32         typedef std::condition_variable condvar_type;
33         typedef std::unique_lock< mutex_type >  unique_lock;
34
35         class dispose_thread_starter: public thread_type
36         {
37             static void thread_func( dispose_thread * pThis )
38             {
39                 pThis->execute();
40             }
41
42         public:
43             dispose_thread_starter( dispose_thread * pThis )
44                 : thread_type( thread_func, pThis )
45             {}
46         };
47
48         typedef char thread_placeholder[ sizeof(dispose_thread_starter) ];
49         typename cds::details::aligned_type< thread_placeholder, alignof( dispose_thread_starter ) >::type  m_threadPlaceholder;
50         dispose_thread_starter *                m_DisposeThread;
51
52         // synchronization with disposing thread
53         mutex_type      m_Mutex;
54         condvar_type    m_cvDataReady;
55
56         // Task for thread (dispose cycle)
57         atomics::atomic<buffer_type *>  m_pBuffer;
58         uint64_t volatile      m_nCurEpoch;
59
60         // Quit flag
61         atomics::atomic<bool>  m_bQuit;
62
63         // disposing pass sync
64         condvar_type           m_cvReady;
65         atomics::atomic<bool>  m_bReady;
66         //@endcond
67
68     private: // methods called from disposing thread
69         //@cond
70         void execute()
71         {
72             buffer_type *   pBuffer;
73             uint64_t        nCurEpoch;
74             bool            bQuit = false;
75
76             while ( !bQuit ) {
77
78                 // signal that we are ready to dispose
79                 {
80                     unique_lock lock( m_Mutex );
81                     m_bReady.store( true, atomics::memory_order_relaxed );
82                 }
83                 m_cvReady.notify_one();
84
85                 {
86                     // wait new data portion
87                     unique_lock lock( m_Mutex );
88
89                     while ( (pBuffer = m_pBuffer.load( atomics::memory_order_relaxed )) == nullptr )
90                         m_cvDataReady.wait( lock );
91
92                     // New work is ready
93                     m_bReady.store( false, atomics::memory_order_relaxed ); // we are busy
94
95                     bQuit = m_bQuit.load( atomics::memory_order_relaxed );
96                     nCurEpoch = m_nCurEpoch;
97                     m_pBuffer.store( nullptr, atomics::memory_order_relaxed );
98                 }
99
100                 if ( pBuffer )
101                     dispose_buffer( pBuffer, nCurEpoch );
102             }
103         }
104
105         void dispose_buffer( buffer_type * pBuf, uint64_t nCurEpoch )
106         {
107             epoch_retired_ptr * p;
108             while ( ( p = pBuf->front()) != nullptr ) {
109                 if ( p->m_nEpoch <= nCurEpoch ) {
110                     p->free();
111                     CDS_VERIFY( pBuf->pop_front() );
112                 }
113                 else
114                     break;
115             }
116         }
117         //@endcond
118
119     public:
120         //@cond
121         dispose_thread()
122             : m_pBuffer( nullptr )
123             , m_nCurEpoch(0)
124             , m_bQuit( false )
125             , m_bReady( false )
126         {}
127         //@endcond
128
129     public: // methods called from any thread
130         /// Start reclamation thread
131         /**
132             This function is called by \ref general_threaded object to start
133             internal reclamation thread.
134         */
135         void start()
136         {
137             m_DisposeThread = new (m_threadPlaceholder) dispose_thread_starter( this );
138         }
139
140         /// Stop reclamation thread
141         /**
142             This function is called by \ref general_threaded object to
143             start reclamation cycle and then to terminate reclamation thread.
144
145             \p buf buffer contains retired objects ready to free.
146         */
147         void stop( buffer_type& buf, uint64_t nCurEpoch )
148         {
149             {
150                 unique_lock lock( m_Mutex );
151
152                 // wait while retiring pass done
153                 while ( !m_bReady.load( atomics::memory_order_relaxed ))
154                     m_cvReady.wait( lock );
155
156                 // give a new work and set stop flag
157                 m_nCurEpoch = nCurEpoch;
158                 m_pBuffer.store( &buf, atomics::memory_order_relaxed );
159                 m_bQuit.store( true, atomics::memory_order_relaxed );
160             }
161             m_cvDataReady.notify_one();
162
163             m_DisposeThread->join();
164         }
165
166         /// Start reclamation cycle
167         /**
168             This function is called by \ref general_threaded object
169             to notify the reclamation thread about a new work.
170             \p buf buffer contains retired objects ready to free.
171             The reclamation thread should free all \p buf objects
172             \p m_nEpoch field of which is no more than \p nCurEpoch.
173
174             If \p bSync parameter is \p true the calling thread should
175             wait until disposing done.
176         */
177         void dispose( buffer_type& buf, uint64_t nCurEpoch, bool bSync )
178         {
179             {
180                 unique_lock lock( m_Mutex );
181
182                 // wait while disposing pass done
183                 while ( !m_bReady.load( atomics::memory_order_relaxed ))
184                     m_cvReady.wait( lock );
185
186                 // new work
187                 m_bReady.store( false, atomics::memory_order_relaxed );
188                 m_nCurEpoch = nCurEpoch;
189                 m_pBuffer.store( &buf, atomics::memory_order_relaxed );
190             }
191             m_cvDataReady.notify_one();
192
193             if ( bSync ) {
194                 unique_lock lock( m_Mutex );
195                 while ( !m_bReady.load( atomics::memory_order_relaxed ))
196                     m_cvReady.wait( lock );
197             }
198         }
199     };
200 }} // namespace cds::urcu
201
202 #endif // #ifdef CDSLIB_URCU_DISPOSE_THREAD_H