runtime changes for ecoop submission
authoradash <adash>
Thu, 17 Dec 2009 00:05:57 +0000 (00:05 +0000)
committeradash <adash>
Thu, 17 Dec 2009 00:05:57 +0000 (00:05 +0000)
20 files changed:
Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.c
Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.h
Robust/src/Runtime/DSTM/interface/addUdpEnhance.c
Robust/src/Runtime/DSTM/interface/addUdpEnhance.h
Robust/src/Runtime/DSTM/interface/altprelookup.c
Robust/src/Runtime/DSTM/interface/clocksyncclient.c
Robust/src/Runtime/DSTM/interface/clocksyncserver.c
Robust/src/Runtime/DSTM/interface/debugmacro.h [new file with mode: 0755]
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/gCollect.c
Robust/src/Runtime/DSTM/interface/machinepile.c
Robust/src/Runtime/DSTM/interface/machinepile.h
Robust/src/Runtime/DSTM/interface/mcpileq.h
Robust/src/Runtime/DSTM/interface/prefetch.c
Robust/src/Runtime/DSTM/interface/signal.c
Robust/src/Runtime/DSTM/interface/threadnotify.c
Robust/src/Runtime/DSTM/interface/trans.c
Robust/src/Runtime/garbage.c
Robust/src/Runtime/runtime.c

index d416090280a518620ef19b07b79ec30782331edf..dee6025c64f8147056bd419fbda224531f790ecd 100644 (file)
@@ -1,5 +1,5 @@
 #include "addPrefetchEnhance.h"
-#include "prelookup.h"
+#include "altprelookup.h"
 
 extern int numprefetchsites; // Number of prefetch sites
 extern pfcstats_t *evalPrefetch; //Global array that keeps track of operation mode (ON/OFF) for each prefetch site
