add feature to memPool for detecting misuse of records by clients, also added a funct...
[IRC.git] / Robust / src / Runtime / squeue.h
1 #ifndef ___MYQUE_H__
2 #define ___MYQUE_H__
3
4 //////////////////////////////////////////////////////////
5 //
6 //  A memory pool implements POOLCREATE, POOLALLOC and 
7 //  POOLFREE to improve memory allocation by reusing records.
8 //
9 //  This implementation uses a lock-free singly-linked list
10 //  to store reusable records.  The list is initialized with
11 //  one valid record, and the list is considered empty when
12 //  it has only one record; this allows the enqueue operation's
13 //  CAS to assume tail can always be dereferenced.
14 //
15 //  poolfree adds newly freed records to the list BACK
16 //
17 //  poolalloc either takes records from FRONT or mallocs
18 //
19 //////////////////////////////////////////////////////////
20
21 #include <stdlib.h>
22 #include "runtime.h"
23 #include "mem.h"
24 #include "mlp_lock.h"
25
26
27 #define CACHELINESIZE 64
28 #define DQ_POP_EMPTY NULL
29 #define DQ_POP_ABORT NULL
30
31
32 typedef struct sqMemPoolItem_t {
33   void* next;
34 } sqMemPoolItem;
35
36
37 typedef struct sqMemPool_t {
38   int itemSize;
39
40   sqMemPoolItem* head;
41
42   // avoid cache line contention between producer/consumer...
43   char buffer[CACHELINESIZE];
44
45   sqMemPoolItem* tail;
46
47 } sqMemPool;
48
49
50
51 typedef struct dequeItem_t {
52   void *otherqueue;
53   struct dequeItem_t * next;
54   volatile void *work;
55 } dequeItem;
56
57 typedef struct deque_t {
58   dequeItem* head;
59
60   // avoid cache line contention between producer/consumer...
61   char buffer[CACHELINESIZE - sizeof(void*)];
62
63   dequeItem* tail;
64   
65   sqMemPool objret;
66 } deque;
67
68 #define EXTRACTPTR(x) (x&0x0000ffffffffffff)
69 #define INCREMENTTAG     0x0001000000000000
70
71 // the memory pool must always have at least one
72 // item in it
73 static void dqInit(deque *q) {
74   q->head       = calloc( 1, sizeof(dequeItem) );
75   q->head->next = NULL;
76   q->tail       = q->head;
77   q->objret.itemSize=sizeof(dequeItem);
78   q->objret.head=calloc(1, sizeof(dequeItem));
79   q->objret.head->next=NULL;
80   q->objret.tail=q->objret.head;
81 }
82
83 static inline void tagpoolfreeinto( sqMemPool* p, void* ptr, void *realptr ) {
84   sqMemPoolItem* tailCurrent;
85   sqMemPoolItem* tailActual;
86   
87   // set up the now unneeded record to as the tail of the
88   // free list by treating its first bytes as next pointer,
89   sqMemPoolItem* tailNew = (sqMemPoolItem*) realptr;
90   tailNew->next = NULL;
91
92   while( 1 ) {
93     // make sure the null happens before the insertion,
94     // also makes sure that we reload tailCurrent, etc..
95     BARRIER();
96
97     tailCurrent = p->tail;
98     tailActual = (sqMemPoolItem*)
99       CAS( &(p->tail),         // ptr to set
100            (INTPTR) tailCurrent, // current tail's next should be NULL
101            (INTPTR) realptr);  // try set to our new tail
102     
103     if( tailActual == tailCurrent ) {
104       // success, update tail
105       tailCurrent->next = (sqMemPoolItem *) ptr;
106       return;
107     }
108
109     // if CAS failed, retry entire operation
110   }
111 }
112
113 static inline void* tagpoolalloc( sqMemPool* p ) {
114
115   // to protect CAS in poolfree from dereferencing
116   // null, treat the queue as empty when there is
117   // only one item.  The dequeue operation is only
118   // executed by the thread that owns the pool, so
119   // it doesn't require an atomic op
120   sqMemPoolItem* headCurrent = p->head;
121   sqMemPoolItem* realHead=(sqMemPoolItem *) EXTRACTPTR((INTPTR)headCurrent);
122   sqMemPoolItem* next=realHead->next;
123   int i;
124   if(next == NULL) {
125     // only one item, so don't take from pool
126     return (void*) RUNMALLOC( p->itemSize );
127   }
128  
129   p->head = next;
130
131   //////////////////////////////////////////////////////////
132   //
133   //
134   //  static inline void prefetch(void *x) 
135   //  { 
136   //    asm volatile("prefetcht0 %0" :: "m" (*(unsigned long *)x));
137   //  } 
138   //
139   //
140   //  but this built-in gcc one seems the most portable:
141   //////////////////////////////////////////////////////////
142   //__builtin_prefetch( &(p->head->next) );
143   sqMemPoolItem* realNext=(sqMemPoolItem *) EXTRACTPTR((INTPTR)next);
144   asm volatile( "prefetcht0 (%0)" :: "r" (realNext));
145   realNext=(sqMemPoolItem*)(((char *)realNext)+CACHELINESIZE);
146   asm volatile( "prefetcht0 (%0)" :: "r" (realNext));
147
148   return (void*)headCurrent;
149 }
150
151
152
153 // CAS
154 // in: a ptr, expected old, desired new
155 // return: actual old
156 //
157 // Pass in a ptr, what you expect the old value is and
158 // what you want the new value to be.
159 // The CAS returns what the value is actually: if it matches
160 // your proposed old value then you assume the update was successful,
161 // otherwise someone did CAS before you, so try again (the return
162 // value is the old value you will pass next time.)
163
164 static inline void dqPushBottom( deque* p, void* work ) {
165   dequeItem *ptr=(dequeItem *) tagpoolalloc(&p->objret);
166   //  dequeItem *ptr=(dequeItem *) calloc(1,sizeof(dequeItem));
167   dequeItem *realptr=(dequeItem *) EXTRACTPTR((INTPTR)ptr);
168   ptr=(dequeItem *) (((INTPTR)ptr)+INCREMENTTAG);
169   realptr->work=work;
170   BARRIER();
171   p->tail->next=ptr;
172   p->tail=realptr;
173 }
174
175 static inline void* dqPopTop(deque *p) {
176   dequeItem *ptr=p->head;
177   dequeItem *realptr=(dequeItem *) EXTRACTPTR((INTPTR)ptr);
178   dequeItem *next=realptr->next;
179   //remove if we can..steal work no matter what
180   if (likely(next!=NULL)) {
181     if (((dequeItem *)CAS(&(p->head),(INTPTR)ptr, (INTPTR)next))!=ptr)
182       return DQ_POP_EMPTY;
183     void * item=NULL;
184     item=(void *)LOCKXCHG((unsigned INTPTR*) &(realptr->work), (unsigned INTPTR) item);
185     realptr->next=NULL;
186     BARRIER();
187     tagpoolfreeinto(&p->objret,ptr, realptr);
188     return item;
189   } else {
190     void * item=NULL;
191     item=(void *) LOCKXCHG((unsigned INTPTR*) &(realptr->work), (unsigned INTPTR) item);
192     return item;
193   }
194 }
195
196 #define dqPopBottom dqPopTop
197
198
199 #endif // ___MEMPOOL_H__
200
201
202
203
204
205
206
207
208
209