new recovery protocol
authorjihoonl <jihoonl>
Tue, 27 Apr 2010 16:47:18 +0000 (16:47 +0000)
committerjihoonl <jihoonl>
Tue, 27 Apr 2010 16:47:18 +0000 (16:47 +0000)
Robust/src/Runtime/DSTM/interface_recovery/dstm.h
Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c
Robust/src/Runtime/DSTM/interface_recovery/sockpool.c
Robust/src/Runtime/DSTM/interface_recovery/trans.c

index 3349b21a018afd027765bc9247054e463db224d9..c7c9f216e7d31c58da583b3849577034a3a85cb0 100644 (file)
  *****************************/
 #define RESPOND_LIVE                                    30
 #define LIVE                                                                    31
-#define REMOTE_RESTORE_DUPLICATED_STATE 37  
-#define UPDATE_LIVE_HOSTS      32
-#define DUPLICATE_ORIGINAL              33
-#define DUPLICATE_BACKUP                        34
+#define UPDATE_LIVE_HOSTS      32
+#define REQUEST_DUPLICATE               33
 #define DUPLICATION_COMPLETE    35
 #define RECEIVE_DUPES                                   36
 
 /*********************************
  * Transaction Clear Messages
  *********************************/
-#define ASK_COMMIT                51
-#define CLEAR_NOTIFY_LIST         52
-#define REQUEST_TRANS_WAIT        53
-#define RESPOND_TRANS_WAIT        54
-#define REQUEST_TRANS_RESTART     55
-#define REQUEST_TRANS_LIST        56
-#define REQUEST_TRANS_RECOVERY    57
+#define CLEAR_NOTIFY_LIST         51
+#define REQUEST_TRANS_WAIT        52
+#define RESPOND_TRANS_WAIT        53
+#define RESPOND_HIGHER_EPOCH      54
+#define RELEASE_NEW_LIST          55
+#define REQUEST_TRANS_RESTART     56
 #define REQUEST_TRANS_CHECK       58
 #define REQUEST_TRANS_COMPLETE    59
 
@@ -214,6 +211,10 @@ typedef struct fixed_data {
   unsigned int nummod;                  /* no of objects modified */
   unsigned int numcreated;              /* no of objects created */
   int sum_bytes;                /* total bytes of modified objects in a transaction */
+
+#ifdef RECOVERY
+  unsigned int epoch_num;
+#endif
 } fixed_data_t;
 
 /* Structure that holds trans request information for each participant */
@@ -224,10 +225,6 @@ typedef struct trans_req_data {
   unsigned int *oidmod;         /* Pointer to array holding oids of objects that are modified */
   unsigned int *oidcreated;     /* Pointer to array holding oids of objects that are newly created */
 
-#ifdef RECOVERY
-  unsigned int transid;
-#endif
-
 } trans_req_data_t;
 
 /* Structure that holds information of objects that are not found in the participant
@@ -243,10 +240,8 @@ typedef struct trans_commit_data {
 } trans_commit_data_t;
 
 #ifdef RECOVERY
-  int leaderFixing;
-  pthread_mutex_t leaderFixing_mutex;
+  pthread_mutex_t recovery_mutex;
   pthread_mutex_t liveHosts_mutex;
-
 #endif
 
 #ifdef RECOVERYSTATS
@@ -290,6 +285,7 @@ void updateLiveHostsList(int mid);
 int updateLiveHostsCommit();
 void receiveNewHostLists(int accept);
 void stopTransactions(int TRANS_FLAG);
+void sendMyList(int);
 void sendTransList(int acceptfd);
 int receiveTransList(int acceptfd);
 int combineTransactionList(tlist_node_t* tArray,int size);
@@ -313,17 +309,18 @@ void clearDeadThreadsNotification();
 /* for recovery */
 void reqClearNotifyList(unsigned int oid);
 void clearNotifyList(unsigned int oid);
-void duplicateLostObjects(unsigned int mid);
-unsigned int duplicateLocalBackupObjects();
-unsigned int duplicateLocalOriginalObjects();
 void notifyLeaderDeadMachine(unsigned int deadHost);
-void restoreDuplicationState(unsigned int deadHost);
-void notifyRestoration();
-void clearTransaction();
-void makeTransactionLists(tlist_t**,int*);
-void releaseTransactionLists(tlist_t*,int*);
+void restoreDuplicationState(unsigned int deadHost,unsigned int epoch_num);
+int* getSocketLists();
+void freeSocketLists(int*);
+int inspectEpoch(unsigned int);
+int pingMachines(unsigned int epoch_num,int* sdlist,tlist_t**);
+int releaseNewLists(unsigned int epoch_num,int* sdlist,tlist_t*);
+int duplicateLostObjects(unsigned int epoch_num,int* sdlist);
+void restartTransactions(unsigned int epoch_num,int* sdlist);
+void makeTransactionLists(tlist_t**,int);
+void computeLiveHosts(int);
 void waitForAllMachine();
-void restartTransactions();
 int readDuplicateObjs(int);
 void printRecoveryStat();
 
index 163a16843c13a75249a6eacdf7d293f16eb7dc64..d104934798bc3eae60e719fc9556ea8c64165abe 100644 (file)
@@ -42,6 +42,7 @@ pthread_mutex_t clearNotifyList_mutex;
 tlist_t* transList;
 int okCommit; // machine flag
 extern numWaitMachine;
+extern unsigned int currentEpoch;
 
 #endif
 
@@ -53,7 +54,6 @@ pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a
 sockPoolHashTable_t *transPResponseSocketPool;
 
 #ifdef RECOVERY
-extern unsigned int leader;
 
 long long myrdtsc(void)
 {
@@ -76,7 +76,7 @@ int dstmInit(void) {
 
 #ifdef RECOVERY
        pthread_mutex_init(&liveHosts_mutex, NULL);
-       pthread_mutex_init(&leaderFixing_mutex, NULL);
+       pthread_mutex_init(&recovery_mutex, NULL);
   pthread_mutex_init(&clearNotifyList_mutex,NULL);
 #endif
 
@@ -93,6 +93,10 @@ int dstmInit(void) {
     printf("well error\n");
     return 1;
   }
+  
+  okCommit = TRANS_OK;
+  currentEpoch = 1;
+
 #endif
 
   if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR))
@@ -104,7 +108,6 @@ int dstmInit(void) {
     return 0;
   }
 
-  okCommit = TRANS_OK;
 
   return 0;
 }
