Added wait strategies to flat combining technique
authorMarsel Galimullin <mfgalimullin@yandex.ru>
Fri, 8 May 2015 14:06:26 +0000 (17:06 +0300)
committerkhizmax <libcds.dev@gmail.com>
Fri, 10 Jun 2016 21:03:22 +0000 (00:03 +0300)
cds/algo/wait_strategy.h [new file with mode: 0644]

diff --git a/cds/algo/wait_strategy.h b/cds/algo/wait_strategy.h
new file mode 100644 (file)
index 0000000..422e12e
--- /dev/null
@@ -0,0 +1,267 @@
+/*\r
+    This file is a part of libcds - Concurrent Data Structures library\r
+\r
+    (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016\r
+\r
+    Source code repo: http://github.com/khizmax/libcds/\r
+    Download: http://sourceforge.net/projects/libcds/files/\r
+\r
+    Redistribution and use in source and binary forms, with or without\r
+    modification, are permitted provided that the following conditions are met:\r
+\r
+    * Redistributions of source code must retain the above copyright notice, this\r
+      list of conditions and the following disclaimer.\r
+\r
+    * Redistributions in binary form must reproduce the above copyright notice,\r
+      this list of conditions and the following disclaimer in the documentation\r
+      and/or other materials provided with the distribution.\r
+\r
+    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"\r
+    AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE\r
+    IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE\r
+    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE\r
+    FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL\r
+    DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR\r
+    SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER\r
+    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,\r
+    OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\r
+    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\r
+*/\r
+\r
+#ifndef CDSLIB_ALGO_FLAT_COMBINING_WAIT_STRATEGY_H\r
+#define CDSLIB_ALGO_FLAT_COMBINING_WAIT_STRATEGY_H\r
+\r
+#include <iostream>\r
+\r
+#include <boost/thread.hpp>\r
+#include <cds/algo/backoff_strategy.h>\r
+\r
+namespace cds {  namespace algo {  namespace flat_combining {\r
+       /// Waiting strategy for flat combining\r
+\r
+    // Special values of publication_record::nRequest\r
+    enum request_value\r
+    {\r
+        req_EmptyRecord,    ///< Publication record is empty\r
+        req_Response,       ///< Operation is done\r
+\r
+        req_Operation       ///< First operation id for derived classes\r
+    };\r
+\r
+    /// publication_record state\r
+    enum record_state {\r
+        inactive,       ///< Record is inactive\r
+        active,         ///< Record is active\r
+        removed         ///< Record should be removed\r
+    };\r
+\r
+    /// do-nothing strategy\r
+    template<typename UserPublicationRecord, typename Traits>\r
+    struct BareWaitStartegy\r
+    {\r
+        struct ExtendedPublicationRecord: public UserPublicationRecord\r
+        {\r
+        };\r
+\r
+        void wait(ExtendedPublicationRecord * pRec){}\r
+        void notify(ExtendedPublicationRecord* pRec){\r
+            pRec->nRequest.store( req_Response, Traits::memory_model::memory_order_release);\r
+        }\r
+    };\r
+\r
+    /// Wait/notify strategy based on thread-local mutex and thread-local condition variable\r
+    template<typename UserPublicationRecord, typename Traits>\r
+    struct WaitStartegyMultMutexMultCondVar\r
+    {\r
+        struct ExtendedPublicationRecord: public UserPublicationRecord\r
+        {\r
+            boost::mutex                _waitMutex;\r
+            boost::condition_variable   _condVar;\r
+        };\r
+\r
+        void wait(ExtendedPublicationRecord * pRec){\r
+            boost::unique_lock<boost::mutex> lock(pRec->_waitMutex);\r
+            if (pRec->nRequest.load( Traits::memory_model::memory_order_acquire ) >= req_Operation)\r
+                               pRec->_condVar.wait(lock);\r
+        }\r
+\r
+        void notify(ExtendedPublicationRecord* pRec){\r
+            boost::unique_lock<boost::mutex> lock(pRec->_waitMutex);\r
+            pRec->nRequest.store( req_Response, Traits::memory_model::memory_order_release);\r
+            pRec->_condVar.notify_one();\r
+        }\r
+    };\r
+\r
+    /// Back-off strategy\r
+    /**\r
+     * When N threads compete for the critical resource that can be accessed with the help of CAS-operations,\r
+     * only one of them gets an access. Other N–1 threads interrupt each other and consume process time in vain.\r
+     */\r
+    template<typename UserPublicationRecord, typename Traits>\r
+       struct WaitBakkOffStrategy\r
+       {\r
+        struct ExtendedPublicationRecord: public UserPublicationRecord\r
+        {\r
+        };\r
+        void wait(ExtendedPublicationRecord * pRec){\r
+                             cds::backoff::delay_of<2>   back_off;\r
+            back_off();\r
+        }\r
+        void notify(ExtendedPublicationRecord* pRec){\r
+            pRec->nRequest.store( req_Response, Traits::memory_model::memory_order_release);\r
+        }\r
+    };\r
+\r
+    /// Wait/notify strategy based on the global mutex and the condition variable\r
+    /**\r
+     *  The strategy is based on the usage of synchronization primitives of\r
+     *  the FC core which are shared by all threads.\r
+     */\r
+    template<typename UserPublicationRecord, typename Traits>\r
+    struct WaitOneMutexOneCondVarStrategy\r
+       {\r
+        boost::mutex              _globalMutex;\r
+        boost::condition_variable _globalCondVar;\r
+\r
+        struct ExtendedPublicationRecord: public UserPublicationRecord\r
+        {\r
+        };\r
+        void wait(ExtendedPublicationRecord * pRec){\r
+            boost::unique_lock<boost::mutex> lock(_globalMutex);\r
+            if (pRec->nRequest.load( Traits::memory_model::memory_order_acquire ) >= req_Operation)\r
+                               //_globalCondVar.timed_wait(lock, static_cast<boost::posix_time::seconds>(1));\r
+                               _globalCondVar.wait(lock);\r
+        }\r
+\r
+        void notify(ExtendedPublicationRecord* pRec){\r
+            boost::unique_lock<boost::mutex> lock(_globalMutex);\r
+            pRec->nRequest.store( req_Response, Traits::memory_model::memory_order_release);\r
+            _globalCondVar.notify_all();\r
+        }\r
+    };\r
+\r
+    /// Wait/notify strategy based on the global mutex and the thread-local condition variable\r
+    /**\r
+     * It uses one extra mutex aggregated in the FC core and a condition variable for every\r
+     * thread aggregated in thread's publication record in the publication list\r
+     */\r
+       template<typename UserPublicationRecord, typename Traits>\r
+       struct WaitStratygyBaseOnSingleMutexMutlLocalCondVars\r
+       {\r
+               boost::mutex              _globalMutex;\r
+\r
+               struct ExtendedPublicationRecord : public UserPublicationRecord\r
+               {\r
+                       boost::condition_variable _globalCondVar;\r
+               };\r
+               void wait(ExtendedPublicationRecord * pRec){\r
+                       boost::unique_lock<boost::mutex> lock(_globalMutex);\r
+                       if (pRec->nRequest.load(Traits::memory_model::memory_order_acquire) >= req_Operation)\r
+                               //_globalCondVar.timed_wait(lock, static_cast<boost::posix_time::seconds>(1));\r
+                               pRec->_globalCondVar.wait(lock);\r
+               }\r
+\r
+               void notify(ExtendedPublicationRecord* pRec){\r
+                       boost::unique_lock<boost::mutex> lock(_globalMutex);\r
+                       pRec->nRequest.store(req_Response, Traits::memory_model::memory_order_release);\r
+                       pRec->_globalCondVar.notify_one();\r
+               }\r
+       };\r
+\r
+       //===================================================================\r
+    template<typename UserPublicationRecord, typename Traits>\r
+    struct TimedWaitMultMutexMultCondVar\r
+    {\r
+        struct ExtendedPublicationRecord: public UserPublicationRecord\r
+        {\r
+            boost::mutex                _waitMutex;\r
+            boost::condition_variable   _condVar;\r
+        };\r
+\r
+        void wait(ExtendedPublicationRecord * pRec){\r
+            boost::unique_lock<boost::mutex> lock(pRec->_waitMutex);\r
+            if (pRec->nRequest.load( Traits::memory_model::memory_order_acquire ) >= req_Operation)\r
+                pRec->_condVar.timed_wait(lock, boost::posix_time::millisec(2));\r
+        }\r
+\r
+        void notify(ExtendedPublicationRecord* pRec){\r
+            pRec->nRequest.store(req_Response, Traits::memory_model::memory_order_release);\r
+            pRec->_condVar.notify_one();\r
+        }\r
+    };\r
+    //===================================================================\r
+    template<typename UserPublicationRecord, typename Traits>\r
+    struct TimedWaitGlobalMutexAndCondVar\r
+    {\r
+        boost::mutex              _globalMutex;\r
+        boost::condition_variable _globalCondVar;\r
+\r
+        struct ExtendedPublicationRecord: public UserPublicationRecord\r
+        {\r
+        };\r
+        void wait(ExtendedPublicationRecord * pRec){\r
+            boost::unique_lock<boost::mutex> lock(_globalMutex);\r
+            if (pRec->nRequest.load( Traits::memory_model::memory_order_acquire ) >= req_Operation)\r
+                _globalCondVar.timed_wait(lock, boost::posix_time::millisec(2));\r
+        }\r
+\r
+        void notify(ExtendedPublicationRecord* pRec){\r
+            pRec->nRequest.store( req_Response, Traits::memory_model::memory_order_release);\r
+            _globalCondVar.notify_all();\r
+        }\r
+    };\r
+    //===================================================================\r
+    template <int v>\r
+    struct Int2Type{\r
+        enum {value = v};\r
+    };\r
+\r
+    ///Adaptive strategy\r
+    /**\r
+     * It works like “back-off”-strategy with “light” elements with small size\r
+     * and like Wait/notify strategy based on thread-local mutex and thread-local condition variable\r
+     * with “heavy” elements.\r
+     */\r
+    template<typename UserPublicationRecord, typename Traits>\r
+    struct AutoWaitStrategy\r
+    {\r
+        struct ExtendedPublicationRecord: public UserPublicationRecord\r
+        {\r
+            boost::mutex                _waitMutex;\r
+            boost::condition_variable   _condVar;\r
+        };\r
+\r
+        void wait(ExtendedPublicationRecord * pRec){\r
+            doWait(pRec, Int2Type<sizeof(typename UserPublicationRecord::value_type) <= 4*sizeof(int) >());\r
+        }\r
+\r
+        void notify(ExtendedPublicationRecord* pRec){\r
+            doNotify(pRec, Int2Type<sizeof(typename UserPublicationRecord::value_type) <= 4*sizeof(int) >());\r
+        }\r
+\r
+     private:\r
+        //The container consists a small data\r
+        void doWait(ExtendedPublicationRecord * pRec, Int2Type<true>){\r
+            cds::backoff::delay_of<2>   back_off;\r
+            back_off();\r
+        }\r
+\r
+        void doNotify(ExtendedPublicationRecord * pRec, Int2Type<true>){\r
+            pRec->nRequest.store( req_Response, Traits::memory_model::memory_order_release);\r
+        }\r
+\r
+        //The container consists a big data\r
+        void doWait(ExtendedPublicationRecord * pRec, Int2Type<false>){\r
+            boost::unique_lock<boost::mutex> lock(pRec->_waitMutex);\r
+            if (pRec->nRequest.load( Traits::memory_model::memory_order_acquire ) >= req_Operation)\r
+                pRec->_condVar.timed_wait(lock, boost::posix_time::millisec(2));\r
+        }\r
+\r
+        void doNotify(ExtendedPublicationRecord * pRec, Int2Type<false>){\r
+            pRec->nRequest.store(req_Response, Traits::memory_model::memory_order_release);\r
+            pRec->_condVar.notify_one();\r
+        }\r
+    };//class AutoWaitStrategy\r
+}}}//end namespace cds::algo::flat_combining\r
+\r
+#endif //CDSLIB_ALGO_FLAT_COMBINING_WAIT_STRATEGY_H\r