batch communications for prefetches
authorbdemsky <bdemsky>
Fri, 13 Mar 2009 03:50:41 +0000 (03:50 +0000)
committerbdemsky <bdemsky>
Fri, 13 Mar 2009 03:50:41 +0000 (03:50 +0000)
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/gCollect.h
Robust/src/Runtime/DSTM/interface/objstr.c
Robust/src/Runtime/DSTM/interface/prefetch.c
Robust/src/Runtime/DSTM/interface/prefetch.h
Robust/src/Runtime/DSTM/interface/prelookup.c
Robust/src/Runtime/DSTM/interface/trans.c

index a621f9bd688c5f5cda2909aa0cc0bcba26f5ccd8..bd271231e423133165eaf60cdd14ecb64d7de360 100644 (file)
@@ -49,7 +49,6 @@
 
 //Max number of objects
 #define MAX_OBJECTS  20
-#define DEFAULT_OBJ_STORE_SIZE 1048510 //1MB
 //Transaction id per machine
 #define TID_LEN 20
 #define LISTEN_PORT 2156
index ff3987d0bfd924fae17eb1fc2348047433aec8d5..ca40fac2ad2c07d4768e9019f4143ca77510c072 100644 (file)
@@ -11,6 +11,7 @@
 #ifdef COMPILER
 #include "thread.h"
 #endif
+#include "gCollect.h"
 
 #define BACKLOG 10 //max pending connections
 #define RECEIVE_BUFFER_SIZE 2048
