8 #include "mlp_runtime.h"
9 #include "workschedule.h"
// Per-thread runtime state for the MLP (out-of-order SESE) scheduler.
// seseCallStack: this thread's stack of in-flight SESE invocation records.
13 __thread struct Queue* seseCallStack;
// mlpOnceObj: guard so each thread runs mlpInitOncePerThread() exactly once
// (intended for use with pthread_once; per-thread because it is __thread).
14 __thread pthread_once_t mlpOnceObj = PTHREAD_ONCE_INIT;
// One-time per-thread initializer: allocate this thread's SESE call stack.
// NOTE(review): closing brace and any further init are elided in this extract.
15 void mlpInitOncePerThread() {
16 seseCallStack = createQueue();
// seseCaller: the SESE record on whose behalf this thread is currently
// executing. NOTE(review): presumably written by generated code at SESE
// entry/exit — confirm against the compiler's emitted callers.
19 __thread SESEcommon_p seseCaller;
// Allocate a SESE activation record of 'size' bytes via the runtime
// allocator. On allocation failure only a diagnostic is printed; the
// NULL-check branch and return statement are elided in this extract, so
// whether NULL propagates to the caller cannot be confirmed here.
22 void* mlpAllocSESErecord( int size ) {
23 void* newrec = RUNMALLOC( size );
25 printf( "mlpAllocSESErecord did not obtain memory!\n" );
// Release a SESE activation record previously obtained from
// mlpAllocSESErecord. Ownership transfers to this call; the pointer must
// not be used afterwards. (Closing brace elided in this extract.)
32 void mlpFreeSESErecord( void* seseRecord ) {
33 RUNFREE( seseRecord );
// Build an array of numMemoryQueue freshly created MemoryQueues (one per
// tracked variable/heap region). Caller owns the returned array and queues.
// NOTE(review): declaration of loop index 'i' and closing braces are elided
// in this extract; allocation result is used unchecked.
36 MemoryQueue** mlpCreateMemoryQueueArray(int numMemoryQueue){
38 MemoryQueue** newMemoryQueue=(MemoryQueue**)RUNMALLOC( sizeof( MemoryQueue* ) * numMemoryQueue );
39 for(i=0; i<numMemoryQueue; i++){
40 newMemoryQueue[i]=createMemoryQueue();
42 return newMemoryQueue;
// Allocate a block of NUMRENTRY REntry structs (by value, correctly sized
// with sizeof(REntry) — contrast with the sizeof(REntry*) allocations in
// createReadBinItem/createVector below). Caller owns the memory.
45 REntry* mlpCreateREntryArray(){
46 REntry* newREntryArray=(REntry*)RUNMALLOC(sizeof(REntry)*NUMRENTRY);
47 return newREntryArray;
// Create a dependence-record entry (REntry) binding a SESE record and the
// dynamic ID (object/variable) it accesses. 'type' selects READ/WRITE/
// COARSE/parent variants used by the is*() predicates below.
// NOTE(review): the assignment of newREntry->type and the return statement
// are elided in this extract — confirm 'type' is actually stored.
50 REntry* mlpCreateREntry(int type, void* seseToIssue, void* dynID){
51 REntry* newREntry=(REntry*)RUNMALLOC(sizeof(REntry));
53 newREntry->seseRec=seseToIssue;
54 newREntry->dynID=dynID;
// True iff r is a parent-issued fine-grained entry (parents never block;
// they retire immediately — see ADDVECTOR/RETIREBIN). Else-branch elided.
58 int isParent(REntry *r) {
59 if (r->type==PARENTREAD || r->type==PARENTWRITE) {
// True iff r is a parent-issued coarse-grained entry. Else-branch elided.
66 int isParentCoarse(REntry *r){
67 if (r->type==PARENTCOARSE){
// True iff r is a fine-grained read (child or parent). Else-branch elided.
74 int isFineRead(REntry *r) {
75 if (r->type==READ || r->type==PARENTREAD) {
// True iff r is a fine-grained write (child or parent). Else-branch elided.
82 int isFineWrite(REntry *r) {
83 if (r->type==WRITE || r->type==PARENTWRITE) {
// True iff r is a coarse-grained entry (child or parent). Else-branch elided.
90 int isCoarse(REntry *r){
91 if(r->type==COARSE || r->type==PARENTCOARSE){
// MemoryQueueItem discriminator: SINGLEITEM (SCC) queue node.
106 int isSingleItem(MemoryQueueItem *qItem){
107 if(qItem->type==SINGLEITEM){
// MemoryQueueItem discriminator: hashtable node (fine-grained accesses).
114 int isHashtable(MemoryQueueItem *qItem){
115 if(qItem->type==HASHTABLE){
// MemoryQueueItem discriminator: vector node (coarse-grained accesses).
122 int isVector(MemoryQueueItem *qItem){
123 if(qItem->type==VECTOR){
// BinItem discriminator: a read-group bin within a hashtable bucket chain.
130 int isReadBinItem(BinItem* b){
131 if(b->type==READBIN){
// BinItem discriminator: a single-writer bin within a hashtable bucket chain.
138 int isWriteBinItem(BinItem* b){
139 if(b->type==WRITEBIN){
// Hash a dynamic ID (an object address truncated to unsigned int) into a
// bucket index: mask with H_MASK then drop the low 4 bits (presumably
// alignment bits of the pointer — confirm H_MASK covers NUMBINS buckets).
// NOTE(review): truncating a pointer to unsigned int aliases IDs on 64-bit.
146 int generateKey(unsigned int data){
147 return (data&H_MASK)>> 4;
// Allocate a hashtable queue node and initialize all NUMBINS buckets to
// empty (head==tail==NULL). The commented-out allocation shows 'array' is
// an inline fixed-size member, not a separate heap block.
// NOTE(review): declaration of 'i', item.type/status setup, and the return
// are elided in this extract.
150 Hashtable* createHashtable(){
152 Hashtable* newTable=(Hashtable*)RUNMALLOC(sizeof(Hashtable));
153 //newTable->array=(BinElement*)RUNMALLOC(sizeof(BinElement)*NUMBINS);
154 for(i=0;i<NUMBINS;i++){
155 newTable->array[i]=(BinElement*)RUNMALLOC(sizeof(BinElement));
156 newTable->array[i]->head=NULL;
157 newTable->array[i]->tail=NULL;
// Allocate a single-writer bin; tags the embedded BinItem as WRITEBIN so
// isWriteBinItem() recognizes it. (Further field init / return elided.)
162 WriteBinItem* createWriteBinItem(){
163 WriteBinItem* binitem=(WriteBinItem*)RUNMALLOC(sizeof(WriteBinItem));
164 binitem->item.type=WRITEBIN;
// Allocate a read-group bin that can batch up to NUMREAD readers.
// BUG(review): 'array' elements are stored BY VALUE elsewhere
// (rb->array[rb->index++]=*r in EMPTYBINCASE/TAILREADCASE), yet the
// allocation uses sizeof(REntry*)*NUMREAD. If sizeof(REntry) exceeds
// sizeof(REntry*) this under-allocates and writes past the buffer —
// should be sizeof(REntry)*NUMREAD (cf. mlpCreateREntryArray).
168 ReadBinItem* createReadBinItem(){
169 ReadBinItem* binitem=(ReadBinItem*)RUNMALLOC(sizeof(REntry*)*NUMREAD);
170 binitem->array=(REntry*)RUNMALLOC(sizeof(REntry*)*NUMREAD);
172 binitem->item.type=READBIN;
// Allocate a vector queue node batching up to NUMITEMS coarse entries.
// BUG(review): same sizing concern as createReadBinItem — 'array' is typed
// REntry* but allocated with sizeof(REntry*)*NUMITEMS; confirm whether
// elements are stored by value or by pointer (ADDVECTOR's LOCKXCHG on
// &V->array[index] suggests pointer-sized slots; the type says otherwise).
176 Vector* createVector(){
177 Vector* vector=(Vector*)RUNMALLOC(sizeof(Vector));
178 vector->array=(REntry*)RUNMALLOC(sizeof(REntry*)*NUMITEMS);
// Fragment of createSCC(): allocates the SCC (single-item) queue node.
// The function's signature and remaining body are elided in this extract.
184 SCC* scc=(SCC*)RUNMALLOC(sizeof(SCC));
// Create a per-variable memory queue seeded with a dummy head node so the
// head pointer is never NULL (classic lock-free queue idiom — the dummy's
// type/status init and head/tail wiring are elided in this extract).
188 MemoryQueue* createMemoryQueue(){
189 MemoryQueue* queue = (MemoryQueue*)RUNMALLOC(sizeof(MemoryQueue));
191 MemoryQueueItem* dummy=(MemoryQueueItem*)RUNMALLOC(sizeof(MemoryQueueItem));
// Issue a dependence entry into queue q, dispatching on granularity:
// fine reads/writes -> hashtable, coarse -> vector, SCC -> single item.
// Returns READY/NOTREADY (whether the SESE may run immediately).
// (SCC branch body and closing braces elided in this extract.)
200 int ADDRENTRY(MemoryQueue * q, REntry * r) {
201 if (isFineRead(r) || isFineWrite(r)) {
202 return ADDTABLE(q, r);
203 } else if (isCoarse(r)) {
204 return ADDVECTOR(q, r);
205 } else if (isSCC(r)) {
// Insert a fine-grained entry into the queue's tail hashtable, appending a
// new hashtable node first if the current tail is not one. Bin heads double
// as spinlocks: LOCKXCHG swaps in the sentinel 0x1 and spins until the old
// head is not 0x1 (lines 236-237). Dispatches to EMPTYBINCASE /
// WRITEBINCASE / READBINCASE once the bucket is locked.
// NOTE(review): several lines elided (parent fast-path body at 214,
// NOTREADY initialization of h, val's declaration/read, unlock paths).
// NOTE(review): LOCKXCHG casts a pointer through unsigned int — truncates
// on 64-bit targets; should use uintptr_t-width exchange.
210 int ADDTABLE(MemoryQueue *q, REntry *r) {
211 if(!isHashtable(q->tail)) {
213 MemoryQueueItem* tail=q->tail;
// Parent entries on a fully drained queue can bypass queueing entirely.
214 if (isParent(r) && tail->total==0 && q->tail==q->head) {
219 Hashtable* h=createHashtable();
220 tail->next=(MemoryQueueItem*)h;
221 //************NEED memory barrier here to ensure compiler does not cache Q.tail.status********
// If the previous tail finished in the meantime, the new table starts READY.
222 if (BARRIER() && tail->status==READY && tail->total==0 && q->tail==q->head) {
223 //previous Q item is finished
224 h->item.status=READY;
226 q->tail=(MemoryQueueItem*)h;
229 //at this point, have table
230 Hashtable* table=(Hashtable*)q->tail;
232 int key=generateKey((unsigned int)r->dynID);
235 BinElement* bin=table->array[key];
236 val=(BinItem*)LOCKXCHG((unsigned int*)&(bin->head), (unsigned int)val);//note...talk to me about optimizations here.
237 } while(val==(BinItem*)0x1);
238 //at this point have locked bin
240 return EMPTYBINCASE(table, table->array[key], r);
242 if (isFineWrite(r)) {
243 WRITEBINCASE(table, r, val);
245 } else if (isFineRead(r)) {
246 return READBINCASE(table, r, val);
// Bucket 'be' is empty: create the first bin (write bin holding r, or read
// bin with r as its first member) and publish it. If the table itself is
// READY the entry can run now (bucket unlocked by restoring head=NULL,
// line 269); otherwise the new bin is linked and head=b releases the lock
// (line 281). T->item.total counts outstanding entries in the table.
// NOTE(review): status assignments, be->tail updates, and return values
// are elided in this extract.
251 int EMPTYBINCASE(Hashtable *T, BinElement* be, REntry *r) {
254 if (isFineWrite(r)) {
255 b=(BinItem*)createWriteBinItem();
256 ((WriteBinItem*)b)->val=r;//<-only different statement
257 } else if (isFineRead(r)) {
258 b=(BinItem*)createReadBinItem();
259 ReadBinItem* readbin=(ReadBinItem*)b;
// Stores the REntry BY VALUE — see sizing concern on createReadBinItem.
260 readbin->array[readbin->index++]=*r;
264 if (T->item.status==READY) {
265 //current entry is ready
269 be->head=NULL; // released lock
277 atomic_inc(&T->item.total);
281 be->head=b;//released lock
// Bucket non-empty and r is a write: append a new single-writer bin after
// the current tail. A writer behind anything is NOTREADY by definition
// (comment at 287). Publishing order: link tail->next, then advance tail.
// NOTE(review): b's val/status init, head restore (lock release), and the
// return are elided in this extract. Also note 293 assigns a WriteBinItem*
// from createWriteBinItem() — consistent — but bintail at 292 appears
// unused in the visible lines.
285 int WRITEBINCASE(Hashtable *T, REntry *r, BinItem *val) {
286 //chain of bins exists => tail is valid
287 //if there is something in front of us, then we are not ready
289 int key=generateKey((unsigned int)r->dynID);
290 BinElement* be=T->array[key];
292 BinItem *bintail=be->tail;
293 WriteBinItem *b=createWriteBinItem();
297 atomic_inc(&T->item.total);
300 r->binitem=(BinItem*)b;
302 be->tail->next=(BinItem*)b;
303 be->tail=(BinItem*)b;
// Bucket non-empty and r is a read: if the tail bin is a read group, try to
// join it (TAILREADCASE); otherwise a writer is ahead (TAILWRITECASE).
// NOTE(review): implicit int return type (pre-C99 style); the second
// condition at 312 is redundant — it is the exact negation of 310 and
// could be a plain 'else'. Return for the TAILWRITECASE path is elided.
307 READBINCASE(Hashtable *T, REntry *r, BinItem *val) {
308 int key=generateKey((unsigned int)r->dynID);
309 BinItem * bintail=T->array[key]->tail;
310 if (isReadBinItem(bintail)) {
311 return TAILREADCASE(T, r, val, bintail);
312 } else if (!isReadBinItem(bintail)) {
313 TAILWRITECASE(T, r, val, bintail);
// Join an existing read group at the bucket tail, or start a fresh group
// when the tail group is full (index==NUMREAD). head=val restores the
// bucket lock on exit (326/349).
// BUG(review): line 322 uses '=' not '==' — it ASSIGNS READY to the tail
// group's status, so the condition is always true and silently marks a
// possibly NOTREADY group READY. Must be 'readbintail->item.status==READY'.
// NOTE(review): derivation of 'status', the early-return path after 326,
// and the final return are elided in this extract.
318 int TAILREADCASE(Hashtable *T, REntry *r, BinItem *val, BinItem *bintail) {
319 int key=generateKey((unsigned int)r->dynID);
320 ReadBinItem * readbintail=(ReadBinItem*)T->array[key]->tail;
322 if (readbintail->item.status=READY) {
326 T->array[key]->head=val;//released lock
334 if (readbintail->index==NUMREAD) { // create new read group
335 ReadBinItem* rb=createReadBinItem();
// By-value store — see sizing concern on createReadBinItem.
336 rb->array[rb->index++]=*r;
337 rb->item.total=1;//safe only because item could not have started
338 rb->item.status=status;
339 T->array[key]->tail->next=(BinItem*)rb;
340 T->array[key]->tail=(BinItem*)rb;
341 r->binitem=(BinItem*)rb;
342 } else { // group into old tail
343 readbintail->array[readbintail->index++]=*r;
344 atomic_inc(&readbintail->item.total);
345 r->binitem=(BinItem*)readbintail;
347 atomic_inc(&T->item.total);
349 T->array[key]->head=val;//released lock
// A writer is ahead of this reader: append a NEW bin behind the write tail
// and mark it NOTREADY (a read behind a write always waits). head=val at
// 367 releases the bucket lock.
// NOTE(review): implicit int return type; bintail parameter unused in the
// visible lines; the commented-out block (359-361) and the surviving code
// disagree about whether a ReadBinItem or WriteBinItem is appended here —
// appending a WriteBinItem for a READ entry looks suspicious; confirm
// against the elided lines (356-357) and the original design.
353 TAILWRITECASE(Hashtable *T, REntry *r, BinItem *val, BinItem *bintail) {
354 int key=generateKey((unsigned int)r->dynID);
355 WriteBinItem* wb=createWriteBinItem();
358 wb->item.status=NOTREADY;
359 // rb->array[rb->index++]=*r;
360 //rb->item.total=1;//safe because item could not have started
361 //rb->item.status=NOTREADY;
362 atomic_inc(&T->item.total);
364 r->binitem=(BinItem*)wb;
365 T->array[key]->tail->next=(BinItem*)wb;
366 T->array[key]->tail=(BinItem*)wb;
367 T->array[key]->head=val;//released lock
// Issue a coarse-grained entry: ensure the queue tail is a Vector (append
// one if not, or if the current vector is full), reserve a slot, and use
// LOCKXCHG on the slot to race the resolver: whoever swaps first owns the
// retirement accounting (comment at 419). Parent coarse entries on a
// drained queue bypass queueing (373); parents also retire immediately
// after insertion (414-415).
// NOTE(review): implicit int return type; slot index computation, 'flag'
// setup, NOTREADY init of V at 378, and several returns are elided. Note
// the full-vector path (389-399) re-checks only tail status, not total —
// asymmetric with 381; confirm intentional.
370 ADDVECTOR(MemoryQueue *Q, REntry *r) {
371 if(!isVector(Q->tail)) {
373 if (isParentCoarse(r) && Q->tail->total==0 && Q->tail==Q->head) {
378 Vector* V=createVector();
379 Q->tail->next=(MemoryQueueItem*)V;
380 //************NEED memory barrier here to ensure compiler does not cache Q.tail.status******
381 if (BARRIER() && Q->tail->status==READY&&Q->tail->total==0) {
382 //previous Q item is finished
383 V->item.status=READY;
385 Q->tail=(MemoryQueueItem*)V;
387 //at this point, have vector
388 Vector* V=(Vector*)Q->tail;
389 if (V->index==NUMITEMS) {
393 V->item.status=NOTREADY;
394 Q->tail->next=(MemoryQueueItem*)V;
395 //***NEED memory barrier here to ensure compiler does not cache Q.tail.status******
396 if (BARRIER() && Q->tail->status==READY) {
397 V->item.status=READY;
399 Q->tail=(MemoryQueueItem*)V;
402 atomic_inc(&V->item.total);
406 //*****NEED memory barrier here to ensure compiler does not reorder writes to V.array and V.index
409 //*****NEED memory barrier here to ensure compiler does not cache V.status*********
410 if (BARRIER() && V->item.status==READY) {
// 32-bit exchange of a pointer-sized slot — truncation hazard on 64-bit.
412 LOCKXCHG((unsigned int*)&(V->array[index]), (unsigned int)flag);
414 if (isParent(r)) { //parent's retire immediately
415 atomic_dec(&V->item.total);
419 return NOTREADY;//<- means that some other dispatcher got this one...so need to do accounting correctly
427 //SCC's don't come in parent variety
// Issue an SCC (strongly-connected/serializing) entry: append a SINGLEITEM
// node, mark it READY if the previous tail drained, then LOCKXCHG S->val
// to race the resolver for retirement accounting (same protocol as
// ADDVECTOR's slot swap).
// NOTE(review): implicit int return type; creation of S, 'flag' setup, and
// the READY-path return are elided. Line 440 passes S->val (a value) where
// the hashtable code passes an ADDRESS (&bin->head) — suspected missing
// '&' unless S->val is itself a pointer to the slot; confirm. Duplicate
// Q->tail assignments at 438 and 447 belong to different branches.
428 ADDSCC(MemoryQueue *Q, REntry *r) {
433 Q->tail->next=(MemoryQueueItem*)S;
434 //*** NEED BARRIER HERE
435 if (BARRIER() && Q->tail->status==READY && Q->tail->total==0 && Q->tail==Q->head) {
436 //previous Q item is finished
437 S->item.status=READY;
438 Q->tail=(MemoryQueueItem*)S;
440 LOCKXCHG((int*)S->val, (int)flag);
444 return NOTREADY;//<- means that some other dispatcher got this one...so need to do accounting correctly
447 Q->tail=(MemoryQueueItem*)S;
// Retire a completed entry, dispatching on granularity (mirror of
// ADDRENTRY): fine -> RETIREHASHTABLE, coarse -> RETIREVECTOR (call at
// line 457 elided), SCC -> RETIRESCC (body elided).
453 void RETIRERENTRY(MemoryQueue* Q, REntry * r) {
454 if (isFineWrite(r)||isFineRead(r)) {
455 RETIREHASHTABLE(Q, r);
456 } else if (isCoarse(r)) {
458 } else if (isSCC(r)) {
// Retire an SCC entry. total=0 without atomics is safe per the comment —
// an SCC admits exactly one entry, so no concurrent decrementers exist.
// (Implicit int return type; 's' derivation and chain resolution elided.)
463 RETIRESCC(MemoryQueue *Q, REntry *r) {
465 s->item.total=0;//don't need atomicdec
// Retire a fine-grained entry: per-bin accounting via RETIREBIN (call
// elided), then decrement the table's outstanding count; when the table is
// drained AND a successor node exists, the chain can advance (comment at
// 486 explains why next!=NULL is checked first: it pins total at 0).
// (Implicit int return type; RESOLVECHAIN call elided.)
470 RETIREHASHTABLE(MemoryQueue *q, REntry *r) {
471 Hashtable *T=r->hashtable;
472 BinItem *b=r->binitem;
474 atomic_dec(&T->item.total);
475 if (T->item.next!=NULL && T->item.total==0) {
// Retire r from its bin. A write retiring, or the LAST read of a drained
// read group with a successor, must wake the next bin: lock the bucket
// (spin on LOCKXCHG vs sentinel 1, lines 490-491), walk to the successor
// 'ptr', resolve its dependencies, mark it READY, and unlock via head=val
// (549). Parent entries' counts are dropped immediately (504-507, 526-527,
// 538-539) since parents never occupy a slot.
// NOTE(review): implicit int return type; many lines elided (ptr/val
// derivation, bucket-empty path, the else-branches at 515/517/531). The
// three near-identical WriteBinItem paths (523-527, 535-539) differ only
// in elided context — candidates for a shared helper once the full file is
// in view. LOCKXCHG through unsigned int truncates pointers on 64-bit.
480 RETIREBIN(Hashtable *T, REntry *r, BinItem *b) {
481 int key=generateKey((unsigned int)r->dynID);
483 atomic_dec(&b->total);
485 if (isFineWrite(r) || (isFineRead(r) && b->next!=NULL && b->total==0)) {
486 // CHECK FIRST IF next is nonnull to guarantee that b.total cannot change
490 val=(BinItem*)LOCKXCHG((unsigned int*)&(T->array[key]->head), (unsigned int)val);
491 } while(val==(BinItem*)1);
492 // at this point have locked bin
498 if (isReadBinItem(ptr)) {
499 ReadBinItem* rptr=(ReadBinItem*)ptr;
500 if (rptr->item.status==NOTREADY) {
501 for (i=0;i<rptr->index;i++) {
502 resolveDependencies(rptr->array[i]);
503 // XXXXX atomicdec(rptr->array[i].dependenciesCount);
504 if (isParent(&rptr->array[i])) {
505 //parents go immediately
506 atomic_dec(&rptr->item.total);
507 atomic_dec(&T->item.total);
511 rptr->item.status=READY;
512 if (rptr->item.next==NULL) {
515 if (rptr->item.total!=0) {
517 } else if ((BinItem*)rptr==val) {
520 } else if(isWriteBinItem(ptr)) {
523 if(ptr->status==NOTREADY){
524 resolveDependencies(((WriteBinItem*)ptr)->val);
526 if(isParent(((WriteBinItem*)ptr)->val)){
527 atomic_dec(&T->item.total);
531 }else{ // write bin is already resolved
535 if(ptr->status==NOTREADY) {
537 resolveDependencies(((WriteBinItem*)ptr)->val);
538 if (isParent(((WriteBinItem*)ptr)->val)) {
539 atomic_dec(&T->item.total);
549 T->array[key]->head=val; // release lock
// Retire a coarse entry: decrement the vector's outstanding count; when
// drained AND a successor exists (ordering crucial — next!=NULL pins
// total), advance the chain (RESOLVECHAIN call elided). Implicit int
// return type; derivation of V elided.
554 RETIREVECTOR(MemoryQueue *Q, REntry *r) {
556 atomic_dec(&V->item.total);
558 if (V->item.next!=NULL && V->item.total==0) { //NOTE: ORDERING CRUCIAL HERE
// Advance the queue: while the head item is finished, pop it (CAS head to
// head->next) and make the new head's pending entries runnable via the
// type-specific RESOLVE* routine.
// NOTE(review): implicit int return type; loop structure, the SCC branch
// body (575), and the post-loop logic are elided.
// BUG(review)?: line 586 passes Q->head (the VALUE) cast to int* as the
// CAS target — every other use of CAS-style primitives here targets an
// address; this looks like it should be (int*)&Q->head. Also int-width
// CAS on a pointer truncates on 64-bit. Confirm against CAS's signature.
563 RESOLVECHAIN(MemoryQueue *Q) {
565 MemoryQueueItem* head=Q->head;
566 if (head->next==NULL||head->total!=0) {
567 //item is not finished
568 if (head->status!=READY) {
569 //need to update status
571 if (isHashtable(head)) {
572 RESOLVEHASHTABLE(Q, head);
573 } else if (isVector(head)) {
574 RESOLVEVECTOR(Q, head);
575 } else if (isSingleItem(head)) {
578 if (head->next==NULL)
585 MemoryQueueItem* nextitem=head->next;
586 CAS((int*)Q->head, (int)head, (int)nextitem);
587 //oldvalue not needed... if we fail we just repeat
// A hashtable node reached the queue head: walk every bucket, lock it
// (LOCKXCHG vs sentinel 1), and release the first NOTREADY bin — resolve
// each member's dependencies, drop parent counts, mark READY — then unlock
// via bin->head=val (639).
// NOTE(review): implicit int return type; ptr/val derivation, write-bin
// READY marking, and loop/brace structure are elided.
// BUG(review)?: line 599 passes bin->head (a VALUE) as the LOCKXCHG target
// where ADDTABLE line 236 passes &(bin->head) — suspected missing '&'.
// Line 621 passes rptr->array[i] (an REntry) to resolveDependencies, which
// takes REntry* — cf. line 504 which correctly takes &rptr->array[i];
// confirm once elided lines are visible. Line 633's trailing '{' after
// 'rptr->item.status=READY;' is a stray brace or extraction garble.
592 RESOLVEHASHTABLE(MemoryQueue *Q, Hashtable *T) {
594 for (binidx=0;binidx<NUMBINS;binidx++) {
595 BinElement* bin=T->array[binidx];
599 LOCKXCHG((int*)bin->head, (int)val);
600 } while (val==(BinItem*)1);
601 //at this point have locked bin
604 if(ptr!=NULL&&ptr->status==NOTREADY) {
606 if (isWriteBinItem(ptr)) {
609 resolveDependencies(((WriteBinItem*)ptr)->val);
610 // XXXXX atomic_dec(ptr.val.dependenciesCount);
612 if (isParent(((WriteBinItem*)ptr)->val)) {
613 atomic_dec(&T->item.total);
617 } else if (isReadBinItem(ptr)) {
619 ReadBinItem* rptr=(ReadBinItem*)ptr;
620 for(i=0;i<rptr->index;i++) {
621 resolveDependencies(rptr->array[i]);
622 // XXXXX atomicdec(ptr.array[i].dependenciesCount);
623 if (isParent(&rptr->array[i])) {
624 atomic_dec(&rptr->item.total);
625 atomic_dec(&T->item.total);
628 if (rptr->item.next==NULL||rptr->item.total!=0) {
630 } else if((BinItem*)rptr==val) {
633 rptr->item.status=READY; {
639 bin->head=val; // released lock;
// A vector node reached the queue head: sweep its slots, LOCKXCHG each to
// claim the entry from a racing dispatcher, resolve its dependencies, and
// drop parent counts; then follow item.next while successors are also
// vectors (coalesced coarse regions drain together).
// NOTE(review): implicit int return type; 'tmp'/'val'/'flag' setup, the
// per-slot claimed-check, and loop termination are elided. Int-width
// LOCKXCHG on pointer-sized slots truncates on 64-bit.
643 RESOLVEVECTOR(MemoryQueue *q, Vector *V) {
649 for (i=0;i<NUMITEMS;i++) {
651 LOCKXCHG((int*)&tmp->array[i], (int)val);
653 // XXXXX atomicdec(val.dependenciesCount);
655 atomic_dec(&tmp->item.total);
659 if (tmp->item.next!=NULL&&isVector(tmp->item.next)) {
660 tmp=(Vector*)tmp->item.next;
// Fragment of the SCC resolver (signature at the elided line 669). Claims
// the single slot with LOCKXCHG, racing ADDSCC's identical swap at 440 —
// whichever side swaps first performs retirement accounting.
// NOTE(review): as at 440, S->val is passed by VALUE where an address is
// expected elsewhere — confirm S->val's type before changing.
668 //precondition: SCC's state is READY
670 LOCKXCHG((int*)S->val, (int)flag);
672 // XXXXX atomicdec(flag.dependenciesCount);
// A dependence of rentry's SESE has been satisfied. For queued SESE entries
// (types 0/1): atomically decrement the SESE's unresolved-dependency count
// and submit it to the work scheduler when it hits zero. For stall entries
// (types 2/3): wake the blocked thread via its condition variable.
// NOTE(review): implicit int return type; magic numbers 0-3 presumably
// correspond to the READ/WRITE/PARENTREAD/PARENTWRITE (or stall) enum
// values used by the is*() predicates — replace with the named constants
// once confirmed. Signaling a condvar without visibly holding its mutex
// (line 684) relies on the waiter's own predicate check; confirm the
// associated lock discipline in the elided lines.
677 resolveDependencies(REntry* rentry){
678 SESEcommon* seseCommon=(SESEcommon*)rentry->seseRec;
679 if(rentry->type==0 || rentry->type==1){
680 if( atomic_sub_and_test(1, &(seseCommon->unresolvedDependencies)) ){
681 workScheduleSubmit(seseCommon);
683 }else if(rentry->type==2 || rentry->type==3){
684 pthread_cond_signal(&(rentry->stallDone));