extern sockPoolHashTable_t *transPrefetchSockPool;
extern unsigned int myIpAddr;
+extern sockPoolHashTable_t *transPResponseSocketPool;
// Function for new prefetch call
void rangePrefetch(unsigned int oid, short numoffset, short *offsets) {
int numLocal = 0;
perMcPrefetchList_t * head=NULL;
+ //printf("Inside %s()\n", __func__);
// Iterate for the object
int noffset = (int) numoffsets;
tmpobjset[l] = GET_RANGE(offsets[2*l+1]);
}
int maxChldOids = getsize(tmpobjset, sizetmpObjSet)+1;
+ //printf("%s() maxChldOids = %d\n", __func__, maxChldOids);
unsigned int chldOffstFrmBase[maxChldOids];
chldOffstFrmBase[0] = oid;
int tovisit = 0, visited = -1;
// Iterate for each element of offsets
+ //printf("%s() noffset = %d, sizetmpObjSet= %d, visited = %d, tovisit= %d\n", __func__, noffset, sizetmpObjSet, visited, tovisit);
for (j = 0; j < noffset; j++) {
// Iterate over each element to be visited
while (visited != tovisit) {
unsigned int oid = chldOffstFrmBase[visited+1];
int machinenum = lhashSearch(oid);
//TODO Group a bunch of oids to send in one prefetch request
- insertPrefetch(machinenum, oid, noffset-j, offsets, &head);
- break;
+ //printf("Oid Not Found, send Prefetch for oid = %d, noffset-j= %d j should point to the new offset = %d\n", oid, noffset-j, j);
+ insertPrefetch(machinenum, oid, noffset-j, &offsets[j], &head);
+ goto tuple;
} else {
// iterate over each offset
int retval;
return NULL;
}
}
+ //printf("%s() visited = %d, tovisit= %d\n", __func__, visited, tovisit);
visited++;
+ fflush(stdout);
}
} // end iterate for each element of offsets
int lookForObjs(int *chldOffstFrmBase, short *offsets,
int *index, int *visited, int *tovisit, int *noffset) {
+ //printf("Inside %s()\n", __func__);
objheader_t *header;
unsigned int oid = chldOffstFrmBase[*visited+1];
if((header = (objheader_t *)mhashSearch(oid))!= NULL) {
/* Check if array out of bounds */
int startindex = offsets[*index];
int range = GET_RANGE(offsets[(*index)+1]);
+ //printf("%s() Array range = %d\n", __func__, range);
if(range > 0 && range < length) {
short stride = GET_STRIDE(offsets[(*index)+1]);
stride = stride + 1; //NOTE bit pattern 000 => stride = 1, 001 => stride = 2
} else { //linked list
int startindex = offsets[*index];
int range = GET_RANGE(offsets[(*index)+1]);
+ //printf("%s() LinkedList range = %d\n", __func__, range);
unsigned int oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + startindex));
+ //printf("Oid = %d\n", oid);
if (range == 0) {
chldOffstFrmBase[*tovisit+1] = oid;
if(isOidAvail(oid)) {
*visited = *visited + 1;
+ *tovisit = *tovisit + 1;
*index = *index + 2;
+ //printf("%s() Found visited = %d, tovisit=%d, index=%d\n", __func__, *visited, *tovisit, *index);
return 1;
} else {
*tovisit = *tovisit + 1;
+ //printf("%s() Not Found visited = %d, tovisit=%d, index=%d\n", __func__, *visited, *tovisit, *index);
return 1;
}
} else {
}
return 1;
}
-
}
return 1;
}
return;
}
+
+int getRangePrefetchResponse(int sd) {
+ //printf("Inside %s()\n", __func__);
+
+
+ return 0;
+}
+
+
+int rangePrefetchReq(int acceptfd) {
+ int numoffset, sd = -1;
+ unsigned int oid, mid = -1;
+ oidmidpair_t oidmid;
+ //printf("Inside %s()\n", __func__);
+
+ while (1) {
+ recv_data(acceptfd, &numoffset, sizeof(int));
+ if(numoffset == -1)
+ break;
+ recv_data(acceptfd, &oidmid, 2*sizeof(unsigned int));
+ oid = oidmid.oid;
+ if(mid != oidmid.mid) {
+ if(mid!= -1)
+ freeSockWithLock(transPResponseSocketPool, mid, sd);
+ mid = oidmid.mid;
+ sd = getSockWithLock(transPResponseSocketPool, mid);
+ }
+
+ short offsetsarry[numoffset];
+ recv_data(acceptfd, offsetsarry, numoffset*sizeof(short));
+
+ /*Process each oid */
+ objheader_t *header;
+ if((header = (objheader_t *)mhashSearch(oid)) == NULL) {
+ int size = sizeof(int) + sizeof(char) + sizeof(unsigned int);
+ char sendbuffer[size];
+ *((int *) sendbuffer) = size;
+ *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
+ *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(unsigned int))) = oid;
+ char control = TRANS_PREFETCH_RESPONSE;
+ sendPrefetchResponse(sd, &control, sendbuffer, &size);
+ break;
+ } else { //Obj found
+ int retval;
+ if((retval = processOidFound(header, offsetsarry, 0, sd)) != 0) {
+ printf("%s() Error: in processOidFound() at line %d in %s()\n",
+ __func__, __LINE__, __FILE__);
+ return -1;
+ }
+ }
+ }
+
+ //Release socket
+ if(mid!=-1)
+ freeSockWithLock(transPResponseSocketPool, mid, sd);
+
+ return 0;
+}
+
+int processOidFound(objheader_t *header, short * offsetsarry, int index, int sd) {
+ int objsize;
+ GETSIZE(objsize, header);
+ int size = sizeof(int) + sizeof(char) + sizeof(unsigned int) +
+ sizeof(objheader_t) + objsize;
+ char sendbuffer[size];
+ int incr = 0;
+ *((int *)(sendbuffer + incr)) = size;
+ incr += sizeof(int);
+ *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
+ incr += sizeof(char);
+ *((unsigned int *)(sendbuffer + incr)) = OID(header);
+ incr += sizeof(unsigned int);
+ memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
+
+ char control = TRANS_PREFETCH_RESPONSE;
+ sendPrefetchResponse(sd, &control, sendbuffer, &size);
+
+ if(TYPE(header) > NUMCLASSES) {
+ int elementsize = classsize[TYPE(header)];
+ struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
+ int length = ao->___length___;
+ /* Check if array out of bounds */
+ int startindex = offsetsarry[index];
+ int range = GET_RANGE(offsetsarry[index+1]);
+ //printf("%s() Array range = %d\n", __func__, range);
+ if(range > 0 && range < length) {
+ short stride = GET_STRIDE(offsets[index+1]);
+ stride = stride + 1; //NOTE bit pattern 000 => stride = 1, 001 => stride = 2
+ int i;
+ //check is stride +ve or negative
+ if(GET_STRIDEINC(offsets[index]+1)) { //-ve stride
+ for(i = startindex; i <= range+1; i = i - stride) {
+ unsigned int oid = 0;
+ if((i < 0 || i >= length)) {
+ //if yes treat the object as found
+ oid = 0;
+ continue;
+ } else {
+ // compute new object
+ oid = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*i)));
+ }
+ if(oid == 0)
+ goto end;
+ }
+ } else { //+ve stride
+ for(i = startindex; i <= range; i = i + stride) {
+ unsigned int oid = 0;
+ if(i < 0 || i >= length) {
+ //if yes treat the object as found
+ oid = 0;
+ continue;
+ } else {
+ // compute new object
+ oid = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*i)));
+ }
+ // add new object
+ chldOffstFrmBase[*tovisit] = oid;
+ *tovisit = *tovisit + 1;
+ }
+ }
+ } else if(range == 0) {
+ if(startindex >=0 || startindex < length) {
+ unsigned int oid = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*startindex)));
+ // add new object
+ chldOffstFrmBase[*tovisit] = oid;
+ *tovisit = *tovisit + 1;
+ }
+ }
+ *index = *index + 2;
+ } else { //linked list
+ int startindex = offsets[*index];
+ int range = GET_RANGE(offsets[(*index)+1]);
+ //printf("%s() LinkedList range = %d\n", __func__, range);
+ unsigned int oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + startindex));
+ //printf("Oid = %d\n", oid);
+ if (range == 0) {
+ chldOffstFrmBase[*tovisit+1] = oid;
+ if(isOidAvail(oid)) {
+ *visited = *visited + 1;
+ *tovisit = *tovisit + 1;
+ *index = *index + 2;
+ //printf("%s() Found visited = %d, tovisit=%d, index=%d\n", __func__, *visited, *tovisit, *index);
+ return 1;
+ } else {
+ *tovisit = *tovisit + 1;
+ //printf("%s() Not Found visited = %d, tovisit=%d, index=%d\n", __func__, *visited, *tovisit, *index);
+ return 1;
+ }
+ } else {
+ int i;
+ for(i = 0; i<range; i++) {
+ chldOffstFrmBase[*tovisit+1] = oid;
+ if(isOidAvail(oid)) {
+ //get the next object
+ if((header = (objheader_t *)mhashSearch(oid))!= NULL) {
+ //Found on machine
+ ;
+ } else if((header = (objheader_t *)prehashSearch(oid))!=NULL) {
+ //Found in prefetch cache
+ ;
+ } else {
+ ;
+ }
+ oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + startindex));
+ *tovisit = *tovisit + 1;
+ *visited = *visited + 1;
+ } else {
+ //update range
+ offsets[(*index)+1]= (offsets[(*index)+1] & 0x0fff) - 1;
+ *tovisit = *tovisit + 1;
+ return 1;
+ }
+ }
+ return 1;
+ }
+ }
+end:
+ ;
+ return 0;
+}