Fixed data races found by tsan
[libcds.git] / cds / urcu / dispose_thread.h
1 //$$CDS-header$$1
2
3 #ifndef CDSLIB_URCU_DISPOSE_THREAD_H
4 #define CDSLIB_URCU_DISPOSE_THREAD_H
5
6 #include <memory>
7 #include <thread>
8 #include <mutex>
9 #include <condition_variable>
10 #include <cds/details/aligned_type.h>
11
12 namespace cds { namespace urcu {
13
14     /// Reclamation thread for \p general_threaded and \p signal_threaded URCU
15     /**
16         The object of this class contains a reclamation thread object and
17         necessary synchronization object(s). The object manages reclamation thread
18         and defines a set of messages (i.e. methods) to communicate with the thread.
19
20         Template argument \p Buffer defines the buffer type of \ref general_threaded (or \ref signal_threaded) URCU.
21     */
22     template <class Buffer>
23     class dispose_thread
24     {
25     public:
26         typedef Buffer  buffer_type ;   ///< Buffer type
27     private:
28         //@cond
29         typedef std::thread                 thread_type;
30         typedef std::mutex              mutex_type;
31         typedef std::condition_variable condvar_type;
32         typedef std::unique_lock< mutex_type >  unique_lock;
33
34         class dispose_thread_starter: public thread_type
35         {
36             static void thread_func( dispose_thread * pThis )
37             {
38                 pThis->execute();
39             }
40
41         public:
42             dispose_thread_starter( dispose_thread * pThis )
43                 : thread_type( thread_func, pThis )
44             {}
45         };
46
47         typedef char thread_placeholder[ sizeof(dispose_thread_starter) ];
48         typename cds::details::aligned_type< thread_placeholder, alignof( dispose_thread_starter ) >::type  m_threadPlaceholder;
49         dispose_thread_starter *                m_DisposeThread;
50
51         // synchronization with disposing thread
52         mutex_type      m_Mutex;
53         condvar_type    m_cvDataReady;
54
55         // Task for thread (dispose cycle)
56         buffer_type * volatile  m_pBuffer;
57         uint64_t volatile       m_nCurEpoch;
58
59         // Quit flag
60         bool volatile           m_bQuit;
61
62         // disposing pass sync
63         condvar_type            m_cvReady;
64         bool volatile           m_bReady;
65         //@endcond
66
67     private: // methods called from disposing thread
68         //@cond
69         void execute()
70         {
71             buffer_type *   pBuffer;
72             uint64_t        nCurEpoch;
73             bool            bQuit = false;
74
75             while ( !bQuit ) {
76                 {
77                     unique_lock lock( m_Mutex );
78
79                     // signal that we are ready to dispose
80                     m_bReady = true;
81                     m_cvReady.notify_one();
82
83                     // wait new data portion
84                     while ( !m_pBuffer )
85                         m_cvDataReady.wait( lock );
86
87                     // New work is ready
88                     m_bReady = false ;   // we are busy
89
90                     bQuit = m_bQuit;
91                     nCurEpoch = m_nCurEpoch;
92                     pBuffer = m_pBuffer;
93                     m_pBuffer = nullptr;
94                 }
95
96                 if ( pBuffer )
97                     dispose_buffer( pBuffer, nCurEpoch );
98             }
99         }
100
101         void dispose_buffer( buffer_type * pBuf, uint64_t nCurEpoch )
102         {
103             epoch_retired_ptr p;
104             while ( pBuf->pop( p ) ) {
105                 if ( p.m_nEpoch <= nCurEpoch ) {
106                     CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
107                     p.free();
108                     CDS_TSAN_ANNOTATE_IGNORE_RW_END;
109                 }
110                 else {
111                     pBuf->push( p );
112                     break;
113                 }
114             }
115         }
116         //@endcond
117
118     public:
119         //@cond
120         dispose_thread()
121             : m_pBuffer( nullptr )
122             , m_nCurEpoch(0)
123             , m_bQuit( false )
124             , m_bReady( false )
125         {}
126         //@endcond
127
128     public: // methods called from any thread
129         /// Start reclamation thread
130         /**
131             This function is called by \ref general_threaded object to start
132             internal reclamation thread.
133         */
134         void start()
135         {
136             m_DisposeThread = new (m_threadPlaceholder) dispose_thread_starter( this );
137         }
138
139         /// Stop reclamation thread
140         /**
141             This function is called by \ref general_threaded object to
142             start reclamation cycle and then to terminate reclamation thread.
143
144             \p buf buffer contains retired objects ready to free.
145         */
146         void stop( buffer_type& buf, uint64_t nCurEpoch )
147         {
148             {
149                 unique_lock lock( m_Mutex );
150
151                 // wait while retiring pass done
152                 while ( !m_bReady )
153                     m_cvReady.wait( lock );
154
155                 // give a new work and set stop flag
156                 m_pBuffer = &buf;
157                 m_nCurEpoch = nCurEpoch;
158                 m_bQuit = true;
159             }
160             m_cvDataReady.notify_one();
161
162             m_DisposeThread->join();
163         }
164
165         /// Start reclamation cycle
166         /**
167             This function is called by \ref general_threaded object
168             to notify the reclamation thread about new work.
169             \p buf buffer contains retired objects ready to free.
170             The reclamation thread should free all \p buf objects
171             \p m_nEpoch field of which is no more than \p nCurEpoch.
172
173             If \p bSync parameter is \p true the calling thread should
174             wait until disposing done.
175         */
176         void dispose( buffer_type& buf, uint64_t nCurEpoch, bool bSync )
177         {
178             unique_lock lock( m_Mutex );
179
180             // wait while disposing pass done
181             while ( !m_bReady )
182                 m_cvReady.wait( lock );
183
184             if ( bSync )
185                 m_bReady = false;
186
187             // new work
188             m_nCurEpoch = nCurEpoch;
189             m_pBuffer = &buf;
190
191             m_cvDataReady.notify_one();
192
193             if ( bSync ) {
194                 while ( !m_bReady )
195                     m_cvReady.wait( lock );
196             }
197         }
198     };
199 }} // namespace cds::urcu
200
201 #endif // #ifdef CDSLIB_URCU_DISPOSE_THREAD_H