Removed redundant atomics in urcu disposing thread
[libcds.git] / cds / urcu / dispose_thread.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #ifndef CDSLIB_URCU_DISPOSE_THREAD_H
32 #define CDSLIB_URCU_DISPOSE_THREAD_H
33
34 #include <memory>
35 #include <thread>
36 #include <mutex>
37 #include <condition_variable>
38 #include <cds/details/aligned_type.h>
39 #include <cds/algo/atomic.h>
40
41 namespace cds { namespace urcu {
42
43     /// Reclamation thread for \p general_threaded and \p signal_threaded URCU
44     /**
45         The object of this class contains a reclamation thread object and
46         necessary synchronization object(s). The object manages reclamation thread
47         and defines a set of messages (i.e. methods) to communicate with the thread.
48
49         Template argument \p Buffer defines the buffer type of \ref general_threaded (or \ref signal_threaded) URCU.
50     */
51     template <class Buffer>
52     class dispose_thread
53     {
54     public:
55         typedef Buffer  buffer_type ;   ///< Buffer type
56     private:
57         //@cond
58         typedef std::thread             thread_type;
59         typedef std::mutex              mutex_type;
60         typedef std::condition_variable condvar_type;
61         typedef std::unique_lock< mutex_type >  unique_lock;
62
63         class dispose_thread_starter: public thread_type
64         {
65             static void thread_func( dispose_thread * pThis )
66             {
67                 pThis->execute();
68             }
69
70         public:
71             dispose_thread_starter( dispose_thread * pThis )
72                 : thread_type( thread_func, pThis )
73             {}
74         };
75
76         typedef char thread_placeholder[ sizeof(dispose_thread_starter) ];
77         typename cds::details::aligned_type< thread_placeholder, alignof( dispose_thread_starter ) >::type  m_threadPlaceholder;
78         dispose_thread_starter *                m_DisposeThread;
79
80         // synchronization with disposing thread
81         mutex_type      m_Mutex;
82         condvar_type    m_cvDataReady;
83
84         // Task for thread (dispose cycle)
85         atomics::atomic<buffer_type *>  m_pBuffer{ nullptr };
86         uint64_t m_nCurEpoch = 0;
87
88         // Quit flag
89         bool    m_bQuit = false;
90
91         // disposing pass sync
92         condvar_type        m_cvReady;
93         bool                m_bReady = false;
94         //@endcond
95
96     private: // methods called from disposing thread
97         //@cond
98         void execute()
99         {
100             buffer_type *   pBuffer;
101             uint64_t        nCurEpoch;
102             bool            bQuit = false;
103
104             while ( !bQuit ) {
105
106                 // signal that we are ready to dispose
107                 {
108                     unique_lock lock( m_Mutex );
109                     m_bReady = true;
110                 }
111                 m_cvReady.notify_one();
112
113                 {
114                     // wait new data portion
115                     unique_lock lock( m_Mutex );
116
117                     while ( (pBuffer = m_pBuffer.load( atomics::memory_order_relaxed )) == nullptr )
118                         m_cvDataReady.wait( lock );
119
120                     // New work is ready
121                     m_bReady = false; // we are busy
122
123                     bQuit = m_bQuit;
124                     nCurEpoch = m_nCurEpoch;
125                     m_pBuffer.store( nullptr, atomics::memory_order_relaxed );
126                 }
127
128                 if ( pBuffer )
129                     dispose_buffer( pBuffer, nCurEpoch );
130             }
131         }
132
133         void dispose_buffer( buffer_type * pBuf, uint64_t nCurEpoch )
134         {
135             epoch_retired_ptr * p;
136             while ( ( p = pBuf->front()) != nullptr ) {
137                 if ( p->m_nEpoch <= nCurEpoch ) {
138                     p->free();
139                     CDS_VERIFY( pBuf->pop_front());
140                 }
141                 else
142                     break;
143             }
144         }
145         //@endcond
146
147     public:
148         //@cond
149         dispose_thread()
150             //: m_pBuffer( nullptr )
151         {}
152         //@endcond
153
154     public: // methods called from any thread
155         /// Start reclamation thread
156         /**
157             This function is called by \ref general_threaded object to start
158             internal reclamation thread.
159         */
160         void start()
161         {
162             m_DisposeThread = new (m_threadPlaceholder) dispose_thread_starter( this );
163         }
164
165         /// Stop reclamation thread
166         /**
167             This function is called by \ref general_threaded object to
168             start reclamation cycle and then to terminate reclamation thread.
169
170             \p buf buffer contains retired objects ready to free.
171         */
172         void stop( buffer_type& buf, uint64_t nCurEpoch )
173         {
174             {
175                 unique_lock lock( m_Mutex );
176
177                 // wait while retiring pass done
178                 while ( !m_bReady )
179                     m_cvReady.wait( lock );
180
181                 // give a new work and set stop flag
182                 m_nCurEpoch = nCurEpoch;
183                 m_pBuffer.store( &buf, atomics::memory_order_relaxed );
184                 m_bQuit = true;
185             }
186             m_cvDataReady.notify_one();
187
188             m_DisposeThread->join();
189         }
190
191         /// Start reclamation cycle
192         /**
193             This function is called by \ref general_threaded object
194             to notify the reclamation thread about a new work.
195             \p buf buffer contains retired objects ready to free.
196             The reclamation thread should free all \p buf objects
197             \p m_nEpoch field of which is no more than \p nCurEpoch.
198
199             If \p bSync parameter is \p true the calling thread should
200             wait until disposing done.
201         */
202         void dispose( buffer_type& buf, uint64_t nCurEpoch, bool bSync )
203         {
204             {
205                 unique_lock lock( m_Mutex );
206
207                 // wait while disposing pass done
208                 while ( !m_bReady )
209                     m_cvReady.wait( lock );
210
211                 // new work
212                 m_bReady = false;
213                 m_nCurEpoch = nCurEpoch;
214                 m_pBuffer.store( &buf, atomics::memory_order_relaxed );
215             }
216             m_cvDataReady.notify_one();
217
218             if ( bSync ) {
219                 unique_lock lock( m_Mutex );
220                 while ( !m_bReady )
221                     m_cvReady.wait( lock );
222             }
223         }
224     };
225 }} // namespace cds::urcu
226
227 #endif // #ifdef CDSLIB_URCU_DISPOSE_THREAD_H