Fixed memory leaks in threaded URCU (general_threaded, signal_threaded)
[libcds.git] / cds / urcu / dispose_thread.h
1 //$$CDS-header$$1
2
3 #ifndef CDSLIB_URCU_DISPOSE_THREAD_H
4 #define CDSLIB_URCU_DISPOSE_THREAD_H
5
6 #include <memory>
7 #include <thread>
8 #include <mutex>
9 #include <condition_variable>
10 #include <cds/details/aligned_type.h>
11
12 namespace cds { namespace urcu {
13
14     /// Reclamation thread for \p general_threaded and \p signal_threaded URCU
15     /**
16         The object of this class contains a reclamation thread object and
17         necessary synchronization object(s). The object manages reclamation thread
18         and defines a set of messages (i.e. methods) to communicate with the thread.
19
20         Template argument \p Buffer defines the buffer type of \ref general_threaded (or \ref signal_threaded) URCU.
21     */
22     template <class Buffer>
23     class dispose_thread
24     {
25     public:
26         typedef Buffer  buffer_type ;   ///< Buffer type
27     private:
28         //@cond
29         typedef std::thread                 thread_type;
30         typedef std::mutex              mutex_type;
31         typedef std::condition_variable condvar_type;
32         typedef std::unique_lock< mutex_type >  unique_lock;
33
34         class dispose_thread_starter: public thread_type
35         {
36             static void thread_func( dispose_thread * pThis )
37             {
38                 pThis->execute();
39             }
40
41         public:
42             dispose_thread_starter( dispose_thread * pThis )
43                 : thread_type( thread_func, pThis )
44             {}
45         };
46
47         typedef char thread_placeholder[ sizeof(dispose_thread_starter) ];
48         typename cds::details::aligned_type< thread_placeholder, alignof( dispose_thread_starter ) >::type  m_threadPlaceholder;
49         dispose_thread_starter *                m_DisposeThread;
50
51         // synchronization with disposing thread
52         mutex_type      m_Mutex;
53         condvar_type    m_cvDataReady;
54
55         // Task for thread (dispose cycle)
56         buffer_type * volatile  m_pBuffer;
57         uint64_t volatile       m_nCurEpoch;
58
59         // Quit flag
60         bool volatile           m_bQuit;
61
62         // disposing pass sync
63         condvar_type            m_cvReady;
64         bool volatile           m_bReady;
65         //@endcond
66
67     private: // methods called from disposing thread
68         //@cond
69         void execute()
70         {
71             buffer_type *   pBuffer;
72             uint64_t        nCurEpoch;
73             bool            bQuit = false;
74
75             epoch_retired_ptr rest;
76
77             while ( !bQuit ) {
78                 {
79                     unique_lock lock( m_Mutex );
80
81                     // signal that we are ready to dispose
82                     m_bReady = true;
83                     m_cvReady.notify_one();
84
85                     // wait new data portion
86                     while ( !m_pBuffer )
87                         m_cvDataReady.wait( lock );
88
89                     // New work is ready
90                     m_bReady = false ;   // we are busy
91
92                     bQuit = m_bQuit;
93                     nCurEpoch = m_nCurEpoch;
94                     pBuffer = m_pBuffer;
95                     m_pBuffer = nullptr;
96                 }
97
98                 if ( rest.m_p ) {
99                     assert( rest.m_nEpoch < nCurEpoch );
100                     rest.free();
101                 }
102
103                 if ( pBuffer )
104                     rest = dispose_buffer( pBuffer, nCurEpoch );
105             }
106         }
107
108         epoch_retired_ptr dispose_buffer( buffer_type * pBuf, uint64_t nCurEpoch )
109         {
110             epoch_retired_ptr p;
111             while ( pBuf->pop( p )) {
112                 if ( p.m_nEpoch <= nCurEpoch ) {
113                     p.free();
114                 }
115                 else {
116                     if ( !pBuf->push( p ))
117                         return p;
118                     break;
119                 }
120             }
121             return epoch_retired_ptr();
122         }
123         //@endcond
124
125     public:
126         //@cond
127         dispose_thread()
128             : m_pBuffer( nullptr )
129             , m_nCurEpoch(0)
130             , m_bQuit( false )
131             , m_bReady( false )
132         {}
133         //@endcond
134
135     public: // methods called from any thread
136         /// Start reclamation thread
137         /**
138             This function is called by \ref general_threaded object to start
139             internal reclamation thread.
140         */
141         void start()
142         {
143             m_DisposeThread = new (m_threadPlaceholder) dispose_thread_starter( this );
144         }
145
146         /// Stop reclamation thread
147         /**
148             This function is called by \ref general_threaded object to
149             start reclamation cycle and then to terminate reclamation thread.
150
151             \p buf buffer contains retired objects ready to free.
152         */
153         void stop( buffer_type& buf, uint64_t nCurEpoch )
154         {
155             {
156                 unique_lock lock( m_Mutex );
157
158                 // wait while retiring pass done
159                 while ( !m_bReady )
160                     m_cvReady.wait( lock );
161
162                 // give a new work and set stop flag
163                 m_pBuffer = &buf;
164                 m_nCurEpoch = nCurEpoch;
165                 m_bQuit = true;
166             }
167             m_cvDataReady.notify_one();
168
169             m_DisposeThread->join();
170         }
171
172         /// Start reclamation cycle
173         /**
174             This function is called by \ref general_threaded object
175             to notify the reclamation thread about new work.
176             \p buf buffer contains retired objects ready to free.
177             The reclamation thread should free all \p buf objects
178             \p m_nEpoch field of which is no more than \p nCurEpoch.
179
180             If \p bSync parameter is \p true the calling thread should
181             wait until disposing done.
182         */
183         void dispose( buffer_type& buf, uint64_t nCurEpoch, bool bSync )
184         {
185             unique_lock lock( m_Mutex );
186
187             // wait while disposing pass done
188             while ( !m_bReady )
189                 m_cvReady.wait( lock );
190
191             if ( bSync )
192                 m_bReady = false;
193
194             // new work
195             m_nCurEpoch = nCurEpoch;
196             m_pBuffer = &buf;
197
198             m_cvDataReady.notify_one();
199
200             if ( bSync ) {
201                 while ( !m_bReady )
202                     m_cvReady.wait( lock );
203             }
204         }
205     };
206 }} // namespace cds::urcu
207
208 #endif // #ifdef CDSLIB_URCU_DISPOSE_THREAD_H