1 // ============================================================================
2 // Copyright (c) 2010 Faustino Frechilla
3 // All rights reserved.
5 // Redistribution and use in source and binary forms, with or without
6 // modification, are permitted provided that the following conditions are met:
8 // 1. Redistributions of source code must retain the above copyright notice,
9 // this list of conditions and the following disclaimer.
10 // 2. Redistributions in binary form must reproduce the above copyright
11 // notice, this list of conditions and the following disclaimer in the
12 // documentation and/or other materials provided with the distribution.
13 // 3. The name of the author may not be used to endorse or promote products
14 // derived from this software without specific prior written permission.
16 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
20 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 // POSSIBILITY OF SUCH DAMAGE.
28 /// @file array_lock_free_queue_impl.h
29 /// @brief Implementation of a circular array based lock-free queue
31 /// @author Faustino Frechilla
34 /// Faustino Frechilla 11-Jul-2010 Original development
37 // ============================================================================
39 #ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_H__
40 #define __ARRAY_LOCK_FREE_QUEUE_IMPL_H__
42 #include <assert.h> // assert()
43 #include <sched.h> // sched_yield()
45 template <typename ELEM_T, uint32_t Q_SIZE>
46 ArrayLockFreeQueue<ELEM_T, Q_SIZE>::ArrayLockFreeQueue() :
49 m_maximumReadIndex(0) // only for MultipleProducerThread queues
51 #ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
56 template <typename ELEM_T, uint32_t Q_SIZE>
57 ArrayLockFreeQueue<ELEM_T, Q_SIZE>::~ArrayLockFreeQueue()
61 template <typename ELEM_T, uint32_t Q_SIZE>
63 uint32_t ArrayLockFreeQueue<ELEM_T, Q_SIZE>::countToIndex(uint32_t a_count)
65 // if Q_SIZE is a power of 2 this statement could be also written as
66 // return (a_count & (Q_SIZE - 1));
67 return (a_count % Q_SIZE);
70 template <typename ELEM_T, uint32_t Q_SIZE>
71 uint32_t ArrayLockFreeQueue<ELEM_T, Q_SIZE>::size()
73 #ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
76 uint32_t currentWriteIndex = m_writeIndex;
77 uint32_t currentReadIndex = m_readIndex;
79 // let's think of a scenario where this function returns bogus data
80 // 1. when the statement 'currentWriteIndex = m_writeIndex' is run
81 // m_writeIndex is 3 and m_readIndex is 2. Real size is 1
82 // 2. afterwards this thread is preemted. While this thread is inactive 2
83 // elements are inserted and removed from the queue, so m_writeIndex is 5
84 // m_readIndex 4. Real size is still 1
85 // 3. Now the current thread comes back from preemption and reads m_readIndex.
86 // currentReadIndex is 4
87 // 4. currentReadIndex is bigger than currentWriteIndex, so
88 // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
89 // it returns that the queue is almost full, when it is almost empty
91 if (currentWriteIndex >= currentReadIndex)
93 return (currentWriteIndex - currentReadIndex);
97 return (Q_SIZE + currentWriteIndex - currentReadIndex);
99 #endif // ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
102 template <typename ELEM_T, uint32_t Q_SIZE>
103 bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::push(const ELEM_T &a_data)
105 uint32_t currentReadIndex;
106 uint32_t currentWriteIndex;
110 currentWriteIndex = m_writeIndex;
111 currentReadIndex = m_readIndex;
112 if (countToIndex(currentWriteIndex + 1) ==
113 countToIndex(currentReadIndex))
119 } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
121 // We know now that this index is reserved for us. Use it to save the data
122 m_theQueue[countToIndex(currentWriteIndex)] = a_data;
124 // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
125 // inserting in the queue. It might fail if there are more than 1 producer threads because this
126 // operation has to be done in the same order as the previous CAS
127 while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
129 // this is a good place to yield the thread in case there are more
130 // software threads than hardware processors and you have more
131 // than 1 producer thread
132 // have a look at sched_yield (POSIX.1b)
136 // The value was successfully inserted into the queue
137 #ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
138 AtomicAdd(&m_count, 1);
144 template <typename ELEM_T, uint32_t Q_SIZE>
145 bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::pop(ELEM_T &a_data)
147 uint32_t currentMaximumReadIndex;
148 uint32_t currentReadIndex;
152 // to ensure thread-safety when there is more than 1 producer thread
153 // a second index is defined (m_maximumReadIndex)
154 currentReadIndex = m_readIndex;
155 currentMaximumReadIndex = m_maximumReadIndex;
157 if (countToIndex(currentReadIndex) ==
158 countToIndex(currentMaximumReadIndex))
160 // the queue is empty or
161 // a producer thread has allocate space in the queue but is
162 // waiting to commit the data into it
166 // retrieve the data from the queue
167 a_data = m_theQueue[countToIndex(currentReadIndex)];
169 // try to perfrom now the CAS operation on the read index. If we succeed
170 // a_data already contains what m_readIndex pointed to before we
172 if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
174 // got here. The value was retrieved from the queue. Note that the
175 // data inside the m_queue array is not deleted nor reseted
176 #ifdef ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
177 AtomicSub(&m_count, 1);
182 // it failed retrieving the element off the queue. Someone else must
183 // have read the element stored at countToIndex(currentReadIndex)
184 // before we could perform the CAS operation
186 } while(1); // keep looping to try again!
188 // Something went wrong. it shouldn't be possible to reach here
191 // Add this return statement to avoid compiler warnings
195 #endif // __ARRAY_LOCK_FREE_QUEUE_IMPL_H__