@@ -271,6 +274,7 @@ void *dstmAccept(void *acceptfd) {
        void *dupeptr;
   unsigned int transIDreceived;
   char decision;
+  unsigned int epoch_num;
   int timeout;
 #endif
 
@@ -294,7 +298,7 @@ void *dstmAccept(void *acceptfd) {
                if (ret==0)
                        break;
                if (ret==-1) {
-                       printf("DEBUG -> RECV Error!.. retrying\n");
+//                     printf("DEBUG -> RECV Error!.. retrying\n");
        //              exit(0);
                        break;
                }
@@ -358,18 +362,6 @@ void *dstmAccept(void *acceptfd) {
                                        pthread_exit(NULL);
                                }
                                break;
-#ifdef RECOVERY
-      case ASK_COMMIT :
-
-        if(recv_data((int)acceptfd, &transIDreceived, sizeof(unsigned int)) < 0)
-          break;        
-
-        decision = checkDecision(transIDreceived);
-
-        send_data((int)acceptfd,&decision,sizeof(char));
-
-        break;
-#endif
                        case TRANS_PREFETCH:
 #ifdef DEBUG
         printf("control -> TRANS_PREFETCH\n");
@@ -500,70 +492,54 @@ void *dstmAccept(void *acceptfd) {
 #ifdef RECOVERY
                        case RESPOND_LIVE:
                                ctrl = LIVE;
-                               send_data((int)acceptfd, &ctrl, sizeof(ctrl));
-                               break;
-#endif
-#ifdef RECOVERY
-                       case REMOTE_RESTORE_DUPLICATED_STATE:
-#ifdef DEBUG
-        printf("control -> REMOTE_RESTORE_DUPLICATED_STATE\n");
-#endif
-                               recv_data((int)acceptfd, &mid, sizeof(unsigned int));
-                               if(!liveHosts[findHost(mid)]) {
-#ifdef DEBUG
-          printf("%s (REMOTE_RESTORE_DUPLICATED_STATE) -> already fixed\n",__func__);
-#endif
-                                       break;
-        }
-                               pthread_mutex_lock(&leaderFixing_mutex);
-                               if(!leaderFixing) {
-                                       leaderFixing = 1;
-                                       pthread_mutex_unlock(&leaderFixing_mutex);
-          
-          restoreDuplicationState(mid);
-          // finish fixing
-                               pthread_mutex_lock(&leaderFixing_mutex);
-                               leaderFixing = 0;
-                               pthread_mutex_unlock(&leaderFixing_mutex);
-                               }
-                               else {
-                                       pthread_mutex_unlock(&leaderFixing_mutex);
-#ifdef DEBUG
-          printf("%s (REMOTE_RESTORE_DUPLICATED_STATE -> LEADER is already fixing\n",__func__);
-#endif
-          sleep(WAIT_TIME);
-                               }
+                               send_data((int)acceptfd, &ctrl, sizeof(char));
                                break;
 #endif
 #ifdef RECOVERY
       case REQUEST_TRANS_WAIT:
-        receiveNewHostLists((int)acceptfd);        
-        stopTransactions(TRANS_BEFORE);
+        { 
+          recv_data((int)acceptfd,&epoch_num,sizeof(unsigned int));
 
-        response = RESPOND_TRANS_WAIT;
-        send_data((int)acceptfd,&response,sizeof(char));
-//        respondToLeader();
+          if(inspectEpoch(epoch_num) < 0) {
+            response = RESPOND_HIGHER_EPOCH;
+            send_data((int)acceptfd,&response,sizeof(char));
+          }
+          else {
+            printf("Got new Leader! : %d\n",epoch_num);
+            currentEpoch = epoch_num;
+            stopTransactions(TRANS_BEFORE);
+            response = RESPOND_TRANS_WAIT;
+            send_data((int)acceptfd,&response,sizeof(char));
+            sendMyList((int)acceptfd);
+          }
+        } 
         break;
 
-      case RESPOND_TRANS_WAIT:
-        printf("control -> RESPOND_TRANS_WAIT\n");
-        pthread_mutex_lock(&liveHosts_mutex);
-        numWaitMachine++;
-        pthread_mutex_unlock(&liveHosts_mutex);
-        printf("numWaitMachine = %d\n",numWaitMachine);
-        break;
+      case RELEASE_NEW_LIST:
+        printf("control -> RELEASE_NEW_LIST\n");
+        { 
+          recv_data((int)acceptfd,&epoch_num,sizeof(unsigned int));
 
-      case REQUEST_TRANS_LIST:
-        printf("control -> REQUEST_TRANS_LIST\n");
-        sendTransList((int)acceptfd);
-        response =  receiveTransList((int)acceptfd);
-        stopTransactions(TRANS_AFTER);
-  
-        send_data((int)acceptfd,&response,sizeof(char));
-        break;
+          if(inspectEpoch(epoch_num) < 0) {
+            response = RESPOND_HIGHER_EPOCH;
+          }
+          else 
+          {
+            response =  receiveNewList((int)acceptfd);
+            stopTransactions(TRANS_AFTER);
+          }
+          send_data((int)acceptfd,&response,sizeof(char));
+        }
+      break;
 
       case REQUEST_TRANS_RESTART:
+
+        recv_data((int)acceptfd,&epoch_num,sizeof(char));
+
+        if(inspectEpoch(epoch_num) < 0) break;
+        
         pthread_mutex_lock(&liveHosts_mutex);
+        printf("RESTART!!!\n");
         okCommit = TRANS_OK;
         pthread_mutex_unlock(&liveHosts_mutex);
         break;
@@ -581,19 +557,22 @@ void *dstmAccept(void *acceptfd) {
 #endif
 
 #ifdef RECOVERY
-                       case DUPLICATE_ORIGINAL:
+                       case REQUEST_DUPLICATE:
        {
          struct sockaddr_in remoteAddr;
          int sd;
+         unsigned int epoch_num;
 
-#ifdef DEBUG
-          printf("control -> DUPLICATE_ORIGINAL\n");
-                               printf("%s (DUPLICATE_ORIGINAL)-> Attempt to duplicate original objects\n", __func__);  
-#endif
-                               //object store stuffffff
-                               recv_data((int)acceptfd, &mid, sizeof(unsigned int));
+         recv_data((int)acceptfd,&epoch_num,sizeof(unsigned int));
 
-          dupeptr = (char*) mhashGetDuplicate(&tempsize, 0);
+         if(inspectEpoch(epoch_num) < 0) {
+           break;
+         }
+                               
+         //object store stuffffff
+         mid = getBackupMachine(myIpAddr);
+
+         dupeptr = (char*) mhashGetDuplicate(&tempsize, 0);
 
                                //send control and dupes after
                                ctrl = RECEIVE_DUPES;
@@ -609,23 +588,28 @@ void *dstmAccept(void *acceptfd) {
           remoteAddr.sin_addr.s_addr = htonl(mid);
 
           if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr))<0) {
-            printf("ORIGINAL ERROR : %s\n",strerror(errno));
-            exit(0);
+            printf("REQUEST_DUPE ERROR : %s\n",strerror(errno));
+//            exit(0);
+            break;
           }
           else {
                                send_data(sd, &ctrl, sizeof(char));
                                send_data(sd, dupeptr, tempsize);
 
+            if((readDuplicateObjs(sd) )!= 0) {
+              printf("Fail in readDuplicateObj()\n");
+              break;
+//              exit(0);
+            }
                                  recv_data(sd, &response, sizeof(char));
-#ifdef DEBUG
-            printf("%s ->response : %d  -  %d\n",__func__,response,DUPLICATION_COMPLETE);
-#endif
-                               if(response != DUPLICATION_COMPLETE) {
+                               
+            if(response != DUPLICATION_COMPLETE) {
 #ifndef DEBUG
-             printf("%s(DUPLICATION_ORIGINAL) -> DUPLICATION FAIL\n",__func__);
+             printf("%s(REQUEST_DUPE) -> DUPLICATION FAIL\n",__func__);
 #endif
                                  //fail message
-             exit(0);
+             break;
+//             exit(0);
                                  }
 
             close(sd);
@@ -634,72 +618,12 @@ void *dstmAccept(void *acceptfd) {
 
           ctrl = DUPLICATION_COMPLETE;
                                  send_data((int)acceptfd, &ctrl, sizeof(char));
-          send_data((int)acceptfd, &tempsize, sizeof(unsigned int));
 #ifdef DEBUG
-                                 printf("%s (DUPLICATE_ORIGINAL)-> Finished\n", __func__);     
+                                 printf("%s (REQUEST_DUPE)-> Finished\n", __func__);   
 #endif
        }
                                  break;
 
-                       case DUPLICATE_BACKUP:
-        {
-          struct sockaddr_in remoteAddr;
-          int sd;
-#ifdef DEBUG
-          printf("control -> DUPLICATE_BACKUP\n");
-                               printf("%s (DUPLICATE_BACKUP)-> Attempt to duplicate backup objects\n", __func__);
-#endif
-                               //object store stuffffff
-                                 recv_data((int)acceptfd, &mid, sizeof(unsigned int));
-
-          dupeptr = (char*) mhashGetDuplicate(&tempsize, 1);
-
-                               //send control and dupes after
-                               ctrl = RECEIVE_DUPES;
-
-          if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
-            perror("BACKUP : ");
-            exit(0);
-          }
-
-           bzero(&remoteAddr, sizeof(remoteAddr));                                       
-           remoteAddr.sin_family = AF_INET;                                              
-           remoteAddr.sin_port = htons(LISTEN_PORT);                                     
-           remoteAddr.sin_addr.s_addr = htonl(mid);                                      
-                                                                                       
-           if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr))<0) {       
-             printf("BACKUP ERROR : %s\n",strerror(errno));
-             exit(0);
-           }                                                                             
-           else {                                                                        
-            send_data(sd, &ctrl, sizeof(char));
-                               send_data(sd, dupeptr, tempsize);
-          
-            recv_data(sd, &response, sizeof(char));
-#ifdef DEBUG
-            printf("%s ->response : %d  -  %d\n",__func__,response,DUPLICATION_COMPLETE);
-#endif
-                                 if(response != DUPLICATION_COMPLETE) {
-#ifndef DEBUG
-              printf("%s(DUPLICATION_BACKUP) -> DUPLICATION FAIL\n",__func__);
-#endif
-              exit(0);
-            }
-
-            close(sd);
-           }
-
-          free(dupeptr);
-
-                                 ctrl = DUPLICATION_COMPLETE;
-                                 send_data((int)acceptfd, &ctrl, sizeof(char));
-          send_data((int)acceptfd, &tempsize, sizeof(unsigned int));
-#ifdef DEBUG
-                                 printf("%s (DUPLICATE_BACKUP)-> Finished\n", __func__);       
-#endif
-        }
-                               break;
-
                        case RECEIVE_DUPES:
 #ifdef DEBUG
         printf("control -> RECEIVE_DUPES sd : %d\n",(int)acceptfd);
@@ -708,35 +632,18 @@ void *dstmAccept(void *acceptfd) {
                                        printf("Error: In readDuplicateObjs() %s, %d\n", __FILE__, __LINE__);
                                        pthread_exit(NULL);
                                }
+        
+        dupeptr = (char*) mhashGetDuplicate(&tempsize, 1);
+        
+        send_data((int)acceptfd,dupeptr,tempsize);
 
+        free(dupeptr);
                                ctrl = DUPLICATION_COMPLETE;
                                send_data((int)acceptfd, &ctrl, sizeof(char));
 #ifdef DEBUG
         printf("%s (RECEIVE_DUPES) -> Finished\n",__func__);
 #endif
                                break;
-#endif
-#ifdef RECOVERY
-                       case PAXOS_PREPARE:
-#ifdef DEBUG
-        printf("control -> PAXOS_PREPARE\n");
-#endif
-        paxosPrepare_receiver((int)acceptfd);
-                               break;
-
-                       case PAXOS_ACCEPT:
-#ifdef DEBUG
-        printf("control -> PAXOS_ACCEPT\n");
-#endif
-        paxosAccept_receiver((int)acceptfd);
-                               break;
-
-                       case PAXOS_LEARN:
-#ifdef DEBUG
-        printf("control -> PAXOS_LEARN\n");
-#endif
-        leader = paxosLearn_receiver((int)acceptfd);
-                               break;
 #endif
                        default:
                                printf("Error: dstmAccept() Unknown opcode %d at %s, %d\n", control, __FILE__, __LINE__);
@@ -1881,10 +1788,17 @@ void stopTransactions(int TRANS_FLAG)
   pthread_mutex_unlock(&clearNotifyList_mutex);
 }
 
