template <typename t_element, size_t t_size>
struct mpmc_boundq_1_alt
{
    // elements should generally be cache-line-size padded :
    t_element m_array[t_size];

    // rdwr counts the reads & writes that have started
    atomic<unsigned int> m_rdwr;
    // "read" and "written" count the number completed
    atomic<unsigned int> m_read;
    atomic<unsigned int> m_written;
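    // Layout note: m_rdwr packs both started-counts into one word --
    // reads in the high 16 bits, writes in the low 16 bits -- so a
    // single compare_exchange below can reserve against both at once.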
    // ---- CDSSpec-style specification annotations ----
    // (elided statements below are sketched in from the surrounding calls)
    CLASS = mpmc_boundq_1_alt;
    list = new_spec_list();

    elem* new_elem(t_element *pos, call_id_t id) {
        elem *e = (elem*) MODEL_MALLOC(sizeof(elem));
        e->pos = pos;
        e->id = id;
        return e;
    }

    elem* get_elem(t_element *pos) {
        for (int i = 0; i < size(list); i++) {
            elem *e = (elem*) elem_at_index(list, i);
            if (pos == e->pos) return e;
        }
        return NULL;
    }

    // returns the element's index in the list, or -1 when absent
    int has_elem(t_element *pos) {
        for (int i = 0; i < size(list); i++) {
            elem *existing = (elem*) elem_at_index(list, i);
            if (pos == existing->pos) return i;
        }
        return -1;
    }

    void prepare(t_element *pos) { /* ... */ }
    void publish(t_element *pos) { /* ... */ }

    void consume_elem(t_element *pos) {
        int idx = has_elem(pos);
        remove_at_index(list, idx);
    }
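    // (Inferred intent, not stated in the excerpt: the spec list models
    // the set of published-but-unconsumed slots, letting the checker
    // match each fetched element against the write that produced it.)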
    //-----------------------------------------------------

    t_element * read_fetch() {
        unsigned int rdwr = m_rdwr.load(mo_acquire);
        unsigned int rd, wr;
        for (;;) {
            rd = (rdwr>>16) & 0xFFFF;
            wr = rdwr & 0xFFFF;

            if ( wr == rd ) { // empty
                return NULL;
            }

            // reserve one read by bumping the started-reads half;
            // on CAS failure rdwr holds the fresh value, so just retry
            if ( m_rdwr.compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel) )
                break;
        }

        // wait until all writes started at reservation time have completed
        while ( (m_written.load(mo_acquire) & 0xFFFF) != wr ) {
            // spin (the original backs off / yields here)
        }

        t_element * p = & ( m_array[ rd % t_size ] );
        return p;
    }

    void read_consume() {
        m_read.fetch_add(1,mo_release);
    }

    //-----------------------------------------------------
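    // The write side mirrors the read side: CAS a reservation into the
    // low half of m_rdwr, wait until completed reads (m_read) catch up
    // to the started-reads count rd, then hand out m_array[wr % t_size].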
    t_element * write_prepare() {
        unsigned int rdwr = m_rdwr.load(mo_acquire);
        unsigned int rd, wr;
        for (;;) {
            rd = (rdwr>>16) & 0xFFFF;
            wr = rdwr & 0xFFFF;

            if ( wr == ((rd + t_size)&0xFFFF) ) // full
                return NULL;

            // reserve one write by bumping the started-writes half;
            // on CAS failure rdwr holds the fresh value, so just retry
            if ( m_rdwr.compare_exchange_weak(rdwr,(rd<<16) | ((wr+1)&0xFFFF),mo_acq_rel) )
                break;
        }

        // wait until all reads started at reservation time have completed,
        // so the slot's previous occupant can no longer be observed
        while ( (m_read.load(mo_acquire) & 0xFFFF) != rd ) {
            // spin (the original backs off / yields here)
        }

        t_element * p = & ( m_array[ wr % t_size ] );
        return p;
    }

    void write_publish() {
        m_written.fetch_add(1,mo_release);
    }

    //-----------------------------------------------------
};
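Below is a minimal usage sketch, not part of the original listing: one producer and one consumer driving the two-phase interface, assuming the mo_* aliases are in scope and the counters start at zero as in the benchmark's constructor. The names producer_thread, consumer_thread, and q are illustrative only.

mpmc_boundq_1_alt<int, 16> q;   // counters assumed zero-initialized

void producer_thread(int n) {
    for (int i = 0; i < n; i++) {
        int *slot;
        while ((slot = q.write_prepare()) == NULL) {
            // queue full: spin until a reader consumes a slot
        }
        *slot = i;          // fill the reserved slot
        q.write_publish();  // bump m_written; readers may now fetch it
    }
}

void consumer_thread(int n) {
    for (int i = 0; i < n; i++) {
        int *slot;
        while ((slot = q.read_fetch()) == NULL) {
            // queue empty: spin until a writer publishes
        }
        int value = *slot;  // copy out before releasing the slot
        q.read_consume();   // bump m_read; writers may now reuse it
        (void) value;
    }
}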