fix bug
[IRC.git] / Robust / src / Runtime / oooJava / hashStructure.c
1 #include "hashStructure.h"
2 //#include "WaitingQueue.h"
3 #include "mlp_lock.h"
4 #include "rcr_runtime.h"
5 #include "mem.h"
6 #include "classdefs.h"
7
8
9 //NOTE: this is only temporary (for testing) and will be removed in favor of thread local variables
10 //It's basically an array of hashStructures so we can simulate what would happen in a many-threaded version
11 __thread HashStructure ** allHashStructures;
12 #define ISWRITEBIN(x) (x&BINMASK)
13 #define ISREADBIN(x) (!(x&BINMASK))
14 //#define POPCOUNT(x) __builtin_popcountll(x)
15 //__builtin_popcountll
16 #define ONEVAL 1ULL
17
18
19 inline enqueuerecord(struct rcrRecord *rcrrec, int tmpkey, BinItem_rcr *item) {
20   if (likely(rcrrec!=NULL)) {
21     struct rcrRecord * tmprec;
22     if(likely(rcrrec->index<RCRSIZE)) {
23       int index=rcrrec->index++;
24       rcrrec->ptrarray[index]=(void *) item;
25       rcrrec->array[index]=tmpkey;
26     } else if(likely((tmprec=rcrrec->next)!=NULL)&&likely(tmprec->index<RCRSIZE)) {
27       int index=tmprec->index++;
28       tmprec->ptrarray[index]=(void *) item;
29       tmprec->array[index]=tmpkey;
30     } else {
31       struct rcrRecord *trec=RUNMALLOC(sizeof(struct rcrRecord));
32       trec->ptrarray[0]=(void *) item;
33       trec->array[0]=tmpkey;
34       trec->index=1;
35       trec->next=tmprec;
36       rcrrec->next=trec;
37     }
38   }
39 }
40
41 //NOTE: only temporary
42 HashStructure ** rcr_createMasterHashTableArray(int maxSize){
43   return (HashStructure **) malloc(sizeof(HashStructure *) * maxSize);
44 }
45
46 HashStructure* rcr_createHashtable(int sizeofWaitingQueue){
47   int i=0;
48   HashStructure* newTable=(HashStructure*)RUNMALLOC(sizeof(HashStructure));
49   for(i=0;i<RNUMBINS;i++){
50     newTable->array[i].head=NULL;
51     newTable->array[i].tail=NULL;
52   }
53
54   return newTable;
55 }
56
57 WriteBinItem_rcr* rcr_createWriteBinItem(){
58   WriteBinItem_rcr* binitem=(WriteBinItem_rcr*)RUNMALLOC(sizeof(WriteBinItem_rcr));
59   binitem->item.type=WRITEBIN;
60   return binitem;
61 }
62
63 ReadBinItem_rcr* rcr_createReadBinItem(){
64   ReadBinItem_rcr* binitem=(ReadBinItem_rcr*)RUNMALLOC(sizeof(ReadBinItem_rcr));
65   binitem->index=0;
66   binitem->item.type=READBIN;
67   return binitem;
68 }
69
70 inline int rcr_generateKey(void * ptr){
71   return (((struct ___Object___ *) ptr)->oid)&RH_MASK;
72 }
73
74 inline int rcr_BWRITEBINCASE(HashStructure *T, int key, SESEcommon *task, struct rcrRecord *rcrrec, int index, int mode) {
75   //chain of bins exists => tail is valid
76   //if there is something in front of us, then we are not ready
77   BinItem_rcr * val;
78   BinElement_rcr* be= &(T->array[key]); //do not grab head from here since it's locked (i.e. = 0x1)
79
80   //LOCK is still needed as different threads will remove items...
81   do {  
82     val=(BinItem_rcr *)0x1;       
83     val=(BinItem_rcr *)LOCKXCHG((unsigned INTPTR*)&(be->head), (unsigned INTPTR)val);
84   } while(val==(BinItem_rcr*)0x1);     
85
86   if (val==NULL) {
87     BinItem_rcr * b=(BinItem_rcr*)rcr_createWriteBinItem();
88     WriteBinItem_rcr * td = (WriteBinItem_rcr*)b;
89     b->total=1;
90     b->status=READY;
91     
92     //common to both types
93     td->task=task;
94     td->bitindexrd=td->bitindexwr=ONEVAL<<index;
95     be->tail=b;
96     BARRIER();//do tail before head
97     //release lock
98     be->head=b;
99     enqueuerecord(rcrrec, key, b);
100     return READY;
101   }
102   BARRIER();//read head before tail
103   BinItem_rcr *bintail=be->tail;
104   bitvt rdmask=0,wrmask=0;
105   int status=NOTREADY;
106
107   if (ISWRITEBIN(bintail->type)) {
108     WriteBinItem_rcr * td = (WriteBinItem_rcr *)bintail;
109     //last one is to check for SESE blocks in a while loop.
110     if(unlikely(td->task == task)) {
111
112       bitvt bit=ONEVAL<<index;
113       if (!(bit & td->bitindexwr)) {
114         td->bitindexwr|=bit;
115         td->bitindexrd|=bit;
116         be->head=val;
117         if (mode) {
118           while(bintail->status!=READY) {
119             BARRIER();
120           }
121           return READY;
122         } else {
123           return bintail->status;
124         }
125       } else {
126         be->head=val;
127         return READY;
128       }
129     }
130   } else {
131     TraverserData * td = &((ReadBinItem_rcr *)bintail)->array[((ReadBinItem_rcr *)bintail)->index - 1];
132     if(unlikely(td->task == task)) {
133       //if it matches, then we remove it and the code below will upgrade it to a write.
134       ((ReadBinItem_rcr *)bintail)->index--;
135       atomic_dec(&bintail->total);
136       rdmask=td->bitindex;
137       if (bintail->status!=READY)
138         wrmask=rdmask;
139       status=SPECNOTREADY;
140     }
141   }
142
143   WriteBinItem_rcr *b=rcr_createWriteBinItem();
144   b->item.total=1;
145   b->task=task;
146
147   bitvt bit=ONEVAL<<index;
148   if (wrmask&bit) {
149     //count already includes this
150     status=SPECREADY;
151   }
152   b->bitindexwr=bit|wrmask;
153   b->bitindexrd=bit|rdmask;
154   b->item.status=status;
155   bintail->next=(BinItem_rcr*)b;
156   be->tail=(BinItem_rcr*)b;
157   
158   if (bintail->status==READY&&bintail->total==0) {
159     //we may have to set write as ready
160     while(val->total==0) {
161       if (val==((BinItem_rcr *)b)) {
162         b->item.status=READY;
163         be->head=val;
164         if (status&SPEC) {
165           return READY;
166         } else {
167           enqueuerecord(rcrrec, key, (BinItem_rcr *) b);
168           return READY;
169         }
170       }
171       val=val->next;
172     }
173   }
174
175   //RELEASE LOCK
176   be->head=val;
177   if (mode) {
178     while(b->item.status==NOTREADY) {
179       BARRIER();
180     }
181     return status&READY;
182   } else {
183     if (!(status&SPEC))
184       enqueuerecord(rcrrec, key, (BinItem_rcr *) b);
185     return status&READY;
186   }
187 }
188
189 inline int rcr_BREADBINCASE(HashStructure *T, int key, SESEcommon *task, struct rcrRecord *rcrrec, int index, int mode) {
190   BinItem_rcr * val;
191   BinElement_rcr * be = &(T->array[key]);
192   
193   //LOCK is still needed as different threads will remove items...
194   do {  
195     val=(BinItem_rcr *)0x1;       
196     val=(BinItem_rcr *)LOCKXCHG((unsigned INTPTR*)&(be->head), (unsigned INTPTR)val);
197   } while(val==(BinItem_rcr*)0x1);     
198   
199   if (val==NULL) {
200     BinItem_rcr * b=(BinItem_rcr*)rcr_createReadBinItem();
201     ReadBinItem_rcr* readbin=(ReadBinItem_rcr*)b;
202     TraverserData * td = &(readbin->array[readbin->index++]);
203     b->total=1;
204     b->status=READY;
205     
206     //common to both types
207     td->task=task;
208     td->bitindex=ONEVAL<<index;
209     be->tail=b;
210     
211     //release lock
212     be->head=b;
213     enqueuerecord(rcrrec, key, b);    
214     return READY;
215   }
216
217
218   BinItem_rcr * bintail=be->tail;
219   
220   //check if already added item or not.
221   if (ISWRITEBIN(bintail->type)) {
222     WriteBinItem_rcr * td = (WriteBinItem_rcr *)bintail;
223     if(unlikely(td->task==task)) {
224       //RELEASE LOCK
225       bitvt bit=ONEVAL<<index;
226       int status=bintail->status;
227       if (!(td->bitindexrd & bit)) {
228         td->bitindexrd|=bit;
229         td->bitindexwr|=bit;
230       } else 
231         status=READY;
232       be->head=val;
233       if (mode) {
234         while(bintail->status!=READY) {
235           BARRIER();
236         }
237         return READY;
238       } else {
239         return status;
240       }
241     }
242   } else {
243     TraverserData * td = &((ReadBinItem_rcr *)bintail)->array[((ReadBinItem_rcr *)bintail)->index - 1];
244     if (unlikely(td->task==task)) {
245       //RELEASE LOCK
246       bitvt bit=ONEVAL<<index;
247       int status=bintail->status;
248       if (!(td->bitindex & bit)) {
249         td->bitindex|=bit;
250       } else 
251         status=READY;
252       be->head=val;
253       if (mode) {
254         while(bintail->status!=READY) {
255           BARRIER();
256         }
257       }
258       return status;
259     }
260   }
261
262   if (ISREADBIN(bintail->type)) {
263     int stat=rcr_TAILREADCASE(T, val, bintail, key, task, rcrrec, index);
264     if (mode) {
265       struct BinItem_rcr * bt=be->tail;
266       while(bt->status!=READY) {
267         BARRIER();
268       }
269       return READY;
270     } else {
271       return stat;
272     }
273   } else {
274     rcr_TAILWRITECASE(T, val, bintail, key, task, rcrrec, index);
275     if (mode) {
276       struct BinItem_rcr * bt=be->tail;
277       while(bt->status!=READY) {
278         BARRIER();
279       }
280       return READY;
281     } else {
282       return NOTREADY;
283     }
284   }
285 }
286
287
288 int rcr_WRITEBINCASE(HashStructure *T, int key, SESEcommon *task, struct rcrRecord *rcrrec, int index) {
289   return rcr_BWRITEBINCASE(T, key, task, rcrrec, index, 0);
290 }
291 int rcr_READBINCASE(HashStructure *T, int key, SESEcommon * task, struct rcrRecord *rcrrec, int index) {
292   return rcr_BREADBINCASE(T, key, task, rcrrec, index, 0);
293 }
294
295 int rcr_WTWRITEBINCASE(HashStructure *T, int key, SESEcommon *task, struct rcrRecord *rcrrec, int index) {
296   return rcr_BWRITEBINCASE(T, key, task, rcrrec, index, 1);
297 }
298
299 int rcr_WTREADBINCASE(HashStructure *T, int key, SESEcommon * task, struct rcrRecord *rcrrec, int index) {
300   return rcr_BREADBINCASE(T, key, task, rcrrec, index, 1);
301 }
302
303  int rcr_TAILREADCASE(HashStructure *T, BinItem_rcr *val, BinItem_rcr *bintail, int key, SESEcommon * task, struct rcrRecord * rcrrec, int index) {
304   ReadBinItem_rcr * readbintail=(ReadBinItem_rcr*)T->array[key].tail;
305   int status, retval;
306   TraverserData *td;
307   if (readbintail->item.status==READY) {
308     status=READY;
309     retval=READY;
310   } else {
311     status=NOTREADY;
312     retval=NOTREADY;
313   }
314
315   if (readbintail->index==RNUMREAD) { // create new read group
316     ReadBinItem_rcr* rb=rcr_createReadBinItem();
317     td = &rb->array[rb->index++];
318
319     rb->item.total=1;
320     rb->item.status=status;
321     T->array[key].tail->next=(BinItem_rcr*)rb;
322     T->array[key].tail=(BinItem_rcr*)rb;
323     enqueuerecord(rcrrec, key, (BinItem_rcr *) rb);
324   } else { // group into old tail
325     td = &readbintail->array[readbintail->index++];
326     atomic_inc(&readbintail->item.total);
327     enqueuerecord(rcrrec, key, (BinItem_rcr *) readbintail);
328   }
329
330   td->task=task;
331   td->bitindex=ONEVAL<<index;
332
333   T->array[key].head=val;//released lock
334   return retval;
335 }
336
337 void rcr_TAILWRITECASE(HashStructure *T, BinItem_rcr *val, BinItem_rcr *bintail, int key, SESEcommon * task, struct rcrRecord *rcrrec, int index) {
338   ReadBinItem_rcr* rb=rcr_createReadBinItem();
339   TraverserData * td = &(rb->array[rb->index++]);
340   rb->item.total=1;
341   rb->item.status=NOTREADY;
342
343   td->task=task;
344   td->bitindex=ONEVAL<<index;
345   enqueuerecord(rcrrec, key, (BinItem_rcr *) rb);
346
347   T->array[key].tail->next=(BinItem_rcr*)rb;
348   T->array[key].tail=(BinItem_rcr*)rb;
349   T->array[key].head=val;//released lock
350 }
351
352 void rcr_RETIREHASHTABLE(HashStructure *T, SESEcommon *task, int key, BinItem_rcr *b) {
353   atomic_dec(&b->total);
354   if(ISREADBIN(b->type)) {
355     if (b->next==NULL || b->total>0) {
356       return;
357     }
358   }
359   
360   //We either have a write bin or we are at the end of a read bin
361   BinElement_rcr * be = &(T->array[key]);
362   {
363     // CHECK FIRST IF next is nonnull to guarantee that b.total cannot change
364     BinItem_rcr * val=(BinItem_rcr *)0x1;
365     do {
366       val=(BinItem_rcr*)LOCKXCHG((unsigned INTPTR*)&(be->head), (unsigned INTPTR)val);
367     } while(val==(BinItem_rcr*)0x1);
368     
369     // at this point have locked bin
370     BinItem_rcr *ptr=val;
371     int haveread=FALSE;
372     int i;
373     while (ptr!=NULL) {
374       if (ISREADBIN(ptr->type)) {
375         if (ptr->status==NOTREADY) {
376           ReadBinItem_rcr* rptr=(ReadBinItem_rcr*)ptr;
377           for (i=0;i<rptr->index;i++) {
378             TraverserData * td=&rptr->array[i];
379             if (task==td->task) {
380               SESEcommon *record=td->task;
381               if (((INTPTR)rptr->array[i].task)&PARENTBIN) {
382                 //parents go immediately
383                 atomic_dec(&rptr->item.total);
384                 record=(SESEcommon *)(((INTPTR)record)&~1ULL);
385               }
386               RESOLVE(record, td->bitindex);
387               break;
388             }
389           }
390           ptr->status=READY;
391         }
392         if (ptr->next==NULL) {
393           break;
394         }
395         if (ptr->total!=0) {
396           haveread=TRUE;
397         } else if (ptr==val) {
398           val=val->next;
399         }
400       } else if (ptr->total==0) {
401         //skip past retired item
402         if (ptr==val)
403           val=val->next;
404       } else {
405         //write bin case
406         if (haveread)
407           break;
408         if(ptr->status==NOTREADY) {
409           WriteBinItem_rcr* wptr=(WriteBinItem_rcr*)ptr;
410           RESOLVE((SESEcommon *)(((INTPTR)wptr->task)&~1ULL), wptr->bitindexwr);
411           ptr->status=READY;
412           if(((INTPTR)wptr->task)&PARENTBIN) {
413             val=val->next;
414           } else
415             break;
416         }
417       }
418       ptr=ptr->next;
419     }
420     be->head=val; // release lock
421   }
422 }
423  
424 void RESOLVE(SESEcommon *record, bitvt mask) {
425   int index=-1;
426   struct rcrRecord * array=(struct rcrRecord *)(((char *)record)+record->offsetToParamRecords);
427   while(mask!=0) {
428     int shift=__builtin_ctzll(mask)+1;
429     index+=shift;
430     if (atomic_sub_and_test(1,&array[index].count)) {
431       if(unlikely(record->classID<0)) {
432         //parent stall...clear it
433         psem_give_tag(record->parentsStallSem, ((SESEstall *)record)->tag);
434         //mark the record unused
435         BARRIER();
436         record->rcrstatus=0;
437 #ifndef OOO_DISABLE_TASKMEMPOOL
438         RELEASE_REFERENCE_TO(record);
439 #endif
440       } else {
441         int flag=LOCKXCHG32(&array[index].flag,0);
442         if (flag) {
443           if(atomic_sub_and_test(1, &(record->unresolvedDependencies))) 
444             workScheduleSubmit((void *)record);
445         }
446       }
447     }
448     mask=mask>>shift;
449   }
450 }