fixed minor bugs
[cdsspec-compiler.git] / benchmark / ms-queue / my_queue.c
1 #include <threads.h>
2 #include <stdlib.h>
3 #include <stdatomic.h>
4 #include "librace.h"
5 #include "model-assert.h"
6
7
/*
 * A "pointer" is a 64-bit tagged value: the low 32 bits hold a node index
 * and the high 32 bits hold a modification counter.
 */
typedef unsigned long long pointer;
typedef atomic_ullong pointer_t;

/*
 * Masks are unsigned: shifting the signed constant 0xffffffffLL left by 32
 * overflows long long, which is undefined behaviour.
 */
#define PTR_MASK 0xffffffffULL
#define COUNT_MASK (0xffffffffULL << 32)

/*
 * Build a tagged pointer from an index and a counter. Both arguments are
 * fully parenthesised so arbitrary expressions can be passed; the index is
 * masked so it can never spill into the counter half.
 */
#define MAKE_POINTER(ptr, count)        ((((pointer)(count)) << 32) | (((pointer)(ptr)) & PTR_MASK))

/* Replace the counter half of *p with val, keeping the index half. */
static inline void set_count(pointer *p, unsigned int val) { *p = (*p & ~COUNT_MASK) | ((pointer)val << 32); }
/* Replace the index half of *p with val, keeping the counter half. */
static inline void set_ptr(pointer *p, unsigned int val) { *p = (*p & ~PTR_MASK) | (pointer)val; }
/* Extract the counter half (high 32 bits). */
static inline unsigned int get_count(pointer p) { return (unsigned int)((p & COUNT_MASK) >> 32); }
/* Extract the index half (low 32 bits). */
static inline unsigned int get_ptr(pointer p) { return (unsigned int)(p & PTR_MASK); }
19
20 typedef struct node {
21         unsigned int value;
22         pointer_t next;
23 } node_t;
24
25 typedef struct {
26         pointer_t head;
27         pointer_t tail;
28         node_t nodes[MAX_NODES + 1];
29 } queue_t;
30
31 void init_queue(queue_t *q, int num_threads);
32
33 #include <list>
34 using namespace std;
35 /**
36         @Begin
37         @Global_define:
38                 @DeclareStruct:
39                 typedef struct tag_elem {
40                         Tag id;
41                         unsigned int data;
42                 } tag_elem_t;
43                 
44                 @DeclareVar:
45                 list<tag_elem_t> __queue;
46                 Tag tag;
47                 @InitVar:
48                         __queue = list<tag_elem_t>();
49                         tag = 1; // Beginning of available id
50         @Happens_before:
51                 # Only check the happens-before relationship according to the id of the
52                 # commit_point_set. For commit_point_set that has same ID, A -> B means
53                 # B happens after the previous A.
54                 Enqueue -> Dequeue
55         @End
56 */
57
58
/* Index of the calling thread; used to pick that thread's free list. */
int get_thread_num();

/* Short aliases for the memory orders used by the queue operations. */
#define relaxed memory_order_relaxed
#define release memory_order_release
#define acquire memory_order_acquire

/* Each thread can own up to MAX_FREELIST free nodes */
#define MAX_FREELIST 4
/* Each thread starts with INITIAL_FREE free nodes */
#define INITIAL_FREE 2

/* Index written into the 'next' field of nodes that have not been linked yet. */
#define POISON_IDX 0x666

