mpmc-queue: add MPMC queue header
[model-checker-benchmarks.git] / mpmc-queue / mpmc-queue.h
1 template <typename t_element, size_t t_size>
2 struct mpmc_boundq_1_alt
3 {
4 private:
5
6         // elements should generally be cache-line-size padded :
7         nonatomic<t_element>  m_array[t_size];
8
9         // rdwr counts the reads & writes that have started
10         atomic<unsigned int>    m_rdwr;
11         // "read" and "written" count the number completed
12         atomic<unsigned int>    m_read;
13         atomic<unsigned int>    m_written;
14
15 public:
16
17         mpmc_boundq_1_alt() : m_rdwr(0), m_read(0), m_written(0)
18         {
19         }
20
21         //-----------------------------------------------------
22
23         nonatomic<t_element> * read_fetch() {
24                 unsigned int rdwr = m_rdwr($).load(mo_acquire);
25                 unsigned int rd,wr;
26                 for(;;) {
27                         rd = (rdwr>>16) & 0xFFFF;
28                         wr = rdwr & 0xFFFF;
29
30                         if ( wr == rd ) // empty
31                                 return false;
32
33                         if ( m_rdwr($).compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel) )
34                                 break;
35                 }
36
37                 // (*1)
38                 rl::backoff bo;
39                 while ( (m_written($).load(mo_acquire) & 0xFFFF) != wr ) {
40                         bo.yield($);
41                 }
42
43                 nonatomic<t_element> * p = & ( m_array[ rd % t_size ] );
44
45                 return p;
46         }
47
48         void read_consume() {
49                 m_read($).fetch_add(1,mo_release);
50         }
51
52         //-----------------------------------------------------
53
54         nonatomic<t_element> * write_prepare() {
55                 unsigned int rdwr = m_rdwr($).load(mo_acquire);
56                 unsigned int rd,wr;
57                 for(;;) {
58                         rd = (rdwr>>16) & 0xFFFF;
59                         wr = rdwr & 0xFFFF;
60
61                         if ( wr == ((rd + t_size)&0xFFFF) ) // full
62                                 return NULL;
63
64                         if ( m_rdwr($).compare_exchange_weak(rdwr,(rd<<16) | ((wr+1)&0xFFFF),mo_acq_rel) )
65                                 break;
66                 }
67
68                 // (*1)
69                 rl::backoff bo;
70                 while ( (m_read($).load(mo_acquire) & 0xFFFF) != rd ) {
71                         bo.yield($);
72                 }
73
74                 nonatomic<t_element> * p = & ( m_array[ wr % t_size ] );
75
76                 return p;
77         }
78
79         void write_publish()
80         {
81                 m_written($).fetch_add(1,mo_release);
82         }
83
84         //-----------------------------------------------------
85
86
87 };