@@ -57,10 +57,11 @@ char getOperationMode(int siteid) {
  * we take action accordingly */
 void handleDynPrefetching(int numLocal, int ntuples, int siteid) {
   if(numLocal < ntuples) {
-    /* prefetch not found locally(miss in cache) */
+    /* prefetch not found locally(miss in cache); turn on prefetching*/
     evalPrefetch[siteid].operMode = 1;
     evalPrefetch[siteid].uselesscount = SHUTDOWNINTERVAL;
   } else {
+    //Turn off prefetch site
     if(getOperationMode(siteid) != 0) {
       evalPrefetch[siteid].uselesscount--;
       if(evalPrefetch[siteid].uselesscount <= 0) {
@@ -175,14 +176,10 @@ int copyToCache(int numoid, unsigned int *oidarray, char oidType) {
       newAddr->version += 1;
       newAddr->notifylist = NULL;
     }
+    STATUS(newAddr)=0;
+
     //make an entry in prefetch lookup hashtable
-    void *oldptr;
-    if((oldptr = prehashSearch(oid)) != NULL) {
-      prehashRemove(oid);
-      prehashInsert(oid, newAddr);
-    } else {
-      prehashInsert(oid, newAddr);
-    }
+    prehashInsert(oid, newAddr);
   } //end of for
   return 0;
 }
index 44c87049fedbb4e2e7f29bda53c84a1883b1d2fe..7eb3c5192247f90771a81b2d9810fb0cdd629be8 100644 (file)
@@ -2,7 +2,7 @@
 #define _ADDPREFETCHENHANCE_H_
 
 #include "dstm.h"
-#include "mlookup.h"
+#include "altmlookup.h"
 #include "gCollect.h"
 
 typedef struct prefetchCountStats {
index 5c9363a0f0e82080eac4fd5ae5a75b1133d11be8..adaf6671242d68675c7ec18c428122d3c0640447 100644 (file)
@@ -5,7 +5,7 @@
 #include <math.h>
 #include <netinet/tcp.h>
 #include "addUdpEnhance.h"
-#include "prelookup.h"
+#include "altprelookup.h"
 #ifdef ABORTREADERS
 #include "abortreaders.h"
 #endif
@@ -209,7 +209,9 @@ int invalidateFromPrefetchCache(char *buffer) {
       objheader_t *header;
       /* Lookup Objects in prefetch cache and remove them */
       if(((header = prehashSearch(oid)) != NULL)) {
-       prehashRemove(oid);
+        //Keep invalid objects
+        STATUS(header)=DIRTY;
+        //prehashRemove(oid);
       }
       offset += sizeof(unsigned int);
     }
index b3964bce4ea67ccb68c6c6d47b088bdf6e8dc394..5011df313efd126537009b3e1c9b637d347188dd 100644 (file)
@@ -2,7 +2,7 @@
 #define _ADDUDPENHANCE_H
 
 #include "dstm.h"
-#include "mlookup.h"
+#include "altmlookup.h"
 
 
 /*******************************
index 87afbe893594d0df6435ac796ca8407845ceb5b7..b44e8e99047ca02e0ccd00a86e445788ed6ce482 100644 (file)
@@ -121,54 +121,52 @@ void *prehashSearch(unsigned int key) {
 }
 
 unsigned int prehashRemove(unsigned int key) {
-  int index;
-  prehashlistnode_t *prev;
-  prehashlistnode_t *ptr, *node;
-
-  //eom
-  unsigned int keyindex=key>>1;
+  unsigned int keyindex = key >> 1;
   volatile unsigned int * lockptr=&pflookup.larray[keyindex&PRELOCKMASK].lock;
+  prehashlistnode_t *node, *prev;
 
   while(!write_trylock(lockptr)) {
     sched_yield();
   }
-  
   prehashlistnode_t *curr = &pflookup.table[keyindex&pflookup.mask];
-  //eom
-
-  for (; curr != NULL; curr = curr->next) {
-    if (curr->key == key) {        
-      // Find a match in the hash table
-      //decrement the number of elements in the global hashtable  
+  // If there are no elements
+  //delete from first bin of table
+  if (curr->next == NULL && curr->key == key) {
+    curr->key = 0;
+    //TODO free(val) ?
+    curr->val = NULL;
+    atomic_dec(&(pflookup.numelements));
+    write_unlock(lockptr);
+    return 0;
+  }
+  //delete from first bin of table but elements follow in linked list
+  if (curr->next != NULL && curr->key == key) {
+    curr->key = curr->next->key;
+    curr->val = curr->next->val;
+    node = curr->next;
+    curr->next = node->next;
+    free(node);
+    atomic_dec(&(pflookup.numelements));
+    write_unlock(lockptr);
+    return 0;
+  }
+  prev = curr;
+  curr = curr->next;
+  //delete from elements in the linked list
+  for(; curr != NULL; curr = curr->next) {
+    if (curr->key == key) {
+      prev->next = curr->next;
+      free(curr);
       atomic_dec(&(pflookup.numelements));
-      
-     if ((curr == &ptr[index]) && (curr->next == NULL)) {  
-       // Delete the first item inside the hashtable with no linked list of prehashlistnode_t
-       curr->key = 0;
-       curr->val = NULL;
-      } else if ((curr == &ptr[index]) && (curr->next != NULL)) { 
-       //Delete the first item with a linked list of prehashlistnode_t  connected
-       curr->key = curr->next->key;
-       curr->val = curr->next->val;
-       node = curr->next;
-       curr->next = curr->next->next;
-       free(node);
-      } else {                                          
-       // Regular delete from linked listed
-       prev->next = curr->next;
-       free(curr);
-      }
-      //pthread_mutex_unlock(&pflookup.lock);
-     write_unlock(lockptr);
+      write_unlock(lockptr);
       return 0;
     }
     prev = curr;
   }
   write_unlock(lockptr);
-
   return 1;
 }
-
 unsigned int prehashResize(unsigned int newsize) {
   prehashlistnode_t *node, *ptr;  // curr and next keep track of the current and the next chashlistnodes in a linked list
   unsigned int oldsize;
index 749ac47ef1716fb24e7dbaaeb08848c55172a375..46084aca6c48296488a1adfc31fd869d180560b3 100644 (file)
 #include <netinet/in.h>
 #include <netdb.h>
 #include <string.h>
+#include <math.h>
 
 #define PORT        8500
              /* REPLACE with your server machine name*/
 #define DIRSIZE     64
-#define NUMITER   1024
+#define NUMITER   10000
 
 
 static __inline__ unsigned long long rdtsc(void)
@@ -108,14 +109,30 @@ int main(int argc, char **argv) {
     }
     //printf("DEBUG: dir[0]= %lld\n", dir[0]);
     array2[i]=rdtsc() - dir[0];
+    printf("%lld\n", array2[i]);
   }
 
   for(i=0;i<(NUMITER-1);i++) {
     norm += array2[i];
   }
 
+
+
   /* spew-out the results */
   //printf("DEBUG: Average offset= %lld\n", (norm/(NUMITER-1)));
+  long long average=(norm/(NUMITER-1));
+  printf("average= %lld",(norm/(NUMITER-1)));
+  long long stddev, avg1=0;
+  for(i=0;i<(NUMITER-1);i++) {
+    avg1 += ((array2[i] - average) * (array2[i] - average));
+  }
+  float ans = (avg1/(NUMITER-1));
+  float squareroot= sqrt(ans);
+  float squareroot2= sqrt(avg1);
+
+  printf("stddev= %f\n", squareroot); 
+  printf("error= %f\n", squareroot2/(NUMITER-1));
+
   fprintf(f1,"%lld",(norm/(NUMITER-1)));
 
   close(sd);
index 9e481d15d949d7ff9bc46a22806b610659090fec..9e1ddafeb47f180bf21f1e515802d671339dbd1e 100644 (file)
 #include <string.h>
 #include <stdlib.h>
 #include <unistd.h>
+#include <math.h>
 
 #define PORT           8500
-#define NUMITER     1024
+#define NUMITER     10000
 #define DIRSIZE        1
 
 static __inline__ unsigned long long rdtsc(void)
@@ -89,7 +90,9 @@ int main() {
     //printf("DEBUG: dir[0]= %lld\n", dir[0]);
     array2[i] = rdtsc();
     //printf("DEBUG: array2[i]= %lld\n", array2[i]);
-    array1[i]=array2[i] - dir[0];
+    //array1[i]=array2[i] - dir[0];
+    array1[i]= dir[0] - array2[i];
+    printf("%lld\n", array1[i]);
 
     /* acknowledge the message, reply w/ the file names */
     if (send(sd_current, &array2[i], sizeof(unsigned long long), MSG_NOSIGNAL) == -1) {
@@ -105,7 +108,20 @@ int main() {
 
   /* spew-out the results */
   //printf("DEBUG: Average offset= %lld\n", (norm/(NUMITER-1)));
-  fprintf(f1,"%lld",(norm/(NUMITER-1)));
+  long long average=(norm/(NUMITER-1));
+  printf("average= %lld",(norm/(NUMITER-1)));
+
+  long long stddev, avg1=0;
+  for(i=0;i<(NUMITER-1);i++) {
+    avg1 += ((array1[i] - average) * (array1[i] - average));
+  }
+  float ans = (avg1/(NUMITER-1));
+  float squareroot= sqrt(ans);
+  float squareroot2= sqrt(avg1);
+
+  printf("stddev= %f\n", squareroot); 
+  printf("error= %f\n", squareroot2/(NUMITER-1));
+  fprintf(f1,"%lld\n",(norm/(NUMITER-1)));
 
 
   /* give client a chance to properly shutdown */
diff --git a/Robust/src/Runtime/DSTM/interface/debugmacro.h b/Robust/src/Runtime/DSTM/interface/debugmacro.h
new file mode 100755 (executable)
index 0000000..c11be59
--- /dev/null
@@ -0,0 +1,59 @@
+#ifndef _DEBUGMACRO_H_
+#define _DEBUGMACRO_H_
+
+/** Macro to print oid and object type **/
+//#define LOGOIDTYPES //turn on printing oid and type events
+#ifdef LOGOIDTYPES
+#define LOGOIDTYPE(x,y,z,t) printf("[%s: %u %u %lld]\n", x, y, z, t);
+#else
+#define LOGOIDTYPE(x,y,z,t)
+#endif
+
+
+/** Macro to print prefetch site id **/
+//#define LOGPREFETCHSITES
+#ifdef LOGPREFETCHSITES 
+#define LOGPREFETCHSITE(PTR) printf("[siteid= %u] ", PTR->siteid);
+#else
+#define LOGPREFETCHSITE(PTR)
+#endif
+
+
+/*
+#define LOGEVENTS //turn on Logging events
+#ifdef LOGEVENTS
+char bigarray[16*1024*1024];
+int bigindex=0;
+#define LOGEVENT(x) { \
+    int tmp=bigindex++;                                \
+    bigarray[tmp]=x;                           \
+  }
+#else
+#define LOGEVENT(x)
+#endif
+*/
+
+/**
+ * Record Time after clock synchronization 
+ **/
+/*
+#define LOGTIMES
+#ifdef LOGTIMES
+char bigarray1[8*1024*1024];
+unsigned int bigarray2[8*1024*1024];
+unsigned int bigarray3[8*1024*1024];
+long long bigarray4[8*1024*1024];
+int bigindex1=0;
+#define LOGTIME(x,y,z,a) {\
+  int tmp=bigindex1++; \
+  bigarray1[tmp]=x; \
+  bigarray2[tmp]=y; \
+  bigarray3[tmp]=z; \
+  bigarray4[tmp]=a; \
+}
+#else
+#define LOGTIME(x,y,z,a)
+#endif
+*/
+
+#endif
index 2e9c6e9d2e328ea47800bda119a8f35786aa32a4..df4a20ca71b80b7bb3f6c3416597ce33f1b112bf 100644 (file)
@@ -56,9 +56,9 @@
 #define UDP_PORT 2158
 //Prefetch tuning paramters
 //#define RETRYINTERVAL  20 //N (For Em3d, SOR, Moldyn benchmarks)
-//#define SHUTDOWNINTERVAL  3  //M
-#define RETRYINTERVAL  20 //N  (For MatrixMultiply, 2DFFT benchmarks)
-#define SHUTDOWNINTERVAL 1  //M
+//#define SHUTDOWNINTERVAL  3 //M
+#define RETRYINTERVAL 100  //N  (For MatrixMultiply, 2DFFT, 2DConv benchmarks)
+#define SHUTDOWNINTERVAL 1 //M
 
 #include <stdlib.h>
 #include <stdio.h>
@@ -264,6 +264,7 @@ void mapObjMethod(unsigned short);
 
 void randomdelay();
 void transStart();
+//#define TRANSREAD(x,y,z(tobe passed as a parameter to transRead2)) {
 #define TRANSREAD(x,y) { \
   unsigned int inputvalue;\
 if ((inputvalue=(unsigned int)y)==0) x=NULL;\
@@ -292,11 +293,11 @@ void sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size);
 void prefetch(int, int, unsigned int *, unsigned short *, short*);
 void *transPrefetch(void *);
 void *mcqProcess(void *);
-prefetchpile_t *foundLocal(char *, int); // returns node with prefetch elements(oids, offsets)
-int lookupObject(unsigned int * oid, short offset);
-int checkoid(unsigned int oid);
+prefetchpile_t *foundLocal(char *, int, int); // returns node with prefetch elements(oids, offsets, siteid)
+int lookupObject(unsigned int * oid, short offset, int *);
+int checkoid(unsigned int oid, int isLastOffset);
 int transPrefetchProcess(int **, short);
-void sendPrefetchReq(prefetchpile_t*, int);
+void sendPrefetchReq(prefetchpile_t*, int, int);
 void sendPrefetchReqnew(prefetchpile_t*, int);
 int getPrefetchResponse(int, struct readstruct *);
 unsigned short getObjType(unsigned int oid);
@@ -305,6 +306,7 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi
 void commitCountForObjRead(char *, unsigned int *, unsigned int *, int *, int *, int *, int *, int *, unsigned int, unsigned short);
 void commitCountForObjMod(char *, unsigned int *, unsigned int *, int *, int *, int *, int *, int *, unsigned int, unsigned short);
 
+long long myrdtsc(void);
 /* Sends notification request for thread join, if sucessful returns 0 else returns -1 */
 int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid);
 void threadNotify(unsigned int oid, unsigned short version, unsigned int tid);
index 4122ddc89f4c9972b3c2d7e14ed280e29bedb05b..859bc4c77641d2ad96e0ff265d9ba6257d71b0b0 100644 (file)
@@ -3,7 +3,7 @@
 
 #include <netinet/tcp.h>
 #include "dstm.h"
-#include "mlookup.h"
+#include "altmlookup.h"
 #include "llookup.h"
 #include "threadnotify.h"
 #include "prefetch.h"
@@ -13,6 +13,7 @@
 #endif
 #include "gCollect.h"
 #include "readstruct.h"
+#include "debugmacro.h"
 
 #define BACKLOG 10 //max pending connections
 #define RECEIVE_BUFFER_SIZE 2048
 extern int classsize[];
 extern int numHostsInSystem;
 extern pthread_mutex_t notifymutex;
+extern unsigned long long clockoffset;
+long long startreq, endreq, diff;
+
+//#define LOGTIMES
+#ifdef LOGTIMES
+extern char bigarray1[6*1024*1024];
+extern unsigned int bigarray2[6*1024*1024];
+extern unsigned int bigarray3[6*1024*1024];
+extern long long bigarray4[6*1024*1024];
+extern int bigarray5[6*1024*1024];
+extern int bigindex1;
+#define LOGTIME(x,y,z,a,b) {\
+  int tmp=bigindex1; \
+  bigarray1[tmp]=x; \
+  bigarray2[tmp]=y; \
+  bigarray3[tmp]=z; \
+  bigarray4[tmp]=a; \
+  bigarray5[tmp]=b; \
+  bigindex1++; \
+}
+#else
+#define LOGTIME(x,y,z,a,b)
+#endif
+
+
+long long myrdtsc(void)
+{
+  unsigned hi, lo; 
+  __asm__ __volatile__ ("rdtsc" : "=a"(lo), "=d"(hi));
+  return ( (unsigned long long)lo)|( ((unsigned long long)hi)<<32 );
+}
 
 objstr_t *mainobjstore;
 pthread_mutex_t mainobjstore_mutex;
@@ -199,6 +231,7 @@ void *dstmAccept(void *acceptfd) {
        break;
       }
 #else
+      LOGTIME('X',0,0,myrdtsc(),0);
       if((val = prefetchReq((int)acceptfd, &readbuffer)) != 0) {
        printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
        break;
@@ -552,7 +585,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
       free(oidlocked);
     }
     */
-    //control=TRANS_DISAGREE;
+    control=TRANS_DISAGREE;
     send_data(acceptfd, &control, sizeof(char));
 #ifdef CACHE
     send_data(acceptfd, &numBytes, sizeof(int));
@@ -603,7 +636,6 @@ char getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked,
        *numBytes += size;
        /* Send TRANS_DISAGREE to Coordinator */
        *control = TRANS_DISAGREE;
-       //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
       }
       //Keep track of oid locked
       oidlocked[(*objlocked)++] = OID(((objheader_t *)mobj));
@@ -620,7 +652,6 @@ char getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked,
        size += sizeof(objheader_t);
        *numBytes += size;
        *control = TRANS_DISAGREE;
-       //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
       }
     }
   }
