Fixed GCC 4.8 incompatibility
[libcds.git] / cds / urcu / dispose_thread.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #ifndef CDSLIB_URCU_DISPOSE_THREAD_H
32 #define CDSLIB_URCU_DISPOSE_THREAD_H
33
34 #include <memory>
35 #include <thread>
36 #include <mutex>
37 #include <condition_variable>
38 #include <cds/details/aligned_type.h>
39 #include <cds/algo/atomic.h>
40
41 namespace cds { namespace urcu {
42
43     /// Reclamation thread for \p general_threaded URCU
44     /**
45         The object of this class contains a reclamation thread object and
46         necessary synchronization object(s). The object manages reclamation thread
47         and defines a set of messages (i.e. methods) to communicate with the thread.
48
49         Template argument \p Buffer defines the buffer type of \ref general_threaded URCU.
50     */
51     template <class Buffer>
52     class dispose_thread
53     {
54     public:
55         typedef Buffer  buffer_type ;   ///< Buffer type
56     private:
57         //@cond
58         typedef std::thread             thread_type;
59         typedef std::mutex              mutex_type;
60         typedef std::condition_variable condvar_type;
61         typedef std::unique_lock< mutex_type >  unique_lock;
62
63         class dispose_thread_starter: public thread_type
64         {
65             static void thread_func( dispose_thread * pThis )
66             {
67                 pThis->execute();
68             }
69
70         public:
71             dispose_thread_starter( dispose_thread * pThis )
72                 : thread_type( thread_func, pThis )
73             {}
74         };
75
76         typedef char thread_placeholder[ sizeof(dispose_thread_starter) ];
77         typename cds::details::aligned_type< thread_placeholder, alignof( dispose_thread_starter ) >::type  m_threadPlaceholder;
78         dispose_thread_starter *                m_DisposeThread;
79
80         // synchronization with disposing thread
81         mutex_type      m_Mutex;
82         condvar_type    m_cvDataReady;
83
84         // Task for thread (dispose cycle)
85         atomics::atomic<buffer_type *>  m_pBuffer;
86         uint64_t m_nCurEpoch = 0;
87
88         // Quit flag
89         bool    m_bQuit = false;
90
91         // disposing pass sync
92         condvar_type        m_cvReady;
93         bool                m_bReady = false;
94         //@endcond
95
96     private: // methods called from disposing thread
97         //@cond
98         void execute()
99         {
100             buffer_type *   pBuffer;
101             uint64_t        nCurEpoch;
102             bool            bQuit = false;
103
104             while ( !bQuit ) {
105
106                 // signal that we are ready to dispose
107                 {
108                     unique_lock lock( m_Mutex );
109                     m_bReady = true;
110                 }
111                 m_cvReady.notify_one();
112
113                 {
114                     // wait new data portion
115                     unique_lock lock( m_Mutex );
116
117                     while ( (pBuffer = m_pBuffer.load( atomics::memory_order_relaxed )) == nullptr )
118                         m_cvDataReady.wait( lock );
119
120                     // New work is ready
121                     m_bReady = false; // we are busy
122
123                     bQuit = m_bQuit;
124                     nCurEpoch = m_nCurEpoch;
125                     m_pBuffer.store( nullptr, atomics::memory_order_relaxed );
126                 }
127
128                 if ( pBuffer )
129                     dispose_buffer( pBuffer, nCurEpoch );
130             }
131         }
132
133         void dispose_buffer( buffer_type * pBuf, uint64_t nCurEpoch )
134         {
135             epoch_retired_ptr * p;
136             while ( ( p = pBuf->front()) != nullptr ) {
137                 if ( p->m_nEpoch <= nCurEpoch ) {
138                     p->free();
139                     CDS_VERIFY( pBuf->pop_front());
140                 }
141                 else
142                     break;
143             }
144         }
145         //@endcond
146
147     public: // methods called from any thread
148         //@cond
149         dispose_thread()
150             : m_pBuffer( nullptr )
151         {}
152         //@endcond
153
154         /// Start reclamation thread
155         /**
156             This function is called by \ref general_threaded object to start
157             internal reclamation thread.
158         */
159         void start()
160         {
161             m_DisposeThread = new (m_threadPlaceholder) dispose_thread_starter( this );
162         }
163
164         /// Stop reclamation thread
165         /**
166             This function is called by \ref general_threaded object to
167             start reclamation cycle and then to terminate reclamation thread.
168
169             \p buf buffer contains retired objects ready to free.
170         */
171         void stop( buffer_type& buf, uint64_t nCurEpoch )
172         {
173             {
174                 unique_lock lock( m_Mutex );
175
176                 // wait while retiring pass done
177                 while ( !m_bReady )
178                     m_cvReady.wait( lock );
179
180                 // give a new work and set stop flag
181                 m_nCurEpoch = nCurEpoch;
182                 m_pBuffer.store( &buf, atomics::memory_order_relaxed );
183                 m_bQuit = true;
184             }
185             m_cvDataReady.notify_one();
186
187             m_DisposeThread->join();
188         }
189
190         /// Start reclamation cycle
191         /**
192             This function is called by \ref general_threaded object
193             to notify the reclamation thread about a new work.
194             \p buf buffer contains retired objects ready to free.
195             The reclamation thread should free all \p buf objects
196             \p m_nEpoch field of which is no more than \p nCurEpoch.
197
198             If \p bSync parameter is \p true the calling thread should
199             wait until disposing done.
200         */
201         void dispose( buffer_type& buf, uint64_t nCurEpoch, bool bSync )
202         {
203             {
204                 unique_lock lock( m_Mutex );
205
206                 // wait while disposing pass done
207                 while ( !m_bReady )
208                     m_cvReady.wait( lock );
209
210                 // new work
211                 m_bReady = false;
212                 m_nCurEpoch = nCurEpoch;
213                 m_pBuffer.store( &buf, atomics::memory_order_relaxed );
214             }
215             m_cvDataReady.notify_one();
216
217             if ( bSync ) {
218                 unique_lock lock( m_Mutex );
219                 while ( !m_bReady )
220                     m_cvReady.wait( lock );
221             }
222         }
223     };
224 }} // namespace cds::urcu
225
226 #endif // #ifdef CDSLIB_URCU_DISPOSE_THREAD_H