add "lock-free queue"
authorBrian Norris <banorris@uci.edu>
Tue, 9 Oct 2012 17:57:56 +0000 (10:57 -0700)
committerBrian Norris <banorris@uci.edu>
Tue, 9 Oct 2012 17:57:56 +0000 (10:57 -0700)
From:
http://www.codeproject.com/Articles/153898/Yet-another-implementation-of-a-lock-free-circular

License:
http://opensource.org/licenses/bsd-license.php

12 files changed:
queue/array_lock_free_queue.h [new file with mode: 0644]
queue/array_lock_free_queue_impl.h [new file with mode: 0644]
queue/array_lock_free_queue_single_producer.h [new file with mode: 0644]
queue/array_lock_free_queue_single_producer_impl.h [new file with mode: 0644]
queue/atomic_ops.h [new file with mode: 0644]
queue/g_blocking_queue.h [new file with mode: 0644]
queue/g_blocking_queue_impl.h [new file with mode: 0644]
queue/makefile [new file with mode: 0644]
queue/tags [new file with mode: 0644]
queue/test_blocking_q.cpp [new file with mode: 0644]
queue/test_lock_free_q.cpp [new file with mode: 0644]
queue/test_lock_free_single_producer_q.cpp [new file with mode: 0644]

diff --git a/queue/array_lock_free_queue.h b/queue/array_lock_free_queue.h
new file mode 100644 (file)
index 0000000..e392889
--- /dev/null
@@ -0,0 +1,141 @@
+// ============================================================================
+// Copyright (c) 2010 Faustino Frechilla
+// All rights reserved.
+// 
+// Redistribution and use in source and binary forms, with or without 
+// modification, are permitted provided that the following conditions are met:
+//
+//  1. Redistributions of source code must retain the above copyright notice,
+//     this list of conditions and the following disclaimer.
+//  2. Redistributions in binary form must reproduce the above copyright 
+//     notice, this list of conditions and the following disclaimer in the
+//     documentation and/or other materials provided with the distribution.
+//  3. The name of the author may not be used to endorse or promote products
+//     derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
+// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
+// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 
+// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
+// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
+// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
+// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
+// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
+// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
+// POSSIBILITY OF SUCH DAMAGE.
+//
+/// @file array_lock_free_queue.h
+/// @brief Definition of a circular array based lock-free queue
+///
+/// @author Faustino Frechilla
+/// @history
+/// Ref  Who                 When         What
+///      Faustino Frechilla  11-Jul-2010  Original development
+/// @endhistory
+/// 
+// ============================================================================
+
+#ifndef __ARRAY_LOCK_FREE_QUEUE_H__
+#define __ARRAY_LOCK_FREE_QUEUE_H__
+
+#include <stdint.h>     // uint32_t
+#include "atomic_ops.h" // atomic operations wrappers
+
+#define ARRAY_LOCK_FREE_Q_DEFAULT_SIZE 65535 // (2^16)
+
+// define this variable if calls to "size" must return the real size of the 
+// queue. If it is undefined  that function will try to take a snapshot of 
+// the queue, but returned value might be bogus
+#undef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
+//#define ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE 1
+
+
+/// @brief Lock-free queue based on a circular array
+/// No allocation of extra memory for the nodes handling is needed, but it has to add
+/// extra overhead (extra CAS operation) when inserting to ensure the thread-safety of the queue
+/// ELEM_T     represents the type of elementes pushed and popped from the queue
+/// TOTAL_SIZE size of the queue. It should be a power of 2 to ensure 
+///            indexes in the circular queue keep stable when the uint32_t 
+///            variable that holds the current position rolls over from FFFFFFFF
+///            to 0. For instance
+///            2    -> 0x02 
+///            4    -> 0x04
+///            8    -> 0x08
+///            16   -> 0x10
+///            (...) 
+///            1024 -> 0x400
+///            2048 -> 0x800
+///
+///            if queue size is not defined as requested, let's say, for
+///            instance 100, when current position is FFFFFFFF (4,294,967,295)
+///            index in the circular array is 4,294,967,295 % 100 = 95. 
+///            When that value is incremented it will be set to 0, that is the 
+///            last 4 elements of the queue are not used when the counter rolls
+///            over to 0
+template <typename ELEM_T, uint32_t Q_SIZE = ARRAY_LOCK_FREE_Q_DEFAULT_SIZE>
+class ArrayLockFreeQueue
+{
+public:
+    /// @brief constructor of the class
+    ArrayLockFreeQueue();
+    virtual ~ArrayLockFreeQueue();
+
+    /// @brief returns the current number of items in the queue
+    /// It tries to take a snapshot of the size of the queue, but in busy environments
+    /// this function might return bogus values. 
+    ///
+    /// If a reliable queue size must be kept you might want to have a look at 
+    /// the preprocessor variable in this header file called 'ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE'
+    /// it enables a reliable size though it hits overall performance of the queue 
+    /// (when the reliable size variable is on it's got an impact of about 20% in time)
+    uint32_t size();
+
+    /// @brief push an element at the tail of the queue
+    /// @param the element to insert in the queue
+    /// Note that the element is not a pointer or a reference, so if you are using large data
+    /// structures to be inserted in the queue you should think of instantiate the template
+    /// of the queue as a pointer to that large structure
+    /// @returns true if the element was inserted in the queue. False if the queue was full
+    bool push(const ELEM_T &a_data);
+
+    /// @brief pop the element at the head of the queue
+    /// @param a reference where the element in the head of the queue will be saved to
+    /// Note that the a_data parameter might contain rubbish if the function returns false
+    /// @returns true if the element was successfully extracted from the queue. False if the queue was empty
+    bool pop(ELEM_T &a_data);
+
+private:
+    /// @brief array to keep the elements
+    ELEM_T m_theQueue[Q_SIZE];
+
+#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
+    /// @brief number of elements in the queue
+    volatile uint32_t m_count;
+#endif
+
+    /// @brief where a new element will be inserted
+    volatile uint32_t m_writeIndex;
+
+    /// @brief where the next element where be extracted from
+    volatile uint32_t m_readIndex;
+
+    /// @brief maximum read index for multiple producer queues
+    /// If it's not the same as m_writeIndex it means
+    /// there are writes pending to be "committed" to the queue, that means,
+    /// the place for the data was reserved (the index in the array) but  
+    /// data is still not in the queue, so the thread trying to read will have 
+    /// to wait for those other threads to save the data into the queue
+    ///
+    /// note this index is only used for MultipleProducerThread queues
+    volatile uint32_t m_maximumReadIndex;
+
+    /// @brief calculate the index in the circular array that corresponds
+    /// to a particular "count" value
+    inline uint32_t countToIndex(uint32_t a_count);
+};
+
+// include the implementation file
+#include "array_lock_free_queue_impl.h"
+
+#endif // __ARRAY_LOCK_FREE_QUEUE_H__
diff --git a/queue/array_lock_free_queue_impl.h b/queue/array_lock_free_queue_impl.h
new file mode 100644 (file)
index 0000000..d368bb5
--- /dev/null
@@ -0,0 +1,196 @@
+// ============================================================================
+// Copyright (c) 2010 Faustino Frechilla
+// All rights reserved.
+// 
+// Redistribution and use in source and binary forms, with or without 
+// modification, are permitted provided that the following conditions are met:
+//
+//  1. Redistributions of source code must retain the above copyright notice,
+//     this list of conditions and the following disclaimer.
+//  2. Redistributions in binary form must reproduce the above copyright 
+//     notice, this list of conditions and the following disclaimer in the
+//     documentation and/or other materials provided with the distribution.
+//  3. The name of the author may not be used to endorse or promote products
+//     derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
+// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
+// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 
+// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
+// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
+// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
+// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
+// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
+// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
+// POSSIBILITY OF SUCH DAMAGE.
+//
+/// @file array_lock_free_queue_impl.h
+/// @brief Implementation of a circular array based lock-free queue
+///
+/// @author Faustino Frechilla
+/// @history
+/// Ref  Who                 When         What
+///      Faustino Frechilla  11-Jul-2010  Original development
+/// @endhistory
+/// 
+// ============================================================================
+
+#ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_H__
+#define __ARRAY_LOCK_FREE_QUEUE_IMPL_H__
+
+#include <assert.h> // assert()
+#include <sched.h>  // sched_yield()
+
+template <typename ELEM_T, uint32_t Q_SIZE>
+ArrayLockFreeQueue<ELEM_T, Q_SIZE>::ArrayLockFreeQueue() :
+    m_writeIndex(0),
+    m_readIndex(0),
+    m_maximumReadIndex(0) // only for MultipleProducerThread queues
+{
+#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
+    m_count = 0;
+#endif
+}
+
+template <typename ELEM_T, uint32_t Q_SIZE>
+ArrayLockFreeQueue<ELEM_T, Q_SIZE>::~ArrayLockFreeQueue()
+{
+}
+
+template <typename ELEM_T, uint32_t Q_SIZE>
+inline
+uint32_t ArrayLockFreeQueue<ELEM_T, Q_SIZE>::countToIndex(uint32_t a_count)
+{
+    // if Q_SIZE is a power of 2 this statement could be also written as 
+    // return (a_count & (Q_SIZE - 1));
+    return (a_count % Q_SIZE);
+}
+
+template <typename ELEM_T, uint32_t Q_SIZE>
+uint32_t ArrayLockFreeQueue<ELEM_T, Q_SIZE>::size()
+{
+#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
+    return m_count;
+#else
+    uint32_t currentWriteIndex = m_writeIndex;
+    uint32_t currentReadIndex  = m_readIndex;
+
+    // let's think of a scenario where this function returns bogus data
+    // 1. when the statement 'currentWriteIndex = m_writeIndex' is run
+    // m_writeIndex is 3 and m_readIndex is 2. Real size is 1
+    // 2. afterwards this thread is preemted. While this thread is inactive 2 
+    // elements are inserted and removed from the queue, so m_writeIndex is 5
+    // m_readIndex 4. Real size is still 1
+    // 3. Now the current thread comes back from preemption and reads m_readIndex.
+    // currentReadIndex is 4
+    // 4. currentReadIndex is bigger than currentWriteIndex, so
+    // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
+    // it returns that the queue is almost full, when it is almost empty
+    
+    if (currentWriteIndex >= currentReadIndex)
+    {
+        return (currentWriteIndex - currentReadIndex);
+    }
+    else
+    {
+        return (Q_SIZE + currentWriteIndex - currentReadIndex);
+    }
+#endif // ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
+}
+
+template <typename ELEM_T, uint32_t Q_SIZE>
+bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::push(const ELEM_T &a_data)
+{
+    uint32_t currentReadIndex;
+    uint32_t currentWriteIndex;
+
+    do
+    {
+        currentWriteIndex = m_writeIndex;
+        currentReadIndex  = m_readIndex;
+        if (countToIndex(currentWriteIndex + 1) ==
+            countToIndex(currentReadIndex))
+        {
+            // the queue is full
+            return false;
+        }
+
+    } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
+
+    // We know now that this index is reserved for us. Use it to save the data
+    m_theQueue[countToIndex(currentWriteIndex)] = a_data;
+
+    // update the maximum read index after saving the data. It wouldn't fail if there is only one thread 
+    // inserting in the queue. It might fail if there are more than 1 producer threads because this
+    // operation has to be done in the same order as the previous CAS
+    while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
+    {
+        // this is a good place to yield the thread in case there are more
+        // software threads than hardware processors and you have more
+        // than 1 producer thread
+        // have a look at sched_yield (POSIX.1b)
+        sched_yield();
+    }
+
+    // The value was successfully inserted into the queue
+#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
+    AtomicAdd(&m_count, 1);
+#endif
+
+    return true;
+}
+
+template <typename ELEM_T, uint32_t Q_SIZE>
+bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::pop(ELEM_T &a_data)
+{
+    uint32_t currentMaximumReadIndex;
+    uint32_t currentReadIndex;
+
+    do
+    {
+        // to ensure thread-safety when there is more than 1 producer thread
+        // a second index is defined (m_maximumReadIndex)
+        currentReadIndex        = m_readIndex;
+        currentMaximumReadIndex = m_maximumReadIndex;
+
+        if (countToIndex(currentReadIndex) == 
+            countToIndex(currentMaximumReadIndex))
+        {
+            // the queue is empty or
+            // a producer thread has allocate space in the queue but is 
+            // waiting to commit the data into it
+            return false;
+        }
+
+        // retrieve the data from the queue
+        a_data = m_theQueue[countToIndex(currentReadIndex)];
+
+        // try to perfrom now the CAS operation on the read index. If we succeed
+        // a_data already contains what m_readIndex pointed to before we 
+        // increased it
+        if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
+        {
+            // got here. The value was retrieved from the queue. Note that the
+            // data inside the m_queue array is not deleted nor reseted
+#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
+            AtomicSub(&m_count, 1);
+#endif
+            return true;
+        }
+        
+        // it failed retrieving the element off the queue. Someone else must
+        // have read the element stored at countToIndex(currentReadIndex)
+        // before we could perform the CAS operation        
+
+    } while(1); // keep looping to try again!
+
+    // Something went wrong. it shouldn't be possible to reach here
+    assert(0);
+
+    // Add this return statement to avoid compiler warnings
+    return false;
+}
+
+#endif // __ARRAY_LOCK_FREE_QUEUE_IMPL_H__
+
diff --git a/queue/array_lock_free_queue_single_producer.h b/queue/array_lock_free_queue_single_producer.h
new file mode 100644 (file)
index 0000000..1d80e16
--- /dev/null
@@ -0,0 +1,137 @@
+// ============================================================================
+// Copyright (c) 2010 Faustino Frechilla
+// All rights reserved.
+// 
+// Redistribution and use in source and binary forms, with or without 
+// modification, are permitted provided that the following conditions are met:
+//
+//  1. Redistributions of source code must retain the above copyright notice,
+//     this list of conditions and the following disclaimer.
+//  2. Redistributions in binary form must reproduce the above copyright 
+//     notice, this list of conditions and the following disclaimer in the
+//     documentation and/or other materials provided with the distribution.
+//  3. The name of the author may not be used to endorse or promote products
+//     derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
+// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
+// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 
+// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
+// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
+// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
+// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
+// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
+// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
+// POSSIBILITY OF SUCH DAMAGE.
+//
+/// @file array_lock_free_queue_single_producer.h
+/// @brief Definition of a circular array based lock-free queue
+///
+/// WARNING: This queue is not thread safe when several threads try to insert 
+///          elements into the queue. It is allowed to use as many consumers
+///          as needed though.
+///
+/// @author Faustino Frechilla
+/// @history
+/// Ref  Who                 When         What
+///      Faustino Frechilla  11-Jul-2010  Original development
+/// @endhistory
+/// 
+// ============================================================================
+
+#ifndef __ARRAY_LOCK_FREE_QUEUE_SINGLE_PRODUCER_H__
+#define __ARRAY_LOCK_FREE_QUEUE_SINGLE_PRODUCER_H__
+
+#include <stdint.h>     // uint32_t
+#include "atomic_ops.h" // atomic operations wrappers
+
+#define ARRAY_LOCK_FREE_Q_DEFAULT_SIZE 65536 // 2^16 = 65,536 elements by default
+
+// define this variable if calls to "size" must return the real size of the 
+// queue. If it is undefined  that function will try to take a snapshot of 
+// the queue, but returned value might be bogus
+#undef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
+//#define ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE 1
+
+
+/// @brief especialisation of the ArrayLockFreeQueue to be used when there is
+///        only one producer thread
+/// No allocation of extra memory for the nodes handling is needed
+/// WARNING: This queue is not thread safe when several threads try to insert elements
+/// into the queue
+/// ELEM_T     represents the type of elementes pushed and popped from the queue
+/// TOTAL_SIZE size of the queue. It should be a power of 2 to ensure 
+///            indexes in the circular queue keep stable when the uint32_t 
+///            variable that holds the current position rolls over from FFFFFFFF
+///            to 0. For instance
+///            2    -> 0x02 
+///            4    -> 0x04
+///            8    -> 0x08
+///            16   -> 0x10
+///            (...) 
+///            1024 -> 0x400
+///            2048 -> 0x800
+///
+///            if queue size is not defined as requested, let's say, for
+///            instance 100, when current position is FFFFFFFF (4,294,967,295)
+///            index in the circular array is 4,294,967,295 % 100 = 95. 
+///            When that value is incremented it will be set to 0, that is the 
+///            last 4 elements of the queue are not used when the counter rolls
+///            over to 0
+template <typename ELEM_T, uint32_t Q_SIZE = ARRAY_LOCK_FREE_Q_DEFAULT_SIZE>
+class ArrayLockFreeQueueSingleProducer
+{
+public:
+    /// @brief constructor of the class
+    ArrayLockFreeQueueSingleProducer();
+    virtual ~ArrayLockFreeQueueSingleProducer();
+
+    /// @brief returns the current number of items in the queue
+    /// It tries to take a snapshot of the size of the queue, but in busy environments
+    /// this function might return bogus values. 
+    ///
+    /// If a reliable queue size must be kept you might want to have a look at 
+    /// the preprocessor variable in this header file called 'ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE'
+    /// it enables a reliable size though it hits overall performance of the queue 
+    /// (when the reliable size variable is on it's got an impact of about 20% in time)
+    uint32_t size();
+
+    /// @brief push an element at the tail of the queue
+    /// @param the element to insert in the queue
+    /// Note that the element is not a pointer or a reference, so if you are using large data
+    /// structures to be inserted in the queue you should think of instantiate the template
+    /// of the queue as a pointer to that large structure
+    /// @returns true if the element was inserted in the queue. False if the queue was full
+    bool push(const ELEM_T &a_data);
+
+    /// @brief pop the element at the head of the queue
+    /// @param a reference where the element in the head of the queue will be saved to
+    /// Note that the a_data parameter might contain rubbish if the function returns false
+    /// @returns true if the element was successfully extracted from the queue. False if the queue was empty
+    bool pop(ELEM_T &a_data);
+
+private:
+    /// @brief array to keep the elements
+    ELEM_T m_theQueue[Q_SIZE];
+
+#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
+    /// @brief number of elements in the queue
+    volatile uint32_t m_count;
+#endif
+
+    /// @brief where a new element will be inserted
+    volatile uint32_t m_writeIndex;
+
+    /// @brief where the next element where be extracted from
+    volatile uint32_t m_readIndex;
+
+    /// @brief calculate the index in the circular array that corresponds
+    /// to a particular "count" value
+    inline uint32_t countToIndex(uint32_t a_count);
+};
+
+// include the implementation file
+#include "array_lock_free_queue_single_producer_impl.h"
+
+#endif // __ARRAY_LOCK_FREE_QUEUE_SINGLE_PRODUCER_H__
diff --git a/queue/array_lock_free_queue_single_producer_impl.h b/queue/array_lock_free_queue_single_producer_impl.h
new file mode 100644 (file)
index 0000000..3c27ec4
--- /dev/null
@@ -0,0 +1,182 @@
+// ============================================================================
+// Copyright (c) 2010 Faustino Frechilla
+// All rights reserved.
+// 
+// Redistribution and use in source and binary forms, with or without 
+// modification, are permitted provided that the following conditions are met:
+//
+//  1. Redistributions of source code must retain the above copyright notice,
+//     this list of conditions and the following disclaimer.
+//  2. Redistributions in binary form must reproduce the above copyright 
+//     notice, this list of conditions and the following disclaimer in the
+//     documentation and/or other materials provided with the distribution.
+//  3. The name of the author may not be used to endorse or promote products
+//     derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
+// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
+// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 
+// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
+// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
+// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
+// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
+// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
+// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
+// POSSIBILITY OF SUCH DAMAGE.
+//
+/// @file array_lock_free_queue_single_producer_impl.h
+/// @brief Implementation of a circular array based lock-free queue
+///
+/// @author Faustino Frechilla
+/// @history
+/// Ref  Who                 When         What
+///      Faustino Frechilla  11-Jul-2010  Original development
+/// @endhistory
+/// 
+// ============================================================================
+
+#ifndef __ARRAY_LOCK_FREE_QUEUE_SINGLE_PRODUCER_IMPL_H__
+#define __ARRAY_LOCK_FREE_QUEUE_SINGLE_PRODUCER_IMPL_H__
+
+#include <assert.h> // assert()
+
+template <typename ELEM_T, uint32_t Q_SIZE>
+ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::ArrayLockFreeQueueSingleProducer() :
+    m_writeIndex(0),
+    m_readIndex(0)
+{
+#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
+    m_count = 0;
+#endif
+}
+
+template <typename ELEM_T, uint32_t Q_SIZE>
+ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::~ArrayLockFreeQueueSingleProducer()
+{
+}
+
+template <typename ELEM_T, uint32_t Q_SIZE>
+inline
+uint32_t ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::countToIndex(uint32_t a_count)
+{
+    // if Q_SIZE is a power of 2 this statement could be also written as 
+    // return (a_count & (Q_SIZE - 1));
+    return (a_count % Q_SIZE);
+}
+
+template <typename ELEM_T, uint32_t Q_SIZE>
+uint32_t ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::size()
+{
+#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
+    return m_count;
+#else
+    uint32_t currentWriteIndex = m_writeIndex;
+    uint32_t currentReadIndex  = m_readIndex;
+
+    // let's think of a scenario where this function returns bogus data
+    // 1. when the statement 'currentWriteIndex = m_writeIndex' is run
+    // m_writeIndex is 3 and m_readIndex is 2. Real size is 1
+    // 2. afterwards this thread is preemted. While this thread is inactive 2 
+    // elements are inserted and removed from the queue, so m_writeIndex is 5
+    // m_readIndex 4. Real size is still 1
+    // 3. Now the current thread comes back from preemption and reads m_readIndex.
+    // currentReadIndex is 4
+    // 4. currentReadIndex is bigger than currentWriteIndex, so
+    // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
+    // it returns that the queue is almost full, when it is almost empty
+    
+    if (currentWriteIndex >= currentReadIndex)
+    {
+        return (currentWriteIndex - currentReadIndex);
+    }
+    else
+    {
+        return (Q_SIZE + currentWriteIndex - currentReadIndex);
+    }
+#endif // ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
+}
+
+template <typename ELEM_T, uint32_t Q_SIZE>
+bool ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::push(const ELEM_T &a_data)
+{
+    uint32_t currentReadIndex;
+    uint32_t currentWriteIndex;
+
+    currentWriteIndex = m_writeIndex;
+    currentReadIndex  = m_readIndex;
+    if (countToIndex(currentWriteIndex + 1) == 
+        countToIndex(currentReadIndex))
+    {
+        // the queue is full
+        return false;
+    }
+
+    // save the date into the q
+    m_theQueue[countToIndex(currentWriteIndex)] = a_data;
+    // increment atomically write index. Now a consumer thread can read
+    // the piece of data that was just stored 
+    AtomicAdd(&m_writeIndex, 1);
+
+    // The value was successfully inserted into the queue
+#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
+    AtomicAdd(&m_count, 1);
+#endif
+
+    return true;
+}
+
+template <typename ELEM_T, uint32_t Q_SIZE>
+bool ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::pop(ELEM_T &a_data)
+{
+    uint32_t currentMaximumReadIndex;
+    uint32_t currentReadIndex;
+
+    do
+    {
+        // m_maximumReadIndex doesn't exist when the queue is set up as
+        // single-producer. The maximum read index is described by the current
+        // write index
+        currentReadIndex        = m_readIndex;
+        currentMaximumReadIndex = m_writeIndex;
+
+        if (countToIndex(currentReadIndex) == 
+            countToIndex(currentMaximumReadIndex))
+        {
+            // the queue is empty or
+            // a producer thread has allocate space in the queue but is 
+            // waiting to commit the data into it
+            return false;
+        }
+
+        // retrieve the data from the queue
+        a_data = m_theQueue[countToIndex(currentReadIndex)];
+
+        // try to perfrom now the CAS operation on the read index. If we succeed
+        // a_data already contains what m_readIndex pointed to before we 
+        // increased it
+        if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
+        {
+            // got here. The value was retrieved from the queue. Note that the
+            // data inside the m_queue array is not deleted nor reseted
+#ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
+            AtomicSub(&m_count, 1);
+#endif
+            return true;
+        }
+        
+        // it failed retrieving the element off the queue. Someone else must
+        // have read the element stored at countToIndex(currentReadIndex)
+        // before we could perform the CAS operation        
+
+    } while(1); // keep looping to try again!
+
+    // Something went wrong. it shouldn't be possible to reach here
+    assert(0);
+
+    // Add this return statement to avoid compiler warnings
+    return false;
+}
+
+#endif // __ARRAY_LOCK_FREE_QUEUE_SINGLE_PRODUCER_IMPL_H__
+
diff --git a/queue/atomic_ops.h b/queue/atomic_ops.h
new file mode 100644 (file)
index 0000000..30e9fe3
--- /dev/null
@@ -0,0 +1,79 @@
+// ============================================================================
+// Copyright (c) 2010 Faustino Frechilla
+// All rights reserved.
+// 
+// Redistribution and use in source and binary forms, with or without 
+// modification, are permitted provided that the following conditions are met:
+//
+//  1. Redistributions of source code must retain the above copyright notice,
+//     this list of conditions and the following disclaimer.
+//  2. Redistributions in binary form must reproduce the above copyright 
+//     notice, this list of conditions and the following disclaimer in the
+//     documentation and/or other materials provided with the distribution.
+//  3. The name of the author may not be used to endorse or promote products
+//     derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
+// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
+// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 
+// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
+// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
+// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
+// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
+// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
+// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
+// POSSIBILITY OF SUCH DAMAGE.
+//
+/// @file atomic_ops.h
+/// @brief This file contains functions to wrap the built-in atomic operations 
+///        defined for your compiler
+///
+/// @author Faustino Frechilla
+/// @history
+/// Ref  Who                 When         What
+///      Faustino Frechilla  11-Jul-2010  Original development. GCC support
+/// @endhistory
+/// 
+// ============================================================================
+
+#ifndef __ATOMIC_OPS_H
+#define __ATOMIC_OPS_H
+
+#ifdef __GNUC__
+// Atomic functions in GCC are present from version 4.1.0 on
+// http://gcc.gnu.org/onlinedocs/gcc-4.1.0/gcc/Atomic-Builtins.html
+
+// Test for GCC >= 4.1.0
+#if (__GNUC__ < 4) || \
+    ((__GNUC__ == 4) && ((__GNUC_MINOR__ < 1) || \
+                        ((__GNUC_MINOR__     == 1) && \
+                         (__GNUC_PATCHLEVEL__ < 0))) )
+                        
+#error Atomic built-in functions are only available in GCC in versions >= 4.1.0
+#endif // end of check for GCC 4.1.0
+
+/// @brief atomically adds a_count to the variable pointed by a_ptr
+/// @return the value that had previously been in memory
+#define AtomicAdd(a_ptr,a_count) __sync_fetch_and_add (a_ptr, a_count)
+
+/// @brief atomically substracts a_count from the variable pointed by a_ptr
+/// @return the value that had previously been in memory
+#define AtomicSub(a_ptr,a_count) __sync_fetch_and_sub (a_ptr, a_count)
+
+/// @brief Compare And Swap
+///        If the current value of *a_ptr is a_oldVal, then write a_newVal into *a_ptr
+/// @return true if the comparison is successful and a_newVal was written
+#define CAS(a_ptr, a_oldVal, a_newVal) __sync_bool_compare_and_swap(a_ptr, a_oldVal, a_newVal)
+
+/// @brief Compare And Swap
+///        If the current value of *a_ptr is a_oldVal, then write a_newVal into *a_ptr
+/// @return the contents of *a_ptr before the operation
+#define CASVal(a_ptr, a_oldVal, a_newVal) __sync_val_compare_and_swap(a_ptr, a_oldVal, a_newVal)
+
+#else
+#error Atomic functions such as CAS or AtomicAdd are not defined for your compiler. Please add them in atomic_ops.h
+#endif // __GNUC__
+
+
+#endif // __ATOMIC_OPS_H
diff --git a/queue/g_blocking_queue.h b/queue/g_blocking_queue.h
new file mode 100644 (file)
index 0000000..a43295d
--- /dev/null
@@ -0,0 +1,122 @@
+// ============================================================================
+// Copyright (c) 2009-2010 Faustino Frechilla
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+//
+//  1. Redistributions of source code must retain the above copyright notice,
+//     this list of conditions and the following disclaimer.
+//  2. Redistributions in binary form must reproduce the above copyright
+//     notice, this list of conditions and the following disclaimer in the
+//     documentation and/or other materials provided with the distribution.
+//  3. The name of the author may not be used to endorse or promote products
+//     derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+// POSSIBILITY OF SUCH DAMAGE.
+//
+/// @file q_blocking_queue.h
+/// @brief Definition of a thread-safe queue based on glib system calls
+/// It internally contains a std::queue which is protected from concurrent
+/// access by glib mutextes and conditional variables
+///
+/// @author Faustino Frechilla
+/// @history
+/// Ref  Who                 When         What
+///      Faustino Frechilla 04-May-2009 Original development (based on pthreads)
+///      Faustino Frechilla 19-May-2010 Ported to glib. Removed pthread dependency
+/// @endhistory
+///
+// ============================================================================
+
+#ifndef _GBLOCKINGQUEUE_H_
+#define _GBLOCKINGQUEUE_H_
+
+#include <glib.h>
+#include <queue>
+#include <limits> // std::numeric_limits<>::max
+
+#define BLOCKING_QUEUE_DEFAULT_MAX_SIZE std::numeric_limits<std::size_t >::max()
+
+/// @brief blocking thread-safe queue
+/// It uses a mutex+condition variables to protect the internal queue
+/// implementation. Inserting or reading elements use the same mutex
+template <typename T>
+class BlockingQueue
+{
+public:
+    BlockingQueue(std::size_t a_maxSize = BLOCKING_QUEUE_DEFAULT_MAX_SIZE);
+    ~BlockingQueue();
+
+    /// @brief Check if the queue is empty
+    /// This call can block if another thread owns the lock that protects the
+    /// queue
+    /// @return true if the queue is empty. False otherwise
+    bool IsEmpty();
+
+    /// @brief inserts an element into queue queue
+    /// This call can block if another thread owns the lock that protects the
+    /// queue. If the queue is full The thread will be blocked in this queue
+    /// until someone else gets an element from the queue
+    /// @param element to insert into the queue
+    /// @return True if the elem was successfully inserted into the queue.
+    ///         False otherwise
+    bool Push(const T &a_elem);
+
+    /// @brief inserts an element into queue queue
+    /// This call can block if another thread owns the lock that protects the
+    /// queue. If the queue is full The call will return false and the element
+    /// won't be inserted
+    /// @param element to insert into the queue
+    /// @return True if the elem was successfully inserted into the queue.
+    ///         False otherwise
+    bool TryPush(const T &a_elem);
+
+    /// @brief extracts an element from the queue (and deletes it from the q)
+    /// If the queue is empty this call will block the thread until there is
+    /// something in the queue to be extracted
+    /// @param a reference where the element from the queue will be saved to
+    void Pop(T &out_data);
+
+    /// @brief extracts an element from the queue (and deletes it from the q)
+    /// This call gets the block that protects the queue. It will extract the
+    /// element from the queue only if there are elements in it
+    /// @param reference to the variable where the result will be saved
+    /// @return True if the element was retrieved from the queue.
+    ///         False if the queue was empty
+    bool TryPop(T &out_data);
+
+    /// @brief extracts an element from the queue (and deletes it from the q)
+    /// If the queue is empty this call will block the thread until there
+    /// is something in the queue to be extracted or until the timer
+    /// (2nd parameter) expires
+    /// @param reference to the variable where the result will be saved
+    /// @param microsecondsto wait before returning if the queue was empty
+    /// @return True if the element was retrieved from the queue.
+    ///         False if the timeout was reached
+    bool TimedWaitPop(T &data, glong microsecs);
+
+protected:
+    std::queue<T> m_theQueue;
+    /// maximum number of elements for the queue
+    std::size_t m_maximumSize;
+    /// Mutex to protect the queue
+    GMutex* m_mutex;
+    /// Conditional variable to wake up threads
+    GCond*  m_cond;
+};
+
+// include the implementation file
+#include "g_blocking_queue_impl.h"
+
+#endif /* _GBLOCKINGQUEUE_H_ */
diff --git a/queue/g_blocking_queue_impl.h b/queue/g_blocking_queue_impl.h
new file mode 100644 (file)
index 0000000..7312814
--- /dev/null
@@ -0,0 +1,224 @@
+// ============================================================================
+// Copyright (c) 2009-2010 Faustino Frechilla
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+//
+//  1. Redistributions of source code must retain the above copyright notice,
+//     this list of conditions and the following disclaimer.
+//  2. Redistributions in binary form must reproduce the above copyright
+//     notice, this list of conditions and the following disclaimer in the
+//     documentation and/or other materials provided with the distribution.
+//  3. The name of the author may not be used to endorse or promote products
+//     derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+// POSSIBILITY OF SUCH DAMAGE.
+//
+/// @file q_blocking_queue_impl.h
+/// @brief Implementation of a thread-safe queue based on glib system calls
+/// It internally contains a std::queue which is protected from concurrent
+/// access by glib mutextes and conditional variables
+///
+/// @author Faustino Frechilla
+/// @history
+/// Ref  Who                 When         What
+///      Faustino Frechilla 04-May-2009 Original development (based on pthreads)
+///      Faustino Frechilla 19-May-2010 Ported to glib. Removed pthread dependency
+/// @endhistory
+///
+// ============================================================================
+
+#ifndef _GBLOCKINGQUEUEIMPL_H_
+#define _GBLOCKINGQUEUEIMPL_H_
+
+#include <assert.h>
+
+#define NANOSECONDS_PER_SECOND 1000000000
+
+template <typename T>
+BlockingQueue<T>::BlockingQueue(std::size_t a_maxSize) :
+    m_maximumSize(a_maxSize)
+{
+    if (!g_thread_supported ())
+    {
+        // glib thread system hasn't been initialized yet
+        g_thread_init(NULL);
+    }
+
+    m_mutex = g_mutex_new();
+    m_cond  = g_cond_new();
+
+    assert(m_mutex != NULL);
+    assert(m_cond != NULL);
+}
+
+template <typename T>
+BlockingQueue<T>::~BlockingQueue()
+{
+    g_cond_free(m_cond);
+    g_mutex_free(m_mutex);
+}
+
+template <typename T>
+bool BlockingQueue<T>::IsEmpty()
+{
+    bool rv;
+
+    g_mutex_lock(m_mutex);
+    rv = m_theQueue.empty();
+    g_mutex_unlock(m_mutex);
+
+    return rv;
+}
+
+template <typename T>
+bool BlockingQueue<T>::Push(const T &a_elem)
+{
+    g_mutex_lock(m_mutex);
+
+    while (m_theQueue.size() >= m_maximumSize)
+    {
+        g_cond_wait(m_cond, m_mutex);
+    }
+
+    bool queueEmpty = m_theQueue.empty();
+
+    m_theQueue.push(a_elem);
+
+    if (queueEmpty)
+    {
+        // wake up threads waiting for stuff
+        g_cond_broadcast(m_cond);
+    }
+
+    g_mutex_unlock(m_mutex);
+
+    return true;
+}
+
+template <typename T>
+bool BlockingQueue<T>::TryPush(const T &a_elem)
+{
+    g_mutex_lock(m_mutex);
+
+    bool rv = false;
+    bool queueEmpty = m_theQueue.empty();
+
+    if (m_theQueue.size() < m_maximumSize)
+    {
+        m_theQueue.push(a_elem);
+        rv = true;
+    }
+
+    if (queueEmpty)
+    {
+        // wake up threads waiting for stuff
+        g_cond_broadcast(m_cond);
+    }
+
+    g_mutex_unlock(m_mutex);
+
+    return rv;
+}
+
+template <typename T>
+void BlockingQueue<T>::Pop(T &out_data)
+{
+    g_mutex_lock(m_mutex);
+
+    while (m_theQueue.empty())
+    {
+        g_cond_wait(m_cond, m_mutex);
+    }
+
+    bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false;
+
+    out_data = m_theQueue.front();
+    m_theQueue.pop();
+
+    if (queueFull)
+    {
+        // wake up threads waiting for stuff
+        g_cond_broadcast(m_cond);
+    }
+
+    g_mutex_unlock(m_mutex);
+}
+
+template <typename T>
+bool BlockingQueue<T>::TryPop(T &out_data)
+{
+    g_mutex_lock(m_mutex);
+
+    bool rv = false;
+    if (!m_theQueue.empty())
+    {
+        bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false;
+
+        out_data = m_theQueue.front();
+        m_theQueue.pop();
+
+        if (queueFull)
+        {
+            // wake up threads waiting for stuff
+            g_cond_broadcast(m_cond);
+        }
+
+        rv = true;
+    }
+
+    g_mutex_unlock(m_mutex);
+
+    return rv;
+}
+
+template <typename T>
+bool BlockingQueue<T>::TimedWaitPop(T &data, glong microsecs)
+{
+    g_mutex_lock(m_mutex);
+
+    // adding microsecs to now
+    GTimeVal abs_time;
+    g_get_current_time(&abs_time);
+    g_time_val_add(&abs_time, microsecs);
+
+    gboolean retcode = TRUE;
+    while (m_theQueue.empty() && (retcode != FALSE))
+    {
+        // Returns TRUE if cond was signalled, or FALSE on timeout
+        retcode = g_cond_timed_wait(m_cond, m_mutex, &abs_time);
+    }
+
+    bool rv = false;
+    bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false;
+    if (retcode != FALSE)
+    {
+        data = m_theQueue.front();
+        m_theQueue.pop();
+
+        rv = true;
+    }
+
+    if (rv && queueFull)
+    {
+        // wake up threads waiting for stuff
+        g_cond_broadcast(m_cond);
+    }
+
+    g_mutex_unlock(m_mutex);
+
+    return rv;
+}
+
+#endif /* _GBLOCKINGQUEUEIMPL_H_ */
diff --git a/queue/makefile b/queue/makefile
new file mode 100644 (file)
index 0000000..ee96d75
--- /dev/null
@@ -0,0 +1,67 @@
+# -fno-schedule-insns -fno-rerun-loop-opt are a workaround for a compiler error in 4.2
+# -Wno-unused-parameter
+
+CC      = g++
+CFLAGS  = -g -O3 -fopenmp -fno-schedule-insns -fno-schedule-insns2 -W -Wall #-Wno-unused-parameter
+CFLAGS += `pkg-config --cflags glib-2.0`
+#CFLAGS += -march=i686
+#CFLAGS += -march=core2
+LDFLAGS = -lgomp
+LDFLAGS+= `pkg-config --libs glib-2.0`
+# g_blocking_queue also depends on gthread-2.0
+CFLAGS_GTHREAD = `pkg-config gthread-2.0`
+LDFLAGS_GTHREAD = `pkg-config --libs gthread-2.0`
+
+#compile-time parameters
+ifdef N_PRODUCERS
+CFLAGS += -DN_PRODUCERS=$(N_PRODUCERS)
+endif
+ifdef N_CONSUMERS
+CFLAGS += -DN_CONSUMERS=$(N_CONSUMERS)
+endif
+ifdef N_ITERATIONS
+CFLAGS += -DN_ITERATIONS=$(N_ITERATIONS)
+endif
+ifdef QUEUE_SIZE
+CFLAGS += -DQUEUE_SIZE=$(QUEUE_SIZE)
+endif
+
+
+LOCK_FREE_Q_INCLUDE = \
+    array_lock_free_queue.h \
+    array_lock_free_queue_impl.h
+
+BLOCKING_Q_INCLUDE = \
+    g_blocking_queue.h \
+    g_blocking_queue_impl.h
+
+LOCK_FREE_SINGLE_PRODUCER_Q_INCLUDE = \
+    array_lock_free_queue_single_producer.h \
+    array_lock_free_queue_single_producer_impl.h
+
+SHARED_INCLUDE = \
+    atomic_ops.h
+
+all : test_lock_free_q  test_lock_free_single_producer_q test_blocking_q
+
+test_lock_free_q : test_lock_free_q.o
+       $(CC) $(OBJS) -o $@ $@.o $(LDFLAGS)
+
+test_blocking_q : test_blocking_q.o
+       $(CC) $(OBJS) -o $@ $@.o $(LDFLAGS) $(LDFLAGS_GTHREAD)
+
+test_lock_free_single_producer_q : test_lock_free_single_producer_q.o
+           $(CC) $(OBJS) -o $@ $@.o $(LDFLAGS)
+    
+test_lock_free_q.o : test_lock_free_q.cpp $(SHARED_INCLUDE) $(LOCK_FREE_Q_INCLUDE)
+       $(CC) -c $< $(CFLAGS)
+
+test_lock_free_single_producer_q.o : test_lock_free_single_producer_q.cpp $(SHARED_INCLUDE) $(LOCK_FREE_SINGLE_PRODUCER_Q_INCLUDE)
+       $(CC) -c $< $(CFLAGS)
+    
+test_blocking_q.o: test_blocking_q.cpp $(SHARED_INCLUDE) $(BLOCKING_Q_INCLUDE)
+       $(CC) -c $< $(CFLAGS) $(CFLAGS_GTHREAD) 
+
+clean:
+       rm test_lock_free_q test_blocking_q test_lock_free_single_producer_q; rm *.o
+
diff --git a/queue/tags b/queue/tags
new file mode 100644 (file)
index 0000000..8c5c88d
--- /dev/null
@@ -0,0 +1,89 @@
+!_TAG_FILE_FORMAT      2       /extended format; --format=1 will not append ;" to lines/
+!_TAG_FILE_SORTED      1       /0=unsorted, 1=sorted, 2=foldcase/
+!_TAG_PROGRAM_AUTHOR   Darren Hiebert  /dhiebert@users.sourceforge.net/
+!_TAG_PROGRAM_NAME     Exuberant Ctags //
+!_TAG_PROGRAM_URL      http://ctags.sourceforge.net    /official site/
+!_TAG_PROGRAM_VERSION  5.9~svn20110310 //
+ARRAY_LOCK_FREE_Q_DEFAULT_SIZE array_lock_free_queue.h 45;"    d
+ARRAY_LOCK_FREE_Q_DEFAULT_SIZE array_lock_free_queue_single_producer.h 49;"    d
+ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE       array_lock_free_queue.h 50;"    d
+ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE       array_lock_free_queue_single_producer.h 54;"    d
+ArrayLockFreeQueue     array_lock_free_queue.h /^class ArrayLockFreeQueue$/;"  c
+ArrayLockFreeQueue     array_lock_free_queue_impl.h    /^ArrayLockFreeQueue<ELEM_T, Q_SIZE>::ArrayLockFreeQueue() :$/;"        f       class:ArrayLockFreeQueue
+ArrayLockFreeQueueSingleProducer       array_lock_free_queue_single_producer.h /^class ArrayLockFreeQueueSingleProducer$/;"    c
+ArrayLockFreeQueueSingleProducer       array_lock_free_queue_single_producer_impl.h    /^ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::ArrayLockFreeQueueSingleProducer() :$/;"    f       class:ArrayLockFreeQueueSingleProducer
+AtomicAdd      atomic_ops.h    51;"    d
+AtomicAdd      atomic_ops.h    87;"    d
+AtomicSub      atomic_ops.h    55;"    d
+AtomicSub      atomic_ops.h    91;"    d
+BLOCKING_QUEUE_DEFAULT_MAX_SIZE        g_blocking_queue.h      49;"    d
+BLOCKING_Q_INCLUDE     makefile        /^BLOCKING_Q_INCLUDE = \\$/;"   m
+BlockingQueue  g_blocking_queue.h      /^class BlockingQueue$/;"       c
+BlockingQueue  g_blocking_queue_impl.h /^BlockingQueue<T>::BlockingQueue(std::size_t a_maxSize) :$/;"  f       class:BlockingQueue
+CAS    atomic_ops.h    60;"    d
+CAS    atomic_ops.h    96;"    d
+CASVal atomic_ops.h    101;"   d
+CASVal atomic_ops.h    65;"    d
+CC     makefile        /^CC      = g++$/;"     m
+CFLAGS_GTHREAD makefile        /^CFLAGS_GTHREAD = `pkg-config gthread-2.0`$/;" m
+IsEmpty        g_blocking_queue_impl.h /^bool BlockingQueue<T>::IsEmpty()$/;"  f       class:BlockingQueue
+LDFLAGS_GTHREAD        makefile        /^LDFLAGS_GTHREAD = `pkg-config --libs gthread-2.0`$/;" m
+LOCK_FREE_Q_INCLUDE    makefile        /^LOCK_FREE_Q_INCLUDE = \\$/;"  m
+LOCK_FREE_SINGLE_PRODUCER_Q_INCLUDE    makefile        /^LOCK_FREE_SINGLE_PRODUCER_Q_INCLUDE = \\$/;"  m
+NANOSECONDS_PER_SECOND g_blocking_queue_impl.h 47;"    d
+N_CONSUMERS    test_blocking_q.cpp     17;"    d       file:
+N_CONSUMERS    test_lock_free_q.cpp    17;"    d       file:
+N_CONSUMERS    test_lock_free_single_producer_q.cpp    13;"    d       file:
+N_ITERATIONS   test_blocking_q.cpp     21;"    d       file:
+N_ITERATIONS   test_lock_free_q.cpp    21;"    d       file:
+N_ITERATIONS   test_lock_free_single_producer_q.cpp    17;"    d       file:
+N_PRODUCERS    test_blocking_q.cpp     13;"    d       file:
+N_PRODUCERS    test_lock_free_q.cpp    13;"    d       file:
+Pop    g_blocking_queue_impl.h /^void BlockingQueue<T>::Pop(T &out_data)$/;"   f       class:BlockingQueue
+Push   g_blocking_queue_impl.h /^bool BlockingQueue<T>::Push(const T &a_elem)$/;"      f       class:BlockingQueue
+QUEUE_SIZE     test_blocking_q.cpp     25;"    d       file:
+QUEUE_SIZE     test_lock_free_q.cpp    25;"    d       file:
+QUEUE_SIZE     test_lock_free_single_producer_q.cpp    21;"    d       file:
+SHARED_INCLUDE makefile        /^SHARED_INCLUDE = \\$/;"       m
+TestBlockingQueue      test_blocking_q.cpp     /^void TestBlockingQueue()$/;"  f
+TestLockFreeQueue      test_lock_free_q.cpp    /^void TestLockFreeQueue()$/;"  f
+TestLockFreeQueue      test_lock_free_single_producer_q.cpp    /^void TestLockFreeQueue()$/;"  f
+TimedWaitPop   g_blocking_queue_impl.h /^bool BlockingQueue<T>::TimedWaitPop(T &data, glong microsecs)$/;"     f       class:BlockingQueue
+TryPop g_blocking_queue_impl.h /^bool BlockingQueue<T>::TryPop(T &out_data)$/;"        f       class:BlockingQueue
+TryPush        g_blocking_queue_impl.h /^bool BlockingQueue<T>::TryPush(const T &a_elem)$/;"   f       class:BlockingQueue
+_GBLOCKINGQUEUEIMPL_H_ g_blocking_queue_impl.h 43;"    d
+_GBLOCKINGQUEUE_H_     g_blocking_queue.h      43;"    d
+__ARRAY_LOCK_FREE_QUEUE_H__    array_lock_free_queue.h 40;"    d
+__ARRAY_LOCK_FREE_QUEUE_IMPL_H__       array_lock_free_queue_impl.h    40;"    d
+__ARRAY_LOCK_FREE_QUEUE_SINGLE_PRODUCER_H__    array_lock_free_queue_single_producer.h 44;"    d
+__ARRAY_LOCK_FREE_QUEUE_SINGLE_PRODUCER_IMPL_H__       array_lock_free_queue_single_producer_impl.h    40;"    d
+__ATOMIC_OPS_H atomic_ops.h    41;"    d
+atomic32       atomic_ops.h    /^typedef atomic_uint atomic32;$/;"     t
+atomic32       atomic_ops.h    /^typedef volatile uint32 atomic32;$/;" t
+countToIndex   array_lock_free_queue_impl.h    /^uint32_t ArrayLockFreeQueue<ELEM_T, Q_SIZE>::countToIndex(uint32_t a_count)$/;"       f       class:ArrayLockFreeQueue
+countToIndex   array_lock_free_queue_single_producer_impl.h    /^uint32_t ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::countToIndex(uint32_t a_count)$/;" f       class:ArrayLockFreeQueueSingleProducer
+m_cond g_blocking_queue.h      /^    GCond*  m_cond;$/;"       m       class:BlockingQueue
+m_count        array_lock_free_queue.h /^    atomic32 m_count;$/;"     m       class:ArrayLockFreeQueue
+m_count        array_lock_free_queue_single_producer.h /^    atomic32 m_count;$/;"     m       class:ArrayLockFreeQueueSingleProducer
+m_maximumReadIndex     array_lock_free_queue.h /^    atomic32 m_maximumReadIndex;$/;"  m       class:ArrayLockFreeQueue
+m_maximumSize  g_blocking_queue.h      /^    std::size_t m_maximumSize;$/;"    m       class:BlockingQueue
+m_mutex        g_blocking_queue.h      /^    GMutex* m_mutex;$/;"      m       class:BlockingQueue
+m_readIndex    array_lock_free_queue.h /^    atomic32 m_readIndex;$/;" m       class:ArrayLockFreeQueue
+m_readIndex    array_lock_free_queue_single_producer.h /^    atomic32 m_readIndex;$/;" m       class:ArrayLockFreeQueueSingleProducer
+m_theQueue     array_lock_free_queue.h /^    ELEM_T m_theQueue[Q_SIZE];$/;"    m       class:ArrayLockFreeQueue
+m_theQueue     array_lock_free_queue_single_producer.h /^    ELEM_T m_theQueue[Q_SIZE];$/;"    m       class:ArrayLockFreeQueueSingleProducer
+m_theQueue     g_blocking_queue.h      /^    std::queue<T> m_theQueue;$/;"     m       class:BlockingQueue
+m_writeIndex   array_lock_free_queue.h /^    atomic32 m_writeIndex;$/;"        m       class:ArrayLockFreeQueue
+m_writeIndex   array_lock_free_queue_single_producer.h /^    atomic32 m_writeIndex;$/;"        m       class:ArrayLockFreeQueueSingleProducer
+main   test_blocking_q.cpp     /^int main(int \/*argc*\/, char** \/*argv*\/)$/;"       f
+main   test_lock_free_q.cpp    /^int main(int \/*argc*\/, char** \/*argv*\/)$/;"       f
+main   test_lock_free_single_producer_q.cpp    /^int main(int \/*argc*\/, char** \/*argv*\/)$/;"       f
+pop    array_lock_free_queue_impl.h    /^bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::pop(ELEM_T &a_data)$/;"      f       class:ArrayLockFreeQueue
+pop    array_lock_free_queue_single_producer_impl.h    /^bool ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::pop(ELEM_T &a_data)$/;"        f       class:ArrayLockFreeQueueSingleProducer
+push   array_lock_free_queue_impl.h    /^bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::push(const ELEM_T &a_data)$/;"       f       class:ArrayLockFreeQueue
+push   array_lock_free_queue_single_producer_impl.h    /^bool ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::push(const ELEM_T &a_data)$/;" f       class:ArrayLockFreeQueueSingleProducer
+size   array_lock_free_queue_impl.h    /^uint32_t ArrayLockFreeQueue<ELEM_T, Q_SIZE>::size()$/;"       f       class:ArrayLockFreeQueue
+size   array_lock_free_queue_single_producer_impl.h    /^uint32_t ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::size()$/;" f       class:ArrayLockFreeQueueSingleProducer
+~ArrayLockFreeQueue    array_lock_free_queue_impl.h    /^ArrayLockFreeQueue<ELEM_T, Q_SIZE>::~ArrayLockFreeQueue()$/;" f       class:ArrayLockFreeQueue
+~ArrayLockFreeQueueSingleProducer      array_lock_free_queue_single_producer_impl.h    /^ArrayLockFreeQueueSingleProducer<ELEM_T, Q_SIZE>::~ArrayLockFreeQueueSingleProducer()$/;"     f       class:ArrayLockFreeQueueSingleProducer
+~BlockingQueue g_blocking_queue_impl.h /^BlockingQueue<T>::~BlockingQueue()$/;"        f       class:BlockingQueue
diff --git a/queue/test_blocking_q.cpp b/queue/test_blocking_q.cpp
new file mode 100644 (file)
index 0000000..72472c8
--- /dev/null
@@ -0,0 +1,112 @@
+// ============================================================================
+/// @file  test_blocking_q.cpp
+/// @brief Benchmark blocking queue
+// ============================================================================
+
+
+#include <iostream>
+#include <glib.h>   // GTimeVal + g_get_current_time
+#include <omp.h>    // parallel processing support in gcc
+#include "g_blocking_queue.h"
+
+#ifndef N_PRODUCERS
+#define N_PRODUCERS         1
+#endif
+
+#ifndef N_CONSUMERS
+#define N_CONSUMERS         1
+#endif
+
+#ifndef N_ITERATIONS
+#define N_ITERATIONS 10000000
+#endif
+
+#ifndef QUEUE_SIZE
+#define QUEUE_SIZE       1000
+#endif
+
+void TestBlockingQueue()
+{
+    BlockingQueue<int> theQueue(QUEUE_SIZE);
+    GTimeVal iniTimestamp;
+    GTimeVal endTimestamp;
+
+    std::cout << "=== Start of testing blocking queue ===" << std::endl;
+    g_get_current_time(&iniTimestamp);
+    #pragma omp parallel shared(theQueue) num_threads (2)
+    {
+        if (omp_get_thread_num() == 0)
+        {
+            if (!omp_get_nested())
+            {
+                std::cerr << "WARNING: Nested parallel regions not supported. Working threads might have unexpected behaviour" << std::endl;
+                std::cerr << "Are you running with \"OMP_NESTED=TRUE\"??" << std::endl;
+            }
+        }
+
+        #pragma omp sections //nowait
+        {
+            #pragma omp section
+            {
+                // producer section
+                #pragma omp parallel shared(theQueue) num_threads (N_PRODUCERS)
+                {
+                    int i;
+                    #pragma omp for schedule(static) private(i) nowait
+                    for (i = 0 ; i < N_ITERATIONS ; i++)
+                    {
+                        while(!theQueue.Push(i))
+                        {
+                            // queue full
+                        }
+                    }
+                }
+            }
+
+            #pragma omp section
+            {
+                // consumer section
+                #pragma omp parallel shared(theQueue) num_threads (N_CONSUMERS)
+                {
+                    int i;
+                    int result;
+                    #pragma omp for schedule(static) private(i, result) nowait
+                    for (i = 0 ; i < N_ITERATIONS ; i++)
+                    {
+                        // this call will block if the queue is empty until
+                        // some other thread pushes something into it
+                        theQueue.Pop(result);
+                    }
+                }
+            }
+        }
+
+    } // #pragma omp parallel
+
+    g_get_current_time(&endTimestamp);
+    
+    // calculate elapsed time
+    GTimeVal elapsedTime;
+    if (endTimestamp.tv_usec >= iniTimestamp.tv_usec)
+    {
+        elapsedTime.tv_sec  = endTimestamp.tv_sec  - iniTimestamp.tv_sec;
+        elapsedTime.tv_usec = endTimestamp.tv_usec - iniTimestamp.tv_usec;
+    }
+    else
+    {
+        elapsedTime.tv_sec  = endTimestamp.tv_sec  - iniTimestamp.tv_sec - 1;
+        elapsedTime.tv_usec = G_USEC_PER_SEC + endTimestamp.tv_usec - iniTimestamp.tv_usec;
+    }
+    
+    std::cout << "Elapsed: " << elapsedTime.tv_sec << "." << elapsedTime.tv_usec << std::endl;
+    std::cout << "=== End of testing blocking queue ===" << std::endl;
+}
+
+int main(int /*argc*/, char** /*argv*/)
+{
+    TestBlockingQueue();
+    std::cout << "Done!!!" << std::endl;
+
+       return 0;
+}
+
diff --git a/queue/test_lock_free_q.cpp b/queue/test_lock_free_q.cpp
new file mode 100644 (file)
index 0000000..d3621e8
--- /dev/null
@@ -0,0 +1,132 @@
+// ============================================================================
+/// @file  test_lock_free_q.cpp
+/// @brief Benchmark lock free queue
+// ============================================================================
+
+
+#include <iostream>
+#include <glib.h>  // GTimeVal + g_get_current_time
+#include <omp.h>   // parallel processing support in gcc
+#include "array_lock_free_queue.h"
+
+#ifndef N_PRODUCERS
+#define N_PRODUCERS         1
+#endif
+
+#ifndef N_CONSUMERS
+#define N_CONSUMERS         1
+#endif
+
+#ifndef N_ITERATIONS
+#define N_ITERATIONS 10000000
+#endif
+
+#ifndef QUEUE_SIZE
+#define QUEUE_SIZE       1024
+#endif
+
+void TestLockFreeQueue()
+{
+    ArrayLockFreeQueue<int, QUEUE_SIZE> theQueue;
+    GTimeVal iniTimestamp;
+    GTimeVal endTimestamp;
+
+    std::cout << "=== Start of testing lock-free queue ===" << std::endl;
+    g_get_current_time(&iniTimestamp);
+    #pragma omp parallel shared(theQueue) num_threads (2)
+    {
+        if (omp_get_thread_num() == 0)
+        {
+            //std::cout << "=== Testing Non blocking queue with " << omp_get_num_threads() << " threads ===" << std::endl;
+
+            if (!omp_get_nested())
+            {
+                std::cerr << "WARNING: Nested parallel regions not supported. Working threads might have unexpected behaviour" << std::endl;
+                std::cerr << "Are you running with \"OMP_NESTED=TRUE\"??" << std::endl;
+            }
+        }
+
+        #pragma omp sections //nowait
+        {
+            #pragma omp section
+            {
+                // producer section
+                #pragma omp parallel shared(theQueue) num_threads (N_PRODUCERS)
+                {
+                    //if (omp_get_thread_num() == 0)
+                    //{
+                    //    std::cout << "\t Producers: " << omp_get_num_threads() << std::endl;
+                    //}
+                    int i;
+                    #pragma omp for schedule(static) private(i) nowait
+                    for (i = 0 ; i < N_ITERATIONS ; i++)
+                    {
+                        while(!theQueue.push(i))
+                        {
+                            // queue full
+                        }
+                    }
+                }
+            }
+
+            #pragma omp section
+            {
+                // consumer section
+                #pragma omp parallel shared(theQueue) num_threads (N_CONSUMERS)
+                {
+                    //if (omp_get_thread_num() == 0)
+                    //{
+                    //    std::cout << "\t Consumers: " << omp_get_num_threads() << std::endl;
+                    //}
+
+                    int i;
+                    int result;
+                    #pragma omp for schedule(static) private(i, result) nowait
+                    for (i = 0 ; i < N_ITERATIONS ; i++)
+                    {
+                        while (!theQueue.pop(result))
+                        {
+                            // queue empty
+                        }
+
+#if (N_CONSUMERS == 1 && N_PRODUCERS == 1)
+                        if (i != result)
+                        {
+                            std::cout << "FAILED i=" << i << " result=" << result << std::endl;
+                        }
+#endif
+                    }
+                }
+            }
+        }
+
+    } // #pragma omp parallel
+
+    g_get_current_time(&endTimestamp);
+    
+    // calculate elapsed time
+    GTimeVal elapsedTime;
+    if (endTimestamp.tv_usec >= iniTimestamp.tv_usec)
+    {
+        elapsedTime.tv_sec  = endTimestamp.tv_sec  - iniTimestamp.tv_sec;
+        elapsedTime.tv_usec = endTimestamp.tv_usec - iniTimestamp.tv_usec;
+    }
+    else
+    {
+        elapsedTime.tv_sec  = endTimestamp.tv_sec  - iniTimestamp.tv_sec - 1;
+        elapsedTime.tv_usec = G_USEC_PER_SEC + endTimestamp.tv_usec - iniTimestamp.tv_usec;
+    }
+    
+    std::cout << "Elapsed: " << elapsedTime.tv_sec << "." << elapsedTime.tv_usec << std::endl;
+    std::cout << "=== End of testing lock-free queue ===" << std::endl;    
+}
+
+int main(int /*argc*/, char** /*argv*/)
+{
+
+       TestLockFreeQueue();
+    std::cout << "Done!!!" << std::endl;
+
+       return 0;
+}
+
diff --git a/queue/test_lock_free_single_producer_q.cpp b/queue/test_lock_free_single_producer_q.cpp
new file mode 100644 (file)
index 0000000..585d9ea
--- /dev/null
@@ -0,0 +1,130 @@
+// ============================================================================
+/// @file  test_lock_free_q.cpp
+/// @brief Benchmark lock free queue
+// ============================================================================
+
+
+#include <iostream>
+#include <glib.h>  // GTimeVal + g_get_current_time
+#include <omp.h>   // parallel processing support in gcc
+#include "array_lock_free_queue_single_producer.h"
+
+#ifndef N_CONSUMERS
+#define N_CONSUMERS         1
+#endif
+
+#ifndef N_ITERATIONS
+#define N_ITERATIONS 10000000
+#endif
+
+#ifndef QUEUE_SIZE
+#define QUEUE_SIZE       1000
+#endif
+
+void TestLockFreeQueue()
+{
+    ArrayLockFreeQueueSingleProducer<int, QUEUE_SIZE> theQueue;
+    GTimeVal iniTimestamp;
+    GTimeVal endTimestamp;
+
+    std::cout << "=== Start of testing lock-free queue ===" << std::endl;
+    g_get_current_time(&iniTimestamp);
+    #pragma omp parallel shared(theQueue) num_threads (2)
+    {
+        if (omp_get_thread_num() == 0)
+        {
+            //std::cout << "=== Testing Non blocking queue with " << omp_get_num_threads() << " threads ===" << std::endl;
+
+            if (!omp_get_nested())
+            {
+                std::cerr << "WARNING: Nested parallel regions not supported. Working threads might have unexpected behaviour" << std::endl;
+                std::cerr << "Are you running with \"OMP_NESTED=TRUE\"??" << std::endl;
+            }
+        }
+
+        #pragma omp sections //nowait
+        {
+            #pragma omp section
+            {
+                // producer section. only 1 thread
+                //#pragma omp parallel shared(theQueue) num_threads (1)
+                {
+                    //if (omp_get_thread_num() == 0)
+                    //{
+                    //    std::cout << "\t Producers: " << omp_get_num_threads() << std::endl;
+                    //}
+                    int i;
+                    for (i = 0 ; i < N_ITERATIONS ; i++)
+                    {
+                        while(!theQueue.push(i))
+                        {
+                            // queue full
+                            ;
+                        }
+                    }
+                }
+            }
+
+            #pragma omp section
+            {
+                // consumer section
+                #pragma omp parallel shared(theQueue) num_threads (N_CONSUMERS)
+                {
+                    //if (omp_get_thread_num() == 0)
+                    //{
+                    //    std::cout << "\t Consumers: " << omp_get_num_threads() << std::endl;
+                    //}
+
+                    int i;
+                    int result;
+                    #pragma omp for schedule(static) private(i, result) nowait
+                    for (i = 0 ; i < N_ITERATIONS ; i++)
+                    {
+                        while (!theQueue.pop(result))
+                        {
+                            // queue empty
+                            ;
+                        }
+
+#if N_CONSUMERS == 1
+                        // if there are several consumers this test will fail
+                        if (i != result)
+                        {
+                            std::cout << "FAILED i=" << i << " result=" << result << std::endl;
+                        }
+#endif
+                    }
+                }
+            }
+        }
+
+    } // #pragma omp parallel
+
+    g_get_current_time(&endTimestamp);
+    
+    // calculate elapsed time
+    GTimeVal elapsedTime;
+    if (endTimestamp.tv_usec >= iniTimestamp.tv_usec)
+    {
+        elapsedTime.tv_sec  = endTimestamp.tv_sec  - iniTimestamp.tv_sec;
+        elapsedTime.tv_usec = endTimestamp.tv_usec - iniTimestamp.tv_usec;
+    }
+    else
+    {
+        elapsedTime.tv_sec  = endTimestamp.tv_sec  - iniTimestamp.tv_sec - 1;
+        elapsedTime.tv_usec = G_USEC_PER_SEC + endTimestamp.tv_usec - iniTimestamp.tv_usec;
+    }
+    
+    std::cout << "Elapsed: " << elapsedTime.tv_sec << "." << elapsedTime.tv_usec << std::endl;
+    std::cout << "=== End of testing lock-free queue ===" << std::endl;    
+}
+
+int main(int /*argc*/, char** /*argv*/)
+{
+
+       TestLockFreeQueue();
+    std::cout << "Done!!!" << std::endl;
+
+       return 0;
+}
+