Merge branch 'integration' of github.com:khizmax/libcds into integration
[libcds.git] / cds / urcu / details / gpt.h
1 //$$CDS-header$$1
2
3 #ifndef CDSLIB_URCU_DETAILS_GPT_H
4 #define CDSLIB_URCU_DETAILS_GPT_H
5
6 #include <mutex>    //unique_lock
7 #include <limits>
8 #include <cds/urcu/details/gp.h>
9 #include <cds/urcu/dispose_thread.h>
10 #include <cds/algo/backoff_strategy.h>
11 #include <cds/container/vyukov_mpmc_cycle_queue.h>
12
13 namespace cds { namespace urcu {
14
15     /// User-space general-purpose RCU with deferred threaded reclamation
16     /**
17         @headerfile cds/urcu/general_threaded.h
18
19         This implementation is similar to \ref general_buffered but separate thread is created
20         for deleting the retired objects. Like \p %general_buffered, the class contains an internal buffer
21         where retired objects are accumulated. When the buffer becomes full,
22         the RCU \p synchronize() function is called that waits until all reader/updater threads end up their read-side critical sections,
23         i.e. until the RCU quiescent state will come. After that the "work ready" message is sent to reclamation thread.
24         The reclamation thread frees the buffer.
25         This synchronization cycle may be called in any thread that calls \p retire_ptr() function.
26
27         There is a wrapper \ref cds_urcu_general_threaded_gc "gc<general_threaded>" for \p %general_threaded class
28         that provides unified RCU interface. You should use this wrapper class instead \p %general_threaded
29
30         The \p Buffer contains items of \ref cds_urcu_retired_ptr "epoch_retired_ptr" type 
31         and it should support a multiple producer/single consumer queue with the following interface:
32         - <tt> bool push( epoch_retired_ptr& p ) </tt> - places the retired pointer \p p into queue. If the function
33         returns \p false it means that the buffer is full and RCU synchronization cycle must be processed.
34         - <tt>epoch_retired_ptr * front() </tt> - returns a pointer to the top element or \p nullptr if the buffer is empty.
35         - <tt>bool pop_front() </tt> - pops the top element; returns \p false if the buffer is empty.
36         - <tt>size_t size()</tt> - returns queue's item count.
37
38         The buffer is considered as full if \p push() returns \p false or the buffer size reaches the RCU threshold.
39
40         Template arguments:
41         - \p Buffer - MPSC (muliple producer/single consumer) buffer type with FIFO semantics. 
42             Default is \p cds::container::VyukovMPSCCycleQueue. The buffer contains the objects of \ref epoch_retired_ptr
43             type that contains additional \p m_nEpoch field. This field specifies an epoch when the object
44             has been placed into the buffer. The \p %general_threaded object has a global epoch counter
45             that is incremented on each \p synchronize() call. The epoch is used internally to prevent early deletion.
46         - \p Lock - mutex type, default is \p std::mutex
47         - \p DisposerThread - the reclamation thread class. Default is \ref cds::urcu::dispose_thread,
48             see the description of this class for required interface.
49         - \p Backoff - back-off schema, default is cds::backoff::Default
50     */
51     template <
52         class Buffer = cds::container::VyukovMPSCCycleQueue< epoch_retired_ptr >
53         ,class Lock = std::mutex
54         ,class DisposerThread = dispose_thread<Buffer>
55         ,class Backoff = cds::backoff::Default
56     >
57     class general_threaded: public details::gp_singleton< general_threaded_tag >
58     {
59         //@cond
60         typedef details::gp_singleton< general_threaded_tag > base_class;
61         //@endcond
62     public:
63         typedef Buffer          buffer_type ;   ///< Buffer type
64         typedef Lock            lock_type   ;   ///< Lock type
65         typedef Backoff         back_off    ;   ///< Back-off scheme
66         typedef DisposerThread  disposer_thread ;   ///< Disposer thread type
67
68         typedef general_threaded_tag    rcu_tag ;       ///< Thread-side RCU part
69         typedef base_class::thread_gc   thread_gc ;     ///< Access lock class
70         typedef typename thread_gc::scoped_lock scoped_lock ; ///< Access lock class
71
72         static bool const c_bBuffered = true ; ///< This RCU buffers disposed elements
73
74     protected:
75         //@cond
76         typedef details::gp_singleton_instance< rcu_tag >    singleton_ptr;
77
78         struct scoped_disposer {
79             void operator ()( general_threaded * p )
80             {
81                 delete p;
82             }
83         };
84         //@endcond
85
86     protected:
87         //@cond
88         buffer_type               m_Buffer;
89         atomics::atomic<uint64_t> m_nCurEpoch;
90         lock_type                 m_Lock;
91         size_t const              m_nCapacity;
92         disposer_thread           m_DisposerThread;
93         //@endcond
94
95     public:
96         /// Returns singleton instance
97         static general_threaded * instance()
98         {
99             return static_cast<general_threaded *>( base_class::instance() );
100         }
101         /// Checks if the singleton is created and ready to use
102         static bool isUsed()
103         {
104             return singleton_ptr::s_pRCU != nullptr;
105         }
106
107     protected:
108         //@cond
109         general_threaded( size_t nBufferCapacity )
110             : m_Buffer( nBufferCapacity )
111             , m_nCurEpoch( 1 )
112             , m_nCapacity( nBufferCapacity )
113         {}
114
115         void flip_and_wait()
116         {
117             back_off bkoff;
118             base_class::flip_and_wait( bkoff );
119         }
120
121         // Return: true - synchronize has been called, false - otherwise
122         bool push_buffer( epoch_retired_ptr&& p )
123         {
124             bool bPushed = m_Buffer.push( p );
125             if ( !bPushed || m_Buffer.size() >= capacity() ) {
126                 synchronize();
127                 if ( !bPushed )
128                     p.free();
129                 return true;
130             }
131             return false;
132         }
133
134         //@endcond
135
136     public:
137         //@cond
138         ~general_threaded()
139         {}
140         //@endcond
141
142         /// Creates singleton object and starts reclamation thread
143         /**
144             The \p nBufferCapacity parameter defines RCU threshold.
145         */
146         static void Construct( size_t nBufferCapacity = 256 )
147         {
148             if ( !singleton_ptr::s_pRCU ) {
149                 std::unique_ptr< general_threaded, scoped_disposer > pRCU( new general_threaded( nBufferCapacity ) );
150                 pRCU->m_DisposerThread.start();
151
152                 singleton_ptr::s_pRCU = pRCU.release();
153             }
154         }
155
156         /// Destroys singleton object and terminates internal reclamation thread
157         static void Destruct( bool bDetachAll = false )
158         {
159             if ( isUsed() ) {
160                 general_threaded * pThis = instance();
161                 if ( bDetachAll )
162                     pThis->m_ThreadList.detach_all();
163
164                 pThis->m_DisposerThread.stop( pThis->m_Buffer, std::numeric_limits< uint64_t >::max());
165
166                 delete pThis;
167                 singleton_ptr::s_pRCU = nullptr;
168             }
169         }
170
171     public:
172         /// Retires \p p pointer
173         /**
174             The method pushes \p p pointer to internal buffer.
175             When the buffer becomes full \ref synchronize function is called
176             to wait for the end of grace period and then
177             a message is sent to the reclamation thread.
178         */
179         virtual void retire_ptr( retired_ptr& p )
180         {
181             if ( p.m_p )
182                 push_buffer( epoch_retired_ptr( p, m_nCurEpoch.load( atomics::memory_order_acquire )));
183         }
184
185         /// Retires the pointer chain [\p itFirst, \p itLast)
186         template <typename ForwardIterator>
187         void batch_retire( ForwardIterator itFirst, ForwardIterator itLast )
188         {
189             uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_acquire );
190             while ( itFirst != itLast ) {
191                 epoch_retired_ptr ep( *itFirst, nEpoch );
192                 ++itFirst;
193                 push_buffer( std::move(ep));
194             }
195         }
196
197         /// Retires the pointer chain until \p Func returns \p nullptr retired pointer
198         template <typename Func>
199         void batch_retire( Func e )
200         {
201             uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_acquire );
202             for ( retired_ptr p{ e() }; p.m_p; ) {
203                 epoch_retired_ptr ep( p, nEpoch );
204                 p = e();
205                 push_buffer( std::move(ep));
206             }
207         }
208
209         /// Waits to finish a grace period and calls disposing thread
210         void synchronize()
211         {
212             synchronize( false );
213         }
214
215         //@cond
216         void synchronize( bool bSync )
217         {
218             uint64_t nPrevEpoch = m_nCurEpoch.fetch_add( 1, atomics::memory_order_release );
219             {
220                 std::unique_lock<lock_type> sl( m_Lock );
221                 flip_and_wait();
222                 flip_and_wait();
223             }
224             m_DisposerThread.dispose( m_Buffer, nPrevEpoch, bSync );
225         }
226         void force_dispose()
227         {
228             synchronize( true );
229         }
230         //@endcond
231
232         /// Returns the threshold of internal buffer
233         size_t capacity() const
234         {
235             return m_nCapacity;
236         }
237     };
238 }} // namespace cds::urcu
239
240 #endif // #ifndef CDSLIB_URCU_DETAILS_GPT_H