From e893f52ab14312061c55944bb7be61518b8e8b70 Mon Sep 17 00:00:00 2001 From: jihoonl Date: Sat, 3 Oct 2009 07:45:04 +0000 Subject: [PATCH] modifies getTypeObj() fix in sortPiles() --- .../DSTM/interface_recovery/dstmserver.c | 7 +- .../Runtime/DSTM/interface_recovery/trans.c | 468 ++++++++---------- 2 files changed, 217 insertions(+), 258 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c index 59e4aea6..3025d42e 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c @@ -33,7 +33,6 @@ extern unsigned int *hostIpAddrs; #ifdef RECOVERY extern unsigned int *locateObjHosts; extern int *liveHosts; -extern int liveHostsValid; extern int numLiveHostsInSystem; int clearNotifyListFlag; #endif @@ -321,6 +320,7 @@ void *dstmAccept(void *acceptfd) { recv_data((int)acceptfd, &oid, sizeof(unsigned int)); while((srcObj = mhashSearch(oid)) == NULL) { int ret; +// printf("HERE!!\n"); if((ret = sched_yield()) != 0) { printf("%s(): error no %d in thread yield\n", __func__, errno); } @@ -507,7 +507,6 @@ void *dstmAccept(void *acceptfd) { #ifdef DEBUG printf("control -> RESPOND_LIVE\n"); #endif - liveHostsValid = 0; ctrl = LIVE; send_data((int)acceptfd, &ctrl, sizeof(ctrl)); #ifdef DEBUG @@ -563,7 +562,6 @@ void *dstmAccept(void *acceptfd) { recv_data((int)acceptfd, liveHosts, sizeof(int)*numHostsInSystem); recv_data((int)acceptfd, locateObjHosts, sizeof(unsigned int)*numHostsInSystem*2); pthread_mutex_unlock(&liveHosts_mutex); - liveHostsValid = 1; numLiveHostsInSystem = getNumLiveHostsInSystem(); #ifdef DEBUG printHostsStatus(); @@ -1406,9 +1404,6 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock printf("Error: mhashsearch returns NULL at dstmserver.c %d\n", __LINE__); return 1; #else -#ifdef DEBUG - printf("DEBUG->*backup* i:%d, nummod:%d\n", i, nummod); -#endif header = (objheader_t *)(modptr+offset); header->version += 1; header->isBackup = 1; diff --git a/Robust/src/Runtime/DSTM/interface_recovery/trans.c b/Robust/src/Runtime/DSTM/interface_recovery/trans.c index d91bba49..b8f69dd2 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/trans.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/trans.c @@ -1,3 +1,5 @@ +#include "dstm.h" +#include "ip.h" #include "machinepile.h" #include "mlookup.h" #include "llookup.h" @@ -53,9 +55,6 @@ unsigned int *hostIpAddrs; int sizeOfHostArray; int numHostsInSystem; int myIndexInHostArray; -int waitThreadMid; -unsigned int waitThreadID; - unsigned int oidsPerBlock; unsigned int oidMin; unsigned int oidMax; @@ -79,25 +78,28 @@ int nSoftAbort = 0; int bytesSent = 0; int bytesRecv = 0; int totalObjSize = 0; +int sendRemoteReq = 0; +int getResponse = 0; #ifdef RECOVERY /*********************************** * Global variables for Duplication ***********************************/ int *liveHosts; -int liveHostsValid; int numLiveHostsInSystem; -int flipBit; // Used to distribute requests between primary and backup evenly unsigned int *locateObjHosts; -#endif + + +/* variables to clear dead threads */ +int waitThreadMid; +unsigned int waitThreadID; int transRetryFlag; -unsigned int transIDMax; unsigned int transIDMin; -unsigned int transIDIndex; -char ip[16]; +unsigned int transIDMax; + +char ip[16]; // for debugging purpose -#ifdef RECOVERY /****************************** * Global variables for Paxos ******************************/ @@ -115,6 +117,17 @@ void printhex(unsigned char *, int); plistnode_t *createPiles(); plistnode_t *sortPiles(plistnode_t *pileptr); +#ifdef LOGEVENTS +char bigarray[16*1024*1024]; +int bigindex=0; +#define LOGEVENT(x) { \ + int tmp=bigindex++; \ + bigarray[tmp]=x; \ + } +#else +#define LOGEVENT(x) +#endif + /******************************* * Send and Recv function calls *******************************/ @@ -124,21 +137,20 @@ int send_data(int fd, void *buf, int buflen) { int numbytes; while (size > 0) { - #ifdef GDBDEBUG GDBSEND1: #endif numbytes = send(fd, buffer, size, 0); - if( numbytes>0) { + if( numbytes > 0) { bytesSent += numbytes; size -= numbytes; } #ifdef RECOVERY - else if( numbytes < 0) { + else if( numbytes < 0) { // Receive returned an error. // Analyze underlying cause -#ifndef DEBUG +#ifdef DEBUG printf("%s -> fd : %d errno = %d %s\n",__func__, fd, errno,strerror(errno)); fflush(stdout); #endif @@ -155,11 +167,10 @@ GDBSEND1: return -1; } else { #ifdef GDBDEBUG - if(errno == 4) - goto GDBSEND1; + if(errno == 4) + goto GDBSEND1; #endif - #ifdef DEBUG printf("%s -> Unexpected ERROR!\n",__func__); #endif @@ -169,21 +180,21 @@ GDBSEND1: else{ // Case : numbytes == 0 // // machine has failed -- this case probably doesn't occur in reality - // - - - #ifdef DEBUG printf("%s -> SHOULD NOT BE HERE\n",__func__); #endif return -1; } +#else + if(numbytes == -1) { + perror("send"); + exit(0); + } #endif } // close while loop #ifdef DEBUG printf("%s-> Exiting\n", __func__); #endif - return 0; // completed sending data } @@ -254,6 +265,11 @@ GDBRECV1: #endif return -1; } +#else + if( numbytes == -1) { + perror("recv"); + exit(0); + } #endif } //close while loop #ifdef DEBUG @@ -281,7 +297,6 @@ int recv_data_errorcode(int fd, void *buf, int buflen) { perror("recv_data_errorcode"); return -1; } - buffer += numbytes; size -= numbytes; } @@ -321,10 +336,12 @@ inline int findmax(int *array, int arraylength) { return max; } +#ifdef RECOVERY char* midtoIPString(unsigned int mid){ midtoIP(mid, ip); return ip; } +#endif /* This function is a prefetch call generated by the compiler that * populates the shared primary prefetch queue*/ void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) { @@ -397,6 +414,7 @@ int dstmStartup(const char * option) { setLocateObjHosts(); updateLiveHostsCommit(); paxos(); + printHostsStatus(); if(!allHostsLive()) { printf("Not all hosts live. Exiting.\n"); exit(-1); @@ -692,6 +710,10 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) { /* Insert into cache's lookup table */ STATUS(objcopy)=0; t_chashInsert(OID(objheader), objcopy); +#ifdef DEBUG + printf("%s -> obj type = %d\n",__func__,getObjType(oid)); + printf("%s -> obj grabbed\n",__func__); +#endif #ifdef COMPILER return &objcopy[1]; #else @@ -699,8 +721,9 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) { #endif } else { #ifdef CACHE - , TYPE(header)if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { + if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { #ifdef TRANSSTATS + LOGEVENT('P') nprehashSearch++; #endif /* Look up in prefetch cache */ @@ -718,30 +741,29 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) { } #endif /* Get the object from the remote location */ - + if((machinenumber = lhashSearch(oid)) == 0) { + printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__); + return NULL; + } #ifdef DEBUG printf("%s-> Grab from remote machine\n", __func__); #endif #ifdef RECOVERY transRetryFlag = 0; - unsigned int mindex = findHost(lhashSearch(oid)); - machinenumber = locateObjHosts[2*mindex+flipBit]; - - if(numLiveHostsInSystem > 1) - flipBit ^= 1; - else - flipBit = 0; + + unsigned int machinenumber; + static int flipBit = 0; // Used to distribute requests between primary and backup evenly + // either primary or backup machine + machinenumber = (flipBit)?getPrimaryMachine(lhashSearch(oid)):getBackupMachine(lhashSearch(oid)); + flipBit ^= 1; #ifdef DEBUG printf("mindex:%d, oid:%d, machinenumber:%s\n", mindex, oid, midtoIPString(machinenumber)); #endif -#else - if((machinenumber = lhashSearch(oid)) == 0) { - printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__); - return NULL; - } #endif + objcopy = getRemoteObj(machinenumber, oid); + #ifdef RECOVERY if(transRetryFlag) { restoreDuplicationState(machinenumber); @@ -751,13 +773,13 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) { return transRead2(oid); } #endif - } if(objcopy == NULL) { printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__); return NULL; } else { #ifdef TRANSSTATS + LOGEVENT('R'); nRemoteSend++; #endif #ifdef COMPILER @@ -766,9 +788,11 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) { return objcopy; #endif } + } #ifdef DEBUG printf("%s -> Finished!!\n",__func__); #endif + } /* This function creates objects in the transaction record */ @@ -793,6 +817,10 @@ objheader_t *transCreateObj(unsigned int size) { /* This function creates machine piles based on all machines involved in a * transaction commit request */ plistnode_t *createPiles() { + +#ifdef DEBUG + printf("%s -> Entering\n",__func__); +#endif int i; unsigned int oid; plistnode_t *pile = NULL; @@ -814,47 +842,30 @@ plistnode_t *createPiles() { #if RECOVERY oid = OID(headeraddr); -#ifdef DEBUG - printf("%s-> oid:%u, version:%d, status:%d, type:%d\n", __func__, OID(headeraddr), headeraddr->version, STATUS(headeraddr), TYPE(headeraddr)); - if (STATUS(headeraddr) & NEW) { // new/local object - printf("%s-> new/local object\n", __func__); - } - else if ((mhashSearch(curr->key) != NULL)) { //local/nonnew - if(STATUS(headeraddr) & DIRTY) { // modified - printf("%s-> old/local/mod object\n", __func__); - } - else { //read - printf("%s-> old/local/read object\n", __func__); - } - } - else if ((machinenum = lhashSearch(curr->key)) != 0) { // remote/nonnew object - if(STATUS(headeraddr) & DIRTY) { //modified - printf("%s-> remote/local/mod object\n", __func__); - } - else { //read - printf("%s-> remote/local/read object\n", __func__); - } - } - else { - printf("Error: No such machine %s, %d\n", __FILE__, __LINE__); - return NULL; - } - unsigned int pmid = getPrimaryMachine(lhashSearch(oid)); - unsigned int bmid = getBackupMachine(lhashSearch(oid)); - printf("%s-> Primary Machine: [%s], ", __func__, midtoIPString(pmid)); - printf("Backup Machine: [%s]\n", midtoIPString(bmid)); -#endif - int makedirty = 0; - if(STATUS(headeraddr) & DIRTY || STATUS(headeraddr) & NEW) { - makedirty = 1; - } - pile = pInsert(pile, headeraddr, getPrimaryMachine(lhashSearch(oid)), c_numelements); -//problem here - if(makedirty) { - STATUS(headeraddr) = DIRTY; - } - pile = pInsert(pile, headeraddr, getBackupMachine(lhashSearch(oid)), c_numelements); + int makedirty = 0; + unsigned int mid; + + mid = lhashSearch(oid); + + // if the obj is dirty or new + if(STATUS(headeraddr) & DIRTY || STATUS(headeraddr) & NEW) { + // set flag for backup machine + makedirty = 1; + } + + // if the obj is new or local, destination will be my Ip + if((mid = lhashSearch(oid)) == 0) { + mid = myIpAddr; + } + + pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements); + + if(makedirty) { + STATUS(headeraddr) = DIRTY; + } + + pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements); #else // Get machine location for object id (and whether local or not) if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) { @@ -919,9 +930,12 @@ int transCommit() { int firsttime=1; trans_commit_data_t transinfo; /* keeps track of objs locked during transaction */ char finalResponse; + +#ifdef RECOVERY int deadsd = -1; int deadmid = -1; unsigned int transID = getNewTransID(); +#endif #ifdef DEBUG printf("%s -> Starts transCommit\n",__func__); @@ -964,7 +978,7 @@ int transCommit() { /* Create a list of machine ids(Participants) involved in transaction */ listmid = calloc(pilecount, sizeof(unsigned int)); pListMid(pile, listmid); - + /* Create a socket and getReplyCtrl array, initialize */ int socklist[pilecount]; int loopcount; @@ -989,15 +1003,14 @@ int transCommit() { tosend[sockindex].f.numread = pile->numread; tosend[sockindex].f.nummod = pile->nummod; tosend[sockindex].f.numcreated = pile->numcreated; -#ifdef DEBUG - printf("%s-> numread:%d, nummod:%d, numcreated:%d\n", __func__, pile->numread, pile->nummod, pile->numcreated); -#endif tosend[sockindex].f.sum_bytes = pile->sum_bytes; tosend[sockindex].listmid = listmid; tosend[sockindex].objread = pile->objread; tosend[sockindex].oidmod = pile->oidmod; tosend[sockindex].oidcreated = pile->oidcreated; - int sd = 0; + + + int sd = 0; if(pile->mid != myIpAddr) { if((sd = getSockWithLock(transRequestSockPool, pile->mid)) < 0) { printf("\ntransRequest(): socket create error\n"); @@ -1030,9 +1043,6 @@ int transCommit() { printf("Calloc error for modified objects %s, %d\n", __FILE__, __LINE__); free(listmid); free(tosend); -#ifdef DEBUG - printf("%s-> End, line:%d\n\n", __func__, __LINE__); -#endif return 1; } int offset = 0; @@ -1045,9 +1055,6 @@ int transCommit() { free(modptr); free(listmid); free(tosend); -#ifdef DEBUG - printf("%s-> End, line:%d\n\n", __func__, __LINE__); -#endif return 1; } GETSIZE(size,headeraddr); @@ -1063,7 +1070,6 @@ int transCommit() { #endif free(modptr); } else { //handle request locally - localReqsock = sockindex; handleLocalReq(&tosend[sockindex], &transinfo, &getReplyCtrl[sockindex]); } sockindex++; @@ -1077,8 +1083,6 @@ int transCommit() { int i; for(i = 0; i < pilecount; i++) { - if(i == localReqsock) - continue; int sd = socklist[i]; if(sd != 0) { char control; @@ -1087,11 +1091,10 @@ int transCommit() { //Update common data structure with new ctrl msg getReplyCtrl[i] = control; /* Recv Objects if participant sends TRANS_DISAGREE */ - //printf("getReplyCtrl[%d] = %d\n", i, (int)getReplyCtrl[i]); #ifdef CACHE if(control == TRANS_DISAGREE) { int length; - timeout = recv_data(sd, &length, sizeof(int)); + recv_data(sd, &length, sizeof(int)); void *newAddr; pthread_mutex_lock(&prefetchcache_mutex); if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) { @@ -1099,13 +1102,10 @@ int transCommit() { free(tosend); free(listmid); pthread_mutex_unlock(&prefetchcache_mutex); -#ifdef DEBUG - printf("%s-> End, line:%d\n\n", __func__, __LINE__); -#endif return 1; } pthread_mutex_unlock(&prefetchcache_mutex); - timeout = recv_data(sd, newAddr, length); + recv_data(sd, newAddr, length); int offset = 0; while(length != 0) { unsigned int oidToPrefetch; @@ -1132,10 +1132,6 @@ int transCommit() { #ifdef RECOVERY if(timeout < 0) { -#ifdef DEBUG - printf("%s -> TIMEOUT!!!!!!!\n",__func__); -#endif - deadmid = listmid[i]; deadsd = sd; #ifdef DEBUG @@ -1156,9 +1152,6 @@ int transCommit() { printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); free(tosend); free(listmid); -#ifdef DEBUG - printf("%s-> End, line:%d\n\n", __func__, __LINE__); -#endif return 1; } #ifdef DEBUG @@ -1168,8 +1161,9 @@ int transCommit() { /* Send responses to all machines */ for(i = 0; i < pilecount; i++) { int sd = socklist[i]; - +#ifdef RECOVERY if(sd != deadsd) { +#endif if(sd != 0) { #ifdef CACHE if(finalResponse == TRANS_COMMIT) { @@ -1179,9 +1173,6 @@ int transCommit() { printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); free(tosend); free(listmid); -#ifdef DEBUG - printf("%s-> End, line:%d\n\n", __func__, __LINE__); -#endif return 1; } @@ -1191,9 +1182,6 @@ int transCommit() { printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__); free(tosend); free(listmid); -#ifdef DEBUG - printf("%s-> End, line:%d\n\n", __func__, __LINE__); -#endif return 1; } } @@ -1231,23 +1219,17 @@ int transCommit() { } #endif } - } else { -#ifdef ABORTREADERS - removetransaction(tosend[i].oidmod,tosend[i].f.nummod); - removethisreadtransaction(tosend[i].objread, tosend[i].f.numread); +#ifdef RECOVERY + } #endif - } - } + } -#ifdef DEBUG - printf("%s-> Free sockets\n", __func__); -#endif - for(i = 0; i < pilecount; i++) { - if(socklist[i] != 0) { - freeSockWithLock(transRequestSockPool, listmid[i], socklist[i]); - } - } + for(i = 0; i< pilecount; i++) { + if(socklist[i] > 0) { + freeSockWithLock(transRequestSockPool,listmid[i], socklist[i]); + } + } /* Free resources */ free(tosend); @@ -1261,19 +1243,17 @@ int transCommit() { nSoftAbort++; #endif } + + } while (treplyretry && deadmid != -1); if(finalResponse == TRANS_ABORT) { - #ifdef TRANSSTATS numTransAbort++; #endif /* Free Resources */ objstrDelete(t_cache); t_chashDelete(); -#ifdef DEBUG - printf("%s-> End, line:%d\n\n", __func__, __LINE__); -#endif #ifdef RECOVERY if(deadmid != -1) { /* if deadmid is greater than or equal to 0, then there is dead machine. */ @@ -1281,9 +1261,6 @@ int transCommit() { printf("%s -> Dead machine Detected : %s\n",__func__,midtoIPString(deadmid)); #endif restoreDuplicationState(deadmid); -#ifdef DEBUG - printf("%s -> Duplication completed\n",__func__); -#endif } #endif return TRANS_ABORT; @@ -1294,19 +1271,13 @@ int transCommit() { /* Free Resources */ objstrDelete(t_cache); t_chashDelete(); -#ifdef DEBUG - printf("%s-> End, line:%d\n\n", __func__, __LINE__); -#endif return 0; } else { //TODO Add other cases printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__); -#ifdef DEBUG - printf("%s-> End, line:%d\n\n", __func__, __LINE__); -#endif exit(-1); } -#ifdef DEBUG +#ifndef DEBUG printf("%s-> End, line:%d\n\n", __func__, __LINE__); #endif return 0; @@ -1363,16 +1334,10 @@ void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, cha /* Condition to send TRANS_AGREE */ if(v_matchnolock == tdata->f.numread + tdata->f.nummod) { -#ifdef DEBUG - printf("%s -> TRANS_AGREE\n",__func__); -#endif *getReplyCtrl = TRANS_AGREE; } /* Condition to send TRANS_SOFT_ABORT */ if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) { -#ifdef DEBUG - printf("%s -> TRANS_SOFT_ABORT\n",__func__); -#endif *getReplyCtrl = TRANS_SOFT_ABORT; } } @@ -1478,30 +1443,25 @@ char decideResponse(char *getReplyCtrl, char *treplyretry, int pilecount) { * cache. */ void *getRemoteObj(unsigned int mnum, unsigned int oid) { +#ifdef DEBUG + printf("%s -> entering\n",__func__); +#endif int size, val; struct sockaddr_in serv_addr; - char machineip[16]; char control = 0; objheader_t *h; void *objcopy = NULL; - int sd; - int flag; - - if((sd = getSock2(transReadSockPool, mnum)) != -1) { - char readrequest[sizeof(char)+sizeof(unsigned int)]; - readrequest[0] = READ_REQUEST; - *((unsigned int *)(&readrequest[1])) = oid; - send_data(sd, readrequest, sizeof(readrequest)); - } - else { - printf("%s -> creating socket error\n",__func__); - } + int sd = getSock2(transRequestSockPool, mnum); + char readrequest[sizeof(char)+sizeof(unsigned int)]; + readrequest[0] = READ_REQUEST; + *((unsigned int *)(&readrequest[1])) = oid; + send_data(sd, readrequest, sizeof(readrequest)); /* Read response from the Participant */ if(recv_data(sd, &control, sizeof(char)) < 0) { - transRetryFlag = 1; - return NULL; + transRetryFlag = 1; + return NULL; } if (control==OBJECT_NOT_FOUND) { @@ -1521,7 +1481,6 @@ void *getRemoteObj(unsigned int mnum, unsigned int oid) { transRetryFlag = 1; return NULL; } - STATUS(objcopy)=0; /* Insert into cache's lookup table */ @@ -1546,7 +1505,7 @@ char receiveDecisionFromBackup(unsigned int transID,int nummid,unsigned int *lis char response; for(i = 0; i < nummid; i++) { - if((sd = getSock(transReadSockPool, listmid[i])) < 0) { + if((sd = getSock(transPrefetchSockPool, listmid[i])) < 0) { printf("%s -> socket Error!!\n"); } else { @@ -1563,7 +1522,7 @@ char receiveDecisionFromBackup(unsigned int transID,int nummid,unsigned int *lis break; // received response // else check next machine - freeSock(transReadSockPool, listmid[i],sd); + freeSock(transPrefetchSockPool, listmid[i],sd); } } #ifdef DEBUG @@ -1579,12 +1538,12 @@ void restoreDuplicationState(unsigned int deadHost) { int sd; char ctrl; - if(!liveHosts[findHost(deadHost)]) { + if(!liveHosts[findHost(deadHost)]) { // if it is already fixed sleep(WAIT_TIME); return; } - if(deadHost == leader) + if(deadHost == leader) // if leader is dead, then pick a new leader paxos(); #ifdef DEBUG @@ -1597,7 +1556,7 @@ void restoreDuplicationState(unsigned int deadHost) { leaderFixing = 1; pthread_mutex_unlock(&leaderFixing_mutex); - if(!liveHosts[findHost(deadHost)]) { + if(!liveHosts[findHost(deadHost)]) { // if it is already fixed #ifdef DEBUG printf("%s -> already fixed\n",__func__); #endif @@ -1605,7 +1564,7 @@ void restoreDuplicationState(unsigned int deadHost) { leaderFixing =0; pthread_mutex_unlock(&leaderFixing_mutex); } - else { + else { // if i am the leader updateLiveHosts(); duplicateLostObjects(deadHost); @@ -1626,7 +1585,7 @@ void restoreDuplicationState(unsigned int deadHost) { sleep(WAIT_TIME); } } - else { + else { // request leader to fix the situation if((sd = getSockWithLock(transPrefetchSockPool, leader)) < 0) { printf("%s -> socket create error\n",__func__); exit(-1); @@ -1771,10 +1730,6 @@ int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) { numlocked = transinfo->numlocked; oidlocked = transinfo->objlocked; -#ifdef DEBUG - printf("%s-> nummod: %d, numcreated: %d, numlocked: %d\n", __func__, nummod, numcreated, numlocked); -#endif - for (i = 0; i < nummod; i++) { if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) { printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); @@ -1799,13 +1754,10 @@ int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) { header->version += 1; //printf("oid: %u, new header version: %d\n", oidmod[i], header->version); if(header->notifylist != NULL) { -#ifdef DEBUG - printf("%s -> type : %d notifylist : %d\n",__func__,TYPE(header),header->notifylist); -#endif #ifdef RECOVERY - if(header->isBackup != 0) + if(header->isBackup != 0) // if it is primary obj, notify notifyAll(&header->notifylist, OID(header), header->version); - else + else // if not, just clear the notification list clearNotifyList(OID(header)); #else notifyAll(&header->notifylist, OID(header), header->version); @@ -1819,7 +1771,6 @@ int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) { return 1; } header->version += 1; - //printf("oid: %u, new header version: %d\n", oidcreated[i], header->version); GETSIZE(tmpsize, header); tmpsize += sizeof(objheader_t); pthread_mutex_lock(&mainobjstore_mutex); @@ -1834,6 +1785,7 @@ int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) { memcpy(ptrcreate, header, tmpsize); mhashInsert(oidcreated[i], ptrcreate); lhashInsert(oidcreated[i], myIpAddr); +// printf("oid created : %u\n",oidcreated[i]); } /* Unlock locked objects */ int useWriteUnlock = 0; @@ -2099,8 +2051,17 @@ unsigned short getObjType(unsigned int oid) { #ifdef CACHE if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) { #endif + +#ifdef RECOVERY + unsigned int mid = lhashSearch(oid); + unsigned int machineID; + static flipBit = 0; + machineID = (flipBit)?(getPrimaryMachine(mid)):(getBackupMachine(mid)); + int sd = getSock2(transReadSockPool, machineID); +#else unsigned int mid = lhashSearch(oid); int sd = getSock2(transReadSockPool, mid); +#endif char remotereadrequest[sizeof(char)+sizeof(unsigned int)]; remotereadrequest[0] = READ_REQUEST; *((unsigned int *)(&remotereadrequest[1])) = oid; @@ -2191,6 +2152,7 @@ unsigned int getNewOID(void) { return id; } +#ifdef RECOVERY static unsigned int tid = 0xFFFFFFFF; unsigned int getNewTransID(void) { tid++; @@ -2199,6 +2161,7 @@ unsigned int getNewTransID(void) { } return tid; } +#endif int processConfigFile() { FILE *configFile; @@ -2223,8 +2186,6 @@ int processConfigFile() { #ifdef RECOVERY liveHosts = calloc(sizeOfHostArray, sizeof(unsigned int)); locateObjHosts = calloc(sizeOfHostArray*2, sizeof(unsigned int)); - - liveHostsValid = 0; #endif while(fgets(lineBuffer, maxLineLength, configFile) != NULL) { @@ -2258,7 +2219,6 @@ int processConfigFile() { myIndexInHostArray = findHost(myIpAddr); #ifdef RECOVERY liveHosts[myIndexInHostArray] = 1; - //locateObjHosts[myIndexInHostArray] = myIpAddr; #endif if (myIndexInHostArray == -1) { printf("error in %s: IP Address of eth0 not found\n", CONFIG_FILENAME); @@ -2295,6 +2255,9 @@ unsigned int getPrimaryMachine(unsigned int mid) { unsigned int pmid; int pmidindex = 2*findHost(mid); + if(pmidindex < 0) + printf("What!!!\n"); + pthread_mutex_lock(&liveHosts_mutex); pmid = locateObjHosts[pmidindex]; pthread_mutex_unlock(&liveHosts_mutex); @@ -2305,6 +2268,9 @@ unsigned int getBackupMachine(unsigned int mid) { unsigned int bmid; int bmidindex = 2*findHost(mid)+1; + if(bmidindex < 0) + printf("damn!!\n"); + pthread_mutex_lock(&liveHosts_mutex); bmid = locateObjHosts[bmidindex]; pthread_mutex_unlock(&liveHosts_mutex); @@ -2326,7 +2292,6 @@ unsigned int updateLiveHosts() { printf("%s-> Entering updateLiveHosts\n", __func__); #endif // update everyone's list - liveHostsValid = 0; //foreach in hostipaddrs, ping -> update list of livemachines //socket connection? @@ -2336,63 +2301,52 @@ unsigned int updateLiveHosts() { int sd = 0, i, j, tmpNumLiveHosts = 0; for(i = 0; i < numHostsInSystem; i++) { if(i == myIndexInHostArray) - { + { + liveHosts[i] = 1; tmpNumLiveHosts++; continue; } - for(j = 0; j < 5; j++) { // hard define num of retries - if((sd = getSockWithLock(transPrefetchSockPool, hostIpAddrs[i])) < 0) { -#ifdef DEBUG - printf("%s -> Cannot create socket connection to [%s], attempt %d\n", __func__, midtoIPString(hostIpAddrs[i]), j); -#endif - usleep(1000); - - if(j == 4) { - if(liveHosts[i]) { - liveHosts[i] = 0; - deadhost = i; - } - } - continue; - } + if((sd = getSockWithLock(transPrefetchSockPool, hostIpAddrs[i])) < 0) { + usleep(1000); + + if(liveHosts[i]) { + liveHosts[i] = 0; + deadhost = i; + } + continue; + } - char liverequest[sizeof(char)]; - liverequest[0] = RESPOND_LIVE; + char liverequest; + liverequest = RESPOND_LIVE; - send_data(sd, &liverequest[0], sizeof(liverequest)); + send_data(sd, &liverequest, sizeof(char)); - char response = 0; - int timeout = recv_data(sd, &response, sizeof(response)); + char response = 0; + int timeout = recv_data(sd, &response, sizeof(char)); - //try to send msg - //if timeout, dead host - if(response == LIVE) { - liveHosts[i] = 1; - tmpNumLiveHosts++; - } - else { - if(liveHosts[i]) { - liveHosts[i] = 0; - deadhost = i; - } - } - freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd); - break; + //try to send msg + //if timeout, dead host + if(response == LIVE) { + liveHosts[i] = 1; + tmpNumLiveHosts++; } -#ifdef DEBUG - if(liveHosts[i] == 0) - - printf("updateLiveHosts(): cannot make connection to machine %s\n", midtoIPString(hostIpAddrs[i])); -#endif + else { + if(liveHosts[i]) { + liveHosts[i] = 0; + deadhost = i; + } + } + freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd); } - numLiveHostsInSystem = tmpNumLiveHosts; + + numLiveHostsInSystem = tmpNumLiveHosts; #ifdef DEBUG printf("numLiveHostsInSystem:%d\n", numLiveHostsInSystem); #endif //have updated list of live machines #ifdef DEBUG - printf("%s-> Exiting updateLiveHosts\n", __func__); printHostsStatus(); + printf("%s-> Exiting updateLiveHosts\n", __func__); #endif return deadhost; @@ -2414,9 +2368,8 @@ int updateLiveHostsCommit() { int sd = 0, i; char updaterequest[sizeof(char)+sizeof(int)*numHostsInSystem+sizeof(unsigned int)*(numHostsInSystem*2)]; - + updaterequest[0] = UPDATE_LIVE_HOSTS; - for(i = 0; i < numHostsInSystem; i++) { *((int *)(&updaterequest[i*4+1])) = liveHosts[i]; // clean this up later } @@ -2426,7 +2379,6 @@ int updateLiveHostsCommit() { } //for each machine send data - for(i = 0; i < numHostsInSystem; i++) { // hard define num of retries if(i == myIndexInHostArray) continue; @@ -2439,7 +2391,6 @@ int updateLiveHostsCommit() { freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd); } } - liveHostsValid = 1; #ifdef DEBUG printHostsStatus(); printf("%s -> Finish\n",__func__); @@ -2447,16 +2398,15 @@ int updateLiveHostsCommit() { return 0; } +#endif +#ifdef RECOVERY void setLocateObjHosts() { int i = 0, validIndex = 0; //check num hosts even valid first - for(;i < numHostsInSystem; i++) { -#ifdef DEBUG - printf("%s-> i:%d\n", __func__, i); -#endif + for(i = 0;i < numHostsInSystem; i++) { while(liveHosts[(i+validIndex)%numHostsInSystem] == 0) { validIndex++; @@ -2490,6 +2440,17 @@ void setReLocateObjHosts(int mid) int newPrimaryIndex = findHost(newPrimary); int i; + /* duplicateLostObject example + * Before M24 die, + * MID 21 24 26 + * Primary 21 24 26 + * Backup 26 21 24 + * After M24 die, + * MID 21 26 + * Primary 21,24 26 + * Backup 26 21,24 + */ + locateObjHosts[2*newPrimaryIndex+1] = backupMachine; locateObjHosts[2*mIndex] = newPrimary; @@ -2527,7 +2488,9 @@ int allHostsLive() { } return 1; } +#endif +#ifdef RECOVERY void duplicateLostObjects(unsigned int mid){ printf("%s-> Start, mid: [%s]\n", __func__, midtoIPString(mid)); @@ -2546,7 +2509,6 @@ void duplicateLostObjects(unsigned int mid){ //connect to these machines //go through their object store copying necessary (in a transaction) - //transRequestSockPool = createSockPool(transRequestSockPool, DEFAULTSOCKPOOLSIZE); int sd = 0, i, j, tmpNumLiveHosts = 0; /* duplicateLostObject example @@ -2978,11 +2940,7 @@ int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) { while(*head != NULL) { ptr = *head; - mid = ptr->mid; -#ifdef DEBUG - printf("%s -> trying to connect MID : %s\n",__func__,midtoIPString(mid)); -#endif //create a socket connection to that machine if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) { @@ -3001,9 +2959,6 @@ int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) { fflush(stdout); status = -1; } else { -#ifdef DEBUG - printf("%s -> connected\n",__func__); -#endif bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int))); msg[0] = THREAD_NOTIFY_RESPONSE; *((unsigned int *)&msg[1]) = oid; @@ -3017,13 +2972,9 @@ int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) { } //close socket close(sock); - // Update head *head = ptr->next; free(ptr); -#ifdef DEBUG - printf("%s -> End notifying MID : %s\n",__func__,midtoIPString(mid)); -#endif } return status; } @@ -3042,11 +2993,10 @@ void transAbort() { plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) { plistnode_t *ptr, *tmp; int found = 0, offset = 0; - char ip[16]; tmp = pile; + //Add oid into a machine that is already present in the pile linked list structure while(tmp != NULL) { -// printf("tmp->mid = [%s], mid = [%s]\n", midtoIPString(tmp->mid), midtoIPString(mid)); if (tmp->mid == mid) { int tmpsize; @@ -3078,8 +3028,10 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi if (!found) { int tmpsize; if((ptr = pCreate(num_objs)) == NULL) { + printf("pCreate Error\n"); return NULL; } + ptr->mid = mid; if (STATUS(headeraddr) & NEW) { ptr->oidcreated[ptr->numcreated] = OID(headeraddr); @@ -3097,6 +3049,7 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version; ptr->numread++; } + ptr->next = pile; pile = ptr; } @@ -3121,6 +3074,7 @@ plistnode_t *sortPiles(plistnode_t *pileptr) { /* Arrange local machine processing at the end of the pile list */ while(ptr != NULL) { if(ptr != tail) { + /* if(ptr->mid == myIpAddr && (prev != pileptr)) { prev->next = ptr->next; ptr->next = NULL; @@ -3128,11 +3082,20 @@ plistnode_t *sortPiles(plistnode_t *pileptr) { return pileptr; } if((ptr->mid == myIpAddr) && (prev == pileptr)) { - prev = ptr->next; - ptr->next = NULL; - tail->next = ptr; - return prev; + prev->next = ptr->next; + ptr->next = NULL; + tail->next = ptr; + return pileptr; } + */ + + if((ptr->mid == myIpAddr)) + { + tail->next = pileptr; + pileptr = ptr->next; + ptr->next = NULL; + return pileptr; + } prev = ptr; } ptr = ptr->next; @@ -3342,8 +3305,9 @@ void paxosLearn() } //return v_a; } +#endif - +#ifdef RECOVERY void clearDeadThreadsNotification() { -- 2.34.1