Updated copyright
[libcds.git] / cds / urcu / dispose_thread.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #ifndef CDSLIB_URCU_DISPOSE_THREAD_H
32 #define CDSLIB_URCU_DISPOSE_THREAD_H
33
34 #include <memory>
35 #include <thread>
36 #include <mutex>
37 #include <condition_variable>
38 #include <cds/details/aligned_type.h>
39 #include <cds/algo/atomic.h>
40
41 namespace cds { namespace urcu {
42
43     /// Reclamation thread for \p general_threaded and \p signal_threaded URCU
44     /**
45         The object of this class contains a reclamation thread object and
46         necessary synchronization object(s). The object manages reclamation thread
47         and defines a set of messages (i.e. methods) to communicate with the thread.
48
49         Template argument \p Buffer defines the buffer type of \ref general_threaded (or \ref signal_threaded) URCU.
50     */
51     template <class Buffer>
52     class dispose_thread
53     {
54     public:
55         typedef Buffer  buffer_type ;   ///< Buffer type
56     private:
57         //@cond
58         typedef std::thread             thread_type;
59         typedef std::mutex              mutex_type;
60         typedef std::condition_variable condvar_type;
61         typedef std::unique_lock< mutex_type >  unique_lock;
62
63         class dispose_thread_starter: public thread_type
64         {
65             static void thread_func( dispose_thread * pThis )
66             {
67                 pThis->execute();
68             }
69
70         public:
71             dispose_thread_starter( dispose_thread * pThis )
72                 : thread_type( thread_func, pThis )
73             {}
74         };
75
76         typedef char thread_placeholder[ sizeof(dispose_thread_starter) ];
77         typename cds::details::aligned_type< thread_placeholder, alignof( dispose_thread_starter ) >::type  m_threadPlaceholder;
78         dispose_thread_starter *                m_DisposeThread;
79
80         // synchronization with disposing thread
81         mutex_type      m_Mutex;
82         condvar_type    m_cvDataReady;
83
84         // Task for thread (dispose cycle)
85         atomics::atomic<buffer_type *>  m_pBuffer;
86         uint64_t volatile      m_nCurEpoch;
87
88         // Quit flag
89         atomics::atomic<bool>  m_bQuit;
90
91         // disposing pass sync
92         condvar_type           m_cvReady;
93         atomics::atomic<bool>  m_bReady;
94         //@endcond
95
96     private: // methods called from disposing thread
97         //@cond
98         void execute()
99         {
100             buffer_type *   pBuffer;
101             uint64_t        nCurEpoch;
102             bool            bQuit = false;
103
104             while ( !bQuit ) {
105
106                 // signal that we are ready to dispose
107                 {
108                     unique_lock lock( m_Mutex );
109                     m_bReady.store( true, atomics::memory_order_relaxed );
110                 }
111                 m_cvReady.notify_one();
112
113                 {
114                     // wait new data portion
115                     unique_lock lock( m_Mutex );
116
117                     while ( (pBuffer = m_pBuffer.load( atomics::memory_order_relaxed )) == nullptr )
118                         m_cvDataReady.wait( lock );
119
120                     // New work is ready
121                     m_bReady.store( false, atomics::memory_order_relaxed ); // we are busy
122
123                     bQuit = m_bQuit.load( atomics::memory_order_relaxed );
124                     nCurEpoch = m_nCurEpoch;
125                     m_pBuffer.store( nullptr, atomics::memory_order_relaxed );
126                 }
127
128                 if ( pBuffer )
129                     dispose_buffer( pBuffer, nCurEpoch );
130             }
131         }
132
133         void dispose_buffer( buffer_type * pBuf, uint64_t nCurEpoch )
134         {
135             epoch_retired_ptr * p;
136             while ( ( p = pBuf->front()) != nullptr ) {
137                 if ( p->m_nEpoch <= nCurEpoch ) {
138                     p->free();
139                     CDS_VERIFY( pBuf->pop_front());
140                 }
141                 else
142                     break;
143             }
144         }
145         //@endcond
146
147     public:
148         //@cond
149         dispose_thread()
150             : m_pBuffer( nullptr )
151             , m_nCurEpoch(0)
152             , m_bQuit( false )
153             , m_bReady( false )
154         {}
155         //@endcond
156
157     public: // methods called from any thread
158         /// Start reclamation thread
159         /**
160             This function is called by \ref general_threaded object to start
161             internal reclamation thread.
162         */
163         void start()
164         {
165             m_DisposeThread = new (m_threadPlaceholder) dispose_thread_starter( this );
166         }
167
168         /// Stop reclamation thread
169         /**
170             This function is called by \ref general_threaded object to
171             start reclamation cycle and then to terminate reclamation thread.
172
173             \p buf buffer contains retired objects ready to free.
174         */
175         void stop( buffer_type& buf, uint64_t nCurEpoch )
176         {
177             {
178                 unique_lock lock( m_Mutex );
179
180                 // wait while retiring pass done
181                 while ( !m_bReady.load( atomics::memory_order_relaxed ))
182                     m_cvReady.wait( lock );
183
184                 // give a new work and set stop flag
185                 m_nCurEpoch = nCurEpoch;
186                 m_pBuffer.store( &buf, atomics::memory_order_relaxed );
187                 m_bQuit.store( true, atomics::memory_order_relaxed );
188             }
189             m_cvDataReady.notify_one();
190
191             m_DisposeThread->join();
192         }
193
194         /// Start reclamation cycle
195         /**
196             This function is called by \ref general_threaded object
197             to notify the reclamation thread about a new work.
198             \p buf buffer contains retired objects ready to free.
199             The reclamation thread should free all \p buf objects
200             \p m_nEpoch field of which is no more than \p nCurEpoch.
201
202             If \p bSync parameter is \p true the calling thread should
203             wait until disposing done.
204         */
205         void dispose( buffer_type& buf, uint64_t nCurEpoch, bool bSync )
206         {
207             {
208                 unique_lock lock( m_Mutex );
209
210                 // wait while disposing pass done
211                 while ( !m_bReady.load( atomics::memory_order_relaxed ))
212                     m_cvReady.wait( lock );
213
214                 // new work
215                 m_bReady.store( false, atomics::memory_order_relaxed );
216                 m_nCurEpoch = nCurEpoch;
217                 m_pBuffer.store( &buf, atomics::memory_order_relaxed );
218             }
219             m_cvDataReady.notify_one();
220
221             if ( bSync ) {
222                 unique_lock lock( m_Mutex );
223                 while ( !m_bReady.load( atomics::memory_order_relaxed ))
224                     m_cvReady.wait( lock );
225             }
226         }
227     };
228 }} // namespace cds::urcu
229
230 #endif // #ifdef CDSLIB_URCU_DISPOSE_THREAD_H