c25d18e42051c37c793b148ae908bde0177ae7be
[libcds.git] / cds / container / rwqueue.h
1 //$$CDS-header$$
2
3 #ifndef CDSLIB_CONTAINER_RWQUEUE_H
4 #define CDSLIB_CONTAINER_RWQUEUE_H
5
6 #include <mutex>        // unique_lock
7 #include <cds/sync/spinlock.h>
8 #include <cds/opt/options.h>
9 #include <cds/details/allocator.h>
10
11 namespace cds { namespace container {
12     /// RWQueue related definitions
13     /** @ingroup cds_nonintrusive_helper
14     */
15     namespace rwqueue {
16         /// RWQueue default type traits
17         struct traits
18         {
19             /// Lock policy
20             typedef cds::sync::spin  lock_type;
21
22             /// Node allocator
23             typedef CDS_DEFAULT_ALLOCATOR   allocator;
24
25             /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
26             typedef cds::atomicity::empty_item_counter item_counter;
27
28             /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
29             enum { padding = opt::cache_line_padding };
30         };
31
32         /// Metafunction converting option list to \p rwqueue::traits
33         /**
34             Supported \p Options are:
35             - opt::lock_type - lock policy, default is \p cds::sync::spin. Any type satisfied \p Mutex C++ concept may be used.
36             - opt::allocator - allocator (like \p std::allocator) used for allocating queue nodes. Default is \ref CDS_DEFAULT_ALLOCATOR
37             - opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
38                 To enable item counting use \p cds::atomicity::item_counter.
39             - \p opt::padding - padding for internal critical data. Default is \p opt::cache_line_padding
40
41             Example: declare mutex-based \p %RWQueue with item counting
42             \code
43             typedef cds::container::RWQueue< Foo,
44                 typename cds::container::rwqueue::make_traits<
45                     cds::opt::item_counter< cds::atomicity::item_counter >,
46                     cds::opt::lock_type< std::mutex >
47                 >::type
48             > myQueue;
49             \endcode
50         */
51         template <typename... Options>
52         struct make_traits {
53 #   ifdef CDS_DOXYGEN_INVOKED
54             typedef implementation_defined type;   ///< Metafunction result
55 #   else
56             typedef typename cds::opt::make_options<
57                 typename cds::opt::find_type_traits< traits, Options... >::type
58                 , Options...
59             >::type type;
60 #   endif
61         };
62
63     } // namespace rwqueue
64
65     /// Michael & Scott blocking queue with fine-grained synchronization schema
66     /** @ingroup cds_nonintrusive_queue
67         The queue has two different locks: one for reading and one for writing.
68         Therefore, one writer and one reader can simultaneously access to the queue.
69         The queue does not require any garbage collector.
70
71         <b>Source</b>
72             - [1998] Maged Michael, Michael Scott "Simple, fast, and practical non-blocking
73                 and blocking concurrent queue algorithms"
74
75         <b>Template arguments</b>
76         - \p T - value type to be stored in the queue
77         - \p Traits - queue traits, default is \p rwqueue::traits. You can use \p rwqueue::make_traits
78             metafunction to make your traits or just derive your traits from \p %rwqueue::traits:
79             \code
80             struct myTraits: public cds::container::rwqueue::traits {
81                 typedef cds::atomicity::item_counter    item_counter;
82             };
83             typedef cds::container::RWQueue< Foo, myTraits > myQueue;
84
85             // Equivalent make_traits example:
86             typedef cds::container::RWQueue< Foo,
87                 typename cds::container::rwqueue::make_traits<
88                     cds::opt::item_counter< cds::atomicity::item_counter >
89                 >::type
90             > myQueue;
91             \endcode
92     */
93     template <typename T, typename Traits = rwqueue::traits >
94     class RWQueue
95     {
96     public:
97         /// Rebind template arguments
98         template <typename T2, typename Traits2>
99         struct rebind {
100             typedef RWQueue< T2, Traits2 > other   ;   ///< Rebinding result
101         };
102
103     public:
104         typedef T       value_type; ///< Type of value to be stored in the queue
105         typedef Traits  traits;     ///< Queue traits
106
107         typedef typename traits::lock_type  lock_type;      ///< Locking primitive
108         typedef typename traits::item_counter item_counter; ///< Item counting policy used
109
110     protected:
111         //@cond
112         /// Node type
113         struct node_type
114         {
115             atomics::atomic< node_type *> m_pNext;  ///< Pointer to the next node in the queue
116             value_type              m_value;        ///< Value stored in the node
117
118             node_type( value_type const& v )
119                 : m_pNext( nullptr )
120                 , m_value(v)
121             {}
122
123             node_type()
124                 : m_pNext( nullptr )
125             {}
126
127             template <typename... Args>
128             node_type( Args&&... args )
129                 : m_pNext( nullptr )
130                 , m_value( std::forward<Args>(args)...)
131             {}
132         };
133         //@endcond
134
135     public:
136         typedef typename traits::allocator::template rebind<node_type>::other allocator_type; ///< Allocator type used for allocate/deallocate the queue nodes
137
138     protected:
139         //@cond
140         typedef std::unique_lock<lock_type> scoped_lock;
141         typedef cds::details::Allocator< node_type, allocator_type >  node_allocator;
142
143         struct head_type {
144             mutable lock_type lock;
145             node_type *       ptr;
146         };
147
148         head_type m_Head;
149         typename opt::details::apply_padding< head_type, traits::padding >::padding_type pad_;
150         head_type m_Tail;
151
152         item_counter    m_ItemCounter;
153         //@endcond
154
155     protected:
156         //@cond
157         static node_type * alloc_node()
158         {
159             return node_allocator().New();
160         }
161
162         static node_type * alloc_node( T const& data )
163         {
164             return node_allocator().New( data );
165         }
166
167         template <typename... Args>
168         static node_type * alloc_node_move( Args&&... args )
169         {
170             return node_allocator().MoveNew( std::forward<Args>( args )... );
171         }
172
173         static void free_node( node_type * pNode )
174         {
175             node_allocator().Delete( pNode );
176         }
177
178         bool enqueue_node( node_type * p )
179         {
180             assert( p != nullptr );
181             {
182                 scoped_lock lock( m_Tail.lock );
183                 m_Tail.ptr->m_pNext.store( p, atomics::memory_order_release );
184                 m_Tail.ptr = p;
185             }
186             ++m_ItemCounter;
187             return true;
188         }
189
190         struct node_disposer {
191             void operator()( node_type * pNode )
192             {
193                 free_node( pNode );
194             }
195         };
196         typedef std::unique_ptr< node_type, node_disposer >     scoped_node_ptr;
197         //@endcond
198
199     public:
200         /// Makes empty queue
201         RWQueue()
202         {
203             node_type * pNode = alloc_node();
204             m_Head.ptr =
205                 m_Tail.ptr = pNode;
206         }
207
208         /// Destructor clears queue
209         ~RWQueue()
210         {
211             clear();
212             assert( m_Head.ptr == m_Tail.ptr );
213             free_node( m_Head.ptr );
214         }
215
216         /// Enqueues \p data. Always return \a true
217         bool enqueue( value_type const& data )
218         {
219             scoped_node_ptr p( alloc_node( data ));
220             if ( enqueue_node( p.get() )) {
221                 p.release();
222                 return true;
223             }
224             return false;
225         }
226
227         /// Enqueues \p data to the queue using a functor
228         /**
229             \p Func is a functor called to create node.
230             The functor \p f takes one argument - a reference to a new node of type \ref value_type :
231             \code
232             cds::container::RWQueue< cds::gc::HP, Foo > myQueue;
233             Bar bar;
234             myQueue.enqueue_with( [&bar]( Foo& dest ) { dest = bar; } );
235             \endcode
236         */
237         template <typename Func>
238         bool enqueue_with( Func f )
239         {
240             scoped_node_ptr p( alloc_node() );
241             f( p->m_value );
242             if ( enqueue_node( p.get() )) {
243                 p.release();
244                 return true;
245             }
246             return false;
247         }
248
249         /// Enqueues data of type \ref value_type constructed with <tt>std::forward<Args>(args)...</tt>
250         template <typename... Args>
251         bool emplace( Args&&... args )
252         {
253             scoped_node_ptr p( alloc_node_move( std::forward<Args>(args)... ));
254             if ( enqueue_node( p.get() )) {
255                 p.release();
256                 return true;
257             }
258             return false;
259         }
260
261         /// Synonym for \p enqueue() function
262         bool push( value_type const& val )
263         {
264             return enqueue( val );
265         }
266
267         /// Synonym for \p enqueue_with() function
268         template <typename Func>
269         bool push_with( Func f )
270         {
271             return enqueue_with( f );
272         }
273
274         /// Dequeues a value to \p dest.
275         /**
276             If queue is empty returns \a false, \p dest can be corrupted.
277             If queue is not empty returns \a true, \p dest contains the value dequeued
278         */
279         bool dequeue( value_type& dest )
280         {
281             return dequeue_with( [&dest]( value_type& src ) { dest = src; } );
282         }
283
284         /// Dequeues a value using a functor
285         /**
286             \p Func is a functor called to copy dequeued value.
287             The functor takes one argument - a reference to removed node:
288             \code
289             cds:container::RWQueue< cds::gc::HP, Foo > myQueue;
290             Bar bar;
291             myQueue.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
292             \endcode
293             The functor is called only if the queue is not empty.
294         */
295         template <typename Func>
296         bool dequeue_with( Func f )
297         {
298             node_type * pNode;
299             {
300                 scoped_lock lock( m_Head.lock );
301                 pNode = m_Head.ptr;
302                 node_type * pNewHead = pNode->m_pNext.load( atomics::memory_order_acquire );
303                 if ( pNewHead == nullptr )
304                     return false;
305                 f( pNewHead->m_value );
306                 m_Head.ptr = pNewHead;
307             }    // unlock here
308             --m_ItemCounter;
309             free_node( pNode );
310             return true;
311         }
312
313         /// Synonym for \p dequeue() function
314         bool pop( value_type& dest )
315         {
316             return dequeue( dest );
317         }
318
319         /// Synonym for \p dequeue_with() function
320         template <typename Func>
321         bool pop_with( Func f )
322         {
323             return dequeue_with( f );
324         }
325
326         /// Checks if queue is empty
327         bool empty() const
328         {
329             scoped_lock lock( m_Head.lock );
330             return m_Head.ptr->m_pNext.load( atomics::memory_order_relaxed ) == nullptr;
331         }
332
333         /// Clears queue
334         void clear()
335         {
336             scoped_lock lockR( m_Head.lock );
337             scoped_lock lockW( m_Tail.lock );
338             while ( m_Head.ptr->m_pNext.load( atomics::memory_order_relaxed ) != nullptr ) {
339                 node_type * pHead = m_Head.ptr;
340                 m_Head.ptr = m_Head.ptr->m_pNext.load( atomics::memory_order_relaxed );
341                 free_node( pHead );
342             }
343         }
344
345         /// Returns queue's item count
346         /**
347             The value returned depends on \p rwqueue::traits::item_counter. For \p atomicity::empty_item_counter,
348             this function always returns 0.
349
350             @note Even if you use real item counter and it returns 0, this fact is not mean that the queue
351             is empty. To check queue emptyness use \p empty() method.
352         */
353         size_t    size() const
354         {
355             return m_ItemCounter.value();
356         }
357
358         //@cond
359         /// The class has no internal statistics. For test consistency only
360         nullptr_t statistics() const
361         {
362             return nullptr;
363         }
364         //@endcond
365     };
366
367 }}  // namespace cds::container
368
369 #endif // #ifndef CDSLIB_CONTAINER_RWQUEUE_H