Fixed assertion in threaded uRCU
[libcds.git] / cds / urcu / dispose_thread.h
1 //$$CDS-header$$1
2
3 #ifndef CDSLIB_URCU_DISPOSE_THREAD_H
4 #define CDSLIB_URCU_DISPOSE_THREAD_H
5
6 #include <memory>
7 #include <thread>
8 #include <mutex>
9 #include <condition_variable>
10 #include <cds/details/aligned_type.h>
11
12 namespace cds { namespace urcu {
13
14     /// Reclamation thread for \p general_threaded and \p signal_threaded URCU
15     /**
16         The object of this class contains a reclamation thread object and
17         necessary synchronization object(s). The object manages reclamation thread
18         and defines a set of messages (i.e. methods) to communicate with the thread.
19
20         Template argument \p Buffer defines the buffer type of \ref general_threaded (or \ref signal_threaded) URCU.
21     */
22     template <class Buffer>
23     class dispose_thread
24     {
25     public:
26         typedef Buffer  buffer_type ;   ///< Buffer type
27     private:
28         //@cond
29         typedef std::thread                 thread_type;
30         typedef std::mutex              mutex_type;
31         typedef std::condition_variable condvar_type;
32         typedef std::unique_lock< mutex_type >  unique_lock;
33
34         class dispose_thread_starter: public thread_type
35         {
36             static void thread_func( dispose_thread * pThis )
37             {
38                 pThis->execute();
39             }
40
41         public:
42             dispose_thread_starter( dispose_thread * pThis )
43                 : thread_type( thread_func, pThis )
44             {}
45         };
46
47         typedef char thread_placeholder[ sizeof(dispose_thread_starter) ];
48         typename cds::details::aligned_type< thread_placeholder, alignof( dispose_thread_starter ) >::type  m_threadPlaceholder;
49         dispose_thread_starter *                m_DisposeThread;
50
51         // synchronization with disposing thread
52         mutex_type      m_Mutex;
53         condvar_type    m_cvDataReady;
54
55         // Task for thread (dispose cycle)
56         buffer_type * volatile  m_pBuffer;
57         uint64_t volatile       m_nCurEpoch;
58
59         // Quit flag
60         bool volatile           m_bQuit;
61
62         // disposing pass sync
63         condvar_type            m_cvReady;
64         bool volatile           m_bReady;
65         //@endcond
66
67     private: // methods called from disposing thread
68         //@cond
69         void execute()
70         {
71             buffer_type *   pBuffer;
72             uint64_t        nCurEpoch;
73             bool            bQuit = false;
74
75             epoch_retired_ptr rest;
76
77             while ( !bQuit ) {
78                 {
79                     unique_lock lock( m_Mutex );
80
81                     // signal that we are ready to dispose
82                     m_bReady = true;
83                     m_cvReady.notify_one();
84
85                     // wait new data portion
86                     while ( !m_pBuffer )
87                         m_cvDataReady.wait( lock );
88
89                     // New work is ready
90                     m_bReady = false ;   // we are busy
91
92                     bQuit = m_bQuit;
93                     nCurEpoch = m_nCurEpoch;
94                     pBuffer = m_pBuffer;
95                     m_pBuffer = nullptr;
96                 }
97
98                 if ( rest.m_p ) {
99                     assert( rest.m_nEpoch <= nCurEpoch );
100                     rest.free();
101                 }
102
103                 if ( pBuffer )
104                     rest = dispose_buffer( pBuffer, nCurEpoch );
105             }
106         }
107
108         epoch_retired_ptr dispose_buffer( buffer_type * pBuf, uint64_t nCurEpoch )
109         {
110             epoch_retired_ptr p;
111             while ( pBuf->pop( p )) {
112                 if ( p.m_nEpoch <= nCurEpoch ) {
113                     p.free();
114                 }
115                 else {
116                     if ( !pBuf->push( p ))
117                         return p;
118                     break;
119                 }
120             }
121             return epoch_retired_ptr();
122         }
123         //@endcond
124
125     public:
126         //@cond
127         dispose_thread()
128             : m_pBuffer( nullptr )
129             , m_nCurEpoch(0)
130             , m_bQuit( false )
131             , m_bReady( false )
132         {}
133         //@endcond
134
135     public: // methods called from any thread
136         /// Start reclamation thread
137         /**
138             This function is called by \ref general_threaded object to start
139             internal reclamation thread.
140         */
141         void start()
142         {
143             m_DisposeThread = new (m_threadPlaceholder) dispose_thread_starter( this );
144         }
145
146         /// Stop reclamation thread
147         /**
148             This function is called by \ref general_threaded object to
149             start reclamation cycle and then to terminate reclamation thread.
150
151             \p buf buffer contains retired objects ready to free.
152         */
153         void stop( buffer_type& buf, uint64_t nCurEpoch )
154         {
155             {
156                 unique_lock lock( m_Mutex );
157
158                 // wait while retiring pass done
159                 while ( !m_bReady )
160                     m_cvReady.wait( lock );
161
162                 // give a new work and set stop flag
163                 m_pBuffer = &buf;
164                 m_nCurEpoch = nCurEpoch;
165                 m_bQuit = true;
166             }
167             m_cvDataReady.notify_one();
168
169             m_DisposeThread->join();
170         }
171
172         /// Start reclamation cycle
173         /**
174             This function is called by \ref general_threaded object
175             to notify the reclamation thread about a new work.
176             \p buf buffer contains retired objects ready to free.
177             The reclamation thread should free all \p buf objects
178             \p m_nEpoch field of which is no more than \p nCurEpoch.
179
180             If \p bSync parameter is \p true the calling thread should
181             wait until disposing done.
182         */
183         void dispose( buffer_type& buf, uint64_t nCurEpoch, bool bSync )
184         {
185             unique_lock lock( m_Mutex );
186
187             // wait while disposing pass done
188             while ( !m_bReady )
189                 m_cvReady.wait( lock );
190
191             if ( bSync )
192                 m_bReady = false;
193
194             // new work
195             m_nCurEpoch = nCurEpoch;
196             m_pBuffer = &buf;
197
198             m_cvDataReady.notify_one();
199
200             if ( bSync ) {
201                 while ( !m_bReady )
202                     m_cvReady.wait( lock );
203             }
204         }
205     };
206 }} // namespace cds::urcu
207
208 #endif // #ifdef CDSLIB_URCU_DISPOSE_THREAD_H