+void sendMyList(int acceptfd)
+{
+  pthread_mutex_lock(&liveHosts_mutex);
+  send_data((int)acceptfd,liveHosts,sizeof(int) * numHostsInSystem);
+  pthread_mutex_unlock(&liveHosts_mutex);
+
+  sendTransList(acceptfd);
+}
+
 void sendTransList(int acceptfd)
 {
-  printf("%s -> Enter\n",__func__);
-  
   int size;
   char response;
   int transid;
@@ -1892,11 +1806,11 @@ void sendTransList(int acceptfd)
   // send on-going transaction
   tlist_node_t* transArray = tlistToArray(transList,&size);
 
-  if(transList->size != 0)
+/*  if(transList->size != 0)
     tlistPrint(transList);
 
   printf("%s -> transList->size : %d  size = %d\n",__func__,transList->size,size);
-
+*/
   send_data((int)acceptfd,&size,sizeof(int));
   send_data((int)acceptfd,transArray, sizeof(tlist_node_t) * size);
 
@@ -1917,7 +1831,7 @@ void sendTransList(int acceptfd)
   free(transArray);
 }
 
-int receiveTransList(int acceptfd)
+int receiveNewList(int acceptfd)
 {
   int size;
   tlist_node_t* tArray;
@@ -1925,7 +1839,15 @@ int receiveTransList(int acceptfd)
   int i;
   int flag = 1;
   char response;
-  
+
+  // new host lists
+  pthread_mutex_lock(&liveHosts_mutex);
+  recv_data((int)acceptfd,liveHosts,sizeof(int)*numHostsInSystem);
+  pthread_mutex_unlock(&liveHosts_mutex);
+
+  setLocateObjHosts();
+
+  // new transaction list
   recv_data((int)acceptfd,&size,sizeof(int));
 
 
@@ -1937,13 +1859,10 @@ int receiveTransList(int acceptfd)
     }
 
     recv_data((int)acceptfd,tArray,sizeof(tlist_node_t) * size);
-
     flag = combineTransactionList(tArray,size);
-
     free(tArray);
   }
 
