From f005d4fa008f269b7cd807f7735709b2b4d7b705 Mon Sep 17 00:00:00 2001 From: Brian Norris Date: Tue, 9 Oct 2012 10:57:56 -0700 Subject: [PATCH] add "lock-free queue" From: http://www.codeproject.com/Articles/153898/Yet-another-implementation-of-a-lock-free-circular License: http://opensource.org/licenses/bsd-license.php --- queue/array_lock_free_queue.h | 141 +++++++++++ queue/array_lock_free_queue_impl.h | 196 +++++++++++++++ queue/array_lock_free_queue_single_producer.h | 137 +++++++++++ ...ray_lock_free_queue_single_producer_impl.h | 182 ++++++++++++++ queue/atomic_ops.h | 79 ++++++ queue/g_blocking_queue.h | 122 ++++++++++ queue/g_blocking_queue_impl.h | 224 ++++++++++++++++++ queue/makefile | 67 ++++++ queue/tags | 89 +++++++ queue/test_blocking_q.cpp | 112 +++++++++ queue/test_lock_free_q.cpp | 132 +++++++++++ queue/test_lock_free_single_producer_q.cpp | 130 ++++++++++ 12 files changed, 1611 insertions(+) create mode 100644 queue/array_lock_free_queue.h create mode 100644 queue/array_lock_free_queue_impl.h create mode 100644 queue/array_lock_free_queue_single_producer.h create mode 100644 queue/array_lock_free_queue_single_producer_impl.h create mode 100644 queue/atomic_ops.h create mode 100644 queue/g_blocking_queue.h create mode 100644 queue/g_blocking_queue_impl.h create mode 100644 queue/makefile create mode 100644 queue/tags create mode 100644 queue/test_blocking_q.cpp create mode 100644 queue/test_lock_free_q.cpp create mode 100644 queue/test_lock_free_single_producer_q.cpp diff --git a/queue/array_lock_free_queue.h b/queue/array_lock_free_queue.h new file mode 100644 index 0000000..e392889 --- /dev/null +++ b/queue/array_lock_free_queue.h @@ -0,0 +1,141 @@ +// ============================================================================ +// Copyright (c) 2010 Faustino Frechilla +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// 2. Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// 3. The name of the author may not be used to endorse or promote products +// derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. +// +/// @file array_lock_free_queue.h +/// @brief Definition of a circular array based lock-free queue +/// +/// @author Faustino Frechilla +/// @history +/// Ref Who When What +/// Faustino Frechilla 11-Jul-2010 Original development +/// @endhistory +/// +// ============================================================================ + +#ifndef __ARRAY_LOCK_FREE_QUEUE_H__ +#define __ARRAY_LOCK_FREE_QUEUE_H__ + +#include // uint32_t +#include "atomic_ops.h" // atomic operations wrappers + +#define ARRAY_LOCK_FREE_Q_DEFAULT_SIZE 65535 // (2^16) + +// define this variable if calls to "size" must return the real size of the +// queue. If it is undefined that function will try to take a snapshot of +// the queue, but returned value might be bogus +#undef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE +//#define ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE 1 + + +/// @brief Lock-free queue based on a circular array +/// No allocation of extra memory for the nodes handling is needed, but it has to add +/// extra overhead (extra CAS operation) when inserting to ensure the thread-safety of the queue +/// ELEM_T represents the type of elementes pushed and popped from the queue +/// TOTAL_SIZE size of the queue. It should be a power of 2 to ensure +/// indexes in the circular queue keep stable when the uint32_t +/// variable that holds the current position rolls over from FFFFFFFF +/// to 0. For instance +/// 2 -> 0x02 +/// 4 -> 0x04 +/// 8 -> 0x08 +/// 16 -> 0x10 +/// (...) +/// 1024 -> 0x400 +/// 2048 -> 0x800 +/// +/// if queue size is not defined as requested, let's say, for +/// instance 100, when current position is FFFFFFFF (4,294,967,295) +/// index in the circular array is 4,294,967,295 % 100 = 95. +/// When that value is incremented it will be set to 0, that is the +/// last 4 elements of the queue are not used when the counter rolls +/// over to 0 +template +class ArrayLockFreeQueue +{ +public: + /// @brief constructor of the class + ArrayLockFreeQueue(); + virtual ~ArrayLockFreeQueue(); + + /// @brief returns the current number of items in the queue + /// It tries to take a snapshot of the size of the queue, but in busy environments + /// this function might return bogus values. + /// + /// If a reliable queue size must be kept you might want to have a look at + /// the preprocessor variable in this header file called 'ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE' + /// it enables a reliable size though it hits overall performance of the queue + /// (when the reliable size variable is on it's got an impact of about 20% in time) + uint32_t size(); + + /// @brief push an element at the tail of the queue + /// @param the element to insert in the queue + /// Note that the element is not a pointer or a reference, so if you are using large data + /// structures to be inserted in the queue you should think of instantiate the template + /// of the queue as a pointer to that large structure + /// @returns true if the element was inserted in the queue. False if the queue was full + bool push(const ELEM_T &a_data); + + /// @brief pop the element at the head of the queue + /// @param a reference where the element in the head of the queue will be saved to + /// Note that the a_data parameter might contain rubbish if the function returns false + /// @returns true if the element was successfully extracted from the queue. False if the queue was empty + bool pop(ELEM_T &a_data); + +private: + /// @brief array to keep the elements + ELEM_T m_theQueue[Q_SIZE]; + +#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE + /// @brief number of elements in the queue + volatile uint32_t m_count; +#endif + + /// @brief where a new element will be inserted + volatile uint32_t m_writeIndex; + + /// @brief where the next element where be extracted from + volatile uint32_t m_readIndex; + + /// @brief maximum read index for multiple producer queues + /// If it's not the same as m_writeIndex it means + /// there are writes pending to be "committed" to the queue, that means, + /// the place for the data was reserved (the index in the array) but + /// data is still not in the queue, so the thread trying to read will have + /// to wait for those other threads to save the data into the queue + /// + /// note this index is only used for MultipleProducerThread queues + volatile uint32_t m_maximumReadIndex; + + /// @brief calculate the index in the circular array that corresponds + /// to a particular "count" value + inline uint32_t countToIndex(uint32_t a_count); +}; + +// include the implementation file +#include "array_lock_free_queue_impl.h" + +#endif // __ARRAY_LOCK_FREE_QUEUE_H__ diff --git a/queue/array_lock_free_queue_impl.h b/queue/array_lock_free_queue_impl.h new file mode 100644 index 0000000..d368bb5 --- /dev/null +++ b/queue/array_lock_free_queue_impl.h @@ -0,0 +1,196 @@ +// ============================================================================ +// Copyright (c) 2010 Faustino Frechilla +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// 2. Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// 3. The name of the author may not be used to endorse or promote products +// derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. +// +/// @file array_lock_free_queue_impl.h +/// @brief Implementation of a circular array based lock-free queue +/// +/// @author Faustino Frechilla +/// @history +/// Ref Who When What +/// Faustino Frechilla 11-Jul-2010 Original development +/// @endhistory +/// +// ============================================================================ + +#ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_H__ +#define __ARRAY_LOCK_FREE_QUEUE_IMPL_H__ + +#include // assert() +#include // sched_yield() + +template +ArrayLockFreeQueue::ArrayLockFreeQueue() : + m_writeIndex(0), + m_readIndex(0), + m_maximumReadIndex(0) // only for MultipleProducerThread queues +{ +#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE + m_count = 0; +#endif +} + +template +ArrayLockFreeQueue::~ArrayLockFreeQueue() +{ +} + +template +inline +uint32_t ArrayLockFreeQueue::countToIndex(uint32_t a_count) +{ + // if Q_SIZE is a power of 2 this statement could be also written as + // return (a_count & (Q_SIZE - 1)); + return (a_count % Q_SIZE); +} + +template +uint32_t ArrayLockFreeQueue::size() +{ +#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE + return m_count; +#else + uint32_t currentWriteIndex = m_writeIndex; + uint32_t currentReadIndex = m_readIndex; + + // let's think of a scenario where this function returns bogus data + // 1. when the statement 'currentWriteIndex = m_writeIndex' is run + // m_writeIndex is 3 and m_readIndex is 2. Real size is 1 + // 2. afterwards this thread is preemted. While this thread is inactive 2 + // elements are inserted and removed from the queue, so m_writeIndex is 5 + // m_readIndex 4. Real size is still 1 + // 3. Now the current thread comes back from preemption and reads m_readIndex. + // currentReadIndex is 4 + // 4. currentReadIndex is bigger than currentWriteIndex, so + // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is, + // it returns that the queue is almost full, when it is almost empty + + if (currentWriteIndex >= currentReadIndex) + { + return (currentWriteIndex - currentReadIndex); + } + else + { + return (Q_SIZE + currentWriteIndex - currentReadIndex); + } +#endif // ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE +} + +template +bool ArrayLockFreeQueue::push(const ELEM_T &a_data) +{ + uint32_t currentReadIndex; + uint32_t currentWriteIndex; + + do + { + currentWriteIndex = m_writeIndex; + currentReadIndex = m_readIndex; + if (countToIndex(currentWriteIndex + 1) == + countToIndex(currentReadIndex)) + { + // the queue is full + return false; + } + + } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1))); + + // We know now that this index is reserved for us. Use it to save the data + m_theQueue[countToIndex(currentWriteIndex)] = a_data; + + // update the maximum read index after saving the data. It wouldn't fail if there is only one thread + // inserting in the queue. It might fail if there are more than 1 producer threads because this + // operation has to be done in the same order as the previous CAS + while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) + { + // this is a good place to yield the thread in case there are more + // software threads than hardware processors and you have more + // than 1 producer thread + // have a look at sched_yield (POSIX.1b) + sched_yield(); + } + + // The value was successfully inserted into the queue +#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE + AtomicAdd(&m_count, 1); +#endif + + return true; +} + +template +bool ArrayLockFreeQueue::pop(ELEM_T &a_data) +{ + uint32_t currentMaximumReadIndex; + uint32_t currentReadIndex; + + do + { + // to ensure thread-safety when there is more than 1 producer thread + // a second index is defined (m_maximumReadIndex) + currentReadIndex = m_readIndex; + currentMaximumReadIndex = m_maximumReadIndex; + + if (countToIndex(currentReadIndex) == + countToIndex(currentMaximumReadIndex)) + { + // the queue is empty or + // a producer thread has allocate space in the queue but is + // waiting to commit the data into it + return false; + } + + // retrieve the data from the queue + a_data = m_theQueue[countToIndex(currentReadIndex)]; + + // try to perfrom now the CAS operation on the read index. If we succeed + // a_data already contains what m_readIndex pointed to before we + // increased it + if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) + { + // got here. The value was retrieved from the queue. Note that the + // data inside the m_queue array is not deleted nor reseted +#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE + AtomicSub(&m_count, 1); +#endif + return true; + } + + // it failed retrieving the element off the queue. Someone else must + // have read the element stored at countToIndex(currentReadIndex) + // before we could perform the CAS operation + + } while(1); // keep looping to try again! + + // Something went wrong. it shouldn't be possible to reach here + assert(0); + + // Add this return statement to avoid compiler warnings + return false; +} + +#endif // __ARRAY_LOCK_FREE_QUEUE_IMPL_H__ + diff --git a/queue/array_lock_free_queue_single_producer.h b/queue/array_lock_free_queue_single_producer.h new file mode 100644 index 0000000..1d80e16 --- /dev/null +++ b/queue/array_lock_free_queue_single_producer.h @@ -0,0 +1,137 @@ +// ============================================================================ +// Copyright (c) 2010 Faustino Frechilla +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// 2. Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// 3. The name of the author may not be used to endorse or promote products +// derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. +// +/// @file array_lock_free_queue_single_producer.h +/// @brief Definition of a circular array based lock-free queue +/// +/// WARNING: This queue is not thread safe when several threads try to insert +/// elements into the queue. It is allowed to use as many consumers +/// as needed though. +/// +/// @author Faustino Frechilla +/// @history +/// Ref Who When What +/// Faustino Frechilla 11-Jul-2010 Original development +/// @endhistory +/// +// ============================================================================ + +#ifndef __ARRAY_LOCK_FREE_QUEUE_SINGLE_PRODUCER_H__ +#define __ARRAY_LOCK_FREE_QUEUE_SINGLE_PRODUCER_H__ + +#include // uint32_t +#include "atomic_ops.h" // atomic operations wrappers + +#define ARRAY_LOCK_FREE_Q_DEFAULT_SIZE 65536 // 2^16 = 65,536 elements by default + +// define this variable if calls to "size" must return the real size of the +// queue. If it is undefined that function will try to take a snapshot of +// the queue, but returned value might be bogus +#undef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE +//#define ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE 1 + + +/// @brief especialisation of the ArrayLockFreeQueue to be used when there is +/// only one producer thread +/// No allocation of extra memory for the nodes handling is needed +/// WARNING: This queue is not thread safe when several threads try to insert elements +/// into the queue +/// ELEM_T represents the type of elementes pushed and popped from the queue +/// TOTAL_SIZE size of the queue. It should be a power of 2 to ensure +/// indexes in the circular queue keep stable when the uint32_t +/// variable that holds the current position rolls over from FFFFFFFF +/// to 0. For instance +/// 2 -> 0x02 +/// 4 -> 0x04 +/// 8 -> 0x08 +/// 16 -> 0x10 +/// (...) +/// 1024 -> 0x400 +/// 2048 -> 0x800 +/// +/// if queue size is not defined as requested, let's say, for +/// instance 100, when current position is FFFFFFFF (4,294,967,295) +/// index in the circular array is 4,294,967,295 % 100 = 95. +/// When that value is incremented it will be set to 0, that is the +/// last 4 elements of the queue are not used when the counter rolls +/// over to 0 +template +class ArrayLockFreeQueueSingleProducer +{ +public: + /// @brief constructor of the class + ArrayLockFreeQueueSingleProducer(); + virtual ~ArrayLockFreeQueueSingleProducer(); + + /// @brief returns the current number of items in the queue + /// It tries to take a snapshot of the size of the queue, but in busy environments + /// this function might return bogus values. + /// + /// If a reliable queue size must be kept you might want to have a look at + /// the preprocessor variable in this header file called 'ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE' + /// it enables a reliable size though it hits overall performance of the queue + /// (when the reliable size variable is on it's got an impact of about 20% in time) + uint32_t size(); + + /// @brief push an element at the tail of the queue + /// @param the element to insert in the queue + /// Note that the element is not a pointer or a reference, so if you are using large data + /// structures to be inserted in the queue you should think of instantiate the template + /// of the queue as a pointer to that large structure + /// @returns true if the element was inserted in the queue. False if the queue was full + bool push(const ELEM_T &a_data); + + /// @brief pop the element at the head of the queue + /// @param a reference where the element in the head of the queue will be saved to + /// Note that the a_data parameter might contain rubbish if the function returns false + /// @returns true if the element was successfully extracted from the queue. False if the queue was empty + bool pop(ELEM_T &a_data); + +private: + /// @brief array to keep the elements + ELEM_T m_theQueue[Q_SIZE]; + +#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE + /// @brief number of elements in the queue + volatile uint32_t m_count; +#endif + + /// @brief where a new element will be inserted + volatile uint32_t m_writeIndex; + + /// @brief where the next element where be extracted from + volatile uint32_t m_readIndex; + + /// @brief calculate the index in the circular array that corresponds + /// to a particular "count" value + inline uint32_t countToIndex(uint32_t a_count); +}; + +// include the implementation file +#include "array_lock_free_queue_single_producer_impl.h" + +#endif // __ARRAY_LOCK_FREE_QUEUE_SINGLE_PRODUCER_H__ diff --git a/queue/array_lock_free_queue_single_producer_impl.h b/queue/array_lock_free_queue_single_producer_impl.h new file mode 100644 index 0000000..3c27ec4 --- /dev/null +++ b/queue/array_lock_free_queue_single_producer_impl.h @@ -0,0 +1,182 @@ +// ============================================================================ +// Copyright (c) 2010 Faustino Frechilla +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// 2. Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// 3. The name of the author may not be used to endorse or promote products +// derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. +// +/// @file array_lock_free_queue_single_producer_impl.h +/// @brief Implementation of a circular array based lock-free queue +/// +/// @author Faustino Frechilla +/// @history +/// Ref Who When What +/// Faustino Frechilla 11-Jul-2010 Original development +/// @endhistory +/// +// ============================================================================ + +#ifndef __ARRAY_LOCK_FREE_QUEUE_SINGLE_PRODUCER_IMPL_H__ +#define __ARRAY_LOCK_FREE_QUEUE_SINGLE_PRODUCER_IMPL_H__ + +#include // assert() + +template +ArrayLockFreeQueueSingleProducer::ArrayLockFreeQueueSingleProducer() : + m_writeIndex(0), + m_readIndex(0) +{ +#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE + m_count = 0; +#endif +} + +template +ArrayLockFreeQueueSingleProducer::~ArrayLockFreeQueueSingleProducer() +{ +} + +template +inline +uint32_t ArrayLockFreeQueueSingleProducer::countToIndex(uint32_t a_count) +{ + // if Q_SIZE is a power of 2 this statement could be also written as + // return (a_count & (Q_SIZE - 1)); + return (a_count % Q_SIZE); +} + +template +uint32_t ArrayLockFreeQueueSingleProducer::size() +{ +#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE + return m_count; +#else + uint32_t currentWriteIndex = m_writeIndex; + uint32_t currentReadIndex = m_readIndex; + + // let's think of a scenario where this function returns bogus data + // 1. when the statement 'currentWriteIndex = m_writeIndex' is run + // m_writeIndex is 3 and m_readIndex is 2. Real size is 1 + // 2. afterwards this thread is preemted. While this thread is inactive 2 + // elements are inserted and removed from the queue, so m_writeIndex is 5 + // m_readIndex 4. Real size is still 1 + // 3. Now the current thread comes back from preemption and reads m_readIndex. + // currentReadIndex is 4 + // 4. currentReadIndex is bigger than currentWriteIndex, so + // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is, + // it returns that the queue is almost full, when it is almost empty + + if (currentWriteIndex >= currentReadIndex) + { + return (currentWriteIndex - currentReadIndex); + } + else + { + return (Q_SIZE + currentWriteIndex - currentReadIndex); + } +#endif // ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE +} + +template +bool ArrayLockFreeQueueSingleProducer::push(const ELEM_T &a_data) +{ + uint32_t currentReadIndex; + uint32_t currentWriteIndex; + + currentWriteIndex = m_writeIndex; + currentReadIndex = m_readIndex; + if (countToIndex(currentWriteIndex + 1) == + countToIndex(currentReadIndex)) + { + // the queue is full + return false; + } + + // save the date into the q + m_theQueue[countToIndex(currentWriteIndex)] = a_data; + // increment atomically write index. Now a consumer thread can read + // the piece of data that was just stored + AtomicAdd(&m_writeIndex, 1); + + // The value was successfully inserted into the queue +#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE + AtomicAdd(&m_count, 1); +#endif + + return true; +} + +template +bool ArrayLockFreeQueueSingleProducer::pop(ELEM_T &a_data) +{ + uint32_t currentMaximumReadIndex; + uint32_t currentReadIndex; + + do + { + // m_maximumReadIndex doesn't exist when the queue is set up as + // single-producer. The maximum read index is described by the current + // write index + currentReadIndex = m_readIndex; + currentMaximumReadIndex = m_writeIndex; + + if (countToIndex(currentReadIndex) == + countToIndex(currentMaximumReadIndex)) + { + // the queue is empty or + // a producer thread has allocate space in the queue but is + // waiting to commit the data into it + return false; + } + + // retrieve the data from the queue + a_data = m_theQueue[countToIndex(currentReadIndex)]; + + // try to perfrom now the CAS operation on the read index. If we succeed + // a_data already contains what m_readIndex pointed to before we + // increased it + if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) + { + // got here. The value was retrieved from the queue. Note that the + // data inside the m_queue array is not deleted nor reseted +#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE + AtomicSub(&m_count, 1); +#endif + return true; + } + + // it failed retrieving the element off the queue. Someone else must + // have read the element stored at countToIndex(currentReadIndex) + // before we could perform the CAS operation + + } while(1); // keep looping to try again! + + // Something went wrong. it shouldn't be possible to reach here + assert(0); + + // Add this return statement to avoid compiler warnings + return false; +} + +#endif // __ARRAY_LOCK_FREE_QUEUE_SINGLE_PRODUCER_IMPL_H__ + diff --git a/queue/atomic_ops.h b/queue/atomic_ops.h new file mode 100644 index 0000000..30e9fe3 --- /dev/null +++ b/queue/atomic_ops.h @@ -0,0 +1,79 @@ +// ============================================================================ +// Copyright (c) 2010 Faustino Frechilla +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// 2. Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// 3. The name of the author may not be used to endorse or promote products +// derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. +// +/// @file atomic_ops.h +/// @brief This file contains functions to wrap the built-in atomic operations +/// defined for your compiler +/// +/// @author Faustino Frechilla +/// @history +/// Ref Who When What +/// Faustino Frechilla 11-Jul-2010 Original development. GCC support +/// @endhistory +/// +// ============================================================================ + +#ifndef __ATOMIC_OPS_H +#define __ATOMIC_OPS_H + +#ifdef __GNUC__ +// Atomic functions in GCC are present from version 4.1.0 on +// http://gcc.gnu.org/onlinedocs/gcc-4.1.0/gcc/Atomic-Builtins.html + +// Test for GCC >= 4.1.0 +#if (__GNUC__ < 4) || \ + ((__GNUC__ == 4) && ((__GNUC_MINOR__ < 1) || \ + ((__GNUC_MINOR__ == 1) && \ + (__GNUC_PATCHLEVEL__ < 0))) ) + +#error Atomic built-in functions are only available in GCC in versions >= 4.1.0 +#endif // end of check for GCC 4.1.0 + +/// @brief atomically adds a_count to the variable pointed by a_ptr +/// @return the value that had previously been in memory +#define AtomicAdd(a_ptr,a_count) __sync_fetch_and_add (a_ptr, a_count) + +/// @brief atomically substracts a_count from the variable pointed by a_ptr +/// @return the value that had previously been in memory +#define AtomicSub(a_ptr,a_count) __sync_fetch_and_sub (a_ptr, a_count) + +/// @brief Compare And Swap +/// If the current value of *a_ptr is a_oldVal, then write a_newVal into *a_ptr +/// @return true if the comparison is successful and a_newVal was written +#define CAS(a_ptr, a_oldVal, a_newVal) __sync_bool_compare_and_swap(a_ptr, a_oldVal, a_newVal) + +/// @brief Compare And Swap +/// If the current value of *a_ptr is a_oldVal, then write a_newVal into *a_ptr +/// @return the contents of *a_ptr before the operation +#define CASVal(a_ptr, a_oldVal, a_newVal) __sync_val_compare_and_swap(a_ptr, a_oldVal, a_newVal) + +#else +#error Atomic functions such as CAS or AtomicAdd are not defined for your compiler. Please add them in atomic_ops.h +#endif // __GNUC__ + + +#endif // __ATOMIC_OPS_H diff --git a/queue/g_blocking_queue.h b/queue/g_blocking_queue.h new file mode 100644 index 0000000..a43295d --- /dev/null +++ b/queue/g_blocking_queue.h @@ -0,0 +1,122 @@ +// ============================================================================ +// Copyright (c) 2009-2010 Faustino Frechilla +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// 2. Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// 3. The name of the author may not be used to endorse or promote products +// derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. +// +/// @file q_blocking_queue.h +/// @brief Definition of a thread-safe queue based on glib system calls +/// It internally contains a std::queue which is protected from concurrent +/// access by glib mutextes and conditional variables +/// +/// @author Faustino Frechilla +/// @history +/// Ref Who When What +/// Faustino Frechilla 04-May-2009 Original development (based on pthreads) +/// Faustino Frechilla 19-May-2010 Ported to glib. Removed pthread dependency +/// @endhistory +/// +// ============================================================================ + +#ifndef _GBLOCKINGQUEUE_H_ +#define _GBLOCKINGQUEUE_H_ + +#include +#include +#include // std::numeric_limits<>::max + +#define BLOCKING_QUEUE_DEFAULT_MAX_SIZE std::numeric_limits::max() + +/// @brief blocking thread-safe queue +/// It uses a mutex+condition variables to protect the internal queue +/// implementation. Inserting or reading elements use the same mutex +template +class BlockingQueue +{ +public: + BlockingQueue(std::size_t a_maxSize = BLOCKING_QUEUE_DEFAULT_MAX_SIZE); + ~BlockingQueue(); + + /// @brief Check if the queue is empty + /// This call can block if another thread owns the lock that protects the + /// queue + /// @return true if the queue is empty. False otherwise + bool IsEmpty(); + + /// @brief inserts an element into queue queue + /// This call can block if another thread owns the lock that protects the + /// queue. If the queue is full The thread will be blocked in this queue + /// until someone else gets an element from the queue + /// @param element to insert into the queue + /// @return True if the elem was successfully inserted into the queue. + /// False otherwise + bool Push(const T &a_elem); + + /// @brief inserts an element into queue queue + /// This call can block if another thread owns the lock that protects the + /// queue. If the queue is full The call will return false and the element + /// won't be inserted + /// @param element to insert into the queue + /// @return True if the elem was successfully inserted into the queue. + /// False otherwise + bool TryPush(const T &a_elem); + + /// @brief extracts an element from the queue (and deletes it from the q) + /// If the queue is empty this call will block the thread until there is + /// something in the queue to be extracted + /// @param a reference where the element from the queue will be saved to + void Pop(T &out_data); + + /// @brief extracts an element from the queue (and deletes it from the q) + /// This call gets the block that protects the queue. It will extract the + /// element from the queue only if there are elements in it + /// @param reference to the variable where the result will be saved + /// @return True if the element was retrieved from the queue. + /// False if the queue was empty + bool TryPop(T &out_data); + + /// @brief extracts an element from the queue (and deletes it from the q) + /// If the queue is empty this call will block the thread until there + /// is something in the queue to be extracted or until the timer + /// (2nd parameter) expires + /// @param reference to the variable where the result will be saved + /// @param microsecondsto wait before returning if the queue was empty + /// @return True if the element was retrieved from the queue. + /// False if the timeout was reached + bool TimedWaitPop(T &data, glong microsecs); + +protected: + std::queue m_theQueue; + /// maximum number of elements for the queue + std::size_t m_maximumSize; + /// Mutex to protect the queue + GMutex* m_mutex; + /// Conditional variable to wake up threads + GCond* m_cond; +}; + +// include the implementation file +#include "g_blocking_queue_impl.h" + +#endif /* _GBLOCKINGQUEUE_H_ */ diff --git a/queue/g_blocking_queue_impl.h b/queue/g_blocking_queue_impl.h new file mode 100644 index 0000000..7312814 --- /dev/null +++ b/queue/g_blocking_queue_impl.h @@ -0,0 +1,224 @@ +// ============================================================================ +// Copyright (c) 2009-2010 Faustino Frechilla +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// 2. Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// 3. The name of the author may not be used to endorse or promote products +// derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. +// +/// @file q_blocking_queue_impl.h +/// @brief Implementation of a thread-safe queue based on glib system calls +/// It internally contains a std::queue which is protected from concurrent +/// access by glib mutextes and conditional variables +/// +/// @author Faustino Frechilla +/// @history +/// Ref Who When What +/// Faustino Frechilla 04-May-2009 Original development (based on pthreads) +/// Faustino Frechilla 19-May-2010 Ported to glib. Removed pthread dependency +/// @endhistory +/// +// ============================================================================ + +#ifndef _GBLOCKINGQUEUEIMPL_H_ +#define _GBLOCKINGQUEUEIMPL_H_ + +#include + +#define NANOSECONDS_PER_SECOND 1000000000 + +template +BlockingQueue::BlockingQueue(std::size_t a_maxSize) : + m_maximumSize(a_maxSize) +{ + if (!g_thread_supported ()) + { + // glib thread system hasn't been initialized yet + g_thread_init(NULL); + } + + m_mutex = g_mutex_new(); + m_cond = g_cond_new(); + + assert(m_mutex != NULL); + assert(m_cond != NULL); +} + +template +BlockingQueue::~BlockingQueue() +{ + g_cond_free(m_cond); + g_mutex_free(m_mutex); +} + +template +bool BlockingQueue::IsEmpty() +{ + bool rv; + + g_mutex_lock(m_mutex); + rv = m_theQueue.empty(); + g_mutex_unlock(m_mutex); + + return rv; +} + +template +bool BlockingQueue::Push(const T &a_elem) +{ + g_mutex_lock(m_mutex); + + while (m_theQueue.size() >= m_maximumSize) + { + g_cond_wait(m_cond, m_mutex); + } + + bool queueEmpty = m_theQueue.empty(); + + m_theQueue.push(a_elem); + + if (queueEmpty) + { + // wake up threads waiting for stuff + g_cond_broadcast(m_cond); + } + + g_mutex_unlock(m_mutex); + + return true; +} + +template +bool BlockingQueue::TryPush(const T &a_elem) +{ + g_mutex_lock(m_mutex); + + bool rv = false; + bool queueEmpty = m_theQueue.empty(); + + if (m_theQueue.size() < m_maximumSize) + { + m_theQueue.push(a_elem); + rv = true; + } + + if (queueEmpty) + { + // wake up threads waiting for stuff + g_cond_broadcast(m_cond); + } + + g_mutex_unlock(m_mutex); + + return rv; +} + +template +void BlockingQueue::Pop(T &out_data) +{ + g_mutex_lock(m_mutex); + + while (m_theQueue.empty()) + { + g_cond_wait(m_cond, m_mutex); + } + + bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false; + + out_data = m_theQueue.front(); + m_theQueue.pop(); + + if (queueFull) + { + // wake up threads waiting for stuff + g_cond_broadcast(m_cond); + } + + g_mutex_unlock(m_mutex); +} + +template +bool BlockingQueue::TryPop(T &out_data) +{ + g_mutex_lock(m_mutex); + + bool rv = false; + if (!m_theQueue.empty()) + { + bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false; + + out_data = m_theQueue.front(); + m_theQueue.pop(); + + if (queueFull) + { + // wake up threads waiting for stuff + g_cond_broadcast(m_cond); + } + + rv = true; + } + + g_mutex_unlock(m_mutex); + + return rv; +} + +template +bool BlockingQueue::TimedWaitPop(T &data, glong microsecs) +{ + g_mutex_lock(m_mutex); + + // adding microsecs to now + GTimeVal abs_time; + g_get_current_time(&abs_time); + g_time_val_add(&abs_time, microsecs); + + gboolean retcode = TRUE; + while (m_theQueue.empty() && (retcode != FALSE)) + { + // Returns TRUE if cond was signalled, or FALSE on timeout + retcode = g_cond_timed_wait(m_cond, m_mutex, &abs_time); + } + + bool rv = false; + bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false; + if (retcode != FALSE) + { + data = m_theQueue.front(); + m_theQueue.pop(); + + rv = true; + } + + if (rv && queueFull) + { + // wake up threads waiting for stuff + g_cond_broadcast(m_cond); + } + + g_mutex_unlock(m_mutex); + + return rv; +} + +#endif /* _GBLOCKINGQUEUEIMPL_H_ */ diff --git a/queue/makefile b/queue/makefile new file mode 100644 index 0000000..ee96d75 --- /dev/null +++ b/queue/makefile @@ -0,0 +1,67 @@ +# -fno-schedule-insns -fno-rerun-loop-opt are a workaround for a compiler error in 4.2 +# -Wno-unused-parameter + +CC = g++ +CFLAGS = -g -O3 -fopenmp -fno-schedule-insns -fno-schedule-insns2 -W -Wall #-Wno-unused-parameter +CFLAGS += `pkg-config --cflags glib-2.0` +#CFLAGS += -march=i686 +#CFLAGS += -march=core2 +LDFLAGS = -lgomp +LDFLAGS+= `pkg-config --libs glib-2.0` +# g_blocking_queue also depends on gthread-2.0 +CFLAGS_GTHREAD = `pkg-config gthread-2.0` +LDFLAGS_GTHREAD = `pkg-config --libs gthread-2.0` + +#compile-time parameters +ifdef N_PRODUCERS +CFLAGS += -DN_PRODUCERS=$(N_PRODUCERS) +endif +ifdef N_CONSUMERS +CFLAGS += -DN_CONSUMERS=$(N_CONSUMERS) +endif +ifdef N_ITERATIONS +CFLAGS += -DN_ITERATIONS=$(N_ITERATIONS) +endif +ifdef QUEUE_SIZE +CFLAGS += -DQUEUE_SIZE=$(QUEUE_SIZE) +endif + + +LOCK_FREE_Q_INCLUDE = \ + array_lock_free_queue.h \ + array_lock_free_queue_impl.h + +BLOCKING_Q_INCLUDE = \ + g_blocking_queue.h \ + g_blocking_queue_impl.h + +LOCK_FREE_SINGLE_PRODUCER_Q_INCLUDE = \ + array_lock_free_queue_single_producer.h \ + array_lock_free_queue_single_producer_impl.h + +SHARED_INCLUDE = \ + atomic_ops.h + +all : test_lock_free_q test_lock_free_single_producer_q test_blocking_q + +test_lock_free_q : test_lock_free_q.o + $(CC) $(OBJS) -o $@ $@.o $(LDFLAGS) + +test_blocking_q : test_blocking_q.o + $(CC) $(OBJS) -o $@ $@.o $(LDFLAGS) $(LDFLAGS_GTHREAD) + +test_lock_free_single_producer_q : test_lock_free_single_producer_q.o + $(CC) $(OBJS) -o $@ $@.o $(LDFLAGS) + +test_lock_free_q.o : test_lock_free_q.cpp $(SHARED_INCLUDE) $(LOCK_FREE_Q_INCLUDE) + $(CC) -c $< $(CFLAGS) + +test_lock_free_single_producer_q.o : test_lock_free_single_producer_q.cpp $(SHARED_INCLUDE) $(LOCK_FREE_SINGLE_PRODUCER_Q_INCLUDE) + $(CC) -c $< $(CFLAGS) + +test_blocking_q.o: test_blocking_q.cpp $(SHARED_INCLUDE) $(BLOCKING_Q_INCLUDE) + $(CC) -c $< $(CFLAGS) $(CFLAGS_GTHREAD) + +clean: + rm test_lock_free_q test_blocking_q test_lock_free_single_producer_q; rm *.o + diff --git a/queue/tags b/queue/tags new file mode 100644 index 0000000..8c5c88d --- /dev/null +++ b/queue/tags @@ -0,0 +1,89 @@ +!_TAG_FILE_FORMAT 2 /extended format; --format=1 will not append ;" to lines/ +!_TAG_FILE_SORTED 1 /0=unsorted, 1=sorted, 2=foldcase/ +!_TAG_PROGRAM_AUTHOR Darren Hiebert /dhiebert@users.sourceforge.net/ +!_TAG_PROGRAM_NAME Exuberant Ctags // +!_TAG_PROGRAM_URL http://ctags.sourceforge.net /official site/ +!_TAG_PROGRAM_VERSION 5.9~svn20110310 // +ARRAY_LOCK_FREE_Q_DEFAULT_SIZE array_lock_free_queue.h 45;" d +ARRAY_LOCK_FREE_Q_DEFAULT_SIZE array_lock_free_queue_single_producer.h 49;" d +ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE array_lock_free_queue.h 50;" d +ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE array_lock_free_queue_single_producer.h 54;" d +ArrayLockFreeQueue array_lock_free_queue.h /^class ArrayLockFreeQueue$/;" c +ArrayLockFreeQueue array_lock_free_queue_impl.h /^ArrayLockFreeQueue::ArrayLockFreeQueue() :$/;" f class:ArrayLockFreeQueue +ArrayLockFreeQueueSingleProducer array_lock_free_queue_single_producer.h /^class ArrayLockFreeQueueSingleProducer$/;" c +ArrayLockFreeQueueSingleProducer array_lock_free_queue_single_producer_impl.h /^ArrayLockFreeQueueSingleProducer::ArrayLockFreeQueueSingleProducer() :$/;" f class:ArrayLockFreeQueueSingleProducer +AtomicAdd atomic_ops.h 51;" d +AtomicAdd atomic_ops.h 87;" d +AtomicSub atomic_ops.h 55;" d +AtomicSub atomic_ops.h 91;" d +BLOCKING_QUEUE_DEFAULT_MAX_SIZE g_blocking_queue.h 49;" d +BLOCKING_Q_INCLUDE makefile /^BLOCKING_Q_INCLUDE = \\$/;" m +BlockingQueue g_blocking_queue.h /^class BlockingQueue$/;" c +BlockingQueue g_blocking_queue_impl.h /^BlockingQueue::BlockingQueue(std::size_t a_maxSize) :$/;" f class:BlockingQueue +CAS atomic_ops.h 60;" d +CAS atomic_ops.h 96;" d +CASVal atomic_ops.h 101;" d +CASVal atomic_ops.h 65;" d +CC makefile /^CC = g++$/;" m +CFLAGS_GTHREAD makefile /^CFLAGS_GTHREAD = `pkg-config gthread-2.0`$/;" m +IsEmpty g_blocking_queue_impl.h /^bool BlockingQueue::IsEmpty()$/;" f class:BlockingQueue +LDFLAGS_GTHREAD makefile /^LDFLAGS_GTHREAD = `pkg-config --libs gthread-2.0`$/;" m +LOCK_FREE_Q_INCLUDE makefile /^LOCK_FREE_Q_INCLUDE = \\$/;" m +LOCK_FREE_SINGLE_PRODUCER_Q_INCLUDE makefile /^LOCK_FREE_SINGLE_PRODUCER_Q_INCLUDE = \\$/;" m +NANOSECONDS_PER_SECOND g_blocking_queue_impl.h 47;" d +N_CONSUMERS test_blocking_q.cpp 17;" d file: +N_CONSUMERS test_lock_free_q.cpp 17;" d file: +N_CONSUMERS test_lock_free_single_producer_q.cpp 13;" d file: +N_ITERATIONS test_blocking_q.cpp 21;" d file: +N_ITERATIONS test_lock_free_q.cpp 21;" d file: +N_ITERATIONS test_lock_free_single_producer_q.cpp 17;" d file: +N_PRODUCERS test_blocking_q.cpp 13;" d file: +N_PRODUCERS test_lock_free_q.cpp 13;" d file: +Pop g_blocking_queue_impl.h /^void BlockingQueue::Pop(T &out_data)$/;" f class:BlockingQueue +Push g_blocking_queue_impl.h /^bool BlockingQueue::Push(const T &a_elem)$/;" f class:BlockingQueue +QUEUE_SIZE test_blocking_q.cpp 25;" d file: +QUEUE_SIZE test_lock_free_q.cpp 25;" d file: +QUEUE_SIZE test_lock_free_single_producer_q.cpp 21;" d file: +SHARED_INCLUDE makefile /^SHARED_INCLUDE = \\$/;" m +TestBlockingQueue test_blocking_q.cpp /^void TestBlockingQueue()$/;" f +TestLockFreeQueue test_lock_free_q.cpp /^void TestLockFreeQueue()$/;" f +TestLockFreeQueue test_lock_free_single_producer_q.cpp /^void TestLockFreeQueue()$/;" f +TimedWaitPop g_blocking_queue_impl.h /^bool BlockingQueue::TimedWaitPop(T &data, glong microsecs)$/;" f class:BlockingQueue +TryPop g_blocking_queue_impl.h /^bool BlockingQueue::TryPop(T &out_data)$/;" f class:BlockingQueue +TryPush g_blocking_queue_impl.h /^bool BlockingQueue::TryPush(const T &a_elem)$/;" f class:BlockingQueue +_GBLOCKINGQUEUEIMPL_H_ g_blocking_queue_impl.h 43;" d +_GBLOCKINGQUEUE_H_ g_blocking_queue.h 43;" d +__ARRAY_LOCK_FREE_QUEUE_H__ array_lock_free_queue.h 40;" d +__ARRAY_LOCK_FREE_QUEUE_IMPL_H__ array_lock_free_queue_impl.h 40;" d +__ARRAY_LOCK_FREE_QUEUE_SINGLE_PRODUCER_H__ array_lock_free_queue_single_producer.h 44;" d +__ARRAY_LOCK_FREE_QUEUE_SINGLE_PRODUCER_IMPL_H__ array_lock_free_queue_single_producer_impl.h 40;" d +__ATOMIC_OPS_H atomic_ops.h 41;" d +atomic32 atomic_ops.h /^typedef atomic_uint atomic32;$/;" t +atomic32 atomic_ops.h /^typedef volatile uint32 atomic32;$/;" t +countToIndex array_lock_free_queue_impl.h /^uint32_t ArrayLockFreeQueue::countToIndex(uint32_t a_count)$/;" f class:ArrayLockFreeQueue +countToIndex array_lock_free_queue_single_producer_impl.h /^uint32_t ArrayLockFreeQueueSingleProducer::countToIndex(uint32_t a_count)$/;" f class:ArrayLockFreeQueueSingleProducer +m_cond g_blocking_queue.h /^ GCond* m_cond;$/;" m class:BlockingQueue +m_count array_lock_free_queue.h /^ atomic32 m_count;$/;" m class:ArrayLockFreeQueue +m_count array_lock_free_queue_single_producer.h /^ atomic32 m_count;$/;" m class:ArrayLockFreeQueueSingleProducer +m_maximumReadIndex array_lock_free_queue.h /^ atomic32 m_maximumReadIndex;$/;" m class:ArrayLockFreeQueue +m_maximumSize g_blocking_queue.h /^ std::size_t m_maximumSize;$/;" m class:BlockingQueue +m_mutex g_blocking_queue.h /^ GMutex* m_mutex;$/;" m class:BlockingQueue +m_readIndex array_lock_free_queue.h /^ atomic32 m_readIndex;$/;" m class:ArrayLockFreeQueue +m_readIndex array_lock_free_queue_single_producer.h /^ atomic32 m_readIndex;$/;" m class:ArrayLockFreeQueueSingleProducer +m_theQueue array_lock_free_queue.h /^ ELEM_T m_theQueue[Q_SIZE];$/;" m class:ArrayLockFreeQueue +m_theQueue array_lock_free_queue_single_producer.h /^ ELEM_T m_theQueue[Q_SIZE];$/;" m class:ArrayLockFreeQueueSingleProducer +m_theQueue g_blocking_queue.h /^ std::queue m_theQueue;$/;" m class:BlockingQueue +m_writeIndex array_lock_free_queue.h /^ atomic32 m_writeIndex;$/;" m class:ArrayLockFreeQueue +m_writeIndex array_lock_free_queue_single_producer.h /^ atomic32 m_writeIndex;$/;" m class:ArrayLockFreeQueueSingleProducer +main test_blocking_q.cpp /^int main(int \/*argc*\/, char** \/*argv*\/)$/;" f +main test_lock_free_q.cpp /^int main(int \/*argc*\/, char** \/*argv*\/)$/;" f +main test_lock_free_single_producer_q.cpp /^int main(int \/*argc*\/, char** \/*argv*\/)$/;" f +pop array_lock_free_queue_impl.h /^bool ArrayLockFreeQueue::pop(ELEM_T &a_data)$/;" f class:ArrayLockFreeQueue +pop array_lock_free_queue_single_producer_impl.h /^bool ArrayLockFreeQueueSingleProducer::pop(ELEM_T &a_data)$/;" f class:ArrayLockFreeQueueSingleProducer +push array_lock_free_queue_impl.h /^bool ArrayLockFreeQueue::push(const ELEM_T &a_data)$/;" f class:ArrayLockFreeQueue +push array_lock_free_queue_single_producer_impl.h /^bool ArrayLockFreeQueueSingleProducer::push(const ELEM_T &a_data)$/;" f class:ArrayLockFreeQueueSingleProducer +size array_lock_free_queue_impl.h /^uint32_t ArrayLockFreeQueue::size()$/;" f class:ArrayLockFreeQueue +size array_lock_free_queue_single_producer_impl.h /^uint32_t ArrayLockFreeQueueSingleProducer::size()$/;" f class:ArrayLockFreeQueueSingleProducer +~ArrayLockFreeQueue array_lock_free_queue_impl.h /^ArrayLockFreeQueue::~ArrayLockFreeQueue()$/;" f class:ArrayLockFreeQueue +~ArrayLockFreeQueueSingleProducer array_lock_free_queue_single_producer_impl.h /^ArrayLockFreeQueueSingleProducer::~ArrayLockFreeQueueSingleProducer()$/;" f class:ArrayLockFreeQueueSingleProducer +~BlockingQueue g_blocking_queue_impl.h /^BlockingQueue::~BlockingQueue()$/;" f class:BlockingQueue diff --git a/queue/test_blocking_q.cpp b/queue/test_blocking_q.cpp new file mode 100644 index 0000000..72472c8 --- /dev/null +++ b/queue/test_blocking_q.cpp @@ -0,0 +1,112 @@ +// ============================================================================ +/// @file test_blocking_q.cpp +/// @brief Benchmark blocking queue +// ============================================================================ + + +#include +#include // GTimeVal + g_get_current_time +#include // parallel processing support in gcc +#include "g_blocking_queue.h" + +#ifndef N_PRODUCERS +#define N_PRODUCERS 1 +#endif + +#ifndef N_CONSUMERS +#define N_CONSUMERS 1 +#endif + +#ifndef N_ITERATIONS +#define N_ITERATIONS 10000000 +#endif + +#ifndef QUEUE_SIZE +#define QUEUE_SIZE 1000 +#endif + +void TestBlockingQueue() +{ + BlockingQueue theQueue(QUEUE_SIZE); + GTimeVal iniTimestamp; + GTimeVal endTimestamp; + + std::cout << "=== Start of testing blocking queue ===" << std::endl; + g_get_current_time(&iniTimestamp); + #pragma omp parallel shared(theQueue) num_threads (2) + { + if (omp_get_thread_num() == 0) + { + if (!omp_get_nested()) + { + std::cerr << "WARNING: Nested parallel regions not supported. Working threads might have unexpected behaviour" << std::endl; + std::cerr << "Are you running with \"OMP_NESTED=TRUE\"??" << std::endl; + } + } + + #pragma omp sections //nowait + { + #pragma omp section + { + // producer section + #pragma omp parallel shared(theQueue) num_threads (N_PRODUCERS) + { + int i; + #pragma omp for schedule(static) private(i) nowait + for (i = 0 ; i < N_ITERATIONS ; i++) + { + while(!theQueue.Push(i)) + { + // queue full + } + } + } + } + + #pragma omp section + { + // consumer section + #pragma omp parallel shared(theQueue) num_threads (N_CONSUMERS) + { + int i; + int result; + #pragma omp for schedule(static) private(i, result) nowait + for (i = 0 ; i < N_ITERATIONS ; i++) + { + // this call will block if the queue is empty until + // some other thread pushes something into it + theQueue.Pop(result); + } + } + } + } + + } // #pragma omp parallel + + g_get_current_time(&endTimestamp); + + // calculate elapsed time + GTimeVal elapsedTime; + if (endTimestamp.tv_usec >= iniTimestamp.tv_usec) + { + elapsedTime.tv_sec = endTimestamp.tv_sec - iniTimestamp.tv_sec; + elapsedTime.tv_usec = endTimestamp.tv_usec - iniTimestamp.tv_usec; + } + else + { + elapsedTime.tv_sec = endTimestamp.tv_sec - iniTimestamp.tv_sec - 1; + elapsedTime.tv_usec = G_USEC_PER_SEC + endTimestamp.tv_usec - iniTimestamp.tv_usec; + } + + std::cout << "Elapsed: " << elapsedTime.tv_sec << "." << elapsedTime.tv_usec << std::endl; + std::cout << "=== End of testing blocking queue ===" << std::endl; +} + +int main(int /*argc*/, char** /*argv*/) +{ + TestBlockingQueue(); + std::cout << "Done!!!" << std::endl; + + return 0; +} + diff --git a/queue/test_lock_free_q.cpp b/queue/test_lock_free_q.cpp new file mode 100644 index 0000000..d3621e8 --- /dev/null +++ b/queue/test_lock_free_q.cpp @@ -0,0 +1,132 @@ +// ============================================================================ +/// @file test_lock_free_q.cpp +/// @brief Benchmark lock free queue +// ============================================================================ + + +#include +#include // GTimeVal + g_get_current_time +#include // parallel processing support in gcc +#include "array_lock_free_queue.h" + +#ifndef N_PRODUCERS +#define N_PRODUCERS 1 +#endif + +#ifndef N_CONSUMERS +#define N_CONSUMERS 1 +#endif + +#ifndef N_ITERATIONS +#define N_ITERATIONS 10000000 +#endif + +#ifndef QUEUE_SIZE +#define QUEUE_SIZE 1024 +#endif + +void TestLockFreeQueue() +{ + ArrayLockFreeQueue theQueue; + GTimeVal iniTimestamp; + GTimeVal endTimestamp; + + std::cout << "=== Start of testing lock-free queue ===" << std::endl; + g_get_current_time(&iniTimestamp); + #pragma omp parallel shared(theQueue) num_threads (2) + { + if (omp_get_thread_num() == 0) + { + //std::cout << "=== Testing Non blocking queue with " << omp_get_num_threads() << " threads ===" << std::endl; + + if (!omp_get_nested()) + { + std::cerr << "WARNING: Nested parallel regions not supported. Working threads might have unexpected behaviour" << std::endl; + std::cerr << "Are you running with \"OMP_NESTED=TRUE\"??" << std::endl; + } + } + + #pragma omp sections //nowait + { + #pragma omp section + { + // producer section + #pragma omp parallel shared(theQueue) num_threads (N_PRODUCERS) + { + //if (omp_get_thread_num() == 0) + //{ + // std::cout << "\t Producers: " << omp_get_num_threads() << std::endl; + //} + int i; + #pragma omp for schedule(static) private(i) nowait + for (i = 0 ; i < N_ITERATIONS ; i++) + { + while(!theQueue.push(i)) + { + // queue full + } + } + } + } + + #pragma omp section + { + // consumer section + #pragma omp parallel shared(theQueue) num_threads (N_CONSUMERS) + { + //if (omp_get_thread_num() == 0) + //{ + // std::cout << "\t Consumers: " << omp_get_num_threads() << std::endl; + //} + + int i; + int result; + #pragma omp for schedule(static) private(i, result) nowait + for (i = 0 ; i < N_ITERATIONS ; i++) + { + while (!theQueue.pop(result)) + { + // queue empty + } + +#if (N_CONSUMERS == 1 && N_PRODUCERS == 1) + if (i != result) + { + std::cout << "FAILED i=" << i << " result=" << result << std::endl; + } +#endif + } + } + } + } + + } // #pragma omp parallel + + g_get_current_time(&endTimestamp); + + // calculate elapsed time + GTimeVal elapsedTime; + if (endTimestamp.tv_usec >= iniTimestamp.tv_usec) + { + elapsedTime.tv_sec = endTimestamp.tv_sec - iniTimestamp.tv_sec; + elapsedTime.tv_usec = endTimestamp.tv_usec - iniTimestamp.tv_usec; + } + else + { + elapsedTime.tv_sec = endTimestamp.tv_sec - iniTimestamp.tv_sec - 1; + elapsedTime.tv_usec = G_USEC_PER_SEC + endTimestamp.tv_usec - iniTimestamp.tv_usec; + } + + std::cout << "Elapsed: " << elapsedTime.tv_sec << "." << elapsedTime.tv_usec << std::endl; + std::cout << "=== End of testing lock-free queue ===" << std::endl; +} + +int main(int /*argc*/, char** /*argv*/) +{ + + TestLockFreeQueue(); + std::cout << "Done!!!" << std::endl; + + return 0; +} + diff --git a/queue/test_lock_free_single_producer_q.cpp b/queue/test_lock_free_single_producer_q.cpp new file mode 100644 index 0000000..585d9ea --- /dev/null +++ b/queue/test_lock_free_single_producer_q.cpp @@ -0,0 +1,130 @@ +// ============================================================================ +/// @file test_lock_free_q.cpp +/// @brief Benchmark lock free queue +// ============================================================================ + + +#include +#include // GTimeVal + g_get_current_time +#include // parallel processing support in gcc +#include "array_lock_free_queue_single_producer.h" + +#ifndef N_CONSUMERS +#define N_CONSUMERS 1 +#endif + +#ifndef N_ITERATIONS +#define N_ITERATIONS 10000000 +#endif + +#ifndef QUEUE_SIZE +#define QUEUE_SIZE 1000 +#endif + +void TestLockFreeQueue() +{ + ArrayLockFreeQueueSingleProducer theQueue; + GTimeVal iniTimestamp; + GTimeVal endTimestamp; + + std::cout << "=== Start of testing lock-free queue ===" << std::endl; + g_get_current_time(&iniTimestamp); + #pragma omp parallel shared(theQueue) num_threads (2) + { + if (omp_get_thread_num() == 0) + { + //std::cout << "=== Testing Non blocking queue with " << omp_get_num_threads() << " threads ===" << std::endl; + + if (!omp_get_nested()) + { + std::cerr << "WARNING: Nested parallel regions not supported. Working threads might have unexpected behaviour" << std::endl; + std::cerr << "Are you running with \"OMP_NESTED=TRUE\"??" << std::endl; + } + } + + #pragma omp sections //nowait + { + #pragma omp section + { + // producer section. only 1 thread + //#pragma omp parallel shared(theQueue) num_threads (1) + { + //if (omp_get_thread_num() == 0) + //{ + // std::cout << "\t Producers: " << omp_get_num_threads() << std::endl; + //} + int i; + for (i = 0 ; i < N_ITERATIONS ; i++) + { + while(!theQueue.push(i)) + { + // queue full + ; + } + } + } + } + + #pragma omp section + { + // consumer section + #pragma omp parallel shared(theQueue) num_threads (N_CONSUMERS) + { + //if (omp_get_thread_num() == 0) + //{ + // std::cout << "\t Consumers: " << omp_get_num_threads() << std::endl; + //} + + int i; + int result; + #pragma omp for schedule(static) private(i, result) nowait + for (i = 0 ; i < N_ITERATIONS ; i++) + { + while (!theQueue.pop(result)) + { + // queue empty + ; + } + +#if N_CONSUMERS == 1 + // if there are several consumers this test will fail + if (i != result) + { + std::cout << "FAILED i=" << i << " result=" << result << std::endl; + } +#endif + } + } + } + } + + } // #pragma omp parallel + + g_get_current_time(&endTimestamp); + + // calculate elapsed time + GTimeVal elapsedTime; + if (endTimestamp.tv_usec >= iniTimestamp.tv_usec) + { + elapsedTime.tv_sec = endTimestamp.tv_sec - iniTimestamp.tv_sec; + elapsedTime.tv_usec = endTimestamp.tv_usec - iniTimestamp.tv_usec; + } + else + { + elapsedTime.tv_sec = endTimestamp.tv_sec - iniTimestamp.tv_sec - 1; + elapsedTime.tv_usec = G_USEC_PER_SEC + endTimestamp.tv_usec - iniTimestamp.tv_usec; + } + + std::cout << "Elapsed: " << elapsedTime.tv_sec << "." << elapsedTime.tv_usec << std::endl; + std::cout << "=== End of testing lock-free queue ===" << std::endl; +} + +int main(int /*argc*/, char** /*argv*/) +{ + + TestLockFreeQueue(); + std::cout << "Done!!!" << std::endl; + + return 0; +} + -- 2.34.1