Removed TSan annotations, tuned memory ordering
[libcds.git] / cds / urcu / dispose_thread.h
1 //$$CDS-header$$1
2
3 #ifndef CDSLIB_URCU_DISPOSE_THREAD_H
4 #define CDSLIB_URCU_DISPOSE_THREAD_H
5
6 #include <memory>
7 #include <thread>
8 #include <mutex>
9 #include <condition_variable>
10 #include <cds/details/aligned_type.h>
11
12 namespace cds { namespace urcu {
13
14     /// Reclamation thread for \p general_threaded and \p signal_threaded URCU
15     /**
16         The object of this class contains a reclamation thread object and
17         necessary synchronization object(s). The object manages reclamation thread
18         and defines a set of messages (i.e. methods) to communicate with the thread.
19
20         Template argument \p Buffer defines the buffer type of \ref general_threaded (or \ref signal_threaded) URCU.
21     */
22     template <class Buffer>
23     class dispose_thread
24     {
25     public:
26         typedef Buffer  buffer_type ;   ///< Buffer type
27     private:
28         //@cond
29         typedef std::thread                 thread_type;
30         typedef std::mutex              mutex_type;
31         typedef std::condition_variable condvar_type;
32         typedef std::unique_lock< mutex_type >  unique_lock;
33
34         class dispose_thread_starter: public thread_type
35         {
36             static void thread_func( dispose_thread * pThis )
37             {
38                 pThis->execute();
39             }
40
41         public:
42             dispose_thread_starter( dispose_thread * pThis )
43                 : thread_type( thread_func, pThis )
44             {}
45         };
46
47         typedef char thread_placeholder[ sizeof(dispose_thread_starter) ];
48         typename cds::details::aligned_type< thread_placeholder, alignof( dispose_thread_starter ) >::type  m_threadPlaceholder;
49         dispose_thread_starter *                m_DisposeThread;
50
51         // synchronization with disposing thread
52         mutex_type      m_Mutex;
53         condvar_type    m_cvDataReady;
54
55         // Task for thread (dispose cycle)
56         buffer_type * volatile  m_pBuffer;
57         uint64_t volatile       m_nCurEpoch;
58
59         // Quit flag
60         bool volatile           m_bQuit;
61
62         // disposing pass sync
63         condvar_type            m_cvReady;
64         bool volatile           m_bReady;
65         //@endcond
66
67     private: // methods called from disposing thread
68         //@cond
69         void execute()
70         {
71             buffer_type *   pBuffer;
72             uint64_t        nCurEpoch;
73             bool            bQuit = false;
74
75             while ( !bQuit ) {
76                 {
77                     unique_lock lock( m_Mutex );
78
79                     // signal that we are ready to dispose
80                     m_bReady = true;
81                     m_cvReady.notify_one();
82
83                     // wait new data portion
84                     while ( !m_pBuffer )
85                         m_cvDataReady.wait( lock );
86
87                     // New work is ready
88                     m_bReady = false ;   // we are busy
89
90                     bQuit = m_bQuit;
91                     nCurEpoch = m_nCurEpoch;
92                     pBuffer = m_pBuffer;
93                     m_pBuffer = nullptr;
94                 }
95
96                 if ( pBuffer )
97                     dispose_buffer( pBuffer, nCurEpoch );
98             }
99         }
100
101         void dispose_buffer( buffer_type * pBuf, uint64_t nCurEpoch )
102         {
103             epoch_retired_ptr p;
104             while ( pBuf->pop( p ) ) {
105                 if ( p.m_nEpoch <= nCurEpoch ) {
106                     p.free();
107                 }
108                 else {
109                     pBuf->push( p );
110                     break;
111                 }
112             }
113         }
114         //@endcond
115
116     public:
117         //@cond
118         dispose_thread()
119             : m_pBuffer( nullptr )
120             , m_nCurEpoch(0)
121             , m_bQuit( false )
122             , m_bReady( false )
123         {}
124         //@endcond
125
126     public: // methods called from any thread
127         /// Start reclamation thread
128         /**
129             This function is called by \ref general_threaded object to start
130             internal reclamation thread.
131         */
132         void start()
133         {
134             m_DisposeThread = new (m_threadPlaceholder) dispose_thread_starter( this );
135         }
136
137         /// Stop reclamation thread
138         /**
139             This function is called by \ref general_threaded object to
140             start reclamation cycle and then to terminate reclamation thread.
141
142             \p buf buffer contains retired objects ready to free.
143         */
144         void stop( buffer_type& buf, uint64_t nCurEpoch )
145         {
146             {
147                 unique_lock lock( m_Mutex );
148
149                 // wait while retiring pass done
150                 while ( !m_bReady )
151                     m_cvReady.wait( lock );
152
153                 // give a new work and set stop flag
154                 m_pBuffer = &buf;
155                 m_nCurEpoch = nCurEpoch;
156                 m_bQuit = true;
157             }
158             m_cvDataReady.notify_one();
159
160             m_DisposeThread->join();
161         }
162
163         /// Start reclamation cycle
164         /**
165             This function is called by \ref general_threaded object
166             to notify the reclamation thread about new work.
167             \p buf buffer contains retired objects ready to free.
168             The reclamation thread should free all \p buf objects
169             \p m_nEpoch field of which is no more than \p nCurEpoch.
170
171             If \p bSync parameter is \p true the calling thread should
172             wait until disposing done.
173         */
174         void dispose( buffer_type& buf, uint64_t nCurEpoch, bool bSync )
175         {
176             unique_lock lock( m_Mutex );
177
178             // wait while disposing pass done
179             while ( !m_bReady )
180                 m_cvReady.wait( lock );
181
182             if ( bSync )
183                 m_bReady = false;
184
185             // new work
186             m_nCurEpoch = nCurEpoch;
187             m_pBuffer = &buf;
188
189             m_cvDataReady.notify_one();
190
191             if ( bSync ) {
192                 while ( !m_bReady )
193                     m_cvReady.wait( lock );
194             }
195         }
196     };
197 }} // namespace cds::urcu
198
199 #endif // #ifdef CDSLIB_URCU_DISPOSE_THREAD_H