//$$CDS-header$$

#ifndef CDSLIB_URCU_DETAILS_SIG_THREADED_H
#define CDSLIB_URCU_DETAILS_SIG_THREADED_H

#include <mutex>    //unique_lock
#include <cds/urcu/details/sh.h>
#ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED

#include <cds/urcu/dispose_thread.h>
#include <cds/algo/backoff_strategy.h>
#include <cds/container/vyukov_mpmc_cycle_queue.h>

namespace cds { namespace urcu {

    /// User-space signal-handled RCU with deferred threaded reclamation
    /**
        @headerfile cds/urcu/signal_threaded.h

        This implementation is similar to \ref signal_buffered, but a separate thread is created
        for deleting the retired objects. Like \p %signal_buffered, the class contains an internal buffer
        where retired objects are accumulated. When the buffer becomes full,
        the RCU \p synchronize function is called; it waits until all reader/updater threads have finished their read-side critical sections,
        i.e. until the RCU quiescent state is reached. After that, a "work ready" message is sent to the reclamation thread,
        and the reclamation thread frees the buffered objects.
        This synchronization cycle may be initiated by any thread that calls the \ref retire_ptr function.

        There is a wrapper \ref cds_urcu_signal_threaded_gc "gc<signal_threaded>" for the \p %signal_threaded class
        that provides the unified RCU interface. You should use this wrapper class instead of \p %signal_threaded.

        Template arguments:
        - \p Buffer - buffer type with FIFO semantics. Default is cds::container::VyukovMPMCCycleQueue. See \ref signal_buffered
            for a description of the buffer's interface. The buffer contains objects of \ref epoch_retired_ptr
            type, which adds an \p m_nEpoch field. This field specifies the epoch at which the object
            was placed into the buffer. The \p %signal_threaded object has a global epoch counter
            that is incremented on each \p synchronize call. The epoch is used internally to prevent early deletion.
        - \p Lock - mutex type, default is \p std::mutex
        - \p DisposerThread - the reclamation thread class. Default is \ref cds::urcu::dispose_thread,
            see the description of this class for the required interface.
        - \p Backoff - back-off schema, default is cds::backoff::Default
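
        A minimal usage sketch follows (hedged: the typedef name \p rcu_type and the
        overall lifetime pattern are illustrative, assuming the usual libcds
        initialization sequence; they are not part of this header):
        \code
        #include <cds/init.h>
        #include <cds/urcu/signal_threaded.h>

        typedef cds::urcu::gc< cds::urcu::signal_threaded<> > rcu_type;

        int main()
        {
            cds::Initialize();
            {
                // The wrapper's constructor creates the singleton and starts the
                // reclamation thread; its destructor stops the thread and
                // destroys the singleton.
                rcu_type theRCU;

                // Each worker thread must be attached to the libcds infrastructure
                cds::threading::Manager::attachThread();

                {
                    // Reader-side critical section
                    rcu_type::scoped_lock sl;
                    // ... access RCU-protected data here ...
                }

                cds::threading::Manager::detachThread();
            }
            cds::Terminate();
        }
        \endcode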
    */
    template <
        class Buffer = cds::container::VyukovMPMCCycleQueue< epoch_retired_ptr >
        ,class Lock = std::mutex
        ,class DisposerThread = dispose_thread<Buffer>
        ,class Backoff = cds::backoff::Default
    >
    class signal_threaded: public details::sh_singleton< signal_threaded_tag >
    {
        //@cond
        typedef details::sh_singleton< signal_threaded_tag > base_class;
        //@endcond
    public:
        typedef Buffer          buffer_type ;   ///< Buffer type
        typedef Lock            lock_type   ;   ///< Lock type
        typedef Backoff         back_off    ;   ///< Back-off scheme
        typedef DisposerThread  disposer_thread ;   ///< Disposer thread type

        typedef signal_threaded_tag     rcu_tag ;       ///< RCU tag
        typedef base_class::thread_gc   thread_gc ;     ///< Thread-side RCU part
        typedef typename thread_gc::scoped_lock scoped_lock ; ///< Access lock class

        static bool const c_bBuffered = true ; ///< This RCU buffers disposed elements

    protected:
        //@cond
        typedef details::sh_singleton_instance< rcu_tag >    singleton_ptr;

        struct scoped_disposer {
            void operator ()( signal_threaded * p )
            {
                delete p;
            }
        };
        //@endcond

    protected:
        //@cond
        buffer_type                     m_Buffer;
        atomics::atomic<uint64_t>       m_nCurEpoch;
        lock_type                       m_Lock;
        size_t const                    m_nCapacity;
        disposer_thread                 m_DisposerThread;
        //@endcond

    public:
        /// Returns singleton instance
        static signal_threaded * instance()
        {
            return static_cast<signal_threaded *>( base_class::instance() );
        }
        /// Checks if the singleton is created and ready to use
        static bool isUsed()
        {
            return singleton_ptr::s_pRCU != nullptr;
        }

    protected:
        //@cond
        signal_threaded( size_t nBufferCapacity, int nSignal = SIGUSR1 )
            : base_class( nSignal )
            , m_Buffer( nBufferCapacity )
            , m_nCurEpoch( 1 )
            , m_nCapacity( nBufferCapacity )
        {}

        // Returns true if synchronize() has been called, false otherwise
        bool push_buffer( epoch_retired_ptr& p )
        {
            bool bPushed = m_Buffer.push( p );
            if ( !bPushed || m_Buffer.size() >= capacity() ) {
                synchronize();
                if ( !bPushed ) {
                    // The buffer was full and could not take p; the grace period
                    // above makes it safe to free p immediately.
                    p.free();
                }
                return true;
            }
            return false;
        }

        //@endcond

    public:
        //@cond
        ~signal_threaded()
        {}
        //@endcond

        /// Creates the singleton object and starts the reclamation thread
        /**
            The \p nBufferCapacity parameter defines the RCU threshold: when the internal
            buffer reaches this size, \p synchronize is called.

            The \p nSignal parameter defines the signal number used for RCU, default is \p SIGUSR1.
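
            A hedged sketch of driving the singleton directly (applications normally
            use the \ref cds_urcu_signal_threaded_gc "gc<signal_threaded>" wrapper,
            whose constructor and destructor call these functions; the capacity and
            signal values below are arbitrary):
            \code
            // Create the singleton: 1024-element buffer, SIGUSR2 as the RCU signal
            cds::urcu::signal_threaded<>::Construct( 1024, SIGUSR2 );

            // ... attach threads, use RCU ...

            // Destroy the singleton, detaching all attached threads first
            cds::urcu::signal_threaded<>::Destruct( true );
            \endcode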
        */
        static void Construct( size_t nBufferCapacity = 256, int nSignal = SIGUSR1 )
        {
            if ( !singleton_ptr::s_pRCU ) {
                std::unique_ptr< signal_threaded, scoped_disposer > pRCU( new signal_threaded( nBufferCapacity, nSignal ) );
                pRCU->m_DisposerThread.start();

                singleton_ptr::s_pRCU = pRCU.release();
            }
        }

        /// Destroys the singleton object and terminates the internal reclamation thread
        static void Destruct( bool bDetachAll = false )
        {
            if ( isUsed() ) {
                signal_threaded * pThis = instance();
                if ( bDetachAll )
                    pThis->m_ThreadList.detach_all();

                pThis->m_DisposerThread.stop( pThis->m_Buffer, pThis->m_nCurEpoch.load( atomics::memory_order_acquire ));

                delete pThis;
                singleton_ptr::s_pRCU = nullptr;
            }
        }

    public:
        /// Retires the \p p pointer
        /**
            The method pushes the \p p pointer to the internal buffer.
            When the buffer becomes full, the \ref synchronize function is called
            to wait for the end of the grace period, and then
            a message is sent to the reclamation thread.
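
            A hedged sketch (it assumes \p retired_ptr 's (pointer, free-function)
            constructor; the type \p Foo and the helper \p free_foo are illustrative,
            and real code usually retires objects through the \p gc<signal_threaded>
            wrapper instead of calling the singleton directly):
            \code
            struct Foo { int n; };
            void free_foo( void* p ) { delete static_cast<Foo*>( p ); }

            cds::urcu::retired_ptr rp( new Foo, free_foo );
            cds::urcu::signal_threaded<>::instance()->retire_ptr( rp );
            \endcode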
        */
        virtual void retire_ptr( retired_ptr& p )
        {
            if ( p.m_p ) {
                epoch_retired_ptr ep( p, m_nCurEpoch.load( atomics::memory_order_acquire ) );
                push_buffer( ep );
            }
        }

        /// Retires the pointer chain [\p itFirst, \p itLast)
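        /// A hedged sketch (reusing the illustrative \p Foo and \p free_foo helpers
        /// from the \p retire_ptr example above; each element of the chain is a
        /// \p retired_ptr):
        /// \code
        /// cds::urcu::retired_ptr arr[2] = {
        ///     cds::urcu::retired_ptr( new Foo, free_foo ),
        ///     cds::urcu::retired_ptr( new Foo, free_foo )
        /// };
        /// cds::urcu::signal_threaded<>::instance()->batch_retire( arr, arr + 2 );
        /// \endcode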
        template <typename ForwardIterator>
        void batch_retire( ForwardIterator itFirst, ForwardIterator itLast )
        {
            // All pointers of the chain are tagged with the epoch current at the start of the batch
            uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_relaxed );
            while ( itFirst != itLast ) {
                epoch_retired_ptr p( *itFirst, nEpoch );
                ++itFirst;
                push_buffer( p );
            }
        }

        /// Waits for a grace period to finish, then notifies the disposing thread
        void synchronize()
        {
            synchronize( false );
        }

        //@cond
        void synchronize( bool bSync )
        {
            uint64_t nPrevEpoch = m_nCurEpoch.fetch_add( 1, atomics::memory_order_release );

            atomics::atomic_thread_fence( atomics::memory_order_acquire );
            {
                std::unique_lock<lock_type> sl( m_Lock );

                back_off bkOff;
                base_class::force_membar_all_threads( bkOff );

                // Classic two-phase grace period: flip the global epoch and wait
                // for the readers twice, so that any reader that started before
                // the first flip has certainly left its critical section.
                base_class::switch_next_epoch();
                bkOff.reset();
                base_class::wait_for_quiescent_state( bkOff );
                base_class::switch_next_epoch();
                bkOff.reset();
                base_class::wait_for_quiescent_state( bkOff );

                base_class::force_membar_all_threads( bkOff );

                // Hand the retired objects of the finished epoch to the disposer thread
                m_DisposerThread.dispose( m_Buffer, nPrevEpoch, bSync );
            }
        }
        void force_dispose()
        {
            synchronize( true );
        }
        //@endcond

        /// Returns the threshold of the internal buffer
        size_t capacity() const
        {
            return m_nCapacity;
        }

        /// Returns the signal number used for RCU
        int signal_no() const
        {
            return base_class::signal_no();
        }
    };
}} // namespace cds::urcu

#endif // #ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED
#endif // #ifndef CDSLIB_URCU_DETAILS_SIG_THREADED_H