Uses different pass count for different parallel queue test cases
[libcds.git] / cds / urcu / details / gpt.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #ifndef CDSLIB_URCU_DETAILS_GPT_H
32 #define CDSLIB_URCU_DETAILS_GPT_H
33
34 #include <mutex>    //unique_lock
35 #include <limits>
36 #include <cds/urcu/details/gp.h>
37 #include <cds/urcu/dispose_thread.h>
38 #include <cds/algo/backoff_strategy.h>
39 #include <cds/container/vyukov_mpmc_cycle_queue.h>
40
41 namespace cds { namespace urcu {
42
43     /// User-space general-purpose RCU with deferred threaded reclamation
44     /**
45         @headerfile cds/urcu/general_threaded.h
46
47         This implementation is similar to \ref general_buffered but separate thread is created
48         for deleting the retired objects. Like \p %general_buffered, the class contains an internal buffer
49         where retired objects are accumulated. When the buffer becomes full,
50         the RCU \p synchronize() function is called that waits until all reader/updater threads end up their read-side critical sections,
51         i.e. until the RCU quiescent state will come. After that the "work ready" message is sent to reclamation thread.
52         The reclamation thread frees the buffer.
53         This synchronization cycle may be called in any thread that calls \p retire_ptr() function.
54
55         There is a wrapper \ref cds_urcu_general_threaded_gc "gc<general_threaded>" for \p %general_threaded class
56         that provides unified RCU interface. You should use this wrapper class instead \p %general_threaded
57
58         The \p Buffer contains items of \ref cds_urcu_retired_ptr "epoch_retired_ptr" type
59
60         and it should support a multiple producer/single consumer queue with the following interface:
61         - <tt> bool push( epoch_retired_ptr& p ) </tt> - places the retired pointer \p p into queue. If the function
62         returns \p false it means that the buffer is full and RCU synchronization cycle must be processed.
63         - <tt>epoch_retired_ptr * front() </tt> - returns a pointer to the top element or \p nullptr if the buffer is empty.
64         - <tt>bool pop_front() </tt> - pops the top element; returns \p false if the buffer is empty.
65         - <tt>size_t size()</tt> - returns queue's item count.
66
67         The buffer is considered as full if \p push() returns \p false or the buffer size reaches the RCU threshold.
68
69         Template arguments:
70         - \p Buffer - MPSC (muliple producer/single consumer) buffer type with FIFO semantics.
71
72             Default is \p cds::container::VyukovMPSCCycleQueue. The buffer contains the objects of \ref epoch_retired_ptr
73             type that contains additional \p m_nEpoch field. This field specifies an epoch when the object
74             has been placed into the buffer. The \p %general_threaded object has a global epoch counter
75             that is incremented on each \p synchronize() call. The epoch is used internally to prevent early deletion.
76         - \p Lock - mutex type, default is \p std::mutex
77         - \p DisposerThread - the reclamation thread class. Default is \ref cds::urcu::dispose_thread,
78             see the description of this class for required interface.
79         - \p Backoff - back-off schema, default is cds::backoff::Default
80     */
81     template <
82         class Buffer = cds::container::VyukovMPSCCycleQueue< epoch_retired_ptr >
83         ,class Lock = std::mutex
84         ,class DisposerThread = dispose_thread<Buffer>
85         ,class Backoff = cds::backoff::Default
86     >
87     class general_threaded: public details::gp_singleton< general_threaded_tag >
88     {
89         //@cond
90         typedef details::gp_singleton< general_threaded_tag > base_class;
91         //@endcond
92     public:
93         typedef Buffer          buffer_type ;   ///< Buffer type
94         typedef Lock            lock_type   ;   ///< Lock type
95         typedef Backoff         back_off    ;   ///< Back-off scheme
96         typedef DisposerThread  disposer_thread ;   ///< Disposer thread type
97
98         typedef general_threaded_tag    rcu_tag ;       ///< Thread-side RCU part
99         typedef base_class::thread_gc   thread_gc ;     ///< Access lock class
100         typedef typename thread_gc::scoped_lock scoped_lock ; ///< Access lock class
101
102         //@cond
103         static bool const c_bBuffered = true ; ///< Bufferized RCU
104         //@endcond
105
106     protected:
107         //@cond
108         typedef details::gp_singleton_instance< rcu_tag >    singleton_ptr;
109
110         struct scoped_disposer {
111             void operator ()( general_threaded * p )
112             {
113                 delete p;
114             }
115         };
116         //@endcond
117
118     protected:
119         //@cond
120         buffer_type               m_Buffer;
121         atomics::atomic<uint64_t> m_nCurEpoch;
122         lock_type                 m_Lock;
123         size_t const              m_nCapacity;
124         disposer_thread           m_DisposerThread;
125         //@endcond
126
127     public:
128         /// Returns singleton instance
129         static general_threaded * instance()
130         {
131             return static_cast<general_threaded *>( base_class::instance());
132         }
133         /// Checks if the singleton is created and ready to use
134         static bool isUsed()
135         {
136             return singleton_ptr::s_pRCU != nullptr;
137         }
138
139     protected:
140         //@cond
141         general_threaded( size_t nBufferCapacity )
142             : m_Buffer( nBufferCapacity )
143             , m_nCurEpoch( 1 )
144             , m_nCapacity( nBufferCapacity )
145         {}
146
147         void flip_and_wait()
148         {
149             back_off bkoff;
150             base_class::flip_and_wait( bkoff );
151         }
152
153         // Return: true - synchronize has been called, false - otherwise
154         bool push_buffer( epoch_retired_ptr&& p )
155         {
156             bool bPushed = m_Buffer.push( p );
157             if ( !bPushed || m_Buffer.size() >= capacity()) {
158                 synchronize();
159                 if ( !bPushed )
160                     p.free();
161                 return true;
162             }
163             return false;
164         }
165
166         //@endcond
167
168     public:
169         //@cond
170         ~general_threaded()
171         {}
172         //@endcond
173
174         /// Creates singleton object and starts reclamation thread
175         /**
176             The \p nBufferCapacity parameter defines RCU threshold.
177         */
178         static void Construct( size_t nBufferCapacity = 256 )
179         {
180             if ( !singleton_ptr::s_pRCU ) {
181                 std::unique_ptr< general_threaded, scoped_disposer > pRCU( new general_threaded( nBufferCapacity ));
182                 pRCU->m_DisposerThread.start();
183
184                 singleton_ptr::s_pRCU = pRCU.release();
185             }
186         }
187
188         /// Destroys singleton object and terminates internal reclamation thread
189         static void Destruct( bool bDetachAll = false )
190         {
191             if ( isUsed()) {
192                 general_threaded * pThis = instance();
193                 if ( bDetachAll )
194                     pThis->m_ThreadList.detach_all();
195
196                 pThis->m_DisposerThread.stop( pThis->m_Buffer, std::numeric_limits< uint64_t >::max());
197
198                 delete pThis;
199                 singleton_ptr::s_pRCU = nullptr;
200             }
201         }
202
203     public:
204         /// Retires \p p pointer
205         /**
206             The method pushes \p p pointer to internal buffer.
207             When the buffer becomes full \ref synchronize function is called
208             to wait for the end of grace period and then
209             a message is sent to the reclamation thread.
210         */
211         virtual void retire_ptr( retired_ptr& p ) override
212         {
213             if ( p.m_p )
214                 push_buffer( epoch_retired_ptr( p, m_nCurEpoch.load( atomics::memory_order_acquire )));
215         }
216
217         /// Retires the pointer chain [\p itFirst, \p itLast)
218         template <typename ForwardIterator>
219         void batch_retire( ForwardIterator itFirst, ForwardIterator itLast )
220         {
221             uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_acquire );
222             while ( itFirst != itLast ) {
223                 epoch_retired_ptr ep( *itFirst, nEpoch );
224                 ++itFirst;
225                 push_buffer( std::move(ep));
226             }
227         }
228
229         /// Retires the pointer chain until \p Func returns \p nullptr retired pointer
230         template <typename Func>
231         void batch_retire( Func e )
232         {
233             uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_acquire );
234             for ( retired_ptr p{ e() }; p.m_p; ) {
235                 epoch_retired_ptr ep( p, nEpoch );
236                 p = e();
237                 push_buffer( std::move(ep));
238             }
239         }
240
241         /// Waits to finish a grace period and calls disposing thread
242         void synchronize()
243         {
244             synchronize( false );
245         }
246
247         //@cond
248         void synchronize( bool bSync )
249         {
250             uint64_t nPrevEpoch = m_nCurEpoch.fetch_add( 1, atomics::memory_order_release );
251             {
252                 std::unique_lock<lock_type> sl( m_Lock );
253                 flip_and_wait();
254                 flip_and_wait();
255             }
256             m_DisposerThread.dispose( m_Buffer, nPrevEpoch, bSync );
257         }
258         void force_dispose()
259         {
260             synchronize( true );
261         }
262         //@endcond
263
264         /// Returns the threshold of internal buffer
265         size_t capacity() const
266         {
267             return m_nCapacity;
268         }
269     };
270
271     /// User-space general-purpose RCU with deferred threaded reclamation (stripped version)
272     /**
273         @headerfile cds/urcu/general_threaded.h
274
275         This short version of \p general_threaded is intended for stripping debug info.
276         If you use \p %general_threaded with default template arguments you may use
277         this stripped version. All functionality of both classes are identical.
278     */
279     class general_threaded_stripped: public general_threaded<>
280     {};
281
282 }} // namespace cds::urcu
283
284 #endif // #ifndef CDSLIB_URCU_DETAILS_GPT_H