+++ /dev/null
-// ============================================================================
-// Copyright (c) 2009-2010 Faustino Frechilla
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are met:
-//
-// 1. Redistributions of source code must retain the above copyright notice,
-// this list of conditions and the following disclaimer.
-// 2. Redistributions in binary form must reproduce the above copyright
-// notice, this list of conditions and the following disclaimer in the
-// documentation and/or other materials provided with the distribution.
-// 3. The name of the author may not be used to endorse or promote products
-// derived from this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
-// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-// POSSIBILITY OF SUCH DAMAGE.
-//
-/// @file q_blocking_queue_impl.h
-/// @brief Implementation of a thread-safe queue based on glib system calls
-/// It internally contains a std::queue which is protected from concurrent
-/// access by glib mutextes and conditional variables
-///
-/// @author Faustino Frechilla
-/// @history
-/// Ref Who When What
-/// Faustino Frechilla 04-May-2009 Original development (based on pthreads)
-/// Faustino Frechilla 19-May-2010 Ported to glib. Removed pthread dependency
-/// @endhistory
-///
-// ============================================================================
-
-#ifndef _GBLOCKINGQUEUEIMPL_H_
-#define _GBLOCKINGQUEUEIMPL_H_
-
-#include <assert.h>
-
-#define NANOSECONDS_PER_SECOND 1000000000
-
-template <typename T>
-BlockingQueue<T>::BlockingQueue(std::size_t a_maxSize) :
- m_maximumSize(a_maxSize)
-{
- if (!g_thread_supported ())
- {
- // glib thread system hasn't been initialized yet
- g_thread_init(NULL);
- }
-
- m_mutex = g_mutex_new();
- m_cond = g_cond_new();
-
- assert(m_mutex != NULL);
- assert(m_cond != NULL);
-}
-
-template <typename T>
-BlockingQueue<T>::~BlockingQueue()
-{
- g_cond_free(m_cond);
- g_mutex_free(m_mutex);
-}
-
-template <typename T>
-bool BlockingQueue<T>::IsEmpty()
-{
- bool rv;
-
- g_mutex_lock(m_mutex);
- rv = m_theQueue.empty();
- g_mutex_unlock(m_mutex);
-
- return rv;
-}
-
-template <typename T>
-bool BlockingQueue<T>::Push(const T &a_elem)
-{
- g_mutex_lock(m_mutex);
-
- while (m_theQueue.size() >= m_maximumSize)
- {
- g_cond_wait(m_cond, m_mutex);
- }
-
- bool queueEmpty = m_theQueue.empty();
-
- m_theQueue.push(a_elem);
-
- if (queueEmpty)
- {
- // wake up threads waiting for stuff
- g_cond_broadcast(m_cond);
- }
-
- g_mutex_unlock(m_mutex);
-
- return true;
-}
-
-template <typename T>
-bool BlockingQueue<T>::TryPush(const T &a_elem)
-{
- g_mutex_lock(m_mutex);
-
- bool rv = false;
- bool queueEmpty = m_theQueue.empty();
-
- if (m_theQueue.size() < m_maximumSize)
- {
- m_theQueue.push(a_elem);
- rv = true;
- }
-
- if (queueEmpty)
- {
- // wake up threads waiting for stuff
- g_cond_broadcast(m_cond);
- }
-
- g_mutex_unlock(m_mutex);
-
- return rv;
-}
-
-template <typename T>
-void BlockingQueue<T>::Pop(T &out_data)
-{
- g_mutex_lock(m_mutex);
-
- while (m_theQueue.empty())
- {
- g_cond_wait(m_cond, m_mutex);
- }
-
- bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false;
-
- out_data = m_theQueue.front();
- m_theQueue.pop();
-
- if (queueFull)
- {
- // wake up threads waiting for stuff
- g_cond_broadcast(m_cond);
- }
-
- g_mutex_unlock(m_mutex);
-}
-
-template <typename T>
-bool BlockingQueue<T>::TryPop(T &out_data)
-{
- g_mutex_lock(m_mutex);
-
- bool rv = false;
- if (!m_theQueue.empty())
- {
- bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false;
-
- out_data = m_theQueue.front();
- m_theQueue.pop();
-
- if (queueFull)
- {
- // wake up threads waiting for stuff
- g_cond_broadcast(m_cond);
- }
-
- rv = true;
- }
-
- g_mutex_unlock(m_mutex);
-
- return rv;
-}
-
-template <typename T>
-bool BlockingQueue<T>::TimedWaitPop(T &data, glong microsecs)
-{
- g_mutex_lock(m_mutex);
-
- // adding microsecs to now
- GTimeVal abs_time;
- g_get_current_time(&abs_time);
- g_time_val_add(&abs_time, microsecs);
-
- gboolean retcode = TRUE;
- while (m_theQueue.empty() && (retcode != FALSE))
- {
- // Returns TRUE if cond was signalled, or FALSE on timeout
- retcode = g_cond_timed_wait(m_cond, m_mutex, &abs_time);
- }
-
- bool rv = false;
- bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false;
- if (retcode != FALSE)
- {
- data = m_theQueue.front();
- m_theQueue.pop();
-
- rv = true;
- }
-
- if (rv && queueFull)
- {
- // wake up threads waiting for stuff
- g_cond_broadcast(m_cond);
- }
-
- g_mutex_unlock(m_mutex);
-
- return rv;
-}
-
-#endif /* _GBLOCKINGQUEUEIMPL_H_ */