Move libcds 1.6.0 from SVN
[libcds.git] / cds / container / fcpriority_queue.h
diff --git a/cds/container/fcpriority_queue.h b/cds/container/fcpriority_queue.h
new file mode 100644 (file)
index 0000000..740b7c5
--- /dev/null
@@ -0,0 +1,293 @@
+//$$CDS-header$$
+
+#ifndef __CDS_CONTAINER_FCPRIORITY_QUEUE_H
+#define __CDS_CONTAINER_FCPRIORITY_QUEUE_H
+
+#include <cds/algo/flat_combining.h>
+#include <cds/algo/elimination_opt.h>
+#include <queue>
+
+namespace cds { namespace container {
+
+    /// FCPriorityQueue related definitions
+    /** @ingroup cds_nonintrusive_helper
+    */
+    namespace fcpqueue {
+
+        /// FCPriorityQueue internal statistics
+        template <typename Counter = cds::atomicity::event_counter >
+        struct stat: public cds::algo::flat_combining::stat<Counter>
+        {
+            typedef cds::algo::flat_combining::stat<Counter>    flat_combining_stat; ///< Flat-combining statistics
+            typedef typename flat_combining_stat::counter_type  counter_type;        ///< Counter type
+
+            counter_type    m_nPush     ;  ///< Count of push operations
+            counter_type    m_nPushMove ;  ///< Count of push operations with move semantics
+            counter_type    m_nPop      ;  ///< Count of success pop operations
+            counter_type    m_nFailedPop;  ///< Count of failed pop operations (pop from empty queue)
+
+            //@cond
+            void    onPush()             { ++m_nPush; }
+            void    onPushMove()         { ++m_nPushMove; }
+            void    onPop( bool bFailed ) { if ( bFailed ) ++m_nFailedPop; else ++m_nPop;  }
+            //@endcond
+        };
+
+        /// FCPriorityQueue dummy statistics, no overhead
+        struct empty_stat: public cds::algo::flat_combining::empty_stat
+        {
+            //@cond
+            void    onPush()       {}
+            void    onPushMove()   {}
+            void    onPop(bool)    {}
+            //@endcond
+        };
+
+        /// FCPriorityQueue type traits
+        struct type_traits: public cds::algo::flat_combining::type_traits
+        {
+            typedef empty_stat      stat;   ///< Internal statistics
+        };
+
+        /// Metafunction converting option list to traits
+        /**
+            This is a wrapper for <tt> cds::opt::make_options< type_traits, Options...> </tt>
+            \p Options are:
+            - \p opt::lock_type - mutex type, default is \p cds::lock::Spin
+            - \p opt::back_off - back-off strategy, defalt is \p cds::backoff::Default
+            - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
+            - \p opt::stat - internal statistics, possible type: \ref stat, \ref empty_stat (the default)
+            - \p opt::memory_model - C++ memory ordering model.
+                List of all available memory ordering see opt::memory_model.
+                Default is cds::opt::v:relaxed_ordering
+        */
+        template <CDS_DECL_OPTIONS7>
+        struct make_traits {
+#   ifdef CDS_DOXYGEN_INVOKED
+            typedef implementation_defined type ;   ///< Metafunction result
+#   else
+            typedef typename cds::opt::make_options<
+                typename cds::opt::find_type_traits< type_traits, CDS_OPTIONS7 >::type
+                ,CDS_OPTIONS7
+            >::type   type;
+#   endif
+        };
+
+    } // namespace fcpqueue
+
+    /// Flat-combining priority queue
+    /**
+        @ingroup cds_nonintrusive_priority_queue
+        @ingroup cds_flat_combining_container
+
+        \ref cds_flat_combining_description "Flat combining" sequential priority queue.
+        The class can be considered as a concurrent FC-based wrapper for \p std::priority_queue.
+
+        Template parameters:
+        - \p T - a value type stored in the queue
+        - \p PriorityQueue - sequential priority queue implementation, default is \p std::priority_queue<T>
+        - \p Traits - type traits of flat combining, default is \p fcpqueue::type_traits.
+            \p fcpqueue::make_traits metafunction can be used to construct specialized \p %type_traits
+    */
+    template <typename T,
+        class PriorityQueue = std::priority_queue<T>,
+        typename Traits = fcpqueue::type_traits
+    >
+    class FCPriorityQueue
+#ifndef CDS_DOXYGEN_INVOKED
+        : public cds::algo::flat_combining::container
+#endif
+    {
+    public:
+        typedef T               value_type;          ///< Value type
+        typedef PriorityQueue   priority_queue_type; ///< Sequential priority queue class
+        typedef Traits          type_traits;         ///< Priority queue type traits
+
+        typedef typename type_traits::stat  stat;    ///< Internal statistics type
+
+    protected:
+        //@cond
+        // Priority queue operation IDs
+        enum fc_operation {
+            op_push = cds::algo::flat_combining::req_Operation,
+            op_push_move,
+            op_pop,
+            op_clear
+        };
+
+        // Flat combining publication list record
+        struct fc_record: public cds::algo::flat_combining::publication_record
+        {
+            union {
+                value_type const *  pValPush; // Value to push
+                value_type *        pValPop;  // Pop destination
+            };
+            bool            bEmpty; // true if the queue is empty
+        };
+        //@endcond
+
+        /// Flat combining kernel
+        typedef cds::algo::flat_combining::kernel< fc_record, type_traits > fc_kernel;
+
+    protected:
+        //@cond
+        fc_kernel               m_FlatCombining;
+        priority_queue_type     m_PQueue;
+        //@endcond
+
+    public:
+        /// Initializes empty priority queue object
+        FCPriorityQueue()
+        {}
+
+        /// Initializes empty priority queue object and gives flat combining parameters
+        FCPriorityQueue(
+            unsigned int nCompactFactor     ///< Flat combining: publication list compacting factor
+            ,unsigned int nCombinePassCount ///< Flat combining: number of combining passes for combiner thread
+            )
+            : m_FlatCombining( nCompactFactor, nCombinePassCount )
+        {}
+
+        /// Inserts a new element in the priority queue
+        /**
+            The function always returns \p true
+        */
+        bool push(
+            value_type const& val ///< Value to be copied to inserted element
+        )
+        {
+            fc_record * pRec = m_FlatCombining.acquire_record();
+            pRec->pValPush = &val;
+
+            m_FlatCombining.combine( op_push, pRec, *this );
+
+            assert( pRec->is_done() );
+            m_FlatCombining.release_record( pRec );
+            m_FlatCombining.internal_statistics().onPush();
+            return true;
+        }
+
+#   ifdef CDS_MOVE_SEMANTICS_SUPPORT
+        /// Inserts a new element in the priority queue (move semantics)
+        /**
+            The function always returns \p true
+        */
+        bool push(
+            value_type&& val ///< Value to be moved to inserted element
+        )
+        {
+            fc_record * pRec = m_FlatCombining.acquire_record();
+            pRec->pValPush = &val;
+
+            m_FlatCombining.combine( op_push_move, pRec, *this );
+
+            assert( pRec->is_done() );
+            m_FlatCombining.release_record( pRec );
+            m_FlatCombining.internal_statistics().onPushMove();
+            return true;
+        }
+#   endif
+
+        /// Removes the top element from priority queue
+        /**
+            The function returns \p false if the queue is empty, \p true otherwise.
+            If the queue is empty \p val is not changed.
+        */
+        bool pop(
+            value_type& val ///< Target to be received the copy of top element
+        )
+        {
+            fc_record * pRec = m_FlatCombining.acquire_record();
+            pRec->pValPop = &val;
+
+            m_FlatCombining.combine( op_pop, pRec, *this );
+
+            assert( pRec->is_done() );
+            m_FlatCombining.release_record( pRec );
+            m_FlatCombining.internal_statistics().onPop( pRec->bEmpty );
+            return !pRec->bEmpty;
+        }
+
+        /// Clears the priority queue
+        void clear()
+        {
+            fc_record * pRec = m_FlatCombining.acquire_record();
+
+           m_FlatCombining.combine( op_clear, pRec, *this );
+
+            assert( pRec->is_done() );
+            m_FlatCombining.release_record( pRec );
+        }
+
+        /// Returns the number of elements in the priority queue.
+        /**
+            Note that <tt>size() == 0</tt> does not mean that the queue is empty because
+            combining record can be in process.
+            To check emptiness use \ref empty function.
+        */
+        size_t size() const
+        {
+            return m_PQueue.size();
+        }
+
+        /// Checks if the priority queue is empty
+        /**
+            If the combining is in process the function waits while combining done.
+        */
+        bool empty() const
+        {
+            m_FlatCombining.wait_while_combining();
+            return m_PQueue.empty();
+        }
+
+        /// Internal statistics
+        stat const& statistics() const
+        {
+            return m_FlatCombining.statistics();
+        }
+
+    public: // flat combining cooperation, not for direct use!
+        //@cond
+        /*
+            The function is called by \ref cds::algo::flat_combining::kernel "flat combining kernel"
+            object if the current thread becomes a combiner. Invocation of the function means that
+            the priority queue should perform an action recorded in \p pRec.
+        */
+        void fc_apply( fc_record * pRec )
+        {
+            assert( pRec );
+
+            switch ( pRec->op() ) {
+            case op_push:
+                assert( pRec->pValPush );
+                m_PQueue.push( *(pRec->pValPush) );
+                break;
+#       ifdef CDS_MOVE_SEMANTICS_SUPPORT
+            case op_push_move:
+                assert( pRec->pValPush );
+                m_PQueue.push( std::move( *(pRec->pValPush )) );
+                break;
+#       endif
+            case op_pop:
+                assert( pRec->pValPop );
+                pRec->bEmpty = m_PQueue.empty();
+                if ( !pRec->bEmpty ) {
+                    *(pRec->pValPop) = m_PQueue.top();
+                    m_PQueue.pop();
+                }
+                break;
+            case op_clear:
+                while ( !m_PQueue.empty() )
+                    m_PQueue.pop();
+                break;
+            default:
+                assert(false);
+                break;
+            }
+        }
+        //@endcond
+    };
+
+}} // namespace cds::container
+
+#endif // #ifndef __CDS_CONTAINER_FCPRIORITY_QUEUE_H