-//$$CDS-header$$
+/*
+ This file is a part of libcds - Concurrent Data Structures library
-#ifndef __CDS_CONTAINER_RWQUEUE_H
-#define __CDS_CONTAINER_RWQUEUE_H
+ (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
-#include <memory>
-#include <functional> // ref
+ Source code repo: http://github.com/khizmax/libcds/
+ Download: http://sourceforge.net/projects/libcds/files/
+
+ Redistribution and use in source and binary forms, with or without
+ modification, are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+
+ * Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+ FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+ OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+#ifndef CDSLIB_CONTAINER_RWQUEUE_H
+#define CDSLIB_CONTAINER_RWQUEUE_H
+
+#include <mutex> // unique_lock
+#include <cds/sync/spinlock.h>
#include <cds/opt/options.h>
-#include <cds/lock/spinlock.h>
-#include <cds/intrusive/details/queue_stat.h>
#include <cds/details/allocator.h>
-#include <cds/details/trivial_assign.h>
namespace cds { namespace container {
+ /// RWQueue related definitions
+ /** @ingroup cds_nonintrusive_helper
+ */
+ namespace rwqueue {
+ /// RWQueue default type traits
+ struct traits
+ {
+ /// Lock policy
+ typedef cds::sync::spin lock_type;
+
+ /// Node allocator
+ typedef CDS_DEFAULT_ALLOCATOR allocator;
+
+ /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
+ typedef cds::atomicity::empty_item_counter item_counter;
+
+ /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
+ enum { padding = opt::cache_line_padding };
+ };
+
+ /// Metafunction converting option list to \p rwqueue::traits
+ /**
+ Supported \p Options are:
+ - opt::lock_type - lock policy, default is \p cds::sync::spin. Any type satisfied \p Mutex C++ concept may be used.
+ - opt::allocator - allocator (like \p std::allocator) used for allocating queue nodes. Default is \ref CDS_DEFAULT_ALLOCATOR
+ - opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
+ To enable item counting use \p cds::atomicity::item_counter.
+ - \p opt::padding - padding for internal critical data. Default is \p opt::cache_line_padding
+
+ Example: declare mutex-based \p %RWQueue with item counting
+ \code
+ typedef cds::container::RWQueue< Foo,
+ typename cds::container::rwqueue::make_traits<
+ cds::opt::item_counter< cds::atomicity::item_counter >,
+ cds::opt::lock_type< std::mutex >
+ >::type
+ > myQueue;
+ \endcode
+ */
+ template <typename... Options>
+ struct make_traits {
+# ifdef CDS_DOXYGEN_INVOKED
+ typedef implementation_defined type; ///< Metafunction result
+# else
+ typedef typename cds::opt::make_options<
+ typename cds::opt::find_type_traits< traits, Options... >::type
+ , Options...
+ >::type type;
+# endif
+ };
+
+ } // namespace rwqueue
/// Michael & Scott blocking queue with fine-grained synchronization schema
/** @ingroup cds_nonintrusive_queue
and blocking concurrent queue algorithms"
<b>Template arguments</b>
- - \p T - type to be stored in the queue
- - \p Options - options
-
- \p Options are:
- - opt::allocator - allocator (like \p std::allocator). Default is \ref CDS_DEFAULT_ALLOCATOR
- - opt::lock_type - type of lock primitive. Default is cds::lock::Spin.
- - opt::item_counter - the type of item counting feature. Default is \ref atomicity::empty_item_counter
- - opt::stat - the type to gather internal statistics.
- Possible option value are: queue_stat, queue_dummy_stat, user-provided class that supports queue_stat interface.
- Default is \ref intrusive::queue_dummy_stat.
- <tt>RWQueue</tt> uses only \p onEnqueue and \p onDequeue counter.
- - opt::alignment - the alignment for \p lock_type to prevent false sharing. Default is opt::cache_line_alignment
-
- This queue has no intrusive counterpart.
+ - \p T - value type to be stored in the queue
+ - \p Traits - queue traits, default is \p rwqueue::traits. You can use \p rwqueue::make_traits
+ metafunction to make your traits or just derive your traits from \p %rwqueue::traits:
+ \code
+ struct myTraits: public cds::container::rwqueue::traits {
+ typedef cds::atomicity::item_counter item_counter;
+ };
+ typedef cds::container::RWQueue< Foo, myTraits > myQueue;
+
+ // Equivalent make_traits example:
+ typedef cds::container::RWQueue< Foo,
+ typename cds::container::rwqueue::make_traits<
+ cds::opt::item_counter< cds::atomicity::item_counter >
+ >::type
+ > myQueue;
+ \endcode
*/
- template <typename T, typename... Options>
+ template <typename T, typename Traits = rwqueue::traits >
class RWQueue
{
- //@cond
- struct default_options
- {
- typedef lock::Spin lock_type;
- typedef CDS_DEFAULT_ALLOCATOR allocator;
- typedef atomicity::empty_item_counter item_counter;
- typedef intrusive::queue_dummy_stat stat;
- enum { alignment = opt::cache_line_alignment };
- };
- //@endcond
-
- public:
- //@cond
- typedef typename opt::make_options<
- typename cds::opt::find_type_traits< default_options, Options... >::type
- ,Options...
- >::type options;
- //@endcond
-
public:
/// Rebind template arguments
- template <typename T2, typename... Options2>
+ template <typename T2, typename Traits2>
struct rebind {
- typedef RWQueue< T2, Options2...> other ; ///< Rebinding result
+ typedef RWQueue< T2, Traits2 > other ; ///< Rebinding result
};
public:
- typedef T value_type ; ///< type of value stored in the queue
+ typedef T value_type; ///< Type of value to be stored in the queue
+ typedef Traits traits; ///< Queue traits
- typedef typename options::lock_type lock_type ; ///< Locking primitive used
+ typedef typename traits::lock_type lock_type; ///< Locking primitive
+ typedef typename traits::item_counter item_counter; ///< Item counting policy used
protected:
//@cond
/// Node type
struct node_type
{
- node_type * volatile m_pNext ; ///< Pointer to the next node in queue
- value_type m_value ; ///< Value stored in the node
+ atomics::atomic< node_type *> m_pNext; ///< Pointer to the next node in the queue
+ value_type m_value; ///< Value stored in the node
node_type( value_type const& v )
: m_pNext( nullptr )
//@endcond
public:
- typedef typename options::allocator::template rebind<node_type>::other allocator_type ; ///< Allocator type used for allocate/deallocate the queue nodes
- typedef typename options::item_counter item_counter ; ///< Item counting policy used
- typedef typename options::stat stat ; ///< Internal statistics policy used
+ typedef typename traits::allocator::template rebind<node_type>::other allocator_type; ///< Allocator type used for allocate/deallocate the queue nodes
protected:
//@cond
- typedef typename opt::details::alignment_setter< lock_type, options::alignment >::type aligned_lock_type;
- typedef cds::lock::scoped_lock<lock_type> auto_lock;
+ typedef std::unique_lock<lock_type> scoped_lock;
typedef cds::details::Allocator< node_type, allocator_type > node_allocator;
- item_counter m_ItemCounter;
- stat m_Stat;
+ struct head_type {
+ mutable lock_type lock;
+ node_type * ptr;
+ };
- mutable aligned_lock_type m_HeadLock;
- node_type * m_pHead;
- mutable aligned_lock_type m_TailLock;
- node_type * m_pTail;
+ head_type m_Head;
+ typename opt::details::apply_padding< head_type, traits::padding >::padding_type pad_;
+ head_type m_Tail;
+
+ item_counter m_ItemCounter;
//@endcond
protected:
{
assert( p != nullptr );
{
- auto_lock lock( m_TailLock );
- m_pTail =
- m_pTail->m_pNext = p;
+ scoped_lock lock( m_Tail.lock );
+ m_Tail.ptr->m_pNext.store( p, atomics::memory_order_release );
+ m_Tail.ptr = p;
}
++m_ItemCounter;
- m_Stat.onEnqueue();
return true;
}
RWQueue()
{
node_type * pNode = alloc_node();
- m_pHead =
- m_pTail = pNode;
+ m_Head.ptr =
+ m_Tail.ptr = pNode;
}
/// Destructor clears queue
~RWQueue()
{
clear();
- assert( m_pHead == m_pTail );
- free_node( m_pHead );
+ assert( m_Head.ptr == m_Tail.ptr );
+ free_node( m_Head.ptr );
}
/// Enqueues \p data. Always return \a true
bool enqueue( value_type const& data )
{
scoped_node_ptr p( alloc_node( data ));
- if ( enqueue_node( p.get() )) {
+ if ( enqueue_node( p.get())) {
+ p.release();
+ return true;
+ }
+ return false;
+ }
+
+ /// Enqueues \p data, move semantics
+ bool enqueue( value_type&& data )
+ {
+ scoped_node_ptr p( alloc_node_move( std::move( data )));
+ if ( enqueue_node( p.get()) ) {
p.release();
return true;
}
return false;
}
- /// Enqueues \p data to queue using copy functor
+ /// Enqueues \p data to the queue using a functor
/**
- \p Func is a functor called to copy value \p data of type \p Type
- which may be differ from type \p T stored in the queue.
- The functor's interface is:
+ \p Func is a functor called to create node.
+ The functor \p f takes one argument - a reference to a new node of type \ref value_type :
\code
- struct myFunctor {
- void operator()(T& dest, Type const& data)
- {
- // // Code to copy \p data to \p dest
- dest = data;
- }
- };
+ cds::container::RWQueue< cds::gc::HP, Foo > myQueue;
+ Bar bar;
+ myQueue.enqueue_with( [&bar]( Foo& dest ) { dest = bar; } );
\endcode
- You may use \p boost:ref construction to pass functor \p f by reference.
-
- <b>Requirements</b> The functor \p Func should not throw any exception.
*/
- template <typename Type, typename Func>
- bool enqueue( Type const& data, Func f )
+ template <typename Func>
+ bool enqueue_with( Func f )
{
scoped_node_ptr p( alloc_node());
- f( p->m_value, data );
- if ( enqueue_node( p.get() )) {
+ f( p->m_value );
+ if ( enqueue_node( p.get())) {
p.release();
return true;
}
bool emplace( Args&&... args )
{
scoped_node_ptr p( alloc_node_move( std::forward<Args>(args)... ));
- if ( enqueue_node( p.get() )) {
+ if ( enqueue_node( p.get())) {
p.release();
return true;
}
return false;
}
- /// Dequeues a value using copy functor
+ /// Synonym for \p enqueue( value_type const& ) function
+ bool push( value_type const& val )
+ {
+ return enqueue( val );
+ }
+
+ /// Synonym for \p enqueue( value_type&& ) function
+ bool push( value_type&& val )
+ {
+ return enqueue( std::move( val ));
+ }
+
+ /// Synonym for \p enqueue_with() function
+ template <typename Func>
+ bool push_with( Func f )
+ {
+ return enqueue_with( f );
+ }
+
+ /// Dequeues a value to \p dest.
+ /**
+ If queue is empty returns \a false, \p dest can be corrupted.
+ If queue is not empty returns \a true, \p dest contains the value dequeued
+ */
+ bool dequeue( value_type& dest )
+ {
+ return dequeue_with( [&dest]( value_type& src ) { dest = std::move( src ); });
+ }
+
+ /// Dequeues a value using a functor
/**
- \p Func is a functor called to copy dequeued value to \p dest of type \p Type
- which may be differ from type \p T stored in the queue.
- The functor's interface is:
+ \p Func is a functor called to copy dequeued value.
+ The functor takes one argument - a reference to removed node:
\code
- struct myFunctor {
- void operator()(Type& dest, T const& data)
- {
- // // Copy \p data to \p dest
- dest = data;
- }
- };
+ cds:container::RWQueue< cds::gc::HP, Foo > myQueue;
+ Bar bar;
+ myQueue.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
\endcode
- You may use \p boost:ref construction to pass functor \p f by reference.
-
- <b>Requirements</b> The functor \p Func should not throw any exception.
+ The functor is called only if the queue is not empty.
*/
- template <typename Type, typename Func>
- bool dequeue( Type& dest, Func f )
+ template <typename Func>
+ bool dequeue_with( Func f )
{
node_type * pNode;
{
- auto_lock lock( m_HeadLock );
- pNode = m_pHead;
- node_type * pNewHead = pNode->m_pNext;
+ scoped_lock lock( m_Head.lock );
+ pNode = m_Head.ptr;
+ node_type * pNewHead = pNode->m_pNext.load( atomics::memory_order_acquire );
if ( pNewHead == nullptr )
return false;
- f( dest, pNewHead->m_value );
- m_pHead = pNewHead;
+ f( pNewHead->m_value );
+ m_Head.ptr = pNewHead;
} // unlock here
--m_ItemCounter;
free_node( pNode );
- m_Stat.onDequeue();
return true;
}
- /** Dequeues a value to \p dest.
-
- If queue is empty returns \a false, \p dest may be corrupted.
- If queue is not empty returns \a true, \p dest contains the value dequeued
- */
- bool dequeue( value_type& dest )
- {
- typedef cds::details::trivial_assign<value_type, value_type> functor;
- return dequeue( dest, functor() );
- }
-
- /// Synonym for \ref enqueue
- bool push( value_type const& data )
- {
- return enqueue( data );
- }
-
- /// Synonym for template version of \ref enqueue function
- template <typename Type, typename Func>
- bool push( Type const& data, Func f )
- {
- return enqueue( data, f );
- }
-
- /// Synonym for \ref dequeue
- bool pop( value_type& data )
+ /// Synonym for \p dequeue() function
+ bool pop( value_type& dest )
{
- return dequeue( data );
+ return dequeue( dest );
}
- /// Synonym for template version of \ref dequeue function
- template <typename Type, typename Func>
- bool pop( Type& dest, Func f )
+ /// Synonym for \p dequeue_with() function
+ template <typename Func>
+ bool pop_with( Func f )
{
- return dequeue( dest, f );
+ return dequeue_with( f );
}
/// Checks if queue is empty
bool empty() const
{
- auto_lock lock( m_HeadLock );
- return m_pHead->m_pNext == nullptr;
+ scoped_lock lock( m_Head.lock );
+ return m_Head.ptr->m_pNext.load( atomics::memory_order_relaxed ) == nullptr;
}
/// Clears queue
void clear()
{
- auto_lock lockR( m_HeadLock );
- auto_lock lockW( m_TailLock );
- while ( m_pHead->m_pNext != nullptr ) {
- node_type * pHead = m_pHead;
- m_pHead = m_pHead->m_pNext;
+ scoped_lock lockR( m_Head.lock );
+ scoped_lock lockW( m_Tail.lock );
+ while ( m_Head.ptr->m_pNext.load( atomics::memory_order_relaxed ) != nullptr ) {
+ node_type * pHead = m_Head.ptr;
+ m_Head.ptr = m_Head.ptr->m_pNext.load( atomics::memory_order_relaxed );
free_node( pHead );
}
+ m_ItemCounter.reset();
}
/// Returns queue's item count
/**
- The value returned depends on opt::item_counter option. For atomicity::empty_item_counter,
+ The value returned depends on \p rwqueue::traits::item_counter. For \p atomicity::empty_item_counter,
this function always returns 0.
- <b>Warning</b>: even if you use real item counter and it returns 0, this fact is not mean that the queue
- is empty. To check queue emptyness use \ref empty() method.
+ @note Even if you use real item counter and it returns 0, this fact is not mean that the queue
+ is empty. To check queue emptyness use \p empty() method.
*/
size_t size() const
{
return m_ItemCounter.value();
}
- /// Returns reference to internal statistics
- stat const& statistics() const
+ //@cond
+ /// The class has no internal statistics. For test consistency only
+ std::nullptr_t statistics() const
{
- return m_Stat;
+ return nullptr;
}
-
+ //@endcond
};
}} // namespace cds::container
-#endif // #ifndef __CDS_CONTAINER_RWQUEUE_H
+#endif // #ifndef CDSLIB_CONTAINER_RWQUEUE_H