/* free_lists[t][i] is the i-th free node index owned by thread t; 0 marks an empty slot. */
static unsigned int (*free_lists)[MAX_FREELIST];
72
73 /* Search this thread's free list for a "new" node */
74 static unsigned int new_node()
75 {
76         int i;
77         int t = get_thread_num();
78         for (i = 0; i < MAX_FREELIST; i++) {
79                 unsigned int node = load_32(&free_lists[t][i]);
80                 if (node) {
81                         store_32(&free_lists[t][i], 0);
82                         return node;
83                 }
84         }
85         /* free_list is empty? */
86         MODEL_ASSERT(0);
87         return 0;
88 }
89
90 /* Place this node index back on this thread's free list */
91 static void reclaim(unsigned int node)
92 {
93         int i;
94         int t = get_thread_num();
95
96         /* Don't reclaim NULL node */
97         MODEL_ASSERT(node);
98
99         for (i = 0; i < MAX_FREELIST; i++) {
100                 /* Should never race with our own thread here */
101                 unsigned int idx = load_32(&free_lists[t][i]);
102
103                 /* Found empty spot in free list */
104                 if (idx == 0) {
105                         store_32(&free_lists[t][i], node);
106                         return;
107                 }
108         }
109         /* free list is full? */
110         MODEL_ASSERT(0);
111 }
112
113 void init_queue(queue_t *q, int num_threads)
114 {
115         /**
116                 @Begin
117                 @Entry_point
118                 @End
119         */
120
121         int i, j;
122
123         /* Initialize each thread's free list with INITIAL_FREE pointers */
124         /* The actual nodes are initialized with poison indexes */
125         free_lists = (unsigned int**) malloc(num_threads * sizeof(*free_lists));
126         for (i = 0; i < num_threads; i++) {
127                 for (j = 0; j < INITIAL_FREE; j++) {
128                         free_lists[i][j] = 2 + i * MAX_FREELIST + j;
129                         atomic_init(&q->nodes[free_lists[i][j]].next, MAKE_POINTER(POISON_IDX, 0));
130                 }
131         }
132
133         /* initialize queue */
134         atomic_init(&q->head, MAKE_POINTER(1, 0));
135         atomic_init(&q->tail, MAKE_POINTER(1, 0));
136         atomic_init(&q->nodes[1].next, MAKE_POINTER(0, 0));
137 }
138
139 /**
140         @Begin
141         @Interface: Enqueue
142         @Commit_point_set: Enqueue_Success_Point
143         @ID: tag++
144         @Action:
145                 # __ID__ is an internal macro that refers to the id of the current
146                 # interface call
147                 tag_elem_t elem;
148                 elem.id = __ID__;
149                 elem.data = val;
150                 __queue.push_back(elem);
151         @End
152 */
153 void enqueue(queue_t *q, unsigned int val)
154 {
155         int success = 0;
156         unsigned int node;
157         pointer tail;
158         pointer next;
159         pointer tmp;
160
161         node = new_node();
162         store_32(&q->nodes[node].value, val);
163         tmp = atomic_load_explicit(&q->nodes[node].next, relaxed);
164         set_ptr(&tmp, 0); // NULL
165         atomic_store_explicit(&q->nodes[node].next, tmp, relaxed);
166
167         while (!success) {
168                 tail = atomic_load_explicit(&q->tail, acquire);
169                 next = atomic_load_explicit(&q->nodes[get_ptr(tail)].next, acquire);
170                 if (tail == atomic_load_explicit(&q->tail, relaxed)) {
171
172                         /* Check for uninitialized 'next' */
173                         MODEL_ASSERT(get_ptr(next) != POISON_IDX);
174
175                         if (get_ptr(next) == 0) { // == NULL
176                                 pointer value = MAKE_POINTER(node, get_count(next) + 1);
177                                 success = atomic_compare_exchange_strong_explicit(&q->nodes[get_ptr(tail)].next,
178                                                 &next, value, release, release);
179                         }
180                         if (!success) {
181                                 unsigned int ptr = get_ptr(atomic_load_explicit(&q->nodes[get_ptr(tail)].next, acquire));
182                                 pointer value = MAKE_POINTER(ptr,
183                                                 get_count(tail) + 1);
184                                 int commit_success = 0;
185                                 commit_success = atomic_compare_exchange_strong_explicit(&q->tail,
186                                                 &tail, value, release, release);
187                                 /**
188                                         @Begin
189                                         @Commit_point_define_check: commit_success == true
190                                         @Label: Enqueue_Success_Point
191                                         @End
192                                 */
193                                 thrd_yield();
194                         }
195                 }
196         }
197         atomic_compare_exchange_strong_explicit(&q->tail,
198                         &tail,
199                         MAKE_POINTER(node, get_count(tail) + 1),
200                         release, release);
201 }
202
203 /**
204         @Begin
205         @Interface: Dequeue
206         @Commit_point_set: Dequeue_Success_Point
207         @ID: __queue.back().id
208         @Action:
209                 unsigned int _Old_Val = __queue.front().data;
210                 __queue.pop_front();
211         @Post_check:
212                 _Old_Val == __RET__
213         @End
214 */
215 unsigned int dequeue(queue_t *q)
216 {
217         unsigned int value;
218         int success = 0;
219         pointer head;
220         pointer tail;
221         pointer next;
222
223         while (!success) {
224                 head = atomic_load_explicit(&q->head, acquire);
225                 tail = atomic_load_explicit(&q->tail, relaxed);
226                 next = atomic_load_explicit(&q->nodes[get_ptr(head)].next, acquire);
227                 if (atomic_load_explicit(&q->head, relaxed) == head) {
228                         if (get_ptr(head) == get_ptr(tail)) {
229
230                                 /* Check for uninitialized 'next' */
231                                 MODEL_ASSERT(get_ptr(next) != POISON_IDX);
232
233                                 if (get_ptr(next) == 0) { // NULL
234                                         return 0; // NULL
235                                 }
236                                 atomic_compare_exchange_strong_explicit(&q->tail,
237                                                 &tail,
238                                                 MAKE_POINTER(get_ptr(next), get_count(tail) + 1),
239                                                 release, release);
240                                 thrd_yield();
241                         } else {
242                                 value = load_32(&q->nodes[get_ptr(next)].value);
243                                 success = atomic_compare_exchange_strong_explicit(&q->head,
244                                                 &head,
245                                                 MAKE_POINTER(get_ptr(next), get_count(head) + 1),
246                                                 release, release);
247                                 /**
248                                         @Begin
249                                         @Commit_point_define_check: success == true
250                                         @Label: Dequeue_Success_Point
251                                         @End
252                                 */
253                                 if (!success)
254                                         thrd_yield();
255                         }
256                 }
257         }
258         reclaim(get_ptr(head));
259         return value;
260 }
261
262
263
264 #include <stdlib.h>
265 #include <stdio.h>
266 #include <threads.h>
267
268 #include "my_queue.h"
269 #include "model-assert.h"
270
271 static int procs = 2;
272 static queue_t *queue;
273 static thrd_t *threads;
274 static unsigned int *input;
275 static unsigned int *output;
276 static int num_threads;
277
278 int get_thread_num()
279 {
280         thrd_t curr = thrd_current();
281         int i;
282         for (i = 0; i < num_threads; i++)
283                 if (curr.priv == threads[i].priv)
284                         return i;
285         MODEL_ASSERT(0);
286         return -1;
287 }
288
289 static void main_task(void *param)
290 {
291         unsigned int val;
292         int pid = *((int *)param);
293
294         if (!pid) {
295                 input[0] = 17;
296                 enqueue(queue, input[0]);
297                 output[0] = dequeue(queue);
298         } else {
299                 input[1] = 37;
300                 enqueue(queue, input[1]);
301                 output[1] = dequeue(queue);
302         }
303 }
304
305 int user_main(int argc, char **argv)
306 {
307         int i;
308         int *param;
309         unsigned int in_sum = 0, out_sum = 0;
310
311         queue = (queue_t*) calloc(1, sizeof(*queue));
312         MODEL_ASSERT(queue);
313
314         num_threads = procs;
315         threads = (thrd_t*) malloc(num_threads * sizeof(thrd_t));
316         param = (int*) malloc(num_threads * sizeof(*param));
317         input = (unsigned int*) calloc(num_threads, sizeof(*input));
318         output = (unsigned int*) calloc(num_threads, sizeof(*output));
319
320         init_queue(queue, num_threads);
321         for (i = 0; i < num_threads; i++) {
322                 param[i] = i;
323                 thrd_create(&threads[i], main_task, &param[i]);
324         }
325         for (i = 0; i < num_threads; i++)
326                 thrd_join(threads[i]);
327
328         for (i = 0; i < num_threads; i++) {
329                 in_sum += input[i];
330                 out_sum += output[i];
331         }
332         for (i = 0; i < num_threads; i++)
333                 printf("input[%d] = %u\n", i, input[i]);
334         for (i = 0; i < num_threads; i++)
335                 printf("output[%d] = %u\n", i, output[i]);
336         MODEL_ASSERT(in_sum == out_sum);
337
338         free(param);
339         free(threads);
340         free(queue);
341
342         return 0;
343 }