Replace cds::lock::scoped_lock with std::unique_lock, remove cds/lock/scoped_lock.h
[libcds.git] / cds / container / rwqueue.h
1 //$$CDS-header$$
2
3 #ifndef __CDS_CONTAINER_RWQUEUE_H
4 #define __CDS_CONTAINER_RWQUEUE_H
5
6 #include <memory>
7 #include <functional>   // ref
8 #include <mutex>        // unique_lock
9 #include <cds/opt/options.h>
10 #include <cds/lock/spinlock.h>
11 #include <cds/intrusive/details/queue_stat.h>
12 #include <cds/details/allocator.h>
13 #include <cds/details/trivial_assign.h>
14
15 namespace cds { namespace container {
16
17     /// Michael & Scott blocking queue with fine-grained synchronization schema
18     /** @ingroup cds_nonintrusive_queue
19         The queue has two different locks: one for reading and one for writing.
20         Therefore, one writer and one reader can simultaneously access to the queue.
21         The queue does not require any garbage collector.
22
23         <b>Source</b>
24             - [1998] Maged Michael, Michael Scott "Simple, fast, and practical non-blocking
25                 and blocking concurrent queue algorithms"
26
27         <b>Template arguments</b>
28         - \p T - type to be stored in the queue
29         - \p Options - options
30
31         \p Options are:
32         - opt::allocator - allocator (like \p std::allocator). Default is \ref CDS_DEFAULT_ALLOCATOR
33         - opt::lock_type - type of lock primitive. Default is cds::lock::Spin.
34         - opt::item_counter - the type of item counting feature. Default is \ref atomicity::empty_item_counter
35         - opt::stat - the type to gather internal statistics.
36             Possible option value are: queue_stat, queue_dummy_stat, user-provided class that supports queue_stat interface.
37             Default is \ref intrusive::queue_dummy_stat.
38             <tt>RWQueue</tt> uses only \p onEnqueue and \p onDequeue counter.
39         - opt::alignment - the alignment for \p lock_type to prevent false sharing. Default is opt::cache_line_alignment
40
41         This queue has no intrusive counterpart.
42     */
43     template <typename T, typename... Options>
44     class RWQueue
45     {
46         //@cond
47         struct default_options
48         {
49             typedef lock::Spin  lock_type;
50             typedef CDS_DEFAULT_ALLOCATOR   allocator;
51             typedef atomicity::empty_item_counter item_counter;
52             typedef intrusive::queue_dummy_stat stat;
53             enum { alignment = opt::cache_line_alignment };
54         };
55         //@endcond
56
57     public:
58         //@cond
59         typedef typename opt::make_options<
60             typename cds::opt::find_type_traits< default_options, Options... >::type
61             ,Options...
62         >::type   options;
63         //@endcond
64
65     public:
66         /// Rebind template arguments
67         template <typename T2, typename... Options2>
68         struct rebind {
69             typedef RWQueue< T2, Options2...> other   ;   ///< Rebinding result
70         };
71
72     public:
73         typedef T   value_type  ;   ///< type of value stored in the queue
74
75         typedef typename options::lock_type lock_type   ;   ///< Locking primitive used
76
77     protected:
78         //@cond
79         /// Node type
80         struct node_type
81         {
82             node_type * volatile    m_pNext ;   ///< Pointer to the next node in queue
83             value_type              m_value ;   ///< Value stored in the node
84
85             node_type( value_type const& v )
86                 : m_pNext( nullptr )
87                 , m_value(v)
88             {}
89
90             node_type()
91                 : m_pNext( nullptr )
92             {}
93
94             template <typename... Args>
95             node_type( Args&&... args )
96                 : m_pNext( nullptr )
97                 , m_value( std::forward<Args>(args)...)
98             {}
99         };
100         //@endcond
101
102     public:
103         typedef typename options::allocator::template rebind<node_type>::other allocator_type   ; ///< Allocator type used for allocate/deallocate the queue nodes
104         typedef typename options::item_counter item_counter ;   ///< Item counting policy used
105         typedef typename options::stat      stat        ;   ///< Internal statistics policy used
106
107     protected:
108         //@cond
109         typedef typename opt::details::alignment_setter< lock_type, options::alignment >::type aligned_lock_type;
110         typedef std::unique_lock<lock_type>   auto_lock;
111         typedef cds::details::Allocator< node_type, allocator_type >  node_allocator;
112
113         item_counter    m_ItemCounter;
114         stat            m_Stat;
115
116         mutable aligned_lock_type   m_HeadLock;
117         node_type * m_pHead;
118         mutable aligned_lock_type   m_TailLock;
119         node_type * m_pTail;
120         //@endcond
121
122     protected:
123         //@cond
124         static node_type * alloc_node()
125         {
126             return node_allocator().New();
127         }
128
129         static node_type * alloc_node( T const& data )
130         {
131             return node_allocator().New( data );
132         }
133
134         template <typename... Args>
135         static node_type * alloc_node_move( Args&&... args )
136         {
137             return node_allocator().MoveNew( std::forward<Args>( args )... );
138         }
139
140         static void free_node( node_type * pNode )
141         {
142             node_allocator().Delete( pNode );
143         }
144
145         bool enqueue_node( node_type * p )
146         {
147             assert( p != nullptr );
148             {
149                 auto_lock lock( m_TailLock );
150                 m_pTail =
151                     m_pTail->m_pNext = p;
152             }
153             ++m_ItemCounter;
154             m_Stat.onEnqueue();
155             return true;
156         }
157
158         struct node_disposer {
159             void operator()( node_type * pNode )
160             {
161                 free_node( pNode );
162             }
163         };
164         typedef std::unique_ptr< node_type, node_disposer >     scoped_node_ptr;
165         //@endcond
166
167     public:
168         /// Makes empty queue
169         RWQueue()
170         {
171             node_type * pNode = alloc_node();
172             m_pHead =
173                 m_pTail = pNode;
174         }
175
176         /// Destructor clears queue
177         ~RWQueue()
178         {
179             clear();
180             assert( m_pHead == m_pTail );
181             free_node( m_pHead );
182         }
183
184         /// Enqueues \p data. Always return \a true
185         bool enqueue( value_type const& data )
186         {
187             scoped_node_ptr p( alloc_node( data ));
188             if ( enqueue_node( p.get() )) {
189                 p.release();
190                 return true;
191             }
192             return false;
193         }
194
195         /// Enqueues \p data to queue using copy functor
196         /**
197             \p Func is a functor called to copy value \p data of type \p Type
198             which may be differ from type \p T stored in the queue.
199             The functor's interface is:
200             \code
201             struct myFunctor {
202                 void operator()(T& dest, Type const& data)
203                 {
204                     // // Code to copy \p data to \p dest
205                     dest = data;
206                 }
207             };
208             \endcode
209             You may use \p boost:ref construction to pass functor \p f by reference.
210
211             <b>Requirements</b> The functor \p Func should not throw any exception.
212         */
213         template <typename Type, typename Func>
214         bool enqueue( Type const& data, Func f  )
215         {
216             scoped_node_ptr p( alloc_node());
217             f( p->m_value, data );
218             if ( enqueue_node( p.get() )) {
219                 p.release();
220                 return true;
221             }
222             return false;
223         }
224
225         /// Enqueues data of type \ref value_type constructed with <tt>std::forward<Args>(args)...</tt>
226         template <typename... Args>
227         bool emplace( Args&&... args )
228         {
229             scoped_node_ptr p( alloc_node_move( std::forward<Args>(args)... ));
230             if ( enqueue_node( p.get() )) {
231                 p.release();
232                 return true;
233             }
234             return false;
235         }
236
237         /// Dequeues a value using copy functor
238         /**
239             \p Func is a functor called to copy dequeued value to \p dest of type \p Type
240             which may be differ from type \p T stored in the queue.
241             The functor's interface is:
242             \code
243             struct myFunctor {
244                 void operator()(Type& dest, T const& data)
245                 {
246                     // // Copy \p data to \p dest
247                     dest = data;
248                 }
249             };
250             \endcode
251             You may use \p boost:ref construction to pass functor \p f by reference.
252
253             <b>Requirements</b> The functor \p Func should not throw any exception.
254         */
255         template <typename Type, typename Func>
256         bool dequeue( Type& dest, Func f )
257         {
258             node_type * pNode;
259             {
260                 auto_lock lock( m_HeadLock );
261                 pNode = m_pHead;
262                 node_type * pNewHead = pNode->m_pNext;
263                 if ( pNewHead == nullptr )
264                     return false;
265                 f( dest, pNewHead->m_value );
266                 m_pHead = pNewHead;
267             }    // unlock here
268             --m_ItemCounter;
269             free_node( pNode );
270             m_Stat.onDequeue();
271             return true;
272         }
273
274         /** Dequeues a value to \p dest.
275
276             If queue is empty returns \a false, \p dest may be corrupted.
277             If queue is not empty returns \a true, \p dest contains the value dequeued
278         */
279         bool dequeue( value_type& dest )
280         {
281             typedef cds::details::trivial_assign<value_type, value_type> functor;
282             return dequeue( dest, functor() );
283         }
284
285         /// Synonym for \ref enqueue
286         bool push( value_type const& data )
287         {
288             return enqueue( data );
289         }
290
291         /// Synonym for template version of \ref enqueue function
292         template <typename Type, typename Func>
293         bool push( Type const& data, Func f  )
294         {
295             return enqueue( data, f );
296         }
297
298         /// Synonym for \ref dequeue
299         bool pop( value_type& data )
300         {
301             return dequeue( data );
302         }
303
304         /// Synonym for template version of \ref dequeue function
305         template <typename Type, typename Func>
306         bool pop( Type& dest, Func f )
307         {
308             return dequeue( dest, f );
309         }
310
311         /// Checks if queue is empty
312         bool empty() const
313         {
314             auto_lock lock( m_HeadLock );
315             return m_pHead->m_pNext == nullptr;
316         }
317
318         /// Clears queue
319         void clear()
320         {
321             auto_lock lockR( m_HeadLock );
322             auto_lock lockW( m_TailLock );
323             while ( m_pHead->m_pNext != nullptr ) {
324                 node_type * pHead = m_pHead;
325                 m_pHead = m_pHead->m_pNext;
326                 free_node( pHead );
327             }
328         }
329
330         /// Returns queue's item count
331         /**
332             The value returned depends on opt::item_counter option. For atomicity::empty_item_counter,
333             this function always returns 0.
334
335             <b>Warning</b>: even if you use real item counter and it returns 0, this fact is not mean that the queue
336             is empty. To check queue emptyness use \ref empty() method.
337         */
338         size_t    size() const
339         {
340             return m_ItemCounter.value();
341         }
342
343         /// Returns reference to internal statistics
344         stat const& statistics() const
345         {
346             return m_Stat;
347         }
348
349     };
350
351 }}  // namespace cds::container
352
353 #endif // #ifndef __CDS_CONTAINER_RWQUEUE_H