Move libcds 1.6.0 from SVN
[libcds.git] / cds / container / rwqueue.h
1 //$$CDS-header$$
2
3 #ifndef __CDS_CONTAINER_RWQUEUE_H
4 #define __CDS_CONTAINER_RWQUEUE_H
5
6 #include <cds/opt/options.h>
7 #include <cds/lock/spinlock.h>
8 #include <cds/intrusive/queue_stat.h>
9 #include <cds/details/allocator.h>
10 #include <cds/details/trivial_assign.h>
11 #include <cds/details/std/memory.h>
12 #include <cds/ref.h>
13
14 namespace cds { namespace container {
15
16     /// Michael & Scott blocking queue with fine-grained synchronization schema
17     /** @ingroup cds_nonintrusive_queue
18         The queue has two different locks: one for reading and one for writing.
19         Therefore, one writer and one reader can simultaneously access to the queue.
20         The queue does not require any garbage collector.
21
22         <b>Source</b>
23             - [1998] Maged Michael, Michael Scott "Simple, fast, and practical non-blocking
24                 and blocking concurrent queue algorithms"
25
26         <b>Template arguments</b>
27         - \p T - type to be stored in the queue
28         - \p Options - options
29
30         \p Options are:
31         - opt::allocator - allocator (like \p std::allocator). Default is \ref CDS_DEFAULT_ALLOCATOR
32         - opt::lock_type - type of lock primitive. Default is cds::lock::Spin.
33         - opt::item_counter - the type of item counting feature. Default is \ref atomicity::empty_item_counter
34         - opt::stat - the type to gather internal statistics.
35             Possible option value are: queue_stat, queue_dummy_stat, user-provided class that supports queue_stat interface.
36             Default is \ref intrusive::queue_dummy_stat.
37             <tt>RWQueue</tt> uses only \p onEnqueue and \p onDequeue counter.
38         - opt::alignment - the alignment for \p lock_type to prevent false sharing. Default is opt::cache_line_alignment
39
40         This queue has no intrusive counterpart.
41     */
42     template <typename T, CDS_DECL_OPTIONS6>
43     class RWQueue
44     {
45         //@cond
46         struct default_options
47         {
48             typedef lock::Spin  lock_type;
49             typedef CDS_DEFAULT_ALLOCATOR   allocator;
50             typedef atomicity::empty_item_counter item_counter;
51             typedef intrusive::queue_dummy_stat stat;
52             enum { alignment = opt::cache_line_alignment };
53         };
54         //@endcond
55
56     public:
57         //@cond
58         typedef typename opt::make_options<
59             typename cds::opt::find_type_traits< default_options, CDS_OPTIONS6 >::type
60             ,CDS_OPTIONS6
61         >::type   options;
62         //@endcond
63
64     public:
65         /// Rebind template arguments
66         template <typename T2, CDS_DECL_OTHER_OPTIONS6>
67         struct rebind {
68             typedef RWQueue< T2, CDS_OTHER_OPTIONS6> other   ;   ///< Rebinding result
69         };
70
71     public:
72         typedef T   value_type  ;   ///< type of value stored in the queue
73
74         typedef typename options::lock_type lock_type   ;   ///< Locking primitive used
75
76     protected:
77         //@cond
78         /// Node type
79         struct node_type
80         {
81             node_type * volatile    m_pNext ;   ///< Pointer to the next node in queue
82             value_type              m_value ;   ///< Value stored in the node
83
84             node_type( value_type const& v )
85                 : m_pNext(null_ptr<node_type *>())
86                 , m_value(v)
87             {}
88
89             node_type()
90                 : m_pNext( null_ptr<node_type *>() )
91             {}
92
93 #       ifdef CDS_EMPLACE_SUPPORT
94             template <typename... Args>
95             node_type( Args&&... args )
96                 : m_pNext(null_ptr<node_type *>())
97                 , m_value( std::forward<Args>(args)...)
98             {}
99 #       endif
100         };
101         //@endcond
102
103     public:
104         typedef typename options::allocator::template rebind<node_type>::other allocator_type   ; ///< Allocator type used for allocate/deallocate the queue nodes
105         typedef typename options::item_counter item_counter ;   ///< Item counting policy used
106         typedef typename options::stat      stat        ;   ///< Internal statistics policy used
107
108     protected:
109         //@cond
110         typedef typename opt::details::alignment_setter< lock_type, options::alignment >::type aligned_lock_type;
111         typedef cds::lock::scoped_lock<lock_type>   auto_lock;
112         typedef cds::details::Allocator< node_type, allocator_type >  node_allocator;
113
114         item_counter    m_ItemCounter;
115         stat            m_Stat;
116
117         mutable aligned_lock_type   m_HeadLock;
118         node_type * m_pHead;
119         mutable aligned_lock_type   m_TailLock;
120         node_type * m_pTail;
121         //@endcond
122
123     protected:
124         //@cond
125         static node_type * alloc_node()
126         {
127             return node_allocator().New();
128         }
129
130         static node_type * alloc_node( T const& data )
131         {
132             return node_allocator().New( data );
133         }
134
135 #   ifdef CDS_EMPLACE_SUPPORT
136         template <typename... Args>
137         static node_type * alloc_node_move( Args&&... args )
138         {
139             return node_allocator().MoveNew( std::forward<Args>( args )... );
140         }
141 #   endif
142
143         static void free_node( node_type * pNode )
144         {
145             node_allocator().Delete( pNode );
146         }
147
148         bool enqueue_node( node_type * p )
149         {
150             assert( p != null_ptr<node_type *>());
151             {
152                 auto_lock lock( m_TailLock );
153                 m_pTail =
154                     m_pTail->m_pNext = p;
155             }
156             ++m_ItemCounter;
157             m_Stat.onEnqueue();
158             return true;
159         }
160
161         struct node_disposer {
162             void operator()( node_type * pNode )
163             {
164                 free_node( pNode );
165             }
166         };
167         typedef std::unique_ptr< node_type, node_disposer >     scoped_node_ptr;
168         //@endcond
169
170     public:
171         /// Makes empty queue
172         RWQueue()
173         {
174             node_type * pNode = alloc_node();
175             m_pHead =
176                 m_pTail = pNode;
177         }
178
179         /// Destructor clears queue
180         ~RWQueue()
181         {
182             clear();
183             assert( m_pHead == m_pTail );
184             free_node( m_pHead );
185         }
186
187         /// Enqueues \p data. Always return \a true
188         bool enqueue( value_type const& data )
189         {
190             scoped_node_ptr p( alloc_node( data ));
191             if ( enqueue_node( p.get() )) {
192                 p.release();
193                 return true;
194             }
195             return false;
196         }
197
198         /// Enqueues \p data to queue using copy functor
199         /**
200             \p Func is a functor called to copy value \p data of type \p Type
201             which may be differ from type \p T stored in the queue.
202             The functor's interface is:
203             \code
204             struct myFunctor {
205                 void operator()(T& dest, Type const& data)
206                 {
207                     // // Code to copy \p data to \p dest
208                     dest = data;
209                 }
210             };
211             \endcode
212             You may use \p boost:ref construction to pass functor \p f by reference.
213
214             <b>Requirements</b> The functor \p Func should not throw any exception.
215         */
216         template <typename Type, typename Func>
217         bool enqueue( Type const& data, Func f  )
218         {
219             scoped_node_ptr p( alloc_node());
220             unref(f)( p->m_value, data );
221             if ( enqueue_node( p.get() )) {
222                 p.release();
223                 return true;
224             }
225             return false;
226         }
227
228 #   ifdef CDS_EMPLACE_SUPPORT
229         /// Enqueues data of type \ref value_type constructed with <tt>std::forward<Args>(args)...</tt>
230         /**
231             This function is available only for compiler that supports
232             variadic template and move semantics
233         */
234         template <typename... Args>
235         bool emplace( Args&&... args )
236         {
237             scoped_node_ptr p( alloc_node_move( std::forward<Args>(args)... ));
238             if ( enqueue_node( p.get() )) {
239                 p.release();
240                 return true;
241             }
242             return false;
243         }
244 #   endif
245
246         /// Dequeues a value using copy functor
247         /**
248             \p Func is a functor called to copy dequeued value to \p dest of type \p Type
249             which may be differ from type \p T stored in the queue.
250             The functor's interface is:
251             \code
252             struct myFunctor {
253                 void operator()(Type& dest, T const& data)
254                 {
255                     // // Copy \p data to \p dest
256                     dest = data;
257                 }
258             };
259             \endcode
260             You may use \p boost:ref construction to pass functor \p f by reference.
261
262             <b>Requirements</b> The functor \p Func should not throw any exception.
263         */
264         template <typename Type, typename Func>
265         bool dequeue( Type& dest, Func f )
266         {
267             node_type * pNode;
268             {
269                 auto_lock lock( m_HeadLock );
270                 pNode = m_pHead;
271                 node_type * pNewHead = pNode->m_pNext;
272                 if ( pNewHead == null_ptr<node_type *>() )
273                     return false;
274                 unref(f)( dest, pNewHead->m_value );
275                 m_pHead = pNewHead;
276             }    // unlock here
277             --m_ItemCounter;
278             free_node( pNode );
279             m_Stat.onDequeue();
280             return true;
281         }
282
283         /** Dequeues a value to \p dest.
284
285             If queue is empty returns \a false, \p dest may be corrupted.
286             If queue is not empty returns \a true, \p dest contains the value dequeued
287         */
288         bool dequeue( value_type& dest )
289         {
290             typedef cds::details::trivial_assign<value_type, value_type> functor;
291             return dequeue( dest, functor() );
292         }
293
294         /// Synonym for \ref enqueue
295         bool push( value_type const& data )
296         {
297             return enqueue( data );
298         }
299
300         /// Synonym for template version of \ref enqueue function
301         template <typename Type, typename Func>
302         bool push( Type const& data, Func f  )
303         {
304             return enqueue( data, f );
305         }
306
307         /// Synonym for \ref dequeue
308         bool pop( value_type& data )
309         {
310             return dequeue( data );
311         }
312
313         /// Synonym for template version of \ref dequeue function
314         template <typename Type, typename Func>
315         bool pop( Type& dest, Func f )
316         {
317             return dequeue( dest, f );
318         }
319
320         /// Checks if queue is empty
321         bool empty() const
322         {
323             auto_lock lock( m_HeadLock );
324             return m_pHead->m_pNext == null_ptr<node_type *>();
325         }
326
327         /// Clears queue
328         void clear()
329         {
330             auto_lock lockR( m_HeadLock );
331             auto_lock lockW( m_TailLock );
332             while ( m_pHead->m_pNext != null_ptr<node_type *>() ) {
333                 node_type * pHead = m_pHead;
334                 m_pHead = m_pHead->m_pNext;
335                 free_node( pHead );
336             }
337         }
338
339         /// Returns queue's item count
340         /**
341             The value returned depends on opt::item_counter option. For atomicity::empty_item_counter,
342             this function always returns 0.
343
344             <b>Warning</b>: even if you use real item counter and it returns 0, this fact is not mean that the queue
345             is empty. To check queue emptyness use \ref empty() method.
346         */
347         size_t    size() const
348         {
349             return m_ItemCounter.value();
350         }
351
352         /// Returns reference to internal statistics
353         stat const& statistics() const
354         {
355             return m_Stat;
356         }
357
358     };
359
360 }}  // namespace cds::container
361
362 #endif // #ifndef __CDS_CONTAINER_RWQUEUE_H