more data structures
[cdsspec-compiler.git] / benchmark / mpmc-queue / mpmc-queue.h
1 #include <stdatomic.h>
2 #include <unrelacy.h>
3
4 template <typename t_element, size_t t_size>
5 struct mpmc_boundq_1_alt
6 {
7 private:
8
9         // elements should generally be cache-line-size padded :
10         t_element               m_array[t_size];
11
12         // rdwr counts the reads & writes that have started
13         atomic<unsigned int>    m_rdwr;
14         // "read" and "written" count the number completed
15         atomic<unsigned int>    m_read;
16         atomic<unsigned int>    m_written;
17
18 public:
19
20         mpmc_boundq_1_alt()
21         {
22                 m_rdwr = 0;
23                 m_read = 0;
24                 m_written = 0;
25         }
26         
27
28         /**
29         @Global_define:
30         Order_queue<unsigned int*> spec_queue;
31         */
32
33         //-----------------------------------------------------
34
35         t_element * read_fetch() {
36                 unsigned int rdwr = m_rdwr.load(mo_acquire);
37                 unsigned int rd,wr;
38                 for(;;) {
39                         rd = (rdwr>>16) & 0xFFFF;
40                         wr = rdwr & 0xFFFF;
41
42                         if ( wr == rd ) { // empty
43                                 return false;
44                         }
45
46                         if ( m_rdwr.compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel) )
47                                 break;
48                         else
49                                 thrd_yield();
50                 }
51
52                 // (*1)
53                 rl::backoff bo;
54                 while ( (m_written.load(mo_acquire) & 0xFFFF) != wr ) {
55                         thrd_yield();
56                 }
57
58                 t_element * p = & ( m_array[ rd % t_size ] );
59                 
60                 /**
61                         @Commit_point_Check: true
62                         @Label: ANY
63                         @Check:
64                                 spec_queue.peak() == p
65                 */
66                 return p;
67         }
68
69         void read_consume() {
70                 m_read.fetch_add(1,mo_release);
71                 /**
72                         @Commit_point_define: true
73                         @Label: Read_Consume_Success
74                         @Check:
75                                 spec_queue.size() > 0
76                         @Action:
77                                 spec_queue.remove();
78                 */
79         }
80
81         //-----------------------------------------------------
82
83         t_element * write_prepare() {
84                 unsigned int rdwr = m_rdwr.load(mo_acquire);
85                 unsigned int rd,wr;
86                 for(;;) {
87                         rd = (rdwr>>16) & 0xFFFF;
88                         wr = rdwr & 0xFFFF;
89
90                         if ( wr == ((rd + t_size)&0xFFFF) ) // full
91                                 return NULL;
92
93                         if ( m_rdwr.compare_exchange_weak(rdwr,(rd<<16) | ((wr+1)&0xFFFF),mo_acq_rel) )
94                                 break;
95                         else
96                                 thrd_yield();
97                 }
98
99                 // (*1)
100                 rl::backoff bo;
101                 while ( (m_read.load(mo_acquire) & 0xFFFF) != rd ) {
102                         thrd_yield();
103                 }
104
105
106                 t_element * p = & ( m_array[ wr % t_size ] );
107
108                 /**
109                         @Commit_point_check: ANY
110                         @Action: spec_queue.add(p);
111                 */
112                 return p;
113         }
114
115         void write_publish()
116         {
117                 m_written.fetch_add(1,mo_release);
118         }
119
120         //-----------------------------------------------------
121
122
123 };