-//$$CDS-header$$
+/*
+ This file is a part of libcds - Concurrent Data Structures library
-#ifndef __CDS_INTRUSIVE_TSIGAS_CYCLE_QUEUE_H
-#define __CDS_INTRUSIVE_TSIGAS_CYCLE_QUEUE_H
+ (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
+
+ Source code repo: http://github.com/khizmax/libcds/
+ Download: http://sourceforge.net/projects/libcds/files/
+
+ Redistribution and use in source and binary forms, with or without
+ modification, are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+
+ * Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+ FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+ OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+#ifndef CDSLIB_INTRUSIVE_TSIGAS_CYCLE_QUEUE_H
+#define CDSLIB_INTRUSIVE_TSIGAS_CYCLE_QUEUE_H
#include <cds/intrusive/details/base.h>
-#include <cds/cxx11_atomic.h>
+#include <cds/algo/atomic.h>
#include <cds/details/bounded_container.h>
#include <cds/opt/buffer.h>
buffer for required type via \p rebind metafunction.
            For \p TsigasCycleQueue the buffer size must be a power of 2.
+
+ You should use any initialized buffer type, see \p opt::buffer.
*/
- typedef cds::opt::v::dynamic_buffer< void * > buffer;
+ typedef cds::opt::v::initialized_dynamic_buffer< void * > buffer;
/// Back-off strategy
typedef cds::backoff::empty back_off;
typedef atomicity::empty_item_counter item_counter;
/// C++ memory ordering model
- /**
+ /**
Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
            or \p opt::v::sequential_consistent (sequentially consistent memory model).
*/
typedef opt::v::relaxed_ordering memory_model;
- /// Alignment for internal queue data. Default is \p opt::cache_line_alignment
- enum { alignment = opt::cache_line_alignment };
+ /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
+ enum { padding = opt::cache_line_padding };
};
/// Metafunction converting option list to \p tsigas_queue::traits
/**
Supported \p Options are:
- \p opt::buffer - the buffer type for internal cyclic array. Possible types are:
- \p opt::v::dynamic_buffer (the default), \p opt::v::static_buffer. The type of
+ \p opt::v::initialized_dynamic_buffer (the default), \p opt::v::initialized_static_buffer. The type of
element in the buffer is not important: it will be changed via \p rebind metafunction.
- \p opt::back_off - back-off strategy used, default is \p cds::backoff::empty.
- \p opt::disposer - the functor used for dispose removed items. Default is \p opt::v::empty_disposer. This option is used
when dequeuing.
- \p opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
To enable item counting use \p cds::atomicity::item_counter
- - \p opt::alignment - the alignment for internal queue data. Default is \p opt::cache_line_alignment
+ - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
- \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
            or \p opt::v::sequential_consistent (sequentially consistent memory model).
        Example: declare \p %TsigasCycleQueue with item counting and static internal buffer of size 1024:
\code
- typedef cds::intrusive::TsigasCycleQueue< Foo,
+ typedef cds::intrusive::TsigasCycleQueue< Foo,
typename cds::intrusive::tsigas_queue::make_traits<
- cds::opt::buffer< cds::opt::v::static_buffer< void *, 1024 >,
+ cds::opt::buffer< cds::opt::v::initialized_static_buffer< void *, 1024 >,
                cds::opt::item_counter< cds::atomicity::item_counter >
>::type
> myQueue;
typedef cds::intrusive::TsigasCycleQueue< Foo, myTraits > myQueue;
// Equivalent make_traits example:
- typedef cds::intrusive::TsigasCycleQueue< Foo,
- typename cds::intrusive::tsigas_queue::make_traits<
+ typedef cds::intrusive::TsigasCycleQueue< Foo,
+ typename cds::intrusive::tsigas_queue::make_traits<
cds::opt::item_counter< cds::atomicity::item_counter >
>::type
> myQueue;
// Queue of Foo pointers, capacity is 1024, statically allocated buffer:
struct queue_traits: public cds::intrusive::tsigas_queue::traits
{
- typedef cds::opt::v::static_buffer< Foo, 1024 > buffer;
+ typedef cds::opt::v::initialized_static_buffer< Foo, 1024 > buffer;
};
typedef cds::intrusive::TsigasCycleQueue< Foo, queue_traits > static_queue;
static_queue stQueue;
// Queue of Foo pointers, capacity is 1024, dynamically allocated buffer, with item counting:
typedef cds::intrusive::TsigasCycleQueue< Foo,
typename cds::intrusive::tsigas_queue::make_traits<
- cds::opt::buffer< cds::opt::v::dynamic_buffer< Foo > >,
+ cds::opt::buffer< cds::opt::v::initialized_dynamic_buffer< Foo > >,
cds::opt::item_counter< cds::atomicity::item_counter >
>::type
> dynamic_queue;
protected:
//@cond
- typedef typename opt::details::alignment_setter< buffer, traits::alignment >::type aligned_buffer;
typedef size_t index_type;
- typedef typename opt::details::alignment_setter< atomics::atomic<index_type>, traits::alignment >::type aligned_index;
//@endcond
protected:
//@cond
- buffer m_buffer ; ///< array of pointer T *, array size is equal to m_nCapacity+1
- aligned_index m_nHead ; ///< index of queue's head
- aligned_index m_nTail ; ///< index of queue's tail
- item_counter m_ItemCounter ; ///< item counter
+ buffer m_buffer ; ///< array of pointer T *, array size is equal to m_nCapacity+1
+ typename opt::details::apply_padding< index_type, traits::padding >::padding_type pad1_;
+ atomics::atomic<index_type> m_nHead ; ///< index of queue's head
+ typename opt::details::apply_padding< index_type, traits::padding >::padding_type pad2_;
+ atomics::atomic<index_type> m_nTail ; ///< index of queue's tail
+ typename opt::details::apply_padding< index_type, traits::padding >::padding_type pad3_;
+ item_counter m_ItemCounter; ///< item counter
//@endcond
protected:
static bool is_free( const value_type * p ) CDS_NOEXCEPT
{
- return p == reinterpret_cast<value_type *>(free0) || p == reinterpret_cast<value_type *>(free1);
+ return (reinterpret_cast<intptr_t>(p) & ~intptr_t(1)) == 0;
}
size_t CDS_CONSTEXPR buffer_capacity() const CDS_NOEXCEPT
public:
/// Initialize empty queue of capacity \p nCapacity
/**
- If internal buffer type is \p cds::opt::v::static_buffer, the \p nCapacity parameter is ignored.
+ If internal buffer type is \p cds::opt::v::initialized_static_buffer, the \p nCapacity parameter is ignored.
Note that the real capacity of queue is \p nCapacity - 2.
*/
bool enqueue( value_type& data )
{
value_type * pNewNode = &data;
- assert( (reinterpret_cast<ptr_atomic_t>( pNewNode ) & 1) == 0 );
+ assert( (reinterpret_cast<uintptr_t>(pNewNode) & 1) == 0 );
back_off bkoff;
const index_type nModulo = modulo();
index_type temp = ( ate + 1 ) & nModulo ; // next item after tail
// Looking for actual tail
- while ( !is_free( tt ) ) {
- if ( te != m_nTail.load(memory_model::memory_order_relaxed) ) // check the tail consistency
+ while ( !is_free( tt )) {
+ if ( te != m_nTail.load(memory_model::memory_order_relaxed)) // check the tail consistency
goto TryAgain;
- if ( temp == m_nHead.load(memory_model::memory_order_acquire) ) // queue full?
+ if ( temp == m_nHead.load(memory_model::memory_order_acquire)) // queue full?
break;
tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
ate = temp;
temp = (temp + 1) & nModulo;
}
- if ( te != m_nTail.load(memory_model::memory_order_relaxed) )
+ if ( te != m_nTail.load(memory_model::memory_order_acquire))
continue;
// Check whether queue is full
- if ( temp == m_nHead.load(memory_model::memory_order_acquire) ) {
+ if ( temp == m_nHead.load(memory_model::memory_order_acquire)) {
ate = ( temp + 1 ) & nModulo;
tt = m_buffer[ ate ].load(memory_model::memory_order_relaxed);
- if ( !is_free( tt ) ) {
+ if ( !is_free( tt )) {
return false; // Queue is full
}
continue;
}
- if ( tt == reinterpret_cast<value_type *>(free1) )
+ if ( tt == reinterpret_cast<value_type *>(free1))
pNewNode = reinterpret_cast<value_type *>(reinterpret_cast<intptr_t>( pNewNode ) | 1);
- if ( te != m_nTail.load(memory_model::memory_order_relaxed) )
+ if ( te != m_nTail.load(memory_model::memory_order_acquire))
continue;
// get actual tail and try to enqueue new node
- if ( m_buffer[ate].compare_exchange_strong( tt, pNewNode, memory_model::memory_order_release, atomics::memory_order_relaxed ) ) {
+ if ( m_buffer[ate].compare_exchange_strong( tt, pNewNode, memory_model::memory_order_release, atomics::memory_order_relaxed )) {
if ( temp % 2 == 0 )
m_nTail.compare_exchange_strong( te, temp, memory_model::memory_order_release, atomics::memory_order_relaxed );
++m_ItemCounter;
value_type * pNull;
// find the actual head after this loop
- while ( is_free( tt ) ) {
- if ( th != m_nHead.load(memory_model::memory_order_relaxed) )
+ while ( is_free( tt )) {
+ if ( th != m_nHead.load(memory_model::memory_order_relaxed))
goto TryAgain;
// two consecutive nullptr means the queue is empty
- if ( temp == m_nTail.load(memory_model::memory_order_acquire) )
+ if ( temp == m_nTail.load(memory_model::memory_order_acquire))
return nullptr;
temp = ( temp + 1 ) & nModulo;
tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
}
- if ( th != m_nHead.load(memory_model::memory_order_relaxed) )
+ if ( th != m_nHead.load(memory_model::memory_order_relaxed))
continue;
// check whether the queue is empty
- if ( temp == m_nTail.load(memory_model::memory_order_acquire) ) {
+ if ( temp == m_nTail.load(memory_model::memory_order_acquire)) {
// help the enqueue to update end
m_nTail.compare_exchange_weak( temp, (temp + 1) & nModulo, memory_model::memory_order_release, atomics::memory_order_relaxed );
continue;
}
- pNull = reinterpret_cast<value_type *>((reinterpret_cast<ptr_atomic_t>(tt) & 1) ? free0 : free1 );
+ pNull = reinterpret_cast<value_type *>((reinterpret_cast<uintptr_t>(tt) & 1) ? free0 : free1);
- if ( th != m_nHead.load(memory_model::memory_order_relaxed) )
+ if ( th != m_nHead.load(memory_model::memory_order_relaxed))
continue;
// Get the actual head, null means empty
const value_type * tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
// find the actual head after this loop
- while ( is_free( tt ) ) {
- if ( th != m_nHead.load(memory_model::memory_order_relaxed) )
+ while ( is_free( tt )) {
+ if ( th != m_nHead.load(memory_model::memory_order_relaxed))
goto TryAgain;
// two consecutive nullptr means queue empty
- if ( temp == m_nTail.load(memory_model::memory_order_relaxed) )
+ if ( temp == m_nTail.load(memory_model::memory_order_relaxed))
return true;
temp = ( temp + 1 ) & nModulo;
tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
*/
void clear()
{
- clear( disposer() );
+ clear( disposer());
}
/// Returns queue's item count
/**
- The value returned depends on \p tsigas_queue::traits::item_counter.
+ The value returned depends on \p tsigas_queue::traits::item_counter.
For \p atomicity::empty_item_counter, the function always returns 0.
*/
size_t size() const CDS_NOEXCEPT
}} // namespace cds::intrusive
-#endif // #ifndef __CDS_INTRUSIVE_TSIGAS_CYCLE_QUEUE_H
+#endif // #ifndef CDSLIB_INTRUSIVE_TSIGAS_CYCLE_QUEUE_H