1622670751576e9b1301e60562afbd2bbab0f4fc
[libcds.git] / cds / container / rwqueue.h
1 //$$CDS-header$$
2
3 #ifndef __CDS_CONTAINER_RWQUEUE_H
4 #define __CDS_CONTAINER_RWQUEUE_H
5
6 #include <mutex>        // unique_lock
7 #include <cds/container/msqueue.h>
8 #include <cds/lock/spinlock.h>
9
10 namespace cds { namespace container {
11     /// RWQueue related definitions
12     /** @ingroup cds_nonintrusive_helper
13     */
14     namespace rwqueue {
15         /// RWQueue default type traits
16         struct traits
17         {
18             /// Lock policy
19             typedef cds::lock::Spin  lock_type;
20
21             /// Node allocator
22             typedef CDS_DEFAULT_ALLOCATOR   allocator;
23
24             /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
25             typedef cds::atomicity::empty_item_counter item_counter;
26
27             /// Alignment of internal queue data. Default is \p opt::cache_line_alignment
28             enum { alignment = opt::cache_line_alignment };
29         };
30
31         /// Metafunction converting option list to \p rwqueue::traits
32         /**
33             Supported \p Options are:
34             - opt::lock_type - lock policy, default is \p cds::lock::Spin. Any type satisfied \p Mutex C++ concept may be used.
35             - opt::allocator - allocator (like \p std::allocator) used for allocating queue nodes. Default is \ref CDS_DEFAULT_ALLOCATOR
36             - opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
37                 To enable item counting use \p cds::atomicity::item_counter.
38             - opt::alignment - the alignment for internal queue data. Default is \p opt::cache_line_alignment
39             - opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
40                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
41
42             Example: declare mutex-based \p %RWQueue with item counting
43             \code
44             typedef cds::container::RWQueue< Foo, 
45                 typename cds::container::rwqueue::make_traits<
46                     cds::opt::item_counter< cds::atomicity::item_counter >,
47                     cds::opt::lock_type< std::mutex >
48                 >::type
49             > myQueue;
50             \endcode
51         */
52         template <typename... Options>
53         struct make_traits {
54 #   ifdef CDS_DOXYGEN_INVOKED
55             typedef implementation_defined type;   ///< Metafunction result
56 #   else
57             typedef typename cds::opt::make_options<
58                 typename cds::opt::find_type_traits< traits, Options... >::type
59                 , Options...
60             >::type type;
61 #   endif
62         };
63
64     } // namespace rwqueue
65
66     /// Michael & Scott blocking queue with fine-grained synchronization schema
67     /** @ingroup cds_nonintrusive_queue
68         The queue has two different locks: one for reading and one for writing.
69         Therefore, one writer and one reader can simultaneously access to the queue.
70         The queue does not require any garbage collector.
71
72         <b>Source</b>
73             - [1998] Maged Michael, Michael Scott "Simple, fast, and practical non-blocking
74                 and blocking concurrent queue algorithms"
75
76         <b>Template arguments</b>
77         - \p T - value type to be stored in the queue
78         - \p Traits - queue traits, default is \p rwqueue::traits. You can use \p rwqueue::make_traits
79             metafunction to make your traits or just derive your traits from \p %rwqueue::traits:
80             \code
81             struct myTraits: public cds::container::rwqueue::traits {
82                 typedef cds::atomicity::item_counter    item_counter;
83             };
84             typedef cds::container::RWQueue< Foo, myTraits > myQueue;
85
86             // Equivalent make_traits example:
87             typedef cds::container::RWQueue< Foo, 
88                 typename cds::container::rwqueue::make_traits< 
89                     cds::opt::item_counter< cds::atomicity::item_counter >
90                 >::type
91             > myQueue;
92             \endcode
93     */
94     template <typename T, typename Traits = rwqueue::traits >
95     class RWQueue
96     {
97     public:
98         /// Rebind template arguments
99         template <typename T2, typename Traits2>
100         struct rebind {
101             typedef RWQueue< T2, Traits2 > other   ;   ///< Rebinding result
102         };
103
104     public:
105         typedef T       value_type; ///< Type of value to be stored in the queue
106         typedef Traits  traits;     ///< Queue traits
107
108         typedef typename traits::lock_type  lock_type;      ///< Locking primitive
109         typedef typename traits::item_counter item_counter; ///< Item counting policy used
110         typedef typename traits::memory_model memory_model;   ///< Memory ordering. See \p cds::opt::memory_model option
111
112     protected:
113         //@cond
114         /// Node type
115         struct node_type
116         {
117             node_type * volatile    m_pNext ;   ///< Pointer to the next node in the queue
118             value_type              m_value ;   ///< Value stored in the node
119
120             node_type( value_type const& v )
121                 : m_pNext( nullptr )
122                 , m_value(v)
123             {}
124
125             node_type()
126                 : m_pNext( nullptr )
127             {}
128
129             template <typename... Args>
130             node_type( Args&&... args )
131                 : m_pNext( nullptr )
132                 , m_value( std::forward<Args>(args)...)
133             {}
134         };
135         //@endcond
136
137     public:
138         typedef typename traits::allocator::template rebind<node_type>::other allocator_type; ///< Allocator type used for allocate/deallocate the queue nodes
139
140     protected:
141         //@cond
142         typedef typename opt::details::alignment_setter< lock_type, traits::alignment >::type aligned_lock_type;
143         typedef std::unique_lock<lock_type> scoped_lock;
144         typedef cds::details::Allocator< node_type, allocator_type >  node_allocator;
145
146         item_counter    m_ItemCounter;
147
148         mutable aligned_lock_type   m_HeadLock;
149         node_type * m_pHead;
150         mutable aligned_lock_type   m_TailLock;
151         node_type * m_pTail;
152         //@endcond
153
154     protected:
155         //@cond
156         static node_type * alloc_node()
157         {
158             return node_allocator().New();
159         }
160
161         static node_type * alloc_node( T const& data )
162         {
163             return node_allocator().New( data );
164         }
165
166         template <typename... Args>
167         static node_type * alloc_node_move( Args&&... args )
168         {
169             return node_allocator().MoveNew( std::forward<Args>( args )... );
170         }
171
172         static void free_node( node_type * pNode )
173         {
174             node_allocator().Delete( pNode );
175         }
176
177         bool enqueue_node( node_type * p )
178         {
179             assert( p != nullptr );
180             {
181                 scoped_lock lock( m_TailLock );
182                 m_pTail =
183                     m_pTail->m_pNext = p;
184             }
185             ++m_ItemCounter;
186             return true;
187         }
188
189         struct node_disposer {
190             void operator()( node_type * pNode )
191             {
192                 free_node( pNode );
193             }
194         };
195         typedef std::unique_ptr< node_type, node_disposer >     scoped_node_ptr;
196         //@endcond
197
198     public:
199         /// Makes empty queue
200         RWQueue()
201         {
202             node_type * pNode = alloc_node();
203             m_pHead =
204                 m_pTail = pNode;
205         }
206
207         /// Destructor clears queue
208         ~RWQueue()
209         {
210             clear();
211             assert( m_pHead == m_pTail );
212             free_node( m_pHead );
213         }
214
215         /// Enqueues \p data. Always return \a true
216         bool enqueue( value_type const& data )
217         {
218             scoped_node_ptr p( alloc_node( data ));
219             if ( enqueue_node( p )) {
220                 p.release();
221                 return true;
222             }
223             return false;
224         }
225
226         /// Enqueues \p data to the queue using a functor
227         /**
228             \p Func is a functor called to create node.
229             The functor \p f takes one argument - a reference to a new node of type \ref value_type :
230             \code
231             cds::container::RWQueue< cds::gc::HP, Foo > myQueue;
232             Bar bar;
233             myQueue.enqueue_with( [&bar]( Foo& dest ) { dest = bar; } );
234             \endcode
235         */
236         template <typename Func>
237         bool enqueue_with( Func f )
238         {
239             scoped_node_ptr p( alloc_node() );
240             f( p->m_value );
241             if ( enqueue_node( p ) ) {
242                 p.release();
243                 return true;
244             }
245             return false;
246         }
247
248         /// Enqueues data of type \ref value_type constructed with <tt>std::forward<Args>(args)...</tt>
249         template <typename... Args>
250         bool emplace( Args&&... args )
251         {
252             scoped_node_ptr p( alloc_node_move( std::forward<Args>(args)... ));
253             if ( enqueue_node( p.get() )) {
254                 p.release();
255                 return true;
256             }
257             return false;
258         }
259
260         /// Synonym for \p enqueue() function
261         bool push( value_type const& val )
262         {
263             return enqueue( val );
264         }
265
266         /// Synonym for \p enqueue_with() function
267         template <typename Func>
268         bool push_with( Func f )
269         {
270             return enqueue_with( f );
271         }
272
273         /// Dequeues a value to \p dest.
274         /**
275             If queue is empty returns \a false, \p dest can be corrupted.
276             If queue is not empty returns \a true, \p dest contains the value dequeued
277         */
278         bool dequeue( value_type& dest )
279         {
280             return dequeue( [&dest]( value_type * src ) { dest = src; } );
281         }
282
283         /// Dequeues a value using a functor
284         /**
285             \p Func is a functor called to copy dequeued value.
286             The functor takes one argument - a reference to removed node:
287             \code
288             cds:container::RWQueue< cds::gc::HP, Foo > myQueue;
289             Bar bar;
290             myQueue.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
291             \endcode
292             The functor is called only if the queue is not empty.
293         */
294         template <typename Func>
295         bool dequeue_with( Func f )
296         {
297             node_type * pNode;
298             {
299                 scoped_lock lock( m_HeadLock );
300                 pNode = m_pHead;
301                 node_type * pNewHead = pNode->m_pNext;
302                 if ( pNewHead == nullptr )
303                     return false;
304                 f( pNewHead->m_value );
305                 m_pHead = pNewHead;
306             }    // unlock here
307             --m_ItemCounter;
308             free_node( pNode );
309             return true;
310         }
311
312         /// Synonym for \p dequeue() function
313         bool pop( value_type& dest )
314         {
315             return dequeue( dest );
316         }
317
318         /// Synonym for \p dequeue_with() function
319         template <typename Func>
320         bool pop_with( Func f )
321         {
322             return dequeue_with( f );
323         }
324
325         /// Checks if queue is empty
326         bool empty() const
327         {
328             scoped_lock lock( m_HeadLock );
329             return m_pHead->m_pNext == nullptr;
330         }
331
332         /// Clears queue
333         void clear()
334         {
335             scoped_lock lockR( m_HeadLock );
336             scoped_lock lockW( m_TailLock );
337             while ( m_pHead->m_pNext != nullptr ) {
338                 node_type * pHead = m_pHead;
339                 m_pHead = m_pHead->m_pNext;
340                 free_node( pHead );
341             }
342         }
343
344         /// Returns queue's item count
345         /**
346             The value returned depends on \p rwqueue::traits::item_counter. For \p atomicity::empty_item_counter,
347             this function always returns 0.
348
349             @note Even if you use real item counter and it returns 0, this fact is not mean that the queue
350             is empty. To check queue emptyness use \p empty() method.
351         */
352         size_t    size() const
353         {
354             return m_ItemCounter.value();
355         }
356
357         //@cond
358         /// Returns reference to internal statistics
359         cds::container::msqueue::empty_stat statistics() const
360         {
361             return cds::container::msqueue::empty_stat();
362         }
363         //@endcond
364     };
365
366 }}  // namespace cds::container
367
368 #endif // #ifndef __CDS_CONTAINER_RWQUEUE_H