batch the sends
authorbdemsky <bdemsky>
Wed, 23 Sep 2009 07:59:07 +0000 (07:59 +0000)
committerbdemsky <bdemsky>
Wed, 23 Sep 2009 07:59:07 +0000 (07:59 +0000)
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/queue.c
Robust/src/Runtime/DSTM/interface/queue.h
Robust/src/Runtime/DSTM/interface/trans.c

index 983405db5c0a6f31455217e232dbfb4faddb4458..9e80df5592f1362be9a7851bedea129824b30a89 100644 (file)
@@ -13,6 +13,7 @@
 #define GET_PTR_OID(x)  ((unsigned int *)(x + 2*sizeof(int)))
 #define GET_PTR_EOFF(x,n) ((short *)(x + 2*sizeof(int) + (n*sizeof(unsigned int))))
 #define GET_PTR_ARRYFLD(x,n) ((short *)(x + 2*sizeof(int) + (n*sizeof(unsigned int)) + (n*sizeof(short))))
+
 #define ENDEBUG(s) { printf("Inside %s()\n", s); fflush(stdout);}
 #define EXDEBUG(s) {printf("Outside %s()\n", s); fflush(stdout);}
 /*****************************************
@@ -287,7 +288,7 @@ void sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size);
 void prefetch(int, int, unsigned int *, unsigned short *, short*);
 void *transPrefetch(void *);
 void *mcqProcess(void *);
-prefetchpile_t *foundLocal(char *); // returns node with prefetch elements(oids, offsets)
+prefetchpile_t *foundLocal(char *, int); // returns node with prefetch elements(oids, offsets)
 int lookupObject(unsigned int * oid, short offset);
 int checkoid(unsigned int oid);
 int transPrefetchProcess(int **, short);
index 41e44033d88f64d033fdd7dbbf22f9735e825798..75f79ac370ec17d35cf2dfa605bb948105650d38 100644 (file)
@@ -83,6 +83,38 @@ void * gettail() {
   return memory+tailoffset+sizeof(int);
 }
 
+int numavailable() {
+  int tmp=tailoffset;
+  int available=0;
+  if (*((int *)(memory+tmp))==-1) {
+    tmp=0;
+  }
+  while(tmp!=headoffset) {
+    available++;
+    tmp=tmp+*((int *)(memory+tmp));
+    if (tmp>QSIZE|| (*((int *)(memory+tmp))==-1)) {
+      break;
+    }
+  }
+  return available;
+}
+
+void incmulttail(int num) {
+  int i;
+  for(i=0;i<num;i++) {
+    int tmpoffset=tailoffset+*((int *)(memory+tailoffset));
+    if (tmpoffset>QSIZE)
+      tailoffset=0;
+    else
+      tailoffset=tmpoffset;
+  }
+}
+
+void resetqueue() {
+  headoffset=0;
+  tailoffset=0;
+}
+
 void inctail() {
   int tmpoffset=tailoffset+*((int *)(memory+tailoffset));
   if (tmpoffset>QSIZE)
index 2e1aa9ec309a01aa4775731e1fa2d5e61d0b6cf9..e284615ded7b81f18d3fb94fa881c08c5b23c7b4 100644 (file)
@@ -13,4 +13,7 @@ void movehead(int size);
 void * gettail();
 void inctail();
 void predealloc();
+int numavailable();
+void resetqueue();
+void incmulttail(int);
 #endif
index 51d143587a82e53b1499b009e98771dce0f979bf..d9c35860f2806c35360f72dbe320297f25559814 100644 (file)
@@ -284,7 +284,8 @@ inline int findmax(int *array, int arraylength) {
   return max;
 }
 
-#define INLINEPREFETCH 1
+#define INLINEPREFETCH 0
+#define PREFTHRESHOLD 4
 
 /* This function is a prefetch call generated by the compiler that
  * populates the shared primary prefetch queue*/
@@ -293,7 +294,13 @@ void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endof
   int qnodesize = 2*sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
   int len;
 #ifdef INLINEPREFETCH
-  char node[qnodesize];
+  int attempted=0;
+  char *node;
+  do {
+  node=getmemory(qnodesize);
+  if (node==NULL&&attempted)
+    break;
+  if (node!=NULL) {
 #else
   char *node=getmemory(qnodesize);
 #endif
@@ -316,25 +323,30 @@ void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endof
   memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
 
 #ifdef INLINEPREFETCH
-  prefetchpile_t *pilehead = foundLocal(node);
+  movehead(qnodesize);
+  }
+  int numpref=numavailable();
+  attempted=1;
 
-  if (pilehead!=NULL) {
-    // Get sock from shared pool
-    
-    /* Send  Prefetch Request */
-    prefetchpile_t *ptr = pilehead;
-    while(ptr != NULL) {
-      int sd = getSock2(transPrefetchSockPool, ptr->mid);
-      sendPrefetchReq(ptr, sd);
-      ptr = ptr->next;
+  if (node==NULL && numpref!=0 || numpref==PREFTHRESHOLD) {
+    node=gettail();
+    prefetchpile_t *pilehead = foundLocal(node,numpref);
+    if (pilehead!=NULL) {
+      // Get sock from shared pool
+      
+      /* Send  Prefetch Request */
+      prefetchpile_t *ptr = pilehead;
+      while(ptr != NULL) {
+       int sd = getSock2(transPrefetchSockPool, ptr->mid);
+       sendPrefetchReq(ptr, sd);
+       ptr = ptr->next;
+      }
+      
+      mcdealloc(pilehead);
+      resetqueue();
     }
-    
-    /* Release socket */
-    // freeSock(transPrefetchSockPool, pilehead->mid, sd);
-    
-    /* Deallocated pilehead */
-    mcdealloc(pilehead);
-  }
+  }//end do prefetch if condition
+  } while(node==NULL);
 #else
   /* Lock and insert into primary prefetch queue */
   movehead(qnodesize);
@@ -1470,47 +1482,53 @@ int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
   return 0;
 }
 
