Make Vyukov queue linearizable
authorkhizmax <libcds.dev@gmail.com>
Thu, 17 Dec 2015 19:44:14 +0000 (22:44 +0300)
committerkhizmax <libcds.dev@gmail.com>
Thu, 17 Dec 2015 19:44:14 +0000 (22:44 +0300)
cds/container/vyukov_mpmc_cycle_queue.h
cds/intrusive/vyukov_mpmc_cycle_queue.h

index 8864e403f7a7fd16247091f14c4f5e6f9938b798..0238152ec6f8dd129b24f0fd7aa61bcee5592cbb 100644 (file)
@@ -49,6 +49,9 @@ namespace cds { namespace container {
 
             /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
             enum { padding = opt::cache_line_padding };
+
+            /// Back-off strategy
+            typedef cds::backoff::Default           back_off;
         };
 
         /// Metafunction converting option list to \p vyukov_queue::traits
@@ -62,6 +65,7 @@ namespace cds { namespace container {
                 After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied.
                 If \p T is a complex type, \p value_cleaner can be an useful feature.
                 Default value is \ref opt::v::destruct_cleaner
+            - \p opt::back_off - back-off strategy used. If the option is not specified, the \p cds::backoff::Default is used.
             - \p opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
                 To enable item counting use \p cds::atomicity::item_counter
             - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
@@ -136,6 +140,7 @@ namespace cds { namespace container {
         typedef typename traits::item_counter  item_counter;  ///< Item counter type
         typedef typename traits::memory_model  memory_model;  ///< Memory ordering. See cds::opt::memory_model option
         typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p vyukov_queue::traits::value_cleaner
+        typedef typename traits::back_off  back_off;          ///< back-off strategy
 
         /// Rebind template arguments
         template <typename T2, typename Traits2>
@@ -214,8 +219,9 @@ namespace cds { namespace container {
         bool enqueue_with(Func f)
         {
             cell_type* cell;
-            size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
+            back_off bkoff;
 
+            size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
             for (;;)
             {
                 cell = &m_buffer[pos & m_nBufferMask];
@@ -227,8 +233,13 @@ namespace cds { namespace container {
                     if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed ))
                         break;
                 }
-                else if (dif < 0)
-                    return false;
+                else if (dif < 0) {
+                    // Queue full?
+                    if ( pos - m_posDequeue.load( memory_model::memory_order_relaxed ) == capacity() )
+                        return false;   // queue full
+                    bkoff();
+                    pos = m_posEnqueue.load( memory_model::memory_order_relaxed );
+                }
                 else
                     pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
             }
@@ -291,8 +302,9 @@ namespace cds { namespace container {
         bool dequeue_with( Func f )
         {
             cell_type * cell;
-            size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
+            back_off bkoff;
 
+            size_t pos = m_posDequeue.load( memory_model::memory_order_relaxed );
             for (;;)
             {
                 cell = &m_buffer[pos & m_nBufferMask];
@@ -303,8 +315,13 @@ namespace cds { namespace container {
                     if ( m_posDequeue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed))
                         break;
                 }
-                else if (dif < 0)
-                    return false;
+                else if (dif < 0) {
+                    // Queue empty?
+                    if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
+                        return false;   // queue empty
+                    bkoff();
+                    pos = m_posDequeue.load( memory_model::memory_order_relaxed );
+                }
                 else
                     pos = m_posDequeue.load(memory_model::memory_order_relaxed);
             }
@@ -345,8 +362,9 @@ namespace cds { namespace container {
         bool empty() const
         {
             const cell_type * cell;
-            size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
+            back_off bkoff;
 
+            size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
             for (;;)
             {
                 cell = &m_buffer[pos & m_nBufferMask];
@@ -355,10 +373,12 @@ namespace cds { namespace container {
 
                 if (dif == 0)
                     return false;
-                else if (dif < 0)
-                    return true;
-                else
-                    pos = m_posDequeue.load(memory_model::memory_order_relaxed);
+                else if (dif < 0) {
+                    if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
+                        return true;
+                }
+                bkoff();
+                pos = m_posDequeue.load(memory_model::memory_order_relaxed);
             }
         }
 
index 1a44211d2a21fbc04f3e566874d70a2e0ebb026b..b6ba2ca9d8987b7adbf146c412ffa728a046be7e 100644 (file)
@@ -30,6 +30,7 @@ namespace cds { namespace intrusive {
                 This option is used only in \p clear() member function.
             - \p opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
                 To enable item counting use \p cds::atomicity::item_counter
+            - \p opt::back_off - back-off strategy used. If the option is not specified, the \p cds::backoff::Default is used.
             - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
             - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
@@ -39,7 +40,7 @@ namespace cds { namespace intrusive {
             typedef cds::intrusive::VyukovMPMCCycleQueue< Foo,
                 typename cds::intrusive::vyukov_queue::make_traits<
                     cds::opt::buffer< cds::opt::v::static_buffer< void *, 1024 >,
-                    cds::opt::item_counte< cds::atomicity::item_counter >
+                    cds::opt::item_counter< cds::atomicity::item_counter >
                 >::type
             > myQueue;
             \endcode
@@ -119,9 +120,10 @@ namespace cds { namespace intrusive {
     public:
         typedef T value_type;   ///< type of data to be stored in the queue
         typedef Traits traits;  ///< Queue traits
-        typedef typename traits::item_counter   item_counter;   ///< Item counter type
-        typedef typename traits::memory_model   memory_model;   ///< Memory ordering. See cds::opt::memory_model option
-        typedef typename traits::disposer       disposer;       ///< Item disposer
+        typedef typename traits::item_counter item_counter; ///< Item counter type
+        typedef typename traits::memory_model memory_model; ///< Memory ordering. See cds::opt::memory_model option
+        typedef typename traits::disposer     disposer;     ///< Item disposer
+        typedef typename traits::back_off     back_off;     ///< back-off strategy
 
     public:
         /// Rebind template arguments