-//$$CDS-header$$
-
-#ifndef __CDS_CONTAINER_RWQUEUE_H
-#define __CDS_CONTAINER_RWQUEUE_H
+/*
+ This file is a part of libcds - Concurrent Data Structures library
+
+ (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
+
+ Source code repo: http://github.com/khizmax/libcds/
+ Download: http://sourceforge.net/projects/libcds/files/
+
+ Redistribution and use in source and binary forms, with or without
+ modification, are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+
+ * Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+ FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+ OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+#ifndef CDSLIB_CONTAINER_RWQUEUE_H
+#define CDSLIB_CONTAINER_RWQUEUE_H
#include <mutex> // unique_lock
-#include <cds/container/msqueue.h>
-#include <cds/lock/spinlock.h>
+#include <cds/sync/spinlock.h>
+#include <cds/opt/options.h>
+#include <cds/details/allocator.h>
namespace cds { namespace container {
/// RWQueue related definitions
struct traits
{
/// Lock policy
- typedef cds::lock::Spin lock_type;
+ typedef cds::sync::spin lock_type;
/// Node allocator
typedef CDS_DEFAULT_ALLOCATOR allocator;
/// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
typedef cds::atomicity::empty_item_counter item_counter;
- /// Alignment of internal queue data. Default is \p opt::cache_line_alignment
- enum { alignment = opt::cache_line_alignment };
+ /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
+ enum { padding = opt::cache_line_padding };
};
/// Metafunction converting option list to \p rwqueue::traits
/**
Supported \p Options are:
- - opt::lock_type - lock policy, default is \p cds::lock::Spin. Any type satisfied \p Mutex C++ concept may be used.
+ - opt::lock_type - lock policy, default is \p cds::sync::spin. Any type satisfying the \p Mutex C++ concept may be used.
- opt::allocator - allocator (like \p std::allocator) used for allocating queue nodes. Default is \ref CDS_DEFAULT_ALLOCATOR
- opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
To enable item counting use \p cds::atomicity::item_counter.
- - opt::alignment - the alignment for internal queue data. Default is \p opt::cache_line_alignment
+ - \p opt::padding - padding for internal critical data. Default is \p opt::cache_line_padding
Example: declare mutex-based \p %RWQueue with item counting
\code
typedef T value_type; ///< Type of value to be stored in the queue
typedef Traits traits; ///< Queue traits
- typedef typename traits::lock_type lock_type; ///< Locking primitive
+ typedef typename traits::lock_type lock_type; ///< Locking primitive
typedef typename traits::item_counter item_counter; ///< Item counting policy used
protected:
/// Node type
struct node_type
{
- node_type * volatile m_pNext ; ///< Pointer to the next node in the queue
- value_type m_value ; ///< Value stored in the node
+ atomics::atomic< node_type *> m_pNext; ///< Pointer to the next node in the queue
+ value_type m_value; ///< Value stored in the node
node_type( value_type const& v )
: m_pNext( nullptr )
protected:
//@cond
- typedef typename opt::details::alignment_setter< lock_type, traits::alignment >::type aligned_lock_type;
typedef std::unique_lock<lock_type> scoped_lock;
typedef cds::details::Allocator< node_type, allocator_type > node_allocator;
- item_counter m_ItemCounter;
+ struct head_type {
+ mutable lock_type lock;
+ node_type * ptr;
+ };
+
+ head_type m_Head;
+ typename opt::details::apply_padding< head_type, traits::padding >::padding_type pad_;
+ head_type m_Tail;
- mutable aligned_lock_type m_HeadLock;
- node_type * m_pHead;
- mutable aligned_lock_type m_TailLock;
- node_type * m_pTail;
+ item_counter m_ItemCounter;
//@endcond
protected:
{
assert( p != nullptr );
{
- scoped_lock lock( m_TailLock );
- m_pTail =
- m_pTail->m_pNext = p;
+ scoped_lock lock( m_Tail.lock );
+ m_Tail.ptr->m_pNext.store( p, atomics::memory_order_release );
+ m_Tail.ptr = p;
}
++m_ItemCounter;
return true;
RWQueue()
{
node_type * pNode = alloc_node();
- m_pHead =
- m_pTail = pNode;
+ m_Head.ptr =
+ m_Tail.ptr = pNode;
}
/// Destructor clears queue
~RWQueue()
{
clear();
- assert( m_pHead == m_pTail );
- free_node( m_pHead );
+ assert( m_Head.ptr == m_Tail.ptr );
+ free_node( m_Head.ptr );
}
/// Enqueues \p data. Always return \a true
return false;
}
+ /// Enqueues \p data, move semantics
+ bool enqueue( value_type&& data )
+ {
+ scoped_node_ptr p( alloc_node_move( std::move( data )));
+ if ( enqueue_node( p.get() ) ) {
+ p.release();
+ return true;
+ }
+ return false;
+ }
+
/// Enqueues \p data to the queue using a functor
/**
\p Func is a functor called to create node.
return false;
}
- /// Synonym for \p enqueue() function
+ /// Synonym for \p enqueue( value_type const& ) function
bool push( value_type const& val )
{
return enqueue( val );
}
+ /// Synonym for \p enqueue( value_type&& ) function
+ bool push( value_type&& val )
+ {
+ return enqueue( std::move( val ));
+ }
+
/// Synonym for \p enqueue_with() function
template <typename Func>
bool push_with( Func f )
*/
bool dequeue( value_type& dest )
{
- return dequeue_with( [&dest]( value_type& src ) { dest = src; } );
+ return dequeue_with( [&dest]( value_type& src ) { dest = std::move( src ); });
}
/// Dequeues a value using a functor
{
node_type * pNode;
{
- scoped_lock lock( m_HeadLock );
- pNode = m_pHead;
- node_type * pNewHead = pNode->m_pNext;
+ scoped_lock lock( m_Head.lock );
+ pNode = m_Head.ptr;
+ node_type * pNewHead = pNode->m_pNext.load( atomics::memory_order_acquire );
if ( pNewHead == nullptr )
return false;
f( pNewHead->m_value );
- m_pHead = pNewHead;
+ m_Head.ptr = pNewHead;
} // unlock here
--m_ItemCounter;
free_node( pNode );
/// Checks if queue is empty
bool empty() const
{
- scoped_lock lock( m_HeadLock );
- return m_pHead->m_pNext == nullptr;
+ scoped_lock lock( m_Head.lock );
+ return m_Head.ptr->m_pNext.load( atomics::memory_order_relaxed ) == nullptr;
}
/// Clears queue
void clear()
{
- scoped_lock lockR( m_HeadLock );
- scoped_lock lockW( m_TailLock );
- while ( m_pHead->m_pNext != nullptr ) {
- node_type * pHead = m_pHead;
- m_pHead = m_pHead->m_pNext;
+ scoped_lock lockR( m_Head.lock );
+ scoped_lock lockW( m_Tail.lock );
+ while ( m_Head.ptr->m_pNext.load( atomics::memory_order_relaxed ) != nullptr ) {
+ node_type * pHead = m_Head.ptr;
+ m_Head.ptr = m_Head.ptr->m_pNext.load( atomics::memory_order_relaxed );
free_node( pHead );
}
+ m_ItemCounter.reset();
}
/// Returns queue's item count
}
//@cond
- /// Returns reference to internal statistics
- cds::container::msqueue::empty_stat statistics() const
+ /// The class has no internal statistics; this stub exists only for consistency with other queue tests
+ std::nullptr_t statistics() const
{
- return cds::container::msqueue::empty_stat();
+ return nullptr;
}
//@endcond
};
}} // namespace cds::container
-#endif // #ifndef __CDS_CONTAINER_RWQUEUE_H
+#endif // #ifndef CDSLIB_CONTAINER_RWQUEUE_H