fix bug
[model-checker-benchmarks.git] / queue / g_blocking_queue_impl.h
1 // ============================================================================
2 // Copyright (c) 2009-2010 Faustino Frechilla
3 // All rights reserved.
4 //
5 // Redistribution and use in source and binary forms, with or without
6 // modification, are permitted provided that the following conditions are met:
7 //
8 //  1. Redistributions of source code must retain the above copyright notice,
9 //     this list of conditions and the following disclaimer.
10 //  2. Redistributions in binary form must reproduce the above copyright
11 //     notice, this list of conditions and the following disclaimer in the
12 //     documentation and/or other materials provided with the distribution.
13 //  3. The name of the author may not be used to endorse or promote products
14 //     derived from this software without specific prior written permission.
15 //
16 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
20 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 // POSSIBILITY OF SUCH DAMAGE.
27 //
28 /// @file q_blocking_queue_impl.h
29 /// @brief Implementation of a thread-safe queue based on glib system calls
30 /// It internally contains a std::queue which is protected from concurrent
31 /// access by glib mutextes and conditional variables
32 ///
33 /// @author Faustino Frechilla
34 /// @history
35 /// Ref  Who                 When         What
36 ///      Faustino Frechilla 04-May-2009 Original development (based on pthreads)
37 ///      Faustino Frechilla 19-May-2010 Ported to glib. Removed pthread dependency
38 /// @endhistory
39 ///
40 // ============================================================================
41
42 #ifndef _GBLOCKINGQUEUEIMPL_H_
43 #define _GBLOCKINGQUEUEIMPL_H_
44
45 #include <assert.h>
46
47 #define NANOSECONDS_PER_SECOND 1000000000
48
49 template <typename T>
50 BlockingQueue<T>::BlockingQueue(std::size_t a_maxSize) :
51     m_maximumSize(a_maxSize)
52 {
53     if (!g_thread_supported ())
54     {
55         // glib thread system hasn't been initialized yet
56         g_thread_init(NULL);
57     }
58
59     m_mutex = g_mutex_new();
60     m_cond  = g_cond_new();
61
62     assert(m_mutex != NULL);
63     assert(m_cond != NULL);
64 }
65
66 template <typename T>
67 BlockingQueue<T>::~BlockingQueue()
68 {
69     g_cond_free(m_cond);
70     g_mutex_free(m_mutex);
71 }
72
73 template <typename T>
74 bool BlockingQueue<T>::IsEmpty()
75 {
76     bool rv;
77
78     g_mutex_lock(m_mutex);
79     rv = m_theQueue.empty();
80     g_mutex_unlock(m_mutex);
81
82     return rv;
83 }
84
85 template <typename T>
86 bool BlockingQueue<T>::Push(const T &a_elem)
87 {
88     g_mutex_lock(m_mutex);
89
90     while (m_theQueue.size() >= m_maximumSize)
91     {
92         g_cond_wait(m_cond, m_mutex);
93     }
94
95     bool queueEmpty = m_theQueue.empty();
96
97     m_theQueue.push(a_elem);
98
99     if (queueEmpty)
100     {
101         // wake up threads waiting for stuff
102         g_cond_broadcast(m_cond);
103     }
104
105     g_mutex_unlock(m_mutex);
106
107     return true;
108 }
109
110 template <typename T>
111 bool BlockingQueue<T>::TryPush(const T &a_elem)
112 {
113     g_mutex_lock(m_mutex);
114
115     bool rv = false;
116     bool queueEmpty = m_theQueue.empty();
117
118     if (m_theQueue.size() < m_maximumSize)
119     {
120         m_theQueue.push(a_elem);
121         rv = true;
122     }
123
124     if (queueEmpty)
125     {
126         // wake up threads waiting for stuff
127         g_cond_broadcast(m_cond);
128     }
129
130     g_mutex_unlock(m_mutex);
131
132     return rv;
133 }
134
135 template <typename T>
136 void BlockingQueue<T>::Pop(T &out_data)
137 {
138     g_mutex_lock(m_mutex);
139
140     while (m_theQueue.empty())
141     {
142         g_cond_wait(m_cond, m_mutex);
143     }
144
145     bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false;
146
147     out_data = m_theQueue.front();
148     m_theQueue.pop();
149
150     if (queueFull)
151     {
152         // wake up threads waiting for stuff
153         g_cond_broadcast(m_cond);
154     }
155
156     g_mutex_unlock(m_mutex);
157 }
158
159 template <typename T>
160 bool BlockingQueue<T>::TryPop(T &out_data)
161 {
162     g_mutex_lock(m_mutex);
163
164     bool rv = false;
165     if (!m_theQueue.empty())
166     {
167         bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false;
168
169         out_data = m_theQueue.front();
170         m_theQueue.pop();
171
172         if (queueFull)
173         {
174             // wake up threads waiting for stuff
175             g_cond_broadcast(m_cond);
176         }
177
178         rv = true;
179     }
180
181     g_mutex_unlock(m_mutex);
182
183     return rv;
184 }
185
186 template <typename T>
187 bool BlockingQueue<T>::TimedWaitPop(T &data, glong microsecs)
188 {
189     g_mutex_lock(m_mutex);
190
191     // adding microsecs to now
192     GTimeVal abs_time;
193     g_get_current_time(&abs_time);
194     g_time_val_add(&abs_time, microsecs);
195
196     gboolean retcode = TRUE;
197     while (m_theQueue.empty() && (retcode != FALSE))
198     {
199         // Returns TRUE if cond was signalled, or FALSE on timeout
200         retcode = g_cond_timed_wait(m_cond, m_mutex, &abs_time);
201     }
202
203     bool rv = false;
204     bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false;
205     if (retcode != FALSE)
206     {
207         data = m_theQueue.front();
208         m_theQueue.pop();
209
210         rv = true;
211     }
212
213     if (rv && queueFull)
214     {
215         // wake up threads waiting for stuff
216         g_cond_broadcast(m_cond);
217     }
218
219     g_mutex_unlock(m_mutex);
220
221     return rv;
222 }
223
224 #endif /* _GBLOCKINGQUEUEIMPL_H_ */