Changes... small fix to jim's bugfix...touching our record after we free it is also...
[IRC.git] / Robust / src / Runtime / oooJava / hashStructure.c
1 #include "hashStructure.h"
2 //#include "WaitingQueue.h"
3 #include "mlp_lock.h"
4 #include "rcr_runtime.h"
5 #include "mem.h"
6 #include "classdefs.h"
7
8
9 //NOTE: this is only temporary (for testing) and will be removed in favor of thread local variables
10 //It's basically an array of hashStructures so we can simulate what would happen in a many-threaded version
11 HashStructure ** allHashStructures;
12 #define ISWRITEBIN(x) (x&BINMASK)
13 #define ISREADBIN(x) (!(x&BINMASK))
14 //#define POPCOUNT(x) __builtin_popcountll(x)
15 //__builtin_popcountll
16
17
18 inline enqueuerecord(struct rcrRecord *rcrrec, int tmpkey, BinItem_rcr *item) {
19   if (likely(rcrrec!=NULL)) {
20     struct rcrRecord * tmprec;
21     if(likely(rcrrec->index<RCRSIZE)) {
22       int index=rcrrec->index++;
23       rcrrec->ptrarray[index]=(void *) item;
24       rcrrec->array[index]=tmpkey;
25     } else if(likely((tmprec=rcrrec->next)!=NULL)&&likely(tmprec->index<RCRSIZE)) {
26       int index=tmprec->index++;
27       tmprec->ptrarray[index]=(void *) item;
28       tmprec->array[index]=tmpkey;
29     } else {
30       struct rcrRecord *trec=RUNMALLOC(sizeof(struct rcrRecord));
31       trec->ptrarray[0]=(void *) item;
32       trec->array[0]=tmpkey;
33       trec->index=1;
34       trec->next=tmprec;
35       rcrrec->next=trec;
36     }
37   }
38 }
39
40 //NOTE: only temporary
41 void rcr_createMasterHashTableArray(int maxSize){
42   allHashStructures = (HashStructure **) malloc(sizeof(HashStructure *) * maxSize);
43 }
44
45 HashStructure* rcr_createHashtable(int sizeofWaitingQueue){
46   int i=0;
47   HashStructure* newTable=(HashStructure*)RUNMALLOC(sizeof(HashStructure));
48   for(i=0;i<RNUMBINS;i++){
49     newTable->array[i].head=NULL;
50     newTable->array[i].tail=NULL;
51   }
52
53   return newTable;
54 }
55
56 WriteBinItem_rcr* rcr_createWriteBinItem(){
57   WriteBinItem_rcr* binitem=(WriteBinItem_rcr*)RUNMALLOC(sizeof(WriteBinItem_rcr));
58   binitem->item.type=WRITEBIN;
59   return binitem;
60 }
61
62 ReadBinItem_rcr* rcr_createReadBinItem(){
63   ReadBinItem_rcr* binitem=(ReadBinItem_rcr*)RUNMALLOC(sizeof(ReadBinItem_rcr));
64   binitem->index=0;
65   binitem->item.type=READBIN;
66   return binitem;
67 }
68
69 inline int rcr_generateKey(void * ptr){
70   return (((struct ___Object___ *) ptr)->oid)&RH_MASK;
71 }
72
73 inline int rcr_BWRITEBINCASE(HashStructure *T, int key, SESEcommon *task, struct rcrRecord *rcrrec, int index, int mode) {
74   //chain of bins exists => tail is valid
75   //if there is something in front of us, then we are not ready
76   BinItem_rcr * val;
77   BinElement_rcr* be= &(T->array[key]); //do not grab head from here since it's locked (i.e. = 0x1)
78
79   //LOCK is still needed as different threads will remove items...
80   do {  
81     val=(BinItem_rcr *)0x1;       
82     val=(BinItem_rcr *)LOCKXCHG((unsigned INTPTR*)&(be->head), (unsigned INTPTR)val);
83   } while(val==(BinItem_rcr*)0x1);     
84
85   if (val==NULL) {
86     BinItem_rcr * b=(BinItem_rcr*)rcr_createWriteBinItem();
87     WriteBinItem_rcr * td = (WriteBinItem_rcr*)b;
88     b->total=1;
89     b->status=READY;
90     
91     //common to both types
92     td->task=task;
93     td->bitindexrd=td->bitindexwr=1<<index;
94     be->tail=b;
95     BARRIER();//do tail before head
96     //release lock
97     be->head=b;
98     enqueuerecord(rcrrec, key, b);
99     return READY;
100   }
101   BARRIER();//read head before tail
102   BinItem_rcr *bintail=be->tail;
103   bitvt rdmask=0,wrmask=0;
104   int status=NOTREADY;
105
106   if (ISWRITEBIN(bintail->type)) {
107     WriteBinItem_rcr * td = (WriteBinItem_rcr *)bintail;
108     //last one is to check for SESE blocks in a while loop.
109     if(unlikely(td->task == task)) {
110
111       bitvt bit=1<<index;
112       if (!(bit & td->bitindexwr)) {
113         td->bitindexwr|=bit;
114         td->bitindexrd|=bit;
115         be->head=val;
116         if (mode) {
117           while(bintail->status!=READY) {
118             BARRIER();
119           }
120           return READY;
121         } else {
122           return bintail->status;
123         }
124       } else {
125         be->head=val;
126         return READY;
127       }
128     }
129   } else {
130     TraverserData * td = &((ReadBinItem_rcr *)bintail)->array[((ReadBinItem_rcr *)bintail)->index - 1];
131     if(unlikely(td->task == task)) {
132       //if it matches, then we remove it and the code below will upgrade it to a write.
133       ((ReadBinItem_rcr *)bintail)->index--;
134       atomic_dec(&bintail->total);
135       rdmask=td->bitindex;
136       if (bintail->status!=READY)
137         wrmask=rdmask;
138       status=SPECNOTREADY;
139     }
140   }
141
142   WriteBinItem_rcr *b=rcr_createWriteBinItem();
143   b->item.total=1;
144   b->task=task;
145
146   bitvt bit=1<<index;
147   if (wrmask&bit) {
148     //count already includes this
149     status=SPECREADY;
150   }
151   b->bitindexwr=bit|wrmask;
152   b->bitindexrd=bit|rdmask;
153   b->item.status=status;
154   bintail->next=(BinItem_rcr*)b;
155   be->tail=(BinItem_rcr*)b;
156   
157   if (bintail->status==READY&&bintail->total==0) {
158     //we may have to set write as ready
159     while(val->total==0) {
160       if (val==((BinItem_rcr *)b)) {
161         b->item.status=READY;
162         be->head=val;
163         if (status&SPEC) {
164           return READY;
165         } else {
166           enqueuerecord(rcrrec, key, (BinItem_rcr *) b);
167           return READY;
168         }
169       }
170       val=val->next;
171     }
172   }
173
174   //RELEASE LOCK
175   be->head=val;
176   if (mode) {
177     while(b->item.status==NOTREADY) {
178       BARRIER();
179     }
180     return status&READY;
181   } else {
182     if (!(status&SPEC))
183       enqueuerecord(rcrrec, key, (BinItem_rcr *) b);
184     return status&READY;
185   }
186 }
187
188 inline int rcr_BREADBINCASE(HashStructure *T, int key, SESEcommon *task, struct rcrRecord *rcrrec, int index, int mode) {
189   BinItem_rcr * val;
190   BinElement_rcr * be = &(T->array[key]);
191   
192   //LOCK is still needed as different threads will remove items...
193   do {  
194     val=(BinItem_rcr *)0x1;       
195     val=(BinItem_rcr *)LOCKXCHG((unsigned INTPTR*)&(be->head), (unsigned INTPTR)val);
196   } while(val==(BinItem_rcr*)0x1);     
197   
198   if (val==NULL) {
199     BinItem_rcr * b=(BinItem_rcr*)rcr_createReadBinItem();
200     ReadBinItem_rcr* readbin=(ReadBinItem_rcr*)b;
201     TraverserData * td = &(readbin->array[readbin->index++]);
202     b->total=1;
203     b->status=READY;
204     
205     //common to both types
206     td->task=task;
207     td->bitindex=1<<index;
208     be->tail=b;
209     
210     //release lock
211     be->head=b;
212     enqueuerecord(rcrrec, key, b);    
213     return READY;
214   }
215
216
217   BinItem_rcr * bintail=be->tail;
218   
219   //check if already added item or not.
220   if (ISWRITEBIN(bintail->type)) {
221     WriteBinItem_rcr * td = (WriteBinItem_rcr *)bintail;
222     if(unlikely(td->task==task)) {
223       //RELEASE LOCK
224       bitvt bit=1<<index;
225       int status=bintail->status;
226       if (!(td->bitindexrd & bit)) {
227         td->bitindexrd|=bit;
228         td->bitindexwr|=bit;
229       } else 
230         status=READY;
231       be->head=val;
232       if (mode) {
233         while(bintail->status!=READY) {
234           BARRIER();
235         }
236         return READY;
237       } else {
238         return status;
239       }
240     }
241   } else {
242     TraverserData * td = &((ReadBinItem_rcr *)bintail)->array[((ReadBinItem_rcr *)bintail)->index - 1];
243     if (unlikely(td->task==task)) {
244       //RELEASE LOCK
245       bitvt bit=1<<index;
246       int status=bintail->status;
247       if (!(td->bitindex & bit)) {
248         td->bitindex|=bit;
249       } else 
250         status=READY;
251       be->head=val;
252       if (mode) {
253         while(bintail->status!=READY) {
254           BARRIER();
255         }
256       }
257       return status;
258     }
259   }
260
261   if (ISREADBIN(bintail->type)) {
262     int stat=rcr_TAILREADCASE(T, val, bintail, key, task, rcrrec, index);
263     if (mode) {
264       struct BinItem_rcr * bt=be->tail;
265       while(bt->status!=READY) {
266         BARRIER();
267       }
268       return READY;
269     } else {
270       return stat;
271     }
272   } else {
273     rcr_TAILWRITECASE(T, val, bintail, key, task, rcrrec, index);
274     if (mode) {
275       struct BinItem_rcr * bt=be->tail;
276       while(bt->status!=READY) {
277         BARRIER();
278       }
279       return READY;
280     } else {
281       return NOTREADY;
282     }
283   }
284 }
285
286
287 int rcr_WRITEBINCASE(HashStructure *T, int key, SESEcommon *task, struct rcrRecord *rcrrec, int index) {
288   return rcr_BWRITEBINCASE(T, key, task, rcrrec, index, 0);
289 }
290 int rcr_READBINCASE(HashStructure *T, int key, SESEcommon * task, struct rcrRecord *rcrrec, int index) {
291   return rcr_BREADBINCASE(T, key, task, rcrrec, index, 0);
292 }
293
294 int rcr_WTWRITEBINCASE(HashStructure *T, int key, SESEcommon *task, struct rcrRecord *rcrrec, int index) {
295   return rcr_BWRITEBINCASE(T, key, task, rcrrec, index, 1);
296 }
297
298 int rcr_WTREADBINCASE(HashStructure *T, int key, SESEcommon * task, struct rcrRecord *rcrrec, int index) {
299   return rcr_BREADBINCASE(T, key, task, rcrrec, index, 1);
300 }
301
302  int rcr_TAILREADCASE(HashStructure *T, BinItem_rcr *val, BinItem_rcr *bintail, int key, SESEcommon * task, struct rcrRecord * rcrrec, int index) {
303   ReadBinItem_rcr * readbintail=(ReadBinItem_rcr*)T->array[key].tail;
304   int status, retval;
305   TraverserData *td;
306   if (readbintail->item.status==READY) {
307     status=READY;
308     retval=READY;
309   } else {
310     status=NOTREADY;
311     retval=NOTREADY;
312   }
313
314   if (readbintail->index==RNUMREAD) { // create new read group
315     ReadBinItem_rcr* rb=rcr_createReadBinItem();
316     td = &rb->array[rb->index++];
317
318     rb->item.total=1;
319     rb->item.status=status;
320     T->array[key].tail->next=(BinItem_rcr*)rb;
321     T->array[key].tail=(BinItem_rcr*)rb;
322     enqueuerecord(rcrrec, key, (BinItem_rcr *) rb);
323   } else { // group into old tail
324     td = &readbintail->array[readbintail->index++];
325     atomic_inc(&readbintail->item.total);
326     enqueuerecord(rcrrec, key, (BinItem_rcr *) readbintail);
327   }
328
329   td->task=task;
330   td->bitindex=1<<index;
331
332   T->array[key].head=val;//released lock
333   return retval;
334 }
335
336 void rcr_TAILWRITECASE(HashStructure *T, BinItem_rcr *val, BinItem_rcr *bintail, int key, SESEcommon * task, struct rcrRecord *rcrrec, int index) {
337   ReadBinItem_rcr* rb=rcr_createReadBinItem();
338   TraverserData * td = &(rb->array[rb->index++]);
339   rb->item.total=1;
340   rb->item.status=NOTREADY;
341
342   td->task=task;
343   td->bitindex=1<<index;
344   enqueuerecord(rcrrec, key, (BinItem_rcr *) rb);
345
346   T->array[key].tail->next=(BinItem_rcr*)rb;
347   T->array[key].tail=(BinItem_rcr*)rb;
348   T->array[key].head=val;//released lock
349 }
350
351 void rcr_RETIREHASHTABLE(HashStructure *T, SESEcommon *task, int key, BinItem_rcr *b) {
352   atomic_dec(&b->total);
353   if(ISREADBIN(b->type)) {
354     if (b->next==NULL || b->total>0) {
355       return;
356     }
357   }
358   
359   //We either have a write bin or we are at the end of a read bin
360   BinElement_rcr * be = &(T->array[key]);
361   {
362     // CHECK FIRST IF next is nonnull to guarantee that b.total cannot change
363     BinItem_rcr * val=(BinItem_rcr *)0x1;
364     do {
365       val=(BinItem_rcr*)LOCKXCHG((unsigned INTPTR*)&(be->head), (unsigned INTPTR)val);
366     } while(val==(BinItem_rcr*)0x1);
367     
368     // at this point have locked bin
369     BinItem_rcr *ptr=val;
370     int haveread=FALSE;
371     int i;
372     while (ptr!=NULL) {
373       if (ISREADBIN(ptr->type)) {
374         if (ptr->status==NOTREADY) {
375           ReadBinItem_rcr* rptr=(ReadBinItem_rcr*)ptr;
376           for (i=0;i<rptr->index;i++) {
377             TraverserData * td=&rptr->array[i];
378             if (task==td->task) {
379               RESOLVE(td->task, td->bitindex);
380               if (((INTPTR)rptr->array[i].task)&PARENTBIN) {
381                 //parents go immediately
382                 atomic_dec(&rptr->item.total);
383               }
384               break;
385             }
386           }
387           ptr->status=READY;
388         }
389         if (ptr->next==NULL) {
390           break;
391         }
392         if (ptr->total!=0) {
393           haveread=TRUE;
394         } else if (ptr==val) {
395           val=val->next;
396         }
397       } else if (ptr->total==0) {
398         //skip past retired item
399         if (ptr==val)
400           val=val->next;
401       } else {
402         //write bin case
403         if (haveread)
404           break;
405         if(ptr->status==NOTREADY) {
406           WriteBinItem_rcr* wptr=(WriteBinItem_rcr*)ptr;
407           RESOLVE(wptr->task, wptr->bitindexwr);
408           ptr->status=READY;
409           if(((INTPTR)wptr->task)&PARENTBIN) {
410             val=val->next;
411           } else
412             break;
413         }
414       }
415       ptr=ptr->next;
416     }
417     be->head=val; // release lock
418   }
419 }
420  
421 void RESOLVE(SESEcommon *record, bitvt mask) {
422   int index=-1;
423   struct rcrRecord * array=(struct rcrRecord *)(((char *)record)+record->offsetToParamRecords);
424   while(mask!=0) {
425     int shift=__builtin_ctzll(mask)+1;
426     index+=shift;
427     if (atomic_sub_and_test(1,&array[index].count)) {
428       if(unlikely(record->classID<0)) {
429         //parent stall...clear it
430         psem_give_tag(record->parentsStallSem, ((SESEstall *)record)->tag);
431         //mark the record unused
432         BARRIER();
433         record->rcrstatus=0;
434       } else {
435         int flag=LOCKXCHG32(&array[index].flag,0);
436         if (flag) {
437           if(atomic_sub_and_test(1, &(record->unresolvedDependencies))) 
438             workScheduleSubmit((void *)record);
439         }
440       }
441     }
442     mask=mask>>shift;
443   }
444 }