-prefetchpile_t *foundLocal(char *ptr) {
-  int siteid = *(GET_SITEID(ptr));
-  int ntuples = *(GET_NTUPLES(ptr));
-  unsigned int * oidarray = GET_PTR_OID(ptr);
-  unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples);
-  short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
+prefetchpile_t *foundLocal(char *ptr, int numprefetches) {
+  int i;
+  int j;
   prefetchpile_t * head=NULL;
-  int numLocal = 0;
 
-  int i;
-  for(i=0; i<ntuples; i++) {
-    unsigned short baseindex=(i==0) ? 0 : endoffsets[i-1];
-    unsigned short endindex=endoffsets[i];
-    unsigned int oid=oidarray[i];
-    int newbase;
-    int machinenum;
-
-    if (oid==0)
-      continue;
-    //Look up fields locally
-    for(newbase=baseindex; newbase<endindex; newbase++) {
-      if (!lookupObject(&oid, arryfields[newbase]))
-       break;
-      //Ended in a null pointer...
+  for(j=0;j<numprefetches;j++) {
+    int siteid = *(GET_SITEID(ptr));
+    int ntuples = *(GET_NTUPLES(ptr));
+    unsigned int * oidarray = GET_PTR_OID(ptr);
+    unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples);
+    short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
+    int numLocal = 0;
+    
+    for(i=0; i<ntuples; i++) {
+      unsigned short baseindex=(i==0) ? 0 : endoffsets[i-1];
+      unsigned short endindex=endoffsets[i];
+      unsigned int oid=oidarray[i];
+      int newbase;
+      int machinenum;
+      
       if (oid==0)
+       continue;
+      //Look up fields locally
+      for(newbase=baseindex; newbase<endindex; newbase++) {
+       if (!lookupObject(&oid, arryfields[newbase]))
+         break;
+       //Ended in a null pointer...
+       if (oid==0)
+         goto tuple;
+      }
+      //Entire prefetch is local
+      if (newbase==endindex&&checkoid(oid)) {
+       numLocal++;
        goto tuple;
+      }
+      //Add to remote requests
+      machinenum=lhashSearch(oid);
+      insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head);
+    tuple:
+      ;
     }
-    //Entire prefetch is local
-    if (newbase==endindex&&checkoid(oid)) {
-      numLocal++;
-      goto tuple;
-    }
-    //Add to remote requests
-    machinenum=lhashSearch(oid);
-    insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head);
-tuple:
-    ;
+    
+    /* handle dynamic prefetching */
+    handleDynPrefetching(numLocal, ntuples, siteid);
+    ptr=((char *)&arryfields[endoffsets[ntuples-1]])+sizeof(int);
   }
 
-  /* handle dynamic prefetching */
-  handleDynPrefetching(numLocal, ntuples, siteid);
   return head;
 }
 
@@ -1565,7 +1583,8 @@ void *transPrefetch(void *t) {
     void *node=gettail();
     /* Check if the tuples are found locally, if yes then reduce them further*/
     /* and group requests by remote machine ids by calling the makePreGroups() */
-    prefetchpile_t *pilehead = foundLocal(node);
+    int count=numavailable();
+    prefetchpile_t *pilehead = foundLocal(node, count);
 
     if (pilehead!=NULL) {
       // Get sock from shared pool
@@ -1585,7 +1604,7 @@ void *transPrefetch(void *t) {
       mcdealloc(pilehead);
     }
     // Deallocate the prefetch queue pile node
-    inctail();
+    inctail(numavailable);
   }
 }
 
@@ -1622,6 +1641,8 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
   int len, endpair;
   char control;
   objpile_t *tmp;
+  struct writestruct writebuffer;
+  writebuffer.offset=0;
 
   /* Send TRANS_PREFETCH control message */
   int first=1;
@@ -1652,7 +1673,10 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
       *((int *)(&oidnoffset[len]))=-1;
       len+=sizeof(int);
     }
-    send_data(sd, oidnoffset, len);
+    if (tmp!=NULL)
+      send_buf(sd, & writebuffer, oidnoffset, len);
+    else
+      forcesend_buf(sd, & writebuffer, oidnoffset, len);
   }
 
   LOGEVENT('S');