Move libcds 1.6.0 from SVN
[libcds.git] / cds / urcu / details / sig_threaded.h
1 //$$CDS-header$$1
2
3 #ifndef _CDS_URCU_DETAILS_SIG_THREADED_H
4 #define _CDS_URCU_DETAILS_SIG_THREADED_H
5
6 #include <cds/urcu/details/sh.h>
7 #ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED
8
9 #include <cds/urcu/dispose_thread.h>
10 #include <cds/backoff_strategy.h>
11 #include <cds/container/vyukov_mpmc_cycle_queue.h>
12
13 namespace cds { namespace urcu {
14
15     /// User-space signal-handled RCU with deferred threaded reclamation
16     /**
17         @headerfile cds/urcu/signal_threaded.h
18
19         This implementation is similar to \ref signal_buffered but separate thread is created
20         for deleting the retired objects. Like \p %signal_buffered, the class contains an internal buffer
21         where retired objects are accumulated. When the buffer becomes full,
22         the RCU \p synchronize function is called that waits until all reader/updater threads end up their read-side critical sections,
23         i.e. until the RCU quiescent state will come. After that the "work ready" message is sent to reclamation thread.
24         The reclamation thread frees the buffer.
25         This synchronization cycle may be called in any thread that calls \ref retire_ptr function.
26
27         There is a wrapper \ref cds_urcu_signal_threaded_gc "gc<signal_threaded>" for \p %signal_threaded class
28         that provides unified RCU interface. You should use this wrapper class instead of \p %signal_threaded
29
30         Template arguments:
31         - \p Buffer - buffer type with FIFO semantics. Default is cds::container::VyukovMPMCCycleQueue. See \ref signal_buffered
32             for description of buffer's interface. The buffer contains the objects of \ref epoch_retired_ptr
33             type that contains additional \p m_nEpoch field. This field specifies an epoch when the object
34             has been placed into the buffer. The \p %signal_threaded object has a global epoch counter
35             that is incremented on each \p synchronize call. The epoch is used internally to prevent early deletion.
36         - \p Lock - mutex type, default is \p std::mutex
37         - \p DisposerThread - the reclamation thread class. Default is \ref cds::urcu::dispose_thread,
38             see the description of this class for required interface.
39         - \p Backoff - back-off schema, default is cds::backoff::Default
40     */
    template <
        class Buffer = cds::container::VyukovMPMCCycleQueue<
            epoch_retired_ptr
            ,cds::opt::buffer< cds::opt::v::dynamic_buffer< epoch_retired_ptr > >
        >
        ,class Lock = cds_std::mutex
        ,class DisposerThread = dispose_thread<Buffer>
        ,class Backoff = cds::backoff::Default
    >
    class signal_threaded: public details::sh_singleton< signal_threaded_tag >
    {
        //@cond
        typedef details::sh_singleton< signal_threaded_tag > base_class;
        //@endcond
    public:
        typedef Buffer          buffer_type ;   ///< Buffer type
        typedef Lock            lock_type   ;   ///< Lock type
        typedef Backoff         back_off    ;   ///< Back-off scheme
        typedef DisposerThread  disposer_thread ;   ///< Disposer thread type

        typedef signal_threaded_tag     rcu_tag ;       ///< Thread-side RCU part
        typedef base_class::thread_gc   thread_gc ;     ///< Access lock class
        typedef typename thread_gc::scoped_lock scoped_lock ; ///< Access lock class

        static bool const c_bBuffered = true ; ///< This RCU buffers disposed elements

    protected:
        //@cond
        typedef details::sh_singleton_instance< rcu_tag >    singleton_ptr;

        // Deleter for the temporary std::unique_ptr in Construct(): frees the
        // half-initialized singleton if starting the disposer thread throws.
        struct scoped_disposer {
            void operator ()( signal_threaded * p )
            {
                delete p;
            }
        };
        //@endcond

    protected:
        //@cond
        buffer_type                     m_Buffer;           // FIFO of retired objects awaiting reclamation
        CDS_ATOMIC::atomic<uint64_t>    m_nCurEpoch;        // global epoch counter; incremented by every synchronize() call
        lock_type                       m_Lock;             // serializes concurrent synchronize() calls
        size_t const                    m_nCapacity;        // buffer threshold that triggers synchronize()
        disposer_thread                 m_DisposerThread;   // background thread that frees buffered retired objects
        //@endcond

    public:
        /// Returns singleton instance
        static signal_threaded * instance()
        {
            return static_cast<signal_threaded *>( base_class::instance() );
        }
        /// Checks if the singleton is created and ready to use
        static bool isUsed()
        {
            return singleton_ptr::s_pRCU != null_ptr<singleton_vtbl *>();
        }

    protected:
        //@cond
        // Construction is private to the singleton; use Construct()/Destruct().
        // \p nBufferCapacity - buffer threshold; \p nSignal - signal used by the RCU implementation
        signal_threaded( size_t nBufferCapacity, int nSignal = SIGUSR1 )
            : base_class( nSignal )
            , m_Buffer( nBufferCapacity )
            , m_nCurEpoch( 1 )
            , m_nCapacity( nBufferCapacity )
        {}

        // Pushes retired pointer \p p into the internal buffer, calling
        // synchronize() when the buffer is full or the push fails.
        // Return: true - synchronize has been called, false - otherwise
        bool push_buffer( epoch_retired_ptr& p )
        {
            bool bPushed = m_Buffer.push( p );
            if ( !bPushed || m_Buffer.size() >= capacity() ) {
                synchronize();
                // If the push failed, p is not in the buffer and the disposer
                // thread will never see it — free it here, after the grace period.
                if ( !bPushed )
                    p.free();
                return true;
            }
            return false;
        }

        //@endcond

    public:
        //@cond
        // Empty by design: Destruct() stops the disposer thread and drains the
        // buffer before deleting the singleton.
        ~signal_threaded()
        {}
        //@endcond

        /// Creates singleton object and starts reclamation thread
        /**
            The \p nBufferCapacity parameter defines RCU threshold.

            The \p nSignal parameter defines a signal number stated for RCU, default is \p SIGUSR1
        */
        static void Construct( size_t nBufferCapacity = 256, int nSignal = SIGUSR1 )
        {
            if ( !singleton_ptr::s_pRCU ) {
                // scoped_disposer deletes the object if start() throws before release()
                std::unique_ptr< signal_threaded, scoped_disposer > pRCU( new signal_threaded( nBufferCapacity, nSignal ) );
                pRCU->m_DisposerThread.start();

                singleton_ptr::s_pRCU = pRCU.release();
            }
        }

        /// Destroys singleton object and terminates internal reclamation thread
        /**
            If \p bDetachAll is \p true, all threads registered with this RCU
            are detached first (m_ThreadList is inherited from the base
            singleton — declared outside this file).
        */
        static void Destruct( bool bDetachAll = false )
        {
            if ( isUsed() ) {
                signal_threaded * pThis = instance();
                if ( bDetachAll )
                    pThis->m_ThreadList.detach_all();

                // Hand any remaining retired objects to the disposer thread and stop it
                pThis->m_DisposerThread.stop( pThis->m_Buffer, pThis->m_nCurEpoch.load( CDS_ATOMIC::memory_order_acquire ));

                delete pThis;
                singleton_ptr::s_pRCU = null_ptr<singleton_vtbl *>();
            }
        }

    public:
        /// Retires \p p pointer
        /**
            The method pushes \p p pointer to internal buffer.
            When the buffer becomes full \ref synchronize function is called
            to wait for the end of grace period and then
            a message is sent to the reclamation thread.
        */
        virtual void retire_ptr( retired_ptr& p )
        {
            if ( p.m_p ) {
                // Stamp the retired pointer with the current epoch so the
                // disposer thread can tell which grace period it belongs to
                epoch_retired_ptr ep( p, m_nCurEpoch.load( CDS_ATOMIC::memory_order_acquire ) );
                push_buffer( ep );
            }
        }

        /// Retires the pointer chain [\p itFirst, \p itLast)
        template <typename ForwardIterator>
        void batch_retire( ForwardIterator itFirst, ForwardIterator itLast )
        {
            // The epoch is sampled once (relaxed) for the whole batch;
            // every pointer in the chain gets the same epoch stamp
            uint64_t nEpoch = m_nCurEpoch.load( CDS_ATOMIC::memory_order_relaxed );
            while ( itFirst != itLast ) {
                epoch_retired_ptr p( *itFirst, nEpoch );
                ++itFirst;
                push_buffer( p );
            }
        }

        /// Waits to finish a grace period and calls disposing thread
        void synchronize()
        {
            synchronize( false );
        }

        //@cond
        // bSync is forwarded to disposer_thread::dispose(); presumably it makes
        // the call wait until disposal completes — see dispose_thread for details.
        void synchronize( bool bSync )
        {
            // Open a new epoch; objects retired before this point carry nPrevEpoch
            uint64_t nPrevEpoch = m_nCurEpoch.fetch_add( 1, CDS_ATOMIC::memory_order_release );

            CDS_ATOMIC::atomic_thread_fence( CDS_ATOMIC::memory_order_acquire );
            {
                // Only one thread at a time may drive a grace period
                cds::lock::scoped_lock<lock_type> sl( m_Lock );

                back_off bkOff;
                base_class::force_membar_all_threads( bkOff );
                // Two epoch flips, each followed by a quiescent-state wait —
                // the grace-period protocol implemented by the base singleton
                base_class::switch_next_epoch();
                bkOff.reset();
                base_class::wait_for_quiescent_state( bkOff );
                base_class::switch_next_epoch();
                bkOff.reset();
                base_class::wait_for_quiescent_state( bkOff );
                base_class::force_membar_all_threads( bkOff );

                // Wake the reclamation thread to free everything up to nPrevEpoch
                m_DisposerThread.dispose( m_Buffer, nPrevEpoch, bSync );
            }
        }
        // Forces synchronous disposal of all currently buffered retired objects
        void force_dispose()
        {
            synchronize( true );
        }
        //@endcond

        /// Returns the threshold of internal buffer
        size_t capacity() const
        {
            return m_nCapacity;
        }

        /// Returns the signal number stated for RCU
        int signal_no() const
        {
            return base_class::signal_no();
        }
    };
235 }} // namespace cds::urcu
236
237 #endif // #ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED
238 #endif // #ifndef _CDS_URCU_DETAILS_SIG_THREADED_H