grab locks on sockpool for all communications involving cache and prefetch
authoradash <adash>
Fri, 16 Apr 2010 00:18:03 +0000 (00:18 +0000)
committeradash <adash>
Fri, 16 Apr 2010 00:18:03 +0000 (00:18 +0000)
Robust/src/Runtime/DSTM/interface_recovery/dstm.h
Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c
Robust/src/Runtime/DSTM/interface_recovery/trans.c

index fd0bbd3654fbd68c1a269472741097b04fdfcfb7..3349b21a018afd027765bc9247054e463db224d9 100644 (file)
@@ -86,8 +86,8 @@
 //Prefetch tuning paramters
 //#define RETRYINTERVAL  20 //N (For Em3d, SOR, Moldyn benchmarks)
 //#define SHUTDOWNINTERVAL  3  //M
-#define RETRYINTERVAL  75 //N  (For MatrixMultiply, 2DFFT benchmarks)
-#define SHUTDOWNINTERVAL  1  //M
+#define RETRYINTERVAL  60 //N  (For MatrixMultiply, 2DFFT benchmarks)
+#define SHUTDOWNINTERVAL  10  //M
 #define NUM_TRY_TO_COMMIT 2
 #define MEM_ALLOC_THRESHOLD 20485760//20MB
 
index 8e82efcbff209663739b5df7c18613ff317b1489..163a16843c13a75249a6eacdf7d293f16eb7dc64 100644 (file)
@@ -1626,7 +1626,11 @@ int prefetchReq(int acceptfd) {
        freeSockWithLock(transPResponseSocketPool, mid, sd);
       }
       mid=oidmid.mid;
-      sd = getSockWithLock(transPResponseSocketPool, mid);
+      //sd = getSockWithLock(transPResponseSocketPool, mid);
+      if((sd = getSockWithLock(transPResponseSocketPool, mid)) < 0) {
+        printf("%s() Socket Create Error at %s, %d\n", __func__, __FILE__, __LINE__);
+        return -1;
+      }
     }
     short offsetarry[numoffset];
     recv_data((int) acceptfd, offsetarry, numoffset*sizeof(short));
index 38954dab2500919a1625549310f40e0d48c6a458..edc9c1bec777b7496ea5e1fc947407a8013de13c 100644 (file)
@@ -463,8 +463,14 @@ void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endof
       /* Send  Prefetch Request */
       prefetchpile_t *ptr = pilehead;
       while(ptr != NULL) {
-        int sd = getSock2(transPrefetchSockPool, ptr->mid);
+        //int sd = getSock2(transPrefetchSockPool, ptr->mid);
+        int sd;
+        if((sd = getSockWithLock(transPrefetchSockPool, ptr->mid)) < 0) {
+          printf("%s() Socket Create Error at %s, %d\n", __func__, __FILE__, __LINE__);
+          return;
+        }
         sendPrefetchReq(ptr, sd);
+        freeSockWithLock(transPrefetchSockPool, ptr->mid, sd);
         ptr = ptr->next;
       }
       
@@ -1663,7 +1669,12 @@ void *getRemoteObj(unsigned int mnum, unsigned int oid) {
   objheader_t *h;
   void *objcopy = NULL;
 
-  int sd = getSock2(transReadSockPool, mnum);
+  //int sd = getSock2(transReadSockPool, mnum);
+  int sd;
+  if((sd = getSockWithLock(transReadSockPool, mnum)) < 0) {
+    printf("%s() Socket Create Error at %s, %d\n", __func__, __FILE__, __LINE__);
+    return NULL;
+  }
   char readrequest[sizeof(char)+sizeof(unsigned int)];
   readrequest[0] = READ_REQUEST;
   *((unsigned int *)(&readrequest[1])) = oid;
@@ -1700,6 +1711,7 @@ void *getRemoteObj(unsigned int mnum, unsigned int oid) {
   totalObjSize += size;
 #endif
        }
+  freeSockWithLock(transReadSockPool, mnum, sd);
        return objcopy;
 }
 
@@ -2840,6 +2852,7 @@ unsigned short getObjType(unsigned int oid) {
   objheader_t *objheader;
   unsigned short numoffset[] ={0};
   short fieldoffset[] ={};
+  int sd=0;
 
   if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL) {
 #ifdef CACHE
@@ -2851,10 +2864,18 @@ unsigned short getObjType(unsigned int oid) {
     unsigned int machineID;
     static int flipBit = 0;
     machineID = (flipBit)?(getPrimaryMachine(mid)):(getBackupMachine(mid));
-    int sd = getSock2(transReadSockPool, machineID);
+    //int sd = getSock2(transReadSockPool, machineID);
+    if((sd = getSockWithLock(transReadSockPool, machineID)) < 0) {
+      printf("%s() Socket Create Error at %s, %d\n", __func__, __FILE__, __LINE__);
+      return 0;
+    }
 #else
     unsigned int mid = lhashSearch(oid);
-    int sd = getSock2(transReadSockPool, mid);
+    //int sd = getSock2(transReadSockPool, mid);
+    if((sd = getSockWithLock(transReadSockPool, mid)) < 0) {
+      printf("%s() Socket Create Error at %s, %d\n", __func__, __FILE__, __LINE__);
+      return 0;
+    }
 #endif
     char remotereadrequest[sizeof(char)+sizeof(unsigned int)];
     remotereadrequest[0] = READ_REQUEST;
@@ -2881,18 +2902,34 @@ unsigned short getObjType(unsigned int oid) {
       pthread_mutex_unlock(&prefetchcache_mutex);
       recv_data(sd, objheader, size);
       prehashInsert(oid, objheader);
+#ifdef RECOVERY
+      freeSockWithLock(transReadSockPool, machineID, sd);
+#else
+      freeSockWithLock(transReadSockPool, mid, sd);
       return TYPE(objheader);
+#endif
 #else
       char *buffer;
       if((buffer = calloc(1, size)) == NULL) {
        printf("%s() Calloc Error %s at line %d\n", __func__, __FILE__, __LINE__);
        fflush(stdout);
+#ifdef RECOVERY
+    freeSockWithLock(transReadSockPool, machineID, sd);
+#else
+       freeSockWithLock(transReadSockPool, mid, sd);
+#endif
        return 0;
       }
       recv_data(sd, buffer, size);
       objheader = (objheader_t *)buffer;
       unsigned short type = TYPE(objheader);
       free(buffer);
+#ifdef RECOVERY
+    freeSockWithLock(transReadSockPool, machineID, sd);
+#else
+       freeSockWithLock(transReadSockPool, mid, sd);
+#endif
+
       return type;
 #endif
     }
@@ -4060,6 +4097,7 @@ void printRecoveryStat() {
   }
   printf("**************************\n\n");
   fflush(stdout);
+  fflush(stdout);
 #else
   printf("No stat\n");
 #endif