@@ -789,7 +790,7 @@ int prefetchReq(int acceptfd) {
       /* Calculate the oid corresponding to the offset value */
       for(i = 0 ; i< numoffset ; i++) {
        /* Check for arrays  */
-       if(TYPE(header) > NUMCLASSES) {
+       if(TYPE(header) >= NUMCLASSES) {
          int elementsize = classsize[TYPE(header)];
          struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
          unsigned short length = ao->___length___;
index 5a1b8e4ace1f3c2557b371abe2c27e06cf1bfcba..747e987fe9f46f92c2e0acafdf4f4211f95e852d 100644 (file)
@@ -7,13 +7,14 @@
  ****** Global constants **********
  **********************************/
 
-#define STALE_MINTHRESHOLD 30
+#define STALE_MINTHRESHOLD 10 //minimum size
 
-#define STALE_MAXTHRESHOLD 40 //ugly hack..if you make this too small things
+#define STALE_MAXTHRESHOLD 30 //ugly hack..if you make this too small things
 // will fail in odd subtle ways
 
-#define PREFETCH_FLUSH_THRESHOLD 20
-#define STALL_THRESHOLD 30
+#define DEFAULT_OBJ_STORE_SIZE (4194304-16) //just a little less the 4MB
+#define PREFETCH_FLUSH_THRESHOLD 10 //MINIMUM SIZE BEFORE FLUSHING
+#define STALL_THRESHOLD 15 //number of prefetches stores before we can start freeing old ones
 
 
 
index 1e3bcb54ad4b13c09c3fedad91b5a4fa4ecd16e2..173b069d0d5aa4ab175a4ef4f9e64f3ab2c3d88b 100644 (file)
@@ -1,4 +1,5 @@
 #include "dstm.h"
+#include "gCollect.h"
 
 #define OSUSED(x) (((unsigned int)(x)->top)-((unsigned int) (x+1)))
 #define OSFREE(x) ((x)->size-OSUSED(x))
index d28ba1f7f18952b4a5e7870d71fda6c4dccb9799..b85deec823b92a1bf9e2b806dde42ef874e81ae6 100644 (file)
@@ -79,14 +79,19 @@ perMcPrefetchList_t *processLocal(char *ptr) {
   for(top=0; top>=0;) {
     oid=getNextOid(header, offsetarray, dfsList, top);
     if (oid&1) {
+      int oldisField=TYPE(header) < NUMCLASSES;
       top+=2;
       dfsList[top]=oid;
       dfsList[top+1]=0;
       header=searchObj(oid);
       if (header==NULL) {
        //forward prefetch
-       int machinenum = lhashSearch(dfsList[top]);
-       insertPrefetch(machinenum, dfsList[top], numoffset-top, &offsetarray[top], &head);
+       int machinenum = lhashSearch(oid);
+       
+       if (oldisField&&(dfsList[top-1]!=GET_RANGE(offsetarray[top+1])))
+         insertPrefetch(machinenum, oid, 2+numoffset-top, &offsetarray[top-2], &head);
+       else
+         insertPrefetch(machinenum, oid, numoffset-top, &offsetarray[top], &head);
       } else if (top<offstop)
        //okay to continue going down
        continue;
@@ -115,9 +120,14 @@ perMcPrefetchList_t *processLocal(char *ptr) {
   return head;
 }
 
+#define PBUFFERSIZE 16384
+
+
 perMcPrefetchList_t *processRemote(unsigned int oid,  short * offsetarray, int sd, short numoffset) {
   int top;
   unsigned int dfsList[numoffset];
+  char buffer[PBUFFERSIZE];
+  int bufoffset=0;
 
   /* Initialize */
   perMcPrefetchList_t *head = NULL;
@@ -130,7 +140,7 @@ perMcPrefetchList_t *processRemote(unsigned int oid,  short * offsetarray, int s
     insertPrefetch(machinenum, oid, numoffset, offsetarray, &head);
     return head;
   } else {
-    sendOidFound(header, oid, sd);
+    sendOidFound(header, oid, sd, buffer, &bufoffset);
   }
 
   dfsList[0]=oid;
@@ -139,18 +149,21 @@ perMcPrefetchList_t *processRemote(unsigned int oid,  short * offsetarray, int s
   //Start searching the dfsList
   for(top=0; top>=0;) {
     oid=getNextOid(header, offsetarray, dfsList, top);
-
     if (oid&1) {
+      int oldisField=TYPE(header) < NUMCLASSES;
       top+=2;
       dfsList[top]=oid;
       dfsList[top+1]=0;
       header=searchObj(oid);
       if (header==NULL) {
        //forward prefetch
-       int machinenum = lhashSearch(dfsList[top]);
-       insertPrefetch(machinenum, dfsList[top], numoffset-top, &offsetarray[top], &head);
+       int machinenum = lhashSearch(oid);
+       if (oldisField&&(dfsList[top-1]!=GET_RANGE(offsetarray[top+1])))
+         insertPrefetch(machinenum, oid, 2+numoffset-top, &offsetarray[top-2], &head);
+       else
+         insertPrefetch(machinenum, oid, numoffset-top, &offsetarray[top], &head);
       } else {
-       sendOidFound(header, oid, sd);
+       sendOidFound(header, oid, sd, buffer, &bufoffset);
        if (top<offstop)
          //okay to continue going down
          continue;
@@ -166,8 +179,10 @@ perMcPrefetchList_t *processRemote(unsigned int oid,  short * offsetarray, int s
     do {
       do {
        top-=2;
-       if (top<0)
+       if (top<0) {
+         flushResponses(sd, buffer, &bufoffset);
          return head;
+       }
       } while(dfsList[top+1] == GET_RANGE(offsetarray[top + 3]));
 
       header=searchObj(dfsList[top]);
@@ -177,6 +192,7 @@ perMcPrefetchList_t *processRemote(unsigned int oid,  short * offsetarray, int s
     //increment
     dfsList[top+1]++;
   }
+  flushResponses(sd, buffer, &bufoffset);
   return head;
 }
 
@@ -323,7 +339,6 @@ int getRangePrefetchResponse(int sd) {
   char control = *((char *) recvbuffer);
   unsigned int oid;
   if(control == OBJECT_FOUND) {
-    oid = *((unsigned int *)(recvbuffer + sizeof(char)));
     size = size - (sizeof(char) + sizeof(unsigned int));
     pthread_mutex_lock(&prefetchcache_mutex);
     void *ptr;
@@ -333,21 +348,36 @@ int getRangePrefetchResponse(int sd) {
       pthread_mutex_unlock(&prefetchcache_mutex);
       return -1;
     }
+
+    void *tmp=ptr;
+    int osize=size;
     pthread_mutex_unlock(&prefetchcache_mutex);
+
     memcpy(ptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
-    STATUS(ptr)=0;
 
-    /* Insert into prefetch hash lookup table */
-    void * oldptr;
-    if((oldptr = prehashSearch(oid)) != NULL) {
-      if(((objheader_t *)oldptr)->version <= ((objheader_t *)ptr)->version) {
-       prehashRemove(oid);
+    //ignore oid value...we'll get it from the object
+
+    while(size>0) {
+      unsigned int objsize;
+      GETSIZE(objsize, ptr);
+      STATUS(ptr)=0;
+      oid=OID(ptr);
+      objsize+=sizeof(objheader_t);
+      
+      /* Insert into prefetch hash lookup table */
+      void * oldptr;
+      if((oldptr = prehashSearch(oid)) != NULL) {
+       if(((objheader_t *)oldptr)->version <= ((objheader_t *)ptr)->version) {
+         prehashRemove(oid);
+         prehashInsert(oid, ptr);
+       }
+      } else {
        prehashInsert(oid, ptr);
       }
-    } else {
-      prehashInsert(oid, ptr);
+      ptr=(void *)(((unsigned int)ptr)+objsize);
+      size-=objsize;
     }
-    objheader_t *head = prehashSearch(oid);
+
     pthread_mutex_lock(&pflookup.lock);
     pthread_cond_broadcast(&pflookup.cond);
     pthread_mutex_unlock(&pflookup.lock);
@@ -406,7 +436,7 @@ unsigned int getNextOid(objheader_t * header, short * offsetarray, unsigned int
   int currcount = dfsList[top+1];
   int range = GET_RANGE(offsetarray[top + 3]);
 
-  if(TYPE(header) > NUMCLASSES) {
+  if(TYPE(header) >= NUMCLASSES) {
     //Array case
     struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
     int stride = GET_STRIDE(offsetarray[top + 3])+1;
@@ -450,22 +480,45 @@ unsigned int getNextOid(objheader_t * header, short * offsetarray, unsigned int
   }
 }
 
-int sendOidFound(objheader_t * header, unsigned int oid, int sd) {
-  int incr = 0;
+void flushResponses(int sd, char * buffer, int * bufoffset) {
+  if ((*bufoffset)!=0) {
+    send_data(sd, buffer, *bufoffset);
+    *bufoffset=0;
+  }
+}
+
+int sendOidFound(objheader_t * header, unsigned int oid, int sd, char *buffer, int *bufoffset) {
+  int incr;
   int objsize;
   GETSIZE(objsize, header);
-  int size  = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
-  char sendbuffer[size];
-  *((int *)(sendbuffer + incr)) = size;
-  incr += sizeof(int);
-  *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
-  incr += sizeof(char);
-  *((unsigned int *)(sendbuffer + incr)) = oid;
-  incr += sizeof(unsigned int);
-  memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
+  int size  = sizeof(objheader_t) + objsize;
+  char *sendbuffer;
+
+  if ((incr=(*bufoffset))==0) {
+    buffer[incr] = TRANS_PREFETCH_RESPONSE;
+    incr+=sizeof(char);
+    *((int *)(buffer + incr)) = size+sizeof(int)+sizeof(char)+sizeof(unsigned int);
+    incr += sizeof(int);
+    *((char *)(buffer + incr)) = OBJECT_FOUND;
+    incr += sizeof(char);
+    *((unsigned int *)(buffer + incr)) = oid;
+    incr += sizeof(unsigned int);
+  } else
+    *((int *)(buffer+sizeof(char)))+=size;
+  
+  if ((incr+size)<PBUFFERSIZE) {
+    //don't need to allocate, just copy
+    sendbuffer=buffer;
+    (*bufoffset)=incr+size;
+  } else {
+    sendbuffer=alloca(size+incr);
+    memcpy(sendbuffer, buffer, incr);
+    *bufoffset=0;
+  }
 
-  char control = TRANS_PREFETCH_RESPONSE;
-  sendPrefetchResponse(sd, &control, sendbuffer, &size);
+  memcpy(sendbuffer + incr, header, size);
+  if ((*bufoffset)==0)
+    send_data(sd, sendbuffer, size+incr);
   return 0;
 }
 
index 867bde8fb73fea57359bb95d23838cce20f06386..08697d57e44a9ef7a723a9c0700c3727c6c71ffc 100644 (file)
@@ -62,7 +62,8 @@ INLINE objheader_t *searchObj(unsigned int);
 
 /*********** Functions for computation at the participant end **********/
 unsigned int getNextOid(objheader_t * header, short * offsetarray, unsigned int *dfsList, int top);
-int sendOidFound(objheader_t *, unsigned int, int);
+int sendOidFound(objheader_t *, unsigned int, int, char *buffer, int *bufoffset);
 int sendOidNotFound(unsigned int oid, int sd);
+void flushResponses(int sd, char * buffer, int * bufoffset);
 
 #endif
index 15aeaad8e8088d35717f0eb4331e2458f17d92b9..3fc1ea1865f0238b1e90aeb1469141d499f45264 100644 (file)
@@ -128,7 +128,6 @@ unsigned int prehashResize(unsigned int newsize) {
   unsigned int oldsize;
   int i,index;
   unsigned int mask;
-
   ptr = pflookup.table;
   oldsize = pflookup.size;
 
index 42aef3188b626778d13434c6c8763da241088c87..8e654af4f1d42e48dee35fa04de4c5785812c446 100644 (file)
@@ -1366,7 +1366,7 @@ int lookupObject(unsigned int * oid, short offset) {
     return 0;
   }
 
-  if(TYPE(header) > NUMCLASSES) {
+  if(TYPE(header) >= NUMCLASSES) {
     int elementsize = classsize[TYPE(header)];
     struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
     int length = ao->___length___;