From: bdemsky Date: Wed, 23 Sep 2009 07:59:07 +0000 (+0000) Subject: batch the sends X-Git-Url: http://plrg.eecs.uci.edu/git/?a=commitdiff_plain;h=5a0337c352c885de2ca31a5ee91a6325ce617e44;p=IRC.git batch the sends --- diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index 983405db..9e80df55 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -13,6 +13,7 @@ #define GET_PTR_OID(x) ((unsigned int *)(x + 2*sizeof(int))) #define GET_PTR_EOFF(x,n) ((short *)(x + 2*sizeof(int) + (n*sizeof(unsigned int)))) #define GET_PTR_ARRYFLD(x,n) ((short *)(x + 2*sizeof(int) + (n*sizeof(unsigned int)) + (n*sizeof(short)))) + #define ENDEBUG(s) { printf("Inside %s()\n", s); fflush(stdout);} #define EXDEBUG(s) {printf("Outside %s()\n", s); fflush(stdout);} /***************************************** @@ -287,7 +288,7 @@ void sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size); void prefetch(int, int, unsigned int *, unsigned short *, short*); void *transPrefetch(void *); void *mcqProcess(void *); -prefetchpile_t *foundLocal(char *); // returns node with prefetch elements(oids, offsets) +prefetchpile_t *foundLocal(char *, int); // returns node with prefetch elements(oids, offsets) int lookupObject(unsigned int * oid, short offset); int checkoid(unsigned int oid); int transPrefetchProcess(int **, short); diff --git a/Robust/src/Runtime/DSTM/interface/queue.c b/Robust/src/Runtime/DSTM/interface/queue.c index 41e44033..75f79ac3 100644 --- a/Robust/src/Runtime/DSTM/interface/queue.c +++ b/Robust/src/Runtime/DSTM/interface/queue.c @@ -83,6 +83,38 @@ void * gettail() { return memory+tailoffset+sizeof(int); } +int numavailable() { + int tmp=tailoffset; + int available=0; + if (*((int *)(memory+tmp))==-1) { + tmp=0; + } + while(tmp!=headoffset) { + available++; + tmp=tmp+*((int *)(memory+tmp)); + if (tmp>QSIZE|| (*((int *)(memory+tmp))==-1)) { + break; + } + } + return available; +} + +void incmulttail(int num) { + int i; + for(i=0;iQSIZE) + tailoffset=0; + else + tailoffset=tmpoffset; + } +} + +void resetqueue() { + headoffset=0; + tailoffset=0; +} + void inctail() { int tmpoffset=tailoffset+*((int *)(memory+tailoffset)); if (tmpoffset>QSIZE) diff --git a/Robust/src/Runtime/DSTM/interface/queue.h b/Robust/src/Runtime/DSTM/interface/queue.h index 2e1aa9ec..e284615d 100644 --- a/Robust/src/Runtime/DSTM/interface/queue.h +++ b/Robust/src/Runtime/DSTM/interface/queue.h @@ -13,4 +13,7 @@ void movehead(int size); void * gettail(); void inctail(); void predealloc(); +int numavailable(); +void resetqueue(); +void incmulttail(int); #endif diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 51d14358..d9c35860 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -284,7 +284,8 @@ inline int findmax(int *array, int arraylength) { return max; } -#define INLINEPREFETCH 1 +#define INLINEPREFETCH 0 +#define PREFTHRESHOLD 4 /* This function is a prefetch call generated by the compiler that * populates the shared primary prefetch queue*/ @@ -293,7 +294,13 @@ void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endof int qnodesize = 2*sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short); int len; #ifdef INLINEPREFETCH - char node[qnodesize]; + int attempted=0; + char *node; + do { + node=getmemory(qnodesize); + if (node==NULL&&attempted) + break; + if (node!=NULL) { #else char *node=getmemory(qnodesize); #endif @@ -316,25 +323,30 @@ void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endof memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short)); #ifdef INLINEPREFETCH - prefetchpile_t *pilehead = foundLocal(node); + movehead(qnodesize); + } + int numpref=numavailable(); + attempted=1; - if (pilehead!=NULL) { - // Get sock from shared pool - - /* Send Prefetch Request */ - prefetchpile_t *ptr = pilehead; - while(ptr != NULL) { - int sd = getSock2(transPrefetchSockPool, ptr->mid); - sendPrefetchReq(ptr, sd); - ptr = ptr->next; + if (node==NULL && numpref!=0 || numpref==PREFTHRESHOLD) { + node=gettail(); + prefetchpile_t *pilehead = foundLocal(node,numpref); + if (pilehead!=NULL) { + // Get sock from shared pool + + /* Send Prefetch Request */ + prefetchpile_t *ptr = pilehead; + while(ptr != NULL) { + int sd = getSock2(transPrefetchSockPool, ptr->mid); + sendPrefetchReq(ptr, sd); + ptr = ptr->next; + } + + mcdealloc(pilehead); + resetqueue(); } - - /* Release socket */ - // freeSock(transPrefetchSockPool, pilehead->mid, sd); - - /* Deallocated pilehead */ - mcdealloc(pilehead); - } + }//end do prefetch if condition + } while(node==NULL); #else /* Lock and insert into primary prefetch queue */ movehead(qnodesize); @@ -1470,47 +1482,53 @@ int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) { return 0; } -prefetchpile_t *foundLocal(char *ptr) { - int siteid = *(GET_SITEID(ptr)); - int ntuples = *(GET_NTUPLES(ptr)); - unsigned int * oidarray = GET_PTR_OID(ptr); - unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples); - short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples); +prefetchpile_t *foundLocal(char *ptr, int numprefetches) { + int i; + int j; prefetchpile_t * head=NULL; - int numLocal = 0; - int i; - for(i=0; i