Uses different pass count for different parallel queue test cases
[libcds.git] / cds / urcu / dispose_thread.h
index 46baa8fa7c67029b5b83e19daa348074eda07b05..a83376ba5fa3eb19601285db0ae6e163becc6412 100644 (file)
@@ -1,4 +1,32 @@
-//$$CDS-header$$1
+/*
+    This file is a part of libcds - Concurrent Data Structures library
+
+    (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
+
+    Source code repo: http://github.com/khizmax/libcds/
+    Download: http://sourceforge.net/projects/libcds/files/
+
+    Redistribution and use in source and binary forms, with or without
+    modification, are permitted provided that the following conditions are met:
+
+    * Redistributions of source code must retain the above copyright notice, this
+      list of conditions and the following disclaimer.
+
+    * Redistributions in binary form must reproduce the above copyright notice,
+      this list of conditions and the following disclaimer in the documentation
+      and/or other materials provided with the distribution.
+
+    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+    AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+    IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+    FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+    DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+    SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+    OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
 
 #ifndef CDSLIB_URCU_DISPOSE_THREAD_H
 #define CDSLIB_URCU_DISPOSE_THREAD_H
 #include <mutex>
 #include <condition_variable>
 #include <cds/details/aligned_type.h>
+#include <cds/algo/atomic.h>
 
 namespace cds { namespace urcu {
 
-    /// Reclamation thread for \p general_threaded and \p signal_threaded URCU
+    /// Reclamation thread for \p general_threaded URCU
     /**
         The object of this class contains a reclamation thread object and
         necessary synchronization object(s). The object manages reclamation thread
         and defines a set of messages (i.e. methods) to communicate with the thread.
 
-        Template argument \p Buffer defines the buffer type of \ref general_threaded (or \ref signal_threaded) URCU.
+        Template argument \p Buffer defines the buffer type of \ref general_threaded URCU.
     */
     template <class Buffer>
     class dispose_thread
@@ -26,7 +55,7 @@ namespace cds { namespace urcu {
         typedef Buffer  buffer_type ;   ///< Buffer type
     private:
         //@cond
-        typedef std::thread                 thread_type;
+        typedef std::thread             thread_type;
         typedef std::mutex              mutex_type;
         typedef std::condition_variable condvar_type;
         typedef std::unique_lock< mutex_type >  unique_lock;
@@ -53,15 +82,15 @@ namespace cds { namespace urcu {
         condvar_type    m_cvDataReady;
 
         // Task for thread (dispose cycle)
-        buffer_type * volatile  m_pBuffer;
-        uint64_t volatile       m_nCurEpoch;
+        atomics::atomic<buffer_type *>  m_pBuffer;
+        uint64_t m_nCurEpoch = 0;
 
         // Quit flag
-        bool volatile           m_bQuit;
+        bool    m_bQuit = false;
 
         // disposing pass sync
-        condvar_type            m_cvReady;
-        bool volatile           m_bReady;
+        condvar_type        m_cvReady;
+        bool                m_bReady = false;
         //@endcond
 
     private: // methods called from disposing thread
@@ -73,24 +102,27 @@ namespace cds { namespace urcu {
             bool            bQuit = false;
 
             while ( !bQuit ) {
+
+                // signal that we are ready to dispose
                 {
                     unique_lock lock( m_Mutex );
-
-                    // signal that we are ready to dispose
                     m_bReady = true;
-                    m_cvReady.notify_one();
+                }
+                m_cvReady.notify_one();
 
+                {
                     // wait new data portion
-                    while ( !m_pBuffer )
+                    unique_lock lock( m_Mutex );
+
+                    while ( (pBuffer = m_pBuffer.load( atomics::memory_order_relaxed )) == nullptr )
                         m_cvDataReady.wait( lock );
 
                     // New work is ready
-                    m_bReady = false ;   // we are busy
+                    m_bReady = false; // we are busy
 
                     bQuit = m_bQuit;
                     nCurEpoch = m_nCurEpoch;
-                    pBuffer = m_pBuffer;
-                    m_pBuffer = nullptr;
+                    m_pBuffer.store( nullptr, atomics::memory_order_relaxed );
                 }
 
                 if ( pBuffer )
@@ -100,32 +132,25 @@ namespace cds { namespace urcu {
 
         void dispose_buffer( buffer_type * pBuf, uint64_t nCurEpoch )
         {
-            epoch_retired_ptr p;
-            while ( pBuf->pop( p ) ) {
-                if ( p.m_nEpoch <= nCurEpoch ) {
-                    CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
-                    p.free();
-                    CDS_TSAN_ANNOTATE_IGNORE_RW_END;
+            epoch_retired_ptr * p;
+            while ( ( p = pBuf->front()) != nullptr ) {
+                if ( p->m_nEpoch <= nCurEpoch ) {
+                    p->free();
+                    CDS_VERIFY( pBuf->pop_front());
                 }
-                else {
-                    pBuf->push( p );
+                else
                     break;
-                }
             }
         }
         //@endcond
 
-    public:
+    public: // methods called from any thread
         //@cond
         dispose_thread()
             : m_pBuffer( nullptr )
-            , m_nCurEpoch(0)
-            , m_bQuit( false )
-            , m_bReady( false )
         {}
         //@endcond
 
-    public: // methods called from any thread
         /// Start reclamation thread
         /**
             This function is called by \ref general_threaded object to start
@@ -153,8 +178,8 @@ namespace cds { namespace urcu {
                     m_cvReady.wait( lock );
 
                 // give a new work and set stop flag
-                m_pBuffer = &buf;
                 m_nCurEpoch = nCurEpoch;
+                m_pBuffer.store( &buf, atomics::memory_order_relaxed );
                 m_bQuit = true;
             }
             m_cvDataReady.notify_one();
@@ -165,7 +190,7 @@ namespace cds { namespace urcu {
         /// Start reclamation cycle
         /**
             This function is called by \ref general_threaded object
-            to notify the reclamation thread about new work.
+            to notify the reclamation thread about new work.
             \p buf buffer contains retired objects ready to free.
             The reclamation thread should free all \p buf objects
             \p m_nEpoch field of which is no more than \p nCurEpoch.
@@ -175,22 +200,22 @@ namespace cds { namespace urcu {
         */
         void dispose( buffer_type& buf, uint64_t nCurEpoch, bool bSync )
         {
-            unique_lock lock( m_Mutex );
+            {
+                unique_lock lock( m_Mutex );
 
-            // wait while disposing pass done
-            while ( !m_bReady )
-                m_cvReady.wait( lock );
+                // wait while disposing pass done
+                while ( !m_bReady )
+                    m_cvReady.wait( lock );
 
-            if ( bSync )
+                // new work
                 m_bReady = false;
-
-            // new work
-            m_nCurEpoch = nCurEpoch;
-            m_pBuffer = &buf;
-
+                m_nCurEpoch = nCurEpoch;
+                m_pBuffer.store( &buf, atomics::memory_order_relaxed );
+            }
             m_cvDataReady.notify_one();
 
             if ( bSync ) {
+                unique_lock lock( m_Mutex );
                 while ( !m_bReady )
                     m_cvReady.wait( lock );
             }