4b687dc9fab0a7920717dadc5d906b5cf345606b
[libcds.git] / cds / container / rwqueue.h
1 //$$CDS-header$$
2
3 #ifndef __CDS_CONTAINER_RWQUEUE_H
4 #define __CDS_CONTAINER_RWQUEUE_H
5
6 #include <memory>
7 #include <cds/opt/options.h>
8 #include <cds/lock/spinlock.h>
9 #include <cds/intrusive/details/queue_stat.h>
10 #include <cds/details/allocator.h>
11 #include <cds/details/trivial_assign.h>
12 #include <cds/ref.h>
13
14 namespace cds { namespace container {
15
16     /// Michael & Scott blocking queue with fine-grained synchronization schema
17     /** @ingroup cds_nonintrusive_queue
18         The queue has two different locks: one for reading and one for writing.
19         Therefore, one writer and one reader can simultaneously access to the queue.
20         The queue does not require any garbage collector.
21
22         <b>Source</b>
23             - [1998] Maged Michael, Michael Scott "Simple, fast, and practical non-blocking
24                 and blocking concurrent queue algorithms"
25
26         <b>Template arguments</b>
27         - \p T - type to be stored in the queue
28         - \p Options - options
29
30         \p Options are:
31         - opt::allocator - allocator (like \p std::allocator). Default is \ref CDS_DEFAULT_ALLOCATOR
32         - opt::lock_type - type of lock primitive. Default is cds::lock::Spin.
33         - opt::item_counter - the type of item counting feature. Default is \ref atomicity::empty_item_counter
34         - opt::stat - the type to gather internal statistics.
35             Possible option value are: queue_stat, queue_dummy_stat, user-provided class that supports queue_stat interface.
36             Default is \ref intrusive::queue_dummy_stat.
37             <tt>RWQueue</tt> uses only \p onEnqueue and \p onDequeue counter.
38         - opt::alignment - the alignment for \p lock_type to prevent false sharing. Default is opt::cache_line_alignment
39
40         This queue has no intrusive counterpart.
41     */
42     template <typename T, typename... Options>
43     class RWQueue
44     {
45         //@cond
46         struct default_options
47         {
48             typedef lock::Spin  lock_type;
49             typedef CDS_DEFAULT_ALLOCATOR   allocator;
50             typedef atomicity::empty_item_counter item_counter;
51             typedef intrusive::queue_dummy_stat stat;
52             enum { alignment = opt::cache_line_alignment };
53         };
54         //@endcond
55
56     public:
57         //@cond
58         typedef typename opt::make_options<
59             typename cds::opt::find_type_traits< default_options, Options... >::type
60             ,Options...
61         >::type   options;
62         //@endcond
63
64     public:
65         /// Rebind template arguments
66         template <typename T2, typename... Options2>
67         struct rebind {
68             typedef RWQueue< T2, Options2...> other   ;   ///< Rebinding result
69         };
70
71     public:
72         typedef T   value_type  ;   ///< type of value stored in the queue
73
74         typedef typename options::lock_type lock_type   ;   ///< Locking primitive used
75
76     protected:
77         //@cond
78         /// Node type
79         struct node_type
80         {
81             node_type * volatile    m_pNext ;   ///< Pointer to the next node in queue
82             value_type              m_value ;   ///< Value stored in the node
83
84             node_type( value_type const& v )
85                 : m_pNext( nullptr )
86                 , m_value(v)
87             {}
88
89             node_type()
90                 : m_pNext( nullptr )
91             {}
92
93             template <typename... Args>
94             node_type( Args&&... args )
95                 : m_pNext( nullptr )
96                 , m_value( std::forward<Args>(args)...)
97             {}
98         };
99         //@endcond
100
101     public:
102         typedef typename options::allocator::template rebind<node_type>::other allocator_type   ; ///< Allocator type used for allocate/deallocate the queue nodes
103         typedef typename options::item_counter item_counter ;   ///< Item counting policy used
104         typedef typename options::stat      stat        ;   ///< Internal statistics policy used
105
106     protected:
107         //@cond
108         typedef typename opt::details::alignment_setter< lock_type, options::alignment >::type aligned_lock_type;
109         typedef cds::lock::scoped_lock<lock_type>   auto_lock;
110         typedef cds::details::Allocator< node_type, allocator_type >  node_allocator;
111
112         item_counter    m_ItemCounter;
113         stat            m_Stat;
114
115         mutable aligned_lock_type   m_HeadLock;
116         node_type * m_pHead;
117         mutable aligned_lock_type   m_TailLock;
118         node_type * m_pTail;
119         //@endcond
120
121     protected:
122         //@cond
123         static node_type * alloc_node()
124         {
125             return node_allocator().New();
126         }
127
128         static node_type * alloc_node( T const& data )
129         {
130             return node_allocator().New( data );
131         }
132
133         template <typename... Args>
134         static node_type * alloc_node_move( Args&&... args )
135         {
136             return node_allocator().MoveNew( std::forward<Args>( args )... );
137         }
138
139         static void free_node( node_type * pNode )
140         {
141             node_allocator().Delete( pNode );
142         }
143
144         bool enqueue_node( node_type * p )
145         {
146             assert( p != nullptr );
147             {
148                 auto_lock lock( m_TailLock );
149                 m_pTail =
150                     m_pTail->m_pNext = p;
151             }
152             ++m_ItemCounter;
153             m_Stat.onEnqueue();
154             return true;
155         }
156
157         struct node_disposer {
158             void operator()( node_type * pNode )
159             {
160                 free_node( pNode );
161             }
162         };
163         typedef std::unique_ptr< node_type, node_disposer >     scoped_node_ptr;
164         //@endcond
165
166     public:
167         /// Makes empty queue
168         RWQueue()
169         {
170             node_type * pNode = alloc_node();
171             m_pHead =
172                 m_pTail = pNode;
173         }
174
175         /// Destructor clears queue
176         ~RWQueue()
177         {
178             clear();
179             assert( m_pHead == m_pTail );
180             free_node( m_pHead );
181         }
182
183         /// Enqueues \p data. Always return \a true
184         bool enqueue( value_type const& data )
185         {
186             scoped_node_ptr p( alloc_node( data ));
187             if ( enqueue_node( p.get() )) {
188                 p.release();
189                 return true;
190             }
191             return false;
192         }
193
194         /// Enqueues \p data to queue using copy functor
195         /**
196             \p Func is a functor called to copy value \p data of type \p Type
197             which may be differ from type \p T stored in the queue.
198             The functor's interface is:
199             \code
200             struct myFunctor {
201                 void operator()(T& dest, Type const& data)
202                 {
203                     // // Code to copy \p data to \p dest
204                     dest = data;
205                 }
206             };
207             \endcode
208             You may use \p boost:ref construction to pass functor \p f by reference.
209
210             <b>Requirements</b> The functor \p Func should not throw any exception.
211         */
212         template <typename Type, typename Func>
213         bool enqueue( Type const& data, Func f  )
214         {
215             scoped_node_ptr p( alloc_node());
216             unref(f)( p->m_value, data );
217             if ( enqueue_node( p.get() )) {
218                 p.release();
219                 return true;
220             }
221             return false;
222         }
223
224         /// Enqueues data of type \ref value_type constructed with <tt>std::forward<Args>(args)...</tt>
225         template <typename... Args>
226         bool emplace( Args&&... args )
227         {
228             scoped_node_ptr p( alloc_node_move( std::forward<Args>(args)... ));
229             if ( enqueue_node( p.get() )) {
230                 p.release();
231                 return true;
232             }
233             return false;
234         }
235
236         /// Dequeues a value using copy functor
237         /**
238             \p Func is a functor called to copy dequeued value to \p dest of type \p Type
239             which may be differ from type \p T stored in the queue.
240             The functor's interface is:
241             \code
242             struct myFunctor {
243                 void operator()(Type& dest, T const& data)
244                 {
245                     // // Copy \p data to \p dest
246                     dest = data;
247                 }
248             };
249             \endcode
250             You may use \p boost:ref construction to pass functor \p f by reference.
251
252             <b>Requirements</b> The functor \p Func should not throw any exception.
253         */
254         template <typename Type, typename Func>
255         bool dequeue( Type& dest, Func f )
256         {
257             node_type * pNode;
258             {
259                 auto_lock lock( m_HeadLock );
260                 pNode = m_pHead;
261                 node_type * pNewHead = pNode->m_pNext;
262                 if ( pNewHead == nullptr )
263                     return false;
264                 unref(f)( dest, pNewHead->m_value );
265                 m_pHead = pNewHead;
266             }    // unlock here
267             --m_ItemCounter;
268             free_node( pNode );
269             m_Stat.onDequeue();
270             return true;
271         }
272
273         /** Dequeues a value to \p dest.
274
275             If queue is empty returns \a false, \p dest may be corrupted.
276             If queue is not empty returns \a true, \p dest contains the value dequeued
277         */
278         bool dequeue( value_type& dest )
279         {
280             typedef cds::details::trivial_assign<value_type, value_type> functor;
281             return dequeue( dest, functor() );
282         }
283
284         /// Synonym for \ref enqueue
285         bool push( value_type const& data )
286         {
287             return enqueue( data );
288         }
289
290         /// Synonym for template version of \ref enqueue function
291         template <typename Type, typename Func>
292         bool push( Type const& data, Func f  )
293         {
294             return enqueue( data, f );
295         }
296
297         /// Synonym for \ref dequeue
298         bool pop( value_type& data )
299         {
300             return dequeue( data );
301         }
302
303         /// Synonym for template version of \ref dequeue function
304         template <typename Type, typename Func>
305         bool pop( Type& dest, Func f )
306         {
307             return dequeue( dest, f );
308         }
309
310         /// Checks if queue is empty
311         bool empty() const
312         {
313             auto_lock lock( m_HeadLock );
314             return m_pHead->m_pNext == nullptr;
315         }
316
317         /// Clears queue
318         void clear()
319         {
320             auto_lock lockR( m_HeadLock );
321             auto_lock lockW( m_TailLock );
322             while ( m_pHead->m_pNext != nullptr ) {
323                 node_type * pHead = m_pHead;
324                 m_pHead = m_pHead->m_pNext;
325                 free_node( pHead );
326             }
327         }
328
329         /// Returns queue's item count
330         /**
331             The value returned depends on opt::item_counter option. For atomicity::empty_item_counter,
332             this function always returns 0.
333
334             <b>Warning</b>: even if you use real item counter and it returns 0, this fact is not mean that the queue
335             is empty. To check queue emptyness use \ref empty() method.
336         */
337         size_t    size() const
338         {
339             return m_ItemCounter.value();
340         }
341
342         /// Returns reference to internal statistics
343         stat const& statistics() const
344         {
345             return m_Stat;
346         }
347
348     };
349
350 }}  // namespace cds::container
351
352 #endif // #ifndef __CDS_CONTAINER_RWQUEUE_H