From b892136ff4f33ac0137f7ec3360189ee04712ad8 Mon Sep 17 00:00:00 2001 From: jihoonl Date: Tue, 27 Apr 2010 16:47:18 +0000 Subject: [PATCH] new recovery protocol --- .../Runtime/DSTM/interface_recovery/dstm.h | 51 +- .../DSTM/interface_recovery/dstmserver.c | 274 ++---- .../DSTM/interface_recovery/sockpool.c | 2 - .../Runtime/DSTM/interface_recovery/trans.c | 916 ++++++------------ 4 files changed, 422 insertions(+), 821 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface_recovery/dstm.h b/Robust/src/Runtime/DSTM/interface_recovery/dstm.h index 3349b21a..c7c9f216 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/dstm.h +++ b/Robust/src/Runtime/DSTM/interface_recovery/dstm.h @@ -57,23 +57,20 @@ *****************************/ #define RESPOND_LIVE 30 #define LIVE 31 -#define REMOTE_RESTORE_DUPLICATED_STATE 37 -#define UPDATE_LIVE_HOSTS 32 -#define DUPLICATE_ORIGINAL 33 -#define DUPLICATE_BACKUP 34 +#define UPDATE_LIVE_HOSTS 32 +#define REQUEST_DUPLICATE 33 #define DUPLICATION_COMPLETE 35 #define RECEIVE_DUPES 36 /********************************* * Transaction Clear Messages *********************************/ -#define ASK_COMMIT 51 -#define CLEAR_NOTIFY_LIST 52 -#define REQUEST_TRANS_WAIT 53 -#define RESPOND_TRANS_WAIT 54 -#define REQUEST_TRANS_RESTART 55 -#define REQUEST_TRANS_LIST 56 -#define REQUEST_TRANS_RECOVERY 57 +#define CLEAR_NOTIFY_LIST 51 +#define REQUEST_TRANS_WAIT 52 +#define RESPOND_TRANS_WAIT 53 +#define RESPOND_HIGHER_EPOCH 54 +#define RELEASE_NEW_LIST 55 +#define REQUEST_TRANS_RESTART 56 #define REQUEST_TRANS_CHECK 58 #define REQUEST_TRANS_COMPLETE 59 @@ -214,6 +211,10 @@ typedef struct fixed_data { unsigned int nummod; /* no of objects modified */ unsigned int numcreated; /* no of objects created */ int sum_bytes; /* total bytes of modified objects in a transaction */ + +#ifdef RECOVERY + unsigned int epoch_num; +#endif } fixed_data_t; /* Structure that holds trans request information for each participant */ @@ -224,10 +225,6 @@ typedef struct trans_req_data { unsigned int *oidmod; /* Pointer to array holding oids of objects that are modified */ unsigned int *oidcreated; /* Pointer to array holding oids of objects that are newly created */ -#ifdef RECOVERY - unsigned int transid; -#endif - } trans_req_data_t; /* Structure that holds information of objects that are not found in the participant @@ -243,10 +240,8 @@ typedef struct trans_commit_data { } trans_commit_data_t; #ifdef RECOVERY - int leaderFixing; - pthread_mutex_t leaderFixing_mutex; + pthread_mutex_t recovery_mutex; pthread_mutex_t liveHosts_mutex; - #endif #ifdef RECOVERYSTATS @@ -290,6 +285,7 @@ void updateLiveHostsList(int mid); int updateLiveHostsCommit(); void receiveNewHostLists(int accept); void stopTransactions(int TRANS_FLAG); +void sendMyList(int); void sendTransList(int acceptfd); int receiveTransList(int acceptfd); int combineTransactionList(tlist_node_t* tArray,int size); @@ -313,17 +309,18 @@ void clearDeadThreadsNotification(); /* for recovery */ void reqClearNotifyList(unsigned int oid); void clearNotifyList(unsigned int oid); -void duplicateLostObjects(unsigned int mid); -unsigned int duplicateLocalBackupObjects(); -unsigned int duplicateLocalOriginalObjects(); void notifyLeaderDeadMachine(unsigned int deadHost); -void restoreDuplicationState(unsigned int deadHost); -void notifyRestoration(); -void clearTransaction(); -void makeTransactionLists(tlist_t**,int*); -void releaseTransactionLists(tlist_t*,int*); +void restoreDuplicationState(unsigned int deadHost,unsigned int epoch_num); +int* getSocketLists(); +void freeSocketLists(int*); +int inspectEpoch(unsigned int); +int pingMachines(unsigned int epoch_num,int* sdlist,tlist_t**); +int releaseNewLists(unsigned int epoch_num,int* sdlist,tlist_t*); +int duplicateLostObjects(unsigned int epoch_num,int* sdlist); +void restartTransactions(unsigned int epoch_num,int* sdlist); +void makeTransactionLists(tlist_t**,int); +void computeLiveHosts(int); void waitForAllMachine(); -void restartTransactions(); int readDuplicateObjs(int); void printRecoveryStat(); diff --git a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c index 163a1684..d1049347 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c @@ -42,6 +42,7 @@ pthread_mutex_t clearNotifyList_mutex; tlist_t* transList; int okCommit; // machine flag extern numWaitMachine; +extern unsigned int currentEpoch; #endif @@ -53,7 +54,6 @@ pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a sockPoolHashTable_t *transPResponseSocketPool; #ifdef RECOVERY -extern unsigned int leader; long long myrdtsc(void) { @@ -76,7 +76,7 @@ int dstmInit(void) { #ifdef RECOVERY pthread_mutex_init(&liveHosts_mutex, NULL); - pthread_mutex_init(&leaderFixing_mutex, NULL); + pthread_mutex_init(&recovery_mutex, NULL); pthread_mutex_init(&clearNotifyList_mutex,NULL); #endif @@ -93,6 +93,10 @@ int dstmInit(void) { printf("well error\n"); return 1; } + + okCommit = TRANS_OK; + currentEpoch = 1; + #endif if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR)) @@ -104,7 +108,6 @@ int dstmInit(void) { return 0; } - okCommit = TRANS_OK; return 0; } @@ -271,6 +274,7 @@ void *dstmAccept(void *acceptfd) { void *dupeptr; unsigned int transIDreceived; char decision; + unsigned int epoch_num; int timeout; #endif @@ -294,7 +298,7 @@ void *dstmAccept(void *acceptfd) { if (ret==0) break; if (ret==-1) { - printf("DEBUG -> RECV Error!.. retrying\n"); +// printf("DEBUG -> RECV Error!.. retrying\n"); // exit(0); break; } @@ -358,18 +362,6 @@ void *dstmAccept(void *acceptfd) { pthread_exit(NULL); } break; -#ifdef RECOVERY - case ASK_COMMIT : - - if(recv_data((int)acceptfd, &transIDreceived, sizeof(unsigned int)) < 0) - break; - - decision = checkDecision(transIDreceived); - - send_data((int)acceptfd,&decision,sizeof(char)); - - break; -#endif case TRANS_PREFETCH: #ifdef DEBUG printf("control -> TRANS_PREFETCH\n"); @@ -500,70 +492,54 @@ void *dstmAccept(void *acceptfd) { #ifdef RECOVERY case RESPOND_LIVE: ctrl = LIVE; - send_data((int)acceptfd, &ctrl, sizeof(ctrl)); - break; -#endif -#ifdef RECOVERY - case REMOTE_RESTORE_DUPLICATED_STATE: -#ifdef DEBUG - printf("control -> REMOTE_RESTORE_DUPLICATED_STATE\n"); -#endif - recv_data((int)acceptfd, &mid, sizeof(unsigned int)); - if(!liveHosts[findHost(mid)]) { -#ifdef DEBUG - printf("%s (REMOTE_RESTORE_DUPLICATED_STATE) -> already fixed\n",__func__); -#endif - break; - } - pthread_mutex_lock(&leaderFixing_mutex); - if(!leaderFixing) { - leaderFixing = 1; - pthread_mutex_unlock(&leaderFixing_mutex); - - restoreDuplicationState(mid); - // finish fixing - pthread_mutex_lock(&leaderFixing_mutex); - leaderFixing = 0; - pthread_mutex_unlock(&leaderFixing_mutex); - } - else { - pthread_mutex_unlock(&leaderFixing_mutex); -#ifdef DEBUG - printf("%s (REMOTE_RESTORE_DUPLICATED_STATE -> LEADER is already fixing\n",__func__); -#endif - sleep(WAIT_TIME); - } + send_data((int)acceptfd, &ctrl, sizeof(char)); break; #endif #ifdef RECOVERY case REQUEST_TRANS_WAIT: - receiveNewHostLists((int)acceptfd); - stopTransactions(TRANS_BEFORE); + { + recv_data((int)acceptfd,&epoch_num,sizeof(unsigned int)); - response = RESPOND_TRANS_WAIT; - send_data((int)acceptfd,&response,sizeof(char)); -// respondToLeader(); + if(inspectEpoch(epoch_num) < 0) { + response = RESPOND_HIGHER_EPOCH; + send_data((int)acceptfd,&response,sizeof(char)); + } + else { + printf("Got new Leader! : %d\n",epoch_num); + currentEpoch = epoch_num; + stopTransactions(TRANS_BEFORE); + response = RESPOND_TRANS_WAIT; + send_data((int)acceptfd,&response,sizeof(char)); + sendMyList((int)acceptfd); + } + } break; - case RESPOND_TRANS_WAIT: - printf("control -> RESPOND_TRANS_WAIT\n"); - pthread_mutex_lock(&liveHosts_mutex); - numWaitMachine++; - pthread_mutex_unlock(&liveHosts_mutex); - printf("numWaitMachine = %d\n",numWaitMachine); - break; + case RELEASE_NEW_LIST: + printf("control -> RELEASE_NEW_LIST\n"); + { + recv_data((int)acceptfd,&epoch_num,sizeof(unsigned int)); - case REQUEST_TRANS_LIST: - printf("control -> REQUEST_TRANS_LIST\n"); - sendTransList((int)acceptfd); - response = receiveTransList((int)acceptfd); - stopTransactions(TRANS_AFTER); - - send_data((int)acceptfd,&response,sizeof(char)); - break; + if(inspectEpoch(epoch_num) < 0) { + response = RESPOND_HIGHER_EPOCH; + } + else + { + response = receiveNewList((int)acceptfd); + stopTransactions(TRANS_AFTER); + } + send_data((int)acceptfd,&response,sizeof(char)); + } + break; case REQUEST_TRANS_RESTART: + + recv_data((int)acceptfd,&epoch_num,sizeof(char)); + + if(inspectEpoch(epoch_num) < 0) break; + pthread_mutex_lock(&liveHosts_mutex); + printf("RESTART!!!\n"); okCommit = TRANS_OK; pthread_mutex_unlock(&liveHosts_mutex); break; @@ -581,19 +557,22 @@ void *dstmAccept(void *acceptfd) { #endif #ifdef RECOVERY - case DUPLICATE_ORIGINAL: + case REQUEST_DUPLICATE: { struct sockaddr_in remoteAddr; int sd; + unsigned int epoch_num; -#ifdef DEBUG - printf("control -> DUPLICATE_ORIGINAL\n"); - printf("%s (DUPLICATE_ORIGINAL)-> Attempt to duplicate original objects\n", __func__); -#endif - //object store stuffffff - recv_data((int)acceptfd, &mid, sizeof(unsigned int)); + recv_data((int)acceptfd,&epoch_num,sizeof(unsigned int)); - dupeptr = (char*) mhashGetDuplicate(&tempsize, 0); + if(inspectEpoch(epoch_num) < 0) { + break; + } + + //object store stuffffff + mid = getBackupMachine(myIpAddr); + + dupeptr = (char*) mhashGetDuplicate(&tempsize, 0); //send control and dupes after ctrl = RECEIVE_DUPES; @@ -609,23 +588,28 @@ void *dstmAccept(void *acceptfd) { remoteAddr.sin_addr.s_addr = htonl(mid); if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr))<0) { - printf("ORIGINAL ERROR : %s\n",strerror(errno)); - exit(0); + printf("REQUEST_DUPE ERROR : %s\n",strerror(errno)); +// exit(0); + break; } else { send_data(sd, &ctrl, sizeof(char)); send_data(sd, dupeptr, tempsize); + if((readDuplicateObjs(sd) )!= 0) { + printf("Fail in readDuplicateObj()\n"); + break; +// exit(0); + } recv_data(sd, &response, sizeof(char)); -#ifdef DEBUG - printf("%s ->response : %d - %d\n",__func__,response,DUPLICATION_COMPLETE); -#endif - if(response != DUPLICATION_COMPLETE) { + + if(response != DUPLICATION_COMPLETE) { #ifndef DEBUG - printf("%s(DUPLICATION_ORIGINAL) -> DUPLICATION FAIL\n",__func__); + printf("%s(REQUEST_DUPE) -> DUPLICATION FAIL\n",__func__); #endif //fail message - exit(0); + break; +// exit(0); } close(sd); @@ -634,72 +618,12 @@ void *dstmAccept(void *acceptfd) { ctrl = DUPLICATION_COMPLETE; send_data((int)acceptfd, &ctrl, sizeof(char)); - send_data((int)acceptfd, &tempsize, sizeof(unsigned int)); #ifdef DEBUG - printf("%s (DUPLICATE_ORIGINAL)-> Finished\n", __func__); + printf("%s (REQUEST_DUPE)-> Finished\n", __func__); #endif } break; - case DUPLICATE_BACKUP: - { - struct sockaddr_in remoteAddr; - int sd; -#ifdef DEBUG - printf("control -> DUPLICATE_BACKUP\n"); - printf("%s (DUPLICATE_BACKUP)-> Attempt to duplicate backup objects\n", __func__); -#endif - //object store stuffffff - recv_data((int)acceptfd, &mid, sizeof(unsigned int)); - - dupeptr = (char*) mhashGetDuplicate(&tempsize, 1); - - //send control and dupes after - ctrl = RECEIVE_DUPES; - - if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - perror("BACKUP : "); - exit(0); - } - - bzero(&remoteAddr, sizeof(remoteAddr)); - remoteAddr.sin_family = AF_INET; - remoteAddr.sin_port = htons(LISTEN_PORT); - remoteAddr.sin_addr.s_addr = htonl(mid); - - if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr))<0) { - printf("BACKUP ERROR : %s\n",strerror(errno)); - exit(0); - } - else { - send_data(sd, &ctrl, sizeof(char)); - send_data(sd, dupeptr, tempsize); - - recv_data(sd, &response, sizeof(char)); -#ifdef DEBUG - printf("%s ->response : %d - %d\n",__func__,response,DUPLICATION_COMPLETE); -#endif - if(response != DUPLICATION_COMPLETE) { -#ifndef DEBUG - printf("%s(DUPLICATION_BACKUP) -> DUPLICATION FAIL\n",__func__); -#endif - exit(0); - } - - close(sd); - } - - free(dupeptr); - - ctrl = DUPLICATION_COMPLETE; - send_data((int)acceptfd, &ctrl, sizeof(char)); - send_data((int)acceptfd, &tempsize, sizeof(unsigned int)); -#ifdef DEBUG - printf("%s (DUPLICATE_BACKUP)-> Finished\n", __func__); -#endif - } - break; - case RECEIVE_DUPES: #ifdef DEBUG printf("control -> RECEIVE_DUPES sd : %d\n",(int)acceptfd); @@ -708,35 +632,18 @@ void *dstmAccept(void *acceptfd) { printf("Error: In readDuplicateObjs() %s, %d\n", __FILE__, __LINE__); pthread_exit(NULL); } + + dupeptr = (char*) mhashGetDuplicate(&tempsize, 1); + + send_data((int)acceptfd,dupeptr,tempsize); + free(dupeptr); ctrl = DUPLICATION_COMPLETE; send_data((int)acceptfd, &ctrl, sizeof(char)); #ifdef DEBUG printf("%s (RECEIVE_DUPES) -> Finished\n",__func__); #endif break; -#endif -#ifdef RECOVERY - case PAXOS_PREPARE: -#ifdef DEBUG - printf("control -> PAXOS_PREPARE\n"); -#endif - paxosPrepare_receiver((int)acceptfd); - break; - - case PAXOS_ACCEPT: -#ifdef DEBUG - printf("control -> PAXOS_ACCEPT\n"); -#endif - paxosAccept_receiver((int)acceptfd); - break; - - case PAXOS_LEARN: -#ifdef DEBUG - printf("control -> PAXOS_LEARN\n"); -#endif - leader = paxosLearn_receiver((int)acceptfd); - break; #endif default: printf("Error: dstmAccept() Unknown opcode %d at %s, %d\n", control, __FILE__, __LINE__); @@ -1881,10 +1788,17 @@ void stopTransactions(int TRANS_FLAG) pthread_mutex_unlock(&clearNotifyList_mutex); } +void sendMyList(int acceptfd) +{ + pthread_mutex_lock(&liveHosts_mutex); + send_data((int)acceptfd,liveHosts,sizeof(int) * numHostsInSystem); + pthread_mutex_unlock(&liveHosts_mutex); + + sendTransList(acceptfd); +} + void sendTransList(int acceptfd) { - printf("%s -> Enter\n",__func__); - int size; char response; int transid; @@ -1892,11 +1806,11 @@ void sendTransList(int acceptfd) // send on-going transaction tlist_node_t* transArray = tlistToArray(transList,&size); - if(transList->size != 0) +/* if(transList->size != 0) tlistPrint(transList); printf("%s -> transList->size : %d size = %d\n",__func__,transList->size,size); - +*/ send_data((int)acceptfd,&size,sizeof(int)); send_data((int)acceptfd,transArray, sizeof(tlist_node_t) * size); @@ -1917,7 +1831,7 @@ void sendTransList(int acceptfd) free(transArray); } -int receiveTransList(int acceptfd) +int receiveNewList(int acceptfd) { int size; tlist_node_t* tArray; @@ -1925,7 +1839,15 @@ int receiveTransList(int acceptfd) int i; int flag = 1; char response; - + + // new host lists + pthread_mutex_lock(&liveHosts_mutex); + recv_data((int)acceptfd,liveHosts,sizeof(int)*numHostsInSystem); + pthread_mutex_unlock(&liveHosts_mutex); + + setLocateObjHosts(); + + // new transaction list recv_data((int)acceptfd,&size,sizeof(int)); @@ -1937,13 +1859,10 @@ int receiveTransList(int acceptfd) } recv_data((int)acceptfd,tArray,sizeof(tlist_node_t) * size); - flag = combineTransactionList(tArray,size); - free(tArray); } - if(flag == 1) { response = TRANS_OK; @@ -2005,6 +1924,7 @@ char inspectTransaction(char finalResponse,unsigned int transid,char* debug,int // if decision is not lost and okCommit is not TRANS_FLAG, get out of this loop while(!((tNode->decision != DECISION_LOST) && (okCommit != TRANS_FLAG))) { // printf("%s -> transID : %u decision : %d is waiting flag : %d\n",debug,tNode->transid,tNode->decision,TRANS_FLAG); +// sleep(3); randomdelay(); } diff --git a/Robust/src/Runtime/DSTM/interface_recovery/sockpool.c b/Robust/src/Runtime/DSTM/interface_recovery/sockpool.c index d96d8254..ea6ab489 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/sockpool.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/sockpool.c @@ -17,8 +17,6 @@ inline void UnLock(volatile unsigned int *addr) { : "=r" (oldval), "=m" (*(addr)) : "0" (0), "m" (*(addr))); } -#elif -# error need implementation of test_and_set #endif #define MAXSPINS 4 diff --git a/Robust/src/Runtime/DSTM/interface_recovery/trans.c b/Robust/src/Runtime/DSTM/interface_recovery/trans.c index edc9c1be..073daa5f 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/trans.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/trans.c @@ -88,6 +88,8 @@ int sendRemoteReq = 0; int getResponse = 0; #ifdef RECOVERY + +#define INCREASE_EPOCH(x,y,z) ((x/y+1)*y + z) /*********************************** * Global variables for Duplication ***********************************/ @@ -111,13 +113,14 @@ char ip[16]; // for debugging purpose extern tlist_t* transList; extern pthread_mutex_t clearNotifyList_mutex; -extern unsigned int leader; +unsigned int currentEpoch; #ifdef RECOVERYSTATS int numRecovery = 0; recovery_stat_t* recoverStat; #endif + #endif void printhex(unsigned char *, int); @@ -359,8 +362,8 @@ int recv_data_errorcode(int fd, void *buf, int buflen) { if (numbytes==0) return 0; else if (numbytes == -1) { - printf("%s -> ERROR NUMBER = %d %s\n",__func__,errno,strerror(errno)); - perror("recv_data_errorcode"); +// printf("%s -> ERROR NUMBER = %d %s\n",__func__,errno,strerror(errno)); +// perror("recv_data_errorcode"); return -1; } bytesRecv += numbytes; @@ -529,7 +532,7 @@ int dstmStartup(const char * option) { updateLiveHosts(); setLocateObjHosts(); updateLiveHostsCommit(); - leader = paxos(hostIpAddrs,liveHosts,myIpAddr,numHostsInSystem,numLiveHostsInSystem); +// leader = paxos(hostIpAddrs,liveHosts,myIpAddr,numHostsInSystem,numLiveHostsInSystem); // printHostsStatus(); if(!allHostsLive()) { printf("Not all hosts live. Exiting.\n"); @@ -837,6 +840,7 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) { objheader_t *objcopy; int size; + #ifdef DEBUG printf("%s-> Start, oid:%u\n", __func__, oid); #endif @@ -1220,6 +1224,7 @@ int transCommit() { tosend[sockindex].f.nummod = pile->nummod; tosend[sockindex].f.numcreated = pile->numcreated; tosend[sockindex].f.sum_bytes = pile->sum_bytes; + tosend[sockindex].f.epoch_num = currentEpoch; tosend[sockindex].listmid = listmid; tosend[sockindex].objread = pile->objread; tosend[sockindex].oidmod = pile->oidmod; @@ -1715,267 +1720,110 @@ void *getRemoteObj(unsigned int mnum, unsigned int oid) { return objcopy; } -#ifdef RECOVERY -/* ask machines if they received decision */ -char receiveDecisionFromBackup(unsigned int transID,int nummid,unsigned int *listmid) -{ -#ifdef DEBUG - printf("%s -> Entering\n",__func__); -#endif - - int sd; // socket id - int i; - char response; - - for(i = 0; i < nummid; i++) { - if((sd = getSockWithLock(transPrefetchSockPool, listmid[i])) < 0) { - printf("%s -> socket Error!!\n"); - } - else { - char control = ASK_COMMIT; - - send_data(sd,&control, sizeof(char)); - send_data(sd,&transID, sizeof(unsigned int)); - - // return -1 if it didn't receive the response - int timeout = recv_data(sd,&response, sizeof(char)); - - - if(timeout == 0 || response > 0) - break; // received response - - // else check next machine - freeSockWithLock(transPrefetchSockPool, listmid[i],sd); - } - } -#ifdef DEBUG - printf("%s -> response : %d\n",__func__,response); -#endif - - return (response==-1)?TRANS_ABORT:response; -} -#endif - #ifdef RECOVERY void notifyLeaderDeadMachine(unsigned int deadHost) { - int sd; - char ctrl; + + unsigned int epoch_num; if(!liveHosts[findHost(deadHost)]) { // if it is already fixed printf("%s -> already fixed\n",__func__); sleep(WAIT_TIME); return; } + + // increase epoch number by number machines in the system + pthread_mutex_lock(&recovery_mutex); + epoch_num = currentEpoch = INCREASE_EPOCH(currentEpoch,numHostsInSystem,myIndexInHostArray); + pthread_mutex_unlock(&recovery_mutex); - if(deadHost == leader) // if leader is dead, then pick a new leader - leader = paxos(hostIpAddrs,liveHosts,myIpAddr,numHostsInSystem,numLiveHostsInSystem); - -#ifdef DEBUG - printf("%s-> leader?:%s, me?:%s\n", __func__, midtoIPString(leader), (myIpAddr == leader)?"LEADER":"NOT LEADER"); -#endif - - if(leader == myIpAddr) { // if i am the leader - pthread_mutex_lock(&leaderFixing_mutex); - if(!leaderFixing) { - leaderFixing = 1; - pthread_mutex_unlock(&leaderFixing_mutex); - - if(!liveHosts[findHost(deadHost)]) { // if it is already fixed -#ifndef DEBUG - printf("%s -> already fixed\n",__func__); -#endif - pthread_mutex_lock(&leaderFixing_mutex); - leaderFixing =0; - pthread_mutex_unlock(&leaderFixing_mutex); - } - else { - restoreDuplicationState(deadHost); - - pthread_mutex_lock(&leaderFixing_mutex); - leaderFixing = 0; - pthread_mutex_unlock(&leaderFixing_mutex); - } - } - else { - pthread_mutex_unlock(&leaderFixing_mutex); -#ifndef DEBUG - printf("%s -> LEADER is already fixing\n",__func__); -#endif - sleep(WAIT_TIME); - } - } - else { // request leader to fix the situation - if((sd = getSockWithLock(transPrefetchSockPool, leader)) < 0) { - printf("%s -> socket create error\n",__func__); - exit(-1); - } - ctrl = REMOTE_RESTORE_DUPLICATED_STATE; - send_data(sd, &ctrl, sizeof(char)); - send_data(sd, &deadHost, sizeof(unsigned int)); - freeSockWithLock(transPrefetchSockPool,leader,sd); - printf("%s -> Message sent\n",__func__); - sleep(WAIT_TIME); - } + // notify all machines that this machien will act as leader. + // if return -1, then a machine that higher epoch_num started restoration + restoreDuplicationState(deadHost,epoch_num); } /* Leader's role */ -void restoreDuplicationState(unsigned int deadHost) +void restoreDuplicationState(unsigned int deadHost,unsigned int epoch_num) { printf("%s -> Entering\n",__func__); + int* sdlist; + tlist_t* tList; #ifdef RECOVERYSTATS printf("Recovery Start\n"); - numRecovery++; long long st; long long fi; + int flag = 0; unsigned int dupeSize = 0; // to calculate the size of backed up data st = myrdtsc(); // to get clock - recoverStat[numRecovery-1].deadMachine = deadHost; + recoverStat[numRecovery].deadMachine = deadHost; #endif - // update leader's live host list and object locations - updateLiveHostsList(deadHost); - setReLocateObjHosts(deadHost); - - // stop all transactions and update all other's machine list - notifyRestoration(); - - - // wait until all machines wait for leader - waitForAllMachine(); + + do { + sdlist = getSocketLists(); + printf("%s -> I'm currently leader num : %d\n\n",__func__,epoch_num); + if((flag = pingMachines(epoch_num,sdlist,&tList)) < 0) { + break;; + } + + printf("%s -> I'm currently leader num : %d\n\n",__func__,epoch_num); - // clear transaction - clearTransaction(); + if((flag = releaseNewLists(epoch_num,sdlist,tList)) < 0) { + break;; + } - // transfer lost objects - duplicateLostObjects(deadHost); + // transfer lost objects + if((flag= duplicateLostObjects(epoch_num,sdlist)) < 0) { + break; + } + // restart transactions + restartTransactions(epoch_num,sdlist); + }while(0); - // restart transactions - restartTransactions(); + freeSocketLists(sdlist); + if(flag < 0) { + printf("%s -> higher epoch\n",__func__); + while(okCommit != TRANS_OK) { +// printf("%s -> Waiting\n",__func__); + randomdelay(); + } + + }else { #ifdef RECOVERYSTATS fi = myrdtsc(); - recoverStat[numRecovery-1].elapsedTime = (fi-st)/CPU_FREQ; + recoverStat[numRecovery].elapsedTime = (fi-st)/CPU_FREQ; + numRecovery++; printRecoveryStat(); #endif - + } printf("%s -> Exiting\n",__func__); } -/* - 1. request all other machines to stop transactions - 2. update their live machine list - */ - -void notifyRestoration() +int* getSocketLists() { + struct sockaddr_in remoteAddr[numHostsInSystem]; + int* sdlist; + char request = RESPOND_LIVE; + char response; int i; int sd; - int sdlist[numHostsInSystem]; - - printHostsStatus(); - - pthread_mutex_lock(&liveHosts_mutex); - numWaitMachine = 0; - pthread_mutex_unlock(&liveHosts_mutex); - // for other machines - for(i = 0; i < numHostsInSystem; i++) { - if(liveHosts[i] != 1 || hostIpAddrs[i] == myIpAddr) { - sdlist[i] = -1; - continue; - } - - if((sd = getSockWithLock(transPrefetchSockPool, hostIpAddrs[i])) < 0) - { - printf("%s -> socket create error\n",__func__); - exit(0); - } - else { - sdlist[i] = sd; - char request = REQUEST_TRANS_WAIT; - - send_data(sd, &request, sizeof(char)); - - /* send new host lists and object location */ - pthread_mutex_lock(&liveHosts_mutex); - send_data(sd, liveHosts, sizeof(int)*numHostsInSystem); - send_data(sd, locateObjHosts, sizeof(unsigned int)*numHostsInSystem*2); - pthread_mutex_unlock(&liveHosts_mutex); - } - } - - - for(i = 0 ; i < numHostsInSystem; i++) { - if(sdlist[i] != -1) - { - char response; - recv_data(sdlist[i],&response,sizeof(char)); - if(response == RESPOND_TRANS_WAIT) { - pthread_mutex_lock(&liveHosts_mutex); - numWaitMachine++; - pthread_mutex_unlock(&liveHosts_mutex); - } - - freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sdlist[i]); - } - } - /* stop all local transactions */ - stopTransactions(TRANS_BEFORE); -} - -/* acknowledge leader that all transactions are waiting */ -void respondToLeader() -{ - printf("%s -> Enter\n",__func__); - int sd; - if((sd = getSockWithLock(transPrefetchSockPool, leader)) < 0) { - printf("%s -> cannot open the socket\n",__func__); + if((sdlist = calloc(numHostsInSystem,sizeof(int))) == NULL) + { + printf("%s -> calloc error\n",__func__); exit(0); } - else { - char request = RESPOND_TRANS_WAIT; -// printf("%s -> request = %s\n sd = %d\n",__func__,(request==RESPOND_TRANS_WAIT)?"RESPOND_TRANS_WAIT":"NONO"); - send_data(sd,&request,sizeof(char)); - freeSockWithLock(transPrefetchSockPool,leader,sd); - } - - printf("%s -> Exit\n",__func__); - return; -} - -/* wait untill receive from all machine */ -void waitForAllMachine() -{ - - pthread_mutex_lock(&liveHosts_mutex); - numWaitMachine++; // for local. It is done - pthread_mutex_unlock(&liveHosts_mutex); - - - /* wait untill receive from all machine */ - while(numWaitMachine < numLiveHostsInSystem) { - sleep(1); - } -} - -void clearTransaction() -{ - int size; - tlist_t* tlist; - int sd; - struct sockaddr_in remoteAddr[numHostsInSystem]; - int sdlist[numHostsInSystem]; - int i; // open sockets to all live machines for(i = 0 ; i < numHostsInSystem; i++) { - if(liveHosts[i] == 1 && hostIpAddrs[i] != myIpAddr) { + if(liveHosts[i] == 1) { if((sd = socket(AF_INET , SOCK_STREAM, 0 )) < 0) { - printf("%s -> socket create Error\n",__func__); + sdlist[i] = -1; + liveHosts[i] = 0; } else { bzero(&remoteAddr[i], sizeof(remoteAddr[i])); @@ -1985,49 +1833,72 @@ void clearTransaction() // printf("%s -> open sd : %d to %s\n",__func__,sd,midtoIPString(hostIpAddrs[i])); if(connect(sd, (struct sockaddr *)&remoteAddr[i], sizeof(remoteAddr[i])) < 0) { - printf("%s -> socket connect error\n",__func__); - exit(0); + sdlist[i] = -1; + liveHosts[i] = 0; } else { - sdlist[i] = sd; + send_data(sd,&request,sizeof(char)); + + recv_data(sd,&response,sizeof(char)); + + if(response == LIVE) { + sdlist[i] = sd; + liveHosts[i] = 1; + } } } } else { + liveHosts[i] = 0; sdlist[i] = -1; } } + + return sdlist; +} - /* receive transaction lists from all machines and - clarefy all decisions - returns an array of ongoing transactions */ - makeTransactionLists(&tlist,sdlist); - - - /* release the cleared decisions to all machines */ - releaseTransactionLists(tlist,sdlist); - +void freeSocketLists(int* sdlist) +{ + int i; for(i = 0 ; i < numHostsInSystem; i++) { if(sdlist[i] != -1) { close(sdlist[i]); } } - tlistDestroy(tlist); - printf("%s -> End\n",__func__); + free(sdlist); } -// after this fuction -// leader knows all the on-going transaction list and their decisions -void makeTransactionLists(tlist_t** tlist,int* sdlist) +// stop transactions, receive translists, live machine lists. +int pingMachines(unsigned int epoch_num,int* sdlist,tlist_t** tList) { + printf("%s -> Enter\n",__func__); - int sd; int i; - tlist_t* currentTransactionList = tlistCreate(); + char request; + char response; + tlist_t* currentTransactionList; + + if(inspectEpoch(epoch_num) < 0) { + printf("%s -> Higher Epoch\n",__func__); + return -1; + } + + currentTransactionList = tlistCreate(); + + // request remote amchines to stop all transactions + for(i = 0; i < numHostsInSystem; i++) + { + if(sdlist[i] == -1 || hostIpAddrs[i] == myIpAddr) + continue; + + request = REQUEST_TRANS_WAIT; + send_data(sdlist[i],&request, sizeof(char)); + send_data(sdlist[i],&epoch_num,sizeof(unsigned int)); + } - printf("%s -> tlist size : %d\n",__func__,transList->size); - printf("%s -> currentTransactionList size : %d\n",__func__,currentTransactionList->size); + /* stop all local transactions */ + stopTransactions(TRANS_BEFORE); // grab leader's transaction list first @@ -2039,132 +1910,94 @@ void makeTransactionLists(tlist_t** tlist,int* sdlist) walker = walker->next; } - // receive others transaction list - for(i = 0; i < numHostsInSystem;i ++) { - if(sdlist[i] != -1 && hostIpAddrs[i] != myIpAddr) { - char request = REQUEST_TRANS_LIST; - int size; - int j; - tlist_node_t* transArray; - tlist_node_t* tmp; - - sd = sdlist[i]; - - // send request - send_data(sd, &request, sizeof(char)); - - // receive all on-going transaction list - recv_data(sd, &size, sizeof(int)); - - printf("%s -> %s - size : %d\n",__func__,midtoIPString(hostIpAddrs[i]),size); - if((transArray = calloc(size, sizeof(tlist_node_t))) == NULL) { - printf("%s -> calloc error\n",__func__); - exit(0); - } - - recv_data(sd,transArray, sizeof(tlist_node_t) * size); + for(i = 0; i < numHostsInSystem; i++) + { + if(sdlist[i] == -1 || hostIpAddrs[i] == myIpAddr) + continue; - // add into currentTransactionList - for(j = 0 ; j < size; j ++) { - tmp = tlistSearch(currentTransactionList,transArray[j].transid); - - if(tmp == NULL) { - tlist_node_t* tNode = &transArray[j]; - tNode->status = TRANS_OK; - currentTransactionList = tlistInsertNode2(currentTransactionList,&(transArray[j])); - } - else { - if((tmp->decision != TRANS_COMMIT && tmp->decision != TRANS_ABORT) - && (transArray[j].decision == TRANS_COMMIT || transArray[j].decision == TRANS_ABORT)) - { - tmp->decision = transArray[j].decision; - } - } - } // j loop + recv_data(sdlist[i],&response,sizeof(char)); - free(transArray); + if(response == RESPOND_TRANS_WAIT) + { + // receive live host list + computeLiveHosts(sdlist[i]); + // receive transaction list + makeTransactionLists(¤tTransactionList,sdlist[i]); } - } // i loop - - printf("Before\n"); - tlistPrint(currentTransactionList); - - // current transaction list is completed - // now see if any transaction is still missing + else if(response == RESPOND_HIGHER_EPOCH) + { + tlistDestroy(currentTransactionList); + return -1; + } + else { + printf("%s -> no response mid : %s\n",__func__,midtoIPString(hostIpAddrs[i])); + liveHosts[i] = 0; + sdlist[i] = -1; + } + } + walker = currentTransactionList->head; while(walker) { -// if(walker->decision == DECISION_LOST) { - for(i = 0 ; i < numHostsInSystem; i++) { - if(sdlist[i] != -1 && hostIpAddrs[i] != myIpAddr) - { - char request = REQUEST_TRANS_CHECK; - char respond; - - send_data(sdlist[i], &request, sizeof(char)); - send_data(sdlist[i], &(walker->transid), sizeof(unsigned int)); - - recv_data(sdlist[i], &respond, sizeof(char)); - - if(respond > 0) - { - walker->decision = respond; - break; - } - } - else if(hostIpAddrs[i] == myIpAddr) - { - char decision = checkDecision(walker->transid); - if(decision > 0) { - walker->decision = decision; - break; - } - } - } // i loop - - if(walker->decision == DECISION_LOST) { - printf("%s -> No one knows decision for transID : %u\n",__func__,walker->transid); - walker->decision = TRANS_ABORT; - } - if(walker->decision == TRYING_TO_COMMIT) { - printf("%s -> no decision yet transID : %u\n",__func__,walker->transid); - } + if(walker->decision == DECISION_LOST) { + printf("%s -> No one knows decision for transID : %u\n",__func__,walker->transid); + walker->decision = TRANS_ABORT; + } walker = walker->next; - } // while loop + } - printf("%s -> currentTransactionList size : %d\n",__func__,currentTransactionList->size); + tlistPrint(currentTransactionList); + *tList = currentTransactionList; - for(i = 0; i < numHostsInSystem; i++) { - if(sdlist[i] != -1 && hostIpAddrs[i] != myIpAddr) + printf("%s -> Exit\n",__func__); + return 0; +} + +void computeLiveHosts(int sd) +{ + int receivedHost[numHostsInSystem]; + int i; + + recv_data(sd,receivedHost,sizeof(int)*numHostsInSystem); + + for(i = 0 ; i < numHostsInSystem;i ++) + { + if(liveHosts[i] == 1 && receivedHost[i] == 1) { - char request = REQUEST_TRANS_COMPLETE; - send_data(sdlist[i], &request,sizeof(char)); + liveHosts[i] = 1; } + else + liveHosts[i] = 0; } - - *tlist = currentTransactionList; - printf("\n\nAfter\n"); - tlistPrint(currentTransactionList); - - printf("%s -> End\n",__func__); + + return; } -// send out current on-going transaction -void releaseTransactionLists(tlist_t* tlist,int* sdlist) +int releaseNewLists(unsigned int epoch_num,int* sdlist,tlist_t* tlist) { - printf("%s -> Enter\n",__func__); - int size; - tlist_node_t* tArray = tlistToArray(tlist,&size); int i; - char response; + char response = RELEASE_NEW_LIST; + int size; int flag; - - printf("%s -> size : %d\n",__func__,size); + tlist_node_t* tArray; + + + if(inspectEpoch(epoch_num) < 0) return -1; + + tArray = tlistToArray(tlist,&size); for(i = 0; i < numHostsInSystem; i++) { if(sdlist[i] != -1 && hostIpAddrs[i] != myIpAddr) { + send_data(sdlist[i],&response,sizeof(char)); + send_data(sdlist[i],&epoch_num,sizeof(unsigned int)); + + // new host list + pthread_mutex_lock(&liveHosts_mutex); + send_data(sdlist[i],liveHosts,sizeof(int) * numHostsInSystem); + pthread_mutex_unlock(&liveHosts_mutex); + if(size == 0) { size = -1; send_data(sdlist[i],&size,sizeof(int)); @@ -2175,18 +2008,19 @@ void releaseTransactionLists(tlist_t* tlist,int* sdlist) } } else { + setLocateObjHosts(); +// printHostsStatus(); flag = combineTransactionList(tArray,size); if(flag == 0) { printf("%s -> problem\n",__func__); exit(0); } - -// okCommit = TRANS_AFTER; stopTransactions(TRANS_AFTER); } } - + + printf("%s -> After sending msg\n",__func__); if(size > 0) free(tArray); @@ -2195,44 +2029,115 @@ void releaseTransactionLists(tlist_t* tlist,int* sdlist) { // printf("%s -> Waiting for %s\n",__func__,midtoIPString(hostIpAddrs[i])); recv_data(sdlist[i], &response, sizeof(char)); - if(response != TRANS_OK) { printf("%s -> response : %d Need to fix\n",__func__,response); } + else if(response == RESPOND_HIGHER_EPOCH) + { + printf("%s -> Higher epoch!\n",__func__); + return -1; + } } } - + tlistDestroy(tlist); printf("%s -> End\n",__func__); } -void restartTransactions() +// after this fuction +// leader knows all the on-going transaction list and their decisions +void makeTransactionLists(tlist_t** tlist,int sd) +{ + tlist_node_t* transArray; + tlist_node_t* tmp; + tlist_node_t* walker; + int j; + int size; + + // receive all on-going transaction list + recv_data(sd, &size, sizeof(int)); + + if((transArray = calloc(size, sizeof(tlist_node_t))) == NULL) { + printf("%s -> calloc error\n",__func__); + exit(0); + } + + recv_data(sd,transArray, sizeof(tlist_node_t) * size); + + // add into currentTransactionList + for(j = 0 ; j < size; j ++) { + tmp = tlistSearch(*tlist,transArray[j].transid); + + if(tmp == NULL) { + tlist_node_t* tNode = &transArray[j]; + tNode->status = TRANS_OK; + *tlist = tlistInsertNode2(*tlist,&(transArray[j])); + } + else { + if(tmp->decision == DECISION_LOST && transArray[j].decision != DECISION_LOST) + { + tmp->decision = transArray[j].decision; + } + + } + } // j loop + + free(transArray); + + // current transaction list is completed + // now see if any transaction is still missing + walker = (*tlist)->head; + char request = REQUEST_TRANS_CHECK; + char respond; + + while(walker) { + send_data(sd, &request, sizeof(char)); + send_data(sd, &(walker->transid), sizeof(unsigned int)); + + recv_data(sd, &respond, sizeof(char)); + + if(respond > 0) + { + walker->decision = respond; + break; + } + walker = walker->next; + } // while loop + + request = REQUEST_TRANS_COMPLETE; + send_data(sd, &request,sizeof(char)); + + return; +} + +void restartTransactions(unsigned int epoch_num,int* sdlist) { int i; int sd; + char request; + char response; + for(i = 0; i < numHostsInSystem; i++) { - if(hostIpAddrs[i] == myIpAddr) { - pthread_mutex_lock(&liveHosts_mutex); - okCommit = TRANS_OK; - pthread_mutex_unlock(&liveHosts_mutex); + if(sdlist[i] == -1) continue; - } - if(liveHosts[i] == 1) - { - if((sd = getSockWithLock(transPrefetchSockPool, hostIpAddrs[i])) < 0) - { - printf("%s -> socket create error sd : %d\n",__func__,sd); - exit(0); - } - else { - char request = REQUEST_TRANS_RESTART; - send_data(sd, &request, sizeof(char)); + printf("%s -> request to %s\n",__func__,midtoIPString(hostIpAddrs[i])); + request = REQUEST_TRANS_RESTART; + send_data(sdlist[i], &request, sizeof(char)); + send_data(sdlist[i], &epoch_num,sizeof(char)); + } +} - freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd); - } - } +int inspectEpoch(unsigned int epoch_num) +{ + int flag = 1; + pthread_mutex_lock(&recovery_mutex); + if(epoch_num < currentEpoch) { + flag = -1; } + pthread_mutex_unlock(&recovery_mutex); + + return flag; } #endif @@ -3050,6 +2955,7 @@ int processConfigFile() { myIndexInHostArray = findHost(myIpAddr); #ifdef RECOVERY liveHosts[myIndexInHostArray] = 1; + currentEpoch = myIndexInHostArray; #ifdef RECOVERYSTATS numRecovery = 0; @@ -3245,98 +3151,24 @@ int updateLiveHostsCommit() { #ifdef RECOVERY void setLocateObjHosts() { - int i = 0, validIndex = 0; + int i,validIndex; //check num hosts even valid first - - for(i = 0;i < numHostsInSystem; i++) { - + for(i = 0,validIndex = numHostsInSystem;i < numHostsInSystem; i++,validIndex = numHostsInSystem) { while(liveHosts[(i+validIndex)%numHostsInSystem] == 0) { - validIndex++; + validIndex--; } locateObjHosts[i*2] = hostIpAddrs[(i+validIndex)%numHostsInSystem]; -#ifdef DEBUG - printf("%s-> locateObjHosts[%d]:%s\n", __func__, i*2, midtoIPString(locateObjHosts[(i*2)])); -#endif - - validIndex++; + + validIndex = numHostsInSystem + 1; while(liveHosts[(i+validIndex)%numHostsInSystem] == 0) { validIndex++; } -#ifdef DEBUG - printf("%s-> validIndex:%d, this mid is: [%s]\n", __func__, validIndex, midtoIPString(hostIpAddrs[(i+validIndex)%numHostsInSystem])); -#endif - locateObjHosts[(i*2)+1] = hostIpAddrs[(i+validIndex)%numHostsInSystem]; - validIndex=0; - -#ifdef DEBUG - printf("%s-> locateObjHosts[%d]:%s\n", __func__, i*2+1, midtoIPString(locateObjHosts[(i*2)+1])); -#endif + + locateObjHosts[(i*2)+1] = hostIpAddrs[(i+validIndex)%numHostsInSystem]; } } -// check the passed machine if it is still alive -void updateLiveHostsList(int mid) -{ - int mIndex = findHost(mid); - int sd; - - if((sd = getSockWithLock(transPrefetchSockPool, hostIpAddrs[mIndex])) < 0) { - liveHosts[mIndex] = 0; - numLiveHostsInSystem--; - return; - } - - char liverequest = RESPOND_LIVE; - - send_data(sd, &liverequest, sizeof(char)); - - char response = 0; - int timeout = recv_data(sd, &response, sizeof(char)); - - if(response != LIVE) { - liveHosts[mIndex] = 0; - numLiveHostsInSystem--; - } - - freeSockWithLock(transPrefetchSockPool,hostIpAddrs[mIndex],sd); - return; - -} - -// rearrange object location array of leader machine -void setReLocateObjHosts(int mid) -{ - int mIndex = findHost(mid); - int backupMachine = getBackupMachine(mid); - int newPrimary = getDuplicatedPrimaryMachine(mid); - int newPrimaryIndex = findHost(newPrimary); - int i; - - /* duplicateLostObject example - * Before M24 die, - * MID 21 24 26 - * Primary 21 24 26 - * Backup 26 21 24 - * After M24 die, - * MID 21 26 - * Primary 21,24 26 - * Backup 26 21,24 - */ - - locateObjHosts[2*newPrimaryIndex+1] = backupMachine; - locateObjHosts[2*mIndex] = newPrimary; - -/* relocate the objects of the machines already dead */ - for(i=0; i Enter\n",__func__); -#ifdef RECOVERYSTATS - unsigned int dupeSize = 0; -#endif - - //this needs to be changed. - unsigned int backupMid = getBackupMachine(mid); // get backup machine of dead machine - unsigned int originalMid = getPrimaryMachine(mid); // get primary machine that used deadmachine as backup machine. - -#ifdef DEBUG - printHostsStatus(); -#endif - - - //connect to these machines - //go through their object store copying necessary (in a transaction) - int i, j, tmpNumLiveHosts = 0; - int psd,bsd; - /* duplicateLostObject example * Before M24 die, * MID 21 24 26 @@ -3396,164 +3213,33 @@ void duplicateLostObjects(unsigned int mid){ * Backup 26 21,24 */ - if(((psd = getSockWithLock(transPrefetchSockPool, originalMid)) < 0 ) || - ((bsd = getSockWithLock(transPrefetchSockPool,backupMid)) <0)) { - - printf("%s -> psd : %d bsd : %d\n",__func__,psd,bsd); - printf("%s -> Socket create error\n",__func__); - - while(1) - sleep(10); - } - -/* request for original */ - char duperequest; - duperequest = DUPLICATE_ORIGINAL; - send_data(psd, &duperequest, sizeof(char)); - send_data(psd, &backupMid, sizeof(unsigned int)); - -/* request for backup */ - duperequest = DUPLICATE_BACKUP; - send_data(bsd, &duperequest, sizeof(char)); - send_data(bsd, &originalMid, sizeof(unsigned int)); - - char p_response,b_response; - unsigned int p_receivedSize,b_receivedSize; + if(inspectEpoch(epoch_num) < 0) return -1; - recv_data(psd, &p_response, sizeof(char)); - recv_data(psd, &p_receivedSize, sizeof(unsigned int)); - -#ifdef RECOVERYSTATS - dupeSize += p_receivedSize; // size of primary data -#endif + response = REQUEST_DUPLICATE; - recv_data(bsd, &b_response, sizeof(char)); - recv_data(bsd, &b_receivedSize, sizeof(unsigned int)); - -#ifdef RECOVERYSTATS - dupeSize += b_receivedSize; // size of backup data -#endif - - if(p_response != DUPLICATION_COMPLETE || b_response != DUPLICATION_COMPLETE) - { - printf("%s -> Duplication Fail\n",__func__); - exit(0); + for(i = 0 ; i < numHostsInSystem; i ++) { + if(sdlist[i] == -1) + continue; + send_data(sdlist[i],&response,sizeof(char)); + send_data(sdlist[i],&epoch_num,sizeof(unsigned int)); } + + for(i = 0 ; i < numHostsInSystem; i ++) { + if(sdlist[i] == -1) + continue; + recv_data(sdlist[i],&response,sizeof(char)); - freeSockWithLock(transPrefetchSockPool, originalMid, psd); - freeSockWithLock(transPrefetchSockPool, backupMid, bsd); - -#ifdef RECOVERYSTATS - recoverStat[numRecovery-1].recoveredData = dupeSize; -#endif - -#ifndef DEBUG - printf("%s-> End\n", __func__); -#endif -} - -unsigned int duplicateLocalBackupObjects(unsigned int mid) { - int sd; - unsigned int tempsize; - int i; - char *dupeptr, ctrl, response; -#ifdef DEBUG - printf("%s-> Start; backup mid:%s\n", __func__, midtoIPString(mid)); -#endif - - //copy code from dstmserver here - dupeptr = (char*) mhashGetDuplicate(&tempsize, 1); - -#ifdef DEBUG - printf("tempsize:%d, dupeptrfirstvalue:%d\n", tempsize, *((unsigned int *)(dupeptr))); -#endif - //send control and dupes after - ctrl = RECEIVE_DUPES; - if((sd = getSockWithLock(transPrefetchSockPool, mid)) < 0) { - printf("duplicatelocalbackup: socket create error\n"); - //usleep(1000); - } -#ifdef DEBUG - printf("%s -> sd:%d, tempsize:%d, dupeptrfirstvalue:%d\n", __func__,sd, tempsize, *((unsigned int *)(dupeptr))); -#endif - send_data(sd, &ctrl, sizeof(char)); - send_data(sd, dupeptr, tempsize); - - recv_data(sd, &response, sizeof(char)); - freeSockWithLock(transPrefetchSockPool,mid,sd); - -#ifdef DEBUG - printf("%s ->response : %d - %d\n",__func__,response,DUPLICATION_COMPLETE); -#endif - - if(response != DUPLICATION_COMPLETE) { -#ifdef DEBUG - printf("%s -> DUPLICATION_FAIL\n",__func__); -#endif - exit(-1); - } - free(dupeptr); + if(response != DUPLICATION_COMPLETE) { + printf("%s -> fail!\n",__func__); + exit(0); + } + } #ifndef DEBUG printf("%s-> End\n", __func__); #endif - - return tempsize; - -} - -unsigned int duplicateLocalOriginalObjects(unsigned int mid) { - int sd; - unsigned int tempsize; - char *dupeptr, ctrl, response; - -#ifdef DEBUG - printf("%s-> Start\n", __func__); -#endif - //copy code fom dstmserver here - - dupeptr = (char*) mhashGetDuplicate(&tempsize, 0); - - //send control and dupes after - ctrl = RECEIVE_DUPES; - - if((sd = getSockWithLock(transPrefetchSockPool, mid)) < 0) { - printf("DUPLICATE_ORIGINAL: socket create error\n"); - //usleep(1000); - } -#ifdef DEBUG - printf("sd:%d, tempsize:%d, dupeptrfirstvalue:%d\n", sd, tempsize, *((unsigned int *)(dupeptr))); -#endif - - send_data(sd, &ctrl, sizeof(char)); - send_data(sd, dupeptr, tempsize); - - recv_data(sd, &response, sizeof(char)); - freeSockWithLock(transPrefetchSockPool,mid,sd); - -#ifdef DEBUG - printf("%s ->response : %d - %d\n",__func__,response,DUPLICATION_COMPLETE); -#endif - - if(response != DUPLICATION_COMPLETE) { - //fail message -#ifdef DEBUG - printf("%s -> DUPLICATION_FAIL\n",__func__); -#endif - exit(0); - } - - free(dupeptr); - -#ifdef DEBUG - printf("%s-> End\n", __func__); -#endif - return tempsize; - } - #endif - void addHost(unsigned int hostIp) { unsigned int *tmpArray; int *tmpliveHostsArray; -- 2.34.1