Fix tabbing.... Please fix your editors so they do tabbing correctly!!! (Spaces...
[IRC.git] / Robust / src / Runtime / squeue.h
1 #ifndef ___MYQUE_H__
2 #define ___MYQUE_H__
3
4 //////////////////////////////////////////////////////////
5 //
6 //  A memory pool implements POOLCREATE, POOLALLOC and
7 //  POOLFREE to improve memory allocation by reusing records.
8 //
9 //  This implementation uses a lock-free singly-linked list
10 //  to store reusable records.  The list is initialized with
11 //  one valid record, and the list is considered empty when
12 //  it has only one record; this allows the enqueue operation's
13 //  CAS to assume tail can always be dereferenced.
14 //
15 //  poolfree adds newly freed records to the list BACK
16 //
17 //  poolalloc either takes records from FRONT or mallocs
18 //
19 //////////////////////////////////////////////////////////
20
21 #include <stdlib.h>
22 #include "runtime.h"
23 #include "mem.h"
24 #include "mlp_lock.h"
25
26
27 #define CACHELINESIZE 64
28 #define DQ_POP_EMPTY NULL
29 #define DQ_POP_ABORT NULL
30
31
32 typedef struct sqMemPoolItem_t {
33   void* next;
34 } sqMemPoolItem;
35
36 typedef struct sqMemPool_t {
37   int itemSize;
38   sqMemPoolItem* head;
39
40   // avoid cache line contention between producer/consumer...
41   char buffer[CACHELINESIZE];
42   sqMemPoolItem* tail;
43 } sqMemPool;
44
45
46
47 typedef struct dequeItem_t {
48   void *otherqueue;
49   struct dequeItem_t * next;
50   volatile void *work;
51 } dequeItem;
52
53 typedef struct deque_t {
54   dequeItem* head;
55   // avoid cache line contention between producer/consumer...
56   char buffer[CACHELINESIZE - sizeof(void*)];
57   dequeItem* tail;
58   sqMemPool objret;
59 } deque;
60
61 #define EXTRACTPTR(x) (x&0x0000ffffffffffff)
62 #define INCREMENTTAG     0x0001000000000000
63
64 // the memory pool must always have at least one
65 // item in it
66 static void dqInit(deque *q) {
67   q->head       = calloc(1, sizeof(dequeItem) );
68   q->head->next = NULL;
69   q->tail       = q->head;
70   q->objret.itemSize=sizeof(dequeItem);
71   q->objret.head=calloc(1, sizeof(dequeItem));
72   q->objret.head->next=NULL;
73   q->objret.tail=q->objret.head;
74 }
75
76 static inline void tagpoolfreeinto(sqMemPool* p, void* ptr, void *realptr) {
77   // set up the now unneeded record to as the tail of the
78   // free list by treating its first bytes as next pointer,
79   sqMemPoolItem* tailNew = (sqMemPoolItem*) realptr;
80   tailNew->next = NULL;
81   CFENCE;
82   sqMemPoolItem* tailCurrent=(sqMemPoolItem *)LOCKXCHG((INTPTR *) &p->tail, (INTPTR) realptr);
83   tailCurrent->next=(sqMemPoolItem *) ptr;
84 }
85
86 static inline void* tagpoolalloc(sqMemPool* p) {
87   // to protect CAS in poolfree from dereferencing
88   // null, treat the queue as empty when there is
89   // only one item.  The dequeue operation is only
90   // executed by the thread that owns the pool, so
91   // it doesn't require an atomic op
92   sqMemPoolItem* headCurrent = p->head;
93   sqMemPoolItem* realHead=(sqMemPoolItem *) EXTRACTPTR((INTPTR)headCurrent);
94   sqMemPoolItem* next=realHead->next;
95   int i;
96   if(next == NULL) {
97     // only one item, so don't take from pool
98     sqMemPoolItem * newitem=(sqMemPoolItem *) RUNMALLOC(p->itemSize);
99     ((dequeItem *)newitem)->next=NULL;
100     return newitem;
101   }
102   p->head = next;
103
104   sqMemPoolItem* realNext=(sqMemPoolItem *) EXTRACTPTR((INTPTR)next);
105   asm volatile ( "prefetcht0 (%0)" :: "r" (realNext));
106   realNext=(sqMemPoolItem*)(((char *)realNext)+CACHELINESIZE);
107   asm volatile ( "prefetcht0 (%0)" :: "r" (realNext));
108
109   return (void*)headCurrent;
110 }
111
112
113
114 // CAS
115 // in: a ptr, expected old, desired new
116 // return: actual old
117 //
118 // Pass in a ptr, what you expect the old value is and
119 // what you want the new value to be.
120 // The CAS returns what the value is actually: if it matches
121 // your proposed old value then you assume the update was successful,
122 // otherwise someone did CAS before you, so try again (the return
123 // value is the old value you will pass next time.)
124
125 static inline void dqPushBottom(deque* p, void* work) {
126   dequeItem *ptr=(dequeItem *) tagpoolalloc(&p->objret);
127   dequeItem *realptr=(dequeItem *) EXTRACTPTR((unsigned INTPTR)ptr);
128   ptr=(dequeItem *) (((unsigned INTPTR)ptr)+INCREMENTTAG);
129   realptr->work=work;
130   BARRIER();
131   //thread unsafe enqueue...only works if one thread enqueue to a queue
132   p->tail->next=ptr;
133   p->tail=realptr;
134 }
135
136 static inline void* dqPopTopSelf(deque *p) {
137   int tryagain=1;
138   while(1) {
139     dequeItem *ptr=p->head;
140     dequeItem *realptr=(dequeItem *) EXTRACTPTR((INTPTR)ptr);
141     dequeItem *next=realptr->next;
142     //remove if we can..steal work no matter what
143     if (likely(next!=NULL)) {
144       if (((dequeItem *)CAS(&(p->head),(INTPTR)ptr, (INTPTR)next))!=ptr)
145         return DQ_POP_EMPTY;
146       void * item=NULL;
147       item=(void *)LOCKXCHG((unsigned INTPTR*) &(realptr->work), (unsigned INTPTR) item);
148       realptr->next=NULL;
149       BARRIER();
150       tagpoolfreeinto(&p->objret,ptr, realptr);
151       if (item==NULL&&tryagain) {
152         tryagain=0;
153         continue;
154       }
155       return item;
156     } else {
157       void * item=NULL;
158       if (realptr->work!=NULL)
159         item=(void *) LOCKXCHG((unsigned INTPTR*) &(realptr->work), (unsigned INTPTR) item);
160       return item;
161     }
162   }
163 }
164
165 static inline void* dqPopTop(deque *p) {
166   dequeItem *ptr=p->head;
167   dequeItem *realptr=(dequeItem *) EXTRACTPTR((INTPTR)ptr);
168   dequeItem *next=realptr->next;
169   //remove if we can..steal work no matter what
170   if (likely(next!=NULL)) {
171     if (((dequeItem *)CAS(&(p->head),(INTPTR)ptr, (INTPTR)next))!=ptr)
172       return DQ_POP_EMPTY;
173     void * item=NULL;
174     item=(void *)LOCKXCHG((unsigned INTPTR*) &(realptr->work), (unsigned INTPTR) item);
175     realptr->next=NULL;
176     BARRIER();
177     tagpoolfreeinto(&p->objret,ptr, realptr);
178     return item;
179   } else {
180     void * item=NULL;
181     if (realptr->work!=NULL)
182       item=(void *) LOCKXCHG((unsigned INTPTR*) &(realptr->work), (unsigned INTPTR) item);
183     return item;
184   }
185 }
186
187 #define dqPopBottom dqPopTopSelf
188
189 #endif // ___MEMPOOL_H__
190
191
192
193
194
195
196
197
198
199