Uses different pass count for different parallel queue test cases
[libcds.git] / cds / urcu / dispose_thread.h
index 3c1619c12a2fceebc5720e43841be0d4b1e29ab9..a83376ba5fa3eb19601285db0ae6e163becc6412 100644 (file)
@@ -1,24 +1,52 @@
-//$$CDS-header$$1
-
-#ifndef _CDS_URCU_DISPOSE_THREAD_H
-#define _CDS_URCU_DISPOSE_THREAD_H
-
-//#include <cds/backoff_strategy.h>
-#include <cds/details/std/thread.h>
-#include <cds/details/std/mutex.h>
-#include <cds/details/std/condition_variable.h>
-#include <cds/details/std/memory.h>     // unique_ptr
+/*
+    This file is a part of libcds - Concurrent Data Structures library
+
+    (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
+
+    Source code repo: http://github.com/khizmax/libcds/
+    Download: http://sourceforge.net/projects/libcds/files/
+
+    Redistribution and use in source and binary forms, with or without
+    modification, are permitted provided that the following conditions are met:
+
+    * Redistributions of source code must retain the above copyright notice, this
+      list of conditions and the following disclaimer.
+
+    * Redistributions in binary form must reproduce the above copyright notice,
+      this list of conditions and the following disclaimer in the documentation
+      and/or other materials provided with the distribution.
+
+    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+    AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+    IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+    FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+    DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+    SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+    OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+#ifndef CDSLIB_URCU_DISPOSE_THREAD_H
+#define CDSLIB_URCU_DISPOSE_THREAD_H
+
+#include <memory>
+#include <thread>
+#include <mutex>
+#include <condition_variable>
 #include <cds/details/aligned_type.h>
+#include <cds/algo/atomic.h>
 
 namespace cds { namespace urcu {
 
-    /// Reclamation thread for \p general_threaded and \p signal_threaded URCU
+    /// Reclamation thread for \p general_threaded URCU
     /**
         The object of this class contains a reclamation thread object and
         necessary synchronization object(s). The object manages reclamation thread
         and defines a set of messages (i.e. methods) to communicate with the thread.
 
-        Template argument \p Buffer defines the buffer type of \ref general_threaded (or \ref signal_threaded) URCU.
+        Template argument \p Buffer defines the buffer type of \ref general_threaded URCU.
     */
     template <class Buffer>
     class dispose_thread
