X-Git-Url: http://plrg.eecs.uci.edu/git/?p=libcds.git;a=blobdiff_plain;f=cds%2Fintrusive%2Ftsigas_cycle_queue.h;h=1fcac55a5db77fdadc24f1ab1560c7daf667163b;hp=ac240517fc81d35a4b449a1694f6dcd5b436481f;hb=cd515d6402be81b84e2eb0c9d4cf0a1ca9e4d95a;hpb=98aa954aa9d1b640f6f6d81018542eec1c2046bb diff --git a/cds/intrusive/tsigas_cycle_queue.h b/cds/intrusive/tsigas_cycle_queue.h index ac240517..1fcac55a 100644 --- a/cds/intrusive/tsigas_cycle_queue.h +++ b/cds/intrusive/tsigas_cycle_queue.h @@ -1,16 +1,120 @@ -//$$CDS-header$$ - -#ifndef __CDS_INTRUSIVE_TSIGAS_CYCLE_QUEUE_H -#define __CDS_INTRUSIVE_TSIGAS_CYCLE_QUEUE_H - -#include -#include +/* + This file is a part of libcds - Concurrent Data Structures library + + (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016 + + Source code repo: http://github.com/khizmax/libcds/ + Download: http://sourceforge.net/projects/libcds/files/ + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + +#ifndef CDSLIB_INTRUSIVE_TSIGAS_CYCLE_QUEUE_H +#define CDSLIB_INTRUSIVE_TSIGAS_CYCLE_QUEUE_H + +#include +#include #include #include -#include namespace cds { namespace intrusive { + /// TsigasCycleQueue related definitions + /** @ingroup cds_intrusive_helper + */ + namespace tsigas_queue { + + /// TsigasCycleQueue default traits + struct traits + { + /// Buffer type for cyclic array + /* + The type of element for the buffer is not important: the queue rebinds + buffer for required type via \p rebind metafunction. + + For \p TsigasCycleQueue queue the buffer size should have power-of-2 size. + */ + typedef cds::opt::v::dynamic_buffer< void * > buffer; + + /// Back-off strategy + typedef cds::backoff::empty back_off; + + /// The functor used for dispose removed items. Default is \p opt::v::empty_disposer. This option is used for dequeuing + typedef opt::v::empty_disposer disposer; + + /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting + typedef atomicity::empty_item_counter item_counter; + + /// C++ memory ordering model + /** + Can be \p opt::v::relaxed_ordering (relaxed memory model, the default) + or \p opt::v::sequential_consistent (sequentially consisnent memory model). + */ + typedef opt::v::relaxed_ordering memory_model; + + /// Alignment for internal queue data. Default is \p opt::cache_line_alignment + enum { alignment = opt::cache_line_alignment }; + }; + + /// Metafunction converting option list to \p tsigas_queue::traits + /** + Supported \p Options are: + - \p opt::buffer - the buffer type for internal cyclic array. Possible types are: + \p opt::v::dynamic_buffer (the default), \p opt::v::static_buffer. The type of + element in the buffer is not important: it will be changed via \p rebind metafunction. + - \p opt::back_off - back-off strategy used, default is \p cds::backoff::empty. + - \p opt::disposer - the functor used for dispose removed items. Default is \p opt::v::empty_disposer. This option is used + when dequeuing. + - \p opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled) + To enable item counting use \p cds::atomicity::item_counter + - \p opt::alignment - the alignment for internal queue data. Default is \p opt::cache_line_alignment + - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default) + or \p opt::v::sequential_consistent (sequentially consisnent memory model). + + Example: declare \p %TsigasCycleQueue with item counting and static iternal buffer of size 1024: + \code + typedef cds::intrusive::TsigasCycleQueue< Foo, + typename cds::intrusive::tsigas_queue::make_traits< + cds::opt::buffer< cds::opt::v::static_buffer< void *, 1024 >, + cds::opt::item_counte< cds::atomicity::item_counter > + >::type + > myQueue; + \endcode + */ + template + struct make_traits { +# ifdef CDS_DOXYGEN_INVOKED + typedef implementation_defined type; ///< Metafunction result +# else + typedef typename cds::opt::make_options< + typename cds::opt::find_type_traits< traits, Options... >::type + , Options... + >::type type; +# endif + }; + + + } //namespace tsigas_queue + /// Non-blocking cyclic queue discovered by Philippas Tsigas and Yi Zhang /** @ingroup cds_intrusive_queue @@ -19,21 +123,25 @@ namespace cds { namespace intrusive { for Shared Memory Multiprocessor Systems" Template arguments: - - T - data stored in queue. The queue stores pointers to passed data of type \p T. + - \p T - value type to be stored in queue. The queue stores pointers to passed data of type \p T. Restriction: the queue can manage at least two-byte aligned data: the least significant bit (LSB) of any pointer stored in the queue must be zero since the algorithm may use LSB as a flag that marks the free cell. - - Options - options - - \p Options are: - - opt::buffer - buffer to store items. Mandatory option, see option description for full list of possible types. - - opt::disposer - the functor used for dispose removed items. Default is opt::v::empty_disposer. This option is used - only in \ref clear function. - - opt::item_counter - the type of item counting feature. Default is \ref atomicity::empty_item_counter - - opt::back_off - back-off strategy used. If the option is not specified, the cds::backoff::empty is used. - - opt::alignment - the alignment for internal queue data. Default is opt::cache_line_alignment - - opt::memory_model - C++ memory ordering model. Can be opt::v::relaxed_ordering (relaxed memory model, the default) - or opt::v::sequential_consistent (sequentially consisnent memory model). + - \p Traits - queue traits, default is \p tsigas_queue::traits. You can use \p tsigas_queue::make_traits + metafunction to make your traits or just derive your traits from \p %tsigas_queue::traits: + \code + struct myTraits: public cds::intrusive::tsigas_queue::traits { + typedef cds::atomicity::item_counter item_counter; + }; + typedef cds::intrusive::TsigasCycleQueue< Foo, myTraits > myQueue; + + // Equivalent make_traits example: + typedef cds::intrusive::TsigasCycleQueue< Foo, + typename cds::intrusive::tsigas_queue::make_traits< + cds::opt::item_counter< cds::atomicity::item_counter > + >::type + > myQueue; + \endcode This queue algorithm does not require any garbage collector. @@ -46,62 +154,47 @@ namespace cds { namespace intrusive { }; // Queue of Foo pointers, capacity is 1024, statically allocated buffer: - typedef cds::intrusive::TsigasCycleQueue< - Foo - ,cds::opt::buffer< cds::opt::v::static_buffer< Foo, 1024 > > - > static_queue; + struct queue_traits: public cds::intrusive::tsigas_queue::traits + { + typedef cds::opt::v::static_buffer< Foo, 1024 > buffer; + }; + typedef cds::intrusive::TsigasCycleQueue< Foo, queue_traits > static_queue; static_queue stQueue; - // Queue of Foo pointers, capacity is 1024, dynamically allocated buffer: - typedef cds::intrusive::TsigasCycleQueue< - Foo - ,cds::opt::buffer< cds::opt::v::dynamic_buffer< Foo > > + // Queue of Foo pointers, capacity is 1024, dynamically allocated buffer, with item counting: + typedef cds::intrusive::TsigasCycleQueue< Foo, + typename cds::intrusive::tsigas_queue::make_traits< + cds::opt::buffer< cds::opt::v::dynamic_buffer< Foo > >, + cds::opt::item_counter< cds::atomicity::item_counter > + >::type > dynamic_queue; dynamic_queue dynQueue( 1024 ); \endcode */ - template + template class TsigasCycleQueue: public cds::bounded_container { - //@cond - struct default_options - { - typedef cds::backoff::empty back_off; - typedef opt::v::empty_disposer disposer; - typedef atomicity::empty_item_counter item_counter; - typedef opt::v::relaxed_ordering memory_model; - enum { alignment = opt::cache_line_alignment }; - }; - //@endcond - - public: - //@cond - typedef typename opt::make_options< - typename cds::opt::find_type_traits< default_options, CDS_OPTIONS7>::type - ,CDS_OPTIONS7 - >::type options; - //@endcond - public: /// Rebind template arguments - template + template struct rebind { - typedef TsigasCycleQueue< T2, CDS_OTHER_OPTIONS7> other ; ///< Rebinding result + typedef TsigasCycleQueue< T2, Traits2 > other ; ///< Rebinding result }; public: - typedef T value_type ; ///< type of value stored in the queue - typedef typename options::item_counter item_counter; ///< Item counter type - typedef typename options::disposer disposer ; ///< Item disposer - typedef typename options::back_off back_off ; ///< back-off strategy used - typedef typename options::memory_model memory_model; ///< Memory ordering. See cds::opt::memory_model option + typedef T value_type; ///< type of value to be stored in the queue + typedef Traits traits; ///< Queue traits + typedef typename traits::item_counter item_counter; ///< Item counter type + typedef typename traits::disposer disposer; ///< Item disposer + typedef typename traits::back_off back_off; ///< back-off strategy used + typedef typename traits::memory_model memory_model; ///< Memory ordering. See cds::opt::memory_model option + typedef typename traits::buffer::template rebind< atomics::atomic >::other buffer; ///< Internal buffer protected: //@cond - typedef typename options::buffer::template rebind< CDS_ATOMIC::atomic >::other buffer; - typedef typename opt::details::alignment_setter< buffer, options::alignment >::type aligned_buffer; + typedef typename opt::details::alignment_setter< buffer, traits::alignment >::type aligned_buffer; typedef size_t index_type; - typedef typename opt::details::alignment_setter< CDS_ATOMIC::atomic, options::alignment >::type aligned_index; + typedef typename opt::details::alignment_setter< atomics::atomic, traits::alignment >::type aligned_index; //@endcond protected: @@ -114,25 +207,20 @@ namespace cds { namespace intrusive { protected: //@cond - static CDS_CONSTEXPR value_type * free0() CDS_NOEXCEPT - { - return nullptr; - } - static CDS_CONSTEXPR value_type * free1() CDS_NOEXCEPT - { - return (value_type*) 1; - } + static CDS_CONSTEXPR intptr_t const free0 = 0; + static CDS_CONSTEXPR intptr_t const free1 = 1; + static bool is_free( const value_type * p ) CDS_NOEXCEPT { - return p == free0() || p == free1(); + return (reinterpret_cast(p) & ~intptr_t(1)) == 0; } - size_t buffer_capacity() const CDS_NOEXCEPT + size_t CDS_CONSTEXPR buffer_capacity() const CDS_NOEXCEPT { return m_buffer.capacity(); } - index_type modulo() const CDS_NOEXCEPT + index_type CDS_CONSTEXPR modulo() const CDS_NOEXCEPT { return buffer_capacity() - 1; } @@ -141,7 +229,7 @@ namespace cds { namespace intrusive { public: /// Initialize empty queue of capacity \p nCapacity /** - For cds::opt::v::static_buffer the \p nCapacity parameter is ignored. + If internal buffer type is \p cds::opt::v::static_buffer, the \p nCapacity parameter is ignored. Note that the real capacity of queue is \p nCapacity - 2. */ @@ -159,30 +247,14 @@ namespace cds { namespace intrusive { clear(); } - /// Returns queue's item count - /** - The value returned depends on opt::item_counter option. For atomicity::empty_item_counter, - this function always returns 0. - */ - size_t size() const CDS_NOEXCEPT - { - return m_ItemCounter.value(); - } - - /// Returns capacity of cyclic buffer - size_t capacity() const CDS_NOEXCEPT - { - return buffer_capacity() - 2; - } - - /// Enqueues item from the queue + /// Enqueues an item to the queue /** @anchor cds_intrusive_TsigasQueue_enqueue - Returns \p true if success, \p false otherwise (for example, if queue is full) + Returns \p true if success, \p false if queue is full */ bool enqueue( value_type& data ) { value_type * pNewNode = &data; - assert( (reinterpret_cast( pNewNode ) & 1) == 0 ); + assert( (reinterpret_cast(pNewNode) & 1) == 0 ); back_off bkoff; const index_type nModulo = modulo(); @@ -204,7 +276,7 @@ namespace cds { namespace intrusive { temp = (temp + 1) & nModulo; } - if ( te != m_nTail.load(memory_model::memory_order_relaxed) ) + if ( te != m_nTail.load(memory_model::memory_order_acquire) ) continue; // Check whether queue is full @@ -212,23 +284,23 @@ namespace cds { namespace intrusive { ate = ( temp + 1 ) & nModulo; tt = m_buffer[ ate ].load(memory_model::memory_order_relaxed); if ( !is_free( tt ) ) { - return false ; // Queue is full + return false; // Queue is full } // help the dequeue to update head - m_nHead.compare_exchange_strong( temp, ate, memory_model::memory_order_release, CDS_ATOMIC::memory_order_relaxed ); + m_nHead.compare_exchange_strong( temp, ate, memory_model::memory_order_release, atomics::memory_order_relaxed ); continue; } - if ( tt == free1() ) + if ( tt == reinterpret_cast(free1) ) pNewNode = reinterpret_cast(reinterpret_cast( pNewNode ) | 1); - if ( te != m_nTail.load(memory_model::memory_order_relaxed) ) + if ( te != m_nTail.load(memory_model::memory_order_acquire) ) continue; // get actual tail and try to enqueue new node - if ( m_buffer[ate].compare_exchange_strong( tt, pNewNode, memory_model::memory_order_release, CDS_ATOMIC::memory_order_relaxed ) ) { + if ( m_buffer[ate].compare_exchange_strong( tt, pNewNode, memory_model::memory_order_release, atomics::memory_order_relaxed ) ) { if ( temp % 2 == 0 ) - m_nTail.compare_exchange_strong( te, temp, memory_model::memory_order_release, CDS_ATOMIC::memory_order_relaxed ); + m_nTail.compare_exchange_strong( te, temp, memory_model::memory_order_release, atomics::memory_order_relaxed ); ++m_ItemCounter; return true; } @@ -241,9 +313,9 @@ namespace cds { namespace intrusive { /// Dequeues item from the queue /** @anchor cds_intrusive_TsigasQueue_dequeue - If the queue is empty the function returns \a NULL + If the queue is empty the function returns \p nullptr - Dequeue does not call value disposer. You can manually dispose returned value if it is needed. + Dequeue does not call value disposer. You may manually dispose returned value if it is needed. */ value_type * dequeue() { @@ -261,9 +333,9 @@ namespace cds { namespace intrusive { if ( th != m_nHead.load(memory_model::memory_order_relaxed) ) goto TryAgain; - // two consecutive NULL means queue empty + // two consecutive nullptr means the queue is empty if ( temp == m_nTail.load(memory_model::memory_order_acquire) ) - return NULL; + return nullptr; temp = ( temp + 1 ) & nModulo; tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed); @@ -275,19 +347,19 @@ namespace cds { namespace intrusive { // check whether the queue is empty if ( temp == m_nTail.load(memory_model::memory_order_acquire) ) { // help the enqueue to update end - m_nTail.compare_exchange_strong( temp, (temp + 1) & nModulo, memory_model::memory_order_release, CDS_ATOMIC::memory_order_relaxed ); + m_nTail.compare_exchange_weak( temp, (temp + 1) & nModulo, memory_model::memory_order_release, atomics::memory_order_relaxed ); continue; } - pNull = (reinterpret_cast( tt ) & 1) ? free0() : free1(); + pNull = reinterpret_cast((reinterpret_cast(tt) & 1) ? free0 : free1); if ( th != m_nHead.load(memory_model::memory_order_relaxed) ) continue; // Get the actual head, null means empty - if ( m_buffer[temp].compare_exchange_strong( tt, pNull, memory_model::memory_order_release, CDS_ATOMIC::memory_order_relaxed )) { + if ( m_buffer[temp].compare_exchange_weak( tt, pNull, memory_model::memory_order_acquire, atomics::memory_order_relaxed )) { if ( temp % 2 == 0 ) - m_nHead.compare_exchange_strong( th, temp, memory_model::memory_order_release, CDS_ATOMIC::memory_order_relaxed ); + m_nHead.compare_exchange_weak( th, temp, memory_model::memory_order_release, atomics::memory_order_relaxed ); --m_ItemCounter; return reinterpret_cast(reinterpret_cast( tt ) & ~intptr_t(1)); } @@ -299,13 +371,13 @@ namespace cds { namespace intrusive { return nullptr; } - /// Synonym of \ref cds_intrusive_TsigasQueue_enqueue "enqueue" + /// Synonym for \p enqueue() bool push( value_type& data ) { return enqueue( data ); } - /// Synonym of \ref cds_intrusive_TsigasQueue_dequeue "dequeue" + /// Synonym for \p dequeue() value_type * pop() { return dequeue(); @@ -325,7 +397,7 @@ namespace cds { namespace intrusive { while ( is_free( tt ) ) { if ( th != m_nHead.load(memory_model::memory_order_relaxed) ) goto TryAgain; - // two consecutive NULL means queue empty + // two consecutive nullptr means queue empty if ( temp == m_nTail.load(memory_model::memory_order_relaxed) ) return true; temp = ( temp + 1 ) & nModulo; @@ -336,35 +408,47 @@ namespace cds { namespace intrusive { /// Clears queue in lock-free manner. /** - \p f parameter is a functor to dispose removed items. - The interface of \p DISPOSER is: + \p f parameter is a functor to dispose removed items: \code - struct myDisposer { - void operator ()( T * val ); - }; + myQueue.clear( []( value_type * p ) { delete p; } ); \endcode - You can pass \p disposer by reference using \p boost::ref. - The disposer will be called immediately for each item. */ template void clear( Disposer f ) { value_type * pv; while ( (pv = pop()) != nullptr ) { - unref(f)( pv ); + f( pv ); } } /// Clears the queue /** - This function uses the disposer that is specified in \p Options. + This function uses the disposer that is specified in \p Traits, + see \p tsigas_queue::traits::disposer. */ void clear() { clear( disposer() ); } + + /// Returns queue's item count + /** + The value returned depends on \p tsigas_queue::traits::item_counter. + For \p atomicity::empty_item_counter, the function always returns 0. + */ + size_t size() const CDS_NOEXCEPT + { + return m_ItemCounter.value(); + } + + /// Returns capacity of internal cyclic buffer + size_t CDS_CONSTEXPR capacity() const CDS_NOEXCEPT + { + return buffer_capacity() - 2; + } }; }} // namespace cds::intrusive -#endif // #ifndef __CDS_INTRUSIVE_TSIGAS_CYCLE_QUEUE_H +#endif // #ifndef CDSLIB_INTRUSIVE_TSIGAS_CYCLE_QUEUE_H