extern pthread_mutex_t prefetchcache_mutex;
extern prehashtable_t pflookup;
-//#define LOGTIMES
-#ifdef LOGTIMES
-extern char bigarray1[6*1024*1024];
-extern unsigned int bigarray2[6*1024*1024];
-extern unsigned int bigarray3[6*1024*1024];
-extern long long bigarray4[6*1024*1024];
-extern int bigarray5[6*1024*1024];
-extern int bigindex1;
-#define LOGTIME(x,y,z,a,b) {\
- int tmp=bigindex1; \
- bigarray1[tmp]=x; \
- bigarray2[tmp]=y; \
- bigarray3[tmp]=z; \
- bigarray4[tmp]=a; \
- bigarray5[tmp]=b; \
- bigindex1++; \
-}
-#else
-#define LOGTIME(x,y,z,a,b)
-//log(eventname, oid, type, time, unqiue id)
-#endif
-
// Function for new prefetch call
void rangePrefetch(unsigned int oid, short numoffset, short *offsets) {
/* Allocate memory in prefetch queue and push the block there */
((unsigned int *)node)[0] = oid;
index = index + (sizeof(unsigned int));
*((short *)(node+index)) = numoffset;
- LOGTIME('R',oid,numoffset,0,0);
index = index + (sizeof(short));
memcpy(node+index, offsets, numoffset * sizeof(short));
movehead(qnodesize);
int count = numavailable();
/* Check tuples if they are found locally */
- LOGTIME('Z',0,0,0,0);
perMcPrefetchList_t* pilehead = processLocal(node,count);
- LOGTIME('Y',0,0,0,0);
if (pilehead!=NULL) {
}
perMcPrefetchList_t *processLocal(char *ptr, int numprefetches) {
- int i,j;
+ int j;
/* Initialize */
perMcPrefetchList_t *head = NULL;
int countInvalidObj=-1;
objheader_t * header = searchObjInv(oid, top, &countInvalidObj);
- //printf("%u %x\n", oid, header);
if (header==NULL) {
- LOGTIME('b',oid,0,0,countInvalidObj);
//forward prefetch
- int machinenum = lhashSearch(oid);
- insertPrefetch(machinenum, oid, numoffset, offsetarray, &head);
+ if(oid!=0) {
+ int machinenum = lhashSearch(oid);
+ insertPrefetch(machinenum, oid, numoffset, offsetarray, &head);
+ }
+ //update ptr
+ ptr=((char *)&offsetarray[numoffset])+sizeof(int);
continue;
}
dfsList[0]=oid;
dfsList[1]=0;
- LOGTIME('B',oid,TYPE(header),0,countInvalidObj);
//Start searching the dfsList
for(top=0; top>=0;) {
oid=getNextOid(header, offsetarray, dfsList, top, &countInvalidObj);
- LOGTIME('O',oid,0,0,countInvalidObj);
- int isLastOffset=0;
if (oid&1) {
int oldisField=TYPE(header) < NUMCLASSES;
top+=2;
dfsList[top+1]=0;
header=searchObjInv(oid, top, &countInvalidObj);
if (header==NULL) {
- LOGTIME('c',oid,top,0,countInvalidObj);
//forward prefetch
int machinenum = lhashSearch(oid);
- if (oldisField&&(dfsList[top-1]!=GET_RANGE(offsetarray[top+1])))
+ if (oldisField&&(dfsList[top-1]!=GET_RANGE(offsetarray[top+1]))) {
insertPrefetch(machinenum, oid, 2+numoffset-top, &offsetarray[top-2], &head);
- else
+ } else {
insertPrefetch(machinenum, oid, numoffset-top, &offsetarray[top], &head);
+ }
} else if (top<offstop) {
- LOGTIME('C',oid,TYPE(header),0,top);
//okay to continue going down
continue;
}
} else if (oid==2) {
- LOGTIME('D',oid,0,0,top);
//send prefetch first
int objindex=top+2;
int machinenum = lhashSearch(dfsList[objindex]);
do {
top-=2;
if (top<0) {
- return head;
+ goto tuple;
+ //return head;
}
} while(dfsList[top+1] == GET_RANGE(offsetarray[top + 3]));
- //we backtracked past the invalid obj...set out countInvalidObj=-1
- if (top<countInvalidObj)
- countInvalidObj=-1;
+ //we backtracked past the invalid obj...set out countInvalidObj=-1
+ if (top<countInvalidObj)
+ countInvalidObj=-1;
header=searchObjInv(dfsList[top], top, &countInvalidObj);
//header shouldn't be null unless the object moves away, but allow
//ourselves the option to just continue on if we lose the object
} while(header==NULL);
- LOGTIME('F',OID(header),TYPE(header),0,top);
//increment
dfsList[top+1]++;
}
+tuple:
+ //update ptr
+ ptr=((char *)&offsetarray[numoffset])+sizeof(int);
}
return head;
}
-#define PBUFFERSIZE 16384
+//#define PBUFFERSIZE 16384
+#define PBUFFERSIZE 2048
//#define PBUFFERSIZE 8192 //Used only for Moldyn benchmark
-perMcPrefetchList_t *processRemote(unsigned int oid, short * offsetarray, int sd, short numoffset) {
+perMcPrefetchList_t *processRemote(unsigned int oid, short * offsetarray, int sd, short numoffset, unsigned int mid) {
int top;
unsigned int dfsList[numoffset];
char buffer[PBUFFERSIZE];
int offstop=numoffset-2;
if (header==NULL) {
- LOGTIME('g',oid,0,0,0);
//forward prefetch
//int machinenum = lhashSearch(oid);
//insertPrefetch(machinenum, oid, numoffset, offsetarray, &head);
dfsList[0]=oid;
dfsList[1]=0;
- LOGTIME('G',OID(header),TYPE(header),0, 0);
//Start searching the dfsList
for(top=0; top>=0;) {
dfsList[top+1]=0;
header=searchObj(oid);
if (header==NULL) {
- LOGTIME('h',oid,top,0,0);
//forward prefetch
//int machinenum = lhashSearch(oid);
- //if (oldisField&&(dfsList[top-1]!=GET_RANGE(offsetarray[top+1])))
+ if (oldisField&&(dfsList[top-1]!=GET_RANGE(offsetarray[top+1]))) {
// insertPrefetch(machinenum, oid, 2+numoffset-top, &offsetarray[top-2], &head);
- //else
+ } else {
// insertPrefetch(machinenum, oid, numoffset-top, &offsetarray[top], &head);
+ }
} else {
sendOidFound(header, oid, sd, buffer, &bufoffset);
- LOGTIME('H',oid,TYPE(header),0,top);
if (top<offstop)
//okay to continue going down
continue;
}
} else if (oid==2) {
- LOGTIME('I',oid,top,0,0);
//send prefetch first
int objindex=top+2;
//int machinenum = lhashSearch(dfsList[objindex]);
//header shouldn't be null unless the object moves away, but allow
//ourselves the option to just continue on if we lose the object
} while(header==NULL);
- LOGTIME('K',OID(header),TYPE(header),0,top);
//increment
dfsList[top+1]++;
}
short offsetsarry[numoffset];
recv_data_buf(acceptfd, readbuffer, offsetsarry, numoffset*sizeof(short));
- perMcPrefetchList_t * pilehead=processRemote(baseoid, offsetsarry, sd, numoffset);
+ perMcPrefetchList_t * pilehead=processRemote(baseoid, offsetsarry, sd, numoffset, mid);
if (pilehead!= NULL) {
perMcPrefetchList_t *ptr = pilehead;
}
memcpy(sendbuffer + incr, header, size);
- if ((*bufoffset)==0)
+ if ((*bufoffset)==0) {
send_data(sd, sendbuffer, size+incr);
+ }
return 0;
}
void rangePrefetch(unsigned int, short, short *);
void *transPrefetchNew();
perMcPrefetchList_t* processLocal(char *ptr, int);
-perMcPrefetchList_t *processRemote(unsigned int oid, short * offsetarray, int sd, short numoffset);
+//perMcPrefetchList_t *processRemote(unsigned int oid, short * offsetarray, int sd, short numoffset);
+perMcPrefetchList_t *processRemote(unsigned int oid, short * offsetarray, int sd, short numoffset, unsigned int mid);
void insertPrefetch(int, unsigned int, short, short*, perMcPrefetchList_t **);
/******** Sending and Receiving Prefetches *******/
int rangePrefetchReq(int acceptfd, struct readstruct * readbuffer);
int processOidFound(objheader_t *, short *, int, int, int);
int getRangePrefetchResponse(int sd, struct readstruct *);
-//INLINE objheader_t *searchObj(unsigned int);
INLINE objheader_t *searchObj(unsigned int);
INLINE objheader_t *searchObjInv(unsigned int, int, int*);
/*********** Functions for computation at the participant end **********/
-//unsigned int getNextOid(objheader_t * header, short * offsetarray, unsigned int *dfsList, int top);
unsigned int getNextOid(objheader_t * header, short * offsetarray, unsigned int *dfsList, int top, int *);
int sendOidFound(objheader_t *, unsigned int, int, char *buffer, int *bufoffset);
int sendOidNotFound(unsigned int oid, int sd);