modifies getTypeObj()
authorjihoonl <jihoonl>
Sat, 3 Oct 2009 07:45:04 +0000 (07:45 +0000)
committerjihoonl <jihoonl>
Sat, 3 Oct 2009 07:45:04 +0000 (07:45 +0000)
fix in sortPiles()

Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c
Robust/src/Runtime/DSTM/interface_recovery/trans.c

index 59e4aea609f7bb82232e604fcec93bcb4f4e202e..3025d42e42ccf389e08dd4118ab569a37eaf6554 100644 (file)
@@ -33,7 +33,6 @@ extern unsigned int *hostIpAddrs;
 #ifdef RECOVERY
 extern unsigned int *locateObjHosts;
 extern int *liveHosts;
-extern int liveHostsValid;
 extern int numLiveHostsInSystem;
 int clearNotifyListFlag;
 #endif
@@ -321,6 +320,7 @@ void *dstmAccept(void *acceptfd) {
                                recv_data((int)acceptfd, &oid, sizeof(unsigned int));
                                while((srcObj = mhashSearch(oid)) == NULL) {
                                        int ret;
+//          printf("HERE!!\n");
                                        if((ret = sched_yield()) != 0) {
                                                printf("%s(): error no %d in thread yield\n", __func__, errno);
                                        }
@@ -507,7 +507,6 @@ void *dstmAccept(void *acceptfd) {
 #ifdef DEBUG
         printf("control -> RESPOND_LIVE\n");
 #endif
-                               liveHostsValid = 0;
                                ctrl = LIVE;
                                send_data((int)acceptfd, &ctrl, sizeof(ctrl));
 #ifdef DEBUG
@@ -563,7 +562,6 @@ void *dstmAccept(void *acceptfd) {
                          recv_data((int)acceptfd, liveHosts, sizeof(int)*numHostsInSystem);
                                recv_data((int)acceptfd, locateObjHosts, sizeof(unsigned int)*numHostsInSystem*2);
                                pthread_mutex_unlock(&liveHosts_mutex);
-                               liveHostsValid = 1;
                                numLiveHostsInSystem = getNumLiveHostsInSystem();
 #ifdef DEBUG
                                printHostsStatus();
@@ -1406,9 +1404,6 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock
       printf("Error: mhashsearch returns NULL at dstmserver.c %d\n", __LINE__);
                        return 1;
 #else
-#ifdef DEBUG
-                       printf("DEBUG->*backup* i:%d, nummod:%d\n", i, nummod);
-#endif
                        header = (objheader_t *)(modptr+offset);
                        header->version += 1;
                        header->isBackup = 1;
index d91bba49a0951b41afcd91db70788b029b06b810..b8f69dd2f04a57ed96ee4870797369e2ea57b89d 100644 (file)
@@ -1,3 +1,5 @@
+#include "dstm.h"
+#include "ip.h"
 #include "machinepile.h"
 #include "mlookup.h"
 #include "llookup.h"
@@ -53,9 +55,6 @@ unsigned int *hostIpAddrs;
 int sizeOfHostArray;
 int numHostsInSystem;
 int myIndexInHostArray;
-int waitThreadMid;
-unsigned int waitThreadID; 
-
 unsigned int oidsPerBlock;
 unsigned int oidMin;
 unsigned int oidMax;
@@ -79,25 +78,28 @@ int nSoftAbort = 0;
 int bytesSent = 0;
 int bytesRecv = 0;
 int totalObjSize = 0;
+int sendRemoteReq = 0;
+int getResponse = 0;
 
 #ifdef RECOVERY
 /***********************************
  * Global variables for Duplication
  ***********************************/
 int *liveHosts;
-int liveHostsValid;
 int numLiveHostsInSystem;      
-int flipBit;                                                           // Used to distribute requests between primary and backup evenly
 unsigned int *locateObjHosts;
-#endif
+
+
+/* variables to clear dead threads */
+int waitThreadMid;            
+unsigned int waitThreadID; 
 
 int transRetryFlag;
-unsigned int transIDMax;
 unsigned int transIDMin;
-unsigned int transIDIndex;
-char ip[16];
+unsigned int transIDMax;
+
+char ip[16];      // for debugging purpose
 
-#ifdef RECOVERY
 /******************************
  * Global variables for Paxos
  ******************************/
@@ -115,6 +117,17 @@ void printhex(unsigned char *, int);
 plistnode_t *createPiles();
 plistnode_t *sortPiles(plistnode_t *pileptr);
 
+#ifdef LOGEVENTS
+char bigarray[16*1024*1024];
+int bigindex=0;
+#define LOGEVENT(x) { \
+    int tmp=bigindex++;         \
+    bigarray[tmp]=x;          \
+  }
+#else
+#define LOGEVENT(x)
+#endif
+
 /*******************************
 * Send and Recv function calls
 *******************************/
@@ -124,21 +137,20 @@ int send_data(int fd, void *buf, int buflen) {
   int numbytes;
 
   while (size > 0) {
-
 #ifdef GDBDEBUG
 GDBSEND1:
 #endif
     numbytes = send(fd, buffer, size, 0);
 
-    if( numbytes>0) {
+    if( numbytes > 0) {
       bytesSent += numbytes;
       size -= numbytes;
     }
 #ifdef RECOVERY
-    else if( numbytes < 0) {
+    else if( numbytes < 0) {    
       // Receive returned an error.
       // Analyze underlying cause
-#ifndef DEBUG
+#ifdef DEBUG
       printf("%s -> fd : %d errno = %d %s\n",__func__, fd, errno,strerror(errno));
       fflush(stdout);
 #endif
@@ -155,11 +167,10 @@ GDBSEND1:
         return -1;
       } else {
 #ifdef GDBDEBUG
-      if(errno == 4)
-        goto GDBSEND1;    
+        if(errno == 4)
+          goto GDBSEND1;    
 #endif
 
-
 #ifdef DEBUG
         printf("%s -> Unexpected ERROR!\n",__func__);
 #endif
@@ -169,21 +180,21 @@ GDBSEND1:
     else{
       // Case : numbytes == 0
       // // machine has failed -- this case probably doesn't occur in reality
-      //
-
-
-      
 #ifdef DEBUG
       printf("%s -> SHOULD NOT BE HERE\n",__func__);
 #endif
       return -1;
     }
+#else
+    if(numbytes == -1) {
+      perror("send");
+      exit(0);
+    }
 #endif
   } // close while loop
 #ifdef DEBUG
   printf("%s-> Exiting\n", __func__);
 #endif
-
   return 0; // completed sending data
 }
 
@@ -254,6 +265,11 @@ GDBRECV1:
 #endif
       return -1;
     }
+#else
+    if( numbytes == -1) {
+      perror("recv");
+      exit(0);
+    }
 #endif
   } //close while loop
 #ifdef DEBUG
@@ -281,7 +297,6 @@ int recv_data_errorcode(int fd, void *buf, int buflen) {
       perror("recv_data_errorcode");
       return -1;
     }
-    
     buffer += numbytes;
     size -= numbytes;
   }
@@ -321,10 +336,12 @@ inline int findmax(int *array, int arraylength) {
   return max;
 }
 
+#ifdef RECOVERY
 char* midtoIPString(unsigned int mid){
                midtoIP(mid, ip);
                return ip;
 }
+#endif
 /* This function is a prefetch call generated by the compiler that
  * populates the shared primary prefetch queue*/
 void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
@@ -397,6 +414,7 @@ int dstmStartup(const char * option) {
                setLocateObjHosts();
                updateLiveHostsCommit();
                paxos();
+    printHostsStatus();
                if(!allHostsLive()) {
                        printf("Not all hosts live. Exiting.\n");
                        exit(-1);
@@ -692,6 +710,10 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
       /* Insert into cache's lookup table */
       STATUS(objcopy)=0;
       t_chashInsert(OID(objheader), objcopy);
+#ifdef DEBUG
+      printf("%s -> obj type = %d\n",__func__,getObjType(oid));
+      printf("%s -> obj grabbed\n",__func__);
+#endif
 #ifdef COMPILER
       return &objcopy[1];
 #else
@@ -699,8 +721,9 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
 #endif
     } else {
 #ifdef CACHE
-      , TYPE(header)if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
+      if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
 #ifdef TRANSSTATS
+      LOGEVENT('P')
       nprehashSearch++;
 #endif
       /* Look up in prefetch cache */
@@ -718,30 +741,29 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
        } 
 #endif
                /* Get the object from the remote location */
-
+    if((machinenumber = lhashSearch(oid)) == 0) {
+      printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
+           return NULL;
+       }
 #ifdef DEBUG
        printf("%s-> Grab from remote machine\n", __func__);
 #endif
 #ifdef RECOVERY
     transRetryFlag = 0;
-    unsigned int mindex = findHost(lhashSearch(oid));
-    machinenumber = locateObjHosts[2*mindex+flipBit];
-  
-    if(numLiveHostsInSystem > 1)
-      flipBit ^= 1;
-    else
-      flipBit = 0;
+
+    unsigned int machinenumber; 
+    static int flipBit = 0;        // Used to distribute requests between primary and backup evenly
+    // either primary or backup machine
+    machinenumber = (flipBit)?getPrimaryMachine(lhashSearch(oid)):getBackupMachine(lhashSearch(oid));
+    flipBit ^= 1;
 
 #ifdef DEBUG
     printf("mindex:%d, oid:%d, machinenumber:%s\n", mindex, oid, midtoIPString(machinenumber));
 #endif
-#else
-    if((machinenumber = lhashSearch(oid)) == 0) {
-      printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
-           return NULL;
-       }
 #endif
+
          objcopy = getRemoteObj(machinenumber, oid);
+
 #ifdef RECOVERY
     if(transRetryFlag) {
       restoreDuplicationState(machinenumber);
@@ -751,13 +773,13 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
       return transRead2(oid);
     }
 #endif
-   }
 
   if(objcopy == NULL) {
          printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
                return NULL;
        } else {
 #ifdef TRANSSTATS
+    LOGEVENT('R');
          nRemoteSend++;
 #endif
 #ifdef COMPILER
@@ -766,9 +788,11 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
                return objcopy;
 #endif
        }
+    }
 #ifdef DEBUG
   printf("%s -> Finished!!\n",__func__);
 #endif
+    
 }
 
 /* This function creates objects in the transaction record */
@@ -793,6 +817,10 @@ objheader_t *transCreateObj(unsigned int size) {
 /* This function creates machine piles based on all machines involved in a
  * transaction commit request */
 plistnode_t *createPiles() {
+
+#ifdef DEBUG
+  printf("%s -> Entering\n",__func__);
+#endif
   int i;
        unsigned int oid;
   plistnode_t *pile = NULL;
@@ -814,47 +842,30 @@ plistnode_t *createPiles() {
 
 #if RECOVERY
       oid = OID(headeraddr);
-#ifdef DEBUG
-                       printf("%s-> oid:%u, version:%d, status:%d, type:%d\n", __func__, OID(headeraddr), headeraddr->version, STATUS(headeraddr), TYPE(headeraddr));
 
-                       if (STATUS(headeraddr) & NEW) {  // new/local object
-                               printf("%s-> new/local object\n", __func__);
-                       } 
-      else if ((mhashSearch(curr->key) != NULL)) {     //local/nonnew
-        if(STATUS(headeraddr) & DIRTY) {       // modified
-                                       printf("%s-> old/local/mod object\n", __func__);
-                               }
-                               else {  //read
-                                       printf("%s-> old/local/read object\n", __func__);
-                               }
-                       } 
-      else if ((machinenum = lhashSearch(curr->key)) != 0) { // remote/nonnew object
-                               if(STATUS(headeraddr) & DIRTY) {                //modified
-                                       printf("%s-> remote/local/mod object\n", __func__);
-                               }
-                               else {  //read
-                                       printf("%s-> remote/local/read object\n", __func__);
-                               }
-                       } 
-      else {
-                               printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
-                               return NULL;
-                       }
-                       unsigned int pmid = getPrimaryMachine(lhashSearch(oid));
-                       unsigned int bmid = getBackupMachine(lhashSearch(oid));
-                       printf("%s-> Primary Machine: [%s], ", __func__, midtoIPString(pmid));
-                       printf("Backup Machine: [%s]\n", midtoIPString(bmid));
-#endif 
-                       int makedirty = 0;
-                       if(STATUS(headeraddr) & DIRTY || STATUS(headeraddr) & NEW) {
-                               makedirty = 1;
-                       }
-                       pile = pInsert(pile, headeraddr, getPrimaryMachine(lhashSearch(oid)), c_numelements);
-//problem here
-                       if(makedirty) { 
-                               STATUS(headeraddr) = DIRTY;
-                       }
-                       pile = pInsert(pile, headeraddr, getBackupMachine(lhashSearch(oid)), c_numelements);
+                         int makedirty = 0;
+        unsigned int mid;
+
+        mid = lhashSearch(oid);
+
+        // if the obj is dirty or new
+                       if(STATUS(headeraddr) & DIRTY || STATUS(headeraddr) & NEW) {
+          // set flag for backup machine
+                               makedirty = 1;
+                   }
+
+        // if the obj is new or local, destination will be my Ip
+        if((mid = lhashSearch(oid)) == 0) {
+            mid = myIpAddr;
+        }
+                   pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements);
+                       
+        if(makedirty) { 
+                                 STATUS(headeraddr) = DIRTY;
+                       }
+
+                         pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
 #else
                        // Get machine location for object id (and whether local or not)
                        if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
@@ -919,9 +930,12 @@ int transCommit() {
   int firsttime=1;
   trans_commit_data_t transinfo; /* keeps track of objs locked during transaction */
   char finalResponse;
+
+#ifdef RECOVERY
   int deadsd = -1;
   int deadmid = -1;
   unsigned int transID = getNewTransID();
+#endif
 
 #ifdef DEBUG
   printf("%s -> Starts transCommit\n",__func__);
@@ -964,7 +978,7 @@ int transCommit() {
     /* Create a list of machine ids(Participants) involved in transaction   */
     listmid = calloc(pilecount, sizeof(unsigned int));
     pListMid(pile, listmid);
-       
+
     /* Create a socket and getReplyCtrl array, initialize */
     int socklist[pilecount];
     int loopcount;
@@ -989,15 +1003,14 @@ int transCommit() {
                        tosend[sockindex].f.numread = pile->numread;
                        tosend[sockindex].f.nummod = pile->nummod;
                        tosend[sockindex].f.numcreated = pile->numcreated;
-#ifdef DEBUG
-                       printf("%s-> numread:%d, nummod:%d, numcreated:%d\n", __func__, pile->numread, pile->nummod, pile->numcreated);
-#endif
                        tosend[sockindex].f.sum_bytes = pile->sum_bytes;
                        tosend[sockindex].listmid = listmid;
                        tosend[sockindex].objread = pile->objread;
                        tosend[sockindex].oidmod = pile->oidmod;
                        tosend[sockindex].oidcreated = pile->oidcreated;
-                       int sd = 0;
+
+
+      int sd = 0;
                        if(pile->mid != myIpAddr) {
                                if((sd = getSockWithLock(transRequestSockPool, pile->mid)) < 0) {
                                        printf("\ntransRequest(): socket create error\n");
@@ -1030,9 +1043,6 @@ int transCommit() {
                                        printf("Calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
                                        free(listmid);
                                        free(tosend);
-#ifdef DEBUG
-                                       printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
                                        return 1;
                                }
                                int offset = 0;
@@ -1045,9 +1055,6 @@ int transCommit() {
                                                free(modptr);
                                                free(listmid);
                                                free(tosend);
-#ifdef DEBUG
-                                               printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
                                                return 1;
                                        }
                                        GETSIZE(size,headeraddr);
@@ -1063,7 +1070,6 @@ int transCommit() {
 #endif
                                free(modptr);
                        } else { //handle request locally
-                               localReqsock = sockindex;
                                handleLocalReq(&tosend[sockindex], &transinfo, &getReplyCtrl[sockindex]);
                        }
                        sockindex++;
@@ -1077,8 +1083,6 @@ int transCommit() {
                int i;
 
                for(i = 0; i < pilecount; i++) {
-                       if(i == localReqsock)
-                               continue;
                        int sd = socklist[i]; 
                        if(sd != 0) {
                                char control;
@@ -1087,11 +1091,10 @@ int transCommit() {
                                //Update common data structure with new ctrl msg
                                getReplyCtrl[i] = control;
                                /* Recv Objects if participant sends TRANS_DISAGREE */
-                               //printf("getReplyCtrl[%d] = %d\n", i, (int)getReplyCtrl[i]);
 #ifdef CACHE
                                if(control == TRANS_DISAGREE) {
                                        int length;
-                                       timeout = recv_data(sd, &length, sizeof(int));
+                                       recv_data(sd, &length, sizeof(int));
                                        void *newAddr;
                                        pthread_mutex_lock(&prefetchcache_mutex);
                                        if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) {
@@ -1099,13 +1102,10 @@ int transCommit() {
                                                free(tosend);
                                                free(listmid);
                                                pthread_mutex_unlock(&prefetchcache_mutex);
-#ifdef DEBUG
-                                               printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
                                                return 1;
                                        }
                                        pthread_mutex_unlock(&prefetchcache_mutex);
-                                       timeout = recv_data(sd, newAddr, length);
+                                       recv_data(sd, newAddr, length);
                                        int offset = 0;
                                        while(length != 0) {
                                                unsigned int oidToPrefetch;
@@ -1132,10 +1132,6 @@ int transCommit() {
 
 #ifdef RECOVERY
         if(timeout < 0) {
-#ifdef DEBUG
-          printf("%s -> TIMEOUT!!!!!!!\n",__func__);
-#endif
-
           deadmid = listmid[i];
           deadsd = sd;
 #ifdef DEBUG
@@ -1156,9 +1152,6 @@ int transCommit() {
                        printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
                        free(tosend);
                        free(listmid);
-#ifdef DEBUG
-                       printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
                        return 1;
                }
 #ifdef DEBUG
@@ -1168,8 +1161,9 @@ int transCommit() {
                /* Send responses to all machines */
                for(i = 0; i < pilecount; i++) {
                        int sd = socklist[i];
-
+#ifdef RECOVERY
       if(sd != deadsd) {
+#endif
                        if(sd != 0) {
 #ifdef CACHE
                                if(finalResponse == TRANS_COMMIT) {
@@ -1179,9 +1173,6 @@ int transCommit() {
                                                printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
                                                  free(tosend);
                                                free(listmid);
-#ifdef DEBUG
-                                               printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
                                                return 1;
                                        }
 
@@ -1191,9 +1182,6 @@ int transCommit() {
                                                        printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
                                                        free(tosend);
                                                        free(listmid);
-#ifdef DEBUG
-                                                       printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
                                                        return 1;
                                                }
                                        }
@@ -1231,23 +1219,17 @@ int transCommit() {
                                  }
 #endif
                        }
-      } else {
-#ifdef ABORTREADERS
-        removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
-        removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
+#ifdef RECOVERY
+      } 
 #endif
-      }
-               }
+    }
 
 
-#ifdef DEBUG
-               printf("%s-> Free sockets\n", __func__);
-#endif
-               for(i = 0; i < pilecount; i++) {
-                       if(socklist[i] != 0) {
-                         freeSockWithLock(transRequestSockPool, listmid[i], socklist[i]);      
-                       }
-               }
+   for(i = 0; i< pilecount; i++) {
+     if(socklist[i] > 0) {
+       freeSockWithLock(transRequestSockPool,listmid[i], socklist[i]);
+     }
+   }
 
                        /* Free resources */
     free(tosend);
@@ -1261,19 +1243,17 @@ int transCommit() {
                        nSoftAbort++;
 #endif
                }
+    
+
        } while (treplyretry && deadmid != -1);
 
        if(finalResponse == TRANS_ABORT) {
-
 #ifdef TRANSSTATS
                numTransAbort++;
 #endif
     /* Free Resources */
     objstrDelete(t_cache);
     t_chashDelete();
-#ifdef DEBUG
-         printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
 #ifdef RECOVERY
     if(deadmid != -1) { /* if deadmid is greater than or equal to 0, 
                           then there is dead machine. */
@@ -1281,9 +1261,6 @@ int transCommit() {
       printf("%s -> Dead machine Detected : %s\n",__func__,midtoIPString(deadmid));
 #endif
       restoreDuplicationState(deadmid);
-#ifdef DEBUG
-      printf("%s -> Duplication completed\n",__func__);
-#endif
     }
 #endif
     return TRANS_ABORT;
@@ -1294,19 +1271,13 @@ int transCommit() {
     /* Free Resources */
     objstrDelete(t_cache);
     t_chashDelete();
-#ifdef DEBUG
-                                       printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
     return 0;
   } else {
     //TODO Add other cases
     printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
-#ifdef DEBUG
-       printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
     exit(-1);
   }
-#ifdef DEBUG
+#ifndef DEBUG
        printf("%s-> End, line:%d\n\n", __func__, __LINE__);
 #endif
   return 0;
@@ -1363,16 +1334,10 @@ void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, cha
 
   /* Condition to send TRANS_AGREE */
   if(v_matchnolock == tdata->f.numread + tdata->f.nummod) {
-#ifdef DEBUG
-    printf("%s -> TRANS_AGREE\n",__func__);
-#endif
     *getReplyCtrl = TRANS_AGREE;
   }
   /* Condition to send TRANS_SOFT_ABORT */
   if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) {
-#ifdef DEBUG
-    printf("%s -> TRANS_SOFT_ABORT\n",__func__);
-#endif
     *getReplyCtrl = TRANS_SOFT_ABORT;
   }
 }
@@ -1478,30 +1443,25 @@ char decideResponse(char *getReplyCtrl, char *treplyretry, int pilecount) {
  * cache. */
 
 void *getRemoteObj(unsigned int mnum, unsigned int oid) {
+#ifdef DEBUG
+  printf("%s -> entering\n",__func__);
+#endif
   int size, val;
   struct sockaddr_in serv_addr;
-  char machineip[16];
   char control = 0;
   objheader_t *h;
   void *objcopy = NULL;
 
-  int sd;
-  int flag;
-
-  if((sd = getSock2(transReadSockPool, mnum)) != -1) {
-    char readrequest[sizeof(char)+sizeof(unsigned int)];
-    readrequest[0] = READ_REQUEST;
-    *((unsigned int *)(&readrequest[1])) = oid;
-    send_data(sd, readrequest, sizeof(readrequest));
-  }
-  else {
-      printf("%s -> creating socket error\n",__func__);
-  }
+  int sd = getSock2(transRequestSockPool, mnum);
+  char readrequest[sizeof(char)+sizeof(unsigned int)];
+  readrequest[0] = READ_REQUEST;
+  *((unsigned int *)(&readrequest[1])) = oid;
+  send_data(sd, readrequest, sizeof(readrequest));
   
   /* Read response from the Participant */
   if(recv_data(sd, &control, sizeof(char)) < 0) {
-      transRetryFlag = 1;
-      return NULL;
+    transRetryFlag = 1;
+    return NULL;
   }
 
   if (control==OBJECT_NOT_FOUND) {
@@ -1521,7 +1481,6 @@ void *getRemoteObj(unsigned int mnum, unsigned int oid) {
     transRetryFlag = 1;
     return NULL;
   }
-
   STATUS(objcopy)=0;
   
   /* Insert into cache's lookup table */
@@ -1546,7 +1505,7 @@ char receiveDecisionFromBackup(unsigned int transID,int nummid,unsigned int *lis
   char response;
   
   for(i = 0; i < nummid; i++) {
-    if((sd = getSock(transReadSockPool, listmid[i])) < 0) {
+    if((sd = getSock(transPrefetchSockPool, listmid[i])) < 0) {
       printf("%s -> socket Error!!\n");
     }
     else {
@@ -1563,7 +1522,7 @@ char receiveDecisionFromBackup(unsigned int transID,int nummid,unsigned int *lis
         break;  // received response
 
       // else check next machine
-      freeSock(transReadSockPool, listmid[i],sd);
+      freeSock(transPrefetchSockPool, listmid[i],sd);
      }
   }
 #ifdef DEBUG
@@ -1579,12 +1538,12 @@ void restoreDuplicationState(unsigned int deadHost) {
        int sd;
        char ctrl;
 
-       if(!liveHosts[findHost(deadHost)]) {
+       if(!liveHosts[findHost(deadHost)]) {  // if it is already fixed
                sleep(WAIT_TIME);
                return;
        }
 
-       if(deadHost == leader)
+       if(deadHost == leader)  // if leader is dead, then pick a new leader
                paxos();
        
 #ifdef DEBUG
@@ -1597,7 +1556,7 @@ void restoreDuplicationState(unsigned int deadHost) {
                        leaderFixing = 1;
                        pthread_mutex_unlock(&leaderFixing_mutex);
 
-      if(!liveHosts[findHost(deadHost)]) {
+      if(!liveHosts[findHost(deadHost)]) {  // if it is already fixed
 #ifdef DEBUG
         printf("%s -> already fixed\n",__func__);
 #endif
@@ -1605,7 +1564,7 @@ void restoreDuplicationState(unsigned int deadHost) {
         leaderFixing =0;
         pthread_mutex_unlock(&leaderFixing_mutex);
       }
-      else {
+      else {                // if i am the leader
                        updateLiveHosts();
         duplicateLostObjects(deadHost);
                        
@@ -1626,7 +1585,7 @@ void restoreDuplicationState(unsigned int deadHost) {
                        sleep(WAIT_TIME);
                }
        }
-       else {
+       else {    // request leader to fix the situation
                if((sd = getSockWithLock(transPrefetchSockPool, leader)) < 0) {
                        printf("%s -> socket create error\n",__func__);
                        exit(-1);
@@ -1771,10 +1730,6 @@ int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
   numlocked = transinfo->numlocked;
   oidlocked = transinfo->objlocked;
 
-#ifdef DEBUG
-       printf("%s-> nummod: %d, numcreated: %d, numlocked: %d\n", __func__, nummod, numcreated, numlocked);
-#endif
-
   for (i = 0; i < nummod; i++) {
     if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
       printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
@@ -1799,13 +1754,10 @@ int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
     header->version += 1;
     //printf("oid: %u, new header version: %d\n", oidmod[i], header->version);
     if(header->notifylist != NULL) {
-#ifdef DEBUG
-      printf("%s -> type : %d notifylist : %d\n",__func__,TYPE(header),header->notifylist);
-#endif
 #ifdef RECOVERY
-      if(header->isBackup != 0)
+      if(header->isBackup != 0)   // if it is primary obj, notify
         notifyAll(&header->notifylist, OID(header), header->version);
-      else
+      else                        // if not, just clear the notification list
         clearNotifyList(OID(header));
 #else  
       notifyAll(&header->notifylist, OID(header), header->version);
@@ -1819,7 +1771,6 @@ int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
       return 1;
     }
     header->version += 1;
-    //printf("oid: %u, new header version: %d\n", oidcreated[i], header->version);
     GETSIZE(tmpsize, header);
     tmpsize += sizeof(objheader_t);
     pthread_mutex_lock(&mainobjstore_mutex);
@@ -1834,6 +1785,7 @@ int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
     memcpy(ptrcreate, header, tmpsize);
     mhashInsert(oidcreated[i], ptrcreate);
     lhashInsert(oidcreated[i], myIpAddr);
+//    printf("oid created : %u\n",oidcreated[i]);
   }
   /* Unlock locked objects */
   int useWriteUnlock = 0;
@@ -2099,8 +2051,17 @@ unsigned short getObjType(unsigned int oid) {
 #ifdef CACHE
     if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
 #endif
+
+#ifdef RECOVERY
+    unsigned int mid = lhashSearch(oid);
+    unsigned int machineID;
+    static flipBit = 0;
+    machineID = (flipBit)?(getPrimaryMachine(mid)):(getBackupMachine(mid));
+    int sd = getSock2(transReadSockPool, machineID);
+#else
     unsigned int mid = lhashSearch(oid);
     int sd = getSock2(transReadSockPool, mid);
+#endif
     char remotereadrequest[sizeof(char)+sizeof(unsigned int)];
     remotereadrequest[0] = READ_REQUEST;
     *((unsigned int *)(&remotereadrequest[1])) = oid;
@@ -2191,6 +2152,7 @@ unsigned int getNewOID(void) {
   return id;
 }
 
+#ifdef RECOVERY
 static unsigned int tid = 0xFFFFFFFF;
 unsigned int getNewTransID(void) {
   tid++;
@@ -2199,6 +2161,7 @@ unsigned int getNewTransID(void) {
   }
   return tid;
 }
+#endif
 
 int processConfigFile() {
   FILE *configFile;
@@ -2223,8 +2186,6 @@ int processConfigFile() {
 #ifdef RECOVERY        
        liveHosts = calloc(sizeOfHostArray, sizeof(unsigned int));
        locateObjHosts = calloc(sizeOfHostArray*2, sizeof(unsigned int));
-
-  liveHostsValid = 0;
 #endif
 
        while(fgets(lineBuffer, maxLineLength, configFile) != NULL) {
@@ -2258,7 +2219,6 @@ int processConfigFile() {
   myIndexInHostArray = findHost(myIpAddr);
 #ifdef RECOVERY
        liveHosts[myIndexInHostArray] = 1;
-       //locateObjHosts[myIndexInHostArray] = myIpAddr;
 #endif  
        if (myIndexInHostArray == -1) {
     printf("error in %s: IP Address of eth0 not found\n", CONFIG_FILENAME);
@@ -2295,6 +2255,9 @@ unsigned int getPrimaryMachine(unsigned int mid) {
        unsigned int pmid;
        int pmidindex = 2*findHost(mid);
 
+  if(pmidindex < 0)
+    printf("What!!!\n");
+
        pthread_mutex_lock(&liveHosts_mutex);
        pmid = locateObjHosts[pmidindex];
        pthread_mutex_unlock(&liveHosts_mutex);
@@ -2305,6 +2268,9 @@ unsigned int getBackupMachine(unsigned int mid) {
        unsigned int bmid;
        int bmidindex = 2*findHost(mid)+1;
 
+  if(bmidindex < 0)
+    printf("damn!!\n");
+
        pthread_mutex_lock(&liveHosts_mutex);
        bmid = locateObjHosts[bmidindex];
        pthread_mutex_unlock(&liveHosts_mutex);
@@ -2326,7 +2292,6 @@ unsigned int updateLiveHosts() {
     printf("%s-> Entering updateLiveHosts\n", __func__);       
 #endif
        // update everyone's list
-    liveHostsValid = 0;
        
   //foreach in hostipaddrs, ping -> update list of livemachines        
   //socket connection?
@@ -2336,63 +2301,52 @@ unsigned int updateLiveHosts() {
        int sd = 0, i, j, tmpNumLiveHosts = 0;
        for(i = 0; i < numHostsInSystem; i++) {
     if(i == myIndexInHostArray) 
-               {       
+               {
+      liveHosts[i] = 1;
                        tmpNumLiveHosts++;
                        continue;
                }
-               for(j = 0; j < 5; j++) {        // hard define num of retries
-                       if((sd = getSockWithLock(transPrefetchSockPool, hostIpAddrs[i])) < 0) {
-#ifdef DEBUG
-               printf("%s -> Cannot create socket connection to [%s], attempt %d\n", __func__, midtoIPString(hostIpAddrs[i]), j);
-#endif
-                               usleep(1000);
-  
-        if(j == 4) {
-                         if(liveHosts[i]) {
-            liveHosts[i] = 0;
-            deadhost = i;
-          }
-        }
-                       continue;
-                 }
+               if((sd = getSockWithLock(transPrefetchSockPool, hostIpAddrs[i])) < 0) {
+                       usleep(1000);
+    
+               if(liveHosts[i]) {
+        liveHosts[i] = 0;
+        deadhost = i;
+      }
+      continue;
+               }
       
-      char liverequest[sizeof(char)];
-                       liverequest[0] = RESPOND_LIVE;
+    char liverequest;
+               liverequest = RESPOND_LIVE;
        
-                       send_data(sd, &liverequest[0], sizeof(liverequest));
+               send_data(sd, &liverequest, sizeof(char));
       
-                       char response = 0;
-                       int timeout = recv_data(sd, &response, sizeof(response));
+               char response = 0;
+               int timeout = recv_data(sd, &response, sizeof(char));
                        
-                       //try to send msg
-                       //if timeout, dead host
-                       if(response == LIVE) {
-                       liveHosts[i] = 1;
-                       tmpNumLiveHosts++;
-                       }
-                       else {
-        if(liveHosts[i]) {
-          liveHosts[i] = 0;
-          deadhost = i;
-        }
-                 }
-      freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd);
-                       break;
+               //try to send msg
+               //if timeout, dead host
+               if(response == LIVE) {
+       liveHosts[i] = 1;
+               tmpNumLiveHosts++;
                }
-#ifdef DEBUG
-         if(liveHosts[i] == 0)
-
-                 printf("updateLiveHosts(): cannot make connection to machine %s\n", midtoIPString(hostIpAddrs[i]));
-#endif
+               else {
+      if(liveHosts[i]) {
+        liveHosts[i] = 0;
+        deadhost = i;
+      }
+               }
+    freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd);
        }
-       numLiveHostsInSystem = tmpNumLiveHosts;
+  
+  numLiveHostsInSystem = tmpNumLiveHosts;
 #ifdef DEBUG
        printf("numLiveHostsInSystem:%d\n", numLiveHostsInSystem);
 #endif
        //have updated list of live machines
 #ifdef DEBUG   
-  printf("%s-> Exiting updateLiveHosts\n", __func__);  
        printHostsStatus();
+  printf("%s-> Exiting updateLiveHosts\n", __func__);  
 #endif
 
   return deadhost;
@@ -2414,9 +2368,8 @@ int updateLiveHostsCommit() {
        int sd = 0, i;
        
        char updaterequest[sizeof(char)+sizeof(int)*numHostsInSystem+sizeof(unsigned int)*(numHostsInSystem*2)];
-  
+
   updaterequest[0] = UPDATE_LIVE_HOSTS;
-               
        for(i = 0; i < numHostsInSystem; i++) {
                *((int *)(&updaterequest[i*4+1])) = liveHosts[i];  // clean this up later
        }
@@ -2426,7 +2379,6 @@ int updateLiveHostsCommit() {
        }
 
        //for each machine send data
-        
        for(i = 0; i < numHostsInSystem; i++) {         // hard define num of retries
                if(i == myIndexInHostArray) 
                        continue;
@@ -2439,7 +2391,6 @@ int updateLiveHostsCommit() {
       freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd);
                }
        }
-       liveHostsValid = 1;
 #ifdef DEBUG
        printHostsStatus();
   printf("%s -> Finish\n",__func__);
@@ -2447,16 +2398,15 @@ int updateLiveHostsCommit() {
 
        return 0;
 }
+#endif
 
+#ifdef RECOVERY
 void setLocateObjHosts() {
        int i = 0, validIndex = 0;
 
        //check num hosts even valid first
        
-       for(;i < numHostsInSystem; i++) {
-#ifdef DEBUG
-    printf("%s-> i:%d\n", __func__, i);
-#endif
+       for(i = 0;i < numHostsInSystem; i++) {
                
                while(liveHosts[(i+validIndex)%numHostsInSystem] == 0) {
                        validIndex++;
@@ -2490,6 +2440,17 @@ void setReLocateObjHosts(int mid)
   int newPrimaryIndex = findHost(newPrimary);
   int i;
 
+  /* duplicateLostObject example
+   * Before M24 die,
+   * MID        21      24      26
+   * Primary    21      24      26
+   * Backup     26      21      24
+   * After M24 die,
+   * MID        21      26
+   * Primary   21,24    26
+   * Backup     26      21,24
+   */
+
   locateObjHosts[2*newPrimaryIndex+1] = backupMachine;
   locateObjHosts[2*mIndex] = newPrimary;
 
@@ -2527,7 +2488,9 @@ int allHostsLive() {
        }
        return 1;
 }
+#endif
 
+#ifdef RECOVERY
 void duplicateLostObjects(unsigned int mid){
 
        printf("%s-> Start, mid: [%s]\n", __func__, midtoIPString(mid));  
@@ -2546,7 +2509,6 @@ void duplicateLostObjects(unsigned int mid){
        
        //connect to these machines
        //go through their object store copying necessary (in a transaction)
-       //transRequestSockPool = createSockPool(transRequestSockPool, DEFAULTSOCKPOOLSIZE);
        int sd = 0, i, j, tmpNumLiveHosts = 0;
 
   /* duplicateLostObject example
@@ -2978,11 +2940,7 @@ int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
 
   while(*head != NULL) {
     ptr = *head;
-
     mid = ptr->mid;
-#ifdef DEBUG
-    printf("%s -> trying to connect MID : %s\n",__func__,midtoIPString(mid));
-#endif
 
     //create a socket connection to that machine
     if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
@@ -3001,9 +2959,6 @@ int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
       fflush(stdout);
       status = -1;
     } else {
-#ifdef DEBUG
-      printf("%s -> connected\n",__func__);
-#endif
       bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
       msg[0] = THREAD_NOTIFY_RESPONSE;
       *((unsigned int *)&msg[1]) = oid;
@@ -3017,13 +2972,9 @@ int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
     }
     //close socket
     close(sock);
-
     // Update head
     *head = ptr->next;
     free(ptr);
-#ifdef DEBUG
-    printf("%s -> End notifying MID : %s\n",__func__,midtoIPString(mid));  
-#endif
   }
   return status;
 }
@@ -3042,11 +2993,10 @@ void transAbort() {
 plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) {
   plistnode_t *ptr, *tmp;
   int found = 0, offset = 0;
-  char ip[16];
   tmp = pile;
+
   //Add oid into a machine that is already present in the pile linked list structure
   while(tmp != NULL) {
-//    printf("tmp->mid = [%s], mid = [%s]\n", midtoIPString(tmp->mid), midtoIPString(mid));
     if (tmp->mid == mid) {
       int tmpsize;
 
@@ -3078,8 +3028,10 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi
   if (!found) {
     int tmpsize;
     if((ptr = pCreate(num_objs)) == NULL) {
+      printf("pCreate Error\n");
       return NULL;
     }
+
     ptr->mid = mid;
     if (STATUS(headeraddr) & NEW) {
       ptr->oidcreated[ptr->numcreated] = OID(headeraddr);
@@ -3097,6 +3049,7 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi
       *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version;
       ptr->numread++;
     }
+
     ptr->next = pile;
     pile = ptr;
   }
@@ -3121,6 +3074,7 @@ plistnode_t *sortPiles(plistnode_t *pileptr) {
        /* Arrange local machine processing at the end of the pile list */
        while(ptr != NULL) {
                if(ptr != tail) {
+      /*
                        if(ptr->mid == myIpAddr && (prev != pileptr)) {
                                prev->next = ptr->next;
                                ptr->next = NULL;
@@ -3128,11 +3082,20 @@ plistnode_t *sortPiles(plistnode_t *pileptr) {
                                return pileptr;
                        }
                        if((ptr->mid == myIpAddr) && (prev == pileptr)) {
-                               prev = ptr->next;
-                               ptr->next = NULL;
-                               tail->next = ptr;
-                               return prev;
+        prev->next = ptr->next;
+        ptr->next = NULL;
+        tail->next = ptr;
+                               return pileptr;
                        }
+      */
+
+      if((ptr->mid == myIpAddr))
+      {
+        tail->next = pileptr;
+        pileptr = ptr->next;
+        ptr->next = NULL;
+        return pileptr;
+      }
                        prev = ptr;
                }
                ptr = ptr->next;
@@ -3342,8 +3305,9 @@ void paxosLearn()
        }
        //return v_a;
 }
+#endif
 
-
+#ifdef RECOVERY
 void clearDeadThreadsNotification() 
 {