3 #ifndef __CDS_CONTAINER_RWQUEUE_H
4 #define __CDS_CONTAINER_RWQUEUE_H
7 #include <functional> // ref
#include <memory> // allocator_traits, unique_ptr
8 #include <mutex> // unique_lock
9 #include <cds/opt/options.h>
10 #include <cds/lock/spinlock.h>
11 #include <cds/intrusive/details/queue_stat.h>
12 #include <cds/details/allocator.h>
13 #include <cds/details/trivial_assign.h>
15 namespace cds { namespace container {
17 /// Michael & Scott blocking queue with fine-grained synchronization schema
18 /** @ingroup cds_nonintrusive_queue
19 The queue has two different locks: one for reading and one for writing.
20 Therefore, one writer and one reader can access the queue simultaneously.
21 The queue does not require any garbage collector.
24 - [1998] Maged Michael, Michael Scott "Simple, fast, and practical non-blocking
25 and blocking concurrent queue algorithms"
27 <b>Template arguments</b>
28 - \p T - type to be stored in the queue
29 - \p Options - options
32 - opt::allocator - allocator (like \p std::allocator). Default is \ref CDS_DEFAULT_ALLOCATOR
33 - opt::lock_type - type of lock primitive. Default is cds::lock::Spin.
34 - opt::item_counter - the type of item counting feature. Default is \ref atomicity::empty_item_counter
35 - opt::stat - the type to gather internal statistics.
36 Possible option values are: queue_stat, queue_dummy_stat, or a user-provided class that supports the queue_stat interface.
37 Default is \ref intrusive::queue_dummy_stat.
38 <tt>RWQueue</tt> uses only the \p onEnqueue and \p onDequeue counters.
39 - opt::alignment - the alignment for \p lock_type to prevent false sharing. Default is opt::cache_line_alignment
41 This queue has no intrusive counterpart.
43 template <typename T, typename... Options>
// Default values for every option of the queue; they are the fallback set that is
// combined with the user-supplied Options... pack when the effective options are built.
47 struct default_options
// Lock primitive used when no opt::lock_type is given.
49 typedef lock::Spin lock_type;
// Allocator used when no opt::allocator is given.
50 typedef CDS_DEFAULT_ALLOCATOR allocator;
// Item counter used when no opt::item_counter is given.
51 typedef atomicity::empty_item_counter item_counter;
// Statistics policy used when no opt::stat is given.
52 typedef intrusive::queue_dummy_stat stat;
// Alignment applied to the lock objects when no opt::alignment is given.
53 enum { alignment = opt::cache_line_alignment };
// Effective option set of the queue, built from default_options and the user-supplied
// Options... pack. The exact merge rules live in opt::make_options / opt::find_type_traits
// (declared in <cds/opt/options.h>, not visible here); the tail of this typedef is also
// outside this excerpt.
59 typedef typename opt::make_options<
60 typename cds::opt::find_type_traits< default_options, Options... >::type
66 /// Rebinds the queue to another value type \p T2 and another option pack \p Options2
67 template <typename T2, typename... Options2>
// The nested typedef \p other names RWQueue instantiated with the new arguments.
69 typedef RWQueue< T2, Options2...> other ; ///< Rebinding result
73 typedef T value_type ; ///< Type of the values stored in the queue
75 typedef typename options::lock_type lock_type ; ///< Lock primitive selected by the \p lock_type option
// Queue node: a link to the next node plus the stored value.
// NOTE(review): m_pNext is declared volatile, but volatile gives no inter-thread
// ordering guarantees in standard C++. The link is stored by enqueue_node() while
// holding m_TailLock and loaded by dequeue() while holding m_HeadLock, i.e. under two
// different locks. Confirm that this is safe for every supported lock_type, or consider
// an atomic link.
82 node_type * volatile m_pNext ; ///< Pointer to the next node in queue
83 value_type m_value ; ///< Value stored in the node
// Constructor taking the value by const reference (its initializer list is outside
// this excerpt).
85 node_type( value_type const& v )
// Constructor that builds m_value in place from perfectly forwarded arguments.
94 template <typename... Args>
95 node_type( Args&&... args )
97 , m_value( std::forward<Args>(args)...)
// Node allocator: the allocator chosen through the options, rebound to node_type.
// The rebinding goes through std::allocator_traits rather than the allocator's nested
// `rebind` member: std::allocator<T>::rebind is deprecated in C++17 and removed in C++20,
// while allocator_traits::rebind_alloc still uses a user-provided `rebind` when one exists.
103 typedef typename std::allocator_traits< typename options::allocator >::template rebind_alloc<node_type> allocator_type ; ///< Allocator type used to allocate/deallocate the queue nodes
104 typedef typename options::item_counter item_counter ; ///< Item counting policy used
105 typedef typename options::stat stat ; ///< Internal statistics policy used
// lock_type wrapped by alignment_setter with options::alignment (the class documentation
// states the alignment exists to prevent false sharing between the locks).
109 typedef typename opt::details::alignment_setter< lock_type, options::alignment >::type aligned_lock_type;
// Scoped guard used wherever m_HeadLock / m_TailLock is taken.
110 typedef std::unique_lock<lock_type> auto_lock;
// Helper through which nodes are created (New / MoveNew) and destroyed (Delete).
111 typedef cds::details::Allocator< node_type, allocator_type > node_allocator;
// Item counter; its value() is what the item-count accessor returns.
113 item_counter m_ItemCounter;
// Lock taken on the consumer side (dequeue, emptiness check, drain loop).
// Declared mutable, so it can be locked through a const queue.
116 mutable aligned_lock_type m_HeadLock;
// Lock taken on the producer side (enqueue_node) and by the drain loop.
118 mutable aligned_lock_type m_TailLock;
// Allocates a node through node_allocator().New() with no constructor arguments.
// Used by the constructor code and by the functor-based enqueue(), which fills the
// value afterwards.
124 static node_type * alloc_node()
126 return node_allocator().New();
// Allocates a node through node_allocator().New( data ), i.e. with \p data passed to
// the node constructor.
129 static node_type * alloc_node( T const& data )
131 return node_allocator().New( data );
// Allocates a node through node_allocator().MoveNew(), perfectly forwarding \p args.
// Used by emplace().
134 template <typename... Args>
135 static node_type * alloc_node_move( Args&&... args )
137 return node_allocator().MoveNew( std::forward<Args>( args )... );
// Releases \p pNode through node_allocator().Delete().
140 static void free_node( node_type * pNode )
142 node_allocator().Delete( pNode );
// Appends the already constructed node \p p to the queue.
// \p p must not be null (checked by assert). The link is written while m_TailLock is
// held; the rest of the body (tail pointer update, return value) is outside this excerpt.
145 bool enqueue_node( node_type * p )
147 assert( p != nullptr );
149 auto_lock lock( m_TailLock );
// Hook the new node behind the current tail node.
151 m_pTail->m_pNext = p;
// Deleter functor for scoped_node_ptr. Its body is not visible in this excerpt;
// presumably it forwards to free_node() (TODO confirm).
158 struct node_disposer {
159 void operator()( node_type * pNode )
// Owning pointer that holds a freshly allocated node in the enqueue()/emplace() paths.
164 typedef std::unique_ptr< node_type, node_disposer > scoped_node_ptr;
168 /// Constructs an empty queue
// Allocate the initial node with no value arguments; how it is installed into the
// head/tail pointers is outside this excerpt.
171 node_type * pNode = alloc_node();
176 /// Destructor clears queue
// At this point exactly one node is expected to remain, so head and tail must
// coincide (asserted); that last node is released here.
180 assert( m_pHead == m_pTail );
181 free_node( m_pHead );
184 /// Enqueues \p data. Always returns \a true
185 bool enqueue( value_type const& data )
// The new node is held by scoped_node_ptr while it is handed to enqueue_node();
// the success branch is outside this excerpt.
187 scoped_node_ptr p( alloc_node( data ));
188 if ( enqueue_node( p.get() )) {
195 /// Enqueues \p data to queue using copy functor
197 \p Func is a functor called to copy the value \p data of type \p Type,
198 which may differ from the type \p T stored in the queue.
199 The functor's interface is:
202 void operator()(T& dest, Type const& data)
204 // // Code to copy \p data to \p dest
209 You may use \p std::ref to pass functor \p f by reference.
211 <b>Requirements</b> The functor \p Func should not throw any exception.
// Functor-based enqueue: a node is allocated with no value arguments and \p f is then
// invoked as f( node value, data ) to fill it before the node is passed to enqueue_node().
213 template <typename Type, typename Func>
214 bool enqueue( Type const& data, Func f )
// The node is held by scoped_node_ptr while \p f runs and while it is being linked.
216 scoped_node_ptr p( alloc_node());
217 f( p->m_value, data );
// The success branch is outside this excerpt.
218 if ( enqueue_node( p.get() )) {
225 /// Enqueues data of type \ref value_type constructed with <tt>std::forward<Args>(args)...</tt>
226 template <typename... Args>
227 bool emplace( Args&&... args )
// The node is allocated with the forwarded arguments and held by scoped_node_ptr
// while it is handed to enqueue_node(); the success branch is outside this excerpt.
229 scoped_node_ptr p( alloc_node_move( std::forward<Args>(args)... ));
230 if ( enqueue_node( p.get() )) {
237 /// Dequeues a value using copy functor
239 \p Func is a functor called to copy the dequeued value to \p dest of type \p Type,
240 which may differ from the type \p T stored in the queue.
241 The functor's interface is:
244 void operator()(Type& dest, T const& data)
246 // // Copy \p data to \p dest
251 You may use \p std::ref to pass functor \p f by reference.
253 <b>Requirements</b> The functor \p Func should not throw any exception.
// Functor-based dequeue. m_HeadLock is taken, then the successor of pNode is examined
// (pNode is defined outside this excerpt; presumably the current head node, TODO confirm).
255 template <typename Type, typename Func>
256 bool dequeue( Type& dest, Func f )
260 auto_lock lock( m_HeadLock );
262 node_type * pNewHead = pNode->m_pNext;
// No successor: handled by a branch whose body is not visible here.
263 if ( pNewHead == nullptr )
// Otherwise the successor's value is handed to the user functor as f( dest, value ).
265 f( dest, pNewHead->m_value );
274 /** Dequeues a value to \p dest.
276 If the queue is empty, returns \a false; \p dest may be corrupted.
277 If the queue is not empty, returns \a true; \p dest contains the dequeued value.
// Convenience overload: delegates to the functor-based dequeue() with a
// cds::details::trivial_assign<value_type, value_type> functor.
279 bool dequeue( value_type& dest )
281 typedef cds::details::trivial_assign<value_type, value_type> functor;
282 return dequeue( dest, functor() );
285 /// Synonym for \ref enqueue
// Forwards \p data unchanged and returns the result of enqueue().
286 bool push( value_type const& data )
288 return enqueue( data );
291 /// Synonym for template version of \ref enqueue function
// Forwards \p data and the functor \p f unchanged and returns the result of enqueue().
292 template <typename Type, typename Func>
293 bool push( Type const& data, Func f )
295 return enqueue( data, f );
298 /// Synonym for \ref dequeue
// Forwards \p data unchanged and returns the result of dequeue().
299 bool pop( value_type& data )
301 return dequeue( data );
304 /// Synonym for template version of \ref dequeue function
// Forwards \p dest and the functor \p f unchanged and returns the result of dequeue().
305 template <typename Type, typename Func>
306 bool pop( Type& dest, Func f )
308 return dequeue( dest, f );
311 /// Checks if queue is empty
// The check is made under m_HeadLock: the queue is reported empty when the head node
// has no successor.
314 auto_lock lock( m_HeadLock );
315 return m_pHead->m_pNext == nullptr;
// Drain loop (the enclosing signature is outside this excerpt). Both locks are taken,
// head lock first and then tail lock, so neither side of the queue can be used while
// the loop runs.
321 auto_lock lockR( m_HeadLock );
322 auto_lock lockW( m_TailLock );
// Advance the head pointer until the head node has no successor.
323 while ( m_pHead->m_pNext != nullptr ) {
// Remember the node being left behind; what is done with it is not visible here.
324 node_type * pHead = m_pHead;
325 m_pHead = m_pHead->m_pNext;
330 /// Returns queue's item count
332 The value returned depends on opt::item_counter option. For atomicity::empty_item_counter,
333 this function always returns 0.
335 <b>Warning</b>: even if you use a real item counter and it returns 0, this does not mean that the queue
336 is empty. To check whether the queue is empty, use the \ref empty() method.
// Reports whatever the configured item counter currently holds; no lock is taken on
// this line.
340 return m_ItemCounter.value();
343 /// Returns a const reference to the internal statistics object
// Callable on a const queue; the body is outside this excerpt.
344 stat const& statistics() const
351 }} // namespace cds::container
353 #endif // #ifndef __CDS_CONTAINER_RWQUEUE_H