@@ -653,7 +684,6 @@ char getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked
        *numBytes += size;
        /* Send TRANS_DISAGREE to Coordinator */
        *control = TRANS_DISAGREE;
-       //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
       }
       //Keep track of oid locked
       oidlocked[(*objlocked)++] = OID(((objheader_t *)mobj));
@@ -670,7 +700,6 @@ char getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked
        size += sizeof(objheader_t);
        *numBytes += size;
        *control = TRANS_DISAGREE;
-       //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
       }
     }
   }
@@ -693,10 +722,7 @@ void procRestObjs(char *objread,
   unsigned short version;
 
   /* Process each oid in the machine pile/ group per thread */
-  //printf("DEBUG: index= %d, numread= %d, nummod= %d numread+nummod= %d\n", index,numread,nummod,numread+nummod);
   for (i = index; i < numread+nummod; i++) {
-    //printf("DEBUG: i= %d\n", i);
-    //fflush(stdout);
     if (i < numread) { //Objs only read and not modified
       int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
       incr *= i;
@@ -859,9 +885,8 @@ void processVerNoMatch(unsigned int *oidnotfound,
  * Looks for the objects to be prefetched in the main object store.
  * If objects are not found then record those and if objects are found
  * then use offset values to prefetch references to other objects */
-
 int prefetchReq(int acceptfd, struct readstruct * readbuffer) {
-  int i, size, objsize, numoffset = 0;
+  int i, size, objsize, numoffset = 0, gid=0;
   int length;
   char *recvbuffer, control;
   unsigned int oid, mid=-1;
@@ -869,6 +894,7 @@ int prefetchReq(int acceptfd, struct readstruct * readbuffer) {
   oidmidpair_t oidmid;
   struct writestruct writebuffer;
   int sd = -1;
+
   while(1) {
     recv_data_buf((int)acceptfd, readbuffer, &numoffset, sizeof(int));
     if(numoffset == -1)
@@ -885,22 +911,26 @@ int prefetchReq(int acceptfd, struct readstruct * readbuffer) {
       writebuffer.offset=0;
     }
     short offsetarry[numoffset];
+    recv_data_buf((int)acceptfd, readbuffer, &gid, sizeof(int));
     recv_data_buf((int) acceptfd, readbuffer, offsetarry, numoffset*sizeof(short));
+    LOGTIME('A',oid ,0,myrdtsc(),gid); //after recv the entire prefetch request 
 
     /*Process each oid */
     if ((header = mhashSearch(oid)) == NULL) { /* Obj not found */
       /* Save the oids not found in buffer for later use */
-      size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
+      size = sizeof(int)+sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
       char sendbuffer[size+1];
       sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
       *((int *) (sendbuffer+sizeof(char))) = size;
       *((char *)(sendbuffer + sizeof(char)+sizeof(int))) = OBJECT_NOT_FOUND;
       *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char)+sizeof(char))) = oid;
+      *((int *)(sendbuffer+sizeof(int) + sizeof(char)+sizeof(char)+sizeof(unsigned int))) = gid;
       send_buf(sd, &writebuffer, sendbuffer, size+1);
+      LOGTIME('J',oid, 0,myrdtsc(), gid); //send first oid not found prefetch request
     } else { /* Object Found */
       int incr = 1;
       GETSIZE(objsize, header);
-      size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
+      size = sizeof(int)+sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
       char sendbuffer[size+1];
       sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
       *((int *)(sendbuffer + incr)) = size;
@@ -909,8 +939,12 @@ int prefetchReq(int acceptfd, struct readstruct * readbuffer) {
       incr += sizeof(char);
       *((unsigned int *)(sendbuffer+incr)) = oid;
       incr += sizeof(unsigned int);
+      *((int *)(sendbuffer+incr)) = gid;
+      incr += sizeof(int);
       memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
       send_buf(sd, &writebuffer, sendbuffer, size+1);
+      LOGOIDTYPE("SRES", oid, TYPE(header), (myrdtsc()-clockoffset));
+      LOGTIME('C',oid,TYPE(header),myrdtsc(), gid); //send first oid found from prefetch request
 
       /* Calculate the oid corresponding to the offset value */
       for(i = 0 ; i< numoffset ; i++) {
@@ -932,20 +966,24 @@ int prefetchReq(int acceptfd, struct readstruct * readbuffer) {
        if (oid==0)
          break;
 
+    LOGTIME('B',oid,0,myrdtsc(),gid); //send next oid found from prefetch request
+
        if((header = mhashSearch(oid)) == NULL) {
-         size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
+         size = sizeof(int)+sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
          char sendbuffer[size+1];
          sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
          *((int *) (sendbuffer+1)) = size;
          *((char *)(sendbuffer + sizeof(char)+sizeof(int))) = OBJECT_NOT_FOUND;
          *((unsigned int *)(sendbuffer + sizeof(char)+sizeof(int) + sizeof(char))) = oid;
+      *((int *)(sendbuffer+sizeof(int) + sizeof(char)+sizeof(char)+sizeof(unsigned int))) = gid;
 
          send_buf(sd, &writebuffer, sendbuffer, size+1);
+      LOGTIME('J',oid, 0,myrdtsc(), gid); //send first oid not found prefetch request
          break;
        } else { /* Obj Found */
          int incr = 1;
          GETSIZE(objsize, header);
-         size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
+         size = sizeof(int)+sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
          char sendbuffer[size+1];
          sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
          *((int *)(sendbuffer + incr)) = size;
@@ -954,12 +992,17 @@ int prefetchReq(int acceptfd, struct readstruct * readbuffer) {
          incr += sizeof(char);
          *((unsigned int *)(sendbuffer+incr)) = oid;
          incr += sizeof(unsigned int);
+      *((int *)(sendbuffer+incr)) = gid;
+      incr += sizeof(int);
          memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
          send_buf(sd, &writebuffer, sendbuffer, size+1);
+      LOGOIDTYPE("SRES", oid, TYPE(header), (myrdtsc()-clockoffset));
+      LOGTIME('C',oid,TYPE(header),myrdtsc(), gid); //send first oid found from prefetch request
        }
       } //end of for
     }
   } //end of while
+
     //Release socket
   if (mid!=-1) {
     forcesend_buf(sd, &writebuffer, NULL, 0);
index 0be92327fa3cd772e74c83000b076c4cb3607b9c..185a02f618c422d1afa2fb3e7845ae5fb1c577a1 100644 (file)
@@ -1,9 +1,5 @@
 #include "gCollect.h"
-#if 0
 #include "altprelookup.h"
-#else
-#inlcude "prelookup.h"
-#endif
 
 
 extern pthread_mutex_t prefetchcache_mutex; //Mutex to lock Prefetch Cache
@@ -99,23 +95,12 @@ void *prefetchobjstrAlloc(unsigned int size) {
   return ptr;
 }
 
-#if 0
 void clearBlock(objstr_t *block) {
 
   unsigned long int tmpbegin=(unsigned int)block;
   unsigned long int tmpend=(unsigned int)block->top;
   int i, j;
   prehashlistnode_t *ptr;
-  //pthread_mutex_lock(&pflookup.lock);
-  /*
-  for(i=0;i<PRENUMLOCKS;i++) {
-    volatile unsigned int * lockptr=&pflookup.larray[i].lock;
-    
-    while(!write_trylock(lockptr)) {
-      sched_yield();
-    }
-  }
-  */
 
   int lockindex=0;
   ptr = pflookup.table;
@@ -163,7 +148,6 @@ void clearBlock(objstr_t *block) {
       while(!write_trylock(lockptr_new)){
         sched_yield();
       }
-      //printf("grab new lock id=%d for %d\n",lockindex,i);
       write_unlock(lockptr_current);
       lockptr_current=lockptr_new;      
     }
@@ -171,51 +155,7 @@ void clearBlock(objstr_t *block) {
   }// end of for (pflokup)
   
   write_unlock(lockptr_current);
-}
-#else
-void clearBlock(objstr_t *block) {
-  unsigned long int tmpbegin=(unsigned int)block;
-  unsigned long int tmpend=(unsigned int)block->top;
-  int i, j;
-  prehashlistnode_t *ptr;
-  pthread_mutex_lock(&pflookup.lock);
-
-  ptr = pflookup.table;
-  for(i = 0; i<pflookup.size; i++) {
-    prehashlistnode_t *orig=&ptr[i];
-    prehashlistnode_t *curr = orig;
-    prehashlistnode_t *next=curr->next;
-    for(; next != NULL; curr=next, next = next->next) {
-      unsigned int val=(unsigned int)next->val;
-      if ((val>=tmpbegin)&(val<tmpend)) {
-       prehashlistnode_t *tmp=curr->next=next->next;
-       free(next);
-       next=curr;
-       //loop condition is broken now...need to check before incrementing
-       //if (next==NULL)
-       // break;
-      }
-    }
-    {
-      unsigned int val=(unsigned int)orig->val;
-      if ((val>=tmpbegin)&(val<tmpend)) {
-       if (orig->next==NULL) {
-         orig->key=0;
-         orig->val=NULL;
-       } else {
-         next=orig->next;
-         orig->val=next->val;
-         orig->key=next->key;
-         orig->next=next->next;
-         free(next);
-       }
-      }
-    }
-  }
-  pthread_mutex_unlock(&pflookup.lock);
 }
-#endif
 
 objstr_t *allocateNew(unsigned int size) {
   objstr_t *tmp;
index 9d4a15de51537a18b063fe6b5c02c841596eaf12..49905865516d1eba8bf5c4e42915a0172ae634e9 100644 (file)
@@ -1,6 +1,6 @@
 #include "machinepile.h"
 
-void insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefetchpile_t **head) {
+void insertPile(int mid, unsigned int oid, int siteid, short numoffset, short *offset, prefetchpile_t **head) {
   prefetchpile_t *ptr;
   objpile_t *objnode;
   unsigned int *oidarray;
@@ -16,6 +16,7 @@ void insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefe
       objnode->offset = offset;
       objnode->oid = oid;
       objnode->numoffset = numoffset;
+      objnode->siteid = siteid;
       objnode->next = NULL;
       tmp->objpiles = objnode;
       tmp->next = *head;
@@ -37,6 +38,7 @@ void insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefe
        objnode->offset = offset;
        objnode->oid = oid;
        objnode->numoffset = numoffset;
+    objnode->siteid = siteid;
        objnode->next = *tmp;
        *tmp = objnode;
        return;
@@ -64,6 +66,7 @@ void insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefe
          objnode->offset = offset;
          objnode->oid = oid;
          objnode->numoffset = numoffset;
+      objnode->siteid = siteid;
          objnode->next = *tmp;
          *tmp = objnode;
          return;
index c32a02a86d7ded0a5d2f374ba236b6c56a53e361..53f94d6cb931748905a73f081d1bd79446532af0 100644 (file)
@@ -5,6 +5,7 @@
 #include <stdio.h>
 #include <stdlib.h>
 
-void insertPile(int, unsigned int, short, short *, prefetchpile_t **);
+//add prefetch site as an argument for debugging
+void insertPile(int, unsigned int, int, short, short *, prefetchpile_t **);
 
 #endif
index 5c0ab8bac25d761896b3e3ebf25d5b2b29f4c627..53937e7323bf9eb0cd60c60afffda8f2a1e24017 100644 (file)
@@ -9,6 +9,7 @@
 //Structure to make machine groups when prefetching
 typedef struct objpile {
   unsigned int oid;
+  int siteid;
   short numoffset;
   short *offset;
   struct objpile *next;
index 23454d5a2a9c3759149c2ca4e13bc766c9635170..2464b48e1c902104684794f0d568aaad390bd8a1 100644 (file)
@@ -1,5 +1,5 @@
 #include "prefetch.h"
-#include "prelookup.h"
+#include "altprelookup.h"
 #include "sockpool.h"
 #include "gCollect.h"
 
@@ -400,7 +400,6 @@ int getRangePrefetchResponse(int sd, struct readstruct * readbuffer) {
       void * oldptr;
       if((oldptr = prehashSearch(oid)) != NULL) {
         if(((objheader_t *)oldptr)->version < ((objheader_t *)ptr)->version) {
-          //prehashRemove(oid);
           prehashInsert(oid, ptr);
         }
       } else {
@@ -410,9 +409,6 @@ int getRangePrefetchResponse(int sd, struct readstruct * readbuffer) {
       size-=objsize;
     }
 
-    pthread_mutex_lock(&pflookup.lock);
-    pthread_cond_broadcast(&pflookup.cond);
-    pthread_mutex_unlock(&pflookup.lock);
   } else if(control == OBJECT_NOT_FOUND) {
     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
   } else {
index 2f9c1105a1775d305996557391b0d940e78113c7..0c2d9c6af248fbc93844ea0679be9494e66be6f1 100644 (file)
@@ -2,12 +2,14 @@
 #include "addPrefetchEnhance.h"
 #include <signal.h>
 #include <fcntl.h>
+#include <sys/utsname.h>
 
 extern int numTransAbort;
 extern int numTransCommit;
 extern int nchashSearch;
 extern int nmhashSearch;
 extern int nprehashSearch;
+extern int ndirtyCacheObj;
 extern int nRemoteSend;
 extern int nSoftAbort;
 extern int bytesSent;
@@ -22,10 +24,28 @@ extern pfcstats_t *evalPrefetch;
 
 void transStatsHandler(int sig, siginfo_t* info, void *context) {
 #ifdef TRANSSTATS
-  FILE *fp;
-  if ((fp = fopen("/tmp/client_stats.txt", "a+")) == NULL) {
+  char filepath[200], exectime[10]; 
+  struct utsname buf;
+  FILE *fp, *envfp;
+
+  if ((envfp = fopen("/home/adash/.tmpenvs", "r")) == NULL) {
+    fprintf(stderr, "Error opening file .tmpenvfs");
+    exit(-1);
+  }
+  memset(filepath, 0, 200);
+  fscanf(envfp, "%s\n", filepath);
+  uname(&buf);
+  strncat(filepath + strlen(filepath), buf.nodename, 4);
+  strcat(filepath, (const char *) ".txt");
+
+  memset(exectime, 0, 10);
+  fscanf(envfp, "%s\n", exectime);
+  fclose(envfp);
+
+  if ((fp = fopen(filepath, "a+")) == NULL) {
     exit(-1);
   }
+
   fprintf(fp, "******  Transaction Stats   ******\n");
   fprintf(fp, "myIpAddr = %x\n", myIpAddr);
   fprintf(fp, "numTransAbort = %d\n", numTransAbort);
@@ -33,6 +53,7 @@ void transStatsHandler(int sig, siginfo_t* info, void *context) {
   fprintf(fp, "nchashSearch = %d\n", nchashSearch);
   fprintf(fp, "nmhashSearch = %d\n", nmhashSearch);
   fprintf(fp, "nprehashSearch = %d\n", nprehashSearch);
+  fprintf(fp, "ndirtyCacheObj = %d\n", ndirtyCacheObj);
   fprintf(fp, "nRemoteReadSend = %d\n", nRemoteSend);
   fprintf(fp, "nSoftAbort = %d\n", nSoftAbort);
   fprintf(fp, "bytesSent = %d\n", bytesSent);
@@ -40,6 +61,7 @@ void transStatsHandler(int sig, siginfo_t* info, void *context) {
   fprintf(fp, "totalObjSize= %d\n", totalObjSize);
   fprintf(fp, "sendRemoteReq= %d\n", sendRemoteReq);
   fprintf(fp, "getResponse= %d\n", getResponse);
+  fprintf(fp, "executionTime = %s\n", exectime);
   fprintf(fp, "**********************************\n");
   fflush(fp);
   fclose(fp);
@@ -57,6 +79,7 @@ void transStatsHandler(int sig, siginfo_t* info, void *context) {
   printf("nchashSearch = %d\n", nchashSearch);
   printf("nmhashSearch = %d\n", nmhashSearch);
   printf("nprehashSearch = %d\n", nprehashSearch);
+  printf("ndirtyCacheObj = %d\n", ndirtyCacheObj);
   printf("nRemoteReadSend = %d\n", nRemoteSend);
   printf("nSoftAbort = %d\n", nSoftAbort);
   printf("bytesSent = %d\n", bytesSent);
index 4df4dc435bcb38370528444022ca0d579a1fcf2c..d2614dbd22ca4c5aab9d2fa8cd82228c237e78e7 100644 (file)
@@ -78,12 +78,15 @@ unsigned int notifyhashInsert(unsigned int tid, notifydata_t *ndata) {
     // Insert at the first position in the hashtable
     ptr[index].threadid = tid;
     ptr[index].ndata = ndata;
+    nlookup.numelements++;
   } else {
     tmp = &ptr[index];
     while(tmp != NULL) {
       if(tmp->threadid == tid) {
        isFound = 1;
        tmp->ndata = ndata;
+    pthread_mutex_unlock(&nlookup.locktable);
+    return 0;
       }
       tmp = tmp->next;
     }
@@ -97,6 +100,7 @@ unsigned int notifyhashInsert(unsigned int tid, notifydata_t *ndata) {
       node->ndata = ndata;
       node->next = ptr[index].next;
       ptr[index].next = node;
+      nlookup.numelements++;
     }
   }
   pthread_mutex_unlock(&nlookup.locktable);
index 9007bc7e975311bade05455c467dca8977778cc3..11e279845af16e359714c39c2f06c7f53129519e 100644 (file)
@@ -1,10 +1,11 @@
 #include "dstm.h"
+#include "debugmacro.h"
 #include "ip.h"
 #include "machinepile.h"
-#include "mlookup.h"
+#include "altmlookup.h"
 #include "llookup.h"
 #include "plookup.h"
-#include "prelookup.h"
+#include "altprelookup.h"
 #include "threadnotify.h"
 #include "queue.h"
 #include "addUdpEnhance.h"
 #define NUM_THREADS 1
 #define CONFIG_FILENAME "dstm.conf"
 
+//#define LOGEVENTS //turn on Logging events
+#ifdef LOGEVENTS
+char bigarray[16*1024*1024];
+int bigindex=0;
+#define LOGEVENT(x) { \
+    int tmp=bigindex++;                                \
+    bigarray[tmp]=x;                           \
+  }
+#else
+#define LOGEVENT(x)
+#endif
+
+//#define LOGTIMES
+#ifdef LOGTIMES
+char bigarray1[6*1024*1024];
+unsigned int bigarray2[6*1024*1024];
+unsigned int bigarray3[6*1024*1024];
+long long bigarray4[6*1024*1024];
+int bigarray5[6*1024*1024];
+int bigindex1=0;
+#define LOGTIME(x,y,z,a,b) {\
+  int tmp=bigindex1; \
+  bigarray1[tmp]=x; \
+  bigarray2[tmp]=y; \
+  bigarray3[tmp]=z; \
+  bigarray4[tmp]=a; \
+  bigarray5[tmp]=b; \
+  bigindex1++; \
+}
+#else
+#define LOGTIME(x,y,z,a,b)
+#endif
+
 /* Thread transaction variables */
 
 __thread objstr_t *t_cache;
@@ -32,6 +66,7 @@ __thread int t_abort;
 __thread jmp_buf aborttrans;
 #endif
 
+int globalid=0; /* This variable is a unique global identifier for a sendPrefetch request */
 
 /* Global Variables */
 extern int classsize[];
@@ -67,6 +102,7 @@ int numTransAbort = 0;
 int nchashSearch = 0;
 int nmhashSearch = 0;
 int nprehashSearch = 0;
+int ndirtyCacheObj = 0;
 int nRemoteSend = 0;
 int nSoftAbort = 0;
 int bytesSent = 0;
@@ -79,17 +115,7 @@ void printhex(unsigned char *, int);
 plistnode_t *createPiles();
 plistnode_t *sortPiles(plistnode_t *pileptr);
 
-//#define LOGEVENTS
-#ifdef LOGEVENTS
-char bigarray[16*1024*1024];
-int bigindex=0;
-#define LOGEVENT(x) { \
-    int tmp=bigindex++;                                \
-    bigarray[tmp]=x;                           \
-  }
-#else
-#define LOGEVENT(x)
-#endif
+
 
 /*******************************
 * Send and Recv function calls
@@ -289,7 +315,7 @@ inline int findmax(int *array, int arraylength) {
 }
 
 //#define INLINEPREFETCH
-#define PREFTHRESHOLD 4
+#define PREFTHRESHOLD 0
 
 /* This function is a prefetch call generated by the compiler that
  * populates the shared primary prefetch queue*/
@@ -332,23 +358,24 @@ void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endof
   int numpref=numavailable();
   attempted=1;
 
-  if (node==NULL && numpref!=0 || numpref==PREFTHRESHOLD) {
+  if (node==NULL && numpref!=0 || numpref>=PREFTHRESHOLD) {
     node=gettail();
-    prefetchpile_t *pilehead = foundLocal(node,numpref);
+    prefetchpile_t *pilehead = foundLocal(node,numpref,siteid);
     if (pilehead!=NULL) {
       // Get sock from shared pool
       
       /* Send  Prefetch Request */
       prefetchpile_t *ptr = pilehead;
       while(ptr != NULL) {
-       int sd = getSock2(transPrefetchSockPool, ptr->mid);
-       sendPrefetchReq(ptr, sd);
-       ptr = ptr->next;
+        globalid++;
+        int sd = getSock2(transPrefetchSockPool, ptr->mid);
+        sendPrefetchReq(ptr, sd, globalid);
+        ptr = ptr->next;
       }
       
       mcdealloc(pilehead);
-      resetqueue();
     }
+    resetqueue();
   }//end do prefetch if condition
   } while(node==NULL);
 #else
@@ -584,7 +611,6 @@ __attribute__((pure)) objheader_t *transRead(unsigned int oid) {
 #ifdef ABORTREADERS
   if (t_abort) {
     //abort this transaction
-    //printf("ABORTING\n");
     removetransactionhash();
     objstrDelete(t_cache);
     t_chashDelete();
@@ -613,6 +639,12 @@ __attribute__((pure)) objheader_t *transRead(unsigned int oid) {
   } else {
 #ifdef CACHE
     if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
+      if(STATUS(tmp) & DIRTY) {
+#ifdef TRANSSTATS
+        ndirtyCacheObj++;
+#endif
+        goto remoteread;
+      }
 #ifdef TRANSSTATS
       nprehashSearch++;
 #endif
@@ -629,6 +661,7 @@ __attribute__((pure)) objheader_t *transRead(unsigned int oid) {
       return objcopy;
 #endif
     }
+remoteread:
 #endif
     /* Get the object from the remote location */
     if((machinenumber = lhashSearch(oid)) == 0) {
@@ -660,13 +693,7 @@ __attribute__((pure)) objheader_t *transRead(unsigned int oid) {
       pthread_mutex_unlock(&prefetchcache_mutex);
       memcpy(headerObj, objcopy, size+sizeof(objheader_t));
       //make an entry in prefetch lookup hashtable
-      void *oldptr;
-      if((oldptr = prehashSearch(oid)) != NULL) {
-        prehashRemove(oid);
-        prehashInsert(oid, headerObj);
-      } else {
-        prehashInsert(oid, headerObj);
-      }
+      prehashInsert(oid, headerObj);
       LOGEVENT('B');
 #endif
       return &objcopy[1];
@@ -681,6 +708,7 @@ __attribute__((pure)) objheader_t *transRead(unsigned int oid) {
 /* This function finds the location of the objects involved in a transaction
  * and returns the pointer to the object if found in a remote location */
 __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
+//DEBUG: __attribute__((pure)) objheader_t *transRead2(unsigned int oid, char tmpptr[]) {
   unsigned int machinenumber;
   objheader_t *tmp, *objheader;
   objheader_t *objcopy;
@@ -689,7 +717,6 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
 #ifdef ABORTREADERS
   if (t_abort) {
     //abort this transaction
-    //printf("ABORTING\n");
     removetransactionhash();
     objstrDelete(t_cache);
     t_chashDelete();
@@ -718,6 +745,12 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
   } else {
 #ifdef CACHE
     if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
+      if(STATUS(tmp) & DIRTY) {
+#ifdef TRANSSTATS
+        ndirtyCacheObj++;
+#endif
+        goto remoteread;
+      }
 #ifdef TRANSSTATS
       LOGEVENT('P')
       nprehashSearch++;
@@ -727,6 +760,7 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
       size+=sizeof(objheader_t);
       objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
       memcpy(objcopy, tmp, size);
+      LOGOIDTYPE("P",oid, TYPE(objcopy), myrdtsc());
       /* Insert into cache's lookup table */
       t_chashInsert(OID(tmp), objcopy);
 #ifdef COMPILER
@@ -735,6 +769,7 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
       return objcopy;
 #endif
     }
+remoteread:
 #endif
     /* Get the object from the remote location */
     if((machinenumber = lhashSearch(oid)) == 0) {
@@ -742,18 +777,18 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
       return NULL;
     }
     objcopy = getRemoteObj(machinenumber, oid);
-
-    if(objcopy == NULL) {
-      printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
-      return NULL;
-    } else {
 #ifdef TRANSSTATS
-
       LOGEVENT('R');
       nRemoteSend++;
 #endif
+
+    if(objcopy == NULL) {
+      printf("Error: Object %u not found in Remote location %s, %d\n", oid,__FILE__, __LINE__);
+      return NULL;
+    } else {
 #ifdef COMPILER
 #ifdef CACHE
+      LOGOIDTYPE("RR",oid, TYPE(objcopy),myrdtsc());
       //Copy object to prefetch cache
       pthread_mutex_lock(&prefetchcache_mutex);
       objheader_t *headerObj;
@@ -768,13 +803,7 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
       pthread_mutex_unlock(&prefetchcache_mutex);
       memcpy(headerObj, objcopy, size+sizeof(objheader_t));
       //make an entry in prefetch lookup hashtable
-      void *oldptr;
-      if((oldptr = prehashSearch(oid)) != NULL) {
-        prehashRemove(oid);
-        prehashInsert(oid, headerObj);
-      } else {
-        prehashInsert(oid, headerObj);
-      }
+      prehashInsert(oid, headerObj);
       LOGEVENT('B');
 #endif
       return &objcopy[1];
@@ -892,12 +921,16 @@ int transCommit() {
   }
 #endif
 
+#ifdef LOGTIMES
+  int jjj;
+  for(jjj=0; jjj<bigindex1; jjj++) {
+    printf("[%c %u %u %lld %d]\n", bigarray1[jjj], bigarray2[jjj], bigarray3[jjj], bigarray4[jjj], bigarray5[jjj]);
+  }
+#endif
+
 #ifdef ABORTREADERS
   if (t_abort) {
     //abort this transaction
-    /* Debug
-     * printf("ABORTING TRANSACTION AT COMMIT\n");
-     */
     removetransactionhash();
     objstrDelete(t_cache);
     t_chashDelete();
@@ -930,12 +963,12 @@ int transCommit() {
 
     /* Create a socket and getReplyCtrl array, initialize */
     int socklist[pilecount];
+    char getReplyCtrl[pilecount];
     int loopcount;
-    for(loopcount = 0 ; loopcount < pilecount; loopcount++)
+    for(loopcount = 0 ; loopcount < pilecount; loopcount++){
       socklist[loopcount] = 0;
-    char getReplyCtrl[pilecount];
-    for(loopcount = 0 ; loopcount < pilecount; loopcount++)
       getReplyCtrl[loopcount] = 0;
+    }
 
     /* Process each machine pile */
     int sockindex = 0;
@@ -1045,13 +1078,7 @@ int transCommit() {
            GETSIZE(size, header);
            size += sizeof(objheader_t);
            //make an entry in prefetch hash table
-           void *oldptr;
-           if((oldptr = prehashSearch(oidToPrefetch)) != NULL) {
-             prehashRemove(oidToPrefetch);
-             prehashInsert(oidToPrefetch, header);
-           } else {
-             prehashInsert(oidToPrefetch, header);
-           }
+        prehashInsert(oidToPrefetch, header);
         LOGEVENT('E');
            length = length - size;
            offset += size;
@@ -1137,7 +1164,6 @@ int transCommit() {
   } while (treplyretry);
 
   if(finalResponse == TRANS_ABORT) {
-    //printf("Aborting trans\n");
 #ifdef TRANSSTATS
     LOGEVENT('A');
     numTransAbort++;
@@ -1268,7 +1294,7 @@ char decideResponse(char *getReplyCtrl, char *treplyretry, int pilecount) {
     control = getReplyCtrl[i];
     switch(control) {
     default:
-      printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
+      printf("Participant sent unknown message %d in %s, %d\n", control, __FILE__, __LINE__);
 
       /* treat as disagree, pass thru */
     case TRANS_DISAGREE:
@@ -1291,7 +1317,7 @@ char decideResponse(char *getReplyCtrl, char *treplyretry, int pilecount) {
     return TRANS_ABORT;
 #ifdef CACHE
     /* clear objects from prefetch cache */
-    cleanPCache();
+    //cleanPCache();
 #endif
   } else if(transagree == pilecount) {
     /* Send Commit */
@@ -1369,7 +1395,6 @@ void commitCountForObjMod(char *getReplyCtrl, unsigned int *oidnotfound, unsigne
 
        //Keep track of what is locked
        oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
-       //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
        return;
       }
     } else { //A lock is acquired some place else
@@ -1379,7 +1404,6 @@ void commitCountForObjMod(char *getReplyCtrl, unsigned int *oidnotfound, unsigne
        (*v_nomatch)++;
        /* Send TRANS_DISAGREE to Coordinator */
        *getReplyCtrl = TRANS_DISAGREE;
-       //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
        return;
       }
     }
@@ -1409,7 +1433,6 @@ void commitCountForObjRead(char *getReplyCtrl, unsigned int *oidnotfound, unsign
        *getReplyCtrl = TRANS_DISAGREE;
        //Keep track of what is locked
        oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
-       //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
        return;
       }
     } else { //Has reached max number of readers or some other transaction
@@ -1420,7 +1443,6 @@ void commitCountForObjRead(char *getReplyCtrl, unsigned int *oidnotfound, unsign
        (*v_nomatch)++;
        /* Send TRANS_DISAGREE to Coordinator */
        *getReplyCtrl = TRANS_DISAGREE;
-       //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
        return;
       }
     }
@@ -1537,7 +1559,7 @@ int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
   return 0;
 }
 
-prefetchpile_t *foundLocal(char *ptr, int numprefetches) {
+prefetchpile_t *foundLocal(char *ptr, int numprefetches, int mysiteid) {
   int i;
   int j;
   prefetchpile_t * head=NULL;
@@ -1556,25 +1578,38 @@ prefetchpile_t *foundLocal(char *ptr, int numprefetches) {
       unsigned int oid=oidarray[i];
       int newbase;
       int machinenum;
-      
-      if (oid==0)
+      int countInvalidObj=0;
+
+      if (oid==0) {
+       numLocal++;
        continue;
+      }
       //Look up fields locally
+      int isLastOffset=0;
+      if(endindex==0)
+          isLastOffset=1;
       for(newbase=baseindex; newbase<endindex; newbase++) {
-       if (!lookupObject(&oid, arryfields[newbase]))
+        if(newbase==(endindex-1))
+          isLastOffset=1;
+       if (!lookupObject(&oid,arryfields[newbase],&countInvalidObj)) {
          break;
+       }
        //Ended in a null pointer...
-       if (oid==0)
+       if (oid==0) {
+         numLocal++;
          goto tuple;
+       }
       }
+
       //Entire prefetch is local
-      if (newbase==endindex&&checkoid(oid)) {
+      if (newbase==endindex&&checkoid(oid,isLastOffset)) {
        numLocal++;
        goto tuple;
       }
+
       //Add to remote requests
       machinenum=lhashSearch(oid);
-      insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head);
+      insertPile(machinenum, oid, siteid,endindex-newbase, &arryfields[newbase], &head);
     tuple:
       ;
     }
@@ -1587,12 +1622,15 @@ prefetchpile_t *foundLocal(char *ptr, int numprefetches) {
   return head;
 }
 
-int checkoid(unsigned int oid) {
+int checkoid(unsigned int oid, int isLastOffset) {
   objheader_t *header;
   if ((header=mhashSearch(oid))!=NULL) {
     //Found on machine
     return 1;
   } else if ((header=prehashSearch(oid))!=NULL) {
+    if((STATUS(header) & DIRTY) && isLastOffset) {
+      return 0;
+    }
     //Found in cache
     return 1;
   } else {
@@ -1600,14 +1638,20 @@ int checkoid(unsigned int oid) {
   }
 }
 
-int lookupObject(unsigned int * oid, short offset) {
+int lookupObject(unsigned int * oid, short offset, int *countInvalidObj) {
   objheader_t *header;
   if ((header=mhashSearch(*oid))!=NULL) {
     //Found on machine
     ;
   } else if ((header=prehashSearch(*oid))!=NULL) {
     //Found in cache
-    ;
+    if(STATUS(header) & DIRTY) {//Read an oid that is an old entry in the cache;
+      //only once because later old entries may still cause unnecessary roundtrips during prefetching
+      (*countInvalidObj)+=1;
+      if(*countInvalidObj > 1) {
+        return 0;
+      }
+    }
   } else {
     return 0;
   }
@@ -1639,7 +1683,7 @@ void *transPrefetch(void *t) {
     /* Check if the tuples are found locally, if yes then reduce them further*/
     /* and group requests by remote machine ids by calling the makePreGroups() */
     int count=numavailable();
-    prefetchpile_t *pilehead = foundLocal(node, count);
+    prefetchpile_t *pilehead = foundLocal(node, count, 0);
 
     if (pilehead!=NULL) {
       // Get sock from shared pool
@@ -1647,9 +1691,10 @@ void *transPrefetch(void *t) {
       /* Send  Prefetch Request */
       prefetchpile_t *ptr = pilehead;
       while(ptr != NULL) {
-       int sd = getSock2(transPrefetchSockPool, ptr->mid);
-       sendPrefetchReq(ptr, sd);
-       ptr = ptr->next;
+        globalid++;
+        int sd = getSock2(transPrefetchSockPool, ptr->mid);
+        sendPrefetchReq(ptr, sd,globalid);
+        ptr = ptr->next;
       }
 
       /* Release socket */
@@ -1692,20 +1737,26 @@ void sendPrefetchReqnew(prefetchpile_t *mcpilenode, int sd) {
   return;
 }
 
-void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
+/**
+ * parameters: mcpilenode -> pile node to traverse to assemble pref requests
+ * sd -> socket id
+ * gid -> global identifier for each prefetch request sent, starts with 0
+ **/
+void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd, int gid) {
   int len, endpair;
   char control;
   objpile_t *tmp;
   struct writestruct writebuffer;
   writebuffer.offset=0;
 
+
   /* Send TRANS_PREFETCH control message */
   int first=1;
 
   /* Send Oids and offsets in pairs */
   tmp = mcpilenode->objpiles;
   while(tmp != NULL) {
-    len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
+    len = sizeof(int)+sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
     char oidnoffset[len+5];
     char *buf=oidnoffset;
     if (first) {
@@ -1716,12 +1767,15 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
     *((int*)buf) = tmp->numoffset;
     buf+=sizeof(int);
     *((unsigned int *)buf) = tmp->oid;
+    LOGOIDTYPE("S",tmp->oid,tmp->numoffset,myrdtsc());
 #ifdef TRANSSTATS
     sendRemoteReq++;
 #endif
     buf+=sizeof(unsigned int);
     *((unsigned int *)buf) = myIpAddr;
-    buf += sizeof(unsigned int);
+    buf+= sizeof(unsigned int);
+    *((int*)buf) = gid;
+    buf+=sizeof(int);
     memcpy(buf, tmp->offset, (tmp->numoffset)*sizeof(short));
     tmp = tmp->next;
     if (tmp==NULL) {
@@ -1729,17 +1783,18 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
       len+=sizeof(int);
     }
     if (tmp!=NULL)
-      send_buf(sd, & writebuffer, oidnoffset, len);
+      send_buf(sd, &writebuffer, oidnoffset, len);
     else
-      forcesend_buf(sd, & writebuffer, oidnoffset, len);
+      forcesend_buf(sd, &writebuffer, oidnoffset, len);
   }
-
+  LOGOIDTYPE("SREQ",0,0,myrdtsc());
   LOGEVENT('S');
+  LOGTIME('S',0,0,myrdtsc(),gid); //after sending
   return;
 }
 
 int getPrefetchResponse(int sd, struct readstruct *readbuffer) {
-  int length = 0, size = 0;
+  int gid,length = 0, size = 0;
   char control;
   unsigned int oid;
   void *modptr, *oldptr;
@@ -1748,14 +1803,17 @@ int getPrefetchResponse(int sd, struct readstruct *readbuffer) {
   size = length - sizeof(int);
   char recvbuffer[size];
 #ifdef TRANSSTATS
-    getResponse++;
-    LOGEVENT('Z');
+  getResponse++;
+  LOGEVENT('Z');
+  LOGTIME('K',0,0, myrdtsc(),0); //log time after first recv
 #endif
-    recv_data_buf(sd, readbuffer, recvbuffer, size);
+  recv_data_buf(sd, readbuffer, recvbuffer, size);
   control = *((char *) recvbuffer);
   if(control == OBJECT_FOUND) {
     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
-    size = size - (sizeof(char) + sizeof(unsigned int));
+    gid = *((int *) (recvbuffer+sizeof(char)+sizeof(unsigned int)));
+    LOGTIME('G',oid,0, myrdtsc(),gid); //log time after first recv
+    size = size - (sizeof(char) + sizeof(unsigned int) + sizeof(int));
     pthread_mutex_lock(&prefetchcache_mutex);
     if ((modptr = prefetchobjstrAlloc(size)) == NULL) {
       printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
@@ -1763,28 +1821,27 @@ int getPrefetchResponse(int sd, struct readstruct *readbuffer) {
       return -1;
     }
     pthread_mutex_unlock(&prefetchcache_mutex);
-    memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
+    memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int)+sizeof(int), size);
     STATUS(modptr)=0;
 
+
     /* Insert the oid and its address into the prefetch hash lookup table */
     /* Do a version comparison if the oid exists */
     if((oldptr = prehashSearch(oid)) != NULL) {
       /* If older version then update with new object ptr */
       if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) {
-       prehashRemove(oid);
-       prehashInsert(oid, modptr);
+        prehashInsert(oid, modptr);
       }
     } else { /* Else add the object ptr to hash table*/
       prehashInsert(oid, modptr);
     }
-    /* Lock the Prefetch Cache look up table*/
-    pthread_mutex_lock(&pflookup.lock);
-    /* Broadcast signal on prefetch cache condition variable */
-    pthread_cond_broadcast(&pflookup.cond);
-    /* Unlock the Prefetch Cache look up table*/
-    pthread_mutex_unlock(&pflookup.lock);
+    LOGOIDTYPE("GR",oid, TYPE(modptr),myrdtsc());
+    LOGTIME('Z',oid, TYPE(modptr), myrdtsc(),gid); //log time after copying it into the prefetch cache
   } else if(control == OBJECT_NOT_FOUND) {
     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
+    gid = *((int *) (recvbuffer+sizeof(char)+sizeof(unsigned int)));
+    LOGOIDTYPE("NF",oid,0,myrdtsc());
+    LOGTIME('F',oid, 0, myrdtsc(),gid); //log time after copying it into the prefetch cache
     /* TODO: For each object not found query DHT for new location and retrieve the object */
     /* Throw an error */
     //printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
index 5160bc9f02e34d88f2ffde0c05b1c5dac7de1787..2c404c743693a8c4fc7328879bb10ffca631b8c3 100644 (file)
@@ -30,7 +30,7 @@
 
 #define NUMPTRS 100
 
-#define INITIALHEAPSIZE 5000*1024*1024L
+#define INITIALHEAPSIZE 256*1024*1024L
 #define GCPOINT(x) ((INTPTR)((x)*0.99))
 /* This define takes in how full the heap is initially and returns a new heap size to use */
 #define HEAPSIZE(x,y) ((INTPTR)(x+y))*2
index 0eacaa6ff0707212479c21fa4b00928716e2aad8..7a339cf36431b3dcd24aeae0b76a23bd3d7ec865 100644 (file)
@@ -13,7 +13,7 @@
 #include "DSTM/interface_recovery/prelookup.h"
 #else
 #include "DSTM/interface/dstm.h"
-#include "DSTM/interface/prelookup.h"
+#include "DSTM/interface/altprelookup.h"
 #include "DSTM/interface/prefetch.h"
 #endif
 #endif