Removed redundant ctor
[libcds.git] / cds / urcu / dispose_thread.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #ifndef CDSLIB_URCU_DISPOSE_THREAD_H
32 #define CDSLIB_URCU_DISPOSE_THREAD_H
33
34 #include <memory>
35 #include <thread>
36 #include <mutex>
37 #include <condition_variable>
38 #include <cds/details/aligned_type.h>
39 #include <cds/algo/atomic.h>
40
41 namespace cds { namespace urcu {
42
43     /// Reclamation thread for \p general_threaded and \p signal_threaded URCU
44     /**
45         The object of this class contains a reclamation thread object and
46         necessary synchronization object(s). The object manages reclamation thread
47         and defines a set of messages (i.e. methods) to communicate with the thread.
48
49         Template argument \p Buffer defines the buffer type of \ref general_threaded (or \ref signal_threaded) URCU.
50     */
51     template <class Buffer>
52     class dispose_thread
53     {
54     public:
55         typedef Buffer  buffer_type ;   ///< Buffer type
56     private:
57         //@cond
58         typedef std::thread             thread_type;
59         typedef std::mutex              mutex_type;
60         typedef std::condition_variable condvar_type;
61         typedef std::unique_lock< mutex_type >  unique_lock;
62
63         class dispose_thread_starter: public thread_type
64         {
65             static void thread_func( dispose_thread * pThis )
66             {
67                 pThis->execute();
68             }
69
70         public:
71             dispose_thread_starter( dispose_thread * pThis )
72                 : thread_type( thread_func, pThis )
73             {}
74         };
75
76         typedef char thread_placeholder[ sizeof(dispose_thread_starter) ];
77         typename cds::details::aligned_type< thread_placeholder, alignof( dispose_thread_starter ) >::type  m_threadPlaceholder;
78         dispose_thread_starter *                m_DisposeThread;
79
80         // synchronization with disposing thread
81         mutex_type      m_Mutex;
82         condvar_type    m_cvDataReady;
83
84         // Task for thread (dispose cycle)
85         atomics::atomic<buffer_type *>  m_pBuffer{ nullptr };
86         uint64_t m_nCurEpoch = 0;
87
88         // Quit flag
89         bool    m_bQuit = false;
90
91         // disposing pass sync
92         condvar_type        m_cvReady;
93         bool                m_bReady = false;
94         //@endcond
95
96     private: // methods called from disposing thread
97         //@cond
98         void execute()
99         {
100             buffer_type *   pBuffer;
101             uint64_t        nCurEpoch;
102             bool            bQuit = false;
103
104             while ( !bQuit ) {
105
106                 // signal that we are ready to dispose
107                 {
108                     unique_lock lock( m_Mutex );
109                     m_bReady = true;
110                 }
111                 m_cvReady.notify_one();
112
113                 {
114                     // wait new data portion
115                     unique_lock lock( m_Mutex );
116
117                     while ( (pBuffer = m_pBuffer.load( atomics::memory_order_relaxed )) == nullptr )
118                         m_cvDataReady.wait( lock );
119
120                     // New work is ready
121                     m_bReady = false; // we are busy
122
123                     bQuit = m_bQuit;
124                     nCurEpoch = m_nCurEpoch;
125                     m_pBuffer.store( nullptr, atomics::memory_order_relaxed );
126                 }
127
128                 if ( pBuffer )
129                     dispose_buffer( pBuffer, nCurEpoch );
130             }
131         }
132
133         void dispose_buffer( buffer_type * pBuf, uint64_t nCurEpoch )
134         {
135             epoch_retired_ptr * p;
136             while ( ( p = pBuf->front()) != nullptr ) {
137                 if ( p->m_nEpoch <= nCurEpoch ) {
138                     p->free();
139                     CDS_VERIFY( pBuf->pop_front());
140                 }
141                 else
142                     break;
143             }
144         }
145         //@endcond
146
147     public: // methods called from any thread
148         /// Start reclamation thread
149         /**
150             This function is called by \ref general_threaded object to start
151             internal reclamation thread.
152         */
153         void start()
154         {
155             m_DisposeThread = new (m_threadPlaceholder) dispose_thread_starter( this );
156         }
157
158         /// Stop reclamation thread
159         /**
160             This function is called by \ref general_threaded object to
161             start reclamation cycle and then to terminate reclamation thread.
162
163             \p buf buffer contains retired objects ready to free.
164         */
165         void stop( buffer_type& buf, uint64_t nCurEpoch )
166         {
167             {
168                 unique_lock lock( m_Mutex );
169
170                 // wait while retiring pass done
171                 while ( !m_bReady )
172                     m_cvReady.wait( lock );
173
174                 // give a new work and set stop flag
175                 m_nCurEpoch = nCurEpoch;
176                 m_pBuffer.store( &buf, atomics::memory_order_relaxed );
177                 m_bQuit = true;
178             }
179             m_cvDataReady.notify_one();
180
181             m_DisposeThread->join();
182         }
183
184         /// Start reclamation cycle
185         /**
186             This function is called by \ref general_threaded object
187             to notify the reclamation thread about a new work.
188             \p buf buffer contains retired objects ready to free.
189             The reclamation thread should free all \p buf objects
190             \p m_nEpoch field of which is no more than \p nCurEpoch.
191
192             If \p bSync parameter is \p true the calling thread should
193             wait until disposing done.
194         */
195         void dispose( buffer_type& buf, uint64_t nCurEpoch, bool bSync )
196         {
197             {
198                 unique_lock lock( m_Mutex );
199
200                 // wait while disposing pass done
201                 while ( !m_bReady )
202                     m_cvReady.wait( lock );
203
204                 // new work
205                 m_bReady = false;
206                 m_nCurEpoch = nCurEpoch;
207                 m_pBuffer.store( &buf, atomics::memory_order_relaxed );
208             }
209             m_cvDataReady.notify_one();
210
211             if ( bSync ) {
212                 unique_lock lock( m_Mutex );
213                 while ( !m_bReady )
214                     m_cvReady.wait( lock );
215             }
216         }
217     };
218 }} // namespace cds::urcu
219
220 #endif // #ifdef CDSLIB_URCU_DISPOSE_THREAD_H