-
   if(flag == 1)
   {
     response = TRANS_OK;
@@ -2005,6 +1924,7 @@ char inspectTransaction(char finalResponse,unsigned int transid,char* debug,int
     // if decision is not lost and okCommit is not TRANS_FLAG, get out of this loop
     while(!((tNode->decision != DECISION_LOST) && (okCommit != TRANS_FLAG))) { 
 //      printf("%s -> transID : %u decision : %d is waiting flag : %d\n",debug,tNode->transid,tNode->decision,TRANS_FLAG);
+//      sleep(3);
       randomdelay();
     }
 
index d96d82544eb176cd593e7e070eaabc855a6e6abf..ea6ab489b081f21fe4eac98f0f17103518a2ba70 100644 (file)
@@ -17,8 +17,6 @@ inline void UnLock(volatile unsigned int *addr) {
                        : "=r" (oldval), "=m" (*(addr))
                        : "0" (0), "m" (*(addr)));
 }
-#elif
-#   error need implementation of test_and_set
 #endif
 
 #define MAXSPINS 4
index edc9c1bec777b7496ea5e1fc947407a8013de13c..073daa5f4ce463bbc86a2fe9797b2db2dbf38003 100644 (file)
@@ -88,6 +88,8 @@ int sendRemoteReq = 0;
 int getResponse = 0;
 
 #ifdef RECOVERY
+
+#define INCREASE_EPOCH(x,y,z) ((x/y+1)*y + z)
 /***********************************
  * Global variables for Duplication
  ***********************************/
@@ -111,13 +113,14 @@ char ip[16];      // for debugging purpose
 extern tlist_t* transList;
 extern pthread_mutex_t clearNotifyList_mutex;
 
-extern unsigned int leader;
+unsigned int currentEpoch;
 
 #ifdef RECOVERYSTATS
   int numRecovery = 0;
   recovery_stat_t* recoverStat;
 #endif
 
+
 #endif
 
 void printhex(unsigned char *, int);
@@ -359,8 +362,8 @@ int recv_data_errorcode(int fd, void *buf, int buflen) {
     if (numbytes==0)
       return 0;
     else if (numbytes == -1) {
-      printf("%s -> ERROR NUMBER = %d %s\n",__func__,errno,strerror(errno));
-      perror("recv_data_errorcode");
+//      printf("%s -> ERROR NUMBER = %d %s\n",__func__,errno,strerror(errno));
+//      perror("recv_data_errorcode");
       return -1;
     }
     bytesRecv += numbytes;
@@ -529,7 +532,7 @@ int dstmStartup(const char * option) {
                updateLiveHosts();
                setLocateObjHosts();
                updateLiveHostsCommit();
-               leader = paxos(hostIpAddrs,liveHosts,myIpAddr,numHostsInSystem,numLiveHostsInSystem);
+//             leader = paxos(hostIpAddrs,liveHosts,myIpAddr,numHostsInSystem,numLiveHostsInSystem);
 //    printHostsStatus();
                if(!allHostsLive()) {
                        printf("Not all hosts live. Exiting.\n");
@@ -837,6 +840,7 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
   objheader_t *objcopy;
   int size;
 
+
 #ifdef DEBUG
        printf("%s-> Start, oid:%u\n", __func__, oid);
 #endif
@@ -1220,6 +1224,7 @@ int transCommit() {
                        tosend[sockindex].f.nummod = pile->nummod;
                        tosend[sockindex].f.numcreated = pile->numcreated;
                        tosend[sockindex].f.sum_bytes = pile->sum_bytes;
+      tosend[sockindex].f.epoch_num = currentEpoch;
                        tosend[sockindex].listmid = listmid;
                        tosend[sockindex].objread = pile->objread;
                        tosend[sockindex].oidmod = pile->oidmod;
@@ -1715,267 +1720,110 @@ void *getRemoteObj(unsigned int mnum, unsigned int oid) {
        return objcopy;
 }
 
-#ifdef RECOVERY
-/* ask machines if they received decision */
-char receiveDecisionFromBackup(unsigned int transID,int nummid,unsigned int *listmid)
-{
-#ifdef DEBUG
-  printf("%s -> Entering\n",__func__);
-#endif
-
-  int sd; // socket id
-  int i;
-  char response;
-  
-  for(i = 0; i < nummid; i++) {
-    if((sd = getSockWithLock(transPrefetchSockPool, listmid[i])) < 0) {
-      printf("%s -> socket Error!!\n");
-    }
-    else {
-      char control = ASK_COMMIT;
-
-      send_data(sd,&control, sizeof(char));
-      send_data(sd,&transID, sizeof(unsigned int));
-
-      // return -1 if it didn't receive the response
-      int timeout = recv_data(sd,&response, sizeof(char));
-
-
-      if(timeout == 0 || response > 0)
-        break;  // received response
-
-      // else check next machine
-      freeSockWithLock(transPrefetchSockPool, listmid[i],sd);
-     }
-  }
-#ifdef DEBUG
-  printf("%s -> response : %d\n",__func__,response);
-#endif
-
-  return (response==-1)?TRANS_ABORT:response;
-}
-#endif
-
 #ifdef RECOVERY
 void notifyLeaderDeadMachine(unsigned int deadHost) {
-       int sd;
-       char ctrl;
+
+  unsigned int epoch_num;
 
        if(!liveHosts[findHost(deadHost)]) {  // if it is already fixed
     printf("%s -> already fixed\n",__func__);
                sleep(WAIT_TIME);
                return;
        }
+  
+  // increase epoch number by number machines in the system
+  pthread_mutex_lock(&recovery_mutex);
+  epoch_num = currentEpoch = INCREASE_EPOCH(currentEpoch,numHostsInSystem,myIndexInHostArray);
+  pthread_mutex_unlock(&recovery_mutex);
 
-       if(deadHost == leader)  // if leader is dead, then pick a new leader
-               leader = paxos(hostIpAddrs,liveHosts,myIpAddr,numHostsInSystem,numLiveHostsInSystem);
-       
-#ifdef DEBUG
-       printf("%s-> leader?:%s, me?:%s\n", __func__, midtoIPString(leader), (myIpAddr == leader)?"LEADER":"NOT LEADER");
-#endif
-       
-       if(leader == myIpAddr) {    // if i am the leader
-               pthread_mutex_lock(&leaderFixing_mutex);
-               if(!leaderFixing) {
-                       leaderFixing = 1;
-                       pthread_mutex_unlock(&leaderFixing_mutex);
-
-      if(!liveHosts[findHost(deadHost)]) {  // if it is already fixed
-#ifndef DEBUG
-        printf("%s -> already fixed\n",__func__);
-#endif
-        pthread_mutex_lock(&leaderFixing_mutex);
-        leaderFixing =0;
-        pthread_mutex_unlock(&leaderFixing_mutex);
-      }
-      else {
-        restoreDuplicationState(deadHost);
-               
-        pthread_mutex_lock(&leaderFixing_mutex);
-                       leaderFixing = 0;
-                         pthread_mutex_unlock(&leaderFixing_mutex);
-      }
-               }
-               else {
-                       pthread_mutex_unlock(&leaderFixing_mutex);
-#ifndef DEBUG
-      printf("%s -> LEADER is already fixing\n",__func__);
-#endif
-                       sleep(WAIT_TIME);
-               }
-       }
-       else {    // request leader to fix the situation
-               if((sd = getSockWithLock(transPrefetchSockPool, leader)) < 0) {
-                       printf("%s -> socket create error\n",__func__);
-                       exit(-1);
-               }
-               ctrl = REMOTE_RESTORE_DUPLICATED_STATE;
-               send_data(sd, &ctrl, sizeof(char));
-               send_data(sd, &deadHost, sizeof(unsigned int));
-    freeSockWithLock(transPrefetchSockPool,leader,sd);
-    printf("%s -> Message sent\n",__func__);
-         sleep(WAIT_TIME);
-       }
+  // notify all machines that this machien will act as leader.
+  // if return -1, then a machine that higher epoch_num started restoration
+  restoreDuplicationState(deadHost,epoch_num);
 }
 
 /* Leader's role */ 
-void restoreDuplicationState(unsigned int deadHost)
+void restoreDuplicationState(unsigned int deadHost,unsigned int epoch_num)
 {
   printf("%s -> Entering\n",__func__);
+  int* sdlist;
+  tlist_t* tList;
 
 #ifdef RECOVERYSTATS
   printf("Recovery Start\n");
-  numRecovery++;
   long long st;
   long long fi;
+  int flag = 0;
   unsigned int dupeSize = 0;  // to calculate the size of backed up data
 
   st = myrdtsc(); // to get clock
-  recoverStat[numRecovery-1].deadMachine = deadHost;
+  recoverStat[numRecovery].deadMachine = deadHost;
 #endif
-
   // update leader's live host list and object locations
-  updateLiveHostsList(deadHost);
-  setReLocateObjHosts(deadHost);
-
-  // stop all transactions and update all other's machine list
-  notifyRestoration();
-
-
-  // wait until all machines wait for leader
-  waitForAllMachine();
+  
+  do {
+    sdlist = getSocketLists();
 
+    printf("%s -> I'm currently leader num : %d\n\n",__func__,epoch_num);
+    if((flag = pingMachines(epoch_num,sdlist,&tList)) < 0) {
+      break;;
+    }
+    
+    printf("%s -> I'm currently leader num : %d\n\n",__func__,epoch_num);
 
-  // clear transaction
-  clearTransaction();
+    if((flag = releaseNewLists(epoch_num,sdlist,tList)) < 0) {
+      break;;
+    }
 
-  // transfer lost objects
-  duplicateLostObjects(deadHost);
+    // transfer lost objects
+    if((flag= duplicateLostObjects(epoch_num,sdlist)) < 0) {
+      break;
+    }
+    // restart transactions
+    restartTransactions(epoch_num,sdlist);
+  }while(0);
 
-  // restart transactions
-  restartTransactions();
+  freeSocketLists(sdlist);
 
+  if(flag < 0) {
+    printf("%s -> higher epoch\n",__func__);
+    while(okCommit != TRANS_OK) {
+//      printf("%s -> Waiting\n",__func__);
+      randomdelay();
+    }
+    
+  }else {
 #ifdef RECOVERYSTATS
   fi = myrdtsc();
-  recoverStat[numRecovery-1].elapsedTime = (fi-st)/CPU_FREQ;
+  recoverStat[numRecovery].elapsedTime = (fi-st)/CPU_FREQ;
+  numRecovery++;
   printRecoveryStat();
 #endif
-
+  }
   printf("%s -> Exiting\n",__func__);
 }
 
-/* 
-  1. request all other machines to stop transactions
-  2. update their live machine list
- */
-   
-void notifyRestoration()
+int* getSocketLists()
 {
+  struct sockaddr_in remoteAddr[numHostsInSystem];
+  int* sdlist;
+  char request = RESPOND_LIVE;
+  char response;
   int i;
   int sd;
-  int sdlist[numHostsInSystem];
-
-  printHostsStatus();
-
-  pthread_mutex_lock(&liveHosts_mutex);
-  numWaitMachine = 0;
-  pthread_mutex_unlock(&liveHosts_mutex);
-  // for other machines
-  for(i = 0; i < numHostsInSystem; i++) {
-    if(liveHosts[i] != 1 || hostIpAddrs[i] == myIpAddr) {
-      sdlist[i] = -1;
-      continue;
-    }
-    
-    if((sd = getSockWithLock(transPrefetchSockPool, hostIpAddrs[i])) < 0)
-    {
-      printf("%s -> socket create error\n",__func__);
-      exit(0);
-    }
-    else {
-      sdlist[i] = sd;
-      char request = REQUEST_TRANS_WAIT; 
-
-      send_data(sd, &request, sizeof(char));
-
-      /* send new host lists and object location */
-           pthread_mutex_lock(&liveHosts_mutex);
-      send_data(sd, liveHosts, sizeof(int)*numHostsInSystem);
-           send_data(sd, locateObjHosts, sizeof(unsigned int)*numHostsInSystem*2);
-      pthread_mutex_unlock(&liveHosts_mutex);
-    }
-  }
-  
-
-  for(i = 0 ; i < numHostsInSystem; i++) {
-    if(sdlist[i] != -1)
-    {
-      char response;
-      recv_data(sdlist[i],&response,sizeof(char));
-      if(response == RESPOND_TRANS_WAIT) {
-        pthread_mutex_lock(&liveHosts_mutex);
-        numWaitMachine++;
-        pthread_mutex_unlock(&liveHosts_mutex);
-      }
-
-      freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sdlist[i]);
-    }
-  }
-  /* stop all local transactions */
-  stopTransactions(TRANS_BEFORE);
-}
-
-/* acknowledge leader that all transactions are waiting */
-void respondToLeader()
-{
-  printf("%s -> Enter\n",__func__);
-  int sd;
 
-  if((sd = getSockWithLock(transPrefetchSockPool, leader)) < 0) {
-    printf("%s -> cannot open the socket\n",__func__);
+  if((sdlist = calloc(numHostsInSystem,sizeof(int))) == NULL) 
+  {
+    printf("%s -> calloc error\n",__func__);
     exit(0);
   }
-  else {
-    char request = RESPOND_TRANS_WAIT;
-//    printf("%s -> request = %s\n sd = %d\n",__func__,(request==RESPOND_TRANS_WAIT)?"RESPOND_TRANS_WAIT":"NONO");
-    send_data(sd,&request,sizeof(char));
-    freeSockWithLock(transPrefetchSockPool,leader,sd);
-  }
-  
-  printf("%s -> Exit\n",__func__);
-  return;
-}
-
-/* wait untill receive from all machine */
-void waitForAllMachine()
-{
-  pthread_mutex_lock(&liveHosts_mutex);
-  numWaitMachine++; // for local. It is done 
-  pthread_mutex_unlock(&liveHosts_mutex);
-
-
-  /* wait untill receive from all machine */
-  while(numWaitMachine < numLiveHostsInSystem) {
-    sleep(1);
-  }
-}
-
-void clearTransaction()
-{
-  int size;
-  tlist_t* tlist;
-  int sd;
-  struct sockaddr_in remoteAddr[numHostsInSystem];
-  int sdlist[numHostsInSystem];
-  int i;
 
   // open sockets to all live machines
   for(i = 0 ; i < numHostsInSystem; i++) {
-    if(liveHosts[i] == 1 && hostIpAddrs[i] != myIpAddr) {
+    if(liveHosts[i] == 1) {
       if((sd = socket(AF_INET , SOCK_STREAM, 0 )) < 0) 
       {
-        printf("%s -> socket create Error\n",__func__);
+        sdlist[i] = -1;
+        liveHosts[i] = 0;
       }
       else {
         bzero(&remoteAddr[i], sizeof(remoteAddr[i]));
@@ -1985,49 +1833,72 @@ void clearTransaction()
 //        printf("%s -> open sd : %d to %s\n",__func__,sd,midtoIPString(hostIpAddrs[i]));
 
         if(connect(sd, (struct sockaddr *)&remoteAddr[i], sizeof(remoteAddr[i])) < 0) {
-          printf("%s -> socket connect error\n",__func__);
-          exit(0);
+          sdlist[i] = -1;
+          liveHosts[i] = 0;
         }
         else {
-          sdlist[i] = sd;
+          send_data(sd,&request,sizeof(char));
+
+          recv_data(sd,&response,sizeof(char));
+          
+          if(response == LIVE) {
+            sdlist[i] = sd;
+            liveHosts[i] = 1;
+          }        
         }
       }
     }
     else {
+      liveHosts[i] = 0;
       sdlist[i] = -1;
     }
   }
+  
+  return sdlist;
+}
 
-  /* receive transaction lists from all machines and 
-     clarefy all decisions
-     returns an array of ongoing transactions  */
-  makeTransactionLists(&tlist,sdlist);
-
-
-  /* release the cleared decisions to all machines */
-  releaseTransactionLists(tlist,sdlist);
-
+void freeSocketLists(int* sdlist)
+{
+  int i;
   for(i = 0 ; i < numHostsInSystem; i++) {
    if(sdlist[i] != -1) {
      close(sdlist[i]);
    }
   }
 
-  tlistDestroy(tlist);
-   printf("%s -> End\n",__func__);
+  free(sdlist);
 }
 
-// after this fuction
-// leader knows all the on-going transaction list and their decisions
-void makeTransactionLists(tlist_t** tlist,int* sdlist)
+// stop transactions, receive translists, live machine lists.
+int pingMachines(unsigned int epoch_num,int* sdlist,tlist_t** tList)
 {
+
   printf("%s -> Enter\n",__func__);
-  int sd;
   int i;
-  tlist_t* currentTransactionList = tlistCreate();
+  char request;
+  char response;
+  tlist_t* currentTransactionList;
+   
+  if(inspectEpoch(epoch_num) < 0) {
+    printf("%s -> Higher Epoch\n",__func__);
+    return -1;
+  }
+
+  currentTransactionList = tlistCreate();
+
+  // request remote amchines to stop all transactions
+  for(i = 0; i < numHostsInSystem; i++)
+  {
+    if(sdlist[i] == -1 || hostIpAddrs[i] == myIpAddr)
+      continue;
+  
+    request = REQUEST_TRANS_WAIT;
+    send_data(sdlist[i],&request, sizeof(char));
+    send_data(sdlist[i],&epoch_num,sizeof(unsigned int));
+  }
 
-  printf("%s -> tlist size : %d\n",__func__,transList->size);
-  printf("%s -> currentTransactionList size : %d\n",__func__,currentTransactionList->size);
+  /* stop all local transactions */
+  stopTransactions(TRANS_BEFORE);
 
 
   // grab leader's transaction list first
@@ -2039,132 +1910,94 @@ void makeTransactionLists(tlist_t** tlist,int* sdlist)
     walker = walker->next;
   }
 
-  // receive others transaction list
-  for(i = 0; i < numHostsInSystem;i ++) {
-    if(sdlist[i] != -1 && hostIpAddrs[i] != myIpAddr) {
-      char request = REQUEST_TRANS_LIST;
-      int size;
-      int j;
-      tlist_node_t* transArray;
-      tlist_node_t* tmp;
-
-      sd = sdlist[i];
-
-      // send request
-      send_data(sd, &request, sizeof(char));
-
-      // receive all on-going transaction list
-      recv_data(sd, &size, sizeof(int));
-
-      printf("%s -> %s - size : %d\n",__func__,midtoIPString(hostIpAddrs[i]),size);
-      if((transArray = calloc(size, sizeof(tlist_node_t))) == NULL) {
-        printf("%s -> calloc error\n",__func__);
-        exit(0);
-      }
-      
-      recv_data(sd,transArray, sizeof(tlist_node_t) * size);
+  for(i = 0; i < numHostsInSystem; i++)
+  {
+    if(sdlist[i] == -1 || hostIpAddrs[i] == myIpAddr)
+      continue;
 
-      // add into currentTransactionList
-      for(j = 0 ; j < size; j ++) {
-        tmp = tlistSearch(currentTransactionList,transArray[j].transid);
-          
-        if(tmp == NULL) {
-          tlist_node_t* tNode = &transArray[j];
-          tNode->status = TRANS_OK;
-          currentTransactionList = tlistInsertNode2(currentTransactionList,&(transArray[j]));
-        }
-        else {
-          if((tmp->decision != TRANS_COMMIT && tmp->decision != TRANS_ABORT) 
-                && (transArray[j].decision == TRANS_COMMIT || transArray[j].decision == TRANS_ABORT))
-          {
-           tmp->decision = transArray[j].decision;
-          }
-        }
-      }  // j loop
+    recv_data(sdlist[i],&response,sizeof(char));
 
-      free(transArray);
+    if(response == RESPOND_TRANS_WAIT) 
+    {
+      // receive live host list
+      computeLiveHosts(sdlist[i]);
+      // receive transaction list
+      makeTransactionLists(&currentTransactionList,sdlist[i]);
     }
-  }  // i loop
-  printf("Before\n");
-  tlistPrint(currentTransactionList);
-
-  // current transaction list is completed
-  // now see if any transaction is still missing
+    else if(response == RESPOND_HIGHER_EPOCH)
+    {
+      tlistDestroy(currentTransactionList);
+      return -1;
+    }
+    else {
+      printf("%s -> no response mid : %s\n",__func__,midtoIPString(hostIpAddrs[i]));
+      liveHosts[i] = 0;
+      sdlist[i] = -1;
+    }
+  }
+  
   walker = currentTransactionList->head;
 
   while(walker) {
-//    if(walker->decision == DECISION_LOST) {
-      for(i = 0 ; i < numHostsInSystem; i++) {
-        if(sdlist[i] != -1 && hostIpAddrs[i] != myIpAddr)
-        {
-          char request = REQUEST_TRANS_CHECK;
-          char respond;
-
-          send_data(sdlist[i], &request, sizeof(char));
-          send_data(sdlist[i], &(walker->transid), sizeof(unsigned int));
-
-          recv_data(sdlist[i], &respond, sizeof(char));
-
-          if(respond  > 0)
-          {
-            walker->decision = respond;
-            break;
-          }
-        }
-        else if(hostIpAddrs[i] == myIpAddr)
-        {
-          char decision = checkDecision(walker->transid);
-          if(decision > 0) {
-            walker->decision = decision;
-            break;
-          }
-        }
-      } // i loop
-
-      if(walker->decision == DECISION_LOST) {
-        printf("%s -> No one knows decision for transID : %u\n",__func__,walker->transid);
-        walker->decision = TRANS_ABORT;
-      }
-      if(walker->decision == TRYING_TO_COMMIT) {
-        printf("%s -> no decision yet transID : %u\n",__func__,walker->transid);
-      }
+    if(walker->decision == DECISION_LOST) {
+      printf("%s -> No one knows decision for transID : %u\n",__func__,walker->transid);
+      walker->decision = TRANS_ABORT;
+    }
     walker = walker->next;
-  } // while loop
+  }
 
-  printf("%s -> currentTransactionList size : %d\n",__func__,currentTransactionList->size);
+  tlistPrint(currentTransactionList);
+  *tList = currentTransactionList;
 
-  for(i = 0; i < numHostsInSystem; i++) {
-    if(sdlist[i] != -1 && hostIpAddrs[i] != myIpAddr) 
+  printf("%s -> Exit\n",__func__);
+  return 0;
+}
+
+void computeLiveHosts(int sd)
+{
+  int receivedHost[numHostsInSystem];
+  int i;
+  
+  recv_data(sd,receivedHost,sizeof(int)*numHostsInSystem);
+
+  for(i = 0 ; i < numHostsInSystem;i ++)
+  {
+    if(liveHosts[i] == 1 && receivedHost[i] == 1)
     {
-      char request = REQUEST_TRANS_COMPLETE;
-      send_data(sdlist[i], &request,sizeof(char));      
+      liveHosts[i] = 1;
     }
+    else 
+      liveHosts[i] = 0;
   }
-
-  *tlist = currentTransactionList;
-  printf("\n\nAfter\n");
-  tlistPrint(currentTransactionList);
-
-  printf("%s -> End\n",__func__);
+  
+  return;
 }
 
-// send out current on-going transaction
-void releaseTransactionLists(tlist_t* tlist,int* sdlist)
+int releaseNewLists(unsigned int epoch_num,int* sdlist,tlist_t* tlist)
 {
-  printf("%s -> Enter\n",__func__);
-  int size;
-  tlist_node_t* tArray = tlistToArray(tlist,&size);
   int i;
-  char response;
+  char response = RELEASE_NEW_LIST;
+  int size;
   int flag;
-  printf("%s -> size : %d\n",__func__,size);
+  tlist_node_t* tArray;
+  
+  
+  if(inspectEpoch(epoch_num) < 0) return -1;  
+  
+  tArray = tlistToArray(tlist,&size);
 
   for(i = 0; i < numHostsInSystem; i++)
   {
     if(sdlist[i] != -1 && hostIpAddrs[i] != myIpAddr)
     {
+      send_data(sdlist[i],&response,sizeof(char));
+      send_data(sdlist[i],&epoch_num,sizeof(unsigned int));
+
+      // new host list
+      pthread_mutex_lock(&liveHosts_mutex);
+      send_data(sdlist[i],liveHosts,sizeof(int) * numHostsInSystem);
+      pthread_mutex_unlock(&liveHosts_mutex);
+
       if(size == 0) {
         size = -1;
         send_data(sdlist[i],&size,sizeof(int));
@@ -2175,18 +2008,19 @@ void releaseTransactionLists(tlist_t* tlist,int* sdlist)
       }
     }
     else {
+      setLocateObjHosts();
+//      printHostsStatus();
       flag = combineTransactionList(tArray,size);
 
       if(flag == 0) {
         printf("%s -> problem\n",__func__);
         exit(0);
       }
-
-//      okCommit = TRANS_AFTER;
       stopTransactions(TRANS_AFTER);
     }
   }
-  
+
+  printf("%s -> After sending msg\n",__func__);
   if(size > 0)
     free(tArray);
 
@@ -2195,44 +2029,115 @@ void releaseTransactionLists(tlist_t* tlist,int* sdlist)
     {
  //     printf("%s -> Waiting for %s\n",__func__,midtoIPString(hostIpAddrs[i]));
       recv_data(sdlist[i], &response, sizeof(char));
-
       if(response != TRANS_OK)
       {
         printf("%s -> response : %d Need to fix\n",__func__,response);
       }
+      else if(response == RESPOND_HIGHER_EPOCH)
+      {
+        printf("%s -> Higher epoch!\n",__func__);
+        return -1;
+      }
     }
   }
-  
+  tlistDestroy(tlist);  
   printf("%s -> End\n",__func__);
 }
 
-void restartTransactions()
+// after this fuction
+// leader knows all the on-going transaction list and their decisions
+void makeTransactionLists(tlist_t** tlist,int sd)
+{
+  tlist_node_t* transArray;
+  tlist_node_t* tmp;
+  tlist_node_t* walker;
+  int j;
+  int size;
+
+  // receive all on-going transaction list
+  recv_data(sd, &size, sizeof(int));
+
+  if((transArray = calloc(size, sizeof(tlist_node_t))) == NULL) {
+    printf("%s -> calloc error\n",__func__);
+    exit(0);
+  }
+      
+  recv_data(sd,transArray, sizeof(tlist_node_t) * size);
+
+  // add into currentTransactionList
+  for(j = 0 ; j < size; j ++) {
+    tmp = tlistSearch(*tlist,transArray[j].transid);
+
+    if(tmp == NULL) {
+      tlist_node_t* tNode = &transArray[j];
+      tNode->status = TRANS_OK;
+      *tlist = tlistInsertNode2(*tlist,&(transArray[j]));
+    }
+    else {
+      if(tmp->decision == DECISION_LOST && transArray[j].decision != DECISION_LOST)
+      {
+        tmp->decision = transArray[j].decision;
+      }
+
+    }
+  }  // j loop
+  
+  free(transArray);
+
+  // current transaction list is completed
+  // now see if any transaction is still missing
+  walker = (*tlist)->head;
+  char request = REQUEST_TRANS_CHECK;
+  char respond;
+
+  while(walker) {
+    send_data(sd, &request, sizeof(char));
+    send_data(sd, &(walker->transid), sizeof(unsigned int));
+
+    recv_data(sd, &respond, sizeof(char));
+
+    if(respond  > 0)
+    {
+      walker->decision = respond;
+      break;
+    } 
+    walker = walker->next;
+  } // while loop
+
+  request = REQUEST_TRANS_COMPLETE;
+  send_data(sd, &request,sizeof(char));    
+
+  return;
+}
+
+void restartTransactions(unsigned int epoch_num,int* sdlist)
 {
   int i;
   int sd;
+  char request;
+  char response;
+
   for(i = 0; i < numHostsInSystem; i++) {
-    if(hostIpAddrs[i] == myIpAddr) {
-      pthread_mutex_lock(&liveHosts_mutex);
-      okCommit = TRANS_OK;
-      pthread_mutex_unlock(&liveHosts_mutex);
+    if(sdlist[i] == -1) 
       continue;
-    }
-    if(liveHosts[i] == 1)
-    {
-      if((sd = getSockWithLock(transPrefetchSockPool, hostIpAddrs[i])) < 0)
-      {
-        printf("%s -> socket create error sd : %d\n",__func__,sd);
-        exit(0);
-      }
-      else {
-        char request = REQUEST_TRANS_RESTART;
 
-        send_data(sd, &request, sizeof(char));
+    printf("%s -> request to %s\n",__func__,midtoIPString(hostIpAddrs[i]));
+    request = REQUEST_TRANS_RESTART;
+    send_data(sdlist[i], &request, sizeof(char));
+    send_data(sdlist[i], &epoch_num,sizeof(char));
+  }
+}
 
-        freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd);
-      }
-    }
+int inspectEpoch(unsigned int epoch_num)
+{
+  int flag = 1;
+  pthread_mutex_lock(&recovery_mutex);
+  if(epoch_num < currentEpoch) {
+    flag = -1;
   }
+  pthread_mutex_unlock(&recovery_mutex);
+
+  return flag;
 }
 
 #endif
@@ -3050,6 +2955,7 @@ int processConfigFile() {
   myIndexInHostArray = findHost(myIpAddr);
 #ifdef RECOVERY
        liveHosts[myIndexInHostArray] = 1;
+  currentEpoch = myIndexInHostArray;
 
 #ifdef RECOVERYSTATS
   numRecovery = 0;
@@ -3245,98 +3151,24 @@ int updateLiveHostsCommit() {
 
 #ifdef RECOVERY
 void setLocateObjHosts() {
-       int i = 0, validIndex = 0;
+       int i,validIndex;
 
        //check num hosts even valid first
-       
-       for(i = 0;i < numHostsInSystem; i++) {
-               
+       for(i = 0,validIndex = numHostsInSystem;i < numHostsInSystem; i++,validIndex = numHostsInSystem) {
                while(liveHosts[(i+validIndex)%numHostsInSystem] == 0) {
-                       validIndex++;
+                       validIndex--;
                }
                locateObjHosts[i*2] = hostIpAddrs[(i+validIndex)%numHostsInSystem];
-#ifdef DEBUG
-               printf("%s-> locateObjHosts[%d]:%s\n", __func__, i*2, midtoIPString(locateObjHosts[(i*2)]));
-#endif
-
-               validIndex++;
+               
+    validIndex = numHostsInSystem + 1;
                while(liveHosts[(i+validIndex)%numHostsInSystem] == 0) {
                        validIndex++;
                }
-#ifdef DEBUG
-               printf("%s-> validIndex:%d, this mid is: [%s]\n", __func__, validIndex, midtoIPString(hostIpAddrs[(i+validIndex)%numHostsInSystem]));
-#endif
-               locateObjHosts[(i*2)+1] = hostIpAddrs[(i+validIndex)%numHostsInSystem];
-               validIndex=0;
-
-#ifdef DEBUG
-               printf("%s-> locateObjHosts[%d]:%s\n", __func__, i*2+1, midtoIPString(locateObjHosts[(i*2)+1]));
-#endif
+               
+    locateObjHosts[(i*2)+1] = hostIpAddrs[(i+validIndex)%numHostsInSystem];
        }
 }
 
-// check the passed machine if it is still alive
-void updateLiveHostsList(int mid)
-{
-  int mIndex =  findHost(mid);
-  int sd;
-
-  if((sd = getSockWithLock(transPrefetchSockPool, hostIpAddrs[mIndex])) < 0) {
-    liveHosts[mIndex] = 0;
-    numLiveHostsInSystem--;
-    return;
-  }
-
-  char liverequest = RESPOND_LIVE;
-
-  send_data(sd, &liverequest, sizeof(char));
-
-  char response = 0;
-  int timeout = recv_data(sd, &response, sizeof(char));
-
-  if(response != LIVE) {
-    liveHosts[mIndex] = 0;
-    numLiveHostsInSystem--;
-  }
-
-  freeSockWithLock(transPrefetchSockPool,hostIpAddrs[mIndex],sd);
-  return;
-
-}
-
-// rearrange object location array of leader machine
-void setReLocateObjHosts(int mid)
-{
-  int mIndex = findHost(mid);
-  int backupMachine = getBackupMachine(mid);
-  int newPrimary = getDuplicatedPrimaryMachine(mid);
-  int newPrimaryIndex = findHost(newPrimary);
-  int i;
-
-  /* duplicateLostObject example
-   * Before M24 die,
-   * MID        21      24      26
-   * Primary    21      24      26
-   * Backup     26      21      24
-   * After M24 die,
-   * MID        21      26
-   * Primary   21,24    26
-   * Backup     26      21,24
-   */
-
-  locateObjHosts[2*newPrimaryIndex+1] = backupMachine;
-  locateObjHosts[2*mIndex] = newPrimary;
-
-/* relocate the objects of the machines already dead */
-  for(i=0; i<numHostsInSystem *2; i+=2) {
-    if(locateObjHosts[i] == mid)
-      locateObjHosts[i] = newPrimary;
-    if(locateObjHosts[i+1] == mid)
-      locateObjHosts[i+1] = backupMachine;
-  }
-}
-
-
 //debug function
 void printHostsStatus() {
        int i;
@@ -3364,27 +3196,12 @@ int allHostsLive() {
 #endif
 
 #ifdef RECOVERY
-void duplicateLostObjects(unsigned int mid){
+// request all machines to check their objects
+int duplicateLostObjects(unsigned int epoch_num,int *sdlist){
+  int i;
+  char response;
   printf("%s -> Enter\n",__func__);
 
-#ifdef RECOVERYSTATS
-  unsigned int dupeSize = 0;
-#endif
-
-       //this needs to be changed.
-       unsigned int backupMid = getBackupMachine(mid); // get backup machine of dead machine
-       unsigned int originalMid = getPrimaryMachine(mid); // get primary machine that used deadmachine as backup machine.
-
-#ifdef DEBUG
-       printHostsStatus(); 
-#endif
-
-       
-       //connect to these machines
-       //go through their object store copying necessary (in a transaction)
-       int i, j, tmpNumLiveHosts = 0;
-  int psd,bsd;
-
   /* duplicateLostObject example
    * Before M24 die,
    * MID        21      24      26
@@ -3396,164 +3213,33 @@ void duplicateLostObjects(unsigned int mid){
    * Backup     26      21,24
    */
 
-  if(((psd = getSockWithLock(transPrefetchSockPool, originalMid)) < 0 ) || 
-      ((bsd = getSockWithLock(transPrefetchSockPool,backupMid)) <0)) {
-
-    printf("%s -> psd : %d bsd : %d\n",__func__,psd,bsd);
-    printf("%s -> Socket create error\n",__func__);
-  
-    while(1) 
-      sleep(10);
-  }
-
-/* request for original */
-       char duperequest;
-       duperequest = DUPLICATE_ORIGINAL;
-       send_data(psd, &duperequest, sizeof(char));
-       send_data(psd, &backupMid, sizeof(unsigned int));
-
-/* request for backup */
-       duperequest = DUPLICATE_BACKUP;
-       send_data(bsd, &duperequest, sizeof(char));
-       send_data(bsd, &originalMid, sizeof(unsigned int));
-
-       char p_response,b_response;
-  unsigned int p_receivedSize,b_receivedSize;
+  if(inspectEpoch(epoch_num) < 0) return -1;
 
-  recv_data(psd, &p_response, sizeof(char));
-  recv_data(psd, &p_receivedSize, sizeof(unsigned int));
-
-#ifdef RECOVERYSTATS
-  dupeSize += p_receivedSize; // size of primary data
-#endif
+  response = REQUEST_DUPLICATE;
 
-  recv_data(bsd, &b_response, sizeof(char));
-  recv_data(bsd, &b_receivedSize, sizeof(unsigned int));
-
-#ifdef RECOVERYSTATS
-  dupeSize += b_receivedSize; // size of backup data
-#endif
-
-  if(p_response != DUPLICATION_COMPLETE || b_response != DUPLICATION_COMPLETE)
-  {
-    printf("%s -> Duplication Fail\n",__func__);
-    exit(0);
+  for(i = 0 ; i < numHostsInSystem; i ++) {
+    if(sdlist[i] == -1)
+      continue;
+    send_data(sdlist[i],&response,sizeof(char));
+    send_data(sdlist[i],&epoch_num,sizeof(unsigned int));
   }
+  for(i = 0 ; i < numHostsInSystem; i ++) {
+    if(sdlist[i] == -1)
+      continue;
+    recv_data(sdlist[i],&response,sizeof(char));
 
-  freeSockWithLock(transPrefetchSockPool, originalMid, psd);
-  freeSockWithLock(transPrefetchSockPool, backupMid, bsd);
-
-#ifdef RECOVERYSTATS
-  recoverStat[numRecovery-1].recoveredData = dupeSize;
-#endif
-
-#ifndef DEBUG
-       printf("%s-> End\n", __func__);  
-#endif
-}
-
-unsigned int duplicateLocalBackupObjects(unsigned int mid) {
-       int sd;
-  unsigned int tempsize;
-  int i;
-       char *dupeptr, ctrl, response;
-#ifdef DEBUG
-       printf("%s-> Start; backup mid:%s\n", __func__, midtoIPString(mid));  
-#endif
-
-       //copy code from dstmserver here
-  dupeptr = (char*) mhashGetDuplicate(&tempsize, 1);
-
-#ifdef DEBUG
-       printf("tempsize:%d, dupeptrfirstvalue:%d\n", tempsize, *((unsigned int *)(dupeptr)));
-#endif
-       //send control and dupes after
-       ctrl = RECEIVE_DUPES;
-       if((sd = getSockWithLock(transPrefetchSockPool, mid)) < 0) {
-               printf("duplicatelocalbackup: socket create error\n");
-               //usleep(1000);
-       }
-#ifdef DEBUG
-       printf("%s -> sd:%d, tempsize:%d, dupeptrfirstvalue:%d\n", __func__,sd, tempsize, *((unsigned int *)(dupeptr)));
-#endif
-       send_data(sd, &ctrl, sizeof(char));
-       send_data(sd, dupeptr, tempsize);
-       
-  recv_data(sd, &response, sizeof(char));
-  freeSockWithLock(transPrefetchSockPool,mid,sd);
-
-#ifdef DEBUG
-  printf("%s ->response : %d  -  %d\n",__func__,response,DUPLICATION_COMPLETE);
-#endif
-
-       if(response != DUPLICATION_COMPLETE) {
-#ifdef DEBUG
-    printf("%s -> DUPLICATION_FAIL\n",__func__);
-#endif
-    exit(-1);
-       }
-  free(dupeptr);
+    if(response != DUPLICATION_COMPLETE) {
+      printf("%s -> fail!\n",__func__);
+      exit(0);
+    } 
+  }
 
 #ifndef DEBUG
        printf("%s-> End\n", __func__);  
 #endif
-
-  return tempsize;
-
-}
-
-unsigned int duplicateLocalOriginalObjects(unsigned int mid) {
-       int sd;
-  unsigned int tempsize;
-       char *dupeptr, ctrl, response;
-
-#ifdef DEBUG
-       printf("%s-> Start\n", __func__);  
-#endif
-       //copy code fom dstmserver here
-
-  dupeptr = (char*) mhashGetDuplicate(&tempsize, 0);
-
-       //send control and dupes after
-       ctrl = RECEIVE_DUPES;
-
-       if((sd = getSockWithLock(transPrefetchSockPool, mid)) < 0) {
-               printf("DUPLICATE_ORIGINAL: socket create error\n");
-               //usleep(1000);
-       }
-#ifdef DEBUG
-       printf("sd:%d, tempsize:%d, dupeptrfirstvalue:%d\n", sd, tempsize, *((unsigned int *)(dupeptr)));
-#endif
-
-       send_data(sd, &ctrl, sizeof(char));
-       send_data(sd, dupeptr, tempsize);
-
-       recv_data(sd, &response, sizeof(char));
-  freeSockWithLock(transPrefetchSockPool,mid,sd);
-
-#ifdef DEBUG
-  printf("%s ->response : %d  -  %d\n",__func__,response,DUPLICATION_COMPLETE);
-#endif
-
-       if(response != DUPLICATION_COMPLETE) {
-               //fail message
-#ifdef DEBUG
-    printf("%s -> DUPLICATION_FAIL\n",__func__);
-#endif
-    exit(0);
-       }
-  
-  free(dupeptr);
-
-#ifdef DEBUG
-       printf("%s-> End\n", __func__);  
-#endif
-  return tempsize;
-
 }
-
 #endif
-
 void addHost(unsigned int hostIp) {
   unsigned int *tmpArray;
   int *tmpliveHostsArray;