Added wait strategies to flat combining technique
[libcds.git] / cds / container / fcdeque.h
index d14328b60381e31864ef0aedc0dac21058bf26ce..fc28a24e554954166b73a63c0379b9f7e0979e59 100644 (file)
@@ -94,13 +94,8 @@ namespace cds { namespace container {
         /// Metafunction converting option list to traits
         /**
             \p Options are:
-            - \p opt::lock_type - mutex type, default is \p cds::sync::spin
-            - \p opt::back_off - back-off strategy, defalt is \p cds::backoff::delay_of<2>
-            - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
+            - any \p cds::algo::flat_combining::make_traits options
             - \p opt::stat - internal statistics, possible type: \ref stat, \ref empty_stat (the default)
-            - \p opt::memory_model - C++ memory ordering model.
-                List of all available memory ordering see opt::memory_model.
-                Default if cds::opt::v:relaxed_ordering
             - \p opt::enable_elimination - enable/disable operation \ref cds_elimination_description "elimination"
                 By default, the elimination is disabled. For queue, the elimination is possible if the queue
                 is empty.
@@ -161,8 +156,7 @@ namespace cds { namespace container {
             op_push_back_move,      ///< Push back (move semantics)
             op_pop_front,           ///< Pop front
             op_pop_back,            ///< Pop back
-            op_clear,               ///< Clear
-            op_empty                ///< Empty
+            op_clear                ///< Clear
         };
 
         /// Flat combining publication list record
@@ -181,8 +175,8 @@ namespace cds { namespace container {
 
     protected:
         //@cond
-        fc_kernel   m_FlatCombining;
-        deque_type  m_Deque;
+        mutable fc_kernel m_FlatCombining;
+        deque_type        m_Deque;
         //@endcond
 
     public:
@@ -206,7 +200,7 @@ namespace cds { namespace container {
             value_type const& val ///< Value to be copied to inserted element
         )
         {
-            fc_record * pRec = m_FlatCombining.acquire_record();
+            auto pRec = m_FlatCombining.acquire_record();
             pRec->pValPush = &val;
 
             if ( c_bEliminationEnabled )
@@ -228,7 +222,7 @@ namespace cds { namespace container {
             value_type&& val ///< Value to be moved to inserted element
         )
         {
-            fc_record * pRec = m_FlatCombining.acquire_record();
+            auto pRec = m_FlatCombining.acquire_record();
             pRec->pValPush = &val;
 
             if ( c_bEliminationEnabled )
@@ -250,7 +244,7 @@ namespace cds { namespace container {
             value_type const& val ///< Value to be copied to inserted element
         )
         {
-            fc_record * pRec = m_FlatCombining.acquire_record();
+            auto pRec = m_FlatCombining.acquire_record();
             pRec->pValPush = &val;
 
             if ( c_bEliminationEnabled )
@@ -272,7 +266,7 @@ namespace cds { namespace container {
             value_type&& val ///< Value to be moved to inserted element
         )
         {
-            fc_record * pRec = m_FlatCombining.acquire_record();
+            auto pRec = m_FlatCombining.acquire_record();
             pRec->pValPush = &val;
 
             if ( c_bEliminationEnabled )
@@ -295,7 +289,7 @@ namespace cds { namespace container {
             value_type& val ///< Target to be received the copy of removed element
         )
         {
-            fc_record * pRec = m_FlatCombining.acquire_record();
+            auto pRec = m_FlatCombining.acquire_record();
             pRec->pValPop = &val;
 
             if ( c_bEliminationEnabled )
@@ -318,7 +312,7 @@ namespace cds { namespace container {
             value_type& val ///< Target to be received the copy of removed element
         )
         {
-            fc_record * pRec = m_FlatCombining.acquire_record();
+            auto pRec = m_FlatCombining.acquire_record();
             pRec->pValPop = &val;
 
             if ( c_bEliminationEnabled )
@@ -335,7 +329,7 @@ namespace cds { namespace container {
         /// Clears the deque
         void clear()
         {
-            fc_record * pRec = m_FlatCombining.acquire_record();
+            auto pRec = m_FlatCombining.acquire_record();
 
             if ( c_bEliminationEnabled )
                 m_FlatCombining.batch_combine( op_clear, pRec, *this );
@@ -361,18 +355,12 @@ namespace cds { namespace container {
         /**
             If the combining is in process the function waits while combining done.
         */
-        bool empty()
+        bool empty() const
         {
-            fc_record * pRec = m_FlatCombining.acquire_record();
-
-            if ( c_bEliminationEnabled )
-                m_FlatCombining.batch_combine( op_empty, pRec, *this );
-            else
-                m_FlatCombining.combine( op_empty, pRec, *this );
-
-            assert( pRec->is_done() );
-            m_FlatCombining.release_record( pRec );
-            return pRec->bEmpty;
+            bool bRet = false;
+            auto const& deq = m_Deque;
+            m_FlatCombining.invoke_exclusive( [&deq, &bRet]() { bRet = deq.empty(); } );
+            return bRet;
         }
 
         /// Internal statistics
@@ -417,7 +405,7 @@ namespace cds { namespace container {
                 assert( pRec->pValPop );
                 pRec->bEmpty = m_Deque.empty();
                 if ( !pRec->bEmpty ) {
-                    *(pRec->pValPop) = m_Deque.front();
+                    *(pRec->pValPop) = std::move( m_Deque.front());
                     m_Deque.pop_front();
                 }
                 break;
@@ -425,7 +413,7 @@ namespace cds { namespace container {
                 assert( pRec->pValPop );
                 pRec->bEmpty = m_Deque.empty();
                 if ( !pRec->bEmpty ) {
-                    *(pRec->pValPop) = m_Deque.back();
+                    *(pRec->pValPop) = std::move( m_Deque.back());
                     m_Deque.pop_back();
                 }
                 break;
@@ -433,9 +421,6 @@ namespace cds { namespace container {
                 while ( !m_Deque.empty() )
                     m_Deque.pop_front();
                 break;
-            case op_empty:
-                pRec->bEmpty = m_Deque.empty();
-                break;
             default:
                 assert(false);
                 break;
@@ -454,20 +439,28 @@ namespace cds { namespace container {
             for ( fc_iterator it = itBegin, itPrev = itEnd; it != itEnd; ++it ) {
                 switch ( it->op() ) {
                 case op_push_front:
+                    if ( itPrev != itEnd
+                        && (itPrev->op() == op_pop_front || (m_Deque.empty() && itPrev->op() == op_pop_back)) )
+                    {
+                        collide( *it, *itPrev );
+                        itPrev = itEnd;
+                    }
+                    else
+                        itPrev = it;
+                    break;
                 case op_push_front_move:
                     if ( itPrev != itEnd
                       && (itPrev->op() == op_pop_front || ( m_Deque.empty() && itPrev->op() == op_pop_back )))
                     {
-                        collide( *it, *itPrev );
+                        collide_move( *it, *itPrev );
                         itPrev = itEnd;
                     }
                     else
                         itPrev = it;
                     break;
                 case op_push_back:
-                case op_push_back_move:
                     if ( itPrev != itEnd
-                        && (itPrev->op() == op_pop_back || ( m_Deque.empty() && itPrev->op() == op_pop_front )))
+                        && (itPrev->op() == op_pop_back || (m_Deque.empty() && itPrev->op() == op_pop_front)) )
                     {
                         collide( *it, *itPrev );
                         itPrev = itEnd;
@@ -475,24 +468,84 @@ namespace cds { namespace container {
                     else
                         itPrev = it;
                     break;
-                case op_pop_front:
+                case op_push_back_move:
                     if ( itPrev != itEnd
-                        && ( itPrev->op() == op_push_front || itPrev->op() == op_push_front_move
-                          || ( m_Deque.empty() && ( itPrev->op() == op_push_back || itPrev->op() == op_push_back_move ))))
+                        && (itPrev->op() == op_pop_back || ( m_Deque.empty() && itPrev->op() == op_pop_front )))
                     {
-                        collide( *itPrev, *it );
+                        collide_move( *it, *itPrev );
                         itPrev = itEnd;
                     }
                     else
                         itPrev = it;
                     break;
+                case op_pop_front:
+                    if ( itPrev != itEnd ) {
+                        if ( m_Deque.empty() ) {
+                            switch ( itPrev->op() ) {
+                            case op_push_back:
+                                collide( *itPrev, *it );
+                                itPrev = itEnd;
+                                break;
+                            case op_push_back_move:
+                                collide_move( *itPrev, *it );
+                                itPrev = itEnd;
+                                break;
+                            default:
+                                itPrev = it;
+                                break;
+                            }
+                        }
+                        else {
+                            switch ( itPrev->op() ) {
+                            case op_push_front:
+                                collide( *itPrev, *it );
+                                itPrev = itEnd;
+                                break;
+                            case op_push_front_move:
+                                collide_move( *itPrev, *it );
+                                itPrev = itEnd;
+                                break;
+                            default:
+                                itPrev = it;
+                                break;
+                            }
+                        }
+                    }
+                    else
+                        itPrev = it;
+                    break;
                 case op_pop_back:
-                    if ( itPrev != itEnd
-                        && ( itPrev->op() == op_push_back || itPrev->op() == op_push_back_move
-                        || ( m_Deque.empty() && ( itPrev->op() == op_push_front || itPrev->op() == op_push_front_move ))))
-                    {
-                        collide( *itPrev, *it );
-                        itPrev = itEnd;
+                    if ( itPrev != itEnd ) {
+                        if ( m_Deque.empty() ) {
+                            switch ( itPrev->op() ) {
+                            case op_push_front:
+                                collide( *itPrev, *it );
+                                itPrev = itEnd;
+                                break;
+                            case op_push_front_move:
+                                collide_move( *itPrev, *it );
+                                itPrev = itEnd;
+                                break;
+                            default:
+                                itPrev = it;
+                                break;
+                            }
+                        }
+                        else {
+                            switch ( itPrev->op() ) {
+                            case op_push_back:
+                                collide( *itPrev, *it );
+                                itPrev = itEnd;
+                                break;
+                            case op_push_back_move:
+                                collide_move( *itPrev, *it );
+                                itPrev = itEnd;
+                                break;
+                            default:
+                                itPrev = it;
+                                break;
+                            }
+                        }
                     }
                     else
                         itPrev = it;
@@ -513,6 +566,15 @@ namespace cds { namespace container {
             m_FlatCombining.operation_done( recPop );
             m_FlatCombining.internal_statistics().onCollide();
         }
+
+        void collide_move( fc_record& recPush, fc_record& recPop )
+        {
+            *(recPop.pValPop) = std::move( *(recPush.pValPush));
+            recPop.bEmpty = false;
+            m_FlatCombining.operation_done( recPush );
+            m_FlatCombining.operation_done( recPop );
+            m_FlatCombining.internal_statistics().onCollide();
+        }
         //@endcond
     };