@@ -27,10 +55,10 @@ namespace cds { namespace urcu {
         typedef Buffer  buffer_type ;   ///< Buffer type
     private:
         //@cond
-        typedef cds_std::thread                     thread_type;
-        typedef cds_std::mutex                      mutex_type;
-        typedef cds_std::condition_variable         condvar_type;
-        typedef cds_std::unique_lock< mutex_type >  unique_lock;
+        typedef std::thread             thread_type;
+        typedef std::mutex              mutex_type;
+        typedef std::condition_variable condvar_type;
+        typedef std::unique_lock< mutex_type >  unique_lock;
 
         class dispose_thread_starter: public thread_type
         {
@@ -54,15 +82,15 @@ namespace cds { namespace urcu {
         condvar_type    m_cvDataReady;
 
         // Task for thread (dispose cycle)
-        buffer_type * volatile  m_pBuffer;
-        uint64_t volatile       m_nCurEpoch;
+        atomics::atomic<buffer_type *>  m_pBuffer;
+        uint64_t m_nCurEpoch = 0;
 
         // Quit flag
-        bool volatile           m_bQuit;
+        bool    m_bQuit = false;
 
         // disposing pass sync
-        condvar_type            m_cvReady;
-        bool volatile           m_bReady;
+        condvar_type        m_cvReady;
+        bool                m_bReady = false;
         //@endcond
 
     private: // methods called from disposing thread
@@ -74,24 +102,27 @@ namespace cds { namespace urcu {
             bool            bQuit = false;
 
             while ( !bQuit ) {
+
+                // signal that we are ready to dispose
                 {
                     unique_lock lock( m_Mutex );
-
-                    // signal that we are ready to dispose
                     m_bReady = true;
-                    m_cvReady.notify_one();
+                }
+                m_cvReady.notify_one();
 
+                {
                     // wait new data portion
-                    while ( !m_pBuffer )
+                    unique_lock lock( m_Mutex );
+
+                    while ( (pBuffer = m_pBuffer.load( atomics::memory_order_relaxed )) == nullptr )
                         m_cvDataReady.wait( lock );
 
                     // New work is ready
-                    m_bReady = false ;   // we are busy
+                    m_bReady = false; // we are busy
 
                     bQuit = m_bQuit;
                     nCurEpoch = m_nCurEpoch;
-                    pBuffer = m_pBuffer;
-                    m_pBuffer = nullptr;
+                    m_pBuffer.store( nullptr, atomics::memory_order_relaxed );
                 }
 
                 if ( pBuffer )
@@ -101,29 +132,25 @@ namespace cds { namespace urcu {
 
         void dispose_buffer( buffer_type * pBuf, uint64_t nCurEpoch )
         {
-            epoch_retired_ptr p;
-            while ( pBuf->pop( p ) ) {
-                if ( p.m_nEpoch <= nCurEpoch )
-                    p.free();
-                else {
-                    pBuf->push( p );
-                    break;
+            epoch_retired_ptr * p;
+            while ( ( p = pBuf->front()) != nullptr ) {
+                if ( p->m_nEpoch <= nCurEpoch ) {
+                    p->free();
+                    CDS_VERIFY( pBuf->pop_front());
                 }
+                else
+                    break;
             }
         }
         //@endcond
 
-    public:
+    public: // methods called from any thread
         //@cond
         dispose_thread()
             : m_pBuffer( nullptr )
-            , m_nCurEpoch(0)
-            , m_bQuit( false )
-            , m_bReady( false )
         {}
         //@endcond
 
-    public: // methods called from any thread
         /// Start reclamation thread
         /**
             This function is called by \ref general_threaded object to start
@@ -151,8 +178,8 @@ namespace cds { namespace urcu {
                     m_cvReady.wait( lock );
 
                 // give a new work and set stop flag
-                m_pBuffer = &buf;
                 m_nCurEpoch = nCurEpoch;
+                m_pBuffer.store( &buf, atomics::memory_order_relaxed );
                 m_bQuit = true;
             }
             m_cvDataReady.notify_one();
@@ -163,7 +190,7 @@ namespace cds { namespace urcu {
         /// Start reclamation cycle
         /**
             This function is called by \ref general_threaded object
-            to notify the reclamation thread about new work.
+            to notify the reclamation thread about new work.
             \p buf buffer contains retired objects ready to free.
             The reclamation thread should free all \p buf objects
             \p m_nEpoch field of which is no more than \p nCurEpoch.
@@ -173,22 +200,22 @@ namespace cds { namespace urcu {
         */
         void dispose( buffer_type& buf, uint64_t nCurEpoch, bool bSync )
         {
-            unique_lock lock( m_Mutex );
+            {
+                unique_lock lock( m_Mutex );
 
-            // wait while disposing pass done
-            while ( !m_bReady )
-                m_cvReady.wait( lock );
+                // wait while disposing pass done
+                while ( !m_bReady )
+                    m_cvReady.wait( lock );
 
-            if ( bSync )
+                // new work
                 m_bReady = false;
-
-            // new work
-            m_nCurEpoch = nCurEpoch;
-            m_pBuffer = &buf;
-
+                m_nCurEpoch = nCurEpoch;
+                m_pBuffer.store( &buf, atomics::memory_order_relaxed );
+            }
             m_cvDataReady.notify_one();
 
             if ( bSync ) {
+                unique_lock lock( m_Mutex );
                 while ( !m_bReady )
                     m_cvReady.wait( lock );
             }
@@ -196,4 +223,4 @@ namespace cds { namespace urcu {
     };
 }} // namespace cds::urcu
 
-#endif // #ifdef _CDS_URCU_DISPOSE_THREAD_H
+#endif // #ifdef CDSLIB_URCU_DISPOSE_THREAD_H