assigns workerID to the workerTR
[IRC.git] / Robust / src / Runtime / squeue.h
1 #ifndef ___MYQUE_H__
2 #define ___MYQUE_H__
3
4 //////////////////////////////////////////////////////////
5 //
6 //  A memory pool implements POOLCREATE, POOLALLOC and 
7 //  POOLFREE to improve memory allocation by reusing records.
8 //
9 //  This implementation uses a lock-free singly-linked list
10 //  to store reusable records.  The list is initialized with
11 //  one valid record, and the list is considered empty when
12 //  it has only one record; this allows the enqueue operation's
13 //  CAS to assume tail can always be dereferenced.
14 //
15 //  poolfree adds newly freed records to the list BACK
16 //
17 //  poolalloc either takes records from FRONT or mallocs
18 //
19 //////////////////////////////////////////////////////////
20
21 #include <stdlib.h>
22 #include "runtime.h"
23 #include "mem.h"
24 #include "mlp_lock.h"
25
26
27 #define CACHELINESIZE 64
28 #define DQ_POP_EMPTY NULL
29 #define DQ_POP_ABORT NULL
30
31
32 typedef struct sqMemPoolItem_t {
33   void* next;
34 } sqMemPoolItem;
35
36 typedef struct sqMemPool_t {
37   int itemSize;
38   sqMemPoolItem* head;
39
40   // avoid cache line contention between producer/consumer...
41   char buffer[CACHELINESIZE];
42   sqMemPoolItem* tail;
43 } sqMemPool;
44
45
46
47 typedef struct dequeItem_t {
48   void *otherqueue;
49   struct dequeItem_t * next;
50   volatile void *work;
51 } dequeItem;
52
53 typedef struct deque_t {
54   dequeItem* head;
55   // avoid cache line contention between producer/consumer...
56   char buffer[CACHELINESIZE - sizeof(void*)];
57   dequeItem* tail;
58   sqMemPool objret;
59 } deque;
60
61 #define EXTRACTPTR(x) (x&0x0000ffffffffffff)
62 #define INCREMENTTAG     0x0001000000000000
63
64 // the memory pool must always have at least one
65 // item in it
66 static void dqInit(deque *q) {
67   q->head       = calloc( 1, sizeof(dequeItem) );
68   q->head->next = NULL;
69   q->tail       = q->head;
70   q->objret.itemSize=sizeof(dequeItem);
71   q->objret.head=calloc(1, sizeof(dequeItem));
72   q->objret.head->next=NULL;
73   q->objret.tail=q->objret.head;
74 }
75
76 static inline void tagpoolfreeinto( sqMemPool* p, void* ptr, void *realptr ) {
77   // set up the now unneeded record to as the tail of the
78   // free list by treating its first bytes as next pointer,
79   sqMemPoolItem* tailNew = (sqMemPoolItem*) realptr;
80   tailNew->next = NULL;
81   CFENCE;
82   sqMemPoolItem* tailCurrent=(sqMemPoolItem *)LOCKXCHG((INTPTR *) &p->tail, (INTPTR) realptr);
83   tailCurrent->next=(sqMemPoolItem *) ptr;
84 }
85
86 static inline void* tagpoolalloc( sqMemPool* p ) {
87   // to protect CAS in poolfree from dereferencing
88   // null, treat the queue as empty when there is
89   // only one item.  The dequeue operation is only
90   // executed by the thread that owns the pool, so
91   // it doesn't require an atomic op
92   sqMemPoolItem* headCurrent = p->head;
93   sqMemPoolItem* realHead=(sqMemPoolItem *) EXTRACTPTR((INTPTR)headCurrent);
94   sqMemPoolItem* next=realHead->next;
95   int i;
96   if(next == NULL) {
97     // only one item, so don't take from pool
98     sqMemPoolItem * newitem=(sqMemPoolItem *) RUNMALLOC( p->itemSize );
99     ((dequeItem *)newitem)->next=NULL;
100     return newitem;
101   }
102   p->head = next;
103
104   sqMemPoolItem* realNext=(sqMemPoolItem *) EXTRACTPTR((INTPTR)next);
105   asm volatile( "prefetcht0 (%0)" :: "r" (realNext));
106   realNext=(sqMemPoolItem*)(((char *)realNext)+CACHELINESIZE);
107   asm volatile( "prefetcht0 (%0)" :: "r" (realNext));
108
109   return (void*)headCurrent;
110 }
111
112
113
114 // CAS
115 // in: a ptr, expected old, desired new
116 // return: actual old
117 //
118 // Pass in a ptr, what you expect the old value is and
119 // what you want the new value to be.
120 // The CAS returns what the value is actually: if it matches
121 // your proposed old value then you assume the update was successful,
122 // otherwise someone did CAS before you, so try again (the return
123 // value is the old value you will pass next time.)
124
125 static inline void dqPushBottom( deque* p, void* work ) {
126   dequeItem *ptr=(dequeItem *) tagpoolalloc(&p->objret);
127   dequeItem *realptr=(dequeItem *) EXTRACTPTR((unsigned INTPTR)ptr);
128   ptr=(dequeItem *) (((unsigned INTPTR)ptr)+INCREMENTTAG);
129   realptr->work=work;
130   BARRIER();
131   //thread unsafe enqueue...only works if one thread enqueue to a queue
132   p->tail->next=ptr;
133   p->tail=realptr;
134 }
135
136 static inline void* dqPopTop(deque *p) {
137   dequeItem *ptr=p->head;
138   dequeItem *realptr=(dequeItem *) EXTRACTPTR((INTPTR)ptr);
139   dequeItem *next=realptr->next;
140   //remove if we can..steal work no matter what
141   if (likely(next!=NULL)) {
142     if (((dequeItem *)CAS(&(p->head),(INTPTR)ptr, (INTPTR)next))!=ptr)
143       return DQ_POP_EMPTY;
144     void * item=NULL;
145     item=(void *)LOCKXCHG((unsigned INTPTR*) &(realptr->work), (unsigned INTPTR) item);
146     realptr->next=NULL;
147     BARRIER();
148     tagpoolfreeinto(&p->objret,ptr, realptr);
149     return item;
150   } else {
151     void * item=NULL;
152     if (realptr->work!=NULL)
153       item=(void *) LOCKXCHG((unsigned INTPTR*) &(realptr->work), (unsigned INTPTR) item);
154     return item;
155   }
156 }
157
158 #define dqPopBottom dqPopTop
159
160 #endif // ___MEMPOOL_H__
161
162
163
164
165
166
167
168
169
170