more changes in rangeprefeches to handle prefetch requests at the server end
authoradash <adash>
Thu, 8 Jan 2009 07:53:18 +0000 (07:53 +0000)
committeradash <adash>
Thu, 8 Jan 2009 07:53:18 +0000 (07:53 +0000)
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/prefetch.c
Robust/src/Runtime/DSTM/interface/prefetch.h

index 69e10ca1c31e14235d23c585162416c97302ab7a..578886e1ca6777970a9b114c3ee77853bc234a7e 100644 (file)
@@ -6,6 +6,7 @@
 #include "mlookup.h"
 #include "llookup.h"
 #include "threadnotify.h"
+#include "prefetch.h"
 #ifdef COMPILER
 #include "thread.h"
 #endif
@@ -184,17 +185,31 @@ void *dstmAccept(void *acceptfd) {
       break;
 
     case TRANS_PREFETCH:
+#ifdef RANGEPREFETCH
+      if((val = rangePrefetchReq((int)acceptfd)) != 0) {
+       printf("Error: In rangePrefetchReq() %s, %d\n", __FILE__, __LINE__);
+       break;
+      }
+#else
       if((val = prefetchReq((int)acceptfd)) != 0) {
        printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
        break;
       }
+#endif
       break;
 
     case TRANS_PREFETCH_RESPONSE:
+#ifdef RANGEPREFETCH
+      if((val = getRangePrefetchResponse((int)acceptfd)) != 0) {
+       printf("Error: In getRangePrefetchRespose() %s, %d\n", __FILE__, __LINE__);
+       break;
+      }
+#else
       if((val = getPrefetchResponse((int) acceptfd)) != 0) {
        printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
        break;
       }
+#endif
       break;
 
     case START_REMOTE_THREAD:
index de2826a1b7ccec72b060a4b849c0a622b047806b..4bb74e9754dab71e2879de60ba70e51ce36bed7e 100644 (file)
@@ -4,6 +4,7 @@
 
 extern sockPoolHashTable_t *transPrefetchSockPool;
 extern unsigned int myIpAddr;
+extern sockPoolHashTable_t *transPResponseSocketPool;
 
 // Function for new prefetch call
 void rangePrefetch(unsigned int oid, short numoffset, short *offsets) {
@@ -72,6 +73,7 @@ perMcPrefetchList_t*  checkIfLocal(char *ptr) {
   int numLocal = 0;
 
   perMcPrefetchList_t * head=NULL;
+  //printf("Inside %s()\n", __func__);
 
   // Iterate for the object
   int noffset = (int) numoffsets;
@@ -82,10 +84,12 @@ perMcPrefetchList_t*  checkIfLocal(char *ptr) {
     tmpobjset[l] = GET_RANGE(offsets[2*l+1]);
   }
   int maxChldOids = getsize(tmpobjset, sizetmpObjSet)+1;
+  //printf("%s() maxChldOids = %d\n", __func__, maxChldOids);
   unsigned int chldOffstFrmBase[maxChldOids];
   chldOffstFrmBase[0] = oid;
   int tovisit = 0, visited = -1;
   // Iterate for each element of offsets
+  //printf("%s() noffset = %d, sizetmpObjSet= %d, visited = %d, tovisit= %d\n", __func__, noffset, sizetmpObjSet, visited, tovisit);
   for (j = 0; j < noffset; j++) {
     // Iterate over each element to be visited
     while (visited != tovisit) {
@@ -99,8 +103,9 @@ perMcPrefetchList_t*  checkIfLocal(char *ptr) {
        unsigned int oid = chldOffstFrmBase[visited+1];
        int machinenum = lhashSearch(oid);
        //TODO Group a bunch of oids to send in one prefetch request
-       insertPrefetch(machinenum, oid, noffset-j, offsets, &head);
-       break;
+       //printf("Oid Not Found, send Prefetch for oid = %d, noffset-j= %d j should point to the new offset = %d\n", oid, noffset-j, j);
+       insertPrefetch(machinenum, oid, noffset-j, &offsets[j], &head);
+       goto tuple;
       } else {
        // iterate over each offset
        int retval;
@@ -111,7 +116,9 @@ perMcPrefetchList_t*  checkIfLocal(char *ptr) {
          return NULL;
        }
       }
+      //printf("%s() visited = %d, tovisit= %d\n", __func__, visited, tovisit);
       visited++;
+      fflush(stdout);
     }
   } // end iterate for each element of offsets
 
@@ -140,6 +147,7 @@ int isOidAvail(unsigned int oid) {
 
 int lookForObjs(int *chldOffstFrmBase, short *offsets,
                 int *index, int *visited, int *tovisit, int *noffset) {
+  //printf("Inside %s()\n", __func__);
   objheader_t *header;
   unsigned int oid = chldOffstFrmBase[*visited+1];
   if((header = (objheader_t *)mhashSearch(oid))!= NULL) {
@@ -160,6 +168,7 @@ int lookForObjs(int *chldOffstFrmBase, short *offsets,
     /* Check if array out of bounds */
     int startindex = offsets[*index];
     int range = GET_RANGE(offsets[(*index)+1]);
+    //printf("%s() Array range = %d\n", __func__, range);
     if(range > 0 && range < length) {
       short stride = GET_STRIDE(offsets[(*index)+1]);
       stride = stride + 1; //NOTE  bit pattern 000 => stride = 1, 001 => stride = 2
@@ -208,15 +217,20 @@ int lookForObjs(int *chldOffstFrmBase, short *offsets,
   } else { //linked list
     int startindex = offsets[*index];
     int range = GET_RANGE(offsets[(*index)+1]);
+    //printf("%s() LinkedList range = %d\n", __func__, range);
     unsigned int oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + startindex));
+    //printf("Oid = %d\n", oid);
     if (range == 0) {
       chldOffstFrmBase[*tovisit+1] = oid;
       if(isOidAvail(oid)) {
        *visited = *visited + 1;
+       *tovisit = *tovisit + 1;
        *index = *index + 2;
+       //printf("%s() Found visited = %d, tovisit=%d, index=%d\n", __func__, *visited, *tovisit, *index);
        return 1;
       } else {
        *tovisit = *tovisit + 1;
+       //printf("%s() Not Found visited = %d, tovisit=%d, index=%d\n", __func__, *visited, *tovisit, *index);
        return 1;
       }
     } else {
@@ -246,7 +260,6 @@ int lookForObjs(int *chldOffstFrmBase, short *offsets,
       }
       return 1;
     }
-
   }
   return 1;
 }
@@ -380,3 +393,183 @@ void sendRangePrefetchReq(perMcPrefetchList_t *mcpilenode, int sd) {
 
   return;
 }
+
+int getRangePrefetchResponse(int sd) {
+  //printf("Inside %s()\n", __func__);
+
+
+  return 0;
+}
+
+
+int rangePrefetchReq(int acceptfd) {
+  int numoffset, sd = -1;
+  unsigned int oid, mid = -1;
+  oidmidpair_t oidmid;
+  //printf("Inside %s()\n", __func__);
+
+  while (1) {
+    recv_data(acceptfd, &numoffset, sizeof(int));
+    if(numoffset == -1)
+      break;
+    recv_data(acceptfd, &oidmid, 2*sizeof(unsigned int));
+    oid = oidmid.oid;
+    if(mid != oidmid.mid) {
+      if(mid!= -1)
+       freeSockWithLock(transPResponseSocketPool, mid, sd);
+      mid = oidmid.mid;
+      sd = getSockWithLock(transPResponseSocketPool, mid);
+    }
+
+    short offsetsarry[numoffset];
+    recv_data(acceptfd, offsetsarry, numoffset*sizeof(short));
+
+    /*Process each oid */
+    objheader_t *header;
+    if((header = (objheader_t *)mhashSearch(oid)) == NULL) {
+      int size = sizeof(int) + sizeof(char) + sizeof(unsigned int);
+      char sendbuffer[size];
+      *((int *) sendbuffer) = size;
+      *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
+      *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(unsigned int))) = oid;
+      char control = TRANS_PREFETCH_RESPONSE;
+      sendPrefetchResponse(sd, &control, sendbuffer, &size);
+      break;
+    } else { //Obj found
+      int retval;
+      if((retval = processOidFound(header, offsetsarry, 0, sd)) != 0) {
+       printf("%s() Error: in processOidFound() at line %d in %s()\n",
+              __func__, __LINE__, __FILE__);
+       return -1;
+      }
+    }
+  }
+
+  //Release socket
+  if(mid!=-1)
+    freeSockWithLock(transPResponseSocketPool, mid, sd);
+
+  return 0;
+}
+
+int processOidFound(objheader_t *header, short * offsetsarry, int index, int sd) {
+  int objsize;
+  GETSIZE(objsize, header);
+  int size = sizeof(int) + sizeof(char) + sizeof(unsigned int) +
+             sizeof(objheader_t) + objsize;
+  char sendbuffer[size];
+  int incr = 0;
+  *((int *)(sendbuffer + incr)) = size;
+  incr += sizeof(int);
+  *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
+  incr += sizeof(char);
+  *((unsigned int *)(sendbuffer + incr)) = OID(header);
+  incr += sizeof(unsigned int);
+  memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
+
+  char control = TRANS_PREFETCH_RESPONSE;
+  sendPrefetchResponse(sd, &control, sendbuffer, &size);
+
+  if(TYPE(header) > NUMCLASSES) {
+    int elementsize = classsize[TYPE(header)];
+    struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
+    int length = ao->___length___;
+    /* Check if array out of bounds */
+    int startindex = offsetsarry[index];
+    int range = GET_RANGE(offsetsarry[index+1]);
+    //printf("%s() Array range = %d\n", __func__, range);
+    if(range > 0 && range < length) {
+      short stride = GET_STRIDE(offsets[index+1]);
+      stride = stride + 1; //NOTE  bit pattern 000 => stride = 1, 001 => stride = 2
+      int i;
+      //check is stride +ve or negative
+      if(GET_STRIDEINC(offsets[index]+1)) { //-ve stride
+       for(i = startindex; i <= range+1; i = i - stride) {
+         unsigned int oid = 0;
+         if((i < 0 || i >= length)) {
+           //if yes treat the object as found
+           oid = 0;
+           continue;
+         } else {
+           // compute new object
+           oid = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*i)));
+         }
+         if(oid == 0)
+           goto end;
+       }
+      } else { //+ve stride
+       for(i = startindex; i <= range; i = i + stride) {
+         unsigned int oid = 0;
+         if(i < 0 || i >= length) {
+           //if yes treat the object as found
+           oid = 0;
+           continue;
+         } else {
+           // compute new object
+           oid = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*i)));
+         }
+         // add new object
+         chldOffstFrmBase[*tovisit] = oid;
+         *tovisit = *tovisit + 1;
+       }
+      }
+    } else if(range == 0) {
+      if(startindex >=0 || startindex < length) {
+       unsigned int oid = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*startindex)));
+       // add new object
+       chldOffstFrmBase[*tovisit] = oid;
+       *tovisit = *tovisit + 1;
+      }
+    }
+    *index = *index + 2;
+  } else { //linked list
+    int startindex = offsets[*index];
+    int range = GET_RANGE(offsets[(*index)+1]);
+    //printf("%s() LinkedList range = %d\n", __func__, range);
+    unsigned int oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + startindex));
+    //printf("Oid = %d\n", oid);
+    if (range == 0) {
+      chldOffstFrmBase[*tovisit+1] = oid;
+      if(isOidAvail(oid)) {
+       *visited = *visited + 1;
+       *tovisit = *tovisit + 1;
+       *index = *index + 2;
+       //printf("%s() Found visited = %d, tovisit=%d, index=%d\n", __func__, *visited, *tovisit, *index);
+       return 1;
+      } else {
+       *tovisit = *tovisit + 1;
+       //printf("%s() Not Found visited = %d, tovisit=%d, index=%d\n", __func__, *visited, *tovisit, *index);
+       return 1;
+      }
+    } else {
+      int i;
+      for(i = 0; i<range; i++) {
+       chldOffstFrmBase[*tovisit+1] = oid;
+       if(isOidAvail(oid)) {
+         //get the next object
+         if((header = (objheader_t *)mhashSearch(oid))!= NULL) {
+           //Found on machine
+           ;
+         } else if((header = (objheader_t *)prehashSearch(oid))!=NULL) {
+           //Found in prefetch cache
+           ;
+         } else {
+           ;
+         }
+         oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + startindex));
+         *tovisit = *tovisit + 1;
+         *visited = *visited + 1;
+       } else {
+         //update range
+         offsets[(*index)+1]= (offsets[(*index)+1] & 0x0fff) - 1;
+         *tovisit = *tovisit + 1;
+         return 1;
+       }
+      }
+      return 1;
+    }
+  }
+end:
+  ;
+  return 0;
+}
index 9b38b28b6a8c2789bf77f48b49cfceb67338ec71..8c3837d14ca0bf2b8bc99d01f3d02af99f348225 100644 (file)
@@ -44,7 +44,11 @@ perMcPrefetchList_t* checkIfLocal(char *ptr);
 int isOidAvail(unsigned int oid);
 int lookForObjs(int*, short *, int *, int *, int *, int *);
 void insertPrefetch(int, unsigned int, short, short*, perMcPrefetchList_t **);
+
+/******** Sending and Receiving Prefetches *******/
 void sendRangePrefetchReq(perMcPrefetchList_t *, int sd);
+int rangePrefetchReq(int acceptfd);
+int getRangePrefetchResponse(int sd);
 
 /************* Internal functions *******************/
 int getsize(short *ptr, int n);