55185f811e1954b58b80f005c473918f39f99050
[libcds.git] / cds / container / rwqueue.h
1 //$$CDS-header$$
2
3 #ifndef __CDS_CONTAINER_RWQUEUE_H
4 #define __CDS_CONTAINER_RWQUEUE_H
5
6 #include <mutex>        // unique_lock
7 #include <cds/container/msqueue.h>
8 #include <cds/lock/spinlock.h>
9
10 namespace cds { namespace container {
11     /// RWQueue related definitions
12     /** @ingroup cds_nonintrusive_helper
13     */
14     namespace rwqueue {
15         /// RWQueue default type traits
16         struct traits
17         {
18             /// Lock policy
19             typedef cds::lock::Spin  lock_type;
20
21             /// Node allocator
22             typedef CDS_DEFAULT_ALLOCATOR   allocator;
23
24             /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
25             typedef cds::atomicity::empty_item_counter item_counter;
26
27             /// Alignment of internal queue data. Default is \p opt::cache_line_alignment
28             enum { alignment = opt::cache_line_alignment };
29         };
30
31         /// Metafunction converting option list to \p rwqueue::traits
32         /**
33             Supported \p Options are:
34             - opt::lock_type - lock policy, default is \p cds::lock::Spin. Any type satisfied \p Mutex C++ concept may be used.
35             - opt::allocator - allocator (like \p std::allocator) used for allocating queue nodes. Default is \ref CDS_DEFAULT_ALLOCATOR
36             - opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
37                 To enable item counting use \p cds::atomicity::item_counter.
38             - opt::alignment - the alignment for internal queue data. Default is \p opt::cache_line_alignment
39
40             Example: declare mutex-based \p %RWQueue with item counting
41             \code
42             typedef cds::container::RWQueue< Foo,
43                 typename cds::container::rwqueue::make_traits<
44                     cds::opt::item_counter< cds::atomicity::item_counter >,
45                     cds::opt::lock_type< std::mutex >
46                 >::type
47             > myQueue;
48             \endcode
49         */
50         template <typename... Options>
51         struct make_traits {
52 #   ifdef CDS_DOXYGEN_INVOKED
53             typedef implementation_defined type;   ///< Metafunction result
54 #   else
55             typedef typename cds::opt::make_options<
56                 typename cds::opt::find_type_traits< traits, Options... >::type
57                 , Options...
58             >::type type;
59 #   endif
60         };
61
62     } // namespace rwqueue
63
64     /// Michael & Scott blocking queue with fine-grained synchronization schema
65     /** @ingroup cds_nonintrusive_queue
66         The queue has two different locks: one for reading and one for writing.
67         Therefore, one writer and one reader can simultaneously access to the queue.
68         The queue does not require any garbage collector.
69
70         <b>Source</b>
71             - [1998] Maged Michael, Michael Scott "Simple, fast, and practical non-blocking
72                 and blocking concurrent queue algorithms"
73
74         <b>Template arguments</b>
75         - \p T - value type to be stored in the queue
76         - \p Traits - queue traits, default is \p rwqueue::traits. You can use \p rwqueue::make_traits
77             metafunction to make your traits or just derive your traits from \p %rwqueue::traits:
78             \code
79             struct myTraits: public cds::container::rwqueue::traits {
80                 typedef cds::atomicity::item_counter    item_counter;
81             };
82             typedef cds::container::RWQueue< Foo, myTraits > myQueue;
83
84             // Equivalent make_traits example:
85             typedef cds::container::RWQueue< Foo,
86                 typename cds::container::rwqueue::make_traits<
87                     cds::opt::item_counter< cds::atomicity::item_counter >
88                 >::type
89             > myQueue;
90             \endcode
91     */
92     template <typename T, typename Traits = rwqueue::traits >
93     class RWQueue
94     {
95     public:
96         /// Rebind template arguments
97         template <typename T2, typename Traits2>
98         struct rebind {
99             typedef RWQueue< T2, Traits2 > other   ;   ///< Rebinding result
100         };
101
102     public:
103         typedef T       value_type; ///< Type of value to be stored in the queue
104         typedef Traits  traits;     ///< Queue traits
105
106         typedef typename traits::lock_type  lock_type;      ///< Locking primitive
107         typedef typename traits::item_counter item_counter; ///< Item counting policy used
108
109     protected:
110         //@cond
111         /// Node type
112         struct node_type
113         {
114             node_type * volatile    m_pNext ;   ///< Pointer to the next node in the queue
115             value_type              m_value ;   ///< Value stored in the node
116
117             node_type( value_type const& v )
118                 : m_pNext( nullptr )
119                 , m_value(v)
120             {}
121
122             node_type()
123                 : m_pNext( nullptr )
124             {}
125
126             template <typename... Args>
127             node_type( Args&&... args )
128                 : m_pNext( nullptr )
129                 , m_value( std::forward<Args>(args)...)
130             {}
131         };
132         //@endcond
133
134     public:
135         typedef typename traits::allocator::template rebind<node_type>::other allocator_type; ///< Allocator type used for allocate/deallocate the queue nodes
136
137     protected:
138         //@cond
139         typedef typename opt::details::alignment_setter< lock_type, traits::alignment >::type aligned_lock_type;
140         typedef std::unique_lock<lock_type> scoped_lock;
141         typedef cds::details::Allocator< node_type, allocator_type >  node_allocator;
142
143         item_counter    m_ItemCounter;
144
145         mutable aligned_lock_type   m_HeadLock;
146         node_type * m_pHead;
147         mutable aligned_lock_type   m_TailLock;
148         node_type * m_pTail;
149         //@endcond
150
151     protected:
152         //@cond
153         static node_type * alloc_node()
154         {
155             return node_allocator().New();
156         }
157
158         static node_type * alloc_node( T const& data )
159         {
160             return node_allocator().New( data );
161         }
162
163         template <typename... Args>
164         static node_type * alloc_node_move( Args&&... args )
165         {
166             return node_allocator().MoveNew( std::forward<Args>( args )... );
167         }
168
169         static void free_node( node_type * pNode )
170         {
171             node_allocator().Delete( pNode );
172         }
173
174         bool enqueue_node( node_type * p )
175         {
176             assert( p != nullptr );
177             {
178                 scoped_lock lock( m_TailLock );
179                 m_pTail =
180                     m_pTail->m_pNext = p;
181             }
182             ++m_ItemCounter;
183             return true;
184         }
185
186         struct node_disposer {
187             void operator()( node_type * pNode )
188             {
189                 free_node( pNode );
190             }
191         };
192         typedef std::unique_ptr< node_type, node_disposer >     scoped_node_ptr;
193         //@endcond
194
195     public:
196         /// Makes empty queue
197         RWQueue()
198         {
199             node_type * pNode = alloc_node();
200             m_pHead =
201                 m_pTail = pNode;
202         }
203
204         /// Destructor clears queue
205         ~RWQueue()
206         {
207             clear();
208             assert( m_pHead == m_pTail );
209             free_node( m_pHead );
210         }
211
212         /// Enqueues \p data. Always return \a true
213         bool enqueue( value_type const& data )
214         {
215             scoped_node_ptr p( alloc_node( data ));
216             if ( enqueue_node( p.get() )) {
217                 p.release();
218                 return true;
219             }
220             return false;
221         }
222
223         /// Enqueues \p data to the queue using a functor
224         /**
225             \p Func is a functor called to create node.
226             The functor \p f takes one argument - a reference to a new node of type \ref value_type :
227             \code
228             cds::container::RWQueue< cds::gc::HP, Foo > myQueue;
229             Bar bar;
230             myQueue.enqueue_with( [&bar]( Foo& dest ) { dest = bar; } );
231             \endcode
232         */
233         template <typename Func>
234         bool enqueue_with( Func f )
235         {
236             scoped_node_ptr p( alloc_node() );
237             f( p->m_value );
238             if ( enqueue_node( p.get() )) {
239                 p.release();
240                 return true;
241             }
242             return false;
243         }
244
245         /// Enqueues data of type \ref value_type constructed with <tt>std::forward<Args>(args)...</tt>
246         template <typename... Args>
247         bool emplace( Args&&... args )
248         {
249             scoped_node_ptr p( alloc_node_move( std::forward<Args>(args)... ));
250             if ( enqueue_node( p.get() )) {
251                 p.release();
252                 return true;
253             }
254             return false;
255         }
256
257         /// Synonym for \p enqueue() function
258         bool push( value_type const& val )
259         {
260             return enqueue( val );
261         }
262
263         /// Synonym for \p enqueue_with() function
264         template <typename Func>
265         bool push_with( Func f )
266         {
267             return enqueue_with( f );
268         }
269
270         /// Dequeues a value to \p dest.
271         /**
272             If queue is empty returns \a false, \p dest can be corrupted.
273             If queue is not empty returns \a true, \p dest contains the value dequeued
274         */
275         bool dequeue( value_type& dest )
276         {
277             return dequeue_with( [&dest]( value_type& src ) { dest = src; } );
278         }
279
280         /// Dequeues a value using a functor
281         /**
282             \p Func is a functor called to copy dequeued value.
283             The functor takes one argument - a reference to removed node:
284             \code
285             cds:container::RWQueue< cds::gc::HP, Foo > myQueue;
286             Bar bar;
287             myQueue.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
288             \endcode
289             The functor is called only if the queue is not empty.
290         */
291         template <typename Func>
292         bool dequeue_with( Func f )
293         {
294             node_type * pNode;
295             {
296                 scoped_lock lock( m_HeadLock );
297                 pNode = m_pHead;
298                 node_type * pNewHead = pNode->m_pNext;
299                 if ( pNewHead == nullptr )
300                     return false;
301                 f( pNewHead->m_value );
302                 m_pHead = pNewHead;
303             }    // unlock here
304             --m_ItemCounter;
305             free_node( pNode );
306             return true;
307         }
308
309         /// Synonym for \p dequeue() function
310         bool pop( value_type& dest )
311         {
312             return dequeue( dest );
313         }
314
315         /// Synonym for \p dequeue_with() function
316         template <typename Func>
317         bool pop_with( Func f )
318         {
319             return dequeue_with( f );
320         }
321
322         /// Checks if queue is empty
323         bool empty() const
324         {
325             scoped_lock lock( m_HeadLock );
326             return m_pHead->m_pNext == nullptr;
327         }
328
329         /// Clears queue
330         void clear()
331         {
332             scoped_lock lockR( m_HeadLock );
333             scoped_lock lockW( m_TailLock );
334             while ( m_pHead->m_pNext != nullptr ) {
335                 node_type * pHead = m_pHead;
336                 m_pHead = m_pHead->m_pNext;
337                 free_node( pHead );
338             }
339         }
340
341         /// Returns queue's item count
342         /**
343             The value returned depends on \p rwqueue::traits::item_counter. For \p atomicity::empty_item_counter,
344             this function always returns 0.
345
346             @note Even if you use real item counter and it returns 0, this fact is not mean that the queue
347             is empty. To check queue emptyness use \p empty() method.
348         */
349         size_t    size() const
350         {
351             return m_ItemCounter.value();
352         }
353
354         //@cond
355         /// Returns reference to internal statistics
356         cds::container::msqueue::empty_stat statistics() const
357         {
358             return cds::container::msqueue::empty_stat();
359         }
360         //@endcond
361     };
362
363 }}  // namespace cds::container
364
365 #endif // #ifndef __CDS_CONTAINER_RWQUEUE_H