Mutex-based flat-combining wait strategy: try to reduce starvation
authorkhizmax <libcds.dev@gmail.com>
Sat, 11 Mar 2017 07:29:39 +0000 (10:29 +0300)
committerkhizmax <libcds.dev@gmail.com>
Sat, 11 Mar 2017 07:29:39 +0000 (10:29 +0300)
cds/algo/flat_combining/kernel.h
cds/algo/flat_combining/wait_strategy.h

index 5e333e157bf4daeb1205285e2f715c3af4db126d..c4c2ce23aea9f9533d1d167761d536e4d7cd6010 100644 (file)
@@ -422,7 +422,7 @@ namespace cds { namespace algo {
 
             /// Marks \p rec as executed
             /**
-                This function should be called by container if \p batch_combine mode is used.
+                This function should be called by container if \p batch_combine() mode is used.
                 For usual combining (see \p combine()) this function is excess.
             */
             void operation_done( publication_record& rec )
index 947699d36debb88ccd480a2deaa35f765cc4b995..8a4a567beb72c45047493e5b028c57f0776a487a 100644 (file)
@@ -212,8 +212,9 @@ namespace cds { namespace algo { namespace flat_combining {
         class single_mutex_single_condvar
         {
         //@cond
-            std::mutex m_mutex;
+            std::mutex  m_mutex;
             std::condition_variable m_condvar;
+            bool        m_wakeup;
 
             typedef std::unique_lock< std::mutex > unique_lock;
         //@endcond
@@ -229,6 +230,11 @@ namespace cds { namespace algo { namespace flat_combining {
                 typedef PublicationRecord type; ///< publication record type
             };
 
+            /// Default ctor
+            single_mutex_single_condvar()
+                : m_wakeup( false )
+            {}
+
             /// Does nothing
             template <typename PublicationRecord>
             void prepare( PublicationRecord& /*rec*/ )
@@ -240,23 +246,33 @@ namespace cds { namespace algo { namespace flat_combining {
             {
                 if ( fc.get_operation( rec ) >= req_Operation ) {
                     unique_lock lock( m_mutex );
-                    if ( fc.get_operation( rec ) >= req_Operation )
-                        return m_condvar.wait_for( lock, std::chrono::milliseconds( c_nWaitMilliseconds )) == std::cv_status::no_timeout;
+                    if ( fc.get_operation( rec ) >= req_Operation ) {
+                        if ( m_wakeup ) {
+                            m_wakeup = false;
+                            return true;
+                        }
+
+                        bool ret = m_condvar.wait_for( lock, std::chrono::milliseconds( c_nWaitMilliseconds ) ) == std::cv_status::no_timeout;
+                        m_wakeup = false;
+                        return ret;
+                    }
                 }
                 return false;
             }
 
             /// Calls condition variable function \p notify_all()
             template <typename FCKernel, typename PublicationRecord>
-            void notify( FCKernel& /*fc*/, PublicationRecord& /*rec*/ )
+            void notify( FCKernel& fc, PublicationRecord& /*rec*/ )
             {
-                m_condvar.notify_all();
+                wakeup( fc );
             }
 
             /// Calls condition variable function \p notify_all()
             template <typename FCKernel>
             void wakeup( FCKernel& /*fc*/ )
             {
+                unique_lock lock( m_mutex );
+                m_wakeup = true;
                 m_condvar.notify_all();
             }
         };
@@ -272,7 +288,8 @@ namespace cds { namespace algo { namespace flat_combining {
         class single_mutex_multi_condvar
         {
         //@cond
-            std::mutex m_mutex;
+            std::mutex  m_mutex;
+            bool        m_wakeup;
 
             typedef std::unique_lock< std::mutex > unique_lock;
         //@endcond
@@ -294,6 +311,11 @@ namespace cds { namespace algo { namespace flat_combining {
                 };
             };
 
+            /// Default ctor
+            single_mutex_multi_condvar()
+                : m_wakeup( false )
+            {}
+
             /// Does nothing
             template <typename PublicationRecord>
             void prepare( PublicationRecord& /*rec*/ )
@@ -305,8 +327,17 @@ namespace cds { namespace algo { namespace flat_combining {
             {
                 if ( fc.get_operation( rec ) >= req_Operation ) {
                     unique_lock lock( m_mutex );
-                    if ( fc.get_operation( rec ) >= req_Operation )
-                        return rec.m_condvar.wait_for( lock, std::chrono::milliseconds( c_nWaitMilliseconds )) == std::cv_status::no_timeout;
+
+                    if ( fc.get_operation( rec ) >= req_Operation ) {
+                        if ( m_wakeup ) {
+                            m_wakeup = false;
+                            return true;
+                        }
+
+                        bool ret = rec.m_condvar.wait_for( lock, std::chrono::milliseconds( c_nWaitMilliseconds ) ) == std::cv_status::no_timeout;
+                        m_wakeup = false;
+                        return ret;
+                    }
                 }
                 return false;
             }
@@ -315,6 +346,8 @@ namespace cds { namespace algo { namespace flat_combining {
             template <typename FCKernel, typename PublicationRecord>
             void notify( FCKernel& /*fc*/, PublicationRecord& rec )
             {
+                unique_lock lock( m_mutex );
+                m_wakeup = true;
                 rec.m_condvar.notify_one();
             }
 
@@ -351,6 +384,11 @@ namespace cds { namespace algo { namespace flat_combining {
                     //@cond
                     std::mutex              m_mutex;
                     std::condition_variable m_condvar;
+                    bool                    m_wakeup;
+
+                    type()
+                        : m_wakeup( false )
+                    {}
                     //@endcond
                 };
             };
@@ -366,8 +404,17 @@ namespace cds { namespace algo { namespace flat_combining {
             {
                 if ( fc.get_operation( rec ) >= req_Operation ) {
                     unique_lock lock( rec.m_mutex );
-                    if ( fc.get_operation( rec ) >= req_Operation )
-                        return rec.m_condvar.wait_for( lock, std::chrono::milliseconds( c_nWaitMilliseconds )) == std::cv_status::no_timeout;
+
+                    if ( fc.get_operation( rec ) >= req_Operation ) {
+                        if ( rec.m_wakeup ) {
+                            rec.m_wakeup = false;
+                            return true;
+                        }
+
+                        bool ret = rec.m_condvar.wait_for( lock, std::chrono::milliseconds( c_nWaitMilliseconds ) ) == std::cv_status::no_timeout;
+                        rec.m_wakeup = false;
+                        return ret;
+                    }
                 }
                 return false;
             }
@@ -376,6 +423,8 @@ namespace cds { namespace algo { namespace flat_combining {
             template <typename FCKernel, typename PublicationRecord>
             void notify( FCKernel& /*fc*/, PublicationRecord& rec )
             {
+                unique_lock lock( rec.m_mutex );
+                rec.m_wakeup = true;
                 rec.m_condvar.notify_one();
             }