X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=cds%2Fcontainer%2Fvyukov_mpmc_cycle_queue.h;h=50b787c819d90879e5cc93488d020e966f7adc6e;hb=f31988b031453d7fdf7fe212f966554fa558af3e;hp=6bde66419f41b6cb6021c1615590e43ff1abe9ad;hpb=92242dd6abd8fb9b72bc44f20a1ffa609881dc9a;p=libcds.git diff --git a/cds/container/vyukov_mpmc_cycle_queue.h b/cds/container/vyukov_mpmc_cycle_queue.h index 6bde6641..50b787c8 100644 --- a/cds/container/vyukov_mpmc_cycle_queue.h +++ b/cds/container/vyukov_mpmc_cycle_queue.h @@ -1,4 +1,32 @@ -//$$CDS-header$$ +/* + This file is a part of libcds - Concurrent Data Structures library + + (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016 + + Source code repo: http://github.com/khizmax/libcds/ + Download: http://sourceforge.net/projects/libcds/files/ + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H #define CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H @@ -21,17 +49,21 @@ namespace cds { namespace container { /// Buffer type for internal array /* The type of element for the buffer is not important: the queue rebinds - buffer for required type via \p rebind metafunction. + the buffer for required type via \p rebind metafunction. For \p VyukovMPMCCycleQueue queue the buffer size should have power-of-2 size. + + You should use only uninitialized buffer for the queue - + \p cds::opt::v::uninitialized_dynamic_buffer (the default), + \p cds::opt::v::uninitialized_static_buffer. */ - typedef cds::opt::v::dynamic_buffer< void * > buffer; + typedef cds::opt::v::uninitialized_dynamic_buffer< void * > buffer; /// A functor to clean item dequeued. /** - The functor calls the destructor for queue item. + The functor calls the destructor for queue item. After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied. - If \p T is a complex type, \p value_cleaner may be the useful feature. + If \p T is a complex type, \p value_cleaner may be useful feature. Default value is \ref opt::v::destruct_cleaner */ @@ -43,25 +75,38 @@ namespace cds { namespace container { /// C++ memory ordering model /** Can be \p opt::v::relaxed_ordering (relaxed memory model, the default) - or \p opt::v::sequential_consistent (sequentially consisnent memory model). + or \p opt::v::sequential_consistent (sequentially consistent memory model). */ typedef opt::v::relaxed_ordering memory_model; /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding enum { padding = opt::cache_line_padding }; + + /// Back-off strategy + typedef cds::backoff::Default back_off; + + /// Single-consumer version + /** + For single-consumer version of algorithm some additional functions + (\p front(), \p pop_front()) is available. + + Default is \p false + */ + static CDS_CONSTEXPR bool const single_consumer = false; }; /// Metafunction converting option list to \p vyukov_queue::traits /** Supported \p Options are: - - \p opt::buffer - the buffer type for internal cyclic array. Possible types are: - \p opt::v::dynamic_buffer (the default), \p opt::v::static_buffer. The type of + - \p opt::buffer - an uninitialized buffer type for internal cyclic array. Possible types are: + \p opt::v::uninitialized_dynamic_buffer (the default), \p opt::v::uninitialized_static_buffer. The type of element in the buffer is not important: it will be changed via \p rebind metafunction. - \p opt::value_cleaner - a functor to clean item dequeued. The functor calls the destructor for queue item. After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied. If \p T is a complex type, \p value_cleaner can be an useful feature. Default value is \ref opt::v::destruct_cleaner + - \p opt::back_off - back-off strategy used. If the option is not specified, the \p cds::backoff::Default is used. - \p opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled) To enable item counting use \p cds::atomicity::item_counter - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding @@ -72,7 +117,7 @@ namespace cds { namespace container { \code typedef cds::container::VyukovMPMCCycleQueue< Foo, typename cds::container::vyukov_queue::make_traits< - cds::opt::buffer< cds::opt::v::static_buffer< void *, 1024 >, + cds::opt::buffer< cds::opt::v::uninitialized_static_buffer< void *, 1024 >, cds::opt::item_counte< cds::atomicity::item_counter > >::type > myQueue; @@ -103,22 +148,25 @@ namespace cds { namespace container { No dynamic memory allocation/management during operation. Producers and consumers are separated from each other (as in the two-lock queue), i.e. do not touch the same data while queue is not empty. + There is multiple producer/single consumer version \p cds::container::VyukovMPSCCycleQueue + that supports \p front() and \p pop_front() functions. + Source: - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue Template parameters - \p T - type stored in queue. - - \p Traits - queue traits, default is \p vykov_queue::traits. You can use \p vykov_queue::make_traits - metafunction to make your traits or just derive your traits from \p %vykov_queue::traits: + - \p Traits - queue traits, default is \p vyukov_queue::traits. You can use \p vyukov_queue::make_traits + metafunction to make your traits or just derive your traits from \p %vyukov_queue::traits: \code - struct myTraits: public cds::container::vykov_queue::traits { + struct myTraits: public cds::container::vyukov_queue::traits { typedef cds::atomicity::item_counter item_counter; }; typedef cds::container::VyukovMPMCCycleQueue< Foo, myTraits > myQueue; // Equivalent make_traits example: typedef cds::container::VyukovMPMCCycleQueue< cds::gc::HP, Foo, - typename cds::container::vykov_queue::make_traits< + typename cds::container::vyukov_queue::make_traits< cds::opt::item_counter< cds::atomicity::item_counter > >::type > myQueue; @@ -134,8 +182,12 @@ namespace cds { namespace container { typedef T value_type; ///< Value type to be stored in the queue typedef Traits traits; ///< Queue traits typedef typename traits::item_counter item_counter; ///< Item counter type - typedef typename traits::memory_model memory_model; ///< Memory ordering. See cds::opt::memory_model option + typedef typename traits::memory_model memory_model; ///< Memory ordering. See \p cds::opt::memory_model option typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p vyukov_queue::traits::value_cleaner + typedef typename traits::back_off back_off; ///< back-off strategy + + /// \p true for single-consumer version, \p false otherwise + static CDS_CONSTEXPR bool const c_single_consumer = traits::single_consumer; /// Rebind template arguments template @@ -161,8 +213,8 @@ namespace cds { namespace container { protected: //@cond buffer m_buffer; - typename opt::details::apply_padding< buffer, traits::padding >::padding_type pad1_; size_t const m_nBufferMask; + typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad1_; sequence_type m_posEnqueue; typename opt::details::apply_padding< sequence_type, traits::padding >::padding_type pad2_; sequence_type m_posDequeue; @@ -173,7 +225,7 @@ namespace cds { namespace container { public: /// Constructs the queue of capacity \p nCapacity /** - For \p cds::opt::v::static_buffer the \p nCapacity parameter is ignored. + For \p cds::opt::v::uninitialized_static_buffer the \p nCapacity parameter is ignored. The buffer capacity must be the power of two. */ @@ -214,8 +266,9 @@ namespace cds { namespace container { bool enqueue_with(Func f) { cell_type* cell; - size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed); + back_off bkoff; + size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed); for (;;) { cell = &m_buffer[pos & m_nBufferMask]; @@ -227,8 +280,13 @@ namespace cds { namespace container { if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed )) break; } - else if (dif < 0) - return false; + else if (dif < 0) { + // Queue full? + if ( pos - m_posDequeue.load( memory_model::memory_order_relaxed ) == capacity() ) + return false; // queue full + bkoff(); + pos = m_posEnqueue.load( memory_model::memory_order_relaxed ); + } else pos = m_posEnqueue.load(memory_model::memory_order_relaxed); } @@ -251,12 +309,24 @@ namespace cds { namespace container { return enqueue_with( [&val]( value_type& dest ){ new ( &dest ) value_type( val ); }); } - /// Synonym for \p enqueue() + /// Enqueues \p val value into the queue, move semantics + bool enqueue( value_type&& val ) + { + return enqueue_with( [&val]( value_type& dest ) { new (&dest) value_type( std::move( val ));}); + } + + /// Synonym for \p enqueue( valuetype const& ) bool push( value_type const& data ) { return enqueue( data ); } + /// Synonym for \p enqueue( value_type&& ) + bool push( value_type&& data ) + { + return enqueue( std::move( data )); + } + /// Synonym for \p enqueue_with() template bool push_with( Func f ) @@ -267,13 +337,14 @@ namespace cds { namespace container { /// Enqueues data of type \ref value_type constructed with std::forward(args)... template bool emplace( Args&&... args ) - { + { #if (CDS_COMPILER == CDS_COMPILER_GCC) && (CDS_COMPILER_VERSION < 40900) - //work around unsupported feature in g++ 4.8 for forwarding parameter packs to lambda. - return enqueue_with ( std::bind([]( value_type& dest,Args ... args ){ new ( &dest ) value_type( std::forward(args)... );}, std::placeholders::_1 ,args...)); -#else - return enqueue_with( [&args ...]( value_type& dest ){ new ( &dest ) value_type( std::forward(args)... ); }); -#endif + //work around unsupported feature in g++ 4.8 for forwarding parameter packs to lambda. + value_type val( std::forward(args)... ); + return enqueue_with( [&val]( value_type& dest ){ new ( &dest ) value_type( std::move( val )); }); +#else + return enqueue_with( [&args ...]( value_type& dest ){ new ( &dest ) value_type( std::forward( args )... ); }); +#endif } /// Dequeues a value using a functor @@ -291,8 +362,9 @@ namespace cds { namespace container { bool dequeue_with( Func f ) { cell_type * cell; - size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed); + back_off bkoff; + size_t pos = m_posDequeue.load( memory_model::memory_order_relaxed ); for (;;) { cell = &m_buffer[pos & m_nBufferMask]; @@ -303,8 +375,13 @@ namespace cds { namespace container { if ( m_posDequeue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed)) break; } - else if (dif < 0) - return false; + else if (dif < 0) { + // Queue empty? + if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 ) + return false; // queue empty + bkoff(); + pos = m_posDequeue.load( memory_model::memory_order_relaxed ); + } else pos = m_posDequeue.load(memory_model::memory_order_relaxed); } @@ -319,13 +396,13 @@ namespace cds { namespace container { /// Dequeues a value from the queue /** - If queue is not empty, the function returns \p true, \p dest contains copy of + If queue is not empty, the function returns \p true, \p dest contains a copy of dequeued value. The assignment operator for type \ref value_type is invoked. If queue is empty, the function returns \p false, \p dest is unchanged. */ - bool dequeue(value_type & dest ) + bool dequeue(value_type& dest ) { - return dequeue_with( [&dest]( value_type& src ){ dest = src; } ); + return dequeue_with( [&dest]( value_type& src ){ dest = std::move( src );}); } /// Synonym for \p dequeue() @@ -341,12 +418,50 @@ namespace cds { namespace container { return dequeue_with( f ); } + /// Returns a pointer to top element of the queue or \p nullptr if queue is empty (only for single-consumer version) + template + typename std::enable_if::type front() + { + static_assert( c_single_consumer, "front() is enabled only if traits::single_consumer is true"); + + cell_type * cell; + back_off bkoff; + + size_t pos = m_posDequeue.load( memory_model::memory_order_relaxed ); + for ( ;;) + { + cell = &m_buffer[pos & m_nBufferMask]; + size_t seq = cell->sequence.load( memory_model::memory_order_acquire ); + intptr_t dif = static_cast(seq) - static_cast(pos + 1); + + if ( dif == 0 ) + return &cell->data; + else if ( dif < 0 ) { + // Queue empty? + if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 ) + return nullptr; // queue empty + bkoff(); + pos = m_posDequeue.load( memory_model::memory_order_relaxed ); + } + else + pos = m_posDequeue.load( memory_model::memory_order_relaxed ); + } + } + + /// Pops top element; returns \p true if queue is not empty, \p false otherwise (only for single-consumer version) + template + typename std::enable_if::type pop_front() + { + return dequeue_with( []( value_type& ) {} ); + } + /// Checks if the queue is empty bool empty() const { const cell_type * cell; - size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed); + back_off bkoff; + size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed); for (;;) { cell = &m_buffer[pos & m_nBufferMask]; @@ -355,10 +470,12 @@ namespace cds { namespace container { if (dif == 0) return false; - else if (dif < 0) - return true; - else - pos = m_posDequeue.load(memory_model::memory_order_relaxed); + else if (dif < 0) { + if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 ) + return true; + } + bkoff(); + pos = m_posDequeue.load(memory_model::memory_order_relaxed); } } @@ -385,6 +502,22 @@ namespace cds { namespace container { return m_buffer.capacity(); } }; + + //@cond + namespace vyukov_queue { + + template + struct single_consumer_traits : public Traits + { + static CDS_CONSTEXPR bool const single_consumer = true; + }; + } // namespace vyukov_queue + //@endcond + + /// Vyukov's queue multiple producer - single consumer version + template + using VyukovMPSCCycleQueue = VyukovMPMCCycleQueue< T, vyukov_queue::single_consumer_traits >; + }} // namespace cds::container #endif // #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H