initial commit of new memory queue. still just working with fine-grain cases.
[IRC.git] / Robust / src / Runtime / mlp_runtime.c
1 #include "runtime.h"
2
3 #include <stdlib.h>
4 #include <stdio.h>
5 #include <assert.h>
6
7 #include "mem.h"
8 #include "mlp_runtime.h"
9 #include "workschedule.h"
10
11
12 /*
13 __thread struct Queue* seseCallStack;
14 __thread pthread_once_t mlpOnceObj = PTHREAD_ONCE_INIT;
15 void mlpInitOncePerThread() {
16   seseCallStack = createQueue();
17 }
18 */
19 __thread SESEcommon_p seseCaller;
20
21
22 void* mlpAllocSESErecord( int size ) {
23   void* newrec = RUNMALLOC( size );  
24   if( newrec == 0 ) {
25     printf( "mlpAllocSESErecord did not obtain memory!\n" );
26     exit( -1 );
27   }
28   return newrec;
29 }
30
31
32 void mlpFreeSESErecord( void* seseRecord ) {
33   RUNFREE( seseRecord );
34 }
35
36 MemoryQueue** mlpCreateMemoryQueueArray(int numMemoryQueue){
37   int i;
38   MemoryQueue** newMemoryQueue=(MemoryQueue**)RUNMALLOC( sizeof( MemoryQueue* ) * numMemoryQueue );
39   for(i=0; i<numMemoryQueue; i++){
40     newMemoryQueue[i]=createMemoryQueue();
41   }
42   return newMemoryQueue;
43 }
44
45 REntry* mlpCreateREntryArray(){
46   REntry* newREntryArray=(REntry*)RUNMALLOC(sizeof(REntry)*NUMRENTRY);
47   return newREntryArray;
48 }
49
50 REntry* mlpCreateREntry(int type, void* seseToIssue, void* dynID){
51   REntry* newREntry=(REntry*)RUNMALLOC(sizeof(REntry));
52   newREntry->type=type;
53   newREntry->seseRec=seseToIssue;
54   newREntry->dynID=dynID; 
55   return newREntry;
56 }
57
58 int isParent(REntry *r) {
59   if (r->type==PARENTREAD || r->type==PARENTWRITE) {
60     return TRUE;
61   } else {
62     return FALSE;
63   }
64 }
65
66 int isParentCoarse(REntry *r){
67   if (r->type==PARENTCOARSE){
68     return TRUE;
69   }else{
70     return FALSE;
71   }
72 }
73
74 int isFineRead(REntry *r) {
75   if (r->type==READ || r->type==PARENTREAD) {
76     return TRUE;
77   } else {
78     return FALSE;
79   }
80 }
81
82 int isFineWrite(REntry *r) {
83   if (r->type==WRITE || r->type==PARENTWRITE) {
84     return TRUE;
85   } else {
86     return FALSE;
87   }
88 }
89
90 int isCoarse(REntry *r){
91   if(r->type==COARSE || r->type==PARENTCOARSE){
92     return TRUE;
93   } else {
94     return FALSE;
95   }
96 }
97
98 int isSCC(REntry *r){
99   if(r->type==SCCITEM){
100     return TRUE;
101   } else {
102     return FALSE;
103   }
104 }
105
106 int isSingleItem(MemoryQueueItem *qItem){
107   if(qItem->type==SINGLEITEM){
108     return TRUE;
109   } else {
110     return FALSE;
111   }
112 }
113
114 int isHashtable(MemoryQueueItem *qItem){
115   if(qItem->type==HASHTABLE){
116     return TRUE;
117   } else {
118     return FALSE;
119   }
120 }
121
122 int isVector(MemoryQueueItem *qItem){
123   if(qItem->type==VECTOR){
124     return TRUE;
125   } else {
126     return FALSE;
127   }
128 }
129
130 int isReadBinItem(BinItem* b){
131   if(b->type==READBIN){
132     return TRUE;
133   }else{
134     return FALSE;
135   }
136 }
137
138 int isWriteBinItem(BinItem* b){
139   if(b->type==WRITEBIN){
140     return TRUE;
141   }else{
142     return FALSE;
143   }
144 }
145
146 int generateKey(unsigned int data){
147   return (data&H_MASK)>> 4;
148 }
149
150 Hashtable* createHashtable(){
151   int i=0;
152   Hashtable* newTable=(Hashtable*)RUNMALLOC(sizeof(Hashtable));
153   //newTable->array=(BinElement*)RUNMALLOC(sizeof(BinElement)*NUMBINS);
154   for(i=0;i<NUMBINS;i++){
155     newTable->array[i]=(BinElement*)RUNMALLOC(sizeof(BinElement));
156     newTable->array[i]->head=NULL;
157     newTable->array[i]->tail=NULL;
158   }
159   return newTable;
160 }
161
162 WriteBinItem* createWriteBinItem(){
163   WriteBinItem* binitem=(WriteBinItem*)RUNMALLOC(sizeof(WriteBinItem));
164   binitem->item.type=WRITEBIN;
165   return binitem;
166 }
167
168 ReadBinItem* createReadBinItem(){
169   ReadBinItem* binitem=(ReadBinItem*)RUNMALLOC(sizeof(ReadBinItem));
170   binitem->array=(REntry*)RUNMALLOC(sizeof(REntry*)*NUMREAD);
171   binitem->index=0;
172   binitem->item.type=READBIN;
173   return binitem;
174 }
175
176 Vector* createVector(){
177   Vector* vector=(Vector*)RUNMALLOC(sizeof(Vector));
178   vector->array=(REntry*)RUNMALLOC(sizeof(REntry*)*NUMITEMS);
179   vector->index=0;
180   return vector;
181 }
182
183 SCC* createSCC(){
184   SCC* scc=(SCC*)RUNMALLOC(sizeof(SCC));
185   return scc;
186 }
187
188 MemoryQueue* createMemoryQueue(){
189   MemoryQueue* queue = (MemoryQueue*)RUNMALLOC(sizeof(MemoryQueue));
190
191   MemoryQueueItem* dummy=(MemoryQueueItem*)RUNMALLOC(sizeof(MemoryQueueItem));
192   dummy->type=3;
193   dummy->total=0;
194   dummy->status=READY;
195   queue->head = dummy;
196   queue->tail = dummy;
197   return queue;
198 }
199
200 int ADDRENTRY(MemoryQueue * q, REntry * r) {
201   if (isFineRead(r) || isFineWrite(r)) { 
202     return ADDTABLE(q, r);
203   } else if (isCoarse(r)) {
204     return ADDVECTOR(q, r);
205   } else if (isSCC(r)) {
206     return ADDSCC(q, r);
207   }
208 }
209
210 int ADDTABLE(MemoryQueue *q, REntry *r) {
211   if(!isHashtable(q->tail)) {
212     //Fast Case
213     MemoryQueueItem* tail=q->tail;
214     if (isParent(r) && tail->total==0 && q->tail==q->head) {
215       return READY;
216     }
217
218     //Add table
219     Hashtable* h=createHashtable();
220     tail->next=(MemoryQueueItem*)h;
221     //************NEED memory barrier here to ensure compiler does not cache Q.tail.status********
222     if (BARRIER() && tail->status==READY && tail->total==0 && q->tail==q->head) { 
223       //previous Q item is finished
224       h->item.status=READY;
225     }
226     q->tail=(MemoryQueueItem*)h;
227   }
228
229   //at this point, have table
230   Hashtable* table=(Hashtable*)q->tail;
231   BinItem * val;
232   int key=generateKey((unsigned int)r->dynID);
233   do {  
234     val=(BinItem*)0x1;       
235     BinElement* bin=table->array[key];
236     val=(BinItem*)LOCKXCHG((unsigned int*)&(bin->head), (unsigned int)val);//note...talk to me about optimizations here. 
237   } while(val==(BinItem*)0x1);
238   //at this point have locked bin
239   if (val==NULL) {
240     return EMPTYBINCASE(table, table->array[key], r);
241   } else {
242     if (isFineWrite(r)) {
243       WRITEBINCASE(table, r, val);
244       return NOTREADY;
245     } else if (isFineRead(r)) {
246       return READBINCASE(table, r, val);
247     }
248   }
249 }
250
251 int EMPTYBINCASE(Hashtable *T, BinElement* be, REntry *r) {
252   int retval;
253   BinItem* b;
254   if (isFineWrite(r)) {
255     b=(BinItem*)createWriteBinItem();
256     ((WriteBinItem*)b)->val=r;//<-only different statement
257   } else if (isFineRead(r)) {
258     b=(BinItem*)createReadBinItem();
259     ReadBinItem* readbin=(ReadBinItem*)b;
260     readbin->array[readbin->index++]=*r;
261   }
262   b->total=1;
263
264   if (T->item.status==READY) { 
265     //current entry is ready
266     b->status=READY;
267     retval=READY;
268     if (isParent(r)) {
269       be->head=NULL; // released lock
270       return retval;
271     }
272   } else {
273     b->status=NOTREADY;
274     retval=NOTREADY;
275   }
276
277   atomic_inc(&T->item.total);
278   r->hashtable=T;
279   r->binitem=b;
280   be->tail=b;
281   be->head=b;//released lock
282   return retval;
283 }
284
285 int WRITEBINCASE(Hashtable *T, REntry *r, BinItem *val) {
286   //chain of bins exists => tail is valid
287   //if there is something in front of us, then we are not ready
288
289   int key=generateKey((unsigned int)r->dynID);
290   BinElement* be=T->array[key];
291
292   BinItem *bintail=be->tail;
293   WriteBinItem *b=createWriteBinItem();
294   b->val=r;
295   b->item.total=1;
296   
297   atomic_inc(&T->item.total);
298
299   r->hashtable=T;
300   r->binitem=(BinItem*)b;
301
302   be->tail->next=(BinItem*)b;
303   be->tail=(BinItem*)b;
304   be->head=val;
305 }
306
307 READBINCASE(Hashtable *T, REntry *r, BinItem *val) {
308   int key=generateKey((unsigned int)r->dynID);
309   BinItem * bintail=T->array[key]->tail;
310   if (isReadBinItem(bintail)) {
311     return TAILREADCASE(T, r, val, bintail);
312   } else if (!isReadBinItem(bintail)) {
313     TAILWRITECASE(T, r, val, bintail);
314     return NOTREADY;
315   }
316 }
317
318 int TAILREADCASE(Hashtable *T, REntry *r, BinItem *val, BinItem *bintail) {
319   int key=generateKey((unsigned int)r->dynID);
320   ReadBinItem * readbintail=(ReadBinItem*)T->array[key]->tail;
321   int status, retval;
322   if (readbintail->item.status=READY) { 
323     status=READY;
324     retval=READY;
325     if (isParent(r)) {
326       T->array[key]->head=val;//released lock
327       return READY;
328     }
329   } else {
330     status=NOTREADY;
331     retval=NOTREADY;
332   }
333
334   if (readbintail->index==NUMREAD) { // create new read group
335     ReadBinItem* rb=createReadBinItem();
336     rb->array[rb->index++]=*r;
337     rb->item.total=1;//safe only because item could not have started
338     rb->item.status=status;
339     T->array[key]->tail->next=(BinItem*)rb;
340     T->array[key]->tail=(BinItem*)rb;
341     r->binitem=(BinItem*)rb;
342   } else { // group into old tail
343     readbintail->array[readbintail->index++]=*r;
344     atomic_inc(&readbintail->item.total);
345     r->binitem=(BinItem*)readbintail;
346   }
347   atomic_inc(&T->item.total);
348   r->hashtable=T;
349   T->array[key]->head=val;//released lock
350   return retval;
351 }
352
353 TAILWRITECASE(Hashtable *T, REntry *r, BinItem *val, BinItem *bintail) {
354   int key=generateKey((unsigned int)r->dynID);
355   WriteBinItem* wb=createWriteBinItem();
356   wb->val=r;
357   wb->item.total=1;
358   wb->item.status=NOTREADY;
359   //  rb->array[rb->index++]=*r;
360   //rb->item.total=1;//safe because item could not have started
361   //rb->item.status=NOTREADY;
362   atomic_inc(&T->item.total);
363   r->hashtable=T;
364   r->binitem=(BinItem*)wb;
365   T->array[key]->tail->next=(BinItem*)wb;
366   T->array[key]->tail=(BinItem*)wb;
367   T->array[key]->head=val;//released lock
368 }
369
370 ADDVECTOR(MemoryQueue *Q, REntry *r) {
371   if(!isVector(Q->tail)) {
372     //Fast Case
373     if (isParentCoarse(r) && Q->tail->total==0 && Q->tail==Q->head) { 
374       return READY;
375     }
376
377     //added vector
378     Vector* V=createVector();
379     Q->tail->next=(MemoryQueueItem*)V;     
380     //************NEED memory barrier here to ensure compiler does not cache Q.tail.status******
381     if (BARRIER() && Q->tail->status==READY&&Q->tail->total==0) { 
382       //previous Q item is finished
383       V->item.status=READY;
384     }
385     Q->tail=(MemoryQueueItem*)V;
386   }
387   //at this point, have vector
388   Vector* V=(Vector*)Q->tail;
389   if (V->index==NUMITEMS) {
390     //vector is full
391     //added vector
392     V=createVector();
393     V->item.status=NOTREADY;
394     Q->tail->next=(MemoryQueueItem*)V;
395     //***NEED memory barrier here to ensure compiler does not cache Q.tail.status******
396     if (BARRIER() && Q->tail->status==READY) { 
397       V->item.status=READY;
398     }
399     Q->tail=(MemoryQueueItem*)V;
400   }
401
402   atomic_inc(&V->item.total);
403   //expose entry
404   int index=V->index;
405   V->array[index]=*r;
406   //*****NEED memory barrier here to ensure compiler does not reorder writes to V.array and V.index
407   BARRIER();
408   V->index++;
409   //*****NEED memory barrier here to ensure compiler does not cache V.status*********
410   if (BARRIER() && V->item.status==READY) {
411     void* flag=NULL;
412     LOCKXCHG((unsigned int*)&(V->array[index]), (unsigned int)flag); 
413     if (flag!=NULL) {
414       if (isParent(r)) { //parent's retire immediately
415         atomic_dec(&V->item.total);
416       }
417       return READY;
418     } else {
419       return NOTREADY;//<- means that some other dispatcher got this one...so need to do accounting correctly
420     }
421   } else {
422     return NOTREADY;
423   }
424 }
425
426
427 //SCC's don't come in parent variety
428 ADDSCC(MemoryQueue *Q, REntry *r) {
429   //added SCC
430   SCC* S=createSCC();
431   S->item.total=1; 
432   S->val=r;
433   Q->tail->next=(MemoryQueueItem*)S;
434   //*** NEED BARRIER HERE
435   if (BARRIER() && Q->tail->status==READY && Q->tail->total==0 && Q->tail==Q->head) {
436     //previous Q item is finished
437     S->item.status=READY;
438     Q->tail=(MemoryQueueItem*)S;
439     void* flag=NULL;
440     LOCKXCHG((int*)S->val, (int)flag);
441     if (flag!=NULL) {
442       return READY;
443     } else {
444       return NOTREADY;//<- means that some other dispatcher got this one...so need to do accounting correctly
445     }
446   } else {
447     Q->tail=(MemoryQueueItem*)S;
448     return NOTREADY;
449   }
450 }
451
452
453 void RETIRERENTRY(MemoryQueue* Q, REntry * r) {
454   if (isFineWrite(r)||isFineRead(r)) {
455     RETIREHASHTABLE(Q, r);
456   } else if (isCoarse(r)) {
457     RETIREVECTOR(Q, r);
458   } else if (isSCC(r)) {
459     RETIRESCC(Q, r);
460   }
461 }
462
463 RETIRESCC(MemoryQueue *Q, REntry *r) {
464   SCC* s=r->scc;
465   s->item.total=0;//don't need atomicdec
466   RESOLVECHAIN(Q);
467 }
468
469
470 RETIREHASHTABLE(MemoryQueue *q, REntry *r) {
471   Hashtable *T=r->hashtable;
472   BinItem *b=r->binitem;
473   RETIREBIN(T,r,b);
474   atomic_dec(&T->item.total);
475   if (T->item.next!=NULL && T->item.total==0) { 
476     RESOLVECHAIN(q);
477   }
478 }
479
480 RETIREBIN(Hashtable *T, REntry *r, BinItem *b) {
481   int key=generateKey((unsigned int)r->dynID);
482   if(isFineRead(r)) {
483     atomic_dec(&b->total);
484   }
485   if (isFineWrite(r) || (isFineRead(r) && b->next!=NULL && b->total==0)) {
486     // CHECK FIRST IF next is nonnull to guarantee that b.total cannot change
487     BinItem * val;
488     do {  
489       val=(BinItem*)1;
490       val=(BinItem*)LOCKXCHG((unsigned int*)&(T->array[key]->head), (unsigned int)val);
491     } while(val==(BinItem*)1);
492     // at this point have locked bin
493     BinItem *ptr=val;
494     int haveread=FALSE;
495
496     int i;
497     while (ptr!=NULL) {
498       if (isReadBinItem(ptr)) {
499         ReadBinItem* rptr=(ReadBinItem*)ptr;
500         if (rptr->item.status==NOTREADY) {
501           for (i=0;i<rptr->index;i++) {     
502             resolveDependencies(rptr->array[i]);
503             // XXXXX atomicdec(rptr->array[i].dependenciesCount);
504             if (isParent(&rptr->array[i])) {
505               //parents go immediately
506               atomic_dec(&rptr->item.total);
507               atomic_dec(&T->item.total);
508             }
509           }
510         }
511         rptr->item.status=READY; 
512         if (rptr->item.next==NULL) {
513           break;
514         }
515         if (rptr->item.total!=0) {
516           haveread=TRUE; 
517         } else if ((BinItem*)rptr==val) {
518           val=val->next;
519         }
520       } else if(isWriteBinItem(ptr)) {
521         if (haveread)  
522           break;
523         if(ptr->status==NOTREADY){
524           resolveDependencies(((WriteBinItem*)ptr)->val);
525           ptr->status=READY;
526           if(isParent(((WriteBinItem*)ptr)->val)){
527             atomic_dec(&T->item.total);
528             val=val->next;        
529           }else
530             break;
531         }else{ // write bin is already resolved
532           val=val->next;
533         }
534         /*
535         if(ptr->status==NOTREADY) {           
536           ptr->status=READY; 
537           resolveDependencies(((WriteBinItem*)ptr)->val);
538           if (isParent(((WriteBinItem*)ptr)->val)) {  
539             atomic_dec(&T->item.total);
540             //val=val->next;
541             val=ptr->next;
542           } else
543             break;
544         } 
545         */
546       } 
547       ptr=ptr->next;
548     }
549     T->array[key]->head=val; // release lock
550   }
551 }
552
553
554 RETIREVECTOR(MemoryQueue *Q, REntry *r) {
555   Vector* V=r->vector;
556   atomic_dec(&V->item.total);
557
558   if (V->item.next!=NULL && V->item.total==0) { //NOTE: ORDERING CRUCIAL HERE
559     RESOLVECHAIN(Q);
560   }
561 }
562
563 RESOLVECHAIN(MemoryQueue *Q) {
564   while(TRUE) {
565     MemoryQueueItem* head=Q->head;
566     if (head->next==NULL||head->total!=0) { 
567       //item is not finished
568       if (head->status!=READY) {  
569         //need to update status
570         head->status=READY;
571         if (isHashtable(head)) {
572           RESOLVEHASHTABLE(Q, head);
573         } else if (isVector(head)) {
574           RESOLVEVECTOR(Q, head);
575         } else if (isSingleItem(head)) {
576           RESOLVESCC(head);
577         }
578         if (head->next==NULL)
579           break;
580         if (head->total!=0)
581           break;
582       } else
583         break;
584     }
585     MemoryQueueItem* nextitem=head->next;
586     CAS((int*)Q->head, (int)head, (int)nextitem);
587     //oldvalue not needed...  if we fail we just repeat
588   }
589 }
590
591
592 RESOLVEHASHTABLE(MemoryQueue *Q, Hashtable *T) {  
593   int binidx;
594   for (binidx=0;binidx<NUMBINS;binidx++) {    
595     BinElement* bin=T->array[binidx];
596     BinItem* val;
597     do {
598       val=(BinItem*)1;
599       LOCKXCHG((int*)bin->head, (int)val);
600     } while (val==(BinItem*)1);
601     //at this point have locked bin    
602     int haveread=FALSE; 
603     BinItem* ptr=val;
604     if(ptr!=NULL&&ptr->status==NOTREADY) {
605       do {
606         if (isWriteBinItem(ptr)) {
607           if (haveread)
608             break;        
609           resolveDependencies(((WriteBinItem*)ptr)->val);
610           // XXXXX atomic_dec(ptr.val.dependenciesCount);
611           ptr->status=READY;  
612           if (isParent(((WriteBinItem*)ptr)->val)) {
613             atomic_dec(&T->item.total);
614             val=val->next;
615           } else
616             break;
617         } else if (isReadBinItem(ptr)) {
618           int i;
619           ReadBinItem* rptr=(ReadBinItem*)ptr;
620           for(i=0;i<rptr->index;i++) {
621             resolveDependencies(rptr->array[i]);
622             // XXXXX atomicdec(ptr.array[i].dependenciesCount);
623             if (isParent(&rptr->array[i])) {
624               atomic_dec(&rptr->item.total);
625               atomic_dec(&T->item.total);
626             }
627           }
628           if (rptr->item.next==NULL||rptr->item.total!=0) {
629             haveread=TRUE;
630           } else if((BinItem*)rptr==val) {
631             val=val->next;
632           }
633           rptr->item.status=READY; { 
634           }
635           ptr=ptr->next;
636         } 
637       }while(ptr!=NULL);   
638     }
639     bin->head=val; // released lock;
640   }
641 }
642
643 RESOLVEVECTOR(MemoryQueue *q, Vector *V) {
644   int i;
645   Vector* tmp=V;
646   //handle ready cases
647   while(TRUE) {
648     //enqueue everything
649     for (i=0;i<NUMITEMS;i++) {
650       void* val=NULL;
651       LOCKXCHG((int*)&tmp->array[i], (int)val); 
652       if (val!=NULL) { 
653         // XXXXX atomicdec(val.dependenciesCount);
654         if (isParent(val)) {
655           atomic_dec(&tmp->item.total);
656         }
657       }
658     }
659     if (tmp->item.next!=NULL&&isVector(tmp->item.next)) {
660       tmp=(Vector*)tmp->item.next;
661     } else {
662       break;
663     }
664   }
665 }
666
667 RESOLVESCC(SCC *S) {
668   //precondition: SCC's state is READY
669   void* flag=NULL;
670   LOCKXCHG((int*)S->val, (int)flag); 
671   if (flag!=NULL) {
672     // XXXXX atomicdec(flag.dependenciesCount);
673   }
674 }
675
676
677 resolveDependencies(REntry* rentry){
678   SESEcommon* seseCommon=(SESEcommon*)rentry->seseRec;
679   if(rentry->type==0 || rentry->type==1){    
680     if( atomic_sub_and_test(1, &(seseCommon->unresolvedDependencies)) ){
681       workScheduleSubmit(seseCommon);
682     }   
683   }else if(rentry->type==2 || rentry->type==3){
684     pthread_cond_signal(&(rentry->stallDone));
685   }
686 }